%%
%%
%% Copyright WhatsApp Inc. and its affiliates. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%%-------------------------------------------------------------------
%% @author Maxim Fedorov <maximfca@gmail.com>
%% Process Groups smoke test.
-module(pg_SUITE).
-author("maximfca@gmail.com").

%% Test server callbacks
%% stop_proc/1 and ensure_peers_info/2 are exported because test cases
%% invoke them on peer nodes through RPC.
-export([
    suite/0,
    all/0,
    groups/0,
    init_per_suite/1,
    end_per_suite/1,
    init_per_testcase/2,
    end_per_testcase/2,
    stop_proc/1,
    ensure_peers_info/2
]).

%% Test cases exports
-export([
    pg/0, pg/1,
    errors/0, errors/1,
    leave_exit_race/0, leave_exit_race/1,
    dyn_distribution/0, dyn_distribution/1,
    process_owner_check/0, process_owner_check/1,
    overlay_missing/0, overlay_missing/1,
    single/0, single/1,
    two/1,
    empty_group_by_remote_leave/0, empty_group_by_remote_leave/1,
    thundering_herd/0, thundering_herd/1,
    initial/1,
    netsplit/1,
    trisplit/1,
    foursplit/1,
    exchange/1,
    nolocal/1,
    double/1,
    scope_restart/1,
    missing_scope_join/1,
    disconnected_start/1,
    forced_sync/0, forced_sync/1,
    group_leave/1
]).

%% Entry points used when starting peer nodes: controller/3 is spawned
%% locally via erlang:spawn/3, control/1 is called on the peer via rpc.
-export([
    control/1,
    controller/3
]).

-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").

%% @doc Suite-wide settings: every test case gets a 60 second timetrap
%% unless its own info function overrides it.
suite() ->
    [{timetrap, {seconds, 60}}].

%% @doc Makes sure this node is distributed, so peer nodes can be spawned.
%% If distribution is not running yet, epmd is started when needed and
%% net_kernel is started under a unique short name; the resulting pid is
%% stored as `{distribution, Pid}' so end_per_suite/1 knows to stop it.
init_per_suite(Config) ->
    case erlang:is_alive() of
        false ->
            %% verify epmd running (otherwise next call fails)
            (erl_epmd:names("localhost") =:= {error, address}) andalso ([] = os:cmd("epmd -daemon")),
            %% start a random node name
            NodeName = list_to_atom(lists:concat([atom_to_list(?MODULE), "_", os:getpid()])),
            {ok, Pid} = net_kernel:start([NodeName, shortnames]),
            [{distribution, Pid} | Config];
        true ->
            Config
    end.

%% @doc Stops distribution only if init_per_suite/1 started it.
end_per_suite(Config) ->
    is_pid(proplists:get_value(distribution, Config)) andalso net_kernel:stop().

%% @doc Each test case runs with its own pg scope, named after the test case.
init_per_testcase(TestCase, Config) ->
    {ok, _Pid} = pg:start_link(TestCase),
    Config.

%% @doc Stops the per-testcase scope started in init_per_testcase/2.
end_per_testcase(TestCase, _Config) ->
    gen_server:stop(TestCase),
    ok.

all() ->
    [dyn_distribution, {group, basic}, {group, cluster}, {group, performance}].

groups() ->
    [
        {basic, [parallel], [errors, pg, leave_exit_race, single, overlay_missing]},
        {performance, [sequential], [thundering_herd]},
        {cluster, [parallel], [process_owner_check, two, initial, netsplit, trisplit, foursplit,
            exchange, nolocal, double, scope_restart, missing_scope_join, empty_group_by_remote_leave,
            disconnected_start, forced_sync, group_leave]}
    ].

%%--------------------------------------------------------------------
%% TEST CASES

pg() ->
    [{doc, "This test must be named pg, to stay in line with the default scope"}].

%% Exercises the default-scope API (pg:join/2, pg:leave/2, ...). This works
%% only because init_per_testcase/2 starts a scope named after this test,
%% i.e. `pg', which is the default scope name.
pg(_Config) ->
    ?assertNotEqual(undefined, whereis(?FUNCTION_NAME)), %% ensure scope was started
    ?assertEqual(ok, pg:join(?FUNCTION_NAME, self())),
    ?assertEqual([self()], pg:get_local_members(?FUNCTION_NAME)),
    ?assertEqual([?FUNCTION_NAME], pg:which_groups()),
    ?assertEqual([?FUNCTION_NAME], pg:which_local_groups()),
    ?assertEqual(ok, pg:leave(?FUNCTION_NAME, self())),
    ?assertEqual([], pg:get_members(?FUNCTION_NAME)),
    ?assertEqual([], pg:which_groups(?FUNCTION_NAME)),
    ?assertEqual([], pg:which_local_groups(?FUNCTION_NAME)).

errors() ->
    [{doc, "Tests that errors are handled as expected, for example pg server crashes when it needs to"}].

%% Unexpected input must crash pg: the info/cast callbacks are invoked
%% directly with garbage, and a garbage call is sent to a running server.
errors(_Config) ->
    %% kill with 'info' and 'cast'
    ?assertException(error, badarg, pg:handle_info(garbage, garbage)),
    ?assertException(error, badarg, pg:handle_cast(garbage, garbage)),
    %% kill with call
    %% A separate scope ('second') is started so the per-testcase scope is
    %% not the one crashed; the call is expected to exit with {{badarg, _}, _}.
    {ok, _Pid} = pg:start(second),
    ?assertException(exit, {{badarg, _}, _}, gen_server:call(second, garbage, 100)).

leave_exit_race() ->
    [{doc, "Tests that pg correctly handles situation when leave and 'DOWN' messages are both in pg queue"}].

leave_exit_race(Config) when is_list(Config) ->
    %% NOTE(review): high priority presumably helps issue join/leave before
    %% pg handles the short-lived process's exit; intent inferred, confirm.
    process_flag(priority, high),
    [
        begin
            %% The spawned process exits immediately, so its 'DOWN' and the
            %% leave request may reach pg in either order. Return values are
            %% deliberately not checked: leave may legitimately race.
            Pid = spawn(fun () -> ok end),
            pg:join(leave_exit_race, test, Pid),
            pg:leave(leave_exit_race, test, Pid)
        end
        || _ <- lists:seq(1, 100)].

single() ->
    [{doc, "Tests single node groups"}, {timetrap, {seconds, 5}}].

%% Single-node join/leave semantics: a pid may be joined several times
%% (and is listed once per join), leaving a group that was never joined
%% returns not_joined, and a dead member is removed once pg processed
%% its exit.
single(Config) when is_list(Config) ->
    ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, self())),
    ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, [self(), self()])),
    ?assertEqual([self(), self(), self()], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
    ?assertEqual([self(), self(), self()], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
    ?assertEqual(not_joined, pg:leave(?FUNCTION_NAME, '$missing$', self())),
    ?assertEqual(ok, pg:leave(?FUNCTION_NAME, ?FUNCTION_NAME, [self(), self()])),
    ?assertEqual(ok, pg:leave(?FUNCTION_NAME, ?FUNCTION_NAME, self())),
    ?assertEqual([], pg:which_groups(?FUNCTION_NAME)),
    ?assertEqual([], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
    ?assertEqual([], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
    %% double
    ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, self())),
    Pid = erlang:spawn(forever()),
    ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, Pid)),
    Expected = lists:sort([Pid, self()]),
    ?assertEqual(Expected, lists:sort(pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))),
    ?assertEqual(Expected, lists:sort(pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME))),

    %% kill the extra member, then wait until the scope has handled its exit
    stop_proc(Pid),
    sync(?FUNCTION_NAME),
    ?assertEqual([self()], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
    ?assertEqual(ok, pg:leave(?FUNCTION_NAME, ?FUNCTION_NAME, self())),
    ok.

dyn_distribution() ->
    [{doc, "Tests that local node when distribution is started dynamically is not treated as remote node"}].

dyn_distribution(Config) when is_list(Config) ->
    %% When distribution is started or stopped dynamically,
    %% there is a nodeup/nodedown message delivered to pg
    %% It is possible but non-trivial to simulate this
    %% behaviour with starting slave nodes being not
    %% distributed, and calling net_kernel:start/1, however
    %% the effect is still the same as simply sending nodeup,
    %% which is also documented.
    ?FUNCTION_NAME ! {nodeup, node()},
    %% join is a call, so it is handled after the nodeup above; the group
    %% must still contain exactly one (local) member afterwards
    ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, self())),
    ?assertEqual([self()], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
    ok.

process_owner_check() ->
    [{doc, "Tests that process owner is local node"}].

%% Only local pids may be joined; remote pids (alone or mixed into a list)
%% and non-pids are rejected.
process_owner_check(Config) when is_list(Config) ->
    {TwoPeer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
    %% spawn remote process
    LocalPid = erlang:spawn(forever()),
    RemotePid = erlang:spawn(TwoPeer, forever()),
    %% check they can't be joined locally
    ?assertException(error, {nolocal, _}, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, RemotePid)),
    ?assertException(error, {nolocal, _}, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, [RemotePid, RemotePid])),
    ?assertException(error, {nolocal, _}, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, [LocalPid, RemotePid])),
    %% check that non-pid also triggers error
    ?assertException(error, function_clause, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, undefined)),
    ?assertException(error, {nolocal, _}, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, [undefined])),
    %% stop the peer
    stop_node(TwoPeer, Socket),
    ok.

overlay_missing() ->
    [{doc, "Tests that scope process that is not a part of overlay network does not change state"}].

overlay_missing(_Config) ->
    {TwoPeer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
    %% join self (sanity check)
    ?assertEqual(ok, pg:join(?FUNCTION_NAME, group, self())),
    %% remember pid from remote
    PgPid = rpc:call(TwoPeer, erlang, whereis, [?FUNCTION_NAME]),
    RemotePid = erlang:spawn(TwoPeer, forever()),
    %% stop remote scope
    gen_server:stop(PgPid),
    %% let the local scope handle the remote scope's 'DOWN' (presumably
    %% already delivered once gen_server:stop/1 returned), so PgPid is no
    %% longer part of its overlay when the crafted messages arrive
    sync(?FUNCTION_NAME),
    %% craft white-box request: ensure it's rejected
    ?FUNCTION_NAME ! {join, PgPid, group, RemotePid},
    %% wait until the crafted message has been processed; without this the
    %% assertion could run before the scope even looked at the message
    sync(?FUNCTION_NAME),
    %% rejected!
    ?assertEqual([self()], pg:get_members(?FUNCTION_NAME, group)),
    %% ... reject leave too
    ?FUNCTION_NAME ! {leave, PgPid, RemotePid, [group]},
    sync(?FUNCTION_NAME),
    ?assertEqual([self()], pg:get_members(?FUNCTION_NAME, group)),
    %% stop the peer
    stop_node(TwoPeer, Socket).

%% Two-node test: membership changes made on either node become visible
%% on the other one.
two(Config) when is_list(Config) ->
    {TwoPeer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
    Pid = erlang:spawn(forever()),
    ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, Pid)),
    ?assertEqual([Pid], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
    %% first RPC must be serialised
    sync({?FUNCTION_NAME, TwoPeer}),
    ?assertEqual([Pid], rpc:call(TwoPeer, pg, get_members, [?FUNCTION_NAME, ?FUNCTION_NAME])),
    ?assertEqual([], rpc:call(TwoPeer, pg, get_local_members, [?FUNCTION_NAME, ?FUNCTION_NAME])),
    stop_proc(Pid),
    %% again, must be serialised: the local scope handles the 'DOWN' and
    %% broadcasts the leave, then the peer has to process that broadcast
    sync(?FUNCTION_NAME),
    ?assertEqual([], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
    sync({?FUNCTION_NAME, TwoPeer}),
    ?assertEqual([], rpc:call(TwoPeer, pg, get_members, [?FUNCTION_NAME, ?FUNCTION_NAME])),

    Pid2 = erlang:spawn(TwoPeer, forever()),
    Pid3 = erlang:spawn(TwoPeer, forever()),
    ?assertEqual(ok, rpc:call(TwoPeer, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, Pid2])),
    ?assertEqual(ok, rpc:call(TwoPeer, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, Pid3])),
    %% serialise through the *other* node, then make sure the local scope
    %% has processed the join broadcasts before reading its table
    sync({?FUNCTION_NAME, TwoPeer}),
    sync(?FUNCTION_NAME),
    ?assertEqual(lists:sort([Pid2, Pid3]),
        lists:sort(pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))),
    %% stop the peer
    stop_node(TwoPeer, Socket),
    %% hope that 'nodedown' comes before we route our request
    sync(?FUNCTION_NAME),
    ok.

empty_group_by_remote_leave() ->
    [{doc, "Empty group should be deleted from nodes."}].

empty_group_by_remote_leave(Config) when is_list(Config) ->
    {TwoPeer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
    %% pid of the pg scope process on the peer (key of the local scope's
    %% remote-peer map)
    RemoteScopePid = rpc:call(TwoPeer, erlang, whereis, [?FUNCTION_NAME]),
    RemotePid = erlang:spawn(TwoPeer, forever()),
    % remote join
    ?assertEqual(ok, rpc:call(TwoPeer, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, RemotePid])),
    sync({?FUNCTION_NAME, TwoPeer}),
    %% join broadcast must be processed locally before reading the table
    sync(?FUNCTION_NAME),
    ?assertEqual([RemotePid], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
    % inspecting internal state is not best practice, but there's no other way to check if the state is correct.
    {state, _, _, #{RemoteScopePid := {_, RemoteMap}}} = sys:get_state(?FUNCTION_NAME),
    ?assertEqual(#{?FUNCTION_NAME => [RemotePid]}, RemoteMap),
    % remote leave
    ?assertEqual(ok, rpc:call(TwoPeer, pg, leave, [?FUNCTION_NAME, ?FUNCTION_NAME, RemotePid])),
    sync({?FUNCTION_NAME, TwoPeer}),
    sync(?FUNCTION_NAME),
    ?assertEqual([], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
    {state, _, _, #{RemoteScopePid := {_, NewRemoteMap}}} = sys:get_state(?FUNCTION_NAME),
    % empty group should be deleted.
    ?assertEqual(#{}, NewRemoteMap),

    stop_node(TwoPeer, Socket),
    ok.

thundering_herd() ->
    [{doc, "Thousands of overlay network nodes sending sync to us, and we time out!"}, {timetrap, {seconds, 5}}].

%% A large number of (fake) sync casts must not stall the scope: the join
%% call at the end has to complete within the test's 5 second timetrap.
thundering_herd(Config) when is_list(Config) ->
    GroupCount = 10000,
    SyncCount = 2000,
    %% make up a large amount of groups
    [pg:join(?FUNCTION_NAME, {group, Seq}, self()) || Seq <- lists:seq(1, GroupCount)],
    %% initiate a few syncs - and those are really slow...
    {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
    PeerPid = erlang:spawn(Peer, forever()),
    PeerPg = rpc:call(Peer, erlang, whereis, [?FUNCTION_NAME], 1000),
    %% WARNING: code below acts for white-box! %% WARNING
    FakeSync = [{{group, 1}, [PeerPid, PeerPid]}],
    [gen_server:cast(?FUNCTION_NAME, {sync, PeerPg, FakeSync}) || _ <- lists:seq(1, SyncCount)],
    %% next call must not timetrap, otherwise test fails
    ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, self())),
    stop_node(Peer, Socket).

%% Members joined before a peer starts are delivered to it by the initial
%% sync, and removed from it when the member dies.
initial(Config) when is_list(Config) ->
    Pid = erlang:spawn(forever()),
    ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, Pid)),
    ?assertEqual([Pid], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
    {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
    %% first RPC must be serialised: the local scope must handle the new
    %% peer's discover (replying with its sync) before the peer is synced
    sync(?FUNCTION_NAME),
    sync({?FUNCTION_NAME, Peer}),
    ?assertEqual([Pid], rpc:call(Peer, pg, get_members, [?FUNCTION_NAME, ?FUNCTION_NAME])),

    ?assertEqual([], rpc:call(Peer, pg, get_local_members, [?FUNCTION_NAME, ?FUNCTION_NAME])),
    stop_proc(Pid),
    %% local scope handles the 'DOWN' and broadcasts the leave first,
    %% then the peer processes the broadcast
    sync(?FUNCTION_NAME),
    sync({?FUNCTION_NAME, Peer}),
    ?assertEqual([], rpc:call(Peer, pg, get_members, [?FUNCTION_NAME, ?FUNCTION_NAME])),
    stop_node(Peer, Socket),
    ok.

%% Changes made on both sides of a partition are exchanged once the nodes
%% reconnect.
netsplit(Config) when is_list(Config) ->
    {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
    ?assertEqual(Peer, rpc(Socket, erlang, node, [])), %% just to test RPC
    RemoteOldPid = erlang:spawn(Peer, forever()),
    ?assertEqual(ok, rpc:call(Peer, pg, join, [?FUNCTION_NAME, '$invisible', RemoteOldPid])),
    %% hohoho, partition!
    disconnect_nodes([Peer]),
    ?assertEqual(Peer, rpc(Socket, erlang, node, [])), %% just to ensure RPC still works
    RemotePid = rpc(Socket, erlang, spawn, [forever()]),
    ?assertEqual([], rpc(Socket, erlang, nodes, [])),
    ?assertNot(lists:member(Peer, nodes())), %% should be no nodes in the cluster
    ?assertEqual(ok, rpc(Socket, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, RemotePid])), %% join - in a partition!

    ?assertEqual(ok, rpc(Socket, pg, leave, [?FUNCTION_NAME, '$invisible', RemoteOldPid])),
    ?assertEqual(ok, rpc(Socket, pg, join, [?FUNCTION_NAME, '$visible', RemoteOldPid])),
    ?assertEqual([RemoteOldPid], rpc(Socket, pg, get_local_members, [?FUNCTION_NAME, '$visible'])),
    %% join locally too
    LocalPid = erlang:spawn(forever()),
    ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, LocalPid)),

    ?assertNot(lists:member(Peer, nodes())), %% should be no nodes in the cluster

    pong = net_adm:ping(Peer),
    %% now ensure sync happened in both directions: the local scope has the
    %% peer's state (and sent its own), then the peer processed our sync
    Pids = lists:sort([RemotePid, LocalPid]),
    ensure_peers_info(?FUNCTION_NAME, [Peer]),
    sync({?FUNCTION_NAME, Peer}),
    ?assertEqual(Pids, lists:sort(rpc:call(Peer, pg, get_members, [?FUNCTION_NAME, ?FUNCTION_NAME]))),
    ?assertEqual([RemoteOldPid], pg:get_members(?FUNCTION_NAME, '$visible')),
    stop_node(Peer, Socket),
    ok.

%% A peer is partitioned and reconnected, then joins a group; a third node
%% that connects afterwards must see that membership.
trisplit(Config) when is_list(Config) ->
    {Peer, Socket1} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
    _PeerPid1 = erlang:spawn(Peer, forever()),
    PeerPid2 = erlang:spawn(Peer, forever()),
    ?assertEqual(ok, rpc:call(Peer, pg, join, [?FUNCTION_NAME, three, PeerPid2])),
    disconnect_nodes([Peer]),
    ?assertEqual(true, net_kernel:connect_node(Peer)),
    ?assertEqual(ok, rpc:call(Peer, pg, join, [?FUNCTION_NAME, one, PeerPid2])),
    %% now ensure sync happened
    {Peer2, Socket2} = spawn_node(?FUNCTION_NAME, trisplit_second),
    ?assertEqual(true, rpc:call(Peer2, net_kernel, connect_node, [Peer])),
    ?assertEqual(lists:sort([node(), Peer]), lists:sort(rpc:call(Peer2, erlang, nodes, []))),
    %% runs on Peer2: wait until its scope has heard from both other scopes
    ok = rpc:call(Peer2, ?MODULE, ensure_peers_info, [?FUNCTION_NAME, [node(), Peer]]),
    ?assertEqual([PeerPid2], rpc:call(Peer2, pg, get_members, [?FUNCTION_NAME, one])),
    stop_node(Peer, Socket1),
    stop_node(Peer2, Socket2),
    ok.

%% Leaves done before a partition, and a peer process killed during the
%% partition, must not resurrect group 'one' on either node after reconnect.
foursplit(Config) when is_list(Config) ->
    Pid = erlang:spawn(forever()),
    {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
    ?assertEqual(ok, pg:join(?FUNCTION_NAME, one, Pid)),
    ?assertEqual(ok, pg:join(?FUNCTION_NAME, two, Pid)),
    PeerPid1 = spawn(Peer, forever()),
    ?assertEqual(ok, pg:leave(?FUNCTION_NAME, one, Pid)),
    ?assertEqual(not_joined, pg:leave(?FUNCTION_NAME, three, Pid)),
    disconnect_nodes([Peer]),
    %% dist is down, so the peer process is killed over the TCP side channel
    ?assertEqual(ok, rpc(Socket, ?MODULE, stop_proc, [PeerPid1])),
    ?assertEqual(not_joined, pg:leave(?FUNCTION_NAME, three, Pid)),
    ?assertEqual(true, net_kernel:connect_node(Peer)),
    ?assertEqual([], pg:get_members(?FUNCTION_NAME, one)),
    ?assertEqual([], rpc(Socket, pg, get_members, [?FUNCTION_NAME, one])),
    stop_node(Peer, Socket),
    ok.

%% Membership built on both sides of a partition (two peers) is exchanged
%% correctly after reconnect, and later leaves propagate.
exchange(Config) when is_list(Config) ->
    {Peer1, Socket1} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
    {Peer2, Socket2} = spawn_node(?FUNCTION_NAME, exchange_second),
    Pids10 = [rpc(Socket1, erlang, spawn, [forever()]) || _ <- lists:seq(1, 10)],
    Pids2 = [rpc(Socket2, erlang, spawn, [forever()]) || _ <- lists:seq(1, 10)],
    Pids11 = [rpc(Socket1, erlang, spawn, [forever()]) || _ <- lists:seq(1, 10)],
    %% kill first 3 pids from node1
    {PidsToKill, Pids1} = lists:split(3, Pids10),

    ?assertEqual(ok, rpc(Socket1, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, Pids10])),
    sync({?FUNCTION_NAME, Peer1}), %% Join broadcast has reached local
    sync(?FUNCTION_NAME), %% Join broadcast has been processed by local
    ?assertEqual(lists:sort(Pids10), lists:sort(pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))),
    [rpc(Socket1, ?MODULE, stop_proc, [Pid]) || Pid <- PidsToKill],
    sync(?FUNCTION_NAME),
    sync({?FUNCTION_NAME, Peer1}),

    Pids = lists:sort(Pids1 ++ Pids2 ++ Pids11),
    ?assert(lists:all(fun erlang:is_pid/1, Pids)),

    disconnect_nodes([Peer1, Peer2]),

    sync(?FUNCTION_NAME), %% Processed nodedowns...
    ?assertEqual([], lists:sort(pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))),

    [?assertEqual(ok, rpc(Socket2, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, Pid])) || Pid <- Pids2],
    [?assertEqual(ok, rpc(Socket1, pg, join, [?FUNCTION_NAME, second, Pid])) || Pid <- Pids11],
    ?assertEqual(ok, rpc(Socket1, pg, join, [?FUNCTION_NAME, third, Pids11])),
    %% rejoin
    ?assertEqual(true, net_kernel:connect_node(Peer1)),
    ?assertEqual(true, net_kernel:connect_node(Peer2)),
    %% wait until the local scope has exchanged state with both peers
    ensure_peers_info(?FUNCTION_NAME, [Peer1, Peer2]),
    ?assertEqual(Pids, lists:sort(pg:get_members(?FUNCTION_NAME, second) ++ pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))),
    ?assertEqual(lists:sort(Pids11), lists:sort(pg:get_members(?FUNCTION_NAME, third))),

    {Left, Stay} = lists:split(3, Pids11),
    ?assertEqual(ok, rpc(Socket1, pg, leave, [?FUNCTION_NAME, third, Left])),
    sync({?FUNCTION_NAME, Peer1}),
    sync(?FUNCTION_NAME),
    ?assertEqual(lists:sort(Stay), lists:sort(pg:get_members(?FUNCTION_NAME, third))),
    ?assertEqual(not_joined, rpc(Socket1, pg, leave, [?FUNCTION_NAME, left, Stay])),
    ?assertEqual(ok, rpc(Socket1, pg, leave, [?FUNCTION_NAME, third, Stay])),
    sync({?FUNCTION_NAME, Peer1}),
    sync(?FUNCTION_NAME),
    ?assertEqual([], lists:sort(pg:get_members(?FUNCTION_NAME, third))),
    sync({?FUNCTION_NAME, Peer1}),
    sync(?FUNCTION_NAME),

    stop_node(Peer1, Socket1),
    stop_node(Peer2, Socket2),
    ok.

%% Pids joined on a peer must never show up as local members here.
nolocal(Config) when is_list(Config) ->
    {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
    RemotePid = spawn(Peer, forever()),
    ?assertEqual(ok, rpc:call(Peer, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, RemotePid])),
    ?assertEqual(ok, rpc:call(Peer, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, RemotePid])),
    %% let the join broadcasts be processed locally first; otherwise the
    %% check below could pass before the scope ever saw them
    sync({?FUNCTION_NAME, Peer}),
    sync(?FUNCTION_NAME),
    ?assertEqual([], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
    stop_node(Peer, Socket),
    ok.

%% A pid joined twice (before and after a peer appears) is listed twice on
%% both nodes.
double(Config) when is_list(Config) ->
    Pid = erlang:spawn(forever()),
    ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, Pid)),
    {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
    ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, [Pid])),
    ?assertEqual([Pid, Pid], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
    sync(?FUNCTION_NAME),
    sync({?FUNCTION_NAME, Peer}),
    ?assertEqual([Pid, Pid], rpc:call(Peer, pg, get_members, [?FUNCTION_NAME, ?FUNCTION_NAME])),
    stop_node(Peer, Socket),
    ok.

%% Restarting the local scope drops local memberships but re-learns the
%% remote ones from the peer.
scope_restart(Config) when is_list(Config) ->
    Pid = erlang:spawn(forever()),
    ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, [Pid, Pid])),
    {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
    RemotePid = spawn(Peer, forever()),
    ?assertEqual(ok, rpc:call(Peer, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, RemotePid])),
    sync({?FUNCTION_NAME, Peer}),
    %% the join broadcast must be processed locally before reading the table
    sync(?FUNCTION_NAME),
    ?assertEqual(lists:sort([RemotePid, Pid, Pid]), lists:sort(pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))),
    %% stop scope locally, and restart (unlinked; end_per_testcase stops it)
    gen_server:stop(?FUNCTION_NAME),
    {ok, _} = pg:start(?FUNCTION_NAME),
    %% ensure remote pids joined, local are missing
    sync(?FUNCTION_NAME),
    sync({?FUNCTION_NAME, Peer}),
    sync(?FUNCTION_NAME),
    ?assertEqual([RemotePid], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
    stop_node(Peer, Socket),
    ok.

%% join/leave against a scope that is not running fail with noproc.
missing_scope_join(Config) when is_list(Config) ->
    {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
    ?assertEqual(ok, rpc:call(Peer, gen_server, stop, [?FUNCTION_NAME])),
    RemotePid = spawn(Peer, forever()),
    ?assertMatch({badrpc, {'EXIT', {noproc, _}}}, rpc:call(Peer, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, RemotePid])),
    ?assertMatch({badrpc, {'EXIT', {noproc, _}}}, rpc:call(Peer, pg, leave, [?FUNCTION_NAME, ?FUNCTION_NAME, RemotePid])),
    stop_node(Peer, Socket),
    ok.

%% A scope can be stopped and started again on a node that is disconnected
%% from this one.
disconnected_start(Config) when is_list(Config) ->
    {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
    disconnect_nodes([Peer]),
    ?assertEqual(ok, rpc(Socket, gen_server, stop, [?FUNCTION_NAME])),
    ?assertMatch({ok, _Pid}, rpc(Socket, pg, start,[?FUNCTION_NAME])),
    ?assertEqual(ok, rpc(Socket, gen_server, stop, [?FUNCTION_NAME])),
    RemotePid = rpc(Socket, erlang, spawn, [forever()]),
    ?assert(is_pid(RemotePid)),
    stop_node(Peer, Socket),
    ok.

forced_sync() ->
    [{doc, "This test was added when lookup_element was erroneously used instead of lookup, crashing pg with badmatch, and it tests rare out-of-order sync operations"}].

forced_sync(Config) when is_list(Config) ->
    {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
    Pid = erlang:spawn(forever()),
    RemotePid = spawn(Peer, forever()),
    Expected = lists:sort([Pid, RemotePid]),
    ?assertEqual(ok, pg:join(?FUNCTION_NAME, one, Pid)),

    ?assertEqual(ok, rpc:call(Peer, pg, join, [?FUNCTION_NAME, one, RemotePid])),
    RemoteScopePid = rpc:call(Peer, erlang, whereis, [?FUNCTION_NAME]),
    ?assert(is_pid(RemoteScopePid)),
    %% hohoho, partition!
    disconnect_nodes([Peer]),
    ?assertEqual(true, net_kernel:connect_node(Peer)),
    ensure_peers_info(?FUNCTION_NAME, [Peer]),
    ?assertEqual(Expected, lists:sort(pg:get_members(?FUNCTION_NAME, one))),
    %% WARNING: this code uses pg as white-box, exploiting internals,
    %% only to simulate broken 'sync'
    %% Fake Groups: the peer's membership of 'one' is replaced by a
    %% duplicated RemotePid, and a group the peer never joined (named after
    %% this test) appears; a real sync must undo both.
    %% This tests handle_sync function.
    FakeGroups = [{one, [RemotePid, RemotePid]}, {?FUNCTION_NAME, [RemotePid, RemotePid]}],
    gen_server:cast(?FUNCTION_NAME, {sync, RemoteScopePid, FakeGroups}),
    %% ensure it is broken well enough
    sync(?FUNCTION_NAME),
    ?assertEqual(lists:sort([RemotePid, RemotePid]), lists:sort(pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))),
    ?assertEqual(lists:sort([RemotePid, RemotePid, Pid]), lists:sort(pg:get_members(?FUNCTION_NAME, one))),
    %% simulate force-sync via 'discover' - ask peer to send sync to us
    {?FUNCTION_NAME, Peer} ! {discover, whereis(?FUNCTION_NAME)},
    sync({?FUNCTION_NAME, Peer}),
    sync(?FUNCTION_NAME),
    ?assertEqual(Expected, lists:sort(pg:get_members(?FUNCTION_NAME, one))),
    ?assertEqual([], lists:sort(pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))),
    %% and simulate extra sync
    sync({?FUNCTION_NAME, Peer}),
    sync(?FUNCTION_NAME),
    ?assertEqual(Expected, lists:sort(pg:get_members(?FUNCTION_NAME, one))),

    stop_node(Peer, Socket),
    ok.

%% Leaving with a list removes exactly that many copies of a pid that was
%% joined several times; all copies vanish when the peer goes down.
group_leave(Config) when is_list(Config) ->
    {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
    RemotePid = erlang:spawn(Peer, forever()),
    Total = lists:duplicate(16, RemotePid),
    {Left, Remain} = lists:split(4, Total),
    %% join 16 times!
    ?assertEqual(ok, rpc:call(Peer, pg, join, [?FUNCTION_NAME, two, Total])),
    ?assertEqual(ok, rpc:call(Peer, pg, leave, [?FUNCTION_NAME, two, Left])),

    sync({?FUNCTION_NAME, Peer}),
    sync(?FUNCTION_NAME),
    ?assertEqual(Remain, pg:get_members(?FUNCTION_NAME, two)),
    stop_node(Peer, Socket),
    sync(?FUNCTION_NAME),
    ?assertEqual([], pg:get_members(?FUNCTION_NAME, two)),
    ok.

%%--------------------------------------------------------------------
%% Test Helpers - start/stop additional Erlang nodes

%% @doc Round-trips a system message through GS (a registered name or a
%% {Name, Node} pair), so everything already queued at GS before this
%% request has been handled when it returns.
sync(GS) ->
    _ = sys:log(GS, get).

ensure_peers_info(Scope, Peers) ->
    %% Ensures that pg server on local node has gotten info from
    %% pg servers on all Peer nodes passed as argument (assuming
    %% no connection failures).
    %%
    %% This function assumes that all nodeup messages have been
    %% delivered to all local recipients (pg server) when called.
    %%
    %% Note that this relies on current ERTS implementation; not
    %% language guarantees.
    %%

    sync(Scope),
    %% Known: nodeup handled and discover sent to Peer

    lists:foreach(fun (Peer) -> sync({Scope, Peer}) end, Peers),
    %% Known: nodeup handled by Peers and discover sent to local
    %% Known: discover received/handled by Peers and sync sent to local
    %% Known: discover received from Peer
    %% Known: sync received from Peer

    sync(Scope),
    %% Known: discover handled from Peers and sync sent to Peers
    %% Known: sync from Peers handled
    ok.

-ifdef(CURRENTLY_UNUSED_BUT_SERVES_AS_DOC).

ensure_synced(Scope, Peers) ->
    %% Ensures that the pg server on local node has synced
    %% with pg servers on all Peer nodes (assuming no connection
    %% failures).
    %%
    %% This function assumes that all nodeup messages have been
    %% delivered to all local recipients (pg server) when called.
    %%
    %% Note that this relies on current ERTS implementation; not
    %% language guarantees.
    %%
    ensure_peers_info(Scope, Peers),
    %% Known: local has gotten info from all Peers
    %% Known: discover from Peers handled and sync sent to Peers
    lists:foreach(fun (Peer) -> sync({Scope, Peer}) end, Peers),
    %% Known: sync from local handled by Peers
    ok.

-endif.

disconnect_nodes(Nodes) ->
    %% The following is not a language guarantee, but internal
    %% knowledge about current implementation of ERTS and pg.
    %%
    %% The pg server reacts on 'DOWN's via process monitors of
    %% its peers. These are delivered before 'nodedown's from
    %% net_kernel:monitor_nodes(). That is, by waiting for
    %% 'nodedown' from net_kernel:monitor_nodes() we know that
    %% the 'DOWN' has been delivered to the pg server.
    %%
    %% We do this in a separate process to avoid stray
    %% nodeup/nodedown messages in the test process after
    %% the operation...
    F = fun () ->
                ok = net_kernel:monitor_nodes(true),
                lists:foreach(fun (Node) ->
                                      true = erlang:disconnect_node(Node)
                              end,
                              Nodes),
                lists:foreach(fun (Node) ->
                                      receive {nodedown, Node} -> ok end
                              end,
                              Nodes)
        end,
    {Pid, Mon} = spawn_monitor(F),
    receive
        {'DOWN', Mon, process, Pid, Reason} ->
            normal = Reason
    end,
    ok.

-define (LOCALHOST, {127, 0, 0, 1}).

%% @doc Kills process Pid and waits for it to exit using monitor,
%% and yields after (for 1 ms). Only the 'DOWN' of the monitor created
%% here is consumed, so other monitors on Pid are left untouched.
-spec stop_proc(pid()) -> ok.
stop_proc(Pid) ->
    MRef = monitor(process, Pid),
    erlang:exit(Pid, kill),
    receive
        {'DOWN', MRef, process, Pid, _Info} ->
            timer:sleep(1)
    end.

%% @doc Executes remote call on the node via TCP socket
%% Used when dist connection is not available, or
%% when it's undesirable to use one.
%% Returns the call result, or {badrpc, Error} when the remote apply raised;
%% raises error(closed) if the peer closes the connection. The setopts
%% result is asserted so a dead/invalid socket fails loudly instead of
%% leaving the receive below waiting until the timetrap fires.
-spec rpc(gen_tcp:socket(), module(), atom(), [term()]) -> term().
rpc(Sock, M, F, A) ->
    ok = gen_tcp:send(Sock, term_to_binary({call, M, F, A})),
    ok = inet:setopts(Sock, [{active, once}]),
    receive
        {tcp, Sock, Data} ->
            case binary_to_term(Data) of
                {ok, Val} ->
                    Val;
                {error, Error} ->
                    {badrpc, Error}
            end;
        {tcp_closed, Sock} ->
            error(closed)
    end.

%% @doc starts peer node on this host.
%% Returns spawned node name, and a gen_tcp socket to talk to it using ?MODULE:rpc.
-spec spawn_node(Scope :: atom(), Node :: atom()) -> {node(), gen_tcp:socket()}.
spawn_node(Scope, Name) ->
    Self = self(),
    %% Monitor the controller: if it crashes while starting the peer
    %% (e.g. a failed match in controller/3) we fail immediately instead of
    %% waiting for the 60 second timeout below.
    {Controller, MRef} = erlang:spawn_monitor(?MODULE, controller, [Name, Scope, Self]),
    receive
        {'$node_started', Node, Port} ->
            %% the controller keeps running until the RPC socket closes;
            %% stop watching it so no late 'DOWN' reaches the test process
            erlang:demonitor(MRef, [flush]),
            {ok, Socket} = gen_tcp:connect(?LOCALHOST, Port, [{active, false}, {mode, binary}, {packet, 4}]),
            Controller ! {socket, Socket},
            {Node, Socket};
        {'DOWN', MRef, process, Controller, Reason} ->
            error({start_node, Name, {controller_down, Reason}});
        Other ->
            erlang:demonitor(MRef, [flush]),
            error({start_node, Name, Other})
    after 60000 ->
        erlang:demonitor(MRef, [flush]),
        error({start_node, Name, timeout})
    end.

%% @private
%% Starts the peer node, asks it (via control/1) to start Scope and the TCP
%% RPC server, reports {'$node_started', Node, Port} to Self and then stays
%% alive until the RPC socket handed over by spawn_node/2 is closed.
%% On RPC failure, {badrpc, Node} (node down) or {badrpc, Node, Reason}
%% is sent to Self instead.
-spec controller(atom(), atom(), pid()) -> ok.
controller(Name, Scope, Self) ->
    Pa = filename:dirname(code:which(?MODULE)),
    Pa2 = filename:dirname(code:which(pg)),
    %% Options must be space separated: without the space before
    %% -connect_all it would be glued onto the cookie value and
    %% "-connect_all false" would never take effect.
    Args = lists:concat(["-setcookie ", erlang:get_cookie(),
        " -connect_all false -kernel dist_auto_connect never -noshell -pa ", Pa, " -pa ", Pa2]),
    {ok, Node} = test_server:start_node(Name, peer, [{args, Args}]),
    case rpc:call(Node, ?MODULE, control, [Scope], 5000) of
        {Port, _PgPid} when is_integer(Port) ->
            Self ! {'$node_started', Node, Port},
            controller_wait();
        {badrpc, nodedown} ->
            Self ! {badrpc, Node},
            ok;
        {badrpc, Reason} ->
            %% e.g. timeout: must not be mistaken for a {Port, PgPid} reply
            Self ! {badrpc, Node, Reason},
            ok
    end.

%% Waits for the RPC socket from spawn_node/2, then blocks until that
%% socket's port goes down.
controller_wait() ->
    Port =
        receive
            {socket, Port0} ->
                Port0
        end,
    MRef = monitor(port, Port),
    receive
        {'DOWN', MRef, port, Port, _Info} ->
            ok
    end.

%% @doc Stops the node previously started with spawn_node,
%% and also closes the RPC socket.
-spec stop_node(node(), gen_tcp:socket()) -> true.
stop_node(Node, Socket) when Node =/= node() ->
    true = test_server:stop_node(Node),
    Socket =/= undefined andalso gen_tcp:close(Socket),
    true.

%% Returns a fun for a process that blocks forever (never exits by itself).
forever() ->
    fun() -> receive after infinity -> ok end end.


%% Runs on the peer node: spawns server/2 and returns {Port, PgPid} once
%% the server is listening; raises error({error, Msg}) if the server
%% reports anything else.
-spec control(Scope :: atom()) -> {Port :: integer(), pid()}.
control(Scope) ->
    Control = self(),
    erlang:spawn(fun () -> server(Control, Scope) end),
    receive
        {port, Port, PgPid} ->
            {Port, PgPid};
        Other ->
            error({error, Other})
    end.

%% Runs on the peer node (spawned by control/1). Starts the pg scope unless
%% Scope is `undefined', listens on an ephemeral localhost TCP port, reports
%% {port, Port, PgPid} to Control and then serves RPC requests on the first
%% accepted connection. Any exception raised here, including from the
%% request loop, is reported to Control as {error, {Class, Reason, Stack}}.
server(Control, Scope) ->
    try
        {ok, PgPid} =
            case Scope of
                undefined -> {ok, undefined};
                _ -> pg:start(Scope)
            end,
        {ok, ListenSock} = gen_tcp:listen(0, [{mode, binary}, {packet, 4}, {ip, ?LOCALHOST}]),
        {ok, ListenPort} = inet:port(ListenSock),
        Control ! {port, ListenPort, PgPid},
        {ok, Conn} = gen_tcp:accept(ListenSock),
        server_loop(Conn)
    catch
        Class:Reason:Stack ->
            Control ! {error, {Class, Reason, Stack}}
    end.

%% Handles one {call, M, F, A} packet at a time: applies it and replies with
%% {ok, Result}, or {error, {'EXIT', Reason}} when it raised an error or an
%% exit (throws are not caught here). Halts the peer node with status 1
%% once the client closes the connection.
server_loop(Sock) ->
    inet:setopts(Sock, [{active, once}]),
    receive
        {tcp_closed, Sock} ->
            erlang:halt(1);
        {tcp, Sock, Data} ->
            {call, M, F, A} = binary_to_term(Data),
            Reply =
                try erlang:apply(M, F, A) of
                    Result -> {ok, Result}
                catch
                    Class:Reason when Class =:= exit; Class =:= error ->
                        {error, {'EXIT', Reason}}
                end,
            ok = gen_tcp:send(Sock, term_to_binary(Reply)),
            server_loop(Sock)
    end.