1%% 2%% Licensed to the Apache Software Foundation (ASF) under one 3%% or more contributor license agreements. See the NOTICE file 4%% distributed with this work for additional information 5%% regarding copyright ownership. The ASF licenses this file 6%% to you under the Apache License, Version 2.0 (the 7%% "License"); you may not use this file except in compliance 8%% with the License. You may obtain a copy of the License at 9%% 10%% http://www.apache.org/licenses/LICENSE-2.0 11%% 12%% Unless required by applicable law or agreed to in writing, 13%% software distributed under the License is distributed on an 14%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15%% KIND, either express or implied. See the License for the 16%% specific language governing permissions and limitations 17%% under the License. 18%% 19 20-module(thrift_reconnecting_client). 21 22-behaviour(gen_server). 23 24%% API 25-export([ call/3, 26 get_stats/1, 27 get_and_reset_stats/1 ]). 28 29-export([ start_link/6 ]). 30 31%% gen_server callbacks 32-export([ init/1, 33 handle_call/3, 34 handle_cast/2, 35 handle_info/2, 36 terminate/2, 37 code_change/3 ]). 38 39-record( state, { client = nil, 40 host, 41 port, 42 thrift_svc, 43 thrift_opts, 44 reconn_min, 45 reconn_max, 46 reconn_time = 0, 47 op_cnt_dict, 48 op_time_dict } ). 49 50%%==================================================================== 51%% API 52%%==================================================================== 53%%-------------------------------------------------------------------- 54%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} 55%% Description: Starts the server 56%%-------------------------------------------------------------------- 57start_link( Host, Port, 58 ThriftSvc, ThriftOpts, 59 ReconnMin, ReconnMax ) -> 60 gen_server:start_link( ?MODULE, 61 [ Host, Port, 62 ThriftSvc, ThriftOpts, 63 ReconnMin, ReconnMax ], 64 [] ). 65 66call( Pid, Op, Args ) -> 67 gen_server:call( Pid, { call, Op, Args } ). 68 69get_stats( Pid ) -> 70 gen_server:call( Pid, get_stats ). 71 72get_and_reset_stats( Pid ) -> 73 gen_server:call( Pid, get_and_reset_stats ). 74 75%%==================================================================== 76%% gen_server callbacks 77%%==================================================================== 78 79%%-------------------------------------------------------------------- 80%% Function: init(Args) -> {ok, State} | 81%% {ok, State, Timeout} | 82%% ignore | 83%% {stop, Reason} 84%% Description: Start the server. 85%%-------------------------------------------------------------------- 86init( [ Host, Port, TSvc, TOpts, ReconnMin, ReconnMax ] ) -> 87 process_flag( trap_exit, true ), 88 89 State = #state{ host = Host, 90 port = Port, 91 thrift_svc = TSvc, 92 thrift_opts = TOpts, 93 reconn_min = ReconnMin, 94 reconn_max = ReconnMax, 95 op_cnt_dict = dict:new(), 96 op_time_dict = dict:new() }, 97 98 { ok, try_connect( State ) }. 99 100%%-------------------------------------------------------------------- 101%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | 102%% {reply, Reply, State, Timeout} | 103%% {noreply, State} | 104%% {noreply, State, Timeout} | 105%% {stop, Reason, Reply, State} | 106%% {stop, Reason, State} 107%% Description: Handling call messages 108%%-------------------------------------------------------------------- 109handle_call( { call, Op, _ }, 110 _From, 111 State = #state{ client = nil } ) -> 112 { reply, { error, noconn }, incr_stats( Op, "failfast", 1, State ) }; 113 114handle_call( { call, Op, Args }, 115 _From, 116 State=#state{ client = Client } ) -> 117 118 Timer = timer_fun(), 119 Result = ( catch thrift_client:call( Client, Op, Args) ), 120 Time = Timer(), 121 122 case Result of 123 { C, { ok, Reply } } -> 124 S = incr_stats( Op, "success", Time, State#state{ client = C } ), 125 { reply, {ok, Reply }, S }; 126 { _, { E, Msg } } when E == error; E == exception -> 127 S = incr_stats( Op, "error", Time, try_connect( State ) ), 128 { reply, { E, Msg }, S }; 129 Other -> 130 S = incr_stats( Op, "error", Time, try_connect( State ) ), 131 { reply, Other, S } 132 end; 133 134handle_call( get_stats, 135 _From, 136 State = #state{} ) -> 137 { reply, stats( State ), State }; 138 139handle_call( get_and_reset_stats, 140 _From, 141 State = #state{} ) -> 142 { reply, stats( State ), reset_stats( State ) }. 143 144%%-------------------------------------------------------------------- 145%% Function: handle_cast(Msg, State) -> {noreply, State} | 146%% {noreply, State, Timeout} | 147%% {stop, Reason, State} 148%% Description: Handling cast messages 149%%-------------------------------------------------------------------- 150handle_cast( _Msg, State ) -> 151 { noreply, State }. 152 153%%-------------------------------------------------------------------- 154%% Function: handle_info(Info, State) -> {noreply, State} | 155%% {noreply, State, Timeout} | 156%% {stop, Reason, State} 157%% Description: Handling all non call/cast messages 158%%-------------------------------------------------------------------- 159handle_info( try_connect, State ) -> 160 { noreply, try_connect( State ) }; 161 162handle_info( _Info, State ) -> 163 { noreply, State }. 164 165%%-------------------------------------------------------------------- 166%% Function: terminate(Reason, State) -> void() 167%% Description: This function is called by a gen_server when it is about to 168%% terminate. It should be the opposite of Module:init/1 and do any necessary 169%% cleaning up. When it returns, the gen_server terminates with Reason. 170%% The return value is ignored. 171%%-------------------------------------------------------------------- 172terminate( _Reason, #state{ client = Client } ) -> 173 thrift_client:close( Client ), 174 ok. 175 176%%-------------------------------------------------------------------- 177%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} 178%% Description: Convert process state when code is changed 179%%-------------------------------------------------------------------- 180code_change( _OldVsn, State, _Extra ) -> 181 { ok, State }. 182 183%%-------------------------------------------------------------------- 184%%% Internal functions 185%%-------------------------------------------------------------------- 186try_connect( State = #state{ client = OldClient, 187 host = Host, 188 port = Port, 189 thrift_svc = TSvc, 190 thrift_opts = TOpts } ) -> 191 192 case OldClient of 193 nil -> ok; 194 _ -> ( catch thrift_client:close( OldClient ) ) 195 end, 196 197 case catch thrift_client_util:new( Host, Port, TSvc, TOpts ) of 198 { ok, Client } -> 199 State#state{ client = Client, reconn_time = 0 }; 200 { E, Msg } when E == error; E == exception -> 201 ReconnTime = reconn_time( State ), 202 error_logger:error_msg( "[~w] ~w connect failed (~w), trying again in ~w ms~n", 203 [ self(), TSvc, Msg, ReconnTime ] ), 204 erlang:send_after( ReconnTime, self(), try_connect ), 205 State#state{ client = nil, reconn_time = ReconnTime } 206 end. 207 208 209reconn_time( #state{ reconn_min = ReconnMin, reconn_time = 0 } ) -> 210 ReconnMin; 211reconn_time( #state{ reconn_max = ReconnMax, reconn_time = ReconnMax } ) -> 212 ReconnMax; 213reconn_time( #state{ reconn_max = ReconnMax, reconn_time = R } ) -> 214 Backoff = 2 * R, 215 case Backoff > ReconnMax of 216 true -> ReconnMax; 217 false -> Backoff 218 end. 219 220-ifdef(time_correction). 221timer_fun() -> 222 T1 = erlang:monotonic_time(), 223 fun() -> 224 T2 = erlang:monotonic_time(), 225 erlang:convert_time_unit(T2 - T1, native, micro_seconds) 226 end. 227-else. 228timer_fun() -> 229 T1 = erlang:timestamp(), 230 fun() -> 231 T2 = erlang:timestamp(), 232 timer:now_diff(T2, T1) 233 end. 234-endif. 235 236incr_stats( Op, Result, Time, 237 State = #state{ op_cnt_dict = OpCntDict, 238 op_time_dict = OpTimeDict } ) -> 239 Key = lists:flatten( [ atom_to_list( Op ), [ "_" | Result ] ] ), 240 State#state{ op_cnt_dict = dict:update_counter( Key, 1, OpCntDict ), 241 op_time_dict = dict:update_counter( Key, Time, OpTimeDict ) }. 242 243 244stats( #state{ thrift_svc = TSvc, 245 op_cnt_dict = OpCntDict, 246 op_time_dict = OpTimeDict } ) -> 247 Svc = atom_to_list(TSvc), 248 249 F = fun( Key, Count, Stats ) -> 250 Name = lists:flatten( [ Svc, [ "_" | Key ] ] ), 251 Micros = dict:fetch( Key, OpTimeDict ), 252 [ { Name, Count, Micros } | Stats ] 253 end, 254 255 dict:fold( F, [], OpCntDict ). 256 257reset_stats( State = #state{} ) -> 258 State#state{ op_cnt_dict = dict:new(), op_time_dict = dict:new() }. 259