# partial unit test for gmpy threaded mpz functionality
# relies on Tim Peters' "doctest.py" test-driver

import gmpy as _g, doctest, sys, operator, gc, queue, threading
from functools import reduce
__test__={}


def _tf(N=2, _K=1234**5678):
    """Takes about 100ms on a first-generation Macbook Pro.

    Computes mpz(1234)**5678 N times and checks each result against _K,
    the same power evaluated once with plain Python ints when the
    function is defined.
    """
    for i in range(N):
        assert (_g.mpz(1234)**5678) == _K


# Shared mpz operands used by the doctests and by the worker tasks below.
a = _g.mpz(123)
b = _g.mpz(456)
c = _g.mpz(123456789123456789)


def factorize(x=c):
    r'''
    (Takes about 25ms, on c, on a first-generation Macbook Pro)

    Return the prime factors of x, with multiplicity, in ascending order,
    as a list of Python ints.  The result is self-checked: every factor
    must satisfy is_prime and the product of all factors must equal x.

    >>> factorize(a)
    [3, 41]
    >>> factorize(b)
    [2, 2, 2, 3, 19]
    >>>
    '''
    # NOTE(review): gmpy is re-imported locally although it is already bound
    # at module level; presumably deliberate for the threaded runs -- kept.
    import gmpy as _g
    from operator import mul

    savex = x
    prime = 2
    x = _g.mpz(x)
    factors = []
    while x >= prime:
        # remove() divides out every power of `prime` and reports how many
        # times it divided.
        newx, mult = x.remove(prime)
        if mult:
            factors.extend([int(prime)]*mult)
            x = newx
        prime = _g.next_prime(prime)
    for factor in factors:
        assert _g.is_prime(factor)
    # The initial value 1 keeps reduce() from raising TypeError when no
    # factors were found (e.g. x == 1).
    assert reduce(mul, factors, 1) == savex
    return factors


def elemop(N=1000):
    r'''
    (Takes about 40ms on a first-generation Macbook Pro)

    Repeat a fixed set of elementary mpz checks N times, using the
    module-level operands a, b and c.
    '''
    for i in range(N):
        assert a+b == 579
        assert a-b == -333
        assert b*a == a*b == 56088
        assert b%a == 87
        assert divmod(a, b) == (0, 123)
        assert divmod(b, a) == (3, 87)
        assert -a == -123
        assert pow(a, 10) == 792594609605189126649
        assert pow(a, 7, b) == 99
        # Python 3 has no builtin cmp(); this is the three-way comparison
        # it used to return.
        assert (a > b) - (a < b) == -1
        assert '7' in str(c)
        assert '0' not in str(c)
        assert a.sqrt() == 11
        assert _g.lcm(a, b) == 18696
        assert _g.fac(7) == 5040
        assert _g.fib(17) == 1597
        assert _g.divm(b, a, 20) == 12
        assert _g.divm(4, 8, 20) == 3
        assert _g.divm(4, 8, 20) == 3
        assert _g.mpz(20) == 20
        assert _g.mpz(8) == 8
        assert _g.mpz(4) == 4
        assert a.invert(100) == 87


def _test(chat=None):
    """Run this module's doctests twice and return doctest's summary.

    The first pass uses gmpy's default cache settings; the second runs
    after set_cache(0, 128) with sys.stdout temporarily replaced by a
    sink.  When chat is true, a banner and progress messages are printed.
    The cache settings are not restored afterwards.
    """
    if chat:
        print("Unit tests for gmpy 1.17 (threading)")
        print("    running on Python", sys.version)
        print()
        # gmp_version() is falsy when gmpy was not built against GMP; the
        # MPIR version is reported in that case.
        if _g.gmp_version():
            print("Testing gmpy %s (GMP %s) with default caching (%s, %s)" % (
                (_g.version(), _g.gmp_version(), _g.get_cache()[0],
                _g.get_cache()[1])))
        else:
            print("Testing gmpy %s (MPIR %s) with default caching (%s, %s)" % (
                (_g.version(), _g.mpir_version(), _g.get_cache()[0],
                _g.get_cache()[1])))

    thismod = sys.modules.get(__name__)
    doctest.testmod(thismod, report=0)

    if chat: print("Repeating tests, with caching disabled")
    _g.set_cache(0,128)

    sav = sys.stdout
    class _Dummy:
        # Minimal write-only stand-in for sys.stdout that discards output.
        def write(self,*whatever):
            pass
    try:
        sys.stdout = _Dummy()
        doctest.testmod(thismod, report=0)
    finally:
        # Always put the real stdout back, even if the second pass raises.
        sys.stdout = sav

    if chat:
        print()
        print("Overall results for thr:")
    return doctest.master.summarize(chat)


class DoOne(threading.Thread):
    """Worker thread that runs callables taken from a shared queue.

    The thread stops when it takes None from the queue.
    """

    def __init__(self, q):
        super().__init__()
        self.q = q

    def run(self):
        while True:
            task = self.q.get()
            if task is None:
                break
            task()


def _test_thr(Ntasks=5, Nthreads=1):
    """Run the workload functions on Nthreads DoOne worker threads.

    Each of the Ntasks rounds enqueues _tf once, factorize four times and
    elemop twice.  One None sentinel per thread is enqueued after the
    tasks so that every worker terminates; the call returns once all
    workers have been joined.
    """
    q = queue.Queue()
    # (callable, number of times it is enqueued per round)
    funcs = (_tf, 1), (factorize, 4), (elemop, 2)
    for i in range(Ntasks):
        for f, n in funcs:
            for x in range(n):
                q.put(f)
    for i in range(Nthreads):
        q.put(None)
    thrs = [DoOne(q) for i in range(Nthreads)]
    for t in thrs: t.start()
    for t in thrs: t.join()


if __name__=='__main__':
    _test(1)