1-module(eredis_sub_tests).
2
3-include_lib("eunit/include/eunit.hrl").
4-include("eredis.hrl").
5-include("eredis_sub.hrl").
6
7-import(eredis, [create_multibulk/1]).
8
9c() ->
10    Res = eredis:start_link(),
11    ?assertMatch({ok, _}, Res),
12    {ok, C} = Res,
13    C.
14
15s() ->
16    Res = eredis_sub:start_link("127.0.0.1", 6379, ""),
17    ?assertMatch({ok, _}, Res),
18    {ok, C} = Res,
19    C.
20
21add_channels(Sub, Channels) ->
22    ok = eredis_sub:controlling_process(Sub),
23    ok = eredis_sub:subscribe(Sub, Channels),
24    lists:foreach(
25      fun (C) ->
26              receive M ->
27                      ?assertEqual({subscribed, C, Sub}, M),
28                      eredis_sub:ack_message(Sub)
29              end
30      end, Channels).
31
32
33
34
35
36
37pubsub_test() ->
38    Pub = c(),
39    Sub = s(),
40    add_channels(Sub, [<<"chan1">>, <<"chan2">>]),
41    ok = eredis_sub:controlling_process(Sub),
42
43    ?assertEqual({ok, <<"1">>}, eredis:q(Pub, ["PUBLISH", chan1, msg])),
44    receive
45        {message, _, _, _} = M ->
46            ?assertEqual({message, <<"chan1">>, <<"msg">>, Sub}, M)
47    after 10 ->
48            throw(timeout)
49    end,
50
51    receive
52        Msg ->
53            throw({unexpected_message, Msg})
54    after 5 ->
55            ok
56    end,
57    eredis_sub:stop(Sub).
58
59%% Push size so high, the queue will be used
60pubsub2_test() ->
61    Pub = c(),
62    Sub = s(),
63    add_channels(Sub, [<<"chan">>]),
64    ok = eredis_sub:controlling_process(Sub),
65    lists:foreach(
66      fun(_) ->
67              Msg = binary:copy(<<"0">>, 2048),
68              ?assertEqual({ok, <<"1">>}, eredis:q(Pub, [publish, chan, Msg]))
69      end, lists:seq(1, 500)),
70    Msgs = recv_all(Sub),
71    ?assertEqual(500, length(Msgs)),
72    eredis_sub:stop(Sub).
73
74pubsub_manage_subscribers_test() ->
75    Pub = c(),
76    Sub = s(),
77    add_channels(Sub, [<<"chan">>]),
78    unlink(Sub),
79    Self = self(),
80    ?assertMatch(#state{controlling_process={_, Self}}, get_state(Sub)),
81    S1 = subscriber(Sub),
82    ok = eredis_sub:controlling_process(Sub, S1),
83    #state{controlling_process={_, S1}} = get_state(Sub),
84    S2 = subscriber(Sub),
85    ok = eredis_sub:controlling_process(Sub, S2),
86    #state{controlling_process={_, S2}} = get_state(Sub),
87    eredis:q(Pub, ["PUBLISH", chan, msg1]),
88    S1 ! stop,
89    ok = wait_for_stop(S1),
90    eredis:q(Pub, ["PUBLISH", chan, msg2]),
91    ?assertEqual({message, <<"chan">>, <<"msg1">>, Sub}, wait_for_msg(S2)),
92    ?assertEqual({message, <<"chan">>, <<"msg2">>, Sub}, wait_for_msg(S2)),
93    S2 ! stop,
94    ok = wait_for_stop(S2),
95    Ref = erlang:monitor(process, Sub),
96    receive {'DOWN', Ref, process, Sub, _} -> ok end.
97
98
99pubsub_connect_disconnect_messages_test() ->
100    Pub = c(),
101    Sub = s(),
102    add_channels(Sub, [<<"chan">>]),
103    S = subscriber(Sub),
104    ok = eredis_sub:controlling_process(Sub, S),
105    eredis:q(Pub, ["PUBLISH", chan, msg]),
106    wait_for_msg(S),
107    #state{socket=Sock} = get_state(Sub),
108    gen_tcp:close(Sock),
109    Sub ! {tcp_closed, Sock},
110    ?assertEqual({eredis_disconnected, Sub}, wait_for_msg(S)),
111    ?assertEqual({eredis_reconnect_attempt, Sub}, wait_for_msg(S)),
112    ?assertEqual({eredis_connected, Sub}, wait_for_msg(S)),
113    eredis_sub:stop(Sub).
114
115
116
117drop_queue_test() ->
118    Pub = c(),
119    {ok, Sub} = eredis_sub:start_link("127.0.0.1", 6379, "", 100, 10, drop),
120    add_channels(Sub, [<<"foo">>]),
121    ok = eredis_sub:controlling_process(Sub),
122
123    [eredis:q(Pub, [publish, foo, N]) || N <- lists:seq(1, 12)],
124
125    receive M1 -> ?assertEqual({message,<<"foo">>,<<"1">>, Sub}, M1) end,
126    receive M2 -> ?assertEqual({dropped, 11}, M2) end,
127    eredis_sub:stop(Sub).
128
129
130crash_queue_test() ->
131    Pub = c(),
132    {ok, Sub} = eredis_sub:start_link("127.0.0.1", 6379, "", 100, 10, exit),
133    add_channels(Sub, [<<"foo">>]),
134
135    true = unlink(Sub),
136    ok = eredis_sub:controlling_process(Sub),
137    Ref = erlang:monitor(process, Sub),
138
139    [eredis:q(Pub, [publish, foo, N]) || N <- lists:seq(1, 12)],
140
141    receive M1 -> ?assertEqual({message,<<"foo">>,<<"1">>, Sub}, M1) end,
142    receive M2 -> ?assertEqual({'DOWN', Ref, process, Sub, max_queue_size}, M2) end.
143
144
145
146dynamic_channels_test() ->
147    Pub = c(),
148    Sub = s(),
149    ok = eredis_sub:controlling_process(Sub),
150
151    eredis:q(Pub, [publish, newchan, foo]),
152
153    receive {message, <<"foo">>, _, _} -> ?assert(false)
154    after 5 -> ok end,
155
156    %% We do the following twice to show that subscribing to the same channel
157    %% doesn't cause the channel to show up twice
158    lists:foreach(fun(_) ->
159        eredis_sub:subscribe(Sub, [<<"newchan">>, <<"otherchan">>]),
160        receive M1 -> ?assertEqual({subscribed, <<"newchan">>, Sub}, M1) end,
161        eredis_sub:ack_message(Sub),
162        receive M2 -> ?assertEqual({subscribed, <<"otherchan">>, Sub}, M2) end,
163        eredis_sub:ack_message(Sub),
164
165        {ok, Channels} = eredis_sub:channels(Sub),
166        ?assertEqual(true, lists:member(<<"otherchan">>, Channels)),
167        ?assertEqual(true, lists:member(<<"newchan">>, Channels)),
168        ?assertEqual(2, length(Channels))
169    end, lists:seq(0, 1)),
170
171    eredis:q(Pub, [publish, newchan, foo]),
172    ?assertEqual([{message, <<"newchan">>, <<"foo">>, Sub}], recv_all(Sub)),
173    eredis:q(Pub, [publish, otherchan, foo]),
174    ?assertEqual([{message, <<"otherchan">>, <<"foo">>, Sub}], recv_all(Sub)),
175
176    eredis_sub:unsubscribe(Sub, [<<"otherchan">>]),
177    eredis_sub:ack_message(Sub),
178    receive M3 -> ?assertEqual({unsubscribed, <<"otherchan">>, Sub}, M3) end,
179
180    ?assertEqual({ok, [<<"newchan">>]}, eredis_sub:channels(Sub)).
181
182
183recv_all(Sub) ->
184    recv_all(Sub, []).
185
186recv_all(Sub, Acc) ->
187    receive
188        {message, _, _, _} = InMsg ->
189            eredis_sub:ack_message(Sub),
190            recv_all(Sub, [InMsg | Acc])
191    after 5 ->
192              lists:reverse(Acc)
193    end.
194
195subscriber(Client) ->
196    Test = self(),
197    Pid = spawn(fun () -> subscriber(Client, Test) end),
198    spawn(fun() ->
199                  Ref = erlang:monitor(process, Pid),
200                  receive
201                      {'DOWN', Ref, _, _, _} ->
202                          Test ! {stopped, Pid}
203                  end
204          end),
205    Pid.
206
207subscriber(Client, Test) ->
208    receive
209        stop ->
210            ok;
211        Msg ->
212            Test ! {got_message, self(), Msg},
213            eredis_sub:ack_message(Client),
214            subscriber(Client, Test)
215    end.
216
217wait_for_msg(Subscriber) ->
218    receive
219        {got_message, Subscriber, Msg} ->
220            Msg
221    end.
222
223wait_for_stop(Subscriber) ->
224    receive
225        {stopped, Subscriber} ->
226            ok
227    end.
228
229get_state(Pid)
230  when is_pid(Pid) ->
231    {status, _, _, [_, _, _, _, State]} = sys:get_status(Pid),
232    get_state(State);
233get_state([{data, [{"State", State}]} | _]) ->
234    State;
235get_state([_|Rest]) ->
236    get_state(Rest).
237
238
239
240
241
242% Tests for Pattern Subscribe
243
244add_channels_pattern(Sub, Channels) ->
245    ok = eredis_sub:controlling_process(Sub),
246    ok = eredis_sub:psubscribe(Sub, Channels),
247    lists:foreach(
248      fun (C) ->
249              receive M ->
250                      ?assertEqual({subscribed, C, Sub}, M),
251                      eredis_sub:ack_message(Sub)
252              end
253      end, Channels).
254
255
256
257
258
259pubsub_pattern_test() ->
260    Pub = c(),
261    Sub = s(),
262    add_channels_pattern(Sub, [<<"chan1*">>, <<"chan2*">>]),
263    ok = eredis_sub:controlling_process(Sub),
264
265    ?assertEqual({ok, <<"1">>}, eredis:q(Pub, ["PUBLISH", <<"chan123">>, <<"msg">>])),
266    receive
267        {pmessage, _Pattern, _Channel, _Message, _} = M ->
268            ?assertEqual({pmessage, <<"chan1*">>,<<"chan123">>, <<"msg">>, Sub}, M)
269    after 10 ->
270            throw(timeout)
271    end,
272
273    eredis_sub:punsubscribe(Sub, [<<"chan1*">> , <<"chan2*">>]),
274    eredis_sub:ack_message(Sub),
275    eredis_sub:ack_message(Sub),
276    receive {unsubscribed,_,_} = M2 -> ?assertEqual({unsubscribed, <<"chan1*">>, Sub}, M2) end,
277    eredis_sub:ack_message(Sub),
278    receive {unsubscribed,_,_} =  M3 -> ?assertEqual({unsubscribed, <<"chan2*">>, Sub}, M3) end,
279    eredis_sub:ack_message(Sub),
280
281    ?assertEqual({ok, <<"0">>}, eredis:q(Pub, ["PUBLISH", <<"chan123">>, <<"msg">>])),
282    receive
283        Msg -> throw({unexpected_message, Msg})
284    after 10 ->
285            ok
286    end,
287
288    eredis_sub:stop(Sub).
289
290
291
292
293