%% This file is a copy of pg2.erl from the R13B-3 Erlang/OTP
%% distribution, with the following modifications:
%%
%% 1) Process groups are node-local only.
%%
%% 2) Groups are created/deleted implicitly.
%%
%% 3) 'join' and 'leave' are asynchronous.
%%
%% 4) the type specs of the exported non-callback functions have been
%%    extracted into a separate, guarded section, and rewritten in
%%    old-style spec syntax, for better compatibility with older
%%    versions of Erlang/OTP. The remaining type specs have been
%%    removed.

%% All modifications are (C) 2010-2021 VMware, Inc. or its affiliates.

%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1997-2009. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at https://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
-module(pg_local).

%% The module exports the gen_server callbacks below; declaring the
%% behaviour lets the compiler check them.
-behaviour(gen_server).

-export([join/2, leave/2, get_members/1, in_group/2]).
%% intended for testing only; not part of official API
-export([sync/0, clear/0]).
-export([start/0, start_link/0, init/1, handle_call/3, handle_cast/2,
         handle_info/2, terminate/2]).

%%----------------------------------------------------------------------------

%% A group name; any term may be used.
-type name() :: term().

%%----------------------------------------------------------------------------

%% Name of the ETS table that holds all bookkeeping entries.
-define(TABLE, pg_local_table).

%%%
%%% Exported functions
%%%

-spec start_link() -> {'ok', pid()} | {'error', any()}.
%% Start the server linked to the caller and registered locally under
%% the module name.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

-spec start() -> {'ok', pid()} | {'error', any()}.

%% Make sure the server is running; returns whatever ensure_started/0
%% returns.
start() ->
    ensure_started().

-spec join(name(), pid()) -> 'ok'.

%% Add Pid to group Name. The request is sent as a cast, so the
%% membership may not be visible yet when this function returns.
join(Name, Pid) when is_pid(Pid) ->
    _ = ensure_started(),
    gen_server:cast(?MODULE, {join, Name, Pid}).

-spec leave(name(), pid()) -> 'ok'.

%% Remove one membership of Pid from group Name. Sent as a cast, like
%% join/2.
leave(Name, Pid) when is_pid(Pid) ->
    _ = ensure_started(),
    gen_server:cast(?MODULE, {leave, Name, Pid}).

-spec get_members(name()) -> [pid()].

%% Return the members of group Name, read directly from the ETS table.
%% A pid appears once per join.
get_members(Name) ->
    _ = ensure_started(),
    group_members(Name).

-spec in_group(name(), pid()) -> boolean().

%% Tell whether Pid is currently a member of group Name.
in_group(Name, Pid) ->
    _ = ensure_started(),
    %% The join message is a cast and thus can race, but we want to
    %% keep it that way to be fast in the common case.
    case member_present(Name, Pid) of
        true  -> true;
        false -> sync(),
                 member_present(Name, Pid)
    end.

-spec sync() -> 'ok'.

%% Block until the server has processed every message that was in its
%% mailbox ahead of this call.
sync() ->
    _ = ensure_started(),
    gen_server:call(?MODULE, sync, infinity).

-spec clear() -> 'ok'.

%% Remove every group membership. Intended for testing only.
clear() ->
    _ = ensure_started(),
    gen_server:call(?MODULE, clear, infinity).

%%%
%%% Callback functions from gen_server
%%%

-record(state, {}).

%% Create the bookkeeping table. It is protected, so only this server
%% process writes to it while other processes may read it.
init([]) ->
    ?TABLE = ets:new(?TABLE, [ordered_set, protected, named_table]),
    {ok, #state{}}.

handle_call(sync, _From, S) ->
    {reply, ok, S};

handle_call(clear, _From, S) ->
    %% Release the process monitors before wiping the table. Otherwise
    %% they stay active with no record of them, and join_group/2 would
    %% install a second monitor for a pid that joins again.
    _ = [erlang:demonitor(Ref, [flush])
         || [Ref] <- ets:match(?TABLE, {{ref, '$1'}, '_'})],
    true = ets:delete_all_objects(?TABLE),
    {reply, ok, S};

handle_call(Request, From, S) ->
    error_logger:warning_msg("The pg_local server received an unexpected message:\n"
                             "handle_call(~p, ~p, _)\n",
                             [Request, From]),
    %% No reply is sent for an unknown request.
    {noreply, S}.

handle_cast({join, Name, Pid}, S) ->
    _ = join_group(Name, Pid),
    {noreply, S};
handle_cast({leave, Name, Pid}, S) ->
    leave_group(Name, Pid),
    {noreply, S};
handle_cast(_, S) ->
    {noreply, S}.
%% A monitored member terminated: drop all of its memberships.
handle_info({'DOWN', MonitorRef, process, Pid, _Info}, S) ->
    member_died(MonitorRef, Pid),
    {noreply, S};
handle_info(_, S) ->
    {noreply, S}.

%% Delete the bookkeeping table when the server stops.
terminate(_Reason, _S) ->
    true = ets:delete(?TABLE),
    ok.

%%%
%%% Local functions
%%%

%%% One ETS table, pg_local_table, is used for bookkeeping. The type of the
%%% table is ordered_set, and the fast matching of partially
%%% instantiated keys is used extensively.
%%%
%%%    {{ref, Pid}, MonitorRef, Counter}
%%%    {{ref, MonitorRef}, Pid}
%%%       Each process has one monitor. Counter is incremented when the
%%%       Pid joins some group.
%%%    {{member, Name, Pid}, GroupCounter}
%%%       Pid is a member of group Name, GroupCounter is incremented when the
%%%       Pid joins the group Name.
%%%    {{pid, Pid, Name}}
%%%       Pid is a member of group Name.

%% Handle the death of a monitored process. The memberships are removed
%% using the pid carried by the 'DOWN' message, so the clean-up works
%% even when the {ref, MonitorRef} entry has already been removed.
member_died(_Ref, Pid) ->
    _ = leave_all_groups(Pid),
    ok.

%% Remove every membership of Pid: one leave_group/2 call per join
%% recorded for each group the pid belongs to.
leave_all_groups(Pid) ->
    Names = member_groups(Pid),
    _ = [leave_group(Name, P) ||
            Name <- Names,
            P <- member_in_group(Pid, Name)].

%% Record one more membership of Pid in group Name.
join_group(Name, Pid) ->
    RefKey = {ref, Pid},
    try
        _ = ets:update_counter(?TABLE, RefKey, {3, +1})
    catch
        %% badarg: no {ref, Pid} entry yet, so this is the pid's first
        %% membership. Start monitoring it and create both ref entries.
        error:badarg ->
            Ref = erlang:monitor(process, Pid),
            true = ets:insert(?TABLE, {RefKey, Ref, 1}),
            true = ets:insert(?TABLE, {{ref, Ref}, Pid})
    end,
    MemberKey = {member, Name, Pid},
    try
        _ = ets:update_counter(?TABLE, MemberKey, {2, +1})
    catch
        %% badarg: the pid is not yet a member of this group.
        error:badarg ->
            true = ets:insert(?TABLE, {MemberKey, 1}),
            true = ets:insert(?TABLE, {{pid, Pid, Name}})
    end.
%% Remove one membership of Pid from group Name. Does nothing when Pid
%% is not a member of that group.
leave_group(Name, Pid) ->
    MemberKey = {member, Name, Pid},
    try ets:update_counter(?TABLE, MemberKey, {2, -1}) of
        N ->
            case N of
                0 ->
                    %% Last membership in this group: drop both entries.
                    true = ets:delete(?TABLE, {pid, Pid, Name}),
                    true = ets:delete(?TABLE, MemberKey);
                _ ->
                    ok
            end,
            drop_process_ref(Pid)
    catch
        %% badarg: there is no {member, Name, Pid} entry to decrement.
        error:badarg ->
            ok
    end.

%% Decrement the per-process join counter; when it reaches zero, remove
%% both ref entries and stop monitoring the process.
drop_process_ref(Pid) ->
    RefKey = {ref, Pid},
    case ets:update_counter(?TABLE, RefKey, {3, -1}) of
        0 ->
            [{RefKey, Ref, 0}] = ets:lookup(?TABLE, RefKey),
            true = ets:delete(?TABLE, {ref, Ref}),
            true = ets:delete(?TABLE, RefKey),
            true = erlang:demonitor(Ref, [flush]),
            ok;
        _ ->
            ok
    end.

%% Members of group Name; each pid is repeated once per recorded join.
group_members(Name) ->
    lists:append([lists:duplicate(N, P) ||
                     [P, N] <- ets:match(?TABLE, {{member, Name, '$1'}, '$2'})]).

%% Pid repeated once per recorded join of group Name. The membership
%% entry must exist, otherwise the match fails.
member_in_group(Pid, Name) ->
    [{{member, Name, Pid}, N}] = ets:lookup(?TABLE, {member, Name, Pid}),
    lists:duplicate(N, Pid).

%% True when a membership entry exists for Pid in group Name.
member_present(Name, Pid) ->
    case ets:lookup(?TABLE, {member, Name, Pid}) of
        [_] -> true;
        []  -> false
    end.

%% Names of all groups that Pid is a member of.
member_groups(Pid) ->
    [Name || [Name] <- ets:match(?TABLE, {{pid, Pid, '$1'}})].

%% Return {ok, Pid} when the server is already registered; otherwise ask
%% kernel_safe_sup to start it as a permanent worker and return the
%% supervisor's answer.
ensure_started() ->
    case whereis(?MODULE) of
        undefined ->
            C = {pg_local, {?MODULE, start_link, []}, permanent,
                 16#ffffffff, worker, [?MODULE]},
            supervisor:start_child(kernel_safe_sup, C);
        PgLocalPid ->
            {ok, PgLocalPid}
    end.