1import os
2from threading import Thread
3import time
4from unittest import TestCase
5import weakref
6
7import pytest
8
9from dogpile.cache.backends.memcached import GenericMemcachedBackend
10from dogpile.cache.backends.memcached import MemcachedBackend
11from dogpile.cache.backends.memcached import PylibmcBackend
12from . import eq_
13from ._fixtures import _GenericBackendTest
14from ._fixtures import _GenericMutexTest
15
16
# Port of the memcached server used by these tests.  It is read from the
# DOGPILE_MEMCACHED_PORT environment variable and falls back to "11211".
MEMCACHED_PORT = os.environ.get("DOGPILE_MEMCACHED_PORT", "11211")
MEMCACHED_URL = "127.0.0.1:{}".format(MEMCACHED_PORT)

# True only when DOGPILE_MEMCACHED_PORT is set to a non-empty value; in that
# case a failing connectivity probe is re-raised instead of skipping the test.
expect_memcached_running = bool(os.environ.get("DOGPILE_MEMCACHED_PORT"))

# Value passed as the backend's ``lock_timeout`` argument by the
# "WithTimeout" configurations.
LOCK_TIMEOUT = 1
22
23
class _TestMemcachedConn(object):
    """Mixin providing a memcached availability probe."""

    @classmethod
    def _check_backend_available(cls, backend):
        """Round-trip one key through a client created by ``backend``.

        Any exception raised by the probe (including a failed equality
        assertion) skips the test, unless ``expect_memcached_running`` is
        true, in which case the exception is re-raised unchanged.
        """
        try:
            probe = backend._create_client()
            probe.set("x", "y")
            assert probe.get("x") == "y"
        except Exception:
            # The environment promised a running server: surface the error.
            if expect_memcached_running:
                raise
            pytest.skip(
                "memcached is not running or "
                "otherwise not functioning correctly"
            )
39
40
class _NonDistributedMemcachedTest(_TestMemcachedConn, _GenericBackendTest):
    """Backend test configuration without the ``distributed_lock`` argument.

    The backend is pointed at ``MEMCACHED_URL``.
    """

    # Spaces in cache keys are replaced by underscores (presumably because
    # memcached does not accept whitespace in keys -- confirm).
    region_args = {
        "key_mangler": lambda x: x.replace(" ", "_"),
    }
    config_args = {
        "arguments": dict(url=MEMCACHED_URL),
    }
44
45
class _DistributedMemcachedWithTimeoutTest(
    _TestMemcachedConn, _GenericBackendTest
):
    """Backend test configuration with ``distributed_lock`` and a timeout.

    The backend is pointed at ``MEMCACHED_URL`` and receives
    ``lock_timeout=LOCK_TIMEOUT``.
    """

    # Spaces in cache keys are replaced by underscores (presumably because
    # memcached does not accept whitespace in keys -- confirm).
    region_args = {
        "key_mangler": lambda x: x.replace(" ", "_"),
    }
    config_args = {
        "arguments": dict(
            url=MEMCACHED_URL,
            distributed_lock=True,
            lock_timeout=LOCK_TIMEOUT,
        ),
    }
57
58
class _DistributedMemcachedTest(_TestMemcachedConn, _GenericBackendTest):
    """Backend test configuration with ``distributed_lock`` enabled.

    The backend is pointed at ``MEMCACHED_URL``; no ``lock_timeout`` is given.
    """

    # Spaces in cache keys are replaced by underscores (presumably because
    # memcached does not accept whitespace in keys -- confirm).
    region_args = {
        "key_mangler": lambda x: x.replace(" ", "_"),
    }
    config_args = {
        "arguments": dict(url=MEMCACHED_URL, distributed_lock=True),
    }
64
65
class _DistributedMemcachedMutexTest(_TestMemcachedConn, _GenericMutexTest):
    """Mutex test configuration with ``distributed_lock`` enabled.

    The backend is pointed at ``MEMCACHED_URL``; no ``lock_timeout`` is given.
    """

    config_args = {
        "arguments": dict(url=MEMCACHED_URL, distributed_lock=True),
    }
70
71
class _DistributedMemcachedMutexWithTimeoutTest(
    _TestMemcachedConn, _GenericMutexTest
):
    """Mutex test configuration with ``distributed_lock`` and a timeout.

    The backend is pointed at ``MEMCACHED_URL`` and receives
    ``lock_timeout=LOCK_TIMEOUT``.
    """

    config_args = {
        "arguments": dict(
            url=MEMCACHED_URL,
            distributed_lock=True,
            lock_timeout=LOCK_TIMEOUT,
        ),
    }
82
83
class PylibmcTest(_NonDistributedMemcachedTest):
    """``_NonDistributedMemcachedTest`` bound to the pylibmc backend."""

    backend = "dogpile.cache.pylibmc"
86
87
class PylibmcDistributedTest(_DistributedMemcachedTest):
    """``_DistributedMemcachedTest`` bound to the pylibmc backend."""

    backend = "dogpile.cache.pylibmc"
90
91
class PylibmcDistributedMutexTest(_DistributedMemcachedMutexTest):
    """``_DistributedMemcachedMutexTest`` bound to the pylibmc backend."""

    backend = "dogpile.cache.pylibmc"
94
95
class BMemcachedSkips(object):
    """Mixin whose test methods skip the threaded tests for bmemcached.

    Each method below calls ``pytest.skip`` instead of running a test body.
    """

    def _skip_unreliable(self):
        # Single place that issues the skip, shared by every override below.
        pytest.skip("bmemcached is too unreliable here")

    def test_threaded_dogpile(self):
        self._skip_unreliable()

    def test_threaded_get_multi(self):
        self._skip_unreliable()

    def test_mutex_threaded_dogpile(self):
        self._skip_unreliable()

    def test_mutex_threaded(self):
        self._skip_unreliable()
108
109
class BMemcachedTest(BMemcachedSkips, _NonDistributedMemcachedTest):
    """``_NonDistributedMemcachedTest`` bound to the bmemcached backend.

    ``BMemcachedSkips`` is listed first so its skipping methods come first
    in the method resolution order.
    """

    backend = "dogpile.cache.bmemcached"
112
113
class BMemcachedDistributedWithTimeoutTest(
    BMemcachedSkips, _DistributedMemcachedWithTimeoutTest
):
    """``_DistributedMemcachedWithTimeoutTest`` bound to bmemcached.

    ``BMemcachedSkips`` is listed first so its skipping methods come first
    in the method resolution order.
    """

    backend = "dogpile.cache.bmemcached"
118
119
class BMemcachedDistributedTest(BMemcachedSkips, _DistributedMemcachedTest):
    """``_DistributedMemcachedTest`` bound to the bmemcached backend.

    ``BMemcachedSkips`` is listed first so its skipping methods come first
    in the method resolution order.
    """

    backend = "dogpile.cache.bmemcached"
122
123
class BMemcachedDistributedMutexTest(
    BMemcachedSkips, _DistributedMemcachedMutexTest
):
    """``_DistributedMemcachedMutexTest`` bound to the bmemcached backend.

    ``BMemcachedSkips`` is listed first so its skipping methods come first
    in the method resolution order.
    """

    backend = "dogpile.cache.bmemcached"
128
129
class BMemcachedDistributedMutexWithTimeoutTest(
    BMemcachedSkips, _DistributedMemcachedMutexWithTimeoutTest
):
    """``_DistributedMemcachedMutexWithTimeoutTest`` bound to bmemcached.

    ``BMemcachedSkips`` is listed first so its skipping methods come first
    in the method resolution order.
    """

    backend = "dogpile.cache.bmemcached"
134
135
class MemcachedTest(_NonDistributedMemcachedTest):
    """``_NonDistributedMemcachedTest`` bound to the memcached backend."""

    backend = "dogpile.cache.memcached"
138
139
class MemcachedDistributedTest(_DistributedMemcachedTest):
    """``_DistributedMemcachedTest`` bound to the memcached backend."""

    backend = "dogpile.cache.memcached"
142
143
class MemcachedDistributedMutexTest(_DistributedMemcachedMutexTest):
    """``_DistributedMemcachedMutexTest`` bound to the memcached backend."""

    backend = "dogpile.cache.memcached"
146
147
class MockGenericMemcachedBackend(GenericMemcachedBackend):
    """``GenericMemcachedBackend`` whose client is a ``MockClient``."""

    def _imports(self):
        """Do nothing.

        Presumably stands in for the base class's client-library import --
        confirm against ``GenericMemcachedBackend``.
        """
        return None

    def _create_client(self):
        """Return a new ``MockClient`` built from ``self.url``."""
        client = MockClient(self.url)
        return client
154
155
class MockMemcacheBackend(MemcachedBackend):
    """``MemcachedBackend`` whose client is a ``MockClient``."""

    def _imports(self):
        """Do nothing.

        Presumably stands in for the base class's client-library import --
        confirm against ``MemcachedBackend``.
        """
        return None

    def _create_client(self):
        """Return a new ``MockClient`` built from ``self.url``."""
        client = MockClient(self.url)
        return client
162
163
class MockPylibmcBackend(PylibmcBackend):
    """``PylibmcBackend`` whose client is a ``MockClient``."""

    def _imports(self):
        """Do nothing.

        Presumably stands in for the base class's client-library import --
        confirm against ``PylibmcBackend``.
        """
        return None

    def _create_client(self):
        """Return a ``MockClient`` given the url, binary flag and behaviors.

        ``self.url`` is passed positionally; ``self.binary`` and
        ``self.behaviors`` are passed as keyword arguments so tests can read
        them back from the client's ``kw`` mapping.
        """
        options = {"binary": self.binary, "behaviors": self.behaviors}
        return MockClient(self.url, **options)
172
173
class MockClient(object):
    """In-memory stand-in for a memcached client.

    Each instance adds a weak reference to itself to the class-level
    ``clients`` set, and the weak reference's callback removes it again once
    the instance is gone.  ``number_of_clients()`` therefore reports how many
    instances are currently registered.
    """

    # Weak references to the registered instances, shared by the whole class.
    clients = set()

    def __init__(self, *arg, **kw):
        """Store the constructor arguments and register this instance."""
        self.arg, self.kw = arg, kw
        # Keyword arguments of every ``set`` call, in call order.
        self.canary = []
        self._cache = {}
        tracker = weakref.ref(self, MockClient._remove)
        self.clients.add(tracker)

    @classmethod
    def _remove(cls, ref):
        """Weak reference callback: drop ``ref`` from ``clients``."""
        cls.clients.remove(ref)

    @classmethod
    def number_of_clients(cls):
        """Return the number of weak references held in ``clients``."""
        return len(cls.clients)

    def get(self, key):
        """Return the value stored under ``key``, or ``None`` if absent."""
        return self._cache.get(key, None)

    def set(self, key, value, **kw):
        """Store ``value`` under ``key`` and record the keyword arguments."""
        self.canary.append(kw)
        self._cache[key] = value

    def delete(self, key):
        """Remove ``key`` if present; a missing key is ignored."""
        try:
            del self._cache[key]
        except KeyError:
            pass
201
202
class PylibmcArgsTest(TestCase):
    """Check which arguments ``MockPylibmcBackend`` hands to its client."""

    def test_binary_flag(self):
        # The "binary" argument shows up as a client keyword argument.
        arguments = {"url": "foo", "binary": True}
        backend = MockPylibmcBackend(arguments=arguments)
        client = backend._create_client()
        eq_(client.kw["binary"], True)

    def test_url_list(self):
        # A list of urls reaches the client as its first positional argument.
        arguments = {"url": ["a", "b", "c"]}
        backend = MockPylibmcBackend(arguments=arguments)
        client = backend._create_client()
        eq_(client.arg[0], ["a", "b", "c"])

    def test_url_scalar(self):
        # A single url string reaches the client wrapped in a list.
        arguments = {"url": "foo"}
        backend = MockPylibmcBackend(arguments=arguments)
        client = backend._create_client()
        eq_(client.arg[0], ["foo"])

    def test_behaviors(self):
        # The "behaviors" mapping shows up as a client keyword argument.
        arguments = {"url": "foo", "behaviors": {"q": "p"}}
        backend = MockPylibmcBackend(arguments=arguments)
        client = backend._create_client()
        eq_(client.kw["behaviors"], {"q": "p"})

    def test_set_time(self):
        # "memcached_expire_time" is passed to the client's set() as "time".
        arguments = {"url": "foo", "memcached_expire_time": 20}
        backend = MockPylibmcBackend(arguments=arguments)
        backend.set("foo", "bar")
        recorded = backend._clients.memcached.canary
        eq_(recorded, [{"time": 20}])

    def test_set_min_compress_len(self):
        # "min_compress_len" is passed through to the client's set().
        arguments = {"url": "foo", "min_compress_len": 20}
        backend = MockPylibmcBackend(arguments=arguments)
        backend.set("foo", "bar")
        recorded = backend._clients.memcached.canary
        eq_(recorded, [{"min_compress_len": 20}])

    def test_no_set_args(self):
        # With neither option configured, set() receives no keyword arguments.
        arguments = {"url": "foo"}
        backend = MockPylibmcBackend(arguments=arguments)
        backend.set("foo", "bar")
        recorded = backend._clients.memcached.canary
        eq_(recorded, [{}])
240
241
class MemcachedArgstest(TestCase):
    """Check which ``set`` keyword arguments ``MockMemcacheBackend`` emits."""

    def test_set_time(self):
        # "memcached_expire_time" is passed to the client's set() as "time".
        arguments = {"url": "foo", "memcached_expire_time": 20}
        backend = MockMemcacheBackend(arguments=arguments)
        backend.set("foo", "bar")
        recorded = backend._clients.memcached.canary
        eq_(recorded, [{"time": 20}])

    def test_set_min_compress_len(self):
        # "min_compress_len" is passed through to the client's set().
        arguments = {"url": "foo", "min_compress_len": 20}
        backend = MockMemcacheBackend(arguments=arguments)
        backend.set("foo", "bar")
        recorded = backend._clients.memcached.canary
        eq_(recorded, [{"min_compress_len": 20}])
256
257
class LocalThreadTest(TestCase):
    """Verify that clients created by worker threads are released again.

    Each worker touches ``backend._clients.memcached`` and records
    ``MockClient.number_of_clients()``.  The test expects the count to grow
    by one per worker, and to be back at zero once all workers have exited
    and a garbage collection has run.
    """

    def setUp(self):
        """Require that no ``MockClient`` is registered before the test."""
        import gc

        gc.collect()
        eq_(MockClient.number_of_clients(), 0)

    def test_client_cleanup_1(self):
        self._test_client_cleanup(1)

    def test_client_cleanup_3(self):
        self._test_client_cleanup(3)

    def test_client_cleanup_10(self):
        self._test_client_cleanup(10)

    def _test_client_cleanup(self, count):
        """Run ``count`` worker threads and check client creation/cleanup.

        Workers register one at a time and stay alive until all of them have
        registered, so the recorded counts are exactly ``1..count`` no
        matter how the threads are scheduled.

        :param count: number of worker threads to start.
        """
        import gc
        from threading import Event
        from threading import Lock

        backend = MockGenericMemcachedBackend(arguments={"url": "foo"})
        canary = []

        register_lock = Lock()
        all_registered = Event()
        release = Event()

        def f():
            # Obtaining the client and sampling the client count must be one
            # atomic step; otherwise two workers can record the same count.
            with register_lock:
                backend._clients.memcached
                canary.append(MockClient.number_of_clients())
                if len(canary) == count:
                    all_registered.set()
            # Stay alive until released, so that no worker exits (and has
            # its client dropped) before the remaining workers have sampled.
            release.wait()

        threads = [Thread(target=f) for _ in range(count)]
        for t in threads:
            t.start()
        try:
            # Bounded wait: a failing worker must not hang the test run; the
            # equality check below reports the problem instead.
            all_registered.wait(10)
        finally:
            release.set()
            for t in threads:
                t.join()
        eq_(canary, [i + 1 for i in range(count)])

        gc.collect()
        eq_(MockClient.number_of_clients(), 0)
298