1import doctest 2import unittest 3from test import support 4from itertools import * 5import weakref 6from decimal import Decimal 7from fractions import Fraction 8import operator 9import random 10import copy 11import pickle 12from functools import reduce 13import sys 14import struct 15import threading 16import gc 17 18maxsize = support.MAX_Py_ssize_t 19minsize = -maxsize-1 20 21def lzip(*args): 22 return list(zip(*args)) 23 24def onearg(x): 25 'Test function of one argument' 26 return 2*x 27 28def errfunc(*args): 29 'Test function that raises an error' 30 raise ValueError 31 32def gen3(): 33 'Non-restartable source sequence' 34 for i in (0, 1, 2): 35 yield i 36 37def isEven(x): 38 'Test predicate' 39 return x%2==0 40 41def isOdd(x): 42 'Test predicate' 43 return x%2==1 44 45def tupleize(*args): 46 return args 47 48def irange(n): 49 for i in range(n): 50 yield i 51 52class StopNow: 53 'Class emulating an empty iterable.' 54 def __iter__(self): 55 return self 56 def __next__(self): 57 raise StopIteration 58 59def take(n, seq): 60 'Convenience function for partially consuming a long of infinite iterable' 61 return list(islice(seq, n)) 62 63def prod(iterable): 64 return reduce(operator.mul, iterable, 1) 65 66def fact(n): 67 'Factorial' 68 return prod(range(1, n+1)) 69 70# root level methods for pickling ability 71def testR(r): 72 return r[0] 73 74def testR2(r): 75 return r[2] 76 77def underten(x): 78 return x<10 79 80picklecopiers = [lambda s, proto=proto: pickle.loads(pickle.dumps(s, proto)) 81 for proto in range(pickle.HIGHEST_PROTOCOL + 1)] 82 83class TestBasicOps(unittest.TestCase): 84 85 def pickletest(self, protocol, it, stop=4, take=1, compare=None): 86 """Test that an iterator is the same after pickling, also when part-consumed""" 87 def expand(it, i=0): 88 # Recursively expand iterables, within sensible bounds 89 if i > 10: 90 raise RuntimeError("infinite recursion encountered") 91 if isinstance(it, str): 92 return it 93 try: 94 l = list(islice(it, stop)) 95 except TypeError: 96 return it # can't expand it 97 return [expand(e, i+1) for e in l] 98 99 # Test the initial copy against the original 100 dump = pickle.dumps(it, protocol) 101 i2 = pickle.loads(dump) 102 self.assertEqual(type(it), type(i2)) 103 a, b = expand(it), expand(i2) 104 self.assertEqual(a, b) 105 if compare: 106 c = expand(compare) 107 self.assertEqual(a, c) 108 109 # Take from the copy, and create another copy and compare them. 110 i3 = pickle.loads(dump) 111 took = 0 112 try: 113 for i in range(take): 114 next(i3) 115 took += 1 116 except StopIteration: 117 pass #in case there is less data than 'take' 118 dump = pickle.dumps(i3, protocol) 119 i4 = pickle.loads(dump) 120 a, b = expand(i3), expand(i4) 121 self.assertEqual(a, b) 122 if compare: 123 c = expand(compare[took:]) 124 self.assertEqual(a, c); 125 126 def test_accumulate(self): 127 self.assertEqual(list(accumulate(range(10))), # one positional arg 128 [0, 1, 3, 6, 10, 15, 21, 28, 36, 45]) 129 self.assertEqual(list(accumulate(iterable=range(10))), # kw arg 130 [0, 1, 3, 6, 10, 15, 21, 28, 36, 45]) 131 for typ in int, complex, Decimal, Fraction: # multiple types 132 self.assertEqual( 133 list(accumulate(map(typ, range(10)))), 134 list(map(typ, [0, 1, 3, 6, 10, 15, 21, 28, 36, 45]))) 135 self.assertEqual(list(accumulate('abc')), ['a', 'ab', 'abc']) # works with non-numeric 136 self.assertEqual(list(accumulate([])), []) # empty iterable 137 self.assertEqual(list(accumulate([7])), [7]) # iterable of length one 138 self.assertRaises(TypeError, accumulate, range(10), 5, 6) # too many args 139 self.assertRaises(TypeError, accumulate) # too few args 140 self.assertRaises(TypeError, accumulate, x=range(10)) # unexpected kwd arg 141 self.assertRaises(TypeError, list, accumulate([1, []])) # args that don't add 142 143 s = [2, 8, 9, 5, 7, 0, 3, 4, 1, 6] 144 self.assertEqual(list(accumulate(s, min)), 145 [2, 2, 2, 2, 2, 0, 0, 0, 0, 0]) 146 self.assertEqual(list(accumulate(s, max)), 147 [2, 8, 9, 9, 9, 9, 9, 9, 9, 9]) 148 self.assertEqual(list(accumulate(s, operator.mul)), 149 [2, 16, 144, 720, 5040, 0, 0, 0, 0, 0]) 150 with self.assertRaises(TypeError): 151 list(accumulate(s, chr)) # unary-operation 152 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 153 self.pickletest(proto, accumulate(range(10))) # test pickling 154 self.pickletest(proto, accumulate(range(10), initial=7)) 155 self.assertEqual(list(accumulate([10, 5, 1], initial=None)), [10, 15, 16]) 156 self.assertEqual(list(accumulate([10, 5, 1], initial=100)), [100, 110, 115, 116]) 157 self.assertEqual(list(accumulate([], initial=100)), [100]) 158 with self.assertRaises(TypeError): 159 list(accumulate([10, 20], 100)) 160 161 def test_chain(self): 162 163 def chain2(*iterables): 164 'Pure python version in the docs' 165 for it in iterables: 166 for element in it: 167 yield element 168 169 for c in (chain, chain2): 170 self.assertEqual(list(c('abc', 'def')), list('abcdef')) 171 self.assertEqual(list(c('abc')), list('abc')) 172 self.assertEqual(list(c('')), []) 173 self.assertEqual(take(4, c('abc', 'def')), list('abcd')) 174 self.assertRaises(TypeError, list,c(2, 3)) 175 176 def test_chain_from_iterable(self): 177 self.assertEqual(list(chain.from_iterable(['abc', 'def'])), list('abcdef')) 178 self.assertEqual(list(chain.from_iterable(['abc'])), list('abc')) 179 self.assertEqual(list(chain.from_iterable([''])), []) 180 self.assertEqual(take(4, chain.from_iterable(['abc', 'def'])), list('abcd')) 181 self.assertRaises(TypeError, list, chain.from_iterable([2, 3])) 182 183 def test_chain_reducible(self): 184 for oper in [copy.deepcopy] + picklecopiers: 185 it = chain('abc', 'def') 186 self.assertEqual(list(oper(it)), list('abcdef')) 187 self.assertEqual(next(it), 'a') 188 self.assertEqual(list(oper(it)), list('bcdef')) 189 190 self.assertEqual(list(oper(chain(''))), []) 191 self.assertEqual(take(4, oper(chain('abc', 'def'))), list('abcd')) 192 self.assertRaises(TypeError, list, oper(chain(2, 3))) 193 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 194 self.pickletest(proto, chain('abc', 'def'), compare=list('abcdef')) 195 196 def test_chain_setstate(self): 197 self.assertRaises(TypeError, chain().__setstate__, ()) 198 self.assertRaises(TypeError, chain().__setstate__, []) 199 self.assertRaises(TypeError, chain().__setstate__, 0) 200 self.assertRaises(TypeError, chain().__setstate__, ([],)) 201 self.assertRaises(TypeError, chain().__setstate__, (iter([]), [])) 202 it = chain() 203 it.__setstate__((iter(['abc', 'def']),)) 204 self.assertEqual(list(it), ['a', 'b', 'c', 'd', 'e', 'f']) 205 it = chain() 206 it.__setstate__((iter(['abc', 'def']), iter(['ghi']))) 207 self.assertEqual(list(it), ['ghi', 'a', 'b', 'c', 'd', 'e', 'f']) 208 209 def test_combinations(self): 210 self.assertRaises(TypeError, combinations, 'abc') # missing r argument 211 self.assertRaises(TypeError, combinations, 'abc', 2, 1) # too many arguments 212 self.assertRaises(TypeError, combinations, None) # pool is not iterable 213 self.assertRaises(ValueError, combinations, 'abc', -2) # r is negative 214 215 for op in [lambda a:a] + picklecopiers: 216 self.assertEqual(list(op(combinations('abc', 32))), []) # r > n 217 218 self.assertEqual(list(op(combinations('ABCD', 2))), 219 [('A','B'), ('A','C'), ('A','D'), ('B','C'), ('B','D'), ('C','D')]) 220 testIntermediate = combinations('ABCD', 2) 221 next(testIntermediate) 222 self.assertEqual(list(op(testIntermediate)), 223 [('A','C'), ('A','D'), ('B','C'), ('B','D'), ('C','D')]) 224 225 self.assertEqual(list(op(combinations(range(4), 3))), 226 [(0,1,2), (0,1,3), (0,2,3), (1,2,3)]) 227 testIntermediate = combinations(range(4), 3) 228 next(testIntermediate) 229 self.assertEqual(list(op(testIntermediate)), 230 [(0,1,3), (0,2,3), (1,2,3)]) 231 232 233 def combinations1(iterable, r): 234 'Pure python version shown in the docs' 235 pool = tuple(iterable) 236 n = len(pool) 237 if r > n: 238 return 239 indices = list(range(r)) 240 yield tuple(pool[i] for i in indices) 241 while 1: 242 for i in reversed(range(r)): 243 if indices[i] != i + n - r: 244 break 245 else: 246 return 247 indices[i] += 1 248 for j in range(i+1, r): 249 indices[j] = indices[j-1] + 1 250 yield tuple(pool[i] for i in indices) 251 252 def combinations2(iterable, r): 253 'Pure python version shown in the docs' 254 pool = tuple(iterable) 255 n = len(pool) 256 for indices in permutations(range(n), r): 257 if sorted(indices) == list(indices): 258 yield tuple(pool[i] for i in indices) 259 260 def combinations3(iterable, r): 261 'Pure python version from cwr()' 262 pool = tuple(iterable) 263 n = len(pool) 264 for indices in combinations_with_replacement(range(n), r): 265 if len(set(indices)) == r: 266 yield tuple(pool[i] for i in indices) 267 268 for n in range(7): 269 values = [5*x-12 for x in range(n)] 270 for r in range(n+2): 271 result = list(combinations(values, r)) 272 self.assertEqual(len(result), 0 if r>n else fact(n) / fact(r) / fact(n-r)) # right number of combs 273 self.assertEqual(len(result), len(set(result))) # no repeats 274 self.assertEqual(result, sorted(result)) # lexicographic order 275 for c in result: 276 self.assertEqual(len(c), r) # r-length combinations 277 self.assertEqual(len(set(c)), r) # no duplicate elements 278 self.assertEqual(list(c), sorted(c)) # keep original ordering 279 self.assertTrue(all(e in values for e in c)) # elements taken from input iterable 280 self.assertEqual(list(c), 281 [e for e in values if e in c]) # comb is a subsequence of the input iterable 282 self.assertEqual(result, list(combinations1(values, r))) # matches first pure python version 283 self.assertEqual(result, list(combinations2(values, r))) # matches second pure python version 284 self.assertEqual(result, list(combinations3(values, r))) # matches second pure python version 285 286 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 287 self.pickletest(proto, combinations(values, r)) # test pickling 288 289 @support.bigaddrspacetest 290 def test_combinations_overflow(self): 291 with self.assertRaises((OverflowError, MemoryError)): 292 combinations("AA", 2**29) 293 294 # Test implementation detail: tuple re-use 295 @support.impl_detail("tuple reuse is specific to CPython") 296 def test_combinations_tuple_reuse(self): 297 self.assertEqual(len(set(map(id, combinations('abcde', 3)))), 1) 298 self.assertNotEqual(len(set(map(id, list(combinations('abcde', 3))))), 1) 299 300 def test_combinations_with_replacement(self): 301 cwr = combinations_with_replacement 302 self.assertRaises(TypeError, cwr, 'abc') # missing r argument 303 self.assertRaises(TypeError, cwr, 'abc', 2, 1) # too many arguments 304 self.assertRaises(TypeError, cwr, None) # pool is not iterable 305 self.assertRaises(ValueError, cwr, 'abc', -2) # r is negative 306 307 for op in [lambda a:a] + picklecopiers: 308 self.assertEqual(list(op(cwr('ABC', 2))), 309 [('A','A'), ('A','B'), ('A','C'), ('B','B'), ('B','C'), ('C','C')]) 310 testIntermediate = cwr('ABC', 2) 311 next(testIntermediate) 312 self.assertEqual(list(op(testIntermediate)), 313 [('A','B'), ('A','C'), ('B','B'), ('B','C'), ('C','C')]) 314 315 316 def cwr1(iterable, r): 317 'Pure python version shown in the docs' 318 # number items returned: (n+r-1)! / r! / (n-1)! when n>0 319 pool = tuple(iterable) 320 n = len(pool) 321 if not n and r: 322 return 323 indices = [0] * r 324 yield tuple(pool[i] for i in indices) 325 while 1: 326 for i in reversed(range(r)): 327 if indices[i] != n - 1: 328 break 329 else: 330 return 331 indices[i:] = [indices[i] + 1] * (r - i) 332 yield tuple(pool[i] for i in indices) 333 334 def cwr2(iterable, r): 335 'Pure python version shown in the docs' 336 pool = tuple(iterable) 337 n = len(pool) 338 for indices in product(range(n), repeat=r): 339 if sorted(indices) == list(indices): 340 yield tuple(pool[i] for i in indices) 341 342 def numcombs(n, r): 343 if not n: 344 return 0 if r else 1 345 return fact(n+r-1) / fact(r)/ fact(n-1) 346 347 for n in range(7): 348 values = [5*x-12 for x in range(n)] 349 for r in range(n+2): 350 result = list(cwr(values, r)) 351 352 self.assertEqual(len(result), numcombs(n, r)) # right number of combs 353 self.assertEqual(len(result), len(set(result))) # no repeats 354 self.assertEqual(result, sorted(result)) # lexicographic order 355 356 regular_combs = list(combinations(values, r)) # compare to combs without replacement 357 if n == 0 or r <= 1: 358 self.assertEqual(result, regular_combs) # cases that should be identical 359 else: 360 self.assertTrue(set(result) >= set(regular_combs)) # rest should be supersets of regular combs 361 362 for c in result: 363 self.assertEqual(len(c), r) # r-length combinations 364 noruns = [k for k,v in groupby(c)] # combo without consecutive repeats 365 self.assertEqual(len(noruns), len(set(noruns))) # no repeats other than consecutive 366 self.assertEqual(list(c), sorted(c)) # keep original ordering 367 self.assertTrue(all(e in values for e in c)) # elements taken from input iterable 368 self.assertEqual(noruns, 369 [e for e in values if e in c]) # comb is a subsequence of the input iterable 370 self.assertEqual(result, list(cwr1(values, r))) # matches first pure python version 371 self.assertEqual(result, list(cwr2(values, r))) # matches second pure python version 372 373 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 374 self.pickletest(proto, cwr(values,r)) # test pickling 375 376 @support.bigaddrspacetest 377 def test_combinations_with_replacement_overflow(self): 378 with self.assertRaises((OverflowError, MemoryError)): 379 combinations_with_replacement("AA", 2**30) 380 381 # Test implementation detail: tuple re-use 382 @support.impl_detail("tuple reuse is specific to CPython") 383 def test_combinations_with_replacement_tuple_reuse(self): 384 cwr = combinations_with_replacement 385 self.assertEqual(len(set(map(id, cwr('abcde', 3)))), 1) 386 self.assertNotEqual(len(set(map(id, list(cwr('abcde', 3))))), 1) 387 388 def test_permutations(self): 389 self.assertRaises(TypeError, permutations) # too few arguments 390 self.assertRaises(TypeError, permutations, 'abc', 2, 1) # too many arguments 391 self.assertRaises(TypeError, permutations, None) # pool is not iterable 392 self.assertRaises(ValueError, permutations, 'abc', -2) # r is negative 393 self.assertEqual(list(permutations('abc', 32)), []) # r > n 394 self.assertRaises(TypeError, permutations, 'abc', 's') # r is not an int or None 395 self.assertEqual(list(permutations(range(3), 2)), 396 [(0,1), (0,2), (1,0), (1,2), (2,0), (2,1)]) 397 398 def permutations1(iterable, r=None): 399 'Pure python version shown in the docs' 400 pool = tuple(iterable) 401 n = len(pool) 402 r = n if r is None else r 403 if r > n: 404 return 405 indices = list(range(n)) 406 cycles = list(range(n-r+1, n+1))[::-1] 407 yield tuple(pool[i] for i in indices[:r]) 408 while n: 409 for i in reversed(range(r)): 410 cycles[i] -= 1 411 if cycles[i] == 0: 412 indices[i:] = indices[i+1:] + indices[i:i+1] 413 cycles[i] = n - i 414 else: 415 j = cycles[i] 416 indices[i], indices[-j] = indices[-j], indices[i] 417 yield tuple(pool[i] for i in indices[:r]) 418 break 419 else: 420 return 421 422 def permutations2(iterable, r=None): 423 'Pure python version shown in the docs' 424 pool = tuple(iterable) 425 n = len(pool) 426 r = n if r is None else r 427 for indices in product(range(n), repeat=r): 428 if len(set(indices)) == r: 429 yield tuple(pool[i] for i in indices) 430 431 for n in range(7): 432 values = [5*x-12 for x in range(n)] 433 for r in range(n+2): 434 result = list(permutations(values, r)) 435 self.assertEqual(len(result), 0 if r>n else fact(n) / fact(n-r)) # right number of perms 436 self.assertEqual(len(result), len(set(result))) # no repeats 437 self.assertEqual(result, sorted(result)) # lexicographic order 438 for p in result: 439 self.assertEqual(len(p), r) # r-length permutations 440 self.assertEqual(len(set(p)), r) # no duplicate elements 441 self.assertTrue(all(e in values for e in p)) # elements taken from input iterable 442 self.assertEqual(result, list(permutations1(values, r))) # matches first pure python version 443 self.assertEqual(result, list(permutations2(values, r))) # matches second pure python version 444 if r == n: 445 self.assertEqual(result, list(permutations(values, None))) # test r as None 446 self.assertEqual(result, list(permutations(values))) # test default r 447 448 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 449 self.pickletest(proto, permutations(values, r)) # test pickling 450 451 @support.bigaddrspacetest 452 def test_permutations_overflow(self): 453 with self.assertRaises((OverflowError, MemoryError)): 454 permutations("A", 2**30) 455 456 @support.impl_detail("tuple reuse is specific to CPython") 457 def test_permutations_tuple_reuse(self): 458 self.assertEqual(len(set(map(id, permutations('abcde', 3)))), 1) 459 self.assertNotEqual(len(set(map(id, list(permutations('abcde', 3))))), 1) 460 461 def test_combinatorics(self): 462 # Test relationships between product(), permutations(), 463 # combinations() and combinations_with_replacement(). 464 465 for n in range(6): 466 s = 'ABCDEFG'[:n] 467 for r in range(8): 468 prod = list(product(s, repeat=r)) 469 cwr = list(combinations_with_replacement(s, r)) 470 perm = list(permutations(s, r)) 471 comb = list(combinations(s, r)) 472 473 # Check size 474 self.assertEqual(len(prod), n**r) 475 self.assertEqual(len(cwr), (fact(n+r-1) / fact(r)/ fact(n-1)) if n else (not r)) 476 self.assertEqual(len(perm), 0 if r>n else fact(n) / fact(n-r)) 477 self.assertEqual(len(comb), 0 if r>n else fact(n) / fact(r) / fact(n-r)) 478 479 # Check lexicographic order without repeated tuples 480 self.assertEqual(prod, sorted(set(prod))) 481 self.assertEqual(cwr, sorted(set(cwr))) 482 self.assertEqual(perm, sorted(set(perm))) 483 self.assertEqual(comb, sorted(set(comb))) 484 485 # Check interrelationships 486 self.assertEqual(cwr, [t for t in prod if sorted(t)==list(t)]) # cwr: prods which are sorted 487 self.assertEqual(perm, [t for t in prod if len(set(t))==r]) # perm: prods with no dups 488 self.assertEqual(comb, [t for t in perm if sorted(t)==list(t)]) # comb: perms that are sorted 489 self.assertEqual(comb, [t for t in cwr if len(set(t))==r]) # comb: cwrs without dups 490 self.assertEqual(comb, list(filter(set(cwr).__contains__, perm))) # comb: perm that is a cwr 491 self.assertEqual(comb, list(filter(set(perm).__contains__, cwr))) # comb: cwr that is a perm 492 self.assertEqual(comb, sorted(set(cwr) & set(perm))) # comb: both a cwr and a perm 493 494 def test_compress(self): 495 self.assertEqual(list(compress(data='ABCDEF', selectors=[1,0,1,0,1,1])), list('ACEF')) 496 self.assertEqual(list(compress('ABCDEF', [1,0,1,0,1,1])), list('ACEF')) 497 self.assertEqual(list(compress('ABCDEF', [0,0,0,0,0,0])), list('')) 498 self.assertEqual(list(compress('ABCDEF', [1,1,1,1,1,1])), list('ABCDEF')) 499 self.assertEqual(list(compress('ABCDEF', [1,0,1])), list('AC')) 500 self.assertEqual(list(compress('ABC', [0,1,1,1,1,1])), list('BC')) 501 n = 10000 502 data = chain.from_iterable(repeat(range(6), n)) 503 selectors = chain.from_iterable(repeat((0, 1))) 504 self.assertEqual(list(compress(data, selectors)), [1,3,5] * n) 505 self.assertRaises(TypeError, compress, None, range(6)) # 1st arg not iterable 506 self.assertRaises(TypeError, compress, range(6), None) # 2nd arg not iterable 507 self.assertRaises(TypeError, compress, range(6)) # too few args 508 self.assertRaises(TypeError, compress, range(6), None) # too many args 509 510 # check copy, deepcopy, pickle 511 for op in [lambda a:copy.copy(a), lambda a:copy.deepcopy(a)] + picklecopiers: 512 for data, selectors, result1, result2 in [ 513 ('ABCDEF', [1,0,1,0,1,1], 'ACEF', 'CEF'), 514 ('ABCDEF', [0,0,0,0,0,0], '', ''), 515 ('ABCDEF', [1,1,1,1,1,1], 'ABCDEF', 'BCDEF'), 516 ('ABCDEF', [1,0,1], 'AC', 'C'), 517 ('ABC', [0,1,1,1,1,1], 'BC', 'C'), 518 ]: 519 520 self.assertEqual(list(op(compress(data=data, selectors=selectors))), list(result1)) 521 self.assertEqual(list(op(compress(data, selectors))), list(result1)) 522 testIntermediate = compress(data, selectors) 523 if result1: 524 next(testIntermediate) 525 self.assertEqual(list(op(testIntermediate)), list(result2)) 526 527 528 def test_count(self): 529 self.assertEqual(lzip('abc',count()), [('a', 0), ('b', 1), ('c', 2)]) 530 self.assertEqual(lzip('abc',count(3)), [('a', 3), ('b', 4), ('c', 5)]) 531 self.assertEqual(take(2, lzip('abc',count(3))), [('a', 3), ('b', 4)]) 532 self.assertEqual(take(2, zip('abc',count(-1))), [('a', -1), ('b', 0)]) 533 self.assertEqual(take(2, zip('abc',count(-3))), [('a', -3), ('b', -2)]) 534 self.assertRaises(TypeError, count, 2, 3, 4) 535 self.assertRaises(TypeError, count, 'a') 536 self.assertEqual(take(10, count(maxsize-5)), 537 list(range(maxsize-5, maxsize+5))) 538 self.assertEqual(take(10, count(-maxsize-5)), 539 list(range(-maxsize-5, -maxsize+5))) 540 self.assertEqual(take(3, count(3.25)), [3.25, 4.25, 5.25]) 541 self.assertEqual(take(3, count(3.25-4j)), [3.25-4j, 4.25-4j, 5.25-4j]) 542 self.assertEqual(take(3, count(Decimal('1.1'))), 543 [Decimal('1.1'), Decimal('2.1'), Decimal('3.1')]) 544 self.assertEqual(take(3, count(Fraction(2, 3))), 545 [Fraction(2, 3), Fraction(5, 3), Fraction(8, 3)]) 546 BIGINT = 1<<1000 547 self.assertEqual(take(3, count(BIGINT)), [BIGINT, BIGINT+1, BIGINT+2]) 548 c = count(3) 549 self.assertEqual(repr(c), 'count(3)') 550 next(c) 551 self.assertEqual(repr(c), 'count(4)') 552 c = count(-9) 553 self.assertEqual(repr(c), 'count(-9)') 554 next(c) 555 self.assertEqual(next(c), -8) 556 self.assertEqual(repr(count(10.25)), 'count(10.25)') 557 self.assertEqual(repr(count(10.0)), 'count(10.0)') 558 self.assertEqual(type(next(count(10.0))), float) 559 for i in (-sys.maxsize-5, -sys.maxsize+5 ,-10, -1, 0, 10, sys.maxsize-5, sys.maxsize+5): 560 # Test repr 561 r1 = repr(count(i)) 562 r2 = 'count(%r)'.__mod__(i) 563 self.assertEqual(r1, r2) 564 565 # check copy, deepcopy, pickle 566 for value in -3, 3, maxsize-5, maxsize+5: 567 c = count(value) 568 self.assertEqual(next(copy.copy(c)), value) 569 self.assertEqual(next(copy.deepcopy(c)), value) 570 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 571 self.pickletest(proto, count(value)) 572 573 #check proper internal error handling for large "step' sizes 574 count(1, maxsize+5); sys.exc_info() 575 576 def test_count_with_stride(self): 577 self.assertEqual(lzip('abc',count(2,3)), [('a', 2), ('b', 5), ('c', 8)]) 578 self.assertEqual(lzip('abc',count(start=2,step=3)), 579 [('a', 2), ('b', 5), ('c', 8)]) 580 self.assertEqual(lzip('abc',count(step=-1)), 581 [('a', 0), ('b', -1), ('c', -2)]) 582 self.assertRaises(TypeError, count, 'a', 'b') 583 self.assertEqual(lzip('abc',count(2,0)), [('a', 2), ('b', 2), ('c', 2)]) 584 self.assertEqual(lzip('abc',count(2,1)), [('a', 2), ('b', 3), ('c', 4)]) 585 self.assertEqual(lzip('abc',count(2,3)), [('a', 2), ('b', 5), ('c', 8)]) 586 self.assertEqual(take(20, count(maxsize-15, 3)), take(20, range(maxsize-15, maxsize+100, 3))) 587 self.assertEqual(take(20, count(-maxsize-15, 3)), take(20, range(-maxsize-15,-maxsize+100, 3))) 588 self.assertEqual(take(3, count(10, maxsize+5)), 589 list(range(10, 10+3*(maxsize+5), maxsize+5))) 590 self.assertEqual(take(3, count(2, 1.25)), [2, 3.25, 4.5]) 591 self.assertEqual(take(3, count(2, 3.25-4j)), [2, 5.25-4j, 8.5-8j]) 592 self.assertEqual(take(3, count(Decimal('1.1'), Decimal('.1'))), 593 [Decimal('1.1'), Decimal('1.2'), Decimal('1.3')]) 594 self.assertEqual(take(3, count(Fraction(2,3), Fraction(1,7))), 595 [Fraction(2,3), Fraction(17,21), Fraction(20,21)]) 596 BIGINT = 1<<1000 597 self.assertEqual(take(3, count(step=BIGINT)), [0, BIGINT, 2*BIGINT]) 598 self.assertEqual(repr(take(3, count(10, 2.5))), repr([10, 12.5, 15.0])) 599 c = count(3, 5) 600 self.assertEqual(repr(c), 'count(3, 5)') 601 next(c) 602 self.assertEqual(repr(c), 'count(8, 5)') 603 c = count(-9, 0) 604 self.assertEqual(repr(c), 'count(-9, 0)') 605 next(c) 606 self.assertEqual(repr(c), 'count(-9, 0)') 607 c = count(-9, -3) 608 self.assertEqual(repr(c), 'count(-9, -3)') 609 next(c) 610 self.assertEqual(repr(c), 'count(-12, -3)') 611 self.assertEqual(repr(c), 'count(-12, -3)') 612 self.assertEqual(repr(count(10.5, 1.25)), 'count(10.5, 1.25)') 613 self.assertEqual(repr(count(10.5, 1)), 'count(10.5)') # suppress step=1 when it's an int 614 self.assertEqual(repr(count(10.5, 1.00)), 'count(10.5, 1.0)') # do show float values lilke 1.0 615 self.assertEqual(repr(count(10, 1.00)), 'count(10, 1.0)') 616 c = count(10, 1.0) 617 self.assertEqual(type(next(c)), int) 618 self.assertEqual(type(next(c)), float) 619 for i in (-sys.maxsize-5, -sys.maxsize+5 ,-10, -1, 0, 10, sys.maxsize-5, sys.maxsize+5): 620 for j in (-sys.maxsize-5, -sys.maxsize+5 ,-10, -1, 0, 1, 10, sys.maxsize-5, sys.maxsize+5): 621 # Test repr 622 r1 = repr(count(i, j)) 623 if j == 1: 624 r2 = ('count(%r)' % i) 625 else: 626 r2 = ('count(%r, %r)' % (i, j)) 627 self.assertEqual(r1, r2) 628 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 629 self.pickletest(proto, count(i, j)) 630 631 def test_cycle(self): 632 self.assertEqual(take(10, cycle('abc')), list('abcabcabca')) 633 self.assertEqual(list(cycle('')), []) 634 self.assertRaises(TypeError, cycle) 635 self.assertRaises(TypeError, cycle, 5) 636 self.assertEqual(list(islice(cycle(gen3()),10)), [0,1,2,0,1,2,0,1,2,0]) 637 638 # check copy, deepcopy, pickle 639 c = cycle('abc') 640 self.assertEqual(next(c), 'a') 641 #simple copy currently not supported, because __reduce__ returns 642 #an internal iterator 643 #self.assertEqual(take(10, copy.copy(c)), list('bcabcabcab')) 644 self.assertEqual(take(10, copy.deepcopy(c)), list('bcabcabcab')) 645 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 646 self.assertEqual(take(10, pickle.loads(pickle.dumps(c, proto))), 647 list('bcabcabcab')) 648 next(c) 649 self.assertEqual(take(10, pickle.loads(pickle.dumps(c, proto))), 650 list('cabcabcabc')) 651 next(c) 652 next(c) 653 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 654 self.pickletest(proto, cycle('abc')) 655 656 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 657 # test with partial consumed input iterable 658 it = iter('abcde') 659 c = cycle(it) 660 _ = [next(c) for i in range(2)] # consume 2 of 5 inputs 661 p = pickle.dumps(c, proto) 662 d = pickle.loads(p) # rebuild the cycle object 663 self.assertEqual(take(20, d), list('cdeabcdeabcdeabcdeab')) 664 665 # test with completely consumed input iterable 666 it = iter('abcde') 667 c = cycle(it) 668 _ = [next(c) for i in range(7)] # consume 7 of 5 inputs 669 p = pickle.dumps(c, proto) 670 d = pickle.loads(p) # rebuild the cycle object 671 self.assertEqual(take(20, d), list('cdeabcdeabcdeabcdeab')) 672 673 def test_cycle_setstate(self): 674 # Verify both modes for restoring state 675 676 # Mode 0 is efficient. It uses an incompletely consumed input 677 # iterator to build a cycle object and then passes in state with 678 # a list of previously consumed values. There is no data 679 # overlap between the two. 680 c = cycle('defg') 681 c.__setstate__((list('abc'), 0)) 682 self.assertEqual(take(20, c), list('defgabcdefgabcdefgab')) 683 684 # Mode 1 is inefficient. It starts with a cycle object built 685 # from an iterator over the remaining elements in a partial 686 # cycle and then passes in state with all of the previously 687 # seen values (this overlaps values included in the iterator). 688 c = cycle('defg') 689 c.__setstate__((list('abcdefg'), 1)) 690 self.assertEqual(take(20, c), list('defgabcdefgabcdefgab')) 691 692 # The first argument to setstate needs to be a tuple 693 with self.assertRaises(TypeError): 694 cycle('defg').__setstate__([list('abcdefg'), 0]) 695 696 # The first argument in the setstate tuple must be a list 697 with self.assertRaises(TypeError): 698 c = cycle('defg') 699 c.__setstate__((tuple('defg'), 0)) 700 take(20, c) 701 702 # The second argument in the setstate tuple must be an int 703 with self.assertRaises(TypeError): 704 cycle('defg').__setstate__((list('abcdefg'), 'x')) 705 706 self.assertRaises(TypeError, cycle('').__setstate__, ()) 707 self.assertRaises(TypeError, cycle('').__setstate__, ([],)) 708 709 def test_groupby(self): 710 # Check whether it accepts arguments correctly 711 self.assertEqual([], list(groupby([]))) 712 self.assertEqual([], list(groupby([], key=id))) 713 self.assertRaises(TypeError, list, groupby('abc', [])) 714 self.assertRaises(TypeError, groupby, None) 715 self.assertRaises(TypeError, groupby, 'abc', lambda x:x, 10) 716 717 # Check normal input 718 s = [(0, 10, 20), (0, 11,21), (0,12,21), (1,13,21), (1,14,22), 719 (2,15,22), (3,16,23), (3,17,23)] 720 dup = [] 721 for k, g in groupby(s, lambda r:r[0]): 722 for elem in g: 723 self.assertEqual(k, elem[0]) 724 dup.append(elem) 725 self.assertEqual(s, dup) 726 727 # Check normal pickled 728 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 729 dup = [] 730 for k, g in pickle.loads(pickle.dumps(groupby(s, testR), proto)): 731 for elem in g: 732 self.assertEqual(k, elem[0]) 733 dup.append(elem) 734 self.assertEqual(s, dup) 735 736 # Check nested case 737 dup = [] 738 for k, g in groupby(s, testR): 739 for ik, ig in groupby(g, testR2): 740 for elem in ig: 741 self.assertEqual(k, elem[0]) 742 self.assertEqual(ik, elem[2]) 743 dup.append(elem) 744 self.assertEqual(s, dup) 745 746 # Check nested and pickled 747 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 748 dup = [] 749 for k, g in pickle.loads(pickle.dumps(groupby(s, testR), proto)): 750 for ik, ig in pickle.loads(pickle.dumps(groupby(g, testR2), proto)): 751 for elem in ig: 752 self.assertEqual(k, elem[0]) 753 self.assertEqual(ik, elem[2]) 754 dup.append(elem) 755 self.assertEqual(s, dup) 756 757 758 # Check case where inner iterator is not used 759 keys = [k for k, g in groupby(s, testR)] 760 expectedkeys = set([r[0] for r in s]) 761 self.assertEqual(set(keys), expectedkeys) 762 self.assertEqual(len(keys), len(expectedkeys)) 763 764 # Check case where inner iterator is used after advancing the groupby 765 # iterator 766 s = list(zip('AABBBAAAA', range(9))) 767 it = groupby(s, testR) 768 _, g1 = next(it) 769 _, g2 = next(it) 770 _, g3 = next(it) 771 self.assertEqual(list(g1), []) 772 self.assertEqual(list(g2), []) 773 self.assertEqual(next(g3), ('A', 5)) 774 list(it) # exhaust the groupby iterator 775 self.assertEqual(list(g3), []) 776 777 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 778 it = groupby(s, testR) 779 _, g = next(it) 780 next(it) 781 next(it) 782 self.assertEqual(list(pickle.loads(pickle.dumps(g, proto))), []) 783 784 # Exercise pipes and filters style 785 s = 'abracadabra' 786 # sort s | uniq 787 r = [k for k, g in groupby(sorted(s))] 788 self.assertEqual(r, ['a', 'b', 'c', 'd', 'r']) 789 # sort s | uniq -d 790 r = [k for k, g in groupby(sorted(s)) if list(islice(g,1,2))] 791 self.assertEqual(r, ['a', 'b', 'r']) 792 # sort s | uniq -c 793 r = [(len(list(g)), k) for k, g in groupby(sorted(s))] 794 self.assertEqual(r, [(5, 'a'), (2, 'b'), (1, 'c'), (1, 'd'), (2, 'r')]) 795 # sort s | uniq -c | sort -rn | head -3 796 r = sorted([(len(list(g)) , k) for k, g in groupby(sorted(s))], reverse=True)[:3] 797 self.assertEqual(r, [(5, 'a'), (2, 'r'), (2, 'b')]) 798 799 # iter.__next__ failure 800 class ExpectedError(Exception): 801 pass 802 def delayed_raise(n=0): 803 for i in range(n): 804 yield 'yo' 805 raise ExpectedError 806 def gulp(iterable, keyp=None, func=list): 807 return [func(g) for k, g in groupby(iterable, keyp)] 808 809 # iter.__next__ failure on outer object 810 self.assertRaises(ExpectedError, gulp, delayed_raise(0)) 811 # iter.__next__ failure on inner object 812 self.assertRaises(ExpectedError, gulp, delayed_raise(1)) 813 814 # __eq__ failure 815 class DummyCmp: 816 def __eq__(self, dst): 817 raise ExpectedError 818 s = [DummyCmp(), DummyCmp(), None] 819 820 # __eq__ failure on outer object 821 self.assertRaises(ExpectedError, gulp, s, func=id) 822 # __eq__ failure on inner object 823 self.assertRaises(ExpectedError, gulp, s) 824 825 # keyfunc failure 826 def keyfunc(obj): 827 if keyfunc.skip > 0: 828 keyfunc.skip -= 1 829 return obj 830 else: 831 raise ExpectedError 832 833 # keyfunc failure on outer object 834 keyfunc.skip = 0 835 self.assertRaises(ExpectedError, gulp, [None], keyfunc) 836 keyfunc.skip = 1 837 self.assertRaises(ExpectedError, gulp, [None, None], keyfunc) 838 839 def test_filter(self): 840 self.assertEqual(list(filter(isEven, range(6))), [0,2,4]) 841 self.assertEqual(list(filter(None, [0,1,0,2,0])), [1,2]) 842 self.assertEqual(list(filter(bool, [0,1,0,2,0])), [1,2]) 843 self.assertEqual(take(4, filter(isEven, count())), [0,2,4,6]) 844 self.assertRaises(TypeError, filter) 845 self.assertRaises(TypeError, filter, lambda x:x) 846 self.assertRaises(TypeError, filter, lambda x:x, range(6), 7) 847 self.assertRaises(TypeError, filter, isEven, 3) 848 self.assertRaises(TypeError, next, filter(range(6), range(6))) 849 850 # check copy, deepcopy, pickle 851 ans = [0,2,4] 852 853 c = filter(isEven, range(6)) 854 self.assertEqual(list(copy.copy(c)), ans) 855 c = filter(isEven, range(6)) 856 self.assertEqual(list(copy.deepcopy(c)), ans) 857 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 858 c = filter(isEven, range(6)) 859 self.assertEqual(list(pickle.loads(pickle.dumps(c, proto))), ans) 860 next(c) 861 self.assertEqual(list(pickle.loads(pickle.dumps(c, proto))), ans[1:]) 862 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 863 c = filter(isEven, range(6)) 864 self.pickletest(proto, c) 865 866 def test_filterfalse(self): 867 self.assertEqual(list(filterfalse(isEven, range(6))), [1,3,5]) 868 self.assertEqual(list(filterfalse(None, [0,1,0,2,0])), [0,0,0]) 869 self.assertEqual(list(filterfalse(bool, [0,1,0,2,0])), [0,0,0]) 870 self.assertEqual(take(4, filterfalse(isEven, count())), [1,3,5,7]) 871 self.assertRaises(TypeError, filterfalse) 872 self.assertRaises(TypeError, filterfalse, lambda x:x) 873 self.assertRaises(TypeError, filterfalse, lambda x:x, range(6), 7) 874 self.assertRaises(TypeError, filterfalse, isEven, 3) 875 self.assertRaises(TypeError, next, filterfalse(range(6), range(6))) 876 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 877 self.pickletest(proto, filterfalse(isEven, range(6))) 878 879 def test_zip(self): 880 # XXX This is rather silly now that builtin zip() calls zip()... 881 ans = [(x,y) for x, y in zip('abc',count())] 882 self.assertEqual(ans, [('a', 0), ('b', 1), ('c', 2)]) 883 self.assertEqual(list(zip('abc', range(6))), lzip('abc', range(6))) 884 self.assertEqual(list(zip('abcdef', range(3))), lzip('abcdef', range(3))) 885 self.assertEqual(take(3,zip('abcdef', count())), lzip('abcdef', range(3))) 886 self.assertEqual(list(zip('abcdef')), lzip('abcdef')) 887 self.assertEqual(list(zip()), lzip()) 888 self.assertRaises(TypeError, zip, 3) 889 self.assertRaises(TypeError, zip, range(3), 3) 890 self.assertEqual([tuple(list(pair)) for pair in zip('abc', 'def')], 891 lzip('abc', 'def')) 892 self.assertEqual([pair for pair in zip('abc', 'def')], 893 lzip('abc', 'def')) 894 895 @support.impl_detail("tuple reuse is specific to CPython") 896 def test_zip_tuple_reuse(self): 897 ids = list(map(id, zip('abc', 'def'))) 898 self.assertEqual(min(ids), max(ids)) 899 ids = list(map(id, list(zip('abc', 'def')))) 900 self.assertEqual(len(dict.fromkeys(ids)), len(ids)) 901 902 # check copy, deepcopy, pickle 903 ans = [(x,y) for x, y in copy.copy(zip('abc',count()))] 904 self.assertEqual(ans, [('a', 0), ('b', 1), ('c', 2)]) 905 906 ans = [(x,y) for x, y in copy.deepcopy(zip('abc',count()))] 907 self.assertEqual(ans, [('a', 0), ('b', 1), ('c', 2)]) 908 909 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 910 ans = [(x,y) for x, y in pickle.loads(pickle.dumps(zip('abc',count()), proto))] 911 self.assertEqual(ans, [('a', 0), ('b', 1), ('c', 2)]) 912 913 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 914 testIntermediate = zip('abc',count()) 915 next(testIntermediate) 916 ans = [(x,y) for x, y in pickle.loads(pickle.dumps(testIntermediate, proto))] 917 self.assertEqual(ans, [('b', 1), ('c', 2)]) 918 919 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 920 self.pickletest(proto, zip('abc', count())) 921 922 def test_ziplongest(self): 923 for args in [ 924 ['abc', range(6)], 925 [range(6), 'abc'], 926 [range(1000), range(2000,2100), range(3000,3050)], 927 [range(1000), range(0), range(3000,3050), range(1200), range(1500)], 928 [range(1000), range(0), range(3000,3050), range(1200), range(1500), range(0)], 929 ]: 930 target = [tuple([arg[i] if i < len(arg) else None for arg in args]) 931 for i in range(max(map(len, args)))] 932 self.assertEqual(list(zip_longest(*args)), target) 933 self.assertEqual(list(zip_longest(*args, **{})), target) 934 target = [tuple((e is None and 'X' or e) for e in t) for t in target] # Replace None fills with 'X' 935 self.assertEqual(list(zip_longest(*args, **dict(fillvalue='X'))), target) 936 937 self.assertEqual(take(3,zip_longest('abcdef', count())), list(zip('abcdef', range(3)))) # take 3 from infinite input 938 939 self.assertEqual(list(zip_longest()), list(zip())) 940 self.assertEqual(list(zip_longest([])), list(zip([]))) 941 self.assertEqual(list(zip_longest('abcdef')), list(zip('abcdef'))) 942 943 self.assertEqual(list(zip_longest('abc', 'defg', **{})), 944 list(zip(list('abc')+[None], 'defg'))) # empty keyword dict 945 self.assertRaises(TypeError, zip_longest, 3) 946 self.assertRaises(TypeError, zip_longest, range(3), 3) 947 948 for stmt in [ 949 "zip_longest('abc', fv=1)", 950 "zip_longest('abc', fillvalue=1, bogus_keyword=None)", 951 ]: 952 try: 953 eval(stmt, globals(), locals()) 954 except TypeError: 955 pass 956 else: 957 self.fail('Did not raise Type in: ' + stmt) 958 959 self.assertEqual([tuple(list(pair)) for pair in zip_longest('abc', 'def')], 960 list(zip('abc', 'def'))) 961 self.assertEqual([pair for pair in zip_longest('abc', 'def')], 962 list(zip('abc', 'def'))) 963 964 @support.impl_detail("tuple reuse is specific to CPython") 965 def test_zip_longest_tuple_reuse(self): 966 ids = list(map(id, zip_longest('abc', 'def'))) 967 self.assertEqual(min(ids), max(ids)) 968 ids = list(map(id, list(zip_longest('abc', 'def')))) 969 self.assertEqual(len(dict.fromkeys(ids)), len(ids)) 970 971 def test_zip_longest_pickling(self): 972 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 973 self.pickletest(proto, zip_longest("abc", "def")) 974 self.pickletest(proto, zip_longest("abc", "defgh")) 975 self.pickletest(proto, zip_longest("abc", "defgh", fillvalue=1)) 976 self.pickletest(proto, zip_longest("", "defgh")) 977 978 def test_zip_longest_bad_iterable(self): 979 exception = TypeError() 980 981 class BadIterable: 982 def __iter__(self): 983 raise exception 984 985 with self.assertRaises(TypeError) as cm: 986 zip_longest(BadIterable()) 987 988 self.assertIs(cm.exception, exception) 989 990 def test_bug_7244(self): 991 992 class Repeater: 993 # this class is similar to itertools.repeat 994 def __init__(self, o, t, e): 995 self.o = o 996 self.t = int(t) 997 self.e = e 998 def __iter__(self): # its iterator is itself 999 return self 1000 def __next__(self): 1001 if self.t > 0: 1002 self.t -= 1 1003 return self.o 1004 else: 1005 raise self.e 1006 1007 # Formerly this code in would fail in debug mode 1008 # with Undetected Error and Stop Iteration 1009 r1 = Repeater(1, 3, StopIteration) 1010 r2 = Repeater(2, 4, StopIteration) 1011 def run(r1, r2): 1012 result = [] 1013 for i, j in zip_longest(r1, r2, fillvalue=0): 1014 with support.captured_output('stdout'): 1015 print((i, j)) 1016 result.append((i, j)) 1017 return result 1018 self.assertEqual(run(r1, r2), [(1,2), (1,2), (1,2), (0,2)]) 1019 1020 # Formerly, the RuntimeError would be lost 1021 # and StopIteration would stop as expected 1022 r1 = Repeater(1, 3, RuntimeError) 1023 r2 = Repeater(2, 4, StopIteration) 1024 it = zip_longest(r1, r2, fillvalue=0) 1025 self.assertEqual(next(it), (1, 2)) 1026 self.assertEqual(next(it), (1, 2)) 1027 self.assertEqual(next(it), (1, 2)) 1028 self.assertRaises(RuntimeError, next, it) 1029 1030 def test_pairwise(self): 1031 self.assertEqual(list(pairwise('')), []) 1032 self.assertEqual(list(pairwise('a')), []) 1033 self.assertEqual(list(pairwise('ab')), 1034 [('a', 'b')]), 1035 self.assertEqual(list(pairwise('abcde')), 1036 [('a', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e')]) 1037 self.assertEqual(list(pairwise(range(10_000))), 1038 list(zip(range(10_000), range(1, 10_000)))) 1039 1040 with self.assertRaises(TypeError): 1041 pairwise() # too few arguments 1042 with self.assertRaises(TypeError): 1043 pairwise('abc', 10) # too many arguments 1044 with self.assertRaises(TypeError): 1045 pairwise(iterable='abc') # keyword arguments 1046 with self.assertRaises(TypeError): 1047 pairwise(None) # non-iterable argument 1048 1049 def test_product(self): 1050 for args, result in [ 1051 ([], [()]), # zero iterables 1052 (['ab'], [('a',), ('b',)]), # one iterable 1053 ([range(2), range(3)], [(0,0), (0,1), (0,2), (1,0), (1,1), (1,2)]), # two iterables 1054 ([range(0), range(2), range(3)], []), # first iterable with zero length 1055 ([range(2), range(0), range(3)], []), # middle iterable with zero length 1056 ([range(2), range(3), range(0)], []), # last iterable with zero length 1057 ]: 1058 self.assertEqual(list(product(*args)), result) 1059 for r in range(4): 1060 self.assertEqual(list(product(*(args*r))), 1061 list(product(*args, **dict(repeat=r)))) 1062 self.assertEqual(len(list(product(*[range(7)]*6))), 7**6) 1063 self.assertRaises(TypeError, product, range(6), None) 1064 1065 def product1(*args, **kwds): 1066 pools = list(map(tuple, args)) * kwds.get('repeat', 1) 1067 n = len(pools) 1068 if n == 0: 1069 yield () 1070 return 1071 if any(len(pool) == 0 for pool in pools): 1072 return 1073 indices = [0] * n 1074 yield tuple(pool[i] for pool, i in zip(pools, indices)) 1075 while 1: 1076 for i in reversed(range(n)): # right to left 1077 if indices[i] == len(pools[i]) - 1: 1078 continue 1079 indices[i] += 1 1080 for j in range(i+1, n): 1081 indices[j] = 0 1082 yield tuple(pool[i] for pool, i in zip(pools, indices)) 1083 break 1084 else: 1085 return 1086 1087 def product2(*args, **kwds): 1088 'Pure python version used in docs' 1089 pools = list(map(tuple, args)) * kwds.get('repeat', 1) 1090 result = [[]] 1091 for pool in pools: 1092 result = [x+[y] for x in result for y in pool] 1093 for prod in result: 1094 yield tuple(prod) 1095 1096 argtypes = ['', 'abc', '', range(0), range(4), dict(a=1, b=2, c=3), 1097 set('abcdefg'), range(11), tuple(range(13))] 1098 for i in range(100): 1099 args = [random.choice(argtypes) for j in range(random.randrange(5))] 1100 expected_len = prod(map(len, args)) 1101 self.assertEqual(len(list(product(*args))), expected_len) 1102 self.assertEqual(list(product(*args)), list(product1(*args))) 1103 self.assertEqual(list(product(*args)), list(product2(*args))) 1104 args = map(iter, args) 1105 self.assertEqual(len(list(product(*args))), expected_len) 1106 1107 @support.bigaddrspacetest 1108 def test_product_overflow(self): 1109 with self.assertRaises((OverflowError, MemoryError)): 1110 product(*(['ab']*2**5), repeat=2**25) 1111 1112 @support.impl_detail("tuple reuse is specific to CPython") 1113 def test_product_tuple_reuse(self): 1114 self.assertEqual(len(set(map(id, product('abc', 'def')))), 1) 1115 self.assertNotEqual(len(set(map(id, list(product('abc', 'def'))))), 1) 1116 1117 def test_product_pickling(self): 1118 # check copy, deepcopy, pickle 1119 for args, result in [ 1120 ([], [()]), # zero iterables 1121 (['ab'], [('a',), ('b',)]), # one iterable 1122 ([range(2), range(3)], [(0,0), (0,1), (0,2), (1,0), (1,1), (1,2)]), # two iterables 1123 ([range(0), range(2), range(3)], []), # first iterable with zero length 1124 ([range(2), range(0), range(3)], []), # middle iterable with zero length 1125 ([range(2), range(3), range(0)], []), # last iterable with zero length 1126 ]: 1127 self.assertEqual(list(copy.copy(product(*args))), result) 1128 self.assertEqual(list(copy.deepcopy(product(*args))), result) 1129 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1130 self.pickletest(proto, product(*args)) 1131 1132 def test_product_issue_25021(self): 1133 # test that indices are properly clamped to the length of the tuples 1134 p = product((1, 2),(3,)) 1135 p.__setstate__((0, 0x1000)) # will access tuple element 1 if not clamped 1136 self.assertEqual(next(p), (2, 3)) 1137 # test that empty tuple in the list will result in an immediate StopIteration 1138 p = product((1, 2), (), (3,)) 1139 p.__setstate__((0, 0, 0x1000)) # will access tuple element 1 if not clamped 1140 self.assertRaises(StopIteration, next, p) 1141 1142 def test_repeat(self): 1143 self.assertEqual(list(repeat(object='a', times=3)), ['a', 'a', 'a']) 1144 self.assertEqual(lzip(range(3),repeat('a')), 1145 [(0, 'a'), (1, 'a'), (2, 'a')]) 1146 self.assertEqual(list(repeat('a', 3)), ['a', 'a', 'a']) 1147 self.assertEqual(take(3, repeat('a')), ['a', 'a', 'a']) 1148 self.assertEqual(list(repeat('a', 0)), []) 1149 self.assertEqual(list(repeat('a', -3)), []) 1150 self.assertRaises(TypeError, repeat) 1151 self.assertRaises(TypeError, repeat, None, 3, 4) 1152 self.assertRaises(TypeError, repeat, None, 'a') 1153 r = repeat(1+0j) 1154 self.assertEqual(repr(r), 'repeat((1+0j))') 1155 r = repeat(1+0j, 5) 1156 self.assertEqual(repr(r), 'repeat((1+0j), 5)') 1157 list(r) 1158 self.assertEqual(repr(r), 'repeat((1+0j), 0)') 1159 1160 # check copy, deepcopy, pickle 1161 c = repeat(object='a', times=10) 1162 self.assertEqual(next(c), 'a') 1163 self.assertEqual(take(2, copy.copy(c)), list('a' * 2)) 1164 self.assertEqual(take(2, copy.deepcopy(c)), list('a' * 2)) 1165 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1166 self.pickletest(proto, repeat(object='a', times=10)) 1167 1168 def test_repeat_with_negative_times(self): 1169 self.assertEqual(repr(repeat('a', -1)), "repeat('a', 0)") 1170 self.assertEqual(repr(repeat('a', -2)), "repeat('a', 0)") 1171 self.assertEqual(repr(repeat('a', times=-1)), "repeat('a', 0)") 1172 self.assertEqual(repr(repeat('a', times=-2)), "repeat('a', 0)") 1173 1174 def test_map(self): 1175 self.assertEqual(list(map(operator.pow, range(3), range(1,7))), 1176 [0**1, 1**2, 2**3]) 1177 self.assertEqual(list(map(tupleize, 'abc', range(5))), 1178 [('a',0),('b',1),('c',2)]) 1179 self.assertEqual(list(map(tupleize, 'abc', count())), 1180 [('a',0),('b',1),('c',2)]) 1181 self.assertEqual(take(2,map(tupleize, 'abc', count())), 1182 [('a',0),('b',1)]) 1183 self.assertEqual(list(map(operator.pow, [])), []) 1184 self.assertRaises(TypeError, map) 1185 self.assertRaises(TypeError, list, map(None, range(3), range(3))) 1186 self.assertRaises(TypeError, map, operator.neg) 1187 self.assertRaises(TypeError, next, map(10, range(5))) 1188 self.assertRaises(ValueError, next, map(errfunc, [4], [5])) 1189 self.assertRaises(TypeError, next, map(onearg, [4], [5])) 1190 1191 # check copy, deepcopy, pickle 1192 ans = [('a',0),('b',1),('c',2)] 1193 1194 c = map(tupleize, 'abc', count()) 1195 self.assertEqual(list(copy.copy(c)), ans) 1196 1197 c = map(tupleize, 'abc', count()) 1198 self.assertEqual(list(copy.deepcopy(c)), ans) 1199 1200 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1201 c = map(tupleize, 'abc', count()) 1202 self.pickletest(proto, c) 1203 1204 def test_starmap(self): 1205 self.assertEqual(list(starmap(operator.pow, zip(range(3), range(1,7)))), 1206 [0**1, 1**2, 2**3]) 1207 self.assertEqual(take(3, starmap(operator.pow, zip(count(), count(1)))), 1208 [0**1, 1**2, 2**3]) 1209 self.assertEqual(list(starmap(operator.pow, [])), []) 1210 self.assertEqual(list(starmap(operator.pow, [iter([4,5])])), [4**5]) 1211 self.assertRaises(TypeError, list, starmap(operator.pow, [None])) 1212 self.assertRaises(TypeError, starmap) 1213 self.assertRaises(TypeError, starmap, operator.pow, [(4,5)], 'extra') 1214 self.assertRaises(TypeError, next, starmap(10, [(4,5)])) 1215 self.assertRaises(ValueError, next, starmap(errfunc, [(4,5)])) 1216 self.assertRaises(TypeError, next, starmap(onearg, [(4,5)])) 1217 1218 # check copy, deepcopy, pickle 1219 ans = [0**1, 1**2, 2**3] 1220 1221 c = starmap(operator.pow, zip(range(3), range(1,7))) 1222 self.assertEqual(list(copy.copy(c)), ans) 1223 1224 c = starmap(operator.pow, zip(range(3), range(1,7))) 1225 self.assertEqual(list(copy.deepcopy(c)), ans) 1226 1227 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1228 c = starmap(operator.pow, zip(range(3), range(1,7))) 1229 self.pickletest(proto, c) 1230 1231 def test_islice(self): 1232 for args in [ # islice(args) should agree with range(args) 1233 (10, 20, 3), 1234 (10, 3, 20), 1235 (10, 20), 1236 (10, 10), 1237 (10, 3), 1238 (20,) 1239 ]: 1240 self.assertEqual(list(islice(range(100), *args)), 1241 list(range(*args))) 1242 1243 for args, tgtargs in [ # Stop when seqn is exhausted 1244 ((10, 110, 3), ((10, 100, 3))), 1245 ((10, 110), ((10, 100))), 1246 ((110,), (100,)) 1247 ]: 1248 self.assertEqual(list(islice(range(100), *args)), 1249 list(range(*tgtargs))) 1250 1251 # Test stop=None 1252 self.assertEqual(list(islice(range(10), None)), list(range(10))) 1253 self.assertEqual(list(islice(range(10), None, None)), list(range(10))) 1254 self.assertEqual(list(islice(range(10), None, None, None)), list(range(10))) 1255 self.assertEqual(list(islice(range(10), 2, None)), list(range(2, 10))) 1256 self.assertEqual(list(islice(range(10), 1, None, 2)), list(range(1, 10, 2))) 1257 1258 # Test number of items consumed SF #1171417 1259 it = iter(range(10)) 1260 self.assertEqual(list(islice(it, 3)), list(range(3))) 1261 self.assertEqual(list(it), list(range(3, 10))) 1262 1263 it = iter(range(10)) 1264 self.assertEqual(list(islice(it, 3, 3)), []) 1265 self.assertEqual(list(it), list(range(3, 10))) 1266 1267 # Test invalid arguments 1268 ra = range(10) 1269 self.assertRaises(TypeError, islice, ra) 1270 self.assertRaises(TypeError, islice, ra, 1, 2, 3, 4) 1271 self.assertRaises(ValueError, islice, ra, -5, 10, 1) 1272 self.assertRaises(ValueError, islice, ra, 1, -5, -1) 1273 self.assertRaises(ValueError, islice, ra, 1, 10, -1) 1274 self.assertRaises(ValueError, islice, ra, 1, 10, 0) 1275 self.assertRaises(ValueError, islice, ra, 'a') 1276 self.assertRaises(ValueError, islice, ra, 'a', 1) 1277 self.assertRaises(ValueError, islice, ra, 1, 'a') 1278 self.assertRaises(ValueError, islice, ra, 'a', 1, 1) 1279 self.assertRaises(ValueError, islice, ra, 1, 'a', 1) 1280 self.assertEqual(len(list(islice(count(), 1, 10, maxsize))), 1) 1281 1282 # Issue #10323: Less islice in a predictable state 1283 c = count() 1284 self.assertEqual(list(islice(c, 1, 3, 50)), [1]) 1285 self.assertEqual(next(c), 3) 1286 1287 # check copy, deepcopy, pickle 1288 for args in [ # islice(args) should agree with range(args) 1289 (10, 20, 3), 1290 (10, 3, 20), 1291 (10, 20), 1292 (10, 3), 1293 (20,) 1294 ]: 1295 self.assertEqual(list(copy.copy(islice(range(100), *args))), 1296 list(range(*args))) 1297 self.assertEqual(list(copy.deepcopy(islice(range(100), *args))), 1298 list(range(*args))) 1299 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1300 self.pickletest(proto, islice(range(100), *args)) 1301 1302 # Issue #21321: check source iterator is not referenced 1303 # from islice() after the latter has been exhausted 1304 it = (x for x in (1, 2)) 1305 wr = weakref.ref(it) 1306 it = islice(it, 1) 1307 self.assertIsNotNone(wr()) 1308 list(it) # exhaust the iterator 1309 support.gc_collect() 1310 self.assertIsNone(wr()) 1311 1312 # Issue #30537: islice can accept integer-like objects as 1313 # arguments 1314 class IntLike(object): 1315 def __init__(self, val): 1316 self.val = val 1317 def __index__(self): 1318 return self.val 1319 self.assertEqual(list(islice(range(100), IntLike(10))), list(range(10))) 1320 self.assertEqual(list(islice(range(100), IntLike(10), IntLike(50))), 1321 list(range(10, 50))) 1322 self.assertEqual(list(islice(range(100), IntLike(10), IntLike(50), IntLike(5))), 1323 list(range(10,50,5))) 1324 1325 def test_takewhile(self): 1326 data = [1, 3, 5, 20, 2, 4, 6, 8] 1327 self.assertEqual(list(takewhile(underten, data)), [1, 3, 5]) 1328 self.assertEqual(list(takewhile(underten, [])), []) 1329 self.assertRaises(TypeError, takewhile) 1330 self.assertRaises(TypeError, takewhile, operator.pow) 1331 self.assertRaises(TypeError, takewhile, operator.pow, [(4,5)], 'extra') 1332 self.assertRaises(TypeError, next, takewhile(10, [(4,5)])) 1333 self.assertRaises(ValueError, next, takewhile(errfunc, [(4,5)])) 1334 t = takewhile(bool, [1, 1, 1, 0, 0, 0]) 1335 self.assertEqual(list(t), [1, 1, 1]) 1336 self.assertRaises(StopIteration, next, t) 1337 1338 # check copy, deepcopy, pickle 1339 self.assertEqual(list(copy.copy(takewhile(underten, data))), [1, 3, 5]) 1340 self.assertEqual(list(copy.deepcopy(takewhile(underten, data))), 1341 [1, 3, 5]) 1342 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1343 self.pickletest(proto, takewhile(underten, data)) 1344 1345 def test_dropwhile(self): 1346 data = [1, 3, 5, 20, 2, 4, 6, 8] 1347 self.assertEqual(list(dropwhile(underten, data)), [20, 2, 4, 6, 8]) 1348 self.assertEqual(list(dropwhile(underten, [])), []) 1349 self.assertRaises(TypeError, dropwhile) 1350 self.assertRaises(TypeError, dropwhile, operator.pow) 1351 self.assertRaises(TypeError, dropwhile, operator.pow, [(4,5)], 'extra') 1352 self.assertRaises(TypeError, next, dropwhile(10, [(4,5)])) 1353 self.assertRaises(ValueError, next, dropwhile(errfunc, [(4,5)])) 1354 1355 # check copy, deepcopy, pickle 1356 self.assertEqual(list(copy.copy(dropwhile(underten, data))), [20, 2, 4, 6, 8]) 1357 self.assertEqual(list(copy.deepcopy(dropwhile(underten, data))), 1358 [20, 2, 4, 6, 8]) 1359 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1360 self.pickletest(proto, dropwhile(underten, data)) 1361 1362 def test_tee(self): 1363 n = 200 1364 1365 a, b = tee([]) # test empty iterator 1366 self.assertEqual(list(a), []) 1367 self.assertEqual(list(b), []) 1368 1369 a, b = tee(irange(n)) # test 100% interleaved 1370 self.assertEqual(lzip(a,b), lzip(range(n), range(n))) 1371 1372 a, b = tee(irange(n)) # test 0% interleaved 1373 self.assertEqual(list(a), list(range(n))) 1374 self.assertEqual(list(b), list(range(n))) 1375 1376 a, b = tee(irange(n)) # test dealloc of leading iterator 1377 for i in range(100): 1378 self.assertEqual(next(a), i) 1379 del a 1380 self.assertEqual(list(b), list(range(n))) 1381 1382 a, b = tee(irange(n)) # test dealloc of trailing iterator 1383 for i in range(100): 1384 self.assertEqual(next(a), i) 1385 del b 1386 self.assertEqual(list(a), list(range(100, n))) 1387 1388 for j in range(5): # test randomly interleaved 1389 order = [0]*n + [1]*n 1390 random.shuffle(order) 1391 lists = ([], []) 1392 its = tee(irange(n)) 1393 for i in order: 1394 value = next(its[i]) 1395 lists[i].append(value) 1396 self.assertEqual(lists[0], list(range(n))) 1397 self.assertEqual(lists[1], list(range(n))) 1398 1399 # test argument format checking 1400 self.assertRaises(TypeError, tee) 1401 self.assertRaises(TypeError, tee, 3) 1402 self.assertRaises(TypeError, tee, [1,2], 'x') 1403 self.assertRaises(TypeError, tee, [1,2], 3, 'x') 1404 1405 # tee object should be instantiable 1406 a, b = tee('abc') 1407 c = type(a)('def') 1408 self.assertEqual(list(c), list('def')) 1409 1410 # test long-lagged and multi-way split 1411 a, b, c = tee(range(2000), 3) 1412 for i in range(100): 1413 self.assertEqual(next(a), i) 1414 self.assertEqual(list(b), list(range(2000))) 1415 self.assertEqual([next(c), next(c)], list(range(2))) 1416 self.assertEqual(list(a), list(range(100,2000))) 1417 self.assertEqual(list(c), list(range(2,2000))) 1418 1419 # test values of n 1420 self.assertRaises(TypeError, tee, 'abc', 'invalid') 1421 self.assertRaises(ValueError, tee, [], -1) 1422 for n in range(5): 1423 result = tee('abc', n) 1424 self.assertEqual(type(result), tuple) 1425 self.assertEqual(len(result), n) 1426 self.assertEqual([list(x) for x in result], [list('abc')]*n) 1427 1428 # tee pass-through to copyable iterator 1429 a, b = tee('abc') 1430 c, d = tee(a) 1431 self.assertTrue(a is c) 1432 1433 # test tee_new 1434 t1, t2 = tee('abc') 1435 tnew = type(t1) 1436 self.assertRaises(TypeError, tnew) 1437 self.assertRaises(TypeError, tnew, 10) 1438 t3 = tnew(t1) 1439 self.assertTrue(list(t1) == list(t2) == list(t3) == list('abc')) 1440 1441 # test that tee objects are weak referencable 1442 a, b = tee(range(10)) 1443 p = weakref.proxy(a) 1444 self.assertEqual(getattr(p, '__class__'), type(b)) 1445 del a 1446 support.gc_collect() # For PyPy or other GCs. 1447 self.assertRaises(ReferenceError, getattr, p, '__class__') 1448 1449 ans = list('abc') 1450 long_ans = list(range(10000)) 1451 1452 # check copy 1453 a, b = tee('abc') 1454 self.assertEqual(list(copy.copy(a)), ans) 1455 self.assertEqual(list(copy.copy(b)), ans) 1456 a, b = tee(list(range(10000))) 1457 self.assertEqual(list(copy.copy(a)), long_ans) 1458 self.assertEqual(list(copy.copy(b)), long_ans) 1459 1460 # check partially consumed copy 1461 a, b = tee('abc') 1462 take(2, a) 1463 take(1, b) 1464 self.assertEqual(list(copy.copy(a)), ans[2:]) 1465 self.assertEqual(list(copy.copy(b)), ans[1:]) 1466 self.assertEqual(list(a), ans[2:]) 1467 self.assertEqual(list(b), ans[1:]) 1468 a, b = tee(range(10000)) 1469 take(100, a) 1470 take(60, b) 1471 self.assertEqual(list(copy.copy(a)), long_ans[100:]) 1472 self.assertEqual(list(copy.copy(b)), long_ans[60:]) 1473 self.assertEqual(list(a), long_ans[100:]) 1474 self.assertEqual(list(b), long_ans[60:]) 1475 1476 # check deepcopy 1477 a, b = tee('abc') 1478 self.assertEqual(list(copy.deepcopy(a)), ans) 1479 self.assertEqual(list(copy.deepcopy(b)), ans) 1480 self.assertEqual(list(a), ans) 1481 self.assertEqual(list(b), ans) 1482 a, b = tee(range(10000)) 1483 self.assertEqual(list(copy.deepcopy(a)), long_ans) 1484 self.assertEqual(list(copy.deepcopy(b)), long_ans) 1485 self.assertEqual(list(a), long_ans) 1486 self.assertEqual(list(b), long_ans) 1487 1488 # check partially consumed deepcopy 1489 a, b = tee('abc') 1490 take(2, a) 1491 take(1, b) 1492 self.assertEqual(list(copy.deepcopy(a)), ans[2:]) 1493 self.assertEqual(list(copy.deepcopy(b)), ans[1:]) 1494 self.assertEqual(list(a), ans[2:]) 1495 self.assertEqual(list(b), ans[1:]) 1496 a, b = tee(range(10000)) 1497 take(100, a) 1498 take(60, b) 1499 self.assertEqual(list(copy.deepcopy(a)), long_ans[100:]) 1500 self.assertEqual(list(copy.deepcopy(b)), long_ans[60:]) 1501 self.assertEqual(list(a), long_ans[100:]) 1502 self.assertEqual(list(b), long_ans[60:]) 1503 1504 # check pickle 1505 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1506 self.pickletest(proto, iter(tee('abc'))) 1507 a, b = tee('abc') 1508 self.pickletest(proto, a, compare=ans) 1509 self.pickletest(proto, b, compare=ans) 1510 1511 # Issue 13454: Crash when deleting backward iterator from tee() 1512 def test_tee_del_backward(self): 1513 forward, backward = tee(repeat(None, 20000000)) 1514 try: 1515 any(forward) # exhaust the iterator 1516 del backward 1517 except: 1518 del forward, backward 1519 raise 1520 1521 def test_tee_reenter(self): 1522 class I: 1523 first = True 1524 def __iter__(self): 1525 return self 1526 def __next__(self): 1527 first = self.first 1528 self.first = False 1529 if first: 1530 return next(b) 1531 1532 a, b = tee(I()) 1533 with self.assertRaisesRegex(RuntimeError, "tee"): 1534 next(a) 1535 1536 def test_tee_concurrent(self): 1537 start = threading.Event() 1538 finish = threading.Event() 1539 class I: 1540 def __iter__(self): 1541 return self 1542 def __next__(self): 1543 start.set() 1544 finish.wait() 1545 1546 a, b = tee(I()) 1547 thread = threading.Thread(target=next, args=[a]) 1548 thread.start() 1549 try: 1550 start.wait() 1551 with self.assertRaisesRegex(RuntimeError, "tee"): 1552 next(b) 1553 finally: 1554 finish.set() 1555 thread.join() 1556 1557 def test_StopIteration(self): 1558 self.assertRaises(StopIteration, next, zip()) 1559 1560 for f in (chain, cycle, zip, groupby): 1561 self.assertRaises(StopIteration, next, f([])) 1562 self.assertRaises(StopIteration, next, f(StopNow())) 1563 1564 self.assertRaises(StopIteration, next, islice([], None)) 1565 self.assertRaises(StopIteration, next, islice(StopNow(), None)) 1566 1567 p, q = tee([]) 1568 self.assertRaises(StopIteration, next, p) 1569 self.assertRaises(StopIteration, next, q) 1570 p, q = tee(StopNow()) 1571 self.assertRaises(StopIteration, next, p) 1572 self.assertRaises(StopIteration, next, q) 1573 1574 self.assertRaises(StopIteration, next, repeat(None, 0)) 1575 1576 for f in (filter, filterfalse, map, takewhile, dropwhile, starmap): 1577 self.assertRaises(StopIteration, next, f(lambda x:x, [])) 1578 self.assertRaises(StopIteration, next, f(lambda x:x, StopNow())) 1579 1580 @support.cpython_only 1581 def test_combinations_result_gc(self): 1582 # bpo-42536: combinations's tuple-reuse speed trick breaks the GC's 1583 # assumptions about what can be untracked. Make sure we re-track result 1584 # tuples whenever we reuse them. 1585 it = combinations([None, []], 1) 1586 next(it) 1587 gc.collect() 1588 # That GC collection probably untracked the recycled internal result 1589 # tuple, which has the value (None,). Make sure it's re-tracked when 1590 # it's mutated and returned from __next__: 1591 self.assertTrue(gc.is_tracked(next(it))) 1592 1593 @support.cpython_only 1594 def test_combinations_with_replacement_result_gc(self): 1595 # Ditto for combinations_with_replacement. 1596 it = combinations_with_replacement([None, []], 1) 1597 next(it) 1598 gc.collect() 1599 self.assertTrue(gc.is_tracked(next(it))) 1600 1601 @support.cpython_only 1602 def test_permutations_result_gc(self): 1603 # Ditto for permutations. 1604 it = permutations([None, []], 1) 1605 next(it) 1606 gc.collect() 1607 self.assertTrue(gc.is_tracked(next(it))) 1608 1609 @support.cpython_only 1610 def test_product_result_gc(self): 1611 # Ditto for product. 1612 it = product([None, []]) 1613 next(it) 1614 gc.collect() 1615 self.assertTrue(gc.is_tracked(next(it))) 1616 1617 @support.cpython_only 1618 def test_zip_longest_result_gc(self): 1619 # Ditto for zip_longest. 1620 it = zip_longest([[]]) 1621 gc.collect() 1622 self.assertTrue(gc.is_tracked(next(it))) 1623 1624 1625class TestExamples(unittest.TestCase): 1626 1627 def test_accumulate(self): 1628 self.assertEqual(list(accumulate([1,2,3,4,5])), [1, 3, 6, 10, 15]) 1629 1630 def test_accumulate_reducible(self): 1631 # check copy, deepcopy, pickle 1632 data = [1, 2, 3, 4, 5] 1633 accumulated = [1, 3, 6, 10, 15] 1634 1635 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1636 it = accumulate(data) 1637 self.assertEqual(list(pickle.loads(pickle.dumps(it, proto))), accumulated[:]) 1638 self.assertEqual(next(it), 1) 1639 self.assertEqual(list(pickle.loads(pickle.dumps(it, proto))), accumulated[1:]) 1640 it = accumulate(data) 1641 self.assertEqual(next(it), 1) 1642 self.assertEqual(list(copy.deepcopy(it)), accumulated[1:]) 1643 self.assertEqual(list(copy.copy(it)), accumulated[1:]) 1644 1645 def test_accumulate_reducible_none(self): 1646 # Issue #25718: total is None 1647 it = accumulate([None, None, None], operator.is_) 1648 self.assertEqual(next(it), None) 1649 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1650 it_copy = pickle.loads(pickle.dumps(it, proto)) 1651 self.assertEqual(list(it_copy), [True, False]) 1652 self.assertEqual(list(copy.deepcopy(it)), [True, False]) 1653 self.assertEqual(list(copy.copy(it)), [True, False]) 1654 1655 def test_chain(self): 1656 self.assertEqual(''.join(chain('ABC', 'DEF')), 'ABCDEF') 1657 1658 def test_chain_from_iterable(self): 1659 self.assertEqual(''.join(chain.from_iterable(['ABC', 'DEF'])), 'ABCDEF') 1660 1661 def test_combinations(self): 1662 self.assertEqual(list(combinations('ABCD', 2)), 1663 [('A','B'), ('A','C'), ('A','D'), ('B','C'), ('B','D'), ('C','D')]) 1664 self.assertEqual(list(combinations(range(4), 3)), 1665 [(0,1,2), (0,1,3), (0,2,3), (1,2,3)]) 1666 1667 def test_combinations_with_replacement(self): 1668 self.assertEqual(list(combinations_with_replacement('ABC', 2)), 1669 [('A','A'), ('A','B'), ('A','C'), ('B','B'), ('B','C'), ('C','C')]) 1670 1671 def test_compress(self): 1672 self.assertEqual(list(compress('ABCDEF', [1,0,1,0,1,1])), list('ACEF')) 1673 1674 def test_count(self): 1675 self.assertEqual(list(islice(count(10), 5)), [10, 11, 12, 13, 14]) 1676 1677 def test_cycle(self): 1678 self.assertEqual(list(islice(cycle('ABCD'), 12)), list('ABCDABCDABCD')) 1679 1680 def test_dropwhile(self): 1681 self.assertEqual(list(dropwhile(lambda x: x<5, [1,4,6,4,1])), [6,4,1]) 1682 1683 def test_groupby(self): 1684 self.assertEqual([k for k, g in groupby('AAAABBBCCDAABBB')], 1685 list('ABCDAB')) 1686 self.assertEqual([(list(g)) for k, g in groupby('AAAABBBCCD')], 1687 [list('AAAA'), list('BBB'), list('CC'), list('D')]) 1688 1689 def test_filter(self): 1690 self.assertEqual(list(filter(lambda x: x%2, range(10))), [1,3,5,7,9]) 1691 1692 def test_filterfalse(self): 1693 self.assertEqual(list(filterfalse(lambda x: x%2, range(10))), [0,2,4,6,8]) 1694 1695 def test_map(self): 1696 self.assertEqual(list(map(pow, (2,3,10), (5,2,3))), [32, 9, 1000]) 1697 1698 def test_islice(self): 1699 self.assertEqual(list(islice('ABCDEFG', 2)), list('AB')) 1700 self.assertEqual(list(islice('ABCDEFG', 2, 4)), list('CD')) 1701 self.assertEqual(list(islice('ABCDEFG', 2, None)), list('CDEFG')) 1702 self.assertEqual(list(islice('ABCDEFG', 0, None, 2)), list('ACEG')) 1703 1704 def test_zip(self): 1705 self.assertEqual(list(zip('ABCD', 'xy')), [('A', 'x'), ('B', 'y')]) 1706 1707 def test_zip_longest(self): 1708 self.assertEqual(list(zip_longest('ABCD', 'xy', fillvalue='-')), 1709 [('A', 'x'), ('B', 'y'), ('C', '-'), ('D', '-')]) 1710 1711 def test_permutations(self): 1712 self.assertEqual(list(permutations('ABCD', 2)), 1713 list(map(tuple, 'AB AC AD BA BC BD CA CB CD DA DB DC'.split()))) 1714 self.assertEqual(list(permutations(range(3))), 1715 [(0,1,2), (0,2,1), (1,0,2), (1,2,0), (2,0,1), (2,1,0)]) 1716 1717 def test_product(self): 1718 self.assertEqual(list(product('ABCD', 'xy')), 1719 list(map(tuple, 'Ax Ay Bx By Cx Cy Dx Dy'.split()))) 1720 self.assertEqual(list(product(range(2), repeat=3)), 1721 [(0,0,0), (0,0,1), (0,1,0), (0,1,1), 1722 (1,0,0), (1,0,1), (1,1,0), (1,1,1)]) 1723 1724 def test_repeat(self): 1725 self.assertEqual(list(repeat(10, 3)), [10, 10, 10]) 1726 1727 def test_stapmap(self): 1728 self.assertEqual(list(starmap(pow, [(2,5), (3,2), (10,3)])), 1729 [32, 9, 1000]) 1730 1731 def test_takewhile(self): 1732 self.assertEqual(list(takewhile(lambda x: x<5, [1,4,6,4,1])), [1,4]) 1733 1734 1735class TestPurePythonRoughEquivalents(unittest.TestCase): 1736 1737 @staticmethod 1738 def islice(iterable, *args): 1739 s = slice(*args) 1740 start, stop, step = s.start or 0, s.stop or sys.maxsize, s.step or 1 1741 it = iter(range(start, stop, step)) 1742 try: 1743 nexti = next(it) 1744 except StopIteration: 1745 # Consume *iterable* up to the *start* position. 1746 for i, element in zip(range(start), iterable): 1747 pass 1748 return 1749 try: 1750 for i, element in enumerate(iterable): 1751 if i == nexti: 1752 yield element 1753 nexti = next(it) 1754 except StopIteration: 1755 # Consume to *stop*. 1756 for i, element in zip(range(i + 1, stop), iterable): 1757 pass 1758 1759 def test_islice_recipe(self): 1760 self.assertEqual(list(self.islice('ABCDEFG', 2)), list('AB')) 1761 self.assertEqual(list(self.islice('ABCDEFG', 2, 4)), list('CD')) 1762 self.assertEqual(list(self.islice('ABCDEFG', 2, None)), list('CDEFG')) 1763 self.assertEqual(list(self.islice('ABCDEFG', 0, None, 2)), list('ACEG')) 1764 # Test items consumed. 1765 it = iter(range(10)) 1766 self.assertEqual(list(self.islice(it, 3)), list(range(3))) 1767 self.assertEqual(list(it), list(range(3, 10))) 1768 it = iter(range(10)) 1769 self.assertEqual(list(self.islice(it, 3, 3)), []) 1770 self.assertEqual(list(it), list(range(3, 10))) 1771 # Test that slice finishes in predictable state. 1772 c = count() 1773 self.assertEqual(list(self.islice(c, 1, 3, 50)), [1]) 1774 self.assertEqual(next(c), 3) 1775 1776 1777class TestGC(unittest.TestCase): 1778 1779 def makecycle(self, iterator, container): 1780 container.append(iterator) 1781 next(iterator) 1782 del container, iterator 1783 1784 def test_accumulate(self): 1785 a = [] 1786 self.makecycle(accumulate([1,2,a,3]), a) 1787 1788 def test_chain(self): 1789 a = [] 1790 self.makecycle(chain(a), a) 1791 1792 def test_chain_from_iterable(self): 1793 a = [] 1794 self.makecycle(chain.from_iterable([a]), a) 1795 1796 def test_combinations(self): 1797 a = [] 1798 self.makecycle(combinations([1,2,a,3], 3), a) 1799 1800 def test_combinations_with_replacement(self): 1801 a = [] 1802 self.makecycle(combinations_with_replacement([1,2,a,3], 3), a) 1803 1804 def test_compress(self): 1805 a = [] 1806 self.makecycle(compress('ABCDEF', [1,0,1,0,1,0]), a) 1807 1808 def test_count(self): 1809 a = [] 1810 Int = type('Int', (int,), dict(x=a)) 1811 self.makecycle(count(Int(0), Int(1)), a) 1812 1813 def test_cycle(self): 1814 a = [] 1815 self.makecycle(cycle([a]*2), a) 1816 1817 def test_dropwhile(self): 1818 a = [] 1819 self.makecycle(dropwhile(bool, [0, a, a]), a) 1820 1821 def test_groupby(self): 1822 a = [] 1823 self.makecycle(groupby([a]*2, lambda x:x), a) 1824 1825 def test_issue2246(self): 1826 # Issue 2246 -- the _grouper iterator was not included in GC 1827 n = 10 1828 keyfunc = lambda x: x 1829 for i, j in groupby(range(n), key=keyfunc): 1830 keyfunc.__dict__.setdefault('x',[]).append(j) 1831 1832 def test_filter(self): 1833 a = [] 1834 self.makecycle(filter(lambda x:True, [a]*2), a) 1835 1836 def test_filterfalse(self): 1837 a = [] 1838 self.makecycle(filterfalse(lambda x:False, a), a) 1839 1840 def test_zip(self): 1841 a = [] 1842 self.makecycle(zip([a]*2, [a]*3), a) 1843 1844 def test_zip_longest(self): 1845 a = [] 1846 self.makecycle(zip_longest([a]*2, [a]*3), a) 1847 b = [a, None] 1848 self.makecycle(zip_longest([a]*2, [a]*3, fillvalue=b), a) 1849 1850 def test_map(self): 1851 a = [] 1852 self.makecycle(map(lambda x:x, [a]*2), a) 1853 1854 def test_islice(self): 1855 a = [] 1856 self.makecycle(islice([a]*2, None), a) 1857 1858 def test_pairwise(self): 1859 a = [] 1860 self.makecycle(pairwise([a]*5), a) 1861 1862 def test_permutations(self): 1863 a = [] 1864 self.makecycle(permutations([1,2,a,3], 3), a) 1865 1866 def test_product(self): 1867 a = [] 1868 self.makecycle(product([1,2,a,3], repeat=3), a) 1869 1870 def test_repeat(self): 1871 a = [] 1872 self.makecycle(repeat(a), a) 1873 1874 def test_starmap(self): 1875 a = [] 1876 self.makecycle(starmap(lambda *t: t, [(a,a)]*2), a) 1877 1878 def test_takewhile(self): 1879 a = [] 1880 self.makecycle(takewhile(bool, [1, 0, a, a]), a) 1881 1882def R(seqn): 1883 'Regular generator' 1884 for i in seqn: 1885 yield i 1886 1887class G: 1888 'Sequence using __getitem__' 1889 def __init__(self, seqn): 1890 self.seqn = seqn 1891 def __getitem__(self, i): 1892 return self.seqn[i] 1893 1894class I: 1895 'Sequence using iterator protocol' 1896 def __init__(self, seqn): 1897 self.seqn = seqn 1898 self.i = 0 1899 def __iter__(self): 1900 return self 1901 def __next__(self): 1902 if self.i >= len(self.seqn): raise StopIteration 1903 v = self.seqn[self.i] 1904 self.i += 1 1905 return v 1906 1907class Ig: 1908 'Sequence using iterator protocol defined with a generator' 1909 def __init__(self, seqn): 1910 self.seqn = seqn 1911 self.i = 0 1912 def __iter__(self): 1913 for val in self.seqn: 1914 yield val 1915 1916class X: 1917 'Missing __getitem__ and __iter__' 1918 def __init__(self, seqn): 1919 self.seqn = seqn 1920 self.i = 0 1921 def __next__(self): 1922 if self.i >= len(self.seqn): raise StopIteration 1923 v = self.seqn[self.i] 1924 self.i += 1 1925 return v 1926 1927class N: 1928 'Iterator missing __next__()' 1929 def __init__(self, seqn): 1930 self.seqn = seqn 1931 self.i = 0 1932 def __iter__(self): 1933 return self 1934 1935class E: 1936 'Test propagation of exceptions' 1937 def __init__(self, seqn): 1938 self.seqn = seqn 1939 self.i = 0 1940 def __iter__(self): 1941 return self 1942 def __next__(self): 1943 3 // 0 1944 1945class S: 1946 'Test immediate stop' 1947 def __init__(self, seqn): 1948 pass 1949 def __iter__(self): 1950 return self 1951 def __next__(self): 1952 raise StopIteration 1953 1954def L(seqn): 1955 'Test multiple tiers of iterators' 1956 return chain(map(lambda x:x, R(Ig(G(seqn))))) 1957 1958 1959class TestVariousIteratorArgs(unittest.TestCase): 1960 1961 def test_accumulate(self): 1962 s = [1,2,3,4,5] 1963 r = [1,3,6,10,15] 1964 n = len(s) 1965 for g in (G, I, Ig, L, R): 1966 self.assertEqual(list(accumulate(g(s))), r) 1967 self.assertEqual(list(accumulate(S(s))), []) 1968 self.assertRaises(TypeError, accumulate, X(s)) 1969 self.assertRaises(TypeError, accumulate, N(s)) 1970 self.assertRaises(ZeroDivisionError, list, accumulate(E(s))) 1971 1972 def test_chain(self): 1973 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 1974 for g in (G, I, Ig, S, L, R): 1975 self.assertEqual(list(chain(g(s))), list(g(s))) 1976 self.assertEqual(list(chain(g(s), g(s))), list(g(s))+list(g(s))) 1977 self.assertRaises(TypeError, list, chain(X(s))) 1978 self.assertRaises(TypeError, list, chain(N(s))) 1979 self.assertRaises(ZeroDivisionError, list, chain(E(s))) 1980 1981 def test_compress(self): 1982 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 1983 n = len(s) 1984 for g in (G, I, Ig, S, L, R): 1985 self.assertEqual(list(compress(g(s), repeat(1))), list(g(s))) 1986 self.assertRaises(TypeError, compress, X(s), repeat(1)) 1987 self.assertRaises(TypeError, compress, N(s), repeat(1)) 1988 self.assertRaises(ZeroDivisionError, list, compress(E(s), repeat(1))) 1989 1990 def test_product(self): 1991 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 1992 self.assertRaises(TypeError, product, X(s)) 1993 self.assertRaises(TypeError, product, N(s)) 1994 self.assertRaises(ZeroDivisionError, product, E(s)) 1995 1996 def test_cycle(self): 1997 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 1998 for g in (G, I, Ig, S, L, R): 1999 tgtlen = len(s) * 3 2000 expected = list(g(s))*3 2001 actual = list(islice(cycle(g(s)), tgtlen)) 2002 self.assertEqual(actual, expected) 2003 self.assertRaises(TypeError, cycle, X(s)) 2004 self.assertRaises(TypeError, cycle, N(s)) 2005 self.assertRaises(ZeroDivisionError, list, cycle(E(s))) 2006 2007 def test_groupby(self): 2008 for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)): 2009 for g in (G, I, Ig, S, L, R): 2010 self.assertEqual([k for k, sb in groupby(g(s))], list(g(s))) 2011 self.assertRaises(TypeError, groupby, X(s)) 2012 self.assertRaises(TypeError, groupby, N(s)) 2013 self.assertRaises(ZeroDivisionError, list, groupby(E(s))) 2014 2015 def test_filter(self): 2016 for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)): 2017 for g in (G, I, Ig, S, L, R): 2018 self.assertEqual(list(filter(isEven, g(s))), 2019 [x for x in g(s) if isEven(x)]) 2020 self.assertRaises(TypeError, filter, isEven, X(s)) 2021 self.assertRaises(TypeError, filter, isEven, N(s)) 2022 self.assertRaises(ZeroDivisionError, list, filter(isEven, E(s))) 2023 2024 def test_filterfalse(self): 2025 for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)): 2026 for g in (G, I, Ig, S, L, R): 2027 self.assertEqual(list(filterfalse(isEven, g(s))), 2028 [x for x in g(s) if isOdd(x)]) 2029 self.assertRaises(TypeError, filterfalse, isEven, X(s)) 2030 self.assertRaises(TypeError, filterfalse, isEven, N(s)) 2031 self.assertRaises(ZeroDivisionError, list, filterfalse(isEven, E(s))) 2032 2033 def test_zip(self): 2034 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 2035 for g in (G, I, Ig, S, L, R): 2036 self.assertEqual(list(zip(g(s))), lzip(g(s))) 2037 self.assertEqual(list(zip(g(s), g(s))), lzip(g(s), g(s))) 2038 self.assertRaises(TypeError, zip, X(s)) 2039 self.assertRaises(TypeError, zip, N(s)) 2040 self.assertRaises(ZeroDivisionError, list, zip(E(s))) 2041 2042 def test_ziplongest(self): 2043 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 2044 for g in (G, I, Ig, S, L, R): 2045 self.assertEqual(list(zip_longest(g(s))), list(zip(g(s)))) 2046 self.assertEqual(list(zip_longest(g(s), g(s))), list(zip(g(s), g(s)))) 2047 self.assertRaises(TypeError, zip_longest, X(s)) 2048 self.assertRaises(TypeError, zip_longest, N(s)) 2049 self.assertRaises(ZeroDivisionError, list, zip_longest(E(s))) 2050 2051 def test_map(self): 2052 for s in (range(10), range(0), range(100), (7,11), range(20,50,5)): 2053 for g in (G, I, Ig, S, L, R): 2054 self.assertEqual(list(map(onearg, g(s))), 2055 [onearg(x) for x in g(s)]) 2056 self.assertEqual(list(map(operator.pow, g(s), g(s))), 2057 [x**x for x in g(s)]) 2058 self.assertRaises(TypeError, map, onearg, X(s)) 2059 self.assertRaises(TypeError, map, onearg, N(s)) 2060 self.assertRaises(ZeroDivisionError, list, map(onearg, E(s))) 2061 2062 def test_islice(self): 2063 for s in ("12345", "", range(1000), ('do', 1.2), range(2000,2200,5)): 2064 for g in (G, I, Ig, S, L, R): 2065 self.assertEqual(list(islice(g(s),1,None,2)), list(g(s))[1::2]) 2066 self.assertRaises(TypeError, islice, X(s), 10) 2067 self.assertRaises(TypeError, islice, N(s), 10) 2068 self.assertRaises(ZeroDivisionError, list, islice(E(s), 10)) 2069 2070 def test_pairwise(self): 2071 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 2072 for g in (G, I, Ig, S, L, R): 2073 seq = list(g(s)) 2074 expected = list(zip(seq, seq[1:])) 2075 actual = list(pairwise(g(s))) 2076 self.assertEqual(actual, expected) 2077 self.assertRaises(TypeError, pairwise, X(s)) 2078 self.assertRaises(TypeError, pairwise, N(s)) 2079 self.assertRaises(ZeroDivisionError, list, pairwise(E(s))) 2080 2081 def test_starmap(self): 2082 for s in (range(10), range(0), range(100), (7,11), range(20,50,5)): 2083 for g in (G, I, Ig, S, L, R): 2084 ss = lzip(s, s) 2085 self.assertEqual(list(starmap(operator.pow, g(ss))), 2086 [x**x for x in g(s)]) 2087 self.assertRaises(TypeError, starmap, operator.pow, X(ss)) 2088 self.assertRaises(TypeError, starmap, operator.pow, N(ss)) 2089 self.assertRaises(ZeroDivisionError, list, starmap(operator.pow, E(ss))) 2090 2091 def test_takewhile(self): 2092 for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)): 2093 for g in (G, I, Ig, S, L, R): 2094 tgt = [] 2095 for elem in g(s): 2096 if not isEven(elem): break 2097 tgt.append(elem) 2098 self.assertEqual(list(takewhile(isEven, g(s))), tgt) 2099 self.assertRaises(TypeError, takewhile, isEven, X(s)) 2100 self.assertRaises(TypeError, takewhile, isEven, N(s)) 2101 self.assertRaises(ZeroDivisionError, list, takewhile(isEven, E(s))) 2102 2103 def test_dropwhile(self): 2104 for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)): 2105 for g in (G, I, Ig, S, L, R): 2106 tgt = [] 2107 for elem in g(s): 2108 if not tgt and isOdd(elem): continue 2109 tgt.append(elem) 2110 self.assertEqual(list(dropwhile(isOdd, g(s))), tgt) 2111 self.assertRaises(TypeError, dropwhile, isOdd, X(s)) 2112 self.assertRaises(TypeError, dropwhile, isOdd, N(s)) 2113 self.assertRaises(ZeroDivisionError, list, dropwhile(isOdd, E(s))) 2114 2115 def test_tee(self): 2116 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 2117 for g in (G, I, Ig, S, L, R): 2118 it1, it2 = tee(g(s)) 2119 self.assertEqual(list(it1), list(g(s))) 2120 self.assertEqual(list(it2), list(g(s))) 2121 self.assertRaises(TypeError, tee, X(s)) 2122 self.assertRaises(TypeError, tee, N(s)) 2123 self.assertRaises(ZeroDivisionError, list, tee(E(s))[0]) 2124 2125class LengthTransparency(unittest.TestCase): 2126 2127 def test_repeat(self): 2128 self.assertEqual(operator.length_hint(repeat(None, 50)), 50) 2129 self.assertEqual(operator.length_hint(repeat(None, 0)), 0) 2130 self.assertEqual(operator.length_hint(repeat(None), 12), 12) 2131 2132 def test_repeat_with_negative_times(self): 2133 self.assertEqual(operator.length_hint(repeat(None, -1)), 0) 2134 self.assertEqual(operator.length_hint(repeat(None, -2)), 0) 2135 self.assertEqual(operator.length_hint(repeat(None, times=-1)), 0) 2136 self.assertEqual(operator.length_hint(repeat(None, times=-2)), 0) 2137 2138class RegressionTests(unittest.TestCase): 2139 2140 def test_sf_793826(self): 2141 # Fix Armin Rigo's successful efforts to wreak havoc 2142 2143 def mutatingtuple(tuple1, f, tuple2): 2144 # this builds a tuple t which is a copy of tuple1, 2145 # then calls f(t), then mutates t to be equal to tuple2 2146 # (needs len(tuple1) == len(tuple2)). 2147 def g(value, first=[1]): 2148 if first: 2149 del first[:] 2150 f(next(z)) 2151 return value 2152 items = list(tuple2) 2153 items[1:1] = list(tuple1) 2154 gen = map(g, items) 2155 z = zip(*[gen]*len(tuple1)) 2156 next(z) 2157 2158 def f(t): 2159 global T 2160 T = t 2161 first[:] = list(T) 2162 2163 first = [] 2164 mutatingtuple((1,2,3), f, (4,5,6)) 2165 second = list(T) 2166 self.assertEqual(first, second) 2167 2168 2169 def test_sf_950057(self): 2170 # Make sure that chain() and cycle() catch exceptions immediately 2171 # rather than when shifting between input sources 2172 2173 def gen1(): 2174 hist.append(0) 2175 yield 1 2176 hist.append(1) 2177 raise AssertionError 2178 hist.append(2) 2179 2180 def gen2(x): 2181 hist.append(3) 2182 yield 2 2183 hist.append(4) 2184 2185 hist = [] 2186 self.assertRaises(AssertionError, list, chain(gen1(), gen2(False))) 2187 self.assertEqual(hist, [0,1]) 2188 2189 hist = [] 2190 self.assertRaises(AssertionError, list, chain(gen1(), gen2(True))) 2191 self.assertEqual(hist, [0,1]) 2192 2193 hist = [] 2194 self.assertRaises(AssertionError, list, cycle(gen1())) 2195 self.assertEqual(hist, [0,1]) 2196 2197 @support.skip_if_pgo_task 2198 def test_long_chain_of_empty_iterables(self): 2199 # Make sure itertools.chain doesn't run into recursion limits when 2200 # dealing with long chains of empty iterables. Even with a high 2201 # number this would probably only fail in Py_DEBUG mode. 2202 it = chain.from_iterable(() for unused in range(10000000)) 2203 with self.assertRaises(StopIteration): 2204 next(it) 2205 2206 def test_issue30347_1(self): 2207 def f(n): 2208 if n == 5: 2209 list(b) 2210 return n != 6 2211 for (k, b) in groupby(range(10), f): 2212 list(b) # shouldn't crash 2213 2214 def test_issue30347_2(self): 2215 class K: 2216 def __init__(self, v): 2217 pass 2218 def __eq__(self, other): 2219 nonlocal i 2220 i += 1 2221 if i == 1: 2222 next(g, None) 2223 return True 2224 i = 0 2225 g = next(groupby(range(10), K))[1] 2226 for j in range(2): 2227 next(g, None) # shouldn't crash 2228 2229 2230class SubclassWithKwargsTest(unittest.TestCase): 2231 def test_keywords_in_subclass(self): 2232 # count is not subclassable... 2233 testcases = [ 2234 (repeat, (1, 2), [1, 1]), 2235 (zip, ([1, 2], 'ab'), [(1, 'a'), (2, 'b')]), 2236 (filter, (None, [0, 1]), [1]), 2237 (filterfalse, (None, [0, 1]), [0]), 2238 (chain, ([1, 2], [3, 4]), [1, 2, 3]), 2239 (map, (str, [1, 2]), ['1', '2']), 2240 (starmap, (operator.pow, ((2, 3), (3, 2))), [8, 9]), 2241 (islice, ([1, 2, 3, 4], 1, 3), [2, 3]), 2242 (takewhile, (isEven, [2, 3, 4]), [2]), 2243 (dropwhile, (isEven, [2, 3, 4]), [3, 4]), 2244 (cycle, ([1, 2],), [1, 2, 1]), 2245 (compress, ('ABC', [1, 0, 1]), ['A', 'C']), 2246 ] 2247 for cls, args, result in testcases: 2248 with self.subTest(cls): 2249 class subclass(cls): 2250 pass 2251 u = subclass(*args) 2252 self.assertIs(type(u), subclass) 2253 self.assertEqual(list(islice(u, 0, 3)), result) 2254 with self.assertRaises(TypeError): 2255 subclass(*args, newarg=3) 2256 2257 for cls, args, result in testcases: 2258 # Constructors of repeat, zip, compress accept keyword arguments. 2259 # Their subclasses need overriding __new__ to support new 2260 # keyword arguments. 2261 if cls in [repeat, zip, compress]: 2262 continue 2263 with self.subTest(cls): 2264 class subclass_with_init(cls): 2265 def __init__(self, *args, newarg=None): 2266 self.newarg = newarg 2267 u = subclass_with_init(*args, newarg=3) 2268 self.assertIs(type(u), subclass_with_init) 2269 self.assertEqual(list(islice(u, 0, 3)), result) 2270 self.assertEqual(u.newarg, 3) 2271 2272 for cls, args, result in testcases: 2273 with self.subTest(cls): 2274 class subclass_with_new(cls): 2275 def __new__(cls, *args, newarg=None): 2276 self = super().__new__(cls, *args) 2277 self.newarg = newarg 2278 return self 2279 u = subclass_with_new(*args, newarg=3) 2280 self.assertIs(type(u), subclass_with_new) 2281 self.assertEqual(list(islice(u, 0, 3)), result) 2282 self.assertEqual(u.newarg, 3) 2283 2284 2285@support.cpython_only 2286class SizeofTest(unittest.TestCase): 2287 def setUp(self): 2288 self.ssize_t = struct.calcsize('n') 2289 2290 check_sizeof = support.check_sizeof 2291 2292 def test_product_sizeof(self): 2293 basesize = support.calcobjsize('3Pi') 2294 check = self.check_sizeof 2295 check(product('ab', '12'), basesize + 2 * self.ssize_t) 2296 check(product(*(('abc',) * 10)), basesize + 10 * self.ssize_t) 2297 2298 def test_combinations_sizeof(self): 2299 basesize = support.calcobjsize('3Pni') 2300 check = self.check_sizeof 2301 check(combinations('abcd', 3), basesize + 3 * self.ssize_t) 2302 check(combinations(range(10), 4), basesize + 4 * self.ssize_t) 2303 2304 def test_combinations_with_replacement_sizeof(self): 2305 cwr = combinations_with_replacement 2306 basesize = support.calcobjsize('3Pni') 2307 check = self.check_sizeof 2308 check(cwr('abcd', 3), basesize + 3 * self.ssize_t) 2309 check(cwr(range(10), 4), basesize + 4 * self.ssize_t) 2310 2311 def test_permutations_sizeof(self): 2312 basesize = support.calcobjsize('4Pni') 2313 check = self.check_sizeof 2314 check(permutations('abcd'), 2315 basesize + 4 * self.ssize_t + 4 * self.ssize_t) 2316 check(permutations('abcd', 3), 2317 basesize + 4 * self.ssize_t + 3 * self.ssize_t) 2318 check(permutations('abcde', 3), 2319 basesize + 5 * self.ssize_t + 3 * self.ssize_t) 2320 check(permutations(range(10), 4), 2321 basesize + 10 * self.ssize_t + 4 * self.ssize_t) 2322 2323 2324libreftest = """ Doctest for examples in the library reference: libitertools.tex 2325 2326 2327>>> amounts = [120.15, 764.05, 823.14] 2328>>> for checknum, amount in zip(count(1200), amounts): 2329... print('Check %d is for $%.2f' % (checknum, amount)) 2330... 2331Check 1200 is for $120.15 2332Check 1201 is for $764.05 2333Check 1202 is for $823.14 2334 2335>>> import operator 2336>>> for cube in map(operator.pow, range(1,4), repeat(3)): 2337... print(cube) 2338... 23391 23408 234127 2342 2343>>> reportlines = ['EuroPython', 'Roster', '', 'alex', '', 'laura', '', 'martin', '', 'walter', '', 'samuele'] 2344>>> for name in islice(reportlines, 3, None, 2): 2345... print(name.title()) 2346... 2347Alex 2348Laura 2349Martin 2350Walter 2351Samuele 2352 2353>>> from operator import itemgetter 2354>>> d = dict(a=1, b=2, c=1, d=2, e=1, f=2, g=3) 2355>>> di = sorted(sorted(d.items()), key=itemgetter(1)) 2356>>> for k, g in groupby(di, itemgetter(1)): 2357... print(k, list(map(itemgetter(0), g))) 2358... 23591 ['a', 'c', 'e'] 23602 ['b', 'd', 'f'] 23613 ['g'] 2362 2363# Find runs of consecutive numbers using groupby. The key to the solution 2364# is differencing with a range so that consecutive numbers all appear in 2365# same group. 2366>>> data = [ 1, 4,5,6, 10, 15,16,17,18, 22, 25,26,27,28] 2367>>> for k, g in groupby(enumerate(data), lambda t:t[0]-t[1]): 2368... print(list(map(operator.itemgetter(1), g))) 2369... 2370[1] 2371[4, 5, 6] 2372[10] 2373[15, 16, 17, 18] 2374[22] 2375[25, 26, 27, 28] 2376 2377>>> def take(n, iterable): 2378... "Return first n items of the iterable as a list" 2379... return list(islice(iterable, n)) 2380 2381>>> def prepend(value, iterator): 2382... "Prepend a single value in front of an iterator" 2383... # prepend(1, [2, 3, 4]) -> 1 2 3 4 2384... return chain([value], iterator) 2385 2386>>> def enumerate(iterable, start=0): 2387... return zip(count(start), iterable) 2388 2389>>> def tabulate(function, start=0): 2390... "Return function(0), function(1), ..." 2391... return map(function, count(start)) 2392 2393>>> import collections 2394>>> def consume(iterator, n=None): 2395... "Advance the iterator n-steps ahead. If n is None, consume entirely." 2396... # Use functions that consume iterators at C speed. 2397... if n is None: 2398... # feed the entire iterator into a zero-length deque 2399... collections.deque(iterator, maxlen=0) 2400... else: 2401... # advance to the empty slice starting at position n 2402... next(islice(iterator, n, n), None) 2403 2404>>> def nth(iterable, n, default=None): 2405... "Returns the nth item or a default value" 2406... return next(islice(iterable, n, None), default) 2407 2408>>> def all_equal(iterable): 2409... "Returns True if all the elements are equal to each other" 2410... g = groupby(iterable) 2411... return next(g, True) and not next(g, False) 2412 2413>>> def quantify(iterable, pred=bool): 2414... "Count how many times the predicate is true" 2415... return sum(map(pred, iterable)) 2416 2417>>> def pad_none(iterable): 2418... "Returns the sequence elements and then returns None indefinitely" 2419... return chain(iterable, repeat(None)) 2420 2421>>> def ncycles(iterable, n): 2422... "Returns the sequence elements n times" 2423... return chain(*repeat(iterable, n)) 2424 2425>>> def dotproduct(vec1, vec2): 2426... return sum(map(operator.mul, vec1, vec2)) 2427 2428>>> def flatten(listOfLists): 2429... return list(chain.from_iterable(listOfLists)) 2430 2431>>> def repeatfunc(func, times=None, *args): 2432... "Repeat calls to func with specified arguments." 2433... " Example: repeatfunc(random.random)" 2434... if times is None: 2435... return starmap(func, repeat(args)) 2436... else: 2437... return starmap(func, repeat(args, times)) 2438 2439>>> def triplewise(iterable): 2440... "Return overlapping triplets from an iterable" 2441... # pairwise('ABCDEFG') -> ABC BCD CDE DEF EFG 2442... for (a, _), (b, c) in pairwise(pairwise(iterable)): 2443... yield a, b, c 2444 2445>>> import collections 2446>>> def sliding_window(iterable, n): 2447... # sliding_window('ABCDEFG', 4) -> ABCD BCDE CDEF DEFG 2448... it = iter(iterable) 2449... window = collections.deque(islice(it, n), maxlen=n) 2450... if len(window) == n: 2451... yield tuple(window) 2452... for x in it: 2453... window.append(x) 2454... yield tuple(window) 2455 2456>>> def grouper(n, iterable, fillvalue=None): 2457... "grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx" 2458... args = [iter(iterable)] * n 2459... return zip_longest(*args, fillvalue=fillvalue) 2460 2461>>> def roundrobin(*iterables): 2462... "roundrobin('ABC', 'D', 'EF') --> A D E B F C" 2463... # Recipe credited to George Sakkis 2464... pending = len(iterables) 2465... nexts = cycle(iter(it).__next__ for it in iterables) 2466... while pending: 2467... try: 2468... for next in nexts: 2469... yield next() 2470... except StopIteration: 2471... pending -= 1 2472... nexts = cycle(islice(nexts, pending)) 2473 2474>>> def partition(pred, iterable): 2475... "Use a predicate to partition entries into false entries and true entries" 2476... # partition(is_odd, range(10)) --> 0 2 4 6 8 and 1 3 5 7 9 2477... t1, t2 = tee(iterable) 2478... return filterfalse(pred, t1), filter(pred, t2) 2479 2480>>> def before_and_after(predicate, it): 2481... ''' Variant of takewhile() that allows complete 2482... access to the remainder of the iterator. 2483... 2484... >>> all_upper, remainder = before_and_after(str.isupper, 'ABCdEfGhI') 2485... >>> str.join('', all_upper) 2486... 'ABC' 2487... >>> str.join('', remainder) 2488... 'dEfGhI' 2489... 2490... Note that the first iterator must be fully 2491... consumed before the second iterator can 2492... generate valid results. 2493... ''' 2494... it = iter(it) 2495... transition = [] 2496... def true_iterator(): 2497... for elem in it: 2498... if predicate(elem): 2499... yield elem 2500... else: 2501... transition.append(elem) 2502... return 2503... def remainder_iterator(): 2504... yield from transition 2505... yield from it 2506... return true_iterator(), remainder_iterator() 2507 2508>>> def powerset(iterable): 2509... "powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)" 2510... s = list(iterable) 2511... return chain.from_iterable(combinations(s, r) for r in range(len(s)+1)) 2512 2513>>> def unique_everseen(iterable, key=None): 2514... "List unique elements, preserving order. Remember all elements ever seen." 2515... # unique_everseen('AAAABBBCCDAABBB') --> A B C D 2516... # unique_everseen('ABBCcAD', str.lower) --> A B C D 2517... seen = set() 2518... seen_add = seen.add 2519... if key is None: 2520... for element in iterable: 2521... if element not in seen: 2522... seen_add(element) 2523... yield element 2524... else: 2525... for element in iterable: 2526... k = key(element) 2527... if k not in seen: 2528... seen_add(k) 2529... yield element 2530 2531>>> def unique_justseen(iterable, key=None): 2532... "List unique elements, preserving order. Remember only the element just seen." 2533... # unique_justseen('AAAABBBCCDAABBB') --> A B C D A B 2534... # unique_justseen('ABBCcAD', str.lower) --> A B C A D 2535... return map(next, map(itemgetter(1), groupby(iterable, key))) 2536 2537>>> def first_true(iterable, default=False, pred=None): 2538... '''Returns the first true value in the iterable. 2539... 2540... If no true value is found, returns *default* 2541... 2542... If *pred* is not None, returns the first item 2543... for which pred(item) is true. 2544... 2545... ''' 2546... # first_true([a,b,c], x) --> a or b or c or x 2547... # first_true([a,b], x, f) --> a if f(a) else b if f(b) else x 2548... return next(filter(pred, iterable), default) 2549 2550>>> def nth_combination(iterable, r, index): 2551... 'Equivalent to list(combinations(iterable, r))[index]' 2552... pool = tuple(iterable) 2553... n = len(pool) 2554... if r < 0 or r > n: 2555... raise ValueError 2556... c = 1 2557... k = min(r, n-r) 2558... for i in range(1, k+1): 2559... c = c * (n - k + i) // i 2560... if index < 0: 2561... index += c 2562... if index < 0 or index >= c: 2563... raise IndexError 2564... result = [] 2565... while r: 2566... c, n, r = c*r//n, n-1, r-1 2567... while index >= c: 2568... index -= c 2569... c, n = c*(n-r)//n, n-1 2570... result.append(pool[-1-n]) 2571... return tuple(result) 2572 2573 2574This is not part of the examples but it tests to make sure the definitions 2575perform as purported. 2576 2577>>> take(10, count()) 2578[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 2579 2580>>> list(prepend(1, [2, 3, 4])) 2581[1, 2, 3, 4] 2582 2583>>> list(enumerate('abc')) 2584[(0, 'a'), (1, 'b'), (2, 'c')] 2585 2586>>> list(islice(tabulate(lambda x: 2*x), 4)) 2587[0, 2, 4, 6] 2588 2589>>> it = iter(range(10)) 2590>>> consume(it, 3) 2591>>> next(it) 25923 2593>>> consume(it) 2594>>> next(it, 'Done') 2595'Done' 2596 2597>>> nth('abcde', 3) 2598'd' 2599 2600>>> nth('abcde', 9) is None 2601True 2602 2603>>> [all_equal(s) for s in ('', 'A', 'AAAA', 'AAAB', 'AAABA')] 2604[True, True, True, False, False] 2605 2606>>> quantify(range(99), lambda x: x%2==0) 260750 2608 2609>>> a = [[1, 2, 3], [4, 5, 6]] 2610>>> flatten(a) 2611[1, 2, 3, 4, 5, 6] 2612 2613>>> list(repeatfunc(pow, 5, 2, 3)) 2614[8, 8, 8, 8, 8] 2615 2616>>> import random 2617>>> take(5, map(int, repeatfunc(random.random))) 2618[0, 0, 0, 0, 0] 2619 2620>>> list(islice(pad_none('abc'), 0, 6)) 2621['a', 'b', 'c', None, None, None] 2622 2623>>> list(ncycles('abc', 3)) 2624['a', 'b', 'c', 'a', 'b', 'c', 'a', 'b', 'c'] 2625 2626>>> dotproduct([1,2,3], [4,5,6]) 262732 2628 2629>>> list(grouper(3, 'abcdefg', 'x')) 2630[('a', 'b', 'c'), ('d', 'e', 'f'), ('g', 'x', 'x')] 2631 2632>>> list(triplewise('ABCDEFG')) 2633[('A', 'B', 'C'), ('B', 'C', 'D'), ('C', 'D', 'E'), ('D', 'E', 'F'), ('E', 'F', 'G')] 2634 2635>>> list(sliding_window('ABCDEFG', 4)) 2636[('A', 'B', 'C', 'D'), ('B', 'C', 'D', 'E'), ('C', 'D', 'E', 'F'), ('D', 'E', 'F', 'G')] 2637 2638>>> list(roundrobin('abc', 'd', 'ef')) 2639['a', 'd', 'e', 'b', 'f', 'c'] 2640 2641>>> def is_odd(x): 2642... return x % 2 == 1 2643 2644>>> evens, odds = partition(is_odd, range(10)) 2645>>> list(evens) 2646[0, 2, 4, 6, 8] 2647>>> list(odds) 2648[1, 3, 5, 7, 9] 2649 2650>>> it = iter('ABCdEfGhI') 2651>>> all_upper, remainder = before_and_after(str.isupper, it) 2652>>> ''.join(all_upper) 2653'ABC' 2654>>> ''.join(remainder) 2655'dEfGhI' 2656 2657>>> list(powerset([1,2,3])) 2658[(), (1,), (2,), (3,), (1, 2), (1, 3), (2, 3), (1, 2, 3)] 2659 2660>>> all(len(list(powerset(range(n)))) == 2**n for n in range(18)) 2661True 2662 2663>>> list(powerset('abcde')) == sorted(sorted(set(powerset('abcde'))), key=len) 2664True 2665 2666>>> list(unique_everseen('AAAABBBCCDAABBB')) 2667['A', 'B', 'C', 'D'] 2668 2669>>> list(unique_everseen('ABBCcAD', str.lower)) 2670['A', 'B', 'C', 'D'] 2671 2672>>> list(unique_justseen('AAAABBBCCDAABBB')) 2673['A', 'B', 'C', 'D', 'A', 'B'] 2674 2675>>> list(unique_justseen('ABBCcAD', str.lower)) 2676['A', 'B', 'C', 'A', 'D'] 2677 2678>>> first_true('ABC0DEF1', '9', str.isdigit) 2679'0' 2680 2681>>> population = 'ABCDEFGH' 2682>>> for r in range(len(population) + 1): 2683... seq = list(combinations(population, r)) 2684... for i in range(len(seq)): 2685... assert nth_combination(population, r, i) == seq[i] 2686... for i in range(-len(seq), 0): 2687... assert nth_combination(population, r, i) == seq[i] 2688 2689 2690""" 2691 2692__test__ = {'libreftest' : libreftest} 2693 2694def load_tests(loader, tests, pattern): 2695 tests.addTest(doctest.DocTestSuite()) 2696 return tests 2697 2698 2699if __name__ == "__main__": 2700 unittest.main() 2701