1defmodule Phoenix.Channel.Server do
2  use GenServer
3  require Logger
4  require Phoenix.Endpoint
5  alias Phoenix.PubSub
6  alias Phoenix.Socket
7  alias Phoenix.Socket.{Broadcast, Message, Reply}
8
9  @moduledoc false
10
11  ## Transport API
12
13  @doc """
14  Joins the channel in socket with authentication payload.
15  """
16  @spec join(Socket.t, map) :: {:ok, map, pid} | {:error, map}
17  def join(socket, auth_payload) do
18    Phoenix.Endpoint.instrument socket, :phoenix_channel_join,
19      %{params: auth_payload, socket: socket}, fn ->
20      ref = make_ref()
21
22      case GenServer.start_link(__MODULE__, {socket, auth_payload, self(), ref}) do
23        {:ok, pid} ->
24          receive do: ({^ref, reply} -> {:ok, reply, pid})
25        :ignore ->
26          receive do: ({^ref, reply} -> {:error, reply})
27        {:error, reason} ->
28          Logger.error fn -> Exception.format_exit(reason) end
29          {:error, %{reason: "join crashed"}}
30      end
31    end
32  end
33
34  @doc """
35  Notifies the channel the client closed.
36
37  This event is synchronous as we want to guarantee
38  proper termination of the channel.
39  """
40  def close(pid, timeout \\ 5000) do
41    # We need to guarantee that the channel has been closed
42    # otherwise the link in the transport will trigger it to
43    # crash.
44    ref = Process.monitor(pid)
45    GenServer.cast(pid, :close)
46    receive do
47      {:DOWN, ^ref, _, _, _} -> :ok
48    after
49      timeout ->
50        Process.exit(pid, :kill)
51        receive do
52          {:DOWN, ^ref, _, _, _} -> :ok
53        end
54    end
55  end
56
57  @doc """
58  Gets the socket from the channel.
59  """
60  def socket(pid) do
61    GenServer.call(pid, :socket)
62  end
63
64  ## Channel API
65
66  @doc """
67  Broadcasts on the given pubsub server with the given
68  `topic`, `event` and `payload`.
69
70  The message is encoded as `Phoenix.Socket.Broadcast`.
71  """
72  def broadcast(pubsub_server, topic, event, payload)
73      when is_binary(topic) and is_binary(event) and is_map(payload) do
74    PubSub.broadcast pubsub_server, topic, %Broadcast{
75      topic: topic,
76      event: event,
77      payload: payload
78    }
79  end
80  def broadcast(_, topic, event, payload) do
81    raise_invalid_message(topic, event, payload)
82  end
83
84  @doc """
85  Broadcasts on the given pubsub server with the given
86  `topic`, `event` and `payload`.
87
88  Raises in case of crashes.
89  """
90  def broadcast!(pubsub_server, topic, event, payload)
91      when is_binary(topic) and is_binary(event) and is_map(payload) do
92    PubSub.broadcast! pubsub_server, topic, %Broadcast{
93      topic: topic,
94      event: event,
95      payload: payload
96    }
97  end
98  def broadcast!(_, topic, event, payload) do
99    raise_invalid_message(topic, event, payload)
100  end
101
102
103  @doc """
104  Broadcasts on the given pubsub server with the given
105  `from`, `topic`, `event` and `payload`.
106
107  The message is encoded as `Phoenix.Socket.Broadcast`.
108  """
109  def broadcast_from(pubsub_server, from, topic, event, payload)
110      when is_binary(topic) and is_binary(event) and is_map(payload) do
111    PubSub.broadcast_from pubsub_server, from, topic, %Broadcast{
112      topic: topic,
113      event: event,
114      payload: payload
115    }
116  end
117  def broadcast_from(_, _from, topic, event, payload) do
118    raise_invalid_message(topic, event, payload)
119  end
120
121  @doc """
122  Broadcasts on the given pubsub server with the given
123  `from`, `topic`, `event` and `payload`.
124
125  Raises in case of crashes.
126  """
127  def broadcast_from!(pubsub_server, from, topic, event, payload)
128      when is_binary(topic) and is_binary(event) and is_map(payload) do
129    PubSub.broadcast_from! pubsub_server, from, topic, %Broadcast{
130      topic: topic,
131      event: event,
132      payload: payload
133    }
134  end
135  def broadcast_from!(_, _from, topic, event, payload) do
136    raise_invalid_message(topic, event, payload)
137  end
138
139  @doc """
140  Pushes a message with the given topic, event and payload
141  to the given process.
142  """
143  def push(pid, topic, event, payload, serializer)
144      when is_binary(topic) and is_binary(event) and is_map(payload) do
145
146    encoded_msg = serializer.encode!(%Message{topic: topic,
147                                              event: event,
148                                              payload: payload})
149    send pid, encoded_msg
150    :ok
151  end
152  def push(_, topic, event, payload, _) do
153    raise_invalid_message(topic, event, payload)
154  end
155
156  @doc """
157  Replies to a given ref to the transport process.
158  """
159  def reply(pid, join_ref, ref, topic, {status, payload}, serializer)
160      when is_binary(topic) and is_map(payload) do
161
162    send pid, serializer.encode!(
163      %Reply{topic: topic, join_ref: join_ref, ref: ref, status: status, payload: payload}
164    )
165    :ok
166  end
167  def reply(_, _, _, topic, {_status, payload}, _) do
168    raise_invalid_message(topic, "phx_reply", payload)
169  end
170
171  @spec raise_invalid_message(topic :: term, event :: term, payload :: term) :: no_return()
172  defp raise_invalid_message(topic, event, payload) do
173    raise ArgumentError, """
174    topic and event must be strings, message must be a map, got:
175
176      topic: #{inspect topic}
177      event: #{inspect event}
178      payload: #{inspect payload}
179    """
180  end
181
182  ## Callbacks
183
184  @doc false
185  def init({socket, auth_payload, parent, ref}) do
186    socket = %{socket | channel_pid: self()}
187
188    case socket.channel.join(socket.topic, auth_payload, socket) do
189      {:ok, socket} ->
190        join(socket, %{}, parent, ref)
191      {:ok, reply, socket} ->
192        join(socket, reply, parent, ref)
193      {:error, reply} ->
194        send(parent, {ref, reply})
195        :ignore
196      other ->
197        raise """
198        channel #{inspect socket.channel}.join/3 is expected to return one of:
199
200            {:ok, Socket.t} |
201            {:ok, reply :: map, Socket.t} |
202            {:error, reply :: map}
203
204        got #{inspect other}
205        """
206    end
207  end
208
209  @doc false
210  def code_change(old, socket, extra) do
211    socket.channel.code_change(old, socket, extra)
212  end
213
214  defp join(socket, reply, parent, ref) do
215    PubSub.subscribe(socket.pubsub_server, socket.topic,
216      link: true,
217      fastlane: {socket.transport_pid,
218                 socket.serializer,
219                 socket.channel.__intercepts__()})
220
221    send(parent, {ref, reply})
222    {:ok, %{socket | joined: true}}
223  end
224
225  @doc false
226  def handle_call(:socket, _from, socket) do
227    {:reply, socket, socket}
228  end
229
230  @doc false
231  def handle_cast(:close, socket) do
232    handle_result({:stop, {:shutdown, :closed}, socket}, :handle_in)
233  end
234
235  def handle_info(%Message{topic: topic, event: "phx_leave", ref: ref}, %{topic: topic} = socket) do
236    handle_result({:stop, {:shutdown, :left}, :ok, put_in(socket.ref, ref)}, :handle_in)
237  end
238
239  def handle_info(%Message{topic: topic, event: event, payload: payload, ref: ref},
240                  %{topic: topic} = socket) do
241    Phoenix.Endpoint.instrument socket, :phoenix_channel_receive,
242      %{ref: ref, event: event, params: payload, socket: socket}, fn ->
243      event
244      |> socket.channel.handle_in(payload, put_in(socket.ref, ref))
245      |> handle_result(:handle_in)
246    end
247  end
248
249  def handle_info(%Broadcast{topic: topic, event: event, payload: payload},
250                  %Socket{topic: topic} = socket) do
251    event
252    |> socket.channel.handle_out(payload, socket)
253    |> handle_result(:handle_out)
254  end
255
256  def handle_info(msg, socket) do
257    msg
258    |> socket.channel.handle_info(socket)
259    |> handle_result(:handle_info)
260  end
261
262  @doc false
263  def terminate(reason, socket) do
264    socket.channel.terminate(reason, socket)
265  end
266
267  @doc false
268  def fastlane(subscribers, from, %Broadcast{event: event} = msg) do
269    Enum.reduce(subscribers, %{}, fn
270      {pid, _fastlanes}, cache when pid == from ->
271        cache
272
273      {pid, nil}, cache ->
274        send(pid, msg)
275        cache
276
277      {pid, {fastlane_pid, serializer, event_intercepts}}, cache ->
278        if event in event_intercepts do
279          send(pid, msg)
280          cache
281        else
282          case Map.fetch(cache, serializer) do
283            {:ok, encoded_msg} ->
284              send(fastlane_pid, encoded_msg)
285              cache
286            :error ->
287              encoded_msg = serializer.fastlane!(msg)
288              send(fastlane_pid, encoded_msg)
289              Map.put(cache, serializer, encoded_msg)
290          end
291        end
292    end)
293  end
294
295  def fastlane(subscribers, from, msg) do
296    Enum.each(subscribers, fn
297      {pid, _} when pid == from -> :noop
298      {pid, _} -> send(pid, msg)
299    end)
300  end
301
302  @doc false
303  # TODO: Revisit in future GenServer releases
304  def unhandled_handle_info(msg, state) do
305    proc =
306      case Process.info(self(), :registered_name) do
307        {_, []}   -> self()
308        {_, name} -> name
309      end
310    :error_logger.warning_msg('~p ~p received unexpected message in handle_info/2: ~p~n',
311                              [__MODULE__, proc, msg])
312    {:noreply, state}
313  end
314
315  ## Handle results
316
317  defp handle_result({:reply, reply, %Socket{} = socket}, callback) do
318    handle_reply(socket, reply, callback)
319    {:noreply, put_in(socket.ref, nil)}
320  end
321
322  defp handle_result({:stop, reason, reply, socket}, callback) do
323    handle_reply(socket, reply, callback)
324    handle_result({:stop, reason, socket}, callback)
325  end
326
327  defp handle_result({:stop, reason, socket}, _callback) do
328    case reason do
329      :normal -> notify_transport_of_graceful_exit(socket)
330      :shutdown -> notify_transport_of_graceful_exit(socket)
331      {:shutdown, _} -> notify_transport_of_graceful_exit(socket)
332      _ -> :noop
333    end
334    {:stop, reason, socket}
335  end
336
337  defp handle_result({:noreply, socket}, _callback) do
338    {:noreply, put_in(socket.ref, nil)}
339  end
340
341  defp handle_result({:noreply, socket, timeout_or_hibernate}, _callback) do
342    {:noreply, put_in(socket.ref, nil), timeout_or_hibernate}
343  end
344
345  defp handle_result(result, :handle_in) do
346    raise """
347    Expected `handle_in/3` to return one of:
348
349        {:noreply, Socket.t} |
350        {:noreply, Socket.t, timeout | :hibernate} |
351        {:reply, {status :: atom, response :: map}, Socket.t} |
352        {:reply, status :: atom, Socket.t} |
353        {:stop, reason :: term, Socket.t} |
354        {:stop, reason :: term, {status :: atom, response :: map}, Socket.t} |
355        {:stop, reason :: term, status :: atom, Socket.t}
356
357    got #{inspect result}
358    """
359  end
360
361  defp handle_result(result, callback) do
362    raise """
363    Expected `#{callback}` to return one of:
364
365        {:noreply, Socket.t} |
366        {:noreply, Socket.t, timeout | :hibernate} |
367        {:stop, reason :: term, Socket.t} |
368
369    got #{inspect result}
370    """
371  end
372
373  ## Handle replies
374
375  defp handle_reply(socket, {status, payload}, :handle_in)
376       when is_atom(status) and is_map(payload) do
377
378    reply(socket.transport_pid, socket.join_ref, socket.ref, socket.topic, {status, payload},
379          socket.serializer)
380  end
381
382  defp handle_reply(socket, status, :handle_in) when is_atom(status) do
383    handle_reply(socket, {status, %{}}, :handle_in)
384  end
385
386  defp handle_reply(_socket, reply, :handle_in) do
387    raise """
388    Channel replies from `handle_in/3` are expected to be one of:
389
390        status :: atom
391        {status :: atom, response :: map}
392
393    for example:
394
395        {:reply, :ok, socket}
396        {:reply, {:ok, %{}}, socket}
397        {:stop, :shutdown, {:error, %{}}, socket}
398
399    got #{inspect reply}
400    """
401  end
402
403  defp handle_reply(_socket, _reply, _other) do
404    raise """
405    Channel replies can only be sent from a `handle_in/3` callback.
406    Use `push/3` to send an out-of-band message down the socket
407    """
408  end
409
410  defp notify_transport_of_graceful_exit(socket) do
411    Phoenix.Socket.Transport.notify_graceful_exit(socket)
412    Process.unlink(socket.transport_pid)
413    :ok
414  end
415end
416