1# Usage: mix run examples/consumer_supervisor.exs 2# 3# Hit Ctrl+C twice to stop it. 4 5defmodule Counter do 6 @moduledoc """ 7 This is a simple producer that counts from the given 8 number whenever there is a demand. 9 """ 10 11 use GenStage 12 13 def start_link(initial) when is_integer(initial) do 14 GenStage.start_link(__MODULE__, initial, name: __MODULE__) 15 end 16 17 ## Callbacks 18 19 def init(initial) do 20 {:producer, initial} 21 end 22 23 def handle_demand(demand, counter) when demand > 0 do 24 # If the counter is 3 and we ask for 2 items, we will 25 # emit the items 3 and 4, and set the state to 5. 26 events = Enum.to_list(counter..counter+demand-1) 27 {:noreply, events, counter + demand} 28 end 29end 30 31defmodule Consumer do 32 @moduledoc """ 33 A consumer will be a consumer supervisor that will 34 spawn printer tasks for each event. 35 """ 36 37 use ConsumerSupervisor 38 39 def start_link() do 40 ConsumerSupervisor.start_link(__MODULE__, :ok) 41 end 42 43 # Callbacks 44 45 def init(:ok) do 46 children = [ 47 worker(Printer, [], restart: :temporary) 48 ] 49 50 {:ok, children, strategy: :one_for_one, subscribe_to: [{Counter, max_demand: 50}]} 51 end 52end 53 54defmodule Printer do 55 def start_link(event) do 56 Task.start_link(fn -> 57 IO.inspect {self(), event} 58 end) 59 end 60end 61 62defmodule App do 63 @moduledoc """ 64 Your application entry-point. 65 66 For actual applications, start/0 should be start/2. 67 """ 68 69 def start do 70 import Supervisor.Spec 71 72 children = [ 73 worker(Counter, [0]), 74 # We can add as many consumer supervisors as consumers as we want! 75 worker(Consumer, [], id: 1) 76 ] 77 78 Supervisor.start_link(children, strategy: :one_for_one) 79 end 80end 81 82# Start the app and wait forever 83App.start 84Process.sleep(:infinity) 85