1from __future__ import absolute_import, unicode_literals
2
3import copy
4import datetime
5import traceback
6from contextlib import contextmanager
7
8import pytest
9from case import Mock, call, patch, skip
10
11from celery import states, uuid
12from celery.app.task import Context
13from celery.backends.base import SyncBackendMixin
14from celery.exceptions import (CPendingDeprecationWarning,
15                               ImproperlyConfigured, IncompleteStream,
16                               TimeoutError)
17from celery.five import range
18from celery.result import (AsyncResult, EagerResult, GroupResult, ResultSet,
19                           assert_will_not_block, result_from_tuple)
20from celery.utils.serialization import pickle
21
# Canned traceback text attached to one of the failed sample tasks; the
# traceback tests in this module check whether its ``File "..."`` frames
# appear in the traceback of the exception raised by ``AsyncResult.get()``.
PYTRACEBACK = """\
Traceback (most recent call last):
  File "foo.py", line 2, in foofunc
    don't matter
  File "bar.py", line 3, in barfunc
    don't matter
Doesn't matter: really!\
"""
30
31
def mock_task(name, state, result, traceback=None):
    """Describe a task as a plain dict carrying a freshly generated id.

    The returned dict has the keys ``id``, ``name``, ``state``, ``result``
    and ``traceback``; ``traceback`` is ``None`` unless one is supplied.
    """
    task_id = uuid()
    return {
        'id': task_id,
        'name': name,
        'state': state,
        'result': result,
        'traceback': traceback,
    }
37
38
def save_result(app, task):
    """Record ``task`` in ``app.backend`` according to its state.

    A SUCCESS task is stored with ``mark_as_done``, a RETRY task with
    ``mark_as_retry`` and every other state with ``mark_as_failure``.  The
    latter two also receive a traceback, which falls back to
    ``'Some traceback'`` when the task dict carries none.
    """
    backend = app.backend
    state = task['state']
    if state == states.SUCCESS:
        backend.mark_as_done(task['id'], task['result'])
        return

    tb = task.get('traceback') or 'Some traceback'
    if state == states.RETRY:
        store = backend.mark_as_retry
    else:
        store = backend.mark_as_failure
    store(task['id'], task['result'], traceback=tb)
51
52
def make_mock_group(app, size=10):
    """Store ``size`` successful tasks and return their ``AsyncResult``s.

    The tasks are named ``ts0`` .. ``ts<size-1>`` and task ``i`` has ``i``
    as its result.  Each one is saved to the backend of ``app`` before the
    matching ``app.AsyncResult`` instances are returned, in creation order.
    """
    tasks = [mock_task('ts%d' % i, states.SUCCESS, i) for i in range(size)]
    # Saving is a pure side effect, so use a loop rather than building a
    # throwaway list with a comprehension.
    for task in tasks:
        save_result(app, task)
    return [app.AsyncResult(task['id']) for task in tasks]
57
58
59class _MockBackend:
60    def add_pending_result(self, *args, **kwargs):
61        return True
62
63    def wait_for_pending(self, *args, **kwargs):
64        return True
65
66
class test_AsyncResult:
    """Tests for ``AsyncResult`` as created directly and via ``app``."""

    def setup(self):
        """Store six sample task results and register a no-op task.

        task1, task2 and task6 are SUCCESS; task3 and task5 are FAILURE
        with a ``KeyError`` result (task5 also carries ``PYTRACEBACK``);
        task4 is in the RETRY state.
        """
        self.app.conf.result_cache_max = 100
        self.app.conf.result_serializer = 'pickle'
        self.app.conf.result_extended = True
        self.task1 = mock_task('task1', states.SUCCESS, 'the')
        self.task2 = mock_task('task2', states.SUCCESS, 'quick')
        self.task3 = mock_task('task3', states.FAILURE, KeyError('brown'))
        self.task4 = mock_task('task4', states.RETRY, KeyError('red'))
        self.task5 = mock_task(
            'task5', states.FAILURE, KeyError('blue'), PYTRACEBACK,
        )
        self.task6 = mock_task('task6', states.SUCCESS, None)
        for task in (self.task1, self.task2,
                     self.task3, self.task4, self.task5, self.task6):
            save_result(self.app, task)

        @self.app.task(shared=False)
        def mytask():
            pass
        self.mytask = mytask

    def test_forget(self):
        """``forget()`` reaches the root parent and clears the results."""
        first = Mock()
        second = self.app.AsyncResult(self.task1['id'], parent=first)
        third = self.app.AsyncResult(self.task2['id'], parent=second)
        last = self.app.AsyncResult(self.task3['id'], parent=third)
        last.forget()
        first.forget.assert_called_once()
        assert last.result is None
        assert second.result is None

    def test_ignored_getter(self):
        """``ignored`` is False, also after ``_ignored`` was deleted."""
        result = self.app.AsyncResult(uuid())
        assert result.ignored is False
        result.__delattr__('_ignored')
        assert result.ignored is False

    @patch('celery.result.task_join_will_block')
    def test_assert_will_not_block(self, task_join_will_block):
        """RuntimeError is raised only if ``task_join_will_block()``."""
        task_join_will_block.return_value = True
        with pytest.raises(RuntimeError):
            assert_will_not_block()
        task_join_will_block.return_value = False
        assert_will_not_block()

    @patch('celery.result.task_join_will_block')
    def test_get_sync_subtask_option(self, task_join_will_block):
        """``disable_sync_subtasks=False`` lets ``get()`` skip the check."""
        task_join_will_block.return_value = True
        tid = uuid()
        backend = _MockBackend()
        res_subtask_async = AsyncResult(tid, backend=backend)
        with pytest.raises(RuntimeError):
            res_subtask_async.get()
        res_subtask_async.get(disable_sync_subtasks=False)

    def test_without_id(self):
        """An id of ``None`` is rejected with ValueError."""
        with pytest.raises(ValueError):
            AsyncResult(None, app=self.app)

    def test_compat_properties(self):
        """``task_id`` reads and writes the same value as ``id``."""
        x = self.app.AsyncResult('1')
        assert x.task_id == x.id
        x.task_id = '2'
        assert x.id == '2'

    @pytest.mark.usefixtures('depends_on_current_app')
    def test_reduce_direct(self):
        """Calling the ``__reduce__`` output rebuilds an equal result."""
        x = AsyncResult('1', app=self.app)
        fun, args = x.__reduce__()
        assert fun(*args) == x

    def test_children(self):
        """``children`` exposes the children stored in the cache."""
        x = self.app.AsyncResult('1')
        children = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
        x._cache = {'children': children, 'status': states.SUCCESS}
        x.backend = Mock()
        assert x.children
        assert len(x.children) == 3

    def test_propagates_for_parent(self):
        """A failed parent raises from ``get()`` before any waiting."""
        x = self.app.AsyncResult(uuid())
        x.backend = Mock(name='backend')
        x.backend.get_task_meta.return_value = {}
        x.backend.wait_for_pending.return_value = 84
        x.parent = EagerResult(uuid(), KeyError('foo'), states.FAILURE)
        with pytest.raises(KeyError):
            x.get(propagate=True)
        x.backend.wait_for_pending.assert_not_called()

        # With a successful parent the backend is waited on as usual.
        x.parent = EagerResult(uuid(), 42, states.SUCCESS)
        assert x.get(propagate=True) == 84
        x.backend.wait_for_pending.assert_called()

    def test_get_children(self):
        """Cached child tuples become ``children``; ``None`` if absent."""
        tid = uuid()
        x = self.app.AsyncResult(tid)
        child = [self.app.AsyncResult(uuid()).as_tuple()
                 for i in range(10)]
        x._cache = {'children': child}
        assert x.children
        assert len(x.children) == 10

        x._cache = {'status': states.SUCCESS}
        x.backend._cache[tid] = {'result': None}
        assert x.children is None

    def test_build_graph_get_leaf_collect(self):
        """``graph``, ``get_leaf()`` and ``collect()`` with mocked deps."""
        x = self.app.AsyncResult('1')
        x.backend._cache['1'] = {'status': states.SUCCESS, 'result': None}
        c = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
        x.iterdeps = Mock()
        # A linear chain: x -> c[0] -> c[1] -> c[2].
        x.iterdeps.return_value = (
            (None, x),
            (x, c[0]),
            (c[0], c[1]),
            (c[1], c[2])
        )
        x.backend.READY_STATES = states.READY_STATES
        assert x.graph
        assert x.get_leaf() == 2

        it = x.collect()
        assert list(it) == [
            (x, None),
            (c[0], 0),
            (c[1], 1),
            (c[2], 2),
        ]

    def test_iterdeps(self):
        """``iterdeps()`` yields (parent, node) pairs for ready results."""
        x = self.app.AsyncResult('1')
        c = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
        x._cache = {'status': states.SUCCESS, 'result': None, 'children': c}
        for child in c:
            child.backend = Mock()
            child.backend.get_children.return_value = []
        it = x.iterdeps()
        assert list(it) == [
            (None, x),
            (x, c[0]),
            (x, c[1]),
            (x, c[2]),
        ]
        # A result that is not ready raises unless ``intermediate=True``.
        x._cache = None
        x.ready = Mock()
        x.ready.return_value = False
        with pytest.raises(IncompleteStream):
            list(x.iterdeps())
        list(x.iterdeps(intermediate=True))

    def test_eq_not_implemented(self):
        """A result is not equal to an arbitrary object."""
        assert self.app.AsyncResult('1') != object()

    @pytest.mark.usefixtures('depends_on_current_app')
    def test_reduce(self):
        """The id survives a pickle round trip."""
        a1 = self.app.AsyncResult('uuid')
        restored = pickle.loads(pickle.dumps(a1))
        assert restored.id == 'uuid'

        a2 = self.app.AsyncResult('uuid')
        assert pickle.loads(pickle.dumps(a2)).id == 'uuid'

    def test_maybe_set_cache_empty(self):
        """``_maybe_set_cache(None)`` does not raise."""
        self.app.AsyncResult('uuid')._maybe_set_cache(None)

    def test_set_cache__children(self):
        """Child tuples given to ``_set_cache`` show up in ``children``."""
        r1 = self.app.AsyncResult('id1')
        r2 = self.app.AsyncResult('id2')
        r1._set_cache({'children': [r2.as_tuple()]})
        assert r2 in r1.children

    def test_successful(self):
        """``successful()`` is true only for the SUCCESS sample task."""
        ok_res = self.app.AsyncResult(self.task1['id'])
        nok_res = self.app.AsyncResult(self.task3['id'])
        nok_res2 = self.app.AsyncResult(self.task4['id'])

        assert ok_res.successful()
        assert not nok_res.successful()
        assert not nok_res2.successful()

        pending_res = self.app.AsyncResult(uuid())
        assert not pending_res.successful()

    def test_raising(self):
        """``get()`` re-raises the stored error without stored frames."""
        notb = self.app.AsyncResult(self.task3['id'])
        withtb = self.app.AsyncResult(self.task5['id'])

        with pytest.raises(KeyError):
            notb.get()
        with pytest.raises(KeyError) as excinfo:
            withtb.get()

        tb = [t.strip() for t in traceback.format_tb(excinfo.tb)]
        assert 'File "foo.py", line 2, in foofunc' not in tb
        assert 'File "bar.py", line 3, in barfunc' not in tb
        assert excinfo.value.args[0] == 'blue'
        assert excinfo.typename == 'KeyError'

    @skip.unless_module('tblib')
    def test_raising_remote_tracebacks(self):
        """With ``task_remote_tracebacks`` the stored frames are raised."""
        withtb = self.app.AsyncResult(self.task5['id'])
        self.app.conf.task_remote_tracebacks = True
        with pytest.raises(KeyError) as excinfo:
            withtb.get()
        tb = [t.strip() for t in traceback.format_tb(excinfo.tb)]
        assert 'File "foo.py", line 2, in foofunc' in tb
        assert 'File "bar.py", line 3, in barfunc' in tb
        assert excinfo.value.args[0] == 'blue'
        assert excinfo.typename == 'KeyError'

    def test_str(self):
        """``str()`` of a result is its task id."""
        ok_res = self.app.AsyncResult(self.task1['id'])
        ok2_res = self.app.AsyncResult(self.task2['id'])
        nok_res = self.app.AsyncResult(self.task3['id'])
        assert str(ok_res) == self.task1['id']
        assert str(ok2_res) == self.task2['id']
        assert str(nok_res) == self.task3['id']

        pending_id = uuid()
        pending_res = self.app.AsyncResult(pending_id)
        assert str(pending_res) == pending_id

    def test_repr(self):
        """``repr()`` has the form ``<AsyncResult: id>``."""
        ok_res = self.app.AsyncResult(self.task1['id'])
        ok2_res = self.app.AsyncResult(self.task2['id'])
        nok_res = self.app.AsyncResult(self.task3['id'])
        assert repr(ok_res) == '<AsyncResult: %s>' % (self.task1['id'],)
        assert repr(ok2_res) == '<AsyncResult: %s>' % (self.task2['id'],)
        assert repr(nok_res) == '<AsyncResult: %s>' % (self.task3['id'],)

        pending_id = uuid()
        pending_res = self.app.AsyncResult(pending_id)
        assert repr(pending_res) == '<AsyncResult: %s>' % (pending_id,)

    def test_hash(self):
        """Results hash equal for equal ids and differ otherwise."""
        assert (hash(self.app.AsyncResult('x0w991')) ==
                hash(self.app.AsyncResult('x0w991')))
        assert (hash(self.app.AsyncResult('x0w991')) !=
                hash(self.app.AsyncResult('x1w991')))

    def test_get_traceback(self):
        """Only the FAILURE and RETRY sample tasks have a traceback."""
        ok_res = self.app.AsyncResult(self.task1['id'])
        nok_res = self.app.AsyncResult(self.task3['id'])
        nok_res2 = self.app.AsyncResult(self.task4['id'])
        assert not ok_res.traceback
        assert nok_res.traceback
        assert nok_res2.traceback

        pending_res = self.app.AsyncResult(uuid())
        assert not pending_res.traceback

    def test_get__backend_gives_None(self):
        """``get()`` returns ``None`` when ``wait_for`` yields ``None``."""
        res = self.app.AsyncResult(self.task1['id'])
        res.backend.wait_for = Mock(name='wait_for')
        res.backend.wait_for.return_value = None
        assert res.get() is None

    def test_get(self):
        """``get()``, ``result``, ``info`` and ``state`` on sample tasks."""
        ok_res = self.app.AsyncResult(self.task1['id'])
        ok2_res = self.app.AsyncResult(self.task2['id'])
        nok_res = self.app.AsyncResult(self.task3['id'])
        nok2_res = self.app.AsyncResult(self.task4['id'])
        none_res = self.app.AsyncResult(self.task6['id'])

        callback = Mock(name='callback')

        assert ok_res.get(callback=callback) == 'the'
        callback.assert_called_with(ok_res.id, 'the')
        assert ok2_res.get() == 'quick'
        with pytest.raises(KeyError):
            nok_res.get()
        assert nok_res.get(propagate=False)
        assert isinstance(nok2_res.result, KeyError)
        assert ok_res.info == 'the'
        assert none_res.get() is None
        assert none_res.state == states.SUCCESS

    def test_get_when_ignored(self):
        """``get()`` on an ignored result returns ``None``."""
        result = self.app.AsyncResult(uuid())
        result.ignored = True
        # Does not block
        assert result.get() is None

    def test_eq_ne(self):
        """Results compare by id, also against a plain id value."""
        r1 = self.app.AsyncResult(self.task1['id'])
        r2 = self.app.AsyncResult(self.task1['id'])
        r3 = self.app.AsyncResult(self.task2['id'])
        assert r1 == r2
        assert r1 != r3
        assert r1 == r2.id
        assert r1 != r3.id

    @pytest.mark.usefixtures('depends_on_current_app')
    def test_reduce_restore(self):
        """``__reduce__`` output rebuilds an equal stored result."""
        r1 = self.app.AsyncResult(self.task1['id'])
        fun, args = r1.__reduce__()
        assert fun(*args) == r1

    def test_get_timeout(self):
        """``get(timeout=...)`` raises TimeoutError if not ready."""
        res = self.app.AsyncResult(self.task4['id'])  # has RETRY state
        with pytest.raises(TimeoutError):
            res.get(timeout=0.001)

        pending_res = self.app.AsyncResult(uuid())
        with patch('celery.result.time') as _time:
            with pytest.raises(TimeoutError):
                pending_res.get(timeout=0.001, interval=0.001)
                # NOTE(review): never executed -- the call above is
                # expected to raise, which skips the rest of this block.
                # Confirm the patched module is the one that sleeps before
                # moving this assertion out of ``pytest.raises``.
                _time.sleep.assert_called_with(0.001)

    def test_get_timeout_longer(self):
        """Same as ``test_get_timeout`` with a one second timeout."""
        res = self.app.AsyncResult(self.task4['id'])  # has RETRY state
        with patch('celery.result.time') as _time:
            with pytest.raises(TimeoutError):
                res.get(timeout=1, interval=1)
                # NOTE(review): never executed -- the call above is
                # expected to raise, which skips the rest of this block.
                # Confirm the patched module is the one that sleeps before
                # moving this assertion out of ``pytest.raises``.
                _time.sleep.assert_called_with(1)

    def test_ready(self):
        """SUCCESS and FAILURE are ready; RETRY and unknown are not."""
        oks = (self.app.AsyncResult(self.task1['id']),
               self.app.AsyncResult(self.task2['id']),
               self.app.AsyncResult(self.task3['id']))
        assert all(result.ready() for result in oks)
        assert not self.app.AsyncResult(self.task4['id']).ready()

        assert not self.app.AsyncResult(uuid()).ready()

    def test_del(self):
        """Deleting a result removes it from the backend's pending set."""
        with patch('celery.result.AsyncResult.backend') as backend:
            result = self.app.AsyncResult(self.task1['id'])
            # The constructor assigns an instance-level backend, so bind
            # the mock explicitly; otherwise the patched class attribute
            # is never the backend that gets called.
            result.backend = backend
            result_clone = copy.copy(result)
            del result
            # ``called_once_with`` is not a Mock assertion (it merely
            # creates a truthy child mock); use the real assertion.
            backend.remove_pending_result.assert_called_once_with(
                result_clone
            )

        # Deleting a result that has no backend must not raise.
        result = self.app.AsyncResult(self.task1['id'])
        result.backend = None
        del result

    def test_get_request_meta(self):
        """Request fields stored with the result are exposed as attrs."""

        x = self.app.AsyncResult('1')
        request = Context(
            task='foo',
            children=None,
            args=['one', 'two'],
            kwargs={'kwarg1': 'three'},
            hostname="foo",
            retries=1,
            delivery_info={'routing_key': 'celery'}
        )
        x.backend.store_result(task_id="1", result='foo', state=states.SUCCESS,
                               traceback=None, request=request)
        assert x.name == 'foo'
        assert x.args == ['one', 'two']
        assert x.kwargs == {'kwarg1': 'three'}
        assert x.worker == 'foo'
        assert x.retries == 1
        assert x.queue == 'celery'
        assert isinstance(x.date_done, datetime.datetime)
        assert x.task_id == "1"
        assert x.state == "SUCCESS"
        result = self.app.AsyncResult(self.task4['id'])
        assert result.date_done is None

    @pytest.mark.parametrize('result_dict, date', [
        ({'date_done': None}, None),
        ({'date_done': '1991-10-05T05:41:06'},
         datetime.datetime(1991, 10, 5, 5, 41, 6)),
        ({'date_done': datetime.datetime(1991, 10, 5, 5, 41, 6)},
         datetime.datetime(1991, 10, 5, 5, 41, 6))
    ])
    def test_date_done(self, result_dict, date):
        """``date_done`` accepts ``None``, an ISO string or a datetime."""
        result = self.app.AsyncResult(uuid())
        result._cache = result_dict
        assert result.date_done == date
444
445
class test_ResultSet:
    """Tests for ``ResultSet``."""

    def test_resultset_repr(self):
        """``repr()`` of a result set is non-empty."""
        assert repr(self.app.ResultSet(
            [self.app.AsyncResult(t) for t in ['1', '2', '3']]))

    def test_eq_other(self):
        """A set differs from a non-set and equals a set of equal results."""
        assert self.app.ResultSet([
            self.app.AsyncResult(t) for t in [1, 3, 3]]) != 1
        rs1 = self.app.ResultSet([self.app.AsyncResult(1)])
        rs2 = self.app.ResultSet([self.app.AsyncResult(1)])
        assert rs1 == rs2

    def test_get(self):
        """``get()`` picks ``join`` or ``join_native`` from the backend."""
        x = self.app.ResultSet([self.app.AsyncResult(t) for t in [1, 2, 3]])
        b = x.results[0].backend = Mock()
        b.supports_native_join = False
        x.join_native = Mock()
        x.join = Mock()
        x.get()
        x.join.assert_called()
        b.supports_native_join = True
        x.get()
        x.join_native.assert_called()

    @patch('celery.result.task_join_will_block')
    def test_get_sync_subtask_option(self, task_join_will_block):
        """``disable_sync_subtasks=False`` lets ``get()`` skip the check."""
        task_join_will_block.return_value = True
        x = self.app.ResultSet(
            [self.app.AsyncResult(str(t)) for t in [1, 2, 3]])
        b = x.results[0].backend = Mock()
        b.supports_native_join = False
        with pytest.raises(RuntimeError):
            x.get()
        with pytest.raises(TimeoutError):
            x.get(disable_sync_subtasks=False, timeout=0.1)

    def test_join_native_with_group_chain_group(self):
        """Test group(chain(group)) case, join_native can be run correctly.
        In the group(chain(group)) case, GroupResult has no _cache property
        and AsyncBackendMixin.iter_native returns a node instead of
        node._cache; this test makes sure ResultSet.join_native can process
        both kinds of value returned by AsyncBackendMixin.iter_native.
        """
        def _get_meta(tid, result=None, children=None):
            return {
                'status': states.SUCCESS,
                'result': result,
                'children': children,
                'task_id': tid,
            }

        results = [self.app.AsyncResult(t) for t in [1, 2, 3]]
        values = [(_.id, _get_meta(_.id, _)) for _ in results]
        g_res = GroupResult(6, [self.app.AsyncResult(t) for t in [4, 5]])
        results += [g_res]
        values += [(6, g_res.children)]
        x = self.app.ResultSet(results)
        x.results[0].backend = Mock()
        x.results[0].backend.join = Mock()
        x.results[3][0].get = Mock()
        x.results[3][0].get.return_value = g_res.results[0]
        x.results[3][1].get = Mock()
        x.results[3][1].get.return_value = g_res.results[1]
        x.iter_native = Mock()
        x.iter_native.return_value = values.__iter__()
        x.join_native()
        x.iter_native.assert_called()

    def test_eq_ne(self):
        """Sets are equal only when they hold equal results."""
        g1 = self.app.ResultSet([
            self.app.AsyncResult('id1'),
            self.app.AsyncResult('id2'),
        ])
        g2 = self.app.ResultSet([
            self.app.AsyncResult('id1'),
            self.app.AsyncResult('id2'),
        ])
        g3 = self.app.ResultSet([
            self.app.AsyncResult('id3'),
            self.app.AsyncResult('id1'),
        ])
        assert g1 == g2
        assert g1 != g3
        assert g1 != object()

    def test_takes_app_from_first_task(self):
        """Without an explicit app the first result's app is used."""
        x = ResultSet([self.app.AsyncResult('id1')])
        assert x.app is x.results[0].app
        x.app = self.app
        assert x.app is self.app

    def test_get_empty(self):
        """An empty set has no native-join flag and ``get()`` joins."""
        x = self.app.ResultSet([])
        assert x.supports_native_join is None
        x.join = Mock(name='join')
        x.get()
        x.join.assert_called()

    def test_add(self):
        """Adding a result that is already present changes nothing."""
        x = self.app.ResultSet([self.app.AsyncResult(1)])
        x.add(self.app.AsyncResult(2))
        assert len(x) == 2
        x.add(self.app.AsyncResult(2))
        assert len(x) == 2

    @contextmanager
    def dummy_copy(self):
        """Patch ``celery.result.copy`` to return its argument unchanged."""
        # Named ``copy_mock`` so the module-level ``copy`` import is not
        # shadowed inside this helper.
        with patch('celery.result.copy') as copy_mock:

            def passt(arg):
                return arg
            copy_mock.side_effect = passt

            yield

    def test_iterate_respects_subpolling_interval(self):
        """``iterate()`` sleeps for a non-zero ``subpolling_interval``."""
        r1 = self.app.AsyncResult(uuid())
        r2 = self.app.AsyncResult(uuid())
        backend = r1.backend = r2.backend = Mock()
        backend.subpolling_interval = 10

        ready = r1.ready = r2.ready = Mock()

        # First call answers "not ready"; every later call raises KeyError
        # so that the iteration terminates.
        def se(*args, **kwargs):
            ready.side_effect = KeyError()
            return False
        ready.return_value = False
        ready.side_effect = se

        x = self.app.ResultSet([r1, r2])
        with self.dummy_copy():
            with patch('celery.result.time') as _time:
                with pytest.warns(CPendingDeprecationWarning):
                    with pytest.raises(KeyError):
                        list(x.iterate())
                _time.sleep.assert_called_with(10)

            backend.subpolling_interval = 0
            with patch('celery.result.time') as _time:
                with pytest.warns(CPendingDeprecationWarning):
                    with pytest.raises(KeyError):
                        ready.return_value = False
                        ready.side_effect = se
                        list(x.iterate())
                    _time.sleep.assert_not_called()

    def test_times_out(self):
        """``iterate(timeout=...)`` raises TimeoutError if never ready."""
        # The id must be a generated value, not the ``uuid`` function.
        r1 = self.app.AsyncResult(uuid())
        r1.ready = Mock()
        r1.ready.return_value = False
        x = self.app.ResultSet([r1])
        with self.dummy_copy():
            with patch('celery.result.time'):
                with pytest.warns(CPendingDeprecationWarning):
                    with pytest.raises(TimeoutError):
                        list(x.iterate(timeout=1))

    def test_add_discard(self):
        """``discard()`` removes a result and tolerates missing ones."""
        x = self.app.ResultSet([])
        x.add(self.app.AsyncResult('1'))
        assert self.app.AsyncResult('1') in x.results
        x.discard(self.app.AsyncResult('1'))
        x.discard(self.app.AsyncResult('1'))
        x.discard('1')
        assert self.app.AsyncResult('1') not in x.results

        x.update([self.app.AsyncResult('2')])

    def test_clear(self):
        """``clear()`` keeps the same ``results`` list object."""
        x = self.app.ResultSet([])
        r = x.results
        x.clear()
        assert x.results is r
619
620
class MockAsyncResultFailure(AsyncResult):
    """``AsyncResult`` stub that always reports a failed ``KeyError``."""

    @property
    def result(self):
        """A new ``KeyError('baz')`` on every access."""
        return KeyError('baz')

    @property
    def state(self):
        """Always ``states.FAILURE``."""
        return states.FAILURE

    def get(self, propagate=True, **kwargs):
        """Raise the error, or return it when ``propagate`` is false."""
        outcome = self.result
        if not propagate:
            return outcome
        raise outcome
635
636
class MockAsyncResultSuccess(AsyncResult):
    """``AsyncResult`` stub that is always successful.

    The value is taken from the ``result`` keyword argument (default 42)
    and ``forget()`` only flips the ``forgotten`` flag.
    """

    # Set to True on the instance once ``forget()`` has been called.
    forgotten = False

    def __init__(self, *args, **kwargs):
        """Strip ``result`` from the keywords, pass the rest to the base."""
        value = kwargs.pop('result', 42)
        self._result = value
        super(MockAsyncResultSuccess, self).__init__(*args, **kwargs)

    def forget(self):
        """Remember that the result was forgotten."""
        self.forgotten = True

    @property
    def result(self):
        """The value supplied at construction time."""
        return self._result

    @property
    def state(self):
        """Always ``states.SUCCESS``."""
        return states.SUCCESS

    def get(self, **kwargs):
        """Return the stored value; all keyword arguments are ignored."""
        value = self.result
        return value
657
658
class SimpleBackend(SyncBackendMixin):
    """Minimal backend that reports every known id as successful.

    ``get_many`` yields one ``(task_id, meta)`` pair per entry of ``ids``;
    the result of each task is its position in that list.
    """

    ids = []

    def __init__(self, ids=None):
        """Remember ``ids``; omitting it gives the instance its own list."""
        # A ``None`` default avoids sharing one mutable list between all
        # instances that were created without arguments.
        self.ids = [] if ids is None else ids

    def _ensure_not_eager(self):
        """No-op override."""
        pass

    def get_many(self, *args, **kwargs):
        """Return a generator of ``(task_id, meta)`` for every stored id.

        All arguments are ignored.
        """
        return ((task_id, {'result': index, 'status': states.SUCCESS})
                for index, task_id in enumerate(self.ids))
671
672
673class test_GroupResult:
674
675    def setup(self):
676        self.size = 10
677        self.ts = self.app.GroupResult(
678            uuid(), make_mock_group(self.app, self.size),
679        )
680
681    @pytest.mark.usefixtures('depends_on_current_app')
682    def test_is_pickleable(self):
683        ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
684        assert pickle.loads(pickle.dumps(ts)) == ts
685        ts2 = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
686        assert pickle.loads(pickle.dumps(ts2)) == ts2
687
688    @pytest.mark.usefixtures('depends_on_current_app')
689    def test_reduce(self):
690        ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
691        fun, args = ts.__reduce__()
692        ts2 = fun(*args)
693        assert ts2.id == ts.id
694        assert ts == ts2
695
696    def test_eq_ne(self):
697        ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
698        ts2 = self.app.GroupResult(ts.id, ts.results)
699        ts3 = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
700        ts4 = self.app.GroupResult(ts.id, [self.app.AsyncResult(uuid())])
701        assert ts == ts2
702        assert ts != ts3
703        assert ts != ts4
704        assert ts != object()
705
706    def test_len(self):
707        assert len(self.ts) == self.size
708
709    def test_eq_other(self):
710        assert self.ts != 1
711
712    def test_eq_with_parent(self):
713        # GroupResult instances with different .parent are not equal
714        grp_res = self.app.GroupResult(
715            uuid(), [self.app.AsyncResult(uuid()) for _ in range(10)],
716            parent=self.app.AsyncResult(uuid())
717        )
718        grp_res_2 = self.app.GroupResult(grp_res.id, grp_res.results)
719        assert grp_res != grp_res_2
720
721        grp_res_2.parent = self.app.AsyncResult(uuid())
722        assert grp_res != grp_res_2
723
724        grp_res_2.parent = grp_res.parent
725        assert grp_res == grp_res_2
726
727    @pytest.mark.usefixtures('depends_on_current_app')
728    def test_pickleable(self):
729        assert pickle.loads(pickle.dumps(self.ts))
730
731    def test_iterate_raises(self):
732        ar = MockAsyncResultFailure(uuid(), app=self.app)
733        ts = self.app.GroupResult(uuid(), [ar])
734        with pytest.warns(CPendingDeprecationWarning):
735            it = ts.iterate()
736        with pytest.raises(KeyError):
737            next(it)
738
739    def test_forget(self):
740        subs = [MockAsyncResultSuccess(uuid(), app=self.app),
741                MockAsyncResultSuccess(uuid(), app=self.app)]
742        ts = self.app.GroupResult(uuid(), subs)
743        ts.forget()
744        for sub in subs:
745            assert sub.forgotten
746
747    def test_get_nested_without_native_join(self):
748        backend = SimpleBackend()
749        backend.supports_native_join = False
750        ts = self.app.GroupResult(uuid(), [
751            MockAsyncResultSuccess(uuid(), result='1.1',
752                                   app=self.app, backend=backend),
753            self.app.GroupResult(uuid(), [
754                MockAsyncResultSuccess(uuid(), result='2.1',
755                                       app=self.app, backend=backend),
756                self.app.GroupResult(uuid(), [
757                    MockAsyncResultSuccess(uuid(), result='3.1',
758                                           app=self.app, backend=backend),
759                    MockAsyncResultSuccess(uuid(), result='3.2',
760                                           app=self.app, backend=backend),
761                ]),
762            ]),
763        ])
764        ts.app.backend = backend
765
766        vals = ts.get()
767        assert vals == [
768            '1.1',
769            [
770                '2.1',
771                [
772                    '3.1',
773                    '3.2',
774                ]
775            ],
776        ]
777
778    def test_getitem(self):
779        subs = [MockAsyncResultSuccess(uuid(), app=self.app),
780                MockAsyncResultSuccess(uuid(), app=self.app)]
781        ts = self.app.GroupResult(uuid(), subs)
782        assert ts[0] is subs[0]
783
784    def test_save_restore(self):
785        subs = [MockAsyncResultSuccess(uuid(), app=self.app),
786                MockAsyncResultSuccess(uuid(), app=self.app)]
787        ts = self.app.GroupResult(uuid(), subs)
788        ts.save()
789        with pytest.raises(AttributeError):
790            ts.save(backend=object())
791        assert self.app.GroupResult.restore(ts.id).results == ts.results
792        ts.delete()
793        assert self.app.GroupResult.restore(ts.id) is None
794        with pytest.raises(AttributeError):
795            self.app.GroupResult.restore(ts.id, backend=object())
796
797    def test_save_restore_empty(self):
798        subs = []
799        ts = self.app.GroupResult(uuid(), subs)
800        ts.save()
801        assert isinstance(
802            self.app.GroupResult.restore(ts.id),
803            self.app.GroupResult,
804        )
805        assert self.app.GroupResult.restore(ts.id).results == ts.results == []
806
807    def test_restore_app(self):
808        subs = [MockAsyncResultSuccess(uuid(), app=self.app)]
809        ts = self.app.GroupResult(uuid(), subs)
810        ts.save()
811        restored = GroupResult.restore(ts.id, app=self.app)
812        assert restored.id == ts.id
813
814    def test_restore_current_app_fallback(self):
815        subs = [MockAsyncResultSuccess(uuid(), app=self.app)]
816        ts = self.app.GroupResult(uuid(), subs)
817        ts.save()
818        with pytest.raises(RuntimeError,
819                           match="Test depends on current_app"):
820            GroupResult.restore(ts.id)
821
822    def test_join_native(self):
823        backend = SimpleBackend()
824        results = [self.app.AsyncResult(uuid(), backend=backend)
825                   for i in range(10)]
826        ts = self.app.GroupResult(uuid(), results)
827        ts.app.backend = backend
828        backend.ids = [result.id for result in results]
829        res = ts.join_native()
830        assert res == list(range(10))
831        callback = Mock(name='callback')
832        assert not ts.join_native(callback=callback)
833        callback.assert_has_calls([
834            call(r.id, i) for i, r in enumerate(ts.results)
835        ])
836
837    def test_join_native_raises(self):
838        ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
839        ts.iter_native = Mock()
840        ts.iter_native.return_value = iter([
841            (uuid(), {'status': states.FAILURE, 'result': KeyError()})
842        ])
843        with pytest.raises(KeyError):
844            ts.join_native(propagate=True)
845
846    def test_failed_join_report(self):
847        res = Mock()
848        ts = self.app.GroupResult(uuid(), [res])
849        res.state = states.FAILURE
850        res.backend.is_cached.return_value = True
851        assert next(ts._failed_join_report()) is res
852        res.backend.is_cached.return_value = False
853        with pytest.raises(StopIteration):
854            next(ts._failed_join_report())
855
856    def test_repr(self):
857        assert repr(
858            self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())]))
859
860    def test_children_is_results(self):
861        ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
862        assert ts.children is ts.results
863
864    def test_iter_native(self):
865        backend = SimpleBackend()
866        results = [self.app.AsyncResult(uuid(), backend=backend)
867                   for i in range(10)]
868        ts = self.app.GroupResult(uuid(), results)
869        ts.app.backend = backend
870        backend.ids = [result.id for result in results]
871        assert len(list(ts.iter_native())) == 10
872
873    def test_iterate_yields(self):
874        ar = MockAsyncResultSuccess(uuid(), app=self.app)
875        ar2 = MockAsyncResultSuccess(uuid(), app=self.app)
876        ts = self.app.GroupResult(uuid(), [ar, ar2])
877        with pytest.warns(CPendingDeprecationWarning):
878            it = ts.iterate()
879        assert next(it) == 42
880        assert next(it) == 42
881
882    def test_iterate_eager(self):
883        ar1 = EagerResult(uuid(), 42, states.SUCCESS)
884        ar2 = EagerResult(uuid(), 42, states.SUCCESS)
885        ts = self.app.GroupResult(uuid(), [ar1, ar2])
886        with pytest.warns(CPendingDeprecationWarning):
887            it = ts.iterate()
888        assert next(it) == 42
889        assert next(it) == 42
890
891    def test_join_timeout(self):
892        ar = MockAsyncResultSuccess(uuid(), app=self.app)
893        ar2 = MockAsyncResultSuccess(uuid(), app=self.app)
894        ar3 = self.app.AsyncResult(uuid())
895        ts = self.app.GroupResult(uuid(), [ar, ar2, ar3])
896        with pytest.raises(TimeoutError):
897            ts.join(timeout=0.0000001)
898
899        ar4 = self.app.AsyncResult(uuid())
900        ar4.get = Mock()
901        ts2 = self.app.GroupResult(uuid(), [ar4])
902        assert ts2.join(timeout=0.1)
903        callback = Mock(name='callback')
904        assert not ts2.join(timeout=0.1, callback=callback)
905        callback.assert_called_with(ar4.id, ar4.get())
906
907    def test_iter_native_when_empty_group(self):
908        ts = self.app.GroupResult(uuid(), [])
909        assert list(ts.iter_native()) == []
910
911    def test_iterate_simple(self):
912        with pytest.warns(CPendingDeprecationWarning):
913            it = self.ts.iterate()
914        results = sorted(list(it))
915        assert results == list(range(self.size))
916
917    def test___iter__(self):
918        assert list(iter(self.ts)) == self.ts.results
919
920    def test_join(self):
921        joined = self.ts.join()
922        assert joined == list(range(self.size))
923
924    def test_successful(self):
925        assert self.ts.successful()
926
927    def test_failed(self):
928        assert not self.ts.failed()
929
930    def test_maybe_throw(self):
931        self.ts.results = [Mock(name='r1')]
932        self.ts.maybe_throw()
933        self.ts.results[0].maybe_throw.assert_called_with(
934            callback=None, propagate=True,
935        )
936
937    def test_join__on_message(self):
938        with pytest.raises(ImproperlyConfigured):
939            self.ts.join(on_message=Mock())
940
941    def test_waiting(self):
942        assert not self.ts.waiting()
943
944    def test_ready(self):
945        assert self.ts.ready()
946
947    def test_completed_count(self):
948        assert self.ts.completed_count() == len(self.ts)
949
950
class test_pending_AsyncResult:
    """Checks on an ``AsyncResult`` built from a freshly generated id."""

    def test_result(self, app):
        """``result`` is ``None`` when nothing was stored for the id."""
        fresh = app.AsyncResult(uuid())
        assert fresh.result is None
956
957
class test_failed_AsyncResult:
    """Group checks where ten members succeeded and one failed."""

    def setup(self):
        """Build ``self.ts``: ten mock successes plus one KeyError failure."""
        self.size = 11
        self.app.conf.result_serializer = 'pickle'
        succeeded = make_mock_group(self.app, 10)
        failure = mock_task('ts11', states.FAILURE, KeyError('Baz'))
        save_result(self.app, failure)
        members = succeeded + [self.app.AsyncResult(failure['id'])]
        self.ts = self.app.GroupResult(uuid(), members)

    def test_completed_count(self):
        """The failed member is not counted as completed."""
        completed = self.ts.completed_count()
        assert completed == len(self.ts) - 1

    def test_iterate_simple(self):
        """Draining deprecated ``iterate`` raises the stored KeyError."""
        with pytest.warns(CPendingDeprecationWarning):
            values = self.ts.iterate()

        with pytest.raises(KeyError):
            list(values)

    def test_join(self):
        """``join`` raises the stored KeyError."""
        with pytest.raises(KeyError):
            self.ts.join()

    def test_successful(self):
        """The group does not report ``successful()``."""
        outcome = self.ts.successful()
        assert not outcome

    def test_failed(self):
        """The group reports ``failed()``."""
        outcome = self.ts.failed()
        assert outcome
991
992
class test_pending_Group:
    """Group checks where no member has had a result stored."""

    def setup(self):
        """Build ``self.ts`` from two results with freshly generated ids."""
        group_id = uuid()
        members = [self.app.AsyncResult(uuid()) for _ in (0, 1)]
        self.ts = self.app.GroupResult(group_id, members)

    def test_completed_count(self):
        """No member counts as completed."""
        completed = self.ts.completed_count()
        assert completed == 0

    def test_ready(self):
        """The group is not ``ready()``."""
        is_ready = self.ts.ready()
        assert not is_ready

    def test_waiting(self):
        """The group is ``waiting()``."""
        still_waiting = self.ts.waiting()
        assert still_waiting

    def test_join(self):
        """``join`` with a very short timeout raises TimeoutError."""
        with pytest.raises(TimeoutError):
            self.ts.join(timeout=0.001)

    def test_join_longer(self):
        """``join`` with a one second timeout still raises TimeoutError."""
        with pytest.raises(TimeoutError):
            self.ts.join(timeout=1)
1016
1017
class test_EagerResult:
    """Tests exercising ``EagerResult`` and the result of ``task.apply``."""

    def setup(self):
        """Register a non-shared task that always raises ``KeyError``."""
        @self.app.task(shared=False)
        def raising(x, y):
            raise KeyError(x, y)
        self.raising = raising

    def test_wait_raises(self):
        """``wait`` re-raises the task error unless ``propagate=False``."""
        res = self.raising.apply(args=[3, 3])
        with pytest.raises(KeyError):
            res.wait()
        # Without propagation a truthy value is returned instead.
        assert res.wait(propagate=False)

    def test_wait(self):
        """``wait`` on a RETRY result leaves ``state``/``status`` as RETRY."""
        res = EagerResult('x', 'x', states.RETRY)
        res.wait()
        assert res.state == states.RETRY
        assert res.status == states.RETRY

    def test_forget(self):
        """``forget`` can be called on an eager result without raising."""
        res = EagerResult('x', 'x', states.RETRY)
        res.forget()

    def test_revoke(self):
        """``revoke`` returns a falsy value for an applied task's result."""
        res = self.raising.apply(args=[3, 3])
        assert not res.revoke()

    @patch('celery.result.task_join_will_block')
    def test_get_sync_subtask_option(self, task_join_will_block):
        """``get`` raises RuntimeError unless sync subtasks are allowed."""
        task_join_will_block.return_value = True
        tid = uuid()
        # Build the result as (id, value, state), matching the other call
        # sites in this module.  The previous call had an extra 'x' argument,
        # so the state was 'x' and SUCCESS landed in the next positional slot.
        res_subtask_async = EagerResult(tid, 'x', states.SUCCESS)
        with pytest.raises(RuntimeError):
            res_subtask_async.get()
        assert res_subtask_async.get(disable_sync_subtasks=False) == 'x'
1054
1055
class test_tuples:
    """Round-trip tests for ``as_tuple``/``as_list``/``result_from_tuple``."""

    def test_AsyncResult(self):
        """An AsyncResult survives ``as_tuple`` -> ``result_from_tuple``."""
        x = self.app.AsyncResult(uuid())
        # These used to read ``assert x, result_from_tuple(... == self.app)``,
        # which only checked that ``x`` is truthy: the second expression is
        # the assert *message* and is never evaluated on success.
        assert x == result_from_tuple(x.as_tuple(), self.app)
        # Passing a result object instead of a tuple must also compare equal.
        assert x == result_from_tuple(x, self.app)

    def test_with_parent(self):
        """The parent link is preserved by the tuple round-trip."""
        x = self.app.AsyncResult(uuid())
        x.parent = self.app.AsyncResult(uuid())
        y = result_from_tuple(x.as_tuple(), self.app)
        assert y == x
        assert y.parent == x.parent
        assert isinstance(y.parent, AsyncResult)

    def test_compat(self):
        """A two-item ``[id, []]`` list is accepted and keeps the id."""
        uid = uuid()
        x = result_from_tuple([uid, []], app=self.app)
        assert x.id == uid

    def test_as_list(self):
        """``as_list`` returns a list whose first item is the task id."""
        uid = uuid()
        x = self.app.AsyncResult(uid)
        assert x.id == x.as_list()[0]
        assert isinstance(x.as_list(), list)

    def test_GroupResult(self):
        """A GroupResult survives ``as_tuple`` -> ``result_from_tuple``."""
        x = self.app.GroupResult(
            uuid(), [self.app.AsyncResult(uuid()) for _ in range(10)],
        )
        # Real equality checks; the old ``assert x, <message>`` form could
        # never fail for a truthy ``x``.
        assert x == result_from_tuple(x.as_tuple(), self.app)
        assert x == result_from_tuple(x, self.app)

    def test_GroupResult_with_parent(self):
        """A GroupResult's parent is preserved by the tuple round-trip."""
        parent = self.app.AsyncResult(uuid())
        result = self.app.GroupResult(
            uuid(), [self.app.AsyncResult(uuid()) for _ in range(10)],
            parent
        )
        second_result = result_from_tuple(result.as_tuple(), self.app)
        assert second_result == result
        assert second_result.parent == parent

    def test_GroupResult_as_tuple(self):
        """``as_tuple`` yields ``((id, parent_tuple), [child tuples])``."""
        parent = self.app.AsyncResult(uuid())
        result = self.app.GroupResult(
            'group-result-1',
            [self.app.AsyncResult('async-result-{}'.format(i))
             for i in range(2)],
            parent
        )
        (result_id, parent_tuple), group_results = result.as_tuple()
        assert result_id == result.id
        assert parent_tuple == parent.as_tuple()
        # The parent tuple is itself ``((id, parent), children)``.
        assert parent_tuple[0][0] == parent.id
        assert isinstance(group_results, list)
        # Each child serialises as ``((id, None), None)``.
        expected_grp_res = [(('async-result-{}'.format(i), None), None)
                            for i in range(2)]
        assert group_results == expected_grp_res
1115