1-module(jobs_server_tests).
2
3
4-include_lib("eunit/include/eunit.hrl").
5
%% EUnit test generator. Returns a single `foreachx' fixture: for each
%% {Config, Instantiator} pair the jobs application is started with Config
%% (see start_test_server/1), the instantiated test is run, and the
%% application is stopped again.
%%
%% Pairs, in order:
%%   * serial runs for rates 1, 5, 50, 100, 300, 500 and 1000
%%   * parallel runs for rates 400, 600, 1000 and 2000
%%   * a counter run with 30 jobs against a counter limit of 3
%%   * a timeout check against a queue with max_time 500
%%
%% Disabled cases, kept for reference (they sat after the serial runs and
%% after the parallel runs respectively):
%%   {{rate, 100}, fun(O,_) -> [fun() -> rate_test(O,1) end] end}
%%   {[{rate,100}, {group,50}], fun(O,_) -> [fun() -> max_rate_test(O,1) end] end}
rate_test_() ->
    Serial = fun(Rate, Count, Ratio) ->
                     {{rate, Rate},
                      fun(_, _) -> [fun() -> serial(Rate, Count, Ratio) end] end}
             end,
    Parallel = fun(Rate) ->
                       {{rate, Rate},
                        fun(_, _) -> [fun() -> par_run(Rate, Rate, 1) end] end}
               end,
    Counter = {{count, 3},
               fun(_, _) -> [fun() -> counter_run(30, 1) end] end},
    Timeout = {{timeout, 500},
               fun(_, _) -> [fun() -> ?debugVal(timeout_test(500)) end] end},
    Pairs = [Serial(1, 2, 2)]
        ++ [Serial(Rate, Rate, 1) || Rate <- [5, 50, 100, 300, 500, 1000]]
        ++ [Parallel(Rate) || Rate <- [400, 600, 1000, 2000]]
        ++ [Counter, Timeout],
    {foreachx,
     fun start_test_server/1,
     fun(_, _) -> stop_server() end,
     Pairs}.
29
30
31
%% Runs N timestamp jobs one after another on queue `q' and prints timing
%% statistics for the batch.
%%
%% R is the rate used for the expected-duration estimate (the callers in
%% rate_test_/0 pass the queue's configured rate limit). Expected is
%% (N div R) * 1000000 * TargetRatio; presumably this is "seconds needed at
%% R jobs per second, in microseconds" -- confirm against the regulator
%% semantics. Note that N < R makes Expected 0, which time_eval/5 divides by.
serial(R, N, TargetRatio) ->
    Seconds = N div R,
    Expected = Seconds * 1000000 * TargetRatio,
    ?debugVal({R,N,Expected}),
    Batch = fun() -> run_jobs(q, N) end,
    {T, Ts} = tc(Batch),
    time_eval(R, N, T, Ts, Expected).
37
%% Runs N timestamp jobs on queue `q', each from its own spawned process
%% (see pmap/2), and prints timing statistics for the batch.
%%
%% Expected is computed exactly as in serial/3:
%% (N div R) * 1000000 * TargetRatio.
par_run(R, N, TargetRatio) ->
    Seconds = N div R,
    Expected = Seconds * 1000000 * TargetRatio,
    ?debugVal({R,N,Expected}),
    %% Each worker builds and runs its own job inside the spawned process.
    Worker = fun() -> run_job(q, one_job(time)) end,
    {T, Ts} = tc(fun() -> pmap(Worker, N) end),
    time_eval(R, N, T, Ts, Expected).
46
%% Runs N trivial `count' jobs on queue `q' in parallel (see pmap/2) and
%% prints the total elapsed time together with the per-job results.
%%
%% Target is only echoed in the debug output; nothing is asserted against it.
%% Returns the {T, Ts} pair, since ?debugVal/1 yields the value it prints.
counter_run(N, Target) ->
    ?debugVal({N, Target}),
    Worker = fun() -> run_job(q, one_job(count)) end,
    Batch = fun() -> pmap(Worker, N) end,
    {T, Ts} = tc(Batch),
    ?debugVal({T,Ts}).
53
%% Calls jobs:ask(q) and checks that it answers {error, timeout} no later
%% than about T milliseconds after the call.
%%
%% The elapsed time reported by timer:tc/3 (microseconds) is truncated to
%% milliseconds; the check passes when that value exceeds T by less than 5.
%% An early timeout (negative difference) also passes.
%%
%% Returns ok, or raises:
%%   error({timeout_too_late, DiffMs})  - timed out, but too late
%%   error({timeout_expected, Result})  - anything other than a timeout
timeout_test(T) ->
    case timer:tc(jobs, ask, [q]) of
        {US, {error, timeout}} when (US div 1000) - T < 5 ->
            ok;
        {US, {error, timeout}} ->
            error({timeout_too_late, (US div 1000) - T});
        Unexpected ->
            error({timeout_expected, Unexpected})
    end.
66
%% Prints a one-line timing summary for a batch of job results to `user'.
%%
%% T        - total elapsed time of the batch
%% Ts       - list of {Value, _} pairs, one per job; the smallest Value is
%%            taken as the baseline and every other Value is reported
%%            relative to it
%% Expected - expected total time; T / Expected is printed as the ratio
%%
%% The first two arguments are ignored. Ts needs at least two elements:
%% with fewer, the list of differences is empty and lists:max/1 fails.
time_eval(_R, _N, T, Ts, Expected) ->
    [{Baseline, _} | Rest] = lists:sort(Ts),
    Diffs = [Value - Baseline || {Value, _} <- Rest],
    Ratio = T / Expected,
    Max = lists:max(Diffs),
    {Mean, Variance} = time_variance(Diffs),
    Format = "Time: ~p, Ratio = ~.1f, Max = ~p, "
             "Mean = ~.1f, Variance = ~.1f~n",
    io:fwrite(user, Format, [T, Ratio, Max, Mean, Variance]).
77
78
%% Returns {Mean, Spread} for a non-empty list of numbers.
%%
%% Mean is the arithmetic mean. Spread is the square root of the mean
%% squared deviation from Mean -- i.e. despite the function name it is the
%% (population) standard deviation, not the variance.
%%
%% An empty list fails with badarith (division by zero).
time_variance(L) ->
    Count = length(L),
    Mean = lists:sum(L) / Count,
    SumSq = lists:foldl(fun(X, Acc) ->
                                Dev = X - Mean,
                                Acc + Dev * Dev
                        end, 0, L),
    {Mean, math:sqrt(SumSq / Count)}.
84
85
86
87%% counter_test(Count) ->
88%%     start_test_server({count,Count}),
89%%     Res = tc(fun() ->
90%% 		     pmap(fun() -> jobs:run(q, one_job(count)) end, Count * 2)
91%% 	     end),
92%%     io:fwrite(user, "~p~n", [Res]),
93%%     stop_server().
94
95
%% Runs the zero-arity fun F in N freshly spawned, monitored processes and
%% returns the N results in spawn order.
%%
%% Each process exits with F's return value as its exit reason; collect/1
%% picks that reason up from the monitor's 'DOWN' message.
pmap(F, N) ->
    Spawn = fun(_) -> spawn_monitor(fun() -> exit(F()) end) end,
    Monitored = lists:map(Spawn, lists:seq(1, N)),
    collect(Monitored).
99
%% Takes a list of {Pid, MonitorRef} pairs (as returned by spawn_monitor/1)
%% and returns the exit reasons of those processes, in list order.
%%
%% Waits for each 'DOWN' message in turn, with no timeout: a process that
%% never terminates blocks the caller indefinitely.
collect(Monitored) ->
    Await = fun({_Pid, Ref}) ->
                    receive
                        {'DOWN', Ref, _, _, Reason} -> Reason
                    end
            end,
    lists:map(Await, Monitored).
107
%% Starts the jobs application for the given test configuration with the
%% tty error-report handler removed (Silent = true).
%% See start_test_server/2 for the accepted Conf shapes.
start_test_server(Conf) ->
    Silent = true,
    start_test_server(Silent, Conf).
110
%% Starts the jobs application with a single queue `q' configured according
%% to the second argument. Silent is passed through to start_with_conf/2.
%%
%% Accepted configurations and return values:
%%   {rate, Rate}                  - one rate regulator with limit Rate;
%%                                   returns Rate
%%   [{rate, Rate}, {group, Grp}]  - a rate regulator plus a group_rate
%%                                   regulator `gr' with limit Grp;
%%                                   returns Grp
%%   {count, Count}                - one counter regulator with limit Count;
%%                                   returns the result of start_with_conf/2
%%   {timeout, T}                  - a counter regulator with limit 0 and
%%                                   queue option max_time T; returns the
%%                                   result of start_with_conf/2
start_test_server(Silent, {rate, Rate}) ->
    Regulators = [{rate, [{limit, Rate}]}],
    %% Disabled queue option, kept for reference: {mod, jobs_queue_list}
    QueueOpts = [{regulators, Regulators}],
    start_with_conf(Silent, [{queues, [{q, QueueOpts}]}]),
    Rate;
start_test_server(Silent, [{rate, Rate}, {group, Grp}]) ->
    GroupRates = [{gr, [{limit, Grp}]}],
    Regulators = [{rate, [{limit, Rate}]},
                  {group_rate, gr}],
    QueueOpts = [{regulators, Regulators}],
    start_with_conf(Silent,
                    [{group_rates, GroupRates},
                     {queues, [{q, QueueOpts}]}]),
    Grp;
start_test_server(Silent, {count, Count}) ->
    Regulators = [{counter, [{limit, Count}]}],
    QueueOpts = [{regulators, Regulators}],
    start_with_conf(Silent, [{queues, [{q, QueueOpts}]}]);
start_test_server(Silent, {timeout, T}) ->
    %% A counter limit of 0 is paired with max_time T; timeout_test/1 expects
    %% jobs:ask(q) to answer {error, timeout} under this configuration.
    Regulators = [{counter, [{limit, 0}]}],
    QueueOpts = [{regulators, Regulators},
                 {max_time, T}],
    start_with_conf(Silent, [{queues, [{q, QueueOpts}]}]).
152
153
%% (Re)loads the jobs application, installs Conf as its environment and
%% starts it.
%%
%% Conf is a list of {Key, Value} pairs, each stored with
%% application:set_env(jobs, Key, Value); elements of any other shape are
%% skipped. When Silent is `true' the error_logger tty report handler is
%% removed first. The results of unload/load are deliberately ignored.
%%
%% Returns the result of application:start(jobs).
start_with_conf(Silent, Conf) ->
    application:unload(jobs),
    application:load(jobs),
    lists:foreach(fun({K, V}) -> application:set_env(jobs, K, V);
                     (_) -> ok
                  end, Conf),
    case Silent of
        true ->
            error_logger:delete_report_handler(error_logger_tty_h);
        _ ->
            ok
    end,
    application:start(jobs).
164
165
%% Stops the jobs application; used as the fixture cleanup in rate_test_/0.
%% Returns whatever application:stop/1 returns.
stop_server() ->
    application:stop(jobs).
168
%% Times the zero-arity fun F. Returns {ElapsedMicroseconds, Result}.
%%
%% F runs under an old-style `catch', so an exception in F does not
%% propagate: it shows up as the Result element instead (e.g.
%% {'EXIT', Reason} for errors and exits, or the thrown term for throws).
%% Timestamps come from jobs_lib:time_compat/0 and are diffed with
%% timer:now_diff/2 -- presumably erlang:now()-style triples; confirm in
%% jobs_lib.
tc(F) ->
    Started = jobs_lib:time_compat(),
    Result = (catch F()),
    Finished = jobs_lib:time_compat(),
    Elapsed = timer:now_diff(Finished, Started),
    {Elapsed, Result}.
174
%% Runs N timestamp jobs on queue Q sequentially and returns the list of
%% run_job/2 results in execution order.
run_jobs(Q, N) ->
    RunOne = fun(_) -> run_job(Q, one_job(time)) end,
    lists:map(RunOne, lists:seq(1, N)).
177
%% Runs fun F on queue Q via jobs:run/2 and times the call.
%% Returns {Microseconds, Result} as produced by timer:tc/3.
run_job(Q, F) ->
    Args = [Q, F],
    timer:tc(jobs, run, Args).
180
%% Returns the zero-arity fun to submit as a job:
%%   count - a fun that simply returns 1
%%   time  - timestamp/0
one_job(count) ->
    fun() -> 1 end;
one_job(time) ->
    fun timestamp/0.
187
188
%% Job body used by the timing tests: delegates to jobs_server:timestamp/0
%% and returns its result.
timestamp() ->
    jobs_server:timestamp().
191