1%% 2%% %CopyrightBegin% 3%% 4%% Copyright Ericsson AB 2002-2018. All Rights Reserved. 5%% 6%% Licensed under the Apache License, Version 2.0 (the "License"); 7%% you may not use this file except in compliance with the License. 8%% You may obtain a copy of the License at 9%% 10%% http://www.apache.org/licenses/LICENSE-2.0 11%% 12%% Unless required by applicable law or agreed to in writing, software 13%% distributed under the License is distributed on an "AS IS" BASIS, 14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15%% See the License for the specific language governing permissions and 16%% limitations under the License. 17%% 18%% %CopyrightEnd% 19%% 20%% 21 22-module(httpc_handler). 23 24-behaviour(gen_server). 25 26-include_lib("inets/src/http_lib/http_internal.hrl"). 27-include("httpc_internal.hrl"). 28 29-define(IS_STREAMED(Code), ((Code =:= 200) orelse (Code =:= 206))). 30 31%%-------------------------------------------------------------------- 32%% Internal Application API 33-export([ 34 start_link/4, 35 send/2, 36 cancel/2, 37 stream_next/1, 38 info/1 39 ]). 40 41%% gen_server callbacks 42-export([init/1, handle_call/3, handle_cast/2, handle_info/2, 43 terminate/2, code_change/3]). 44 45-record(timers, 46 { 47 request_timers = [] :: [reference()], 48 queue_timer :: reference() | 'undefined' 49 }). 50 51-record(state, 52 { 53 request :: request() | 'undefined', 54 session :: session() | 'undefined', 55 status_line, % {Version, StatusCode, ReasonPharse} 56 headers :: http_response_h() | 'undefined', 57 body :: binary() | 'undefined', 58 mfa, % {Module, Function, Args} 59 pipeline = queue:new() :: queue:queue(), 60 keep_alive = queue:new() :: queue:queue(), 61 status :: undefined | new | pipeline | keep_alive | close | {ssl_tunnel, request()}, 62 canceled = [], % [RequestId] 63 max_header_size = nolimit :: nolimit | integer(), 64 max_body_size = nolimit :: nolimit | integer(), 65 options :: options(), 66 timers = #timers{} :: #timers{}, 67 profile_name :: atom(), % id of httpc_manager process. 68 once = inactive :: 'inactive' | 'once' 69 }). 70 71 72%%==================================================================== 73%% External functions 74%%==================================================================== 75%%-------------------------------------------------------------------- 76%% Function: start_link(Request, Options, ProfileName) -> {ok, Pid} 77%% 78%% Request = #request{} 79%% Options = #options{} 80%% ProfileName = atom() - id of httpc manager process 81%% 82%% Description: Starts a http-request handler process. Intended to be 83%% called by the httpc profile supervisor or the http manager process 84%% if the client is started stand alone form inets. 85%% 86%% Note: Uses proc_lib and gen_server:enter_loop so that waiting 87%% for gen_tcp:connect to timeout in init/1 will not 88%% block the httpc manager process in odd cases such as trying to call 89%% a server that does not exist. (See OTP-6735) The only API function 90%% sending messages to the handler process that can be called before 91%% init has completed is cancel and that is not a problem! (Send and 92%% stream will not be called before the first request has been sent and 93%% the reply or part of it has arrived.) 94%%-------------------------------------------------------------------- 95%%-------------------------------------------------------------------- 96 97start_link(Parent, Request, Options, ProfileName) -> 98 {ok, proc_lib:start_link(?MODULE, init, [[Parent, Request, Options, 99 ProfileName]])}. 100 101%%-------------------------------------------------------------------- 102%% Function: send(Request, Pid) -> ok 103%% Request = #request{} 104%% Pid = pid() - the pid of the http-request handler process. 105%% 106%% Description: Uses this handlers session to send a request. Intended 107%% to be called by the httpc manager process. 108%%-------------------------------------------------------------------- 109send(Request, Pid) -> 110 call(Request, Pid). 111 112 113%%-------------------------------------------------------------------- 114%% Function: cancel(RequestId, Pid) -> ok 115%% RequestId = reference() 116%% Pid = pid() - the pid of the http-request handler process. 117%% 118%% Description: Cancels a request. Intended to be called by the httpc 119%% manager process. 120%%-------------------------------------------------------------------- 121cancel(RequestId, Pid) -> 122 cast({cancel, RequestId}, Pid). 123 124 125%%-------------------------------------------------------------------- 126%% Function: stream_next(Pid) -> ok 127%% Pid = pid() - the pid of the http-request handler process. 128%% 129%% Description: Works as inets:setopts(active, once) but for 130%% body chunks sent to the user. 131%%-------------------------------------------------------------------- 132stream_next(Pid) -> 133 cast(stream_next, Pid). 134 135 136%%-------------------------------------------------------------------- 137%% Function: info(Pid) -> [{Key, Val}] 138%% Pid = pid() - the pid of the http-request handler process. 139%% 140%% Description: 141%% Returns various information related to this handler 142%% Used for debugging and testing 143%%-------------------------------------------------------------------- 144info(Pid) -> 145 try 146 call(info, Pid) 147 catch 148 _:_ -> 149 [] 150 end. 151 152%%-------------------------------------------------------------------- 153%% Function: stream(BodyPart, Request, Code) -> _ 154%% BodyPart = binary() 155%% Request = #request{} 156%% Code = integer() 157%% 158%% Description: Stream the HTTP body to the caller process (client) 159%% or to a file. Note that the data that has been stream 160%% does not have to be saved. (We do not want to use up 161%% memory in vain.) 162%%-------------------------------------------------------------------- 163%% Request should not be streamed 164stream(BodyPart, #request{stream = none} = Request, _) -> 165 {false, BodyPart, Request}; 166 167%% Stream to caller 168stream(BodyPart, #request{stream = Self} = Request, Code) 169 when ?IS_STREAMED(Code) andalso 170 ((Self =:= self) orelse (Self =:= {self, once})) -> 171 httpc_response:send(Request#request.from, 172 {Request#request.id, stream, BodyPart}), 173 {true, <<>>, Request}; 174 175%% Stream to file 176%% This has been moved to start_stream/3 177%% We keep this for backward compatibillity... 178stream(BodyPart, #request{stream = Filename} = Request, Code) 179 when ?IS_STREAMED(Code) andalso is_list(Filename) -> 180 case file:open(Filename, [write, raw, append, delayed_write]) of 181 {ok, Fd} -> 182 stream(BodyPart, Request#request{stream = Fd}, 200); 183 {error, Reason} -> 184 exit({stream_to_file_failed, Reason}) 185 end; 186 187%% Stream to file 188stream(BodyPart, #request{stream = Fd} = Request, Code) 189 when ?IS_STREAMED(Code) -> 190 case file:write(Fd, BodyPart) of 191 ok -> 192 {true, <<>>, Request}; 193 {error, Reason} -> 194 exit({stream_to_file_failed, Reason}) 195 end; 196 197stream(BodyPart, Request,_) -> % only 200 and 206 responses can be streamed 198 {false, BodyPart, Request}. 199 200 201%%==================================================================== 202%% Server functions 203%%==================================================================== 204 205%%-------------------------------------------------------------------- 206%% Function: init([Options, ProfileName]) -> {ok, State} | 207%% {ok, State, Timeout} | ignore | {stop, Reason} 208%% 209%% Options = #options{} 210%% ProfileName = atom() - id of httpc manager process 211%% 212%% Description: Initiates the httpc_handler process 213%% 214%% Note: The init function may not fail, that will kill the 215%% httpc_manager process. We could make the httpc_manager more comlex 216%% but we do not want that so errors will be handled by the process 217%% sending an init_error message to itself. 218%%-------------------------------------------------------------------- 219init([Parent, Request, Options, ProfileName]) -> 220 process_flag(trap_exit, true), 221 222 %% Do not let initial tcp-connection block the manager-process 223 proc_lib:init_ack(Parent, self()), 224 handle_verbose(Options#options.verbose), 225 ProxyOptions = handle_proxy_options(Request#request.scheme, Options), 226 Address = handle_proxy(Request#request.address, ProxyOptions), 227 {ok, State} = 228 %% #state.once should initially be 'inactive' because we 229 %% activate the socket at first regardless of the state. 230 case {Address /= Request#request.address, Request#request.scheme} of 231 {true, https} -> 232 connect_and_send_upgrade_request(Address, Request, 233 #state{options = Options, 234 profile_name = ProfileName}); 235 {_, _} -> 236 connect_and_send_first_request(Address, Request, 237 #state{options = Options, 238 profile_name = ProfileName}) 239 end, 240 gen_server:enter_loop(?MODULE, [], State). 241 242%%-------------------------------------------------------------------- 243%% Function: handle_call(Request, From, State) -> {reply, Reply, State} | 244%% {reply, Reply, State, Timeout} | 245%% {noreply, State} | 246%% {noreply, State, Timeout} | 247%% {stop, Reason, Reply, State} | (terminate/2 is called) 248%% {stop, Reason, State} (terminate/2 is called) 249%% Description: Handling call messages 250%%-------------------------------------------------------------------- 251handle_call(Request, From, State) -> 252 try do_handle_call(Request, From, State) of 253 Result -> 254 Result 255 catch 256 Class:Reason:ST -> 257 {stop, {shutdown, {{Class, Reason}, ST}}, State} 258 end. 259 260 261%%-------------------------------------------------------------------- 262%% Function: handle_cast(Msg, State) -> {noreply, State} | 263%% {noreply, State, Timeout} | 264%% {stop, Reason, State} (terminate/2 is called) 265%% Description: Handling cast messages 266%%-------------------------------------------------------------------- 267handle_cast(Msg, State) -> 268 try do_handle_cast(Msg, State) of 269 Result -> 270 Result 271 catch 272 Class:Reason:ST -> 273 {stop, {shutdown, {{Class, Reason}, ST}}, State} 274 end. 275 276%%-------------------------------------------------------------------- 277%% Function: handle_info(Info, State) -> {noreply, State} | 278%% {noreply, State, Timeout} | 279%% {stop, Reason, State} (terminate/2 is called) 280%% Description: Handling all non call/cast messages 281%%-------------------------------------------------------------------- 282handle_info(Info, State) -> 283 try do_handle_info(Info, State) of 284 Result -> 285 Result 286 catch 287 Class:Reason:ST -> 288 {stop, {shutdown, {{Class, Reason}, ST}}, State} 289 end. 290 291%%-------------------------------------------------------------------- 292%% Function: terminate(Reason, State) -> _ (ignored by gen_server) 293%% Description: Shutdown the httpc_handler 294%%-------------------------------------------------------------------- 295 296terminate(normal, #state{session = undefined}) -> 297 ok; 298 299%% Init error sending, no session information has been setup but 300%% there is a socket that needs closing. 301terminate(normal, 302 #state{session = #session{id = undefined} = Session}) -> 303 close_socket(Session); 304 305%% Socket closed remotely 306terminate(normal, 307 #state{session = #session{socket = {remote_close, Socket}, 308 socket_type = SocketType, 309 id = Id}, 310 profile_name = ProfileName, 311 request = Request, 312 timers = Timers, 313 pipeline = Pipeline, 314 keep_alive = KeepAlive} = State) -> 315 %% Clobber session 316 (catch httpc_manager:delete_session(Id, ProfileName)), 317 318 maybe_retry_queue(Pipeline, State), 319 maybe_retry_queue(KeepAlive, State), 320 321 %% Cancel timers 322 cancel_timers(Timers), 323 324 %% Maybe deliver answers to requests 325 deliver_answer(Request), 326 327 %% And, just in case, close our side (**really** overkill) 328 http_transport:close(SocketType, Socket); 329 330terminate(_Reason, #state{session = #session{id = Id, 331 socket = Socket, 332 socket_type = SocketType}, 333 request = undefined, 334 profile_name = ProfileName, 335 timers = Timers, 336 pipeline = Pipeline, 337 keep_alive = KeepAlive} = State) -> 338 339 %% Clobber session 340 (catch httpc_manager:delete_session(Id, ProfileName)), 341 342 maybe_retry_queue(Pipeline, State), 343 maybe_retry_queue(KeepAlive, State), 344 345 cancel_timer(Timers#timers.queue_timer, timeout_queue), 346 http_transport:close(SocketType, Socket); 347 348terminate(_Reason, #state{request = undefined}) -> 349 ok; 350 351terminate(Reason, #state{request = Request} = State) -> 352 NewState = maybe_send_answer(Request, 353 httpc_response:error(Request, Reason), 354 State), 355 terminate(Reason, NewState#state{request = undefined}). 356 357%%-------------------------------------------------------------------- 358%% Func: code_change(_OldVsn, State, Extra) -> {ok, NewState} 359%% Purpose: Convert process state when code is changed 360%%-------------------------------------------------------------------- 361 362code_change(_, State, _) -> 363 {ok, State}. 364 365%%%-------------------------------------------------------------------- 366%%% Internal functions 367%%%-------------------------------------------------------------------- 368do_handle_call(#request{address = Addr} = Request, _, 369 #state{status = Status, 370 session = #session{type = pipeline} = Session, 371 timers = Timers, 372 options = #options{proxy = Proxy} = _Options, 373 profile_name = ProfileName} = State0) 374 when Status =/= undefined -> 375 Address = handle_proxy(Addr, Proxy), 376 case httpc_request:send(Address, Session, Request) of 377 ok -> 378 379 ?hcrd("request sent", []), 380 381 %% Activate the request time out for the new request 382 State1 = 383 activate_request_timeout(State0#state{request = Request}), 384 385 ClientClose = 386 httpc_request:is_client_closing(Request#request.headers), 387 388 case State0#state.request of 389 #request{} = OldRequest -> %% Old request not yet finished 390 %% Make sure to use the new value of timers in state 391 NewTimers = State1#state.timers, 392 NewPipeline = queue:in(Request, State1#state.pipeline), 393 NewSession = 394 Session#session{queue_length = 395 %% Queue + current 396 queue:len(NewPipeline) + 1, 397 client_close = ClientClose}, 398 insert_session(NewSession, ProfileName), 399 {reply, ok, State1#state{ 400 request = OldRequest, 401 pipeline = NewPipeline, 402 session = NewSession, 403 timers = NewTimers}}; 404 undefined -> 405 %% Note: tcp-message receiving has already been 406 %% activated by handle_pipeline/2. 407 cancel_timer(Timers#timers.queue_timer, 408 timeout_queue), 409 NewSession = 410 Session#session{queue_length = 1, 411 client_close = ClientClose}, 412 httpc_manager:insert_session(NewSession, ProfileName), 413 NewTimers = Timers#timers{queue_timer = undefined}, 414 State = init_wait_for_response_state(Request, State1#state{session = NewSession, 415 timers = NewTimers}), 416 {reply, ok, State} 417 end; 418 {error, Reason} -> 419 NewPipeline = queue:in(Request, State0#state.pipeline), 420 {stop, {shutdown, {pipeline_failed, Reason}}, State0#state{pipeline = NewPipeline}} 421 end; 422 423do_handle_call(#request{address = Addr} = Request, _, 424 #state{status = Status, 425 session = #session{type = keep_alive} = Session, 426 timers = Timers, 427 options = #options{proxy = Proxy} = _Options, 428 profile_name = ProfileName} = State0) 429 when Status =/= undefined -> 430 431 ClientClose = httpc_request:is_client_closing(Request#request.headers), 432 433 case State0#state.request of 434 #request{} -> %% Old request not yet finished 435 %% Make sure to use the new value of timers in state 436 NewKeepAlive = queue:in(Request, State0#state.keep_alive), 437 NewSession = 438 Session#session{queue_length = 439 %% Queue + current 440 queue:len(NewKeepAlive) + 1, 441 client_close = ClientClose}, 442 insert_session(NewSession, ProfileName), 443 {reply, ok, State0#state{keep_alive = NewKeepAlive, 444 session = NewSession}}; 445 undefined -> 446 %% Note: tcp-message receiving has already been 447 %% activated by handle_pipeline/2. 448 cancel_timer(Timers#timers.queue_timer, 449 timeout_queue), 450 NewTimers = Timers#timers{queue_timer = undefined}, 451 State1 = State0#state{timers = NewTimers}, 452 Address = handle_proxy(Addr, Proxy), 453 case httpc_request:send(Address, Session, Request) of 454 ok -> 455 %% Activate the request time out for the new request 456 State2 = 457 activate_request_timeout(State1#state{request = Request}), 458 NewSession = 459 Session#session{queue_length = 1, 460 client_close = ClientClose}, 461 insert_session(NewSession, ProfileName), 462 State = init_wait_for_response_state(Request, State2#state{session = NewSession}), 463 {reply, ok, State}; 464 {error, Reason} -> 465 {stop, {shutdown, {keepalive_failed, Reason}}, State1} 466 end 467 end; 468do_handle_call(info, _, State) -> 469 Info = handler_info(State), 470 {reply, Info, State}. 471 472%% When the request in process has been canceled the handler process is 473%% stopped and the pipelined requests will be reissued or remaining 474%% requests will be sent on a new connection. This is is 475%% based on the assumption that it is probably cheaper to reissue the 476%% requests than to wait for a potentiall large response that we then 477%% only throw away. This of course is not always true maybe we could 478%% do something smarter here?! If the request canceled is not 479%% the one handled right now the same effect will take place in 480%% handle_pipeline/2 when the canceled request is on turn, 481%% handle_keep_alive_queue/2 on the other hand will just skip the 482%% request as if it was never issued as in this case the request will 483%% not have been sent. 484do_handle_cast({cancel, RequestId}, 485 #state{request = #request{id = RequestId} = Request, 486 canceled = Canceled} = State) -> 487 {stop, normal, 488 State#state{canceled = [RequestId | Canceled], 489 request = Request#request{from = answer_sent}}}; 490do_handle_cast({cancel, RequestId}, 491 #state{request = #request{}, 492 canceled = Canceled} = State) -> 493 {noreply, State#state{canceled = [RequestId | Canceled]}}; 494do_handle_cast({cancel, _}, 495 #state{request = undefined} = State) -> 496 {noreply, State}; 497 498do_handle_cast(stream_next, #state{session = Session} = State) -> 499 activate_once(Session), 500 %% Inactivate the #state.once here because we don't want 501 %% next_body_chunk/1 to activate the socket twice. 502 {noreply, State#state{once = inactive}}. 503 504do_handle_info({Proto, _Socket, Data}, 505 #state{mfa = {Module, Function, Args}, 506 request = #request{method = Method} = Request, 507 session = Session, 508 status_line = StatusLine} = State) 509 when (Proto =:= tcp) orelse 510 (Proto =:= ssl) orelse 511 (Proto =:= httpc_handler) -> 512 513 try Module:Function([Data | Args]) of 514 {ok, Result} -> 515 handle_http_msg(Result, State); 516 {_, whole_body, _} when Method =:= head -> 517 handle_response(State#state{body = <<>>}); 518 {Module, whole_body, [Body, Length]} -> 519 {_, Code, _} = StatusLine, 520 {Streamed, NewBody, NewRequest} = stream(Body, Request, Code), 521 %% When we stream we will not keep the already 522 %% streamed data, that would be a waste of memory. 523 NewLength = 524 case Streamed of 525 false -> 526 Length; 527 true -> 528 Length - size(Body) 529 end, 530 531 NewState = next_body_chunk(State, Code), 532 NewMFA = {Module, whole_body, [NewBody, NewLength]}, 533 {noreply, NewState#state{mfa = NewMFA, 534 request = NewRequest}}; 535 {Module, decode_size, 536 [TotalChunk, HexList, AccHeaderSize, 537 {MaxBodySize, BodySoFar, AccLength, MaxHeaderSize}]} 538 when BodySoFar =/= <<>> -> 539 %% The response body is chunk-encoded. Steal decoded 540 %% chunks as much as possible to stream. 541 {_, Code, _} = StatusLine, 542 {_, NewBody, NewRequest} = stream(BodySoFar, Request, Code), 543 NewState = next_body_chunk(State, Code), 544 NewMFA = {Module, decode_size, 545 [TotalChunk, HexList, AccHeaderSize, 546 {MaxBodySize, NewBody, AccLength, MaxHeaderSize}]}, 547 {noreply, NewState#state{mfa = NewMFA, 548 request = NewRequest}}; 549 {Module, decode_data, 550 [ChunkSize, TotalChunk, 551 {MaxBodySize, BodySoFar, AccLength, MaxHeaderSize}]} 552 when TotalChunk =/= <<>> orelse BodySoFar =/= <<>> -> 553 %% The response body is chunk-encoded. Steal decoded 554 %% chunks as much as possible to stream. 555 ChunkSizeToSteal = min(ChunkSize, byte_size(TotalChunk)), 556 <<StolenChunk:ChunkSizeToSteal/binary, NewTotalChunk/binary>> = TotalChunk, 557 StolenBody = <<BodySoFar/binary, StolenChunk/binary>>, 558 NewChunkSize = ChunkSize - ChunkSizeToSteal, 559 {_, Code, _} = StatusLine, 560 561 {_, NewBody, NewRequest} = stream(StolenBody, Request, Code), 562 NewState = next_body_chunk(State, Code), 563 NewMFA = {Module, decode_data, 564 [NewChunkSize, NewTotalChunk, 565 {MaxBodySize, NewBody, AccLength, MaxHeaderSize}]}, 566 {noreply, NewState#state{mfa = NewMFA, 567 request = NewRequest}}; 568 NewMFA -> 569 activate_once(Session), 570 {noreply, State#state{mfa = NewMFA}} 571 catch 572 Class:Reason:ST -> 573 ClientReason = {could_not_parse_as_http, Data}, 574 ClientErrMsg = httpc_response:error(Request, ClientReason), 575 NewState = answer_request(Request, ClientErrMsg, State), 576 {stop, {shutdown, {{Class, Reason}, ST}}, NewState} 577 end; 578 579do_handle_info({Proto, Socket, Data}, 580 #state{mfa = MFA, 581 request = Request, 582 session = Session, 583 status = Status, 584 status_line = StatusLine, 585 profile_name = Profile} = State) 586 when (Proto =:= tcp) orelse 587 (Proto =:= ssl) orelse 588 (Proto =:= httpc_handler) -> 589 590 error_logger:warning_msg("Received unexpected ~p data on ~p" 591 "~n Data: ~p" 592 "~n MFA: ~p" 593 "~n Request: ~p" 594 "~n Session: ~p" 595 "~n Status: ~p" 596 "~n StatusLine: ~p" 597 "~n Profile: ~p" 598 "~n", 599 [Proto, Socket, Data, MFA, 600 Request, Session, Status, StatusLine, Profile]), 601 602 {noreply, State}; 603 604%% The Server may close the connection to indicate that the 605%% whole body is now sent instead of sending an length 606%% indicator. 607do_handle_info({tcp_closed, _}, State = #state{mfa = {_, whole_body, Args}}) -> 608 handle_response(State#state{body = hd(Args)}); 609do_handle_info({ssl_closed, _}, State = #state{mfa = {_, whole_body, Args}}) -> 610 handle_response(State#state{body = hd(Args)}); 611 612%%% Server closes idle pipeline 613do_handle_info({tcp_closed, _}, State = #state{request = undefined}) -> 614 {stop, normal, State}; 615do_handle_info({ssl_closed, _}, State = #state{request = undefined}) -> 616 {stop, normal, State}; 617 618%%% Error cases 619do_handle_info({tcp_closed, _}, #state{session = Session0} = State) -> 620 Socket = Session0#session.socket, 621 Session = Session0#session{socket = {remote_close, Socket}}, 622 %% {stop, session_remotly_closed, State}; 623 {stop, normal, State#state{session = Session}}; 624do_handle_info({ssl_closed, _}, #state{session = Session0} = State) -> 625 Socket = Session0#session.socket, 626 Session = Session0#session{socket = {remote_close, Socket}}, 627 %% {stop, session_remotly_closed, State}; 628 {stop, normal, State#state{session = Session}}; 629do_handle_info({tcp_error, _, _} = Reason, State) -> 630 {stop, Reason, State}; 631do_handle_info({ssl_error, _, _} = Reason, State) -> 632 {stop, Reason, State}; 633 634%% Timeouts 635%% Internally, to a request handling process, a request timeout is 636%% seen as a canceled request. 637do_handle_info({timeout, RequestId}, 638 #state{request = #request{id = RequestId} = Request, 639 canceled = Canceled, 640 profile_name = ProfileName} = State) -> 641 httpc_response:send(Request#request.from, 642 httpc_response:error(Request, timeout)), 643 httpc_manager:request_done(RequestId, ProfileName), 644 {stop, normal, 645 State#state{request = Request#request{from = answer_sent}, 646 canceled = [RequestId | Canceled]}}; 647 648do_handle_info({timeout, RequestId}, 649 #state{canceled = Canceled, 650 profile_name = ProfileName} = State) -> 651 Filter = 652 fun(#request{id = Id, from = From} = Request) when Id =:= RequestId -> 653 %% Notify the owner 654 httpc_response:send(From, 655 httpc_response:error(Request, timeout)), 656 httpc_manager:request_done(RequestId, ProfileName), 657 [Request#request{from = answer_sent}]; 658 (_) -> 659 true 660 end, 661 case State#state.status of 662 pipeline -> 663 Pipeline = queue:filter(Filter, State#state.pipeline), 664 {noreply, State#state{canceled = [RequestId | Canceled], 665 pipeline = Pipeline}}; 666 keep_alive -> 667 KeepAlive = queue:filter(Filter, State#state.keep_alive), 668 {noreply, State#state{canceled = [RequestId | Canceled], 669 keep_alive = KeepAlive}} 670 end; 671 672do_handle_info(timeout_queue, State = #state{request = undefined}) -> 673 {stop, normal, State}; 674 675%% Timing was such as the queue_timeout was not canceled! 676do_handle_info(timeout_queue, #state{timers = Timers} = State) -> 677 {noreply, State#state{timers = 678 Timers#timers{queue_timer = undefined}}}; 679 680%% Setting up the connection to the server somehow failed. 681do_handle_info({init_error, Reason, ClientErrMsg}, 682 State = #state{request = Request}) -> 683 NewState = answer_request(Request, ClientErrMsg, State), 684 {stop, {shutdown, Reason}, NewState}; 685 686%%% httpc_manager process dies. 687do_handle_info({'EXIT', _, _}, State = #state{request = undefined}) -> 688 {stop, normal, State}; 689%%Try to finish the current request anyway, 690%% there is a fairly high probability that it can be done successfully. 691%% Then close the connection, hopefully a new manager is started that 692%% can retry requests in the pipeline. 693do_handle_info({'EXIT', _, _}, State) -> 694 {noreply, State#state{status = close}}. 695 696call(Msg, Pid) -> 697 try gen_server:call(Pid, Msg, infinity) 698 catch 699 exit:{noproc, _} -> 700 {error, closed}; 701 exit:{normal, _} -> 702 {error, closed}; 703 exit:{{shutdown, _},_} -> 704 {error, closed} 705 end. 706 707cast(Msg, Pid) -> 708 gen_server:cast(Pid, Msg). 709 710maybe_retry_queue(Q, State) -> 711 case queue:is_empty(Q) of 712 false -> 713 retry_pipeline(queue:to_list(Q), State); 714 true -> 715 ok 716 end. 717 718maybe_send_answer(#request{from = answer_sent}, _Reason, State) -> 719 State; 720maybe_send_answer(Request, Answer, State) -> 721 answer_request(Request, Answer, State). 722 723deliver_answer(#request{from = From} = Request) 724 when From =/= answer_sent -> 725 Response = httpc_response:error(Request, socket_closed_remotely), 726 httpc_response:send(From, Response); 727deliver_answer(_Request) -> 728 ok. 729 730%%%-------------------------------------------------------------------- 731%%% Internal functions 732%%%-------------------------------------------------------------------- 733 734connect(SocketType, ToAddress, 735 #options{ipfamily = IpFamily, 736 ip = FromAddress, 737 port = FromPort, 738 unix_socket = UnixSocket, 739 socket_opts = Opts0}, Timeout) -> 740 Opts1 = 741 case FromPort of 742 default -> 743 Opts0; 744 _ -> 745 [{port, FromPort} | Opts0] 746 end, 747 Opts2 = 748 case FromAddress of 749 default -> 750 Opts1; 751 _ -> 752 [{ip, FromAddress} | Opts1] 753 end, 754 case IpFamily of 755 inet6fb4 -> 756 Opts3 = [inet6 | Opts2], 757 case http_transport:connect(SocketType, 758 ToAddress, Opts3, Timeout) of 759 {error, Reason6} -> 760 Opts4 = [inet | Opts2], 761 case http_transport:connect(SocketType, 762 ToAddress, Opts4, Timeout) of 763 {error, Reason4} -> 764 {error, {failed_connect, 765 [{to_address, ToAddress}, 766 {inet6, Opts3, Reason6}, 767 {inet, Opts4, Reason4}]}}; 768 OK -> 769 OK 770 end; 771 OK -> 772 OK 773 end; 774 local -> 775 Opts3 = [IpFamily | Opts2], 776 SocketAddr = {local, UnixSocket}, 777 case http_transport:connect(SocketType, {SocketAddr, 0}, Opts3, Timeout) of 778 {error, Reason} -> 779 {error, {failed_connect, [{to_address, SocketAddr}, 780 {IpFamily, Opts3, Reason}]}}; 781 Else -> 782 Else 783 end; 784 _ -> 785 Opts3 = [IpFamily | Opts2], 786 case http_transport:connect(SocketType, ToAddress, Opts3, Timeout) of 787 {error, Reason} -> 788 {error, {failed_connect, [{to_address, ToAddress}, 789 {IpFamily, Opts3, Reason}]}}; 790 Else -> 791 Else 792 end 793 end. 794 795handle_unix_socket_options(#request{unix_socket = UnixSocket}, Options) 796 when UnixSocket =:= undefined -> 797 Options; 798 799handle_unix_socket_options(#request{unix_socket = UnixSocket}, 800 Options = #options{ipfamily = IpFamily}) -> 801 case IpFamily of 802 local -> 803 Options#options{unix_socket = UnixSocket}; 804 Else -> 805 error({badarg, [{ipfamily, Else}, {unix_socket, UnixSocket}]}) 806 end. 807 808connect_and_send_first_request(Address, Request, #state{options = Options0} = State) -> 809 SocketType = socket_type(Request), 810 ConnTimeout = (Request#request.settings)#http_options.connect_timeout, 811 Options = handle_unix_socket_options(Request, Options0), 812 case connect(SocketType, format_address(Address), Options, ConnTimeout) of 813 {ok, Socket} -> 814 ClientClose = 815 httpc_request:is_client_closing( 816 Request#request.headers), 817 SessionType = httpc_manager:session_type(Options), 818 SocketType = socket_type(Request), 819 Session = #session{id = {Request#request.address, self()}, 820 scheme = Request#request.scheme, 821 socket = Socket, 822 socket_type = SocketType, 823 client_close = ClientClose, 824 type = SessionType}, 825 case httpc_request:send(Address, Session, Request) of 826 ok -> 827 TmpState = State#state{request = Request, 828 session = Session, 829 mfa = init_mfa(Request, State), 830 status_line = init_status_line(Request), 831 headers = undefined, 832 body = undefined, 833 status = new}, 834 http_transport:setopts(SocketType, 835 Socket, [{active, once}]), 836 NewState = activate_request_timeout(TmpState), 837 {ok, NewState}; 838 {error, Reason} -> 839 self() ! {init_error, error_sending, 840 httpc_response:error(Request, Reason)}, 841 {ok, State#state{request = Request, 842 session = Session}} 843 end; 844 {error, Reason} -> 845 self() ! {init_error, error_connecting, 846 httpc_response:error(Request, Reason)}, 847 {ok, State#state{request = Request}} 848 end. 849 850connect_and_send_upgrade_request(Address, Request, #state{options = Options0} = State) -> 851 ConnTimeout = (Request#request.settings)#http_options.connect_timeout, 852 SocketType = ip_comm, 853 Options = handle_unix_socket_options(Request, Options0), 854 case connect(SocketType, Address, Options, ConnTimeout) of 855 {ok, Socket} -> 856 SessionType = httpc_manager:session_type(Options), 857 Session = #session{socket = Socket, 858 socket_type = SocketType, 859 id = {Request#request.address, self()}, 860 scheme = http, 861 client_close = false, 862 type = SessionType}, 863 ErrorHandler = 864 fun(ERequest, EState, EReason) -> 865 self() ! {init_error, error_sending, 866 httpc_response:error(ERequest, EReason)}, 867 {ok, EState#state{request = ERequest}} end, 868 tls_tunnel(Address, Request, State#state{session = Session}, ErrorHandler); 869 {error, Reason} -> 870 self() ! {init_error, error_connecting, 871 httpc_response:error(Request, Reason)}, 872 {ok, State#state{request = Request}} 873 end. 874 875handler_info(#state{request = Request, 876 session = Session, 877 status_line = _StatusLine, 878 pipeline = Pipeline, 879 keep_alive = KeepAlive, 880 status = Status, 881 canceled = _Canceled, 882 options = _Options, 883 timers = _Timers} = _State) -> 884 885 %% Info about the current request 886 RequestInfo = 887 case Request of 888 undefined -> 889 []; 890 #request{id = Id, 891 started = ReqStarted} -> 892 [{id, Id}, {started, ReqStarted}] 893 end, 894 895 %% Info about the current session/socket 896 SessionType = Session#session.type, 897 QueueLen = case SessionType of 898 pipeline -> 899 queue:len(Pipeline); 900 keep_alive -> 901 queue:len(KeepAlive) 902 end, 903 Scheme = Session#session.scheme, 904 Socket = Session#session.socket, 905 SocketType = Session#session.socket_type, 906 907 SocketOpts = http_transport:getopts(SocketType, Socket), 908 SocketStats = http_transport:getstat(SocketType, Socket), 909 910 Remote = http_transport:peername(SocketType, Socket), 911 Local = http_transport:sockname(SocketType, Socket), 912 913 SocketInfo = [{remote, Remote}, 914 {local, Local}, 915 {socket_opts, SocketOpts}, 916 {socket_stats, SocketStats}], 917 918 SessionInfo = 919 [{type, SessionType}, 920 {queue_length, QueueLen}, 921 {scheme, Scheme}, 922 {socket_info, SocketInfo}], 923 924 [{status, Status}, 925 {current_request, RequestInfo}, 926 {session, SessionInfo}]. 927 928 929 930handle_http_msg({Version, StatusCode, ReasonPharse, Headers, Body}, 931 State = #state{request = Request}) -> 932 case Headers#http_response_h.'content-type' of 933 "multipart/byteranges" ++ _Param -> 934 exit({not_yet_implemented, multypart_byteranges}); 935 _ -> 936 StatusLine = {Version, StatusCode, ReasonPharse}, 937 {ok, NewRequest} = start_stream(StatusLine, Headers, Request), 938 handle_http_body(Body, 939 State#state{request = NewRequest, 940 status_line = StatusLine, 941 headers = Headers}) 942 end; 943handle_http_msg({ChunkedHeaders, Body}, 944 #state{status_line = {_, Code, _}, headers = Headers} = State) -> 945 NewHeaders = http_chunk:handle_headers(Headers, ChunkedHeaders), 946 {_, NewBody, NewRequest} = stream(Body, State#state.request, Code), 947 handle_response(State#state{headers = NewHeaders, 948 body = NewBody, 949 request = NewRequest}); 950handle_http_msg(Body, #state{status_line = {_,Code, _}} = State) -> 951 {_, NewBody, NewRequest} = stream(Body, State#state.request, Code), 952 handle_response(State#state{body = NewBody, request = NewRequest}). 953 954handle_http_body(_, #state{status = {ssl_tunnel, _}, 955 status_line = {_,200, _}} = State) -> 956 tls_upgrade(State); 957 958handle_http_body(_, #state{status = {ssl_tunnel, Request}, 959 status_line = StatusLine} = State) -> 960 ClientErrMsg = httpc_response:error(Request,{could_no_establish_ssh_tunnel, StatusLine}), 961 NewState = answer_request(Request, ClientErrMsg, State), 962 {stop, normal, NewState}; 963 964%% All 1xx (informational), 204 (no content), and 304 (not modified) 965%% responses MUST NOT include a message-body, and thus are always 966%% terminated by the first empty line after the header fields. 967%% This implies that chunked encoding MUST NOT be used for these 968%% status codes. 969handle_http_body(<<>>, #state{headers = Headers, 970 status_line = {_,StatusCode, _}} = State) 971 when Headers#http_response_h.'transfer-encoding' =/= "chunked" andalso 972 (StatusCode =:= 204 orelse %% No Content 973 StatusCode =:= 304 orelse %% Not Modified 974 100 =< StatusCode andalso StatusCode =< 199) -> %% Informational 975 handle_response(State#state{body = <<>>}); 976 977 978handle_http_body(<<>>, #state{headers = Headers, 979 request = #request{method = head}} = State) 980 when Headers#http_response_h.'transfer-encoding' =/= "chunked" -> 981 handle_response(State#state{body = <<>>}); 982 983handle_http_body(Body, #state{headers = Headers, 984 max_body_size = MaxBodySize, 985 status_line = {_,Code, _}, 986 request = Request} = State) -> 987 TransferEnc = Headers#http_response_h.'transfer-encoding', 988 case case_insensitive_header(TransferEnc) of 989 "chunked" -> 990 try http_chunk:decode(Body, State#state.max_body_size, 991 State#state.max_header_size) of 992 {Module, Function, Args} -> 993 NewState = next_body_chunk(State, Code), 994 {noreply, NewState#state{mfa = 995 {Module, Function, Args}}}; 996 {ok, {ChunkedHeaders, NewBody}} -> 997 NewHeaders = http_chunk:handle_headers(Headers, 998 ChunkedHeaders), 999 case Body of 1000 <<>> -> 1001 handle_response(State#state{headers = NewHeaders, 1002 body = NewBody}); 1003 _ -> 1004 {_, NewBody2, _} = 1005 stream(NewBody, Request, Code), 1006 handle_response(State#state{headers = NewHeaders, 1007 body = NewBody2}) 1008 end 1009 catch throw:{error, Reason} -> 1010 NewState = 1011 answer_request(Request, 1012 httpc_response:error(Request, 1013 Reason), 1014 State), 1015 {stop, normal, NewState} 1016 end; 1017 Enc when Enc =:= "identity"; Enc =:= undefined -> 1018 Length = 1019 list_to_integer(Headers#http_response_h.'content-length'), 1020 case ((Length =< MaxBodySize) orelse (MaxBodySize =:= nolimit)) of 1021 true -> 1022 case httpc_response:whole_body(Body, Length) of 1023 {ok, Body} -> 1024 {_, NewBody, NewRequest} = 1025 stream(Body, Request, Code), 1026 handle_response(State#state{body = NewBody, 1027 request = NewRequest}); 1028 MFA -> 1029 NewState = next_body_chunk(State, Code), 1030 {noreply, NewState#state{mfa = MFA}} 1031 end; 1032 false -> 1033 NewState = 1034 answer_request(Request, 1035 httpc_response:error(Request, 1036 body_too_big), 1037 State), 1038 {stop, normal, NewState} 1039 end; 1040 Encoding when is_list(Encoding) -> 1041 NewState = answer_request(Request, 1042 httpc_response:error(Request, 1043 unknown_encoding), 1044 State), 1045 {stop, normal, NewState} 1046 end. 1047 1048handle_response(#state{status = new} = State) -> 1049 ?hcrd("handle response - status = new", []), 1050 handle_response(try_to_enable_pipeline_or_keep_alive(State)); 1051 1052handle_response(#state{status = Status0} = State0) when Status0 =/= new -> 1053 State = handle_server_closing(State0), 1054 #state{request = Request, 1055 session = Session, 1056 status_line = StatusLine, 1057 headers = Headers, 1058 body = Body, 1059 options = Options, 1060 profile_name = ProfileName} = State, 1061 handle_cookies(Headers, Request, Options, ProfileName), 1062 case httpc_response:result({StatusLine, Headers, Body}, Request) of 1063 %% 100-continue 1064 continue -> 1065 %% Send request body 1066 {_, RequestBody} = Request#request.content, 1067 send_raw(Session, RequestBody), 1068 %% Wait for next response 1069 activate_once(Session), 1070 Relaxed = (Request#request.settings)#http_options.relaxed, 1071 MFA = {httpc_response, parse, 1072 [State#state.max_header_size, Relaxed]}, 1073 {noreply, State#state{mfa = MFA, 1074 status_line = undefined, 1075 headers = undefined, 1076 body = undefined}}; 1077 1078 %% Ignore unexpected 100-continue response and receive the 1079 %% actual response that the server will send right away. 1080 {ignore, Data} -> 1081 Relaxed = (Request#request.settings)#http_options.relaxed, 1082 MFA = {httpc_response, parse, 1083 [State#state.max_header_size, Relaxed]}, 1084 NewState = State#state{mfa = MFA, 1085 status_line = undefined, 1086 headers = undefined, 1087 body = undefined}, 1088 handle_info({httpc_handler, dummy, Data}, NewState); 1089 1090 %% On a redirect or retry the current request becomes 1091 %% obsolete and the manager will create a new request 1092 %% with the same id as the current. 1093 {redirect, NewRequest, Data} -> 1094 ok = httpc_manager:redirect_request(NewRequest, ProfileName), 1095 handle_queue(State#state{request = undefined}, Data); 1096 {retry, TimeNewRequest, Data} -> 1097 ok = httpc_manager:retry_request(TimeNewRequest, ProfileName), 1098 handle_queue(State#state{request = undefined}, Data); 1099 {ok, Msg, Data} -> 1100 stream_remaining_body(Body, Request, StatusLine), 1101 end_stream(StatusLine, Request), 1102 NewState = maybe_send_answer(Request, Msg, State), 1103 handle_queue(NewState, Data); 1104 {stop, Msg} -> 1105 end_stream(StatusLine, Request), 1106 NewState = maybe_send_answer(Request, Msg, State), 1107 {stop, normal, NewState} 1108 end. 1109 1110handle_cookies(_,_, #options{cookies = disabled}, _) -> 1111 ok; 1112%% User wants to verify the cookies before they are stored, 1113%% so the user will have to call a store command. 1114handle_cookies(_,_, #options{cookies = verify}, _) -> 1115 ok; 1116handle_cookies(Headers, Request, #options{cookies = enabled}, ProfileName) -> 1117 {Host, _ } = Request#request.address, 1118 Cookies = httpc_cookie:cookies(Headers#http_response_h.other, 1119 Request#request.path, Host), 1120 httpc_manager:store_cookies(Cookies, Request#request.address, 1121 ProfileName). 1122 1123%% This request could not be pipelined or used as sequential keep alive 1124%% queue 1125handle_queue(#state{status = close} = State, _) -> 1126 {stop, normal, State}; 1127 1128handle_queue(#state{status = keep_alive} = State, Data) -> 1129 handle_keep_alive_queue(State, Data); 1130 1131handle_queue(#state{status = pipeline} = State, Data) -> 1132 handle_pipeline(State, Data). 1133 1134handle_pipeline(#state{status = pipeline, 1135 session = Session, 1136 profile_name = ProfileName, 1137 options = #options{pipeline_timeout = TimeOut}} = State, 1138 Data) -> 1139 case queue:out(State#state.pipeline) of 1140 {empty, _} -> 1141 handle_empty_queue(Session, ProfileName, TimeOut, State); 1142 {{value, NextRequest}, Pipeline} -> 1143 case lists:member(NextRequest#request.id, 1144 State#state.canceled) of 1145 true -> 1146 %% See comment for handle_cast({cancel, RequestId}) 1147 {stop, normal, 1148 State#state{request = 1149 NextRequest#request{from = answer_sent}, 1150 pipeline = Pipeline}}; 1151 false -> 1152 NewSession = 1153 Session#session{queue_length = 1154 %% Queue + current 1155 queue:len(Pipeline) + 1}, 1156 receive_response(NextRequest, 1157 NewSession, Data, 1158 State#state{pipeline = Pipeline}) 1159 end 1160 end. 1161 1162handle_keep_alive_queue(#state{status = keep_alive, 1163 session = Session, 1164 profile_name = ProfileName, 1165 options = #options{keep_alive_timeout = TimeOut, 1166 proxy = Proxy}} = State, 1167 Data) -> 1168 case queue:out(State#state.keep_alive) of 1169 {empty, _} -> 1170 handle_empty_queue(Session, ProfileName, TimeOut, State); 1171 {{value, NextRequest}, KeepAlive} -> 1172 case lists:member(NextRequest#request.id, 1173 State#state.canceled) of 1174 true -> 1175 handle_keep_alive_queue( 1176 State#state{keep_alive = KeepAlive}, Data); 1177 false -> 1178 #request{address = Addr} = NextRequest, 1179 Address = handle_proxy(Addr, Proxy), 1180 case httpc_request:send(Address, Session, NextRequest) of 1181 ok -> 1182 receive_response(NextRequest, 1183 Session, <<>>, 1184 State#state{keep_alive = KeepAlive}); 1185 {error, Reason} -> 1186 {stop, {shutdown, {keepalive_failed, Reason}}, State} 1187 end 1188 end 1189 end. 1190handle_empty_queue(Session, ProfileName, TimeOut, State) -> 1191 %% The server may choose too terminate an idle pipline| keep_alive session 1192 %% in this case we want to receive the close message 1193 %% at once and not when trying to send the next 1194 %% request. 1195 activate_once(Session), 1196 %% If a pipline | keep_alive session has been idle for some time is not 1197 %% closed by the server, the client may want to close it. 1198 NewState = activate_queue_timeout(TimeOut, State), 1199 case update_session(ProfileName, Session, #session.queue_length, 0) of 1200 {stop, Reason} -> 1201 {stop, {shutdown, Reason}, State}; 1202 _ -> 1203 %% Note mfa will be initialized when a new request 1204 %% arrives. 1205 {noreply, 1206 NewState#state{request = undefined, 1207 mfa = undefined, 1208 status_line = undefined, 1209 headers = undefined, 1210 body = undefined 1211 }} 1212 end. 1213 1214receive_response(Request, Session, Data, State) -> 1215 NewState = init_wait_for_response_state(Request, State), 1216 gather_data(Data, Session, NewState). 1217 1218init_wait_for_response_state(Request, State) -> 1219 Relaxed = 1220 (Request#request.settings)#http_options.relaxed, 1221 MFA = {httpc_response, parse, 1222 [State#state.max_header_size, Relaxed]}, 1223 State#state{request = Request, 1224 mfa = MFA, 1225 status_line = undefined, 1226 headers = undefined, 1227 body = undefined}. 1228gather_data(<<>>, Session, State) -> 1229 activate_once(Session), 1230 {noreply, State}; 1231gather_data(Data, _, State) -> 1232 %% If we already received some bytes of 1233 %% the next response 1234 handle_info({httpc_handler, dummy, Data}, State). 1235 1236case_insensitive_header(Str) when is_list(Str) -> 1237 http_util:to_lower(Str); 1238%% Might be undefined if server does not send such a header 1239case_insensitive_header(Str) -> 1240 Str. 1241 1242activate_once(#session{socket = Socket, socket_type = SocketType}) -> 1243 http_transport:setopts(SocketType, Socket, [{active, once}]). 1244 1245close_socket(#session{socket = {remote_close,_}}) -> 1246 ok; 1247close_socket(#session{socket = Socket, socket_type = SocketType}) -> 1248 http_transport:close(SocketType, Socket). 1249 1250activate_request_timeout( 1251 #state{request = #request{timer = OldRef} = Request} = State) -> 1252 Timeout = (Request#request.settings)#http_options.timeout, 1253 case Timeout of 1254 infinity -> 1255 State; 1256 _ -> 1257 ReqId = Request#request.id, 1258 Msg = {timeout, ReqId}, 1259 case OldRef of 1260 undefined -> 1261 ok; 1262 _ -> 1263 %% Timer is already running! This is the case for a redirect or retry 1264 %% We need to restart the timer because the handler pid has changed 1265 cancel_timer(OldRef, Msg) 1266 end, 1267 Ref = erlang:send_after(Timeout, self(), Msg), 1268 Request2 = Request#request{timer = Ref}, 1269 ReqTimers = [{Request#request.id, Ref} | 1270 (State#state.timers)#timers.request_timers], 1271 Timers = #timers{request_timers = ReqTimers}, 1272 State#state{request = Request2, timers = Timers} 1273 end. 1274 1275activate_queue_timeout(infinity, State) -> 1276 State; 1277activate_queue_timeout(Time, State) -> 1278 Ref = erlang:send_after(Time, self(), timeout_queue), 1279 State#state{timers = #timers{queue_timer = Ref}}. 1280 1281 1282is_pipeline_enabled_client(#session{type = pipeline}) -> 1283 true; 1284is_pipeline_enabled_client(_) -> 1285 false. 1286 1287is_keep_alive_enabled_server("HTTP/1." ++ N, _) when (hd(N) >= $1) -> 1288 true; 1289is_keep_alive_enabled_server("HTTP/1.0", 1290 #http_response_h{connection = "keep-alive"}) -> 1291 true; 1292is_keep_alive_enabled_server(_,_) -> 1293 false. 1294 1295is_keep_alive_connection(Headers, #session{client_close = ClientClose}) -> 1296 (not ((ClientClose) orelse httpc_response:is_server_closing(Headers))). 1297 1298try_to_enable_pipeline_or_keep_alive( 1299 #state{session = Session, 1300 request = #request{method = Method}, 1301 status_line = {Version, _, _}, 1302 headers = Headers, 1303 profile_name = ProfileName} = State) -> 1304 case is_keep_alive_enabled_server(Version, Headers) andalso 1305 is_keep_alive_connection(Headers, Session) of 1306 true -> 1307 case (is_pipeline_enabled_client(Session) andalso 1308 httpc_request:is_idempotent(Method)) of 1309 true -> 1310 insert_session(Session, ProfileName), 1311 State#state{status = pipeline}; 1312 false -> 1313 insert_session(Session, ProfileName), 1314 %% Make sure type is keep_alive in session 1315 %% as it in this case might be pipeline 1316 NewSession = Session#session{type = keep_alive}, 1317 State#state{status = keep_alive, 1318 session = NewSession} 1319 end; 1320 false -> 1321 State#state{status = close} 1322 end. 1323 1324handle_server_closing(State = #state{status = close}) -> State; 1325handle_server_closing(State = #state{headers = undefined}) -> State; 1326handle_server_closing(State = #state{headers = Headers}) -> 1327 case httpc_response:is_server_closing(Headers) of 1328 true -> State#state{status = close}; 1329 false -> State 1330 end. 1331 1332answer_request(#request{id = RequestId, from = From} = Request, Msg, 1333 #state{session = Session, 1334 timers = Timers, 1335 profile_name = ProfileName} = State) -> 1336 httpc_response:send(From, Msg), 1337 RequestTimers = Timers#timers.request_timers, 1338 TimerRef = 1339 proplists:get_value(RequestId, RequestTimers, undefined), 1340 Timer = {RequestId, TimerRef}, 1341 cancel_timer(TimerRef, {timeout, Request#request.id}), 1342 httpc_manager:request_done(RequestId, ProfileName), 1343 NewSession = maybe_make_session_available(ProfileName, Session), 1344 Timers2 = Timers#timers{request_timers = lists:delete(Timer, 1345 RequestTimers)}, 1346 State#state{request = Request#request{from = answer_sent}, 1347 session = NewSession, 1348 timers = Timers2}. 1349 1350maybe_make_session_available(ProfileName, 1351 #session{available = false} = Session) -> 1352 update_session(ProfileName, Session, #session.available, true), 1353 Session#session{available = true}; 1354maybe_make_session_available(_ProfileName, Session) -> 1355 Session. 1356 1357cancel_timers(#timers{request_timers = ReqTmrs, queue_timer = QTmr}) -> 1358 cancel_timer(QTmr, timeout_queue), 1359 CancelTimer = fun({_, Timer}) -> cancel_timer(Timer, timeout) end, 1360 lists:foreach(CancelTimer, ReqTmrs). 1361 1362cancel_timer(undefined, _) -> 1363 ok; 1364cancel_timer(Timer, TimeoutMsg) -> 1365 erlang:cancel_timer(Timer), 1366 receive 1367 TimeoutMsg -> 1368 ok 1369 after 0 -> 1370 ok 1371 end. 1372 1373retry_pipeline([], _) -> 1374 ok; 1375 1376%% Skip requests when the answer has already been sent 1377retry_pipeline([#request{from = answer_sent}|PipeLine], State) -> 1378 retry_pipeline(PipeLine, State); 1379 1380retry_pipeline([Request | PipeLine], 1381 #state{timers = Timers, 1382 profile_name = ProfileName} = State) -> 1383 NewState = 1384 case (catch httpc_manager:retry_request(Request, ProfileName)) of 1385 ok -> 1386 RequestTimers = Timers#timers.request_timers, 1387 ReqId = Request#request.id, 1388 TimerRef = 1389 proplists:get_value(ReqId, RequestTimers, undefined), 1390 cancel_timer(TimerRef, {timeout, ReqId}), 1391 NewReqsTimers = lists:delete({ReqId, TimerRef}, RequestTimers), 1392 NewTimers = Timers#timers{request_timers = NewReqsTimers}, 1393 State#state{timers = NewTimers}; 1394 1395 Error -> 1396 answer_request(Request#request.from, 1397 httpc_response:error(Request, Error), State) 1398 end, 1399 retry_pipeline(PipeLine, NewState). 1400 1401handle_proxy_options(https, #options{https_proxy = {HttpsProxy, _} = HttpsProxyOpt}) when 1402 HttpsProxy =/= undefined -> 1403 HttpsProxyOpt; 1404handle_proxy_options(_, #options{proxy = Proxy}) -> 1405 Proxy. 1406 1407%%% Check to see if the given {Host,Port} tuple is in the NoProxyList 1408%%% Returns an eventually updated {Host,Port} tuple, with the proxy address 1409handle_proxy(HostPort = {Host, _Port}, {Proxy, NoProxy}) -> 1410 case Proxy of 1411 undefined -> 1412 HostPort; 1413 Proxy -> 1414 case is_no_proxy_dest(Host, NoProxy) of 1415 true -> 1416 HostPort; 1417 false -> 1418 Proxy 1419 end 1420 end. 1421 1422is_no_proxy_dest(_, []) -> 1423 false; 1424is_no_proxy_dest(Host, [ "*." ++ NoProxyDomain | NoProxyDests]) -> 1425 1426 case is_no_proxy_dest_domain(Host, NoProxyDomain) of 1427 true -> 1428 true; 1429 false -> 1430 is_no_proxy_dest(Host, NoProxyDests) 1431 end; 1432 1433is_no_proxy_dest(Host, [NoProxyDest | NoProxyDests]) -> 1434 IsNoProxyDest = case http_util:is_hostname(NoProxyDest) of 1435 true -> 1436 fun is_no_proxy_host_name/2; 1437 false -> 1438 fun is_no_proxy_dest_address/2 1439 end, 1440 1441 case IsNoProxyDest(Host, NoProxyDest) of 1442 true -> 1443 true; 1444 false -> 1445 is_no_proxy_dest(Host, NoProxyDests) 1446 end. 1447 1448is_no_proxy_host_name(Host, Host) -> 1449 true; 1450is_no_proxy_host_name(_,_) -> 1451 false. 1452 1453is_no_proxy_dest_domain(Dest, DomainPart) -> 1454 lists:suffix(DomainPart, Dest). 1455 1456is_no_proxy_dest_address(Dest, Dest) -> 1457 true; 1458is_no_proxy_dest_address(Dest, AddressPart) -> 1459 lists:prefix(AddressPart, Dest). 1460 1461init_mfa(#request{settings = Settings}, State) -> 1462 case Settings#http_options.version of 1463 "HTTP/0.9" -> 1464 {httpc_response, whole_body, [<<>>, -1]}; 1465 _ -> 1466 Relaxed = Settings#http_options.relaxed, 1467 {httpc_response, parse, [State#state.max_header_size, Relaxed]} 1468 end. 1469 1470init_status_line(#request{settings = Settings}) -> 1471 case Settings#http_options.version of 1472 "HTTP/0.9" -> 1473 {"HTTP/0.9", 200, "OK"}; 1474 _ -> 1475 undefined 1476 end. 1477 1478socket_type(#request{scheme = http}) -> 1479 ip_comm; 1480socket_type(#request{scheme = https, settings = Settings}) -> 1481 Settings#http_options.ssl. 1482 1483start_stream({_Version, _Code, _ReasonPhrase}, _Headers, 1484 #request{stream = none} = Request) -> 1485 {ok, Request}; 1486start_stream({_Version, Code, _ReasonPhrase}, Headers, 1487 #request{stream = self} = Request) 1488 when ?IS_STREAMED(Code) -> 1489 Msg = httpc_response:stream_start(Headers, Request, ignore), 1490 httpc_response:send(Request#request.from, Msg), 1491 {ok, Request}; 1492start_stream({_Version, Code, _ReasonPhrase}, Headers, 1493 #request{stream = {self, once}} = Request) 1494 when ?IS_STREAMED(Code) -> 1495 Msg = httpc_response:stream_start(Headers, Request, self()), 1496 httpc_response:send(Request#request.from, Msg), 1497 {ok, Request}; 1498start_stream({_Version, Code, _ReasonPhrase}, _Headers, 1499 #request{stream = Filename} = Request) 1500 when ?IS_STREAMED(Code) andalso is_list(Filename) -> 1501 case file:open(Filename, [write, raw, append, delayed_write]) of 1502 {ok, Fd} -> 1503 {ok, Request#request{stream = Fd}}; 1504 {error, Reason} -> 1505 exit({stream_to_file_failed, Reason}) 1506 end; 1507start_stream(_StatusLine, _Headers, Request) -> 1508 {ok, Request}. 1509 1510stream_remaining_body(<<>>, _, _) -> 1511 ok; 1512stream_remaining_body(Body, Request, {_, Code, _}) -> 1513 stream(Body, Request, Code). 1514 1515%% Note the end stream message is handled by httpc_response and will 1516%% be sent by answer_request 1517end_stream(_, #request{stream = none}) -> 1518 ok; 1519end_stream(_, #request{stream = self}) -> 1520 ok; 1521end_stream(_, #request{stream = {self, once}}) -> 1522 ok; 1523end_stream({_,200,_}, #request{stream = Fd}) -> 1524 case file:close(Fd) of 1525 ok -> 1526 ok; 1527 {error, enospc} -> % Could be due to delayed_write 1528 file:close(Fd) 1529 end; 1530end_stream({_,206,_}, #request{stream = Fd}) -> 1531 case file:close(Fd) of 1532 ok -> 1533 ok; 1534 {error, enospc} -> % Could be due to delayed_write 1535 file:close(Fd) 1536 end; 1537end_stream(_, _) -> 1538 ok. 1539 1540 1541next_body_chunk(#state{request = #request{stream = {self, once}}, 1542 once = once, 1543 session = Session} = State, 1544 Code) when ?IS_STREAMED(Code) -> 1545 activate_once(Session), 1546 State#state{once = inactive}; 1547next_body_chunk(#state{request = #request{stream = {self, once}}, 1548 once = inactive} = State, 1549 Code) when ?IS_STREAMED(Code) -> 1550 State; %% Wait for user to call stream_next 1551next_body_chunk(#state{session = Session} = State, _) -> 1552 activate_once(Session), 1553 State. 1554 1555handle_verbose(verbose) -> 1556 dbg:p(self(), [r]); 1557handle_verbose(debug) -> 1558 dbg:p(self(), [call]), 1559 dbg:tp(?MODULE, [{'_', [], [{return_trace}]}]); 1560handle_verbose(trace) -> 1561 dbg:p(self(), [call]), 1562 dbg:tpl(?MODULE, [{'_', [], [{return_trace}]}]); 1563handle_verbose(_) -> 1564 ok. 1565 1566send_raw(#session{socket = Socket, socket_type = SocketType}, 1567 {ProcessBody, Acc}) when is_function(ProcessBody, 1) -> 1568 send_raw(SocketType, Socket, ProcessBody, Acc); 1569send_raw(#session{socket = Socket, socket_type = SocketType}, Body) -> 1570 http_transport:send(SocketType, Socket, Body). 1571 1572send_raw(SocketType, Socket, ProcessBody, Acc) -> 1573 case ProcessBody(Acc) of 1574 eof -> 1575 ok; 1576 {ok, Data, NewAcc} -> 1577 DataBin = iolist_to_binary(Data), 1578 case http_transport:send(SocketType, Socket, DataBin) of 1579 ok -> 1580 send_raw(SocketType, Socket, ProcessBody, NewAcc); 1581 Error -> 1582 Error 1583 end 1584 end. 1585 1586tls_tunnel(Address, Request, #state{session = #session{socket = Socket, 1587 socket_type = SocketType} = Session} = State, 1588 ErrorHandler) -> 1589 UpgradeRequest = tls_tunnel_request(Request), 1590 case httpc_request:send(Address, Session, UpgradeRequest) of 1591 ok -> 1592 TmpState = State#state{request = UpgradeRequest, 1593 %% session = Session, 1594 mfa = init_mfa(UpgradeRequest, State), 1595 status_line = 1596 init_status_line(UpgradeRequest), 1597 headers = undefined, 1598 body = undefined}, 1599 http_transport:setopts(SocketType, 1600 Socket, [{active, once}]), 1601 NewState = activate_request_timeout(TmpState), 1602 {ok, NewState#state{status = {ssl_tunnel, Request}}}; 1603 {error, Reason} -> 1604 ErrorHandler(Request, State, Reason) 1605 end. 1606 1607tls_tunnel_request(#request{headers = Headers, 1608 settings = Options, 1609 id = RequestId, 1610 from = From, 1611 address = {Host, Port}= Adress, 1612 ipv6_host_with_brackets = IPV6}) -> 1613 1614 URI = Host ++":" ++ integer_to_list(Port), 1615 1616 #request{ 1617 id = RequestId, 1618 from = From, 1619 scheme = http, %% Use tcp-first and then upgrade! 1620 address = Adress, 1621 path = URI, 1622 pquery = "", 1623 method = connect, 1624 headers = #http_request_h{host = host_header(Headers, URI), 1625 te = "", 1626 pragma = "no-cache", 1627 other = [{"Proxy-Connection", " Keep-Alive"}]}, 1628 settings = Options, 1629 abs_uri = URI, 1630 stream = false, 1631 userinfo = "", 1632 headers_as_is = [], 1633 started = http_util:timestamp(), 1634 ipv6_host_with_brackets = IPV6 1635 }. 1636 1637host_header(#http_request_h{host = Host}, _) -> 1638 Host; 1639 1640%% Handles headers_as_is 1641host_header(_, URI) -> 1642 {ok, {_, _, Host, _, _, _}} = http_uri:parse(URI), 1643 Host. 1644 1645tls_upgrade(#state{status = 1646 {ssl_tunnel, 1647 #request{settings = 1648 #http_options{ssl = {_, TLSOptions0} = SocketType}, 1649 address = {Host, _} = Address} = Request}, 1650 session = #session{socket = TCPSocket} = Session0, 1651 options = Options} = State) -> 1652 1653 TLSOptions = maybe_add_sni(Host, TLSOptions0), 1654 1655 case ssl:connect(TCPSocket, TLSOptions) of 1656 {ok, TLSSocket} -> 1657 ClientClose = httpc_request:is_client_closing(Request#request.headers), 1658 SessionType = httpc_manager:session_type(Options), 1659 Session = Session0#session{ 1660 scheme = https, 1661 socket = TLSSocket, 1662 socket_type = SocketType, 1663 type = SessionType, 1664 client_close = ClientClose}, 1665 httpc_request:send(Address, Session, Request), 1666 http_transport:setopts(SocketType, TLSSocket, [{active, once}]), 1667 NewState = State#state{session = Session, 1668 request = Request, 1669 mfa = init_mfa(Request, State), 1670 status_line = 1671 init_status_line(Request), 1672 headers = undefined, 1673 body = undefined, 1674 status = new 1675 }, 1676 {noreply, activate_request_timeout(NewState)}; 1677 {error, Reason} -> 1678 Error = httpc_response:error(Request, {failed_connect, 1679 [{to_address, Address}, 1680 {tls, TLSOptions, Reason}]}), 1681 maybe_send_answer(Request, Error, State), 1682 {stop, normal, State#state{request = Request}} 1683 end. 1684 1685maybe_add_sni(Host, Options) -> 1686 case http_util:is_hostname(Host) andalso 1687 not lists:keymember(server_name_indication, 1, Options) of 1688 true -> 1689 [{server_name_indication, Host} | Options]; 1690 false -> 1691 Options 1692 end. 1693 1694%% --------------------------------------------------------------------- 1695%% Session wrappers 1696%% --------------------------------------------------------------------- 1697 1698insert_session(Session, ProfileName) -> 1699 httpc_manager:insert_session(Session, ProfileName). 1700 1701 1702update_session(ProfileName, #session{id = SessionId} = Session, Pos, Value) -> 1703 try 1704 begin 1705 httpc_manager:update_session(ProfileName, SessionId, Pos, Value) 1706 end 1707 catch 1708 error:undef -> %% This could happen during code upgrade 1709 Session2 = erlang:setelement(Pos, Session, Value), 1710 insert_session(Session2, ProfileName); 1711 error:badarg -> 1712 {stop, normal}; 1713 T:E:Stacktrace -> 1714 %% Unexpected this must be an error! 1715 error_logger:error_msg("Failed updating session: " 1716 "~n ProfileName: ~p" 1717 "~n SessionId: ~p" 1718 "~n Pos: ~p" 1719 "~n Value: ~p" 1720 "~nwhen" 1721 "~n Session (db) info: ~p" 1722 "~n Session (db): ~p" 1723 "~n Session (record): ~p" 1724 "~n T: ~p" 1725 "~n E: ~p", 1726 [ProfileName, SessionId, Pos, Value, 1727 (catch httpc_manager:which_session_info(ProfileName)), 1728 Session, 1729 (catch httpc_manager:lookup_session(SessionId, ProfileName)), 1730 T, E]), 1731 {stop, {failed_updating_session, 1732 [{profile, ProfileName}, 1733 {session_id, SessionId}, 1734 {pos, Pos}, 1735 {value, Value}, 1736 {etype, T}, 1737 {error, E}, 1738 {stacktrace, Stacktrace}]}} 1739 end. 1740 1741format_address({[$[|T], Port}) -> 1742 {ok, Address} = inet:parse_address(string:strip(T, right, $])), 1743 {Address, Port}; 1744format_address(HostPort) -> 1745 HostPort. 1746