.. _guide-canvas:

==============================
 Canvas: Designing Work-flows
==============================

.. contents::
    :local:
    :depth: 2

.. _canvas-subtasks:

.. _canvas-signatures:

Signatures
==========

.. versionadded:: 2.0

You just learned how to call a task using the task's ``delay`` method
in the :ref:`calling <guide-calling>` guide, and this is often all you need,
but sometimes you may want to pass the signature of a task invocation to
another process or as an argument to another function.

A :func:`~celery.signature` wraps the arguments, keyword arguments, and execution options
of a single task invocation in a way such that it can be passed to functions
or even serialized and sent across the wire.

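
Because a signature behaves like a dictionary, it's straightforward to move
between processes.  As a minimal sketch (reusing the ``tasks.add`` signature
from above; the exact set of keys may vary between Celery versions), a
signature can be converted to a plain mapping and an equivalent signature can
be rebuilt from it:

.. code-block:: pycon

    >>> from celery import signature
    >>> sig = signature('tasks.add', args=(2, 2), countdown=10)
    >>> d = dict(sig)       # a plain dict with the task name, args, kwargs, and options
    >>> signature(d)        # recreate an equivalent signature from the mapping
    tasks.add(2, 2)
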

- You can create a signature for the ``add`` task using its name like this:

    .. code-block:: pycon

        >>> from celery import signature
        >>> signature('tasks.add', args=(2, 2), countdown=10)
        tasks.add(2, 2)

  This task has a signature of arity 2 (two arguments): ``(2, 2)``,
  and sets the countdown execution option to 10.

- or you can create one using the task's ``signature`` method:

    .. code-block:: pycon

        >>> add.signature((2, 2), countdown=10)
        tasks.add(2, 2)

- There's also a shortcut using star arguments:

    .. code-block:: pycon

        >>> add.s(2, 2)
        tasks.add(2, 2)

- Keyword arguments are also supported:

    .. code-block:: pycon

        >>> add.s(2, 2, debug=True)
        tasks.add(2, 2, debug=True)

- From any signature instance you can inspect the different fields:

    .. code-block:: pycon

        >>> s = add.signature((2, 2), {'debug': True}, countdown=10)
        >>> s.args
        (2, 2)
        >>> s.kwargs
        {'debug': True}
        >>> s.options
        {'countdown': 10}

- It supports the "Calling API" of ``delay``,
  ``apply_async``, etc., including being called directly (``__call__``).

    Calling the signature will execute the task inline in the current process:

    .. code-block:: pycon

        >>> add(2, 2)
        4
        >>> add.s(2, 2)()
        4

    ``delay`` is our beloved shortcut to ``apply_async`` taking star-arguments:

    .. code-block:: pycon

        >>> result = add.delay(2, 2)
        >>> result.get()
        4

    ``apply_async`` takes the same arguments as the
    :meth:`Task.apply_async <@Task.apply_async>` method:

    .. code-block:: pycon

        >>> add.apply_async(args, kwargs, **options)
        >>> add.signature(args, kwargs, **options).apply_async()

        >>> add.apply_async((2, 2), countdown=1)
        >>> add.signature((2, 2), countdown=1).apply_async()

- You can't define options with :meth:`~@Task.s`, but a chaining
  ``set`` call takes care of that:

    .. code-block:: pycon

        >>> add.s(2, 2).set(countdown=1)
        proj.tasks.add(2, 2)

Partials
--------

With a signature, you can execute the task in a worker:

.. code-block:: pycon

    >>> add.s(2, 2).delay()
    >>> add.s(2, 2).apply_async(countdown=1)

Or you can call it directly in the current process:

.. code-block:: pycon

    >>> add.s(2, 2)()
    4

Specifying additional args, kwargs, or options to ``apply_async``/``delay``
creates partials:

- Any arguments added will be prepended to the args in the signature:

    .. code-block:: pycon

        >>> partial = add.s(2)          # incomplete signature
        >>> partial.delay(4)            # 4 + 2
        >>> partial.apply_async((4,))   # same

- Any keyword arguments added will be merged with the kwargs in the signature,
  with the new keyword arguments taking precedence:

    .. code-block:: pycon

        >>> s = add.s(2, 2)
        >>> s.delay(debug=True)                    # -> add(2, 2, debug=True)
        >>> s.apply_async(kwargs={'debug': True})  # same

- Any options added will be merged with the options in the signature,
  with the new options taking precedence:

    .. code-block:: pycon

        >>> s = add.signature((2, 2), countdown=10)
        >>> s.apply_async(countdown=1)  # countdown is now 1

You can also clone signatures to create derivatives:

.. code-block:: pycon

    >>> s = add.s(2)
    >>> s
    proj.tasks.add(2)

    >>> s.clone(args=(4,), kwargs={'debug': True})
    proj.tasks.add(4, 2, debug=True)

Immutability
------------

.. versionadded:: 3.0

Partials are meant to be used with callbacks: any linked tasks or chord
callbacks will be applied with the result of the parent task.
Sometimes you want to specify a callback that doesn't take
additional arguments, and in that case you can set the signature
to be immutable:

.. code-block:: pycon

    >>> add.apply_async((2, 2), link=reset_buffers.signature(immutable=True))

The ``.si()`` shortcut can also be used to create immutable signatures:

.. code-block:: pycon

    >>> add.apply_async((2, 2), link=reset_buffers.si())

Only the execution options can be set when a signature is immutable,
so it's not possible to call the signature with partial args/kwargs.
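
A quick sketch of that behaviour (assuming the ``add`` task from the earlier
examples; the exact merge rules may differ slightly between Celery versions):

.. code-block:: pycon

    >>> sig = add.si(2, 2)
    >>> sig.set(countdown=10)   # execution options can still be changed
    >>> sig.delay(8)            # partial args aren't merged -> still add(2, 2)
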

.. note::

    In this tutorial I sometimes apply the prefix operator `~` to signatures.
    You probably shouldn't use it in your production code, but it's a handy shortcut
    when experimenting in the Python shell:

    .. code-block:: pycon

        >>> ~sig

        >>> # is the same as
        >>> sig.delay().get()


.. _canvas-callbacks:

Callbacks
---------

.. versionadded:: 3.0

Callbacks can be added to any task using the ``link`` argument
to ``apply_async``:

.. code-block:: pycon

    >>> add.apply_async((2, 2), link=other_task.s())

The callback will only be applied if the task exited successfully,
and it will be applied with the return value of the parent task as an argument.

As I mentioned earlier, any arguments you add to a signature
will be prepended to the arguments specified by the signature itself!

If you have the signature:

.. code-block:: pycon

    >>> sig = add.s(10)

then ``sig.delay(result)`` becomes:

.. code-block:: pycon

    >>> add.apply_async(args=(result, 10))

...

Now let's call our ``add`` task with a callback using partial
arguments:

.. code-block:: pycon

    >>> add.apply_async((2, 2), link=add.s(8))

As expected this will first launch one task calculating :math:`2 + 2`, then
another task calculating :math:`4 + 8`.
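
A task can also have more than one callback: ``link`` accepts either a single
signature or a list of signatures, and each callback is then applied with the
parent's return value.  A small sketch (reusing the ``other_task`` placeholder
from above):

.. code-block:: pycon

    >>> add.apply_async((2, 2), link=[add.s(8), other_task.s()])
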

The Primitives
==============

.. versionadded:: 3.0

.. topic:: Overview

    - ``group``

        The group primitive is a signature that takes a list of tasks that should
        be applied in parallel.

    - ``chain``

        The chain primitive lets us link together signatures so that one is called
        after the other, essentially forming a *chain* of callbacks.

    - ``chord``

        A chord is just like a group but with a callback. A chord consists
        of a header group and a body, where the body is a task that should execute
        after all of the tasks in the header are complete.

    - ``map``

        The map primitive works like the built-in ``map`` function, but creates
        a temporary task where a list of arguments is applied to the task.
        For example, ``task.map([1, 2])`` results in a single task
        being called, applying the arguments in order to the task function so
        that the result is:

        .. code-block:: python

            res = [task(1), task(2)]

    - ``starmap``

        Works exactly like map except the arguments are applied as ``*args``.
        For example, ``add.starmap([(2, 2), (4, 4)])`` results in a single
        task calling:

        .. code-block:: python

            res = [add(2, 2), add(4, 4)]

    - ``chunks``

        Chunking splits a long list of arguments into parts, for example
        the operation:

        .. code-block:: pycon

            >>> items = zip(range(1000), range(1000))  # 1000 items
            >>> add.chunks(items, 10)

        will split the list of items into chunks of 10, resulting in 100
        tasks (each processing 10 items in sequence).


The primitives are also signature objects themselves, so that they can be combined
in any number of ways to compose complex work-flows.

Here are some examples:

- Simple chain

    Here's a simple chain: the first task executes, passing its return value
    to the next task in the chain, and so on.

    .. code-block:: pycon

        >>> from celery import chain

        >>> # 2 + 2 + 4 + 8
        >>> res = chain(add.s(2, 2), add.s(4), add.s(8))()
        >>> res.get()
        16

    This can also be written using pipes:

    .. code-block:: pycon

        >>> (add.s(2, 2) | add.s(4) | add.s(8))().get()
        16

- Immutable signatures

    Signatures can be partial so arguments can be
    added to the existing arguments, but you may not always want that,
    for example if you don't want the result of the previous task in a chain.

    In that case you can mark the signature as immutable, so that the arguments
    cannot be changed:

    .. code-block:: pycon

        >>> add.signature((2, 2), immutable=True)

    There's also a ``.si()`` shortcut for this, and this is the preferred way of
    creating immutable signatures:

    .. code-block:: pycon

        >>> add.si(2, 2)

    Now you can create a chain of independent tasks instead:

    .. code-block:: pycon

        >>> res = (add.si(2, 2) | add.si(4, 4) | add.si(8, 8))()
        >>> res.get()
        16

        >>> res.parent.get()
        8

        >>> res.parent.parent.get()
        4

- Simple group

    You can easily create a group of tasks to execute in parallel:

    .. code-block:: pycon

        >>> from celery import group
        >>> res = group(add.s(i, i) for i in range(10))()
        >>> res.get(timeout=1)
        [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

- Simple chord

    The chord primitive enables us to add a callback to be called when
    all of the tasks in a group have finished executing.  This is often
    required for algorithms that aren't *embarrassingly parallel*:

    .. code-block:: pycon

        >>> from celery import chord
        >>> res = chord((add.s(i, i) for i in range(10)), xsum.s())()
        >>> res.get()
        90

    The above example creates 10 tasks that all start in parallel,
    and when all of them are complete the return values are combined
    into a list and sent to the ``xsum`` task (a possible definition of
    ``xsum`` is sketched after these examples).

    The body of a chord can also be immutable, so that the return value
    of the group isn't passed on to the callback:

    .. code-block:: pycon

        >>> chord((import_contact.s(c) for c in contacts),
        ...       notify_complete.si(import_id)).apply_async()

    Note the use of ``.si`` above; this creates an immutable signature,
    meaning any new arguments passed (including the return value of the
    previous task) will be ignored.

- Blow your mind by combining

    Chains can be partial too:

    .. code-block:: pycon

        >>> c1 = (add.s(4) | mul.s(8))

        >>> # (16 + 4) * 8
        >>> res = c1(16)
        >>> res.get()
        160

    This means that you can combine chains:

    .. code-block:: pycon

        >>> # ((4 + 16) * 2 + 4) * 8
        >>> c2 = (add.s(4, 16) | mul.s(2) | (add.s(4) | mul.s(8)))

        >>> res = c2()
        >>> res.get()
        352

    Chaining a group together with another task will automatically
    upgrade it to be a chord:

    .. code-block:: pycon

        >>> c3 = (group(add.s(i, i) for i in range(10)) | xsum.s())
        >>> res = c3()
        >>> res.get()
        90

    Groups and chords accept partial arguments too, so in a chain
    the return value of the previous task is forwarded to all tasks in the group:

    .. code-block:: pycon

        >>> new_user_workflow = (create_user.s() | group(
        ...                      import_contacts.s(),
        ...                      send_welcome_email.s()))
        >>> new_user_workflow.delay(username='artv',
        ...                         first='Art',
        ...                         last='Vandelay',
        ...                         email='art@vandelay.com')

    If you don't want to forward arguments to the group then
    you can make the signatures in the group immutable:

    .. code-block:: pycon

        >>> res = (add.s(4, 4) | group(add.si(i, i) for i in range(10)))()
        >>> res
        <GroupResult: de44df8c-821d-4c84-9a6a-44769c738f98 [
            bc01831b-9486-4e51-b046-480d7c9b78de,
            2650a1b8-32bf-4771-a645-b0a35dcc791b,
            dcbee2a5-e92d-4b03-b6eb-7aec60fd30cf,
            59f92e0a-23ea-41ce-9fad-8645a0e7759c,
            26e1e707-eccf-4bf4-bbd8-1e1729c3cce3,
            2d10a5f4-37f0-41b2-96ac-a973b1df024d,
            e13d3bdb-7ae3-4101-81a4-6f17ee21df2d,
            104b2be0-7b75-44eb-ac8e-f9220bdfa140,
            c5c551a5-0386-4973-aa37-b65cbeb2624b,
            83f72d71-4b71-428e-b604-6f16599a9f37]>

        >>> res.parent.get()
        8

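
The ``xsum`` task used in the chord examples above isn't defined in this
guide; a minimal sketch of it could look like this (any task that sums a list
of numbers will do):

.. code-block:: python

    @app.task
    def xsum(numbers):
        return sum(numbers)
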

.. _canvas-chain:

Chains
------

.. versionadded:: 3.0

Tasks can be linked together: the linked task is called when the task
returns successfully:

.. code-block:: pycon

    >>> res = add.apply_async((2, 2), link=mul.s(16))
    >>> res.get()
    4

The linked task will be applied with the result of its parent
task as the first argument. In the above case where the result was 4,
this will result in ``mul(4, 16)``.

The result will keep track of any subtasks called by the original task,
and these can be accessed from the result instance:

.. code-block:: pycon

    >>> res.children
    [<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>]

    >>> res.children[0].get()
    64

The result instance also has a :meth:`~@AsyncResult.collect` method
that treats the result as a graph, enabling you to iterate over
the results:

.. code-block:: pycon

    >>> list(res.collect())
    [(<AsyncResult: 7b720856-dc5f-4415-9134-5c89def5664e>, 4),
     (<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>, 64)]

By default :meth:`~@AsyncResult.collect` will raise an
:exc:`~@IncompleteStream` exception if the graph isn't fully
formed (one of the tasks hasn't completed yet),
but you can get an intermediate representation of the graph
too:

.. code-block:: pycon

    >>> for result, value in res.collect(intermediate=True):
    ...     print(result, value)

You can link together as many tasks as you like,
and signatures can be linked too:

.. code-block:: pycon

    >>> s = add.s(2, 2)
    >>> s.link(mul.s(4))
    >>> s.link(log_result.s())

You can also add *error callbacks* using the ``on_error`` method:

.. code-block:: pycon

    >>> add.s(2, 2).on_error(log_error.s()).delay()

This will result in the following ``.apply_async`` call when the signature
is applied:

.. code-block:: pycon

    >>> add.apply_async((2, 2), link_error=log_error.s())

The worker won't actually call the errback as a task, but will
instead call the errback function directly so that the raw request, exception,
and traceback objects can be passed to it.

Here's an example errback:

.. code-block:: python

    from __future__ import print_function

    import os

    from proj.celery import app

    @app.task
    def log_error(request, exc, traceback):
        with open(os.path.join('/var/errors', request.id), 'a') as fh:
            print('--\n\n{0} {1} {2}'.format(
                request.id, exc, traceback), file=fh)

To make it even easier to link tasks together there's
a special signature called :class:`~celery.chain` that lets
you chain tasks together:

.. code-block:: pycon

    >>> from celery import chain
    >>> from proj.tasks import add, mul

    >>> # (4 + 4) * 8 * 10
    >>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))
    >>> res
    proj.tasks.add(4, 4) | proj.tasks.mul(8) | proj.tasks.mul(10)


Calling the chain will call the tasks in the current process
and return the result of the last task in the chain:

.. code-block:: pycon

    >>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))()
    >>> res.get()
    640

It also sets ``parent`` attributes so that you can
work your way up the chain to get intermediate results:

.. code-block:: pycon

    >>> res.parent.get()
    64

    >>> res.parent.parent.get()
    8

    >>> res.parent.parent
    <AsyncResult: eeaad925-6778-4ad1-88c8-b2a63d017933>


Chains can also be made using the ``|`` (pipe) operator:

.. code-block:: pycon

    >>> (add.s(2, 2) | mul.s(8) | mul.s(10)).apply_async()

Graphs
~~~~~~

In addition you can work with the result graph as a
:class:`~celery.utils.graph.DependencyGraph`:

.. code-block:: pycon

    >>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))()

    >>> res.parent.parent.graph
    285fa253-fcf8-42ef-8b95-0078897e83e6(1)
        463afec2-5ed4-4036-b22d-ba067ec64f52(0)
    872c3995-6fa0-46ca-98c2-5a19155afcf0(2)
        285fa253-fcf8-42ef-8b95-0078897e83e6(1)
            463afec2-5ed4-4036-b22d-ba067ec64f52(0)

You can even convert these graphs to *dot* format:

.. code-block:: pycon

    >>> with open('graph.dot', 'w') as fh:
    ...     res.parent.parent.graph.to_dot(fh)


and create images:

.. code-block:: console

    $ dot -Tpng graph.dot -o graph.png

.. image:: ../images/result_graph.png

.. _canvas-group:

Groups
------

.. versionadded:: 3.0

A group can be used to execute several tasks in parallel.

The :class:`~celery.group` function takes a list of signatures:

.. code-block:: pycon

    >>> from celery import group
    >>> from proj.tasks import add

    >>> group(add.s(2, 2), add.s(4, 4))
    (proj.tasks.add(2, 2), proj.tasks.add(4, 4))

If you **call** the group, the tasks will be applied
one after another in the current process, and a :class:`~celery.result.GroupResult`
instance is returned that can be used to keep track of the results,
or tell how many tasks are ready and so on:

.. code-block:: pycon

    >>> g = group(add.s(2, 2), add.s(4, 4))
    >>> res = g()
    >>> res.get()
    [4, 8]

Group also supports iterators:

.. code-block:: pycon

    >>> group(add.s(i, i) for i in range(100))()

A group is a signature object, so it can be used in combination
with other signatures.

Group Results
~~~~~~~~~~~~~

The group task returns a special result too;
this result works just like normal task results, except
that it works on the group as a whole:

.. code-block:: pycon

    >>> from celery import group
    >>> from tasks import add

    >>> job = group([
    ...             add.s(2, 2),
    ...             add.s(4, 4),
    ...             add.s(8, 8),
    ...             add.s(16, 16),
    ...             add.s(32, 32),
    ... ])

    >>> result = job.apply_async()

    >>> result.ready()  # have all subtasks completed?
    True
    >>> result.successful()  # were all subtasks successful?
    True
    >>> result.get()
    [4, 8, 16, 32, 64]

The :class:`~celery.result.GroupResult` takes a list of
:class:`~celery.result.AsyncResult` instances and operates on them as
if it was a single task.

It supports the following operations (a short usage sketch follows the list):

* :meth:`~celery.result.GroupResult.successful`

    Return :const:`True` if all of the subtasks finished
    successfully (i.e., didn't raise an exception).

* :meth:`~celery.result.GroupResult.failed`

    Return :const:`True` if any of the subtasks failed.

* :meth:`~celery.result.GroupResult.waiting`

    Return :const:`True` if any of the subtasks
    isn't ready yet.

* :meth:`~celery.result.GroupResult.ready`

    Return :const:`True` if all of the subtasks
    are ready.

* :meth:`~celery.result.GroupResult.completed_count`

    Return the number of completed subtasks.

* :meth:`~celery.result.GroupResult.revoke`

    Revoke all of the subtasks.

* :meth:`~celery.result.GroupResult.join`

    Gather the results of all subtasks
    and return them in the same order as they were called (as a list).
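
For illustration, here's a brief sketch using the ``job`` group from the
example above; the intermediate counts depend entirely on timing, so the
numbers shown are only indicative:

.. code-block:: pycon

    >>> result = job.apply_async()
    >>> result.completed_count()   # how many subtasks have finished so far
    2
    >>> result.ready()             # True once every subtask has finished
    False
    >>> result.join()              # gather the return values, in order
    [4, 8, 16, 32, 64]
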

.. _canvas-chord:

Chords
------

.. versionadded:: 2.3

.. note::

    Tasks used within a chord must *not* ignore their results. If the result
    backend is disabled for *any* task (header or body) in your chord you
    should read ":ref:`chord-important-notes`." Chords are not currently
    supported with the RPC result backend.


A chord is a task that only executes after all of the tasks in a group have
finished executing.


Let's calculate the sum of the expression
:math:`1 + 1 + 2 + 2 + 3 + 3 \ldots n + n` for :math:`n` up to a hundred.

First you need two tasks, :func:`add` and :func:`tsum` (:func:`sum` is
already a standard function):

.. code-block:: python

    @app.task
    def add(x, y):
        return x + y

    @app.task
    def tsum(numbers):
        return sum(numbers)


Now you can use a chord to calculate each addition step in parallel, and then
get the sum of the resulting numbers:

.. code-block:: pycon

    >>> from celery import chord
    >>> from tasks import add, tsum

    >>> chord(add.s(i, i)
    ...       for i in range(100))(tsum.s()).get()
    9900


This is obviously a very contrived example; the overhead of messaging and
synchronization makes this a lot slower than its Python counterpart:

.. code-block:: pycon

    >>> sum(i + i for i in range(100))

The synchronization step is costly, so you should avoid using chords as much
as possible. Still, the chord is a powerful primitive to have in your toolbox
as synchronization is a required step for many parallel algorithms.

Let's break the chord expression down:

.. code-block:: pycon

    >>> callback = tsum.s()
    >>> header = [add.s(i, i) for i in range(100)]
    >>> result = chord(header)(callback)
    >>> result.get()
    9900

Remember, the callback can only be executed after all of the tasks in the
header have returned. Each step in the header is executed as a task, in
parallel, possibly on different nodes. The callback is then applied with
the return value of each task in the header. The task id returned by
:meth:`chord` is the id of the callback, so you can wait for it to complete
and get the final return value (but remember to :ref:`never have a task wait
for other tasks <task-synchronous-subtasks>`).
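
To make that relationship concrete, a brief sketch continuing the example
above: the result returned by the chord is the result of the body task, so
its id and return value are those of the callback:

.. code-block:: pycon

    >>> result = chord(header)(callback)
    >>> result.id       # the id of the callback (body) task
    >>> result.get()    # the final return value of the callback
    9900
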

.. _chord-errors:

Error handling
~~~~~~~~~~~~~~

So what happens if one of the tasks raises an exception?

The chord callback result will transition to the failure state, and the error is set
to the :exc:`~@ChordError` exception:

.. code-block:: pycon

    >>> c = chord([add.s(4, 4), raising_task.s(), add.s(8, 8)])
    >>> result = c()
    >>> result.get()

.. code-block:: pytb

    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "*/celery/result.py", line 120, in get
        interval=interval)
      File "*/celery/backends/amqp.py", line 150, in wait_for
        raise meta['result']
    celery.exceptions.ChordError: Dependency 97de6f3f-ea67-4517-a21c-d867c61fcb47
        raised ValueError('something something',)

While the traceback may be different depending on the result backend used,
you can see that the error description includes the id of the task that failed
and a string representation of the original exception. You can also
find the original traceback in ``result.traceback``.

Note that the rest of the tasks will still execute, so the third task
(``add.s(8, 8)``) is still executed even though the middle task failed.
Also the :exc:`~@ChordError` only shows the task that failed
first (in time): it doesn't respect the ordering of the header group.

To perform an action when a chord fails you can therefore attach
an errback to the chord callback:

.. code-block:: python

    @app.task
    def on_chord_error(request, exc, traceback):
        print('Task {0!r} raised error: {1!r}'.format(request.id, exc))

.. code-block:: pycon

    >>> c = (group(add.s(i, i) for i in range(10)) |
    ...      xsum.s().on_error(on_chord_error.s())).delay()

.. _chord-important-notes:

Important Notes
~~~~~~~~~~~~~~~

Tasks used within a chord must *not* ignore their results. In practice this
means that you must enable a :const:`result_backend` in order to use
chords. Additionally, if :const:`task_ignore_result` is set to :const:`True`
in your configuration, be sure that the individual tasks to be used within
the chord are defined with :const:`ignore_result=False`. This applies to both
Task subclasses and decorated tasks.

Example Task subclass:

.. code-block:: python

    class MyTask(Task):
        ignore_result = False


Example decorated task:

.. code-block:: python

    @app.task(ignore_result=False)
    def another_task(project):
        do_something()

By default the synchronization step is implemented by having a recurring task
poll the completion of the group every second, calling the signature when
ready.

Example implementation:

.. code-block:: python

    from celery import maybe_signature

    @app.task(bind=True)
    def unlock_chord(self, group, callback, interval=1, max_retries=None):
        if group.ready():
            return maybe_signature(callback).delay(group.join())
        raise self.retry(countdown=interval, max_retries=max_retries)


This is used by all result backends except Redis and Memcached: they
increment a counter after each task in the header, then apply the callback
when the counter exceeds the number of tasks in the set.

The Redis and Memcached approach is a much better solution, but not easily
implemented in other backends (suggestions welcome!).

.. note::

   Chords don't properly work with Redis before version 2.2; you'll need to
   upgrade to at least redis-server 2.2 to use them.

.. note::

    If you're using chords with the Redis result backend and also overriding
    the :meth:`Task.after_return` method, you need to make sure to call the
    super method or else the chord callback won't be applied.

    .. code-block:: python

        def after_return(self, *args, **kwargs):
            do_something()
            super(MyTask, self).after_return(*args, **kwargs)

.. _canvas-map:

Map & Starmap
-------------

:class:`~celery.map` and :class:`~celery.starmap` are built-in tasks
that call the task for every element in a sequence.

They differ from group in that:

- only one task message is sent

- the operation is sequential.

For example using ``map``:

.. code-block:: pycon

    >>> from proj.tasks import add, xsum

    >>> ~xsum.map([range(10), range(100)])
    [45, 4950]

is the same as having a task doing:

.. code-block:: python

    @app.task
    def temp():
        return [xsum(range(10)), xsum(range(100))]

and using ``starmap``:

.. code-block:: pycon

    >>> ~add.starmap(zip(range(10), range(10)))
    [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

is the same as having a task doing:

.. code-block:: python

    @app.task
    def temp():
        return [add(i, i) for i in range(10)]

Both ``map`` and ``starmap`` are signature objects, so they can be used like
other signatures and combined in groups, etc. For example,
to call the starmap after 10 seconds:

.. code-block:: pycon

    >>> add.starmap(zip(range(10), range(10))).apply_async(countdown=10)

.. _canvas-chunks:

Chunks
------

Chunking lets you divide an iterable of work into pieces, so that if
you have one million objects, you can create 10 tasks with a hundred
thousand objects each.

Some may worry that chunking your tasks results in a degradation
of parallelism, but this is rarely true for a busy cluster,
and in practice, since you're avoiding the overhead of messaging,
it may considerably increase performance.

To create a chunks signature you can use :meth:`@Task.chunks`:

.. code-block:: pycon

    >>> add.chunks(zip(range(100), range(100)), 10)

As with :class:`~celery.group`, the act of sending the messages for
the chunks will happen in the current process when called:

.. code-block:: pycon

    >>> from proj.tasks import add

    >>> res = add.chunks(zip(range(100), range(100)), 10)()
    >>> res.get()
    [[0, 2, 4, 6, 8, 10, 12, 14, 16, 18],
     [20, 22, 24, 26, 28, 30, 32, 34, 36, 38],
     [40, 42, 44, 46, 48, 50, 52, 54, 56, 58],
     [60, 62, 64, 66, 68, 70, 72, 74, 76, 78],
     [80, 82, 84, 86, 88, 90, 92, 94, 96, 98],
     [100, 102, 104, 106, 108, 110, 112, 114, 116, 118],
     [120, 122, 124, 126, 128, 130, 132, 134, 136, 138],
     [140, 142, 144, 146, 148, 150, 152, 154, 156, 158],
     [160, 162, 164, 166, 168, 170, 172, 174, 176, 178],
     [180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]

while calling ``.apply_async`` will create a dedicated
task so that the individual tasks are applied in a worker
instead:

.. code-block:: pycon

    >>> add.chunks(zip(range(100), range(100)), 10).apply_async()

You can also convert chunks to a group:

.. code-block:: pycon

    >>> group = add.chunks(zip(range(100), range(100)), 10).group()

and with the group skew the countdown of each task by increments
of one:

.. code-block:: pycon

    >>> group.skew(start=1, stop=10)()

This means that the first task will have a countdown of one second, the second
task a countdown of two seconds, and so on.

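
If you need spacing other than what ``skew`` gives you, a rough equivalent is
to set an increasing ``countdown`` option on each signature in the group
yourself (this is only a sketch, recreating the group from the example above):

.. code-block:: pycon

    >>> sigs = add.chunks(zip(range(100), range(100)), 10).group()
    >>> for i, sig in enumerate(sigs.tasks, start=1):
    ...     sig.set(countdown=i)   # first chunk after 1s, second after 2s, ...
    >>> sigs()
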