1%%% Copyright (C) 2009 Enrique Marcote, Miguel Rodriguez 2%%% All rights reserved. 3%%% 4%%% Redistribution and use in source and binary forms, with or without 5%%% modification, are permitted provided that the following conditions are met: 6%%% 7%%% o Redistributions of source code must retain the above copyright notice, 8%%% this list of conditions and the following disclaimer. 9%%% 10%%% o Redistributions in binary form must reproduce the above copyright notice, 11%%% this list of conditions and the following disclaimer in the documentation 12%%% and/or other materials provided with the distribution. 13%%% 14%%% o Neither the name of ERLANG TRAINING AND CONSULTING nor the names of its 15%%% contributors may be used to endorse or promote products derived from this 16%%% software without specific prior written permission. 17%%% 18%%% THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 19%%% AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20%%% IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21%%% ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 22%%% LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23%%% CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24%%% SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25%%% INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26%%% CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27%%% ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28%%% POSSIBILITY OF SUCH DAMAGE. 29-module(cl_consumer). 30-behaviour(gen_server). 31 32%%% INCLUDE FILES 33 34%%% START/STOP EXPORTS 35-export([start_link/3, stop/1]). 36 37%%% RPS EXPORTS 38-export([pause/1, resume/1, rps/1, rps/2]). 39 40%%% INIT/TERMINATE EXPORTS 41-export([init/1, terminate/2]). 

%%% HANDLE MESSAGES EXPORTS
-export([handle_call/3, handle_cast/2, handle_info/2]).

%%% CODE UPDATE EXPORTS
-export([code_change/3]).

%%% INTERNAL EXPORTS
-export([consumer_loop/3]).

%%% MACROS
% Each worker process serves at most 25 requests per second.  Asking a single
% worker for a higher rate makes the effective RPS less precise, because of
% deviations in the scheduling and timer functions.  Up to 40 RPS per process
% could be set, but keeping it at 25 gives smaller deviations on loaded
% systems.  Raising the priority of the worker processes to high also yields
% a more accurate RPS, but it is more dangerous.
-define(MAX_RPS(Rps), if Rps < 25 -> Rps; true -> 25 end).  % Per worker

%%% RECORDS
-record(st, {
    queue_srv,      % handle given to cl_queue_srv:out/2 by the consumers
    req_fun,        % fun applied to every item taken from the queue
    rps,            % configured requests per second
    consumers = []  % pids of the currently running consumer processes
}).

%%%-----------------------------------------------------------------------------
%%% START/STOP EXPORTS
%%%-----------------------------------------------------------------------------
%% @doc Starts a linked, unregistered consumer server.  The three arguments
%% are handed to init/1 as a list.
start_link(QueueSrv, ReqFun, Rps) ->
    InitArgs = [QueueSrv, ReqFun, Rps],
    gen_server:start_link(?MODULE, InitArgs, []).


%% @doc Synchronously stops the server `Pid' (no call timeout).
stop(Pid) ->
    gen_server:call(Pid, stop, infinity).

%%%-----------------------------------------------------------------------------
%%% RPS EXPORTS
%%%-----------------------------------------------------------------------------
%% @doc Synchronous `pause' request; returns once the server has replied.
pause(Pid) ->
    gen_server:call(Pid, pause, infinity).


%% @doc Asynchronous `resume' request; returns `ok' immediately.
resume(Pid) ->
    gen_server:cast(Pid, resume).


%% @doc Returns the rate currently stored in the server state.
rps(Pid) ->
    gen_server:call(Pid, rps, infinity).


%% @doc Asynchronously asks the server to switch to the rate `Rps'.
rps(Pid, Rps) ->
    gen_server:cast(Pid, {rps, Rps}).

%%%-----------------------------------------------------------------------------
%%% INIT/TERMINATE EXPORTS
%%%-----------------------------------------------------------------------------
%% @doc gen_server init callback: spawns the consumers needed for `Rps' and
%% records them, together with the start arguments, in the server state.
init([QueueSrv, ReqFun, Rps]) ->
    St = #st{
        queue_srv = QueueSrv,
        req_fun = ReqFun,
        rps = Rps,
        consumers = init_consumers(QueueSrv, ReqFun, Rps)
    },
    {ok, St}.


%% @doc gen_server terminate callback.  Consumers are linked to the server,
%% but a `normal' exit does not propagate over links, so each one is told to
%% stop explicitly.
terminate(_Reason, St) ->
    stop_consumers(St#st.consumers).

%%%-----------------------------------------------------------------------------
%%% HANDLE MESSAGES EXPORTS
%%%-----------------------------------------------------------------------------
%% @doc Synchronous requests:
%% <ul>
%% <li>`pause' - stops every running consumer and replies `ok'.</li>
%% <li>`rps'   - replies with the configured rate.</li>
%% <li>`stop'  - replies `ok' and terminates the server normally.</li>
%% </ul>
handle_call(pause, _From, St) ->
    % Stopping an empty list is a no-op, so an already paused server needs no
    % clause of its own.
    stop_consumers(St#st.consumers),
    {reply, ok, St#st{consumers = []}};
handle_call(rps, _From, St) ->
    {reply, St#st.rps, St};
handle_call(stop, _From, St) ->
    {stop, normal, ok, St}.


%% @doc Asynchronous requests:
%% <ul>
%% <li>`resume'     - spawns the consumers again if none are running.</li>
%% <li>`{rps, Rps}' - stores the new rate and, when consumers are running,
%%                    replaces them with a set sized for that rate.</li>
%% </ul>
handle_cast(resume, #st{consumers = []} = St) ->
    Consumers = init_consumers(St#st.queue_srv, St#st.req_fun, St#st.rps),
    {noreply, St#st{consumers = Consumers}};
handle_cast(resume, St) ->
    {noreply, St};
handle_cast({rps, Rps}, #st{consumers = []} = St) ->
    % No consumers running: only remember the rate, resume/1 will apply it.
    % NOTE(review): the list is also empty when the rate is 0, so raising the
    % rate from 0 does not start consumers until resume/1 is called - confirm
    % that this is intended.
    {noreply, St#st{rps = Rps}};
handle_cast({rps, Rps}, St) ->
    stop_consumers(St#st.consumers),
    Consumers = init_consumers(St#st.queue_srv, St#st.req_fun, Rps),
    {noreply, St#st{rps = Rps, consumers = Consumers}}.


%% @doc Any other message is dropped.
handle_info(_Info, St) ->
    {noreply, St}.

%%%-----------------------------------------------------------------------------
%%% CODE UPDATE EXPORTS
%%%-----------------------------------------------------------------------------
%% @doc The state needs no conversion between versions.
code_change(_OldVsn, St, _Extra) ->
    {ok, St}.

%%%-----------------------------------------------------------------------------
%%% CONSUMER FUNCTIONS
%%%-----------------------------------------------------------------------------
%% @doc Spawns a linked consumer which waits `Delay' ms and then issues one
%% request every `Time' ms.  Returns the pid of the new process.
consumer(QueueSrv, ReqFun, Time, Delay) ->
    spawn_link(fun() -> consumer_init(QueueSrv, ReqFun, Time, Delay) end).


%% @doc Entry point of a consumer process: waits out the start delay and then
%% enters the request loop.
%%
%% The delay is a `receive ... after' rather than timer:sleep/1 so that a
%% `stop' sent while the consumer is still waiting ends it at once; with a
%% plain sleep the consumer would sit out the delay and issue one request
%% before noticing the `stop'.
%%
%% Calling process_flag(priority, high) here would give a more precise RPS;
%% do so only if very precise RPS is required.
consumer_init(QueueSrv, ReqFun, Time, Delay) ->
    receive
        stop ->
            exit(normal)
    after Delay ->
        consumer_loop(QueueSrv, ReqFun, Time)
    end.


%% @doc Sends `stop' to every consumer in `Consumers'.
stop_consumers(Consumers) ->
    lists:foreach(fun consumer_stop/1, Consumers).


%% @doc Body of a consumer process: one request per `Time' ms.
%%
%% The timer is armed before the request is made, so the time spent in the
%% request counts towards the period.  The recursive call is fully qualified
%% so that a running consumer switches to newly loaded code.
consumer_loop(QueueSrv, ReqFun, Time) ->
    % Honour a `stop' that is already queued (e.g. it arrived during the start
    % delay or behind a `continue') before doing any further work.
    ok = consumer_check_stop(),
    erlang:send_after(Time, self(), continue),
    ok = consumer_req(QueueSrv, ReqFun),
    ok = consumer_wait(),
    ?MODULE:consumer_loop(QueueSrv, ReqFun, Time).


%% @doc Non-blocking check for a pending `stop'; exits the process normally
%% when there is one, otherwise returns `ok'.
consumer_check_stop() ->
    receive
        stop ->
            exit(normal)
    after 0 ->
        ok
    end.


%% @doc Takes at most one item from the queue and applies `ReqFun' to it.
%% Always returns `ok'; the result of `ReqFun' is ignored.
consumer_req(QueueSrv, ReqFun) ->
    case cl_queue_srv:out(QueueSrv, 1) of
        [] ->
            ok;
        [Item] ->
            ReqFun(Item),
            ok
    end.


%% @doc Asks the consumer `Pid' to terminate.
consumer_stop(Pid) ->
    Pid ! stop.


%% @doc Blocks until the period timer fires (`continue') or the consumer is
%% told to `stop'.  Any other message is discarded so that the mailbox does
%% not grow.
consumer_wait() ->
    receive
        continue ->
            ok;
        stop ->
            exit(normal);
        _Other ->
            consumer_wait()
    end.

%%%-----------------------------------------------------------------------------
%%% INTERNAL FUNCTIONS
%%%-----------------------------------------------------------------------------
%% @doc Spawns the consumers needed to reach `Rps' requests per second and
%% returns their pids.
%%
%% Every worker serves at most ?MAX_RPS(Rps) requests per second; the last one
%% takes whatever remains.  Worker starts are staggered by `Gap' ms, where
%% `Gap' is 1000 divided by the number of full-rate workers.  A rate of 0
%% spawns nothing.  Any other value that is not a positive integer fails here
%% with `function_clause' instead of later, in arithmetic or inside a spawned
%% consumer.
init_consumers(_QueueSrv, _ReqFun, 0) ->
    [];
init_consumers(QueueSrv, ReqFun, Rps) when is_integer(Rps), Rps > 0 ->
    Gap = 1000 div (Rps div ?MAX_RPS(Rps)),
    init_consumers(QueueSrv, ReqFun, Rps, Gap, 0, []).

%% `Delay' is the start delay of the next worker.  It is carried along instead
%% of being recomputed from length(Acc) on every step.
init_consumers(_QueueSrv, _ReqFun, 0, _Gap, _Delay, Acc) ->
    Acc;
init_consumers(QueueSrv, ReqFun, Rps, Gap, Delay, Acc) ->
    WorkerRps = ?MAX_RPS(Rps),
    Pid = consumer(QueueSrv, ReqFun, 1000 div WorkerRps, Delay),
    init_consumers(QueueSrv, ReqFun, Rps - WorkerRps, Gap, Delay + Gap,
                   [Pid | Acc]).
