1defmodule Phoenix.Presence do
2  @moduledoc """
3  Provides Presence tracking to processes and channels.
5  This behaviour provides presence features such as fetching
6  presences for a given topic, as well as handling diffs of
7  join and leave events as they occur in real-time. Using this
8  module defines a supervisor and allows the calling module to
9  implement the `Phoenix.Tracker` behaviour which starts a
10  tracker process to handle presence information.
12  ## Example Usage
14  Start by defining a presence module within your application
15  which uses `Phoenix.Presence` and provide the `:otp_app` which
16  holds your configuration, as well as the `:pubsub_server`.
18      defmodule MyApp.Presence do
19        use Phoenix.Presence, otp_app: :my_app,
20                              pubsub_server: MyApp.PubSub
21      end
23  The `:pubsub_server` must point to an existing pubsub server
24  running in your application, which is included by default as
25  `MyApp.PubSub` for new applications.
27  Next, add the new supervisor to your supervision tree in `lib/my_app.ex`:
29      children = [
30        ...
31        MyApp.Presence,
32      ]
34  Once added, presences can be tracked in your channel after joining:
36      defmodule MyApp.MyChannel do
37        use MyAppWeb, :channel
38        alias MyApp.Presence
40        def join("some:topic", _params, socket) do
41          send(self(), :after_join)
42          {:ok, assign(socket, :user_id, ...)}
43        end
45        def handle_info(:after_join, socket) do
46          push socket, "presence_state", Presence.list(socket)
47          {:ok, _} = Presence.track(socket, socket.assigns.user_id, %{
48            online_at: inspect(System.system_time(:seconds))
49          })
50          {:noreply, socket}
51        end
52      end
54  In the example above, the current presence information for
55  the socket's topic is pushed to the client as a `"presence_state"` event.
56  Next, `Presence.track` is used to register this
57  channel's process as a presence for the socket's user ID, with
58  a map of metadata.
60  Finally, a diff of presence join and leave events will be sent to the
61  client as they happen in real-time with the "presence_diff" event.
62  The diff structure will be a map of `:joins` and `:leaves` of the form:
64      %{joins: %{"123" => %{metas: [%{status: "away", phx_ref: ...}]},
65        leaves: %{"456" => %{metas: [%{status: "online", phx_ref: ...}]},
67  See `Phoenix.Presence.list/2` for more information on the presence
68  datastructure.
70  ## Fetching Presence Information
72  Presence metadata should be minimized and used to store small,
73  ephemeral state, such as a user's "online" or "away" status.
74  More detailed information, such as user details that need to
75  be fetched from the database, can be achieved by overriding the `fetch/2`
76  function. The `fetch/2` callback is triggered when using `list/1`
77  and serves as a mechanism to fetch presence information a single time,
78  before broadcasting the information to all channel subscribers.
79  This prevents N query problems and gives you a single place to group
80  isolated data fetching to extend presence metadata. The function must
81  return a map of data matching the outlined Presence datastructure,
82  including the `:metas` key, but can extend the map of information
83  to include any additional information. For example:
85      def fetch(_topic, entries) do
86        query =
87          from u in User,
88            where: u.id in ^Map.keys(entries),
89            select: {u.id, u}
91        users = query |> Repo.all |> Enum.into(%{})
93        for {key, %{metas: metas}} <- entries, into: %{} do
94          {key, %{metas: metas, user: users[key]}}
95        end
96      end
98  The function above fetches all users from the database who
99  have registered presences for the given topic. The fetched
100  information is then extended with a `:user` key of the user's
101  information, while maintaining the required `:metas` field from the
102  original presence data.
103  """
104  alias Phoenix.Socket.Broadcast
106  @type presences :: %{ String.t => %{metas: [map()]}}
107  @type presence :: %{key: String.t, meta: map()}
108  @type topic :: String.t
110  @callback start_link(Keyword.t) :: {:ok, pid()} | {:error, reason :: term()} :: :ignore
111  @callback init(Keyword.t) :: {:ok, pid()} | {:error, reason :: term}
112  @callback track(Phoenix.Socket.t, key :: String.t, meta :: map()) :: {:ok, binary()} | {:error, reason :: term()}
113  @callback track(pid, topic, key :: String.t, meta :: map()) :: {:ok, binary()} | {:error, reason :: term()}
114  @callback untrack(Phoenix.Socket.t, key :: String.t) :: :ok
115  @callback untrack(pid, topic, key :: String.t) :: :ok
116  @callback update(Phoenix.Socket.t, key :: String.t, meta :: map() | (map() -> map())) :: {:ok, binary()} | {:error, reason :: term()}
117  @callback update(pid, topic, key :: String.t, meta :: map() | (map() -> map())) :: {:ok, binary()} | {:error, reason :: term()}
118  @callback fetch(topic, presences) :: presences
119  @callback list(topic) :: presences
120  @callback handle_diff(%{topic => {joins :: presences, leaves :: presences}}, state :: term) :: {:ok, state :: term}
122  defmacro __using__(opts) do
123    quote do
124      @opts unquote(opts)
125      @otp_app @opts[:otp_app] || raise "presence expects :otp_app to be given"
126      @behaviour unquote(__MODULE__)
127      @task_supervisor Module.concat(__MODULE__, TaskSupervisor)
129      @doc false
130      def child_spec(opts) do
131        %{
132          id: __MODULE__,
133          start: {__MODULE__, :start_link, [opts]},
134          type: :supervisor
135        }
136      end
138      def start_link(opts \\ []) do
139        opts = Keyword.merge(@opts, opts)
140        Phoenix.Presence.start_link(__MODULE__, @otp_app, @task_supervisor, opts)
141      end
143      def init(opts) do
144        server = Keyword.fetch!(opts, :pubsub_server)
145        {:ok, %{pubsub_server: server,
146                node_name: Phoenix.PubSub.node_name(server),
147                task_sup: @task_supervisor}}
148      end
150      def track(%Phoenix.Socket{} = socket, key, meta) do
151        track(socket.channel_pid, socket.topic, key, meta)
152      end
153      def track(pid, topic, key, meta) do
154        Phoenix.Tracker.track(__MODULE__, pid, topic, key, meta)
155      end
157      def untrack(%Phoenix.Socket{} = socket, key) do
158        untrack(socket.channel_pid, socket.topic, key)
159      end
160      def untrack(pid, topic, key) do
161        Phoenix.Tracker.untrack(__MODULE__, pid, topic, key)
162      end
164      def update(%Phoenix.Socket{} = socket, key, meta) do
165        update(socket.channel_pid, socket.topic, key, meta)
166      end
167      def update(pid, topic, key, meta) do
168        Phoenix.Tracker.update(__MODULE__, pid, topic, key, meta)
169      end
171      def fetch(_topic, presences), do: presences
173      def list(%Phoenix.Socket{topic: topic}), do: list(topic)
174      def list(topic) do
175        Phoenix.Presence.list(__MODULE__, topic)
176      end
178      def handle_diff(diff, state) do
179        Phoenix.Presence.handle_diff(__MODULE__,
180          diff, state.node_name, state.pubsub_server, state.task_sup
181        )
182        {:ok, state}
183      end
185      defoverridable fetch: 2, child_spec: 1
186    end
187  end
189  @doc false
190  def start_link(module, otp_app, task_supervisor, opts) do
191    import Supervisor.Spec
192    opts =
193      opts
194      |> Keyword.merge(Application.get_env(otp_app, module) || [])
195      |> Keyword.put(:name, module)
197    children = [
198      supervisor(Task.Supervisor, [[name: task_supervisor]]),
199      worker(Phoenix.Tracker, [module, opts, opts])
200    ]
201    Supervisor.start_link(children, strategy: :one_for_one)
202  end
204  @doc false
205  def handle_diff(module, diff, node_name, pubsub_server, sup_name) do
206    Task.Supervisor.start_child(sup_name, fn ->
207      for {topic, {joins, leaves}} <- diff do
208        msg = %Broadcast{topic: topic, event: "presence_diff", payload: %{
209          joins: module.fetch(topic, group(joins)),
210          leaves: module.fetch(topic, group(leaves))
211        }}
212        Phoenix.PubSub.direct_broadcast!(node_name, pubsub_server, topic, msg)
213      end
214    end)
215  end
217  @doc """
218  Returns presences for a topic.
220  ## Presence datastructure
222  The presence information is returned as a map with presences grouped
223  by key, cast as a string, and accumulated metadata, with the following form:
225      %{key => %{metas: [%{phx_ref: ..., ...}, ...]}}
227  For example, imagine a user with id `123` online from two
228  different devices, as well as a user with id `456` online from
229  just one device. The following presence information might be returned:
231      %{"123" => %{metas: [%{status: "away", phx_ref: ...},
232                           %{status: "online", phx_ref: ...}]},
233        "456" => %{metas: [%{status: "online", phx_ref: ...}]}}
235  The keys of the map will usually point to a resource ID. The value
236  will contain a map with a `:metas` key containing a list of metadata
237  for each resource. Additionally, every metadata entry will contain a
238  `:phx_ref` key which can be used to uniquely identify metadata for a
239  given key. In the event that the metadata was previously updated,
240  a `:phx_ref_prev` key will be present containing the previous
241  `:phx_ref` value.
242  """
243  def list(module, topic) do
244    grouped =
245      module
246      |> Phoenix.Tracker.list(topic)
247      |> group()
249    module.fetch(topic, grouped)
250  end
252  defp group(presences) do
253    presences
254    |> Enum.reverse()
255    |> Enum.reduce(%{}, fn {key, meta}, acc ->
256      Map.update(acc, to_string(key), %{metas: [meta]}, fn %{metas: metas} ->
257        %{metas: [meta | metas]}
258      end)
259    end)
260  end