%% Poolboy - A hunky Erlang worker pool factory

-module(poolboy).
-behaviour(gen_server).

%% Public API
-export([checkout/1, checkout/2, checkout/3, checkin/2, transaction/2,
         transaction/3, child_spec/2, child_spec/3, start/1, start/2,
         start_link/1, start_link/2, stop/1, status/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3]).
-export_type([pool/0]).

%% Default timeout (in milliseconds) used by checkout/1,2 and transaction/2.
-define(TIMEOUT, 5000).

-ifdef(pre17).
-type pid_queue() :: queue().
-else.
-type pid_queue() :: queue:queue().
-endif.

-ifdef(OTP_RELEASE). %% this implies 21 or higher
-define(EXCEPTION(Class, Reason, Stacktrace), Class:Reason:Stacktrace).
-define(GET_STACK(Stacktrace), Stacktrace).
-else.
-define(EXCEPTION(Class, Reason, _), Class:Reason).
-define(GET_STACK(_), erlang:get_stacktrace()).
-endif.

-type pool() ::
    Name :: (atom() | pid()) |
    {Name :: atom(), node()} |
    {local, Name :: atom()} |
    {global, GlobalName :: any()} |
    {via, Module :: atom(), ViaName :: any()}.

% Copied from gen:start_ret/0
-type start_ret() :: {'ok', pid()} | 'ignore' | {'error', term()}.

-record(state, {
    %% Pid returned by poolboy_sup:start_link/2 (set while parsing options).
    supervisor :: undefined | pid(),
    %% Idle workers that can be handed out immediately.
    workers = [] :: [pid()],
    %% Queue of {From, CRef, MRef} entries for callers blocked in checkout.
    waiting :: pid_queue(),
    %% Private ETS table of {WorkerPid, CRef, MRef} for checked-out workers.
    monitors :: ets:tid(),
    %% Number of workers started up front.
    size = 5 :: non_neg_integer(),
    %% Number of overflow workers currently alive.
    overflow = 0 :: non_neg_integer(),
    %% Upper bound for `overflow'.
    max_overflow = 10 :: non_neg_integer(),
    %% Where a checked-in worker is put back: head (lifo) or tail (fifo).
    strategy = lifo :: lifo | fifo
}).

%% @doc Check out a worker, blocking for at most the default timeout.
-spec checkout(Pool :: pool()) -> pid().
checkout(Pool) ->
    checkout(Pool, true, ?TIMEOUT).

%% @doc Check out a worker using the default timeout. With `Block' set to
%% `false' the pool replies `full' instead of queueing the caller.
-spec checkout(Pool :: pool(), Block :: boolean()) -> pid() | full.
checkout(Pool, Block) ->
    checkout(Pool, Block, ?TIMEOUT).

%% @doc Check out a worker from `Pool'.
%%
%% The request carries a fresh reference. If the call raises (for example
%% because it timed out), the pool is told to cancel whatever was recorded
%% under that reference and the original exception is re-raised unchanged.
-spec checkout(Pool :: pool(), Block :: boolean(), Timeout :: timeout())
    -> pid() | full.
checkout(Pool, Block, Timeout) ->
    RequestRef = make_ref(),
    try gen_server:call(Pool, {checkout, RequestRef, Block}, Timeout) of
        Reply ->
            Reply
    catch
        ?EXCEPTION(Kind, Why, Trace) ->
            gen_server:cast(Pool, {cancel_waiting, RequestRef}),
            erlang:raise(Kind, Why, ?GET_STACK(Trace))
    end.

%% @doc Return `Worker' to `Pool'. Asynchronous; always returns `ok'.
-spec checkin(Pool :: pool(), Worker :: pid()) -> ok.
checkin(Pool, Worker) when is_pid(Worker) ->
    gen_server:cast(Pool, {checkin, Worker}).

%% @doc Run `Fun' with a checked-out worker, using the default timeout.
-spec transaction(Pool :: pool(), Fun :: fun((Worker :: pid()) -> any()))
    -> any().
transaction(Pool, Fun) ->
    transaction(Pool, Fun, ?TIMEOUT).

%% @doc Check out a worker (blocking, bounded by `Timeout'), apply `Fun' to
%% it and return the result. The worker is checked back in whether `Fun'
%% returns or raises.
-spec transaction(Pool :: pool(), Fun :: fun((Worker :: pid()) -> any()),
    Timeout :: timeout()) -> any().
transaction(Pool, Fun, Timeout) ->
    Pid = poolboy:checkout(Pool, true, Timeout),
    try
        Fun(Pid)
    after
        ok = poolboy:checkin(Pool, Pid)
    end.

%% @doc Supervisor child spec for a pool that gets no worker arguments.
-spec child_spec(PoolId :: term(), PoolArgs :: proplists:proplist())
    -> supervisor:child_spec().
child_spec(PoolId, PoolArgs) ->
    child_spec(PoolId, PoolArgs, []).

%% @doc Supervisor child spec (old tuple format) that starts the pool through
%% poolboy:start_link/2 as a permanent worker with a 5000 ms shutdown.
-spec child_spec(PoolId :: term(),
                 PoolArgs :: proplists:proplist(),
                 WorkerArgs :: proplists:proplist())
    -> supervisor:child_spec().
child_spec(PoolId, PoolArgs, WorkerArgs) ->
    StartMFA = {poolboy, start_link, [PoolArgs, WorkerArgs]},
    {PoolId, StartMFA, permanent, 5000, worker, [poolboy]}.

%% @doc Start an unlinked pool; the pool args double as the worker args.
-spec start(PoolArgs :: proplists:proplist())
    -> start_ret().
start(PoolArgs) ->
    start(PoolArgs, PoolArgs).

%% @doc Start an unlinked pool.
-spec start(PoolArgs :: proplists:proplist(),
            WorkerArgs:: proplists:proplist())
    -> start_ret().
start(PoolArgs, WorkerArgs) ->
    start_pool(start, PoolArgs, WorkerArgs).

%% @doc Start a linked pool; the pool args double as the worker args.
-spec start_link(PoolArgs :: proplists:proplist())
    -> start_ret().
start_link(PoolArgs) ->
    %% for backwards compatibility, pass the pool args as the worker args as well
    start_link(PoolArgs, PoolArgs).
118 119-spec start_link(PoolArgs :: proplists:proplist(), 120 WorkerArgs:: proplists:proplist()) 121 -> start_ret(). 122start_link(PoolArgs, WorkerArgs) -> 123 start_pool(start_link, PoolArgs, WorkerArgs). 124 125-spec stop(Pool :: pool()) -> ok. 126stop(Pool) -> 127 gen_server:call(Pool, stop). 128 129-spec status(Pool :: pool()) -> {atom(), integer(), integer(), integer()}. 130status(Pool) -> 131 gen_server:call(Pool, status). 132 133init({PoolArgs, WorkerArgs}) -> 134 process_flag(trap_exit, true), 135 Waiting = queue:new(), 136 Monitors = ets:new(monitors, [private]), 137 init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors}). 138 139init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) -> 140 {ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs), 141 init(Rest, WorkerArgs, State#state{supervisor = Sup}); 142init([{size, Size} | Rest], WorkerArgs, State) when is_integer(Size) -> 143 init(Rest, WorkerArgs, State#state{size = Size}); 144init([{max_overflow, MaxOverflow} | Rest], WorkerArgs, State) when is_integer(MaxOverflow) -> 145 init(Rest, WorkerArgs, State#state{max_overflow = MaxOverflow}); 146init([{strategy, lifo} | Rest], WorkerArgs, State) -> 147 init(Rest, WorkerArgs, State#state{strategy = lifo}); 148init([{strategy, fifo} | Rest], WorkerArgs, State) -> 149 init(Rest, WorkerArgs, State#state{strategy = fifo}); 150init([_ | Rest], WorkerArgs, State) -> 151 init(Rest, WorkerArgs, State); 152init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) -> 153 Workers = prepopulate(Size, Sup), 154 {ok, State#state{workers = Workers}}. 

%% @private
%% A worker is handed back. Only workers recorded in the monitors table are
%% accepted; anything else is ignored.
handle_cast({checkin, Worker}, #state{monitors = Monitors} = State) ->
    case ets:lookup(Monitors, Worker) of
        [] ->
            {noreply, State};
        [{Worker, _, ClientMon}] ->
            true = erlang:demonitor(ClientMon),
            true = ets:delete(Monitors, Worker),
            {noreply, handle_checkin(Worker, State)}
    end;

%% A checkout identified by CRef was abandoned by its caller. If a worker was
%% already recorded under that reference it is checked back in; otherwise the
%% matching entry (if any) is removed from the waiting queue.
handle_cast({cancel_waiting, CRef}, State) ->
    #state{monitors = Monitors, waiting = Queue} = State,
    case ets:match(Monitors, {'$1', CRef, '$2'}) of
        [[Worker, ClientMon]] ->
            demonitor(ClientMon, [flush]),
            true = ets:delete(Monitors, Worker),
            {noreply, handle_checkin(Worker, State)};
        [] ->
            KeepUnlessCancelled =
                fun({_, Ref, QueuedMon}) when Ref =:= CRef ->
                        demonitor(QueuedMon, [flush]),
                        false;
                   (_) ->
                        true
                end,
            Remaining = queue:filter(KeepUnlessCancelled, Queue),
            {noreply, State#state{waiting = Remaining}}
    end;

handle_cast(_Msg, State) ->
    {noreply, State}.

%% @private
%% Checkout. In order of preference: hand out an idle worker, start an
%% overflow worker, reply `full' to a non-blocking caller, or queue the caller
%% and reply later.
handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) ->
    #state{supervisor = Sup,
           workers = Idle,
           monitors = Monitors,
           overflow = Overflow,
           max_overflow = MaxOverflow} = State,
    case Idle of
        [Worker | StillIdle] ->
            ClientMon = erlang:monitor(process, FromPid),
            true = ets:insert(Monitors, {Worker, CRef, ClientMon}),
            {reply, Worker, State#state{workers = StillIdle}};
        [] when MaxOverflow > 0, Overflow < MaxOverflow ->
            {Worker, ClientMon} = new_worker(Sup, FromPid),
            true = ets:insert(Monitors, {Worker, CRef, ClientMon}),
            {reply, Worker, State#state{overflow = Overflow + 1}};
        [] when Block =:= false ->
            {reply, full, State};
        [] ->
            %% No reply now; the caller is answered from handle_checkin/2 or
            %% handle_worker_exit/2 once a worker becomes available.
            ClientMon = erlang:monitor(process, FromPid),
            Queue = queue:in({From, CRef, ClientMon}, State#state.waiting),
            {noreply, State#state{waiting = Queue}}
    end;

%% Pool summary: state name, idle worker count, overflow count and number of
%% checked-out workers.
handle_call(status, _From, State) ->
    #state{workers = Idle,
           monitors = Monitors,
           overflow = Overflow} = State,
    Name = state_name(State),
    CheckedOut = ets:info(Monitors, size),
    {reply, {Name, length(Idle), Overflow, CheckedOut}, State};
handle_call(get_avail_workers, _From, State) ->
    {reply, State#state.workers, State};
handle_call(get_all_workers, _From, State) ->
    Children = supervisor:which_children(State#state.supervisor),
    {reply, Children, State};
%% List of {WorkerPid, MonitorRef} pairs for every checked-out worker.
handle_call(get_all_monitors, _From, State) ->
    Pairs = ets:select(State#state.monitors,
                       [{{'$1', '_', '$2'}, [], [{{'$1', '$2'}}]}]),
    {reply, Pairs, State};
handle_call(stop, _From, State) ->
    {stop, normal, ok, State};
handle_call(_Msg, _From, State) ->
    {reply, {error, invalid_message}, State}.

%% @private
%% A monitored caller went down. If it held a worker, that worker is checked
%% back in; otherwise the caller is dropped from the waiting queue.
handle_info({'DOWN', MRef, _, _, _}, State) ->
    #state{monitors = Monitors, waiting = Queue} = State,
    case ets:match(Monitors, {'$1', '_', MRef}) of
        [[Worker]] ->
            true = ets:delete(Monitors, Worker),
            {noreply, handle_checkin(Worker, State)};
        [] ->
            StillWaiting = queue:filter(fun ({_, _, R}) -> R =/= MRef end, Queue),
            {noreply, State#state{waiting = StillWaiting}}
    end;
%% A linked process exited. A checked-out worker is handled by
%% handle_worker_exit/2, an idle worker is replaced by a fresh one, and any
%% other pid is ignored.
handle_info({'EXIT', Pid, _Reason}, State) ->
    #state{supervisor = Sup,
           workers = Idle,
           monitors = Monitors} = State,
    case ets:lookup(Monitors, Pid) of
        [{Pid, _, ClientMon}] ->
            true = erlang:demonitor(ClientMon),
            true = ets:delete(Monitors, Pid),
            {noreply, handle_worker_exit(Pid, State)};
        [] ->
            case lists:member(Pid, Idle) of
                true ->
                    Survivors = [P || P <- Idle, P =/= Pid],
                    {noreply, State#state{workers = [new_worker(Sup) | Survivors]}};
                false ->
                    {noreply, State}
            end
    end;

handle_info(_Info, State) ->
    {noreply, State}.

%% @private
%% Unlink the idle workers, then send `shutdown' to the worker supervisor.
terminate(_Reason, State) ->
    ok = lists:foreach(fun erlang:unlink/1, State#state.workers),
    true = exit(State#state.supervisor, shutdown),
    ok.

%% @private
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% Start the pool gen_server through gen_server:StartFun (start or
%% start_link). The pool is registered under the `name' option when present.
start_pool(StartFun, PoolArgs, WorkerArgs) ->
    InitArg = {PoolArgs, WorkerArgs},
    case proplists:get_value(name, PoolArgs) of
        undefined ->
            gen_server:StartFun(?MODULE, InitArg, []);
        RegName ->
            gen_server:StartFun(RegName, ?MODULE, InitArg, [])
    end.

%% Start one worker under the worker supervisor and link it to this process.
new_worker(Sup) ->
    {ok, Worker} = supervisor:start_child(Sup, []),
    true = link(Worker),
    Worker.

%% Start a worker and monitor the process it is being started for.
new_worker(Sup, FromPid) ->
    Worker = new_worker(Sup),
    ClientMon = erlang:monitor(process, FromPid),
    {Worker, ClientMon}.

%% Unlink a worker and have the supervisor terminate it.
dismiss_worker(Sup, Pid) ->
    true = unlink(Pid),
    supervisor:terminate_child(Sup, Pid).

%% Start N workers; nothing is started when N is below 1.
prepopulate(N, _Sup) when N < 1 ->
    [];
prepopulate(N, Sup) ->
    prepopulate(N, Sup, []).

%% Accumulating loop; each new worker is put at the head of the result.
prepopulate(0, _Sup, Acc) ->
    Acc;
prepopulate(N, Sup, Acc) ->
    Worker = new_worker(Sup),
    prepopulate(N - 1, Sup, [Worker | Acc]).

%% Decide what happens to a worker that has been returned:
%%   * a caller is waiting    -> the worker goes straight to that caller;
%%   * no caller, overflow > 0 -> the worker is dismissed;
%%   * otherwise              -> the worker rejoins the idle list, at the
%%                               head (lifo) or at the tail (fifo).
handle_checkin(Pid, State) ->
    #state{supervisor = Sup,
           workers = Idle,
           waiting = Queue,
           monitors = Monitors,
           overflow = Overflow,
           strategy = Strategy} = State,
    case queue:out(Queue) of
        {{value, {From, CRef, ClientMon}}, Rest} ->
            true = ets:insert(Monitors, {Pid, CRef, ClientMon}),
            gen_server:reply(From, Pid),
            State#state{waiting = Rest};
        {empty, Rest} when Overflow > 0 ->
            ok = dismiss_worker(Sup, Pid),
            State#state{waiting = Rest, overflow = Overflow - 1};
        {empty, Rest} ->
            NewIdle =
                case Strategy of
                    lifo -> [Pid | Idle];
                    fifo -> Idle ++ [Pid]
                end,
            State#state{workers = NewIdle, waiting = Rest, overflow = 0}
    end.

%% React to the exit of a worker that was checked out:
%%   * a caller is waiting    -> start a replacement and give it to that
%%                               caller;
%%   * no caller, overflow > 0 -> just decrement the overflow count;
%%   * otherwise              -> start a replacement and add it to the idle
%%                               list.
handle_worker_exit(Pid, State) ->
    #state{supervisor = Sup,
           workers = Idle,
           monitors = Monitors,
           overflow = Overflow} = State,
    case queue:out(State#state.waiting) of
        {{value, {From, CRef, MRef}}, LeftWaiting} ->
            NewWorker = new_worker(Sup),
            true = ets:insert(Monitors, {NewWorker, CRef, MRef}),
            gen_server:reply(From, NewWorker),
            State#state{waiting = LeftWaiting};
        {empty, Empty} when Overflow > 0 ->
            State#state{overflow = Overflow - 1, waiting = Empty};
        {empty, Empty} ->
            Replacement = new_worker(Sup),
            Others = [P || P <- Idle, P =/= Pid],
            State#state{workers = [Replacement | Others], waiting = Empty}
    end.

%% Name the pool's condition for status reporting.
%%
%% With no overflow workers alive:
%%   full     - no idle workers and overflow is disabled (max_overflow < 1)
%%   overflow - no idle workers, but overflow workers may still be started
%%   ready    - at least one idle worker
%% With overflow workers alive:
%%   full     - overflow has reached max_overflow
%%   overflow - otherwise
%%
%% The idle list is tested by matching on [] rather than with length/1,
%% which would walk the whole list just to compare with zero.
state_name(#state{overflow = Overflow, workers = [], max_overflow = MaxOverflow})
  when Overflow < 1, MaxOverflow < 1 ->
    full;
state_name(#state{overflow = Overflow, workers = []}) when Overflow < 1 ->
    overflow;
state_name(#state{overflow = Overflow}) when Overflow < 1 ->
    ready;
state_name(#state{overflow = MaxOverflow, max_overflow = MaxOverflow}) ->
    full;
state_name(_State) ->
    overflow.