1# Copied from python src Python-3.4.0/Lib/test/test_functools.py
2
3import abc
4import collections
5from itertools import permutations
6import pickle
7from random import choice
8import sys
9import unittest
10import fastcache
11import functools
12
13try:
14    from functools import _CacheInfo
15except ImportError:
16    _CacheInfo = collections.namedtuple("CacheInfo",
17                    ["hits", "misses", "maxsize", "currsize"])
18
19class TestLRU(unittest.TestCase):
20
21    def test_lru(self):
22        def orig(x, y):
23            return 3 * x + y
24        f = fastcache.clru_cache(maxsize=20)(orig)
25        hits, misses, maxsize, currsize = f.cache_info()
26        self.assertEqual(maxsize, 20)
27        self.assertEqual(currsize, 0)
28        self.assertEqual(hits, 0)
29        self.assertEqual(misses, 0)
30
31        domain = range(5)
32        for i in range(1000):
33            x, y = choice(domain), choice(domain)
34            actual = f(x, y)
35            expected = orig(x, y)
36            self.assertEqual(actual, expected)
37        hits, misses, maxsize, currsize = f.cache_info()
38        self.assertTrue(hits > misses)
39        self.assertEqual(hits + misses, 1000)
40        self.assertEqual(currsize, 20)
41
42        f.cache_clear()   # test clearing
43        hits, misses, maxsize, currsize = f.cache_info()
44        self.assertEqual(hits, 0)
45        self.assertEqual(misses, 0)
46        self.assertEqual(currsize, 0)
47        f(x, y)
48        hits, misses, maxsize, currsize = f.cache_info()
49        self.assertEqual(hits, 0)
50        self.assertEqual(misses, 1)
51        self.assertEqual(currsize, 1)
52
53        # Test bypassing the cache
54        if hasattr(self, 'assertIs'):
55            self.assertIs(f.__wrapped__, orig)
56        f.__wrapped__(x, y)
57        hits, misses, maxsize, currsize = f.cache_info()
58        self.assertEqual(hits, 0)
59        self.assertEqual(misses, 1)
60        self.assertEqual(currsize, 1)
61
62        # test size zero (which means "never-cache")
63        @fastcache.clru_cache(0)
64        def f():
65            #nonlocal f_cnt
66            f_cnt[0] += 1
67            return 20
68        self.assertEqual(f.cache_info().maxsize, 0)
69        f_cnt = [0]
70        for i in range(5):
71            self.assertEqual(f(), 20)
72        self.assertEqual(f_cnt, [5])
73        hits, misses, maxsize, currsize = f.cache_info()
74        self.assertEqual(hits, 0)
75        self.assertEqual(misses, 5)
76        self.assertEqual(currsize, 0)
77
78        # test size one
79        @fastcache.clru_cache(1)
80        def f():
81            #nonlocal f_cnt
82            f_cnt[0] += 1
83            return 20
84        self.assertEqual(f.cache_info().maxsize, 1)
85        f_cnt[0] = 0
86        for i in range(5):
87            self.assertEqual(f(), 20)
88        self.assertEqual(f_cnt, [1])
89        hits, misses, maxsize, currsize = f.cache_info()
90        self.assertEqual(hits, 4)
91        self.assertEqual(misses, 1)
92        self.assertEqual(currsize, 1)
93
94        # test size two
95        @fastcache.clru_cache(2)
96        def f(x):
97            #nonlocal f_cnt
98            f_cnt[0] += 1
99            return x*10
100        self.assertEqual(f.cache_info().maxsize, 2)
101        f_cnt[0] = 0
102        for x in 7, 9, 7, 9, 7, 9, 8, 8, 8, 9, 9, 9, 8, 8, 8, 7:
103            #    *  *              *                          *
104            self.assertEqual(f(x), x*10)
105        self.assertEqual(f_cnt, [4])
106        hits, misses, maxsize, currsize = f.cache_info()
107        self.assertEqual(hits, 12)
108        self.assertEqual(misses, 4)
109        self.assertEqual(currsize, 2)
110
111    def test_lru_with_maxsize_none(self):
112        @fastcache.clru_cache(maxsize=None)
113        def fib(n):
114            if n < 2:
115                return n
116            return fib(n-1) + fib(n-2)
117        self.assertEqual([fib(n) for n in range(16)],
118            [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610])
119        self.assertEqual(fib.cache_info(),
120            _CacheInfo(hits=28, misses=16, maxsize=None, currsize=16))
121        fib.cache_clear()
122        self.assertEqual(fib.cache_info(),
123            _CacheInfo(hits=0, misses=0, maxsize=None, currsize=0))
124
125    def test_lru_with_exceptions(self):
126        # Verify that user_function exceptions get passed through without
127        # creating a hard-to-read chained exception.
128        # http://bugs.python.org/issue13177
129        for maxsize in (None, 128):
130            @fastcache.clru_cache(maxsize)
131            def func(i):
132                return 'abc'[i]
133            self.assertEqual(func(0), 'a')
134            try:
135                with self.assertRaises(IndexError) as cm:
136                    func(15)
137                # Does not have this attribute in Py2
138                if hasattr(cm.exception,'__context__'):
139                    self.assertIsNone(cm.exception.__context__)
140                # Verify that the previous exception did not result in a cached entry
141                with self.assertRaises(IndexError):
142                    func(15)
143            except TypeError:
144                # py26 unittest wants assertRaises called with another arg
145                if sys.version_info[:2] != (2, 6):
146                    raise
147                else:
148                    pass
149
150    def test_lru_with_types(self):
151        for maxsize in (None, 128):
152            @fastcache.clru_cache(maxsize=maxsize, typed=True)
153            def square(x):
154                return x * x
155            self.assertEqual(square(3), 9)
156            self.assertEqual(type(square(3)), type(9))
157            self.assertEqual(square(3.0), 9.0)
158            self.assertEqual(type(square(3.0)), type(9.0))
159            self.assertEqual(square(x=3), 9)
160            self.assertEqual(type(square(x=3)), type(9))
161            self.assertEqual(square(x=3.0), 9.0)
162            self.assertEqual(type(square(x=3.0)), type(9.0))
163            self.assertEqual(square.cache_info().hits, 4)
164            self.assertEqual(square.cache_info().misses, 4)
165
166    def test_lru_with_keyword_args(self):
167        @fastcache.clru_cache()
168        def fib(n):
169            if n < 2:
170                return n
171            return fib(n=n-1) + fib(n=n-2)
172        self.assertEqual(
173            [fib(n=number) for number in range(16)],
174            [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610]
175        )
176        self.assertEqual(fib.cache_info(),
177            _CacheInfo(hits=28, misses=16, maxsize=128, currsize=16))
178        fib.cache_clear()
179        self.assertEqual(fib.cache_info(),
180            _CacheInfo(hits=0, misses=0, maxsize=128, currsize=0))
181
182    def test_lru_with_keyword_args_maxsize_none(self):
183        @fastcache.clru_cache(maxsize=None)
184        def fib(n):
185            if n < 2:
186                return n
187            return fib(n=n-1) + fib(n=n-2)
188        self.assertEqual([fib(n=number) for number in range(16)],
189            [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610])
190        self.assertEqual(fib.cache_info(),
191            _CacheInfo(hits=28, misses=16, maxsize=None, currsize=16))
192        fib.cache_clear()
193        self.assertEqual(fib.cache_info(),
194            _CacheInfo(hits=0, misses=0, maxsize=None, currsize=0))
195
196    def test_need_for_rlock(self):
197        # This will deadlock on an LRU cache that uses a regular lock
198
199        @fastcache.clru_cache(maxsize=10)
200        def test_func(x):
201            'Used to demonstrate a reentrant lru_cache call within a single thread'
202            return x
203
204        class DoubleEq:
205            'Demonstrate a reentrant lru_cache call within a single thread'
206            def __init__(self, x):
207                self.x = x
208            def __hash__(self):
209                return self.x
210            def __eq__(self, other):
211                if self.x == 2:
212                    test_func(DoubleEq(1))
213                return self.x == other.x
214
215        test_func(DoubleEq(1))                      # Load the cache
216        test_func(DoubleEq(2))                      # Load the cache
217        self.assertEqual(test_func(DoubleEq(2)),    # Trigger a re-entrant __eq__ call
218                         DoubleEq(2))               # Verify the correct return value
219