1defmodule Task do
2  @moduledoc """
3  Conveniences for spawning and awaiting tasks.
4
5  Tasks are processes meant to execute one particular
6  action throughout their lifetime, often with little or no
7  communication with other processes. The most common use case
8  for tasks is to convert sequential code into concurrent code
9  by computing a value asynchronously:
10
11      task = Task.async(fn -> do_some_work() end)
12      res = do_some_other_work()
13      res + Task.await(task)
14
15  Tasks spawned with `async` can be awaited on by their caller
16  process (and only their caller) as shown in the example above.
17  They are implemented by spawning a process that sends a message
18  to the caller once the given computation is performed.
19
20  Besides `async/1` and `await/2`, tasks can also be
21  started as part of a supervision tree and dynamically spawned
22  on remote nodes. We will explore these scenarios next.
23
24  ## async and await
25
26  One of the common uses of tasks is to convert sequential code
27  into concurrent code with `Task.async/1` while keeping its semantics.
28  When invoked, a new process will be created, linked and monitored
29  by the caller. Once the task action finishes, a message will be sent
30  to the caller with the result.
31
32  `Task.await/2` is used to read the message sent by the task.
33
34  There are two important things to consider when using `async`:
35
36    1. If you are using async tasks, you **must await** a reply
37       as they are *always* sent. If you are not expecting a reply,
38       consider using `Task.start_link/1` detailed below.
39
40    2. async tasks link the caller and the spawned process. This
41       means that, if the caller crashes, the task will crash
42       too and vice-versa. This is on purpose: if the process
43       meant to receive the result no longer exists, there is
44       no purpose in completing the computation.
45
46       If this is not desired, you will want to use supervised
47       tasks, described next.
48
49  ## Dynamically supervised tasks
50
51  The `Task.Supervisor` module allows developers to dynamically
52  create multiple supervised tasks.
53
54  A short example is:
55
56      {:ok, pid} = Task.Supervisor.start_link()
57
58      task =
59        Task.Supervisor.async(pid, fn ->
60          # Do something
61        end)
62
63      Task.await(task)
64
65  However, in the majority of cases, you want to add the task supervisor
66  to your supervision tree:
67
68      Supervisor.start_link([
69        {Task.Supervisor, name: MyApp.TaskSupervisor}
70      ], strategy: :one_for_one)
71
  And now you can use async/await once again, passing the name of
  the supervisor instead of the pid:
74
75      Task.Supervisor.async(MyApp.TaskSupervisor, fn ->
76        # Do something
77      end)
78      |> Task.await()
79
  We encourage developers to rely on supervised tasks as much as
  possible. Supervised tasks enable a wide variety of patterns
  that give you explicit control over how to handle results,
  errors, and timeouts. Here is a summary:

    * Use `Task.Supervisor.start_child/2` to start a fire-and-forget
      task when you don't care about its result or whether it completes
      successfully

    * Use `Task.Supervisor.async/2` + `Task.await/2` to execute
      tasks concurrently and retrieve their results. If the task fails,
      the caller will also fail

    * Use `Task.Supervisor.async_nolink/2` + `Task.yield/2` + `Task.shutdown/2`
      to execute tasks concurrently and retrieve their results
      or the reason they failed within a given time frame. If the task fails,
      the caller won't fail: you will receive the error reason either on
      `yield` or `shutdown`
98
99  See the `Task.Supervisor` module for details on the supported operations.
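
  The last pattern in the summary can be sketched as follows. This is a minimal
  example, assuming a throwaway supervisor started inline rather than one from
  your supervision tree; the `:boom` exit reason and the one-second timeout are
  made up for illustration:

  ```elixir
  # Assumption: an inline supervisor; in an application it would normally
  # live in the supervision tree under a registered name.
  {:ok, sup} = Task.Supervisor.start_link()

  # The task is not linked to the caller, so its failure does not crash us.
  task = Task.Supervisor.async_nolink(sup, fn -> exit(:boom) end)

  # Wait up to one second; if there is no reply by then, shut the task down.
  result =
    case Task.yield(task, 1_000) || Task.shutdown(task) do
      {:ok, value} -> {:ok, value}
      {:exit, reason} -> {:error, reason}
      nil -> {:error, :timeout}
    end
  ```

  Here `result` should be `{:error, :boom}`, and the calling process keeps running.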
100
101  ### Distributed tasks
102
103  Since Elixir provides a `Task.Supervisor`, it is easy to use one
104  to dynamically start tasks across nodes:
105
106      # On the remote node
107      Task.Supervisor.start_link(name: MyApp.DistSupervisor)
108
109      # On the client
110      supervisor = {MyApp.DistSupervisor, :remote@local}
111      Task.Supervisor.async(supervisor, MyMod, :my_fun, [arg1, arg2, arg3])
112
113  Note that, when working with distributed tasks, one should use the
114  `Task.Supervisor.async/4` function that expects explicit module, function,
115  and arguments, instead of `Task.Supervisor.async/2` that works with anonymous
116  functions. That's because anonymous functions expect the same module version
117  to exist on all involved nodes. Check the `Agent` module documentation for
118  more information on distributed processes as the limitations described there
119  apply to the whole ecosystem.
120
121  ## Statically supervised tasks
122
123  The `Task` module implements the `child_spec/1` function, which
124  allows it to be started directly under a regular `Supervisor` -
125  instead of a `Task.Supervisor` - by passing a tuple with a function
126  to run:
127
128      Supervisor.start_link([
129        {Task, fn -> :some_work end}
130      ], strategy: :one_for_one)
131
132  This is often useful when you need to execute some steps while
133  setting up your supervision tree. For example: to warm up caches,
134  log the initialization status, etc.
135
136  If you don't want to put the Task code directly under the `Supervisor`,
137  you can wrap the `Task` in its own module, similar to how you would
138  do with a `GenServer` or an `Agent`:
139
140      defmodule MyTask do
141        use Task
142
143        def start_link(arg) do
144          Task.start_link(__MODULE__, :run, [arg])
145        end
146
147        def run(arg) do
148          # ...
149        end
150      end
151
  And then pass it to the supervisor:
153
154      Supervisor.start_link([
155        {MyTask, arg}
156      ], strategy: :one_for_one)
157
158  Since these tasks are supervised and not directly linked to the caller,
159  they cannot be awaited on. By default, the functions `Task.start`
  and `Task.start_link` are for fire-and-forget tasks, where you don't
  care about the results or whether they complete successfully.
162
163  `use Task` defines a `child_spec/1` function, allowing the
164  defined module to be put under a supervision tree. The generated
165  `child_spec/1` can be customized with the following options:
166
167    * `:id` - the child specification identifier, defaults to the current module
168    * `:restart` - when the child should be restarted, defaults to `:temporary`
169    * `:shutdown` - how to shut down the child, either immediately or by giving it time to shut down
170
  Unlike `GenServer`, `Agent` and `Supervisor`, a Task has
172  a default `:restart` of `:temporary`. This means the task will
173  not be restarted even if it crashes. If you desire the task to
174  be restarted for non-successful exits, do:
175
176      use Task, restart: :transient
177
178  If you want the task to always be restarted:
179
180      use Task, restart: :permanent
181
182  See the "Child specification" section in the `Supervisor` module
183  for more detailed information. The `@doc` annotation immediately
184  preceding `use Task` will be attached to the generated `child_spec/1`
185  function.
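
  The `:id` also matters when several anonymous tasks are placed directly under
  one supervisor: `Task.child_spec/1` uses `Task` as the id, and a supervisor
  requires child ids to be unique. One possible way around this, sketched here
  with the made-up ids `:warm_cache` and `:log_status`, is to override fields
  of the specification with `Supervisor.child_spec/2`:

  ```elixir
  children = [
    # Both ids are hypothetical names chosen for this sketch.
    Supervisor.child_spec({Task, fn -> :warm_cache end}, id: :warm_cache),
    Supervisor.child_spec({Task, fn -> :log_status end}, id: :log_status, restart: :transient)
  ]

  {:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)
  ```

  The first child keeps the default `:temporary` restart value, while the
  second one is restarted only if it exits abnormally.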
186
187  ## Ancestor and Caller Tracking
188
189  Whenever you start a new process, Elixir annotates the parent of that process
190  through the `$ancestors` key in the process dictionary. This is often used to
191  track the hierarchy inside a supervision tree.
192
  For example, we recommend that developers always start tasks under a supervisor.
  This provides more visibility and allows you to control how those tasks are
  terminated when a node shuts down. That might look something like
  `Task.Supervisor.start_child(MySupervisor, task_specification)`. This means
  that, although your code is the one that invokes the task, the actual ancestor of
  the task is the supervisor, as the supervisor is the one effectively starting it.
199
200  To track the relationship between your code and the task, we use the `$callers`
201  key in the process dictionary. Therefore, assuming the `Task.Supervisor` call
202  above, we have:
203
204      [your code] -- calls --> [supervisor] ---- spawns --> [task]
205
206  Which means we store the following relationships:
207
208      [your code]              [supervisor] <-- ancestor -- [task]
209          ^                                                  |
210          |--------------------- caller ---------------------|
211
  The list of callers of the current process can be retrieved from the process
  dictionary with `Process.get(:"$callers")`. This will return either `nil` or
  a list `[pid_n, ..., pid2, pid1]` with at least one entry, where `pid_n` is
  the PID that called the current process, `pid2` called `pid_n`, and `pid2` was
  called by `pid1`.
217
218  If a task crashes, the callers field is included as part of the log message
219  metadata under the `:callers` key.
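
  As a small illustration, a task can read its own callers list and hand it
  back to the process that started it. This sketch uses `Task.async/1` for
  brevity; the variable names are arbitrary:

  ```elixir
  parent = self()

  # The task reads its own process dictionary and returns the callers list.
  task = Task.async(fn -> Process.get(:"$callers") end)
  callers = Task.await(task)

  # The first entry is the process that started the task.
  started_by_parent? = hd(callers) == parent
  ```

  The rest of the list, if any, holds the callers of the parent itself.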
220  """
221
222  @doc """
223  The Task struct.
224
225  It contains these fields:
226
227    * `:pid` - the PID of the task process; `nil` if the task does
228      not use a task process
229
230    * `:ref` - the task monitor reference
231
232    * `:owner` - the PID of the process that started the task
233
234  """
235  @enforce_keys [:pid, :ref, :owner]
236  defstruct pid: nil, ref: nil, owner: nil
237
238  @typedoc """
239  The Task type.
240
241  See `%Task{}` for information about each field of the structure.
242  """
243  @type t :: %__MODULE__{
244          pid: pid() | nil,
245          ref: reference() | nil,
246          owner: pid() | nil
247        }
248
249  defguardp is_timeout(timeout)
250            when timeout == :infinity or (is_integer(timeout) and timeout >= 0)
251
252  @doc """
253  Returns a specification to start a task under a supervisor.
254
255  `arg` is passed as the argument to `Task.start_link/1` in the `:start` field
256  of the spec.
257
258  For more information, see the `Supervisor` module,
259  the `Supervisor.child_spec/2` function and the `t:Supervisor.child_spec/0` type.
260  """
261  @doc since: "1.5.0"
262  @spec child_spec(term) :: Supervisor.child_spec()
263  def child_spec(arg) do
264    %{
265      id: Task,
266      start: {Task, :start_link, [arg]},
267      restart: :temporary
268    }
269  end
270
271  @doc false
272  defmacro __using__(opts) do
273    quote location: :keep, bind_quoted: [opts: opts] do
274      unless Module.has_attribute?(__MODULE__, :doc) do
275        @doc """
276        Returns a specification to start this module under a supervisor.
277
278        `arg` is passed as the argument to `Task.start_link/1` in the `:start` field
279        of the spec.
280
281        For more information, see the `Supervisor` module,
282        the `Supervisor.child_spec/2` function and the `t:Supervisor.child_spec/0` type.
283        """
284      end
285
286      def child_spec(arg) do
287        default = %{
288          id: __MODULE__,
289          start: {__MODULE__, :start_link, [arg]},
290          restart: :temporary
291        }
292
293        Supervisor.child_spec(default, unquote(Macro.escape(opts)))
294      end
295
296      defoverridable child_spec: 1
297    end
298  end
299
300  @doc """
301  Starts a task as part of a supervision tree with the given `fun`.
302
303  `fun` must be a zero-arity anonymous function.
304
305  This is used to start a statically supervised task under a supervision tree.
306  """
307  @spec start_link((() -> any)) :: {:ok, pid}
308  def start_link(fun) when is_function(fun, 0) do
309    start_link(:erlang, :apply, [fun, []])
310  end
311
312  @doc """
313  Starts a task as part of a supervision tree with the given
314  `module`, `function`, and `args`.
315
316  This is used to start a statically supervised task under a supervision tree.
317  """
318  @spec start_link(module, atom, [term]) :: {:ok, pid}
319  def start_link(module, function, args)
320      when is_atom(module) and is_atom(function) and is_list(args) do
321    mfa = {module, function, args}
322    Task.Supervised.start_link(get_owner(self()), get_callers(self()), mfa)
323  end
324
325  @doc """
326  Starts a task.
327
328  `fun` must be a zero-arity anonymous function.
329
  This should only be used when the task is used for side-effects
  (like I/O) and you have no interest in its results or whether it
  completes successfully.

  If the current node is shut down, the node will terminate even
  if the task was not completed. For this reason, we recommend
  using `Task.Supervisor.start_child/2` instead, which allows
337  you to control the shutdown time via the `:shutdown` option.
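
  One way to follow this recommendation is sketched below. It assumes a
  throwaway supervisor started inline and an Elixir version in which
  `Task.Supervisor.start_child/3` accepts `:shutdown` in its options argument;
  the `:side_effect_done` message is made up for the example:

  ```elixir
  {:ok, sup} = Task.Supervisor.start_link()
  parent = self()

  # Assumption: give the task up to 10 seconds to finish on shutdown.
  {:ok, _pid} =
    Task.Supervisor.start_child(
      sup,
      fn -> send(parent, :side_effect_done) end,
      shutdown: 10_000
    )

  # Only for the sketch: confirm that the side effect happened.
  outcome =
    receive do
      :side_effect_done -> :done
    after
      1_000 -> :timeout
    end
  ```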
338  """
339  @spec start((() -> any)) :: {:ok, pid}
340  def start(fun) when is_function(fun, 0) do
341    start(:erlang, :apply, [fun, []])
342  end
343
344  @doc """
345  Starts a task.
346
  This should only be used when the task is used for side-effects
  (like I/O) and you have no interest in its results or whether it
  completes successfully.

  If the current node is shut down, the node will terminate even
  if the task was not completed. For this reason, we recommend
  using `Task.Supervisor.start_child/2` instead, which allows
354  you to control the shutdown time via the `:shutdown` option.
355  """
356  @spec start(module, atom, [term]) :: {:ok, pid}
357  def start(module, function_name, args)
358      when is_atom(module) and is_atom(function_name) and is_list(args) do
359    mfa = {module, function_name, args}
360    Task.Supervised.start(get_owner(self()), get_callers(self()), mfa)
361  end
362
363  @doc """
364  Starts a task that must be awaited on.
365
366  `fun` must be a zero-arity anonymous function. This function
367  spawns a process that is linked to and monitored by the caller
368  process. A `Task` struct is returned containing the relevant
369  information. Developers must eventually call `Task.await/2` or
370  `Task.yield/2` followed by `Task.shutdown/2` on the returned task.
371
372  Read the `Task` module documentation for more information about
373  the general usage of async tasks.
374
375  ## Linking
376
377  This function spawns a process that is linked to and monitored
378  by the caller process. The linking part is important because it
379  aborts the task if the parent process dies. It also guarantees
380  the code before async/await has the same properties after you
381  add the async call. For example, imagine you have this:
382
383      x = heavy_fun()
384      y = some_fun()
385      x + y
386
387  Now you want to make the `heavy_fun()` async:
388
389      x = Task.async(&heavy_fun/0)
390      y = some_fun()
391      Task.await(x) + y
392
  As before, if `heavy_fun/0` fails, the whole computation will
  fail, including the parent process. If you don't want the task
  to fail, then you must change the `heavy_fun/0` code in the
  same way you would if you didn't have the async call,
  for example by returning `{:ok, val} | :error` results or,
  in more extreme cases, by using `try/rescue`. In other words,
399  an asynchronous task should be thought of as an extension of a
400  process rather than a mechanism to isolate it from all errors.
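
  The `try/rescue` variant can be sketched as follows, with
  `String.to_integer/1` standing in for a hypothetical `heavy_fun/0` that may
  raise:

  ```elixir
  task =
    Task.async(fn ->
      try do
        # Stand-in for heavy work that raises on bad input.
        {:ok, String.to_integer("not a number")}
      rescue
        ArgumentError -> :error
      end
    end)

  result = Task.await(task)
  ```

  Because the exception is rescued inside the task, the task exits normally and
  `result` is `:error`; the caller decides what to do next.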
401
402  If you don't want to link the caller to the task, then you
403  must use a supervised task with `Task.Supervisor` and call
404  `Task.Supervisor.async_nolink/2`.
405
406  In any case, avoid any of the following:
407
    * Setting `:trap_exit` to `true` - trapping exits should be
      used only in special circumstances as it would make your
      process immune not only to exits from the task but also to
      exits from any other process.
412
413      Moreover, even when trapping exits, calling `await` will
414      still exit if the task has terminated without sending its
415      result back.
416
417    * Unlinking the task process started with `async`/`await`.
418      If you unlink the processes and the task does not belong
419      to any supervisor, you may leave dangling tasks in case
420      the parent dies.
421
422  """
423  @spec async((() -> any)) :: t
424  def async(fun) when is_function(fun, 0) do
425    async(:erlang, :apply, [fun, []])
426  end
427
428  @doc """
429  Starts a task that must be awaited on.
430
431  Similar to `async/1` except the function to be started is
432  specified by the given `module`, `function_name`, and `args`.
433  """
434  @spec async(module, atom, [term]) :: t
435  def async(module, function_name, args)
436      when is_atom(module) and is_atom(function_name) and is_list(args) do
437    mfa = {module, function_name, args}
438    owner = self()
439    {:ok, pid} = Task.Supervised.start_link(get_owner(owner), get_callers(owner), :nomonitor, mfa)
440    ref = Process.monitor(pid)
441    send(pid, {owner, ref})
442    %Task{pid: pid, ref: ref, owner: owner}
443  end
444
445  @doc """
446  Returns a stream where the given function (`module` and `function_name`)
447  is mapped concurrently on each element in `enumerable`.
448
449  Each element of `enumerable` will be prepended to the given `args` and
450  processed by its own task. The tasks will be linked to an intermediate
451  process that is then linked to the current process. This means a failure
452  in a task terminates the current process and a failure in the current process
453  terminates all tasks.
454
455  When streamed, each task will emit `{:ok, value}` upon successful
456  completion or `{:exit, reason}` if the caller is trapping exits.
457  The order of results depends on the value of the `:ordered` option.
458
459  The level of concurrency and the time tasks are allowed to run can
460  be controlled via options (see the "Options" section below).
461
462  Consider using `Task.Supervisor.async_stream/6` to start tasks
463  under a supervisor. If you find yourself trapping exits to handle exits
464  inside the async stream, consider using `Task.Supervisor.async_stream_nolink/6`
465  to start tasks that are not linked to the calling process.
466
467  ## Options
468
469    * `:max_concurrency` - sets the maximum number of tasks to run
470      at the same time. Defaults to `System.schedulers_online/0`.
471
472    * `:ordered` - whether the results should be returned in the same order
473      as the input stream. When the output is ordered, Elixir may need to
474      buffer results to emit them in the original order. Setting this option
475      to false disables the need to buffer at the cost of removing ordering.
476      This is also useful when you're using the tasks only for the side effects.
477      Note that regardless of what `:ordered` is set to, the tasks will
478      process asynchronously. If you need to process elements in order,
479      consider using `Enum.map/2` or `Enum.each/2` instead. Defaults to `true`.
480
481    * `:timeout` - the maximum amount of time (in milliseconds or `:infinity`)
482      each task is allowed to execute for. Defaults to `5000`.
483
484    * `:on_timeout` - what to do when a task times out. The possible
485      values are:
486      * `:exit` (default) - the process that spawned the tasks exits.
487      * `:kill_task` - the task that timed out is killed. The value
488        emitted for that task is `{:exit, :timeout}`.
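
  The effect of `:on_timeout` can be sketched with the anonymous-function
  variant of `async_stream`. The numbers are arbitrary sleep times in
  milliseconds, chosen so that only the second task exceeds the timeout:

  ```elixir
  results =
    [50, 2_000, 10]
    |> Task.async_stream(
      fn ms ->
        Process.sleep(ms)
        ms
      end,
      timeout: 500,
      on_timeout: :kill_task
    )
    |> Enum.to_list()
  ```

  With these values, `results` should be `[{:ok, 50}, {:exit, :timeout}, {:ok, 10}]`.
  With the default `on_timeout: :exit`, the calling process would exit instead.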
489
490  ## Example
491
492  Let's build a stream and then enumerate it:
493
494      stream = Task.async_stream(collection, Mod, :expensive_fun, [])
495      Enum.to_list(stream)
496
497  The concurrency can be increased or decreased using the `:max_concurrency`
498  option. For example, if the tasks are IO heavy, the value can be increased:
499
500      max_concurrency = System.schedulers_online() * 2
501      stream = Task.async_stream(collection, Mod, :expensive_fun, [], max_concurrency: max_concurrency)
502      Enum.to_list(stream)
503
504  If you do not care about the results of the computation, you can run
505  the stream with `Stream.run/1`. Also set `ordered: false`, as you don't
506  care about the order of the results either:
507
508      stream = Task.async_stream(collection, Mod, :expensive_fun, [], ordered: false)
509      Stream.run(stream)
510
511  ## Attention: async + take
512
  Given that items in an async stream are processed concurrently, doing
514  `async_stream` followed by `Enum.take/2` may cause more items than
515  requested to be processed. Let's see an example:
516
517      1..100
518      |> Task.async_stream(fn i ->
519        Process.sleep(100)
520        IO.puts(to_string(i))
521      end)
522      |> Enum.take(10)
523
  For a machine with 8 cores, the above will process 16 items instead
  of 10. The reason is that `async_stream/5` always has 8 elements
  processing at once. So by the time `Enum` says it got all elements
  it needed, there are still 6 elements left to be processed.

  The solution here is to use `Stream.take/2` instead of `Enum.take/2`
  to filter elements beforehand:
531
532      1..100
533      |> Stream.take(10)
534      |> Task.async_stream(fn i ->
535        Process.sleep(100)
536        IO.puts(to_string(i))
537      end)
538      |> Enum.to_list()
539
  If for some reason you cannot take the elements beforehand,
  you can use `:max_concurrency` to limit how many elements
  may be over-processed at the cost of reducing concurrency.
543  """
544  @doc since: "1.4.0"
545  @spec async_stream(Enumerable.t(), module, atom, [term], keyword) :: Enumerable.t()
546  def async_stream(enumerable, module, function_name, args, options \\ [])
547      when is_atom(module) and is_atom(function_name) and is_list(args) do
548    build_stream(enumerable, {module, function_name, args}, options)
549  end
550
551  @doc """
552  Returns a stream that runs the given function `fun` concurrently
553  on each element in `enumerable`.
554
555  Works the same as `async_stream/5` but with an anonymous function instead of a
556  module-function-arguments tuple. `fun` must be a one-arity anonymous function.
557
558  Each `enumerable` element is passed as argument to the given function `fun` and
559  processed by its own task. The tasks will be linked to the current process,
560  similarly to `async/1`.
561
562  ## Example
563
564  Count the code points in each string asynchronously, then add the counts together using reduce.
565
566      iex> strings = ["long string", "longer string", "there are many of these"]
567      iex> stream = Task.async_stream(strings, fn text -> text |> String.codepoints() |> Enum.count() end)
568      iex> Enum.reduce(stream, 0, fn {:ok, num}, acc -> num + acc end)
569      47
570
571  See `async_stream/5` for discussion, options, and more examples.
572  """
573  @doc since: "1.4.0"
574  @spec async_stream(Enumerable.t(), (term -> term), keyword) :: Enumerable.t()
575  def async_stream(enumerable, fun, options \\ [])
576      when is_function(fun, 1) and is_list(options) do
577    build_stream(enumerable, fun, options)
578  end
579
580  defp build_stream(enumerable, fun, options) do
581    &Task.Supervised.stream(enumerable, &1, &2, fun, options, fn [owner | _] = callers, mfa ->
582      {:ok, pid} = Task.Supervised.start_link(get_owner(owner), callers, :nomonitor, mfa)
583      {:ok, :link, pid}
584    end)
585  end
586
587  # Returns a tuple with the node where this is executed and either the
588  # registered name of the given PID or the PID of where this is executed. Used
589  # when exiting from tasks to print out from where the task was started.
590  defp get_owner(pid) do
591    self_or_name =
592      case Process.info(pid, :registered_name) do
593        {:registered_name, name} when is_atom(name) -> name
594        _ -> pid
595      end
596
597    {node(), self_or_name, pid}
598  end
599
600  defp get_callers(owner) do
601    case :erlang.get(:"$callers") do
602      [_ | _] = list -> [owner | list]
603      _ -> [owner]
604    end
605  end
606
607  @doc ~S"""
608  Awaits a task reply and returns it.
609
610  In case the task process dies, the current process will exit with the same
611  reason as the task.
612
613  A timeout, in milliseconds or `:infinity`, can be given with a default value
614  of `5000`. If the timeout is exceeded, then the current process will exit. If
615  the task process is linked to the current process which is the case when a
616  task is started with `async`, then the task process will also exit. If the
617  task process is trapping exits or not linked to the current process, then it
618  will continue to run.
619
620  This function assumes the task's monitor is still active or the monitor's
621  `:DOWN` message is in the message queue. If it has been demonitored, or the
622  message already received, this function will wait for the duration of the
623  timeout awaiting the message.
624
625  This function can only be called once for any given task. If you want
626  to be able to check multiple times if a long-running task has finished
627  its computation, use `yield/2` instead.
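
  A minimal sketch of that alternative, with arbitrary sleep and timeout
  values:

  ```elixir
  task =
    Task.async(fn ->
      Process.sleep(200)
      :done
    end)

  # Too early: no reply yet, so yield returns nil and the task keeps running.
  first = Task.yield(task, 50)

  # Checking again later picks up the reply.
  second = Task.yield(task, 1_000)
  ```

  Here `first` is `nil` and `second` is `{:ok, :done}`.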
628
629  ## Examples
630
631      iex> task = Task.async(fn -> 1 + 1 end)
632      iex> Task.await(task)
633      2
634
635  ## Compatibility with OTP behaviours
636
637  It is not recommended to `await` a long-running task inside an OTP
638  behaviour such as `GenServer`. Instead, you should match on the message
639  coming from a task inside your `c:GenServer.handle_info/2` callback.
640
641  A GenServer will receive two messages on `handle_info/2`:
642
    * `{ref, result}` - the reply message where `ref` is the monitor
      reference stored in `task.ref` and `result` is the task
      result

    * `{:DOWN, ref, :process, pid, reason}` - since all tasks are also
      monitored, you will also receive the `:DOWN` message delivered by
      `Process.monitor/1`. If you receive the `:DOWN` message without
      a reply, it means the task crashed
651
652  Another consideration to have in mind is that tasks started by `Task.async/1`
653  are always linked to their callers and you may not want the GenServer to
654  crash if the task crashes. Therefore, it is preferable to instead use
655  `Task.Supervisor.async_nolink/3` inside OTP behaviours. For completeness, here
  is an example of a GenServer that starts tasks and handles their results:
657
658      defmodule GenServerTaskExample do
659        use GenServer
660
661        def start_link(opts) do
662          GenServer.start_link(__MODULE__, :ok, opts)
663        end
664
665        def init(_opts) do
666          # We will keep all running tasks in a map
667          {:ok, %{tasks: %{}}}
668        end
669
670        # Imagine we invoke a task from the GenServer to access a URL...
671        def handle_call(:some_message, _from, state) do
672          url = ...
673          task = Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> fetch_url(url) end)
674
675          # After we start the task, we store its reference and the url it is fetching
676          state = put_in(state.tasks[task.ref], url)
677
678          {:reply, :ok, state}
679        end
680
681        # If the task succeeds...
682        def handle_info({ref, result}, state) do
          # The task succeeded, so we can cancel the monitoring and discard the DOWN message
684          Process.demonitor(ref, [:flush])
685
686          {url, state} = pop_in(state.tasks[ref])
687          IO.puts "Got #{inspect(result)} for URL #{inspect url}"
688          {:noreply, state}
689        end
690
691        # If the task fails...
692        def handle_info({:DOWN, ref, _, _, reason}, state) do
693          {url, state} = pop_in(state.tasks[ref])
694          IO.puts "URL #{inspect url} failed with reason #{inspect(reason)}"
695          {:noreply, state}
696        end
697      end
698
699  With the server defined, you will want to start the task supervisor
700  above and the GenServer in your supervision tree:
701
702      children = [
703        {Task.Supervisor, name: MyApp.TaskSupervisor},
704        {GenServerTaskExample, name: MyApp.GenServerTaskExample}
705      ]
706
707      Supervisor.start_link(children, strategy: :one_for_one)
708
709  """
710  @spec await(t, timeout) :: term
711  def await(%Task{ref: ref, owner: owner} = task, timeout \\ 5000) when is_timeout(timeout) do
712    if owner != self() do
713      raise ArgumentError, invalid_owner_error(task)
714    end
715
716    receive do
717      {^ref, reply} ->
718        Process.demonitor(ref, [:flush])
719        reply
720
721      {:DOWN, ^ref, _, proc, reason} ->
722        exit({reason(reason, proc), {__MODULE__, :await, [task, timeout]}})
723    after
724      timeout ->
725        Process.demonitor(ref, [:flush])
726        exit({:timeout, {__MODULE__, :await, [task, timeout]}})
727    end
728  end
729
730  @doc """
731  Awaits replies from multiple tasks and returns them.
732
733  This function receives a list of tasks and waits for their replies in the
734  given time interval. It returns a list of the results, in the same order as
735  the tasks supplied in the `tasks` input argument.
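
  The ordering guarantee can be illustrated with tasks that finish in a
  different order than they were started. The sleep times are arbitrary:

  ```elixir
  tasks =
    for ms <- [300, 100, 200] do
      Task.async(fn ->
        Process.sleep(ms)
        ms
      end)
    end

  # The second task finishes first, but results follow the order of `tasks`.
  results = Task.await_many(tasks)
  ```

  Here `results` is `[300, 100, 200]`.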
736
737  If any of the task processes dies, the current process will exit with the
738  same reason as that task.
739
740  A timeout, in milliseconds or `:infinity`, can be given with a default value
741  of `5000`. If the timeout is exceeded, then the current process will exit.
742  Any task processes that are linked to the current process (which is the case
743  when a task is started with `async`) will also exit. Any task processes that
744  are trapping exits or not linked to the current process will continue to run.
745
746  This function assumes the tasks' monitors are still active or the monitors'
747  `:DOWN` message is in the message queue. If any tasks have been demonitored,
748  or the message already received, this function will wait for the duration of
749  the timeout.
750
751  This function can only be called once for any given task. If you want to be
752  able to check multiple times if a long-running task has finished its
753  computation, use `yield_many/2` instead.
754
755  ## Compatibility with OTP behaviours
756
757  It is not recommended to `await` long-running tasks inside an OTP behaviour
758  such as `GenServer`. See `await/2` for more information.
759
760  ## Examples
761
762      iex> tasks = [
763      ...>   Task.async(fn -> 1 + 1 end),
764      ...>   Task.async(fn -> 2 + 3 end)
765      ...> ]
766      iex> Task.await_many(tasks)
767      [2, 5]

  """
  @doc since: "1.11.0"
  @spec await_many([t], timeout) :: [term]
  def await_many(tasks, timeout \\ 5000) when is_timeout(timeout) do
    awaiting =
      for task <- tasks, into: %{} do
        %Task{ref: ref, owner: owner} = task

        if owner != self() do
          raise ArgumentError, invalid_owner_error(task)
        end

        {ref, true}
      end

    timeout_ref = make_ref()

    timer_ref =
      if timeout != :infinity do
        Process.send_after(self(), timeout_ref, timeout)
      end

    try do
      await_many(tasks, timeout, awaiting, %{}, timeout_ref)
    after
      timer_ref && Process.cancel_timer(timer_ref)
      receive do: (^timeout_ref -> :ok), after: (0 -> :ok)
    end
  end

  defp await_many(tasks, _timeout, awaiting, replies, _timeout_ref)
       when map_size(awaiting) == 0 do
    for %{ref: ref} <- tasks, do: Map.fetch!(replies, ref)
  end

  defp await_many(tasks, timeout, awaiting, replies, timeout_ref) do
    receive do
      ^timeout_ref ->
        demonitor_pending_tasks(awaiting)
        exit({:timeout, {__MODULE__, :await_many, [tasks, timeout]}})

      {:DOWN, ref, _, proc, reason} when is_map_key(awaiting, ref) ->
        demonitor_pending_tasks(awaiting)
        exit({reason(reason, proc), {__MODULE__, :await_many, [tasks, timeout]}})

      {ref, reply} when is_map_key(awaiting, ref) ->
        Process.demonitor(ref, [:flush])

        await_many(
          tasks,
          timeout,
          Map.delete(awaiting, ref),
          Map.put(replies, ref, reply),
          timeout_ref
        )
    end
  end

  defp demonitor_pending_tasks(awaiting) do
    Enum.each(awaiting, fn {ref, _} ->
      Process.demonitor(ref, [:flush])
    end)
  end

  @doc false
  @deprecated "Pattern match directly on the message instead"
  def find(tasks, {ref, reply}) when is_reference(ref) do
    Enum.find_value(tasks, fn
      %Task{ref: ^ref} = task ->
        Process.demonitor(ref, [:flush])
        {reply, task}

      %Task{} ->
        nil
    end)
  end

  def find(tasks, {:DOWN, ref, _, proc, reason} = msg) when is_reference(ref) do
    find = fn %Task{ref: task_ref} -> task_ref == ref end

    if Enum.find(tasks, find) do
      exit({reason(reason, proc), {__MODULE__, :find, [tasks, msg]}})
    end
  end

  def find(_tasks, _msg) do
    nil
  end

  @doc ~S"""
  Temporarily blocks the current process waiting for a task reply.

  Returns `{:ok, reply}` if the reply is received, `nil` if
  no reply has arrived, or `{:exit, reason}` if the task has already
  exited. Keep in mind that normally a task failure also causes
  the process owning the task to exit. Therefore this function can
  return `{:exit, reason}` only if at least one of the following holds:

    * the task process exited with the reason `:normal`
    * the task isn't linked to the caller
    * the caller is trapping exits
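
  As a hedged illustration of the last case, the sketch below traps exits
  in the caller so that a task exiting with `:shutdown` is reported by
  `yield/2` instead of taking the caller down. The 1000ms timeout is an
  arbitrary value chosen for the example:

  ```elixir
  # Trap exits so the linked task's exit does not terminate the caller
  previous = Process.flag(:trap_exit, true)

  task = Task.async(fn -> exit(:shutdown) end)
  result = Task.yield(task, 1000)
  # result should be {:exit, :shutdown}

  # Restore the previous flag. Note that an {:EXIT, pid, reason} message
  # coming from the link is also delivered to the caller's mailbox.
  Process.flag(:trap_exit, previous)
  ```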

  A timeout, in milliseconds or `:infinity`, can be given with a default value
  of `5000`. If the time runs out before a message from the task is received,
  this function will return `nil` and the monitor will remain active. Therefore
  `yield/2` can be called multiple times on the same task.
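
  For instance, a caller can poll a task with a short timeout and come back
  to it later. This is a sketch with arbitrary sleep and timeout values,
  chosen only so that the first call times out:

  ```elixir
  task =
    Task.async(fn ->
      Process.sleep(300)
      :done
    end)

  # The task is still sleeping, so this should return nil after about 50ms
  first = Task.yield(task, 50)

  # The monitor is still active, so yielding again is allowed
  second = Task.yield(task, 5000)
  # second should be {:ok, :done}
  ```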

  This function assumes the task's monitor is still active or the
  monitor's `:DOWN` message is in the message queue. If it has been
  demonitored or the message already received, this function will wait
  for the duration of the timeout awaiting the message.

  If you intend to shut the task down if it has not responded within `timeout`
  milliseconds, you should chain this together with `shutdown/1`, like so:

      case Task.yield(task, timeout) || Task.shutdown(task) do
        {:ok, result} ->
          result

        nil ->
          Logger.warning("Failed to get a result in #{timeout}ms")
          nil
      end

  This ensures that, if the task completes after the `timeout` but before
  `shutdown/1` has been called, you will still get the result, since `shutdown/1`
  is designed to handle this case and return the result.
  """
  @spec yield(t, timeout) :: {:ok, term} | {:exit, term} | nil
  def yield(%Task{ref: ref, owner: owner} = task, timeout \\ 5000) when is_timeout(timeout) do
    if owner != self() do
      raise ArgumentError, invalid_owner_error(task)
    end

    receive do
      {^ref, reply} ->
        Process.demonitor(ref, [:flush])
        {:ok, reply}

      {:DOWN, ^ref, _, proc, :noconnection} ->
        exit({reason(:noconnection, proc), {__MODULE__, :yield, [task, timeout]}})

      {:DOWN, ^ref, _, _, reason} ->
        {:exit, reason}
    after
      timeout ->
        nil
    end
  end

  @doc """
  Yields to multiple tasks in the given time interval.

  This function receives a list of tasks and waits for their
  replies in the given time interval. It returns a list
  of two-element tuples, with the task as the first element
  and the yielded result as the second. The tasks in the returned
  list will be in the same order as the tasks supplied in the `tasks`
  input argument.

  Similarly to `yield/2`, each task's result will be

    * `{:ok, term}` if the task has successfully reported its
      result back in the given time interval
    * `{:exit, reason}` if the task has died
    * `nil` if the task keeps running past the timeout

  A timeout, in milliseconds or `:infinity`, can be given with a default value
  of `5000`.

  Check `yield/2` for more information.

  ## Example

  `Task.yield_many/2` allows developers to spawn multiple tasks
  and retrieve the results received in a given timeframe.
  If we combine it with `Task.shutdown/2`, it allows us to gather
  those results and cancel the tasks that have not replied in time.

  Let's see an example.

      tasks =
        for i <- 1..10 do
          Task.async(fn ->
            Process.sleep(i * 1000)
            i
          end)
        end

      tasks_with_results = Task.yield_many(tasks, 5000)

      results =
        Enum.map(tasks_with_results, fn {task, res} ->
          # Shut down the tasks that did not reply nor exit
          res || Task.shutdown(task, :brutal_kill)
        end)

      # Here we are matching only on {:ok, value} and
      # ignoring {:exit, _} (crashed tasks) and `nil` (no replies)
      for {:ok, value} <- results do
        IO.inspect(value)
      end

  In the example above, we create tasks that sleep from 1
  up to 10 seconds and return the number of seconds they slept for.
  If you execute the code all at once, you should see 1 up to 5
  printed, as those were the tasks that have replied in the
  given time. All other tasks will have been shut down using
  the `Task.shutdown/2` call.
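
  A smaller variant, sketched here with arbitrary sleep and timeout values,
  makes the ordering guarantee visible: results come back in the order of
  the input list, not in the order in which the tasks completed.

  ```elixir
  tasks = [
    Task.async(fn ->
      Process.sleep(200)
      :slow
    end),
    Task.async(fn -> :fast end),
    Task.async(fn -> Process.sleep(:infinity) end)
  ]

  results =
    tasks
    |> Task.yield_many(1000)
    |> Enum.map(fn {task, res} ->
      # Kill the task that never replied; shutdown returns nil for it
      res || Task.shutdown(task, :brutal_kill)
    end)

  # results should be [{:ok, :slow}, {:ok, :fast}, nil]
  ```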
  """
  @spec yield_many([t], timeout) :: [{t, {:ok, term} | {:exit, term} | nil}]
  def yield_many(tasks, timeout \\ 5000) when is_timeout(timeout) do
    timeout_ref = make_ref()

    timer_ref =
      if timeout != :infinity do
        Process.send_after(self(), timeout_ref, timeout)
      end

    try do
      yield_many(tasks, timeout_ref, :infinity)
    catch
      {:noconnection, reason} ->
        exit({reason, {__MODULE__, :yield_many, [tasks, timeout]}})
    after
      timer_ref && Process.cancel_timer(timer_ref)
      receive do: (^timeout_ref -> :ok), after: (0 -> :ok)
    end
  end

  defp yield_many([%Task{ref: ref, owner: owner} = task | rest], timeout_ref, timeout) do
    if owner != self() do
      raise ArgumentError, invalid_owner_error(task)
    end

    receive do
      {^ref, reply} ->
        Process.demonitor(ref, [:flush])
        [{task, {:ok, reply}} | yield_many(rest, timeout_ref, timeout)]

      {:DOWN, ^ref, _, proc, :noconnection} ->
        throw({:noconnection, reason(:noconnection, proc)})

      {:DOWN, ^ref, _, _, reason} ->
        [{task, {:exit, reason}} | yield_many(rest, timeout_ref, timeout)]

      ^timeout_ref ->
        [{task, nil} | yield_many(rest, timeout_ref, 0)]
    after
      timeout ->
        [{task, nil} | yield_many(rest, timeout_ref, 0)]
    end
  end

  defp yield_many([], _timeout_ref, _timeout) do
    []
  end

  @doc """
  Unlinks and shuts down the task, and then checks for a reply.

  Returns `{:ok, reply}` if the reply is received while shutting down the task,
  `{:exit, reason}` if the task died, otherwise `nil`.

  The second argument is either a timeout or `:brutal_kill`. In case
  of a timeout, a `:shutdown` exit signal is sent to the task process
  and if it does not exit within the timeout, it is killed. With `:brutal_kill`
  the task is killed straight away. In case the task terminates abnormally
  (possibly killed by another process), this function will exit with the same reason.
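
  A minimal sketch of both forms, assuming tasks that would otherwise never
  finish. The 1000ms timeout is an arbitrary value:

  ```elixir
  stuck = Task.async(fn -> Process.sleep(:infinity) end)
  # Sends :shutdown and waits up to 1000ms before killing the task
  graceful = Task.shutdown(stuck, 1000)

  other = Task.async(fn -> Process.sleep(:infinity) end)
  # Kills the task straight away
  killed = Task.shutdown(other, :brutal_kill)

  # Both calls should return nil, as neither task produced a reply
  ```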

  It is not required to call this function when terminating the caller, unless
  exiting with reason `:normal` or if the task is trapping exits. If the caller is
  exiting with a reason other than `:normal` and the task is not trapping exits, the
  caller's exit signal will stop the task. The caller can exit with reason
  `:shutdown` to shut down all of its linked processes, including tasks, that
  are not trapping exits without generating any log messages.

  If a task's monitor has already been demonitored or received and there is not
  a response waiting in the message queue, this function will return
  `{:exit, :noproc}`, as the result or exit reason cannot be determined.
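
  The sketch below contrasts this with the case where a reply is still
  waiting in the message queue. The extra monitor (`mon`) is an assumption
  of the example, used only to be sure the task process has terminated
  before `shutdown/2` is called:

  ```elixir
  # A finished task whose reply is still in the message queue
  task = Task.async(fn -> :done end)
  mon = Process.monitor(task.pid)

  receive do
    {:DOWN, ^mon, _, _, _} -> :ok
  end

  with_reply = Task.shutdown(task)
  # with_reply should be {:ok, :done}

  # A task that was already awaited: its reply and monitor are gone
  task = Task.async(fn -> :done end)
  mon = Process.monitor(task.pid)
  :done = Task.await(task)

  receive do
    {:DOWN, ^mon, _, _, _} -> :ok
  end

  no_reply = Task.shutdown(task)
  # no_reply should be {:exit, :noproc}
  ```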
  """
  @spec shutdown(t, timeout | :brutal_kill) :: {:ok, term} | {:exit, term} | nil
  def shutdown(task, shutdown \\ 5000)

  def shutdown(%Task{pid: nil} = task, _) do
    raise ArgumentError, "task #{inspect(task)} does not have an associated task process"
  end

  def shutdown(%Task{owner: owner} = task, _) when owner != self() do
    raise ArgumentError, invalid_owner_error(task)
  end

  def shutdown(%Task{pid: pid} = task, :brutal_kill) do
    mon = Process.monitor(pid)
    exit(pid, :kill)

    case shutdown_receive(task, mon, :brutal_kill, :infinity) do
      {:down, proc, :noconnection} ->
        exit({reason(:noconnection, proc), {__MODULE__, :shutdown, [task, :brutal_kill]}})

      {:down, _, reason} ->
        {:exit, reason}

      result ->
        result
    end
  end

  def shutdown(%Task{pid: pid} = task, timeout) when is_timeout(timeout) do
    mon = Process.monitor(pid)
    exit(pid, :shutdown)

    case shutdown_receive(task, mon, :shutdown, timeout) do
      {:down, proc, :noconnection} ->
        exit({reason(:noconnection, proc), {__MODULE__, :shutdown, [task, timeout]}})

      {:down, _, reason} ->
        {:exit, reason}

      result ->
        result
    end
  end

  ## Helpers

  defp reason(:noconnection, proc), do: {:nodedown, monitor_node(proc)}
  defp reason(reason, _), do: reason

  defp monitor_node(pid) when is_pid(pid), do: node(pid)
  defp monitor_node({_, node}), do: node

  # Spawn a process to ensure the task gets the exit signal if the caller dies
  # from an exit signal between unlink and exit.
  defp exit(task, reason) do
    caller = self()
    ref = make_ref()
    enforcer = spawn(fn -> enforce_exit(task, reason, caller, ref) end)
    Process.unlink(task)
    Process.exit(task, reason)
    send(enforcer, {:done, ref})
    :ok
  end

  defp enforce_exit(pid, reason, caller, ref) do
    mon = Process.monitor(caller)

    receive do
      {:done, ^ref} -> :ok
      {:DOWN, ^mon, _, _, _} -> Process.exit(pid, reason)
    end
  end

  defp shutdown_receive(%{ref: ref} = task, mon, type, timeout) do
    receive do
      {:DOWN, ^mon, _, _, :shutdown} when type in [:shutdown, :timeout_kill] ->
        Process.demonitor(ref, [:flush])
        flush_reply(ref)

      {:DOWN, ^mon, _, _, :killed} when type == :brutal_kill ->
        Process.demonitor(ref, [:flush])
        flush_reply(ref)

      {:DOWN, ^mon, _, proc, :noproc} ->
        reason = flush_noproc(ref, proc, type)
        flush_reply(ref) || reason

      {:DOWN, ^mon, _, proc, reason} ->
        Process.demonitor(ref, [:flush])
        flush_reply(ref) || {:down, proc, reason}
    after
      timeout ->
        Process.exit(task.pid, :kill)
        shutdown_receive(task, mon, :timeout_kill, :infinity)
    end
  end

  defp flush_reply(ref) do
    receive do
      {^ref, reply} -> {:ok, reply}
    after
      0 -> nil
    end
  end

  defp flush_noproc(ref, proc, type) do
    receive do
      {:DOWN, ^ref, _, _, :shutdown} when type in [:shutdown, :timeout_kill] ->
        nil

      {:DOWN, ^ref, _, _, :killed} when type == :brutal_kill ->
        nil

      {:DOWN, ^ref, _, _, reason} ->
        {:down, proc, reason}
    after
      0 ->
        Process.demonitor(ref, [:flush])
        {:down, proc, :noproc}
    end
  end

  defp invalid_owner_error(task) do
    "task #{inspect(task)} must be queried from the owner but was queried from #{inspect(self())}"
  end
end