-module(eredis_sub_tests).

-include_lib("eunit/include/eunit.hrl").
-include("eredis.hrl").
-include("eredis_sub.hrl").

-import(eredis, [create_multibulk/1]).

%% Starts a plain eredis client with eredis:start_link/0, asserts that the
%% start succeeded and returns the client handle. The tests use it as the
%% publishing side.
c() ->
    Started = eredis:start_link(),
    ?assertMatch({ok, _}, Started),
    {ok, Client} = Started,
    Client.

%% Starts a subscriber client against 127.0.0.1:6379, asserts that the start
%% succeeded and returns the subscriber handle.
%% NOTE(review): the empty third argument is presumably the password - confirm
%% against eredis_sub:start_link/3.
s() ->
    Started = eredis_sub:start_link("127.0.0.1", 6379, ""),
    ?assertMatch({ok, _}, Started),
    {ok, Subscriber} = Started,
    Subscriber.

%% Makes the calling process the controlling process of Sub, subscribes Sub to
%% Channels and then, for every channel in list order, takes the next message
%% from the mailbox, asserts that it is {subscribed, Channel, Sub} and
%% acknowledges it.
add_channels(Sub, Channels) ->
    ok = eredis_sub:controlling_process(Sub),
    ok = eredis_sub:subscribe(Sub, Channels),
    ConfirmOne =
        fun(Channel) ->
            receive
                Confirmation ->
                    ?assertEqual({subscribed, Channel, Sub}, Confirmation),
                    eredis_sub:ack_message(Sub)
            end
        end,
    lists:foreach(ConfirmOne, Channels).

%% Publishes one message on chan1 and expects exactly one delivery to the
%% test process; anything else arriving within 5 ms fails the test.
pubsub_test() ->
    Pub = c(),
    Sub = s(),
    add_channels(Sub, [<<"chan1">>, <<"chan2">>]),
    ok = eredis_sub:controlling_process(Sub),

    PublishReply = eredis:q(Pub, ["PUBLISH", chan1, msg]),
    ?assertEqual({ok, <<"1">>}, PublishReply),
    receive
        {message, _, _, _} = Delivered ->
            ?assertEqual({message, <<"chan1">>, <<"msg">>, Sub}, Delivered)
    after 10 ->
        throw(timeout)
    end,

    %% The delivery above is not acknowledged, so nothing further is expected.
    receive
        Stray ->
            throw({unexpected_message, Stray})
    after 5 ->
        ok
    end,
    eredis_sub:stop(Sub).

%% Push size so high, the queue will be used: 500 messages of 2048 bytes are
%% published back to back and all of them must be collected by recv_all/1.
pubsub2_test() ->
    Pub = c(),
    Sub = s(),
    add_channels(Sub, [<<"chan">>]),
    ok = eredis_sub:controlling_process(Sub),
    Payload = binary:copy(<<"0">>, 2048),
    PublishOne =
        fun(_) ->
            ?assertEqual({ok, <<"1">>}, eredis:q(Pub, [publish, chan, Payload]))
        end,
    lists:foreach(PublishOne, lists:seq(1, 500)),
    Collected = recv_all(Sub),
    ?assertEqual(500, length(Collected)),
    eredis_sub:stop(Sub).
%% Moves control of the subscription from the test process to a first and
%% then a second helper subscriber, checking the controlling_process field of
%% the client state after each hand-over. Both published messages must be
%% reported by the second helper. After the second helper has stopped, the
%% test waits for the subscriber client itself to go down.
pubsub_manage_subscribers_test() ->
    Pub = c(),
    Sub = s(),
    add_channels(Sub, [<<"chan">>]),
    unlink(Sub),
    TestPid = self(),
    ?assertMatch(#state{controlling_process={_, TestPid}}, get_state(Sub)),

    First = subscriber(Sub),
    ok = eredis_sub:controlling_process(Sub, First),
    #state{controlling_process={_, First}} = get_state(Sub),

    Second = subscriber(Sub),
    ok = eredis_sub:controlling_process(Sub, Second),
    #state{controlling_process={_, Second}} = get_state(Sub),

    eredis:q(Pub, ["PUBLISH", chan, msg1]),
    First ! stop,
    ok = wait_for_stop(First),
    eredis:q(Pub, ["PUBLISH", chan, msg2]),
    ?assertEqual({message, <<"chan">>, <<"msg1">>, Sub}, wait_for_msg(Second)),
    ?assertEqual({message, <<"chan">>, <<"msg2">>, Sub}, wait_for_msg(Second)),
    Second ! stop,
    ok = wait_for_stop(Second),

    Monitor = erlang:monitor(process, Sub),
    receive
        {'DOWN', Monitor, process, Sub, _} ->
            ok
    end.


%% Closes the subscriber's socket behind its back, injects the matching
%% tcp_closed message and expects the controlling process to be told about
%% the disconnect, the reconnect attempt and the new connection, in that order.
pubsub_connect_disconnect_messages_test() ->
    Pub = c(),
    Sub = s(),
    add_channels(Sub, [<<"chan">>]),
    Listener = subscriber(Sub),
    ok = eredis_sub:controlling_process(Sub, Listener),
    eredis:q(Pub, ["PUBLISH", chan, msg]),
    wait_for_msg(Listener),

    #state{socket=Socket} = get_state(Sub),
    gen_tcp:close(Socket),
    Sub ! {tcp_closed, Socket},

    ExpectedEvents = [eredis_disconnected,
                      eredis_reconnect_attempt,
                      eredis_connected],
    lists:foreach(
      fun(Event) ->
              ?assertEqual({Event, Sub}, wait_for_msg(Listener))
      end, ExpectedEvents),
    eredis_sub:stop(Sub).


%% Publishes 12 messages without acknowledging any of them to a subscriber
%% started with the 'drop' option; the first message is delivered and the
%% next notification must be {dropped, 11}.
%% NOTE(review): 100 and 10 are presumably the reconnect sleep and the maximum
%% queue size - confirm against eredis_sub:start_link/6.
drop_queue_test() ->
    Pub = c(),
    {ok, Sub} = eredis_sub:start_link("127.0.0.1", 6379, "", 100, 10, drop),
    add_channels(Sub, [<<"foo">>]),
    ok = eredis_sub:controlling_process(Sub),

    lists:foreach(
      fun(N) ->
              eredis:q(Pub, [publish, foo, N])
      end, lists:seq(1, 12)),

    receive
        FirstDelivery ->
            ?assertEqual({message,<<"foo">>,<<"1">>, Sub}, FirstDelivery)
    end,
    receive
        DropNotice ->
            ?assertEqual({dropped, 11}, DropNotice)
    end,
    eredis_sub:stop(Sub).
%% Publishes 12 messages without acknowledging any of them to a subscriber
%% started with the 'exit' option. The first message is delivered and the
%% subscriber is then expected to terminate with reason max_queue_size, which
%% the test observes through a monitor.
%% NOTE(review): 100 and 10 are presumably the reconnect sleep and the maximum
%% queue size - confirm against eredis_sub:start_link/6.
crash_queue_test() ->
    Pub = c(),
    {ok, Sub} = eredis_sub:start_link("127.0.0.1", 6379, "", 100, 10, exit),
    add_channels(Sub, [<<"foo">>]),

    %% Presumably unlinked so that the expected exit of Sub does not propagate
    %% to the test process; the monitor reports the exit reason instead.
    true = unlink(Sub),
    ok = eredis_sub:controlling_process(Sub),
    Ref = erlang:monitor(process, Sub),

    [eredis:q(Pub, [publish, foo, N]) || N <- lists:seq(1, 12)],

    receive M1 -> ?assertEqual({message,<<"foo">>,<<"1">>, Sub}, M1) end,
    receive M2 -> ?assertEqual({'DOWN', Ref, process, Sub, max_queue_size}, M2) end.


%% Checks subscribing and unsubscribing on an already running subscriber:
%% nothing is delivered before subscribing, subscribing twice to the same
%% channels does not duplicate them, each channel delivers its messages, and
%% unsubscribing removes the channel from the channel list.
dynamic_channels_test() ->
    Pub = c(),
    Sub = s(),
    ok = eredis_sub:controlling_process(Sub),

    eredis:q(Pub, [publish, newchan, foo]),

    %% Not subscribed yet, so no delivery may arrive. Messages have the shape
    %% {message, Channel, Payload, Sub}; match any of them so that the check
    %% can actually fail (matching the payload <<"foo">> in the channel
    %% position would never match a message published on newchan).
    receive
        {message, _, _, _} = Early ->
            throw({unexpected_message, Early})
    after 5 ->
        ok
    end,

    %% We do the following twice to show that subscribing to the same channel
    %% doesn't cause the channel to show up twice
    lists:foreach(fun(_) ->
        eredis_sub:subscribe(Sub, [<<"newchan">>, <<"otherchan">>]),
        receive M1 -> ?assertEqual({subscribed, <<"newchan">>, Sub}, M1) end,
        eredis_sub:ack_message(Sub),
        receive M2 -> ?assertEqual({subscribed, <<"otherchan">>, Sub}, M2) end,
        eredis_sub:ack_message(Sub),

        {ok, Channels} = eredis_sub:channels(Sub),
        ?assertEqual(true, lists:member(<<"otherchan">>, Channels)),
        ?assertEqual(true, lists:member(<<"newchan">>, Channels)),
        ?assertEqual(2, length(Channels))
    end, lists:seq(0, 1)),

    eredis:q(Pub, [publish, newchan, foo]),
    ?assertEqual([{message, <<"newchan">>, <<"foo">>, Sub}], recv_all(Sub)),
    eredis:q(Pub, [publish, otherchan, foo]),
    ?assertEqual([{message, <<"otherchan">>, <<"foo">>, Sub}], recv_all(Sub)),

    eredis_sub:unsubscribe(Sub, [<<"otherchan">>]),
    eredis_sub:ack_message(Sub),
    receive M3 -> ?assertEqual({unsubscribed, <<"otherchan">>, Sub}, M3) end,

    ?assertEqual({ok, [<<"newchan">>]}, eredis_sub:channels(Sub)).


%% Collects every {message, _, _, _} tuple currently arriving in the caller's
%% mailbox, acknowledging each one to Sub, and returns them in arrival order.
%% Delegates to recv_all/2 with an empty accumulator.
recv_all(Sub) ->
    recv_all(Sub, []).
%% Accumulating worker for recv_all/1: takes {message, _, _, _} tuples from
%% the mailbox, acknowledging each to Sub, until none arrives for 5 ms, then
%% returns the collected messages oldest first.
recv_all(Sub, Collected) ->
    receive
        {message, _, _, _} = Delivered ->
            eredis_sub:ack_message(Sub),
            recv_all(Sub, [Delivered | Collected])
    after 5 ->
        lists:reverse(Collected)
    end.

%% Spawns a helper process running subscriber/2 and a second process that
%% monitors the helper and sends {stopped, Helper} to the caller when the
%% helper goes down. Returns the helper's pid.
subscriber(Client) ->
    Owner = self(),
    Helper = spawn(fun () -> subscriber(Client, Owner) end),
    Watch =
        fun() ->
            Monitor = erlang:monitor(process, Helper),
            receive
                {'DOWN', Monitor, _, _, _} ->
                    Owner ! {stopped, Helper}
            end
        end,
    spawn(Watch),
    Helper.

%% Receive loop of the helper process: every message other than 'stop' is
%% forwarded to Test as {got_message, self(), Msg} and acknowledged to Client;
%% 'stop' ends the loop.
subscriber(Client, Test) ->
    receive
        stop ->
            ok;
        Incoming ->
            Test ! {got_message, self(), Incoming},
            eredis_sub:ack_message(Client),
            subscriber(Client, Test)
    end.

%% Blocks until the given helper reports a forwarded message and returns it.
wait_for_msg(Subscriber) ->
    receive
        {got_message, Subscriber, Forwarded} ->
            Forwarded
    end.

%% Blocks until the watcher reports that the given helper has stopped.
wait_for_stop(Subscriber) ->
    receive
        {stopped, Subscriber} ->
            ok
    end.

%% Given a pid, fetches its status with sys:get_status/1 and searches the
%% fifth element of the status list; given a list, returns the State of the
%% first element shaped {data, [{"State", State}]}.
get_state(Pid) when is_pid(Pid) ->
    {status, _, _, [_, _, _, _, Misc]} = sys:get_status(Pid),
    get_state(Misc);
get_state([{data, [{"State", Found}]} | _]) ->
    Found;
get_state([_ | Remaining]) ->
    get_state(Remaining).


%% Tests for pattern subscribe

%% Pattern counterpart of add_channels/2: makes the caller the controlling
%% process, issues psubscribe for Channels and, for each pattern in list
%% order, expects and acknowledges a {subscribed, Pattern, Sub} message.
add_channels_pattern(Sub, Channels) ->
    ok = eredis_sub:controlling_process(Sub),
    ok = eredis_sub:psubscribe(Sub, Channels),
    ConfirmOne =
        fun(Pattern) ->
            receive
                Confirmation ->
                    ?assertEqual({subscribed, Pattern, Sub}, Confirmation),
                    eredis_sub:ack_message(Sub)
            end
        end,
    lists:foreach(ConfirmOne, Channels).
%% Subscribes to two patterns, checks that a message published on chan123 is
%% delivered as a pmessage for the chan1* pattern, then unsubscribes from both
%% patterns and checks that a further publish reaches no subscriber and
%% delivers nothing to the test process.
pubsub_pattern_test() ->
    Pub = c(),
    Sub = s(),
    Patterns = [<<"chan1*">>, <<"chan2*">>],
    add_channels_pattern(Sub, Patterns),
    ok = eredis_sub:controlling_process(Sub),

    FirstReply = eredis:q(Pub, ["PUBLISH", <<"chan123">>, <<"msg">>]),
    ?assertEqual({ok, <<"1">>}, FirstReply),
    receive
        {pmessage, _, _, _, _} = Delivered ->
            ?assertEqual({pmessage, <<"chan1*">>,<<"chan123">>, <<"msg">>, Sub},
                         Delivered)
    after 10 ->
        throw(timeout)
    end,

    eredis_sub:punsubscribe(Sub, Patterns),
    %% Two acknowledgements are sent before the first unsubscribe confirmation
    %% is read; the pmessage above has not been acknowledged up to this point.
    eredis_sub:ack_message(Sub),
    eredis_sub:ack_message(Sub),
    lists:foreach(
      fun(Pattern) ->
              receive
                  {unsubscribed, _, _} = Notice ->
                      ?assertEqual({unsubscribed, Pattern, Sub}, Notice)
              end,
              eredis_sub:ack_message(Sub)
      end, Patterns),

    SecondReply = eredis:q(Pub, ["PUBLISH", <<"chan123">>, <<"msg">>]),
    ?assertEqual({ok, <<"0">>}, SecondReply),
    receive
        Stray ->
            throw({unexpected_message, Stray})
    after 10 ->
        ok
    end,

    eredis_sub:stop(Sub).