1from __future__ import absolute_import, unicode_literals 2 3import copy 4import datetime 5import traceback 6from contextlib import contextmanager 7 8import pytest 9from case import Mock, call, patch, skip 10 11from celery import states, uuid 12from celery.app.task import Context 13from celery.backends.base import SyncBackendMixin 14from celery.exceptions import (CPendingDeprecationWarning, 15 ImproperlyConfigured, IncompleteStream, 16 TimeoutError) 17from celery.five import range 18from celery.result import (AsyncResult, EagerResult, GroupResult, ResultSet, 19 assert_will_not_block, result_from_tuple) 20from celery.utils.serialization import pickle 21 22PYTRACEBACK = """\ 23Traceback (most recent call last): 24 File "foo.py", line 2, in foofunc 25 don't matter 26 File "bar.py", line 3, in barfunc 27 don't matter 28Doesn't matter: really!\ 29""" 30 31 32def mock_task(name, state, result, traceback=None): 33 return { 34 'id': uuid(), 'name': name, 'state': state, 35 'result': result, 'traceback': traceback, 36 } 37 38 39def save_result(app, task): 40 traceback = task.get('traceback') or 'Some traceback' 41 if task['state'] == states.SUCCESS: 42 app.backend.mark_as_done(task['id'], task['result']) 43 elif task['state'] == states.RETRY: 44 app.backend.mark_as_retry( 45 task['id'], task['result'], traceback=traceback, 46 ) 47 else: 48 app.backend.mark_as_failure( 49 task['id'], task['result'], traceback=traceback, 50 ) 51 52 53def make_mock_group(app, size=10): 54 tasks = [mock_task('ts%d' % i, states.SUCCESS, i) for i in range(size)] 55 [save_result(app, task) for task in tasks] 56 return [app.AsyncResult(task['id']) for task in tasks] 57 58 59class _MockBackend: 60 def add_pending_result(self, *args, **kwargs): 61 return True 62 63 def wait_for_pending(self, *args, **kwargs): 64 return True 65 66 67class test_AsyncResult: 68 69 def setup(self): 70 self.app.conf.result_cache_max = 100 71 self.app.conf.result_serializer = 'pickle' 72 self.app.conf.result_extended = True 73 self.task1 = mock_task('task1', states.SUCCESS, 'the') 74 self.task2 = mock_task('task2', states.SUCCESS, 'quick') 75 self.task3 = mock_task('task3', states.FAILURE, KeyError('brown')) 76 self.task4 = mock_task('task3', states.RETRY, KeyError('red')) 77 self.task5 = mock_task( 78 'task3', states.FAILURE, KeyError('blue'), PYTRACEBACK, 79 ) 80 self.task6 = mock_task('task6', states.SUCCESS, None) 81 for task in (self.task1, self.task2, 82 self.task3, self.task4, self.task5, self.task6): 83 save_result(self.app, task) 84 85 @self.app.task(shared=False) 86 def mytask(): 87 pass 88 self.mytask = mytask 89 90 def test_forget(self): 91 first = Mock() 92 second = self.app.AsyncResult(self.task1['id'], parent=first) 93 third = self.app.AsyncResult(self.task2['id'], parent=second) 94 last = self.app.AsyncResult(self.task3['id'], parent=third) 95 last.forget() 96 first.forget.assert_called_once() 97 assert last.result is None 98 assert second.result is None 99 100 def test_ignored_getter(self): 101 result = self.app.AsyncResult(uuid()) 102 assert result.ignored is False 103 result.__delattr__('_ignored') 104 assert result.ignored is False 105 106 @patch('celery.result.task_join_will_block') 107 def test_assert_will_not_block(self, task_join_will_block): 108 task_join_will_block.return_value = True 109 with pytest.raises(RuntimeError): 110 assert_will_not_block() 111 task_join_will_block.return_value = False 112 assert_will_not_block() 113 114 @patch('celery.result.task_join_will_block') 115 def test_get_sync_subtask_option(self, task_join_will_block): 116 task_join_will_block.return_value = True 117 tid = uuid() 118 backend = _MockBackend() 119 res_subtask_async = AsyncResult(tid, backend=backend) 120 with pytest.raises(RuntimeError): 121 res_subtask_async.get() 122 res_subtask_async.get(disable_sync_subtasks=False) 123 124 def test_without_id(self): 125 with pytest.raises(ValueError): 126 AsyncResult(None, app=self.app) 127 128 def test_compat_properties(self): 129 x = self.app.AsyncResult('1') 130 assert x.task_id == x.id 131 x.task_id = '2' 132 assert x.id == '2' 133 134 @pytest.mark.usefixtures('depends_on_current_app') 135 def test_reduce_direct(self): 136 x = AsyncResult('1', app=self.app) 137 fun, args = x.__reduce__() 138 assert fun(*args) == x 139 140 def test_children(self): 141 x = self.app.AsyncResult('1') 142 children = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)] 143 x._cache = {'children': children, 'status': states.SUCCESS} 144 x.backend = Mock() 145 assert x.children 146 assert len(x.children) == 3 147 148 def test_propagates_for_parent(self): 149 x = self.app.AsyncResult(uuid()) 150 x.backend = Mock(name='backend') 151 x.backend.get_task_meta.return_value = {} 152 x.backend.wait_for_pending.return_value = 84 153 x.parent = EagerResult(uuid(), KeyError('foo'), states.FAILURE) 154 with pytest.raises(KeyError): 155 x.get(propagate=True) 156 x.backend.wait_for_pending.assert_not_called() 157 158 x.parent = EagerResult(uuid(), 42, states.SUCCESS) 159 assert x.get(propagate=True) == 84 160 x.backend.wait_for_pending.assert_called() 161 162 def test_get_children(self): 163 tid = uuid() 164 x = self.app.AsyncResult(tid) 165 child = [self.app.AsyncResult(uuid()).as_tuple() 166 for i in range(10)] 167 x._cache = {'children': child} 168 assert x.children 169 assert len(x.children) == 10 170 171 x._cache = {'status': states.SUCCESS} 172 x.backend._cache[tid] = {'result': None} 173 assert x.children is None 174 175 def test_build_graph_get_leaf_collect(self): 176 x = self.app.AsyncResult('1') 177 x.backend._cache['1'] = {'status': states.SUCCESS, 'result': None} 178 c = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)] 179 x.iterdeps = Mock() 180 x.iterdeps.return_value = ( 181 (None, x), 182 (x, c[0]), 183 (c[0], c[1]), 184 (c[1], c[2]) 185 ) 186 x.backend.READY_STATES = states.READY_STATES 187 assert x.graph 188 assert x.get_leaf() == 2 189 190 it = x.collect() 191 assert list(it) == [ 192 (x, None), 193 (c[0], 0), 194 (c[1], 1), 195 (c[2], 2), 196 ] 197 198 def test_iterdeps(self): 199 x = self.app.AsyncResult('1') 200 c = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)] 201 x._cache = {'status': states.SUCCESS, 'result': None, 'children': c} 202 for child in c: 203 child.backend = Mock() 204 child.backend.get_children.return_value = [] 205 it = x.iterdeps() 206 assert list(it) == [ 207 (None, x), 208 (x, c[0]), 209 (x, c[1]), 210 (x, c[2]), 211 ] 212 x._cache = None 213 x.ready = Mock() 214 x.ready.return_value = False 215 with pytest.raises(IncompleteStream): 216 list(x.iterdeps()) 217 list(x.iterdeps(intermediate=True)) 218 219 def test_eq_not_implemented(self): 220 assert self.app.AsyncResult('1') != object() 221 222 @pytest.mark.usefixtures('depends_on_current_app') 223 def test_reduce(self): 224 a1 = self.app.AsyncResult('uuid') 225 restored = pickle.loads(pickle.dumps(a1)) 226 assert restored.id == 'uuid' 227 228 a2 = self.app.AsyncResult('uuid') 229 assert pickle.loads(pickle.dumps(a2)).id == 'uuid' 230 231 def test_maybe_set_cache_empty(self): 232 self.app.AsyncResult('uuid')._maybe_set_cache(None) 233 234 def test_set_cache__children(self): 235 r1 = self.app.AsyncResult('id1') 236 r2 = self.app.AsyncResult('id2') 237 r1._set_cache({'children': [r2.as_tuple()]}) 238 assert r2 in r1.children 239 240 def test_successful(self): 241 ok_res = self.app.AsyncResult(self.task1['id']) 242 nok_res = self.app.AsyncResult(self.task3['id']) 243 nok_res2 = self.app.AsyncResult(self.task4['id']) 244 245 assert ok_res.successful() 246 assert not nok_res.successful() 247 assert not nok_res2.successful() 248 249 pending_res = self.app.AsyncResult(uuid()) 250 assert not pending_res.successful() 251 252 def test_raising(self): 253 notb = self.app.AsyncResult(self.task3['id']) 254 withtb = self.app.AsyncResult(self.task5['id']) 255 256 with pytest.raises(KeyError): 257 notb.get() 258 with pytest.raises(KeyError) as excinfo: 259 withtb.get() 260 261 tb = [t.strip() for t in traceback.format_tb(excinfo.tb)] 262 assert 'File "foo.py", line 2, in foofunc' not in tb 263 assert 'File "bar.py", line 3, in barfunc' not in tb 264 assert excinfo.value.args[0] == 'blue' 265 assert excinfo.typename == 'KeyError' 266 267 @skip.unless_module('tblib') 268 def test_raising_remote_tracebacks(self): 269 withtb = self.app.AsyncResult(self.task5['id']) 270 self.app.conf.task_remote_tracebacks = True 271 with pytest.raises(KeyError) as excinfo: 272 withtb.get() 273 tb = [t.strip() for t in traceback.format_tb(excinfo.tb)] 274 assert 'File "foo.py", line 2, in foofunc' in tb 275 assert 'File "bar.py", line 3, in barfunc' in tb 276 assert excinfo.value.args[0] == 'blue' 277 assert excinfo.typename == 'KeyError' 278 279 def test_str(self): 280 ok_res = self.app.AsyncResult(self.task1['id']) 281 ok2_res = self.app.AsyncResult(self.task2['id']) 282 nok_res = self.app.AsyncResult(self.task3['id']) 283 assert str(ok_res) == self.task1['id'] 284 assert str(ok2_res) == self.task2['id'] 285 assert str(nok_res) == self.task3['id'] 286 287 pending_id = uuid() 288 pending_res = self.app.AsyncResult(pending_id) 289 assert str(pending_res) == pending_id 290 291 def test_repr(self): 292 ok_res = self.app.AsyncResult(self.task1['id']) 293 ok2_res = self.app.AsyncResult(self.task2['id']) 294 nok_res = self.app.AsyncResult(self.task3['id']) 295 assert repr(ok_res) == '<AsyncResult: %s>' % (self.task1['id'],) 296 assert repr(ok2_res) == '<AsyncResult: %s>' % (self.task2['id'],) 297 assert repr(nok_res) == '<AsyncResult: %s>' % (self.task3['id'],) 298 299 pending_id = uuid() 300 pending_res = self.app.AsyncResult(pending_id) 301 assert repr(pending_res) == '<AsyncResult: %s>' % (pending_id,) 302 303 def test_hash(self): 304 assert (hash(self.app.AsyncResult('x0w991')) == 305 hash(self.app.AsyncResult('x0w991'))) 306 assert (hash(self.app.AsyncResult('x0w991')) != 307 hash(self.app.AsyncResult('x1w991'))) 308 309 def test_get_traceback(self): 310 ok_res = self.app.AsyncResult(self.task1['id']) 311 nok_res = self.app.AsyncResult(self.task3['id']) 312 nok_res2 = self.app.AsyncResult(self.task4['id']) 313 assert not ok_res.traceback 314 assert nok_res.traceback 315 assert nok_res2.traceback 316 317 pending_res = self.app.AsyncResult(uuid()) 318 assert not pending_res.traceback 319 320 def test_get__backend_gives_None(self): 321 res = self.app.AsyncResult(self.task1['id']) 322 res.backend.wait_for = Mock(name='wait_for') 323 res.backend.wait_for.return_value = None 324 assert res.get() is None 325 326 def test_get(self): 327 ok_res = self.app.AsyncResult(self.task1['id']) 328 ok2_res = self.app.AsyncResult(self.task2['id']) 329 nok_res = self.app.AsyncResult(self.task3['id']) 330 nok2_res = self.app.AsyncResult(self.task4['id']) 331 none_res = self.app.AsyncResult(self.task6['id']) 332 333 callback = Mock(name='callback') 334 335 assert ok_res.get(callback=callback) == 'the' 336 callback.assert_called_with(ok_res.id, 'the') 337 assert ok2_res.get() == 'quick' 338 with pytest.raises(KeyError): 339 nok_res.get() 340 assert nok_res.get(propagate=False) 341 assert isinstance(nok2_res.result, KeyError) 342 assert ok_res.info == 'the' 343 assert none_res.get() is None 344 assert none_res.state == states.SUCCESS 345 346 def test_get_when_ignored(self): 347 result = self.app.AsyncResult(uuid()) 348 result.ignored = True 349 # Does not block 350 assert result.get() is None 351 352 def test_eq_ne(self): 353 r1 = self.app.AsyncResult(self.task1['id']) 354 r2 = self.app.AsyncResult(self.task1['id']) 355 r3 = self.app.AsyncResult(self.task2['id']) 356 assert r1 == r2 357 assert r1 != r3 358 assert r1 == r2.id 359 assert r1 != r3.id 360 361 @pytest.mark.usefixtures('depends_on_current_app') 362 def test_reduce_restore(self): 363 r1 = self.app.AsyncResult(self.task1['id']) 364 fun, args = r1.__reduce__() 365 assert fun(*args) == r1 366 367 def test_get_timeout(self): 368 res = self.app.AsyncResult(self.task4['id']) # has RETRY state 369 with pytest.raises(TimeoutError): 370 res.get(timeout=0.001) 371 372 pending_res = self.app.AsyncResult(uuid()) 373 with patch('celery.result.time') as _time: 374 with pytest.raises(TimeoutError): 375 pending_res.get(timeout=0.001, interval=0.001) 376 _time.sleep.assert_called_with(0.001) 377 378 def test_get_timeout_longer(self): 379 res = self.app.AsyncResult(self.task4['id']) # has RETRY state 380 with patch('celery.result.time') as _time: 381 with pytest.raises(TimeoutError): 382 res.get(timeout=1, interval=1) 383 _time.sleep.assert_called_with(1) 384 385 def test_ready(self): 386 oks = (self.app.AsyncResult(self.task1['id']), 387 self.app.AsyncResult(self.task2['id']), 388 self.app.AsyncResult(self.task3['id'])) 389 assert all(result.ready() for result in oks) 390 assert not self.app.AsyncResult(self.task4['id']).ready() 391 392 assert not self.app.AsyncResult(uuid()).ready() 393 394 def test_del(self): 395 with patch('celery.result.AsyncResult.backend') as backend: 396 result = self.app.AsyncResult(self.task1['id']) 397 result_clone = copy.copy(result) 398 del result 399 assert backend.remove_pending_result.called_once_with( 400 result_clone 401 ) 402 403 result = self.app.AsyncResult(self.task1['id']) 404 result.backend = None 405 del result 406 407 def test_get_request_meta(self): 408 409 x = self.app.AsyncResult('1') 410 request = Context( 411 task='foo', 412 children=None, 413 args=['one', 'two'], 414 kwargs={'kwarg1': 'three'}, 415 hostname="foo", 416 retries=1, 417 delivery_info={'routing_key': 'celery'} 418 ) 419 x.backend.store_result(task_id="1", result='foo', state=states.SUCCESS, 420 traceback=None, request=request) 421 assert x.name == 'foo' 422 assert x.args == ['one', 'two'] 423 assert x.kwargs == {'kwarg1': 'three'} 424 assert x.worker == 'foo' 425 assert x.retries == 1 426 assert x.queue == 'celery' 427 assert isinstance(x.date_done, datetime.datetime) 428 assert x.task_id == "1" 429 assert x.state == "SUCCESS" 430 result = self.app.AsyncResult(self.task4['id']) 431 assert result.date_done is None 432 433 @pytest.mark.parametrize('result_dict, date', [ 434 ({'date_done': None}, None), 435 ({'date_done': '1991-10-05T05:41:06'}, 436 datetime.datetime(1991, 10, 5, 5, 41, 6)), 437 ({'date_done': datetime.datetime(1991, 10, 5, 5, 41, 6)}, 438 datetime.datetime(1991, 10, 5, 5, 41, 6)) 439 ]) 440 def test_date_done(self, result_dict, date): 441 result = self.app.AsyncResult(uuid()) 442 result._cache = result_dict 443 assert result.date_done == date 444 445 446class test_ResultSet: 447 448 def test_resultset_repr(self): 449 assert repr(self.app.ResultSet( 450 [self.app.AsyncResult(t) for t in ['1', '2', '3']])) 451 452 def test_eq_other(self): 453 assert self.app.ResultSet([ 454 self.app.AsyncResult(t) for t in [1, 3, 3]]) != 1 455 rs1 = self.app.ResultSet([self.app.AsyncResult(1)]) 456 rs2 = self.app.ResultSet([self.app.AsyncResult(1)]) 457 assert rs1 == rs2 458 459 def test_get(self): 460 x = self.app.ResultSet([self.app.AsyncResult(t) for t in [1, 2, 3]]) 461 b = x.results[0].backend = Mock() 462 b.supports_native_join = False 463 x.join_native = Mock() 464 x.join = Mock() 465 x.get() 466 x.join.assert_called() 467 b.supports_native_join = True 468 x.get() 469 x.join_native.assert_called() 470 471 @patch('celery.result.task_join_will_block') 472 def test_get_sync_subtask_option(self, task_join_will_block): 473 task_join_will_block.return_value = True 474 x = self.app.ResultSet([self.app.AsyncResult(str(t)) for t in [1, 2, 3]]) 475 b = x.results[0].backend = Mock() 476 b.supports_native_join = False 477 with pytest.raises(RuntimeError): 478 x.get() 479 with pytest.raises(TimeoutError): 480 x.get(disable_sync_subtasks=False, timeout=0.1) 481 482 def test_join_native_with_group_chain_group(self): 483 """Test group(chain(group)) case, join_native can be run correctly. 484 In group(chain(group)) case, GroupResult has no _cache property, and 485 AsyncBackendMixin.iter_native returns a node instead of node._cache, 486 this test make sure ResultSet.join_native can process correctly both 487 values of AsyncBackendMixin.iter_native returns. 488 """ 489 def _get_meta(tid, result=None, children=None): 490 return { 491 'status': states.SUCCESS, 492 'result': result, 493 'children': children, 494 'task_id': tid, 495 } 496 497 results = [self.app.AsyncResult(t) for t in [1, 2, 3]] 498 values = [(_.id, _get_meta(_.id, _)) for _ in results] 499 g_res = GroupResult(6, [self.app.AsyncResult(t) for t in [4, 5]]) 500 results += [g_res] 501 values += [(6, g_res.children)] 502 x = self.app.ResultSet(results) 503 x.results[0].backend = Mock() 504 x.results[0].backend.join = Mock() 505 x.results[3][0].get = Mock() 506 x.results[3][0].get.return_value = g_res.results[0] 507 x.results[3][1].get = Mock() 508 x.results[3][1].get.return_value = g_res.results[1] 509 x.iter_native = Mock() 510 x.iter_native.return_value = values.__iter__() 511 x.join_native() 512 x.iter_native.assert_called() 513 514 def test_eq_ne(self): 515 g1 = self.app.ResultSet([ 516 self.app.AsyncResult('id1'), 517 self.app.AsyncResult('id2'), 518 ]) 519 g2 = self.app.ResultSet([ 520 self.app.AsyncResult('id1'), 521 self.app.AsyncResult('id2'), 522 ]) 523 g3 = self.app.ResultSet([ 524 self.app.AsyncResult('id3'), 525 self.app.AsyncResult('id1'), 526 ]) 527 assert g1 == g2 528 assert g1 != g3 529 assert g1 != object() 530 531 def test_takes_app_from_first_task(self): 532 x = ResultSet([self.app.AsyncResult('id1')]) 533 assert x.app is x.results[0].app 534 x.app = self.app 535 assert x.app is self.app 536 537 def test_get_empty(self): 538 x = self.app.ResultSet([]) 539 assert x.supports_native_join is None 540 x.join = Mock(name='join') 541 x.get() 542 x.join.assert_called() 543 544 def test_add(self): 545 x = self.app.ResultSet([self.app.AsyncResult(1)]) 546 x.add(self.app.AsyncResult(2)) 547 assert len(x) == 2 548 x.add(self.app.AsyncResult(2)) 549 assert len(x) == 2 550 551 @contextmanager 552 def dummy_copy(self): 553 with patch('celery.result.copy') as copy: 554 555 def passt(arg): 556 return arg 557 copy.side_effect = passt 558 559 yield 560 561 def test_iterate_respects_subpolling_interval(self): 562 r1 = self.app.AsyncResult(uuid()) 563 r2 = self.app.AsyncResult(uuid()) 564 backend = r1.backend = r2.backend = Mock() 565 backend.subpolling_interval = 10 566 567 ready = r1.ready = r2.ready = Mock() 568 569 def se(*args, **kwargs): 570 ready.side_effect = KeyError() 571 return False 572 ready.return_value = False 573 ready.side_effect = se 574 575 x = self.app.ResultSet([r1, r2]) 576 with self.dummy_copy(): 577 with patch('celery.result.time') as _time: 578 with pytest.warns(CPendingDeprecationWarning): 579 with pytest.raises(KeyError): 580 list(x.iterate()) 581 _time.sleep.assert_called_with(10) 582 583 backend.subpolling_interval = 0 584 with patch('celery.result.time') as _time: 585 with pytest.warns(CPendingDeprecationWarning): 586 with pytest.raises(KeyError): 587 ready.return_value = False 588 ready.side_effect = se 589 list(x.iterate()) 590 _time.sleep.assert_not_called() 591 592 def test_times_out(self): 593 r1 = self.app.AsyncResult(uuid) 594 r1.ready = Mock() 595 r1.ready.return_value = False 596 x = self.app.ResultSet([r1]) 597 with self.dummy_copy(): 598 with patch('celery.result.time'): 599 with pytest.warns(CPendingDeprecationWarning): 600 with pytest.raises(TimeoutError): 601 list(x.iterate(timeout=1)) 602 603 def test_add_discard(self): 604 x = self.app.ResultSet([]) 605 x.add(self.app.AsyncResult('1')) 606 assert self.app.AsyncResult('1') in x.results 607 x.discard(self.app.AsyncResult('1')) 608 x.discard(self.app.AsyncResult('1')) 609 x.discard('1') 610 assert self.app.AsyncResult('1') not in x.results 611 612 x.update([self.app.AsyncResult('2')]) 613 614 def test_clear(self): 615 x = self.app.ResultSet([]) 616 r = x.results 617 x.clear() 618 assert x.results is r 619 620 621class MockAsyncResultFailure(AsyncResult): 622 623 @property 624 def result(self): 625 return KeyError('baz') 626 627 @property 628 def state(self): 629 return states.FAILURE 630 631 def get(self, propagate=True, **kwargs): 632 if propagate: 633 raise self.result 634 return self.result 635 636 637class MockAsyncResultSuccess(AsyncResult): 638 forgotten = False 639 640 def __init__(self, *args, **kwargs): 641 self._result = kwargs.pop('result', 42) 642 super(MockAsyncResultSuccess, self).__init__(*args, **kwargs) 643 644 def forget(self): 645 self.forgotten = True 646 647 @property 648 def result(self): 649 return self._result 650 651 @property 652 def state(self): 653 return states.SUCCESS 654 655 def get(self, **kwargs): 656 return self.result 657 658 659class SimpleBackend(SyncBackendMixin): 660 ids = [] 661 662 def __init__(self, ids=[]): 663 self.ids = ids 664 665 def _ensure_not_eager(self): 666 pass 667 668 def get_many(self, *args, **kwargs): 669 return ((id, {'result': i, 'status': states.SUCCESS}) 670 for i, id in enumerate(self.ids)) 671 672 673class test_GroupResult: 674 675 def setup(self): 676 self.size = 10 677 self.ts = self.app.GroupResult( 678 uuid(), make_mock_group(self.app, self.size), 679 ) 680 681 @pytest.mark.usefixtures('depends_on_current_app') 682 def test_is_pickleable(self): 683 ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())]) 684 assert pickle.loads(pickle.dumps(ts)) == ts 685 ts2 = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())]) 686 assert pickle.loads(pickle.dumps(ts2)) == ts2 687 688 @pytest.mark.usefixtures('depends_on_current_app') 689 def test_reduce(self): 690 ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())]) 691 fun, args = ts.__reduce__() 692 ts2 = fun(*args) 693 assert ts2.id == ts.id 694 assert ts == ts2 695 696 def test_eq_ne(self): 697 ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())]) 698 ts2 = self.app.GroupResult(ts.id, ts.results) 699 ts3 = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())]) 700 ts4 = self.app.GroupResult(ts.id, [self.app.AsyncResult(uuid())]) 701 assert ts == ts2 702 assert ts != ts3 703 assert ts != ts4 704 assert ts != object() 705 706 def test_len(self): 707 assert len(self.ts) == self.size 708 709 def test_eq_other(self): 710 assert self.ts != 1 711 712 def test_eq_with_parent(self): 713 # GroupResult instances with different .parent are not equal 714 grp_res = self.app.GroupResult( 715 uuid(), [self.app.AsyncResult(uuid()) for _ in range(10)], 716 parent=self.app.AsyncResult(uuid()) 717 ) 718 grp_res_2 = self.app.GroupResult(grp_res.id, grp_res.results) 719 assert grp_res != grp_res_2 720 721 grp_res_2.parent = self.app.AsyncResult(uuid()) 722 assert grp_res != grp_res_2 723 724 grp_res_2.parent = grp_res.parent 725 assert grp_res == grp_res_2 726 727 @pytest.mark.usefixtures('depends_on_current_app') 728 def test_pickleable(self): 729 assert pickle.loads(pickle.dumps(self.ts)) 730 731 def test_iterate_raises(self): 732 ar = MockAsyncResultFailure(uuid(), app=self.app) 733 ts = self.app.GroupResult(uuid(), [ar]) 734 with pytest.warns(CPendingDeprecationWarning): 735 it = ts.iterate() 736 with pytest.raises(KeyError): 737 next(it) 738 739 def test_forget(self): 740 subs = [MockAsyncResultSuccess(uuid(), app=self.app), 741 MockAsyncResultSuccess(uuid(), app=self.app)] 742 ts = self.app.GroupResult(uuid(), subs) 743 ts.forget() 744 for sub in subs: 745 assert sub.forgotten 746 747 def test_get_nested_without_native_join(self): 748 backend = SimpleBackend() 749 backend.supports_native_join = False 750 ts = self.app.GroupResult(uuid(), [ 751 MockAsyncResultSuccess(uuid(), result='1.1', 752 app=self.app, backend=backend), 753 self.app.GroupResult(uuid(), [ 754 MockAsyncResultSuccess(uuid(), result='2.1', 755 app=self.app, backend=backend), 756 self.app.GroupResult(uuid(), [ 757 MockAsyncResultSuccess(uuid(), result='3.1', 758 app=self.app, backend=backend), 759 MockAsyncResultSuccess(uuid(), result='3.2', 760 app=self.app, backend=backend), 761 ]), 762 ]), 763 ]) 764 ts.app.backend = backend 765 766 vals = ts.get() 767 assert vals == [ 768 '1.1', 769 [ 770 '2.1', 771 [ 772 '3.1', 773 '3.2', 774 ] 775 ], 776 ] 777 778 def test_getitem(self): 779 subs = [MockAsyncResultSuccess(uuid(), app=self.app), 780 MockAsyncResultSuccess(uuid(), app=self.app)] 781 ts = self.app.GroupResult(uuid(), subs) 782 assert ts[0] is subs[0] 783 784 def test_save_restore(self): 785 subs = [MockAsyncResultSuccess(uuid(), app=self.app), 786 MockAsyncResultSuccess(uuid(), app=self.app)] 787 ts = self.app.GroupResult(uuid(), subs) 788 ts.save() 789 with pytest.raises(AttributeError): 790 ts.save(backend=object()) 791 assert self.app.GroupResult.restore(ts.id).results == ts.results 792 ts.delete() 793 assert self.app.GroupResult.restore(ts.id) is None 794 with pytest.raises(AttributeError): 795 self.app.GroupResult.restore(ts.id, backend=object()) 796 797 def test_save_restore_empty(self): 798 subs = [] 799 ts = self.app.GroupResult(uuid(), subs) 800 ts.save() 801 assert isinstance( 802 self.app.GroupResult.restore(ts.id), 803 self.app.GroupResult, 804 ) 805 assert self.app.GroupResult.restore(ts.id).results == ts.results == [] 806 807 def test_restore_app(self): 808 subs = [MockAsyncResultSuccess(uuid(), app=self.app)] 809 ts = self.app.GroupResult(uuid(), subs) 810 ts.save() 811 restored = GroupResult.restore(ts.id, app=self.app) 812 assert restored.id == ts.id 813 814 def test_restore_current_app_fallback(self): 815 subs = [MockAsyncResultSuccess(uuid(), app=self.app)] 816 ts = self.app.GroupResult(uuid(), subs) 817 ts.save() 818 with pytest.raises(RuntimeError, 819 match="Test depends on current_app"): 820 GroupResult.restore(ts.id) 821 822 def test_join_native(self): 823 backend = SimpleBackend() 824 results = [self.app.AsyncResult(uuid(), backend=backend) 825 for i in range(10)] 826 ts = self.app.GroupResult(uuid(), results) 827 ts.app.backend = backend 828 backend.ids = [result.id for result in results] 829 res = ts.join_native() 830 assert res == list(range(10)) 831 callback = Mock(name='callback') 832 assert not ts.join_native(callback=callback) 833 callback.assert_has_calls([ 834 call(r.id, i) for i, r in enumerate(ts.results) 835 ]) 836 837 def test_join_native_raises(self): 838 ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())]) 839 ts.iter_native = Mock() 840 ts.iter_native.return_value = iter([ 841 (uuid(), {'status': states.FAILURE, 'result': KeyError()}) 842 ]) 843 with pytest.raises(KeyError): 844 ts.join_native(propagate=True) 845 846 def test_failed_join_report(self): 847 res = Mock() 848 ts = self.app.GroupResult(uuid(), [res]) 849 res.state = states.FAILURE 850 res.backend.is_cached.return_value = True 851 assert next(ts._failed_join_report()) is res 852 res.backend.is_cached.return_value = False 853 with pytest.raises(StopIteration): 854 next(ts._failed_join_report()) 855 856 def test_repr(self): 857 assert repr( 858 self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])) 859 860 def test_children_is_results(self): 861 ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())]) 862 assert ts.children is ts.results 863 864 def test_iter_native(self): 865 backend = SimpleBackend() 866 results = [self.app.AsyncResult(uuid(), backend=backend) 867 for i in range(10)] 868 ts = self.app.GroupResult(uuid(), results) 869 ts.app.backend = backend 870 backend.ids = [result.id for result in results] 871 assert len(list(ts.iter_native())) == 10 872 873 def test_iterate_yields(self): 874 ar = MockAsyncResultSuccess(uuid(), app=self.app) 875 ar2 = MockAsyncResultSuccess(uuid(), app=self.app) 876 ts = self.app.GroupResult(uuid(), [ar, ar2]) 877 with pytest.warns(CPendingDeprecationWarning): 878 it = ts.iterate() 879 assert next(it) == 42 880 assert next(it) == 42 881 882 def test_iterate_eager(self): 883 ar1 = EagerResult(uuid(), 42, states.SUCCESS) 884 ar2 = EagerResult(uuid(), 42, states.SUCCESS) 885 ts = self.app.GroupResult(uuid(), [ar1, ar2]) 886 with pytest.warns(CPendingDeprecationWarning): 887 it = ts.iterate() 888 assert next(it) == 42 889 assert next(it) == 42 890 891 def test_join_timeout(self): 892 ar = MockAsyncResultSuccess(uuid(), app=self.app) 893 ar2 = MockAsyncResultSuccess(uuid(), app=self.app) 894 ar3 = self.app.AsyncResult(uuid()) 895 ts = self.app.GroupResult(uuid(), [ar, ar2, ar3]) 896 with pytest.raises(TimeoutError): 897 ts.join(timeout=0.0000001) 898 899 ar4 = self.app.AsyncResult(uuid()) 900 ar4.get = Mock() 901 ts2 = self.app.GroupResult(uuid(), [ar4]) 902 assert ts2.join(timeout=0.1) 903 callback = Mock(name='callback') 904 assert not ts2.join(timeout=0.1, callback=callback) 905 callback.assert_called_with(ar4.id, ar4.get()) 906 907 def test_iter_native_when_empty_group(self): 908 ts = self.app.GroupResult(uuid(), []) 909 assert list(ts.iter_native()) == [] 910 911 def test_iterate_simple(self): 912 with pytest.warns(CPendingDeprecationWarning): 913 it = self.ts.iterate() 914 results = sorted(list(it)) 915 assert results == list(range(self.size)) 916 917 def test___iter__(self): 918 assert list(iter(self.ts)) == self.ts.results 919 920 def test_join(self): 921 joined = self.ts.join() 922 assert joined == list(range(self.size)) 923 924 def test_successful(self): 925 assert self.ts.successful() 926 927 def test_failed(self): 928 assert not self.ts.failed() 929 930 def test_maybe_throw(self): 931 self.ts.results = [Mock(name='r1')] 932 self.ts.maybe_throw() 933 self.ts.results[0].maybe_throw.assert_called_with( 934 callback=None, propagate=True, 935 ) 936 937 def test_join__on_message(self): 938 with pytest.raises(ImproperlyConfigured): 939 self.ts.join(on_message=Mock()) 940 941 def test_waiting(self): 942 assert not self.ts.waiting() 943 944 def test_ready(self): 945 assert self.ts.ready() 946 947 def test_completed_count(self): 948 assert self.ts.completed_count() == len(self.ts) 949 950 951class test_pending_AsyncResult: 952 953 def test_result(self, app): 954 res = app.AsyncResult(uuid()) 955 assert res.result is None 956 957 958class test_failed_AsyncResult: 959 960 def setup(self): 961 self.size = 11 962 self.app.conf.result_serializer = 'pickle' 963 results = make_mock_group(self.app, 10) 964 failed = mock_task('ts11', states.FAILURE, KeyError('Baz')) 965 save_result(self.app, failed) 966 failed_res = self.app.AsyncResult(failed['id']) 967 self.ts = self.app.GroupResult(uuid(), results + [failed_res]) 968 969 def test_completed_count(self): 970 assert self.ts.completed_count() == len(self.ts) - 1 971 972 def test_iterate_simple(self): 973 with pytest.warns(CPendingDeprecationWarning): 974 it = self.ts.iterate() 975 976 def consume(): 977 return list(it) 978 979 with pytest.raises(KeyError): 980 consume() 981 982 def test_join(self): 983 with pytest.raises(KeyError): 984 self.ts.join() 985 986 def test_successful(self): 987 assert not self.ts.successful() 988 989 def test_failed(self): 990 assert self.ts.failed() 991 992 993class test_pending_Group: 994 995 def setup(self): 996 self.ts = self.app.GroupResult( 997 uuid(), [self.app.AsyncResult(uuid()), 998 self.app.AsyncResult(uuid())]) 999 1000 def test_completed_count(self): 1001 assert self.ts.completed_count() == 0 1002 1003 def test_ready(self): 1004 assert not self.ts.ready() 1005 1006 def test_waiting(self): 1007 assert self.ts.waiting() 1008 1009 def test_join(self): 1010 with pytest.raises(TimeoutError): 1011 self.ts.join(timeout=0.001) 1012 1013 def test_join_longer(self): 1014 with pytest.raises(TimeoutError): 1015 self.ts.join(timeout=1) 1016 1017 1018class test_EagerResult: 1019 1020 def setup(self): 1021 @self.app.task(shared=False) 1022 def raising(x, y): 1023 raise KeyError(x, y) 1024 self.raising = raising 1025 1026 def test_wait_raises(self): 1027 res = self.raising.apply(args=[3, 3]) 1028 with pytest.raises(KeyError): 1029 res.wait() 1030 assert res.wait(propagate=False) 1031 1032 def test_wait(self): 1033 res = EagerResult('x', 'x', states.RETRY) 1034 res.wait() 1035 assert res.state == states.RETRY 1036 assert res.status == states.RETRY 1037 1038 def test_forget(self): 1039 res = EagerResult('x', 'x', states.RETRY) 1040 res.forget() 1041 1042 def test_revoke(self): 1043 res = self.raising.apply(args=[3, 3]) 1044 assert not res.revoke() 1045 1046 @patch('celery.result.task_join_will_block') 1047 def test_get_sync_subtask_option(self, task_join_will_block): 1048 task_join_will_block.return_value = True 1049 tid = uuid() 1050 res_subtask_async = EagerResult(tid, 'x', 'x', states.SUCCESS) 1051 with pytest.raises(RuntimeError): 1052 res_subtask_async.get() 1053 res_subtask_async.get(disable_sync_subtasks=False) 1054 1055 1056class test_tuples: 1057 1058 def test_AsyncResult(self): 1059 x = self.app.AsyncResult(uuid()) 1060 assert x, result_from_tuple(x.as_tuple() == self.app) 1061 assert x, result_from_tuple(x == self.app) 1062 1063 def test_with_parent(self): 1064 x = self.app.AsyncResult(uuid()) 1065 x.parent = self.app.AsyncResult(uuid()) 1066 y = result_from_tuple(x.as_tuple(), self.app) 1067 assert y == x 1068 assert y.parent == x.parent 1069 assert isinstance(y.parent, AsyncResult) 1070 1071 def test_compat(self): 1072 uid = uuid() 1073 x = result_from_tuple([uid, []], app=self.app) 1074 assert x.id == uid 1075 1076 def test_as_list(self): 1077 uid = uuid() 1078 x = self.app.AsyncResult(uid) 1079 assert x.id == x.as_list()[0] 1080 assert isinstance(x.as_list(), list) 1081 1082 def test_GroupResult(self): 1083 x = self.app.GroupResult( 1084 uuid(), [self.app.AsyncResult(uuid()) for _ in range(10)], 1085 ) 1086 assert x, result_from_tuple(x.as_tuple() == self.app) 1087 assert x, result_from_tuple(x == self.app) 1088 1089 def test_GroupResult_with_parent(self): 1090 parent = self.app.AsyncResult(uuid()) 1091 result = self.app.GroupResult( 1092 uuid(), [self.app.AsyncResult(uuid()) for _ in range(10)], 1093 parent 1094 ) 1095 second_result = result_from_tuple(result.as_tuple(), self.app) 1096 assert second_result == result 1097 assert second_result.parent == parent 1098 1099 def test_GroupResult_as_tuple(self): 1100 parent = self.app.AsyncResult(uuid()) 1101 result = self.app.GroupResult( 1102 'group-result-1', 1103 [self.app.AsyncResult('async-result-{}'.format(i)) 1104 for i in range(2)], 1105 parent 1106 ) 1107 (result_id, parent_tuple), group_results = result.as_tuple() 1108 assert result_id == result.id 1109 assert parent_tuple == parent.as_tuple() 1110 assert parent_tuple[0][0] == parent.id 1111 assert isinstance(group_results, list) 1112 expected_grp_res = [(('async-result-{}'.format(i), None), None) 1113 for i in range(2)] 1114 assert group_results == expected_grp_res 1115