1%%
2%% %CopyrightBegin%
3%%
4%% Copyright Ericsson AB 2002-2018. All Rights Reserved.
5%%
6%% Licensed under the Apache License, Version 2.0 (the "License");
7%% you may not use this file except in compliance with the License.
8%% You may obtain a copy of the License at
9%%
10%%     http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing, software
13%% distributed under the License is distributed on an "AS IS" BASIS,
14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15%% See the License for the specific language governing permissions and
16%% limitations under the License.
17%%
18%% %CopyrightEnd%
19%%
20%%
21
22-module(httpc_handler).
23
24-behaviour(gen_server).
25
26-include_lib("inets/src/http_lib/http_internal.hrl").
27-include("httpc_internal.hrl").
28
29-define(IS_STREAMED(Code), ((Code =:= 200) orelse (Code =:= 206))).
30
31%%--------------------------------------------------------------------
32%% Internal Application API
33-export([
34         start_link/4,
35         send/2,
36         cancel/2,
37         stream_next/1,
38         info/1
39        ]).
40
41%% gen_server callbacks
42-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
43         terminate/2, code_change/3]).
44
%% Timer bookkeeping kept in #state.timers.
%%   request_timers - timers belonging to individual requests;
%%                    terminate/2 hands the whole record to
%%                    cancel_timers/1.
%%   queue_timer    - timer that is cancelled together with the
%%                    'timeout_queue' message tag; 'undefined' when no
%%                    such timer is pending.
-record(timers,
        {
          request_timers = [] :: [reference()],
          queue_timer         :: reference() | 'undefined'
         }).
50
%% State of one handler process.
%%   request      - the request currently being served, if any.
%%   session      - the connection (socket, type, id) used by the handler.
%%   mfa          - parser continuation: incoming socket data is passed
%%                  as Module:Function([Data | Args]).
%%   pipeline /
%%   keep_alive   - requests queued behind the current one.
%%   canceled     - ids of requests that were canceled or timed out.
%%   once         - 'once' / 'inactive' flag consulted when deciding
%%                  whether to re-activate the socket (reset to
%%                  'inactive' by the stream_next cast).
-record(state,
        {
          request                   :: request() | 'undefined',
          session                   :: session() | 'undefined',
          status_line,               % {Version, StatusCode, ReasonPhrase}
          headers                   :: http_response_h() | 'undefined',
          body                      :: binary() | 'undefined',
          mfa,                       % {Module, Function, Args}
          pipeline = queue:new()    :: queue:queue(),
          keep_alive = queue:new()  :: queue:queue(),
          status                    :: undefined | new | pipeline | keep_alive | close | {ssl_tunnel, request()},
          canceled = [],             % [RequestId]
          max_header_size = nolimit :: nolimit | integer(),
          max_body_size = nolimit   :: nolimit | integer(),
          options                   :: options(),
          timers = #timers{}        :: #timers{},
          profile_name              :: atom(), % id of httpc_manager process.
          once = inactive           :: 'inactive' | 'once'
         }).
70
71
72%%====================================================================
73%% External functions
74%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link(Parent, Request, Options, ProfileName) -> {ok, Pid}
%%
%%      Parent      = pid() - process that init/1 acknowledges to
%%      Request     = #request{}
%%      Options     = #options{}
%%      ProfileName = atom() - id of httpc manager process
%%
%% Description: Starts a http-request handler process. Intended to be
%% called by the httpc profile supervisor or the http manager process
%% if the client is started stand alone from inets.
%%
%% Note: The process is started through proc_lib and init/1 later
%% enters the gen_server loop with gen_server:enter_loop/3, so that
%% waiting for gen_tcp:connect to time out in init/1 will not block
%% the httpc manager process in odd cases such as trying to call a
%% server that does not exist. (See OTP-6735) The only API function
%% sending messages to the handler process that can be called before
%% init has completed is cancel and that is not a problem! (Send and
%% stream will not be called before the first request has been sent
%% and the reply or part of it has arrived.)
%%--------------------------------------------------------------------

start_link(Parent, Request, Options, ProfileName) ->
    InitArgs   = [Parent, Request, Options, ProfileName],
    %% init/1 acknowledges with the handler's own pid, which is what
    %% proc_lib:start_link/3 hands back here.
    HandlerPid = proc_lib:start_link(?MODULE, init, [InitArgs]),
    {ok, HandlerPid}.
100
%%--------------------------------------------------------------------
%% Function: send(Request, Pid) -> ok | {error, closed}
%%      Request = #request{}
%%      Pid = pid() - the pid of the http-request handler process.
%%
%% Description: Hands a further request to an already running handler
%% so that it is sent on the handler's existing session. Intended to
%% be called by the httpc manager process. {error, closed} is returned
%% when the handler is gone or going down (see call/2).
%%--------------------------------------------------------------------
send(Request, HandlerPid) ->
    call(Request, HandlerPid).
111
112
%%--------------------------------------------------------------------
%% Function: cancel(RequestId, Pid) -> ok
%%      RequestId = reference()
%%      Pid = pid() -  the pid of the http-request handler process.
%%
%% Description: Asynchronously tells the handler to cancel the request
%% identified by RequestId. Intended to be called by the httpc manager
%% process.
%%--------------------------------------------------------------------
cancel(RequestId, HandlerPid) ->
    CancelMsg = {cancel, RequestId},
    cast(CancelMsg, HandlerPid).
123
124
%%--------------------------------------------------------------------
%% Function: stream_next(Pid) -> ok
%%      Pid = pid() -  the pid of the http-request handler process.
%%
%% Description: Works as inets:setopts(active, once) but for
%% body chunks sent to the user: the handler is asked (by a cast) to
%% activate its socket for one more message.
%%--------------------------------------------------------------------
stream_next(HandlerPid) ->
    cast(stream_next, HandlerPid).
134
135
%%--------------------------------------------------------------------
%% Function: info(Pid) -> [{Key, Val}]
%%      Pid = pid() -  the pid of the http-request handler process.
%%
%% Description:
%%     Returns various information related to this handler.
%%     Used for debugging and testing, which is why every exception
%%     escaping the call is deliberately turned into an empty list.
%%     Exits that call/2 itself translates are returned as
%%     {error, closed}.
%%--------------------------------------------------------------------
info(HandlerPid) ->
    try call(info, HandlerPid) of
        Info ->
            Info
    catch
        _Class:_Reason ->
            []
    end.
151
%%--------------------------------------------------------------------
%% Function: stream(BodyPart, Request, Code) -> {Streamed, Body, Request}
%%      BodyPart = binary()
%%      Request  = #request{}
%%      Code     = integer()
%%      Streamed = boolean()
%%
%% Description: Stream the HTTP body to the caller process (client)
%%              or to a file. When the part was streamed the returned
%%              body is <<>>: data that has been streamed does not
%%              have to be saved. (We do not want to use up memory
%%              in vain.) When nothing was streamed the body part is
%%              returned untouched.
%%--------------------------------------------------------------------
%% Streaming was not requested
stream(BodyPart, #request{stream = none} = Request, _Code) ->
    {false, BodyPart, Request};

%% Deliver the part as a message to the requesting process
stream(BodyPart,
       #request{stream = Target, from = From, id = Id} = Request,
       Code)
  when ?IS_STREAMED(Code),
       ((Target =:= self) orelse (Target =:= {self, once})) ->
    httpc_response:send(From, {Id, stream, BodyPart}),
    {true, <<>>, Request};

%% The stream target is still a file name: open the file and continue
%% with the descriptor.
%% According to the original note this has been moved to
%% start_stream/3 and the clause is kept for backward compatibility.
stream(BodyPart, #request{stream = Filename} = Request, Code)
  when is_list(Filename), ?IS_STREAMED(Code) ->
    OpenModes = [write, raw, append, delayed_write],
    case file:open(Filename, OpenModes) of
        {error, Reason} ->
            exit({stream_to_file_failed, Reason});
        {ok, Fd} ->
            stream(BodyPart, Request#request{stream = Fd}, 200)
    end;

%% The stream target is an open file descriptor
stream(BodyPart, #request{stream = Fd} = Request, Code)
  when ?IS_STREAMED(Code) ->
    case file:write(Fd, BodyPart) of
        {error, Reason} ->
            exit({stream_to_file_failed, Reason});
        ok ->
            {true, <<>>, Request}
    end;

%% Only 200 and 206 responses can be streamed
stream(BodyPart, Request, _Code) ->
    {false, BodyPart, Request}.
199
200
201%%====================================================================
202%% Server functions
203%%====================================================================
204
%%--------------------------------------------------------------------
%% Function: init([Parent, Request, Options, ProfileName]) -> no return
%%
%%      Parent      = pid() - process waiting in proc_lib:start_link/3
%%      Request     = #request{}
%%      Options     = #options{}
%%      ProfileName = atom() - id of httpc manager process
%%
%% Description: Initiates the httpc_handler process: acknowledges the
%% start to Parent, connects and sends the first request, and then
%% enters the gen_server loop.
%%
%% Note: The init function may not fail, that will kill the
%% httpc_manager process. We could make the httpc_manager more complex
%% but we do not want that so errors will be handled by the process
%% sending an init_error message to itself.
%%--------------------------------------------------------------------
init([Parent, Request, Options, ProfileName]) ->
    process_flag(trap_exit, true),

    %% Acknowledge before connecting, so that the initial
    %% tcp-connection does not block the manager-process
    proc_lib:init_ack(Parent, self()),
    handle_verbose(Options#options.verbose),

    Scheme        = Request#request.scheme,
    ProxyOptions  = handle_proxy_options(Scheme, Options),
    TargetAddress = Request#request.address,
    Address       = handle_proxy(TargetAddress, ProxyOptions),

    %% #state.once should initially be 'inactive' because we
    %% activate the socket at first regardless of the state.
    InitialState = #state{options      = Options,
                          profile_name = ProfileName},
    {ok, State} =
        case Scheme of
            %% https towards an address that handle_proxy/2 replaced:
            %% start with the upgrade request.
            https when Address /= TargetAddress ->
                connect_and_send_upgrade_request(Address, Request,
                                                 InitialState);
            _ ->
                connect_and_send_first_request(Address, Request,
                                               InitialState)
        end,
    gen_server:enter_loop(?MODULE, [], State).
241
%%--------------------------------------------------------------------
%% Function: handle_call(Request, From, State) -> {reply, Reply, State} |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%          {stop, Reason, State}            (terminate/2 is called)
%% Description: Handling call messages. The work is done by
%% do_handle_call/3; any exception it raises stops the handler with a
%% {shutdown, {{Class, Reason}, Stacktrace}} reason and the unchanged
%% state.
%%--------------------------------------------------------------------
handle_call(Request, From, State) ->
    try
        do_handle_call(Request, From, State)
    catch
        Class:Reason:Stacktrace ->
            {stop, {shutdown, {{Class, Reason}, Stacktrace}}, State}
    end.
259
260
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%% Description: Handling cast messages. The work is done by
%% do_handle_cast/2; any exception it raises stops the handler with a
%% {shutdown, {{Class, Reason}, Stacktrace}} reason and the unchanged
%% state.
%%--------------------------------------------------------------------
handle_cast(Msg, State) ->
    try
        do_handle_cast(Msg, State)
    catch
        Class:Reason:Stacktrace ->
            {stop, {shutdown, {{Class, Reason}, Stacktrace}}, State}
    end.
275
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%% Description: Handling all non call/cast messages. The work is done
%% by do_handle_info/2; any exception it raises stops the handler with
%% a {shutdown, {{Class, Reason}, Stacktrace}} reason and the
%% unchanged state.
%%--------------------------------------------------------------------
handle_info(Info, State) ->
    try
        do_handle_info(Info, State)
    catch
        Class:Reason:Stacktrace ->
            {stop, {shutdown, {{Class, Reason}, Stacktrace}}, State}
    end.
290
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> _  (ignored by gen_server)
%% Description: Shutdown the httpc_handler. Depending on how far the
%% handler got, this closes the socket, removes the session from the
%% manager, hands queued requests to retry_pipeline/2, cancels timers
%% and answers the request that was being served.
%%--------------------------------------------------------------------

%% No session was ever set up: nothing to clean up.
terminate(normal, #state{session = undefined}) ->
    ok;

%% Init error sending, no session information has been setup but
%% there is a socket that needs closing.
terminate(normal, #state{session = #session{id = undefined} = Session}) ->
    close_socket(Session);

%% Socket closed remotely (the socket was tagged by do_handle_info/2)
terminate(normal,
          #state{session      = #session{id          = SessionId,
                                         socket_type = Transport,
                                         socket      = {remote_close, Sock}},
                 request      = Request,
                 pipeline     = PipelineQ,
                 keep_alive   = KeepAliveQ,
                 timers       = Timers,
                 profile_name = Profile} = State) ->
    %% Clobber session; a failure to do so must not stop the cleanup
    (catch httpc_manager:delete_session(SessionId, Profile)),

    %% Requests still waiting in either queue are retried
    maybe_retry_queue(PipelineQ, State),
    maybe_retry_queue(KeepAliveQ, State),

    %% Cancel timers
    cancel_timers(Timers),

    %% Maybe deliver answers to requests
    deliver_answer(Request),

    %% And, just in case, close our side (**really** overkill)
    http_transport:close(Transport, Sock);

%% A session exists and no request is in progress
terminate(_Reason,
          #state{session      = #session{id          = SessionId,
                                         socket_type = Transport,
                                         socket      = Sock},
                 request      = undefined,
                 pipeline     = PipelineQ,
                 keep_alive   = KeepAliveQ,
                 timers       = Timers,
                 profile_name = Profile} = State) ->

    %% Clobber session; a failure to do so must not stop the cleanup
    (catch httpc_manager:delete_session(SessionId, Profile)),

    maybe_retry_queue(PipelineQ, State),
    maybe_retry_queue(KeepAliveQ, State),

    cancel_timer(Timers#timers.queue_timer, timeout_queue),
    http_transport:close(Transport, Sock);

%% Neither a session nor a request
terminate(_Reason, #state{request = undefined}) ->
    ok;

%% A request is still in progress: answer it with an error (unless it
%% has been answered already) and run the clauses above once more
%% without it.
terminate(Reason, #state{request = Request} = State) ->
    ErrorAnswer = httpc_response:error(Request, Reason),
    Answered    = maybe_send_answer(Request, ErrorAnswer, State),
    terminate(Reason, Answered#state{request = undefined}).
356
%%--------------------------------------------------------------------
%% Func: code_change(_OldVsn, State, Extra) -> {ok, NewState}
%% Purpose: Convert process state when code is changed. No conversion
%% is needed; the state is returned as it is.
%%--------------------------------------------------------------------

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
364
365%%%--------------------------------------------------------------------
366%%% Internal functions
367%%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Function: do_handle_call(Msg, From, State) -> handle_call/3 result
%%      Msg = #request{} | info
%%
%% Description: Does the work of handle_call/3.
%%   #request{} on a pipeline session   - the request is written to
%%       the socket at once; if another request is still being served
%%       the new one is put in the pipeline queue.
%%   #request{} on a keep_alive session - the request is only written
%%       to the socket when no request is being served, otherwise it
%%       is put in the keep_alive queue unsent.
%%   info - replies with handler_info/1 of the state.
%% A failing send stops the handler with a {shutdown, _} reason.
%%--------------------------------------------------------------------
do_handle_call(#request{address = Addr} = Request, _,
               #state{status  = Status,
                      session = #session{type = pipeline} = Session,
                      timers  = Timers,
                      options = #options{proxy = Proxy},
                      profile_name = ProfileName} = State0)
  when Status =/= undefined ->
    Address = handle_proxy(Addr, Proxy),
    case httpc_request:send(Address, Session, Request) of
        ok ->

            ?hcrd("request sent", []),

            %% Activate the request time out for the new request
            State1 =
                activate_request_timeout(State0#state{request = Request}),

            ClientClose =
                httpc_request:is_client_closing(Request#request.headers),

            case State0#state.request of
                #request{} = OldRequest -> %% Old request not yet finished
                    %% State1 (not State0) is updated below so that the
                    %% timers written by activate_request_timeout/1 are
                    %% the ones that end up in the new state.
                    NewPipeline = queue:in(Request, State1#state.pipeline),
                    NewSession  =
                        Session#session{queue_length =
                                            %% Queue + current
                                            queue:len(NewPipeline) + 1,
                                        client_close = ClientClose},
                    insert_session(NewSession, ProfileName),
                    {reply, ok, State1#state{request  = OldRequest,
                                             pipeline = NewPipeline,
                                             session  = NewSession}};
                undefined ->
                    %% Note: tcp-message receiving has already been
                    %% activated by handle_pipeline/2.
                    %%
                    %% The queue timer to cancel is the one that was
                    %% known before the request timeout was activated.
                    cancel_timer(Timers#timers.queue_timer,
                                 timeout_queue),
                    NewSession =
                        Session#session{queue_length = 1,
                                        client_close = ClientClose},
                    httpc_manager:insert_session(NewSession, ProfileName),
                    %% Clear the queue timer in the timers of State1.
                    %% Starting from the pre-activation Timers record
                    %% would overwrite whatever
                    %% activate_request_timeout/1 stored in the state.
                    NewTimers =
                        (State1#state.timers)#timers{queue_timer = undefined},
                    State = init_wait_for_response_state(
                              Request,
                              State1#state{session = NewSession,
                                           timers  = NewTimers}),
                    {reply, ok, State}
            end;
        {error, Reason} ->
            %% Keep the request in the queue so that it is part of what
            %% terminate/2 hands to maybe_retry_queue/2.
            NewPipeline = queue:in(Request, State0#state.pipeline),
            {stop, {shutdown, {pipeline_failed, Reason}},
             State0#state{pipeline = NewPipeline}}
    end;

do_handle_call(#request{address = Addr} = Request, _,
               #state{status  = Status,
                      session = #session{type = keep_alive} = Session,
                      timers  = Timers,
                      options = #options{proxy = Proxy},
                      profile_name = ProfileName} = State0)
  when Status =/= undefined ->

    ClientClose = httpc_request:is_client_closing(Request#request.headers),

    case State0#state.request of
        #request{} -> %% Old request not yet finished
            %% The new request is queued without being sent.
            NewKeepAlive = queue:in(Request, State0#state.keep_alive),
            NewSession   =
                Session#session{queue_length =
                                    %% Queue + current
                                    queue:len(NewKeepAlive) + 1,
                                client_close = ClientClose},
            insert_session(NewSession, ProfileName),
            {reply, ok, State0#state{keep_alive = NewKeepAlive,
                                     session    = NewSession}};
        undefined ->
            %% Note: tcp-message receiving has already been
            %% activated by handle_pipeline/2.
            cancel_timer(Timers#timers.queue_timer,
                         timeout_queue),
            NewTimers = Timers#timers{queue_timer = undefined},
            State1 = State0#state{timers = NewTimers},
            Address = handle_proxy(Addr, Proxy),
            case httpc_request:send(Address, Session, Request) of
                ok ->
                    %% Activate the request time out for the new request
                    State2 =
                        activate_request_timeout(
                          State1#state{request = Request}),
                    NewSession =
                        Session#session{queue_length = 1,
                                        client_close = ClientClose},
                    insert_session(NewSession, ProfileName),
                    State = init_wait_for_response_state(
                              Request,
                              State2#state{session = NewSession}),
                    {reply, ok, State};
                {error, Reason} ->
                    {stop, {shutdown, {keepalive_failed, Reason}}, State1}
            end
    end;
do_handle_call(info, _, State) ->
    Info = handler_info(State),
    {reply, Info, State}.
471
%% When the request in process has been canceled the handler process is
%% stopped and the pipelined requests will be reissued or remaining
%% requests will be sent on a new connection. This is based on the
%% assumption that it is probably cheaper to reissue the requests than
%% to wait for a potentially large response that we then only throw
%% away. This of course is not always true, maybe we could do
%% something smarter here?! If the request canceled is not the one
%% handled right now the same effect will take place in
%% handle_pipeline/2 when the canceled request is on turn,
%% handle_keep_alive_queue/2 on the other hand will just skip the
%% request as if it was never issued as in this case the request will
%% not have been sent.

%% The canceled request is the one being served: mark it as answered
%% and stop.
do_handle_cast({cancel, RequestId},
               #state{request  = #request{id = RequestId} = Current,
                      canceled = Canceled} = State) ->
    Silenced = Current#request{from = answer_sent},
    {stop, normal, State#state{request  = Silenced,
                               canceled = [RequestId | Canceled]}};

%% Some other request is being served: only remember the id.
do_handle_cast({cancel, OtherId},
               #state{request  = #request{},
                      canceled = Canceled} = State) ->
    {noreply, State#state{canceled = [OtherId | Canceled]}};

%% Nothing is being served: the cancel is ignored.
do_handle_cast({cancel, _RequestId},
               #state{request = undefined} = State) ->
    {noreply, State};

do_handle_cast(stream_next, #state{session = Session} = State) ->
    activate_once(Session),
    %% Inactivate the #state.once here because we don't want
    %% next_body_chunk/1 to activate the socket twice.
    {noreply, State#state{once = inactive}}.
503
%%--------------------------------------------------------------------
%% Function: do_handle_info(Info, State) -> handle_info/2 result
%%
%% Description: Does the work of handle_info/2. The clauses handle,
%% in order:
%%   - socket data while a parser continuation (#state.mfa) and a
%%     request are present: the data is fed to the continuation and
%%     decoded body data is streamed where possible;
%%   - socket data in any other state: logged and dropped;
%%   - tcp_closed / ssl_closed and tcp_error / ssl_error;
%%   - {timeout, RequestId} for the current or a queued request;
%%   - timeout_queue;
%%   - {init_error, Reason, ClientErrMsg};
%%   - 'EXIT' signals (the handler traps exits).
%%--------------------------------------------------------------------
do_handle_info({Proto, _Socket, Data},
               #state{mfa = {Module, Function, Args},
                      request = #request{method = Method} = Request,
                      session = Session,
                      status_line = StatusLine} = State)
  when (Proto =:= tcp) orelse
       (Proto =:= ssl) orelse
       (Proto =:= httpc_handler) ->

    %% Only the parser call itself is protected; the result handling
    %% in the 'of' clauses is not.
    try Module:Function([Data | Args]) of
        {ok, Result} ->
            handle_http_msg(Result, State);
        {_, whole_body, _} when Method =:= head ->
            handle_response(State#state{body = <<>>});
        {Module, whole_body, [Body, Length]} ->
            {_, Code, _} = StatusLine,
            {Streamed, NewBody, NewRequest} = stream(Body, Request, Code),
            %% When we stream we will not keep the already
            %% streamed data, that would be a waste of memory.
            NewLength =
                case Streamed of
                    false ->
                        Length;
                    true ->
                        Length - byte_size(Body)
                end,

            NewState = next_body_chunk(State, Code),
            NewMFA   = {Module, whole_body, [NewBody, NewLength]},
            {noreply, NewState#state{mfa     = NewMFA,
                                     request = NewRequest}};
        {Module, decode_size,
         [TotalChunk, HexList, AccHeaderSize,
          {MaxBodySize, BodySoFar, AccLength, MaxHeaderSize}]}
          when BodySoFar =/= <<>> ->
            %% The response body is chunk-encoded. Steal decoded
            %% chunks as much as possible to stream.
            {_, Code, _} = StatusLine,
            {_, NewBody, NewRequest} = stream(BodySoFar, Request, Code),
            NewState = next_body_chunk(State, Code),
            NewMFA   = {Module, decode_size,
                        [TotalChunk, HexList, AccHeaderSize,
                         {MaxBodySize, NewBody, AccLength, MaxHeaderSize}]},
            {noreply, NewState#state{mfa     = NewMFA,
                                     request = NewRequest}};
        {Module, decode_data,
         [ChunkSize, TotalChunk,
          {MaxBodySize, BodySoFar, AccLength, MaxHeaderSize}]}
          when TotalChunk =/= <<>> orelse BodySoFar =/= <<>> ->
            %% The response body is chunk-encoded. Steal decoded
            %% chunks as much as possible to stream.
            %% At most the part of the current chunk that has already
            %% arrived is taken.
            ChunkSizeToSteal = min(ChunkSize, byte_size(TotalChunk)),
            <<StolenChunk:ChunkSizeToSteal/binary, NewTotalChunk/binary>> =
                TotalChunk,
            StolenBody   = <<BodySoFar/binary, StolenChunk/binary>>,
            NewChunkSize = ChunkSize - ChunkSizeToSteal,
            {_, Code, _} = StatusLine,

            {_, NewBody, NewRequest} = stream(StolenBody, Request, Code),
            NewState = next_body_chunk(State, Code),
            NewMFA   = {Module, decode_data,
                        [NewChunkSize, NewTotalChunk,
                         {MaxBodySize, NewBody, AccLength, MaxHeaderSize}]},
            {noreply, NewState#state{mfa     = NewMFA,
                                     request = NewRequest}};
        NewMFA ->
            %% The parser needs more data: ask for the next message
            activate_once(Session),
            {noreply, State#state{mfa = NewMFA}}
    catch
        Class:Reason:ST ->
            ClientReason = {could_not_parse_as_http, Data},
            ClientErrMsg = httpc_response:error(Request, ClientReason),
            NewState     = answer_request(Request, ClientErrMsg, State),
            {stop, {shutdown, {{Class, Reason}, ST}}, NewState}
    end;

%% Socket data that did not match the clause above (no parser
%% continuation or no current request): log it and carry on.
do_handle_info({Proto, Socket, Data},
               #state{mfa          = MFA,
                      request      = Request,
                      session      = Session,
                      status       = Status,
                      status_line  = StatusLine,
                      profile_name = Profile} = State)
  when (Proto =:= tcp) orelse
       (Proto =:= ssl) orelse
       (Proto =:= httpc_handler) ->

    error_logger:warning_msg("Received unexpected ~p data on ~p"
                             "~n   Data:       ~p"
                             "~n   MFA:        ~p"
                             "~n   Request:    ~p"
                             "~n   Session:    ~p"
                             "~n   Status:     ~p"
                             "~n   StatusLine: ~p"
                             "~n   Profile:    ~p"
                             "~n",
                             [Proto, Socket, Data, MFA,
                              Request, Session, Status, StatusLine, Profile]),

    {noreply, State};

%% The Server may close the connection to indicate that the
%% whole body is now sent instead of sending a length
%% indicator.
do_handle_info({tcp_closed, _}, State = #state{mfa = {_, whole_body, Args}}) ->
    handle_response(State#state{body = hd(Args)});
do_handle_info({ssl_closed, _}, State = #state{mfa = {_, whole_body, Args}}) ->
    handle_response(State#state{body = hd(Args)});

%%% Server closes idle pipeline
do_handle_info({tcp_closed, _}, State = #state{request = undefined}) ->
    {stop, normal, State};
do_handle_info({ssl_closed, _}, State = #state{request = undefined}) ->
    {stop, normal, State};

%%% Error cases
%% The socket is tagged as remotely closed so that terminate/2 takes
%% its "Socket closed remotely" clause; the stop reason stays normal.
do_handle_info({tcp_closed, _}, #state{session = Session0} = State) ->
    Socket  = Session0#session.socket,
    Session = Session0#session{socket = {remote_close, Socket}},
    {stop, normal, State#state{session = Session}};
do_handle_info({ssl_closed, _}, #state{session = Session0} = State) ->
    Socket  = Session0#session.socket,
    Session = Session0#session{socket = {remote_close, Socket}},
    {stop, normal, State#state{session = Session}};
do_handle_info({tcp_error, _, _} = Reason, State) ->
    {stop, Reason, State};
do_handle_info({ssl_error, _, _} = Reason, State) ->
    {stop, Reason, State};

%% Timeouts
%% Internally, to a request handling process, a request timeout is
%% seen as a canceled request.
do_handle_info({timeout, RequestId},
               #state{request      = #request{id = RequestId} = Request,
                      canceled     = Canceled,
                      profile_name = ProfileName} = State) ->
    httpc_response:send(Request#request.from,
                        httpc_response:error(Request, timeout)),
    httpc_manager:request_done(RequestId, ProfileName),
    {stop, normal,
     State#state{request  = Request#request{from = answer_sent},
                 canceled = [RequestId | Canceled]}};

%% The timed out request is not the current one: look for it in the
%% queue that belongs to the current status.
do_handle_info({timeout, RequestId},
               #state{canceled     = Canceled,
                      profile_name = ProfileName} = State) ->
    %% queue:filter/2 accepts a list as the fun's result, in which
    %% case the list replaces the element: the timed out request is
    %% answered and stays in the queue marked as answer_sent.
    Filter =
        fun(#request{id = Id, from = From} = Request) when Id =:= RequestId ->
                %% Notify the owner
                httpc_response:send(From,
                                    httpc_response:error(Request, timeout)),
                httpc_manager:request_done(RequestId, ProfileName),
                [Request#request{from = answer_sent}];
           (_) ->
                true
        end,
    case State#state.status of
        pipeline ->
            Pipeline = queue:filter(Filter, State#state.pipeline),
            {noreply, State#state{canceled = [RequestId | Canceled],
                                  pipeline = Pipeline}};
        keep_alive ->
            KeepAlive = queue:filter(Filter, State#state.keep_alive),
            {noreply, State#state{canceled   = [RequestId | Canceled],
                                  keep_alive = KeepAlive}}
    end;

%% The queue timer fired while nothing is being served: stop.
do_handle_info(timeout_queue, State = #state{request = undefined}) ->
    {stop, normal, State};

%% Timing was such as the queue_timeout was not canceled!
do_handle_info(timeout_queue, #state{timers = Timers} = State) ->
    {noreply, State#state{timers =
                              Timers#timers{queue_timer = undefined}}};

%% Setting up the connection to the server somehow failed.
do_handle_info({init_error, Reason, ClientErrMsg},
               State = #state{request = Request}) ->
    NewState = answer_request(Request, ClientErrMsg, State),
    {stop, {shutdown, Reason}, NewState};

%%% httpc_manager process dies.
do_handle_info({'EXIT', _, _}, State = #state{request = undefined}) ->
    {stop, normal, State};
%% Try to finish the current request anyway,
%% there is a fairly high probability that it can be done successfully.
%% Then close the connection, hopefully a new manager is started that
%% can retry requests in the pipeline.
do_handle_info({'EXIT', _, _}, State) ->
    {noreply, State#state{status = close}}.
695
%% Synchronous call to a handler without a timeout. A handler that
%% does not exist, or that goes down with reason normal or
%% {shutdown, _} during the call, is reported as {error, closed}.
%% Any other exit is not caught.
call(Msg, Pid) ->
    try
        gen_server:call(Pid, Msg, infinity)
    catch
        exit:{Gone, _} when Gone =:= noproc; Gone =:= normal ->
            {error, closed};
        exit:{{shutdown, _}, _} ->
            {error, closed}
    end.
706
%% Asynchronous message to a handler; note the (Msg, Pid) argument
%% order, which is the reverse of gen_server:cast/2.
cast(Message, HandlerPid) ->
    gen_server:cast(HandlerPid, Message).
709
%% Hands the requests of a non-empty queue to retry_pipeline/2;
%% returns ok for an empty queue.
maybe_retry_queue(Q, State) ->
    case queue:to_list(Q) of
        [] ->
            ok;
        [_ | _] = Requests ->
            retry_pipeline(Requests, State)
    end.
717
%% Answers the request through answer_request/3 unless it is already
%% marked as answered (from = answer_sent), in which case the state
%% is returned unchanged.
maybe_send_answer(Request, Answer, State) ->
    case Request of
        #request{from = answer_sent} ->
            State;
        _ ->
            answer_request(Request, Answer, State)
    end.
722
%% Tells the owner of a not yet answered request that the socket was
%% closed remotely. Nothing is sent for a request that has already
%% been answered, nor when there is no request record at all.
deliver_answer(#request{from = answer_sent}) ->
    ok;
deliver_answer(#request{from = From} = Request) ->
    ClosedError = httpc_response:error(Request, socket_closed_remotely),
    httpc_response:send(From, ClosedError);
deliver_answer(_NoRequest) ->
    ok.
729
730%%%--------------------------------------------------------------------
731%%% Internal functions
732%%%--------------------------------------------------------------------
733
%% connect(SocketType, ToAddress, Options, Timeout) ->
%%         ConnectResult | {error, {failed_connect, Details}}
%%
%% Connects through http_transport:connect/4. The local ip and port
%% from Options are added to the socket options unless they are
%% 'default'. The address family option depends on #options.ipfamily:
%%   inet6fb4 - try inet6 first and fall back to inet; if both fail
%%              the error carries both attempts.
%%   local    - connect to {{local, UnixSocket}, 0} instead of
%%              ToAddress.
%%   other    - the family is passed on as a socket option.
%% A successful result is returned exactly as http_transport gave it.
connect(SocketType, ToAddress,
        #options{ipfamily    = IpFamily,
                 ip          = FromAddress,
                 port        = FromPort,
                 unix_socket = UnixSocket,
                 socket_opts = Opts0}, Timeout) ->
    %% Same order as prepending port first and ip after that
    BaseOpts = [{ip, FromAddress} || FromAddress =/= default] ++
               [{port, FromPort}  || FromPort =/= default] ++
               Opts0,

    %% One attempt with IpFamily itself as the family option
    ConnectOnce =
        fun(Target, ReportedAddress) ->
                FamilyOpts = [IpFamily | BaseOpts],
                case http_transport:connect(SocketType, Target,
                                            FamilyOpts, Timeout) of
                    {error, Reason} ->
                        {error, {failed_connect,
                                 [{to_address, ReportedAddress},
                                  {IpFamily, FamilyOpts, Reason}]}};
                    Connected ->
                        Connected
                end
        end,

    case IpFamily of
        inet6fb4 ->
            Inet6Opts = [inet6 | BaseOpts],
            case http_transport:connect(SocketType, ToAddress,
                                        Inet6Opts, Timeout) of
                {error, Inet6Reason} ->
                    InetOpts = [inet | BaseOpts],
                    case http_transport:connect(SocketType, ToAddress,
                                                InetOpts, Timeout) of
                        {error, InetReason} ->
                            {error, {failed_connect,
                                     [{to_address, ToAddress},
                                      {inet6, Inet6Opts, Inet6Reason},
                                      {inet,  InetOpts,  InetReason}]}};
                        ConnectedV4 ->
                            ConnectedV4
                    end;
                ConnectedV6 ->
                    ConnectedV6
            end;
        local ->
            LocalAddress = {local, UnixSocket},
            ConnectOnce({LocalAddress, 0}, LocalAddress);
        _ ->
            ConnectOnce(ToAddress, ToAddress)
    end.
794
%% Copy the request's unix_socket into the options record.
%% A request without a unix socket leaves Options unchanged. A request
%% with one is only valid together with ipfamily = local; any other
%% family raises error({badarg, ...}).
handle_unix_socket_options(#request{unix_socket = undefined}, Options) ->
    Options;
handle_unix_socket_options(#request{unix_socket = UnixSocket},
                           #options{ipfamily = local} = Options) ->
    Options#options{unix_socket = UnixSocket};
handle_unix_socket_options(#request{unix_socket = UnixSocket},
                           #options{ipfamily = IpFamily}) ->
    error({badarg, [{ipfamily, IpFamily}, {unix_socket, UnixSocket}]}).
807
%% Connect to Address and send the first request on the new socket.
%%
%% Always returns {ok, State}. Failures are not returned; instead an
%% {init_error, Tag, ErrorResponse} message is posted to self(), with
%% Tag = error_connecting when the connect failed and
%% Tag = error_sending when the request could not be written.
%% On success the socket is set to {active, once} and the request
%% timeout is armed.
connect_and_send_first_request(Address, Request,
                               #state{options = Options0} = State) ->
    SockType = socket_type(Request),
    Timeout  = (Request#request.settings)#http_options.connect_timeout,
    Options  = handle_unix_socket_options(Request, Options0),
    InitError =
        fun(Tag, Reason) ->
                self() ! {init_error, Tag,
                          httpc_response:error(Request, Reason)}
        end,
    case connect(SockType, format_address(Address), Options, Timeout) of
        {error, ConnectReason} ->
            InitError(error_connecting, ConnectReason),
            {ok, State#state{request = Request}};
        {ok, Socket} ->
            ClientClose =
                httpc_request:is_client_closing(Request#request.headers),
            SessionType = httpc_manager:session_type(Options),
            Session = #session{id           = {Request#request.address, self()},
                               scheme       = Request#request.scheme,
                               socket       = Socket,
                               socket_type  = SockType,
                               client_close = ClientClose,
                               type         = SessionType},
            case httpc_request:send(Address, Session, Request) of
                ok ->
                    Waiting =
                        State#state{request     = Request,
                                    session     = Session,
                                    mfa         = init_mfa(Request, State),
                                    status_line = init_status_line(Request),
                                    headers     = undefined,
                                    body        = undefined,
                                    status      = new},
                    http_transport:setopts(SockType, Socket,
                                           [{active, once}]),
                    {ok, activate_request_timeout(Waiting)};
                {error, SendReason} ->
                    InitError(error_sending, SendReason),
                    {ok, State#state{request = Request,
                                     session = Session}}
            end
    end.
849
%% Connect to Address over plain TCP (ip_comm) and start the TLS tunnel
%% handshake via tls_tunnel/4.
%%
%% A failed connect posts {init_error, error_connecting, _} to self()
%% and returns {ok, State}. A failed send of the tunnel request is
%% handled by the error handler passed to tls_tunnel/4, which posts
%% {init_error, error_sending, _}.
connect_and_send_upgrade_request(Address, Request,
                                 #state{options = Options0} = State) ->
    Timeout   = (Request#request.settings)#http_options.connect_timeout,
    Transport = ip_comm,
    Options   = handle_unix_socket_options(Request, Options0),
    case connect(Transport, Address, Options, Timeout) of
        {error, ConnectReason} ->
            self() ! {init_error, error_connecting,
                      httpc_response:error(Request, ConnectReason)},
            {ok, State#state{request = Request}};
        {ok, Socket} ->
            SessionType = httpc_manager:session_type(Options),
            Session = #session{socket       = Socket,
                               socket_type  = Transport,
                               id           = {Request#request.address, self()},
                               scheme       = http,
                               client_close = false,
                               type         = SessionType},
            OnSendError =
                fun(FailedRequest, FailedState, Why) ->
                        self() ! {init_error, error_sending,
                                  httpc_response:error(FailedRequest, Why)},
                        {ok, FailedState#state{request = FailedRequest}}
                end,
            tls_tunnel(Address, Request,
                       State#state{session = Session}, OnSendError)
    end.
874
%% Build a property list describing this handler:
%%   {status, _}          - the handler status field
%%   {current_request, _} - id and start time of the request in
%%                          progress, or [] when there is none
%%   {session, _}         - session type, queue length, scheme and
%%                          socket details (peer/local name, options,
%%                          statistics) as reported by http_transport
handler_info(#state{request    = Request,
                    session    = Session,
                    pipeline   = Pipeline,
                    keep_alive = KeepAlive,
                    status     = Status}) ->
    CurrentRequest =
        case Request of
            undefined ->
                [];
            #request{id = Id, started = Started} ->
                [{id, Id}, {started, Started}]
        end,

    Type = Session#session.type,
    QueueLength =
        case Type of
            pipeline   -> queue:len(Pipeline);
            keep_alive -> queue:len(KeepAlive)
        end,
    Scheme    = Session#session.scheme,
    Socket    = Session#session.socket,
    Transport = Session#session.socket_type,

    Opts  = http_transport:getopts(Transport, Socket),
    Stats = http_transport:getstat(Transport, Socket),
    Peer  = http_transport:peername(Transport, Socket),
    Own   = http_transport:sockname(Transport, Socket),

    [{status,          Status},
     {current_request, CurrentRequest},
     {session,         [{type,         Type},
                        {queue_length, QueueLength},
                        {scheme,       Scheme},
                        {socket_info,  [{remote,       Peer},
                                        {local,        Own},
                                        {socket_opts,  Opts},
                                        {socket_stats, Stats}]}]}].
927
928
929
%% Dispatch on the shape of a parsed message:
%%   {Version, StatusCode, Reason, Headers, Body} - status line and
%%       headers are complete; start streaming (if requested) and
%%       continue with the body. multipart/byteranges is not
%%       implemented and makes the handler exit.
%%   {ChunkedHeaders, Body} - a chunked body is complete; merge the
%%       trailer headers and finish the response.
%%   Body - a plain body is complete; finish the response.
handle_http_msg({Version, StatusCode, ReasonPharse, Headers, Body},
                #state{request = Request} = State) ->
    case Headers#http_response_h.'content-type' of
        "multipart/byteranges" ++ _ ->
            exit({not_yet_implemented, multypart_byteranges});
        _ ->
            StatusLine = {Version, StatusCode, ReasonPharse},
            {ok, StreamRequest} = start_stream(StatusLine, Headers, Request),
            handle_http_body(Body,
                             State#state{request     = StreamRequest,
                                         status_line = StatusLine,
                                         headers     = Headers})
    end;
handle_http_msg({ChunkedHeaders, Body},
                #state{status_line = {_, Code, _},
                       headers     = Headers,
                       request     = Request} = State) ->
    MergedHeaders = http_chunk:handle_headers(Headers, ChunkedHeaders),
    {_, RestBody, StreamRequest} = stream(Body, Request, Code),
    handle_response(State#state{headers = MergedHeaders,
                                body    = RestBody,
                                request = StreamRequest});
handle_http_msg(Body, #state{status_line = {_, Code, _},
                             request     = Request} = State) ->
    {_, RestBody, StreamRequest} = stream(Body, Request, Code),
    handle_response(State#state{body = RestBody, request = StreamRequest}).
953
%% handle_http_body(Body, State)
%%
%% Decide how to proceed once the status line and headers of a
%% response are known and Body holds whatever body bytes arrived with
%% them. Returns a gen_server callback result: either the result of
%% handle_response/1 or tls_upgrade/1, {noreply, State} when more body
%% data is needed, or {stop, normal, State} after an error answer has
%% been sent to the client.

%% Tunnel request answered with 200: go on with the TLS upgrade.
handle_http_body(_, #state{status = {ssl_tunnel, _},
			   status_line = {_,200, _}} = State) ->
    tls_upgrade(State);

%% Tunnel request answered with anything else: report the failure to
%% the original request (kept in the status tuple) and stop.
handle_http_body(_, #state{status = {ssl_tunnel, Request},
			   status_line = StatusLine} = State) ->
    ClientErrMsg = httpc_response:error(Request,{could_no_establish_ssh_tunnel, StatusLine}),
    NewState     = answer_request(Request, ClientErrMsg, State),
    {stop, normal, NewState};

%% All 1xx (informational), 204 (no content), and 304 (not modified)
%% responses MUST NOT include a message-body, and thus are always
%% terminated by the first empty line after the header fields.
%% This implies that chunked encoding MUST NOT be used for these
%% status codes.
handle_http_body(<<>>, #state{headers = Headers,
                              status_line = {_,StatusCode, _}} = State)
  when Headers#http_response_h.'transfer-encoding' =/= "chunked" andalso
       (StatusCode =:= 204 orelse                       %% No Content
        StatusCode =:= 304 orelse                       %% Not Modified
        100 =< StatusCode andalso StatusCode =< 199) -> %% Informational
    handle_response(State#state{body = <<>>});


%% Response to a HEAD request with no body bytes and no chunked
%% transfer encoding: finish with an empty body.
handle_http_body(<<>>, #state{headers = Headers,
                              request = #request{method = head}} = State)
  when Headers#http_response_h.'transfer-encoding' =/= "chunked" ->
    handle_response(State#state{body = <<>>});

%% General case: choose the body reader from the (lower-cased)
%% transfer-encoding header.
handle_http_body(Body, #state{headers       = Headers,
			      max_body_size = MaxBodySize,
			      status_line   = {_,Code, _},
			      request       = Request} = State) ->
    TransferEnc = Headers#http_response_h.'transfer-encoding',
    case case_insensitive_header(TransferEnc) of
        "chunked" ->
	    try http_chunk:decode(Body, State#state.max_body_size,
				  State#state.max_header_size) of
		%% Decoder needs more data: remember the continuation
		%% and ask for the next chunk.
		{Module, Function, Args} ->
		    NewState = next_body_chunk(State, Code),
		    {noreply, NewState#state{mfa =
					     {Module, Function, Args}}};
		%% Whole chunked body decoded from the data at hand.
		{ok, {ChunkedHeaders, NewBody}} ->
		    NewHeaders = http_chunk:handle_headers(Headers,
							   ChunkedHeaders),
                    case Body of
                        <<>> ->
                           handle_response(State#state{headers = NewHeaders,
                                                body    = NewBody});
                        _ ->
                           %% NOTE(review): the request returned by
                           %% stream/3 is discarded here, unlike in the
                           %% identity branch below - confirm intended.
                           {_, NewBody2, _} =
                                stream(NewBody, Request, Code),
                           handle_response(State#state{headers = NewHeaders,
                                       body    = NewBody2})
                    end
	    catch throw:{error, Reason} ->
		%% A thrown {error, Reason} from the decoder is passed
		%% on to the client as the answer.
		NewState =
		    answer_request(Request,
				   httpc_response:error(Request,
							Reason),
				   State),
		    {stop, normal, NewState}
	    end;
        Enc when Enc =:= "identity"; Enc =:= undefined ->
            %% list_to_integer/1 raises badarg if the content-length
            %% header value is not an integer string.
            Length =
                list_to_integer(Headers#http_response_h.'content-length'),
            case ((Length =< MaxBodySize) orelse (MaxBodySize =:= nolimit)) of
                true ->
                    case httpc_response:whole_body(Body, Length) of
                        %% Body is already bound, so this clause only
                        %% matches when whole_body/2 hands back exactly
                        %% the data passed in.
                        {ok, Body} ->
			    {_, NewBody, NewRequest} =
				stream(Body, Request, Code),
			    handle_response(State#state{body    = NewBody,
							request = NewRequest});
                        %% Anything else is stored as the continuation
                        %% for the next data message.
                        MFA ->
			    NewState = next_body_chunk(State, Code),
			    {noreply, NewState#state{mfa = MFA}}
		    end;
                false ->
		    NewState =
			answer_request(Request,
				       httpc_response:error(Request,
							    body_too_big),
				       State),
                    {stop, normal, NewState}
            end;
        %% Any other transfer-encoding string is not supported.
        Encoding when is_list(Encoding) ->
            NewState = answer_request(Request,
                                      httpc_response:error(Request,
                                                           unknown_encoding),
                                      State),
            {stop, normal, NewState}
    end.
1047
%% handle_response(State)
%%
%% Called when a complete response (status line, headers, body) is in
%% State. Returns a gen_server callback result.

%% First response on this connection: decide between pipeline,
%% keep_alive and close, then handle the response with the new status.
handle_response(#state{status = new} = State) ->
    ?hcrd("handle response - status = new", []),
    handle_response(try_to_enable_pipeline_or_keep_alive(State));

handle_response(#state{status = Status0} = State0) when Status0 =/= new ->
    %% Switch the status to close if the headers say the server is
    %% closing the connection.
    State = handle_server_closing(State0),
    #state{request      = Request,
           session      = Session,
           status_line  = StatusLine,
           headers      = Headers,
           body         = Body,
           options      = Options,
           profile_name = ProfileName} = State,
    handle_cookies(Headers, Request, Options, ProfileName),
    %% httpc_response:result/2 decides what the response means for the
    %% request; each clause below handles one of its verdicts.
    case httpc_response:result({StatusLine, Headers, Body}, Request) of
	%% 100-continue
	continue ->
	    %% Send request body
	    {_, RequestBody} = Request#request.content,
	    send_raw(Session, RequestBody),
	    %% Wait for next response
	    activate_once(Session),
	    Relaxed = (Request#request.settings)#http_options.relaxed,
	    MFA = {httpc_response, parse,
		   [State#state.max_header_size, Relaxed]},
	    {noreply, State#state{mfa         = MFA,
				  status_line = undefined,
				  headers     = undefined,
				  body        = undefined}};

	%% Ignore unexpected 100-continue response and receive the
	%% actual response that the server will send right away.
	{ignore, Data} ->
	    Relaxed = (Request#request.settings)#http_options.relaxed,
	    MFA     = {httpc_response, parse,
		       [State#state.max_header_size, Relaxed]},
	    NewState = State#state{mfa         = MFA,
				   status_line = undefined,
				   headers     = undefined,
				   body        = undefined},
	    %% Feed the already received bytes back through handle_info/2
	    %% as if they had just arrived.
	    handle_info({httpc_handler, dummy, Data}, NewState);

	%% On a redirect or retry the current request becomes
	%% obsolete and the manager will create a new request
	%% with the same id as the current.
	{redirect, NewRequest, Data} ->
	    ok = httpc_manager:redirect_request(NewRequest, ProfileName),
	    handle_queue(State#state{request = undefined}, Data);
	{retry, TimeNewRequest, Data} ->
	    ok = httpc_manager:retry_request(TimeNewRequest, ProfileName),
	    handle_queue(State#state{request = undefined}, Data);
	%% Final answer: stream what is left of the body, close the
	%% stream, answer the client and move on to the next queued
	%% request.
	{ok, Msg, Data} ->
	    stream_remaining_body(Body, Request, StatusLine),
	    end_stream(StatusLine, Request),
	    NewState = maybe_send_answer(Request, Msg, State),
	    handle_queue(NewState, Data);
	%% Final answer after which the handler terminates.
	{stop, Msg} ->
	    end_stream(StatusLine, Request),
	    NewState = maybe_send_answer(Request, Msg, State),
	    {stop, normal, NewState}
    end.
1109
%% Store the cookies of a response when cookie handling is enabled.
%% With cookies = disabled nothing is stored. With cookies = verify the
%% user wants to inspect the cookies first and has to issue the store
%% command explicitly, so nothing is stored here either.
handle_cookies(_Headers, _Request, #options{cookies = Mode}, _ProfileName)
  when Mode =:= disabled; Mode =:= verify ->
    ok;
handle_cookies(Headers, Request, #options{cookies = enabled}, ProfileName) ->
    Address = Request#request.address,
    {Host, _Port} = Address,
    Cookies = httpc_cookie:cookies(Headers#http_response_h.other,
                                   Request#request.path, Host),
    httpc_manager:store_cookies(Cookies, Address, ProfileName).
1122
%% Continue with the next queued request according to the connection
%% status. Data holds bytes already received for the next response.
%%   pipeline   - take the next request from the pipeline queue
%%   keep_alive - take the next request from the keep-alive queue
%%   close      - the connection can be used neither as a pipeline nor
%%                for sequential keep-alive requests, so stop
handle_queue(#state{status = pipeline} = State, Data) ->
    handle_pipeline(State, Data);

handle_queue(#state{status = keep_alive} = State, Data) ->
    handle_keep_alive_queue(State, Data);

handle_queue(#state{status = close} = State, _Data) ->
    {stop, normal, State}.
1133
%% Start waiting for the response to the next pipelined request.
%% An empty pipeline puts the handler into the idle state. If the next
%% request has been canceled the handler stops (see the comment for
%% handle_cast({cancel, RequestId})), with that request marked as
%% already answered.
handle_pipeline(#state{status       = pipeline,
                       session      = Session,
                       profile_name = ProfileName,
                       pipeline     = Queue,
                       canceled     = Canceled,
                       options      = #options{pipeline_timeout = TimeOut}} = State,
                Data) ->
    case queue:out(Queue) of
        {empty, _} ->
            handle_empty_queue(Session, ProfileName, TimeOut, State);
        {{value, Next}, Rest} ->
            case lists:member(Next#request.id, Canceled) of
                true ->
                    Answered = Next#request{from = answer_sent},
                    {stop, normal, State#state{request  = Answered,
                                               pipeline = Rest}};
                false ->
                    %% Queue length counts the queued requests plus
                    %% the one now in progress.
                    Session1 =
                        Session#session{queue_length = queue:len(Rest) + 1},
                    receive_response(Next, Session1, Data,
                                     State#state{pipeline = Rest})
            end
    end.
1161
%% Send the next request waiting in the keep-alive queue and start
%% waiting for its response. Canceled requests are skipped. An empty
%% queue puts the handler into the idle state. A failed send stops the
%% handler with reason {shutdown, {keepalive_failed, Reason}}.
handle_keep_alive_queue(#state{status       = keep_alive,
                               session      = Session,
                               profile_name = ProfileName,
                               keep_alive   = Queue,
                               canceled     = Canceled,
                               options      = #options{keep_alive_timeout = TimeOut,
                                                       proxy              = Proxy}} = State,
                        Data) ->
    case queue:out(Queue) of
        {empty, _} ->
            handle_empty_queue(Session, ProfileName, TimeOut, State);
        {{value, Next}, Rest} ->
            case lists:member(Next#request.id, Canceled) of
                true ->
                    handle_keep_alive_queue(State#state{keep_alive = Rest},
                                            Data);
                false ->
                    Target = handle_proxy(Next#request.address, Proxy),
                    case httpc_request:send(Target, Session, Next) of
                        ok ->
                            receive_response(Next, Session, <<>>,
                                             State#state{keep_alive = Rest});
                        {error, Reason} ->
                            {stop, {shutdown, {keepalive_failed, Reason}},
                             State}
                    end
            end
    end.
%% Put the handler into the idle state: no current request, waiting
%% for either a new request or the connection to go away.
handle_empty_queue(Session, ProfileName, TimeOut, State) ->
    %% The server may choose to terminate an idle pipeline or
    %% keep_alive session. In that case we want to receive the close
    %% message at once, not when trying to send the next request.
    activate_once(Session),
    %% If an idle pipeline or keep_alive session is not closed by the
    %% server after some time, the client may want to close it.
    Idle = activate_queue_timeout(TimeOut, State),
    case update_session(ProfileName, Session, #session.queue_length, 0) of
        {stop, Reason} ->
            {stop, {shutdown, Reason}, State};
        _ ->
            %% Note: mfa will be initialized when a new request arrives.
            {noreply, Idle#state{request     = undefined,
                                 mfa         = undefined,
                                 status_line = undefined,
                                 headers     = undefined,
                                 body        = undefined}}
    end.
1213
%% Make Request the current request, reset the response parsing state
%% and continue with Data, the bytes already received for its response.
receive_response(Request, Session, Data, State) ->
    gather_data(Data, Session, init_wait_for_response_state(Request, State)).
1217
%% Return State prepared for parsing a fresh response to Request:
%% the parser continuation is reset to httpc_response:parse and any
%% previous status line, headers and body are cleared.
init_wait_for_response_state(Request, State) ->
    Relaxed = (Request#request.settings)#http_options.relaxed,
    ParseArgs = [State#state.max_header_size, Relaxed],
    State#state{request     = Request,
                mfa         = {httpc_response, parse, ParseArgs},
                status_line = undefined,
                headers     = undefined,
                body        = undefined}.
%% With no buffered data, arm the socket and wait for the response.
%% If some bytes of the next response were already received, feed them
%% straight into handle_info/2 as if they had just arrived.
gather_data(Data, Session, State) ->
    case Data of
        <<>> ->
            activate_once(Session),
            {noreply, State};
        _ ->
            handle_info({httpc_handler, dummy, Data}, State)
    end.
1235
%% Lower-case a header value given as a string. Any other term is
%% returned unchanged; the value may be undefined if the server did
%% not send such a header.
case_insensitive_header(Str) ->
    case is_list(Str) of
        true  -> http_util:to_lower(Str);
        false -> Str
    end.
1241
%% Set the session's socket to {active, once}, i.e. ask for the next
%% message from it.
activate_once(#session{socket_type = Transport, socket = Sock}) ->
    http_transport:setopts(Transport, Sock, [{active, once}]).
1244
%% Close the session's socket, unless the socket field is a
%% {remote_close, _} marker, in which case there is nothing to close.
close_socket(#session{socket = Socket, socket_type = SocketType}) ->
    case Socket of
        {remote_close, _} ->
            ok;
        _ ->
            http_transport:close(SocketType, Socket)
    end.
1249
%% Arm the request timeout for the current request.
%% With timeout = infinity the state is returned unchanged. Otherwise a
%% {timeout, RequestId} message is scheduled to self(), the timer
%% reference is stored in the request and added to the request timers.
activate_request_timeout(
  #state{request = #request{timer = OldRef} = Request} = State) ->
    case (Request#request.settings)#http_options.timeout of
        infinity ->
            State;
        Timeout ->
            ReqId      = Request#request.id,
            TimeoutMsg = {timeout, ReqId},
            %% A timer may already be running; this is the case for a
            %% redirect or retry. It has to be restarted because the
            %% handler pid has changed.
            OldRef =:= undefined orelse cancel_timer(OldRef, TimeoutMsg),
            NewRef = erlang:send_after(Timeout, self(), TimeoutMsg),
            Known  = (State#state.timers)#timers.request_timers,
            %% NOTE(review): the timers record is rebuilt from its
            %% defaults here, so queue_timer is reset to undefined -
            %% confirm this is intended.
            State#state{request = Request#request{timer = NewRef},
                        timers  = #timers{request_timers =
                                              [{ReqId, NewRef} | Known]}}
    end.
1274
%% Arm the idle-queue timeout: after Time milliseconds timeout_queue is
%% sent to self(). Time = infinity arms nothing.
%% NOTE(review): the timers record is rebuilt from its defaults, so any
%% request timers recorded in State are dropped - confirm intended.
activate_queue_timeout(Time, State) ->
    case Time of
        infinity ->
            State;
        _ ->
            QueueTimer = erlang:send_after(Time, self(), timeout_queue),
            State#state{timers = #timers{queue_timer = QueueTimer}}
    end.
1280
1281
%% True only for a session whose type is pipeline.
is_pipeline_enabled_client(Session) ->
    case Session of
        #session{type = pipeline} -> true;
        _                         -> false
    end.
1286
%% Does the server side allow the connection to be kept open?
%% True for HTTP/1.x with a minor version starting with '1' or higher,
%% and for HTTP/1.0 when the connection header is exactly "keep-alive".
is_keep_alive_enabled_server(Version, Headers) ->
    case {Version, Headers} of
        {"HTTP/1." ++ Minor, _} when hd(Minor) >= $1 ->
            true;
        {"HTTP/1.0", #http_response_h{connection = "keep-alive"}} ->
            true;
        _ ->
            false
    end.
1294
%% The connection stays open only if neither the client asked to close
%% it nor the response headers say the server is closing it.
is_keep_alive_connection(Headers, #session{client_close = ClientClose}) ->
    (not ClientClose) andalso
        (not httpc_response:is_server_closing(Headers)).
1297
%% Decide, from the first response on a connection, how the connection
%% is used from now on and set the status accordingly:
%%   pipeline   - keep-alive is possible, the client session is of
%%                type pipeline and the request method is idempotent
%%   keep_alive - keep-alive is possible but pipelining is not
%%   close      - the connection cannot be reused
%% In the first two cases the session is registered via insert_session/2.
try_to_enable_pipeline_or_keep_alive(
  #state{session      = Session,
         request      = #request{method = Method},
         status_line  = {Version, _, _},
         headers      = Headers,
         profile_name = ProfileName} = State) ->
    Reusable = is_keep_alive_enabled_server(Version, Headers) andalso
        is_keep_alive_connection(Headers, Session),
    case Reusable of
        false ->
            State#state{status = close};
        true ->
            case (is_pipeline_enabled_client(Session) andalso
                  httpc_request:is_idempotent(Method)) of
                true ->
                    insert_session(Session, ProfileName),
                    State#state{status = pipeline};
                false ->
                    insert_session(Session, ProfileName),
                    %% The session type might be pipeline here; make
                    %% sure it is keep_alive in the state.
                    State#state{status  = keep_alive,
                                session = Session#session{type = keep_alive}}
            end
    end.
1323
%% Switch the status to close when the response headers say the server
%% is closing the connection. Nothing changes if the status is already
%% close or if there are no headers to look at.
handle_server_closing(#state{status = Status, headers = Headers} = State)
  when Status =:= close; Headers =:= undefined ->
    State;
handle_server_closing(#state{headers = Headers} = State) ->
    case httpc_response:is_server_closing(Headers) of
        false -> State;
        true  -> State#state{status = close}
    end.
1331
%% Deliver Msg to the originator of Request and do the bookkeeping that
%% goes with a finished request: cancel and forget its timeout timer,
%% tell the manager the request is done and make the session available
%% again if it was not. In the returned state the request is marked
%% with from = answer_sent.
answer_request(#request{id = RequestId, from = From} = Request, Msg,
               #state{session      = Session,
                      timers       = Timers,
                      profile_name = ProfileName} = State) ->
    httpc_response:send(From, Msg),
    Pending = Timers#timers.request_timers,
    Ref = proplists:get_value(RequestId, Pending, undefined),
    cancel_timer(Ref, {timeout, RequestId}),
    httpc_manager:request_done(RequestId, ProfileName),
    Session1  = maybe_make_session_available(ProfileName, Session),
    Remaining = lists:delete({RequestId, Ref}, Pending),
    State#state{request = Request#request{from = answer_sent},
                session = Session1,
                timers  = Timers#timers{request_timers = Remaining}}.
1349
%% If the session is marked as not available, flag it as available both
%% through update_session/4 and in the returned record. Any other
%% session is returned unchanged.
maybe_make_session_available(ProfileName, Session) ->
    case Session of
        #session{available = false} ->
            update_session(ProfileName, Session, #session.available, true),
            Session#session{available = true};
        _ ->
            Session
    end.
1356
%% Cancel the queue timer and every request timer, flushing any timeout
%% message that was already delivered to the mailbox.
%%
%% Request timers are stored as {RequestId, TimerRef} and deliver the
%% message {timeout, RequestId}, so that exact message has to be
%% flushed for each of them. Flushing the bare atom 'timeout' would
%% never match a message from a request timer.
cancel_timers(#timers{request_timers = ReqTmrs, queue_timer = QTmr}) ->
    cancel_timer(QTmr, timeout_queue),
    CancelTimer =
        fun({ReqId, Timer}) -> cancel_timer(Timer, {timeout, ReqId}) end,
    lists:foreach(CancelTimer, ReqTmrs).
1361
%% Cancel Timer and remove one already delivered TimeoutMsg, if any,
%% from the mailbox. 'undefined' means there is no timer. Always
%% returns ok.
cancel_timer(Timer, TimeoutMsg) ->
    case Timer of
        undefined ->
            ok;
        _ ->
            erlang:cancel_timer(Timer),
            %% The timer may have fired before it was canceled.
            receive
                TimeoutMsg -> ok
            after 0 ->
                    ok
            end
    end.
1372
%% Ask the manager to retry every request in the list on another
%% connection. Returns ok when the list is exhausted.
%%
%% For a request the manager accepts, its timeout timer is canceled and
%% forgotten. If the manager call fails (a non-ok return, a throw or an
%% exit, all captured by catch), the error is sent to the client as
%% the answer to that request.
retry_pipeline([], _) ->
    ok;

%% Skip requests when the answer has already been sent
retry_pipeline([#request{from = answer_sent}|PipeLine], State) ->
    retry_pipeline(PipeLine, State);

retry_pipeline([Request | PipeLine],
               #state{timers       = Timers,
                      profile_name = ProfileName} = State) ->
    NewState =
        case (catch httpc_manager:retry_request(Request, ProfileName)) of
            ok ->
                RequestTimers = Timers#timers.request_timers,
                ReqId    = Request#request.id,
                TimerRef =
                    proplists:get_value(ReqId, RequestTimers, undefined),
                cancel_timer(TimerRef, {timeout, ReqId}),
                NewReqsTimers = lists:delete({ReqId, TimerRef}, RequestTimers),
                NewTimers     = Timers#timers{request_timers = NewReqsTimers},
                State#state{timers = NewTimers};

            Error ->
                %% answer_request/3 takes the request record itself.
                %% Passing Request#request.from matches none of its
                %% clauses and crashes with function_clause.
                answer_request(Request,
                               httpc_response:error(Request, Error), State)
        end,
    retry_pipeline(PipeLine, NewState).
1400
%% Pick the proxy setting for the given scheme: the https_proxy setting
%% for https when one is configured (its proxy part is not undefined),
%% otherwise the general proxy setting.
handle_proxy_options(Scheme, #options{proxy       = Proxy,
                                      https_proxy = HttpsProxyOpt}) ->
    case {Scheme, HttpsProxyOpt} of
        {https, {HttpsProxy, _}} when HttpsProxy =/= undefined ->
            HttpsProxyOpt;
        _ ->
            Proxy
    end.
1406
%%% Decide which address to connect to for the destination {Host, Port}.
%%% Returns the destination itself when no proxy is configured or when
%%% Host is covered by the NoProxy list; otherwise returns the proxy
%%% address.
handle_proxy({_Host, _Port} = HostPort, {undefined, _NoProxy}) ->
    HostPort;
handle_proxy({Host, _Port} = HostPort, {Proxy, NoProxy}) ->
    case is_no_proxy_dest(Host, NoProxy) of
        true  -> HostPort;
        false -> Proxy
    end.
1421
%% True if Host is covered by at least one entry of the no-proxy list.
%% An entry is either a domain pattern ("*." followed by the domain),
%% a host name (as judged by http_util:is_hostname/1), or otherwise an
%% address description.
is_no_proxy_dest(Host, NoProxyDests) ->
    Covers =
        fun("*." ++ NoProxyDomain) ->
                is_no_proxy_dest_domain(Host, NoProxyDomain);
           (NoProxyDest) ->
                case http_util:is_hostname(NoProxyDest) of
                    true  -> is_no_proxy_host_name(Host, NoProxyDest);
                    false -> is_no_proxy_dest_address(Host, NoProxyDest)
                end
        end,
    lists:any(Covers, NoProxyDests).
1447
%% True when the destination host is exactly the no-proxy host name.
is_no_proxy_host_name(Host, NoProxyHost) ->
    Host =:= NoProxyHost.
1452
%% True when the host Dest belongs to the domain DomainPart, where
%% DomainPart is what follows "*." in a no-proxy entry such as
%% "*.example.com".
%%
%% Dest matches if it is the domain itself or ends with "." followed
%% by the domain. A bare suffix test is not enough: it would also let
%% "evilexample.com" match the domain "example.com".
is_no_proxy_dest_domain(Dest, DomainPart) ->
    (Dest =:= DomainPart) orelse lists:suffix([$. | DomainPart], Dest).
1455
%% True when Dest is exactly the no-proxy address description or
%% starts with it.
is_no_proxy_dest_address(Dest, AddressPart) ->
    (Dest =:= AddressPart) orelse lists:prefix(AddressPart, Dest).
1460
%% Initial parser continuation for a response to the request.
%% An HTTP/0.9 request goes straight to httpc_response:whole_body with
%% length -1; every other version starts with httpc_response:parse.
init_mfa(#request{settings = Settings}, State) ->
    Version = Settings#http_options.version,
    if
        Version =:= "HTTP/0.9" ->
            {httpc_response, whole_body, [<<>>, -1]};
        true ->
            Relaxed = Settings#http_options.relaxed,
            {httpc_response, parse, [State#state.max_header_size, Relaxed]}
    end.
1469
%% Initial status line for a response to the request: a fixed
%% {"HTTP/0.9", 200, "OK"} for an HTTP/0.9 request, otherwise undefined
%% until it has been parsed.
init_status_line(#request{settings = Settings}) ->
    Version = Settings#http_options.version,
    if
        Version =:= "HTTP/0.9" ->
            {"HTTP/0.9", 200, "OK"};
        true ->
            undefined
    end.
1477
%% Transport to use for the request: ip_comm for http, and the ssl
%% setting from the request's http options for https.
socket_type(#request{scheme = http}) ->
    ip_comm;
socket_type(#request{scheme = https} = Request) ->
    (Request#request.settings)#http_options.ssl.
1482
%% Prepare streaming of the response body. Always returns {ok, Request}.
%% Only responses with a streamed status code (see ?IS_STREAMED) are
%% affected:
%%   stream = self         - a stream_start message is sent to the
%%                           originator
%%   stream = {self, once} - same, and the message carries self()
%%   stream = Filename     - the file is opened for appending and the
%%                           stream field is replaced by the file
%%                           descriptor; the handler exits if the file
%%                           cannot be opened
%% In every other case the request is returned unchanged.
start_stream({_Version, Code, _ReasonPhrase}, Headers,
             #request{stream = Stream} = Request)
  when ?IS_STREAMED(Code) ->
    Announce =
        fun(HandlerPid) ->
                Msg = httpc_response:stream_start(Headers, Request,
                                                  HandlerPid),
                httpc_response:send(Request#request.from, Msg),
                {ok, Request}
        end,
    case Stream of
        self ->
            Announce(ignore);
        {self, once} ->
            Announce(self());
        Filename when is_list(Filename) ->
            case file:open(Filename, [write, raw, append, delayed_write]) of
                {ok, Fd} ->
                    {ok, Request#request{stream = Fd}};
                {error, Reason} ->
                    exit({stream_to_file_failed, Reason})
            end;
        _ ->
            {ok, Request}
    end;
start_stream(_StatusLine, _Headers, Request) ->
    {ok, Request}.
1509
%% Pass whatever is left of the body to stream/3. An empty remainder
%% needs no streaming and yields ok.
stream_remaining_body(Body, Request, StatusLine) ->
    case Body of
        <<>> ->
            ok;
        _ ->
            {_, Code, _} = StatusLine,
            stream(Body, Request, Code)
    end.
1514
%% Finish streaming of a response body.
%% Note: the end-of-stream message is handled by httpc_response and is
%% sent by answer_request. Here only a file that the body of a 200 or
%% 206 response was streamed to is closed.
end_stream(_, #request{stream = Stream})
  when Stream =:= none; Stream =:= self; Stream =:= {self, once} ->
    ok;
end_stream({_, Code, _}, #request{stream = Fd})
  when Code =:= 200; Code =:= 206 ->
    case file:close(Fd) of
        ok ->
            ok;
        {error, enospc} ->
            %% Could be due to delayed_write; close once more.
            file:close(Fd)
    end;
end_stream(_, _) ->
    ok.
1539
1540
%% Ask for the next piece of the body.
%% For a {self, once} stream of a streamed status code the socket is
%% only armed when 'once' is set, which is then changed to inactive.
%% While inactive nothing is armed and the handler waits for the user
%% to call stream_next. In every other case the socket is armed.
next_body_chunk(#state{request = #request{stream = {self, once}},
                       once    = Once,
                       session = Session} = State,
                Code)
  when ?IS_STREAMED(Code), (Once =:= once orelse Once =:= inactive) ->
    case Once of
        once ->
            activate_once(Session),
            State#state{once = inactive};
        inactive ->
            State
    end;
next_body_chunk(#state{session = Session} = State, _Code) ->
    activate_once(Session),
    State.
1554
%% Turn on dbg tracing of this process according to the verbosity:
%%   verbose - trace received messages
%%   debug   - trace calls to exported functions of this module
%%   trace   - trace calls to all functions of this module
%% Any other value turns nothing on and yields ok.
handle_verbose(Verbosity) ->
    ReturnTrace = [{'_', [], [{return_trace}]}],
    case Verbosity of
        verbose ->
            dbg:p(self(), [r]);
        debug ->
            dbg:p(self(), [call]),
            dbg:tp(?MODULE, ReturnTrace);
        trace ->
            dbg:p(self(), [call]),
            dbg:tpl(?MODULE, ReturnTrace);
        _ ->
            ok
    end.
1565
%% Sends a raw body on the socket of the given session.
%% The body is either a {ProcessBody, Acc} pair, where ProcessBody is a
%% fun of arity 1 that produces the data piece by piece (see send_raw/4),
%% or any other term, which is passed as is to http_transport:send/3.
send_raw(#session{socket = Socket, socket_type = SocketType}, Body) ->
    case Body of
        {ProcessBody, Acc} when is_function(ProcessBody, 1) ->
            send_raw(SocketType, Socket, ProcessBody, Acc);
        _ ->
            http_transport:send(SocketType, Socket, Body)
    end.
1571
%% Drives a body-producing fun until it is exhausted.
%% ProcessBody(Acc) must return either eof, which ends the loop with ok,
%% or {ok, Data, NewAcc}. Data is flattened to a binary and sent with
%% http_transport:send/3; on ok the loop continues with NewAcc, and any
%% other send result is returned to the caller unchanged.
send_raw(SocketType, Socket, ProcessBody, Acc0) ->
    case ProcessBody(Acc0) of
        {ok, Chunk, Acc1} ->
            Payload = iolist_to_binary(Chunk),
            case http_transport:send(SocketType, Socket, Payload) of
                ok ->
                    send_raw(SocketType, Socket, ProcessBody, Acc1);
                SendFailure ->
                    SendFailure
            end;
        eof ->
            ok
    end.
1585
%% Asks the proxy to open a tunnel for Request on the current session.
%%
%% A CONNECT request is derived from Request (tls_tunnel_request/1) and
%% sent with httpc_request:send/3.
%%   - If the send fails, the result of ErrorHandler(Request, State, Reason)
%%     is returned.
%%   - Otherwise the state is prepared for the reply to the CONNECT
%%     request (request, mfa and status_line are re-initialised, headers
%%     and body are cleared), the socket is set to {active, once},
%%     activate_request_timeout/1 is applied, and {ok, NewState} is
%%     returned with status = {ssl_tunnel, Request} so the original
%%     request is kept for later.
tls_tunnel(Address, Request,
           #state{session = #session{socket = Socket,
                                     socket_type = SocketType} = Session} = State,
           ErrorHandler) ->
    ConnectRequest = tls_tunnel_request(Request),
    case httpc_request:send(Address, Session, ConnectRequest) of
        {error, Reason} ->
            ErrorHandler(Request, State, Reason);
        ok ->
            AwaitingReply =
                State#state{request     = ConnectRequest,
                            mfa         = init_mfa(ConnectRequest, State),
                            status_line = init_status_line(ConnectRequest),
                            headers     = undefined,
                            body        = undefined},
            http_transport:setopts(SocketType, Socket, [{active, once}]),
            TimedState = activate_request_timeout(AwaitingReply),
            {ok, TimedState#state{status = {ssl_tunnel, Request}}}
    end.
1606
%% Builds the CONNECT request used to set up a tunnel for the given
%% request. Id, from, address, settings and ipv6_host_with_brackets are
%% carried over from the original request; path and abs_uri are both the
%% "Host:Port" string of the target address; the remaining fields are
%% fixed values (see below).
tls_tunnel_request(#request{headers = Headers,
                            settings = Options,
                            id = RequestId,
                            from = From,
                            address = {Host, Port} = Address,
                            ipv6_host_with_brackets = IPV6}) ->
    Authority = Host ++ [$: | integer_to_list(Port)],
    ConnectHeaders =
        #http_request_h{host   = host_header(Headers, Authority),
                        te     = "",
                        pragma = "no-cache",
                        other  = [{"Proxy-Connection", " Keep-Alive"}]},
    #request{id       = RequestId,
             from     = From,
             scheme   = http, %% Use tcp-first and then upgrade!
             address  = Address,
             path     = Authority,
             pquery   = "",
             method   = connect,
             headers  = ConnectHeaders,
             settings = Options,
             abs_uri  = Authority,
             stream   = false,
             userinfo = "",
             headers_as_is = [],
             started  = http_util:timestamp(),
             ipv6_host_with_brackets = IPV6}.
1636
%% Picks the value for the Host header of the tunnel request.
%% When the headers are an #http_request_h{} record its host field is
%% used. Otherwise (handles headers_as_is) the host is extracted from
%% URI with http_uri:parse/1; a failed parse crashes with badmatch.
%% NOTE(review): tls_tunnel_request/1 passes URI as "Host:Port" without a
%% scheme - confirm that http_uri:parse/1 accepts that form.
host_header(Headers, URI) ->
    case Headers of
        #http_request_h{host = HeaderHost} ->
            HeaderHost;
        _ ->
            {ok, {_, _, ParsedHost, _, _, _}} = http_uri:parse(URI),
            ParsedHost
    end.
1644
%% Upgrades the tunnelled TCP connection to TLS and sends the original
%% request that was parked in status = {ssl_tunnel, Request}.
%%
%% The TLS options come from the ssl setting of the request, with
%% server_name_indication possibly added by maybe_add_sni/2.
%%   - ssl:connect/2 succeeds: the session is switched to https with the
%%     new socket, the request is sent (the send result is not checked),
%%     the socket is set to {active, once}, the parsing state is reset
%%     with status = new, and {noreply, State} is returned after
%%     activate_request_timeout/1 has been applied.
%%   - ssl:connect/2 fails: a failed_connect error is built with
%%     httpc_response:error/2, passed to maybe_send_answer/3, and the
%%     handler stops with reason normal.
tls_upgrade(#state{status =
                       {ssl_tunnel,
                        #request{settings =
                                     #http_options{ssl = {_, TLSOptions0} = SocketType},
                                 address = {Host, _} = Address} = Request},
                   session = #session{socket = TCPSocket} = Session0,
                   options = Options} = State) ->
    TLSOptions = maybe_add_sni(Host, TLSOptions0),
    case ssl:connect(TCPSocket, TLSOptions) of
        {error, Reason} ->
            ConnectFailure = {failed_connect,
                              [{to_address, Address},
                               {tls, TLSOptions, Reason}]},
            Error = httpc_response:error(Request, ConnectFailure),
            maybe_send_answer(Request, Error, State),
            {stop, normal, State#state{request = Request}};
        {ok, TLSSocket} ->
            ClientClose =
                httpc_request:is_client_closing(Request#request.headers),
            SessionType = httpc_manager:session_type(Options),
            Session = Session0#session{scheme       = https,
                                       socket       = TLSSocket,
                                       socket_type  = SocketType,
                                       type         = SessionType,
                                       client_close = ClientClose},
            httpc_request:send(Address, Session, Request),
            http_transport:setopts(SocketType, TLSSocket, [{active, once}]),
            Upgraded = State#state{session     = Session,
                                   request     = Request,
                                   mfa         = init_mfa(Request, State),
                                   status_line = init_status_line(Request),
                                   headers     = undefined,
                                   body        = undefined,
                                   status      = new},
            {noreply, activate_request_timeout(Upgraded)}
    end.
1684
%% Prepends {server_name_indication, Host} to the TLS options when Host
%% is a hostname according to http_util:is_hostname/1 and the options do
%% not already hold a server_name_indication tuple. Otherwise the options
%% are returned unchanged.
maybe_add_sni(Host, Options) ->
    NeedsSni = http_util:is_hostname(Host) andalso
        not lists:keymember(server_name_indication, 1, Options),
    case NeedsSni of
        false ->
            Options;
        true ->
            [{server_name_indication, Host} | Options]
    end.
1693
1694%% ---------------------------------------------------------------------
1695%% Session wrappers
1696%% ---------------------------------------------------------------------
1697
%% Stores a session for the given profile by delegating to
%% httpc_manager:insert_session/2; the manager's result is returned.
insert_session(NewSession, Profile) ->
    httpc_manager:insert_session(NewSession, Profile).
1700
1701
%% Updates one field (tuple position Pos) of the stored session that has
%% the same id as Session, for the profile ProfileName.
%%
%% Returns whatever httpc_manager:update_session/4 returns on success.
%% Failures are mapped as follows:
%%   error:undef  -> the field is set on the Session record at hand and
%%                   the record is stored with insert_session/2 instead
%%                   (this could happen during code upgrade)
%%   error:badarg -> {stop, normal}
%%   anything else -> the failure is logged with error_logger and
%%                   {stop, {failed_updating_session, Details}} is
%%                   returned, where Details includes the stacktrace
update_session(ProfileName, #session{id = SessionId} = Session, Pos, Value) ->
    try
        httpc_manager:update_session(ProfileName, SessionId, Pos, Value)
    catch
        error:undef -> %% This could happen during code upgrade
            Session2 = erlang:setelement(Pos, Session, Value),
            insert_session(Session2, ProfileName);
        error:badarg ->
            {stop, normal};
        T:E:Stacktrace ->
            %% Unexpected this must be an error!
            %% The argument order below must follow the labels in the
            %% format string: the manager's session info first, then the
            %% session as stored by the manager (db), then the record
            %% held by this process. The lookups are wrapped in catch so
            %% that a failing diagnostic call cannot mask the real error.
            error_logger:error_msg("Failed updating session: "
                                   "~n   ProfileName: ~p"
                                   "~n   SessionId:   ~p"
                                   "~n   Pos:         ~p"
                                   "~n   Value:       ~p"
                                   "~nwhen"
                                   "~n   Session (db) info: ~p"
                                   "~n   Session (db):      ~p"
                                   "~n   Session (record):  ~p"
                                   "~n   T: ~p"
                                   "~n   E: ~p",
                                   [ProfileName, SessionId, Pos, Value,
                                    (catch httpc_manager:which_session_info(ProfileName)),
                                    (catch httpc_manager:lookup_session(SessionId, ProfileName)),
                                    Session,
                                    T, E]),
            {stop, {failed_updating_session,
                    [{profile,    ProfileName},
                     {session_id, SessionId},
                     {pos,        Pos},
                     {value,      Value},
                     {etype,      T},
                     {error,      E},
                     {stacktrace, Stacktrace}]}}
    end.
1740
%% Normalises a {Host, Port} address before it is used for connecting.
%% A host written as a bracketed literal (it starts with "[", e.g.
%% "[::1]") has the brackets removed and is parsed with
%% inet:parse_address/1, giving {IpTuple, Port}; a literal that does not
%% parse crashes with badmatch. Every other value is returned unchanged.
format_address({[$[ | Rest], Port}) ->
    %% string:trim/3 replaces the obsolete string:strip/3; like strip it
    %% removes every trailing "]".
    {ok, Address} = inet:parse_address(string:trim(Rest, trailing, "]")),
    {Address, Port};
format_address(HostPort) ->
    HostPort.
1746