1%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
2%% --------------------------------------------------
3%% This file is provided to you under the Apache License,
4%% Version 2.0 (the "License"); you may not use this file
5%% except in compliance with the License.  You may obtain
6%% a copy of the License at
7%%
8%%   http://www.apache.org/licenses/LICENSE-2.0
9%%
10%% Unless required by applicable law or agreed to in writing,
11%% software distributed under the License is distributed on an
12%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
13%% KIND, either express or implied.  See the License for the
14%% specific language governing permissions and limitations
15%% under the License.
16%% --------------------------------------------------
17%%
18%% @author Ulf Wiger <ulf@wiger.net>
19%%
20-module(gproc_dist_tests).
21
22-ifdef(TEST).
23-include_lib("eunit/include/eunit.hrl").
24-export([t_spawn/1, t_spawn_reg/2]).
25
26-define(f(E), fun() -> ?debugVal(E) end).
27
28dist_test_() ->
29    {timeout, 120,
30     [
31      %% {setup,
32      %%  fun dist_setup/0,
33      %%  fun dist_cleanup/1,
34      %%  fun(skip) -> [];
35      %%     (Ns) when is_list(Ns) ->
36      %%          {inorder, basic_tests(Ns)}
37      %%  end
38      %% },
39      {foreach,
40       fun dist_setup/0,
41       fun dist_cleanup/1,
42       [
43        fun(Ns) ->
44                [{inorder, basic_tests(Ns)}]
45        end,
46        fun(Ns) ->
47                tests(Ns, [?f(t_sync_cand_dies(Ns))])
48        end,
49        fun(Ns) ->
50                tests(Ns, [?f(t_fail_node(Ns))])
51        end,
52        fun(Ns) ->
53                tests(Ns, [{timeout, 15, ?f(t_master_dies(Ns))}])
54        end
55       ]}
56     ]}.
57
58tests(skip, _) ->
59    [];
60tests(_, L) ->
61    L.
62
63basic_tests(skip) ->
64    [];
65basic_tests(Ns) ->
66    [
67     ?f(t_simple_reg(Ns)),
68     ?f(t_simple_reg_other(Ns)),
69     ?f(t_simple_ensure(Ns)),
70     ?f(t_simple_ensure_other(Ns)),
71     ?f(t_simple_reg_or_locate(Ns)),
72     ?f(t_simple_counter(Ns)),
73     ?f(t_aggr_counter(Ns)),
74     ?f(t_awaited_aggr_counter(Ns)),
75     ?f(t_simple_resource_count(Ns)),
76     ?f(t_awaited_resource_count(Ns)),
77     ?f(t_resource_count_on_zero(Ns)),
78     ?f(t_update_counters(Ns)),
79     ?f(t_shared_counter(Ns)),
80     ?f(t_prop(Ns)),
81     ?f(t_mreg(Ns)),
82     ?f(t_await_reg(Ns)),
83     ?f(t_await_self(Ns)),
84     ?f(t_await_reg_exists(Ns)),
85     ?f(t_give_away(Ns)),
86     ?f(t_sync(Ns)),
87     ?f(t_monitor(Ns)),
88     ?f(t_standby_monitor(Ns)),
89     ?f(t_standby_monitor_unreg(Ns)),
90     ?f(t_follow_monitor(Ns)),
91     ?f(t_monitor_demonitor(Ns)),
92     ?f(t_subscribe(Ns))
93    ].
94
95dist_setup() ->
96    case run_dist_tests() of
97        true ->
98            Ns = start_slaves([dist_test_n1, dist_test_n2, dist_test_n3]),
99            ?assertMatch({[ok,ok,ok],[]},
100                         rpc:multicall(Ns, application, set_env,
101                                       [gproc, gproc_dist, Ns])),
102            ?assertMatch({[ok,ok,ok],[]},
103                         rpc:multicall(
104                           Ns, application, start, [gproc])),
105            Ns;
106        false ->
107            skip
108    end.
109
110dist_cleanup(skip) ->
111    ok;
112dist_cleanup(Ns) ->
113    [slave:stop(N) || N <- Ns],
114    ok.
115
116run_dist_tests() ->
117    case os:getenv("GPROC_DIST") of
118        "true" -> true;
119	"false" -> false;
120	false ->
121	    case code:ensure_loaded(gen_leader) of
122		{error, nofile} ->
123		    false;
124		_ ->
125		    true
126	    end
127    end.
128
129-define(T_NAME, {n, g, {?MODULE, ?LINE, os:timestamp()}}).
130-define(T_KVL, [{foo, "foo"}, {bar, "bar"}]).
131-define(T_COUNTER, {c, g, {?MODULE, ?LINE}}).
132-define(T_RESOURCE, {r, g, {?MODULE, ?LINE}}).
133-define(T_PROP, {p, g, ?MODULE}).
134
135t_simple_reg([H|_] = Ns) ->
136    Name = ?T_NAME,
137    P = t_spawn_reg(H, Name),
138    ?assertMatch(ok, t_lookup_everywhere(Name, Ns, P)),
139    ?assertMatch(true, t_call(P, {apply, gproc, unreg, [Name]})),
140    ?assertMatch(ok, t_lookup_everywhere(Name, Ns, undefined)),
141    ?assertMatch(ok, t_call(P, die)).
142
143t_simple_reg_other([A, B|_] = Ns) ->
144    Name = ?T_NAME,
145    P1 = t_spawn(A),
146    P2 = t_spawn(B),
147    ?assertMatch(true, t_call(P1, {apply, gproc, reg_other, [Name, P2]})),
148    ?assertMatch(ok, t_lookup_everywhere(Name, Ns, P2)),
149    ?assertMatch(true, t_call(P1, {apply, gproc, unreg_other, [Name, P2]})),
150    ?assertMatch(ok, t_lookup_everywhere(Name, Ns, undefined)),
151    ?assertMatch(ok, t_call(P1, die)),
152    ?assertMatch(ok, t_call(P2, die)).
153
154t_simple_ensure([H|_] = Ns) ->
155    Name = ?T_NAME,
156    P = t_spawn_reg(H, Name),
157    ?assertMatch(ok, t_lookup_everywhere(Name, Ns, P)),
158    ?assertMatch(
159       updated, t_call(
160                  P, {apply, gproc, ensure_reg, [Name, new_val, [{a,1}]]})),
161    ?assertMatch(
162       [{a,1}], t_call(
163                  P, {apply, gproc, get_attributes, [Name]})),
164    ?assertMatch(ok, t_read_everywhere(Name, P, Ns, new_val)),
165    ?assertMatch(true, t_call(P, {apply, gproc, unreg, [Name]})),
166    ?assertMatch(ok, t_lookup_everywhere(Name, Ns, undefined)),
167    ?assertMatch(ok, t_call(P, die)).
168
169t_simple_ensure_other([A, B|_] = Ns) ->
170    Name = ?T_NAME,
171    P1 = t_spawn(A),
172    P2 = t_spawn(B),
173    ?assertMatch(true, t_call(P1, {apply, gproc, reg_other, [Name, P2]})),
174    ?assertMatch(ok, t_lookup_everywhere(Name, Ns, P2)),
175    ?assertMatch(
176       updated, t_call(
177                  P1, {apply, gproc, ensure_reg_other, [Name, P2, new_val]})),
178    ?assertMatch(ok, t_read_everywhere(Name, P2, Ns, new_val)),
179    ?assertMatch(true, t_call(P1, {apply, gproc, unreg_other, [Name, P2]})),
180    ?assertMatch(ok, t_lookup_everywhere(Name, Ns, undefined)),
181    ?assertMatch(ok, t_call(P1, die)),
182    ?assertMatch(ok, t_call(P2, die)).
183
184t_simple_reg_or_locate([A,B|_] = _Ns) ->
185    Name = ?T_NAME,
186    P1 = t_spawn(A),
187    Ref = erlang:monitor(process, P1),
188    ?assertMatch({P1, the_value},
189		 t_call(P1, {apply, gproc, reg_or_locate, [Name, the_value]})),
190    P2 = t_spawn(B),
191    Ref2 = erlang:monitor(process, P2),
192    ?assertMatch({P1, the_value},
193		 t_call(P2, {apply, gproc, reg_or_locate, [Name, other_value]})),
194    ?assertMatch(ok, t_call(P1, die)),
195    ?assertMatch(ok, t_call(P2, die)),
196    flush_down(Ref),
197    flush_down(Ref2).
198
199flush_down(Ref) ->
200    receive
201        {'DOWN', Ref, _, _, _} ->
202            ok
203    after 1000 ->
204            erlang:error({timeout, [flush_down, Ref]})
205    end.
206
207
208t_simple_counter([H|_] = Ns) ->
209    Ctr = ?T_COUNTER,
210    P = t_spawn_reg(H, Ctr, 3),
211    ?assertMatch(ok, t_read_everywhere(Ctr, P, Ns, 3)),
212    ?assertMatch(5, t_call(P, {apply, gproc, update_counter, [Ctr, 2]})),
213    ?assertMatch(ok, t_read_everywhere(Ctr, P, Ns, 5)),
214    ?assertMatch(ok, t_call(P, die)).
215
216t_shared_counter([H|_] = Ns) ->
217    Ctr = ?T_COUNTER,
218    P = t_spawn_reg_shared(H, Ctr, 3),
219    ?assertMatch(ok, t_read_everywhere(Ctr, shared, Ns, 3)),
220    ?assertMatch(5, t_call(P, {apply, gproc, update_shared_counter, [Ctr, 2]})),
221    ?assertMatch(ok, t_read_everywhere(Ctr, shared, Ns, 5)),
222    ?assertMatch(ok, t_call(P, die)),
223    ?assertMatch(ok, t_read_everywhere(Ctr, shared, Ns, 5)),
224    ?assertMatch(ok, t_read_everywhere(Ctr, shared, Ns, 5)), % twice
225    P1 = t_spawn(H),
226    ?assertMatch(true, t_call(P1, {apply, gproc, unreg_shared, [Ctr]})),
227    ?assertMatch(ok, t_read_everywhere(Ctr, shared, Ns, badarg)).
228
229
230t_aggr_counter([H1,H2|_] = Ns) ->
231    {c,g,Nm} = Ctr = ?T_COUNTER,
232    Aggr = {a,g,Nm},
233    Pc1 = t_spawn_reg(H1, Ctr, 3),
234    Pa = t_spawn_reg(H2, Aggr),
235    ?assertMatch(ok, t_read_everywhere(Ctr, Pc1, Ns, 3)),
236    ?assertMatch(ok, t_read_everywhere(Aggr, Pa, Ns, 3)),
237    Pc2 = t_spawn_reg(H2, Ctr, 3),
238    ?assertMatch(ok, t_read_everywhere(Ctr, Pc2, Ns, 3)),
239    ?assertMatch(ok, t_read_everywhere(Aggr, Pa, Ns, 6)),
240    ?assertMatch(5, t_call(Pc1, {apply, gproc, update_counter, [Ctr, 2]})),
241    ?assertMatch(ok, t_read_everywhere(Ctr, Pc1, Ns, 5)),
242    ?assertMatch(ok, t_read_everywhere(Aggr, Pa, Ns, 8)),
243    ?assertMatch(ok, t_call(Pc1, die)),
244    ?assertMatch(ok, t_read_everywhere(Aggr, Pa, Ns, 3)),
245    ?assertMatch(ok, t_call(Pc2, die)),
246    ?assertMatch(ok, t_call(Pa, die)).
247
248t_awaited_aggr_counter([H1,H2|_] = Ns) ->
249    {c,g,Nm} = Ctr = ?T_COUNTER,
250    Aggr = {a,g,Nm},
251    Pc1 = t_spawn_reg(H1, Ctr, 3),
252    P = t_spawn(H2),
253    Ref = erlang:monitor(process, P),
254    P ! {self(), Ref, {apply, gproc, await, [Aggr]}},
255    t_sleep(),
256    P1 = t_spawn_reg(H2, Aggr),
257    ?assert(P1 == receive
258                      {P, Ref, Res} ->
259                          element(1, Res);
260                      {'DOWN', Ref, _, _, Reason} ->
261                          erlang:error(Reason);
262                      Other ->
263                          erlang:error({received, Other})
264                  end),
265    ?assertMatch(ok, t_read_everywhere(Aggr, P1, Ns, 3)),
266    ?assertMatch(ok, t_call(Pc1, die)),
267    ?assertMatch(ok, t_call(P, die)),
268    flush_down(Ref),
269    ?assertMatch(ok, t_call(P1, die)).
270
271t_simple_resource_count([H1,H2|_] = Ns) ->
272    {r,g,Nm} = R = ?T_RESOURCE,
273    RC = {rc,g,Nm},
274    Pr1 = t_spawn_reg(H1, R, 3),
275    Prc = t_spawn_reg(H2, RC),
276    ?assertMatch(ok, t_read_everywhere(R, Pr1, Ns, 3)),
277    ?assertMatch(ok, t_read_everywhere(RC, Prc, Ns, 1)),
278    Pr2 = t_spawn_reg(H2, R, 4),
279    ?assertMatch(ok, t_read_everywhere(R, Pr2, Ns, 4)),
280    ?assertMatch(ok, t_read_everywhere(RC, Prc, Ns, 2)),
281    ?assertMatch(ok, t_call(Pr1, die)),
282    ?assertMatch(ok, t_read_everywhere(RC, Prc, Ns, 1)),
283    ?assertMatch(ok, t_call(Pr2, die)),
284    ?assertMatch(ok, t_call(Prc, die)).
285
286t_awaited_resource_count([H1,H2|_] = Ns) ->
287    {r,g,Nm} = R = ?T_RESOURCE,
288    RC = {rc,g,Nm},
289    Pr1 = t_spawn_reg(H1, R, 3),
290    P = t_spawn(H2),
291    Ref = erlang:monitor(process, P),
292    P ! {self(), Ref, {apply, gproc, await, [RC]}},
293    t_sleep(),
294    P1 = t_spawn_reg(H2, RC),
295    ?assert(P1 == receive
296                      {P, Ref, Res} ->
297                          element(1, Res);
298                      {'DOWN', Ref, _, _, Reason} ->
299                          erlang:error(Reason);
300                      Other ->
301                          erlang:error({received, Other})
302                  end),
303    ?assertMatch(ok, t_read_everywhere(RC, P1, Ns, 1)),
304    ?assertMatch(ok, t_call(Pr1, die)),
305    ?assertMatch(ok, t_call(P, die)),
306    flush_down(Ref),
307    ?assertMatch(ok, t_call(P1, die)).
308
309t_resource_count_on_zero([H1,H2|_] = Ns) ->
310    {r,g,Nm} = R = ?T_RESOURCE,
311    Prop = ?T_PROP,
312    RC = {rc,g,Nm},
313    Pr1 = t_spawn_reg(H1, R, 3),
314    Pp = t_spawn_reg(H2, Prop),
315    ?assertMatch(ok, t_call(Pp, {selective, true})),
316    Prc = t_spawn_reg(H2, RC, undefined, [{on_zero, [{send, Prop}]}]),
317    ?assertMatch(ok, t_read_everywhere(R, Pr1, Ns, 3)),
318    ?assertMatch(ok, t_read_everywhere(RC, Prc, Ns, 1)),
319    ?assertMatch(ok, t_call(Pr1, die)),
320    ?assertMatch(ok, t_read_everywhere(RC, Prc, Ns, 0)),
321    ?assertMatch({gproc, resource_on_zero, g, Nm, Prc},
322                 t_call(Pp, {apply_fun, fun() ->
323                                                receive
324                                                    {gproc, _, _, _, _} = M ->
325                                                        M
326                                                after 10000 ->
327                                                        timeout
328                                                end
329                                        end})),
330    ?assertMatch(ok, t_call(Pp, {selective, false})),
331    ?assertMatch(ok, t_call(Pp, die)),
332    ?assertMatch(ok, t_call(Prc, die)).
333
334t_update_counters([H1,H2|_] = Ns) ->
335    {c,g,N1} = C1 = ?T_COUNTER,
336    A1 = {a,g,N1},
337    C2 = ?T_COUNTER,
338    P1 = t_spawn_reg(H1, C1, 2),
339    P12 = t_spawn_reg(H2, C1, 2),
340    P2 = t_spawn_reg(H2, C2, 1),
341    Pa1 = t_spawn_reg(H2, A1),
342    ?assertMatch(ok, t_read_everywhere(C1, P1, Ns, 2)),
343    ?assertMatch(ok, t_read_everywhere(C1, P12, Ns, 2)),
344    ?assertMatch(ok, t_read_everywhere(C2, P2, Ns, 1)),
345    ?assertMatch(ok, t_read_everywhere(A1, Pa1, Ns, 4)),
346    ?assertMatch([{C1,P1, 3},
347		  {C1,P12,4},
348		  {C2,P2, 0}], t_call(P1, {apply, gproc, update_counters,
349					   [g, [{C1,P1,1},{C1,P12,2},{C2,P2,{-2,0,0}}]]})),
350    ?assertMatch(ok, t_read_everywhere(C1, P1, Ns, 3)),
351    ?assertMatch(ok, t_read_everywhere(C1, P12, Ns, 4)),
352    ?assertMatch(ok, t_read_everywhere(C2, P2, Ns, 0)),
353    ?assertMatch(ok, t_read_everywhere(A1, Pa1, Ns, 7)),
354    ?assertMatch(ok, t_call(P1, die)),
355    ?assertMatch(ok, t_call(P12, die)),
356    ?assertMatch(ok, t_call(P2, die)).
357
358t_prop([H1,H2|_] = Ns) ->
359    {p, g, _} = P = ?T_PROP,
360    P1 = t_spawn_reg(H1, P, 1),
361    P2 = t_spawn_reg(H2, P, 2),
362    ?assertMatch(ok, t_read_everywhere(P, P1, Ns, 1)),
363    ?assertMatch(ok, t_read_everywhere(P, P2, Ns, 2)),
364    ?assertMatch(ok, t_call(P1, die)),
365    ?assertMatch(ok, t_read_everywhere(P, P1, Ns, badarg)),
366    ?assertMatch(ok, t_call(P2, die)).
367
368t_mreg([H|_] = Ns) ->
369    Kvl = ?T_KVL,
370    Keys = [K || {K,_} <- Kvl],
371    P = t_spawn_mreg(H, Kvl),
372    [?assertMatch(ok, t_lookup_everywhere({n,g,K}, Ns, P)) || K <- Keys],
373    ?assertMatch(true, t_call(P, {apply, gproc, munreg, [n, g, Keys]})),
374    [?assertMatch(ok, t_lookup_everywhere({n,g,K},Ns,undefined)) || K <- Keys],
375    ?assertMatch(ok, t_call(P, die)).
376
377t_await_reg([A,B|_] = Ns) ->
378    Name = ?T_NAME,
379    P = t_spawn(A),
380    Ref = erlang:monitor(process, P),
381    P ! {self(), Ref, {apply, gproc, await, [Name]}},
382    t_sleep(),
383    P1 = t_spawn_reg(B, Name),
384    ?assert(P1 == receive
385		      {P, Ref, Res} ->
386			  element(1, Res);
387		      {'DOWN', Ref, _, _, Reason} ->
388			  erlang:error(Reason);
389		      Other ->
390			  erlang:error({received,Other})
391		  end),
392    ?assertMatch(ok, t_call(P, die)),
393    flush_down(Ref),
394    ?assertMatch(ok, t_lookup_everywhere(Name, Ns, P1)),
395    ?assertMatch(ok, t_call(P1, die)).
396
397t_await_self([A|_]) ->
398    Name = ?T_NAME,
399    P = t_spawn(A, false),  % don't buffer unknowns
400    Ref = t_call(P, {apply, gproc, nb_wait, [Name]}),
401    ?assertMatch(ok, t_call(P, {selective, true})),
402    ?assertMatch(true, t_call(P, {apply, gproc, reg, [Name, some_value]})),
403    ?assertMatch({registered, {Name, P, some_value}},
404		 t_call(P, {apply_fun, fun() ->
405					       receive
406						   {gproc, Ref, R, Wh} ->
407						       {R, Wh}
408					       after 10000 ->
409						       timeout
410					       end
411				       end})),
412    ?assertMatch(ok, t_call(P, {selective, false})),
413    ?assertMatch(true, t_call(P, {apply, gproc, unreg, [Name]})).
414
415t_await_reg_exists([A,B|_]) ->
416    Name = ?T_NAME,
417    P = t_spawn(A),
418    Ref = erlang:monitor(process, P),
419    P1 = t_spawn_reg(B, Name),
420    P ! {self(), Ref, {apply, gproc, await, [Name]}},
421    ?assert(P1 == receive
422		      {P, Ref, Res} ->
423			  element(1, Res);
424		      {'DOWN', Ref, _, _, Reason} ->
425			  erlang:error(Reason);
426		      Other ->
427			  erlang:error({received,Other})
428		  end),
429    ?assertMatch(ok, t_call(P, die)),
430    ?assertMatch(ok, t_call(P1, die)).
431
432t_give_away([A,B|_] = Ns) ->
433    Na = ?T_NAME,
434    Nb = ?T_NAME,
435    Pa = t_spawn_reg(A, Na),
436    Pb = t_spawn_reg(B, Nb),
437    ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pa)),
438    ?assertMatch(ok, t_lookup_everywhere(Nb, Ns, Pb)),
439    ?assertMatch(Pb, t_call(Pa, {apply, gproc, give_away, [Na, Nb]})),
440    ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pb)),
441    ?assertMatch(Pa, t_call(Pb, {apply, gproc, give_away, [Na, Pa]})),
442    ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pa)),
443    ?assertMatch(ok, t_call(Pa, die)),
444    ?assertMatch(ok, t_call(Pb, die)).
445
446t_sync(Ns) ->
447    %% Don't really know how to test this...
448    [?assertMatch(true, rpc:call(N, gproc_dist, sync, []))
449     || N <- Ns].
450
451t_monitor([A,B|_]) ->
452    Na = ?T_NAME,
453    Pa = t_spawn_reg(A, Na),
454    Pb = t_spawn(B, _Selective = true),
455    Ref = t_call(Pb, {apply, gproc, monitor, [Na]}),
456    ?assert(is_reference(Ref)),
457    ?assertMatch(ok, t_call(Pa, die)),
458    ?assertMatch({gproc,unreg,Ref,Na}, got_msg(Pb, gproc)),
459    Pc = t_spawn_reg(A, Na),
460    Ref1 = t_call(Pb, {apply, gproc, monitor, [Na]}),
461    ?assertMatch(true, t_call(Pc, {apply, gproc, unreg, [Na]})),
462    ?assertMatch({gproc,unreg,Ref1,Na}, got_msg(Pb, gproc)).
463
464t_standby_monitor([A,B|_] = Ns) ->
465    Na = ?T_NAME,
466    Pa = t_spawn_reg(A, Na),
467    Pb = t_spawn(B, _Selective = true),
468    Ref = t_call(Pb, {apply, gproc, monitor, [Na, standby]}),
469    ?assert(is_reference(Ref)),
470    ?assertMatch(ok, t_call(Pa, die)),
471    ?assertMatch({gproc,{failover,Pb},Ref,Na}, got_msg(Pb, gproc)),
472    ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pb)),
473    Pc = t_spawn(A, true),
474    Ref1 = t_call(Pc, {apply, gproc, monitor, [Na, standby]}),
475    ?assertMatch(true, t_call(Pb, {apply, gproc, unreg, [Na]})),
476    ?assertMatch({gproc,unreg,Ref1,Na}, got_msg(Pc, gproc)),
477    ?assertMatch(ok, t_lookup_everywhere(Na, Ns, undefined)).
478
479t_standby_monitor_unreg([A|_] = Ns) ->
480    Na = ?T_NAME,
481    Pa = t_spawn(A, _Selective = true),
482    Ref = t_call(Pa, {apply, gproc, monitor, [Na, standby]}),
483    ?assert(is_reference(Ref)),
484    ?assertMatch({gproc,{failover,Pa},Ref,Na}, got_msg(Pa, gproc)),
485    ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pa)),
486    ?assertMatch(ok, t_call(Pa, die)),
487    ?assertMatch(ok, t_lookup_everywhere(Na, Ns, undefined)).
488
489t_follow_monitor([A,B|_]) ->
490    Na = ?T_NAME,
491    Pa = t_spawn(A, _Selective = true),
492    Ref = t_call(Pa, {apply, gproc, monitor, [Na, follow]}),
493    Msg1 = {gproc,unreg,Ref,Na},
494    {Msg1, Msg1} = {got_msg(Pa), Msg1},
495    Pb = t_spawn_reg(B, Na),
496    Msg2 = {gproc,registered,Ref,Na},
497    {Msg2, Msg2} = {got_msg(Pa), Msg2},
498    ok = t_call(Pb, die),
499    ok = t_call(Pa, die).
500
501t_monitor_demonitor([A,B|_]) ->
502    Na = ?T_NAME,
503    Pa = t_spawn(A, Selective = true),
504    Pa2 = t_spawn(A, Selective),
505    Pb = t_spawn(B, Selective),
506    Pb2 = t_spawn(B, Selective),
507    RefA = t_call(Pa, {apply, gproc, monitor, [Na, follow]}),
508    RefA2 = t_call(Pa2, {apply, gproc, monitor, [Na, follow]}),
509    RefB = t_call(Pb, {apply, gproc, monitor, [Na, follow]}),
510    RefB2 = t_call(Pb2, {apply, gproc, monitor, [Na, follow]}),
511    Msg1 = {gproc, unreg, RefA, Na},
512    {Msg1, Msg1} = {got_msg(Pa), Msg1},
513    Msg2 = {gproc, unreg, RefA2, Na},
514    {Msg2, Msg2} = {got_msg(Pa2), Msg2},
515    Msg3 = {gproc, unreg, RefB, Na},
516    {Msg3, Msg3} = {got_msg(Pb), Msg3},
517    Msg4 = {gproc, unreg, RefB2, Na},
518    {Msg4, Msg4} = {got_msg(Pb2), Msg4},
519    ok = t_call(Pa, {apply, gproc, demonitor, [Na, RefA]}),
520    ok = t_call(Pb, {apply, gproc, demonitor, [Na, RefB]}),
521    Pr = t_spawn_reg(B, Na),
522    Msg5 = {gproc, registered, RefA2, Na},
523    {Msg5, Msg5} = {got_msg(Pa2), Msg5},
524    Msg6 = {gproc, registered, RefB2, Na},
525    {Msg6, Msg6} = {got_msg(Pb2), Msg6},
526    ok = no_msg(Pa, 500),
527    ok = no_msg(Pb, 500),
528    [ ok = t_call(P, die) || P <- [Pa, Pa2, Pb, Pb2, Pr]],
529    ok.
530
531t_subscribe([A,B|_] = Ns) ->
532    Na = ?T_NAME,
533    Pb = t_spawn(B, _Selective = true),
534    ?assertEqual(ok, t_call(Pb, {apply, gproc_monitor, subscribe, [Na]})),
535    ?assertMatch({gproc_monitor, Na, undefined}, got_msg(Pb, gproc_monitor)),
536    Pa = t_spawn_reg(A, Na),
537    ?assertMatch({gproc_monitor, Na, Pa}, got_msg(Pb, gproc_monitor)),
538    Pc = t_spawn(A),
539    t_call(Pa, {apply, gproc, give_away, [Na, Pc]}),
540    ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pc)),
541    ?assertEqual({gproc_monitor,Na,{migrated,Pc}}, got_msg(Pb, gproc_monitor)),
542    ?assertEqual(ok, t_call(Pc, die)),
543    ?assertEqual({gproc_monitor,Na,undefined}, got_msg(Pb, gproc_monitor)),
544    ok.
545
546%% got_msg(Pb, Tag) ->
547%%     t_call(Pb,
548%% 	   {apply_fun,
549%% 	    fun() ->
550%% 		    receive
551%% 			M when element(1, M) == Tag ->
552%% 			    M
553%% 		    after 1000 ->
554%% 			    erlang:error({timeout, got_msg, [Pb, Tag]})
555%% 		    end
556%% 	    end}).
557
558%% Verify that the gproc_dist:sync() call returns true even if a candidate dies
559%% while the sync is underway. This test makes use of sys:suspend() to ensure that
560%% the other candidate doesn't respond too quickly.
561t_sync_cand_dies([A,B,C]) ->
562    Leader = rpc:call(A, gproc_dist, get_leader, []),
563    Other = case Leader of
564		A -> B;
565		B -> A;
566                C -> A
567	    end,
568    ?assertMatch(ok, rpc:call(Other, sys, suspend, [gproc_dist])),
569    P = rpc:call(Other, erlang, whereis, [gproc_dist]),
570    Key = rpc:async_call(Leader, gproc_dist, sync, []),
571    %% The overall timeout for gproc_dist:sync() is 10 seconds. Here, we should
572    %% still be waiting.
573    ?assertMatch(timeout, rpc:nb_yield(Key, 1000)),
574    exit(P, kill),
575    %% The leader should detect that the other candidate died and respond
576    %% immediately. Therefore, we should have our answer well within 1 sec.
577    ?assertMatch({value, true}, rpc:nb_yield(Key, 1000)).
578
579
580%% Verify that the registry updates consistently if a non-leader node
581%% dies.
582t_fail_node(Ns) ->
583    Leader = rpc:call(hd(Ns), gproc_dist, get_leader, []),
584    [A,B] = Ns -- [Leader],
585    Na = ?T_NAME,
586    Nb = ?T_NAME,
587    Pa = t_spawn_reg(A, Na),
588    Pb = t_spawn_reg(B, Nb),
589    ?assertMatch(ok, rpc:call(A, application, stop, [gproc])),
590    ?assertMatch(ok, t_lookup_everywhere(Na, Ns -- [A], undefined)),
591    ?assertMatch(ok, t_lookup_everywhere(Nb, Ns -- [A], Pb)),
592    ?assertMatch(ok, rpc:call(A, application, start, [gproc])),
593    ?assertMatch(ok, t_lookup_everywhere(Na, Ns, undefined)),
594    ?assertMatch(ok, t_lookup_everywhere(Nb, Ns, Pb)),
595    ?assertMatch(ok, t_call(Pa, die)),
596    ?assertMatch(ok, t_call(Pb, die)).
597
598t_master_dies([A,B,C] = Ns) ->
599    Na = ?T_NAME,
600    Nb = ?T_NAME,
601    Nc = ?T_NAME,
602    Pa = t_spawn_reg(A, Na),
603    Pb = t_spawn_reg(B, Nb),
604    Pc = t_spawn_reg(C, Nc),
605    L = rpc:call(A, gproc_dist, get_leader, []),
606    ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pa)),
607    ?assertMatch(ok, t_lookup_everywhere(Nb, Ns, Pb)),
608    ?assertMatch(ok, t_lookup_everywhere(Nc, Ns, Pc)),
609    {Nl, Pl} = case L of
610                   A -> {Na, Pa};
611                   B -> {Nb, Pb};
612                   C -> {Nc, Pc}
613               end,
614    ?assertMatch(true, rpc:call(A, gproc_dist, sync, [])),
615    ?assertMatch(ok, rpc:call(L, application, stop, [gproc])),
616    Names = [{Na,Pa}, {Nb,Pb}, {Nc,Pc}] -- [{Nl, Pl}],
617    RestNs = Ns -- [L],
618    %% ?assertMatch(true, rpc:call(hd(RestNs), gproc_dist, sync, [])),
619    ?assertMatch(true, try_sync(hd(RestNs), RestNs)),
620    ?assertMatch(ok, t_lookup_everywhere(Nl, RestNs, undefined)),
621    [?assertMatch(ok, t_lookup_everywhere(Nx, RestNs, Px))
622     || {Nx, Px} <- Names],
623    ok.
624
625try_sync(N, Ns) ->
626    case rpc:call(N, gproc_dist, sync, []) of
627        {badrpc, _} = Err ->
628            ?debugFmt(
629               "Error in gproc_dist:sync() (~p):~n"
630               "  ~p~n"
631               "Status = ~p~n",
632               [Err, N,
633                {Ns, rpc:multicall([N|Ns], sys, get_status, [gproc_dist])}]),
634            Err;
635        true ->
636            true
637    end.
638
639t_sleep() ->
640    timer:sleep(500).
641
642t_lookup_everywhere(Key, Nodes, Exp) ->
643    true = rpc:call(hd(Nodes), gproc_dist, sync, []),
644    t_lookup_everywhere(Key, Nodes, Exp, 3).
645
646t_lookup_everywhere(Key, _, Exp, 0) ->
647    {lookup_failed, Key, Exp};
648t_lookup_everywhere(Key, Nodes, Exp, I) ->
649    Expected = [{N, Exp} || N <- Nodes],
650    Found = [{N,rpc:call(N, gproc, where, [Key])} || N <- Nodes],
651    if Expected =/= Found ->
652	    ?debugFmt("lookup ~p failed~n"
653		      "(Expected: ~p;~n"
654		      " Found   : ~p), retrying...~n",
655		      [Key, Expected, Found]),
656	    t_sleep(),
657	    t_lookup_everywhere(Key, Nodes, Exp, I-1);
658       true ->
659	    ok
660    end.
661
662t_read_everywhere(Key, Pid, Nodes, Exp) ->
663    true = rpc:call(hd(Nodes), gproc_dist, sync, []),
664    t_read_everywhere(Key, Pid, Nodes, Exp, 3).
665
666t_read_everywhere(Key, _, _, Exp, 0) ->
667    {read_failed, Key, Exp};
668t_read_everywhere(Key, Pid, Nodes, Exp, I) ->
669    Expected = [{N, Exp} || N <- Nodes],
670    Found = [{N, read_result(rpc:call(N, gproc, get_value, [Key, Pid]))}
671	     || N <- Nodes],
672    if Expected =/= Found ->
673	    ?debugFmt("read ~p failed~n"
674		      "(Expected: ~p;~n"
675		      " Found   : ~p), retrying...~n",
676		      [{Key, Pid}, Expected, Found]),
677	    t_sleep(),
678	    t_read_everywhere(Key, Pid, Nodes, Exp, I-1);
679       true ->
680	    ok
681    end.
682
683read_result({badrpc, {'EXIT', {badarg, _}}}) -> badarg;
684read_result(R) -> R.
685
686t_spawn(Node) -> gproc_test_lib:t_spawn(Node).
687t_spawn(Node, Selective) -> gproc_test_lib:t_spawn(Node, Selective).
688t_spawn_mreg(Node, KVL) -> gproc_test_lib:t_spawn_mreg(Node, KVL).
689t_spawn_reg(Node, N) -> gproc_test_lib:t_spawn_reg(Node, N).
690t_spawn_reg(Node, N, V) -> gproc_test_lib:t_spawn_reg(Node, N, V).
691t_spawn_reg(Node, N, V, As) -> gproc_test_lib:t_spawn_reg(Node, N, V, As).
692t_spawn_reg_shared(Node, N, V) -> gproc_test_lib:t_spawn_reg_shared(Node, N, V).
693got_msg(P) -> gproc_test_lib:got_msg(P).
694got_msg(P, Tag) -> gproc_test_lib:got_msg(P, Tag).
695no_msg(P, Timeout) -> gproc_test_lib:no_msg(P, Timeout).
696
697t_call(P, Req) ->
698    gproc_test_lib:t_call(P, Req).
699
700start_slaves(Ns) ->
701    [H|T] = Nodes = [start_slave(N) || N <- Ns],
702    _ = [rpc:call(H, net_adm, ping, [N]) || N <- T],
703    Nodes.
704
705start_slave(Name) ->
706    case node() of
707        nonode@nohost ->
708            os:cmd("epmd -daemon"),
709            {ok, _} = net_kernel:start([gproc_master, shortnames]);
710        _ ->
711            ok
712    end,
713    {Pa, Pz} = paths(),
714    Paths = "-pa ./ -pz ../ebin" ++
715        lists:flatten([[" -pa " ++ Path || Path <- Pa],
716		       [" -pz " ++ Path || Path <- Pz]]),
717    {ok, Node} = slave:start(host(), Name, Paths),
718    Node.
719
720paths() ->
721    Path = code:get_path(),
722    {ok, [[Root]]} = init:get_argument(root),
723    {Pas, Rest} = lists:splitwith(fun(P) ->
724					  not lists:prefix(Root, P)
725				  end, Path),
726    {_, Pzs} = lists:splitwith(fun(P) ->
727				       lists:prefix(Root, P)
728			       end, Rest),
729    {Pas, Pzs}.
730
731
732host() ->
733    [_Name, Host] = re:split(atom_to_list(node()), "@", [{return, list}]),
734    list_to_atom(Host).
735
736-endif.
737