%%% Copyright (C) 2009 Enrique Marcote, Miguel Rodriguez
%%% All rights reserved.
%%%
%%% Redistribution and use in source and binary forms, with or without
%%% modification, are permitted provided that the following conditions are met:
%%%
%%% o Redistributions of source code must retain the above copyright notice,
%%%   this list of conditions and the following disclaimer.
%%%
%%% o Redistributions in binary form must reproduce the above copyright notice,
%%%   this list of conditions and the following disclaimer in the documentation
%%%   and/or other materials provided with the distribution.
%%%
%%% o Neither the name of ERLANG TRAINING AND CONSULTING nor the names of its
%%%   contributors may be used to endorse or promote products derived from this
%%%   software without specific prior written permission.
%%%
%%% THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
%%% AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
%%% IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
%%% ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
%%% LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
%%% CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
%%% SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
%%% INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
%%% CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
%%% ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
%%% POSSIBILITY OF SUCH DAMAGE.

%%% Queue server: a gen_server that owns a queue and serialises access to it.
%%% The queue is a cl_queue when the server is started without a file, and a
%%% cl_dqueue opened on the given file otherwise.  The server also counts the
%%% items put in and taken out, from which request rates are derived.
-module(cl_queue_srv).
-behaviour(gen_server).

%%% START/STOP EXPORTS
-export([
    start_link/0,
    start_link/1,
    stop/1
]).

%%% QUEUE EXPORTS
-export([
    in/3,
    len/1,
    out/1,
    out/2,
    out_r/1,
    out_r/2
]).

%%% COUNTER EXPORTS
-export([
    count_in/1,
    count_out/1,
    count_reset/1
]).
%%% RPS EXPORTS
-export([rps/1, rps_avg/1]).

%%% INIT/TERMINATE EXPORTS
-export([init/1, terminate/2]).

%%% HANDLE MESSAGES EXPORTS
-export([handle_call/3, handle_cast/2, handle_info/2]).

%%% CODE UPDATE EXPORTS
-export([code_change/3]).

%%% RECORDS
%% Server state.
%%   parent - pid of the process that called start_link/0,1
%%   queue  - {Module, Queue} pair; Module is cl_queue or cl_dqueue
%%   in     - number of items inserted since start or the last count_reset/1
%%   out    - number of items extracted since start or the last count_reset/1
%%   start  - {MegaSecs, Secs, MicroSecs} timestamp at which counting began.
%%            os:timestamp/0 is used instead of the deprecated now/0; both
%%            yield the tuple shape expected by timer:now_diff/2.
-record(st, {parent, queue, in = 0, out = 0, start = os:timestamp()}).

%%%-----------------------------------------------------------------------------
%%% START/STOP EXPORTS
%%%-----------------------------------------------------------------------------
%% Starts a queue server that is not backed by a file.
start_link() ->
    start_link(undefined).

%% Starts a queue server.  File is handed to init/1, which opens a cl_dqueue
%% on it, or creates a cl_queue when File is undefined.
%% Returns whatever gen_server:start/3 returns.
start_link(File) ->
    % gen_server:start/3 is used on purpose: init/1 links to the caller itself
    % so that the caller's exit is delivered to handle_info/2.  See init.
    gen_server:start(?MODULE, [self(), File], []).


%% Stops the server synchronously.
stop(Pid) ->
    gen_server:call(Pid, stop).

%%%-----------------------------------------------------------------------------
%%% QUEUE EXPORTS
%%%-----------------------------------------------------------------------------
%% Inserts Item into the queue with the given Priority.  Returns ok.
in(Pid, Item, Priority) ->
    gen_server:call(Pid, {in, Item, Priority}, infinity).


%% Returns the length reported by the underlying queue module.
len(Pid) ->
    gen_server:call(Pid, len, infinity).


%% Extracts at most one item.  Equivalent to out(Pid, 1).
out(Pid) ->
    out(Pid, 1).

%% Extracts up to Num items and returns them as a list, in extraction order.
%% The list is shorter than Num when the queue runs empty.
%%
%% Num must be a non-negative integer.  The integer test matters: in Erlang
%% term order every atom, list or tuple compares greater than 0, and a
%% fractional Num would never reach the 0 that ends the extraction loop, so
%% such values are rejected here instead of being sent to the server.
out(Pid, Num) when is_integer(Num), Num >= 0 ->
    gen_server:call(Pid, {out, Num}, infinity).


%% Like out/1, but extracts with the queue module's out_r function.
out_r(Pid) ->
    out_r(Pid, 1).

%% Like out/2, but extracts with the queue module's out_r function.
%% Num must be a non-negative integer (see out/2).
out_r(Pid, Num) when is_integer(Num), Num >= 0 ->
    gen_server:call(Pid, {out_r, Num}, infinity).

%%%-----------------------------------------------------------------------------
%%% COUNTER EXPORTS
%%%-----------------------------------------------------------------------------
%% Returns the number of items inserted since start or the last reset.
count_in(Pid) ->
    gen_server:call(Pid, count_in, infinity).


%% Returns the number of items extracted since start or the last reset.
count_out(Pid) ->
    gen_server:call(Pid, count_out, infinity).


%% Asynchronously zeroes both counters and restarts the rps_avg/1 time base.
count_reset(Pid) ->
    gen_server:cast(Pid, count_reset).

%%%-----------------------------------------------------------------------------
%%% RPS EXPORTS
%%%-----------------------------------------------------------------------------
%% Returns the average number of extracted items per second, rounded to an
%% integer, measured since the server started or since the last count_reset/1.
rps_avg(Pid) ->
    gen_server:call(Pid, rps_avg, infinity).
%% Returns the number of items extracted during the second that follows the
%% request.  The caller is blocked for that second (see handle_call/3 and the
%% timeout clause of handle_info/2).
rps(Pid) ->
    gen_server:call(Pid, rps, infinity).

%%%-----------------------------------------------------------------------------
%%% INIT/TERMINATE EXPORTS
%%%-----------------------------------------------------------------------------
%% Opens the queue and builds the initial state.  Stops with the reason
%% returned by the queue module if the queue cannot be opened.
init([Parent, File]) ->
    % Since the gen_server handles exits from the parent process when
    % started with start_link, we need to do this to ensure that we will
    % receive the exit signal in handle_info.
    link(Parent),
    process_flag(trap_exit, true),
    case queue_open(File) of
        {ok, Queue} ->
            {ok, #st{parent = Parent, queue = Queue}};
        {error, Reason} ->
            {stop, Reason}
    end.


%% Closes the queue.  After a parent exit the queue has already been closed
%% and replaced by undefined, which queue_close/1 accepts as a no-op.
terminate(_Reason, St) ->
    ok = queue_close(St#st.queue).

%%%-----------------------------------------------------------------------------
%%% HANDLE MESSAGES EXPORTS
%%%-----------------------------------------------------------------------------
%% Synchronous requests:
%%   {in, Item, Priority} - insert an item, reply ok
%%   len                  - reply with the queue length
%%   {out, Num}           - extract up to Num items, reply with the list
%%   {out_r, Num}         - same, using the queue module's out_r function
%%   count_in | count_out - reply with the corresponding counter
%%   rps_avg              - reply with the average extractions per second
%%   rps                  - reply one second later with the extractions
%%                          performed during that second
%%   stop                 - reply ok and stop with reason normal
handle_call({in, Item, Priority}, _From, St) ->
    Queue = queue_in(Item, St#st.queue, Priority),
    {reply, ok, St#st{queue = Queue, in = St#st.in + 1}};
handle_call(len, _From, St) ->
    {reply, queue_len(St#st.queue), St};
handle_call({out, Num}, _From, St) ->
    {Result, Queue} = queue_out_num(St#st.queue, Num),
    Len = length(Result),
    {reply, Result, St#st{queue = Queue, out = St#st.out + Len}};
handle_call({out_r, Num}, _From, St) ->
    {Result, Queue} = queue_out_r_num(St#st.queue, Num),
    Len = length(Result),
    {reply, Result, St#st{queue = Queue, out = St#st.out + Len}};
handle_call(count_in, _From, St) ->
    {reply, St#st.in, St};
handle_call(count_out, _From, St) ->
    {reply, St#st.out, St};
handle_call(rps_avg, _From, St) ->
    % os:timestamp/0 replaces the deprecated now/0.  Unlike now/0 it is not
    % strictly increasing, so the elapsed time is clamped to one microsecond;
    % otherwise a zero difference would make the division fail with badarith.
    Micros = max(1, timer:now_diff(os:timestamp(), St#st.start)),
    Secs = Micros / 1000000,
    {reply, round(St#st.out / Secs), St};
handle_call(rps, From, St) ->
    % Remember the current counter and answer from handle_info/2 when the
    % timer fires, so the server keeps serving requests in the meantime.
    _Ref = erlang:start_timer(1000, self(), {rps, From, St#st.out}),
    {noreply, St};
handle_call(stop, _From, St) ->
    {stop, normal, ok, St}.


%% Asynchronous requests:
%%   count_reset - zero both counters and restart the rps_avg time base
handle_cast(count_reset, St) ->
    {noreply, St#st{in = 0, out = 0, start = os:timestamp()}}.


%% Other messages:
%%   rps timer expiry - reply to the pending rps/1 caller
%%   parent exit      - close the queue and stop with the parent's reason
%%   anything else    - ignored
handle_info({timeout, _Ref, {rps, From, Out}}, St) ->
    % Out is the counter value sampled when the request arrived.
    gen_server:reply(From, St#st.out - Out),
    {noreply, St};
handle_info({'EXIT', Parent, Reason}, #st{parent = Parent} = St) ->
    % Close the queue and remove it from the state to prevent dumping the
    % entire queue structure to the error logger.
    ok = queue_close(St#st.queue),
    {stop, Reason, St#st{queue = undefined}};
handle_info(_Info, St) ->
    {noreply, St}.

%%%-----------------------------------------------------------------------------
%%% CODE UPDATE EXPORTS
%%%-----------------------------------------------------------------------------
%% The state is carried over unchanged.
code_change(_OldVsn, St, _Extra) ->
    {ok, St}.

%%%-----------------------------------------------------------------------------
%%% INTERNAL FUNCTIONS
%%%-----------------------------------------------------------------------------
%% Closes a cl_dqueue.  Any other value (a cl_queue, or undefined once the
%% queue was already closed) needs no action.
queue_close({cl_dqueue, Q}) ->
    ok = cl_dqueue:close(Q);
queue_close(_Other) ->
    ok.


%% Creates the queue as a {Module, Queue} pair: a new cl_queue when no file
%% is given, otherwise a cl_dqueue opened on File.  Anything other than
%% {ok, Q} returned by cl_dqueue:open/1 is passed back unchanged.
queue_open(undefined) ->
    {ok, {cl_queue, cl_queue:new()}};
queue_open(File) ->
    case cl_dqueue:open(File) of
        {ok, Q} ->
            {ok, {cl_dqueue, Q}};
        Error ->
            Error
    end.


%% Inserts Item with Priority through the queue's own module.
queue_in(Item, {Mod, Q}, Priority) ->
    {Mod, Mod:in(Item, Q, Priority)}.


%% Returns the length reported by the queue's own module.
queue_len({Mod, Q}) ->
    Mod:len(Q).


%% Extracts up to Num items with Mod:out/1.
%% Returns {Items, NewQueue}, Items being in extraction order.
queue_out_num(Queue, Num) ->
    queue_out_num(Queue, out, Num, []).

%% Extracts up to Num items with Mod:out_r/1.
%% Returns {Items, NewQueue}, Items being in extraction order.
queue_out_r_num(Queue, Num) ->
    queue_out_num(Queue, out_r, Num, []).

%% Worker for the two functions above.  Out names the extraction function to
%% call on the queue module.  Stops when Num reaches 0 or the queue is empty.
queue_out_num(Queue, _Out, 0, Acc) ->
    {lists:reverse(Acc), Queue};
queue_out_num({Mod, Q}, Out, Num, Acc) ->
    case Mod:Out(Q) of
        {empty, Q} ->
            {lists:reverse(Acc), {Mod, Q}};
        {{value, Item}, NewQ} ->
            queue_out_num({Mod, NewQ}, Out, Num - 1, [Item | Acc])
    end.