1%%
2%% %CopyrightBegin%
3%%
4%% Copyright Ericsson AB 2002-2018. All Rights Reserved.
5%%
6%% Licensed under the Apache License, Version 2.0 (the "License");
7%% you may not use this file except in compliance with the License.
8%% You may obtain a copy of the License at
9%%
10%%     http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing, software
13%% distributed under the License is distributed on an "AS IS" BASIS,
14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15%% See the License for the specific language governing permissions and
16%% limitations under the License.
17%%
18%% %CopyrightEnd%
19%%
20%%
21
22-module(httpc_handler).
23
24-behaviour(gen_server).
25
26-include_lib("inets/src/http_lib/http_internal.hrl").
27-include("httpc_internal.hrl").
28
29-define(IS_STREAMED(Code), ((Code =:= 200) orelse (Code =:= 206))).
30
31%%--------------------------------------------------------------------
32%% Internal Application API
33-export([
34         start_link/4,
35         send/2,
36         cancel/2,
37         stream_next/1,
38         info/1
39        ]).
40
41%% gen_server callbacks
42-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
43         terminate/2, code_change/3]).
44
45-record(timers,
46        {
47          request_timers = [] :: [reference()],
48          queue_timer         :: reference() | 'undefined'
49         }).
50
51-record(state,
52        {
53          request                   :: request() | 'undefined',
54          session                   :: session() | 'undefined',
55          status_line,               % {Version, StatusCode, ReasonPharse}
56          headers                   :: http_response_h() | 'undefined',
57          body                      :: binary() | 'undefined',
58          mfa,                       % {Module, Function, Args}
59          pipeline = queue:new()    :: queue:queue(),
60          keep_alive = queue:new()  :: queue:queue(),
61          status                    :: undefined | new | pipeline | keep_alive | close | {ssl_tunnel, request()},
62          canceled = [],             % [RequestId]
63          max_header_size = nolimit :: nolimit | integer(),
64          max_body_size = nolimit   :: nolimit | integer(),
65          options                   :: options(),
66          timers = #timers{}        :: #timers{},
67          profile_name              :: atom(), % id of httpc_manager process.
68          once = inactive           :: 'inactive' | 'once'
69         }).
70
71
72%%====================================================================
73%% External functions
74%%====================================================================
75%%--------------------------------------------------------------------
76%% Function: start_link(Request, Options, ProfileName) -> {ok, Pid}
77%%
78%%      Request = #request{}
79%%      Options =  #options{}
80%%      ProfileName = atom() - id of httpc manager process
81%%
82%% Description: Starts a http-request handler process. Intended to be
83%% called by the httpc profile supervisor or the http manager process
84%% if the client is started stand alone form inets.
85%%
86%% Note: Uses proc_lib and gen_server:enter_loop so that waiting
87%% for gen_tcp:connect to timeout in init/1 will not
88%% block the httpc manager process in odd cases such as trying to call
89%% a server that does not exist. (See OTP-6735) The only API function
90%% sending messages to the handler process that can be called before
91%% init has completed is cancel and that is not a problem! (Send and
92%% stream will not be called before the first request has been sent and
93%% the reply or part of it has arrived.)
94%%--------------------------------------------------------------------
95%%--------------------------------------------------------------------
96
97start_link(Parent, Request, Options, ProfileName) ->
98    {ok, proc_lib:start_link(?MODULE, init, [[Parent, Request, Options,
99                                              ProfileName]])}.
100
101%%--------------------------------------------------------------------
102%% Function: send(Request, Pid) -> ok
103%%      Request = #request{}
104%%      Pid = pid() - the pid of the http-request handler process.
105%%
106%% Description: Uses this handlers session to send a request. Intended
107%% to be called by the httpc manager process.
108%%--------------------------------------------------------------------
109send(Request, Pid) ->
110    call(Request, Pid).
111
112
113%%--------------------------------------------------------------------
114%% Function: cancel(RequestId, Pid) -> ok
115%%      RequestId = reference()
116%%      Pid = pid() -  the pid of the http-request handler process.
117%%
118%% Description: Cancels a request. Intended to be called by the httpc
119%% manager process.
120%%--------------------------------------------------------------------
121cancel(RequestId, Pid) ->
122    cast({cancel, RequestId}, Pid).
123
124
125%%--------------------------------------------------------------------
126%% Function: stream_next(Pid) -> ok
127%%      Pid = pid() -  the pid of the http-request handler process.
128%%
129%% Description: Works as inets:setopts(active, once) but for
130%% body chunks sent to the user.
131%%--------------------------------------------------------------------
132stream_next(Pid) ->
133    cast(stream_next, Pid).
134
135
136%%--------------------------------------------------------------------
137%% Function: info(Pid) -> [{Key, Val}]
138%%      Pid = pid() -  the pid of the http-request handler process.
139%%
140%% Description:
141%%     Returns various information related to this handler
142%%     Used for debugging and testing
143%%--------------------------------------------------------------------
144info(Pid) ->
145    try
146	call(info, Pid)
147    catch
148	_:_ ->
149	    []
150    end.
151
152%%--------------------------------------------------------------------
153%% Function: stream(BodyPart, Request, Code) -> _
154%%      BodyPart = binary()
155%%      Request = #request{}
156%%      Code = integer()
157%%
158%% Description: Stream the HTTP body to the caller process (client)
159%%              or to a file. Note that the data that has been stream
160%%              does not have to be saved. (We do not want to use up
161%%              memory in vain.)
162%%--------------------------------------------------------------------
163%% Request should not be streamed
164stream(BodyPart, #request{stream = none} = Request, _) ->
165    {false, BodyPart, Request};
166
167%% Stream to caller
168stream(BodyPart, #request{stream = Self} = Request, Code)
169  when ?IS_STREAMED(Code) andalso
170       ((Self =:= self) orelse (Self =:= {self, once})) ->
171    httpc_response:send(Request#request.from,
172                        {Request#request.id, stream, BodyPart}),
173    {true, <<>>, Request};
174
175%% Stream to file
176%% This has been moved to start_stream/3
177%% We keep this for backward compatibillity...
178stream(BodyPart, #request{stream = Filename} = Request, Code)
179  when ?IS_STREAMED(Code) andalso is_list(Filename) ->
180    case file:open(Filename, [write, raw, append, delayed_write]) of
181        {ok, Fd} ->
182            stream(BodyPart, Request#request{stream = Fd}, 200);
183        {error, Reason} ->
184            exit({stream_to_file_failed, Reason})
185    end;
186
187%% Stream to file
188stream(BodyPart, #request{stream = Fd} = Request, Code)
189  when ?IS_STREAMED(Code) ->
190    case file:write(Fd, BodyPart) of
191        ok ->
192            {true, <<>>, Request};
193        {error, Reason} ->
194            exit({stream_to_file_failed, Reason})
195    end;
196
197stream(BodyPart, Request,_) -> % only 200 and 206 responses can be streamed
198    {false, BodyPart, Request}.
199
200
201%%====================================================================
202%% Server functions
203%%====================================================================
204
205%%--------------------------------------------------------------------
206%% Function: init([Options, ProfileName]) -> {ok, State} |
207%%                       {ok, State, Timeout} | ignore | {stop, Reason}
208%%
209%%      Options =  #options{}
210%%      ProfileName = atom() - id of httpc manager process
211%%
212%% Description: Initiates the httpc_handler process
213%%
214%% Note: The init function may not fail, that will kill the
215%% httpc_manager process. We could make the httpc_manager more comlex
216%% but we do not want that so errors will be handled by the process
217%% sending an init_error message to itself.
218%%--------------------------------------------------------------------
219init([Parent, Request, Options, ProfileName]) ->
220    process_flag(trap_exit, true),
221
222    %% Do not let initial tcp-connection block the manager-process
223    proc_lib:init_ack(Parent, self()),
224    handle_verbose(Options#options.verbose),
225    ProxyOptions = handle_proxy_options(Request#request.scheme, Options),
226    Address = handle_proxy(Request#request.address, ProxyOptions),
227    {ok, State} =
228        %% #state.once should initially be 'inactive' because we
229        %% activate the socket at first regardless of the state.
230        case {Address /= Request#request.address, Request#request.scheme} of
231            {true, https} ->
232                connect_and_send_upgrade_request(Address, Request,
233                                                 #state{options = Options,
234                                                        profile_name = ProfileName});
235            {_, _} ->
236                connect_and_send_first_request(Address, Request,
237                                   #state{options = Options,
238                                          profile_name = ProfileName})
239        end,
240    gen_server:enter_loop(?MODULE, [], State).
241
242%%--------------------------------------------------------------------
243%% Function: handle_call(Request, From, State) -> {reply, Reply, State} |
244%%          {reply, Reply, State, Timeout} |
245%%          {noreply, State}               |
246%%          {noreply, State, Timeout}      |
247%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
248%%          {stop, Reason, State}            (terminate/2 is called)
249%% Description: Handling call messages
250%%--------------------------------------------------------------------
251handle_call(Request, From, State) ->
252    try do_handle_call(Request, From, State) of
253	Result ->
254	    Result
255    catch
256	Class:Reason:ST ->
257	    {stop, {shutdown, {{Class, Reason}, ST}}, State}
258    end.
259
260
261%%--------------------------------------------------------------------
262%% Function: handle_cast(Msg, State) -> {noreply, State} |
263%%          {noreply, State, Timeout} |
264%%          {stop, Reason, State}            (terminate/2 is called)
265%% Description: Handling cast messages
266%%--------------------------------------------------------------------
267handle_cast(Msg, State) ->
268    try do_handle_cast(Msg, State) of
269	Result ->
270	    Result
271    catch
272	Class:Reason:ST ->
273	    {stop, {shutdown, {{Class, Reason}, ST}}, State}
274    end.
275
276%%--------------------------------------------------------------------
277%% Function: handle_info(Info, State) -> {noreply, State} |
278%%          {noreply, State, Timeout} |
279%%          {stop, Reason, State}            (terminate/2 is called)
280%% Description: Handling all non call/cast messages
281%%--------------------------------------------------------------------
282handle_info(Info, State) ->
283    try do_handle_info(Info, State) of
284	Result ->
285	    Result
286    catch
287	Class:Reason:ST ->
288	    {stop, {shutdown, {{Class, Reason}, ST}}, State}
289    end.
290
291%%--------------------------------------------------------------------
292%% Function: terminate(Reason, State) -> _  (ignored by gen_server)
293%% Description: Shutdown the httpc_handler
294%%--------------------------------------------------------------------
295
296terminate(normal, #state{session = undefined}) ->
297    ok;
298
299%% Init error sending, no session information has been setup but
300%% there is a socket that needs closing.
301terminate(normal,
302          #state{session = #session{id = undefined} = Session}) ->
303    close_socket(Session);
304
305%% Socket closed remotely
306terminate(normal,
307          #state{session = #session{socket      = {remote_close, Socket},
308                                    socket_type = SocketType,
309                                    id          = Id},
310                 profile_name = ProfileName,
311                 request      = Request,
312                 timers       = Timers,
313                 pipeline     = Pipeline,
314                 keep_alive   = KeepAlive} = State) ->
315    %% Clobber session
316    (catch httpc_manager:delete_session(Id, ProfileName)),
317
318    maybe_retry_queue(Pipeline, State),
319    maybe_retry_queue(KeepAlive, State),
320
321    %% Cancel timers
322    cancel_timers(Timers),
323
324    %% Maybe deliver answers to requests
325    deliver_answer(Request),
326
327    %% And, just in case, close our side (**really** overkill)
328    http_transport:close(SocketType, Socket);
329
330terminate(_Reason, #state{session = #session{id          = Id,
331                                            socket      = Socket,
332                                            socket_type = SocketType},
333                    request      = undefined,
334                    profile_name = ProfileName,
335                    timers       = Timers,
336                    pipeline     = Pipeline,
337                    keep_alive   = KeepAlive} = State) ->
338
339    %% Clobber session
340    (catch httpc_manager:delete_session(Id, ProfileName)),
341
342    maybe_retry_queue(Pipeline, State),
343    maybe_retry_queue(KeepAlive, State),
344
345    cancel_timer(Timers#timers.queue_timer, timeout_queue),
346    http_transport:close(SocketType, Socket);
347
348terminate(_Reason, #state{request = undefined}) ->
349    ok;
350
351terminate(Reason, #state{request = Request} = State) ->
352    NewState = maybe_send_answer(Request,
353                                 httpc_response:error(Request, Reason),
354                                 State),
355    terminate(Reason, NewState#state{request = undefined}).
356
357%%--------------------------------------------------------------------
358%% Func: code_change(_OldVsn, State, Extra) -> {ok, NewState}
359%% Purpose: Convert process state when code is changed
360%%--------------------------------------------------------------------
361
362code_change(_, State, _) ->
363    {ok, State}.
364
365%%%--------------------------------------------------------------------
366%%% Internal functions
367%%%--------------------------------------------------------------------
368do_handle_call(#request{address = Addr} = Request, _,
369            #state{status  = Status,
370                   session = #session{type = pipeline} = Session,
371                   timers  = Timers,
372                   options = #options{proxy = Proxy} = _Options,
373                   profile_name = ProfileName} = State0)
374  when Status =/= undefined ->
375    Address = handle_proxy(Addr, Proxy),
376    case httpc_request:send(Address, Session, Request) of
377        ok ->
378
379            ?hcrd("request sent", []),
380
381            %% Activate the request time out for the new request
382            State1 =
383                activate_request_timeout(State0#state{request = Request}),
384
385            ClientClose =
386                httpc_request:is_client_closing(Request#request.headers),
387
388            case State0#state.request of
389                #request{} = OldRequest -> %% Old request not yet finished
390                    %% Make sure to use the new value of timers in state
391		    NewTimers = State1#state.timers,
392                    NewPipeline = queue:in(Request, State1#state.pipeline),
393                    NewSession  =
394                        Session#session{queue_length =
395                                        %% Queue + current
396                                        queue:len(NewPipeline) + 1,
397                                        client_close = ClientClose},
398                    insert_session(NewSession, ProfileName),
399                    {reply, ok, State1#state{
400				  request = OldRequest,
401				  pipeline = NewPipeline,
402				  session  = NewSession,
403				  timers   = NewTimers}};
404                undefined ->
405                    %% Note: tcp-message receiving has already been
406                    %% activated by handle_pipeline/2.
407                    cancel_timer(Timers#timers.queue_timer,
408                                 timeout_queue),
409                    NewSession =
410                        Session#session{queue_length = 1,
411                                        client_close = ClientClose},
412                    httpc_manager:insert_session(NewSession, ProfileName),
413                    NewTimers = Timers#timers{queue_timer = undefined},
414		    State = init_wait_for_response_state(Request, State1#state{session = NewSession,
415								      timers = NewTimers}),
416                    {reply, ok, State}
417            end;
418        {error, Reason} ->
419            NewPipeline = queue:in(Request, State0#state.pipeline),
420            {stop, {shutdown, {pipeline_failed, Reason}}, State0#state{pipeline = NewPipeline}}
421    end;
422
423do_handle_call(#request{address = Addr} = Request, _,
424            #state{status  = Status,
425                   session = #session{type = keep_alive} = Session,
426                   timers  = Timers,
427                   options = #options{proxy = Proxy} = _Options,
428                   profile_name = ProfileName} = State0)
429  when Status =/= undefined ->
430
431    ClientClose = httpc_request:is_client_closing(Request#request.headers),
432
433    case State0#state.request of
434	#request{} -> %% Old request not yet finished
435	    %% Make sure to use the new value of timers in state
436	    NewKeepAlive = queue:in(Request, State0#state.keep_alive),
437	    NewSession   =
438		Session#session{queue_length =
439				    %% Queue + current
440				    queue:len(NewKeepAlive) + 1,
441				client_close = ClientClose},
442	    insert_session(NewSession, ProfileName),
443	    {reply, ok, State0#state{keep_alive = NewKeepAlive,
444				    session    = NewSession}};
445	undefined ->
446	    %% Note: tcp-message receiving has already been
447	    %% activated by handle_pipeline/2.
448	    cancel_timer(Timers#timers.queue_timer,
449			 timeout_queue),
450	    NewTimers = Timers#timers{queue_timer = undefined},
451	    State1 = State0#state{timers = NewTimers},
452	    Address = handle_proxy(Addr, Proxy),
453	    case httpc_request:send(Address, Session, Request) of
454		ok ->
455		    %% Activate the request time out for the new request
456		    State2 =
457			activate_request_timeout(State1#state{request = Request}),
458		    NewSession =
459			Session#session{queue_length = 1,
460					client_close = ClientClose},
461		    insert_session(NewSession, ProfileName),
462		    State = init_wait_for_response_state(Request, State2#state{session = NewSession}),
463		    {reply, ok, State};
464		{error, Reason} ->
465		    {stop, {shutdown, {keepalive_failed, Reason}}, State1}
466	    end
467    end;
468do_handle_call(info, _, State) ->
469    Info = handler_info(State),
470    {reply, Info, State}.
471
472%% When the request in process has been canceled the handler process is
473%% stopped and the pipelined requests will be reissued or remaining
474%% requests will be sent on a new connection. This is is
475%% based on the assumption that it is probably cheaper to reissue the
476%% requests than to wait for a potentiall large response that we then
477%% only throw away. This of course is not always true maybe we could
478%% do something smarter here?! If the request canceled is not
479%% the one handled right now the same effect will take place in
480%% handle_pipeline/2 when the canceled request is on turn,
481%% handle_keep_alive_queue/2 on the other hand will just skip the
482%% request as if it was never issued as in this case the request will
483%% not have been sent.
484do_handle_cast({cancel, RequestId},
485            #state{request      = #request{id = RequestId} = Request,
486                   canceled     = Canceled} = State) ->
487    {stop, normal,
488     State#state{canceled = [RequestId | Canceled],
489                 request  = Request#request{from = answer_sent}}};
490do_handle_cast({cancel, RequestId},
491	       #state{request = #request{},
492		      canceled = Canceled} = State) ->
493    {noreply, State#state{canceled = [RequestId | Canceled]}};
494do_handle_cast({cancel, _},
495	       #state{request = undefined} = State) ->
496    {noreply, State};
497
498do_handle_cast(stream_next, #state{session = Session} = State) ->
499    activate_once(Session),
500    %% Inactivate the #state.once here because we don't want
501    %% next_body_chunk/1 to activate the socket twice.
502    {noreply, State#state{once = inactive}}.
503
504do_handle_info({Proto, _Socket, Data},
505            #state{mfa = {Module, Function, Args},
506                   request = #request{method = Method} = Request,
507                   session = Session,
508                   status_line = StatusLine} = State)
509  when (Proto =:= tcp) orelse
510       (Proto =:= ssl) orelse
511       (Proto =:= httpc_handler) ->
512
513    try Module:Function([Data | Args]) of
514	{ok, Result} ->
515	    handle_http_msg(Result, State);
516	{_, whole_body, _} when Method =:= head ->
517	    handle_response(State#state{body = <<>>});
518	{Module, whole_body, [Body, Length]} ->
519	    {_, Code, _} = StatusLine,
520	    {Streamed, NewBody, NewRequest} = stream(Body, Request, Code),
521	    %% When we stream we will not keep the already
522	    %% streamed data, that would be a waste of memory.
523	    NewLength =
524		case Streamed of
525		    false ->
526			Length;
527		    true ->
528			Length - size(Body)
529		end,
530
531	    NewState = next_body_chunk(State, Code),
532	    NewMFA   = {Module, whole_body, [NewBody, NewLength]},
533	    {noreply, NewState#state{mfa     = NewMFA,
534				     request = NewRequest}};
535        {Module, decode_size,
536             [TotalChunk, HexList, AccHeaderSize,
537              {MaxBodySize, BodySoFar, AccLength, MaxHeaderSize}]}
538	  when BodySoFar =/= <<>> ->
539	    %% The response body is chunk-encoded. Steal decoded
540	    %% chunks as much as possible to stream.
541	    {_, Code, _} = StatusLine,
542	    {_, NewBody, NewRequest} = stream(BodySoFar, Request, Code),
543	    NewState = next_body_chunk(State, Code),
544	    NewMFA   = {Module, decode_size,
545			[TotalChunk, HexList, AccHeaderSize,
546                             {MaxBodySize, NewBody, AccLength, MaxHeaderSize}]},
547	    {noreply, NewState#state{mfa     = NewMFA,
548				     request = NewRequest}};
549	{Module, decode_data,
550	 [ChunkSize, TotalChunk,
551	  {MaxBodySize, BodySoFar, AccLength, MaxHeaderSize}]}
552	  when TotalChunk =/= <<>> orelse BodySoFar =/= <<>> ->
553	    %% The response body is chunk-encoded. Steal decoded
554	    %% chunks as much as possible to stream.
555	    ChunkSizeToSteal = min(ChunkSize, byte_size(TotalChunk)),
556	    <<StolenChunk:ChunkSizeToSteal/binary, NewTotalChunk/binary>> = TotalChunk,
557	    StolenBody   = <<BodySoFar/binary, StolenChunk/binary>>,
558	    NewChunkSize = ChunkSize - ChunkSizeToSteal,
559	    {_, Code, _} = StatusLine,
560
561	    {_, NewBody, NewRequest} = stream(StolenBody, Request, Code),
562	    NewState = next_body_chunk(State, Code),
563	    NewMFA   = {Module, decode_data,
564			[NewChunkSize, NewTotalChunk,
565			 {MaxBodySize, NewBody, AccLength, MaxHeaderSize}]},
566                {noreply, NewState#state{mfa     = NewMFA,
567                                         request = NewRequest}};
568	NewMFA ->
569	    activate_once(Session),
570	    {noreply, State#state{mfa = NewMFA}}
571    catch
572	Class:Reason:ST ->
573	    ClientReason = {could_not_parse_as_http, Data},
574	    ClientErrMsg = httpc_response:error(Request, ClientReason),
575	    NewState     = answer_request(Request, ClientErrMsg, State),
576	    {stop, {shutdown, {{Class, Reason}, ST}}, NewState}
577    end;
578
579do_handle_info({Proto, Socket, Data},
580            #state{mfa          = MFA,
581                   request      = Request,
582                   session      = Session,
583                   status       = Status,
584                   status_line  = StatusLine,
585                   profile_name = Profile} = State)
586  when (Proto =:= tcp) orelse
587       (Proto =:= ssl) orelse
588       (Proto =:= httpc_handler) ->
589
590    error_logger:warning_msg("Received unexpected ~p data on ~p"
591                             "~n   Data:       ~p"
592                             "~n   MFA:        ~p"
593                             "~n   Request:    ~p"
594                             "~n   Session:    ~p"
595                             "~n   Status:     ~p"
596                             "~n   StatusLine: ~p"
597                             "~n   Profile:    ~p"
598                             "~n",
599                             [Proto, Socket, Data, MFA,
600                              Request, Session, Status, StatusLine, Profile]),
601    activate_once(Session),
602    {noreply, State};
603
604%% The Server may close the connection to indicate that the
605%% whole body is now sent instead of sending a lengh
606%% indicator. In this case the lengh indicator will be
607%% -1.
608do_handle_info({Info, _}, State = #state{mfa = {_, whole_body, Args}})
609  when Info =:= tcp_closed orelse
610       Info =:= ssl_closed ->
611    case lists:last(Args) of
612        Length when Length =< 0 ->
613            handle_response(State#state{body = hd(Args)});
614        _Else ->
615            {stop, {shutdown, server_closed}, State}
616    end;
617
618%%% Server closes idle pipeline
619do_handle_info({tcp_closed, _}, State = #state{request = undefined}) ->
620    {stop, normal, State};
621do_handle_info({ssl_closed, _}, State = #state{request = undefined}) ->
622    {stop, normal, State};
623
624%%% Error cases
625do_handle_info({tcp_closed, _}, #state{session = Session0} = State) ->
626    Socket  = Session0#session.socket,
627    Session = Session0#session{socket = {remote_close, Socket}},
628    %% {stop, session_remotly_closed, State};
629    {stop, normal, State#state{session = Session}};
630do_handle_info({ssl_closed, _}, #state{session = Session0} = State) ->
631    Socket  = Session0#session.socket,
632    Session = Session0#session{socket = {remote_close, Socket}},
633    %% {stop, session_remotly_closed, State};
634    {stop, normal, State#state{session = Session}};
635do_handle_info({tcp_error, _, _} = Reason, State) ->
636    {stop, Reason, State};
637do_handle_info({ssl_error, _, _} = Reason, State) ->
638    {stop, Reason, State};
639
640%% Timeouts
641%% Internally, to a request handling process, a request timeout is
642%% seen as a canceled request.
643do_handle_info({timeout, RequestId},
644            #state{request      = #request{id = RequestId} = Request,
645                   canceled     = Canceled,
646                   profile_name = ProfileName} = State) ->
647    httpc_response:send(Request#request.from,
648                        httpc_response:error(Request, timeout)),
649    httpc_manager:request_done(RequestId, ProfileName),
650    {stop, normal,
651     State#state{request  = Request#request{from = answer_sent},
652                 canceled = [RequestId | Canceled]}};
653
654do_handle_info({timeout, RequestId},
655            #state{canceled     = Canceled,
656                   profile_name = ProfileName} = State) ->
657    Filter =
658        fun(#request{id = Id, from = From} = Request) when Id =:= RequestId ->
659                %% Notify the owner
660                httpc_response:send(From,
661                                    httpc_response:error(Request, timeout)),
662                httpc_manager:request_done(RequestId, ProfileName),
663                [Request#request{from = answer_sent}];
664           (_) ->
665                true
666        end,
667    case State#state.status of
668        pipeline ->
669            Pipeline = queue:filter(Filter, State#state.pipeline),
670            {noreply, State#state{canceled = [RequestId | Canceled],
671                                  pipeline = Pipeline}};
672        keep_alive ->
673            KeepAlive = queue:filter(Filter, State#state.keep_alive),
674            {noreply, State#state{canceled   = [RequestId | Canceled],
675                                  keep_alive = KeepAlive}}
676    end;
677
678do_handle_info(timeout_queue, State = #state{request = undefined}) ->
679    {stop, normal, State};
680
681%% Timing was such as the queue_timeout was not canceled!
682do_handle_info(timeout_queue, #state{timers = Timers} = State) ->
683    {noreply, State#state{timers =
684                          Timers#timers{queue_timer = undefined}}};
685
686%% Setting up the connection to the server somehow failed.
687do_handle_info({init_error, Reason, ClientErrMsg},
688            State = #state{request = Request}) ->
689    NewState = answer_request(Request, ClientErrMsg, State),
690    {stop, {shutdown, Reason}, NewState};
691
692%%% httpc_manager process dies.
693do_handle_info({'EXIT', _, _}, State = #state{request = undefined}) ->
694    {stop, normal, State};
695%%Try to finish the current request anyway,
696%% there is a fairly high probability that it can be done successfully.
697%% Then close the connection, hopefully a new manager is started that
698%% can retry requests in the pipeline.
699do_handle_info({'EXIT', _, _}, State) ->
700    {noreply, State#state{status = close}}.
701
702call(Msg, Pid) ->
703    try gen_server:call(Pid, Msg, infinity)
704    catch
705 	exit:{noproc, _} ->
706 	    {error, closed};
707	exit:{normal, _} ->
708	    {error, closed};
709	exit:{{shutdown, _},_} ->
710	    {error, closed}
711    end.
712
713cast(Msg, Pid) ->
714    gen_server:cast(Pid, Msg).
715
716maybe_retry_queue(Q, State) ->
717    case queue:is_empty(Q) of
718        false ->
719            retry_pipeline(queue:to_list(Q), State);
720        true ->
721            ok
722    end.
723
724maybe_send_answer(#request{from = answer_sent}, _Reason, State) ->
725    State;
726maybe_send_answer(Request, Answer, State) ->
727    answer_request(Request, Answer, State).
728
729deliver_answer(#request{from = From} = Request)
730  when From =/= answer_sent ->
731    Response = httpc_response:error(Request, socket_closed_remotely),
732    httpc_response:send(From, Response);
733deliver_answer(_Request) ->
734    ok.
735
736%%%--------------------------------------------------------------------
737%%% Internal functions
738%%%--------------------------------------------------------------------
739
740connect(SocketType, ToAddress,
741        #options{ipfamily    = IpFamily,
742                 ip          = FromAddress,
743                 port        = FromPort,
744                 unix_socket = UnixSocket,
745                 socket_opts = Opts0}, Timeout) ->
746    Opts1 =
747        case FromPort of
748            default ->
749                Opts0;
750            _ ->
751                [{port, FromPort} | Opts0]
752        end,
753    Opts2 =
754        case FromAddress of
755            default ->
756                Opts1;
757            _ ->
758                [{ip, FromAddress} | Opts1]
759        end,
760    case IpFamily of
761        inet6fb4 ->
762            Opts3 = [inet6 | Opts2],
763            case http_transport:connect(SocketType,
764                                        ToAddress, Opts3, Timeout) of
765                {error, Reason6} ->
766                    Opts4 = [inet | Opts2],
767                    case http_transport:connect(SocketType,
768                                                ToAddress, Opts4, Timeout) of
769                        {error, Reason4} ->
770                            {error, {failed_connect,
771                                     [{to_address, ToAddress},
772                                      {inet6, Opts3, Reason6},
773                                      {inet,  Opts4, Reason4}]}};
774                        OK ->
775                            OK
776                    end;
777                OK ->
778                    OK
779            end;
780        local ->
781            Opts3 = [IpFamily | Opts2],
782            SocketAddr = {local, UnixSocket},
783            case http_transport:connect(SocketType, {SocketAddr, 0}, Opts3, Timeout) of
784                {error, Reason} ->
785                    {error, {failed_connect, [{to_address, SocketAddr},
786                                              {IpFamily, Opts3, Reason}]}};
787                Else ->
788                    Else
789            end;
790        _ ->
791            Opts3 = [IpFamily | Opts2],
792            case http_transport:connect(SocketType, ToAddress, Opts3, Timeout) of
793                {error, Reason} ->
794                    {error, {failed_connect, [{to_address, ToAddress},
795                                              {IpFamily, Opts3, Reason}]}};
796                Else ->
797                    Else
798            end
799    end.
800
801handle_unix_socket_options(#request{unix_socket = UnixSocket}, Options)
802  when UnixSocket =:= undefined ->
803    Options;
804
805handle_unix_socket_options(#request{unix_socket = UnixSocket},
806                           Options = #options{ipfamily = IpFamily}) ->
807    case IpFamily of
808        local ->
809            Options#options{unix_socket = UnixSocket};
810        Else ->
811            error({badarg, [{ipfamily, Else}, {unix_socket, UnixSocket}]})
812    end.
813
814connect_and_send_first_request(Address, Request, #state{options = Options0} = State) ->
815    SocketType  = socket_type(Request),
816    ConnTimeout = (Request#request.settings)#http_options.connect_timeout,
817    Options = handle_unix_socket_options(Request, Options0),
818    case connect(SocketType, format_address(Address), Options, ConnTimeout) of
819        {ok, Socket} ->
820            ClientClose =
821		httpc_request:is_client_closing(
822		  Request#request.headers),
823            SessionType = httpc_manager:session_type(Options),
824            SocketType  = socket_type(Request),
825            Session = #session{id = {Request#request.address, self()},
826                               scheme = Request#request.scheme,
827                               socket = Socket,
828			       socket_type = SocketType,
829			       client_close = ClientClose,
830			       type = SessionType},
831            case httpc_request:send(Address, Session, Request) of
832                ok ->
833                    TmpState = State#state{request = Request,
834                                           session = Session,
835                                           mfa = init_mfa(Request, State),
836                                           status_line = undefined,
837                                           headers = undefined,
838                                           body = undefined,
839                                           status = new},
840                    activate_once(Session),
841                    NewState = activate_request_timeout(TmpState),
842                    {ok, NewState};
843                {error, Reason} ->
844                    self() ! {init_error, error_sending,
845                              httpc_response:error(Request, Reason)},
846                    {ok, State#state{request = Request,
847                                     session = Session}}
848            end;
849        {error, Reason} ->
850            self() ! {init_error, error_connecting,
851                      httpc_response:error(Request, Reason)},
852            {ok, State#state{request = Request}}
853    end.
854
855connect_and_send_upgrade_request(Address, Request, #state{options = Options0} = State) ->
856    ConnTimeout = (Request#request.settings)#http_options.connect_timeout,
857    SocketType = ip_comm,
858    Options = handle_unix_socket_options(Request, Options0),
859    case connect(SocketType, Address, Options, ConnTimeout) of
860        {ok, Socket} ->
861	    SessionType = httpc_manager:session_type(Options),
862	    Session = #session{socket = Socket,
863			       socket_type = SocketType,
864			       id = {Request#request.address, self()},
865                               scheme = http,
866                               client_close = false,
867			       type = SessionType},
868	    ErrorHandler =
869		fun(ERequest, EState, EReason) ->
870			self() ! {init_error, error_sending,
871				  httpc_response:error(ERequest, EReason)},
872			{ok, EState#state{request = ERequest}} end,
873	    tls_tunnel(Address, Request, State#state{session = Session}, ErrorHandler);
874	{error, Reason} ->
875	    self() ! {init_error, error_connecting,
876		      httpc_response:error(Request, Reason)},
877	    {ok, State#state{request = Request}}
878    end.
879
880handler_info(#state{request     = Request,
881		    session     = Session,
882		    status_line = _StatusLine,
883		    pipeline    = Pipeline,
884		    keep_alive  = KeepAlive,
885		    status      = Status,
886		    canceled    = _Canceled,
887		    options     = _Options,
888		    timers      = _Timers} = _State) ->
889
890    %% Info about the current request
891    RequestInfo =
892	case Request of
893	    undefined ->
894		[];
895	    #request{id      = Id,
896		     started = ReqStarted} ->
897		[{id, Id}, {started, ReqStarted}]
898	end,
899
900    %% Info about the current session/socket
901    SessionType = Session#session.type,
902    QueueLen    = case SessionType of
903		      pipeline ->
904			  queue:len(Pipeline);
905		      keep_alive ->
906			  queue:len(KeepAlive)
907		  end,
908    Scheme     = Session#session.scheme,
909    Socket     = Session#session.socket,
910    SocketType = Session#session.socket_type,
911
912    SocketOpts  = http_transport:getopts(SocketType, Socket),
913    SocketStats = http_transport:getstat(SocketType, Socket),
914
915    Remote = http_transport:peername(SocketType, Socket),
916    Local  = http_transport:sockname(SocketType, Socket),
917
918    SocketInfo  = [{remote,       Remote},
919		   {local,        Local},
920		   {socket_opts,  SocketOpts},
921		   {socket_stats, SocketStats}],
922
923    SessionInfo =
924	[{type,         SessionType},
925	 {queue_length, QueueLen},
926	 {scheme,       Scheme},
927	 {socket_info,  SocketInfo}],
928
929    [{status,          Status},
930     {current_request, RequestInfo},
931     {session,         SessionInfo}].
932
933
934
935handle_http_msg({Version, StatusCode, ReasonPharse, Headers, Body},
936		State = #state{request = Request}) ->
937    case Headers#http_response_h.'content-type' of
938        "multipart/byteranges" ++ _Param ->
939            exit({not_yet_implemented, multypart_byteranges});
940        _ ->
941	    StatusLine       = {Version, StatusCode, ReasonPharse},
942	    {ok, NewRequest} = start_stream(StatusLine, Headers, Request),
943            handle_http_body(Body,
944			     State#state{request     = NewRequest,
945					 status_line = StatusLine,
946					 headers     = Headers})
947    end;
948handle_http_msg({ChunkedHeaders, Body},
949                #state{status_line = {_, Code, _}, headers = Headers} = State) ->
950    NewHeaders = http_chunk:handle_headers(Headers, ChunkedHeaders),
951    {_, NewBody, NewRequest} = stream(Body, State#state.request, Code),
952    handle_response(State#state{headers = NewHeaders,
953                                body    = NewBody,
954                                request = NewRequest});
955handle_http_msg(Body, #state{status_line = {_,Code, _}} = State) ->
956    {_, NewBody, NewRequest} = stream(Body, State#state.request, Code),
957    handle_response(State#state{body = NewBody, request = NewRequest}).
958
959handle_http_body(_, #state{status = {ssl_tunnel, _},
960			   status_line = {_,200, _}} = State) ->
961    tls_upgrade(State);
962
963handle_http_body(_, #state{status = {ssl_tunnel, Request},
964			   status_line = StatusLine} = State) ->
965    ClientErrMsg = httpc_response:error(Request,{could_no_establish_ssh_tunnel, StatusLine}),
966    NewState     = answer_request(Request, ClientErrMsg, State),
967    {stop, normal, NewState};
968
969%% All 1xx (informational), 204 (no content), and 304 (not modified)
970%% responses MUST NOT include a message-body, and thus are always
971%% terminated by the first empty line after the header fields.
972%% This implies that chunked encoding MUST NOT be used for these
973%% status codes.
974handle_http_body(Body, #state{headers = Headers,
975                              status_line = {_,StatusCode, _}} = State)
976  when Headers#http_response_h.'transfer-encoding' =/= "chunked" andalso
977       (StatusCode =:= 204 orelse                       %% No Content
978        StatusCode =:= 304 orelse                       %% Not Modified
979        100 =< StatusCode andalso StatusCode =< 199) -> %% Informational
980    handle_response(State#state{body = Body});
981
982%% Ignore the body of response to a HEAD method
983handle_http_body(_Body, #state{request = #request{method = head}} = State) ->
984    handle_response(State#state{body = <<>>});
985
986handle_http_body(Body, #state{headers       = Headers,
987			      max_body_size = MaxBodySize,
988			      status_line   = {_,Code, _},
989			      request       = Request} = State) ->
990    TransferEnc = Headers#http_response_h.'transfer-encoding',
991    case case_insensitive_header(TransferEnc) of
992        "chunked" ->
993	    try http_chunk:decode(Body, State#state.max_body_size,
994				  State#state.max_header_size) of
995		{Module, Function, Args} ->
996		    NewState = next_body_chunk(State, Code),
997		    {noreply, NewState#state{mfa =
998					     {Module, Function, Args}}};
999		{ok, {ChunkedHeaders, NewBody}} ->
1000		    NewHeaders = http_chunk:handle_headers(Headers,
1001							   ChunkedHeaders),
1002                    case Body of
1003                        <<>> ->
1004                           handle_response(State#state{headers = NewHeaders,
1005                                                body    = NewBody});
1006                        _ ->
1007                           {_, NewBody2, _} =
1008                                stream(NewBody, Request, Code),
1009                           handle_response(State#state{headers = NewHeaders,
1010                                       body    = NewBody2})
1011                    end
1012	    catch throw:{error, Reason} ->
1013		NewState =
1014		    answer_request(Request,
1015				   httpc_response:error(Request,
1016							Reason),
1017				   State),
1018		    {stop, normal, NewState}
1019	    end;
1020        Enc when Enc =:= "identity"; Enc =:= undefined ->
1021            Length =
1022                list_to_integer(Headers#http_response_h.'content-length'),
1023            case ((Length =< MaxBodySize) orelse (MaxBodySize =:= nolimit)) of
1024                true ->
1025                    case httpc_response:whole_body(Body, Length) of
1026                        {ok, Body} ->
1027			    {_, NewBody, NewRequest} =
1028				stream(Body, Request, Code),
1029			    handle_response(State#state{body    = NewBody,
1030							request = NewRequest});
1031                        MFA ->
1032			    NewState = next_body_chunk(State, Code),
1033			    {noreply, NewState#state{mfa = MFA}}
1034		    end;
1035                false ->
1036		    NewState =
1037			answer_request(Request,
1038				       httpc_response:error(Request,
1039							    body_too_big),
1040				       State),
1041                    {stop, normal, NewState}
1042            end;
1043        Encoding when is_list(Encoding) ->
1044            NewState = answer_request(Request,
1045                                      httpc_response:error(Request,
1046                                                           unknown_encoding),
1047                                      State),
1048            {stop, normal, NewState}
1049    end.
1050
1051handle_response(#state{status = new} = State) ->
1052    ?hcrd("handle response - status = new", []),
1053    handle_response(try_to_enable_pipeline_or_keep_alive(State));
1054
1055handle_response(#state{status = Status0} = State0) when Status0 =/= new ->
1056    State = handle_server_closing(State0),
1057    #state{request      = Request,
1058           session      = Session,
1059           status_line  = StatusLine,
1060           headers      = Headers,
1061           body         = Body,
1062           options      = Options,
1063           profile_name = ProfileName} = State,
1064    handle_cookies(Headers, Request, Options, ProfileName),
1065    case httpc_response:result({StatusLine, Headers, Body}, Request) of
1066	%% 100-continue
1067	continue ->
1068	    %% Send request body
1069	    {_, RequestBody} = Request#request.content,
1070	    send_raw(Session, RequestBody),
1071	    %% Wait for next response
1072	    activate_once(Session),
1073	    Relaxed = (Request#request.settings)#http_options.relaxed,
1074	    MFA = {httpc_response, parse,
1075		   [State#state.max_header_size, Relaxed]},
1076	    {noreply, State#state{mfa         = MFA,
1077				  status_line = undefined,
1078				  headers     = undefined,
1079				  body        = undefined}};
1080
1081	%% Ignore unexpected 100-continue response and receive the
1082	%% actual response that the server will send right away.
1083	{ignore, Data} ->
1084	    Relaxed = (Request#request.settings)#http_options.relaxed,
1085	    MFA     = {httpc_response, parse,
1086		       [State#state.max_header_size, Relaxed]},
1087	    NewState = State#state{mfa         = MFA,
1088				   status_line = undefined,
1089				   headers     = undefined,
1090				   body        = undefined},
1091	    handle_info({httpc_handler, dummy, Data}, NewState);
1092
1093	%% On a redirect or retry the current request becomes
1094	%% obsolete and the manager will create a new request
1095	%% with the same id as the current.
1096	{redirect, NewRequest, Data} ->
1097	    ok = httpc_manager:redirect_request(NewRequest, ProfileName),
1098	    handle_queue(State#state{request = undefined}, Data);
1099	{retry, TimeNewRequest, Data} ->
1100	    ok = httpc_manager:retry_request(TimeNewRequest, ProfileName),
1101	    handle_queue(State#state{request = undefined}, Data);
1102	{ok, Msg, Data} ->
1103	    stream_remaining_body(Body, Request, StatusLine),
1104	    end_stream(StatusLine, Request),
1105	    NewState = maybe_send_answer(Request, Msg, State),
1106	    handle_queue(NewState, Data);
1107	{stop, Msg} ->
1108	    end_stream(StatusLine, Request),
1109	    NewState = maybe_send_answer(Request, Msg, State),
1110	    {stop, normal, NewState}
1111    end.
1112
1113handle_cookies(_,_, #options{cookies = disabled}, _) ->
1114    ok;
1115%% User wants to verify the cookies before they are stored,
1116%% so the user will have to call a store command.
1117handle_cookies(_,_, #options{cookies = verify}, _) ->
1118    ok;
1119handle_cookies(Headers, Request, #options{cookies = enabled}, ProfileName) ->
1120    {Host, _ } = Request#request.address,
1121    Cookies = httpc_cookie:cookies(Headers#http_response_h.other,
1122				  Request#request.path, Host),
1123    httpc_manager:store_cookies(Cookies, Request#request.address,
1124				ProfileName).
1125
1126%% This request could not be pipelined or used as sequential keep alive
1127%% queue
1128handle_queue(#state{status = close} = State, _) ->
1129    {stop, normal, State};
1130
1131handle_queue(#state{status = keep_alive} = State, Data) ->
1132    handle_keep_alive_queue(State, Data);
1133
1134handle_queue(#state{status = pipeline} = State, Data) ->
1135    handle_pipeline(State, Data).
1136
1137handle_pipeline(#state{status       = pipeline,
1138		       session      = Session,
1139		       profile_name = ProfileName,
1140		       options      = #options{pipeline_timeout = TimeOut}} = State,
1141		Data) ->
1142    case queue:out(State#state.pipeline) of
1143	{empty, _} ->
1144	    handle_empty_queue(Session, ProfileName, TimeOut, State);
1145	{{value, NextRequest}, Pipeline} ->
1146	    case lists:member(NextRequest#request.id,
1147			      State#state.canceled) of
1148		true ->
1149		    %% See comment for handle_cast({cancel, RequestId})
1150		    {stop, normal,
1151		     State#state{request =
1152				 NextRequest#request{from = answer_sent},
1153				 pipeline = Pipeline}};
1154		false ->
1155		    NewSession =
1156			Session#session{queue_length =
1157					%% Queue + current
1158					queue:len(Pipeline) + 1},
1159		    receive_response(NextRequest,
1160				     NewSession, Data,
1161				     State#state{pipeline = Pipeline})
1162	    end
1163    end.
1164
1165handle_keep_alive_queue(#state{status       = keep_alive,
1166			       session      = Session,
1167			       profile_name = ProfileName,
1168			       options      = #options{keep_alive_timeout = TimeOut,
1169						       proxy              = Proxy}} = State,
1170			Data) ->
1171    case queue:out(State#state.keep_alive) of
1172	{empty, _} ->
1173	    handle_empty_queue(Session, ProfileName, TimeOut, State);
1174	{{value, NextRequest}, KeepAlive} ->
1175	    case lists:member(NextRequest#request.id,
1176			      State#state.canceled) of
1177		true ->
1178		    handle_keep_alive_queue(
1179		      State#state{keep_alive = KeepAlive}, Data);
1180		false ->
1181		    #request{address = Addr} = NextRequest,
1182		    Address = handle_proxy(Addr, Proxy),
1183		    case httpc_request:send(Address, Session, NextRequest) of
1184			ok ->
1185			    receive_response(NextRequest,
1186					     Session, <<>>,
1187					     State#state{keep_alive = KeepAlive});
1188			{error, Reason} ->
1189			    {stop, {shutdown, {keepalive_failed, Reason}}, State}
1190		    end
1191	    end
1192    end.
1193handle_empty_queue(Session, ProfileName, TimeOut, State) ->
1194    %% The server may choose too terminate an idle pipline| keep_alive session
1195    %% in this case we want to receive the close message
1196    %% at once and not when trying to send the next
1197    %% request.
1198    activate_once(Session),
1199    %% If a pipline | keep_alive session has been idle for some time is not
1200    %% closed by the server, the client may want to close it.
1201    NewState = activate_queue_timeout(TimeOut, State),
1202    case update_session(ProfileName, Session, #session.queue_length, 0) of
1203        {stop, Reason} ->
1204            {stop, {shutdown, Reason}, State};
1205        _ ->
1206            %% Note mfa will be initialized when a new request
1207            %% arrives.
1208            {noreply,
1209             NewState#state{request     = undefined,
1210                            mfa         = undefined,
1211                            status_line = undefined,
1212                            headers     = undefined,
1213                            body        = undefined
1214			   }}
1215    end.
1216
1217receive_response(Request, Session, Data, State) ->
1218    NewState = init_wait_for_response_state(Request, State),
1219    gather_data(Data, Session, NewState).
1220
1221init_wait_for_response_state(Request, State) ->
1222    Relaxed =
1223	(Request#request.settings)#http_options.relaxed,
1224    MFA     = {httpc_response, parse,
1225	       [State#state.max_header_size, Relaxed]},
1226    State#state{request     = Request,
1227		mfa         = MFA,
1228		status_line = undefined,
1229		headers     = undefined,
1230		body        = undefined}.
1231gather_data(<<>>, Session, State) ->
1232    activate_once(Session),
1233    {noreply, State};
1234gather_data(Data, _, State) ->
1235    %% If we already received some bytes of
1236    %% the next response
1237    handle_info({httpc_handler, dummy, Data}, State).
1238
1239case_insensitive_header(Str) when is_list(Str) ->
1240    http_util:to_lower(Str);
1241%% Might be undefined if server does not send such a header
1242case_insensitive_header(Str) ->
1243    Str.
1244
1245activate_once(#session{socket = Socket, socket_type = SocketType}) ->
1246    case http_transport:setopts(SocketType, Socket, [{active, once}]) of
1247        ok ->
1248            ok;
1249        {error, _} -> %% inet can return einval instead of closed
1250            self() ! {http_transport:close_tag(SocketType), Socket}
1251    end.
1252
1253close_socket(#session{socket = {remote_close,_}}) ->
1254    ok;
1255close_socket(#session{socket = Socket, socket_type = SocketType}) ->
1256    http_transport:close(SocketType, Socket).
1257
1258activate_request_timeout(
1259  #state{request = #request{timer = OldRef} = Request} = State) ->
1260    Timeout = (Request#request.settings)#http_options.timeout,
1261    case Timeout of
1262	infinity ->
1263	    State;
1264	_ ->
1265	    ReqId = Request#request.id,
1266	    Msg       = {timeout, ReqId},
1267	    case OldRef of
1268		undefined ->
1269		    ok;
1270		_ ->
1271		    %% Timer is already running! This is the case for a redirect or retry
1272		    %% We need to restart the timer because the handler pid has changed
1273		    cancel_timer(OldRef, Msg)
1274	    end,
1275	    Ref       = erlang:send_after(Timeout, self(), Msg),
1276	    Request2  = Request#request{timer = Ref},
1277	    ReqTimers = [{Request#request.id, Ref} |
1278			 (State#state.timers)#timers.request_timers],
1279	    Timers    = #timers{request_timers = ReqTimers},
1280	    State#state{request = Request2, timers = Timers}
1281    end.
1282
1283activate_queue_timeout(infinity, State) ->
1284    State;
1285activate_queue_timeout(Time, State) ->
1286    Ref = erlang:send_after(Time, self(), timeout_queue),
1287    State#state{timers = #timers{queue_timer = Ref}}.
1288
1289
1290is_pipeline_enabled_client(#session{type = pipeline}) ->
1291    true;
1292is_pipeline_enabled_client(_) ->
1293    false.
1294
1295is_keep_alive_enabled_server("HTTP/1." ++ N, _) when (hd(N) >= $1) ->
1296    true;
1297is_keep_alive_enabled_server("HTTP/1.0",
1298			     #http_response_h{connection = "keep-alive"}) ->
1299    true;
1300is_keep_alive_enabled_server(_,_) ->
1301    false.
1302
1303is_keep_alive_connection(Headers, #session{client_close = ClientClose}) ->
1304    (not ((ClientClose) orelse httpc_response:is_server_closing(Headers))).
1305
1306try_to_enable_pipeline_or_keep_alive(
1307  #state{session      = Session,
1308	 request      = #request{method = Method},
1309	 status_line  = {Version, _, _},
1310	 headers      = Headers,
1311	 profile_name = ProfileName} = State) ->
1312    case is_keep_alive_enabled_server(Version, Headers) andalso
1313	  is_keep_alive_connection(Headers, Session) of
1314	true ->
1315	    case (is_pipeline_enabled_client(Session) andalso
1316		  httpc_request:is_idempotent(Method)) of
1317		true ->
1318		    insert_session(Session, ProfileName),
1319		    State#state{status = pipeline};
1320		false ->
1321		    insert_session(Session, ProfileName),
1322		    %% Make sure type is keep_alive in session
1323		    %% as it in this case might be pipeline
1324		    NewSession = Session#session{type = keep_alive},
1325		    State#state{status  = keep_alive,
1326				session = NewSession}
1327	    end;
1328	false ->
1329	    State#state{status = close}
1330    end.
1331
1332handle_server_closing(State = #state{status = close}) -> State;
1333handle_server_closing(State = #state{headers = undefined}) -> State;
1334handle_server_closing(State = #state{headers = Headers}) ->
1335    case httpc_response:is_server_closing(Headers) of
1336        true -> State#state{status = close};
1337        false -> State
1338    end.
1339
1340answer_request(#request{id = RequestId, from = From} = Request, Msg,
1341	       #state{session      = Session,
1342		      timers       = Timers,
1343		      profile_name = ProfileName} = State) ->
1344    httpc_response:send(From, Msg),
1345    RequestTimers = Timers#timers.request_timers,
1346    TimerRef =
1347	proplists:get_value(RequestId, RequestTimers, undefined),
1348    Timer = {RequestId, TimerRef},
1349    cancel_timer(TimerRef, {timeout, Request#request.id}),
1350    httpc_manager:request_done(RequestId, ProfileName),
1351    NewSession = maybe_make_session_available(ProfileName, Session),
1352    Timers2 = Timers#timers{request_timers = lists:delete(Timer,
1353							  RequestTimers)},
1354    State#state{request = Request#request{from = answer_sent},
1355		session = NewSession,
1356		timers  = Timers2}.
1357
1358maybe_make_session_available(ProfileName,
1359			     #session{available = false} = Session) ->
1360    update_session(ProfileName, Session, #session.available, true),
1361    Session#session{available = true};
1362maybe_make_session_available(_ProfileName, Session) ->
1363    Session.
1364
1365cancel_timers(#timers{request_timers = ReqTmrs, queue_timer = QTmr}) ->
1366    cancel_timer(QTmr, timeout_queue),
1367    CancelTimer = fun({_, Timer}) -> cancel_timer(Timer, timeout) end,
1368    lists:foreach(CancelTimer, ReqTmrs).
1369
1370cancel_timer(undefined, _) ->
1371    ok;
1372cancel_timer(Timer, TimeoutMsg) ->
1373    erlang:cancel_timer(Timer),
1374    receive
1375	TimeoutMsg ->
1376	    ok
1377    after 0 ->
1378	    ok
1379    end.
1380
1381retry_pipeline([], _) ->
1382    ok;
1383
1384%% Skip requests when the answer has already been sent
1385retry_pipeline([#request{from = answer_sent}|PipeLine], State) ->
1386    retry_pipeline(PipeLine, State);
1387
1388retry_pipeline([Request | PipeLine],
1389	      #state{timers       = Timers,
1390		     profile_name = ProfileName} = State) ->
1391    NewState =
1392	case (catch httpc_manager:retry_request(Request, ProfileName)) of
1393	    ok ->
1394		RequestTimers = Timers#timers.request_timers,
1395		ReqId    = Request#request.id,
1396		TimerRef =
1397		    proplists:get_value(ReqId, RequestTimers, undefined),
1398		cancel_timer(TimerRef, {timeout, ReqId}),
1399		NewReqsTimers = lists:delete({ReqId, TimerRef}, RequestTimers),
1400		NewTimers     = Timers#timers{request_timers = NewReqsTimers},
1401		State#state{timers = NewTimers};
1402
1403	    Error ->
1404		answer_request(Request#request.from,
1405			       httpc_response:error(Request, Error), State)
1406	end,
1407    retry_pipeline(PipeLine, NewState).
1408
1409handle_proxy_options(https, #options{https_proxy = {HttpsProxy, _} = HttpsProxyOpt}) when
1410      HttpsProxy =/= undefined ->
1411    HttpsProxyOpt;
1412handle_proxy_options(_, #options{proxy = Proxy}) ->
1413    Proxy.
1414
1415%%% Check to see if the given {Host,Port} tuple is in the NoProxyList
1416%%% Returns an eventually updated {Host,Port} tuple, with the proxy address
1417handle_proxy(HostPort = {Host, _Port}, {Proxy, NoProxy}) ->
1418    case Proxy of
1419	undefined ->
1420	    HostPort;
1421	Proxy ->
1422	    case is_no_proxy_dest(Host, NoProxy) of
1423		true ->
1424		    HostPort;
1425		false ->
1426		    Proxy
1427	    end
1428    end.
1429
1430is_no_proxy_dest(_, []) ->
1431    false;
1432is_no_proxy_dest(Host, [ "*." ++ NoProxyDomain | NoProxyDests]) ->
1433
1434    case is_no_proxy_dest_domain(Host, NoProxyDomain) of
1435	true ->
1436	    true;
1437	false ->
1438	    is_no_proxy_dest(Host, NoProxyDests)
1439    end;
1440
1441is_no_proxy_dest(Host, [NoProxyDest | NoProxyDests]) ->
1442    IsNoProxyDest = case http_util:is_hostname(NoProxyDest) of
1443			true ->
1444			    fun is_no_proxy_host_name/2;
1445			false ->
1446			    fun is_no_proxy_dest_address/2
1447		    end,
1448
1449    case IsNoProxyDest(Host, NoProxyDest) of
1450	true ->
1451	    true;
1452	false ->
1453	    is_no_proxy_dest(Host, NoProxyDests)
1454    end.
1455
1456is_no_proxy_host_name(Host, Host) ->
1457    true;
1458is_no_proxy_host_name(_,_) ->
1459    false.
1460
1461is_no_proxy_dest_domain(Dest, DomainPart) ->
1462    lists:suffix(DomainPart, Dest).
1463
1464is_no_proxy_dest_address(Dest, Dest) ->
1465    true;
1466is_no_proxy_dest_address(Dest, AddressPart) ->
1467    lists:prefix(AddressPart, Dest).
1468
1469init_mfa(#request{settings = Settings}, State) ->
1470	Relaxed = Settings#http_options.relaxed,
1471	{httpc_response, parse, [State#state.max_header_size, Relaxed]}.
1472
1473socket_type(#request{scheme = http}) ->
1474    ip_comm;
1475socket_type(#request{scheme = https, settings = Settings}) ->
1476    Settings#http_options.ssl.
1477
1478start_stream({_Version, _Code, _ReasonPhrase}, _Headers,
1479	     #request{stream = none} = Request) ->
1480    {ok, Request};
1481start_stream({_Version, Code, _ReasonPhrase}, Headers,
1482	     #request{stream = self} = Request)
1483  when ?IS_STREAMED(Code) ->
1484    Msg = httpc_response:stream_start(Headers, Request, ignore),
1485    httpc_response:send(Request#request.from, Msg),
1486    {ok, Request};
1487start_stream({_Version, Code, _ReasonPhrase}, Headers,
1488	     #request{stream = {self, once}} = Request)
1489  when ?IS_STREAMED(Code) ->
1490    Msg = httpc_response:stream_start(Headers, Request, self()),
1491    httpc_response:send(Request#request.from, Msg),
1492    {ok, Request};
1493start_stream({_Version, Code, _ReasonPhrase}, _Headers,
1494	     #request{stream = Filename} = Request)
1495  when ?IS_STREAMED(Code) andalso is_list(Filename) ->
1496    case file:open(Filename, [write, raw, append, delayed_write]) of
1497        {ok, Fd} ->
1498            {ok, Request#request{stream = Fd}};
1499        {error, Reason} ->
1500            exit({stream_to_file_failed, Reason})
1501    end;
1502start_stream(_StatusLine, _Headers, Request) ->
1503    {ok, Request}.
1504
1505stream_remaining_body(<<>>, _, _) ->
1506    ok;
1507stream_remaining_body(Body, Request, {_, Code, _}) ->
1508    stream(Body, Request, Code).
1509
1510%% Note the end stream message is handled by httpc_response and will
1511%% be sent by answer_request
1512end_stream(_, #request{stream = none}) ->
1513    ok;
1514end_stream(_, #request{stream = self}) ->
1515    ok;
1516end_stream(_, #request{stream = {self, once}}) ->
1517    ok;
1518end_stream({_,200,_}, #request{stream = Fd}) ->
1519    case file:close(Fd) of
1520	ok ->
1521	    ok;
1522	{error, enospc} -> % Could be due to delayed_write
1523	    file:close(Fd)
1524    end;
1525end_stream({_,206,_}, #request{stream = Fd}) ->
1526    case file:close(Fd) of
1527       ok ->
1528           ok;
1529       {error, enospc} -> % Could be due to delayed_write
1530           file:close(Fd)
1531    end;
1532end_stream(_, _) ->
1533    ok.
1534
1535
1536next_body_chunk(#state{request = #request{stream = {self, once}},
1537		       once    = once,
1538		       session = Session} = State,
1539		Code) when ?IS_STREAMED(Code) ->
1540    activate_once(Session),
1541    State#state{once = inactive};
1542next_body_chunk(#state{request = #request{stream = {self, once}},
1543		       once = inactive} = State,
1544		Code) when ?IS_STREAMED(Code) ->
1545    State; %% Wait for user to call stream_next
1546next_body_chunk(#state{session = Session} = State, _) ->
1547    activate_once(Session),
1548    State.
1549
1550handle_verbose(verbose) ->
1551    dbg:p(self(), [r]);
1552handle_verbose(debug) ->
1553    dbg:p(self(), [call]),
1554    dbg:tp(?MODULE, [{'_', [], [{return_trace}]}]);
1555handle_verbose(trace) ->
1556    dbg:p(self(), [call]),
1557    dbg:tpl(?MODULE, [{'_', [], [{return_trace}]}]);
1558handle_verbose(_) ->
1559    ok.
1560
1561send_raw(#session{socket = Socket, socket_type = SocketType},
1562	 {ProcessBody, Acc}) when is_function(ProcessBody, 1) ->
1563    send_raw(SocketType, Socket, ProcessBody, Acc);
1564send_raw(#session{socket = Socket, socket_type = SocketType}, Body) ->
1565    http_transport:send(SocketType, Socket, Body).
1566
1567send_raw(SocketType, Socket, ProcessBody, Acc) ->
1568    case ProcessBody(Acc) of
1569        eof ->
1570            ok;
1571        {ok, Data, NewAcc} ->
1572            DataBin = iolist_to_binary(Data),
1573            case http_transport:send(SocketType, Socket, DataBin) of
1574                ok ->
1575                    send_raw(SocketType, Socket, ProcessBody, NewAcc);
1576                Error ->
1577                    Error
1578            end
1579    end.
1580
1581tls_tunnel(Address, Request, #state{session = #session{} = Session} = State,
1582	   ErrorHandler) ->
1583    UpgradeRequest = tls_tunnel_request(Request),
1584    case httpc_request:send(Address, Session, UpgradeRequest) of
1585	ok ->
1586	    TmpState = State#state{request = UpgradeRequest,
1587				   %%  session = Session,
1588				   mfa = init_mfa(UpgradeRequest, State),
1589				   status_line = undefined,
1590				   headers = undefined,
1591				   body = undefined},
1592	    activate_once(Session),
1593	    NewState = activate_request_timeout(TmpState),
1594	    {ok, NewState#state{status = {ssl_tunnel, Request}}};
1595	{error, Reason} ->
1596	   ErrorHandler(Request, State, Reason)
1597    end.
1598
1599tls_tunnel_request(#request{headers = Headers,
1600			     settings = Options,
1601			     id = RequestId,
1602			     from = From,
1603			     address =  {Host, Port}= Adress,
1604			     ipv6_host_with_brackets = IPV6}) ->
1605
1606    URI = Host ++":" ++ integer_to_list(Port),
1607
1608    #request{
1609       id = RequestId,
1610       from = From,
1611       scheme = http, %% Use tcp-first and then upgrade!
1612       address = Adress,
1613       path = URI,
1614       pquery  = "",
1615       method = connect,
1616       headers = #http_request_h{host = host_header(Headers, URI),
1617				 te = "",
1618				 pragma = "no-cache",
1619				 other = [{"Proxy-Connection", " Keep-Alive"}]},
1620       settings = Options,
1621       abs_uri = URI,
1622       stream = false,
1623       userinfo = "",
1624       headers_as_is = [],
1625       started  = http_util:timestamp(),
1626       ipv6_host_with_brackets = IPV6
1627      }.
1628
1629host_header(#http_request_h{host = Host}, _) ->
1630    Host;
1631
1632%% Handles headers_as_is
1633host_header(_, URI) ->
1634    #{host := Host} = uri_string:parse(URI),
1635    Host.
1636
1637tls_upgrade(#state{status =
1638		       {ssl_tunnel,
1639			#request{settings =
1640				     #http_options{ssl = {_, TLSOptions0} = SocketType},
1641				     address = {Host, _} = Address} = Request},
1642		   session = #session{socket = TCPSocket} = Session0,
1643		   options = Options} = State) ->
1644
1645    TLSOptions = maybe_add_sni(Host, TLSOptions0),
1646
1647    case ssl:connect(TCPSocket, TLSOptions) of
1648	{ok, TLSSocket} ->
1649	    ClientClose = httpc_request:is_client_closing(Request#request.headers),
1650	    SessionType = httpc_manager:session_type(Options),
1651	    Session = Session0#session{
1652			scheme = https,
1653			socket = TLSSocket,
1654			socket_type = SocketType,
1655			type = SessionType,
1656			client_close = ClientClose},
1657	    httpc_request:send(Address, Session, Request),
1658            activate_once(Session),
1659	    NewState = State#state{session = Session,
1660				   request = Request,
1661				   mfa = init_mfa(Request, State),
1662				   status_line = undefined,
1663				   headers = undefined,
1664				   body = undefined,
1665				   status = new
1666				  },
1667	    {noreply, activate_request_timeout(NewState)};
1668	{error, Reason} ->
1669	    Error = httpc_response:error(Request, {failed_connect,
1670						   [{to_address, Address},
1671						    {tls, TLSOptions, Reason}]}),
1672	    maybe_send_answer(Request, Error, State),
1673	    {stop, normal, State#state{request = Request}}
1674    end.
1675
1676maybe_add_sni(Host, Options) ->
1677    case http_util:is_hostname(Host) andalso
1678	  not lists:keymember(server_name_indication, 1, Options) of
1679	true ->
1680	    [{server_name_indication, Host} | Options];
1681	false ->
1682	    Options
1683    end.
1684
1685%% ---------------------------------------------------------------------
1686%% Session wrappers
1687%% ---------------------------------------------------------------------
1688
1689insert_session(Session, ProfileName) ->
1690    httpc_manager:insert_session(Session, ProfileName).
1691
1692
1693update_session(ProfileName, #session{id = SessionId} = Session, Pos, Value) ->
1694    try
1695	begin
1696	    httpc_manager:update_session(ProfileName, SessionId, Pos, Value)
1697	end
1698    catch
1699	error:undef -> %% This could happen during code upgrade
1700	    Session2 = erlang:setelement(Pos, Session, Value),
1701	    insert_session(Session2, ProfileName);
1702	error:badarg ->
1703	    {stop, normal};
1704	T:E:Stacktrace ->
1705	    %% Unexpected this must be an error!
1706            error_logger:error_msg("Failed updating session: "
1707                                   "~n   ProfileName: ~p"
1708                                   "~n   SessionId:   ~p"
1709                                   "~n   Pos:         ~p"
1710                                   "~n   Value:       ~p"
1711                                   "~nwhen"
1712                                   "~n   Session (db) info: ~p"
1713                                   "~n   Session (db):      ~p"
1714                                   "~n   Session (record):  ~p"
1715                                   "~n   T: ~p"
1716                                   "~n   E: ~p",
1717                                   [ProfileName, SessionId, Pos, Value,
1718                                    (catch httpc_manager:which_session_info(ProfileName)),
1719                                    Session,
1720                                    (catch httpc_manager:lookup_session(SessionId, ProfileName)),
1721                                    T, E]),
1722            {stop, {failed_updating_session,
1723                    [{profile,    ProfileName},
1724                     {session_id, SessionId},
1725                     {pos,        Pos},
1726                     {value,      Value},
1727                     {etype,      T},
1728                     {error,      E},
1729                     {stacktrace, Stacktrace}]}}
1730    end.
1731
1732format_address({[$[|T], Port}) ->
1733    {ok, Address} = inet:parse_address(string:strip(T, right, $])),
1734    {Address, Port};
1735format_address(HostPort) ->
1736    HostPort.
1737