1%% This file is a copy of pg2.erl from the R13B-3 Erlang/OTP
2%% distribution, with the following modifications:
3%%
4%% 1) Process groups are node-local only.
5%%
6%% 2) Groups are created/deleted implicitly.
7%%
8%% 3) 'join' and 'leave' are asynchronous.
9%%
10%% 4) the type specs of the exported non-callback functions have been
11%%    extracted into a separate, guarded section, and rewritten in
12%%    old-style spec syntax, for better compatibility with older
13%%    versions of Erlang/OTP. The remaining type specs have been
14%%    removed.
15
16%% All modifications are (C) 2010-2021 VMware, Inc. or its affiliates.
17
18%% %CopyrightBegin%
19%%
20%% Copyright Ericsson AB 1997-2009. All Rights Reserved.
21%%
22%% The contents of this file are subject to the Erlang Public License,
23%% Version 1.1, (the "License"); you may not use this file except in
24%% compliance with the License. You should have received a copy of the
25%% Erlang Public License along with this software. If not, it can be
26%% retrieved online at https://www.erlang.org/.
27%%
28%% Software distributed under the License is distributed on an "AS IS"
29%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
30%% the License for the specific language governing rights and limitations
31%% under the License.
32%%
33%% %CopyrightEnd%
34%%
35-module(pg_local).
36
37-export([join/2, leave/2, get_members/1, in_group/2]).
38%% intended for testing only; not part of official API
39-export([sync/0, clear/0]).
40-export([start/0, start_link/0, init/1, handle_call/3, handle_cast/2,
41         handle_info/2, terminate/2]).
42
43%%----------------------------------------------------------------------------
44
45-type name() :: term().
46
47%%----------------------------------------------------------------------------
48
49-define(TABLE, pg_local_table).
50
51%%%
52%%% Exported functions
53%%%
54
55-spec start_link() -> {'ok', pid()} | {'error', any()}.
56
57start_link() ->
58    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
59
60-spec start() -> {'ok', pid()} | {'error', any()}.
61
62start() ->
63    ensure_started().
64
65-spec join(name(), pid()) -> 'ok'.
66
67join(Name, Pid) when is_pid(Pid) ->
68    _ = ensure_started(),
69    gen_server:cast(?MODULE, {join, Name, Pid}).
70
71-spec leave(name(), pid()) -> 'ok'.
72
73leave(Name, Pid) when is_pid(Pid) ->
74    _ = ensure_started(),
75    gen_server:cast(?MODULE, {leave, Name, Pid}).
76
77-spec get_members(name()) -> [pid()].
78
79get_members(Name) ->
80    _ = ensure_started(),
81    group_members(Name).
82
83-spec in_group(name(), pid()) -> boolean().
84
85in_group(Name, Pid) ->
86    _ = ensure_started(),
87    %% The join message is a cast and thus can race, but we want to
88    %% keep it that way to be fast in the common case.
89    case member_present(Name, Pid) of
90        true  -> true;
91        false -> sync(),
92                 member_present(Name, Pid)
93    end.
94
95-spec sync() -> 'ok'.
96
97sync() ->
98    _ = ensure_started(),
99    gen_server:call(?MODULE, sync, infinity).
100
101clear() ->
102    _ = ensure_started(),
103    gen_server:call(?MODULE, clear, infinity).
104
105%%%
106%%% Callback functions from gen_server
107%%%
108
109-record(state, {}).
110
111init([]) ->
112    ?TABLE = ets:new(?TABLE, [ordered_set, protected, named_table]),
113    {ok, #state{}}.
114
115handle_call(sync, _From, S) ->
116    {reply, ok, S};
117
118handle_call(clear, _From, S) ->
119    ets:delete_all_objects(?TABLE),
120    {reply, ok, S};
121
122handle_call(Request, From, S) ->
123    error_logger:warning_msg("The pg_local server received an unexpected message:\n"
124                             "handle_call(~p, ~p, _)\n",
125                             [Request, From]),
126    {noreply, S}.
127
128handle_cast({join, Name, Pid}, S) ->
129    _ = join_group(Name, Pid),
130    {noreply, S};
131handle_cast({leave, Name, Pid}, S) ->
132    leave_group(Name, Pid),
133    {noreply, S};
134handle_cast(_, S) ->
135    {noreply, S}.
136
137handle_info({'DOWN', MonitorRef, process, Pid, _Info}, S) ->
138    member_died(MonitorRef, Pid),
139    {noreply, S};
140handle_info(_, S) ->
141    {noreply, S}.
142
143terminate(_Reason, _S) ->
144    true = ets:delete(?TABLE),
145    ok.
146
147%%%
148%%% Local functions
149%%%
150
151%%% One ETS table, pg_local_table, is used for bookkeeping. The type of the
152%%% table is ordered_set, and the fast matching of partially
153%%% instantiated keys is used extensively.
154%%%
155%%% {{ref, Pid}, MonitorRef, Counter}
156%%% {{ref, MonitorRef}, Pid}
157%%%    Each process has one monitor. Counter is incremented when the
158%%%    Pid joins some group.
159%%% {{member, Name, Pid}, _}
160%%%    Pid is a member of group Name, GroupCounter is incremented when the
161%%%    Pid joins the group Name.
162%%% {{pid, Pid, Name}}
163%%%    Pid is a member of group Name.
164
165member_died(Ref, Pid) ->
166    case ets:lookup(?TABLE, {ref, Ref}) of
167        [{{ref, Ref}, Pid}] ->
168            leave_all_groups(Pid);
169        %% in case the key has already been removed
170        %% we can clean up using the value from the DOWN message
171        _  ->
172            leave_all_groups(Pid)
173    end,
174    ok.
175
176leave_all_groups(Pid) ->
177    Names = member_groups(Pid),
178    _ = [leave_group(Name, P) ||
179            Name <- Names,
180            P <- member_in_group(Pid, Name)].
181
182join_group(Name, Pid) ->
183    Ref_Pid = {ref, Pid},
184    try _ = ets:update_counter(?TABLE, Ref_Pid, {3, +1})
185    catch _:_ ->
186            Ref = erlang:monitor(process, Pid),
187            true = ets:insert(?TABLE, {Ref_Pid, Ref, 1}),
188            true = ets:insert(?TABLE, {{ref, Ref}, Pid})
189    end,
190    Member_Name_Pid = {member, Name, Pid},
191    try _ = ets:update_counter(?TABLE, Member_Name_Pid, {2, +1})
192    catch _:_ ->
193            true = ets:insert(?TABLE, {Member_Name_Pid, 1}),
194            true = ets:insert(?TABLE, {{pid, Pid, Name}})
195    end.
196
197leave_group(Name, Pid) ->
198    Member_Name_Pid = {member, Name, Pid},
199    try ets:update_counter(?TABLE, Member_Name_Pid, {2, -1}) of
200        N ->
201            if
202                N =:= 0 ->
203                    true = ets:delete(?TABLE, {pid, Pid, Name}),
204                    true = ets:delete(?TABLE, Member_Name_Pid);
205                true ->
206                    ok
207            end,
208            Ref_Pid = {ref, Pid},
209            case ets:update_counter(?TABLE, Ref_Pid, {3, -1}) of
210                0 ->
211                    [{Ref_Pid,Ref,0}] = ets:lookup(?TABLE, Ref_Pid),
212                    true = ets:delete(?TABLE, {ref, Ref}),
213                    true = ets:delete(?TABLE, Ref_Pid),
214                    true = erlang:demonitor(Ref, [flush]),
215                    ok;
216                _ ->
217                    ok
218            end
219    catch _:_ ->
220            ok
221    end.
222
223group_members(Name) ->
224    [P ||
225        [P, N] <- ets:match(?TABLE, {{member, Name, '$1'},'$2'}),
226        _ <- lists:seq(1, N)].
227
228member_in_group(Pid, Name) ->
229    [{{member, Name, Pid}, N}] = ets:lookup(?TABLE, {member, Name, Pid}),
230    lists:duplicate(N, Pid).
231
232member_present(Name, Pid) ->
233    case ets:lookup(?TABLE, {member, Name, Pid}) of
234        [_] -> true;
235        []  -> false
236    end.
237
238member_groups(Pid) ->
239    [Name || [Name] <- ets:match(?TABLE, {{pid, Pid, '$1'}})].
240
241ensure_started() ->
242    case whereis(?MODULE) of
243        undefined ->
244            C = {pg_local, {?MODULE, start_link, []}, permanent,
245                 16#ffffffff, worker, [?MODULE]},
246            supervisor:start_child(kernel_safe_sup, C);
247        PgLocalPid ->
248            {ok, PgLocalPid}
249    end.
250