1%%% Copyright (C) 2009 Enrique Marcote, Miguel Rodriguez
2%%% All rights reserved.
3%%%
4%%% Redistribution and use in source and binary forms, with or without
5%%% modification, are permitted provided that the following conditions are met:
6%%%
7%%% o Redistributions of source code must retain the above copyright notice,
8%%%   this list of conditions and the following disclaimer.
9%%%
10%%% o Redistributions in binary form must reproduce the above copyright notice,
11%%%   this list of conditions and the following disclaimer in the documentation
12%%%   and/or other materials provided with the distribution.
13%%%
14%%% o Neither the name of ERLANG TRAINING AND CONSULTING nor the names of its
15%%%   contributors may be used to endorse or promote products derived from this
16%%%   software without specific prior written permission.
17%%%
18%%% THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19%%% AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20%%% IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21%%% ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22%%% LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23%%% CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24%%% SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25%%% INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26%%% CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27%%% ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28%%% POSSIBILITY OF SUCH DAMAGE.
29-module(cl_consumer).
30-behaviour(gen_server).
31
32%%% INCLUDE FILES
33
34%%% START/STOP EXPORTS
35-export([start_link/3, stop/1]).
36
37%%% RPS EXPORTS
38-export([pause/1, resume/1, rps/1, rps/2]).
39
40%%% INIT/TERMINATE EXPORTS
41-export([init/1, terminate/2]).
42
43%%% HANDLE MESSAGES EXPORTS
44-export([handle_call/3, handle_cast/2, handle_info/2]).
45
46%%% CODE UPDATE EXPORTS
47-export([code_change/3]).
48
49%%% INTERNAL EXPORTS
50-export([consumer_loop/3]).
51
52%%% MACROS
% Upper bound on the requests per second (RPS) served by one worker process.
%
% A higher per-worker rate makes the effective RPS less precise because of
% deviations in the scheduling and timer functions.  At most 40 RPS per
% process could be set, but keeping the limit as low as 25 gives smaller
% deviations on loaded systems.  Raising the priority of the worker processes
% to high also leads to a more accurate RPS, but it is more dangerous.
-define(MAX_RPS(Rps), case Rps < 25 of true -> Rps; false -> 25 end).  % Per worker
59
60%%% RECORDS
% gen_server state.
-record(st, {
    queue_srv,      % handed to cl_queue_srv:out/2 by the workers
    req_fun,        % fun of arity 1 applied to every item taken from the queue
    rps,            % configured requests per second
    consumers = []  % pids of the running workers; [] is treated as paused
}).
62
63%%%-----------------------------------------------------------------------------
64%%% START/STOP EXPORTS
65%%%-----------------------------------------------------------------------------
%% @doc Start an unregistered consumer server linked to the calling process.
%%
%% `QueueSrv' is the queue the workers take items from, `ReqFun' is applied
%% to every item taken and `Rps' is the initial number of requests per second.
%% Returns whatever `gen_server:start_link/3' returns.
start_link(QueueSrv, ReqFun, Rps) ->
    InitArgs = [QueueSrv, ReqFun, Rps],
    gen_server:start_link(?MODULE, InitArgs, []).
68
69
%% @doc Stop the consumer server `Pid'.
%%
%% Synchronous call without timeout; the server replies `ok' and terminates
%% with reason `normal'.
stop(Pid) ->
    Request = stop,
    gen_server:call(Pid, Request, infinity).
72
73%%%-----------------------------------------------------------------------------
74%%% RPS EXPORTS
75%%%-----------------------------------------------------------------------------
%% @doc Ask the server `Pid' to stop all of its workers.
%%
%% Synchronous call without timeout; returns `ok' once the stop messages
%% have been sent to the workers.
pause(Pid) ->
    Request = pause,
    gen_server:call(Pid, Request, infinity).
78
79
%% @doc Ask the server `Pid' to start its workers again.
%%
%% Asynchronous (cast); a server that already has workers ignores it.
resume(Pid) ->
    Request = resume,
    gen_server:cast(Pid, Request).
82
83
%% @doc Return the requests-per-second rate currently configured in the
%% server `Pid'.  Synchronous call without timeout.
rps(Pid) ->
    Request = rps,
    gen_server:call(Pid, Request, infinity).
86
87
%% @doc Set the requests-per-second rate of the server `Pid' to `Rps'.
%%
%% Asynchronous (cast): the call returns before the new rate is applied.
rps(Pid, Rps) ->
    Request = {rps, Rps},
    gen_server:cast(Pid, Request).
90
91%%%-----------------------------------------------------------------------------
92%%% INIT/TERMINATE EXPORTS
93%%%-----------------------------------------------------------------------------
%% @doc gen_server callback.  Spawns the workers needed for `Rps' and keeps
%% their pids, together with the start arguments, in the server state.
init([QueueSrv, ReqFun, Rps]) ->
    Consumers = init_consumers(QueueSrv, ReqFun, Rps),
    St = #st{queue_srv = QueueSrv, req_fun = ReqFun, rps = Rps, consumers = Consumers},
    {ok, St}.
97
98
%% @doc gen_server callback.  Sends a stop message to every worker that is
%% still registered in the state.
terminate(_Reason, St) ->
    lists:foreach(fun consumer_stop/1, St#st.consumers).
101
102%%%-----------------------------------------------------------------------------
103%%% HANDLE MESSAGES EXPORTS
104%%%-----------------------------------------------------------------------------
%% @doc gen_server callback for the synchronous requests.
%%
%% `pause' sends a stop message to every worker and clears the worker list
%%         (nothing is sent when the list is already empty).
%% `rps'   replies with the configured rate.
%% `stop'  replies `ok' and terminates the server with reason `normal'.
handle_call(pause, _From, St) ->
    % With an empty worker list the foreach does nothing and the state is
    % left as it was, so a single clause covers both situations.
    ok = lists:foreach(fun consumer_stop/1, St#st.consumers),
    {reply, ok, St#st{consumers = []}};
handle_call(rps, _From, St) ->
    CurrentRps = St#st.rps,
    {reply, CurrentRps, St};
handle_call(stop, _From, St) ->
    {stop, normal, ok, St}.
114
115
%% @doc gen_server callback for the asynchronous requests.
%%
%% `resume'     spawns the workers for the stored rate when there are none;
%%              otherwise it is ignored.
%% `{rps, Rps}' stores the new rate.  An empty worker list is treated as
%%              paused, so in that case no worker is started until the next
%%              `resume'; otherwise the current workers are stopped and a
%%              new set is spawned for `Rps'.
handle_cast(resume, #st{consumers = []} = St) ->
    Consumers = init_consumers(St#st.queue_srv, St#st.req_fun, St#st.rps),
    {noreply, St#st{consumers = Consumers}};
handle_cast(resume, St) ->
    {noreply, St};
handle_cast({rps, Rps}, #st{consumers = []} = St) ->
    {noreply, St#st{rps = Rps}};
handle_cast({rps, Rps}, St) ->
    ok = lists:foreach(fun consumer_stop/1, St#st.consumers),
    Consumers = init_consumers(St#st.queue_srv, St#st.req_fun, Rps),
    {noreply, St#st{rps = Rps, consumers = Consumers}}.
127
128
%% @doc gen_server callback.  Any other message is discarded and the state
%% is left untouched.
handle_info(_Msg, State) ->
    {noreply, State}.
131
132%%%-----------------------------------------------------------------------------
133%%% CODE UPDATE EXPORTS
134%%%-----------------------------------------------------------------------------
%% @doc gen_server callback.  The state needs no conversion on code upgrade
%% and is returned as is.
code_change(_FromVsn, State, _Extra) ->
    {ok, State}.
137
138%%%-----------------------------------------------------------------------------
139%%% CONSUMER FUNCTIONS
140%%%-----------------------------------------------------------------------------
%% @doc Spawn a worker linked to the calling process and return its pid.
%%
%% The worker waits `Delay' milliseconds and then issues one request every
%% `Time' milliseconds.
consumer(QueueSrv, ReqFun, Time, Delay) ->
    Worker = fun() -> consumer_init(QueueSrv, ReqFun, Time, Delay) end,
    spawn_link(Worker).
143
144
%% @doc Entry point of a worker process.
%%
%% Waits `Delay' milliseconds, so that the workers of one server do not all
%% fire at the same instant, and then enters the request loop with a period
%% of `Time' milliseconds.
%%
%% A `stop' message received during the initial delay terminates the worker
%% right away.  A plain sleep would leave the message in the mailbox until
%% after the first request, letting a worker that was already stopped issue
%% one more request.
%%
%% Only if very precise RPS is required, `process_flag(priority, high)' may
%% be set here before waiting.
consumer_init(QueueSrv, ReqFun, Time, Delay) ->
    receive
        stop ->
            exit(normal)
    after Delay ->
        consumer_loop(QueueSrv, ReqFun, Time)
    end.
149
150
%% @doc Request loop of a worker process.
%%
%% The timer for the next iteration is armed before the request is issued,
%% so the time spent in the request is not added to the period.  The loop
%% calls itself through `?MODULE', i.e. with a fully qualified call.
consumer_loop(QueueSrv, ReqFun, Time) ->
    _TimerRef = erlang:send_after(Time, self(), continue),
    ok = consumer_req(QueueSrv, ReqFun),
    ok = consumer_wait(),
    ?MODULE:consumer_loop(QueueSrv, ReqFun, Time).
156
157
%% @doc Take at most one item from the queue and apply `ReqFun' to it.
%%
%% Returns `ok' both when an item was processed and when the queue returned
%% no item.  The value returned by `ReqFun' is ignored.
consumer_req(QueueSrv, ReqFun) ->
    Items = cl_queue_srv:out(QueueSrv, 1),
    case Items of
        [Item] ->
            _ = ReqFun(Item),
            ok;
        [] ->
            ok
    end.
166
167
%% @doc Send the `stop' message to the worker `Pid'.  Returns the message
%% sent, i.e. the atom `stop'.
consumer_stop(Pid) ->
    erlang:send(Pid, stop).
170
171
%% @doc Block until the worker may issue its next request.
%%
%% Returns `ok' when the `continue' message arrives, terminates the process
%% with reason `normal' on `stop', and discards every other message.
consumer_wait() ->
    receive
        stop ->
            exit(normal);
        continue ->
            ok;
        _Ignored ->
            consumer_wait()
    end.
181
182%%%-----------------------------------------------------------------------------
183%%% INTERNAL FUNCTIONS
184%%%-----------------------------------------------------------------------------
%% @doc Spawn the workers needed to issue `Rps' requests per second and
%% return their pids, most recently spawned first.
%%
%% The rate is split into chunks of at most ?MAX_RPS requests per second;
%% every chunk is served by one worker issuing a request each
%% `1000 div ChunkRps' milliseconds.  The start of each worker is delayed
%% `Gap' milliseconds with respect to the previous one.
%%
%% `Rps' must be a non-negative integer.  Any other value fails here with
%% `function_clause' instead of spawning a worker with an invalid period
%% that would crash later on.
init_consumers(_QueueSrv, _ReqFun, 0) ->
    [];
init_consumers(QueueSrv, ReqFun, Rps) when is_integer(Rps), Rps > 0 ->
    Gap = 1000 div (Rps div ?MAX_RPS(Rps)),
    init_consumers(QueueSrv, ReqFun, Rps, Gap, 0, []).

%% Spawn one worker per iteration until no rate is left to assign.  `Delay'
%% is the start delay of the next worker; carrying it along avoids computing
%% the length of the accumulator on every iteration.
init_consumers(_QueueSrv, _ReqFun, 0, _Gap, _Delay, Acc) ->
    Acc;
init_consumers(QueueSrv, ReqFun, Rps, Gap, Delay, Acc) ->
    WorkerRps = ?MAX_RPS(Rps),
    Pid = consumer(QueueSrv, ReqFun, 1000 div WorkerRps, Delay),
    init_consumers(QueueSrv, ReqFun, Rps - WorkerRps, Gap, Delay + Gap, [Pid | Acc]).
196
197