1%%
2%%
3%% Copyright WhatsApp Inc. and its affiliates. All rights reserved.
4%%
5%% Licensed under the Apache License, Version 2.0 (the "License");
6%% you may not use this file except in compliance with the License.
7%% You may obtain a copy of the License at
8%%
9%%     http://www.apache.org/licenses/LICENSE-2.0
10%%
11%% Unless required by applicable law or agreed to in writing, software
12%% distributed under the License is distributed on an "AS IS" BASIS,
13%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14%% See the License for the specific language governing permissions and
15%% limitations under the License.
16%%
17%%-------------------------------------------------------------------
18%% @author Maxim Fedorov <maximfca@gmail.com>
19%%     Process Groups smoke test.
20-module(pg_SUITE).
21-author("maximfca@gmail.com").
22
23%% Test server callbacks
24-export([
25    suite/0,
26    all/0,
27    groups/0,
28    init_per_suite/1,
29    end_per_suite/1,
30    init_per_testcase/2,
31    end_per_testcase/2,
32    stop_proc/1,
33    ensure_peers_info/2
34]).
35
36%% Test cases exports
37-export([
38    pg/0, pg/1,
39    errors/0, errors/1,
40    leave_exit_race/0, leave_exit_race/1,
41    dyn_distribution/0, dyn_distribution/1,
42    process_owner_check/0, process_owner_check/1,
43    overlay_missing/0, overlay_missing/1,
44    single/0, single/1,
45    two/1,
46    empty_group_by_remote_leave/0, empty_group_by_remote_leave/1,
47    thundering_herd/0, thundering_herd/1,
48    initial/1,
49    netsplit/1,
50    trisplit/1,
51    foursplit/1,
52    exchange/1,
53    nolocal/1,
54    double/1,
55    scope_restart/1,
56    missing_scope_join/1,
57    disconnected_start/1,
58    forced_sync/0, forced_sync/1,
59    group_leave/1
60]).
61
62-export([
63    control/1,
64    controller/3
65]).
66
67-include_lib("common_test/include/ct.hrl").
68-include_lib("stdlib/include/assert.hrl").
69
%% Suite-wide Common Test settings: a test case that runs longer than
%% one minute is aborted by the timetrap.
suite() ->
    Timetrap = {timetrap, {seconds, 60}},
    [Timetrap].
72
%% Makes sure the test node is distributed. When distribution has to be
%% started here, the net_kernel pid is stored under the `distribution'
%% key so that end_per_suite/1 knows it must stop it again.
init_per_suite(Config) ->
    ensure_distributed(erlang:is_alive(), Config).

%% Already distributed: nothing to do.
ensure_distributed(true, Config) ->
    Config;
ensure_distributed(false, Config) ->
    %% epmd has to be up before net_kernel:start/1 is called,
    %% otherwise that call fails
    case erl_epmd:names("localhost") of
        {error, address} ->
            [] = os:cmd("epmd -daemon");
        _EpmdRunning ->
            ok
    end,
    %% node name is derived from the suite name and the OS pid
    Name = list_to_atom(lists:concat([atom_to_list(?MODULE), "_", os:getpid()])),
    {ok, NetKernel} = net_kernel:start([Name, shortnames]),
    [{distribution, NetKernel} | Config].
85
%% Stops distribution, but only when init_per_suite/1 started it
%% (i.e. a pid is stored under the `distribution' key).
end_per_suite(Config) ->
    case proplists:get_value(distribution, Config) of
        NetKernel when is_pid(NetKernel) ->
            net_kernel:stop();
        _NotStartedHere ->
            false
    end.
88
%% Every test case runs against its own pg scope, named after the
%% test case and linked to the process executing this callback.
init_per_testcase(Scope, Config) ->
    {ok, _ScopePid} = pg:start_link(Scope),
    Config.
92
%% Shuts down the scope that carries the test case name.
end_per_testcase(Scope, _Config) ->
    gen_server:stop(Scope),
    ok.
96
%% Test cases and groups to run, in execution order.
all() ->
    Groups = [{group, Name} || Name <- [basic, cluster, performance]],
    [dyn_distribution | Groups].
99
%% Test case groups: `basic' and `cluster' run their cases in parallel,
%% `performance' runs sequentially.
groups() ->
    Basic = [errors, pg, leave_exit_race, single, overlay_missing],
    Performance = [thundering_herd],
    Cluster = [
        process_owner_check, two, initial, netsplit, trisplit, foursplit,
        exchange, nolocal, double, scope_restart, missing_scope_join,
        empty_group_by_remote_leave, disconnected_start, forced_sync, group_leave
    ],
    [
        {basic, [parallel], Basic},
        {performance, [sequential], Performance},
        {cluster, [parallel], Cluster}
    ].
108
109%%--------------------------------------------------------------------
110%% TEST CASES
111
pg() ->
    [{doc, "This test must be names pg, to stay inline with default scope"}].

%% Exercises the arity-reduced API that works on the default scope.
%% init_per_testcase/2 starts a scope named after the test case, which
%% is why this case has to be called `pg'.
pg(_Config) ->
    Group = ?FUNCTION_NAME,
    Self = self(),
    %% the scope must have been registered by init_per_testcase/2
    ?assertNotEqual(undefined, whereis(?FUNCTION_NAME)),
    ?assertEqual(ok, pg:join(Group, Self)),
    ?assertEqual([Self], pg:get_local_members(Group)),
    ?assertEqual([Group], pg:which_groups()),
    ?assertEqual([Group], pg:which_local_groups()),
    ?assertEqual(ok, pg:leave(Group, Self)),
    %% after leaving nothing must remain
    ?assertEqual([], pg:get_members(Group)),
    ?assertEqual([], pg:which_groups(?FUNCTION_NAME)),
    ?assertEqual([], pg:which_local_groups(?FUNCTION_NAME)).
125
errors() ->
    [{doc, "Tests that errors are handled as expected, for example pg server crashes when it needs to"}].

%% Unknown messages must raise badarg, whether the callbacks are
%% invoked directly or reached through a gen_server call.
errors(_Config) ->
    %% direct invocation of the info and cast callbacks
    ?assertException(error, badarg, pg:handle_info(garbage, garbage)),
    ?assertException(error, badarg, pg:handle_cast(garbage, garbage)),
    %% an unexpected call, sent to a separately started scope
    Scope = second,
    {ok, _ScopePid} = pg:start(Scope),
    ?assertException(exit, {{badarg, _}, _}, gen_server:call(Scope, garbage, 100)).
136
leave_exit_race() ->
    [{doc, "Tests that pg correctly handles situation when leave and 'DOWN' messages are both in pg queue"}].

%% Repeats join followed by leave 100 times, each time for a process
%% that terminates immediately after being spawned. The test process
%% runs at high priority while doing so.
leave_exit_race(Config) when is_list(Config) ->
    process_flag(priority, high),
    JoinThenLeave =
        fun () ->
            ShortLived = spawn(fun () -> ok end),
            pg:join(leave_exit_race, test, ShortLived),
            pg:leave(leave_exit_race, test, ShortLived)
        end,
    [JoinThenLeave() || _Round <- lists:seq(1, 100)].
149
single() ->
    [{doc, "Tests single node groups"}, {timetrap, {seconds, 5}}].

%% Join/leave bookkeeping on one node: repeated joins of the same
%% process, leaving a group that was never joined, and removal of a
%% member that gets killed.
single(Config) when is_list(Config) ->
    Scope = ?FUNCTION_NAME,
    Group = ?FUNCTION_NAME,
    Self = self(),
    %% one join as a bare pid, two more as a list
    ?assertEqual(ok, pg:join(Scope, Group, Self)),
    ?assertEqual(ok, pg:join(Scope, Group, [Self, Self])),
    ?assertEqual([Self, Self, Self], pg:get_local_members(Scope, Group)),
    ?assertEqual([Self, Self, Self], pg:get_members(Scope, Group)),
    ?assertEqual(not_joined, pg:leave(Scope, '$missing$', Self)),
    ?assertEqual(ok, pg:leave(Scope, Group, [Self, Self])),
    ?assertEqual(ok, pg:leave(Scope, Group, Self)),
    ?assertEqual([], pg:which_groups(Scope)),
    ?assertEqual([], pg:get_local_members(Scope, Group)),
    ?assertEqual([], pg:get_members(Scope, Group)),
    %% two different processes in the same group
    ?assertEqual(ok, pg:join(Scope, Group, Self)),
    Worker = erlang:spawn(forever()),
    ?assertEqual(ok, pg:join(Scope, Group, Worker)),
    Both = lists:sort([Worker, Self]),
    ?assertEqual(Both, lists:sort(pg:get_members(Scope, Group))),
    ?assertEqual(Both, lists:sort(pg:get_local_members(Scope, Group))),

    %% kill one member, then round-trip through the scope process
    %% before looking at the membership again
    stop_proc(Worker),
    sync(Scope),
    ?assertEqual([Self], pg:get_local_members(Scope, Group)),
    ?assertEqual(ok, pg:leave(Scope, Group, Self)),
    ok.
177
dyn_distribution() ->
    [{doc, "Tests that local node when distribution is started dynamically is not treated as remote node"}].

%% Starting or stopping distribution at runtime delivers nodeup and
%% nodedown messages to pg. Reproducing that for real (a
%% non-distributed slave node calling net_kernel:start/1) is possible
%% but not trivial; sending a nodeup for the local node straight to
%% the scope has the same effect and is documented as well.
dyn_distribution(Config) when is_list(Config) ->
    Scope = ?FUNCTION_NAME,
    Scope ! {nodeup, node()},
    %% the scope must still serve local joins
    ?assertEqual(ok, pg:join(Scope, Scope, self())),
    ?assertEqual([self()], pg:get_members(Scope, Scope)),
    ok.
194
process_owner_check() ->
    [{doc, "Tests that process owner is local node"}].

%% pg:join/3 must refuse processes living on another node, alone or
%% mixed into a list, and must also fail for terms that are not pids.
process_owner_check(Config) when is_list(Config) ->
    Scope = ?FUNCTION_NAME,
    {Peer, Socket} = spawn_node(Scope, ?FUNCTION_NAME),
    %% one process on each node
    LocalPid = erlang:spawn(forever()),
    RemotePid = erlang:spawn(Peer, forever()),
    %% a remote pid is rejected by the local scope
    ?assertException(error, {nolocal, _}, pg:join(Scope, Scope, RemotePid)),
    ?assertException(error, {nolocal, _}, pg:join(Scope, Scope, [RemotePid, RemotePid])),
    ?assertException(error, {nolocal, _}, pg:join(Scope, Scope, [LocalPid, RemotePid])),
    %% a non-pid is an error as well
    ?assertException(error, function_clause, pg:join(Scope, Scope, undefined)),
    ?assertException(error, {nolocal, _}, pg:join(Scope, Scope, [undefined])),
    %% done with the peer
    stop_node(Peer, Socket),
    ok.
213
overlay_missing() ->
    [{doc, "Tests that scope process that is not a part of overlay network does not change state"}].

%% White-box test: after the scope on the peer node has been stopped,
%% join and leave messages that claim to come from that (dead) scope
%% process must not alter the local view of the group.
overlay_missing(_Config) ->
    {TwoPeer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
    %% join self (sanity check)
    ?assertEqual(ok, pg:join(?FUNCTION_NAME, group, self())),
    %% remember the remote scope pid before stopping it
    PgPid = rpc:call(TwoPeer, erlang, whereis, [?FUNCTION_NAME]),
    RemotePid = erlang:spawn(TwoPeer, forever()),
    %% stop remote scope
    gen_server:stop(PgPid),
    %% craft white-box request: ensure it's rejected
    %% NOTE(review): the assertions below run without a sync/1 on the
    %% local scope, so they may be evaluated before the crafted message
    %% has been handled - confirm whether that is intended.
    ?FUNCTION_NAME ! {join, PgPid, group, RemotePid},
    %% rejected!
    ?assertEqual([self()], pg:get_members(?FUNCTION_NAME, group)),
    %% ... reject leave too
    ?FUNCTION_NAME ! {leave, PgPid, RemotePid, [group]},
    ?assertEqual([self()], pg:get_members(?FUNCTION_NAME, group)),
    %% stop the peer
    stop_node(TwoPeer, Socket).
245
246
%% Two-node scenario: membership changes made on one node must become
%% visible on the other, in both directions.
two(Config) when is_list(Config) ->
    Scope = ?FUNCTION_NAME,
    Group = ?FUNCTION_NAME,
    {Peer, Socket} = spawn_node(Scope, ?FUNCTION_NAME),
    LocalPid = erlang:spawn(forever()),
    ?assertEqual(ok, pg:join(Scope, Group, LocalPid)),
    ?assertEqual([LocalPid], pg:get_local_members(Scope, Group)),
    %% round-trip through the peer scope before the first remote query
    sync({Scope, Peer}),
    ?assertEqual([LocalPid], rpc:call(Peer, pg, get_members, [Scope, Group])),
    ?assertEqual([], rpc:call(Peer, pg, get_local_members, [Scope, Group])),
    stop_proc(LocalPid),
    %% round-trip through the local scope after the member was killed
    sync(Scope),
    ?assertEqual([], pg:get_local_members(Scope, Group)),
    ?assertEqual([], rpc:call(Peer, pg, get_members, [Scope, Group])),

    %% now the other direction: two joins performed on the peer
    RemotePid1 = erlang:spawn(Peer, forever()),
    RemotePid2 = erlang:spawn(Peer, forever()),
    ?assertEqual(ok, rpc:call(Peer, pg, join, [Scope, Group, RemotePid1])),
    ?assertEqual(ok, rpc:call(Peer, pg, join, [Scope, Group, RemotePid2])),
    %% this time the round-trip goes through the *peer* scope
    sync({Scope, Peer}),
    ?assertEqual(lists:sort([RemotePid1, RemotePid2]),
        lists:sort(pg:get_members(Scope, Group))),
    %% shut the peer down
    stop_node(Peer, Socket),
    %% best effort: hopefully 'nodedown' has arrived before this request
    sync(Scope),
    ok.
275
empty_group_by_remote_leave() ->
    [{doc, "Empty group should be deleted from nodes."}].

%% Once the last remote member has left a group, the group must no
%% longer appear in the data the local scope keeps for the remote scope.
empty_group_by_remote_leave(Config) when is_list(Config) ->
    Scope = ?FUNCTION_NAME,
    Group = ?FUNCTION_NAME,
    {Peer, Socket} = spawn_node(Scope, ?FUNCTION_NAME),
    %% pid of the scope process on the peer; used as a map key below
    RemoteScope = rpc:call(Peer, erlang, whereis, [Scope]),
    RemotePid = erlang:spawn(Peer, forever()),
    %% join performed on the peer
    ?assertEqual(ok, rpc:call(Peer, pg, join, [Scope, Group, RemotePid])),
    sync({Scope, Peer}),
    ?assertEqual([RemotePid], pg:get_members(Scope, Group)),
    %% peeking into the scope state is not good practice, but nothing
    %% else exposes what is stored for the remote scope
    {state, _, _, #{RemoteScope := {_, GroupsBefore}}} = sys:get_state(Scope),
    ?assertEqual(#{Group => [RemotePid]}, GroupsBefore),
    %% leave performed on the peer
    ?assertEqual(ok, rpc:call(Peer, pg, leave, [Scope, Group, RemotePid])),
    sync({Scope, Peer}),
    ?assertEqual([], pg:get_members(Scope, Group)),
    {state, _, _, #{RemoteScope := {_, GroupsAfter}}} = sys:get_state(Scope),
    %% the emptied group must be gone rather than mapped to []
    ?assertEqual(#{}, GroupsAfter),

    stop_node(Peer, Socket),
    ok.
300
thundering_herd() ->
    [{doc, "Thousands of overlay network nodes sending sync to us, and we time out!"}, {timetrap, {seconds, 5}}].

%% Queues a large number of sync requests on a scope that already
%% holds many groups, then checks that an ordinary join still completes
%% inside the 5 second timetrap of this test case.
thundering_herd(Config) when is_list(Config) ->
    Scope = ?FUNCTION_NAME,
    GroupCount = 10000,
    SyncCount = 2000,
    %% populate the scope with many groups
    lists:foreach(
        fun (Seq) -> pg:join(Scope, {group, Seq}, self()) end,
        lists:seq(1, GroupCount)),
    %% a peer is needed to obtain a remote scope pid and a remote member
    {Peer, Socket} = spawn_node(Scope, ?FUNCTION_NAME),
    PeerPid = erlang:spawn(Peer, forever()),
    PeerPg = rpc:call(Peer, erlang, whereis, [Scope], 1000),
    %% WARNING: white-box from here on - hand-crafted sync requests
    FakeSync = [{{group, 1}, [PeerPid, PeerPid]}],
    lists:foreach(
        fun (_Round) -> gen_server:cast(Scope, {sync, PeerPg, FakeSync}) end,
        lists:seq(1, SyncCount)),
    %% the test fails if this call runs into the timetrap
    pg:join(Scope, Scope, self()),
    stop_node(Peer, Socket).
319
%% A member that joined before the peer node existed must be known to
%% the peer once it is up, and must disappear there after being killed.
initial(Config) when is_list(Config) ->
    Scope = ?FUNCTION_NAME,
    Group = ?FUNCTION_NAME,
    Member = erlang:spawn(forever()),
    ?assertEqual(ok, pg:join(Scope, Group, Member)),
    ?assertEqual([Member], pg:get_local_members(Scope, Group)),
    {Peer, Socket} = spawn_node(Scope, ?FUNCTION_NAME),
    %% round-trip through the peer scope before the first remote query
    sync({Scope, Peer}),
    ?assertEqual([Member], rpc:call(Peer, pg, get_members, [Scope, Group])),

    %% the member is not local from the peer's point of view
    ?assertEqual([], rpc:call(Peer, pg, get_local_members, [Scope, Group])),
    stop_proc(Member),
    sync({Scope, Peer}),
    ?assertEqual([], rpc:call(Peer, pg, get_members, [Scope, Group])),
    stop_node(Peer, Socket),
    ok.
335
%% Membership changes made on both sides while the nodes are
%% disconnected must be exchanged after the connection is restored.
netsplit(Config) when is_list(Config) ->
    Scope = ?FUNCTION_NAME,
    {Peer, Socket} = spawn_node(Scope, ?FUNCTION_NAME),
    %% sanity check of the TCP based RPC channel
    ?assertEqual(Peer, rpc(Socket, erlang, node, [])),
    OldRemote = erlang:spawn(Peer, forever()),
    ?assertEqual(ok, rpc:call(Peer, pg, join, [Scope, '$invisible', OldRemote])),
    %% cut the distribution connection
    disconnect_nodes([Peer]),
    %% the TCP channel must keep working without distribution
    ?assertEqual(Peer, rpc(Socket, erlang, node, [])),
    NewRemote = rpc(Socket, erlang, spawn, [forever()]),
    ?assertEqual([], rpc(Socket, erlang, nodes, [])),
    %% the peer must not be connected to us
    ?assertNot(lists:member(Peer, nodes())),
    %% join on the peer while partitioned
    ?assertEqual(ok, rpc(Socket, pg, join, [Scope, Scope, NewRemote])),

    %% move the old remote process from one group to another
    ?assertEqual(ok, rpc(Socket, pg, leave, [Scope, '$invisible', OldRemote])),
    ?assertEqual(ok, rpc(Socket, pg, join, [Scope, '$visible', OldRemote])),
    ?assertEqual([OldRemote], rpc(Socket, pg, get_local_members, [Scope, '$visible'])),
    %% a local join during the partition as well
    LocalPid = erlang:spawn(forever()),
    ?assertEqual(ok, pg:join(Scope, Scope, LocalPid)),

    %% still partitioned
    ?assertNot(lists:member(Peer, nodes())),

    %% heal the partition
    pong = net_adm:ping(Peer),
    %% both sides must now know about both joins
    Everyone = lists:sort([NewRemote, LocalPid]),
    sync({Scope, Peer}),
    ?assertEqual(Everyone, lists:sort(rpc:call(Peer, pg, get_members, [Scope, Scope]))),
    ?assertEqual([OldRemote], pg:get_members(Scope, '$visible')),
    stop_node(Peer, Socket),
    ok.
366
%% A third node that connects after the first two went through a
%% disconnect/reconnect cycle must learn the membership held by the
%% first peer.
trisplit(Config) when is_list(Config) ->
    Scope = ?FUNCTION_NAME,
    {Peer1, Socket1} = spawn_node(Scope, ?FUNCTION_NAME),
    _Unused = erlang:spawn(Peer1, forever()),
    Member = erlang:spawn(Peer1, forever()),
    ?assertEqual(ok, rpc:call(Peer1, pg, join, [Scope, three, Member])),
    disconnect_nodes([Peer1]),
    ?assertEqual(true, net_kernel:connect_node(Peer1)),
    ?assertEqual(ok, rpc:call(Peer1, pg, join, [Scope, one, Member])),
    %% bring up the second peer and connect it to the first one
    {Peer2, Socket2} = spawn_node(Scope, trisplit_second),
    ?assertEqual(true, rpc:call(Peer2, net_kernel, connect_node, [Peer1])),
    ?assertEqual(lists:sort([node(), Peer1]), lists:sort(rpc:call(Peer2, erlang, nodes, []))),
    %% wait, on the second peer, until it has heard from both other nodes
    ok = rpc:call(Peer2, ?MODULE, ensure_peers_info, [Scope, [node(), Peer1]]),
    ?assertEqual([Member], rpc:call(Peer2, pg, get_members, [Scope, one])),
    stop_node(Peer1, Socket1),
    stop_node(Peer2, Socket2),
    ok.
384
%% A group that was left before a partition must stay empty on both
%% nodes after they reconnect.
foursplit(Config) when is_list(Config) ->
    Scope = ?FUNCTION_NAME,
    LocalPid = erlang:spawn(forever()),
    {Peer, Socket} = spawn_node(Scope, ?FUNCTION_NAME),
    ?assertEqual(ok, pg:join(Scope, one, LocalPid)),
    ?assertEqual(ok, pg:join(Scope, two, LocalPid)),
    RemotePid = spawn(Peer, forever()),
    ?assertEqual(ok, pg:leave(Scope, one, LocalPid)),
    ?assertEqual(not_joined, pg:leave(Scope, three, LocalPid)),
    disconnect_nodes([Peer]),
    %% kill the remote process over TCP while distribution is down
    ?assertEqual(ok, rpc(Socket, ?MODULE, stop_proc, [RemotePid])),
    ?assertEqual(not_joined, pg:leave(Scope, three, LocalPid)),
    ?assertEqual(true, net_kernel:connect_node(Peer)),
    %% group `one' must be empty on both sides
    ?assertEqual([], pg:get_members(Scope, one)),
    ?assertEqual([], rpc(Socket, pg, get_members, [Scope, one])),
    stop_node(Peer, Socket),
    ok.
401
%% Three-node scenario: two peers change their memberships while
%% disconnected from the local node; after reconnecting, the local
%% node must see the combined result and follow later leaves.
exchange(Config) when is_list(Config) ->
    Scope = ?FUNCTION_NAME,
    Group = ?FUNCTION_NAME,
    %% spawns ten idle processes on the node behind the given socket
    SpawnTen =
        fun (Socket) ->
            [rpc(Socket, erlang, spawn, [forever()]) || _ <- lists:seq(1, 10)]
        end,
    {Peer1, Socket1} = spawn_node(Scope, ?FUNCTION_NAME),
    {Peer2, Socket2} = spawn_node(Scope, exchange_second),
    Pids10 = SpawnTen(Socket1),
    Pids2 = SpawnTen(Socket2),
    Pids11 = SpawnTen(Socket1),
    %% the first three processes of the first batch get killed later
    {PidsToKill, Pids1} = lists:split(3, Pids10),

    ?assertEqual(ok, rpc(Socket1, pg, join, [Scope, Group, Pids10])),
    sync({Scope, Peer1}), %% the join broadcast has reached the local node
    sync(Scope), %% ... and has been processed by the local scope
    ?assertEqual(lists:sort(Pids10), lists:sort(pg:get_members(Scope, Group))),
    [rpc(Socket1, ?MODULE, stop_proc, [Victim]) || Victim <- PidsToKill],
    sync(Scope),
    sync({Scope, Peer1}),

    Survivors = lists:sort(Pids1 ++ Pids2 ++ Pids11),
    ?assert(lists:all(fun erlang:is_pid/1, Survivors)),

    disconnect_nodes([Peer1, Peer2]),

    sync(Scope), %% the nodedowns have been processed
    ?assertEqual([], lists:sort(pg:get_members(Scope, Group))),

    %% joins performed on the peers while disconnected from us
    [?assertEqual(ok, rpc(Socket2, pg, join, [Scope, Group, Pid])) || Pid <- Pids2],
    [?assertEqual(ok, rpc(Socket1, pg, join, [Scope, second, Pid])) || Pid <- Pids11],
    ?assertEqual(ok, rpc(Socket1, pg, join, [Scope, third, Pids11])),
    %% reconnect to both peers
    ?assertEqual(true, net_kernel:connect_node(Peer1)),
    ?assertEqual(true, net_kernel:connect_node(Peer2)),
    %% wait until the local scope has heard from both peers
    ensure_peers_info(Scope, [Peer1, Peer2]),
    ?assertEqual(Survivors, lists:sort(pg:get_members(Scope, second) ++ pg:get_members(Scope, Group))),
    ?assertEqual(lists:sort(Pids11), lists:sort(pg:get_members(Scope, third))),

    %% leaves on the first peer, in two steps
    {Left, Stay} = lists:split(3, Pids11),
    ?assertEqual(ok, rpc(Socket1, pg, leave, [Scope, third, Left])),
    sync({Scope, Peer1}),
    sync(Scope),
    ?assertEqual(lists:sort(Stay), lists:sort(pg:get_members(Scope, third))),
    ?assertEqual(not_joined, rpc(Socket1, pg, leave, [Scope, left, Stay])),
    ?assertEqual(ok, rpc(Socket1, pg, leave, [Scope, third, Stay])),
    sync({Scope, Peer1}),
    sync(Scope),
    ?assertEqual([], lists:sort(pg:get_members(Scope, third))),
    sync({Scope, Peer1}),
    sync(Scope),

    stop_node(Peer1, Socket1),
    stop_node(Peer2, Socket2),
    ok.
454
%% Joins performed on the peer must never show up as *local* members
%% on this node.
nolocal(Config) when is_list(Config) ->
    Scope = ?FUNCTION_NAME,
    Group = ?FUNCTION_NAME,
    {Peer, Socket} = spawn_node(Scope, ?FUNCTION_NAME),
    RemotePid = spawn(Peer, forever()),
    %% the same remote process joins twice
    ?assertEqual(ok, rpc:call(Peer, pg, join, [Scope, Group, RemotePid])),
    ?assertEqual(ok, rpc:call(Peer, pg, join, [Scope, Group, RemotePid])),
    ?assertEqual([], pg:get_local_members(Scope, Group)),
    stop_node(Peer, Socket),
    ok.
463
%% A process that joined the same group twice must be reported twice,
%% locally and on the peer.
double(Config) when is_list(Config) ->
    Scope = ?FUNCTION_NAME,
    Group = ?FUNCTION_NAME,
    Member = erlang:spawn(forever()),
    %% first join happens before the peer exists, second one after
    ?assertEqual(ok, pg:join(Scope, Group, Member)),
    {Peer, Socket} = spawn_node(Scope, ?FUNCTION_NAME),
    ?assertEqual(ok, pg:join(Scope, Group, [Member])),
    ?assertEqual([Member, Member], pg:get_members(Scope, Group)),
    sync(Scope),
    sync({Scope, Peer}),
    ?assertEqual([Member, Member], rpc:call(Peer, pg, get_members, [Scope, Group])),
    stop_node(Peer, Socket),
    ok.
475
%% After the local scope has been stopped and started again, remote
%% members must be known again while the former local members are gone.
scope_restart(Config) when is_list(Config) ->
    Scope = ?FUNCTION_NAME,
    Group = ?FUNCTION_NAME,
    LocalPid = erlang:spawn(forever()),
    ?assertEqual(ok, pg:join(Scope, Group, [LocalPid, LocalPid])),
    {Peer, Socket} = spawn_node(Scope, ?FUNCTION_NAME),
    RemotePid = spawn(Peer, forever()),
    ?assertEqual(ok, rpc:call(Peer, pg, join, [Scope, Group, RemotePid])),
    sync({Scope, Peer}),
    ?assertEqual(lists:sort([RemotePid, LocalPid, LocalPid]), lists:sort(pg:get_members(Scope, Group))),
    %% restart the local scope
    gen_server:stop(Scope),
    pg:start(Scope),
    %% round-trip local -> peer -> local so the exchange can complete
    sync(Scope),
    sync({Scope, Peer}),
    sync(Scope),
    %% only the remote member is left
    ?assertEqual([RemotePid], pg:get_members(Scope, Group)),
    stop_node(Peer, Socket),
    ok.
494
%% join and leave against a scope that is not running must fail with
%% noproc, seen here as a badrpc from the peer.
missing_scope_join(Config) when is_list(Config) ->
    Scope = ?FUNCTION_NAME,
    Group = ?FUNCTION_NAME,
    {Peer, Socket} = spawn_node(Scope, ?FUNCTION_NAME),
    %% take the scope on the peer down first
    ?assertEqual(ok, rpc:call(Peer, gen_server, stop, [Scope])),
    RemotePid = spawn(Peer, forever()),
    ?assertMatch({badrpc, {'EXIT', {noproc, _}}}, rpc:call(Peer, pg, join, [Scope, Group, RemotePid])),
    ?assertMatch({badrpc, {'EXIT', {noproc, _}}}, rpc:call(Peer, pg, leave, [Scope, Group, RemotePid])),
    stop_node(Peer, Socket),
    ok.
503
%% A scope must be stoppable and startable on a node that currently
%% has no distribution connection to us. All commands travel over the
%% TCP socket.
disconnected_start(Config) when is_list(Config) ->
    Scope = ?FUNCTION_NAME,
    {Peer, Socket} = spawn_node(Scope, ?FUNCTION_NAME),
    disconnect_nodes([Peer]),
    %% stop, start and stop again on the disconnected peer
    ?assertEqual(ok, rpc(Socket, gen_server, stop, [Scope])),
    ?assertMatch({ok, _Pid}, rpc(Socket, pg, start,[Scope])),
    ?assertEqual(ok, rpc(Socket, gen_server, stop, [Scope])),
    %% the peer must still respond afterwards
    RemotePid = rpc(Socket, erlang, spawn, [forever()]),
    ?assert(is_pid(RemotePid)),
    stop_node(Peer, Socket),
    ok.
514
forced_sync() ->
    [{doc, "This test was added when lookup_element was erroneously used instead of lookup, crashing pg with badmatch, and it tests rare out-of-order sync operations"}].

%% Injects a deliberately wrong sync into the local scope, then checks
%% that a sync requested from the peer through `discover' restores the
%% correct membership.
forced_sync(Config) when is_list(Config) ->
    Scope = ?FUNCTION_NAME,
    {Peer, Socket} = spawn_node(Scope, ?FUNCTION_NAME),
    LocalPid = erlang:spawn(forever()),
    RemotePid = spawn(Peer, forever()),
    Expected = lists:sort([LocalPid, RemotePid]),
    pg:join(Scope, one, LocalPid),

    ?assertEqual(ok, rpc:call(Peer, pg, join, [Scope, one, RemotePid])),
    RemoteScopePid = rpc:call(Peer, erlang, whereis, [Scope]),
    ?assert(is_pid(RemoteScopePid)),
    %% partition, then reconnect straight away
    disconnect_nodes([Peer]),
    ?assertEqual(true, net_kernel:connect_node(Peer)),
    ensure_peers_info(Scope, [Peer]),
    ?assertEqual(Expected, lists:sort(pg:get_members(Scope, one))),
    %% WARNING: white-box use of pg internals from here on, solely to
    %%  simulate a broken 'sync'.
    %% Fake groups: one should disappear, one should be replaced, one stays.
    %% This exercises the handle_sync function.
    FakeGroups = [{one, [RemotePid, RemotePid]}, {Scope, [RemotePid, RemotePid]}],
    gen_server:cast(Scope, {sync, RemoteScopePid, FakeGroups}),
    %% the local view must now be wrong in the expected way
    sync(Scope),
    ?assertEqual(lists:sort([RemotePid, RemotePid]), lists:sort(pg:get_members(Scope, Scope))),
    ?assertEqual(lists:sort([RemotePid, RemotePid, LocalPid]), lists:sort(pg:get_members(Scope, one))),
    %% force a sync through 'discover': ask the peer to send sync to us
    {Scope, Peer} ! {discover, whereis(Scope)},
    sync({Scope, Peer}),
    sync(Scope),
    ?assertEqual(Expected, lists:sort(pg:get_members(Scope, one))),
    ?assertEqual([], lists:sort(pg:get_members(Scope, Scope))),
    %% one more round-trip must leave the state as it is
    sync({Scope, Peer}),
    sync(Scope),
    ?assertEqual(Expected, lists:sort(pg:get_members(Scope, one))),

    stop_node(Peer, Socket),
    ok.
556
%% A remote process that joined a group 16 times and left 4 times
%% must be listed 12 times locally, and not at all once its node is gone.
group_leave(Config) when is_list(Config) ->
    Scope = ?FUNCTION_NAME,
    {Peer, Socket} = spawn_node(Scope, ?FUNCTION_NAME),
    RemotePid = erlang:spawn(Peer, forever()),
    Joins = lists:duplicate(16, RemotePid),
    {Leaves, Remaining} = lists:split(4, Joins),
    %% 16 joins in one call, 4 leaves in another
    ?assertEqual(ok, rpc:call(Peer, pg, join, [Scope, two, Joins])),
    ?assertEqual(ok, rpc:call(Peer, pg, leave, [Scope, two, Leaves])),

    sync({Scope, Peer}),
    sync(Scope),
    ?assertEqual(Remaining, pg:get_members(Scope, two)),
    %% stopping the node removes every remaining membership
    stop_node(Peer, Socket),
    sync(Scope),
    ?assertEqual([], pg:get_members(Scope, two)),
    ok.
573
574%%--------------------------------------------------------------------
575%% Test Helpers - start/stop additional Erlang nodes
576
%% Synchronisation point: performs a sys:log/2 `get' request against
%% the given server and returns once the server has replied.
sync(Server) ->
    _Reply = sys:log(Server, get).
579
%% Ensures that the pg server on the local node has received the
%% information sent by the pg servers on all nodes in Peers (assuming
%% no connection failures).
%%
%% Precondition: all nodeup messages have already been delivered to
%% their local recipients (the pg server) when this is called.
%%
%% This relies on the current ERTS implementation, not on language
%% guarantees.
ensure_peers_info(Scope, Peers) ->
    sync(Scope),
    %% Known: nodeup handled locally and discover sent to each peer

    _ = [sync({Scope, Peer}) || Peer <- Peers],
    %% Known: nodeup handled by the peers and discover sent to local
    %% Known: discover received/handled by the peers and sync sent to local
    %% Known: discover received from each peer
    %% Known: sync received from each peer

    sync(Scope),
    %% Known: discover from the peers handled and sync sent to the peers
    %% Known: sync from the peers handled
    ok.
605
606-ifdef(CURRENTLY_UNUSED_BUT_SERVES_AS_DOC).
607
ensure_synced(Scope, Peers) ->
    %% Ensures that the pg server on the local node has synced
    %% with the pg servers on all Peers nodes (assuming no connection
    %% failures).
    %%
    %% This function assumes that all nodeup messages have been
    %% delivered to all local recipients (pg server) when called.
    %%
    %% Note that this relies on current ERTS implementation; not
    %% language guarantees.
    %%
    ensure_peers_info(Scope, Peers),
    %% Known: local has gotten info from all Peers
    %% Known: discover from Peers handled and sync sent to Peers
    lists:foreach(fun (Peer) -> sync({Scope, Peer}) end, Peers),
    %% Known: sync from local handled by Peers
    ok.
625
626-endif.
627
%% Disconnects the given nodes and returns only after a 'nodedown' has
%% been seen for each of them.
%%
%% Rationale (internal knowledge of the current ERTS and pg
%% implementation, not a language guarantee): the pg server reacts to
%% the 'DOWN' messages of the process monitors it holds on its peers,
%% and those are delivered before the 'nodedown' messages of
%% net_kernel:monitor_nodes(). Hence, once 'nodedown' has been
%% received here, the 'DOWN' has been delivered to the pg server.
%%
%% The work is done in a helper process so that no stray
%% nodeup/nodedown messages are left in the test process.
disconnect_nodes(Nodes) ->
    Worker =
        fun () ->
            ok = net_kernel:monitor_nodes(true),
            %% first drop every connection ...
            [true = erlang:disconnect_node(Node) || Node <- Nodes],
            %% ... then wait for the matching nodedown of each node
            [receive {nodedown, Node} -> ok end || Node <- Nodes]
        end,
    {WorkerPid, MRef} = spawn_monitor(Worker),
    receive
        {'DOWN', MRef, process, WorkerPid, ExitReason} ->
            %% anything but a normal exit of the helper is a failure
            normal = ExitReason
    end,
    ok.
658
659-define (LOCALHOST, {127, 0, 0, 1}).
660
%% @doc Kills process Pid and waits for it to exit using monitor,
%%      and yields after (for 1 ms).
%%      Only the 'DOWN' message of the monitor created here is
%%      consumed, so a monitor the caller already holds on Pid still
%%      gets its own notification.
-spec stop_proc(pid()) -> ok.
stop_proc(Pid) ->
    MRef = monitor(process, Pid),
    erlang:exit(Pid, kill),
    receive
        {'DOWN', MRef, process, Pid, _Info} ->
            %% timer:sleep/1 returns ok, which is also the result here
            timer:sleep(1)
    end.
671
%% @doc Performs a remote call on the peer over the TCP socket.
%%      Used when no distribution connection is available, or when
%%      using one is undesirable. An error reported by the peer is
%%      returned as `{badrpc, Error}'; a closed socket raises
%%      `error(closed)'.
-spec rpc(gen_tcp:socket(), module(), atom(), [term()]) -> term().
rpc(Sock, M, F, A) ->
    Request = term_to_binary({call, M, F, A}),
    ok = gen_tcp:send(Sock, Request),
    %% ask for exactly one packet to be delivered as a message
    inet:setopts(Sock, [{active, once}]),
    receive
        {tcp_closed, Sock} ->
            error(closed);
        {tcp, Sock, Payload} ->
            case binary_to_term(Payload) of
                {error, Error} ->
                    {badrpc, Error};
                {ok, Val} ->
                    Val
            end
    end.
690
%% @doc Starts a peer node on this host, through a controller process.
%% Returns the name of the started node and a gen_tcp socket for
%% talking to it with ?MODULE:rpc. Fails if anything other than the
%% start notification arrives, or if nothing arrives within 60 seconds.
-spec spawn_node(Scope :: atom(), Node :: atom()) -> {node(), gen_tcp:socket()}.
spawn_node(Scope, Name) ->
    Controller = erlang:spawn(?MODULE, controller, [Name, Scope, self()]),
    receive
        {'$node_started', Node, Port} ->
            ConnectOpts = [{active, false}, {mode, binary}, {packet, 4}],
            {ok, Socket} = gen_tcp:connect(?LOCALHOST, Port, ConnectOpts),
            %% hand the socket to the controller
            Controller ! {socket, Socket},
            {Node, Socket};
        Unexpected ->
            error({start_node, Name, Unexpected})
    after 60000 ->
        error({start_node, Name, timeout})
    end.
707
%% @private
%% Starts the peer node Name, runs control/1 on it for Scope and
%% reports the outcome to Self: `{'$node_started', Node, Port}' on
%% success, `{badrpc, Node}' when the call to the new node failed.
%% After a successful start the process stays alive in
%% controller_wait/0.
-spec controller(atom(), atom(), pid()) -> ok.
controller(Name, Scope, Self) ->
    Pa = filename:dirname(code:which(?MODULE)),
    Pa2 = filename:dirname(code:which(pg)),
    %% every flag needs a leading space: without it the first flag
    %% would be glued onto the cookie value
    Args = lists:concat(["-setcookie ", erlang:get_cookie(),
            " -connect_all false -kernel dist_auto_connect never -noshell -pa ", Pa, " -pa ", Pa2]),
    {ok, Node} = test_server:start_node(Name, peer, [{args, Args}]),
    case rpc:call(Node, ?MODULE, control, [Scope], 5000) of
        {badrpc, _Reason} ->
            %% report every RPC failure (not only nodedown), so the
            %% starter fails at once instead of waiting for its timeout
            Self ! {badrpc, Node},
            ok;
        {Port, _PgPid} ->
            Self ! {'$node_started', Node, Port},
            controller_wait()
    end.
724
%% Waits for the socket sent by spawn_node/2, then blocks until that
%% socket (a port) goes down.
controller_wait() ->
    receive
        {socket, Socket} ->
            MRef = monitor(port, Socket),
            receive
                {'DOWN', MRef, port, Socket, _Info} ->
                    ok
            end
    end.
736
%% @doc Stops a node that was started with spawn_node/2 and closes
%%      the RPC socket, unless the socket is `undefined'.
%%      Must not be called for the local node.
-spec stop_node(node(), gen_tcp:socket()) -> true.
stop_node(Node, Socket) when Node =/= node() ->
    true = test_server:stop_node(Node),
    case Socket of
        undefined ->
            false;
        _Open ->
            gen_tcp:close(Socket)
    end,
    true.
744
%% Returns a fun that blocks in a receive with an infinite timeout;
%% used as the body of processes that only need to exist.
forever() ->
    fun () ->
        receive
        after infinity ->
            ok
        end
    end.
747
748
%% Runs on the peer node: spawns the TCP command server and waits for
%% it to report the listening port together with the pg scope pid.
%% Any other message is raised as an error.
-spec control(Scope :: atom()) -> {Port :: integer(), pid()}.
control(Scope) ->
    Parent = self(),
    erlang:spawn(fun () -> server(Parent, Scope) end),
    receive
        {port, Port, PgPid} ->
            {Port, PgPid};
        Unexpected ->
            error({error, Unexpected})
    end.
759
%% Peer-side command server: starts the pg scope (unless Scope is
%% `undefined'), listens on an ephemeral localhost port, reports port
%% and scope pid to Control, accepts one connection and serves it.
%% Any exception is sent to Control as `{error, {Class, Reason, Stack}}'.
server(Control, Scope) ->
    try
        {ok, Pid} =
            case Scope of
                undefined -> {ok, undefined};
                _ -> pg:start(Scope)
            end,
        ListenOpts = [{mode, binary}, {packet, 4}, {ip, ?LOCALHOST}],
        {ok, Listen} = gen_tcp:listen(0, ListenOpts),
        {ok, Port} = inet:port(Listen),
        Control ! {port, Port, Pid},
        {ok, Sock} = gen_tcp:accept(Listen),
        server_loop(Sock)
    catch
        Class:Reason:Stack ->
            Control ! {error, {Class, Reason, Stack}}
    end.
772
%% Serves `{call, M, F, A}' requests arriving on Sock one packet at a
%% time and sends back the encoded outcome. Halts the whole node (exit
%% status 1) as soon as the socket is closed.
server_loop(Sock) ->
    inet:setopts(Sock, [{active, once}]),
    receive
        {tcp, Sock, Data} ->
            {call, M, F, A} = binary_to_term(Data),
            Reply = term_to_binary(apply_call(M, F, A)),
            ok = gen_tcp:send(Sock, Reply),
            server_loop(Sock);
        {tcp_closed, Sock} ->
            erlang:halt(1)
    end.

%% Applies M:F(A). The result is wrapped as `{ok, Res}'; exits and
%% errors are wrapped as `{error, {'EXIT', Reason}}'. Throws are not
%% caught.
apply_call(M, F, A) ->
    try erlang:apply(M, F, A) of
        Res ->
            {ok, Res}
    catch
        Class:Reason when Class =:= exit; Class =:= error ->
            {error, {'EXIT', Reason}}
    end.
794