1defmodule Phoenix.Channel.Server do 2 use GenServer 3 require Logger 4 require Phoenix.Endpoint 5 alias Phoenix.PubSub 6 alias Phoenix.Socket 7 alias Phoenix.Socket.{Broadcast, Message, Reply} 8 9 @moduledoc false 10 11 ## Transport API 12 13 @doc """ 14 Joins the channel in socket with authentication payload. 15 """ 16 @spec join(Socket.t, map) :: {:ok, map, pid} | {:error, map} 17 def join(socket, auth_payload) do 18 Phoenix.Endpoint.instrument socket, :phoenix_channel_join, 19 %{params: auth_payload, socket: socket}, fn -> 20 ref = make_ref() 21 22 case GenServer.start_link(__MODULE__, {socket, auth_payload, self(), ref}) do 23 {:ok, pid} -> 24 receive do: ({^ref, reply} -> {:ok, reply, pid}) 25 :ignore -> 26 receive do: ({^ref, reply} -> {:error, reply}) 27 {:error, reason} -> 28 Logger.error fn -> Exception.format_exit(reason) end 29 {:error, %{reason: "join crashed"}} 30 end 31 end 32 end 33 34 @doc """ 35 Notifies the channel the client closed. 36 37 This event is synchronous as we want to guarantee 38 proper termination of the channel. 39 """ 40 def close(pid, timeout \\ 5000) do 41 # We need to guarantee that the channel has been closed 42 # otherwise the link in the transport will trigger it to 43 # crash. 44 ref = Process.monitor(pid) 45 GenServer.cast(pid, :close) 46 receive do 47 {:DOWN, ^ref, _, _, _} -> :ok 48 after 49 timeout -> 50 Process.exit(pid, :kill) 51 receive do 52 {:DOWN, ^ref, _, _, _} -> :ok 53 end 54 end 55 end 56 57 @doc """ 58 Gets the socket from the channel. 59 """ 60 def socket(pid) do 61 GenServer.call(pid, :socket) 62 end 63 64 ## Channel API 65 66 @doc """ 67 Broadcasts on the given pubsub server with the given 68 `topic`, `event` and `payload`. 69 70 The message is encoded as `Phoenix.Socket.Broadcast`. 71 """ 72 def broadcast(pubsub_server, topic, event, payload) 73 when is_binary(topic) and is_binary(event) and is_map(payload) do 74 PubSub.broadcast pubsub_server, topic, %Broadcast{ 75 topic: topic, 76 event: event, 77 payload: payload 78 } 79 end 80 def broadcast(_, topic, event, payload) do 81 raise_invalid_message(topic, event, payload) 82 end 83 84 @doc """ 85 Broadcasts on the given pubsub server with the given 86 `topic`, `event` and `payload`. 87 88 Raises in case of crashes. 89 """ 90 def broadcast!(pubsub_server, topic, event, payload) 91 when is_binary(topic) and is_binary(event) and is_map(payload) do 92 PubSub.broadcast! pubsub_server, topic, %Broadcast{ 93 topic: topic, 94 event: event, 95 payload: payload 96 } 97 end 98 def broadcast!(_, topic, event, payload) do 99 raise_invalid_message(topic, event, payload) 100 end 101 102 103 @doc """ 104 Broadcasts on the given pubsub server with the given 105 `from`, `topic`, `event` and `payload`. 106 107 The message is encoded as `Phoenix.Socket.Broadcast`. 108 """ 109 def broadcast_from(pubsub_server, from, topic, event, payload) 110 when is_binary(topic) and is_binary(event) and is_map(payload) do 111 PubSub.broadcast_from pubsub_server, from, topic, %Broadcast{ 112 topic: topic, 113 event: event, 114 payload: payload 115 } 116 end 117 def broadcast_from(_, _from, topic, event, payload) do 118 raise_invalid_message(topic, event, payload) 119 end 120 121 @doc """ 122 Broadcasts on the given pubsub server with the given 123 `from`, `topic`, `event` and `payload`. 124 125 Raises in case of crashes. 126 """ 127 def broadcast_from!(pubsub_server, from, topic, event, payload) 128 when is_binary(topic) and is_binary(event) and is_map(payload) do 129 PubSub.broadcast_from! pubsub_server, from, topic, %Broadcast{ 130 topic: topic, 131 event: event, 132 payload: payload 133 } 134 end 135 def broadcast_from!(_, _from, topic, event, payload) do 136 raise_invalid_message(topic, event, payload) 137 end 138 139 @doc """ 140 Pushes a message with the given topic, event and payload 141 to the given process. 142 """ 143 def push(pid, topic, event, payload, serializer) 144 when is_binary(topic) and is_binary(event) and is_map(payload) do 145 146 encoded_msg = serializer.encode!(%Message{topic: topic, 147 event: event, 148 payload: payload}) 149 send pid, encoded_msg 150 :ok 151 end 152 def push(_, topic, event, payload, _) do 153 raise_invalid_message(topic, event, payload) 154 end 155 156 @doc """ 157 Replies to a given ref to the transport process. 158 """ 159 def reply(pid, join_ref, ref, topic, {status, payload}, serializer) 160 when is_binary(topic) and is_map(payload) do 161 162 send pid, serializer.encode!( 163 %Reply{topic: topic, join_ref: join_ref, ref: ref, status: status, payload: payload} 164 ) 165 :ok 166 end 167 def reply(_, _, _, topic, {_status, payload}, _) do 168 raise_invalid_message(topic, "phx_reply", payload) 169 end 170 171 @spec raise_invalid_message(topic :: term, event :: term, payload :: term) :: no_return() 172 defp raise_invalid_message(topic, event, payload) do 173 raise ArgumentError, """ 174 topic and event must be strings, message must be a map, got: 175 176 topic: #{inspect topic} 177 event: #{inspect event} 178 payload: #{inspect payload} 179 """ 180 end 181 182 ## Callbacks 183 184 @doc false 185 def init({socket, auth_payload, parent, ref}) do 186 socket = %{socket | channel_pid: self()} 187 188 case socket.channel.join(socket.topic, auth_payload, socket) do 189 {:ok, socket} -> 190 join(socket, %{}, parent, ref) 191 {:ok, reply, socket} -> 192 join(socket, reply, parent, ref) 193 {:error, reply} -> 194 send(parent, {ref, reply}) 195 :ignore 196 other -> 197 raise """ 198 channel #{inspect socket.channel}.join/3 is expected to return one of: 199 200 {:ok, Socket.t} | 201 {:ok, reply :: map, Socket.t} | 202 {:error, reply :: map} 203 204 got #{inspect other} 205 """ 206 end 207 end 208 209 @doc false 210 def code_change(old, socket, extra) do 211 socket.channel.code_change(old, socket, extra) 212 end 213 214 defp join(socket, reply, parent, ref) do 215 PubSub.subscribe(socket.pubsub_server, socket.topic, 216 link: true, 217 fastlane: {socket.transport_pid, 218 socket.serializer, 219 socket.channel.__intercepts__()}) 220 221 send(parent, {ref, reply}) 222 {:ok, %{socket | joined: true}} 223 end 224 225 @doc false 226 def handle_call(:socket, _from, socket) do 227 {:reply, socket, socket} 228 end 229 230 @doc false 231 def handle_cast(:close, socket) do 232 handle_result({:stop, {:shutdown, :closed}, socket}, :handle_in) 233 end 234 235 def handle_info(%Message{topic: topic, event: "phx_leave", ref: ref}, %{topic: topic} = socket) do 236 handle_result({:stop, {:shutdown, :left}, :ok, put_in(socket.ref, ref)}, :handle_in) 237 end 238 239 def handle_info(%Message{topic: topic, event: event, payload: payload, ref: ref}, 240 %{topic: topic} = socket) do 241 Phoenix.Endpoint.instrument socket, :phoenix_channel_receive, 242 %{ref: ref, event: event, params: payload, socket: socket}, fn -> 243 event 244 |> socket.channel.handle_in(payload, put_in(socket.ref, ref)) 245 |> handle_result(:handle_in) 246 end 247 end 248 249 def handle_info(%Broadcast{topic: topic, event: event, payload: payload}, 250 %Socket{topic: topic} = socket) do 251 event 252 |> socket.channel.handle_out(payload, socket) 253 |> handle_result(:handle_out) 254 end 255 256 def handle_info(msg, socket) do 257 msg 258 |> socket.channel.handle_info(socket) 259 |> handle_result(:handle_info) 260 end 261 262 @doc false 263 def terminate(reason, socket) do 264 socket.channel.terminate(reason, socket) 265 end 266 267 @doc false 268 def fastlane(subscribers, from, %Broadcast{event: event} = msg) do 269 Enum.reduce(subscribers, %{}, fn 270 {pid, _fastlanes}, cache when pid == from -> 271 cache 272 273 {pid, nil}, cache -> 274 send(pid, msg) 275 cache 276 277 {pid, {fastlane_pid, serializer, event_intercepts}}, cache -> 278 if event in event_intercepts do 279 send(pid, msg) 280 cache 281 else 282 case Map.fetch(cache, serializer) do 283 {:ok, encoded_msg} -> 284 send(fastlane_pid, encoded_msg) 285 cache 286 :error -> 287 encoded_msg = serializer.fastlane!(msg) 288 send(fastlane_pid, encoded_msg) 289 Map.put(cache, serializer, encoded_msg) 290 end 291 end 292 end) 293 end 294 295 def fastlane(subscribers, from, msg) do 296 Enum.each(subscribers, fn 297 {pid, _} when pid == from -> :noop 298 {pid, _} -> send(pid, msg) 299 end) 300 end 301 302 @doc false 303 # TODO: Revisit in future GenServer releases 304 def unhandled_handle_info(msg, state) do 305 proc = 306 case Process.info(self(), :registered_name) do 307 {_, []} -> self() 308 {_, name} -> name 309 end 310 :error_logger.warning_msg('~p ~p received unexpected message in handle_info/2: ~p~n', 311 [__MODULE__, proc, msg]) 312 {:noreply, state} 313 end 314 315 ## Handle results 316 317 defp handle_result({:reply, reply, %Socket{} = socket}, callback) do 318 handle_reply(socket, reply, callback) 319 {:noreply, put_in(socket.ref, nil)} 320 end 321 322 defp handle_result({:stop, reason, reply, socket}, callback) do 323 handle_reply(socket, reply, callback) 324 handle_result({:stop, reason, socket}, callback) 325 end 326 327 defp handle_result({:stop, reason, socket}, _callback) do 328 case reason do 329 :normal -> notify_transport_of_graceful_exit(socket) 330 :shutdown -> notify_transport_of_graceful_exit(socket) 331 {:shutdown, _} -> notify_transport_of_graceful_exit(socket) 332 _ -> :noop 333 end 334 {:stop, reason, socket} 335 end 336 337 defp handle_result({:noreply, socket}, _callback) do 338 {:noreply, put_in(socket.ref, nil)} 339 end 340 341 defp handle_result({:noreply, socket, timeout_or_hibernate}, _callback) do 342 {:noreply, put_in(socket.ref, nil), timeout_or_hibernate} 343 end 344 345 defp handle_result(result, :handle_in) do 346 raise """ 347 Expected `handle_in/3` to return one of: 348 349 {:noreply, Socket.t} | 350 {:noreply, Socket.t, timeout | :hibernate} | 351 {:reply, {status :: atom, response :: map}, Socket.t} | 352 {:reply, status :: atom, Socket.t} | 353 {:stop, reason :: term, Socket.t} | 354 {:stop, reason :: term, {status :: atom, response :: map}, Socket.t} | 355 {:stop, reason :: term, status :: atom, Socket.t} 356 357 got #{inspect result} 358 """ 359 end 360 361 defp handle_result(result, callback) do 362 raise """ 363 Expected `#{callback}` to return one of: 364 365 {:noreply, Socket.t} | 366 {:noreply, Socket.t, timeout | :hibernate} | 367 {:stop, reason :: term, Socket.t} | 368 369 got #{inspect result} 370 """ 371 end 372 373 ## Handle replies 374 375 defp handle_reply(socket, {status, payload}, :handle_in) 376 when is_atom(status) and is_map(payload) do 377 378 reply(socket.transport_pid, socket.join_ref, socket.ref, socket.topic, {status, payload}, 379 socket.serializer) 380 end 381 382 defp handle_reply(socket, status, :handle_in) when is_atom(status) do 383 handle_reply(socket, {status, %{}}, :handle_in) 384 end 385 386 defp handle_reply(_socket, reply, :handle_in) do 387 raise """ 388 Channel replies from `handle_in/3` are expected to be one of: 389 390 status :: atom 391 {status :: atom, response :: map} 392 393 for example: 394 395 {:reply, :ok, socket} 396 {:reply, {:ok, %{}}, socket} 397 {:stop, :shutdown, {:error, %{}}, socket} 398 399 got #{inspect reply} 400 """ 401 end 402 403 defp handle_reply(_socket, _reply, _other) do 404 raise """ 405 Channel replies can only be sent from a `handle_in/3` callback. 406 Use `push/3` to send an out-of-band message down the socket 407 """ 408 end 409 410 defp notify_transport_of_graceful_exit(socket) do 411 Phoenix.Socket.Transport.notify_graceful_exit(socket) 412 Process.unlink(socket.transport_pid) 413 :ok 414 end 415end 416