1// Copyright (c) The Thanos Authors. 2// Licensed under the Apache License 2.0. 3 4package cacheutil 5 6import ( 7 "context" 8 "errors" 9 "sync" 10 "testing" 11 "time" 12 13 "github.com/bradfitz/gomemcache/memcache" 14 "github.com/fortytw2/leaktest" 15 "github.com/go-kit/kit/log" 16 prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" 17 "github.com/thanos-io/thanos/pkg/testutil" 18) 19 20func TestMemcachedClientConfig_validate(t *testing.T) { 21 tests := map[string]struct { 22 config MemcachedClientConfig 23 expected error 24 }{ 25 "should pass on valid config": { 26 config: MemcachedClientConfig{ 27 Addresses: []string{"127.0.0.1:11211"}, 28 }, 29 expected: nil, 30 }, 31 "should fail on no addresses": { 32 config: MemcachedClientConfig{ 33 Addresses: []string{}, 34 }, 35 expected: errMemcachedConfigNoAddrs, 36 }, 37 } 38 39 for testName, testData := range tests { 40 t.Run(testName, func(t *testing.T) { 41 testutil.Equals(t, testData.expected, testData.config.validate()) 42 }) 43 } 44} 45 46func TestNewMemcachedClient(t *testing.T) { 47 defer leaktest.CheckTimeout(t, 10*time.Second)() 48 49 // Should return error on empty YAML config. 50 conf := []byte{} 51 cache, err := NewMemcachedClient(log.NewNopLogger(), "test", conf, nil) 52 testutil.NotOk(t, err) 53 testutil.Equals(t, (*memcachedClient)(nil), cache) 54 55 // Should return error on invalid YAML config. 56 conf = []byte("invalid") 57 cache, err = NewMemcachedClient(log.NewNopLogger(), "test", conf, nil) 58 testutil.NotOk(t, err) 59 testutil.Equals(t, (*memcachedClient)(nil), cache) 60 61 // Should instance a memcached client with minimum YAML config. 62 conf = []byte(` 63addresses: 64 - 127.0.0.1:11211 65 - 127.0.0.2:11211 66`) 67 cache, err = NewMemcachedClient(log.NewNopLogger(), "test", conf, nil) 68 testutil.Ok(t, err) 69 defer cache.Stop() 70 71 testutil.Equals(t, []string{"127.0.0.1:11211", "127.0.0.2:11211"}, cache.config.Addresses) 72 testutil.Equals(t, defaultMemcachedClientConfig.Timeout, cache.config.Timeout) 73 testutil.Equals(t, defaultMemcachedClientConfig.MaxIdleConnections, cache.config.MaxIdleConnections) 74 testutil.Equals(t, defaultMemcachedClientConfig.MaxAsyncConcurrency, cache.config.MaxAsyncConcurrency) 75 testutil.Equals(t, defaultMemcachedClientConfig.MaxAsyncBufferSize, cache.config.MaxAsyncBufferSize) 76 testutil.Equals(t, defaultMemcachedClientConfig.DNSProviderUpdateInterval, cache.config.DNSProviderUpdateInterval) 77 testutil.Equals(t, defaultMemcachedClientConfig.MaxGetMultiConcurrency, cache.config.MaxGetMultiConcurrency) 78 testutil.Equals(t, defaultMemcachedClientConfig.MaxGetMultiBatchSize, cache.config.MaxGetMultiBatchSize) 79 80 // Should instance a memcached client with configured YAML config. 81 conf = []byte(` 82addresses: 83 - 127.0.0.1:11211 84 - 127.0.0.2:11211 85timeout: 1s 86max_idle_connections: 1 87max_async_concurrency: 1 88max_async_buffer_size: 1 89max_get_multi_concurrency: 1 90max_get_multi_batch_size: 1 91dns_provider_update_interval: 1s 92`) 93 cache, err = NewMemcachedClient(log.NewNopLogger(), "test", conf, nil) 94 testutil.Ok(t, err) 95 defer cache.Stop() 96 97 testutil.Equals(t, []string{"127.0.0.1:11211", "127.0.0.2:11211"}, cache.config.Addresses) 98 testutil.Equals(t, 1*time.Second, cache.config.Timeout) 99 testutil.Equals(t, 1, cache.config.MaxIdleConnections) 100 testutil.Equals(t, 1, cache.config.MaxAsyncConcurrency) 101 testutil.Equals(t, 1, cache.config.MaxAsyncBufferSize) 102 testutil.Equals(t, 1*time.Second, cache.config.DNSProviderUpdateInterval) 103 testutil.Equals(t, 1, cache.config.MaxGetMultiConcurrency) 104 testutil.Equals(t, 1, cache.config.MaxGetMultiBatchSize) 105} 106 107func TestMemcachedClient_SetAsync(t *testing.T) { 108 defer leaktest.CheckTimeout(t, 10*time.Second)() 109 110 ctx := context.Background() 111 config := defaultMemcachedClientConfig 112 config.Addresses = []string{"127.0.0.1:11211"} 113 backendMock := newMemcachedClientBackendMock() 114 115 client, err := prepare(config, backendMock) 116 testutil.Ok(t, err) 117 defer client.Stop() 118 119 testutil.Ok(t, client.SetAsync(ctx, "key-1", []byte("value-1"), time.Second)) 120 testutil.Ok(t, client.SetAsync(ctx, "key-2", []byte("value-2"), time.Second)) 121 testutil.Ok(t, backendMock.waitItems(2)) 122 123 testutil.Equals(t, 2.0, prom_testutil.ToFloat64(client.operations.WithLabelValues(opSet))) 124 testutil.Equals(t, 0.0, prom_testutil.ToFloat64(client.operations.WithLabelValues(opGetMulti))) 125 testutil.Equals(t, 0.0, prom_testutil.ToFloat64(client.failures.WithLabelValues(opSet))) 126} 127 128func TestMemcachedClient_GetMulti(t *testing.T) { 129 defer leaktest.CheckTimeout(t, 10*time.Second)() 130 131 tests := map[string]struct { 132 maxBatchSize int 133 maxConcurrency int 134 mockedGetMultiErrors int 135 initialItems []memcache.Item 136 getKeys []string 137 expectedHits map[string][]byte 138 expectedGetMultiCount int 139 }{ 140 "should fetch keys in a single batch if the input keys is <= the max batch size": { 141 maxBatchSize: 2, 142 maxConcurrency: 5, 143 initialItems: []memcache.Item{ 144 {Key: "key-1", Value: []byte("value-1")}, 145 {Key: "key-2", Value: []byte("value-2")}, 146 }, 147 getKeys: []string{"key-1", "key-2"}, 148 expectedHits: map[string][]byte{ 149 "key-1": []byte("value-1"), 150 "key-2": []byte("value-2"), 151 }, 152 expectedGetMultiCount: 1, 153 }, 154 "should fetch keys in multiple batches if the input keys is > the max batch size": { 155 maxBatchSize: 2, 156 maxConcurrency: 5, 157 initialItems: []memcache.Item{ 158 {Key: "key-1", Value: []byte("value-1")}, 159 {Key: "key-2", Value: []byte("value-2")}, 160 {Key: "key-3", Value: []byte("value-3")}, 161 }, 162 getKeys: []string{"key-1", "key-2", "key-3"}, 163 expectedHits: map[string][]byte{ 164 "key-1": []byte("value-1"), 165 "key-2": []byte("value-2"), 166 "key-3": []byte("value-3"), 167 }, 168 expectedGetMultiCount: 2, 169 }, 170 "should fetch keys in multiple batches on input keys exact multiple of batch size": { 171 maxBatchSize: 2, 172 maxConcurrency: 5, 173 initialItems: []memcache.Item{ 174 {Key: "key-1", Value: []byte("value-1")}, 175 {Key: "key-2", Value: []byte("value-2")}, 176 {Key: "key-3", Value: []byte("value-3")}, 177 {Key: "key-4", Value: []byte("value-4")}, 178 }, 179 getKeys: []string{"key-1", "key-2", "key-3", "key-4"}, 180 expectedHits: map[string][]byte{ 181 "key-1": []byte("value-1"), 182 "key-2": []byte("value-2"), 183 "key-3": []byte("value-3"), 184 "key-4": []byte("value-4"), 185 }, 186 expectedGetMultiCount: 2, 187 }, 188 "should fetch keys in multiple batches on input keys exact multiple of batch size with max concurrency disabled (0)": { 189 maxBatchSize: 2, 190 maxConcurrency: 0, 191 initialItems: []memcache.Item{ 192 {Key: "key-1", Value: []byte("value-1")}, 193 {Key: "key-2", Value: []byte("value-2")}, 194 {Key: "key-3", Value: []byte("value-3")}, 195 {Key: "key-4", Value: []byte("value-4")}, 196 }, 197 getKeys: []string{"key-1", "key-2", "key-3", "key-4"}, 198 expectedHits: map[string][]byte{ 199 "key-1": []byte("value-1"), 200 "key-2": []byte("value-2"), 201 "key-3": []byte("value-3"), 202 "key-4": []byte("value-4"), 203 }, 204 expectedGetMultiCount: 2, 205 }, 206 "should fetch keys in multiple batches on input keys exact multiple of batch size with max concurrency lower than the batches": { 207 maxBatchSize: 1, 208 maxConcurrency: 1, 209 initialItems: []memcache.Item{ 210 {Key: "key-1", Value: []byte("value-1")}, 211 {Key: "key-2", Value: []byte("value-2")}, 212 {Key: "key-3", Value: []byte("value-3")}, 213 {Key: "key-4", Value: []byte("value-4")}, 214 }, 215 getKeys: []string{"key-1", "key-2", "key-3", "key-4"}, 216 expectedHits: map[string][]byte{ 217 "key-1": []byte("value-1"), 218 "key-2": []byte("value-2"), 219 "key-3": []byte("value-3"), 220 "key-4": []byte("value-4"), 221 }, 222 expectedGetMultiCount: 4, 223 }, 224 "should fetch keys in a single batch if max batch size is disabled (0)": { 225 maxBatchSize: 0, 226 maxConcurrency: 5, 227 initialItems: []memcache.Item{ 228 {Key: "key-1", Value: []byte("value-1")}, 229 {Key: "key-2", Value: []byte("value-2")}, 230 {Key: "key-3", Value: []byte("value-3")}, 231 {Key: "key-4", Value: []byte("value-4")}, 232 }, 233 getKeys: []string{"key-1", "key-2", "key-3", "key-4"}, 234 expectedHits: map[string][]byte{ 235 "key-1": []byte("value-1"), 236 "key-2": []byte("value-2"), 237 "key-3": []byte("value-3"), 238 "key-4": []byte("value-4"), 239 }, 240 expectedGetMultiCount: 1, 241 }, 242 "should fetch keys in a single batch if max batch size is disabled (0) and max concurrency is disabled (0)": { 243 maxBatchSize: 0, 244 maxConcurrency: 0, 245 initialItems: []memcache.Item{ 246 {Key: "key-1", Value: []byte("value-1")}, 247 {Key: "key-2", Value: []byte("value-2")}, 248 {Key: "key-3", Value: []byte("value-3")}, 249 {Key: "key-4", Value: []byte("value-4")}, 250 }, 251 getKeys: []string{"key-1", "key-2", "key-3", "key-4"}, 252 expectedHits: map[string][]byte{ 253 "key-1": []byte("value-1"), 254 "key-2": []byte("value-2"), 255 "key-3": []byte("value-3"), 256 "key-4": []byte("value-4"), 257 }, 258 expectedGetMultiCount: 1, 259 }, 260 "should return no hits on all keys missing": { 261 maxBatchSize: 2, 262 maxConcurrency: 5, 263 initialItems: []memcache.Item{ 264 {Key: "key-1", Value: []byte("value-1")}, 265 {Key: "key-2", Value: []byte("value-2")}, 266 }, 267 getKeys: []string{"key-1", "key-2", "key-3", "key-4"}, 268 expectedHits: map[string][]byte{ 269 "key-1": []byte("value-1"), 270 "key-2": []byte("value-2"), 271 }, 272 expectedGetMultiCount: 2, 273 }, 274 "should return no hits on partial errors while fetching batches and no items found": { 275 maxBatchSize: 2, 276 maxConcurrency: 5, 277 mockedGetMultiErrors: 1, 278 initialItems: []memcache.Item{ 279 {Key: "key-1", Value: []byte("value-1")}, 280 {Key: "key-2", Value: []byte("value-2")}, 281 {Key: "key-3", Value: []byte("value-3")}, 282 }, 283 getKeys: []string{"key-5", "key-6", "key-7"}, 284 expectedHits: map[string][]byte{}, 285 expectedGetMultiCount: 2, 286 }, 287 "should return no hits on all errors while fetching batches": { 288 maxBatchSize: 2, 289 maxConcurrency: 5, 290 mockedGetMultiErrors: 2, 291 initialItems: []memcache.Item{ 292 {Key: "key-1", Value: []byte("value-1")}, 293 {Key: "key-2", Value: []byte("value-2")}, 294 {Key: "key-3", Value: []byte("value-3")}, 295 }, 296 getKeys: []string{"key-5", "key-6", "key-7"}, 297 expectedHits: nil, 298 expectedGetMultiCount: 2, 299 }, 300 } 301 302 for testName, testData := range tests { 303 t.Run(testName, func(t *testing.T) { 304 ctx := context.Background() 305 config := defaultMemcachedClientConfig 306 config.Addresses = []string{"127.0.0.1:11211"} 307 config.MaxGetMultiBatchSize = testData.maxBatchSize 308 config.MaxGetMultiConcurrency = testData.maxConcurrency 309 310 backendMock := newMemcachedClientBackendMock() 311 backendMock.getMultiErrors = testData.mockedGetMultiErrors 312 313 client, err := prepare(config, backendMock) 314 testutil.Ok(t, err) 315 defer client.Stop() 316 317 // Populate memcached with the initial items. 318 for _, item := range testData.initialItems { 319 testutil.Ok(t, client.SetAsync(ctx, item.Key, item.Value, time.Second)) 320 } 321 322 // Wait until initial items have been added. 323 testutil.Ok(t, backendMock.waitItems(len(testData.initialItems))) 324 325 // Read back the items. 326 testutil.Equals(t, testData.expectedHits, client.GetMulti(ctx, testData.getKeys)) 327 328 // Ensure the client has interacted with the backend as expected. 329 backendMock.lock.Lock() 330 defer backendMock.lock.Unlock() 331 testutil.Equals(t, testData.expectedGetMultiCount, backendMock.getMultiCount) 332 333 // Ensure metrics are tracked. 334 testutil.Equals(t, float64(testData.expectedGetMultiCount), prom_testutil.ToFloat64(client.operations.WithLabelValues(opGetMulti))) 335 testutil.Equals(t, float64(testData.mockedGetMultiErrors), prom_testutil.ToFloat64(client.failures.WithLabelValues(opGetMulti))) 336 }) 337 } 338} 339 340func prepare(config MemcachedClientConfig, backendMock *memcachedClientBackendMock) (*memcachedClient, error) { 341 logger := log.NewNopLogger() 342 selector := &MemcachedJumpHashSelector{} 343 client, err := newMemcachedClient(logger, "test", backendMock, selector, config, nil) 344 345 return client, err 346} 347 348type memcachedClientBackendMock struct { 349 lock sync.Mutex 350 items map[string]*memcache.Item 351 getMultiCount int 352 getMultiErrors int 353} 354 355func newMemcachedClientBackendMock() *memcachedClientBackendMock { 356 return &memcachedClientBackendMock{ 357 items: map[string]*memcache.Item{}, 358 } 359} 360 361func (c *memcachedClientBackendMock) GetMulti(keys []string) (map[string]*memcache.Item, error) { 362 c.lock.Lock() 363 defer c.lock.Unlock() 364 365 c.getMultiCount++ 366 if c.getMultiCount <= c.getMultiErrors { 367 return nil, errors.New("mocked GetMulti error") 368 } 369 370 items := make(map[string]*memcache.Item) 371 for _, key := range keys { 372 if item, ok := c.items[key]; ok { 373 items[key] = item 374 } 375 } 376 377 return items, nil 378} 379 380func (c *memcachedClientBackendMock) Set(item *memcache.Item) error { 381 c.lock.Lock() 382 defer c.lock.Unlock() 383 384 c.items[item.Key] = item 385 386 return nil 387} 388 389func (c *memcachedClientBackendMock) waitItems(expected int) error { 390 deadline := time.Now().Add(1 * time.Second) 391 392 for time.Now().Before(deadline) { 393 c.lock.Lock() 394 count := len(c.items) 395 c.lock.Unlock() 396 397 if count >= expected { 398 return nil 399 } 400 } 401 402 return errors.New("timeout expired while waiting for items in the memcached mock") 403} 404