1%%%-------------------------------------------------------------------
2%%% File    : ibrowse.erl
3%%% Author  : Chandrashekhar Mullaparthi <chandrashekhar.mullaparthi@t-mobile.co.uk>
4%%% Description : Load balancer process for HTTP client connections.
5%%%
6%%% Created : 11 Oct 2003 by Chandrashekhar Mullaparthi <chandrashekhar.mullaparthi@t-mobile.co.uk>
7%%%-------------------------------------------------------------------
8%% @author Chandrashekhar Mullaparthi <chandrashekhar dot mullaparthi at gmail dot com>
9%% @copyright 2005-2014 Chandrashekhar Mullaparthi
10%% @doc The ibrowse application implements an HTTP 1.1 client in erlang. This
11%% module implements the API of the HTTP client. There is one named
12%% process called 'ibrowse' which assists in load balancing and maintaining configuration. There is one load balancing process per unique webserver. There is
13%% one process to handle one TCP connection to a webserver
14%% (implemented in the module ibrowse_http_client). Multiple connections to a
15%% webserver are setup based on the settings for each webserver. The
16%% ibrowse process also determines which connection to pipeline a
17%% certain request on.  The functions to call are send_req/3,
18%% send_req/4, send_req/5, send_req/6.
19%%
20%% <p>Here are a few sample invocations.</p>
21%%
22%% <code>
23%% ibrowse:send_req("http://intranet/messenger/", [], get).
24%% <br/><br/>
25%%
26%% ibrowse:send_req("http://www.google.com/", [], get, [],
27%%               [{proxy_user, "XXXXX"},
28%%                {proxy_password, "XXXXX"},
29%%                {proxy_host, "proxy"},
30%%                {proxy_port, 8080}], 1000).
31%% <br/><br/>
32%%
33%%ibrowse:send_req("http://www.erlang.org/download/otp_src_R10B-3.tar.gz", [], get, [],
34%%               [{proxy_user, "XXXXX"},
35%%                {proxy_password, "XXXXX"},
36%%                {proxy_host, "proxy"},
37%%                {proxy_port, 8080},
38%%                {save_response_to_file, true}], 1000).
39%% <br/><br/>
40%%
41%% ibrowse:send_req("http://www.erlang.org", [], head).
42%%
43%% <br/><br/>
44%% ibrowse:send_req("http://www.sun.com", [], options).
45%%
46%% <br/><br/>
47%% ibrowse:send_req("http://www.bbc.co.uk", [], trace).
48%%
49%% <br/><br/>
50%% ibrowse:send_req("http://www.google.com", [], get, [],
51%%                   [{stream_to, self()}]).
52%% </code>
53%%
54
55-module(ibrowse).
56-behaviour(gen_server).
57%%--------------------------------------------------------------------
58%% Include files
59%%--------------------------------------------------------------------
60
61%%--------------------------------------------------------------------
62%% External exports
63-export([start_link/0, start/0, stop/0]).
64
65%% gen_server callbacks
66-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
67         terminate/2, code_change/3]).
68
69%% API interface
70-export([
71         rescan_config/0,
72         rescan_config/1,
73         add_config/1,
74         get_config_value/1,
75         get_config_value/2,
76         spawn_worker_process/1,
77         spawn_worker_process/2,
78         spawn_link_worker_process/1,
79         spawn_link_worker_process/2,
80         stop_worker_process/1,
81         send_req/3,
82         send_req/4,
83         send_req/5,
84         send_req/6,
85         send_req_direct/4,
86         send_req_direct/5,
87         send_req_direct/6,
88         send_req_direct/7,
89         stream_next/1,
90         stream_close/1,
91         set_max_sessions/3,
92         set_max_pipeline_size/3,
93         set_max_attempts/3,
94         set_dest/3,
95         trace_on/0,
96         trace_off/0,
97         trace_on/2,
98         trace_off/2,
99         all_trace_off/0,
100         show_dest_status/0,
101         show_dest_status/1,
102         show_dest_status/2,
103         get_metrics/0,
104         get_metrics/2
105        ]).
106
107-ifdef(debug).
108-compile(export_all).
109-endif.
110
111-import(ibrowse_lib, [
112                      parse_url/1,
113                      get_value/3,
114                      do_trace/2,
115                      log_msg/2
116                     ]).
117
118-record(state, {trace = false}).
119
120-include("ibrowse.hrl").
121-include_lib("stdlib/include/ms_transform.hrl").
122
123-define(DEF_MAX_SESSIONS,10).
124-define(DEF_MAX_PIPELINE_SIZE,10).
125-define(DEF_MAX_ATTEMPTS,3).
126
127%%====================================================================
128%% External functions
129%%====================================================================
130%%--------------------------------------------------------------------
131%% Function: start_link/0
132%% Description: Starts the server
133%%--------------------------------------------------------------------
%% @doc Starts the ibrowse server linked to the calling process and
%% registered locally under the module name. Usually invoked by the
%% supervisor ibrowse_sup.
%% @spec start_link() -> {ok, pid()}
start_link() ->
    Server_name = {local, ?MODULE},
    gen_server:start_link(Server_name, ?MODULE, [], []).
138
%% @doc Starts the ibrowse server without linking it to the caller.
%% Useful when testing from the shell.
start() ->
    Server_name = {local, ?MODULE},
    Start_options = [{debug, []}],
    gen_server:start(Server_name, ?MODULE, [], Start_options).
142
%% @doc Stop the ibrowse process. Useful when testing from the shell.
%% Returns ok when no process is registered as ibrowse; otherwise the
%% outcome of the call is returned as is (a failed call shows up as a
%% caught {'EXIT', Reason} term rather than an exception).
stop() ->
    Outcome = (catch gen_server:call(ibrowse, stop)),
    case Outcome of
        {'EXIT', {noproc, _Call_info}} ->
            %% Nothing to stop: the server is not running.
            ok;
        _ ->
            Outcome
    end.
151
152%% @doc This is the basic function to send a HTTP request.
153%% The Status return value indicates the HTTP status code returned by the webserver
154%% @spec send_req(Url::string(), Headers::headerList(), Method::method()) -> response()
155%% headerList() = [{header(), value()}]
156%% header() = atom() | string() | binary()
157%% value() = term()
158%% method() = get | post | head | options | put | delete | trace | mkcol | propfind | proppatch | lock | unlock | move | copy
159%% Status = string()
160%% ResponseHeaders = [respHeader()]
161%% respHeader() = {headerName(), headerValue()}
162%% headerName() = string()
163%% headerValue() = string()
164%% response() = {ok, Status, ResponseHeaders, ResponseBody} | {ibrowse_req_id, req_id() } | {error, Reason}
165%% req_id() = term()
166%% ResponseBody = string() | {file, Filename}
167%% Reason = term()
send_req(Url, Headers, Method) ->
    %% No request body and no per-request options.
    Empty_body = [],
    No_options = [],
    send_req(Url, Headers, Method, Empty_body, No_options).
170
171%% @doc Same as send_req/3.
172%% If a list is specified for the body it has to be a flat list. The body can also be a fun/0 or a fun/1. <br/>
%% If fun/0, the connection handling process will repeatedly call the fun until it returns an error or eof. <pre>Fun() = {ok, Data} | eof</pre><br/>
174%% If fun/1, the connection handling process will repeatedly call the fun with the supplied state until it returns an error or eof. <pre>Fun(State) = {ok, Data} | {ok, Data, NewState} | eof</pre>
175%% @spec send_req(Url, Headers, Method::method(), Body::body()) -> response()
176%% body() = [] | string() | binary() | fun_arity_0() | {fun_arity_1(), initial_state()}
177%% initial_state() = term()
send_req(Url, Headers, Method, Body) ->
    %% Delegate with an empty option list.
    No_options = [],
    send_req(Url, Headers, Method, Body, No_options).
180
181%% @doc Same as send_req/4.
182
183%% For a description of SSL Options, look in the <a href="http://www.erlang.org/doc/apps/ssl/index.html">ssl</a> manpage.
184%% For a description of Process Options, look in the <a href="http://www.erlang.org/doc/man/gen_server.html">gen_server</a> manpage.
185%% If the HTTP Version to use is not specified, the default is 1.1.
186%% <br/>
187%% <ul>
188%% <li>The <code>host_header</code> option is useful in the case where ibrowse is
189%% connecting to a component such as <a
190%% href="http://www.stunnel.org">stunnel</a> which then sets up a
191%% secure connection to a webserver. In this case, the URL supplied to
192%% ibrowse must have the stunnel host/port details, but that won't
193%% make sense to the destination webserver. This option can then be
194%% used to specify what should go in the <code>Host</code> header in
195%% the request.</li>
196%% <li>The <code>stream_to</code> option can be used to have the HTTP
197%% response streamed to a process as messages as data arrives on the
198%% socket. If the calling process wishes to control the rate at which
199%% data is received from the server, the option <code>{stream_to,
200%% {process(), once}}</code> can be specified. The calling process
201%% will have to invoke <code>ibrowse:stream_next(Request_id)</code> to
202%% receive the next packet.</li>
203%%
204%% <li>When both the options <code>save_response_to_file</code> and <code>stream_to</code>
205%% are specified, the former takes precedence.</li>
206%%
207%% <li>For the <code>save_response_to_file</code> option, the response body is saved to
208%% file only if the status code is in the 200-299 range. If not, the response body is returned
209%% as a string.</li>
210%% <li>Whenever an error occurs in the processing of a request, ibrowse will return as much
211%% information as it has, such as HTTP Status Code and HTTP Headers. When this happens, the response
212%% is of the form <code>{error, {Reason, {stat_code, StatusCode}, HTTP_headers}}</code></li>
213%%
214%% <li>The <code>inactivity_timeout</code> option is useful when
215%% dealing with large response bodies and/or slow links. In these
216%% cases, it might be hard to estimate how long a request will take to
217%% complete. In such cases, the client might want to timeout if no
218%% data has been received on the link for a certain time interval.
219%%
220%% This value is also used to close connections which are not in use for
221%% the specified timeout value.
222%% </li>
223%%
224%% <li>
225%% The <code>connect_timeout</code> option is to specify how long the
226%% client process should wait for connection establishment. This is
227%% useful in scenarios where connections to servers are usually setup
228%% very fast, but responses might take much longer compared to
229%% connection setup. In such cases, it is better for the calling
230%% process to timeout faster if there is a problem (DNS lookup
231%% delays/failures, network routing issues, etc). The total timeout
%% value specified for the request will be enforced. To illustrate using
233%% an example:
234%% <code>
235%% ibrowse:send_req("http://www.example.com/cgi-bin/request", [], get, [], [{connect_timeout, 100}], 1000).
236%% </code>
237%% In the above invocation, if the connection isn't established within
238%% 100 milliseconds, the request will fail with
239%% <code>{error, conn_failed}</code>.<br/>
240%% If connection setup succeeds, the total time allowed for the
241%% request to complete will be 1000 milliseconds minus the time taken
242%% for connection setup.
243%% </li>
244%%
245%% <li> The <code>socket_options</code> option can be used to set
246%% specific options on the socket. The <code>{active, true | false | once}</code>
247%% and <code>{packet_type, Packet_type}</code> will be filtered out by ibrowse.  </li>
248%%
249%% <li> The <code>headers_as_is</code> option is to enable the caller
250%% to send headers exactly as specified in the request without ibrowse
251%% adding some of its own. Required for some picky servers apparently.  </li>
252%%
253%% <li>The <code>give_raw_headers</code> option is to enable the
254%% caller to get access to the raw status line and raw unparsed
255%% headers. Not quite sure why someone would want this, but one of my
256%% users asked for it, so here it is. </li>
257%%
258%% <li> The <code>preserve_status_line</code> option is to get the raw status line as a custom header
259%% in the response. The status line is returned as a tuple {ibrowse_status_line, Status_line_binary}
260%% If both the <code>give_raw_headers</code> and <code>preserve_status_line</code> are specified
261%% in a request, only the <code>give_raw_headers</code> is honoured. </li>
262%%
263%% <li> The <code>preserve_chunked_encoding</code> option enables the caller
264%% to receive the raw data stream when the Transfer-Encoding of the server
265%% response is Chunked.
266%% </li>
267%% <li> The <code>return_raw_request</code> option enables the caller to get the exact request which was sent by ibrowse to the server, along with the response. When this option is used, the response for synchronous requests is a 5-tuple instead of the usual 4-tuple. For asynchronous requests, the calling process gets a message <code>{ibrowse_async_raw_req, Raw_req}</code>.
268%% </li>
269%% </ul>
270%%
271%% @spec send_req(Url::string(), Headers::headerList(), Method::method(), Body::body(), Options::optionList()) -> response()
272%% optionList() = [option()]
273%% option() = {max_sessions, integer()}        |
274%%          {response_format,response_format()}|
275%%          {stream_full_chunks, boolean()}    |
276%%          {stream_chunk_size, integer()}     |
277%%          {max_pipeline_size, integer()}     |
278%%          {trace, boolean()}                 |
279%%          {is_ssl, boolean()}                |
280%%          {ssl_options, [SSLOpt]}            |
281%%          {pool_name, atom()}                |
282%%          {proxy_host, string()}             |
283%%          {proxy_port, integer()}            |
284%%          {proxy_user, string()}             |
285%%          {proxy_password, string()}         |
286%%          {use_absolute_uri, boolean()}      |
287%%          {basic_auth, {username(), password()}} |
288%%          {cookie, string()}                 |
289%%          {content_length, integer()}        |
290%%          {content_type, string()}           |
291%%          {save_response_to_file, srtf()}    |
292%%          {stream_to, stream_to()}           |
293%%          {http_vsn, {MajorVsn, MinorVsn}}   |
294%%          {host_header, string()}            |
295%%          {inactivity_timeout, integer()}    |
296%%          {connect_timeout, integer()}       |
297%%          {socket_options, Sock_opts}        |
298%%          {transfer_encoding, {chunked, ChunkSize}} |
299%%          {headers_as_is, boolean()}         |
300%%          {give_raw_headers, boolean()}      |
301%%          {preserve_chunked_encoding,boolean()}     |
302%%          {workaround, head_response_with_body}     |
303%%          {worker_process_options, list()} |
304%%          {return_raw_request, true}         |
305%%          {max_attempts, integer()}          |
306%%          {socks5_host, host()}              |
307%%          {socks5_port, integer()}           |
308%%          {socks5_user, binary()}            |
309%%          {socks5_password, binary()}
310%%
311%% ip4_address() = {0..255, 0..255, 0..255, 0..255}
312%% ip6_address() =
313%%     {0..65535,
314%%      0..65535,
315%%      0..65535,
316%%      0..65535,
317%%      0..65535,
318%%      0..65535,
319%%      0..65535,
320%%      0..65535}
321%% host() = string() | ip4_address() | ip6_address()
322%% stream_to() = process() | {process(), once}
323%% process() = pid() | atom()
324%% username() = string()
325%% password() = string()
326%% SSLOpt = term()
327%% Sock_opts = [Sock_opt]
328%% Sock_opt = term()
329%% ChunkSize = integer()
330%% srtf() = boolean() | filename() | {append, filename()}
331%% filename() = string()
332%% response_format() = list | binary
send_req(Url, Headers, Method, Body, Options) ->
    %% Default overall request timeout, in milliseconds.
    Default_timeout = 30000,
    send_req(Url, Headers, Method, Body, Options, Default_timeout).
335
336%% @doc Same as send_req/5.
337%% All timeout values are in milliseconds.
338%% @spec send_req(Url, Headers::headerList(), Method::method(), Body::body(), Options::optionList(), Timeout) -> response()
339%% Timeout = integer() | infinity
send_req(Url, Headers, Method, Body, Options, Timeout) ->
    case catch parse_url(Url) of
        #url{host = Host, port = Port, protocol = Protocol} = Parsed_url ->
            Lb_pid = lb_pid(Host, Port, Parsed_url),
            %% The limits are resolved from the caller's Options, before
            %% the configured options are merged in.
            Max_sessions = get_max_sessions(Host, Port, Options),
            Max_pipeline_size = get_max_pipeline_size(Host, Port, Options),
            Max_attempts = get_max_attempts(Host, Port, Options),
            Merged_options = merge_options(Host, Port, Options),
            %% SSL is used for https URLs or when the is_ssl option is set;
            %% ssl_options are only looked at in that case.
            Use_ssl = (Protocol == https) orelse
                get_value(is_ssl, Merged_options, false),
            Ssl_spec = case Use_ssl of
                           true ->
                               {get_value(ssl_options, Merged_options, []), true};
                           false ->
                               {[], false}
                       end,
            %% Timeout is passed twice: once as the budget for the first
            %% attempt and once as the original value that retries are
            %% measured against. The attempt counter starts at 0.
            try_routing_request(Lb_pid, Parsed_url,
                                Max_sessions,
                                Max_pipeline_size,
                                Ssl_spec,
                                Headers, Method, Body, Merged_options,
                                Timeout, Timeout, os:timestamp(),
                                Max_attempts, 0);
        Parse_failure ->
            %% Anything that is not a #url{} record, including a caught
            %% exit from the parser, is reported as a parsing failure.
            {error, {url_parsing_failed, Parse_failure}}
    end.
364
%% Returns the load balancer pid for {Host, Port}. The pid cached in the
%% ibrowse_lb ETS table is used when its process is still alive. When
%% there is no entry, or the cached process is dead (the stale entry is
%% deleted first), a pid is requested through get_lb_pid/1.
lb_pid(Host, Port, Url) ->
    Dest = {Host, Port},
    case ets:lookup(ibrowse_lb, Dest) of
        [#lb_pid{pid = Cached_pid}] ->
            case is_process_alive(Cached_pid) of
                false ->
                    ets:delete(ibrowse_lb, Dest),
                    get_lb_pid(Url);
                true ->
                    Cached_pid
            end;
        [] ->
            get_lb_pid(Url)
    end.
378
%% Obtains a connection process from the load balancer and sends the
%% request on it. When the selected connection turns out to be closed
%% ({error, sel_conn_closed}), the request is retried on another
%% connection as long as
%%   - fewer than Max_attempts attempts have been made, and
%%   - enough of the original timeout is left (see retry_time_budget/2).
%% Timeout is the budget for this attempt, Ori_timeout the timeout the
%% caller originally asked for and Req_start_time the os:timestamp()
%% taken when the request was first issued.
%% Returns the result of do_send_req/7, the error returned by
%% ibrowse_lb:spawn_connection/6, or {error, retry_later} when no further
%% retry is allowed.
try_routing_request(Lb_pid, Parsed_url,
                    Max_sessions,
                    Max_pipeline_size,
                    {SSLOptions, IsSSL},
                    Headers, Method, Body, Options_1, Timeout,
                    Ori_timeout, Req_start_time, Max_attempts, Try_count) when Try_count < Max_attempts ->
    ProcessOptions = get_value(worker_process_options, Options_1, []),
    case ibrowse_lb:spawn_connection(Lb_pid, Parsed_url,
                                             Max_sessions,
                                             Max_pipeline_size,
                                             {SSLOptions, IsSSL},
                                             ProcessOptions) of
        {ok, {_Pid_cur_spec_size, _, Conn_Pid}} ->
            case do_send_req(Conn_Pid, Parsed_url, Headers,
                             Method, Body, Options_1, Timeout) of
                {error, sel_conn_closed} ->
                    case retry_time_budget(Ori_timeout, Req_start_time) of
                        {retry, Time_remaining} ->
                            try_routing_request(Lb_pid, Parsed_url,
                                                Max_sessions,
                                                Max_pipeline_size,
                                                {SSLOptions, IsSSL},
                                                Headers, Method, Body, Options_1,
                                                Time_remaining, Ori_timeout, Req_start_time, Max_attempts, Try_count + 1);
                        give_up ->
                            {error, retry_later}
                    end;
                Res ->
                    Res
            end;
        Err ->
            Err
    end;
try_routing_request(_, _, _, _, _, _, _, _, _, _, _, _, _, _) ->
    %% Attempts exhausted.
    {error, retry_later}.

%% Decides whether a retry still fits in the caller's timeout.
%% Returns {retry, Timeout_for_next_attempt} or give_up.
%%   - infinity never runs out, so the retry keeps an infinite timeout
%%     (doing arithmetic on the atom used to raise badarith).
%%   - For a positive numeric timeout a retry is allowed only while some
%%     time is left and that remainder is at least 5% of the original.
%%   - A zero or negative timeout leaves nothing to retry with (zero used
%%     to cause a division by zero).
retry_time_budget(infinity, _Req_start_time) ->
    {retry, infinity};
retry_time_budget(Ori_timeout, Req_start_time) when is_number(Ori_timeout),
                                                    Ori_timeout > 0 ->
    Elapsed_ms = round(timer:now_diff(os:timestamp(), Req_start_time) / 1000),
    Time_remaining = Ori_timeout - Elapsed_ms,
    Time_remaining_percent = round((Time_remaining / Ori_timeout) * 100),
    case (Time_remaining > 0) andalso (Time_remaining_percent >= 5) of
        true ->
            {retry, Time_remaining};
        false ->
            give_up
    end;
retry_time_budget(_Ori_timeout, _Req_start_time) ->
    give_up.
419
%% Extends the caller's Options with configured options whose key the
%% caller did not supply. Configured options are read with
%% get_config_value/2 under {options, Host, Port} followed by
%% {options, global}. Entries in Options are never overridden; the added
%% entries are placed in front of Options, in reverse configuration order.
%% NOTE(review): membership is checked against the caller's Options only,
%% so a key configured both for the host and globally is added twice with
%% the global entry ending up first - confirm that this precedence is
%% intended.
merge_options(Host, Port, Options) ->
    Host_options = get_config_value({options, Host, Port}, []),
    Global_options = get_config_value({options, global}, []),
    Not_in_request = fun({Key, _Val}) ->
                             not lists:keymember(Key, 1, Options)
                     end,
    Extra_options = lists:filter(Not_in_request, Host_options ++ Global_options),
    %% lists:reverse/2 yields the reversed extras followed by Options.
    lists:reverse(Extra_options, Options).
432
%% Asks the ibrowse server process (registered under the module name) for
%% the load balancer pid serving Url.
get_lb_pid(Url) ->
    Request = {get_lb_pid, Url},
    gen_server:call(?MODULE, Request).
435
%% Resolves max_sessions for Host:Port. The value in Options is used when
%% present; otherwise the value looked up under {max_sessions, Host, Port}
%% with default_max_sessions/0 passed as the fallback.
get_max_sessions(Host, Port, Options) ->
    Configured = get_config_value({max_sessions, Host, Port},
                                  default_max_sessions()),
    get_value(max_sessions, Options, Configured).
440
%% Resolves max_pipeline_size for Host:Port. The value in Options is used
%% when present; otherwise the value looked up under
%% {max_pipeline_size, Host, Port} with default_max_pipeline_size/0 passed
%% as the fallback.
get_max_pipeline_size(Host, Port, Options) ->
    Configured = get_config_value({max_pipeline_size, Host, Port},
                                  default_max_pipeline_size()),
    get_value(max_pipeline_size, Options, Configured).
445
%% Resolves max_attempts for Host:Port. The value in Options is used when
%% present; otherwise the value looked up under {max_attempts, Host, Port}
%% with default_max_attempts/0 passed as the fallback.
get_max_attempts(Host, Port, Options) ->
    Configured = get_config_value({max_attempts, Host, Port},
                                  default_max_attempts()),
    get_value(max_attempts, Options, Configured).
450
%% Default session limit: the default_max_sessions environment value of
%% the ibrowse application, or ?DEF_MAX_SESSIONS when it is not set.
default_max_sessions() ->
    Builtin_default = ?DEF_MAX_SESSIONS,
    safe_get_env(ibrowse, default_max_sessions, Builtin_default).
453
%% Default pipeline size: the default_max_pipeline_size environment value
%% of the ibrowse application, or ?DEF_MAX_PIPELINE_SIZE when it is not set.
default_max_pipeline_size() ->
    Builtin_default = ?DEF_MAX_PIPELINE_SIZE,
    safe_get_env(ibrowse, default_max_pipeline_size, Builtin_default).
456
%% Default attempt limit: the default_max_attempts environment value of
%% the ibrowse application, or ?DEF_MAX_ATTEMPTS when it is not set.
default_max_attempts() ->
    Builtin_default = ?DEF_MAX_ATTEMPTS,
    safe_get_env(ibrowse, default_max_attempts, Builtin_default).
459
%% Reads the application environment value Key of App and returns it, or
%% Def_val when the key is not set.
safe_get_env(App, Key, Def_val) ->
    env_or_default(application:get_env(App, Key), Def_val).

%% Unwraps the result of application:get_env/2.
env_or_default({ok, Val}, _Def_val) ->
    Val;
env_or_default(undefined, Def_val) ->
    Def_val.
467
%% @doc Deprecated. Use set_max_sessions/3 and set_max_pipeline_size/3
%% for achieving the same effect.
%% The options are applied in list order. A <code>{trace, Bool}</code>
%% entry asks the ibrowse process to switch tracing for Host:Port on or
%% off according to Bool. Any other entry makes the calling process exit
%% with <code>{invalid_option, Entry}</code>; entries preceding it have
%% already been applied by then. Returns ok once the list is exhausted.
set_dest(Host, Port, [{max_sessions, Max} | T]) ->
    set_max_sessions(Host, Port, Max),
    set_dest(Host, Port, T);
set_dest(Host, Port, [{max_pipeline_size, Max} | T]) ->
    set_max_pipeline_size(Host, Port, Max),
    set_dest(Host, Port, T);
set_dest(Host, Port, [{trace, Bool} | T]) when is_boolean(Bool) ->
    %% Forward the requested flag. A hard-coded `true' used to be sent
    %% here, which made {trace, false} switch tracing on instead of off.
    ibrowse ! {trace, Bool, Host, Port},
    set_dest(Host, Port, T);
set_dest(_Host, _Port, [H | _]) ->
    exit({invalid_option, H});
set_dest(_, _, []) ->
    ok.
483
%% @doc Set the maximum number of connections allowed to a specific
%% Host:Port. Max must be a positive integer; the value is stored via the
%% ibrowse server under the key {max_sessions, Host, Port}.
%% @spec set_max_sessions(Host::string(), Port::integer(), Max::integer()) -> ok
set_max_sessions(Host, Port, Max) when is_integer(Max), Max > 0 ->
    Config_key = {max_sessions, Host, Port},
    gen_server:call(?MODULE, {set_config_value, Config_key, Max}).
488
%% @doc Set the maximum pipeline size for each connection to a specific
%% Host:Port. Max must be a positive integer; the value is stored via the
%% ibrowse server under the key {max_pipeline_size, Host, Port}.
%% @spec set_max_pipeline_size(Host::string(), Port::integer(), Max::integer()) -> ok
set_max_pipeline_size(Host, Port, Max) when is_integer(Max), Max > 0 ->
    Config_key = {max_pipeline_size, Host, Port},
    gen_server:call(?MODULE, {set_config_value, Config_key, Max}).
493
%% @doc Set the maximum attempts for each connection to a specific
%% Host:Port. Max must be a positive integer; the value is stored via the
%% ibrowse server under the key {max_attempts, Host, Port}.
%% @spec set_max_attempts(Host::string(), Port::integer(), Max::integer()) -> ok
set_max_attempts(Host, Port, Max) when is_integer(Max), Max > 0 ->
    Config_key = {max_attempts, Host, Port},
    gen_server:call(?MODULE, {set_config_value, Config_key, Max}).
498
%% Sends the request on the connection process Conn_Pid and normalises
%% the outcome:
%%   - a timed out call, or a call that ended with exit reason normal, is
%%     logged and reported as {error, req_timedout};
%%   - errors showing that the connection was closed or is closing become
%%     {error, sel_conn_closed}, which callers use as the signal to retry
%%     on another connection. connection_closed_no_retry is treated that
%%     way only for get and head requests; for other methods it is
%%     reported as {error, connection_closed};
%%   - any other caught exit is returned as {error, {'EXIT', Reason}};
%%   - a successful response with a binary body is converted according to
%%     the response_format option (list by default);
%%   - everything else is returned unchanged.
do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout) ->
    case catch ibrowse_http_client:send_req(Conn_Pid, Parsed_url,
                                            Headers, Method, ensure_bin(Body),
                                            Options, Timeout) of
        {'EXIT', {timeout, _}} ->
            P_info = case catch erlang:process_info(Conn_Pid, [messages, message_queue_len, backtrace]) of
                            [_|_] = Conn_Pid_info_list ->
                                Conn_Pid_info_list;
                            _ ->
                                process_info_not_available
                        end,
            %% NOTE(review): Headers and Options are logged verbatim and may
            %% carry credentials (e.g. basic_auth, proxy_password) - consider
            %% redacting them.
            log_msg("{ibrowse_http_client, send_req, ~1000.p} gen_server call timeout.~nProcess info: ~p~n",
                    [[Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout], P_info]),
            {error, req_timedout};
        {'EXIT', {normal, _}} = Ex_rsn ->
            log_msg("{ibrowse_http_client, send_req, ~1000.p} gen_server call got ~1000.p~n",
                    [[Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout], Ex_rsn]),
            {error, req_timedout};
        {error, X} when X == connection_closed;
                        X == {send_failed, {error, enotconn}};
                        X == {send_failed,{error,einval}};
                        X == {send_failed,{error,closed}};
                        X == connection_closing;
                        ((X == connection_closed_no_retry) andalso ((Method == get) orelse (Method == head))) ->
            {error, sel_conn_closed};
        {error, connection_closed_no_retry} ->
            {error, connection_closed};
        {error, {'EXIT', {noproc, _}}} ->
            {error, sel_conn_closed};
        {'EXIT', Reason} ->
            {error, {'EXIT', Reason}};
        %% The response patterns below must use fresh variables. Reusing
        %% Headers and Body (the already bound request arguments) turned
        %% these clauses into equality tests against the request, so they
        %% only matched a response identical to what was sent.
        {ok, St_code, Resp_headers, Resp_body} = Ret when is_binary(Resp_body) ->
            case get_value(response_format, Options, list) of
                list ->
                    {ok, St_code, Resp_headers, binary_to_list(Resp_body)};
                binary ->
                    Ret
            end;
        %% 5-tuple responses additionally carry the raw request.
        {ok, St_code, Resp_headers, Resp_body, Req} = Ret when is_binary(Resp_body) ->
            case get_value(response_format, Options, list) of
                list ->
                    {ok, St_code, Resp_headers, binary_to_list(Resp_body), Req};
                binary ->
                    Ret
            end;
        Ret ->
            Ret
    end.
547
%% Normalises a request body before it is handed to the connection
%% process:
%%   - a binary is returned unchanged;
%%   - a list is converted with list_to_binary/1;
%%   - a fun is returned unchanged, and a fun wrapped in a 1-tuple is
%%     unwrapped;
%%   - a {Fun, State} pair is returned unchanged.
%% Any other term fails with function_clause.
ensure_bin(Bin) when is_binary(Bin) ->
    Bin;
ensure_bin(Chars) when is_list(Chars) ->
    list_to_binary(Chars);
ensure_bin(Producer) when is_function(Producer) ->
    Producer;
ensure_bin({Producer}) when is_function(Producer) ->
    Producer;
ensure_bin({Producer, _State} = Stateful_body) when is_function(Producer) ->
    Stateful_body.
553
554%% @doc Creates a HTTP client process to the specified Host:Port which
555%% is not part of the load balancing pool. This is useful in cases
556%% where some requests to a webserver might take a long time whereas
557%% some might take a very short time. To avoid getting these quick
558%% requests stuck in the pipeline behind time consuming requests, use
559%% this function to get a handle to a connection process. <br/>
560%% <b>Note:</b> Calling this function only creates a worker process. No connection
561%% is setup. The connection attempt is made only when the first
562%% request is sent via any of the send_req_direct/4,5,6,7 functions.<br/>
563%% <b>Note:</b> It is the responsibility of the calling process to control
564%% pipeline size on such connections.
565
566%% @spec spawn_worker_process(Url::string() | {Host::string(), Port::integer()}) -> {ok, pid()}
spawn_worker_process(Args) ->
    %% No Erlang process options.
    No_process_options = [],
    spawn_worker_process(Args, No_process_options).
569
570%% @doc Same as spawn_worker_process/1 except with Erlang process options.
571%% @spec spawn_worker_process(Host::string(), Port::integer()) -> {ok, pid()}
spawn_worker_process(Host, Port) when is_list(Host), is_integer(Port) ->
    %% Legacy call style (Host, Port): rewrite it into the current
    %% (Args, Options) form with an empty option list.
    Dest = {Host, Port},
    spawn_worker_process(Dest, []);
spawn_worker_process(Args, Options) ->
    %% Unlinked worker; see spawn_link_worker_process/2 for a linked one.
    ibrowse_http_client:start(Args, Options).
577
%% @doc Same as spawn_worker_process/1 except that the calling process
%% is linked to the worker process which is spawned.
%% @spec spawn_link_worker_process(Url::string() | {Host::string(), Port::integer()}) -> {ok, pid()}
spawn_link_worker_process(Args) ->
    No_process_options = [],
    spawn_link_worker_process(Args, No_process_options).
583
584%% @doc Same as spawn_link_worker_process/1 except with Erlang process options.
585%% @spec spawn_link_worker_process(Host::string(), Port::integer()) -> {ok, pid()}
spawn_link_worker_process(Host, Port) when is_list(Host), is_integer(Port) ->
    %% Legacy call style (Host, Port): rewrite it into the current
    %% (Args, Options) form with an empty option list.
    Dest = {Host, Port},
    spawn_link_worker_process(Dest, []);
spawn_link_worker_process(Args, Options) ->
    %% The worker is linked to the calling process.
    ibrowse_http_client:start_link(Args, Options).
591
592%% @doc Terminate a worker process spawned using
593%% spawn_worker_process/2 or spawn_link_worker_process/2. Requests in
594%% progress will get the error response <pre>{error, closing_on_request}</pre>
595%% @spec stop_worker_process(Conn_pid::pid()) -> ok
stop_worker_process(Conn_pid) ->
    %% Stopping is delegated to the connection module; its result is
    %% returned unchanged.
    Stop_result = ibrowse_http_client:stop(Conn_pid),
    Stop_result.
598
599%% @doc Same as send_req/3 except that the first argument is the PID
600%% returned by spawn_worker_process/2 or spawn_link_worker_process/2
send_req_direct(Conn_pid, Url, Headers, Method) ->
    %% No request body and no per-request options.
    Empty_body = [],
    No_options = [],
    send_req_direct(Conn_pid, Url, Headers, Method, Empty_body, No_options).
603
604%% @doc Same as send_req/4 except that the first argument is the PID
605%% returned by spawn_worker_process/2 or spawn_link_worker_process/2
send_req_direct(Conn_pid, Url, Headers, Method, Body) ->
    %% Delegate with an empty option list.
    No_options = [],
    send_req_direct(Conn_pid, Url, Headers, Method, Body, No_options).
608
609%% @doc Same as send_req/5 except that the first argument is the PID
610%% returned by spawn_worker_process/2 or spawn_link_worker_process/2
send_req_direct(Conn_pid, Url, Headers, Method, Body, Options) ->
    %% Default overall request timeout, in milliseconds.
    Default_timeout = 30000,
    send_req_direct(Conn_pid, Url, Headers, Method, Body, Options, Default_timeout).
613
614%% @doc Same as send_req/6 except that the first argument is the PID
615%% returned by spawn_worker_process/2 or spawn_link_worker_process/2
send_req_direct(Conn_pid, Url, Headers, Method, Body, Options, Timeout) ->
    case catch parse_url(Url) of
        #url{host = Host, port = Port} = Parsed_url ->
            %% Configured options are merged in, but the request goes
            %% straight to Conn_pid: no load balancer and no retries.
            Merged_options = merge_options(Host, Port, Options),
            Reply = do_send_req(Conn_pid, Parsed_url, Headers, Method,
                                Body, Merged_options, Timeout),
            case Reply of
                {error, {'EXIT', {noproc, _}}} ->
                    %% The worker process no longer exists.
                    {error, worker_is_dead};
                _ ->
                    Reply
            end;
        Parse_failure ->
            {error, {url_parsing_failed, Parse_failure}}
    end.
630
631%% @doc Tell ibrowse to stream the next chunk of data to the
632%% caller. Should be used in conjunction with the
633%% <code>stream_to</code> option
634%% @spec stream_next(Req_id :: req_id()) -> ok | {error, unknown_req_id}
stream_next(Req_id) ->
    %% The ibrowse_stream table maps {req_id_pid, Req_id} to the process
    %% handling that request.
    Stream_key = {req_id_pid, Req_id},
    case ets:lookup(ibrowse_stream, Stream_key) of
        [{_Key, Conn_pid}] ->
            %% Best effort: a failing send is deliberately ignored and ok
            %% is returned regardless.
            _ = (catch Conn_pid ! {stream_next, Req_id}),
            ok;
        [] ->
            {error, unknown_req_id}
    end.
643
644%% @doc Tell ibrowse to close the connection associated with the
645%% specified stream.  Should be used in conjunction with the
646%% <code>stream_to</code> option. Note that all requests in progress on
647%% the connection which is serving this Req_id will be aborted, and an
648%% error returned.
649%% @spec stream_close(Req_id :: req_id()) -> ok | {error, unknown_req_id}
stream_close(Req_id) ->
    %% The ibrowse_stream table maps {req_id_pid, Req_id} to the process
    %% handling that request.
    Stream_key = {req_id_pid, Req_id},
    case ets:lookup(ibrowse_stream, Stream_key) of
        [{_Key, Conn_pid}] ->
            %% Best effort: a failing send is deliberately ignored and ok
            %% is returned regardless.
            _ = (catch Conn_pid ! {stream_close, Req_id}),
            ok;
        [] ->
            {error, unknown_req_id}
    end.
658
%% @doc Turn tracing on for the ibrowse process. The request is sent as
%% a plain message to the process registered as ibrowse; the message
%% itself is the return value.
trace_on() ->
    erlang:send(ibrowse, {trace, true}).
%% @doc Turn tracing off for the ibrowse process. The request is sent as
%% a plain message to the process registered as ibrowse; the message
%% itself is the return value.
trace_off() ->
    erlang:send(ibrowse, {trace, false}).
665
666%% @doc Turn tracing on for all connections to the specified HTTP
667%% server. Host is whatever is specified as the domain name in the URL
668%% @spec trace_on(Host, Port) -> ok
669%% Host = string()
670%% Port = integer()
trace_on(Host, Port) ->
    %% Fire-and-forget message to the registered ibrowse process.
    Trace_request = {trace, true, Host, Port},
    _ = erlang:send(ibrowse, Trace_request),
    ok.
674
675%% @doc Turn tracing OFF for all connections to the specified HTTP
676%% server.
677%% @spec trace_off(Host, Port) -> ok
trace_off(Host, Port) ->
    %% Fire-and-forget message to the registered ibrowse process.
    Trace_request = {trace, false, Host, Port},
    _ = erlang:send(ibrowse, Trace_request),
    ok.
681
%% @doc Turn off ALL tracing. An <code>all_trace_off</code> message is
%% sent to the process registered as <code>ibrowse</code>.
%% @spec all_trace_off() -> ok
all_trace_off() ->
    erlang:send(ibrowse, all_trace_off),
    ok.
687
%% @doc Shows some internal information about load balancing. Prints
%% a header followed by one line per destination returned by
%% get_metrics/0, listing the server and port, the ETS table id, the
%% number of connections and the load balancer pid. Info about workers
%% spawned using spawn_worker_process/2 or spawn_link_worker_process/2
%% is not included.
%% @spec show_dest_status() -> ok
show_dest_status() ->
    io:format("~-40.40s | ~-5.5s | ~-10.10s | ~s~n",
              ["Server:port", "ETS", "Num conns", "LB Pid"]),
    io:format("~80.80.=s~n", [""]),
    lists:foreach(
      fun({Host, Port, {Lb_pid, _, Tid, Size, _}}) ->
              %% The row uses the same justification and field widths as
              %% the header so that the columns line up.
              io:format("~-40.40s | ~-5.5s | ~-10.10s | ~p~n",
                        [Host ++ ":" ++ integer_to_list(Port),
                         %% ~p copes with every table id representation
                         %% (reference or integer) and also with the
                         %% 'undefined' that get_metrics/2 reports when
                         %% no table id is recorded; ref_to_list/1 and
                         %% integer_to_list/1 would raise badarg on it.
                         lists:flatten(io_lib:format("~p", [Tid])),
                         integer_to_list(Size),
                         Lb_pid])
      end, get_metrics()).
720
%% Same as show_dest_status/2, taking the host and port from Url.
%% Fails with a badmatch if ibrowse_lib:parse_url/1 does not return a
%% #url{} record.
show_dest_status(Url) ->
    Parsed = #url{} = ibrowse_lib:parse_url(Url),
    show_dest_status(Parsed#url.host, Parsed#url.port).
724
%% @doc Prints internal load balancing information for one Host:Port
%% destination, as reported by get_metrics/2. When no metrics tuple is
%% returned, "Metrics not available" is printed instead. Info about
%% workers spawned using spawn_worker_process/2 or
%% spawn_link_worker_process/2 is not included.
show_dest_status(Host, Port) ->
    case get_metrics(Host, Port) of
        {Lb_pid, Queue_len, Tid, Num_conns,
         {{Smallest, Smallest}, {Largest, Largest}}} ->
            %% Each report line is paired with the value it displays;
            %% everything is still emitted by a single io:format call.
            {Formats, Values} =
                lists:unzip(
                  [{"Load Balancer Pid     : ~p~n", Lb_pid},
                   {"LB process msg q size : ~p~n", Queue_len},
                   {"LB ETS table id       : ~p~n", Tid},
                   {"Num Connections       : ~p~n", Num_conns},
                   {"Smallest pipeline     : ~p~n", Smallest},
                   {"Largest pipeline      : ~p~n", Largest}]),
            io:format(lists:append(Formats), Values);
        _Unavailable ->
            io:format("Metrics not available~n", [])
    end.
746
%% Collects {Host, Port, Metrics} for every entry of the ibrowse_lb
%% table whose key is a {Host, Port} pair with a list host and an
%% integer port. Destinations for which get_metrics/2 does not return
%% a 5-tuple are left out.
get_metrics() ->
    Dests = [{Host, Port}
             || #lb_pid{host_port = {Host, Port}} <- ets:tab2list(ibrowse_lb),
                is_list(Host),
                is_integer(Port)],
    %% Reversed to give the same ordering as accumulating by prepending.
    lists:reverse(
      [{Host, Port, Metrics}
       || {Host, Port} <- Dests,
          {_, _, _, _, _} = Metrics <- [get_metrics(Host, Port)]]).
764
%% Returns load balancing metrics for Host:Port as
%%   {Lb_pid, MsgQueueLen, Tid, NumEntries, {{First, First}, {Last, Last}}}
%% where First/Last are taken from the first and last keys of the
%% table Tid. Returns no_active_processes when ibrowse_lb has no entry
%% for the destination, and not_available when the table cannot be
%% inspected.
get_metrics(Host, Port) ->
    case ets:lookup(ibrowse_lb, {Host, Port}) of
        [] ->
            no_active_processes;
        [#lb_pid{pid = Lb_pid, ets_tid = Tid}] ->
            Queue_len = lb_msg_queue_len(Lb_pid),
            lb_table_metrics(Lb_pid, Queue_len, Tid)
    end.

%% Message queue length of the load balancer process, or -1 when it
%% cannot be determined.
lb_msg_queue_len(Lb_pid) ->
    try process_info(Lb_pid, message_queue_len) of
        {message_queue_len, Len} ->
            Len;
        _ ->
            -1
    catch
        _:_ ->
            -1
    end.

%% Builds the metrics tuple from the table Tid. Any failure while
%% inspecting the table yields not_available.
lb_table_metrics(Lb_pid, Queue_len, undefined) ->
    {Lb_pid, Queue_len, undefined, 0, {{0, 0}, {0, 0}}};
lb_table_metrics(Lb_pid, Queue_len, Tid) ->
    try
        Num_entries = ets:info(Tid, size),
        Extremes =
            if
                Num_entries =:= 0 ->
                    {{0, 0}, {0, 0}};
                true ->
                    {Smallest, _, _} = ets:first(Tid),
                    {Largest, _, _} = ets:last(Tid),
                    {{Smallest, Smallest}, {Largest, Largest}}
            end,
        {Lb_pid, Queue_len, Tid, Num_entries, Extremes}
    catch
        _:_ ->
            not_available
    end.
797
%% @doc Clear the current ibrowse configuration and reload it from the
%% file ibrowse.conf in the IBROWSE_EBIN/../priv directory. The current
%% configuration is cleared only if the ibrowse.conf file is readable
%% using file:consult/1. The work is done by the ibrowse server
%% process; this call waits for its reply.
rescan_config() ->
    Request = rescan_config,
    gen_server:call(?MODULE, Request).
804
%% @doc Clear the current configuration for ibrowse and reload it,
%% either from a list of configuration terms or from the specified
%% file.
%%
%% A non-empty list whose first element is a tuple is treated as a list
%% of configuration terms, i.e. <code>{Key, Value}</code> pairs and/or
%% <code>{dest, Host, Port, MaxSess, MaxPipe, Options}</code> entries,
%% the same terms a configuration file may contain. Any other list is
%% treated as a file name; in that case the current configuration is
%% cleared only if the file is readable using file:consult/1.
rescan_config([Term | _] = Terms) when is_tuple(Term) ->
    %% A file name never starts with a tuple, so testing the head for
    %% "is a tuple" (rather than "is a 2-tuple") also routes lists that
    %% begin with a {dest, ...} entry here instead of to file:consult/1.
    gen_server:call(?MODULE, {rescan_config_terms, Terms});
rescan_config(File) when is_list(File) ->
    gen_server:call(?MODULE, {rescan_config, File}).
812
%% @doc Add additional configuration elements at runtime. Terms is a
%% non-empty list of configuration terms, i.e. <code>{Key, Value}</code>
%% pairs and/or
%% <code>{dest, Host, Port, MaxSess, MaxPipe, Options}</code> entries.
%% Existing configuration is kept; entries with the same key are
%% overwritten.
add_config([Term | _] = Terms) when is_tuple(Term) ->
    %% The head may be any tuple, not only a 2-tuple, so that a list
    %% starting with a {dest, ...} entry is accepted as well.
    gen_server:call(?MODULE, {add_config_terms, Terms}).
816
817%%====================================================================
818%% Server functions
819%%====================================================================
820
%%--------------------------------------------------------------------
%% Function: init/1
%% Description: Initiates the server. Traps exits, records the trace
%%              settings in the process dictionary, creates the
%%              ibrowse_lb, ibrowse_conf and ibrowse_stream tables and
%%              loads the configuration file, if any.
%% Returns: {ok, State}          |
%%          {ok, State, Timeout} |
%%          ignore               |
%%          {stop, Reason}
%%--------------------------------------------------------------------
init(_Args) ->
    process_flag(trap_exit, true),
    #state{trace = Trace_flag} = Initial_state = #state{},
    put(my_trace_flag, Trace_flag),
    put(ibrowse_trace_token, "ibrowse"),
    Tables = [{ibrowse_lb,     [named_table, public, {keypos, 2}]},
              {ibrowse_conf,   [named_table, protected, {keypos, 2}]},
              {ibrowse_stream, [named_table, public]}],
    %% Name is already bound, so the match asserts that each table was
    %% created under the expected name.
    lists:foreach(fun({Name, Options}) ->
                          Name = ets:new(Name, Options)
                  end, Tables),
    import_config(),
    {ok, Initial_state}.
839
%% Loads ibrowse.conf from the priv directory of the ibrowse
%% application. Does nothing (and returns ok) when the priv directory
%% cannot be determined.
import_config() ->
    Priv_dir = code:priv_dir(ibrowse),
    case Priv_dir of
        {error, _Reason} ->
            ok;
        _ ->
            import_config(filename:join(Priv_dir, "ibrowse.conf"))
    end.
848
%% Replaces the current configuration with the terms read from
%% Filename. If the file cannot be read with file:consult/1 the
%% current configuration is left alone and ok is returned.
import_config(Filename) ->
    Consulted = file:consult(Filename),
    case Consulted of
        {ok, Config_terms} ->
            apply_config(Config_terms);
        _Unreadable ->
            ok
    end.
856
%% Empties the ibrowse_conf table and then stores Terms in it.
apply_config(Terms) ->
    true = ets:delete_all_objects(ibrowse_conf),
    insert_config(Terms).
860
%% Stores every configuration term of Terms in the ibrowse_conf table.
insert_config(Terms) ->
    lists:foreach(fun store_config_term/1, Terms).

%% Stores one configuration term:
%%   {dest, Host, Port, MaxSess, MaxPipe, Options} (when well-formed)
%%       becomes the three entries max_sessions, max_pipeline_size and
%%       options for Host:Port;
%%   {Key, Value} is stored as is;
%%   anything else is reported and skipped.
store_config_term({dest, Host, Port, MaxSess, MaxPipe, Options})
  when is_list(Host), is_integer(Port),
       is_integer(MaxSess), MaxSess > 0,
       is_integer(MaxPipe), MaxPipe > 0, is_list(Options) ->
    Entries = [#ibrowse_conf{key = {max_sessions, Host, Port},
                             value = MaxSess},
               #ibrowse_conf{key = {max_pipeline_size, Host, Port},
                             value = MaxPipe},
               #ibrowse_conf{key = {options, Host, Port},
                             value = Options}],
    lists:foreach(fun(Entry) ->
                          ets:insert(ibrowse_conf, Entry)
                  end, Entries);
store_config_term({Key, Value}) ->
    ets:insert(ibrowse_conf, #ibrowse_conf{key = Key, value = Value});
store_config_term(Unrecognised) ->
    io:format("Skipping unrecognised term: ~p~n", [Unrecognised]).
883
%% @doc Internal export. Returns the value stored for Key in the
%% ibrowse_conf table. Throws <code>{error, ibrowse_not_running}</code>
%% when the table lookup fails with badarg, and fails with a badmatch
%% when there is no single entry for Key.
get_config_value(Key) ->
    Entries = try
                  ets:lookup(ibrowse_conf, Key)
              catch
                  error:badarg ->
                      throw({error, ibrowse_not_running})
              end,
    [#ibrowse_conf{value = Value}] = Entries,
    Value.
893
%% @doc Internal export. Returns the value stored for Key in the
%% ibrowse_conf table, or DefVal when there is no entry for Key. Throws
%% <code>{error, ibrowse_not_running}</code> when the table lookup
%% fails with badarg.
get_config_value(Key, DefVal) ->
    Entries = try
                  ets:lookup(ibrowse_conf, Key)
              catch
                  error:badarg ->
                      throw({error, ibrowse_not_running})
              end,
    case Entries of
        [#ibrowse_conf{value = Value}] ->
            Value;
        [] ->
            DefVal
    end.
907
%% Stores Val under Key in the ibrowse_conf table, replacing any
%% previous entry for Key.
set_config_value(Key, Val) ->
    Entry = #ibrowse_conf{key = Key, value = Val},
    ets:insert(ibrowse_conf, Entry).
%%--------------------------------------------------------------------
%% Function: handle_call/3
%% Description: Handling call messages:
%%   {get_lb_pid, Url}            - pid of the load balancer for Url
%%   stop                         - stop all load balancers, then the server
%%   {set_config_value, Key, Val} - store a single configuration value
%%   rescan_config                - reload the default configuration file
%%   {rescan_config, File}        - reload configuration from File
%%   {rescan_config_terms, Terms} - replace configuration with Terms
%%   {add_config_terms, Terms}    - add Terms to the configuration
%%   anything else                - replies {unknown_request, Request}
%% Returns: {reply, Reply, State}          |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%          {stop, Reason, State}            (terminate/2 is called)
%%--------------------------------------------------------------------
handle_call({get_lb_pid, #url{host = Host, port = Port} = Url}, _From, State) ->
    Known_lbs = ets:lookup(ibrowse_lb, {Host, Port}),
    {reply, do_get_connection(Url, Known_lbs), State};

handle_call(stop, _From, State) ->
    do_trace("IBROWSE shutting down~n", []),
    Stop_lb = fun(#lb_pid{pid = Lb_pid}, Acc) ->
                      ibrowse_lb:stop(Lb_pid),
                      Acc
              end,
    ets:foldl(Stop_lb, [], ibrowse_lb),
    {stop, normal, ok, State};

handle_call({set_config_value, Key, Val}, _From, State) ->
    set_config_value(Key, Val),
    {reply, ok, State};

%% For the configuration requests below, a failure inside the
%% operation is caught and sent back to the caller as the reply.
handle_call(rescan_config, _From, State) ->
    {reply, (catch import_config()), State};

handle_call({rescan_config, File}, _From, State) ->
    {reply, (catch import_config(File)), State};

handle_call({rescan_config_terms, Terms}, _From, State) ->
    {reply, (catch apply_config(Terms)), State};

handle_call({add_config_terms, Terms}, _From, State) ->
    {reply, (catch insert_config(Terms)), State};

handle_call(Request, _From, State) ->
    {reply, {unknown_request, Request}, State}.
955
%%--------------------------------------------------------------------
%% Function: handle_cast/2
%% Description: Handling cast messages. No casts are acted upon; every
%%              message is dropped and the state is kept unchanged.
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%--------------------------------------------------------------------

handle_cast(_Ignored, State) ->
    {noreply, State}.
966
%%--------------------------------------------------------------------
%% Function: handle_info/2
%% Description: Handling all non call/cast messages:
%%   all_trace_off             - switch tracing off for every destination
%%                               with a 'true' trace entry in ibrowse_conf
%%                               and delete those entries
%%   {trace, Bool}             - set the trace flag of this process
%%   {trace, Bool, Host, Port} - forward the trace flag to the load
%%                               balancer(s) for Host:Port and record
%%                               it in ibrowse_conf
%%   anything else             - ignored
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%--------------------------------------------------------------------
handle_info(all_trace_off, State) ->
    %% Matches the ibrowse_conf entries {trace, Host, Port} set to true.
    Traced_entry = {ibrowse_conf, {trace, '$1', '$2'}, true},
    Traced_dests = ets:select(ibrowse_conf,
                              [{Traced_entry, [], [{{'$1', '$2'}}]}]),
    Untrace = fun(#lb_pid{host_port = {_, _} = Dest, pid = Lb_pid}, _Acc) ->
                      case lists:member(Dest, Traced_dests) of
                          true ->
                              catch Lb_pid ! {trace, false};
                          false ->
                              ok
                      end;
                 (_Other, Acc) ->
                      Acc
              end,
    ets:foldl(Untrace, undefined, ibrowse_lb),
    ets:select_delete(ibrowse_conf, [{Traced_entry, [], ['true']}]),
    {noreply, State};

handle_info({trace, Bool}, State) ->
    put(my_trace_flag, Bool),
    {noreply, State};

handle_info({trace, Bool, Host, Port}, State) ->
    Set_trace = fun(#lb_pid{host_port = {H, P}, pid = Lb_pid}, _Acc)
                      when {H, P} == {Host, Port} ->
                        catch Lb_pid ! {trace, Bool};
                   (_Other, Acc) ->
                        Acc
                end,
    ets:foldl(Set_trace, undefined, ibrowse_lb),
    Trace_entry = #ibrowse_conf{key = {trace, Host, Port}, value = Bool},
    ets:insert(ibrowse_conf, Trace_entry),
    {noreply, State};

handle_info(_Unexpected, State) ->
    {noreply, State}.
1010
%%--------------------------------------------------------------------
%% Function: terminate/2
%% Description: Shutdown the server. Nothing is cleaned up here.
%% Returns: any (ignored by gen_server)
%%--------------------------------------------------------------------
terminate(_Why, _Final_state) ->
    ok.
1018
%%--------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Convert process state when code is changed. The state is
%%          passed through unchanged.
%% Returns: {ok, NewState}
%%--------------------------------------------------------------------
code_change(_From_vsn, Current_state, _Extra_args) ->
    {ok, Current_state}.
1026
1027%%--------------------------------------------------------------------
1028%%% Internal functions
1029%%--------------------------------------------------------------------
%% Returns the load balancer pid for a destination. The second
%% argument is the result of looking the destination up in ibrowse_lb:
%% the pid of an existing entry is reused, and a new load balancer is
%% started (and linked) when there is none.
do_get_connection(_Url, [#lb_pid{pid = Existing_lb}]) ->
    Existing_lb;
do_get_connection(#url{host = Host, port = Port}, []) ->
    {ok, New_lb} = ibrowse_lb:start_link([Host, Port]),
    New_lb.
1035