.. _guide-canvas:

==============================
 Canvas: Designing Work-flows
==============================

.. contents::
    :local:
    :depth: 2

.. _canvas-subtasks:

.. _canvas-signatures:

Signatures
==========

.. versionadded:: 2.0

You just learned how to call a task using the task's ``delay`` method
in the :ref:`calling <guide-calling>` guide, and this is often all you need,
but sometimes you may want to pass the signature of a task invocation to
another process or as an argument to another function.

A :func:`~celery.signature` wraps the arguments, keyword arguments, and execution options
of a single task invocation in a way such that it can be passed to functions
or even serialized and sent across the wire.

- You can create a signature for the ``add`` task using its name like this:

  .. code-block:: pycon

      >>> from celery import signature
      >>> signature('tasks.add', args=(2, 2), countdown=10)
      tasks.add(2, 2)

  This task has a signature of arity 2 (two arguments): ``(2, 2)``,
  and sets the countdown execution option to 10.

- or you can create one using the task's ``signature`` method:

  .. code-block:: pycon

      >>> add.signature((2, 2), countdown=10)
      tasks.add(2, 2)

- There's also a shortcut using star arguments:

  .. code-block:: pycon

      >>> add.s(2, 2)
      tasks.add(2, 2)

- Keyword arguments are also supported:

  .. code-block:: pycon

      >>> add.s(2, 2, debug=True)
      tasks.add(2, 2, debug=True)

- From any signature instance you can inspect the different fields:

  .. code-block:: pycon

      >>> s = add.signature((2, 2), {'debug': True}, countdown=10)
      >>> s.args
      (2, 2)
      >>> s.kwargs
      {'debug': True}
      >>> s.options
      {'countdown': 10}

- It supports the "Calling API" of ``delay``,
  ``apply_async``, etc., including being called directly (``__call__``).

  Calling the signature will execute the task inline in the current process:

  .. code-block:: pycon

      >>> add(2, 2)
      4
      >>> add.s(2, 2)()
      4

  ``delay`` is our beloved shortcut to ``apply_async`` taking star-arguments:

  .. code-block:: pycon

      >>> result = add.delay(2, 2)
      >>> result.get()
      4

  ``apply_async`` takes the same arguments as the
  :meth:`Task.apply_async <@Task.apply_async>` method:

  .. code-block:: pycon

      >>> add.apply_async(args, kwargs, **options)
      >>> add.signature(args, kwargs, **options).apply_async()

      >>> add.apply_async((2, 2), countdown=1)
      >>> add.signature((2, 2), countdown=1).apply_async()

- You can't define options with :meth:`~@Task.s`, but a chained
  ``set`` call takes care of that:

  .. code-block:: pycon

      >>> add.s(2, 2).set(countdown=1)
      proj.tasks.add(2, 2)

Partials
--------

With a signature, you can execute the task in a worker:

.. code-block:: pycon

    >>> add.s(2, 2).delay()
    >>> add.s(2, 2).apply_async(countdown=1)

Or you can call it directly in the current process:

.. code-block:: pycon

    >>> add.s(2, 2)()
    4

Specifying additional args, kwargs, or options to ``apply_async``/``delay``
creates partials:

- Any arguments added will be prepended to the args in the signature:

  .. code-block:: pycon

      >>> partial = add.s(2)          # incomplete signature
      >>> partial.delay(4)            # 4 + 2
      >>> partial.apply_async((4,))  # same

- Any keyword arguments added will be merged with the kwargs in the signature,
  with the new keyword arguments taking precedence:

  .. code-block:: pycon

      >>> s = add.s(2, 2)
      >>> s.delay(debug=True)                    # -> add(2, 2, debug=True)
      >>> s.apply_async(kwargs={'debug': True})  # same

- Any options added will be merged with the options in the signature,
  with the new options taking precedence:

  .. code-block:: pycon

      >>> s = add.signature((2, 2), countdown=10)
      >>> s.apply_async(countdown=1)  # countdown is now 1

You can also clone signatures to create derivatives:

.. code-block:: pycon

    >>> s = add.s(2)
    >>> s
    proj.tasks.add(2)

    >>> s.clone(args=(4,), kwargs={'debug': True})
    proj.tasks.add(4, 2, debug=True)

Immutability
------------

.. versionadded:: 3.0

Partials are meant to be used with callbacks: any linked tasks or chord
callbacks will be applied with the result of the parent task.
Sometimes you want to specify a callback that doesn't take
additional arguments, and in that case you can set the signature
to be immutable:

.. code-block:: pycon

    >>> add.apply_async((2, 2), link=reset_buffers.signature(immutable=True))

The ``.si()`` shortcut can also be used to create immutable signatures:

.. code-block:: pycon

    >>> add.apply_async((2, 2), link=reset_buffers.si())

Only the execution options can be set when a signature is immutable,
so it's not possible to call the signature with partial args/kwargs.

.. note::

    In this tutorial I sometimes use the prefix operator ``~`` on signatures.
    You probably shouldn't use it in your production code, but it's a handy shortcut
    when experimenting in the Python shell:

    .. code-block:: pycon

        >>> ~sig

        >>> # is the same as
        >>> sig.delay().get()


.. _canvas-callbacks:

Callbacks
---------

.. versionadded:: 3.0

Callbacks can be added to any task using the ``link`` argument
to ``apply_async``:

.. code-block:: pycon

    >>> add.apply_async((2, 2), link=other_task.s())

The callback will only be applied if the task exited successfully,
and it will be applied with the return value of the parent task as argument.
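
As a rough mental model of how a ``link`` callback receives its argument,
the following plain-Python sketch mimics the behaviour without a broker or
a worker. ``ToySignature`` and ``run_with_link`` are hypothetical names
invented for this illustration; they aren't part of Celery, and real
signatures do considerably more:

.. code-block:: python

    import operator


    class ToySignature:
        """Hypothetical stand-in for a Celery signature (not Celery code)."""

        def __init__(self, func, args=(), immutable=False):
            self.func = func
            self.args = tuple(args)
            self.immutable = immutable

        def __call__(self, *partial_args):
            # Immutable signatures ignore any arguments added at call time.
            if self.immutable:
                return self.func(*self.args)
            # Otherwise new arguments are prepended to the stored ones.
            return self.func(*(partial_args + self.args))


    def run_with_link(parent, callback):
        """Run ``parent``; on success, call ``callback`` with its result."""
        result = parent()
        # An exception raised by parent() propagates, so the callback is
        # skipped, mirroring "only applied if the task exited successfully".
        return result, callback(result)


    add = operator.add

    parent_result, callback_result = run_with_link(
        ToySignature(add, (2, 2)),  # 2 + 2
        ToySignature(add, (8,)),    # called as add(4, 8)
    )

In this model the parent's return value is placed in front of the ``8``
stored in the callback, while a callback created with ``immutable=True``
would be called with its stored arguments only.
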

As I mentioned earlier, any arguments you add to a signature
will be prepended to the arguments specified by the signature itself!

If you have the signature:

.. code-block:: pycon

    >>> sig = add.s(10)

then ``sig.delay(result)`` becomes:

.. code-block:: pycon

    >>> add.apply_async(args=(result, 10))

...

Now let's call our ``add`` task with a callback using partial
arguments:

.. code-block:: pycon

    >>> add.apply_async((2, 2), link=add.s(8))

As expected this will first launch one task calculating :math:`2 + 2`, then
another task calculating :math:`4 + 8`.

The Primitives
==============

.. versionadded:: 3.0

.. topic:: Overview

    - ``group``

      The group primitive is a signature that takes a list of tasks that should
      be applied in parallel.

    - ``chain``

      The chain primitive lets us link together signatures so that one is called
      after the other, essentially forming a *chain* of callbacks.

    - ``chord``

      A chord is just like a group but with a callback. A chord consists
      of a header group and a body, where the body is a task that should execute
      after all of the tasks in the header are complete.

    - ``map``

      The map primitive works like the built-in ``map`` function, but creates
      a temporary task where a list of arguments is applied to the task.
      For example, ``task.map([1, 2])`` results in a single task
      being called, applying the arguments in order to the task function so
      that the result is:

      .. code-block:: python

          res = [task(1), task(2)]

    - ``starmap``

      Works exactly like map except the arguments are applied as ``*args``.
      For example ``add.starmap([(2, 2), (4, 4)])`` results in a single
      task calling:

      .. code-block:: python

          res = [add(2, 2), add(4, 4)]

    - ``chunks``

      Chunking splits a long list of arguments into parts, for example
      the operation:

      .. code-block:: pycon

          >>> items = zip(range(1000), range(1000))  # 1000 items
          >>> add.chunks(items, 10)

      will split the list of items into chunks of 10, resulting in 100
      tasks (each processing 10 items in sequence).


The primitives are also signature objects themselves, so that they can be combined
in any number of ways to compose complex work-flows.

Here are some examples:

- Simple chain

  Here's a simple chain: the first task executes, passing its return value
  to the next task in the chain, and so on.

  .. code-block:: pycon

      >>> from celery import chain

      >>> # 2 + 2 + 4 + 8
      >>> res = chain(add.s(2, 2), add.s(4), add.s(8))()
      >>> res.get()
      16

  This can also be written using pipes:

  .. code-block:: pycon

      >>> (add.s(2, 2) | add.s(4) | add.s(8))().get()
      16

- Immutable signatures

  Signatures can be partial so arguments can be
  added to the existing arguments, but you may not always want that,
  for example if you don't want the result of the previous task in a chain.

  In that case you can mark the signature as immutable, so that the arguments
  cannot be changed:

  .. code-block:: pycon

      >>> add.signature((2, 2), immutable=True)

  There's also a ``.si()`` shortcut for this, and this is the preferred way of
  creating signatures:

  .. code-block:: pycon

      >>> add.si(2, 2)

  Now you can create a chain of independent tasks instead:

  .. code-block:: pycon

      >>> res = (add.si(2, 2) | add.si(4, 4) | add.si(8, 8))()
      >>> res.get()
      16

      >>> res.parent.get()
      8

      >>> res.parent.parent.get()
      4

- Simple group

  You can easily create a group of tasks to execute in parallel:

  .. code-block:: pycon

      >>> from celery import group
      >>> res = group(add.s(i, i) for i in range(10))()
      >>> res.get(timeout=1)
      [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

- Simple chord

  The chord primitive enables us to add a callback to be called when
  all of the tasks in a group have finished executing. This is often
  required for algorithms that aren't *embarrassingly parallel*:

  .. code-block:: pycon

      >>> from celery import chord
      >>> res = chord((add.s(i, i) for i in range(10)), xsum.s())()
      >>> res.get()
      90

  The above example creates 10 tasks that all start in parallel,
  and when all of them are complete the return values are combined
  into a list and sent to the ``xsum`` task.

  The body of a chord can also be immutable, so that the return value
  of the group isn't passed on to the callback:

  .. code-block:: pycon

      >>> chord((import_contact.s(c) for c in contacts),
      ...       notify_complete.si(import_id)).apply_async()

  Note the use of ``.si`` above; this creates an immutable signature,
  meaning any new arguments passed (including the return value of the
  previous task) will be ignored.

- Blow your mind by combining

  Chains can be partial too:

  .. code-block:: pycon

      >>> c1 = (add.s(4) | mul.s(8))

      >>> # (16 + 4) * 8
      >>> res = c1(16)
      >>> res.get()
      160

  this means that you can combine chains:

  .. code-block:: pycon

      >>> # ((4 + 16) * 2 + 4) * 8
      >>> c2 = (add.s(4, 16) | mul.s(2) | (add.s(4) | mul.s(8)))

      >>> res = c2()
      >>> res.get()
      352

  Chaining a group together with another task will automatically
  upgrade it to be a chord:

  .. code-block:: pycon

      >>> c3 = (group(add.s(i, i) for i in range(10)) | xsum.s())
      >>> res = c3()
      >>> res.get()
      90

  Groups and chords accept partial arguments too, so in a chain
  the return value of the previous task is forwarded to all tasks in the group:

  .. code-block:: pycon

      >>> new_user_workflow = (create_user.s() | group(
      ...                      import_contacts.s(),
      ...                      send_welcome_email.s()))
      >>> new_user_workflow.delay(username='artv',
      ...                         first='Art',
      ...                         last='Vandelay',
      ...                         email='art@vandelay.com')


  If you don't want to forward arguments to the group then
  you can make the signatures in the group immutable:

  .. code-block:: pycon

      >>> res = (add.s(4, 4) | group(add.si(i, i) for i in range(10)))()
      >>> res.get()
      <GroupResult: de44df8c-821d-4c84-9a6a-44769c738f98 [
          bc01831b-9486-4e51-b046-480d7c9b78de,
          2650a1b8-32bf-4771-a645-b0a35dcc791b,
          dcbee2a5-e92d-4b03-b6eb-7aec60fd30cf,
          59f92e0a-23ea-41ce-9fad-8645a0e7759c,
          26e1e707-eccf-4bf4-bbd8-1e1729c3cce3,
          2d10a5f4-37f0-41b2-96ac-a973b1df024d,
          e13d3bdb-7ae3-4101-81a4-6f17ee21df2d,
          104b2be0-7b75-44eb-ac8e-f9220bdfa140,
          c5c551a5-0386-4973-aa37-b65cbeb2624b,
          83f72d71-4b71-428e-b604-6f16599a9f37]>

      >>> res.parent.get()
      8


.. _canvas-chain:

Chains
------

.. versionadded:: 3.0

Tasks can be linked together: the linked task is called when the task
returns successfully:

.. code-block:: pycon

    >>> res = add.apply_async((2, 2), link=mul.s(16))
    >>> res.get()
    4

The linked task will be applied with the result of its parent
task as the first argument.
In the above case where the result was 4,
this will result in ``mul(4, 16)``.

The results will keep track of any subtasks called by the original task,
and this can be accessed from the result instance:

.. code-block:: pycon

    >>> res.children
    [<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>]

    >>> res.children[0].get()
    64

The result instance also has a :meth:`~@AsyncResult.collect` method
that treats the result as a graph, enabling you to iterate over
the results:

.. code-block:: pycon

    >>> list(res.collect())
    [(<AsyncResult: 7b720856-dc5f-4415-9134-5c89def5664e>, 4),
     (<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>, 64)]

By default :meth:`~@AsyncResult.collect` will raise an
:exc:`~@IncompleteStream` exception if the graph isn't fully
formed (one of the tasks hasn't completed yet),
but you can get an intermediate representation of the graph
too:

.. code-block:: pycon

    >>> for result, value in res.collect(intermediate=True):
    ...     ...

You can link together as many tasks as you like,
and signatures can be linked too:

.. code-block:: pycon

    >>> s = add.s(2, 2)
    >>> s.link(mul.s(4))
    >>> s.link(log_result.s())

You can also add *error callbacks* using the ``on_error`` method:

.. code-block:: pycon

    >>> add.s(2, 2).on_error(log_error.s()).delay()

This will result in the following ``.apply_async`` call when the signature
is applied:

.. code-block:: pycon

    >>> add.apply_async((2, 2), link_error=log_error.s())

The worker won't actually call the errback as a task, but will
instead call the errback function directly so that the raw request, exception
and traceback objects can be passed to it.

Here's an example errback:

.. code-block:: python

    from __future__ import print_function

    import os

    from proj.celery import app

    @app.task
    def log_error(request, exc, traceback):
        # Append the failure details to a file named after the task id.
        with open(os.path.join('/var/errors', request.id), 'a') as fh:
            print('--\n\n{0} {1} {2}'.format(
                request.id, exc, traceback), file=fh)

To make it even easier to link tasks together there's
a special signature called :class:`~celery.chain` that lets
you chain tasks together:

.. code-block:: pycon

    >>> from celery import chain
    >>> from proj.tasks import add, mul

    >>> # (4 + 4) * 8 * 10
    >>> chain(add.s(4, 4), mul.s(8), mul.s(10))
    proj.tasks.add(4, 4) | proj.tasks.mul(8) | proj.tasks.mul(10)


Calling the chain will call the tasks in the current process
and return the result of the last task in the chain:

.. code-block:: pycon

    >>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))()
    >>> res.get()
    640

It also sets ``parent`` attributes so that you can
work your way up the chain to get intermediate results:

.. code-block:: pycon

    >>> res.parent.get()
    64

    >>> res.parent.parent.get()
    8

    >>> res.parent.parent
    <AsyncResult: eeaad925-6778-4ad1-88c8-b2a63d017933>


Chains can also be made using the ``|`` (pipe) operator:

.. code-block:: pycon

    >>> (add.s(2, 2) | mul.s(8) | mul.s(10)).apply_async()

Graphs
~~~~~~

In addition you can work with the result graph as a
:class:`~celery.utils.graph.DependencyGraph`:

.. code-block:: pycon

    >>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))()

    >>> res.parent.parent.graph
    285fa253-fcf8-42ef-8b95-0078897e83e6(1)
        463afec2-5ed4-4036-b22d-ba067ec64f52(0)
    872c3995-6fa0-46ca-98c2-5a19155afcf0(2)
        285fa253-fcf8-42ef-8b95-0078897e83e6(1)
            463afec2-5ed4-4036-b22d-ba067ec64f52(0)

You can even convert these graphs to *dot* format:

.. code-block:: pycon

    >>> with open('graph.dot', 'w') as fh:
    ...     res.parent.parent.graph.to_dot(fh)


and create images:

.. code-block:: console

    $ dot -Tpng graph.dot -o graph.png

.. image:: ../images/result_graph.png

.. _canvas-group:

Groups
------

.. versionadded:: 3.0

A group can be used to execute several tasks in parallel.

The :class:`~celery.group` function takes a list of signatures:

.. code-block:: pycon

    >>> from celery import group
    >>> from proj.tasks import add

    >>> group(add.s(2, 2), add.s(4, 4))
    (proj.tasks.add(2, 2), proj.tasks.add(4, 4))

If you **call** the group, the tasks will be applied
one after another in the current process, and a :class:`~celery.result.GroupResult`
instance is returned that can be used to keep track of the results,
or tell how many tasks are ready and so on:

.. code-block:: pycon

    >>> g = group(add.s(2, 2), add.s(4, 4))
    >>> res = g()
    >>> res.get()
    [4, 8]

Group also supports iterators:

.. code-block:: pycon

    >>> group(add.s(i, i) for i in range(100))()

A group is a signature object, so it can be used in combination
with other signatures.

Group Results
~~~~~~~~~~~~~

The group task returns a special result too;
this result works just like normal task results, except
that it works on the group as a whole:

.. code-block:: pycon

    >>> from celery import group
    >>> from tasks import add

    >>> job = group([
    ...             add.s(2, 2),
    ...             add.s(4, 4),
    ...             add.s(8, 8),
    ...             add.s(16, 16),
    ...             add.s(32, 32),
    ... ])

    >>> result = job.apply_async()

    >>> result.ready()  # have all subtasks completed?
    True
    >>> result.successful()  # were all subtasks successful?
    True
    >>> result.get()
    [4, 8, 16, 32, 64]

The :class:`~celery.result.GroupResult` takes a list of
:class:`~celery.result.AsyncResult` instances and operates on them as
if it were a single task.

It supports the following operations:

* :meth:`~celery.result.GroupResult.successful`

    Return :const:`True` if all of the subtasks finished
    successfully (e.g., didn't raise an exception).

* :meth:`~celery.result.GroupResult.failed`

    Return :const:`True` if any of the subtasks failed.

* :meth:`~celery.result.GroupResult.waiting`

    Return :const:`True` if any of the subtasks
    isn't ready yet.

* :meth:`~celery.result.GroupResult.ready`

    Return :const:`True` if all of the subtasks
    are ready.

* :meth:`~celery.result.GroupResult.completed_count`

    Return the number of completed subtasks.

* :meth:`~celery.result.GroupResult.revoke`

    Revoke all of the subtasks.

* :meth:`~celery.result.GroupResult.join`

    Gather the results of all subtasks
    and return them in the same order as they were called (as a list).

.. _canvas-chord:

Chords
------

.. versionadded:: 2.3

.. note::

    Tasks used within a chord must *not* ignore their results. If the result
    backend is disabled for *any* task (header or body) in your chord you
    should read ":ref:`chord-important-notes`." Chords are not currently
    supported with the RPC result backend.


A chord is a task that only executes after all of the tasks in a group have
finished executing.


Let's calculate the sum of the expression
:math:`1 + 1 + 2 + 2 + 3 + 3 ... n + n` up to a hundred digits.

First you need two tasks, :func:`add` and :func:`tsum` (:func:`sum` is
already a standard function):

.. code-block:: python

    @app.task
    def add(x, y):
        return x + y

    @app.task
    def tsum(numbers):
        return sum(numbers)


Now you can use a chord to calculate each addition step in parallel, and then
get the sum of the resulting numbers:

.. code-block:: pycon

    >>> from celery import chord
    >>> from tasks import add, tsum

    >>> chord(add.s(i, i)
    ...       for i in range(100))(tsum.s()).get()
    9900


This is obviously a very contrived example; the overhead of messaging and
synchronization makes this a lot slower than its Python counterpart:

.. code-block:: pycon

    >>> sum(i + i for i in range(100))
    9900

The synchronization step is costly, so you should avoid using chords as much
as possible. Still, the chord is a powerful primitive to have in your toolbox
as synchronization is a required step for many parallel algorithms.

Let's break the chord expression down:

.. code-block:: pycon

    >>> callback = tsum.s()
    >>> header = [add.s(i, i) for i in range(100)]
    >>> result = chord(header)(callback)
    >>> result.get()
    9900

Remember, the callback can only be executed after all of the tasks in the
header have returned. Each step in the header is executed as a task, in
parallel, possibly on different nodes. The callback is then applied with
the return value of each task in the header. The task id returned by
:meth:`chord` is the id of the callback, so you can wait for it to complete
and get the final return value (but remember to :ref:`never have a task wait
for other tasks <task-synchronous-subtasks>`).

.. _chord-errors:

Error handling
~~~~~~~~~~~~~~

So what happens if one of the tasks raises an exception?

The chord callback result will transition to the failure state, and the error is set
to the :exc:`~@ChordError` exception:

.. code-block:: pycon

    >>> c = chord([add.s(4, 4), raising_task.s(), add.s(8, 8)])
    >>> result = c()
    >>> result.get()

.. code-block:: pytb

    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "*/celery/result.py", line 120, in get
        interval=interval)
      File "*/celery/backends/amqp.py", line 150, in wait_for
        raise meta['result']
    celery.exceptions.ChordError: Dependency 97de6f3f-ea67-4517-a21c-d867c61fcb47
        raised ValueError('something something',)

While the traceback may be different depending on the result backend used,
you can see that the error description includes the id of the task that failed
and a string representation of the original exception. You can also
find the original traceback in ``result.traceback``.

Note that the rest of the tasks will still execute, so the third task
(``add.s(8, 8)``) is still executed even though the middle task failed.
Also the :exc:`~@ChordError` only shows the task that failed
first (in time): it doesn't respect the ordering of the header group.

To perform an action when a chord fails you can therefore attach
an errback to the chord callback:

.. code-block:: python

    @app.task
    def on_chord_error(request, exc, traceback):
        print('Task {0!r} raised error: {1!r}'.format(request.id, exc))

.. code-block:: pycon

    >>> c = (group(add.s(i, i) for i in range(10)) |
    ...      xsum.s().on_error(on_chord_error.s())).delay()

.. _chord-important-notes:

Important Notes
~~~~~~~~~~~~~~~

Tasks used within a chord must *not* ignore their results. In practice this
means that you must enable a :const:`result_backend` in order to use
chords. Additionally, if :const:`task_ignore_result` is set to :const:`True`
in your configuration, be sure that the individual tasks to be used within
the chord are defined with :const:`ignore_result=False`.
This applies to both
Task subclasses and decorated tasks.

Example Task subclass:

.. code-block:: python

    class MyTask(Task):
        ignore_result = False


Example decorated task:

.. code-block:: python

    @app.task(ignore_result=False)
    def another_task(project):
        do_something()

By default the synchronization step is implemented by having a recurring task
poll the completion of the group every second, calling the signature when
ready.

Example implementation:

.. code-block:: python

    from celery import maybe_signature

    @app.task(bind=True)
    def unlock_chord(self, group, callback, interval=1, max_retries=None):
        if group.ready():
            return maybe_signature(callback).delay(group.join())
        raise self.retry(countdown=interval, max_retries=max_retries)


This is used by all result backends except Redis and Memcached: they
increment a counter after each task in the header, then apply the callback
when the counter exceeds the number of tasks in the set.

The Redis and Memcached approach is a much better solution, but not easily
implemented in other backends (suggestions welcome!).

.. note::

   Chords don't properly work with Redis before version 2.2; you'll need to
   upgrade to at least redis-server 2.2 to use them.

.. note::

    If you're using chords with the Redis result backend and also overriding
    the :meth:`Task.after_return` method, you need to make sure to call the
    super method or else the chord callback won't be applied.

    .. code-block:: python

        def after_return(self, *args, **kwargs):
            do_something()
            super(MyTask, self).after_return(*args, **kwargs)

.. _canvas-map:

Map & Starmap
-------------

:class:`~celery.map` and :class:`~celery.starmap` are built-in tasks
that call the task for every element in a sequence.

They differ from group in that

- only one task message is sent

- the operation is sequential.

For example using ``map``:

.. code-block:: pycon

    >>> from proj.tasks import add, xsum

    >>> ~xsum.map([range(10), range(100)])
    [45, 4950]

is the same as having a task doing:

.. code-block:: python

    @app.task
    def temp():
        return [xsum(range(10)), xsum(range(100))]

and using ``starmap``:

.. code-block:: pycon

    >>> ~add.starmap(zip(range(10), range(10)))
    [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

is the same as having a task doing:

.. code-block:: python

    @app.task
    def temp():
        return [add(i, i) for i in range(10)]

Both ``map`` and ``starmap`` are signature objects, so they can be used as
other signatures and combined in groups etc., for example
to call the starmap after 10 seconds:

.. code-block:: pycon

    >>> add.starmap(zip(range(10), range(10))).apply_async(countdown=10)

.. _canvas-chunks:

Chunks
------

Chunking lets you divide an iterable of work into pieces, so that if
you have one million objects, you can create 10 tasks with a hundred
thousand objects each.

Some may worry that chunking your tasks results in a degradation
of parallelism, but this is rarely true for a busy cluster.
In practice, since you're avoiding the overhead of messaging,
it may considerably increase performance.

To create a chunks signature you can use :meth:`@Task.chunks`:

.. code-block:: pycon

    >>> add.chunks(zip(range(100), range(100)), 10)

As with :class:`~celery.group` the act of sending the messages for
the chunks will happen in the current process when called:

.. code-block:: pycon

    >>> from proj.tasks import add

    >>> res = add.chunks(zip(range(100), range(100)), 10)()
    >>> res.get()
    [[0, 2, 4, 6, 8, 10, 12, 14, 16, 18],
     [20, 22, 24, 26, 28, 30, 32, 34, 36, 38],
     [40, 42, 44, 46, 48, 50, 52, 54, 56, 58],
     [60, 62, 64, 66, 68, 70, 72, 74, 76, 78],
     [80, 82, 84, 86, 88, 90, 92, 94, 96, 98],
     [100, 102, 104, 106, 108, 110, 112, 114, 116, 118],
     [120, 122, 124, 126, 128, 130, 132, 134, 136, 138],
     [140, 142, 144, 146, 148, 150, 152, 154, 156, 158],
     [160, 162, 164, 166, 168, 170, 172, 174, 176, 178],
     [180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]

while calling ``.apply_async`` will create a dedicated
task so that the individual tasks are applied in a worker
instead:

.. code-block:: pycon

    >>> add.chunks(zip(range(100), range(100)), 10).apply_async()

You can also convert chunks to a group:

.. code-block:: pycon

    >>> group = add.chunks(zip(range(100), range(100)), 10).group()

and with the group skew the countdown of each task by increments
of one:

.. code-block:: pycon

    >>> group.skew(start=1, stop=10)()

This means that the first task will have a countdown of one second, the second
task a countdown of two seconds, and so on.
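
To make the arithmetic concrete, here's a plain-Python sketch of what
chunking and skewing amount to. The helpers ``split_into_chunks`` and
``skewed_countdowns`` are hypothetical names written for this illustration
and aren't Celery's implementation; the assumption that countdowns stop
growing once ``stop`` is reached is likewise part of the sketch:

.. code-block:: python

    from itertools import islice


    def split_into_chunks(iterable, n):
        """Yield successive lists of at most ``n`` items from ``iterable``."""
        iterator = iter(iterable)
        while True:
            chunk = list(islice(iterator, n))
            if not chunk:
                return
            yield chunk


    def skewed_countdowns(count, start=1, stop=None, step=1):
        """Return ``count`` countdowns: start, start + step, and so on."""
        countdowns = []
        current = start
        for _ in range(count):
            countdowns.append(current)
            # Assumption of this sketch: once ``stop`` is reached, repeat it.
            if stop is None or current + step <= stop:
                current += step
        return countdowns


    items = zip(range(100), range(100))
    chunks = list(split_into_chunks(items, 10))

    # Each chunk stands for one task that processes its items in sequence.
    results = [[x + y for x, y in chunk] for chunk in chunks]
    countdowns = skewed_countdowns(len(chunks), start=1, stop=10)

With one hundred argument pairs and a chunk size of 10 the sketch produces
ten chunks, and ``countdowns`` holds one value per chunk, starting at one
second and growing by one second each time.
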