1defmodule Phoenix.Presence do
2  @moduledoc """
3  Provides Presence tracking to processes and channels.
4
5  This behaviour provides presence features such as fetching
6  presences for a given topic, as well as handling diffs of
7  join and leave events as they occur in real-time. Using this
8  module defines a supervisor and allows the calling module to
9  implement the `Phoenix.Tracker` behaviour which starts a
10  tracker process to handle presence information.
11
12  ## Example Usage
13
14  Start by defining a presence module within your application
15  which uses `Phoenix.Presence` and provide the `:otp_app` which
16  holds your configuration, as well as the `:pubsub_server`.
17
18      defmodule MyApp.Presence do
19        use Phoenix.Presence, otp_app: :my_app,
20                              pubsub_server: MyApp.PubSub
21      end
22
23  The `:pubsub_server` must point to an existing pubsub server
24  running in your application, which is included by default as
25  `MyApp.PubSub` for new applications.
26
27  Next, add the new supervisor to your supervision tree in `lib/my_app.ex`:
28
29      children = [
30        ...
31        MyApp.Presence,
32      ]
33
34  Once added, presences can be tracked in your channel after joining:
35
36      defmodule MyApp.MyChannel do
37        use MyAppWeb, :channel
38        alias MyApp.Presence
39
40        def join("some:topic", _params, socket) do
41          send(self(), :after_join)
42          {:ok, assign(socket, :user_id, ...)}
43        end
44
45        def handle_info(:after_join, socket) do
46          push socket, "presence_state", Presence.list(socket)
47          {:ok, _} = Presence.track(socket, socket.assigns.user_id, %{
48            online_at: inspect(System.system_time(:seconds))
49          })
50          {:noreply, socket}
51        end
52      end
53
54  In the example above, the current presence information for
55  the socket's topic is pushed to the client as a `"presence_state"` event.
56  Next, `Presence.track` is used to register this
57  channel's process as a presence for the socket's user ID, with
58  a map of metadata.
59
60  Finally, a diff of presence join and leave events will be sent to the
61  client as they happen in real-time with the "presence_diff" event.
62  The diff structure will be a map of `:joins` and `:leaves` of the form:
63
64      %{joins: %{"123" => %{metas: [%{status: "away", phx_ref: ...}]},
65        leaves: %{"456" => %{metas: [%{status: "online", phx_ref: ...}]},
66
67  See `Phoenix.Presence.list/2` for more information on the presence
68  datastructure.
69
70  ## Fetching Presence Information
71
72  Presence metadata should be minimized and used to store small,
73  ephemeral state, such as a user's "online" or "away" status.
74  More detailed information, such as user details that need to
75  be fetched from the database, can be achieved by overriding the `fetch/2`
76  function. The `fetch/2` callback is triggered when using `list/1`
77  and serves as a mechanism to fetch presence information a single time,
78  before broadcasting the information to all channel subscribers.
79  This prevents N query problems and gives you a single place to group
80  isolated data fetching to extend presence metadata. The function must
81  return a map of data matching the outlined Presence datastructure,
82  including the `:metas` key, but can extend the map of information
83  to include any additional information. For example:
84
85      def fetch(_topic, entries) do
86        query =
87          from u in User,
88            where: u.id in ^Map.keys(entries),
89            select: {u.id, u}
90
91        users = query |> Repo.all |> Enum.into(%{})
92
93        for {key, %{metas: metas}} <- entries, into: %{} do
94          {key, %{metas: metas, user: users[key]}}
95        end
96      end
97
98  The function above fetches all users from the database who
99  have registered presences for the given topic. The fetched
100  information is then extended with a `:user` key of the user's
101  information, while maintaining the required `:metas` field from the
102  original presence data.
103  """
104  alias Phoenix.Socket.Broadcast
105
106  @type presences :: %{ String.t => %{metas: [map()]}}
107  @type presence :: %{key: String.t, meta: map()}
108  @type topic :: String.t
109
110  @callback start_link(Keyword.t) :: {:ok, pid()} | {:error, reason :: term()} :: :ignore
111  @callback init(Keyword.t) :: {:ok, pid()} | {:error, reason :: term}
112  @callback track(Phoenix.Socket.t, key :: String.t, meta :: map()) :: {:ok, binary()} | {:error, reason :: term()}
113  @callback track(pid, topic, key :: String.t, meta :: map()) :: {:ok, binary()} | {:error, reason :: term()}
114  @callback untrack(Phoenix.Socket.t, key :: String.t) :: :ok
115  @callback untrack(pid, topic, key :: String.t) :: :ok
116  @callback update(Phoenix.Socket.t, key :: String.t, meta :: map() | (map() -> map())) :: {:ok, binary()} | {:error, reason :: term()}
117  @callback update(pid, topic, key :: String.t, meta :: map() | (map() -> map())) :: {:ok, binary()} | {:error, reason :: term()}
118  @callback fetch(topic, presences) :: presences
119  @callback list(topic) :: presences
120  @callback handle_diff(%{topic => {joins :: presences, leaves :: presences}}, state :: term) :: {:ok, state :: term}
121
122  defmacro __using__(opts) do
123    quote do
124      @opts unquote(opts)
125      @otp_app @opts[:otp_app] || raise "presence expects :otp_app to be given"
126      @behaviour unquote(__MODULE__)
127      @task_supervisor Module.concat(__MODULE__, TaskSupervisor)
128
129      @doc false
130      def child_spec(opts) do
131        %{
132          id: __MODULE__,
133          start: {__MODULE__, :start_link, [opts]},
134          type: :supervisor
135        }
136      end
137
138      def start_link(opts \\ []) do
139        opts = Keyword.merge(@opts, opts)
140        Phoenix.Presence.start_link(__MODULE__, @otp_app, @task_supervisor, opts)
141      end
142
143      def init(opts) do
144        server = Keyword.fetch!(opts, :pubsub_server)
145        {:ok, %{pubsub_server: server,
146                node_name: Phoenix.PubSub.node_name(server),
147                task_sup: @task_supervisor}}
148      end
149
150      def track(%Phoenix.Socket{} = socket, key, meta) do
151        track(socket.channel_pid, socket.topic, key, meta)
152      end
153      def track(pid, topic, key, meta) do
154        Phoenix.Tracker.track(__MODULE__, pid, topic, key, meta)
155      end
156
157      def untrack(%Phoenix.Socket{} = socket, key) do
158        untrack(socket.channel_pid, socket.topic, key)
159      end
160      def untrack(pid, topic, key) do
161        Phoenix.Tracker.untrack(__MODULE__, pid, topic, key)
162      end
163
164      def update(%Phoenix.Socket{} = socket, key, meta) do
165        update(socket.channel_pid, socket.topic, key, meta)
166      end
167      def update(pid, topic, key, meta) do
168        Phoenix.Tracker.update(__MODULE__, pid, topic, key, meta)
169      end
170
171      def fetch(_topic, presences), do: presences
172
173      def list(%Phoenix.Socket{topic: topic}), do: list(topic)
174      def list(topic) do
175        Phoenix.Presence.list(__MODULE__, topic)
176      end
177
178      def handle_diff(diff, state) do
179        Phoenix.Presence.handle_diff(__MODULE__,
180          diff, state.node_name, state.pubsub_server, state.task_sup
181        )
182        {:ok, state}
183      end
184
185      defoverridable fetch: 2, child_spec: 1
186    end
187  end
188
189  @doc false
190  def start_link(module, otp_app, task_supervisor, opts) do
191    import Supervisor.Spec
192    opts =
193      opts
194      |> Keyword.merge(Application.get_env(otp_app, module) || [])
195      |> Keyword.put(:name, module)
196
197    children = [
198      supervisor(Task.Supervisor, [[name: task_supervisor]]),
199      worker(Phoenix.Tracker, [module, opts, opts])
200    ]
201    Supervisor.start_link(children, strategy: :one_for_one)
202  end
203
204  @doc false
205  def handle_diff(module, diff, node_name, pubsub_server, sup_name) do
206    Task.Supervisor.start_child(sup_name, fn ->
207      for {topic, {joins, leaves}} <- diff do
208        msg = %Broadcast{topic: topic, event: "presence_diff", payload: %{
209          joins: module.fetch(topic, group(joins)),
210          leaves: module.fetch(topic, group(leaves))
211        }}
212        Phoenix.PubSub.direct_broadcast!(node_name, pubsub_server, topic, msg)
213      end
214    end)
215  end
216
217  @doc """
218  Returns presences for a topic.
219
220  ## Presence datastructure
221
222  The presence information is returned as a map with presences grouped
223  by key, cast as a string, and accumulated metadata, with the following form:
224
225      %{key => %{metas: [%{phx_ref: ..., ...}, ...]}}
226
227  For example, imagine a user with id `123` online from two
228  different devices, as well as a user with id `456` online from
229  just one device. The following presence information might be returned:
230
231      %{"123" => %{metas: [%{status: "away", phx_ref: ...},
232                           %{status: "online", phx_ref: ...}]},
233        "456" => %{metas: [%{status: "online", phx_ref: ...}]}}
234
235  The keys of the map will usually point to a resource ID. The value
236  will contain a map with a `:metas` key containing a list of metadata
237  for each resource. Additionally, every metadata entry will contain a
238  `:phx_ref` key which can be used to uniquely identify metadata for a
239  given key. In the event that the metadata was previously updated,
240  a `:phx_ref_prev` key will be present containing the previous
241  `:phx_ref` value.
242  """
243  def list(module, topic) do
244    grouped =
245      module
246      |> Phoenix.Tracker.list(topic)
247      |> group()
248
249    module.fetch(topic, grouped)
250  end
251
252  defp group(presences) do
253    presences
254    |> Enum.reverse()
255    |> Enum.reduce(%{}, fn {key, meta}, acc ->
256      Map.update(acc, to_string(key), %{metas: [meta]}, fn %{metas: metas} ->
257        %{metas: [meta | metas]}
258      end)
259    end)
260  end
261end
262