defmodule Phoenix.Presence do
  @moduledoc """
  Provides Presence tracking to processes and channels.

  This behaviour provides presence features such as fetching
  presences for a given topic, as well as handling diffs of
  join and leave events as they occur in real-time. Using this
  module defines a supervisor and allows the calling module to
  implement the `Phoenix.Tracker` behaviour which starts a
  tracker process to handle presence information.

  ## Example Usage

  Start by defining a presence module within your application
  which uses `Phoenix.Presence` and provides the `:otp_app` which
  holds your configuration, as well as the `:pubsub_server`.

      defmodule MyApp.Presence do
        use Phoenix.Presence, otp_app: :my_app,
                              pubsub_server: MyApp.PubSub
      end

  The `:pubsub_server` must point to an existing pubsub server
  running in your application, which is included by default as
  `MyApp.PubSub` for new applications.

  Next, add the new supervisor to your supervision tree in `lib/my_app.ex`:

      children = [
        ...
        MyApp.Presence,
      ]

  Once added, presences can be tracked in your channel after joining:

      defmodule MyApp.MyChannel do
        use MyAppWeb, :channel
        alias MyApp.Presence

        def join("some:topic", _params, socket) do
          send(self(), :after_join)
          {:ok, assign(socket, :user_id, ...)}
        end

        def handle_info(:after_join, socket) do
          push(socket, "presence_state", Presence.list(socket))
          {:ok, _} = Presence.track(socket, socket.assigns.user_id, %{
            online_at: inspect(System.system_time(:second))
          })
          {:noreply, socket}
        end
      end

  In the example above, the current presence information for
  the socket's topic is pushed to the client as a `"presence_state"` event.
  Next, `Presence.track` is used to register this
  channel's process as a presence for the socket's user ID, with
  a map of metadata.
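
  Metadata can also be changed while the channel stays joined. The sketch
  below is illustrative rather than definitive: it assumes a hypothetical
  `"set_status"` client event, and it relies on the `update/3` function
  that `use Phoenix.Presence` defines, passing a function that receives
  the current metadata and returns the new metadata:

  ```elixir
  defmodule MyApp.MyChannel do
    use MyAppWeb, :channel
    alias MyApp.Presence

    # "set_status" is a hypothetical event name chosen for this example.
    def handle_in("set_status", %{"status" => status}, socket) do
      # The function form keeps the existing metadata and only changes :status.
      {:ok, _ref} =
        Presence.update(socket, socket.assigns.user_id, fn meta ->
          Map.put(meta, :status, status)
        end)

      {:noreply, socket}
    end
  end
  ```

  As described under `Phoenix.Presence.list/2`, metadata that has been
  updated carries a `:phx_ref_prev` key holding its previous `:phx_ref`.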

  Finally, a diff of presence join and leave events will be sent to the
  client as they happen in real-time with the `"presence_diff"` event.
  The diff structure will be a map of `:joins` and `:leaves` of the form:

      %{joins: %{"123" => %{metas: [%{status: "away", phx_ref: ...}]}},
        leaves: %{"456" => %{metas: [%{status: "online", phx_ref: ...}]}}}

  See `Phoenix.Presence.list/2` for more information on the presence
  data structure.

  ## Fetching Presence Information

  Presence metadata should be minimized and used to store small,
  ephemeral state, such as a user's "online" or "away" status.
  More detailed information, such as user details that need to
  be fetched from the database, can be added by overriding the `fetch/2`
  function. The `fetch/2` callback is triggered when using `list/1`
  and serves as a mechanism to fetch presence information a single time,
  before broadcasting the information to all channel subscribers.
  This prevents N query problems and gives you a single place to group
  isolated data fetching to extend presence metadata. The function must
  return a map of data matching the outlined Presence data structure,
  including the `:metas` key, but can extend the map of information
  to include any additional information. For example:

      def fetch(_topic, entries) do
        query =
          from u in User,
            where: u.id in ^Map.keys(entries),
            select: {u.id, u}

        users = query |> Repo.all |> Enum.into(%{})

        for {key, %{metas: metas}} <- entries, into: %{} do
          {key, %{metas: metas, user: users[key]}}
        end
      end

  The function above fetches all users from the database who
  have registered presences for the given topic. The fetched
  information is then extended with a `:user` key of the user's
  information, while maintaining the required `:metas` field from the
  original presence data.
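
  The merging step can be tried without a database. The sketch below is
  only an illustration: the `users` map is a hypothetical stand-in for the
  query result, keyed by the same strings as the presence entries, and the
  `phx_ref` values are made-up placeholders:

  ```elixir
  # Presence entries in the shape passed to fetch/2.
  entries = %{
    "123" => %{metas: [%{status: "away", phx_ref: "ref-1"}]},
    "456" => %{metas: [%{status: "online", phx_ref: "ref-2"}]}
  }

  # Hypothetical stand-in for the result of the database lookup.
  users = %{"123" => %{name: "alice"}, "456" => %{name: "bob"}}

  # Keep the required :metas list and add a :user key for each entry.
  extended =
    for {key, %{metas: metas}} <- entries, into: %{} do
      {key, %{metas: metas, user: users[key]}}
    end
  ```

  Each value in `extended` still contains its original `:metas` list, so
  the result continues to match the Presence data structure.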
  """
  alias Phoenix.Socket.Broadcast

  @type presences :: %{String.t => %{metas: [map()]}}
  @type presence :: %{key: String.t, meta: map()}
  @type topic :: String.t

  @callback start_link(Keyword.t) :: {:ok, pid()} | {:error, reason :: term()} | :ignore
  @callback init(Keyword.t) :: {:ok, state :: term} | {:error, reason :: term}
  @callback track(Phoenix.Socket.t, key :: String.t, meta :: map()) :: {:ok, binary()} | {:error, reason :: term()}
  @callback track(pid, topic, key :: String.t, meta :: map()) :: {:ok, binary()} | {:error, reason :: term()}
  @callback untrack(Phoenix.Socket.t, key :: String.t) :: :ok
  @callback untrack(pid, topic, key :: String.t) :: :ok
  @callback update(Phoenix.Socket.t, key :: String.t, meta :: map() | (map() -> map())) :: {:ok, binary()} | {:error, reason :: term()}
  @callback update(pid, topic, key :: String.t, meta :: map() | (map() -> map())) :: {:ok, binary()} | {:error, reason :: term()}
  @callback fetch(topic, presences) :: presences
  @callback list(topic) :: presences
  @callback handle_diff(%{topic => {joins :: presences, leaves :: presences}}, state :: term) :: {:ok, state :: term}

  defmacro __using__(opts) do
    quote do
      @opts unquote(opts)
      @otp_app @opts[:otp_app] || raise "presence expects :otp_app to be given"
      @behaviour unquote(__MODULE__)
      @task_supervisor Module.concat(__MODULE__, TaskSupervisor)

      @doc false
      def child_spec(opts) do
        %{
          id: __MODULE__,
          start: {__MODULE__, :start_link, [opts]},
          type: :supervisor
        }
      end

      def start_link(opts \\ []) do
        opts = Keyword.merge(@opts, opts)
        Phoenix.Presence.start_link(__MODULE__, @otp_app, @task_supervisor, opts)
      end

      def init(opts) do
        server = Keyword.fetch!(opts, :pubsub_server)
        {:ok, %{pubsub_server: server,
                node_name: Phoenix.PubSub.node_name(server),
                task_sup: @task_supervisor}}
      end

      def track(%Phoenix.Socket{} = socket, key, meta) do
        track(socket.channel_pid, socket.topic, key, meta)
      end
      def track(pid, topic, key, meta) do
        Phoenix.Tracker.track(__MODULE__, pid, topic, key, meta)
      end

      def untrack(%Phoenix.Socket{} = socket, key) do
        untrack(socket.channel_pid, socket.topic, key)
      end
      def untrack(pid, topic, key) do
        Phoenix.Tracker.untrack(__MODULE__, pid, topic, key)
      end

      def update(%Phoenix.Socket{} = socket, key, meta) do
        update(socket.channel_pid, socket.topic, key, meta)
      end
      def update(pid, topic, key, meta) do
        Phoenix.Tracker.update(__MODULE__, pid, topic, key, meta)
      end

      def fetch(_topic, presences), do: presences

      def list(%Phoenix.Socket{topic: topic}), do: list(topic)
      def list(topic) do
        Phoenix.Presence.list(__MODULE__, topic)
      end

      def handle_diff(diff, state) do
        Phoenix.Presence.handle_diff(__MODULE__,
          diff, state.node_name, state.pubsub_server, state.task_sup
        )
        {:ok, state}
      end

      defoverridable fetch: 2, child_spec: 1
    end
  end

  @doc false
  def start_link(module, otp_app, task_supervisor, opts) do
    import Supervisor.Spec
    # Values from the application environment override the given options.
    opts =
      opts
      |> Keyword.merge(Application.get_env(otp_app, module) || [])
      |> Keyword.put(:name, module)

    children = [
      supervisor(Task.Supervisor, [[name: task_supervisor]]),
      worker(Phoenix.Tracker, [module, opts, opts])
    ]
    Supervisor.start_link(children, strategy: :one_for_one)
  end

  @doc false
  def handle_diff(module, diff, node_name, pubsub_server, sup_name) do
    # Run fetch/2 and the broadcast in a supervised task rather than in the caller.
    Task.Supervisor.start_child(sup_name, fn ->
      for {topic, {joins, leaves}} <- diff do
        msg = %Broadcast{topic: topic, event: "presence_diff", payload: %{
          joins: module.fetch(topic, group(joins)),
          leaves: module.fetch(topic, group(leaves))
        }}
        Phoenix.PubSub.direct_broadcast!(node_name, pubsub_server, topic, msg)
      end
    end)
  end

  @doc """
  Returns presences for a topic.

  ## Presence data structure

  The presence information is returned as a map with presences grouped
  by key, cast as a string, and accumulated metadata, with the following form:

      %{key => %{metas: [%{phx_ref: ..., ...}, ...]}}

  For example, imagine a user with id `123` online from two
  different devices, as well as a user with id `456` online from
  just one device. The following presence information might be returned:

      %{"123" => %{metas: [%{status: "away", phx_ref: ...},
                           %{status: "online", phx_ref: ...}]},
        "456" => %{metas: [%{status: "online", phx_ref: ...}]}}

  The keys of the map will usually point to a resource ID. The value
  will contain a map with a `:metas` key containing a list of metadata
  for each resource. Additionally, every metadata entry will contain a
  `:phx_ref` key which can be used to uniquely identify metadata for a
  given key. In the event that the metadata was previously updated,
  a `:phx_ref_prev` key will be present containing the previous
  `:phx_ref` value.
  """
  def list(module, topic) do
    grouped =
      module
      |> Phoenix.Tracker.list(topic)
      |> group()

    module.fetch(topic, grouped)
  end

  # Groups `{key, meta}` pairs by stringified key. The list is reversed first
  # so that prepending each meta preserves the original order within a key.
  defp group(presences) do
    presences
    |> Enum.reverse()
    |> Enum.reduce(%{}, fn {key, meta}, acc ->
      Map.update(acc, to_string(key), %{metas: [meta]}, fn %{metas: metas} ->
        %{metas: [meta | metas]}
      end)
    end)
  end
end