1# partial unit test for gmpy threaded mpz functionality
2# relies on Tim Peters' "doctest.py" test-driver
3
4import gmpy as _g, doctest, sys, operator, gc, queue, threading
5from functools import reduce
6__test__={}
7
8def _tf(N=2, _K=1234**5678):
9    """Takes about 100ms on a first-generation Macbook Pro"""
10    for i in range(N): assert (_g.mpz(1234)**5678)==_K
11a=_g.mpz(123)
12b=_g.mpz(456)
13c=_g.mpz(123456789123456789)
14
15def factorize(x=c):
16    r'''
17    (Takes about 25ms, on c, on a first-generation Macbook Pro)
18    >>> factorize(a)
19    [3, 41]
20    >>> factorize(b)
21    [2, 2, 2, 3, 19]
22    >>>
23    '''
24    import gmpy as _g
25    savex=x
26    prime=2
27    x=_g.mpz(x)
28    factors=[]
29    while x>=prime:
30        newx,mult=x.remove(prime)
31        if mult:
32            factors.extend([int(prime)]*mult)
33            x=newx
34        prime=_g.next_prime(prime)
35    for factor in factors: assert _g.is_prime(factor)
36    from operator import mul
37    assert reduce(mul, factors)==savex
38    return factors
39
40def elemop(N=1000):
41    r'''
42    (Takes about 40ms on a first-generation Macbook Pro)
43    '''
44    for i in range(N):
45        assert a+b == 579
46        assert a-b == -333
47        assert b*a == a*b == 56088
48        assert b%a == 87
49        assert divmod(a, b) == (0, 123)
50        assert divmod(b, a) == (3, 87)
51        assert -a == -123
52        assert pow(a, 10) == 792594609605189126649
53        assert pow(a, 7, b) == 99
54        assert cmp(a, b) == -1
55        assert '7' in str(c)
56        assert '0' not in str(c)
57        assert a.sqrt() == 11
58        assert _g.lcm(a, b) == 18696
59        assert _g.fac(7) == 5040
60        assert _g.fib(17) == 1597
61        assert _g.divm(b, a, 20) == 12
62        assert _g.divm(4, 8, 20) == 3
63        assert _g.divm(4, 8, 20) == 3
64        assert _g.mpz(20) == 20
65        assert _g.mpz(8) == 8
66        assert _g.mpz(4) == 4
67        assert a.invert(100) == 87
68
69def _test(chat=None):
70    if chat:
71        print("Unit tests for gmpy 1.17 (threading)")
72        print("    running on Python", sys.version)
73        print()
74        if _g.gmp_version():
75            print("Testing gmpy %s (GMP %s) with default caching (%s, %s)" % (
76                (_g.version(), _g.gmp_version(), _g.get_cache()[0],
77                _g.get_cache()[1])))
78        else:
79            print("Testing gmpy %s (MPIR %s) with default caching (%s, %s)" % (
80                (_g.version(), _g.mpir_version(), _g.get_cache()[0],
81                _g.get_cache()[1])))
82
83    thismod = sys.modules.get(__name__)
84    doctest.testmod(thismod, report=0)
85
86    if chat: print("Repeating tests, with caching disabled")
87    _g.set_cache(0,128)
88
89    sav = sys.stdout
90    class _Dummy:
91        def write(self,*whatever):
92            pass
93    try:
94        sys.stdout = _Dummy()
95        doctest.testmod(thismod, report=0)
96    finally:
97        sys.stdout = sav
98
99    if chat:
100        print()
101        print("Overall results for thr:")
102    return doctest.master.summarize(chat)
103
104class DoOne(threading.Thread):
105    def __init__(self, q):
106        threading.Thread.__init__(self)
107        self.q = q
108    def run(self):
109        while True:
110            task = self.q.get()
111            if task is None: break
112            task()
113
114def _test_thr(Ntasks=5, Nthreads=1):
115    q = queue.Queue()
116    funcs = (_tf, 1), (factorize, 4), (elemop, 2)
117    for i in range(Ntasks):
118        for f, n in funcs:
119            for x in range(n):
120                q.put(f)
121    for i in range(Nthreads):
122        q.put(None)
123    thrs = [DoOne(q) for i in range(Nthreads)]
124    for t in thrs: t.start()
125    for t in thrs: t.join()
126
127if __name__=='__main__':
128    _test(1)
129
130