%% Copyright (c) 2013-2018, Loïc Hoguin <essen@ninenines.eu>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

%% Client API and connection process of Gun. The exported API functions
%% run in the caller and talk to the connection process by plain message
%% passing; the connection process itself is a proc_lib special process
%% implemented by the receive loops further down in this module.
-module(gun).

%% OTP_RELEASE is only defined by OTP 21 and later. This module calls
%% erlang:get_stacktrace/0 (see proc_lib_hack/5), which is deprecated on
%% those releases, so the deprecation warning is silenced there.
-ifdef(OTP_RELEASE).
-compile({nowarn_deprecated_function, [{erlang, get_stacktrace, 0}]}).
-endif.

%% Connection.
-export([open/2]).
-export([open/3]).
-export([open_unix/2]).
-export([info/1]).
-export([close/1]).
-export([shutdown/1]).

%% Requests.
-export([delete/2]).
-export([delete/3]).
-export([delete/4]).
-export([get/2]).
-export([get/3]).
-export([get/4]).
-export([head/2]).
-export([head/3]).
-export([head/4]).
-export([options/2]).
-export([options/3]).
-export([options/4]).
-export([patch/3]).
-export([patch/4]).
-export([patch/5]).
-export([post/3]).
-export([post/4]).
-export([post/5]).
-export([put/3]).
-export([put/4]).
-export([put/5]).
-export([request/4]).
-export([request/5]).
-export([request/6]).

%% Streaming data.
-export([data/4]).

%% Tunneling.
-export([connect/2]).
-export([connect/3]).
-export([connect/4]).

%% Awaiting gun messages.
-export([await/2]).
-export([await/3]).
-export([await/4]).
-export([await_body/2]).
-export([await_body/3]).
-export([await_body/4]).
-export([await_up/1]).
-export([await_up/2]).
-export([await_up/3]).

%% Flushing gun messages.
-export([flush/1]).

%% Cancelling a stream.
-export([cancel/2]).

%% Websocket.
-export([ws_upgrade/2]).
-export([ws_upgrade/3]).
-export([ws_upgrade/4]).
-export([ws_send/2]).

%% Internals.
%% start_link/4 and proc_lib_hack/5 start the connection process; the
%% system_* functions are the callbacks required by sys:handle_system_msg/6.
-export([start_link/4]).
-export([proc_lib_hack/5]).
-export([system_continue/3]).
-export([system_terminate/4]).
-export([system_code_change/4]).

%% Request headers as a list of name/value pairs.
-type headers() :: [{binary(), iodata()}].

%% Websocket frames accepted by ws_send/2.
-type ws_close_code() :: 1000..4999.
-type ws_frame() :: close | ping | pong
	| {text | binary | close | ping | pong, iodata()}
	| {close, ws_close_code(), iodata()}.

%% Connection options accepted by open/3 and open_unix/2. Every key is
%% optional and validated by check_options/1. Defaults applied elsewhere
%% in this module:
%%  - connect_timeout: infinity
%%  - protocols: [http2, http] over TLS, [http] otherwise
%%  - retry: 5
%%  - retry_timeout: 5000
%%  - transport: tls when the port is 443, tcp otherwise
%%    (ssl is accepted as an alias and rewritten to tls by do_open/3)
%%  - trace: when true, dbg tracing is set up by consider_tracing/2
-type opts() :: #{
	connect_timeout => timeout(),
	http_opts => http_opts(),
	http2_opts => http2_opts(),
	protocols => [http | http2],
	retry => non_neg_integer(),
	retry_timeout => pos_integer(),
	trace => boolean(),
	transport => tcp | tls | ssl,
	transport_opts => [gen_tcp:connect_option()] | [ssl:connect_option()],
	ws_opts => ws_opts()
}.
-export_type([opts/0]).
%% @todo Add an option to disable/enable the notowner behavior.

%% Destination of a CONNECT tunnel, as given to connect/2,3,4. Before it
%% is handed to the protocol module, the connection loop rewrites the
%% deprecated protocol key into protocols and, when transport is tls,
%% adds the ALPN options to tls_opts.
-type connect_destination() :: #{
	host := inet:hostname() | inet:ip_address(),
	port := inet:port_number(),
	username => iodata(),
	password => iodata(),
	protocol => http | http2, %% @todo Remove in Gun 2.0.
	protocols => [http | http2],
	transport => tcp | tls,
	tls_opts => [ssl:connect_option()],
	tls_handshake_timeout => timeout()
}.
-export_type([connect_destination/0]).

%% Description of one hop recorded when the origin of the connection
%% changes; returned by info/1 under the intermediaries key.
-type intermediary() :: #{
	type := connect,
	host := inet:hostname() | inet:ip_address(),
	port := inet:port_number(),
	transport := tcp | tls,
	protocol := http | http2
}.

%% @todo When/if HTTP/2 CONNECT gets implemented, we will want an option here
%% to indicate that the request must be sent on an existing CONNECT stream.
%% This is of course not required for HTTP/1.1 since the CONNECT takes over
%% the entire connection.

%% Per-request options. reply_to is the process handed to the protocol
%% module as the recipient for the stream; request/6 and connect/4
%% default it to the calling process.
-type req_opts() :: #{
	reply_to => pid()
}.
-export_type([req_opts/0]).

%% HTTP/1.x protocol options, validated by gun_http:check_options/1.
%% keepalive is also read by before_loop/1 (default 5000) to arm the
%% keepalive timer.
-type http_opts() :: #{
	keepalive => timeout(),
	transform_header_name => fun((binary()) -> binary()),
	version => 'HTTP/1.1' | 'HTTP/1.0'
}.
-export_type([http_opts/0]).

%% HTTP/2 protocol options, validated by gun_http2:check_options/1.
-type http2_opts() :: #{
	keepalive => timeout()
}.
-export_type([http2_opts/0]).

%% Websocket options, validated by gun_ws:check_options/1.
%% @todo keepalive
-type ws_opts() :: #{
	compress => boolean(),
	protocols => [{binary(), module()}]
}.
-export_type([ws_opts/0]).

%% State of the connection process.
%%  - parent: process that started us; needed for sys message handling.
%%  - owner/owner_ref: process that opened the connection and its monitor.
%%  - host/port: what the transport connects to.
%%  - origin_host/origin_port: what requests are addressed to; starts
%%    equal to host/port and changes on an origin command.
%%  - intermediaries: hops recorded by origin commands, newest first.
%%  - keepalive_ref: timer armed by before_loop/1, if any.
%%  - socket/protocol/protocol_state: reset to undefined by down/2 while
%%    the connection is being retried.
%%  - last_error: reason reported when the retries run out.
-record(state, {
	parent :: pid(),
	owner :: pid(),
	owner_ref :: reference(),
	host :: inet:hostname() | inet:ip_address(),
	port :: inet:port_number(),
	origin_host :: inet:hostname() | inet:ip_address(),
	origin_port :: inet:port_number(),
	intermediaries = [] :: [intermediary()],
	opts :: opts(),
	keepalive_ref :: undefined | reference(),
	socket :: undefined | inet:socket() | ssl:sslsocket(),
	transport :: module(),
	protocol :: module(),
	protocol_state :: any(),
	last_error :: any()
}).

%% Connection.

%% Open a connection to Host:Port with the default options.
-spec open(inet:hostname() | inet:ip_address(), inet:port_number())
	-> {ok, pid()} | {error, any()}.
open(Host, Port) ->
	open(Host, Port, #{}).

%% Open a connection to Host:Port. The calling process becomes the owner
%% of the connection. Returns {error, {options, Opt}} (or whatever the
%% protocol-specific check returned) when an option is invalid.
-spec open(inet:hostname() | inet:ip_address(), inet:port_number(), opts())
	-> {ok, pid()} | {error, any()}.
open(Host, Port, Opts) when is_list(Host); is_atom(Host); is_tuple(Host) ->
	do_open(Host, Port, Opts).

%% Open a connection to a local socket; the host becomes
%% {local, SocketPath} and the port is 0.
-spec open_unix(Path::string(), opts())
	-> {ok, pid()} | {error, any()}.
open_unix(SocketPath, Opts) ->
	do_open({local, SocketPath}, 0, Opts).

%% Normalize and validate the options, then start the connection process
%% under gun_sup with the caller as owner.
do_open(Host, Port, Opts0) ->
	%% We accept both ssl and tls but only use tls in the code.
	Opts = case Opts0 of
		#{transport := ssl} -> Opts0#{transport => tls};
		_ -> Opts0
	end,
	case check_options(maps:to_list(Opts)) of
		ok ->
			case supervisor:start_child(gun_sup, [self(), Host, Port, Opts]) of
				OK = {ok, ServerPid} ->
					consider_tracing(ServerPid, Opts),
					OK;
				StartError ->
					StartError
			end;
		CheckError ->
			CheckError
	end.

%% Validate the options one key at a time. Returns ok when every option
%% is acceptable and stops at the first offending one otherwise. Unknown
%% keys and badly typed values both end up in the final clause.
check_options([]) ->
	ok;
check_options([{connect_timeout, infinity}|Opts]) ->
	check_options(Opts);
check_options([{connect_timeout, T}|Opts]) when is_integer(T), T >= 0 ->
	check_options(Opts);
check_options([{http_opts, ProtoOpts}|Opts]) when is_map(ProtoOpts) ->
	case gun_http:check_options(ProtoOpts) of
		ok ->
			check_options(Opts);
		Error ->
			Error
	end;
check_options([{http2_opts, ProtoOpts}|Opts]) when is_map(ProtoOpts) ->
	case gun_http2:check_options(ProtoOpts) of
		ok ->
			check_options(Opts);
		Error ->
			Error
	end;
check_options([Opt = {protocols, L}|Opts]) when is_list(L) ->
	%% The list must be non-empty, free of duplicates (the deduplicated
	%% length equals the original one) and contain only http and http2.
	Len = length(L),
	case length(lists:usort(L)) of
		Len when Len > 0 ->
			Check = lists:usort([(P =:= http) orelse (P =:= http2) || P <- L]),
			case Check of
				[true] ->
					check_options(Opts);
				_ ->
					{error, {options, Opt}}
			end;
		_ ->
			{error, {options, Opt}}
	end;
check_options([{retry, R}|Opts]) when is_integer(R), R >= 0 ->
	check_options(Opts);
check_options([{retry_timeout, T}|Opts]) when is_integer(T), T >= 0 ->
	check_options(Opts);
check_options([{trace, B}|Opts]) when B =:= true; B =:= false ->
	check_options(Opts);
%% ssl has already been rewritten to tls by do_open/3 at this point.
check_options([{transport, T}|Opts]) when T =:= tcp; T =:= tls ->
	check_options(Opts);
check_options([{transport_opts, L}|Opts]) when is_list(L) ->
	check_options(Opts);
check_options([{ws_opts, ProtoOpts}|Opts]) when is_map(ProtoOpts) ->
	case gun_ws:check_options(ProtoOpts) of
		ok ->
			check_options(Opts);
		Error ->
			Error
	end;
check_options([Opt|_]) ->
	{error, {options, Opt}}.

%% When the trace option is true, start dbg and trace every call (with
%% return values) in the gun modules for the new connection process.
consider_tracing(ServerPid, #{trace := true}) ->
	dbg:start(),
	dbg:tracer(),
	dbg:tpl(gun, [{'_', [], [{return_trace}]}]),
	dbg:tpl(gun_http, [{'_', [], [{return_trace}]}]),
	dbg:tpl(gun_http2, [{'_', [], [{return_trace}]}]),
	dbg:tpl(gun_ws, [{'_', [], [{return_trace}]}]),
	dbg:p(ServerPid, all);
consider_tracing(_, _) ->
	ok.

%% Return information about an established connection.
%%
%% The state is obtained through sys:get_state/1 and is expected to be a
%% two-element tuple holding the #state{} record, which is what loop/1
%% and ws_loop/1 hand to sys. The socket must also be open for the
%% sockname call to succeed. NOTE(review): while the connection is being
%% retried, retry_loop/2 hands sys a three-element tuple and the socket
%% and protocol are undefined, so this function looks like it can only
%% be used on a connection that is up - confirm before relying on it
%% elsewhere.
-spec info(pid()) -> map().
info(ServerPid) ->
	{_, #state{
		socket=Socket,
		transport=Transport,
		protocol=Protocol,
		origin_host=OriginHost,
		origin_port=OriginPort,
		intermediaries=Intermediaries
	}} = sys:get_state(ServerPid),
	{ok, {SockIP, SockPort}} = Transport:sockname(Socket),
	#{
		socket => Socket,
		transport => Transport:name(),
		protocol => Protocol:name(),
		sock_ip => SockIP,
		sock_port => SockPort,
		origin_host => OriginHost,
		origin_port => OriginPort,
		%% Intermediaries are listed in the order data goes through them.
		intermediaries => lists:reverse(Intermediaries)
	}.

%% Terminate the connection process through its supervisor. The result of
%% supervisor:terminate_child/2 is returned as is, so {error, not_found}
%% is possible when the pid is not (or no longer) a child of gun_sup.
-spec close(pid()) -> ok | {error, not_found}.
close(ServerPid) ->
	supervisor:terminate_child(gun_sup, ServerPid).

%% Ask the connection process to stop. This only sends a message and
%% returns immediately; the loops act on it only when it comes from the
%% owner of the connection.
-spec shutdown(pid()) -> ok.
shutdown(ServerPid) ->
	_ = ServerPid ! {shutdown, self()},
	ok.

%% Requests.

%% The functions below are thin wrappers around request/4,5,6 that fill
%% in the method. Each returns the reference identifying the new stream.

-spec delete(pid(), iodata()) -> reference().
delete(ServerPid, Path) ->
	request(ServerPid, <<"DELETE">>, Path, []).

-spec delete(pid(), iodata(), headers()) -> reference().
delete(ServerPid, Path, Headers) ->
	request(ServerPid, <<"DELETE">>, Path, Headers).

-spec delete(pid(), iodata(), headers(), req_opts()) -> reference().
delete(ServerPid, Path, Headers, ReqOpts) ->
	request(ServerPid, <<"DELETE">>, Path, Headers, <<>>, ReqOpts).

-spec get(pid(), iodata()) -> reference().
get(ServerPid, Path) ->
	request(ServerPid, <<"GET">>, Path, []).

-spec get(pid(), iodata(), headers()) -> reference().
%% GET with custom headers, no body and default request options.
get(ServerPid, Path, Headers) ->
	get(ServerPid, Path, Headers, #{}).

%% GET with custom headers and request options; never carries a body.
-spec get(pid(), iodata(), headers(), req_opts()) -> reference().
get(ServerPid, Path, Headers, ReqOpts) ->
	EmptyBody = <<>>,
	request(ServerPid, <<"GET">>, Path, Headers, EmptyBody, ReqOpts).

%% HEAD requests mirror the GET variants: no body, optional headers and
%% request options.
-spec head(pid(), iodata()) -> reference().
head(ServerPid, Path) ->
	head(ServerPid, Path, []).

-spec head(pid(), iodata(), headers()) -> reference().
head(ServerPid, Path, Headers) ->
	head(ServerPid, Path, Headers, #{}).

-spec head(pid(), iodata(), headers(), req_opts()) -> reference().
head(ServerPid, Path, Headers, ReqOpts) ->
	EmptyBody = <<>>,
	request(ServerPid, <<"HEAD">>, Path, Headers, EmptyBody, ReqOpts).

%% OPTIONS requests, same shape as GET and HEAD.
-spec options(pid(), iodata()) -> reference().
options(ServerPid, Path) ->
	options(ServerPid, Path, []).

-spec options(pid(), iodata(), headers()) -> reference().
options(ServerPid, Path, Headers) ->
	options(ServerPid, Path, Headers, #{}).

-spec options(pid(), iodata(), headers(), req_opts()) -> reference().
options(ServerPid, Path, Headers, ReqOpts) ->
	EmptyBody = <<>>,
	request(ServerPid, <<"OPTIONS">>, Path, Headers, EmptyBody, ReqOpts).

%% PATCH requests. Without a Body argument the request is sent with an
%% empty body.
-spec patch(pid(), iodata(), headers()) -> reference().
patch(ServerPid, Path, Headers) ->
	patch(ServerPid, Path, Headers, <<>>).

-spec patch(pid(), iodata(), headers(), iodata()) -> reference().
patch(ServerPid, Path, Headers, Body) ->
	patch(ServerPid, Path, Headers, Body, #{}).

-spec patch(pid(), iodata(), headers(), iodata(), req_opts()) -> reference().
patch(ServerPid, Path, Headers, Body, ReqOpts) ->
	Method = <<"PATCH">>,
	request(ServerPid, Method, Path, Headers, Body, ReqOpts).

%% POST without a Body argument: empty body, default request options.
-spec post(pid(), iodata(), headers()) -> reference().
post(ServerPid, Path, Headers) ->
	request(ServerPid, <<"POST">>, Path, Headers, <<>>, #{}).

-spec post(pid(), iodata(), headers(), iodata()) -> reference().
%% POST with a body and default request options.
post(ServerPid, Path, Headers, Body) ->
	post(ServerPid, Path, Headers, Body, #{}).

-spec post(pid(), iodata(), headers(), iodata(), req_opts()) -> reference().
post(ServerPid, Path, Headers, Body, ReqOpts) ->
	Method = <<"POST">>,
	request(ServerPid, Method, Path, Headers, Body, ReqOpts).

%% PUT requests. Without a Body argument the request is sent with an
%% empty body.
-spec put(pid(), iodata(), headers()) -> reference().
put(ServerPid, Path, Headers) ->
	put(ServerPid, Path, Headers, <<>>).

-spec put(pid(), iodata(), headers(), iodata()) -> reference().
put(ServerPid, Path, Headers, Body) ->
	put(ServerPid, Path, Headers, Body, #{}).

-spec put(pid(), iodata(), headers(), iodata(), req_opts()) -> reference().
put(ServerPid, Path, Headers, Body, ReqOpts) ->
	Method = <<"PUT">>,
	request(ServerPid, Method, Path, Headers, Body, ReqOpts).

%% Generic request with an arbitrary method, an empty body and default
%% request options.
-spec request(pid(), iodata(), iodata(), headers()) -> reference().
request(ServerPid, Method, Path, Headers) ->
	request(ServerPid, Method, Path, Headers, <<>>).

%% Generic request with a body and default request options.
-spec request(pid(), iodata(), iodata(), headers(), iodata()) -> reference().
request(ServerPid, Method, Path, Headers, Body) ->
	request(ServerPid, Method, Path, Headers, Body, #{}).

%% Send the request to the connection process and return the reference
%% of the new stream. Nothing is awaited here. The reply_to request
%% option selects the process passed along as recipient for the stream;
%% it defaults to the caller.
%% @todo Accept header names as maps.
-spec request(pid(), iodata(), iodata(), headers(), iodata(), req_opts()) -> reference().
request(ServerPid, Method, Path, Headers, Body, ReqOpts) ->
	Ref = make_ref(),
	Recipient = maps:get(reply_to, ReqOpts, self()),
	Msg = {request, Recipient, Ref, Method, Path, Headers, Body},
	_ = erlang:send(ServerPid, Msg),
	Ref.

%% Streaming data.

%% Send a piece of request body for an existing stream. IsFin tells
%% whether this is the last piece (fin) or more will follow (nofin).
-spec data(pid(), reference(), fin | nofin, iodata()) -> ok.
data(ServerPid, StreamRef, IsFin, Data) ->
	Msg = {data, self(), StreamRef, IsFin, Data},
	_ = erlang:send(ServerPid, Msg),
	ok.

%% Tunneling.

%% Request a CONNECT tunnel to Destination with no extra headers and
%% default request options.
-spec connect(pid(), connect_destination()) -> reference().
connect(ServerPid, Destination) ->
	connect(ServerPid, Destination, []).

%% CONNECT with custom headers and default request options.
-spec connect(pid(), connect_destination(), headers()) -> reference().
connect(ServerPid, Destination, Headers) ->
	NoReqOpts = #{},
	connect(ServerPid, Destination, Headers, NoReqOpts).

%% Send the CONNECT request to the connection process and return the
%% reference of the new stream. The reply_to request option defaults to
%% the caller.
-spec connect(pid(), connect_destination(), headers(), req_opts()) -> reference().
connect(ServerPid, Destination, Headers, ReqOpts) ->
	Ref = make_ref(),
	Recipient = maps:get(reply_to, ReqOpts, self()),
	_ = erlang:send(ServerPid, {connect, Recipient, Ref, Destination, Headers}),
	Ref.

%% Awaiting gun messages.

%% @todo spec await await_body

%% Wait up to 5 seconds for the next message of the stream.
await(ServerPid, StreamRef) ->
	await(ServerPid, StreamRef, 5000).

%% The third argument is either an existing monitor reference for the
%% connection process (then the default 5 second timeout applies and the
%% monitor is left alone) or a timeout (then a temporary monitor is
%% created for the duration of the call).
await(ServerPid, StreamRef, MRef) when is_reference(MRef) ->
	await(ServerPid, StreamRef, 5000, MRef);
await(ServerPid, StreamRef, Timeout) ->
	TmpRef = monitor(process, ServerPid),
	Result = await(ServerPid, StreamRef, Timeout, TmpRef),
	demonitor(TmpRef, [flush]),
	Result.

%% Receive the next message belonging to the stream and translate it
%% into a tagged tuple. Connection-wide errors and the death of the
%% connection process are reported as {error, Reason}; {error, timeout}
%% is returned when nothing arrives in time.
%% @todo Add gun_upgrade and gun_ws?
await(ServerPid, StreamRef, Timeout, MRef) ->
	receive
		{gun_response, ServerPid, StreamRef, IsFin, Status, RespHeaders} ->
			{response, IsFin, Status, RespHeaders};
		{gun_inform, ServerPid, StreamRef, Status, InfoHeaders} ->
			{inform, Status, InfoHeaders};
		{gun_data, ServerPid, StreamRef, IsFin, Chunk} ->
			{data, IsFin, Chunk};
		{gun_trailers, ServerPid, StreamRef, Trailers} ->
			{trailers, Trailers};
		{gun_push, ServerPid, StreamRef, PushedRef, Method, URI, PushHeaders} ->
			{push, PushedRef, Method, URI, PushHeaders};
		%% Stream-level error.
		{gun_error, ServerPid, StreamRef, StreamError} ->
			{error, StreamError};
		%% Connection-level error.
		{gun_error, ServerPid, ConnError} ->
			{error, ConnError};
		{'DOWN', MRef, process, ServerPid, DownReason} ->
			{error, DownReason}
	after Timeout ->
		{error, timeout}
	end.

%% Collect the whole response body of the stream, waiting up to
%% 5 seconds for each message.
await_body(ServerPid, StreamRef) ->
	await_body(ServerPid, StreamRef, 5000).

%% As with await/3, the third argument is either an existing monitor
%% reference (default timeout, monitor left alone) or a timeout (a
%% temporary monitor is created for the duration of the call).
await_body(ServerPid, StreamRef, MRef) when is_reference(MRef) ->
	await_body(ServerPid, StreamRef, 5000, MRef, <<>>);
await_body(ServerPid, StreamRef, Timeout) ->
	TmpRef = monitor(process, ServerPid),
	Result = await_body(ServerPid, StreamRef, Timeout, TmpRef, <<>>),
	demonitor(TmpRef, [flush]),
	Result.

%% Collect the body using a caller-supplied timeout and monitor.
await_body(ServerPid, StreamRef, Timeout, MRef) ->
	await_body(ServerPid, StreamRef, Timeout, MRef, <<>>).

%% Accumulate data messages until the final one. Returns {ok, Body}, or
%% {ok, Body, Trailers} when trailers end the stream. The timeout applies
%% to each message separately, not to the body as a whole.
await_body(ServerPid, StreamRef, Timeout, MRef, Acc) ->
	receive
		{gun_data, ServerPid, StreamRef, fin, Last} ->
			{ok, << Acc/binary, Last/binary >>};
		{gun_data, ServerPid, StreamRef, nofin, Chunk} ->
			Acc2 = << Acc/binary, Chunk/binary >>,
			await_body(ServerPid, StreamRef, Timeout, MRef, Acc2);
		%% It's OK to return trailers here because the client
		%% specifically requested them.
		{gun_trailers, ServerPid, StreamRef, Trailers} ->
			{ok, Acc, Trailers};
		%% Stream-level error.
		{gun_error, ServerPid, StreamRef, StreamError} ->
			{error, StreamError};
		%% Connection-level error.
		{gun_error, ServerPid, ConnError} ->
			{error, ConnError};
		{'DOWN', MRef, process, ServerPid, DownReason} ->
			{error, DownReason}
	after Timeout ->
		{error, timeout}
	end.

%% Wait up to 5 seconds for the connection to be established.
-spec await_up(pid()) -> {ok, http | http2} | {error, atom()}.
await_up(ServerPid) ->
	await_up(ServerPid, 5000).

%% Wait for the connection to be established, given either an existing
%% monitor reference (default timeout) or a timeout (temporary monitor).
-spec await_up(pid(), reference() | timeout()) -> {ok, http | http2} | {error, atom()}.
await_up(ServerPid, MRef) when is_reference(MRef) ->
	await_up(ServerPid, 5000, MRef);
await_up(ServerPid, Timeout) ->
	TmpRef = monitor(process, ServerPid),
	Result = await_up(ServerPid, Timeout, TmpRef),
	demonitor(TmpRef, [flush]),
	Result.

%% Wait for the gun_up message of the connection, using a caller-supplied
%% timeout and monitor. Returns {ok, Protocol} once the connection is up,
%% {error, Reason} if the connection process dies first and
%% {error, timeout} if neither happens in time.
-spec await_up(pid(), timeout(), reference()) -> {ok, http | http2} | {error, atom()}.
await_up(ServerPid, Timeout, MRef) ->
	receive
		{gun_up, ServerPid, Protocol} ->
			{ok, Protocol};
		{'DOWN', MRef, process, ServerPid, Reason} ->
			{error, Reason}
	after Timeout ->
		{error, timeout}
	end.

%% Remove from the caller's mailbox every gun message that is already
%% there for either a whole connection (pid) or a single stream
%% (reference). Never blocks.
-spec flush(pid() | reference()) -> ok.
flush(ServerPid) when is_pid(ServerPid) ->
	flush_pid(ServerPid);
flush(StreamRef) ->
	flush_ref(StreamRef).

%% Drop all messages coming from the given connection process, including
%% 'DOWN' messages from monitors on it.
flush_pid(ServerPid) ->
	receive
		{gun_up, ServerPid, _} ->
			flush_pid(ServerPid);
		{gun_down, ServerPid, _, _, _, _} ->
			flush_pid(ServerPid);
		{gun_inform, ServerPid, _, _, _} ->
			flush_pid(ServerPid);
		{gun_response, ServerPid, _, _, _, _} ->
			flush_pid(ServerPid);
		{gun_data, ServerPid, _, _, _} ->
			flush_pid(ServerPid);
		{gun_trailers, ServerPid, _, _} ->
			flush_pid(ServerPid);
		%% gun_push is a 7-element tuple (see await/4):
		%% {gun_push, ServerPid, StreamRef, NewStreamRef, Method, URI, Headers}.
		{gun_push, ServerPid, _, _, _, _, _} ->
			flush_pid(ServerPid);
		{gun_error, ServerPid, _, _} ->
			flush_pid(ServerPid);
		{gun_error, ServerPid, _} ->
			flush_pid(ServerPid);
		{gun_upgrade, ServerPid, _, _, _} ->
			flush_pid(ServerPid);
		{gun_ws, ServerPid, _, _} ->
			flush_pid(ServerPid);
		{'DOWN', _, process, ServerPid, _} ->
			flush_pid(ServerPid)
	after 0 ->
		ok
	end.

%% Drop all messages belonging to the given stream, whichever connection
%% they come from. Messages that are not tied to a stream (gun_up,
%% gun_down, connection-level gun_error) are left in the mailbox.
flush_ref(StreamRef) ->
	receive
		%% Keep flushing by stream reference after an informational
		%% response; switching to flush_pid/1 here would end the flush
		%% early because no message carries the reference in the pid
		%% position.
		{gun_inform, _, StreamRef, _, _} ->
			flush_ref(StreamRef);
		{gun_response, _, StreamRef, _, _, _} ->
			flush_ref(StreamRef);
		{gun_data, _, StreamRef, _, _} ->
			flush_ref(StreamRef);
		{gun_trailers, _, StreamRef, _} ->
			flush_ref(StreamRef);
		%% 7-element tuple, same shape as in flush_pid/1.
		{gun_push, _, StreamRef, _, _, _, _} ->
			flush_ref(StreamRef);
		{gun_error, _, StreamRef, _} ->
			flush_ref(StreamRef);
		{gun_upgrade, _, StreamRef, _, _} ->
			flush_ref(StreamRef);
		{gun_ws, _, StreamRef, _} ->
			flush_ref(StreamRef)
	after 0 ->
		ok
	end.

%% Cancelling a stream.

%% Ask the connection process to cancel the given stream. This only sends
%% a message and returns immediately.
-spec cancel(pid(), reference()) -> ok.
cancel(ServerPid, StreamRef) ->
	_ = ServerPid ! {cancel, self(), StreamRef},
	ok.

%% @todo Allow upgrading an HTTP/1.1 connection to HTTP/2.
%% http2_upgrade

%% Websocket.

%% Request an upgrade of the connection to Websocket with no extra
%% headers.
-spec ws_upgrade(pid(), iodata()) -> reference().
ws_upgrade(ServerPid, Path) ->
	ws_upgrade(ServerPid, Path, []).

%% Request an upgrade to Websocket. No Websocket options are sent with
%% the message; the connection process falls back to the ws_opts it was
%% opened with.
-spec ws_upgrade(pid(), iodata(), headers()) -> reference().
ws_upgrade(ServerPid, Path, Headers) ->
	StreamRef = make_ref(),
	_ = ServerPid ! {ws_upgrade, self(), StreamRef, Path, Headers},
	StreamRef.

%% Request an upgrade to Websocket with explicit options. The options
%% are validated in the calling process, which crashes with a badmatch
%% when gun_ws:check_options/1 does not return ok.
-spec ws_upgrade(pid(), iodata(), headers(), ws_opts()) -> reference().
ws_upgrade(ServerPid, Path, Headers, Opts) ->
	ok = gun_ws:check_options(Opts),
	StreamRef = make_ref(),
	_ = ServerPid ! {ws_upgrade, self(), StreamRef, Path, Headers, Opts},
	StreamRef.

%% Send one Websocket frame over an upgraded connection.
%% @todo ws_send/2 will need to be deprecated in favor of a variant with StreamRef.
%% But it can be kept for the time being since it can still work for HTTP/1.1.
-spec ws_send(pid(), ws_frame() | [ws_frame()]) -> ok.
ws_send(ServerPid, Frames) ->
	_ = ServerPid ! {ws_send, self(), Frames},
	ok.

%% Internals.

%% Start the connection process, linked to the caller. The caller is
%% recorded as the parent and Owner as the owner of the connection.
start_link(Owner, Host, Port, Opts) ->
	proc_lib:start_link(?MODULE, proc_lib_hack,
		[self(), Owner, Host, Port, Opts]).

%% Body of the connection process. Runs init/5 and turns any exception
%% into an exit: normal, shutdown and {shutdown, _} are passed through
%% unchanged whatever their class, anything else exits with
%% {Reason, Stacktrace}.
%%
%% erlang:get_stacktrace/0 is deprecated since OTP 21 and was removed in
%% later releases, so the stacktrace is taken from the catch pattern
%% whenever OTP_RELEASE is defined (OTP 21 and later). The old call is
%% kept only for releases that lack the three-part catch pattern.
-ifdef(OTP_RELEASE).
proc_lib_hack(Parent, Owner, Host, Port, Opts) ->
	try
		init(Parent, Owner, Host, Port, Opts)
	catch
		_:normal -> exit(normal);
		_:shutdown -> exit(shutdown);
		_:Reason = {shutdown, _} -> exit(Reason);
		_:Reason:Stacktrace -> exit({Reason, Stacktrace})
	end.
-else.
proc_lib_hack(Parent, Owner, Host, Port, Opts) ->
	try
		init(Parent, Owner, Host, Port, Opts)
	catch
		_:normal -> exit(normal);
		_:shutdown -> exit(shutdown);
		_:Reason = {shutdown, _} -> exit(Reason);
		_:Reason -> exit({Reason, erlang:get_stacktrace()})
	end.
-endif.

%% First function run by the connection process. Acknowledges the start
%% to the parent right away (so opening a connection never blocks on the
%% network), picks the transport module, monitors the owner and then
%% attempts the first connection.
init(Parent, Owner, Host, Port, Opts) ->
	ok = proc_lib:init_ack(Parent, {ok, self()}),
	MaxRetries = maps:get(retry, Opts, 5),
	TransportName = maps:get(transport, Opts, default_transport(Port)),
	Transport = case TransportName of
		tcp -> gun_tcp;
		tls -> gun_tls
	end,
	OwnerRef = monitor(process, Owner),
	State0 = #state{
		parent=Parent,
		owner=Owner,
		owner_ref=OwnerRef,
		host=Host,
		port=Port,
		origin_host=Host,
		origin_port=Port,
		opts=Opts,
		transport=Transport
	},
	transport_connect(State0, MaxRetries).

%% Transport used when the transport option is absent: TLS for port 443,
%% plain TCP for anything else.
default_transport(Port) when Port =:= 443 -> tls;
default_transport(_) -> tcp.

%% Open the socket and, on success, bring the connection up with the
%% protocol selected for it. On failure the error is remembered and
%% another attempt is scheduled through retry/2.
transport_connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport}, Retries) ->
	SockOpts = [binary, {active, false}|extra_transport_opts(Transport, Opts)],
	ConnectTimeout = maps:get(connect_timeout, Opts, infinity),
	case Transport:connect(Host, Port, SockOpts, ConnectTimeout) of
		{error, Reason} ->
			retry(State#state{last_error=Reason}, Retries);
		{ok, Socket} ->
			{Protocol, ProtoOptsKey} = select_protocol(Transport, Socket, Opts),
			up(State, Socket, Protocol, ProtoOptsKey)
	end.

%% User-supplied transport options. Over TLS they are prefixed with the
%% ALPN/NPN options advertising the configured protocols.
extra_transport_opts(gun_tls, Opts) ->
	Protocols = maps:get(protocols, Opts, [http2, http]),
	UserOpts = maps:get(transport_opts, Opts, []),
	ensure_alpn(Protocols, UserOpts);
extra_transport_opts(_, Opts) ->
	maps:get(transport_opts, Opts, []).

%% Protocol module and the key of its options in the connection options.
%% Over TLS this is whatever was negotiated, HTTP/1.1 being the fallback.
%% Without TLS there is no negotiation: the protocols option must name
%% exactly one protocol.
select_protocol(gun_tls, Socket, _) ->
	case ssl:negotiated_protocol(Socket) of
		{ok, <<"h2">>} -> {gun_http2, http2_opts};
		_ -> {gun_http, http_opts}
	end;
select_protocol(_, _, Opts) ->
	case maps:get(protocols, Opts, [http]) of
		[http] -> {gun_http, http_opts};
		[http2] -> {gun_http2, http2_opts}
	end.

%% Prepend to TransportOpts the TLS options that advertise the given
%% protocols through ALPN and NPN, with HTTP/1.1 as the NPN default.
ensure_alpn(Protocols0, TransportOpts) ->
	Protocols = [case P of
		http -> <<"http/1.1">>;
		http2 -> <<"h2">>
	end || P <- Protocols0],
	[
		{alpn_advertised_protocols, Protocols},
		{client_preferred_next_protocols, {client, Protocols, <<"http/1.1">>}}
	|TransportOpts].

%% The socket is connected: initialize the protocol, tell the owner the
%% connection is up and enter the main loop.
up(State=#state{owner=Owner, opts=Opts, transport=Transport}, Socket, Protocol, ProtoOptsKey) ->
	ProtoOpts = maps:get(ProtoOptsKey, Opts, #{}),
	ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts),
	Owner ! {gun_up, self(), Protocol:name()},
	before_loop(State#state{socket=Socket, protocol=Protocol, protocol_state=ProtoState}).

%% The connection was lost: tell the owner which streams were affected,
%% forget the socket and protocol, and start reconnecting with a fresh
%% retry count.
down(State=#state{owner=Owner, opts=Opts, protocol=Protocol, protocol_state=ProtoState}, Reason) ->
	{KilledStreams, UnprocessedStreams} = Protocol:down(ProtoState),
	Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams, UnprocessedStreams},
	retry(State#state{socket=undefined, protocol=undefined, protocol_state=undefined,
		last_error=Reason}, maps:get(retry, Opts, 5)).

%% Schedule another connection attempt, or give up when no retries are
%% left, exiting with the last error wrapped in a shutdown reason. Any
%% pending keepalive timer is cancelled first.
retry(#state{last_error=Reason}, 0) ->
	exit({shutdown, Reason});
retry(State=#state{keepalive_ref=KeepaliveRef}, Retries) when is_reference(KeepaliveRef) ->
	_ = erlang:cancel_timer(KeepaliveRef),
	%% Flush if we have a keepalive message
	receive
		keepalive -> ok
	after 0 ->
		ok
	end,
	retry_loop(State#state{keepalive_ref=undefined}, Retries - 1);
retry(State, Retries) ->
	retry_loop(State, Retries - 1).

%% Wait for the retry timeout, then try to connect again. Only the retry
%% timer and system messages are handled while waiting; everything else
%% stays in the mailbox for the main loop.
retry_loop(State=#state{parent=Parent, opts=Opts}, Retries) ->
	_ = erlang:send_after(maps:get(retry_timeout, Opts, 5000), self(), retry),
	receive
		retry ->
			transport_connect(State, Retries);
		{system, From, Request} ->
			sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
				{retry_loop, State, Retries})
	end.

%% Arm the keepalive timer according to the options of the current
%% protocol (default 5000, infinity disables it) and enter the main loop.
before_loop(State=#state{opts=Opts, protocol=Protocol}) ->
	%% @todo Might not be worth checking every time?
	ProtoOptsKey = case Protocol of
		gun_http -> http_opts;
		gun_http2 -> http2_opts
	end,
	ProtoOpts = maps:get(ProtoOptsKey, Opts, #{}),
	Keepalive = maps:get(keepalive, ProtoOpts, 5000),
	KeepaliveRef = case Keepalive of
		infinity -> undefined;
		_ -> erlang:send_after(Keepalive, self(), keepalive)
	end,
	loop(State#state{keepalive_ref=KeepaliveRef}).

%% Main loop of an established HTTP connection. Each iteration re-arms
%% the socket for one message and then handles, in order of the clauses:
%% socket events, the keepalive timer, API messages (request, data,
%% connect, cancel, ws_upgrade, shutdown), the death of the owner, system
%% messages, and finally misuse or unknown messages.
loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef,
		origin_host=Host, origin_port=Port, opts=Opts, socket=Socket,
		transport=Transport, protocol=Protocol, protocol_state=ProtoState}) ->
	{OK, Closed, Error} = Transport:messages(),
	Transport:setopts(Socket, [{active, once}]),
	receive
		%% The protocol answers with one command or a list of commands.
		{OK, Socket, Data} ->
			case Protocol:handle(Data, ProtoState) of
				Commands when is_list(Commands) ->
					commands(Commands, State);
				Command ->
					commands([Command], State)
			end;
		{Closed, Socket} ->
			Protocol:close(ProtoState),
			Transport:close(Socket),
			down(State, closed);
		{Error, Socket, Reason} ->
			Protocol:close(ProtoState),
			Transport:close(Socket),
			down(State, {error, Reason});
		%% Socket events that are not for the current socket are dropped.
		{OK, _PreviousSocket, _Data} ->
			loop(State);
		{Closed, _PreviousSocket} ->
			loop(State);
		{Error, _PreviousSocket, _} ->
			loop(State);
		%% Going through before_loop/1 arms the next keepalive timer.
		keepalive ->
			ProtoState2 = Protocol:keepalive(ProtoState),
			before_loop(State#state{protocol_state=ProtoState2});
		%% An empty body selects the protocol's request function without
		%% a body argument.
		{request, ReplyTo, StreamRef, Method, Path, Headers, <<>>} ->
			ProtoState2 = Protocol:request(ProtoState,
				StreamRef, ReplyTo, Method, Host, Port, Path, Headers),
			loop(State#state{protocol_state=ProtoState2});
		{request, ReplyTo, StreamRef, Method, Path, Headers, Body} ->
			ProtoState2 = Protocol:request(ProtoState,
				StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body),
			loop(State#state{protocol_state=ProtoState2});
		%% @todo Do we want to reject ReplyTo if it's not the process
		%% who initiated the connection? For both data and cancel.
		{data, ReplyTo, StreamRef, IsFin, Data} ->
			ProtoState2 = Protocol:data(ProtoState,
				StreamRef, ReplyTo, IsFin, Data),
			loop(State#state{protocol_state=ProtoState2});
		{connect, ReplyTo, StreamRef, Destination0, Headers} ->
			%% The protocol option has been deprecated in favor of the protocols option.
			%% Nobody probably ended up using it, but let's not break the interface.
			Destination1 = case Destination0 of
				#{protocols := _} ->
					Destination0;
				#{protocol := DestProto} ->
					Destination0#{protocols => [DestProto]};
				_ ->
					Destination0
			end,
			%% Tunnels over TLS advertise their protocols the same way
			%% the main connection does.
			Destination = case Destination1 of
				#{transport := tls} ->
					Destination1#{tls_opts => ensure_alpn(
						maps:get(protocols, Destination1, [http]),
						maps:get(tls_opts, Destination1, []))};
				_ ->
					Destination1
			end,
			ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers),
			loop(State#state{protocol_state=ProtoState2});
		{cancel, ReplyTo, StreamRef} ->
			ProtoState2 = Protocol:cancel(ProtoState, StreamRef, ReplyTo),
			loop(State#state{protocol_state=ProtoState2});
		%% @todo Maybe make an interface in the protocol module instead of checking on protocol name.
		%% An interface would also make sure that HTTP/1.0 can't upgrade.
		{ws_upgrade, Owner, StreamRef, Path, Headers} when Protocol =:= gun_http ->
			WsOpts = maps:get(ws_opts, Opts, #{}),
			ProtoState2 = Protocol:ws_upgrade(ProtoState, StreamRef, Host, Port, Path, Headers, WsOpts),
			loop(State#state{protocol_state=ProtoState2});
		{ws_upgrade, Owner, StreamRef, Path, Headers, WsOpts} when Protocol =:= gun_http ->
			ProtoState2 = Protocol:ws_upgrade(ProtoState, StreamRef, Host, Port, Path, Headers, WsOpts),
			loop(State#state{protocol_state=ProtoState2});
		%% @todo can fail if http/1.0
		%% Returning ends the loop, and with it the connection process.
		{shutdown, Owner} ->
			%% @todo Protocol:shutdown?
			ok;
		{'DOWN', OwnerRef, process, Owner, Reason} ->
			Protocol:close(ProtoState),
			Transport:close(Socket),
			owner_gone(Reason);
		{system, From, Request} ->
			sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
				{loop, State});
		%% Upgrade requests that did not match above: the protocol is not
		%% HTTP/1.1 or the sender is not the owner.
		{ws_upgrade, _, StreamRef, _, _} ->
			Owner ! {gun_error, self(), StreamRef, {badstate,
				"Websocket is only supported over HTTP/1.1."}},
			loop(State);
		{ws_upgrade, _, StreamRef, _, _, _} ->
			Owner ! {gun_error, self(), StreamRef, {badstate,
				"Websocket is only supported over HTTP/1.1."}},
			loop(State);
		{ws_send, _, _} ->
			Owner ! {gun_error, self(), {badstate,
				"Connection needs to be upgraded to Websocket "
				"before the gun:ws_send/1 function can be used."}},
			loop(State);
		%% @todo The ReplyTo patch disabled the notowner behavior.
		%% We need to add an option to enforce this behavior if needed.
		Any when is_tuple(Any), is_pid(element(2, Any)) ->
			element(2, Any) ! {gun_error, self(), {notowner,
				"Operations are restricted to the owner of the connection."}},
			loop(State);
		Any ->
			error_logger:error_msg("Unexpected message: ~w~n", [Any]),
			loop(State)
	end.

%% Execute the commands returned by the protocol module, in order, then
%% go back to the main loop. close and {error, _} end the connection
%% immediately and discard the remaining commands.
commands([], State) ->
	loop(State);
commands([close|_], State=#state{socket=Socket, transport=Transport}) ->
	Transport:close(Socket),
	down(State, normal);
commands([Error={error, _}|_], State=#state{socket=Socket, transport=Transport}) ->
	Transport:close(Socket),
	down(State, Error);
commands([{state, ProtoState}|Tail], State) ->
	commands(Tail, State#state{protocol_state=ProtoState});
%% @todo The scheme should probably not be ignored.
%%
%% Order is important: the origin must be changed before
%% the transport and/or protocol in order to keep track
%% of the intermediaries properly.
commands([{origin, _Scheme, Host, Port, Type}|Tail],
		State=#state{transport=Transport, protocol=Protocol,
			origin_host=IntermediateHost, origin_port=IntermediatePort,
			intermediaries=Intermediaries}) ->
	%% The previous origin becomes an intermediary, described with the
	%% transport and protocol in use at this point.
	Info = #{
		type => Type,
		host => IntermediateHost,
		port => IntermediatePort,
		transport => Transport:name(),
		protocol => Protocol:name()
	},
	commands(Tail, State#state{origin_host=Host, origin_port=Port,
		intermediaries=[Info|Intermediaries]});
commands([{switch_transport, Transport, Socket}|Tail], State) ->
	commands(Tail, State#state{socket=Socket, transport=Transport});
%% @todo The two loops should be reunified and this clause generalized.
%% Switching to Websocket is only handled as the last command and moves
%% the process to the Websocket loop.
commands([{switch_protocol, Protocol=gun_ws, ProtoState}], State) ->
	ws_loop(State#state{protocol=Protocol, protocol_state=ProtoState});
%% @todo And this state should probably not be ignored.
%% Any other protocol is initialized from scratch with the http2_opts.
commands([{switch_protocol, Protocol, _ProtoState0}|Tail],
		State=#state{owner=Owner, opts=Opts, socket=Socket, transport=Transport}) ->
	ProtoOpts = maps:get(http2_opts, Opts, #{}),
	ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts),
	commands(Tail, State#state{protocol=Protocol, protocol_state=ProtoState}).

%% Loop of a connection upgraded to Websocket. Handles socket events,
%% frames to send, shutdown, the death of the owner and system messages.
%% Here the protocol returns either close or its new state rather than
%% commands.
ws_loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, socket=Socket,
		transport=Transport, protocol=Protocol, protocol_state=ProtoState}) ->
	{OK, Closed, Error} = Transport:messages(),
	Transport:setopts(Socket, [{active, once}]),
	receive
		{OK, Socket, Data} ->
			case Protocol:handle(Data, ProtoState) of
				close ->
					Transport:close(Socket),
					down(State, normal);
				ProtoState2 ->
					ws_loop(State#state{protocol_state=ProtoState2})
			end;
		{Closed, Socket} ->
			Transport:close(Socket),
			down(State, closed);
		{Error, Socket, Reason} ->
			Transport:close(Socket),
			down(State, {error, Reason});
		%% Ignore any previous HTTP keep-alive.
		keepalive ->
			ws_loop(State);
%		{ws_send, Owner, Frames} when is_list(Frames) ->
%			todo; %% @todo
		{ws_send, Owner, Frame} ->
			case Protocol:send(Frame, ProtoState) of
				close ->
					Transport:close(Socket),
					down(State, normal);
				ProtoState2 ->
					ws_loop(State#state{protocol_state=ProtoState2})
			end;
		%% Returning ends the loop, and with it the connection process.
		{shutdown, Owner} ->
			%% @todo Protocol:shutdown? %% @todo close frame
			ok;
		{'DOWN', OwnerRef, process, Owner, Reason} ->
			Protocol:close(owner_gone, ProtoState),
			Transport:close(Socket),
			owner_gone(Reason);
		{system, From, Request} ->
			sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
				{ws_loop, State});
		Any when is_tuple(Any), is_pid(element(2, Any)) ->
			element(2, Any) ! {gun_error, self(), {notowner,
				"Operations are restricted to the owner of the connection."}},
			ws_loop(State);
		%% Keep serving the connection after logging, as loop/1 does.
		%% Returning here would silently end the connection process.
		Any ->
			error_logger:error_msg("Unexpected message: ~w~n", [Any]),
			ws_loop(State)
	end.

%% Terminate because the owner died. Clean exit reasons of the owner are
%% propagated as is; anything else is raised as an {owner_gone, Reason}
%% error.
-spec owner_gone(_) -> no_return().
owner_gone(normal) -> exit(normal);
owner_gone(shutdown) -> exit(shutdown);
owner_gone(Shutdown = {shutdown, _}) -> exit(Shutdown);
owner_gone(Reason) -> error({owner_gone, Reason}).

%% sys callbacks. The last argument is the term given to
%% sys:handle_system_msg/6 and names the loop to resume.
system_continue(_, _, {retry_loop, State, Retry}) ->
	retry_loop(State, Retry);
system_continue(_, _, {loop, State}) ->
	loop(State);
system_continue(_, _, {ws_loop, State}) ->
	ws_loop(State).

-spec system_terminate(any(), _, _, _) -> no_return().
system_terminate(Reason, _, _, _) ->
	exit(Reason).

system_code_change(Misc, _, _, _) ->
	{ok, Misc}.