defmodule Task do
  @moduledoc """
  Conveniences for spawning and awaiting tasks.

  Tasks are processes meant to execute one particular
  action throughout their lifetime, often with little or no
  communication with other processes. The most common use case
  for tasks is to convert sequential code into concurrent code
  by computing a value asynchronously:

      task = Task.async(fn -> do_some_work() end)
      res = do_some_other_work()
      res + Task.await(task)

  Tasks spawned with `async` can be awaited on by their caller
  process (and only their caller) as shown in the example above.
  They are implemented by spawning a process that sends a message
  to the caller once the given computation is performed.

  Besides `async/1` and `await/2`, tasks can also be
  started as part of a supervision tree and dynamically spawned
  on remote nodes. We will explore these scenarios next.

  ## async and await

  One of the common uses of tasks is to convert sequential code
  into concurrent code with `Task.async/1` while keeping its semantics.
  When invoked, a new process will be created, linked and monitored
  by the caller. Once the task action finishes, a message will be sent
  to the caller with the result.

  `Task.await/2` is used to read the message sent by the task.

  There are two important things to consider when using `async`:

    1. If you are using async tasks, you **must await** a reply
       as they are *always* sent. If you are not expecting a reply,
       consider using `Task.start_link/1` detailed below.

    2. async tasks link the caller and the spawned process. This
       means that, if the caller crashes, the task will crash
       too and vice-versa. This is on purpose: if the process
       meant to receive the result no longer exists, there is
       no purpose in completing the computation.

       If this is not desired, you will want to use supervised
       tasks, described next.

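
  As a minimal sketch of the first point above (the computations are
  illustrative only), the following shows an awaited task next to a task
  that is never awaited. In the second case the reply is still sent and
  stays in the caller's mailbox until it is received by hand:

  ```elixir
  # Awaited task: Task.await/2 consumes the reply
  task = Task.async(fn -> Enum.sum(1..100) end)
  local = Enum.sum(1..10)
  total = local + Task.await(task)

  # Unawaited task: the reply is sent anyway and waits in the mailbox
  unawaited = Task.async(fn -> :reply end)
  ref = unawaited.ref

  leftover =
    receive do
      {^ref, value} -> value
    after
      1_000 -> :no_reply
    end

  # Drop the monitor and its :DOWN message, as Task.await/2 would have done
  Process.demonitor(ref, [:flush])
  ```

  Receiving the reply manually is shown here only to make the message
  visible; in regular code `Task.await/2` is the usual way to read it.
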

  ## Dynamically supervised tasks

  The `Task.Supervisor` module allows developers to dynamically
  create multiple supervised tasks.

  A short example is:

      {:ok, pid} = Task.Supervisor.start_link()

      task =
        Task.Supervisor.async(pid, fn ->
          # Do something
        end)

      Task.await(task)

  However, in the majority of cases, you want to add the task supervisor
  to your supervision tree:

      Supervisor.start_link([
        {Task.Supervisor, name: MyApp.TaskSupervisor}
      ], strategy: :one_for_one)

  And now you can use async/await once again, passing the name of
  the supervisor instead of the pid:

      Task.Supervisor.async(MyApp.TaskSupervisor, fn ->
        # Do something
      end)
      |> Task.await()

  We encourage developers to rely on supervised tasks as much as
  possible. Supervised tasks enable a wide variety of patterns
  that give you explicit control over how to handle results,
  errors, and timeouts. Here is a summary:

    * Use `Task.Supervisor.start_child/2` to start a fire-and-forget
      task when you don't care about its result or whether it completes
      successfully

    * Use `Task.Supervisor.async/2` + `Task.await/2` to execute
      tasks concurrently and retrieve their results. If the task fails,
      the caller will also fail

    * Use `Task.Supervisor.async_nolink/2` + `Task.yield/2` + `Task.shutdown/2`
      to execute tasks concurrently and retrieve their results
      or the reason they failed within a given time frame. If the task fails,
      the caller won't fail: you will receive the error reason either on
      `yield` or `shutdown`

  See the `Task.Supervisor` module for details on the supported operations.
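
  The third pattern in the summary above can be sketched as follows. This
  is a minimal illustration that starts an unnamed supervisor directly;
  the task bodies, timeouts and variable names are assumptions made for
  the example:

  ```elixir
  {:ok, sup} = Task.Supervisor.start_link()

  # A task that replies within the time frame
  fast = Task.Supervisor.async_nolink(sup, fn -> :on_time end)
  fast_result = Task.yield(fast, 1_000) || Task.shutdown(fast)

  # A task that is still running when the time frame ends, so it is shut down
  slow = Task.Supervisor.async_nolink(sup, fn -> Process.sleep(:infinity) end)
  slow_result = Task.yield(slow, 100) || Task.shutdown(slow)

  # A task that exits abnormally: the caller keeps running and gets the reason
  failed = Task.Supervisor.async_nolink(sup, fn -> exit(:boom) end)
  failed_result = Task.yield(failed, 1_000) || Task.shutdown(failed)
  ```

  Here `fast_result` is `{:ok, :on_time}`, `slow_result` is `nil` and
  `failed_result` is `{:exit, :boom}`, so the caller can decide what to do
  in each case without being taken down by the task.
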

  ### Distributed tasks

  Since Elixir provides a `Task.Supervisor`, it is easy to use one
  to dynamically start tasks across nodes:

      # On the remote node
      Task.Supervisor.start_link(name: MyApp.DistSupervisor)

      # On the client
      supervisor = {MyApp.DistSupervisor, :remote@local}
      Task.Supervisor.async(supervisor, MyMod, :my_fun, [arg1, arg2, arg3])

  Note that, when working with distributed tasks, one should use the
  `Task.Supervisor.async/4` function that expects explicit module, function,
  and arguments, instead of `Task.Supervisor.async/2` that works with anonymous
  functions. That's because anonymous functions expect the same module version
  to exist on all involved nodes. Check the `Agent` module documentation for
  more information on distributed processes as the limitations described there
  apply to the whole ecosystem.

  ## Statically supervised tasks

  The `Task` module implements the `child_spec/1` function, which
  allows it to be started directly under a regular `Supervisor` -
  instead of a `Task.Supervisor` - by passing a tuple with a function
  to run:

      Supervisor.start_link([
        {Task, fn -> :some_work end}
      ], strategy: :one_for_one)

  This is often useful when you need to execute some steps while
  setting up your supervision tree. For example: to warm up caches,
  log the initialization status, etc.

  If you don't want to put the Task code directly under the `Supervisor`,
  you can wrap the `Task` in its own module, similar to how you would
  do with a `GenServer` or an `Agent`:

      defmodule MyTask do
        use Task

        def start_link(arg) do
          Task.start_link(__MODULE__, :run, [arg])
        end

        def run(arg) do
          # ...
        end
      end

  And then pass it to the supervisor:

      Supervisor.start_link([
        {MyTask, arg}
      ], strategy: :one_for_one)

  Since these tasks are supervised and not directly linked to the caller,
  they cannot be awaited on. By default, the functions `Task.start`
  and `Task.start_link` are for fire-and-forget tasks, where you don't
  care about the results or whether they complete successfully.

  `use Task` defines a `child_spec/1` function, allowing the
  defined module to be put under a supervision tree. The generated
  `child_spec/1` can be customized with the following options:

    * `:id` - the child specification identifier, defaults to the current module
    * `:restart` - when the child should be restarted, defaults to `:temporary`
    * `:shutdown` - how to shut down the child, either immediately or by giving it time to shut down

  Unlike `GenServer`, `Agent` and `Supervisor`, a Task has
  a default `:restart` of `:temporary`. This means the task will
  not be restarted even if it crashes. If you desire the task to
  be restarted for non-successful exits, do:

      use Task, restart: :transient

  If you want the task to always be restarted:

      use Task, restart: :permanent

  See the "Child specification" section in the `Supervisor` module
  for more detailed information. The `@doc` annotation immediately
  preceding `use Task` will be attached to the generated `child_spec/1`
  function.

  ## Ancestor and Caller Tracking

  Whenever you start a new process, Elixir annotates the parent of that process
  through the `$ancestors` key in the process dictionary. This is often used to
  track the hierarchy inside a supervision tree.

  For example, we recommend that developers always start tasks under a supervisor.
  This provides more visibility and allows you to control how those tasks are
  terminated when a node shuts down.
  That might look something like
  `Task.Supervisor.start_child(MySupervisor, task_specification)`. This means
  that, although your code is the one that invokes the task, the actual ancestor of
  the task is the supervisor, as the supervisor is the one effectively starting it.

  To track the relationship between your code and the task, we use the `$callers`
  key in the process dictionary. Therefore, assuming the `Task.Supervisor` call
  above, we have:

      [your code] -- calls --> [supervisor] ---- spawns --> [task]

  Which means we store the following relationships:

      [your code]              [supervisor] <-- ancestor -- [task]
          ^                                                  |
          |--------------------- caller ---------------------|

  The list of callers of the current process can be retrieved from the process
  dictionary with `Process.get(:"$callers")`. This will return either `nil` or
  a list `[pid_n, ..., pid2, pid1]` with at least one entry, where `pid_n` is
  the PID that called the current process, `pid2` called `pid_n`, and `pid2` was
  called by `pid1`.

  If a task crashes, the callers field is included as part of the log message
  metadata under the `:callers` key.
  """

  @doc """
  The Task struct.

  It contains these fields:

    * `:pid` - the PID of the task process; `nil` if the task does
      not use a task process

    * `:ref` - the task monitor reference

    * `:owner` - the PID of the process that started the task

  """
  @enforce_keys [:pid, :ref, :owner]
  defstruct pid: nil, ref: nil, owner: nil

  @typedoc """
  The Task type.

  See `%Task{}` for information about each field of the structure.
  """
  @type t :: %__MODULE__{
          pid: pid() | nil,
          ref: reference() | nil,
          owner: pid() | nil
        }

  defguardp is_timeout(timeout)
            when timeout == :infinity or (is_integer(timeout) and timeout >= 0)

  @doc """
  Returns a specification to start a task under a supervisor.

  `arg` is passed as the argument to `Task.start_link/1` in the `:start` field
  of the spec.

  For more information, see the `Supervisor` module,
  the `Supervisor.child_spec/2` function and the `t:Supervisor.child_spec/0` type.
  """
  @doc since: "1.5.0"
  @spec child_spec(term) :: Supervisor.child_spec()
  def child_spec(arg) do
    %{
      id: Task,
      start: {Task, :start_link, [arg]},
      restart: :temporary
    }
  end

  @doc false
  defmacro __using__(opts) do
    quote location: :keep, bind_quoted: [opts: opts] do
      unless Module.has_attribute?(__MODULE__, :doc) do
        @doc """
        Returns a specification to start this module under a supervisor.

        `arg` is passed as the argument to `Task.start_link/1` in the `:start` field
        of the spec.

        For more information, see the `Supervisor` module,
        the `Supervisor.child_spec/2` function and the `t:Supervisor.child_spec/0` type.
        """
      end

      def child_spec(arg) do
        default = %{
          id: __MODULE__,
          start: {__MODULE__, :start_link, [arg]},
          restart: :temporary
        }

        Supervisor.child_spec(default, unquote(Macro.escape(opts)))
      end

      defoverridable child_spec: 1
    end
  end

  @doc """
  Starts a task as part of a supervision tree with the given `fun`.

  `fun` must be a zero-arity anonymous function.

  This is used to start a statically supervised task under a supervision tree.
  """
  @spec start_link((() -> any)) :: {:ok, pid}
  def start_link(fun) when is_function(fun, 0) do
    start_link(:erlang, :apply, [fun, []])
  end

  @doc """
  Starts a task as part of a supervision tree with the given
  `module`, `function`, and `args`.

  This is used to start a statically supervised task under a supervision tree.
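
  As a rough sketch of that usage, the child specification below names
  `Task.start_link/3` as its start function. The `WarmUp` module, its
  `run/1` function and the `:cache_warmed` message are hypothetical names
  used only for illustration:

  ```elixir
  defmodule WarmUp do
    # Hypothetical module: reports back to the process given as argument
    def run(parent), do: send(parent, :cache_warmed)
  end

  children = [
    # Hand-written child spec whose start function is Task.start_link/3
    %{id: WarmUp, start: {Task, :start_link, [WarmUp, :run, [self()]]}, restart: :temporary}
  ]

  {:ok, _sup} = Supervisor.start_link(children, strategy: :one_for_one)

  # The task runs under the supervisor; it is not awaited, only observed here
  warmed =
    receive do
      :cache_warmed -> :ok
    after
      1_000 -> :timeout
    end
  ```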
  """
  @spec start_link(module, atom, [term]) :: {:ok, pid}
  def start_link(module, function, args)
      when is_atom(module) and is_atom(function) and is_list(args) do
    mfa = {module, function, args}
    Task.Supervised.start_link(get_owner(self()), get_callers(self()), mfa)
  end

  @doc """
  Starts a task.

  `fun` must be a zero-arity anonymous function.

  This should only be used when the task is used for side-effects
  (like I/O) and you have no interest in its results nor in whether it
  completes successfully.

  If the current node is shut down, the node will terminate even
  if the task was not completed. For this reason, we recommend
  using `Task.Supervisor.start_child/2` instead, which allows
  you to control the shutdown time via the `:shutdown` option.
  """
  @spec start((() -> any)) :: {:ok, pid}
  def start(fun) when is_function(fun, 0) do
    start(:erlang, :apply, [fun, []])
  end

  @doc """
  Starts a task.

  This should only be used when the task is used for side-effects
  (like I/O) and you have no interest in its results nor in whether it
  completes successfully.

  If the current node is shut down, the node will terminate even
  if the task was not completed. For this reason, we recommend
  using `Task.Supervisor.start_child/2` instead, which allows
  you to control the shutdown time via the `:shutdown` option.
  """
  @spec start(module, atom, [term]) :: {:ok, pid}
  def start(module, function_name, args)
      when is_atom(module) and is_atom(function_name) and is_list(args) do
    mfa = {module, function_name, args}
    Task.Supervised.start(get_owner(self()), get_callers(self()), mfa)
  end

  @doc """
  Starts a task that must be awaited on.

  `fun` must be a zero-arity anonymous function. This function
  spawns a process that is linked to and monitored by the caller
  process. A `Task` struct is returned containing the relevant
  information.
  Developers must eventually call `Task.await/2` or
  `Task.yield/2` followed by `Task.shutdown/2` on the returned task.

  Read the `Task` module documentation for more information about
  the general usage of async tasks.

  ## Linking

  This function spawns a process that is linked to and monitored
  by the caller process. The linking part is important because it
  aborts the task if the parent process dies. It also guarantees
  the code before async/await has the same properties after you
  add the async call. For example, imagine you have this:

      x = heavy_fun()
      y = some_fun()
      x + y

  Now you want to make the `heavy_fun()` async:

      x = Task.async(&heavy_fun/0)
      y = some_fun()
      Task.await(x) + y

  As before, if `heavy_fun/0` fails, the whole computation will
  fail, including the parent process. If you don't want the task
  to fail then you must change the `heavy_fun/0` code in the
  same way you would achieve it if you didn't have the async call.
  For example, to either return `{:ok, val} | :error` results or,
  in more extreme cases, by using `try/rescue`. In other words,
  an asynchronous task should be thought of as an extension of a
  process rather than a mechanism to isolate it from all errors.

  If you don't want to link the caller to the task, then you
  must use a supervised task with `Task.Supervisor` and call
  `Task.Supervisor.async_nolink/2`.

  In any case, avoid any of the following:

    * Setting `:trap_exit` to `true` - trapping exits should be
      used only in special circumstances as it would make your
      process immune to not only exits from the task but from
      any other processes.

      Moreover, even when trapping exits, calling `await` will
      still exit if the task has terminated without sending its
      result back.

    * Unlinking the task process started with `async`/`await`.
      If you unlink the processes and the task does not belong
      to any supervisor, you may leave dangling tasks in case
      the parent dies.

  """
  @spec async((() -> any)) :: t
  def async(fun) when is_function(fun, 0) do
    async(:erlang, :apply, [fun, []])
  end

  @doc """
  Starts a task that must be awaited on.

  Similar to `async/1` except the function to be started is
  specified by the given `module`, `function_name`, and `args`.
  """
  @spec async(module, atom, [term]) :: t
  def async(module, function_name, args)
      when is_atom(module) and is_atom(function_name) and is_list(args) do
    mfa = {module, function_name, args}
    owner = self()
    {:ok, pid} = Task.Supervised.start_link(get_owner(owner), get_callers(owner), :nomonitor, mfa)
    ref = Process.monitor(pid)
    send(pid, {owner, ref})
    %Task{pid: pid, ref: ref, owner: owner}
  end

  @doc """
  Returns a stream where the given function (`module` and `function_name`)
  is mapped concurrently on each element in `enumerable`.

  Each element of `enumerable` will be prepended to the given `args` and
  processed by its own task. The tasks will be linked to an intermediate
  process that is then linked to the current process. This means a failure
  in a task terminates the current process and a failure in the current process
  terminates all tasks.

  When streamed, each task will emit `{:ok, value}` upon successful
  completion or `{:exit, reason}` if the caller is trapping exits.
  The order of results depends on the value of the `:ordered` option.

  The level of concurrency and the time tasks are allowed to run can
  be controlled via options (see the "Options" section below).

  Consider using `Task.Supervisor.async_stream/6` to start tasks
  under a supervisor.
  If you find yourself trapping exits to handle exits
  inside the async stream, consider using `Task.Supervisor.async_stream_nolink/6`
  to start tasks that are not linked to the calling process.

  ## Options

    * `:max_concurrency` - sets the maximum number of tasks to run
      at the same time. Defaults to `System.schedulers_online/0`.

    * `:ordered` - whether the results should be returned in the same order
      as the input stream. When the output is ordered, Elixir may need to
      buffer results to emit them in the original order. Setting this option
      to false disables the need to buffer at the cost of removing ordering.
      This is also useful when you're using the tasks only for the side effects.
      Note that regardless of what `:ordered` is set to, the tasks will
      process asynchronously. If you need to process elements in order,
      consider using `Enum.map/2` or `Enum.each/2` instead. Defaults to `true`.

    * `:timeout` - the maximum amount of time (in milliseconds or `:infinity`)
      each task is allowed to execute for. Defaults to `5000`.

    * `:on_timeout` - what to do when a task times out. The possible
      values are:
      * `:exit` (default) - the process that spawned the tasks exits.
      * `:kill_task` - the task that timed out is killed. The value
        emitted for that task is `{:exit, :timeout}`.

  ## Example

  Let's build a stream and then enumerate it:

      stream = Task.async_stream(collection, Mod, :expensive_fun, [])
      Enum.to_list(stream)

  The concurrency can be increased or decreased using the `:max_concurrency`
  option. For example, if the tasks are IO heavy, the value can be increased:

      max_concurrency = System.schedulers_online() * 2
      stream = Task.async_stream(collection, Mod, :expensive_fun, [], max_concurrency: max_concurrency)
      Enum.to_list(stream)

  If you do not care about the results of the computation, you can run
  the stream with `Stream.run/1`.
  Also set `ordered: false`, as you don't
  care about the order of the results either:

      stream = Task.async_stream(collection, Mod, :expensive_fun, [], ordered: false)
      Stream.run(stream)

  ## Attention: async + take

  Given that items in an async stream are processed concurrently, doing
  `async_stream` followed by `Enum.take/2` may cause more items than
  requested to be processed. Let's see an example:

      1..100
      |> Task.async_stream(fn i ->
        Process.sleep(100)
        IO.puts(to_string(i))
      end)
      |> Enum.take(10)

  For a machine with 8 cores, the above will process 16 items instead
  of 10. The reason is that `async_stream/5` always has 8 elements
  processing at once. So by the time `Enum` says it got all elements
  it needed, there are still 6 elements left to be processed.

  The solution here is to use `Stream.take/2` instead of `Enum.take/2`
  to filter elements beforehand:

      1..100
      |> Stream.take(10)
      |> Task.async_stream(fn i ->
        Process.sleep(100)
        IO.puts(to_string(i))
      end)
      |> Enum.to_list()

  If for some reason you cannot take the elements beforehand,
  you can use `:max_concurrency` to limit how many extra elements
  may be processed, at the cost of reducing concurrency.
  """
  @doc since: "1.4.0"
  @spec async_stream(Enumerable.t(), module, atom, [term], keyword) :: Enumerable.t()
  def async_stream(enumerable, module, function_name, args, options \\ [])
      when is_atom(module) and is_atom(function_name) and is_list(args) do
    build_stream(enumerable, {module, function_name, args}, options)
  end

  @doc """
  Returns a stream that runs the given function `fun` concurrently
  on each element in `enumerable`.

  Works the same as `async_stream/5` but with an anonymous function instead of a
  module-function-arguments tuple. `fun` must be a one-arity anonymous function.
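
  As a small sketch, the options documented for `async_stream/5` are passed
  here as the last argument. The input range, the artificial delay and the
  timeout values are assumptions chosen only to make one task time out:

  ```elixir
  results =
    1..4
    |> Task.async_stream(
      fn i ->
        # Make the third element slow enough to exceed the timeout
        if i == 3, do: Process.sleep(500)
        i * 2
      end,
      max_concurrency: 2,
      timeout: 100,
      on_timeout: :kill_task
    )
    |> Enum.to_list()
  ```

  With `on_timeout: :kill_task`, the slow task is killed and the stream
  emits `{:exit, :timeout}` in its place, so `results` is
  `[ok: 2, ok: 4, exit: :timeout, ok: 8]` instead of the caller exiting.
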

  Each `enumerable` element is passed as argument to the given function `fun` and
  processed by its own task. The tasks will be linked to the current process,
  similarly to `async/1`.

  ## Example

  Count the code points in each string asynchronously, then add the counts together using reduce.

      iex> strings = ["long string", "longer string", "there are many of these"]
      iex> stream = Task.async_stream(strings, fn text -> text |> String.codepoints() |> Enum.count() end)
      iex> Enum.reduce(stream, 0, fn {:ok, num}, acc -> num + acc end)
      47

  See `async_stream/5` for discussion, options, and more examples.
  """
  @doc since: "1.4.0"
  @spec async_stream(Enumerable.t(), (term -> term), keyword) :: Enumerable.t()
  def async_stream(enumerable, fun, options \\ [])
      when is_function(fun, 1) and is_list(options) do
    build_stream(enumerable, fun, options)
  end

  defp build_stream(enumerable, fun, options) do
    &Task.Supervised.stream(enumerable, &1, &2, fun, options, fn [owner | _] = callers, mfa ->
      {:ok, pid} = Task.Supervised.start_link(get_owner(owner), callers, :nomonitor, mfa)
      {:ok, :link, pid}
    end)
  end

  # Returns a tuple with the node where this is executed and either the
  # registered name of the given PID or the PID of where this is executed. Used
  # when exiting from tasks to print out from where the task was started.
  defp get_owner(pid) do
    self_or_name =
      case Process.info(pid, :registered_name) do
        {:registered_name, name} when is_atom(name) -> name
        _ -> pid
      end

    {node(), self_or_name, pid}
  end

  defp get_callers(owner) do
    case :erlang.get(:"$callers") do
      [_ | _] = list -> [owner | list]
      _ -> [owner]
    end
  end

  @doc ~S"""
  Awaits a task reply and returns it.

  In case the task process dies, the current process will exit with the same
  reason as the task.

  A timeout, in milliseconds or `:infinity`, can be given with a default value
  of `5000`. If the timeout is exceeded, then the current process will exit. If
  the task process is linked to the current process, which is the case when a
  task is started with `async`, then the task process will also exit. If the
  task process is trapping exits or not linked to the current process, then it
  will continue to run.

  This function assumes the task's monitor is still active or the monitor's
  `:DOWN` message is in the message queue. If it has been demonitored, or the
  message already received, this function will wait for the duration of the
  timeout awaiting the message.

  This function can only be called once for any given task. If you want
  to be able to check multiple times if a long-running task has finished
  its computation, use `yield/2` instead.

  ## Examples

      iex> task = Task.async(fn -> 1 + 1 end)
      iex> Task.await(task)
      2

  ## Compatibility with OTP behaviours

  It is not recommended to `await` a long-running task inside an OTP
  behaviour such as `GenServer`. Instead, you should match on the message
  coming from a task inside your `c:GenServer.handle_info/2` callback.

  A GenServer will receive two messages on `handle_info/2`:

    * `{ref, result}` - the reply message where `ref` is the monitor
      reference returned by `task.ref` and `result` is the task
      result

    * `{:DOWN, ref, :process, pid, reason}` - since all tasks are also
      monitored, you will also receive the `:DOWN` message delivered by
      `Process.monitor/1`. If you receive the `:DOWN` message without
      a reply, it means the task crashed

  Another consideration to have in mind is that tasks started by `Task.async/1`
  are always linked to their callers and you may not want the GenServer to
  crash if the task crashes.
  Therefore, it is preferable to instead use
  `Task.Supervisor.async_nolink/3` inside OTP behaviours. For completeness, here
  is an example of a GenServer that starts tasks and handles their results:

      defmodule GenServerTaskExample do
        use GenServer

        def start_link(opts) do
          GenServer.start_link(__MODULE__, :ok, opts)
        end

        def init(_opts) do
          # We will keep all running tasks in a map
          {:ok, %{tasks: %{}}}
        end

        # Imagine we invoke a task from the GenServer to access a URL...
        def handle_call(:some_message, _from, state) do
          url = ...
          task = Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> fetch_url(url) end)

          # After we start the task, we store its reference and the url it is fetching
          state = put_in(state.tasks[task.ref], url)

          {:reply, :ok, state}
        end

        # If the task succeeds...
        def handle_info({ref, result}, state) do
          # The task succeeded so we can cancel the monitoring and discard the DOWN message
          Process.demonitor(ref, [:flush])

          {url, state} = pop_in(state.tasks[ref])
          IO.puts "Got #{inspect(result)} for URL #{inspect url}"
          {:noreply, state}
        end

        # If the task fails...
        def handle_info({:DOWN, ref, _, _, reason}, state) do
          {url, state} = pop_in(state.tasks[ref])
          IO.puts "URL #{inspect url} failed with reason #{inspect(reason)}"
          {:noreply, state}
        end
      end

  With the server defined, you will want to start the task supervisor
  above and the GenServer in your supervision tree:

      children = [
        {Task.Supervisor, name: MyApp.TaskSupervisor},
        {GenServerTaskExample, name: MyApp.GenServerTaskExample}
      ]

      Supervisor.start_link(children, strategy: :one_for_one)

  """
  @spec await(t, timeout) :: term
  def await(%Task{ref: ref, owner: owner} = task, timeout \\ 5000) when is_timeout(timeout) do
    if owner != self() do
      raise ArgumentError, invalid_owner_error(task)
    end

    receive do
      {^ref, reply} ->
        Process.demonitor(ref, [:flush])
        reply

      {:DOWN, ^ref, _, proc, reason} ->
        exit({reason(reason, proc), {__MODULE__, :await, [task, timeout]}})
    after
      timeout ->
        Process.demonitor(ref, [:flush])
        exit({:timeout, {__MODULE__, :await, [task, timeout]}})
    end
  end

  @doc """
  Awaits replies from multiple tasks and returns them.

  This function receives a list of tasks and waits for their replies in the
  given time interval. It returns a list of the results, in the same order as
  the tasks supplied in the `tasks` input argument.

  If any of the task processes dies, the current process will exit with the
  same reason as that task.

  A timeout, in milliseconds or `:infinity`, can be given with a default value
  of `5000`. If the timeout is exceeded, then the current process will exit.
  Any task processes that are linked to the current process (which is the case
  when a task is started with `async`) will also exit. Any task processes that
  are trapping exits or not linked to the current process will continue to run.

  This function assumes the tasks' monitors are still active or the monitors'
  `:DOWN` message is in the message queue. If any tasks have been demonitored,
  or the message already received, this function will wait for the duration of
  the timeout.

  This function can only be called once for any given task. If you want to be
  able to check multiple times if a long-running task has finished its
  computation, use `yield_many/2` instead.

  ## Compatibility with OTP behaviours

  It is not recommended to `await` long-running tasks inside an OTP behaviour
  such as `GenServer`. See `await/2` for more information.

  ## Examples

      iex> tasks = [
      ...>   Task.async(fn -> 1 + 1 end),
      ...>   Task.async(fn -> 2 + 3 end)
      ...> ]
      iex> Task.await_many(tasks)
      [2, 5]

  """
  @doc since: "1.11.0"
  @spec await_many([t], timeout) :: [term]
  def await_many(tasks, timeout \\ 5000) when is_timeout(timeout) do
    awaiting =
      for task <- tasks, into: %{} do
        %Task{ref: ref, owner: owner} = task

        if owner != self() do
          raise ArgumentError, invalid_owner_error(task)
        end

        {ref, true}
      end

    timeout_ref = make_ref()

    timer_ref =
      if timeout != :infinity do
        Process.send_after(self(), timeout_ref, timeout)
      end

    try do
      await_many(tasks, timeout, awaiting, %{}, timeout_ref)
    after
      timer_ref && Process.cancel_timer(timer_ref)
      receive do: (^timeout_ref -> :ok), after: (0 -> :ok)
    end
  end

  defp await_many(tasks, _timeout, awaiting, replies, _timeout_ref)
       when map_size(awaiting) == 0 do
    for %{ref: ref} <- tasks, do: Map.fetch!(replies, ref)
  end

  defp await_many(tasks, timeout, awaiting, replies, timeout_ref) do
    receive do
      ^timeout_ref ->
        demonitor_pending_tasks(awaiting)
        exit({:timeout, {__MODULE__, :await_many, [tasks, timeout]}})

      {:DOWN, ref, _, proc, reason} when is_map_key(awaiting, ref)
      ->
        demonitor_pending_tasks(awaiting)
        exit({reason(reason, proc), {__MODULE__, :await_many, [tasks, timeout]}})

      {ref, reply} when is_map_key(awaiting, ref) ->
        Process.demonitor(ref, [:flush])

        await_many(
          tasks,
          timeout,
          Map.delete(awaiting, ref),
          Map.put(replies, ref, reply),
          timeout_ref
        )
    end
  end

  defp demonitor_pending_tasks(awaiting) do
    Enum.each(awaiting, fn {ref, _} ->
      Process.demonitor(ref, [:flush])
    end)
  end

  @doc false
  @deprecated "Pattern match directly on the message instead"
  def find(tasks, {ref, reply}) when is_reference(ref) do
    Enum.find_value(tasks, fn
      %Task{ref: ^ref} = task ->
        Process.demonitor(ref, [:flush])
        {reply, task}

      %Task{} ->
        nil
    end)
  end

  def find(tasks, {:DOWN, ref, _, proc, reason} = msg) when is_reference(ref) do
    find = fn %Task{ref: task_ref} -> task_ref == ref end

    if Enum.find(tasks, find) do
      exit({reason(reason, proc), {__MODULE__, :find, [tasks, msg]}})
    end
  end

  def find(_tasks, _msg) do
    nil
  end

  @doc ~S"""
  Temporarily blocks the current process waiting for a task reply.

  Returns `{:ok, reply}` if the reply is received, `nil` if
  no reply has arrived, or `{:exit, reason}` if the task has already
  exited. Keep in mind that normally a task failure also causes
  the process owning the task to exit. Therefore this function can
  return `{:exit, reason}` only if

    * the task process exited with the reason `:normal`
    * it isn't linked to the caller
    * the caller is trapping exits

  A timeout, in milliseconds or `:infinity`, can be given with a default value
  of `5000`. If the time runs out before a message from the task is received,
  this function will return `nil` and the monitor will remain active. Therefore
  `yield/2` can be called multiple times on the same task.
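
  A minimal sketch of calling `yield/2` more than once on the same task
  follows; the sleep duration and the timeouts are arbitrary values picked
  for illustration:

  ```elixir
  task =
    Task.async(fn ->
      Process.sleep(200)
      :done
    end)

  # The task is still running, so the first call times out and returns nil
  first = Task.yield(task, 50)

  # The monitor is still active, so yielding again picks up the reply
  second = Task.yield(task, 1_000)
  ```

  Here `first` is `nil` and `second` is `{:ok, :done}`.
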

  This function assumes the task's monitor is still active or the
  monitor's `:DOWN` message is in the message queue. If it has been
  demonitored or the message already received, this function will wait
  for the duration of the timeout awaiting the message.

  If you intend to shut the task down if it has not responded within `timeout`
  milliseconds, you should chain this together with `shutdown/1`, like so:

      case Task.yield(task, timeout) || Task.shutdown(task) do
        {:ok, result} ->
          result

        nil ->
          Logger.warn("Failed to get a result in #{timeout}ms")
          nil
      end

  That ensures that if the task completes after the `timeout` but before `shutdown/1`
  has been called, you will still get the result, since `shutdown/1` is designed to
  handle this case and return the result.
  """
  @spec yield(t, timeout) :: {:ok, term} | {:exit, term} | nil
  def yield(%Task{ref: ref, owner: owner} = task, timeout \\ 5000) when is_timeout(timeout) do
    if owner != self() do
      raise ArgumentError, invalid_owner_error(task)
    end

    receive do
      {^ref, reply} ->
        Process.demonitor(ref, [:flush])
        {:ok, reply}

      {:DOWN, ^ref, _, proc, :noconnection} ->
        exit({reason(:noconnection, proc), {__MODULE__, :yield, [task, timeout]}})

      {:DOWN, ^ref, _, _, reason} ->
        {:exit, reason}
    after
      timeout ->
        nil
    end
  end

  @doc """
  Yields to multiple tasks in the given time interval.

  This function receives a list of tasks and waits for their
  replies in the given time interval. It returns a list
  of two-element tuples, with the task as the first element
  and the yielded result as the second. The tasks in the returned
  list will be in the same order as the tasks supplied in the `tasks`
  input argument.

  Similarly to `yield/2`, each task's result will be

    * `{:ok, term}` if the task has successfully reported its
      result back in the given time interval
    * `{:exit, reason}` if the task has died
    * `nil` if the task keeps running past the timeout

  A timeout, in milliseconds or `:infinity`, can be given with a default value
  of `5000`.

  Check `yield/2` for more information.

  ## Example

  `Task.yield_many/2` allows developers to spawn multiple tasks
  and retrieve the results received in a given timeframe.
  If we combine it with `Task.shutdown/2`, it allows us to gather
  those results and cancel the tasks that have not replied in time.

  Let's see an example.

      tasks =
        for i <- 1..10 do
          Task.async(fn ->
            Process.sleep(i * 1000)
            i
          end)
        end

      tasks_with_results = Task.yield_many(tasks, 5000)

      results =
        Enum.map(tasks_with_results, fn {task, res} ->
          # Shut down the tasks that did not reply nor exit
          res || Task.shutdown(task, :brutal_kill)
        end)

      # Here we are matching only on {:ok, value} and
      # ignoring {:exit, _} (crashed tasks) and `nil` (no replies)
      for {:ok, value} <- results do
        IO.inspect(value)
      end

  In the example above, we create tasks that sleep from 1
  up to 10 seconds and return the number of seconds they slept for.
  If you execute the code all at once, you should see 1 up to 5
  printed, as those were the tasks that have replied in the
  given time. All other tasks will have been shut down using
  the `Task.shutdown/2` call.
  """
  @spec yield_many([t], timeout) :: [{t, {:ok, term} | {:exit, term} | nil}]
  def yield_many(tasks, timeout \\ 5000) when is_timeout(timeout) do
    timeout_ref = make_ref()

    timer_ref =
      if timeout != :infinity do
        Process.send_after(self(), timeout_ref, timeout)
      end

    try do
      yield_many(tasks, timeout_ref, :infinity)
    catch
      {:noconnection, reason} ->
        exit({reason, {__MODULE__, :yield_many, [tasks, timeout]}})
    after
      timer_ref && Process.cancel_timer(timer_ref)
      receive do: (^timeout_ref -> :ok), after: (0 -> :ok)
    end
  end

  defp yield_many([%Task{ref: ref, owner: owner} = task | rest], timeout_ref, timeout) do
    if owner != self() do
      raise ArgumentError, invalid_owner_error(task)
    end

    receive do
      {^ref, reply} ->
        Process.demonitor(ref, [:flush])
        [{task, {:ok, reply}} | yield_many(rest, timeout_ref, timeout)]

      {:DOWN, ^ref, _, proc, :noconnection} ->
        throw({:noconnection, reason(:noconnection, proc)})

      {:DOWN, ^ref, _, _, reason} ->
        [{task, {:exit, reason}} | yield_many(rest, timeout_ref, timeout)]

      ^timeout_ref ->
        [{task, nil} | yield_many(rest, timeout_ref, 0)]
    after
      timeout ->
        [{task, nil} | yield_many(rest, timeout_ref, 0)]
    end
  end

  defp yield_many([], _timeout_ref, _timeout) do
    []
  end

  @doc """
  Unlinks and shuts down the task, and then checks for a reply.

  Returns `{:ok, reply}` if the reply is received while shutting down the task,
  `{:exit, reason}` if the task died, otherwise `nil`.

  The second argument is either a timeout or `:brutal_kill`. In case
  of a timeout, a `:shutdown` exit signal is sent to the task process
  and if it does not exit within the timeout, it is killed. With `:brutal_kill`
  the task is killed straight away.
  In case the task terminates abnormally
  (possibly killed by another process), this function will exit with the same reason.

  It is not required to call this function when terminating the caller, unless
  exiting with reason `:normal` or if the task is trapping exits. If the caller is
  exiting with a reason other than `:normal` and the task is not trapping exits, the
  caller's exit signal will stop the task. The caller can exit with reason
  `:shutdown` to shut down all of its linked processes, including tasks, that
  are not trapping exits without generating any log messages.

  If the task's monitor has already been demonitored, or its `:DOWN` message has
  already been received, and there is no response waiting in the message queue,
  this function will return `{:exit, :noproc}`, because neither the result nor
  the exit reason can be determined.
  """
  @spec shutdown(t, timeout | :brutal_kill) :: {:ok, term} | {:exit, term} | nil
  def shutdown(task, shutdown \\ 5000)

  def shutdown(%Task{pid: nil} = task, _) do
    raise ArgumentError, "task #{inspect(task)} does not have an associated task process"
  end

  def shutdown(%Task{owner: owner} = task, _) when owner != self() do
    raise ArgumentError, invalid_owner_error(task)
  end

  def shutdown(%Task{pid: pid} = task, :brutal_kill) do
    mon = Process.monitor(pid)
    exit(pid, :kill)

    case shutdown_receive(task, mon, :brutal_kill, :infinity) do
      {:down, proc, :noconnection} ->
        exit({reason(:noconnection, proc), {__MODULE__, :shutdown, [task, :brutal_kill]}})

      {:down, _, reason} ->
        {:exit, reason}

      result ->
        result
    end
  end

  def shutdown(%Task{pid: pid} = task, timeout) when is_timeout(timeout) do
    mon = Process.monitor(pid)
    exit(pid, :shutdown)

    case shutdown_receive(task, mon, :shutdown, timeout) do
      {:down, proc, :noconnection} ->
        exit({reason(:noconnection, proc), {__MODULE__,
          :shutdown, [task, timeout]}})

      {:down, _, reason} ->
        {:exit, reason}

      result ->
        result
    end
  end

  ## Helpers

  defp reason(:noconnection, proc), do: {:nodedown, monitor_node(proc)}
  defp reason(reason, _), do: reason

  defp monitor_node(pid) when is_pid(pid), do: node(pid)
  defp monitor_node({_, node}), do: node

  # spawn a process to ensure task gets exit signal if process dies from exit signal
  # between unlink and exit.
  defp exit(task, reason) do
    caller = self()
    ref = make_ref()
    enforcer = spawn(fn -> enforce_exit(task, reason, caller, ref) end)
    Process.unlink(task)
    Process.exit(task, reason)
    send(enforcer, {:done, ref})
    :ok
  end

  defp enforce_exit(pid, reason, caller, ref) do
    mon = Process.monitor(caller)

    receive do
      {:done, ^ref} -> :ok
      {:DOWN, ^mon, _, _, _} -> Process.exit(pid, reason)
    end
  end

  defp shutdown_receive(%{ref: ref} = task, mon, type, timeout) do
    receive do
      {:DOWN, ^mon, _, _, :shutdown} when type in [:shutdown, :timeout_kill] ->
        Process.demonitor(ref, [:flush])
        flush_reply(ref)

      {:DOWN, ^mon, _, _, :killed} when type == :brutal_kill ->
        Process.demonitor(ref, [:flush])
        flush_reply(ref)

      {:DOWN, ^mon, _, proc, :noproc} ->
        reason = flush_noproc(ref, proc, type)
        flush_reply(ref) || reason

      {:DOWN, ^mon, _, proc, reason} ->
        Process.demonitor(ref, [:flush])
        flush_reply(ref) || {:down, proc, reason}
    after
      timeout ->
        Process.exit(task.pid, :kill)
        shutdown_receive(task, mon, :timeout_kill, :infinity)
    end
  end

  defp flush_reply(ref) do
    receive do
      {^ref, reply} -> {:ok, reply}
    after
      0 -> nil
    end
  end

  defp flush_noproc(ref, proc, type) do
    receive do
      {:DOWN, ^ref, _, _, :shutdown} when type in [:shutdown, :timeout_kill] ->
        nil

      {:DOWN, ^ref, _, _, :killed} when type == :brutal_kill ->
        nil

      {:DOWN, ^ref, _, _, reason} ->
        {:down, proc, reason}
    after
      0 ->
        Process.demonitor(ref, [:flush])
        {:down, proc, :noproc}
    end
  end

  defp invalid_owner_error(task) do
    "task #{inspect(task)} must be queried from the owner but was queried from #{inspect(self())}"
  end
end