-module(jobs_server_tests).

-include_lib("eunit/include/eunit.hrl").

%% EUnit generator built on a `foreachx' fixture.
%%
%% Each entry pairs a server spec with an instantiator. The spec is handed
%% to start_test_server/1 before the entry's tests run, and stop_server/0
%% is called afterwards. The entries are grouped by the kind of run they
%% perform and concatenated in their original order.
rate_test_() ->
    Setup = fun(Type) -> start_test_server(Type) end,
    Cleanup = fun(_, _) -> stop_server() end,
    %% Jobs submitted one after another from the test process.
    SerialCases =
        [{{rate, 1}, fun(_, _) -> [fun() -> serial(1, 2, 2) end] end},
         {{rate, 5}, fun(_, _) -> [fun() -> serial(5, 5, 1) end] end},
         {{rate, 50}, fun(_, _) -> [fun() -> serial(50, 50, 1) end] end},
         {{rate, 100}, fun(_, _) -> [fun() -> serial(100, 100, 1) end] end},
         {{rate, 300}, fun(_, _) -> [fun() -> serial(300, 300, 1) end] end},
         {{rate, 500}, fun(_, _) -> [fun() -> serial(500, 500, 1) end] end},
         {{rate, 1000}, fun(_, _) -> [fun() -> serial(1000, 1000, 1) end] end}],
    %% Disabled entry (rate_test/2 is not defined in this module):
    %% , {{rate, 100}, fun(O,_) -> [fun() -> rate_test(O,1) end] end}
    %% Jobs submitted from N concurrently spawned processes.
    ParallelCases =
        [{{rate, 400}, fun(_, _) -> [fun() -> par_run(400, 400, 1) end] end},
         {{rate, 600}, fun(_, _) -> [fun() -> par_run(600, 600, 1) end] end},
         {{rate, 1000}, fun(_, _) -> [fun() -> par_run(1000, 1000, 1) end] end},
         {{rate, 2000}, fun(_, _) -> [fun() -> par_run(2000, 2000, 1) end] end}],
    %% Disabled entry (max_rate_test/2 is not defined in this module):
    %% , {[{rate,100},
    %%     {group,50}], fun(O,_) -> [fun() -> max_rate_test(O,1) end] end}
    OtherCases =
        [{{count, 3}, fun(_, _) -> [fun() -> counter_run(30, 1) end] end},
         {{timeout, 500},
          fun(_, _) ->
                  [fun() ->
                           ?debugVal(timeout_test(500))
                   end]
          end}],
    {foreachx, Setup, Cleanup, SerialCases ++ ParallelCases ++ OtherCases}.

%% Runs N timestamp jobs sequentially through queue `q' and prints timing
%% statistics. R is the configured rate and TargetRatio scales the
%% expected duration.
serial(R, N, TargetRatio) ->
    timed_report(R, N, TargetRatio, fun() -> run_jobs(q, N) end).

%% Same as serial/3, except that every job is submitted from its own
%% process via pmap/2.
par_run(R, N, TargetRatio) ->
    timed_report(R, N, TargetRatio,
                 fun() ->
                         pmap(fun() ->
                                      run_job(q, one_job(time))
                              end, N)
                 end).

%% Shared body of serial/3 and par_run/3: computes the expected duration
%% in microseconds, times Work with tc/1 and passes the measurements to
%% time_eval/5.
timed_report(R, N, TargetRatio, Work) ->
    Expected = (N div R) * 1000000 * TargetRatio,
    ?debugVal({R,N,Expected}),
    {T, Ts} = tc(Work),
    time_eval(R, N, T, Ts, Expected).

%% Submits N counting jobs to queue `q' in parallel and prints the total
%% time together with the per-job results. Target is only echoed in the
%% debug output; nothing is asserted here.
counter_run(N, Target) ->
    ?debugVal({N, Target}),
    Work = fun() ->
                   pmap(fun() -> run_job(q, one_job(count)) end, N)
           end,
    {T, Ts} = tc(Work),
    ?debugVal({T,Ts}).
%% Asks queue `q' for a slot and expects the request to fail with
%% `{error, timeout}'. T is the configured timeout in milliseconds.
%%
%% Returns `ok' when the measured delay exceeds T by less than 5 ms.
%% Raises `{timeout_too_late, Diff}' otherwise, or `{timeout_expected, Res}'
%% when the call did not time out at all.
%% NOTE(review): a timeout that fires earlier than T also passes this
%% check - confirm that this is intended.
timeout_test(T) ->
    case timer:tc(jobs, ask, [q]) of
        {US, {error, timeout}} ->
            Late = (US div 1000) - T,
            if
                Late < 5 ->
                    ok;
                true ->
                    error({timeout_too_late, Late})
            end;
        Unexpected ->
            error({timeout_expected, Unexpected})
    end.

%% Prints a one-line summary of a timed run on the `user' device.
%%
%% T is the total elapsed time, Ts the list of per-job 2-tuples and
%% Expected the expected total. The first element of the smallest tuple in
%% Ts is the baseline; the statistics describe the distance of the
%% remaining samples from that baseline. Ts needs at least two elements,
%% since lists:max/1 fails on an empty list.
time_eval(_R, _N, T, Ts, Expected) ->
    [{Base, _} | Rest] = lists:sort(Ts),
    Offsets = [Sample - Base || {Sample, _} <- Rest],
    Ratio = T / Expected,
    Max = lists:max(Offsets),
    {Mean, Variance} = time_variance(Offsets),
    Format = "Time: ~p, Ratio = ~.1f, Max = ~p, Mean = ~.1f, Variance = ~.1f~n",
    io:fwrite(user, Format, [T, Ratio, Max, Mean, Variance]).


%% Returns {Mean, Spread} for a non-empty list of numbers.
%%
%% Spread is the square root of the mean squared deviation from Mean, so
%% despite the function name it is a standard deviation rather than a
%% variance. An empty list fails with badarith (division by zero).
time_variance(L) ->
    N = length(L),
    Mean = lists:sum(L) / N,
    SumOfSquares = lists:foldl(fun(X, Acc) ->
                                       Dev = X - Mean,
                                       Acc + Dev * Dev
                               end, 0, L),
    {Mean, math:sqrt(SumOfSquares / N)}.



%% Disabled test, kept for reference:
%% counter_test(Count) ->
%%     start_test_server({count,Count}),
%%     Res = tc(fun() ->
%%                      pmap(fun() -> jobs:run(q, one_job(count)) end, Count * 2)
%%              end),
%%     io:fwrite(user, "~p~n", [Res]),
%%     stop_server().


%% Runs F in N freshly spawned, monitored processes and returns the N
%% results in spawn order. Each worker hands back its result as its exit
%% reason, so a crashing F yields the crash reason in place of a result.
pmap(F, N) ->
    Worker = fun() -> exit(F()) end,
    collect([spawn_monitor(Worker) || _ <- lists:seq(1, N)]).

%% Waits for the 'DOWN' message of every {Pid, MonitorRef} pair, in list
%% order, and returns the exit reasons in that same order.
collect(Ps) ->
    collect(Ps, []).

collect([{_P, Ref} | Ps], Acc) ->
    receive
        {'DOWN', Ref, _, _, Res} ->
            collect(Ps, [Res | Acc])
    end;
collect([], Acc) ->
    lists:reverse(Acc).

%% Starts the test server for Conf with Silent set to true (see
%% start_test_server/2).
start_test_server(Conf) ->
    start_test_server(true, Conf).
%% Starts the `jobs' application with a single queue `q' configured from
%% the given spec. Silent is passed on to start_with_conf/2.
%%
%% Supported specs and return values:
%%   {rate, Rate}                  - rate regulator; returns Rate
%%   [{rate, Rate}, {group, Grp}]  - rate regulator plus group rate `gr';
%%                                   returns Grp
%%   {count, Count}                - counter regulator; returns the result
%%                                   of start_with_conf/2
%%   {timeout, T}                  - counter regulator with limit 0 and
%%                                   max_time T; returns the result of
%%                                   start_with_conf/2
start_test_server(Silent, {rate, Rate}) ->
    start_with_conf(Silent,
                    [{queues,
                      [{q, [{regulators, [{rate, [{limit, Rate}]}]}
                            %% , {mod, jobs_queue_list}
                           ]}
                      ]}
                    ]),
    Rate;
start_test_server(Silent, [{rate, Rate}, {group, Grp}]) ->
    start_with_conf(Silent,
                    [{group_rates, [{gr, [{limit, Grp}]}]},
                     {queues,
                      [{q, [{regulators, [{rate, [{limit, Rate}]},
                                          {group_rate, gr}]}
                           ]}
                      ]}
                    ]),
    Grp;
start_test_server(Silent, {count, Count}) ->
    start_with_conf(Silent,
                    [{queues,
                      [{q, [{regulators, [{counter, [{limit, Count}]}]}
                           ]}
                      ]}
                    ]);
start_test_server(Silent, {timeout, T}) ->
    start_with_conf(Silent,
                    [{queues,
                      [{q, [{regulators, [{counter, [{limit, 0}]}]},
                            {max_time, T}
                           ]}
                      ]}
                    ]).


%% Reloads the `jobs' application, installs every {Key, Value} pair of
%% Conf as application environment and starts the application.
%%
%% When Silent is `true' the tty report handler is removed from
%% error_logger first. The unload/load results are not checked.
%% Returns whatever application:start/1 returns.
start_with_conf(Silent, Conf) ->
    application:unload(jobs),
    application:load(jobs),
    lists:foreach(fun({K, V}) -> application:set_env(jobs, K, V) end, Conf),
    case Silent of
        true ->
            error_logger:delete_report_handler(error_logger_tty_h);
        _ ->
            ok
    end,
    application:start(jobs).


%% Stops the `jobs' application.
stop_server() ->
    application:stop(jobs).

%% Times a single call of F and returns {Elapsed, Result}, where Elapsed
%% is timer:now_diff/2 of the readings taken after and before the call.
%% NOTE(review): assumes jobs_lib:time_compat/0 returns an
%% erlang:timestamp()-style tuple, which makes Elapsed microseconds -
%% confirm.
%%
%% F is called without a `catch' on purpose: a crash inside F propagates
%% with its original class, reason and stack trace. Turning it into an
%% {'EXIT', _} result would only make callers fail later on an unrelated
%% match, or print the failure and pass.
tc(F) ->
    T1 = jobs_lib:time_compat(),
    R = F(),
    T2 = jobs_lib:time_compat(),
    {timer:now_diff(T2, T1), R}.

%% Runs N timestamp jobs on queue Q one after another and returns the list
%% of run_job/2 results in submission order.
run_jobs(Q, N) ->
    [run_job(Q, one_job(time)) || _ <- lists:seq(1, N)].

%% Runs F through jobs:run/2 on queue Q under timer:tc/3 and returns
%% {Microseconds, Result}.
run_job(Q, F) ->
    timer:tc(jobs, run, [Q, F]).

%% Returns the job fun for the given kind: `time' yields timestamp/0,
%% `count' yields a fun that returns 1.
one_job(time) ->
    fun timestamp/0;
one_job(count) ->
    fun() ->
            1
    end.


%% Current timestamp as reported by jobs_server:timestamp/0.
timestamp() ->
    jobs_server:timestamp().