1# Usage: mix run examples/consumer_supervisor.exs
2#
3# Hit Ctrl+C twice to stop it.
4
defmodule Counter do
  @moduledoc """
  A minimal producer that emits consecutive integers.

  The state is the next integer to emit; every demand for `n` events
  is answered with the next `n` integers, starting from the initial value.
  """

  use GenStage

  @doc """
  Starts the producer, registered under the module name, counting from `initial`.
  """
  def start_link(initial) when is_integer(initial),
    do: GenStage.start_link(__MODULE__, initial, name: __MODULE__)

  ## Callbacks

  # The stage is a producer whose state is the next integer to emit.
  def init(initial), do: {:producer, initial}

  def handle_demand(demand, counter) when demand > 0 do
    # With counter = 3 and demand = 2 the emitted events are [3, 4]
    # and the state becomes 5.
    upcoming = counter + demand
    events = Enum.to_list(counter..(upcoming - 1))

    {:noreply, events, upcoming}
  end
end
30
defmodule Consumer do
  @moduledoc """
  A consumer supervisor subscribed to `Counter` that spawns a
  `Printer` task for each event it receives.
  """

  use ConsumerSupervisor

  def start_link(), do: ConsumerSupervisor.start_link(__MODULE__, :ok)

  # Callbacks

  def init(:ok) do
    # Child template used for every event; the event itself is handed to
    # Printer.start_link/1 (presumably appended to the empty args list by
    # the consumer supervisor - confirm against the ConsumerSupervisor docs).
    printer = worker(Printer, [], restart: :temporary)

    # Ask the producer for at most 50 events at a time.
    opts = [strategy: :one_for_one, subscribe_to: [{Counter, max_demand: 50}]]

    {:ok, [printer], opts}
  end
end
53
defmodule Printer do
  @moduledoc """
  Prints a single event from a short-lived, linked task.
  """

  @doc """
  Starts a linked task that inspects `{task_pid, event}` to standard output.

  Returns the result of `Task.start_link/1`.
  """
  def start_link(event) do
    Task.start_link(fn -> report(event) end)
  end

  # Runs inside the task, so self() is the task's pid, not the caller's.
  defp report(event), do: IO.inspect({self(), event})
end
61
defmodule App do
  @moduledoc """
  Entry point that wires the example's supervision tree together.

  In a real application this would be the `start/2` callback
  instead of `start/0`.
  """

  import Supervisor.Spec

  @doc """
  Starts a `:one_for_one` supervisor running the `Counter` producer and
  one `Consumer` consumer supervisor.
  """
  def start do
    [
      worker(Counter, [0]),
      # More consumer supervisors may be listed here; the explicit id
      # is what tells them apart.
      worker(Consumer, [], id: 1)
    ]
    |> Supervisor.start_link(strategy: :one_for_one)
  end
end
81
# Boot the supervision tree, then block the script's process forever so
# the VM stays alive while the stages exchange events. The assertive
# match makes a failed start crash the script right away instead of
# leaving it asleep with nothing running.
{:ok, _supervisor} = App.start()
Process.sleep(:infinity)
85