1%%
2%% Licensed to the Apache Software Foundation (ASF) under one
3%% or more contributor license agreements. See the NOTICE file
4%% distributed with this work for additional information
5%% regarding copyright ownership. The ASF licenses this file
6%% to you under the Apache License, Version 2.0 (the
7%% "License"); you may not use this file except in compliance
8%% with the License. You may obtain a copy of the License at
9%%
10%%   http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing,
13%% software distributed under the License is distributed on an
14%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15%% KIND, either express or implied. See the License for the
16%% specific language governing permissions and limitations
17%% under the License.
18%%
19
20-module(thrift_reconnecting_client).
21
22-behaviour(gen_server).
23
24%% API
25-export([ call/3,
26          get_stats/1,
27          get_and_reset_stats/1 ]).
28
29-export([ start_link/6 ]).
30
31%% gen_server callbacks
32-export([ init/1,
33          handle_call/3,
34          handle_cast/2,
35          handle_info/2,
36          terminate/2,
37          code_change/3 ]).
38
39-record( state, { client = nil,
40                  host,
41                  port,
42                  thrift_svc,
43                  thrift_opts,
44                  reconn_min,
45                  reconn_max,
46                  reconn_time = 0,
47                  op_cnt_dict,
48                  op_time_dict } ).
49
50%%====================================================================
51%% API
52%%====================================================================
53%%--------------------------------------------------------------------
54%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
55%% Description: Starts the server
56%%--------------------------------------------------------------------
57start_link( Host, Port,
58            ThriftSvc, ThriftOpts,
59            ReconnMin, ReconnMax ) ->
60  gen_server:start_link( ?MODULE,
61                         [ Host, Port,
62                           ThriftSvc, ThriftOpts,
63                           ReconnMin, ReconnMax ],
64                         [] ).
65
66call( Pid, Op, Args ) ->
67  gen_server:call( Pid, { call, Op, Args } ).
68
69get_stats( Pid ) ->
70  gen_server:call( Pid, get_stats ).
71
72get_and_reset_stats( Pid ) ->
73  gen_server:call( Pid, get_and_reset_stats ).
74
75%%====================================================================
76%% gen_server callbacks
77%%====================================================================
78
79%%--------------------------------------------------------------------
80%% Function: init(Args) -> {ok, State} |
81%%                         {ok, State, Timeout} |
82%%                         ignore               |
83%%                         {stop, Reason}
84%% Description: Start the server.
85%%--------------------------------------------------------------------
86init( [ Host, Port, TSvc, TOpts, ReconnMin, ReconnMax ] ) ->
87  process_flag( trap_exit, true ),
88
89  State = #state{ host         = Host,
90                  port         = Port,
91                  thrift_svc   = TSvc,
92                  thrift_opts  = TOpts,
93                  reconn_min   = ReconnMin,
94                  reconn_max   = ReconnMax,
95                  op_cnt_dict  = dict:new(),
96                  op_time_dict = dict:new() },
97
98  { ok, try_connect( State ) }.
99
100%%--------------------------------------------------------------------
101%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
102%%                                                   {reply, Reply, State, Timeout} |
103%%                                                   {noreply, State} |
104%%                                                   {noreply, State, Timeout} |
105%%                                                   {stop, Reason, Reply, State} |
106%%                                                   {stop, Reason, State}
107%% Description: Handling call messages
108%%--------------------------------------------------------------------
109handle_call( { call, Op, _ },
110             _From,
111             State = #state{ client = nil } ) ->
112  { reply, { error, noconn }, incr_stats( Op, "failfast", 1, State ) };
113
114handle_call( { call, Op, Args },
115             _From,
116             State=#state{ client = Client } ) ->
117
118  Timer = timer_fun(),
119  Result = ( catch thrift_client:call( Client, Op, Args) ),
120  Time = Timer(),
121
122  case Result of
123    { C, { ok, Reply } } ->
124      S = incr_stats( Op, "success", Time, State#state{ client = C } ),
125      { reply, {ok, Reply }, S };
126    { _, { E, Msg } } when E == error; E == exception ->
127      S = incr_stats( Op, "error", Time, try_connect( State ) ),
128      { reply, { E, Msg }, S };
129    Other ->
130      S = incr_stats( Op, "error", Time, try_connect( State ) ),
131      { reply, Other, S }
132  end;
133
134handle_call( get_stats,
135             _From,
136             State = #state{} ) ->
137  { reply, stats( State ), State };
138
139handle_call( get_and_reset_stats,
140             _From,
141             State = #state{} ) ->
142  { reply, stats( State ), reset_stats( State ) }.
143
144%%--------------------------------------------------------------------
145%% Function: handle_cast(Msg, State) -> {noreply, State} |
146%%                                      {noreply, State, Timeout} |
147%%                                      {stop, Reason, State}
148%% Description: Handling cast messages
149%%--------------------------------------------------------------------
150handle_cast( _Msg, State ) ->
151  { noreply, State }.
152
153%%--------------------------------------------------------------------
154%% Function: handle_info(Info, State) -> {noreply, State} |
155%%                                       {noreply, State, Timeout} |
156%%                                       {stop, Reason, State}
157%% Description: Handling all non call/cast messages
158%%--------------------------------------------------------------------
159handle_info( try_connect, State ) ->
160  { noreply, try_connect( State ) };
161
162handle_info( _Info, State ) ->
163  { noreply, State }.
164
165%%--------------------------------------------------------------------
166%% Function: terminate(Reason, State) -> void()
167%% Description: This function is called by a gen_server when it is about to
168%% terminate. It should be the opposite of Module:init/1 and do any necessary
169%% cleaning up. When it returns, the gen_server terminates with Reason.
170%% The return value is ignored.
171%%--------------------------------------------------------------------
172terminate( _Reason, #state{ client = Client } ) ->
173  thrift_client:close( Client ),
174  ok.
175
176%%--------------------------------------------------------------------
177%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
178%% Description: Convert process state when code is changed
179%%--------------------------------------------------------------------
180code_change( _OldVsn, State, _Extra ) ->
181  { ok, State }.
182
183%%--------------------------------------------------------------------
184%%% Internal functions
185%%--------------------------------------------------------------------
186try_connect( State = #state{ client      = OldClient,
187                             host        = Host,
188                             port        = Port,
189                             thrift_svc  = TSvc,
190                             thrift_opts = TOpts } ) ->
191
192  case OldClient of
193    nil -> ok;
194    _   -> ( catch thrift_client:close( OldClient ) )
195  end,
196
197  case catch thrift_client_util:new( Host, Port, TSvc, TOpts ) of
198    { ok, Client } ->
199      State#state{ client = Client, reconn_time = 0 };
200    { E, Msg } when E == error; E == exception ->
201      ReconnTime = reconn_time( State ),
202      error_logger:error_msg( "[~w] ~w connect failed (~w), trying again in ~w ms~n",
203                              [ self(), TSvc, Msg, ReconnTime ] ),
204      erlang:send_after( ReconnTime, self(), try_connect ),
205      State#state{ client = nil, reconn_time = ReconnTime }
206  end.
207
208
209reconn_time( #state{ reconn_min = ReconnMin, reconn_time = 0 } ) ->
210  ReconnMin;
211reconn_time( #state{ reconn_max = ReconnMax, reconn_time = ReconnMax } ) ->
212  ReconnMax;
213reconn_time( #state{ reconn_max = ReconnMax, reconn_time = R } ) ->
214  Backoff = 2 * R,
215  case Backoff > ReconnMax of
216    true  -> ReconnMax;
217    false -> Backoff
218  end.
219
220-ifdef(time_correction).
221timer_fun() ->
222  T1 = erlang:monotonic_time(),
223  fun() ->
224    T2 = erlang:monotonic_time(),
225    erlang:convert_time_unit(T2 - T1, native, micro_seconds)
226  end.
227-else.
228timer_fun() ->
229  T1 = erlang:timestamp(),
230  fun() ->
231    T2 = erlang:timestamp(),
232    timer:now_diff(T2, T1)
233  end.
234-endif.
235
236incr_stats( Op, Result, Time,
237            State = #state{ op_cnt_dict  = OpCntDict,
238                            op_time_dict = OpTimeDict } ) ->
239  Key = lists:flatten( [ atom_to_list( Op ), [ "_" | Result ] ] ),
240  State#state{ op_cnt_dict  = dict:update_counter( Key, 1, OpCntDict ),
241               op_time_dict = dict:update_counter( Key, Time, OpTimeDict ) }.
242
243
244stats( #state{ thrift_svc   = TSvc,
245               op_cnt_dict  = OpCntDict,
246               op_time_dict = OpTimeDict } ) ->
247  Svc = atom_to_list(TSvc),
248
249  F = fun( Key, Count, Stats ) ->
250        Name = lists:flatten( [ Svc, [ "_" | Key ] ] ),
251        Micros = dict:fetch( Key, OpTimeDict ),
252        [ { Name, Count, Micros } | Stats ]
253      end,
254
255  dict:fold( F, [], OpCntDict ).
256
257reset_stats( State = #state{} ) ->
258  State#state{ op_cnt_dict = dict:new(), op_time_dict = dict:new() }.
259