1%%% Copyright (C) 2009 Enrique Marcote, Miguel Rodriguez
2%%% All rights reserved.
3%%%
4%%% Redistribution and use in source and binary forms, with or without
5%%% modification, are permitted provided that the following conditions are met:
6%%%
7%%% o Redistributions of source code must retain the above copyright notice,
8%%%   this list of conditions and the following disclaimer.
9%%%
10%%% o Redistributions in binary form must reproduce the above copyright notice,
11%%%   this list of conditions and the following disclaimer in the documentation
12%%%   and/or other materials provided with the distribution.
13%%%
14%%% o Neither the name of ERLANG TRAINING AND CONSULTING nor the names of its
15%%%   contributors may be used to endorse or promote products derived from this
16%%%   software without specific prior written permission.
17%%%
18%%% THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19%%% AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20%%% IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21%%% ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22%%% LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23%%% CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24%%% SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25%%% INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26%%% CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27%%% ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28%%% POSSIBILITY OF SUCH DAMAGE.
29-module(cl_queue_srv).
30-behaviour(gen_server).
31
32%%% START/STOP EXPORTS
33-export([start_link/0, start_link/1, stop/1]).
34
35%%% QUEUE EXPORTS
36-export([in/3, len/1, out/1, out/2, out_r/1, out_r/2]).
37
38%%% COUNTER EXPORTS
39-export([count_in/1, count_out/1, count_reset/1]).
40
41%%% RPS EXPORTS
42-export([rps/1, rps_avg/1]).
43
44%%% INIT/TERMINATE EXPORTS
45-export([init/1, terminate/2]).
46
47%%% HANDLE MESSAGES EXPORTS
48-export([handle_call/3, handle_cast/2, handle_info/2]).
49
50%%% CODE UPDATE EXPORTS
51-export([code_change/3]).
52
53%%% RECORDS
%% Server state.
%%   parent - pid passed to init/1; the server links to it and stops when
%%            it exits.
%%   queue  - {Module, Queue} pair produced by queue_open/1.
%%   in     - number of items enqueued since the last counter reset.
%%   out    - number of items dequeued since the last counter reset.
%%   start  - {MegaSecs, Secs, MicroSecs} timestamp taken when the record is
%%            created; consumed by timer:now_diff/2 for the rps average.
%%            erlang:timestamp/0 replaces the deprecated erlang:now/0 and
%%            returns the same tuple format.
-record(st, {parent, queue, in = 0, out = 0, start = erlang:timestamp()}).
55
56%%%-----------------------------------------------------------------------------
57%%% START/STOP EXPORTS
58%%%-----------------------------------------------------------------------------
%% @doc Starts a queue server with no backing file.
%% Same as `start_link(undefined)'.
start_link() ->
    start_link(undefined).

%% @doc Starts a queue server tied to the calling process.
%% `File' is handed to init/1: `undefined' selects a cl_queue, anything
%% else is opened through cl_dqueue. Returns whatever gen_server:start/3
%% returns.
start_link(File) ->
    Caller = self(),
    % gen_server:start/3 is used on purpose: the link to the caller is
    % established inside init/1, which receives the caller's pid.
    gen_server:start(?MODULE, [Caller, File], []).
64
65
%% @doc Synchronously asks the server `Pid' to stop and returns its reply.
%% Uses the default gen_server:call/2 timeout.
stop(Pid) ->
    Request = stop,
    gen_server:call(Pid, Request).
68
69%%%-----------------------------------------------------------------------------
70%%% QUEUE EXPORTS
71%%%-----------------------------------------------------------------------------
%% @doc Enqueues `Item' with `Priority' on the server `Pid'.
%% Blocks until the server replies (no timeout).
in(Pid, Item, Priority) ->
    Request = {in, Item, Priority},
    gen_server:call(Pid, Request, infinity).
74
75
%% @doc Returns the current length of the queue held by the server `Pid'.
%% Blocks until the server replies (no timeout).
len(Pid) ->
    Request = len,
    gen_server:call(Pid, Request, infinity).
78
79
%% @doc Dequeues a single item from the server `Pid'.
%% Same as `out(Pid, 1)'; the reply is a list with at most one item.
out(Pid) ->
    out(Pid, 1).

%% @doc Dequeues up to `Num' items from the server `Pid' and returns them
%% as a list (shorter than `Num' when the queue runs empty).
%%
%% `Num' must be a non-negative integer. The integer check matters: in the
%% Erlang term order every atom, tuple, list, etc. compares greater than any
%% number, so a bare `Num >= 0' guard lets such terms through and the server
%% would then crash on `Num - 1'. A float would never hit the `0' base case
%% of the dequeue loop and would silently drain the whole queue.
%% Fails with `function_clause' in the caller for any other `Num'.
out(Pid, Num) when is_integer(Num), Num >= 0 ->
    gen_server:call(Pid, {out, Num}, infinity).
85
86
%% @doc Dequeues a single item from the server `Pid' using the queue's
%% `out_r' operation. Same as `out_r(Pid, 1)'.
out_r(Pid) ->
    out_r(Pid, 1).

%% @doc Dequeues up to `Num' items from the server `Pid' using the queue's
%% `out_r' operation and returns them as a list.
%%
%% `Num' must be a non-negative integer. The integer check matters: in the
%% Erlang term order non-numeric terms compare greater than any number, so
%% a bare `Num >= 0' guard would accept e.g. an atom and crash the server
%% on `Num - 1'; a float would never reach the `0' base case of the dequeue
%% loop and would drain the whole queue.
%% Fails with `function_clause' in the caller for any other `Num'.
out_r(Pid, Num) when is_integer(Num), Num >= 0 ->
    gen_server:call(Pid, {out_r, Num}, infinity).
92
93%%%-----------------------------------------------------------------------------
94%%% COUNTER EXPORTS
95%%%-----------------------------------------------------------------------------
%% @doc Returns the server's count of enqueued items (the `in' counter).
%% Blocks until the server replies (no timeout).
count_in(Pid) ->
    Request = count_in,
    gen_server:call(Pid, Request, infinity).
98
99
%% @doc Returns the server's count of dequeued items (the `out' counter).
%% Blocks until the server replies (no timeout).
count_out(Pid) ->
    Request = count_out,
    gen_server:call(Pid, Request, infinity).
102
103
%% @doc Asynchronously asks the server `Pid' to reset its counters.
%% Returns `ok' immediately; the request is sent as a cast.
count_reset(Pid) ->
    Request = count_reset,
    gen_server:cast(Pid, Request).
106
107%%%-----------------------------------------------------------------------------
108%%% RPS EXPORTS
109%%%-----------------------------------------------------------------------------
%% @doc Returns the server's average number of dequeued items per second,
%% measured from the `start' timestamp kept in the server state.
%% Blocks until the server replies (no timeout).
rps_avg(Pid) ->
    Request = rps_avg,
    gen_server:call(Pid, Request, infinity).
112
113
%% @doc Returns the number of items the server dequeues during the second
%% following this request. The server answers from a 1000 ms timer, so the
%% call blocks for roughly that long (no timeout on the caller side).
rps(Pid) ->
    Request = rps,
    gen_server:call(Pid, Request, infinity).
116
117%%%-----------------------------------------------------------------------------
118%%% INIT/TERMINATE EXPORTS
119%%%-----------------------------------------------------------------------------
%% @doc gen_server init callback.
%% Links to `Parent', enables exit trapping and opens the queue selected by
%% `File'. Returns `{ok, State}' on success or `{stop, Reason}' when the
%% queue cannot be opened.
init([Parent, File]) ->
    % The server is started without a link (see start_link/1), so the link
    % is created here and exits are trapped: that way the parent's exit
    % arrives as an 'EXIT' message in handle_info/2.
    link(Parent),
    process_flag(trap_exit, true),
    case queue_open(File) of
        {error, Reason} ->
            {stop, Reason};
        {ok, Queue} ->
            {ok, #st{parent = Parent, queue = Queue}}
    end.
132
133
%% @doc gen_server terminate callback: closes the queue held in the state.
%% Fails with `badmatch' if closing does not return `ok'.
terminate(_Reason, St) ->
    Queue = St#st.queue,
    ok = queue_close(Queue).
136
137%%%-----------------------------------------------------------------------------
138%%% HANDLE MESSAGES EXPORTS
139%%%-----------------------------------------------------------------------------
%% @doc gen_server call handler for the requests sent by the API functions.
%%
%% Requests and replies:
%%   {in, Item, Priority} - enqueues Item with Priority and increments the
%%                          `in' counter; replies `ok'.
%%   len                  - replies with the queue length.
%%   {out, Num}           - dequeues up to Num items with the queue's `out'
%%                          operation; replies with the list of items and
%%                          adds its length to the `out' counter.
%%   {out_r, Num}         - same as {out, Num} using `out_r'.
%%   count_in, count_out  - reply with the respective counter.
%%   rps_avg              - replies with the rounded number of dequeued
%%                          items per second since the `start' timestamp,
%%                          or 0 when no positive time has elapsed.
%%   rps                  - deferred reply: a 1000 ms timer carries the
%%                          caller and the current `out' counter; the reply
%%                          is sent from handle_info/2 when it fires.
%%   stop                 - replies `ok' and stops with reason `normal'.
handle_call({in, Item, Priority}, _From, St) ->
    Queue = queue_in(Item, St#st.queue, Priority),
    {reply, ok, St#st{queue = Queue, in = St#st.in + 1}};
handle_call(len, _From, St) ->
    {reply, queue_len(St#st.queue), St};
handle_call({out, Num}, _From, St) ->
    {Result, Queue} = queue_out_num(St#st.queue, Num),
    Len = length(Result),
    {reply, Result, St#st{queue = Queue, out = St#st.out + Len}};
handle_call({out_r, Num}, _From, St) ->
    {Result, Queue} = queue_out_r_num(St#st.queue, Num),
    Len = length(Result),
    {reply, Result, St#st{queue = Queue, out = St#st.out + Len}};
handle_call(count_in, _From, St) ->
    {reply, St#st.in, St};
handle_call(count_out, _From, St) ->
    {reply, St#st.out, St};
handle_call(rps_avg, _From, St) ->
    % erlang:timestamp/0 replaces the deprecated erlang:now/0; it returns
    % the same {MegaSecs, Secs, MicroSecs} tuple that timer:now_diff/2 takes.
    Secs = timer:now_diff(erlang:timestamp(), St#st.start) / 1000000,
    % Unlike now/0, timestamps are not guaranteed to be strictly increasing,
    % so guard the division against a zero or negative interval.
    Avg = case Secs > 0 of
              true -> round(St#st.out / Secs);
              false -> 0
          end,
    {reply, Avg, St};
handle_call(rps, From, St) ->
    _Ref = erlang:start_timer(1000, self(), {rps, From, St#st.out}),
    {noreply, St};
handle_call(stop, _From, St) ->
    {stop, normal, ok, St}.
165
166
%% @doc gen_server cast handler.
%% `count_reset' zeroes the `in' and `out' counters and restarts the
%% interval used for the rps average by taking a fresh `start' timestamp.
handle_cast(count_reset, St) ->
    % erlang:timestamp/0 replaces the deprecated erlang:now/0 and returns
    % the same {MegaSecs, Secs, MicroSecs} tuple format.
    {noreply, St#st{in = 0, out = 0, start = erlang:timestamp()}}.
169
170
%% @doc gen_server info handler.
%%
%%   {timeout, _, {rps, Client, OutBefore}} - the timer started by the `rps'
%%       call fired: replies to Client with how much the `out' counter has
%%       grown since the timer was started.
%%   {'EXIT', Parent, Reason} - the parent process exited: closes the queue
%%       and stops with the same reason.
%%   anything else - ignored.
handle_info({timeout, _TimerRef, {rps, Client, OutBefore}}, St) ->
    Served = St#st.out - OutBefore,
    gen_server:reply(Client, Served),
    {noreply, St};
handle_info({'EXIT', Parent, Reason}, #st{parent = Parent, queue = Queue} = St) ->
    % The queue is closed here and dropped from the state so that the whole
    % queue structure is not dumped to the error logger on termination.
    ok = queue_close(Queue),
    {stop, Reason, St#st{queue = undefined}};
handle_info(_Unexpected, St) ->
    {noreply, St}.
181
182%%%-----------------------------------------------------------------------------
183%%% CODE UPDATE EXPORTS
184%%%-----------------------------------------------------------------------------
%% @doc gen_server code_change callback: the state is carried over as is.
code_change(_FromVsn, State, _Extra) ->
    {ok, State}.
187
188%%%-----------------------------------------------------------------------------
189%%% INTERNAL FUNCTIONS
190%%%-----------------------------------------------------------------------------
%% Closes the queue if it is a cl_dqueue; any other value (a cl_queue pair,
%% or `undefined' after the queue was already dropped from the state) needs
%% no closing. Always returns `ok', failing with `badmatch' if
%% cl_dqueue:close/1 returns anything else.
queue_close({cl_dqueue, Handle}) ->
    ok = cl_dqueue:close(Handle);
queue_close(_NothingToClose) ->
    ok.
195
196
%% Opens the queue selected by the argument and tags it with the module
%% that implements it:
%%   undefined - a new cl_queue; returns {ok, {cl_queue, Queue}}.
%%   File      - cl_dqueue:open(File); returns {ok, {cl_dqueue, Queue}} on
%%               success, otherwise passes the result of open/1 through.
queue_open(undefined) ->
    Fresh = cl_queue:new(),
    {ok, {cl_queue, Fresh}};
queue_open(File) ->
    case cl_dqueue:open(File) of
        {ok, Opened} ->
            {ok, {cl_dqueue, Opened}};
        Failure ->
            Failure
    end.
206
207
%% Enqueues `Item' with `Priority' through the implementing module's in/3
%% and returns the updated {Module, Queue} pair.
queue_in(Item, {Mod, Q}, Priority) ->
    Updated = Mod:in(Item, Q, Priority),
    {Mod, Updated}.
210
211
%% Returns the length of a {Module, Queue} pair as reported by the
%% implementing module's len/1.
queue_len({Mod, Q}) ->
    Length = Mod:len(Q),
    Length.
214
215
%% Dequeues up to `Num' items with the implementing module's out/1.
%% Returns {Items, NewQueue} with Items in dequeue order.
queue_out_num(Queue, Num) ->
    queue_out_num(Queue, out, Num, []).

%% Same as queue_out_num/2 but dequeues with the module's out_r/1.
queue_out_r_num(Queue, Num) ->
    queue_out_num(Queue, out_r, Num, []).

%% Worker loop: `Fun' names the dequeue function to call on the module,
%% `Left' is how many items may still be taken and `Taken' accumulates the
%% items in reverse. Stops when `Left' reaches 0 or the queue reports empty.
queue_out_num(Queue, _Fun, Left, Taken) when Left =:= 0 ->
    {lists:reverse(Taken), Queue};
queue_out_num({Mod, Q0}, Fun, Left, Taken) ->
    Popped = Mod:Fun(Q0),
    case Popped of
        {{value, Item}, Q1} ->
            queue_out_num({Mod, Q1}, Fun, Left - 1, [Item | Taken]);
        % Q0 is already bound: an empty result must hand back the very
        % queue that was passed in.
        {empty, Q0} ->
            {lists:reverse(Taken), {Mod, Q0}}
    end.
231