1%% Poolboy - A hunky Erlang worker pool factory
2
3-module(poolboy).
4-behaviour(gen_server).
5
6-export([checkout/1, checkout/2, checkout/3, checkin/2, transaction/2,
7         transaction/3, child_spec/2, child_spec/3, start/1, start/2,
8         start_link/1, start_link/2, stop/1, status/1]).
9-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
10         code_change/3]).
11-export_type([pool/0]).
12
13-define(TIMEOUT, 5000).
14
15-ifdef(pre17).
16-type pid_queue() :: queue().
17-else.
18-type pid_queue() :: queue:queue().
19-endif.
20
21-ifdef(OTP_RELEASE). %% this implies 21 or higher
22-define(EXCEPTION(Class, Reason, Stacktrace), Class:Reason:Stacktrace).
23-define(GET_STACK(Stacktrace), Stacktrace).
24-else.
25-define(EXCEPTION(Class, Reason, _), Class:Reason).
26-define(GET_STACK(_), erlang:get_stacktrace()).
27-endif.
28
29-type pool() ::
30    Name :: (atom() | pid()) |
31    {Name :: atom(), node()} |
32    {local, Name :: atom()} |
33    {global, GlobalName :: any()} |
34    {via, Module :: atom(), ViaName :: any()}.
35
36% Copied from gen:start_ret/0
37-type start_ret() :: {'ok', pid()} | 'ignore' | {'error', term()}.
38
39-record(state, {
40    supervisor :: undefined | pid(),
41    workers = [] :: [pid()],
42    waiting :: pid_queue(),
43    monitors :: ets:tid(),
44    size = 5 :: non_neg_integer(),
45    overflow = 0 :: non_neg_integer(),
46    max_overflow = 10 :: non_neg_integer(),
47    strategy = lifo :: lifo | fifo
48}).
49
%% @doc Check a worker out of `Pool' in blocking mode.
%% Delegates to checkout/2 with `Block' set to `true'.
-spec checkout(Pool :: pool()) -> pid().
checkout(Pool) ->
    Block = true,
    checkout(Pool, Block).
53
%% @doc Check a worker out of `Pool' using the default call timeout
%% (the ?TIMEOUT macro). `Block' is forwarded unchanged to checkout/3.
-spec checkout(Pool :: pool(), Block :: boolean()) -> pid() | full.
checkout(Pool, Block) ->
    DefaultTimeout = ?TIMEOUT,
    checkout(Pool, Block, DefaultTimeout).
57
%% @doc Check a worker out of `Pool'.
%%
%% Issues a `{checkout, CRef, Block}' call to the pool, tagged with a fresh
%% reference, and returns the pool's reply. `Timeout' is the gen_server call
%% timeout.
%%
%% If the call raises (for example because it timed out), a
%% `{cancel_waiting, CRef}' cast carrying the same reference is sent to the
%% pool before the original exception is re-raised with its stacktrace.
-spec checkout(Pool :: pool(), Block :: boolean(), Timeout :: timeout())
    -> pid() | full.
checkout(Pool, Block, Timeout) ->
    RequestRef = make_ref(),
    try gen_server:call(Pool, {checkout, RequestRef, Block}, Timeout) of
        Reply ->
            Reply
    catch
        ?EXCEPTION(Kind, Why, Trace) ->
            %% Tell the pool this request is abandoned, then propagate the
            %% failure to the caller unchanged.
            gen_server:cast(Pool, {cancel_waiting, RequestRef}),
            erlang:raise(Kind, Why, ?GET_STACK(Trace))
    end.
69
%% @doc Return `Worker' to `Pool'.
%% Asynchronous: sends a `{checkin, Worker}' cast to the pool and returns
%% the result of gen_server:cast/2. Only accepts a pid as `Worker'.
-spec checkin(Pool :: pool(), Worker :: pid()) -> ok.
checkin(Pool, Worker) when is_pid(Worker) ->
    Request = {checkin, Worker},
    gen_server:cast(Pool, Request).
73
%% @doc Run `Fun' with a checked-out worker, using the default timeout
%% (the ?TIMEOUT macro) for the checkout. See transaction/3.
-spec transaction(Pool :: pool(), Fun :: fun((Worker :: pid()) -> any()))
    -> any().
transaction(Pool, Fun) ->
    DefaultTimeout = ?TIMEOUT,
    transaction(Pool, Fun, DefaultTimeout).
78
%% @doc Check out a worker (blocking, with `Timeout'), apply `Fun' to it and
%% return whatever `Fun' returns.
%% The worker is checked back in afterwards even when `Fun' raises, because
%% the checkin sits in the `after' section.
-spec transaction(Pool :: pool(), Fun :: fun((Worker :: pid()) -> any()),
    Timeout :: timeout()) -> any().
transaction(Pool, Fun, Timeout) ->
    Block = true,
    Worker = poolboy:checkout(Pool, Block, Timeout),
    try Fun(Worker) of
        Result ->
            Result
    after
        ok = poolboy:checkin(Pool, Worker)
    end.
88
%% @doc Build a supervisor child spec for a pool, with an empty list as the
%% worker arguments. See child_spec/3.
-spec child_spec(PoolId :: term(), PoolArgs :: proplists:proplist())
    -> supervisor:child_spec().
child_spec(PoolId, PoolArgs) ->
    NoWorkerArgs = [],
    child_spec(PoolId, PoolArgs, NoWorkerArgs).
93
%% @doc Build a supervisor child spec (tuple form) for a pool.
%% The child is identified by `PoolId', started through
%% poolboy:start_link(PoolArgs, WorkerArgs), restarted permanently, given a
%% 5000 shutdown value and declared as a worker.
-spec child_spec(PoolId :: term(),
                 PoolArgs :: proplists:proplist(),
                 WorkerArgs :: proplists:proplist())
    -> supervisor:child_spec().
child_spec(PoolId, PoolArgs, WorkerArgs) ->
    StartMFA = {poolboy, start_link, [PoolArgs, WorkerArgs]},
    {PoolId, StartMFA, permanent, 5000, worker, [poolboy]}.
101
%% @doc Start an unlinked pool, passing `PoolArgs' as the worker arguments
%% as well. See start/2.
-spec start(PoolArgs :: proplists:proplist())
    -> start_ret().
start(PoolArgs) ->
    WorkerArgs = PoolArgs,
    start(PoolArgs, WorkerArgs).
106
%% @doc Start a pool through gen_server's `start' function (no link to the
%% caller). `PoolArgs' configure the pool; `WorkerArgs' are handed on for
%% the workers.
-spec start(PoolArgs :: proplists:proplist(),
            WorkerArgs:: proplists:proplist())
    -> start_ret().
start(PoolArgs, WorkerArgs) ->
    StartFun = start,
    start_pool(StartFun, PoolArgs, WorkerArgs).
112
%% @doc Start a linked pool from a single argument list.
%% For backwards compatibility the pool arguments are passed as the worker
%% arguments as well. See start_link/2.
-spec start_link(PoolArgs :: proplists:proplist())
    -> start_ret().
start_link(PoolArgs) ->
    WorkerArgs = PoolArgs,
    start_link(PoolArgs, WorkerArgs).
118
%% @doc Start a pool through gen_server's `start_link' function, linking it
%% to the caller. `PoolArgs' configure the pool; `WorkerArgs' are handed on
%% for the workers.
-spec start_link(PoolArgs :: proplists:proplist(),
                 WorkerArgs:: proplists:proplist())
    -> start_ret().
start_link(PoolArgs, WorkerArgs) ->
    StartFun = start_link,
    start_pool(StartFun, PoolArgs, WorkerArgs).
124
%% @doc Ask `Pool' to stop.
%% Synchronous: sends a `stop' call, which the pool answers with `ok' while
%% stopping with reason `normal'.
-spec stop(Pool :: pool()) -> ok.
stop(Pool) ->
    Request = stop,
    gen_server:call(Pool, Request).
128
%% @doc Query `Pool' for its status.
%% The pool replies with {StateName, IdleWorkerCount, Overflow, MonitorCount}
%% where StateName is the atom computed by state_name/1.
-spec status(Pool :: pool()) -> {atom(), integer(), integer(), integer()}.
status(Pool) ->
    Request = status,
    gen_server:call(Pool, Request).
132
%% gen_server callback. Makes the pool trap exits, creates an empty waiting
%% queue and a private ETS table for the monitors, then lets init/3 walk the
%% pool options.
init({PoolArgs, WorkerArgs}) ->
    process_flag(trap_exit, true),
    EmptyQueue = queue:new(),
    MonitorTable = ets:new(monitors, [private]),
    InitialState = #state{waiting = EmptyQueue, monitors = MonitorTable},
    init(PoolArgs, WorkerArgs, InitialState).
138
%% Folds the pool option list into the state, one option per step.
%%   {worker_module, Mod}  - starts poolboy_sup for Mod with WorkerArgs and
%%                           stores the supervisor pid
%%   {size, Size}          - integer pool size
%%   {max_overflow, Max}   - integer overflow limit
%%   {strategy, lifo|fifo} - order in which returned workers are reused
%% Any other element (including a known key with a value of the wrong type)
%% is skipped. Once the list is exhausted, `size' workers are pre-started.
init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) ->
    Idle = prepopulate(Size, Sup),
    {ok, State#state{workers = Idle}};
init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) ->
    {ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs),
    init(Rest, WorkerArgs, State#state{supervisor = Sup});
init([{size, Size} | Rest], WorkerArgs, State) when is_integer(Size) ->
    init(Rest, WorkerArgs, State#state{size = Size});
init([{max_overflow, MaxOverflow} | Rest], WorkerArgs, State)
  when is_integer(MaxOverflow) ->
    init(Rest, WorkerArgs, State#state{max_overflow = MaxOverflow});
init([{strategy, Strategy} | Rest], WorkerArgs, State)
  when Strategy =:= lifo; Strategy =:= fifo ->
    init(Rest, WorkerArgs, State#state{strategy = Strategy});
init([_Ignored | Rest], WorkerArgs, State) ->
    init(Rest, WorkerArgs, State).
155
%% gen_server callback for asynchronous requests.
%%
%% {checkin, Pid}: if Pid is recorded in the monitors table, stop monitoring
%% its client, forget the entry and hand the worker to handle_checkin/2.
%% A pid that is not in the table is ignored.
handle_cast({checkin, Pid}, #state{monitors = Monitors} = State) ->
    case ets:lookup(Monitors, Pid) of
        [] ->
            {noreply, State};
        [{Pid, _CRef, MRef}] ->
            true = erlang:demonitor(MRef),
            true = ets:delete(Monitors, Pid),
            {noreply, handle_checkin(Pid, State)}
    end;

%% {cancel_waiting, CRef}: a checkout tagged CRef was abandoned by its
%% caller. If a worker was already recorded for that reference, take it
%% back; otherwise drop the matching entry from the waiting queue and stop
%% monitoring that caller.
handle_cast({cancel_waiting, CRef}, State) ->
    Monitors = State#state.monitors,
    case ets:match(Monitors, {'$1', CRef, '$2'}) of
        [[Pid, MRef]] ->
            demonitor(MRef, [flush]),
            true = ets:delete(Monitors, Pid),
            {noreply, handle_checkin(Pid, State)};
        [] ->
            %% Queue filter: keep every entry except the cancelled one,
            %% demonitoring the cancelled caller as a side effect.
            KeepUnlessCancelled =
                fun({_From, Ref, MRef}) when Ref =:= CRef ->
                        demonitor(MRef, [flush]),
                        false;
                   (_Other) ->
                        true
                end,
            Remaining = queue:filter(KeepUnlessCancelled, State#state.waiting),
            {noreply, State#state{waiting = Remaining}}
    end;

%% Any other cast is ignored.
handle_cast(_Msg, State) ->
    {noreply, State}.
187
%% gen_server callback for synchronous requests.
%%
%% {checkout, CRef, Block}:
%%   * an idle worker exists    -> monitor the caller, record
%%                                 {Worker, CRef, MRef} and reply with it;
%%   * no idle worker, overflow
%%     still below max_overflow -> start an extra worker and reply with it;
%%   * pool exhausted, Block is
%%     false                    -> reply `full';
%%   * otherwise                -> monitor the caller and queue the request;
%%                                 the reply is sent later.
handle_call({checkout, CRef, Block}, {FromPid, _Tag} = From, State) ->
    #state{supervisor = Sup,
           workers = Idle,
           monitors = Monitors,
           overflow = Overflow,
           max_overflow = MaxOverflow} = State,
    case Idle of
        [Pid | Rest] ->
            MRef = erlang:monitor(process, FromPid),
            true = ets:insert(Monitors, {Pid, CRef, MRef}),
            {reply, Pid, State#state{workers = Rest}};
        [] when MaxOverflow > 0, Overflow < MaxOverflow ->
            {Pid, MRef} = new_worker(Sup, FromPid),
            true = ets:insert(Monitors, {Pid, CRef, MRef}),
            {reply, Pid, State#state{overflow = Overflow + 1}};
        [] when Block =:= false ->
            {reply, full, State};
        [] ->
            MRef = erlang:monitor(process, FromPid),
            Entry = {From, CRef, MRef},
            Queued = queue:in(Entry, State#state.waiting),
            {noreply, State#state{waiting = Queued}}
    end;

%% status: {StateName, idle worker count, overflow count, monitor count}.
handle_call(status, _From, State) ->
    #state{workers = Idle,
           monitors = Monitors,
           overflow = Overflow} = State,
    Status = {state_name(State),
              length(Idle),
              Overflow,
              ets:info(Monitors, size)},
    {reply, Status, State};
%% get_avail_workers: the list of idle worker pids.
handle_call(get_avail_workers, _From, State) ->
    {reply, State#state.workers, State};
%% get_all_workers: the children reported by the worker supervisor.
handle_call(get_all_workers, _From, State) ->
    Children = supervisor:which_children(State#state.supervisor),
    {reply, Children, State};
%% get_all_monitors: {WorkerPid, MonitorRef} for every monitors-table entry.
handle_call(get_all_monitors, _From, State) ->
    MatchSpec = [{{'$1', '_', '$2'}, [], [{{'$1', '$2'}}]}],
    Pairs = ets:select(State#state.monitors, MatchSpec),
    {reply, Pairs, State};
%% stop: reply `ok' and stop with reason `normal'.
handle_call(stop, _From, State) ->
    {stop, normal, ok, State};
%% Anything else is rejected without changing the state.
handle_call(_Msg, _From, State) ->
    {reply, {error, invalid_message}, State}.
233
%% gen_server callback for plain messages.
%%
%% 'DOWN' for monitor MRef: if a worker is recorded under that monitor,
%% forget the entry and take the worker back via handle_checkin/2;
%% otherwise remove any waiting-queue entry carrying that monitor.
handle_info({'DOWN', MRef, _Type, _Object, _Info}, State) ->
    Monitors = State#state.monitors,
    case ets:match(Monitors, {'$1', '_', MRef}) of
        [] ->
            OtherMonitor = fun({_From, _CRef, Ref}) -> Ref =/= MRef end,
            Remaining = queue:filter(OtherMonitor, State#state.waiting),
            {noreply, State#state{waiting = Remaining}};
        [[Pid]] ->
            true = ets:delete(Monitors, Pid),
            {noreply, handle_checkin(Pid, State)}
    end;
%% 'EXIT' from Pid: if Pid is recorded in the monitors table, drop the
%% monitor and the entry and let handle_worker_exit/2 update the state.
%% If Pid is in the idle list instead, replace it with a fresh worker.
%% Exits from any other process are ignored.
handle_info({'EXIT', Pid, _Reason}, State) ->
    #state{supervisor = Sup,
           monitors = Monitors} = State,
    case ets:lookup(Monitors, Pid) of
        [{Pid, _CRef, MRef}] ->
            true = erlang:demonitor(MRef),
            true = ets:delete(Monitors, Pid),
            {noreply, handle_worker_exit(Pid, State)};
        [] ->
            Idle = State#state.workers,
            case lists:member(Pid, Idle) of
                false ->
                    {noreply, State};
                true ->
                    Survivors = [P || P <- Idle, P =/= Pid],
                    Replacement = new_worker(Sup),
                    {noreply, State#state{workers = [Replacement | Survivors]}}
            end
    end;

%% Any other message is ignored.
handle_info(_Info, State) ->
    {noreply, State}.
265
%% gen_server callback. Unlinks the pool from every idle worker, then sends
%% a `shutdown' exit signal to the worker supervisor.
terminate(_Reason, State) ->
    Idle = State#state.workers,
    ok = lists:foreach(fun erlang:unlink/1, Idle),
    true = exit(State#state.supervisor, shutdown),
    ok.
270
%% gen_server callback. The state is carried over a code upgrade unchanged.
code_change(_OldVsn, State, _Extra) ->
    Unchanged = State,
    {ok, Unchanged}.
273
%% Starts the pool gen_server via gen_server:StartFun (`start' or
%% `start_link' in this module's callers). When PoolArgs contains a `name'
%% entry the server is started under that name; otherwise it is started
%% without one.
start_pool(StartFun, PoolArgs, WorkerArgs) ->
    InitArg = {PoolArgs, WorkerArgs},
    case proplists:get_value(name, PoolArgs) of
        undefined ->
            gen_server:StartFun(?MODULE, InitArg, []);
        Name ->
            gen_server:StartFun(Name, ?MODULE, InitArg, [])
    end.
281
%% Starts one worker under supervisor Sup, links the pool process to it and
%% returns the worker pid. Crashes with badmatch if the child does not start
%% with {ok, Pid}.
new_worker(Sup) ->
    {ok, Worker} = supervisor:start_child(Sup, []),
    true = link(Worker),
    Worker.
286
%% Starts a worker as new_worker/1 does and then monitors the client
%% process FromPid. Returns {WorkerPid, MonitorRef}.
new_worker(Sup, FromPid) ->
    Worker = new_worker(Sup),
    MRef = erlang:monitor(process, FromPid),
    {Worker, MRef}.
291
%% Unlinks the pool from worker Pid and asks supervisor Sup to terminate it.
%% Returns the result of supervisor:terminate_child/2.
dismiss_worker(Sup, Pid) ->
    true = unlink(Pid),
    Result = supervisor:terminate_child(Sup, Pid),
    Result.
295
%% Starts N workers under Sup and returns their pids.
%% Any N below 1 yields an empty list without touching the supervisor.
prepopulate(N, _Sup) when N < 1 ->
    [];
prepopulate(N, Sup) ->
    NoWorkersYet = [],
    prepopulate(N, Sup, NoWorkersYet).
300
%% Accumulator loop behind prepopulate/2: starts one worker per step until
%% the counter reaches 0, consing each new pid onto the accumulator.
prepopulate(0, _Sup, Acc) ->
    Acc;
prepopulate(Remaining, Sup, Acc) ->
    Worker = new_worker(Sup),
    prepopulate(Remaining - 1, Sup, [Worker | Acc]).
305
%% Decides what happens to worker Pid once it is free again and returns the
%% updated state:
%%   * a caller is waiting            -> record the worker for that caller
%%                                       and reply to it with the worker;
%%   * nobody waits and overflow > 0  -> dismiss the worker and decrement
%%                                       the overflow counter;
%%   * nobody waits, no overflow      -> put the worker back in the idle
%%                                       list (front for lifo, back for
%%                                       fifo) and reset overflow to 0.
handle_checkin(Pid, State) ->
    #state{supervisor = Sup,
           waiting = Waiting,
           monitors = Monitors,
           overflow = Overflow,
           strategy = Strategy} = State,
    case queue:out(Waiting) of
        {{value, {From, CRef, MRef}}, Remaining} ->
            true = ets:insert(Monitors, {Pid, CRef, MRef}),
            gen_server:reply(From, Pid),
            State#state{waiting = Remaining};
        {empty, Empty} when Overflow > 0 ->
            ok = dismiss_worker(Sup, Pid),
            State#state{waiting = Empty, overflow = Overflow - 1};
        {empty, Empty} ->
            Idle = State#state.workers,
            Returned =
                case Strategy of
                    lifo -> [Pid | Idle];
                    fifo -> Idle ++ [Pid]
                end,
            State#state{workers = Returned, waiting = Empty, overflow = 0}
    end.
327
%% Updates the state after worker Pid has exited and returns the new state:
%%   * a caller is waiting            -> start a replacement worker, record
%%                                       it for that caller and reply to it;
%%   * nobody waits and overflow > 0  -> just decrement the overflow
%%                                       counter, no replacement is started;
%%   * nobody waits, no overflow      -> start a replacement and put it at
%%                                       the head of the idle list, with Pid
%%                                       removed from that list.
handle_worker_exit(Pid, State) ->
    #state{supervisor = Sup,
           monitors = Monitors,
           overflow = Overflow} = State,
    case queue:out(State#state.waiting) of
        {{value, {From, CRef, MRef}}, Remaining} ->
            Replacement = new_worker(Sup),
            true = ets:insert(Monitors, {Replacement, CRef, MRef}),
            gen_server:reply(From, Replacement),
            State#state{waiting = Remaining};
        {empty, Empty} when Overflow > 0 ->
            State#state{overflow = Overflow - 1, waiting = Empty};
        {empty, Empty} ->
            Replacement = new_worker(Sup),
            Survivors = [P || P <- State#state.workers, P =/= Pid],
            State#state{workers = [Replacement | Survivors], waiting = Empty}
    end.
346
%% Summarises the pool state as one of `ready', `overflow' or `full'.
%%
%% While the overflow counter is below 1:
%%   * no idle workers and max_overflow < 1 -> full
%%   * no idle workers otherwise            -> overflow
%%   * at least one idle worker             -> ready
%% Once overflow workers exist:
%%   * overflow equals max_overflow         -> full
%%   * otherwise                            -> overflow
%%
%% Emptiness of the idle list is checked by matching on `[]' in the heads
%% rather than by computing length/1, which walks the whole list.
state_name(#state{overflow = Overflow, workers = [], max_overflow = MaxOverflow})
  when Overflow < 1, MaxOverflow < 1 ->
    full;
state_name(#state{overflow = Overflow, workers = []}) when Overflow < 1 ->
    overflow;
state_name(#state{overflow = Overflow}) when Overflow < 1 ->
    ready;
state_name(#state{overflow = MaxOverflow, max_overflow = MaxOverflow}) ->
    full;
state_name(_State) ->
    overflow.
358