1%% Copyright (c) 2013-2018, Loïc Hoguin <essen@ninenines.eu>
2%%
3%% Permission to use, copy, modify, and/or distribute this software for any
4%% purpose with or without fee is hereby granted, provided that the above
5%% copyright notice and this permission notice appear in all copies.
6%%
7%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
8%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
9%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
10%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
11%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
12%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
13%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
14
15-module(gun).
16
17-ifdef(OTP_RELEASE).
18-compile({nowarn_deprecated_function, [{erlang, get_stacktrace, 0}]}).
19-endif.
20
21%% Connection.
22-export([open/2]).
23-export([open/3]).
24-export([open_unix/2]).
25-export([info/1]).
26-export([close/1]).
27-export([shutdown/1]).
28
29%% Requests.
30-export([delete/2]).
31-export([delete/3]).
32-export([delete/4]).
33-export([get/2]).
34-export([get/3]).
35-export([get/4]).
36-export([head/2]).
37-export([head/3]).
38-export([head/4]).
39-export([options/2]).
40-export([options/3]).
41-export([options/4]).
42-export([patch/3]).
43-export([patch/4]).
44-export([patch/5]).
45-export([post/3]).
46-export([post/4]).
47-export([post/5]).
48-export([put/3]).
49-export([put/4]).
50-export([put/5]).
51-export([request/4]).
52-export([request/5]).
53-export([request/6]).
54
55%% Streaming data.
56-export([data/4]).
57
58%% Tunneling.
59-export([connect/2]).
60-export([connect/3]).
61-export([connect/4]).
62
63%% Awaiting gun messages.
64-export([await/2]).
65-export([await/3]).
66-export([await/4]).
67-export([await_body/2]).
68-export([await_body/3]).
69-export([await_body/4]).
70-export([await_up/1]).
71-export([await_up/2]).
72-export([await_up/3]).
73
74%% Flushing gun messages.
75-export([flush/1]).
76
77%% Cancelling a stream.
78-export([cancel/2]).
79
80%% Websocket.
81-export([ws_upgrade/2]).
82-export([ws_upgrade/3]).
83-export([ws_upgrade/4]).
84-export([ws_send/2]).
85
86%% Internals.
87-export([start_link/4]).
88-export([proc_lib_hack/5]).
89-export([system_continue/3]).
90-export([system_terminate/4]).
91-export([system_code_change/4]).
92
93-type headers() :: [{binary(), iodata()}].
94
95-type ws_close_code() :: 1000..4999.
96-type ws_frame() :: close | ping | pong
97	| {text | binary | close | ping | pong, iodata()}
98	| {close, ws_close_code(), iodata()}.
99
100-type opts() :: #{
101	connect_timeout => timeout(),
102	http_opts       => http_opts(),
103	http2_opts      => http2_opts(),
104	protocols       => [http | http2],
105	retry           => non_neg_integer(),
106	retry_timeout   => pos_integer(),
107	trace           => boolean(),
108	transport       => tcp | tls | ssl,
109	transport_opts  => [gen_tcp:connect_option()] | [ssl:connect_option()],
110	ws_opts         => ws_opts()
111}.
112-export_type([opts/0]).
113%% @todo Add an option to disable/enable the notowner behavior.
114
115-type connect_destination() :: #{
116	host := inet:hostname() | inet:ip_address(),
117	port := inet:port_number(),
118	username => iodata(),
119	password => iodata(),
120	protocol => http | http2, %% @todo Remove in Gun 2.0.
121	protocols => [http | http2],
122	transport => tcp | tls,
123	tls_opts => [ssl:connect_option()],
124	tls_handshake_timeout => timeout()
125}.
126-export_type([connect_destination/0]).
127
128-type intermediary() :: #{
129	type := connect,
130	host := inet:hostname() | inet:ip_address(),
131	port := inet:port_number(),
132	transport := tcp | tls,
133	protocol := http | http2
134}.
135
136%% @todo When/if HTTP/2 CONNECT gets implemented, we will want an option here
137%% to indicate that the request must be sent on an existing CONNECT stream.
138%% This is of course not required for HTTP/1.1 since the CONNECT takes over
139%% the entire connection.
140-type req_opts() :: #{
141	reply_to => pid()
142}.
143-export_type([req_opts/0]).
144
145-type http_opts() :: #{
146	keepalive             => timeout(),
147	transform_header_name => fun((binary()) -> binary()),
148	version               => 'HTTP/1.1' | 'HTTP/1.0'
149}.
150-export_type([http_opts/0]).
151
152-type http2_opts() :: #{
153	keepalive => timeout()
154}.
155-export_type([http2_opts/0]).
156
157%% @todo keepalive
158-type ws_opts() :: #{
159	compress => boolean(),
160	protocols => [{binary(), module()}]
161}.
162-export_type([ws_opts/0]).
163
164-record(state, {
165	parent :: pid(),
166	owner :: pid(),
167	owner_ref :: reference(),
168	host :: inet:hostname() | inet:ip_address(),
169	port :: inet:port_number(),
170	origin_host :: inet:hostname() | inet:ip_address(),
171	origin_port :: inet:port_number(),
172	intermediaries = [] :: [intermediary()],
173	opts :: opts(),
174	keepalive_ref :: undefined | reference(),
175	socket :: undefined | inet:socket() | ssl:sslsocket(),
176	transport :: module(),
177	protocol :: module(),
178	protocol_state :: any(),
179	last_error :: any()
180}).
181
182%% Connection.
183
184-spec open(inet:hostname() | inet:ip_address(), inet:port_number())
185	-> {ok, pid()} | {error, any()}.
186open(Host, Port) ->
187	open(Host, Port, #{}).
188
189-spec open(inet:hostname() | inet:ip_address(), inet:port_number(), opts())
190	-> {ok, pid()} | {error, any()}.
191open(Host, Port, Opts) when is_list(Host); is_atom(Host); is_tuple(Host) ->
192	do_open(Host, Port, Opts).
193
194-spec open_unix(Path::string(), opts())
195	-> {ok, pid()} | {error, any()}.
196open_unix(SocketPath, Opts) ->
197	do_open({local, SocketPath}, 0, Opts).
198
199do_open(Host, Port, Opts0) ->
200	%% We accept both ssl and tls but only use tls in the code.
201	Opts = case Opts0 of
202		#{transport := ssl} -> Opts0#{transport => tls};
203		_ -> Opts0
204	end,
205	case check_options(maps:to_list(Opts)) of
206		ok ->
207			case supervisor:start_child(gun_sup, [self(), Host, Port, Opts]) of
208				OK = {ok, ServerPid} ->
209					consider_tracing(ServerPid, Opts),
210					OK;
211				StartError ->
212					StartError
213			end;
214		CheckError ->
215			CheckError
216	end.
217
218check_options([]) ->
219	ok;
220check_options([{connect_timeout, infinity}|Opts]) ->
221	check_options(Opts);
222check_options([{connect_timeout, T}|Opts]) when is_integer(T), T >= 0 ->
223	check_options(Opts);
224check_options([{http_opts, ProtoOpts}|Opts]) when is_map(ProtoOpts) ->
225	case gun_http:check_options(ProtoOpts) of
226		ok ->
227			check_options(Opts);
228		Error ->
229			Error
230	end;
231check_options([{http2_opts, ProtoOpts}|Opts]) when is_map(ProtoOpts) ->
232	case gun_http2:check_options(ProtoOpts) of
233		ok ->
234			check_options(Opts);
235		Error ->
236			Error
237	end;
238check_options([Opt = {protocols, L}|Opts]) when is_list(L) ->
239	Len = length(L),
240	case length(lists:usort(L)) of
241		Len when Len > 0 ->
242			Check = lists:usort([(P =:= http) orelse (P =:= http2) || P <- L]),
243			case Check of
244				[true] ->
245					check_options(Opts);
246				_ ->
247					{error, {options, Opt}}
248			end;
249		_ ->
250			{error, {options, Opt}}
251	end;
252check_options([{retry, R}|Opts]) when is_integer(R), R >= 0 ->
253	check_options(Opts);
254check_options([{retry_timeout, T}|Opts]) when is_integer(T), T >= 0 ->
255	check_options(Opts);
256check_options([{trace, B}|Opts]) when B =:= true; B =:= false ->
257	check_options(Opts);
258check_options([{transport, T}|Opts]) when T =:= tcp; T =:= tls ->
259	check_options(Opts);
260check_options([{transport_opts, L}|Opts]) when is_list(L) ->
261	check_options(Opts);
262check_options([{ws_opts, ProtoOpts}|Opts]) when is_map(ProtoOpts) ->
263	case gun_ws:check_options(ProtoOpts) of
264		ok ->
265			check_options(Opts);
266		Error ->
267			Error
268	end;
269check_options([Opt|_]) ->
270	{error, {options, Opt}}.
271
272consider_tracing(ServerPid, #{trace := true}) ->
273	dbg:start(),
274	dbg:tracer(),
275	dbg:tpl(gun, [{'_', [], [{return_trace}]}]),
276	dbg:tpl(gun_http, [{'_', [], [{return_trace}]}]),
277	dbg:tpl(gun_http2, [{'_', [], [{return_trace}]}]),
278	dbg:tpl(gun_ws, [{'_', [], [{return_trace}]}]),
279	dbg:p(ServerPid, all);
280consider_tracing(_, _) ->
281	ok.
282
283-spec info(pid()) -> map().
284info(ServerPid) ->
285	{_, #state{
286		socket=Socket,
287		transport=Transport,
288		protocol=Protocol,
289		origin_host=OriginHost,
290		origin_port=OriginPort,
291		intermediaries=Intermediaries
292	}} = sys:get_state(ServerPid),
293	{ok, {SockIP, SockPort}} = Transport:sockname(Socket),
294	#{
295		socket => Socket,
296		transport => Transport:name(),
297		protocol => Protocol:name(),
298		sock_ip => SockIP,
299		sock_port => SockPort,
300		origin_host => OriginHost,
301		origin_port => OriginPort,
302		%% Intermediaries are listed in the order data goes through them.
303		intermediaries => lists:reverse(Intermediaries)
304	}.
305
306-spec close(pid()) -> ok.
307close(ServerPid) ->
308	supervisor:terminate_child(gun_sup, ServerPid).
309
310-spec shutdown(pid()) -> ok.
311shutdown(ServerPid) ->
312	_ = ServerPid ! {shutdown, self()},
313	ok.
314
315%% Requests.
316
317-spec delete(pid(), iodata()) -> reference().
318delete(ServerPid, Path) ->
319	request(ServerPid, <<"DELETE">>, Path, []).
320
321-spec delete(pid(), iodata(), headers()) -> reference().
322delete(ServerPid, Path, Headers) ->
323	request(ServerPid, <<"DELETE">>, Path, Headers).
324
325-spec delete(pid(), iodata(), headers(), req_opts()) -> reference().
326delete(ServerPid, Path, Headers, ReqOpts) ->
327	request(ServerPid, <<"DELETE">>, Path, Headers, <<>>, ReqOpts).
328
329-spec get(pid(), iodata()) -> reference().
330get(ServerPid, Path) ->
331	request(ServerPid, <<"GET">>, Path, []).
332
333-spec get(pid(), iodata(), headers()) -> reference().
334get(ServerPid, Path, Headers) ->
335	request(ServerPid, <<"GET">>, Path, Headers).
336
337-spec get(pid(), iodata(), headers(), req_opts()) -> reference().
338get(ServerPid, Path, Headers, ReqOpts) ->
339	request(ServerPid, <<"GET">>, Path, Headers, <<>>, ReqOpts).
340
341-spec head(pid(), iodata()) -> reference().
342head(ServerPid, Path) ->
343	request(ServerPid, <<"HEAD">>, Path, []).
344
345-spec head(pid(), iodata(), headers()) -> reference().
346head(ServerPid, Path, Headers) ->
347	request(ServerPid, <<"HEAD">>, Path, Headers).
348
349-spec head(pid(), iodata(), headers(), req_opts()) -> reference().
350head(ServerPid, Path, Headers, ReqOpts) ->
351	request(ServerPid, <<"HEAD">>, Path, Headers, <<>>, ReqOpts).
352
353-spec options(pid(), iodata()) -> reference().
354options(ServerPid, Path) ->
355	request(ServerPid, <<"OPTIONS">>, Path, []).
356
357-spec options(pid(), iodata(), headers()) -> reference().
358options(ServerPid, Path, Headers) ->
359	request(ServerPid, <<"OPTIONS">>, Path, Headers).
360
361-spec options(pid(), iodata(), headers(), req_opts()) -> reference().
362options(ServerPid, Path, Headers, ReqOpts) ->
363	request(ServerPid, <<"OPTIONS">>, Path, Headers, <<>>, ReqOpts).
364
365-spec patch(pid(), iodata(), headers()) -> reference().
366patch(ServerPid, Path, Headers) ->
367	request(ServerPid, <<"PATCH">>, Path, Headers).
368
369-spec patch(pid(), iodata(), headers(), iodata()) -> reference().
370patch(ServerPid, Path, Headers, Body) ->
371	request(ServerPid, <<"PATCH">>, Path, Headers, Body).
372
373-spec patch(pid(), iodata(), headers(), iodata(), req_opts()) -> reference().
374patch(ServerPid, Path, Headers, Body, ReqOpts) ->
375	request(ServerPid, <<"PATCH">>, Path, Headers, Body, ReqOpts).
376
377-spec post(pid(), iodata(), headers()) -> reference().
378post(ServerPid, Path, Headers) ->
379	request(ServerPid, <<"POST">>, Path, Headers).
380
381-spec post(pid(), iodata(), headers(), iodata()) -> reference().
382post(ServerPid, Path, Headers, Body) ->
383	request(ServerPid, <<"POST">>, Path, Headers, Body).
384
385-spec post(pid(), iodata(), headers(), iodata(), req_opts()) -> reference().
386post(ServerPid, Path, Headers, Body, ReqOpts) ->
387	request(ServerPid, <<"POST">>, Path, Headers, Body, ReqOpts).
388
389-spec put(pid(), iodata(), headers()) -> reference().
390put(ServerPid, Path, Headers) ->
391	request(ServerPid, <<"PUT">>, Path, Headers).
392
393-spec put(pid(), iodata(), headers(), iodata()) -> reference().
394put(ServerPid, Path, Headers, Body) ->
395	request(ServerPid, <<"PUT">>, Path, Headers, Body).
396
397-spec put(pid(), iodata(), headers(), iodata(), req_opts()) -> reference().
398put(ServerPid, Path, Headers, Body, ReqOpts) ->
399	request(ServerPid, <<"PUT">>, Path, Headers, Body, ReqOpts).
400
401-spec request(pid(), iodata(), iodata(), headers()) -> reference().
402request(ServerPid, Method, Path, Headers) ->
403	request(ServerPid, Method, Path, Headers, <<>>, #{}).
404
405-spec request(pid(), iodata(), iodata(), headers(), iodata()) -> reference().
406request(ServerPid, Method, Path, Headers, Body) ->
407	request(ServerPid, Method, Path, Headers, Body, #{}).
408
409%% @todo Accept header names as maps.
410-spec request(pid(), iodata(), iodata(), headers(), iodata(), req_opts()) -> reference().
411request(ServerPid, Method, Path, Headers, Body, ReqOpts) ->
412	StreamRef = make_ref(),
413	ReplyTo = maps:get(reply_to, ReqOpts, self()),
414	_ = ServerPid ! {request, ReplyTo, StreamRef, Method, Path, Headers, Body},
415	StreamRef.
416
417%% Streaming data.
418
419-spec data(pid(), reference(), fin | nofin, iodata()) -> ok.
420data(ServerPid, StreamRef, IsFin, Data) ->
421	_ = ServerPid ! {data, self(), StreamRef, IsFin, Data},
422	ok.
423
424%% Tunneling.
425
426-spec connect(pid(), connect_destination()) -> reference().
427connect(ServerPid, Destination) ->
428	connect(ServerPid, Destination, [], #{}).
429
430-spec connect(pid(), connect_destination(), headers()) -> reference().
431connect(ServerPid, Destination, Headers) ->
432	connect(ServerPid, Destination, Headers, #{}).
433
434-spec connect(pid(), connect_destination(), headers(), req_opts()) -> reference().
435connect(ServerPid, Destination, Headers, ReqOpts) ->
436	StreamRef = make_ref(),
437	ReplyTo = maps:get(reply_to, ReqOpts, self()),
438	_ = ServerPid ! {connect, ReplyTo, StreamRef, Destination, Headers},
439	StreamRef.
440
441%% Awaiting gun messages.
442
443%% @todo spec await await_body
444
445await(ServerPid, StreamRef) ->
446	MRef = monitor(process, ServerPid),
447	Res = await(ServerPid, StreamRef, 5000, MRef),
448	demonitor(MRef, [flush]),
449	Res.
450
451await(ServerPid, StreamRef, MRef) when is_reference(MRef) ->
452	await(ServerPid, StreamRef, 5000, MRef);
453await(ServerPid, StreamRef, Timeout) ->
454	MRef = monitor(process, ServerPid),
455	Res = await(ServerPid, StreamRef, Timeout, MRef),
456	demonitor(MRef, [flush]),
457	Res.
458
459%% @todo Add gun_upgrade and gun_ws?
460await(ServerPid, StreamRef, Timeout, MRef) ->
461	receive
462		{gun_inform, ServerPid, StreamRef, Status, Headers} ->
463			{inform, Status, Headers};
464		{gun_response, ServerPid, StreamRef, IsFin, Status, Headers} ->
465			{response, IsFin, Status, Headers};
466		{gun_data, ServerPid, StreamRef, IsFin, Data} ->
467			{data, IsFin, Data};
468		{gun_trailers, ServerPid, StreamRef, Trailers} ->
469			{trailers, Trailers};
470		{gun_push, ServerPid, StreamRef, NewStreamRef, Method, URI, Headers} ->
471			{push, NewStreamRef, Method, URI, Headers};
472		{gun_error, ServerPid, StreamRef, Reason} ->
473			{error, Reason};
474		{gun_error, ServerPid, Reason} ->
475			{error, Reason};
476		{'DOWN', MRef, process, ServerPid, Reason} ->
477			{error, Reason}
478	after Timeout ->
479		{error, timeout}
480	end.
481
482await_body(ServerPid, StreamRef) ->
483	MRef = monitor(process, ServerPid),
484	Res = await_body(ServerPid, StreamRef, 5000, MRef, <<>>),
485	demonitor(MRef, [flush]),
486	Res.
487
488await_body(ServerPid, StreamRef, MRef) when is_reference(MRef) ->
489	await_body(ServerPid, StreamRef, 5000, MRef, <<>>);
490await_body(ServerPid, StreamRef, Timeout) ->
491	MRef = monitor(process, ServerPid),
492	Res = await_body(ServerPid, StreamRef, Timeout, MRef, <<>>),
493	demonitor(MRef, [flush]),
494	Res.
495
496await_body(ServerPid, StreamRef, Timeout, MRef) ->
497	await_body(ServerPid, StreamRef, Timeout, MRef, <<>>).
498
499await_body(ServerPid, StreamRef, Timeout, MRef, Acc) ->
500	receive
501		{gun_data, ServerPid, StreamRef, nofin, Data} ->
502			await_body(ServerPid, StreamRef, Timeout, MRef,
503				<< Acc/binary, Data/binary >>);
504		{gun_data, ServerPid, StreamRef, fin, Data} ->
505			{ok, << Acc/binary, Data/binary >>};
506		%% It's OK to return trailers here because the client
507		%% specifically requested them.
508		{gun_trailers, ServerPid, StreamRef, Trailers} ->
509			{ok, Acc, Trailers};
510		{gun_error, ServerPid, StreamRef, Reason} ->
511			{error, Reason};
512		{gun_error, ServerPid, Reason} ->
513			{error, Reason};
514		{'DOWN', MRef, process, ServerPid, Reason} ->
515			{error, Reason}
516	after Timeout ->
517		{error, timeout}
518	end.
519
520-spec await_up(pid()) -> {ok, http | http2} | {error, atom()}.
521await_up(ServerPid) ->
522	MRef = monitor(process, ServerPid),
523	Res = await_up(ServerPid, 5000, MRef),
524	demonitor(MRef, [flush]),
525	Res.
526
527-spec await_up(pid(), reference() | timeout()) -> {ok, http | http2} | {error, atom()}.
528await_up(ServerPid, MRef) when is_reference(MRef) ->
529	await_up(ServerPid, 5000, MRef);
530await_up(ServerPid, Timeout) ->
531	MRef = monitor(process, ServerPid),
532	Res = await_up(ServerPid, Timeout, MRef),
533	demonitor(MRef, [flush]),
534	Res.
535
536-spec await_up(pid(), timeout(), reference()) -> {ok, http | http2} | {error, atom()}.
537await_up(ServerPid, Timeout, MRef) ->
538	receive
539		{gun_up, ServerPid, Protocol} ->
540			{ok, Protocol};
541		{'DOWN', MRef, process, ServerPid, Reason} ->
542			{error, Reason}
543	after Timeout ->
544		{error, timeout}
545	end.
546
547-spec flush(pid() | reference()) -> ok.
548flush(ServerPid) when is_pid(ServerPid) ->
549	flush_pid(ServerPid);
550flush(StreamRef) ->
551	flush_ref(StreamRef).
552
553flush_pid(ServerPid) ->
554	receive
555		{gun_up, ServerPid, _} ->
556			flush_pid(ServerPid);
557		{gun_down, ServerPid, _, _, _, _} ->
558			flush_pid(ServerPid);
559		{gun_inform, ServerPid, _, _, _} ->
560			flush_pid(ServerPid);
561		{gun_response, ServerPid, _, _, _, _} ->
562			flush_pid(ServerPid);
563		{gun_data, ServerPid, _, _, _} ->
564			flush_pid(ServerPid);
565		{gun_trailers, ServerPid, _, _} ->
566			flush_pid(ServerPid);
567		{gun_push, ServerPid, _, _, _, _, _, _} ->
568			flush_pid(ServerPid);
569		{gun_error, ServerPid, _, _} ->
570			flush_pid(ServerPid);
571		{gun_error, ServerPid, _} ->
572			flush_pid(ServerPid);
573		{gun_upgrade, ServerPid, _, _, _} ->
574			flush_pid(ServerPid);
575		{gun_ws, ServerPid, _, _} ->
576			flush_pid(ServerPid);
577		{'DOWN', _, process, ServerPid, _} ->
578			flush_pid(ServerPid)
579	after 0 ->
580		ok
581	end.
582
583flush_ref(StreamRef) ->
584	receive
585		{gun_inform, _, StreamRef, _, _} ->
586			flush_pid(StreamRef);
587		{gun_response, _, StreamRef, _, _, _} ->
588			flush_ref(StreamRef);
589		{gun_data, _, StreamRef, _, _} ->
590			flush_ref(StreamRef);
591		{gun_trailers, _, StreamRef, _} ->
592			flush_ref(StreamRef);
593		{gun_push, _, StreamRef, _, _, _, _, _} ->
594			flush_ref(StreamRef);
595		{gun_error, _, StreamRef, _} ->
596			flush_ref(StreamRef);
597		{gun_upgrade, _, StreamRef, _, _} ->
598			flush_ref(StreamRef);
599		{gun_ws, _, StreamRef, _} ->
600			flush_ref(StreamRef)
601	after 0 ->
602		ok
603	end.
604
605%% Cancelling a stream.
606
607-spec cancel(pid(), reference()) -> ok.
608cancel(ServerPid, StreamRef) ->
609	_ = ServerPid ! {cancel, self(), StreamRef},
610	ok.
611
612%% @todo Allow upgrading an HTTP/1.1 connection to HTTP/2.
613%% http2_upgrade
614
615%% Websocket.
616
617-spec ws_upgrade(pid(), iodata()) -> reference().
618ws_upgrade(ServerPid, Path) ->
619	ws_upgrade(ServerPid, Path, []).
620
621-spec ws_upgrade(pid(), iodata(), headers()) -> reference().
622ws_upgrade(ServerPid, Path, Headers) ->
623	StreamRef = make_ref(),
624	_ = ServerPid ! {ws_upgrade, self(), StreamRef, Path, Headers},
625	StreamRef.
626
627-spec ws_upgrade(pid(), iodata(), headers(), ws_opts()) -> reference().
628ws_upgrade(ServerPid, Path, Headers, Opts) ->
629	ok = gun_ws:check_options(Opts),
630	StreamRef = make_ref(),
631	_ = ServerPid ! {ws_upgrade, self(), StreamRef, Path, Headers, Opts},
632	StreamRef.
633
634%% @todo ws_send/2 will need to be deprecated in favor of a variant with StreamRef.
635%% But it can be kept for the time being since it can still work for HTTP/1.1.
636-spec ws_send(pid(), ws_frame() | [ws_frame()]) -> ok.
637ws_send(ServerPid, Frames) ->
638	_ = ServerPid ! {ws_send, self(), Frames},
639	ok.
640
641%% Internals.
642
643start_link(Owner, Host, Port, Opts) ->
644	proc_lib:start_link(?MODULE, proc_lib_hack,
645		[self(), Owner, Host, Port, Opts]).
646
647proc_lib_hack(Parent, Owner, Host, Port, Opts) ->
648	try
649		init(Parent, Owner, Host, Port, Opts)
650	catch
651		_:normal -> exit(normal);
652		_:shutdown -> exit(shutdown);
653		_:Reason = {shutdown, _} -> exit(Reason);
654		_:Reason -> exit({Reason, erlang:get_stacktrace()})
655	end.
656
657init(Parent, Owner, Host, Port, Opts) ->
658	ok = proc_lib:init_ack(Parent, {ok, self()}),
659	Retry = maps:get(retry, Opts, 5),
660	Transport = case maps:get(transport, Opts, default_transport(Port)) of
661		tcp -> gun_tcp;
662		tls -> gun_tls
663	end,
664	OwnerRef = monitor(process, Owner),
665	transport_connect(#state{parent=Parent, owner=Owner, owner_ref=OwnerRef,
666		host=Host, port=Port, origin_host=Host, origin_port=Port,
667		opts=Opts, transport=Transport}, Retry).
668
669default_transport(443) -> tls;
670default_transport(_) -> tcp.
671
672transport_connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport=gun_tls}, Retries) ->
673	TransportOpts = [binary, {active, false}|ensure_alpn(
674		maps:get(protocols, Opts, [http2, http]),
675		maps:get(transport_opts, Opts, []))],
676	case Transport:connect(Host, Port, TransportOpts, maps:get(connect_timeout, Opts, infinity)) of
677		{ok, Socket} ->
678			{Protocol, ProtoOptsKey} = case ssl:negotiated_protocol(Socket) of
679				{ok, <<"h2">>} -> {gun_http2, http2_opts};
680				_ -> {gun_http, http_opts}
681			end,
682			up(State, Socket, Protocol, ProtoOptsKey);
683		{error, Reason} ->
684			retry(State#state{last_error=Reason}, Retries)
685	end;
686transport_connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport}, Retries) ->
687	TransportOpts = [binary, {active, false}
688		|maps:get(transport_opts, Opts, [])],
689	case Transport:connect(Host, Port, TransportOpts, maps:get(connect_timeout, Opts, infinity)) of
690		{ok, Socket} ->
691			{Protocol, ProtoOptsKey} = case maps:get(protocols, Opts, [http]) of
692				[http] -> {gun_http, http_opts};
693				[http2] -> {gun_http2, http2_opts}
694			end,
695			up(State, Socket, Protocol, ProtoOptsKey);
696		{error, Reason} ->
697			retry(State#state{last_error=Reason}, Retries)
698	end.
699
700ensure_alpn(Protocols0, TransportOpts) ->
701	Protocols = [case P of
702		http -> <<"http/1.1">>;
703		http2 -> <<"h2">>
704	end || P <- Protocols0],
705	[
706		{alpn_advertised_protocols, Protocols},
707		{client_preferred_next_protocols, {client, Protocols, <<"http/1.1">>}}
708	|TransportOpts].
709
710up(State=#state{owner=Owner, opts=Opts, transport=Transport}, Socket, Protocol, ProtoOptsKey) ->
711	ProtoOpts = maps:get(ProtoOptsKey, Opts, #{}),
712	ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts),
713	Owner ! {gun_up, self(), Protocol:name()},
714	before_loop(State#state{socket=Socket, protocol=Protocol, protocol_state=ProtoState}).
715
716down(State=#state{owner=Owner, opts=Opts, protocol=Protocol, protocol_state=ProtoState}, Reason) ->
717	{KilledStreams, UnprocessedStreams} = Protocol:down(ProtoState),
718	Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams, UnprocessedStreams},
719	retry(State#state{socket=undefined, protocol=undefined, protocol_state=undefined,
720		last_error=Reason}, maps:get(retry, Opts, 5)).
721
722retry(#state{last_error=Reason}, 0) ->
723	exit({shutdown, Reason});
724retry(State=#state{keepalive_ref=KeepaliveRef}, Retries) when is_reference(KeepaliveRef) ->
725	_ = erlang:cancel_timer(KeepaliveRef),
726	%% Flush if we have a keepalive message
727	receive
728		keepalive -> ok
729	after 0 ->
730		ok
731	end,
732	retry_loop(State#state{keepalive_ref=undefined}, Retries - 1);
733retry(State, Retries) ->
734	retry_loop(State, Retries - 1).
735
736retry_loop(State=#state{parent=Parent, opts=Opts}, Retries) ->
737	_ = erlang:send_after(maps:get(retry_timeout, Opts, 5000), self(), retry),
738	receive
739		retry ->
740			transport_connect(State, Retries);
741		{system, From, Request} ->
742			sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
743				{retry_loop, State, Retries})
744	end.
745
746before_loop(State=#state{opts=Opts, protocol=Protocol}) ->
747	%% @todo Might not be worth checking every time?
748	ProtoOptsKey = case Protocol of
749		gun_http -> http_opts;
750		gun_http2 -> http2_opts
751	end,
752	ProtoOpts = maps:get(ProtoOptsKey, Opts, #{}),
753	Keepalive = maps:get(keepalive, ProtoOpts, 5000),
754	KeepaliveRef = case Keepalive of
755		infinity -> undefined;
756		_ -> erlang:send_after(Keepalive, self(), keepalive)
757	end,
758	loop(State#state{keepalive_ref=KeepaliveRef}).
759
760loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef,
761		origin_host=Host, origin_port=Port, opts=Opts, socket=Socket,
762		transport=Transport, protocol=Protocol, protocol_state=ProtoState}) ->
763	{OK, Closed, Error} = Transport:messages(),
764	Transport:setopts(Socket, [{active, once}]),
765	receive
766		{OK, Socket, Data} ->
767			case Protocol:handle(Data, ProtoState) of
768				Commands when is_list(Commands) ->
769					commands(Commands, State);
770				Command ->
771					commands([Command], State)
772			end;
773		{Closed, Socket} ->
774			Protocol:close(ProtoState),
775			Transport:close(Socket),
776			down(State, closed);
777		{Error, Socket, Reason} ->
778			Protocol:close(ProtoState),
779			Transport:close(Socket),
780			down(State, {error, Reason});
781		{OK, _PreviousSocket, _Data} ->
782			loop(State);
783		{Closed, _PreviousSocket} ->
784			loop(State);
785		{Error, _PreviousSocket, _} ->
786			loop(State);
787		keepalive ->
788			ProtoState2 = Protocol:keepalive(ProtoState),
789			before_loop(State#state{protocol_state=ProtoState2});
790		{request, ReplyTo, StreamRef, Method, Path, Headers, <<>>} ->
791			ProtoState2 = Protocol:request(ProtoState,
792				StreamRef, ReplyTo, Method, Host, Port, Path, Headers),
793			loop(State#state{protocol_state=ProtoState2});
794		{request, ReplyTo, StreamRef, Method, Path, Headers, Body} ->
795			ProtoState2 = Protocol:request(ProtoState,
796				StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body),
797			loop(State#state{protocol_state=ProtoState2});
798		%% @todo Do we want to reject ReplyTo if it's not the process
799		%% who initiated the connection? For both data and cancel.
800		{data, ReplyTo, StreamRef, IsFin, Data} ->
801			ProtoState2 = Protocol:data(ProtoState,
802				StreamRef, ReplyTo, IsFin, Data),
803			loop(State#state{protocol_state=ProtoState2});
804		{connect, ReplyTo, StreamRef, Destination0, Headers} ->
805			%% The protocol option has been deprecated in favor of the protocols option.
806			%% Nobody probably ended up using it, but let's not break the interface.
807			Destination1 = case Destination0 of
808				#{protocols := _} ->
809					Destination0;
810				#{protocol := DestProto} ->
811					Destination0#{protocols => [DestProto]};
812				_ ->
813					Destination0
814			end,
815			Destination = case Destination1 of
816				#{transport := tls} ->
817					Destination1#{tls_opts => ensure_alpn(
818						maps:get(protocols, Destination1, [http]),
819						maps:get(tls_opts, Destination1, []))};
820				_ ->
821					Destination1
822			end,
823			ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers),
824			loop(State#state{protocol_state=ProtoState2});
825		{cancel, ReplyTo, StreamRef} ->
826			ProtoState2 = Protocol:cancel(ProtoState, StreamRef, ReplyTo),
827			loop(State#state{protocol_state=ProtoState2});
828		%% @todo Maybe make an interface in the protocol module instead of checking on protocol name.
829		%% An interface would also make sure that HTTP/1.0 can't upgrade.
830		{ws_upgrade, Owner, StreamRef, Path, Headers} when Protocol =:= gun_http ->
831			WsOpts = maps:get(ws_opts, Opts, #{}),
832			ProtoState2 = Protocol:ws_upgrade(ProtoState, StreamRef, Host, Port, Path, Headers, WsOpts),
833			loop(State#state{protocol_state=ProtoState2});
834		{ws_upgrade, Owner, StreamRef, Path, Headers, WsOpts} when Protocol =:= gun_http ->
835			ProtoState2 = Protocol:ws_upgrade(ProtoState, StreamRef, Host, Port, Path, Headers, WsOpts),
836			loop(State#state{protocol_state=ProtoState2});
837			%% @todo can fail if http/1.0
838		{shutdown, Owner} ->
839			%% @todo Protocol:shutdown?
840			ok;
841		{'DOWN', OwnerRef, process, Owner, Reason} ->
842			Protocol:close(ProtoState),
843			Transport:close(Socket),
844			owner_gone(Reason);
845		{system, From, Request} ->
846			sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
847				{loop, State});
848		{ws_upgrade, _, StreamRef, _, _} ->
849			Owner ! {gun_error, self(), StreamRef, {badstate,
850				"Websocket is only supported over HTTP/1.1."}},
851			loop(State);
852		{ws_upgrade, _, StreamRef, _, _, _} ->
853			Owner ! {gun_error, self(), StreamRef, {badstate,
854				"Websocket is only supported over HTTP/1.1."}},
855			loop(State);
856		{ws_send, _, _} ->
857			Owner ! {gun_error, self(), {badstate,
858				"Connection needs to be upgraded to Websocket "
859				"before the gun:ws_send/1 function can be used."}},
860			loop(State);
861		%% @todo The ReplyTo patch disabled the notowner behavior.
862		%% We need to add an option to enforce this behavior if needed.
863		Any when is_tuple(Any), is_pid(element(2, Any)) ->
864			element(2, Any) ! {gun_error, self(), {notowner,
865				"Operations are restricted to the owner of the connection."}},
866			loop(State);
867		Any ->
868			error_logger:error_msg("Unexpected message: ~w~n", [Any]),
869			loop(State)
870	end.
871
872commands([], State) ->
873	loop(State);
874commands([close|_], State=#state{socket=Socket, transport=Transport}) ->
875	Transport:close(Socket),
876	down(State, normal);
877commands([Error={error, _}|_], State=#state{socket=Socket, transport=Transport}) ->
878	Transport:close(Socket),
879	down(State, Error);
880commands([{state, ProtoState}|Tail], State) ->
881	commands(Tail, State#state{protocol_state=ProtoState});
882%% @todo The scheme should probably not be ignored.
883%%
884%% Order is important: the origin must be changed before
885%% the transport and/or protocol in order to keep track
886%% of the intermediaries properly.
887commands([{origin, _Scheme, Host, Port, Type}|Tail],
888		State=#state{transport=Transport, protocol=Protocol,
889			origin_host=IntermediateHost, origin_port=IntermediatePort,
890			intermediaries=Intermediaries}) ->
891	Info = #{
892		type => Type,
893		host => IntermediateHost,
894		port => IntermediatePort,
895		transport => Transport:name(),
896		protocol => Protocol:name()
897	},
898	commands(Tail, State#state{origin_host=Host, origin_port=Port,
899		intermediaries=[Info|Intermediaries]});
900commands([{switch_transport, Transport, Socket}|Tail], State) ->
901	commands(Tail, State#state{socket=Socket, transport=Transport});
902%% @todo The two loops should be reunified and this clause generalized.
903commands([{switch_protocol, Protocol=gun_ws, ProtoState}], State) ->
904	ws_loop(State#state{protocol=Protocol, protocol_state=ProtoState});
905%% @todo And this state should probably not be ignored.
906commands([{switch_protocol, Protocol, _ProtoState0}|Tail],
907		State=#state{owner=Owner, opts=Opts, socket=Socket, transport=Transport}) ->
908	ProtoOpts = maps:get(http2_opts, Opts, #{}),
909	ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts),
910	commands(Tail, State#state{protocol=Protocol, protocol_state=ProtoState}).
911
912ws_loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, socket=Socket,
913		transport=Transport, protocol=Protocol, protocol_state=ProtoState}) ->
914	{OK, Closed, Error} = Transport:messages(),
915	Transport:setopts(Socket, [{active, once}]),
916	receive
917		{OK, Socket, Data} ->
918			case Protocol:handle(Data, ProtoState) of
919				close ->
920					Transport:close(Socket),
921					down(State, normal);
922				ProtoState2 ->
923					ws_loop(State#state{protocol_state=ProtoState2})
924			end;
925		{Closed, Socket} ->
926			Transport:close(Socket),
927			down(State, closed);
928		{Error, Socket, Reason} ->
929			Transport:close(Socket),
930			down(State, {error, Reason});
931		%% Ignore any previous HTTP keep-alive.
932		keepalive ->
933			ws_loop(State);
934%		{ws_send, Owner, Frames} when is_list(Frames) ->
935%			todo; %% @todo
936		{ws_send, Owner, Frame} ->
937			case Protocol:send(Frame, ProtoState) of
938				close ->
939					Transport:close(Socket),
940					down(State, normal);
941				ProtoState2 ->
942					ws_loop(State#state{protocol_state=ProtoState2})
943			end;
944		{shutdown, Owner} ->
945			%% @todo Protocol:shutdown? %% @todo close frame
946			ok;
947		{'DOWN', OwnerRef, process, Owner, Reason} ->
948			Protocol:close(owner_gone, ProtoState),
949			Transport:close(Socket),
950			owner_gone(Reason);
951		{system, From, Request} ->
952			sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
953				{ws_loop, State});
954		Any when is_tuple(Any), is_pid(element(2, Any)) ->
955			element(2, Any) ! {gun_error, self(), {notowner,
956				"Operations are restricted to the owner of the connection."}},
957			ws_loop(State);
958		Any ->
959			error_logger:error_msg("Unexpected message: ~w~n", [Any])
960	end.
961
962-spec owner_gone(_) -> no_return().
963owner_gone(normal) -> exit(normal);
964owner_gone(shutdown) -> exit(shutdown);
965owner_gone(Shutdown = {shutdown, _}) -> exit(Shutdown);
966owner_gone(Reason) -> error({owner_gone, Reason}).
967
968system_continue(_, _, {retry_loop, State, Retry}) ->
969	retry_loop(State, Retry);
970system_continue(_, _, {loop, State}) ->
971	loop(State);
972system_continue(_, _, {ws_loop, State}) ->
973	ws_loop(State).
974
975-spec system_terminate(any(), _, _, _) -> no_return().
976system_terminate(Reason, _, _, _) ->
977	exit(Reason).
978
979system_code_change(Misc, _, _, _) ->
980	{ok, Misc}.
981