%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
%%

-module(gm).

%% Guaranteed Multicast
%% ====================
%%
%% This module provides the ability to create named groups of
%% processes to which members can be dynamically added and removed,
%% and for messages to be broadcast within the group that are
%% guaranteed to reach all members of the group during the lifetime of
%% the message. The lifetime of a message is defined as being, at a
%% minimum, the time from which the message is first sent to any
%% member of the group, up until the time at which it is known by the
%% member who published the message that the message has reached all
%% group members.
%%
%% The guarantee given is that, provided a message, once sent, reaches
%% members who do not all leave the group, the message will
%% continue to propagate to all group members.
%%
%% Another way of stating the guarantee is that if member P publishes
%% messages m and m', then for all members P', if P' is a member of
%% the group prior to the publication of m, and P' receives m', then
%% P' will receive m.
%%
%% Note that only local-ordering is enforced: i.e. if member P sends
%% message m and then message m', then for all members P', if P'
%% receives m and m', then they will receive m' after m. Causality
%% ordering is _not_ enforced. I.e. if member P receives message m
%% and as a result publishes message m', there is no guarantee that
%% other members P' will receive m before m'.
%%
%%
%% API Use
%% -------
%%
%% Mnesia must be started. Use the idempotent create_tables/0 function
%% to create the tables required.
%%
%% start_link/4
%% Provide the group name, the callback module name, any arguments
%% you wish to be passed into the callback module's functions, and a
%% function used to execute mnesia transactions (see the txn_fun()
%% type below). The joined/2 function will be called when we have
%% joined the group, with the arguments passed to start_link and a
%% list of the current members of the group. See the callback specs
%% and the comments below for further details of the callback
%% functions.
%%
%% leave/1
%% Provide the Pid. Removes the Pid from the group. The callback
%% handle_terminate/2 function will be called.
%%
%% broadcast/2
%% Provide the Pid and a Message. The message will be sent to all
%% members of the group as per the guarantees given above. This is a
%% cast and the function call will return immediately. There is no
%% guarantee that the message will reach any member of the group.
%%
%% confirmed_broadcast/2
%% Provide the Pid and a Message. As per broadcast/2 except that this
%% is a call, not a cast, and only returns 'ok' once the Message has
%% reached every member of the group. Do not call
%% confirmed_broadcast/2 directly from the callback module otherwise
%% you will deadlock the entire group.
%%
%% info/1
%% Provide the Pid. Returns a proplist with various facts, including
%% the group name and the current group members.
%%
%% validate_members/2
%% Check whether a given member list agrees with the chosen member's
%% view. Any differences will be communicated via the members_changed
%% callback. If there are no differences then there will be no reply.
%% Note that members will not necessarily share the same view.
%%
%% forget_group/1
%% Provide the group name. Removes its mnesia record. Makes no attempt
%% to ensure the group is empty.
%%
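%% A minimal usage sketch (the callback module my_gm_cb, the group name
%% my_group and the transaction wrapper below are made up for
%% illustration; error handling is elided):
%%
%%   -module(my_gm_cb).
%%   -behaviour(gm).
%%   -export([joined/2, members_changed/3, handle_msg/3,
%%            handle_terminate/2]).
%%
%%   joined(_Args, Members)           -> io:format("joined: ~p~n", [Members]),
%%                                       ok.
%%   members_changed(_Args, _Bs, _Ds) -> ok.
%%   handle_msg(_Args, From, Msg)     -> io:format("~p sent ~p~n", [From, Msg]),
%%                                       ok.
%%   handle_terminate(_Args, _Reason) -> ok.
%%
%% and then, on a node where mnesia is already running:
%%
%%   TxnFun = fun (F) -> {atomic, Res} = mnesia:sync_transaction(F), Res end,
%%   ok = gm:create_tables(),
%%   {ok, Pid} = gm:start_link(my_group, my_gm_cb, [], TxnFun),
%%   ok = gm:broadcast(Pid, hello),
%%   ok = gm:leave(Pid).
%%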
%% Implementation Overview
%% -----------------------
%%
%% One possible means of implementation would be a fan-out from the
%% sender to every member of the group. This would require that the
%% group is fully connected, and, in the event that the original
%% sender of the message disappears from the group before the message
%% has made it to every member of the group, raises questions as to
%% who is responsible for sending on the message to new group members.
%% In particular, the issue is with [ Pid ! Msg || Pid <- Members ] -
%% if the sender dies part way through, who is responsible for
%% ensuring that the remaining Members receive the Msg? In the event
%% that within the group, messages sent are broadcast from a subset of
%% the members, the fan-out arrangement has the potential to
%% substantially impact the CPU and network workload of such members,
%% as such members would have to accommodate the cost of sending each
%% message to every group member.
%%
%% Instead, if the members of the group are arranged in a chain, then
%% it becomes easier to reason about who within the group has received
%% each message and who has not. It eases issues of responsibility: in
%% the event of a group member disappearing, the nearest upstream
%% member of the chain is responsible for ensuring that messages
%% continue to propagate down the chain. It also results in equal
%% distribution of sending and receiving workload, even if all
%% messages are being sent from just a single group member. This
%% configuration has the further advantage that it is not necessary
%% for every group member to know of every other group member, and
%% even that a group member does not have to be accessible from all
%% other group members.
%%
%% Performance is kept high by permitting pipelining, and all
%% communication between joined group members is asynchronous. In the
%% chain A -> B -> C -> D, if A sends a message to the group, it will
%% not directly contact C or D. However, it must know that D receives
%% the message (in addition to B and C) before it can consider the
%% message fully sent. A simplistic implementation would require that
%% D replies to C, C replies to B and B then replies to A. This would
%% result in a propagation delay of twice the length of the chain. It
%% would also require, in the event of the failure of C, that D knows
%% to directly contact B and issue the necessary replies. Instead, the
%% chain forms a ring: D sends the message on to A. D does not
%% distinguish A as the sender, merely as the next member (downstream)
%% within the chain (which has now become a ring). When A receives
%% from D messages that A sent, it knows that all members have
%% received the message. However, the message is not dead yet: if C
%% died as B was sending to C, then B would need to detect the death
%% of C and forward the message on to D instead: thus every node has
%% to remember every message published until it is told that it can
%% forget about the message. This is essential not just for dealing
%% with failure of members, but also for the addition of new members.
%%
%% Thus once A receives the message back again, it then sends to B an
%% acknowledgement for the message, indicating that B can now forget
%% about the message. B does so, and forwards the ack to C. C forgets
%% the message, and forwards the ack to D, which forgets the message
%% and finally forwards the ack back to A. At this point, A takes no
%% further action: the message and its acknowledgement have made it to
%% every member of the group. The message is now dead, and any new
%% member joining the group at this point will not receive the
%% message.
%%
%% We therefore have two roles:
%%
%% 1. The sender, who upon receiving their own messages back, must
%% then send out acknowledgements, and upon receiving their own
%% acknowledgements back perform no further action.
%%
%% 2. The other group members, who upon receiving messages and
%% acknowledgements must update their own internal state accordingly
%% (the sending member must also do this in order to be able to
%% accommodate failures), and forward messages on to their downstream
%% neighbours.
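%%
%% As an illustration only (the function and message names below are
%% invented, not this module's internal protocol), the two roles can
%% be pictured as:
%%
%%   on_receive({pub, Sender, Msg}, State) when Sender =:= self() ->
%%       %% role 1: our own publication has completed the ring, so
%%       %% start the acknowledgement on its way round
%%       send_downstream({ack, Sender, Msg}, State);
%%   on_receive({pub, Sender, Msg}, State) ->
%%       %% role 2: remember the message until it is acked, then pass it on
%%       send_downstream({pub, Sender, Msg}, remember(Sender, Msg, State));
%%   on_receive({ack, Sender, Msg}, State) when Sender =:= self() ->
%%       %% role 1: our own ack has completed the ring; the message is dead
%%       forget(Sender, Msg, State);
%%   on_receive({ack, Sender, Msg}, State) ->
%%       %% role 2: forget the message and forward the ack downstream
%%       send_downstream({ack, Sender, Msg}, forget(Sender, Msg, State)).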
%%
%%
%% Implementation: It gets trickier
%% --------------------------------
%%
%% Chain A -> B -> C -> D
%%
%% A publishes a message which B receives. A now dies. B and D will
%% detect the death of A, and will link up, thus the chain is now B ->
%% C -> D. B forwards A's message on to C, who forwards it to D, who
%% forwards it to B. Thus B is now responsible for A's messages - both
%% publications and acknowledgements that were in flight at the point
%% at which A died. Even worse is that this is transitive: after B
%% forwards A's message to C, B dies as well. Now C is not only
%% responsible for B's in-flight messages, but is also responsible for
%% A's in-flight messages.
%%
%% Lemma 1: A member can only determine which dead members they have
%% inherited responsibility for if there is a total ordering on the
%% conflicting additions and subtractions of members from the group.
%%
%% Consider the simultaneous death of B and addition of B' that
%% transitions a chain from A -> B -> C to A -> B' -> C. Either B' or
%% C is responsible for in-flight messages from B. It is easy to
%% ensure that at least one of them thinks they have inherited B, but
%% if we do not ensure that exactly one of them inherits B, then we
%% could have B' converting publishes to acks, which then will crash C
%% as C does not believe it has issued acks for those messages.
%%
%% More complex scenarios are easy to concoct: A -> B -> C -> D -> E
%% becoming A -> C' -> E. Who has inherited which of B, C and D?
%%
%% However, for non-conflicting membership changes, only a partial
%% ordering is required. For example, A -> B -> C becoming A -> A' ->
%% B. The addition of A', between A and B, can have no conflicts with
%% the death of C: it is clear that A has inherited C's messages.
%%
%% For ease of implementation, we adopt the simple solution of
%% imposing a total order on all membership changes.
%%
%% On the death of a member, it is ensured that the dead member's
%% neighbours become aware of the death, and the upstream neighbour
%% now sends to its new downstream neighbour its state, including the
%% messages pending acknowledgement. The downstream neighbour can then
%% use this to calculate which publishes and acknowledgements it has
%% missed out on, due to the death of its old upstream. Thus the
%% downstream can catch up and continue the propagation of messages
%% through the group.
%%
%% Lemma 2: When a member is joining, it must synchronously
%% communicate with its upstream member in order to receive its
%% starting state atomically with its addition to the group.
%%
%% New members must start with the same state as their nearest
%% upstream neighbour. This ensures that they are not surprised by
%% acknowledgements they are sent, and that should their downstream
%% neighbour die, they are able to send the correct state to their new
%% downstream neighbour to ensure it can catch up. Thus in the
%% transition A -> B -> C becomes A -> A' -> B -> C becomes A -> A' ->
%% C, A' must start with the state of A, so that it can send C the
%% correct state when B dies, allowing C to detect any missed
%% messages.
%%
%% If A' starts by adding itself to the group membership, A could then
%% die, without A' having received the necessary state from A. This
%% would leave A' responsible for in-flight messages from A, whilst
%% knowing the least about those messages of any member. Thus A' must
%% start by synchronously calling A, which then immediately sends A'
%% back its state. A then adds A' to the group. If A dies at this
%% point then A' will be able to see this (as A' will fail to appear
%% in the group membership), and thus A' will ignore the state it
%% receives from A, and will simply repeat the process, trying to now
%% join downstream from some other member. This ensures that should
%% the upstream die as soon as the new member has been joined, the new
%% member is guaranteed to receive the correct state, allowing it to
%% correctly process messages inherited due to the death of its
%% upstream neighbour.
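%%
%% Sketched in terms of the functions defined further down (names and
%% return values simplified), the joining side of this handshake is
%% roughly:
%%
%%   join(Left, Self, GroupName, TxnFun) ->
%%       case neighbour_call(Left, {add_on_right, Self}) of
%%           {ok, Group} -> group_to_view(Group);
%%           not_ready   -> join_group(Self, GroupName, TxnFun) %% try again
%%       end.
%%
%% The upstream member handles {add_on_right, Self} by recording the
%% addition and sending its own state to the joiner (as a catchup)
%% before replying, so the joiner's starting state and its appearance
%% in the membership go together.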
%%
%% The canonical definition of the group membership is held by a
%% distributed database. Whilst this allows the total ordering of
%% changes to be achieved, it is nevertheless undesirable to have to
%% query this database for the current view upon receiving each
%% message. Instead, we wish for members to be able to cache a view of
%% the group membership, which then requires a cache invalidation
%% mechanism. Each member maintains its own view of the group
%% membership. Thus when the group's membership changes, members may
%% need to become aware of such changes in order to be able to
%% accurately process messages they receive. Because of the
%% requirement of a total ordering of conflicting membership changes,
%% it is not possible to use the guaranteed broadcast mechanism to
%% communicate these changes: to achieve the necessary ordering, it
%% would be necessary for such messages to be published by exactly one
%% member, which cannot be guaranteed given that such a member could
%% die.
%%
%% The total ordering we enforce on membership changes gives rise to a
%% view version number: every change to the membership creates a
%% different view, and the total ordering permits a simple
%% monotonically increasing view version number.
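%%
%% As a small sketch of the consequence for receivers (compare
%% needs_view_update/2 and the handle_cast clause for
%% {?TAG, ReqVer, Msg} below): a receiver holding view version Ver
%% refreshes its view from the database only when a message carries a
%% newer version:
%%
%%   maybe_update_view(ReqVer, {Ver, _} = View, GroupName) ->
%%       case Ver < ReqVer of
%%           true  -> group_to_view(dirty_read_group(GroupName));
%%           false -> View
%%       end.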
%%
%% Lemma 3: If a message is sent from a member that holds view version
%% N, it can be correctly processed by any member receiving the
%% message with a view version >= N.
%%
%% Initially, let us suppose that each view contains the ordering of
%% every member that was ever part of the group. Dead members are
%% marked as such. Thus we have a ring of members, some of which are
%% dead, and are thus inherited by the nearest alive downstream
%% member.
%%
%% In the chain A -> B -> C, all three members initially have view
%% version 1, which reflects reality. B publishes a message, which is
%% forwarded by C to A. B now dies, which A notices very quickly. Thus
%% A updates the view, creating version 2. It now forwards B's
%% publication, sending that message to its new downstream neighbour,
%% C. This happens before C is aware of the death of B. C must become
%% aware of the view change before it interprets the message it has
%% received, otherwise it will fail to learn of the death of B, and
%% thus will not realise it has inherited B's messages (and will
%% likely crash).
%%
%% Thus very simply, we have that each subsequent view contains more
%% information than the preceding view.
%%
%% However, to avoid the views growing indefinitely, we need to be
%% able to delete members which have died _and_ for which no messages
%% are in-flight. This requires that upon inheriting a dead member, we
%% know the last publication sent by the dead member (this is easy: we
%% inherit a member because we are the nearest downstream member, which
%% implies that we know at least as much as everyone else about the
%% publications of the dead member), and we know the earliest message
%% for which the acknowledgement is still in flight.
%%
%% In the chain A -> B -> C, when B dies, A will send to C its state
%% (as C is the new downstream from A), allowing C to calculate which
%% messages it has missed out on (described above). At this point, C
%% also inherits B's messages. If that state from A also includes the
%% last message published by B for which an acknowledgement has been
%% seen, then C knows exactly which further acknowledgements it must
%% receive (also including issuing acknowledgements for publications
%% still in-flight that it receives), after which it is known there
%% are no more messages in flight for B, thus all evidence that B was
%% ever part of the group can be safely removed from the canonical
%% group membership.
%%
%% Thus, for every message that a member sends, it includes with that
%% message its view version. When a member receives a message it will
%% update its view from the canonical copy, should its view be older
%% than the view version included in the message it has received.
%%
%% The state held by each member therefore includes the messages from
%% each publisher pending acknowledgement, the last publication seen
%% from that publisher, and the last acknowledgement from that
%% publisher. In the case of the member's own publications or
%% inherited members, this last acknowledgement seen state indicates
%% the last acknowledgement retired, rather than sent.
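%%
%% Illustratively (field names as in the #member{} record defined
%% below; the helper names are invented), receiving publication N from
%% a publisher, and later its acknowledgement, updates that
%% publisher's entry roughly as follows:
%%
%%   record_pub(N, Msg, M = #member { pending_ack = PA }) ->
%%       M #member { pending_ack = queue:in({N, Msg}, PA), last_pub = N }.
%%
%%   record_ack(N, M = #member { pending_ack = PA }) ->
%%       {{value, {N, _Msg}}, PA1} = queue:out(PA), %% acks arrive in order
%%       M #member { pending_ack = PA1, last_ack = N }.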
%%
%%
%% Proof sketch
%% ------------
%%
%% We need to prove that with the provided operational semantics, we
%% can never reach a state that is not well formed from a well-formed
%% starting state.
%%
%% Operational semantics (small step): straightforward message
%% sending, process monitoring, state updates.
%%
%% Well formed state: dead members inherited by exactly one non-dead
%% member; for every entry in anyone's pending-acks, either (the
%% publication of the message is in-flight downstream from the member
%% and upstream from the publisher) or (the acknowledgement of the
%% message is in-flight downstream from the publisher and upstream
%% from the member).
%%
%% Proof by induction on the applicable operational semantics.
%%
%%
%% Related work
%% ------------
%%
%% The ring configuration and double traversal of messages around the
%% ring is similar (though developed independently) to the LCR
%% protocol by [Levy 2008]. However, LCR differs in several
%% ways. Firstly, by using vector clocks, it enforces a total order of
%% message delivery, which is unnecessary for our purposes. More
%% significantly, it is built on top of a "group communication system"
%% which performs the group management functions, taking
%% responsibility away from the protocol as to how to cope with safely
%% adding and removing members. When membership changes do occur, the
%% protocol stipulates that every member must perform communication
%% with every other member of the group, to ensure all outstanding
%% deliveries complete, before the entire group transitions to the new
%% view. This, in total, requires two sets of all-to-all synchronous
%% communications.
%%
%% This is not only rather inefficient, but also does not explain what
%% happens upon the failure of a member during this process. It does
%% though entirely avoid the need for inheritance of responsibility of
%% dead members that our protocol incorporates.
%%
%% In [Marandi et al 2010], a Paxos-based protocol is described. This
%% work explicitly focuses on the efficiency of communication. LCR
%% (and our protocol too) are more efficient, but at the cost of
%% higher latency. The Ring-Paxos protocol is itself built on top of
%% IP-multicast, which rules it out for many applications where only
%% point-to-point communication can be assumed. They also have an
%% excellent related work section which I really ought to read...
%%
%%
%% [Levy 2008] The Complexity of Reliable Distributed Storage, 2008.
%% [Marandi et al 2010] Ring Paxos: A High-Throughput Atomic Broadcast
%% Protocol


-behaviour(gen_server2).

-export([create_tables/0, start_link/4, leave/1, broadcast/2, broadcast/3,
         confirmed_broadcast/2, info/1, validate_members/2, forget_group/1]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3, prioritise_info/3]).

%% For INSTR_MOD callbacks
-export([call/3, cast/2, monitor/1, demonitor/1]).

-export([table_definitions/0]).

-define(GROUP_TABLE, gm_group).
-define(MAX_BUFFER_SIZE, 100000000). %% 100MB
-define(BROADCAST_TIMER, 25).
-define(FORCE_GC_TIMER, 250).
-define(VERSION_START, 0).
-define(SETS, ordsets).

-record(state,
        { self,
          left,
          right,
          group_name,
          module,
          view,
          pub_count,
          members_state,
          callback_args,
          confirms,
          broadcast_buffer,
          broadcast_buffer_sz,
          broadcast_timer,
          force_gc_timer,
          txn_executor,
          shutting_down
        }).

-record(gm_group, { name, version, members }).

-record(view_member, { id, aliases, left, right }).

-record(member, { pending_ack, last_pub, last_ack }).

-define(TABLE, {?GROUP_TABLE, [{record_name, gm_group},
                               {attributes, record_info(fields, gm_group)}]}).
-define(TABLE_MATCH, {match, #gm_group { _ = '_' }}).

-define(TAG, '$gm').

-export_type([group_name/0]).

-type group_name() :: any().
-type txn_fun() :: fun((fun(() -> any())) -> any()).

%% The joined, members_changed and handle_msg callbacks can all return
%% any of the following terms:
%%
%% 'ok' - the callback function returns normally
%%
%% {'stop', Reason} - the callback indicates the member should stop
%% with reason Reason and should leave the group.
%%
%% {'become', Module, Args} - the callback indicates that the callback
%% module should be changed to Module and that the callback functions
%% should now be passed the arguments Args. This allows the callback
%% module to be dynamically changed.
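%%
%% For example, a handle_msg/3 clause that switches to a different
%% (hypothetical) callback module on request might look like:
%%
%%   handle_msg(Args, _From, upgrade) -> {become, my_gm_cb_v2, Args};
%%   handle_msg(_Args, _From, _Msg)   -> ok.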

%% Called when we've successfully joined the group. Supplied with Args
%% provided in start_link, plus current group members.
-callback joined(Args :: term(), Members :: [pid()]) ->
    ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.

%% Supplied with Args provided in start_link, the list of new members
%% and the list of members previously known to us that have since
%% died. Note that if a member joins and dies very quickly, it's
%% possible that we will never see that member appear in either births
%% or deaths. However we are guaranteed that (1) we will see a member
%% joining either in the births here, or in the members passed to
%% joined/2 before receiving any messages from it; and (2) we will not
%% see members die that we have not seen born (or supplied in the
%% members to joined/2).
-callback members_changed(Args :: term(),
                          Births :: [pid()], Deaths :: [pid()]) ->
    ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.

%% Supplied with Args provided in start_link, the sender, and the
%% message. This does get called for messages injected by this member;
%% however, in such cases, there is no special significance to this
%% invocation: it does not indicate that the message has made it to
%% any other members, let alone all other members.
-callback handle_msg(Args :: term(), From :: pid(), Message :: term()) ->
    ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.

%% Called on gm member termination as per rules in gen_server, with
%% the Args provided in start_link plus the termination Reason.
-callback handle_terminate(Args :: term(), Reason :: term()) ->
    ok | term().

-spec create_tables() -> 'ok' | {'aborted', any()}.

create_tables() ->
    create_tables([?TABLE]).

create_tables([]) ->
    ok;
create_tables([{Table, Attributes} | Tables]) ->
    case mnesia:create_table(Table, Attributes) of
        {atomic, ok}                       -> create_tables(Tables);
        {aborted, {already_exists, Table}} -> create_tables(Tables);
        Err                                -> Err
    end.
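
%% For example (idempotent, so safe to run at every node start once
%% mnesia is up):
%%
%%   ok = mnesia:start(),
%%   ok = gm:create_tables().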

table_definitions() ->
    {Name, Attributes} = ?TABLE,
    [{Name, [?TABLE_MATCH | Attributes]}].

-spec start_link(group_name(), atom(), any(), txn_fun()) ->
          rabbit_types:ok_pid_or_error().

start_link(GroupName, Module, Args, TxnFun) ->
    gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun],
                           [{spawn_opt, [{fullsweep_after, 0}]}]).

-spec leave(pid()) -> 'ok'.

leave(Server) ->
    gen_server2:cast(Server, leave).

-spec broadcast(pid(), any()) -> 'ok'.

broadcast(Server, Msg) -> broadcast(Server, Msg, 0).

broadcast(Server, Msg, SizeHint) ->
    gen_server2:cast(Server, {broadcast, Msg, SizeHint}).

-spec confirmed_broadcast(pid(), any()) -> 'ok'.

confirmed_broadcast(Server, Msg) ->
    gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity).

-spec info(pid()) -> rabbit_types:infos().

info(Server) ->
    gen_server2:call(Server, info, infinity).

-spec validate_members(pid(), [pid()]) -> 'ok'.

validate_members(Server, Members) ->
    gen_server2:cast(Server, {validate_members, Members}).

-spec forget_group(group_name()) -> 'ok'.

forget_group(GroupName) ->
    {atomic, ok} = mnesia:sync_transaction(
                     fun () ->
                             mnesia:delete({?GROUP_TABLE, GroupName})
                     end),
    ok.

init([GroupName, Module, Args, TxnFun]) ->
    put(process_name, {?MODULE, GroupName}),
    Self = make_member(GroupName),
    gen_server2:cast(self(), join),
    {ok, #state { self                = Self,
                  left                = {Self, undefined},
                  right               = {Self, undefined},
                  group_name          = GroupName,
                  module              = Module,
                  view                = undefined,
                  pub_count           = -1,
                  members_state       = undefined,
                  callback_args       = Args,
                  confirms            = queue:new(),
                  broadcast_buffer    = [],
                  broadcast_buffer_sz = 0,
                  broadcast_timer     = undefined,
                  force_gc_timer      = undefined,
                  txn_executor        = TxnFun,
                  shutting_down       = false }}.


handle_call({confirmed_broadcast, _Msg}, _From,
            State = #state { shutting_down = {true, _} }) ->
    reply(shutting_down, State);

handle_call({confirmed_broadcast, _Msg}, _From,
            State = #state { members_state = undefined }) ->
    reply(not_joined, State);

handle_call({confirmed_broadcast, Msg}, _From,
            State = #state { self          = Self,
                             right         = {Self, undefined},
                             module        = Module,
                             callback_args = Args }) ->
    handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
                            ok, State});

handle_call({confirmed_broadcast, Msg}, From, State) ->
    {Result, State1 = #state { pub_count = PubCount, confirms = Confirms }} =
        internal_broadcast(Msg, 0, State),
    Confirms1 = queue:in({PubCount, From}, Confirms),
    handle_callback_result({Result, flush_broadcast_buffer(
                                      State1 #state { confirms = Confirms1 })});

handle_call(info, _From,
            State = #state { members_state = undefined }) ->
    reply(not_joined, State);

handle_call(info, _From, State = #state { group_name = GroupName,
                                          module     = Module,
                                          view       = View }) ->
    reply([{group_name,    GroupName},
           {module,        Module},
           {group_members, get_pids(alive_view_members(View))}], State);

handle_call({add_on_right, _NewMember}, _From,
            State = #state { members_state = undefined }) ->
    reply(not_ready, State);

handle_call({add_on_right, NewMember}, _From,
            State = #state { self          = Self,
                             group_name    = GroupName,
                             members_state = MembersState,
                             txn_executor  = TxnFun }) ->
    try
        Group = record_new_member_in_group(
                  NewMember, Self, GroupName, TxnFun),
        View1 = group_to_view(check_membership(Self, Group)),
        MembersState1 = remove_erased_members(MembersState, View1),
        ok = send_right(NewMember, View1,
                        {catchup, Self, prepare_members_state(MembersState1)}),
        {Result, State1} = change_view(View1, State #state {
                                                members_state = MembersState1 }),
        handle_callback_result({Result, {ok, Group}, State1})
    catch
        lost_membership ->
            {stop, shutdown, State}
    end.

%% add_on_right causes a catchup to be sent immediately from the left,
%% so we can never see this from the left neighbour. However, it's
%% possible for the right neighbour to send us a check_neighbours
%% immediately before that. We can't possibly handle it, but if we're
%% in this state we know a catchup is coming imminently anyway. So
%% just ignore it.
handle_cast({?TAG, _ReqVer, check_neighbours},
            State = #state { members_state = undefined }) ->
    noreply(State);

handle_cast({?TAG, ReqVer, Msg},
            State = #state { view          = View,
                             self          = Self,
                             members_state = MembersState,
                             group_name    = GroupName }) ->
    try
        {Result, State1} =
            case needs_view_update(ReqVer, View) of
                true  ->
                    View1 = group_to_view(
                              check_membership(Self,
                                               dirty_read_group(GroupName))),
                    MemberState1 = remove_erased_members(MembersState, View1),
                    change_view(View1, State #state {
                                         members_state = MemberState1 });
                false -> {ok, State}
            end,
        handle_callback_result(
          if_callback_success(
            Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1))
    catch
        lost_membership ->
            {stop, shutdown, State}
    end;

handle_cast({broadcast, _Msg, _SizeHint},
            State = #state { shutting_down = {true, _} }) ->
    noreply(State);

handle_cast({broadcast, _Msg, _SizeHint},
            State = #state { members_state = undefined }) ->
    noreply(State);

handle_cast({broadcast, Msg, _SizeHint},
            State = #state { self          = Self,
                             right         = {Self, undefined},
                             module        = Module,
                             callback_args = Args }) ->
    handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
                            State});

handle_cast({broadcast, Msg, SizeHint}, State) ->
    {Result, State1} = internal_broadcast(Msg, SizeHint, State),
    handle_callback_result({Result, maybe_flush_broadcast_buffer(State1)});

handle_cast(join, State = #state { self          = Self,
                                   group_name    = GroupName,
                                   members_state = undefined,
                                   module        = Module,
                                   callback_args = Args,
                                   txn_executor  = TxnFun }) ->
    try
        View = join_group(Self, GroupName, TxnFun),
        MembersState =
            case alive_view_members(View) of
                [Self] -> blank_member_state();
                _      -> undefined
            end,
        State1 = check_neighbours(State #state { view          = View,
                                                 members_state = MembersState }),
        handle_callback_result(
          {Module:joined(Args, get_pids(all_known_members(View))), State1})
    catch
        lost_membership ->
            {stop, shutdown, State}
    end;

handle_cast({validate_members, OldMembers},
            State = #state { view          = View,
                             module        = Module,
                             callback_args = Args }) ->
    NewMembers = get_pids(all_known_members(View)),
    Births = NewMembers -- OldMembers,
    Deaths = OldMembers -- NewMembers,
    case {Births, Deaths} of
        {[], []}  -> noreply(State);
        _         -> Result = Module:members_changed(Args, Births, Deaths),
                     handle_callback_result({Result, State})
    end;

handle_cast(leave, State) ->
    {stop, normal, State}.


handle_info(force_gc, State) ->
    garbage_collect(),
    noreply(State #state { force_gc_timer = undefined });

handle_info(flush, State) ->
    noreply(
      flush_broadcast_buffer(State #state { broadcast_timer = undefined }));

handle_info(timeout, State) ->
    noreply(flush_broadcast_buffer(State));

handle_info({'DOWN', _MRef, process, _Pid, _Reason},
            State = #state { shutting_down =
                                 {true, {shutdown, ring_shutdown}} }) ->
    noreply(State);
handle_info({'DOWN', MRef, process, _Pid, Reason},
            State = #state { self          = Self,
                             left          = Left,
                             right         = Right,
                             group_name    = GroupName,
                             confirms      = Confirms,
                             txn_executor  = TxnFun }) ->
    try
        check_membership(GroupName),
        Member = case {Left, Right} of
                     {{Member1, MRef}, _} -> Member1;
                     {_, {Member1, MRef}} -> Member1;
                     _                    -> undefined
                 end,
        case {Member, Reason} of
            {undefined, _} ->
                noreply(State);
            {_, {shutdown, ring_shutdown}} ->
                noreply(State);
            _ ->
                %% In the event of a partial partition we could see another member
                %% go down and then remove them from Mnesia. While they can
                %% recover from this they'd have to restart the queue - not
                %% ideal. So let's sleep here briefly just in case this was caused
                %% by a partial partition; in which case by the time we record the
                %% member death in Mnesia we will probably be in a full
                %% partition and will not be assassinating another member.
                timer:sleep(100),
                View1 = group_to_view(record_dead_member_in_group(Self,
                                        Member, GroupName, TxnFun, true)),
                handle_callback_result(
                  case alive_view_members(View1) of
                      [Self] -> maybe_erase_aliases(
                                  State #state {
                                    members_state = blank_member_state(),
                                    confirms      = purge_confirms(Confirms) },
                                  View1);
                      _      -> change_view(View1, State)
                  end)
        end
    catch
        lost_membership ->
            {stop, shutdown, State}
    end;
handle_info(_, State) ->
    %% Discard any unexpected messages, such as late replies from neighbour_call/2.
    %% TODO: For #gm_group{} related info messages, it could be worthwhile to
    %% change_view/2, as this might reflect an alteration in the gm group, meaning
    %% we now need to update our state. See rabbitmq-server#914.
    noreply(State).

terminate(Reason, #state { module = Module, callback_args = Args }) ->
    Module:handle_terminate(Args, Reason).

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

prioritise_info(flush, _Len, _State) ->
    1;
%% DOWN messages should not overtake initial catchups; if they do we
%% will receive a DOWN we do not know what to do with.
prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _Len,
                #state { members_state = undefined }) ->
    0;
%% We should not prioritise DOWN messages from our left since
%% otherwise the DOWN can overtake any last activity from the left,
%% causing that activity to be lost.
prioritise_info({'DOWN', _MRef, process, LeftPid, _Reason}, _Len,
                #state { left = {{_LeftVer, LeftPid}, _MRef2} }) ->
    0;
%% But prioritise all other DOWNs - we want to make sure we are not
%% sending activity into the void for too long because our right is
%% down but we don't know it.
prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _Len, _State) ->
    1;
prioritise_info(_, _Len, _State) ->
    0.


handle_msg(check_neighbours, State) ->
    %% no-op - it's already been done by the calling handle_cast
    {ok, State};

handle_msg({catchup, Left, MembersStateLeft},
           State = #state { self          = Self,
                            left          = {Left, _MRefL},
                            right         = {Right, _MRefR},
                            view          = View,
                            members_state = undefined }) ->
    ok = send_right(Right, View, {catchup, Self, MembersStateLeft}),
    MembersStateLeft1 = build_members_state(MembersStateLeft),
    {ok, State #state { members_state = MembersStateLeft1 }};

handle_msg({catchup, Left, MembersStateLeft},
           State = #state { self = Self,
                            left = {Left, _MRefL},
                            view = View,
                            members_state = MembersState })
  when MembersState =/= undefined ->
    MembersStateLeft1 = build_members_state(MembersStateLeft),
    AllMembers = lists:usort(maps:keys(MembersState) ++
                                 maps:keys(MembersStateLeft1)),
    {MembersState1, Activity} =
        lists:foldl(
          fun (Id, MembersStateActivity) ->
                  #member { pending_ack = PALeft, last_ack = LA } =
                      find_member_or_blank(Id, MembersStateLeft1),
                  with_member_acc(
                    fun (#member { pending_ack = PA } = Member, Activity1) ->
                            case is_member_alias(Id, Self, View) of
                                true ->
                                    {_AcksInFlight, Pubs, _PA1} =
                                        find_prefix_common_suffix(PALeft, PA),
                                    {Member #member { last_ack = LA },
                                     activity_cons(Id, pubs_from_queue(Pubs),
                                                   [], Activity1)};
                                false ->
                                    {Acks, _Common, Pubs} =
                                        find_prefix_common_suffix(PA, PALeft),
                                    {Member,
                                     activity_cons(Id, pubs_from_queue(Pubs),
                                                   acks_from_queue(Acks),
                                                   Activity1)}
                            end
                    end, Id, MembersStateActivity)
          end, {MembersState, activity_nil()}, AllMembers),
    handle_msg({activity, Left, activity_finalise(Activity)},
               State #state { members_state = MembersState1 });

handle_msg({catchup, _NotLeft, _MembersState}, State) ->
    {ok, State};

handle_msg({activity, Left, Activity},
           State = #state { self          = Self,
                            group_name    = GroupName,
                            left          = {Left, _MRefL},
                            view          = View,
                            members_state = MembersState,
                            confirms      = Confirms })
  when MembersState =/= undefined ->
    try
        %% If we have to stop, do it asap so we avoid any ack confirmation.
        %% Membership must be checked again by erase_members_in_group, as the
        %% node can be marked as dead in the meantime.
        check_membership(GroupName),
        {MembersState1, {Confirms1, Activity1}} =
            calculate_activity(MembersState, Confirms, Activity, Self, View),
        State1 = State #state { members_state = MembersState1,
                                confirms      = Confirms1 },
        Activity3 = activity_finalise(Activity1),
        ok = maybe_send_activity(Activity3, State1),
        {Result, State2} = maybe_erase_aliases(State1, View),
        if_callback_success(
          Result, fun activity_true/3, fun activity_false/3, Activity3, State2)
    catch
        lost_membership ->
            {{stop, shutdown}, State}
    end;

handle_msg({activity, _NotLeft, _Activity}, State) ->
    {ok, State}.


noreply(State) ->
    {noreply, ensure_timers(State), flush_timeout(State)}.

reply(Reply, State) ->
    {reply, Reply, ensure_timers(State), flush_timeout(State)}.

ensure_timers(State) ->
    ensure_force_gc_timer(ensure_broadcast_timer(State)).

flush_timeout(#state{broadcast_buffer = []}) -> infinity;
flush_timeout(_)                             -> 0.

ensure_force_gc_timer(State = #state { force_gc_timer = TRef })
  when is_reference(TRef) ->
    State;
ensure_force_gc_timer(State = #state { force_gc_timer = undefined }) ->
    TRef = erlang:send_after(?FORCE_GC_TIMER, self(), force_gc),
    State #state { force_gc_timer = TRef }.

ensure_broadcast_timer(State = #state { broadcast_buffer = [],
                                        broadcast_timer  = undefined }) ->
    State;
ensure_broadcast_timer(State = #state { broadcast_buffer = [],
                                        broadcast_timer  = TRef }) ->
    _ = erlang:cancel_timer(TRef),
    State #state { broadcast_timer = undefined };
ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) ->
    TRef = erlang:send_after(?BROADCAST_TIMER, self(), flush),
    State #state { broadcast_timer = TRef };
ensure_broadcast_timer(State) ->
    State.

internal_broadcast(Msg, SizeHint,
                   State = #state { self                = Self,
                                    pub_count           = PubCount,
                                    module              = Module,
                                    callback_args       = Args,
                                    broadcast_buffer    = Buffer,
                                    broadcast_buffer_sz = BufferSize }) ->
    PubCount1 = PubCount + 1,
    {Module:handle_msg(Args, get_pid(Self), Msg),
     State #state { pub_count           = PubCount1,
                    broadcast_buffer    = [{PubCount1, Msg} | Buffer],
                    broadcast_buffer_sz = BufferSize + SizeHint}}.

%% The Erlang distribution mechanism has an interesting quirk - it
%% will kill the VM cold with "Absurdly large distribution output data
%% buffer" if you attempt to send a message which serialises out to
%% more than 2^31 bytes in size. It's therefore a very good idea to
%% make sure that we don't exceed that size!
%%
%% Now, we could figure out the size of messages as they come in using
%% size(term_to_binary(Msg)) or similar. The trouble is, that requires
%% us to serialise the message only to throw the serialised form
%% away. Hard to believe that's a sensible thing to do. So instead we
%% accept a size hint from the application, via broadcast/3. This size
%% hint can be the size of anything in the message which we expect
%% could be large, and we just ignore the size of any small bits of
%% the message term. Therefore MAX_BUFFER_SIZE is set somewhat
%% conservatively at 100MB - but the buffer is only to allow us to
%% buffer tiny messages anyway, so 100MB is plenty.
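%%
%% For instance, an application whose messages are dominated by a
%% single binary payload might pass the hint like this (a sketch; GM
%% and the message shape are illustrative):
%%
%%   Payload = term_to_binary(Update),
%%   ok = gm:broadcast(GM, {update, Key, Payload}, byte_size(Payload)).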

maybe_flush_broadcast_buffer(State = #state{broadcast_buffer_sz = Size}) ->
    case Size > ?MAX_BUFFER_SIZE of
        true  -> flush_broadcast_buffer(State);
        false -> State
    end.

flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) ->
    State;
flush_broadcast_buffer(State = #state { self             = Self,
                                        members_state    = MembersState,
                                        broadcast_buffer = Buffer,
                                        pub_count        = PubCount }) ->
    [{PubCount, _Msg}|_] = Buffer, %% ASSERTION match on PubCount
    Pubs = lists:reverse(Buffer),
    Activity = activity_cons(Self, Pubs, [], activity_nil()),
    ok = maybe_send_activity(activity_finalise(Activity), State),
    MembersState1 = with_member(
                      fun (Member = #member { pending_ack = PA }) ->
                              PA1 = queue:join(PA, queue:from_list(Pubs)),
                              Member #member { pending_ack = PA1,
                                               last_pub = PubCount }
                      end, Self, MembersState),
    State #state { members_state       = MembersState1,
                   broadcast_buffer    = [],
                   broadcast_buffer_sz = 0 }.

%% ---------------------------------------------------------------------------
%% View construction and inspection
%% ---------------------------------------------------------------------------

needs_view_update(ReqVer, {Ver, _View}) -> Ver < ReqVer.

view_version({Ver, _View}) -> Ver.

is_member_alive({dead, _Member}) -> false;
is_member_alive(_)               -> true.

is_member_alias(Self, Self, _View) ->
    true;
is_member_alias(Member, Self, View) ->
    ?SETS:is_element(Member,
                     ((fetch_view_member(Self, View)) #view_member.aliases)).

dead_member_id({dead, Member}) -> Member.

store_view_member(VMember = #view_member { id = Id }, {Ver, View}) ->
    {Ver, maps:put(Id, VMember, View)}.

with_view_member(Fun, View, Id) ->
    store_view_member(Fun(fetch_view_member(Id, View)), View).

fetch_view_member(Id, {_Ver, View}) -> maps:get(Id, View).

find_view_member(Id, {_Ver, View}) -> maps:find(Id, View).

blank_view(Ver) -> {Ver, maps:new()}.

alive_view_members({_Ver, View}) -> maps:keys(View).

all_known_members({_Ver, View}) ->
    maps:fold(
       fun (Member, #view_member { aliases = Aliases }, Acc) ->
               ?SETS:to_list(Aliases) ++ [Member | Acc]
       end, [], View).

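%% Concatenating three copies of the alive members lets link_view/2
%% slide a three-element [Left, Middle, Right] window over the list so
%% that every alive member is seen together with both of its
%% neighbours, wrapping around as a ring; link_view stops as soon as
%% it revisits a member it has already stored.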
group_to_view(#gm_group { members = Members, version = Ver }) ->
    Alive = lists:filter(fun is_member_alive/1, Members),
    [_|_] = Alive, %% ASSERTION - can't have all dead members
    add_aliases(link_view(Alive ++ Alive ++ Alive, blank_view(Ver)), Members).

link_view([Left, Middle, Right | Rest], View) ->
    case find_view_member(Middle, View) of
        error ->
            link_view(
              [Middle, Right | Rest],
              store_view_member(#view_member { id      = Middle,
                                               aliases = ?SETS:new(),
                                               left    = Left,
                                               right   = Right }, View));
        {ok, _} ->
            View
    end;
link_view(_, View) ->
    View.

add_aliases(View, Members) ->
    Members1 = ensure_alive_suffix(Members),
    {EmptyDeadSet, View1} =
        lists:foldl(
          fun (Member, {DeadAcc, ViewAcc}) ->
                  case is_member_alive(Member) of
                      true ->
                          {?SETS:new(),
                           with_view_member(
                             fun (VMember =
                                      #view_member { aliases = Aliases }) ->
                                     VMember #view_member {
                                       aliases = ?SETS:union(Aliases, DeadAcc) }
                             end, ViewAcc, Member)};
                      false ->
                          {?SETS:add_element(dead_member_id(Member), DeadAcc),
                           ViewAcc}
                  end
          end, {?SETS:new(), View}, Members1),
    0 = ?SETS:size(EmptyDeadSet), %% ASSERTION
    View1.

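%% add_aliases/2 walks the full members list in order, accumulating
%% each run of dead members and attributing it to the next alive
%% member that follows; rotating the list here until it ends with an
%% alive member makes a trailing run of dead members wrap around to
%% the first alive member, so nothing is left unattributed.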
ensure_alive_suffix(Members) ->
    queue:to_list(ensure_alive_suffix1(queue:from_list(Members))).

ensure_alive_suffix1(MembersQ) ->
    {{value, Member}, MembersQ1} = queue:out_r(MembersQ),
    case is_member_alive(Member) of
        true  -> MembersQ;
        false -> ensure_alive_suffix1(queue:in_r(Member, MembersQ1))
    end.


%% ---------------------------------------------------------------------------
%% View modification
%% ---------------------------------------------------------------------------

join_group(Self, GroupName, TxnFun) ->
    join_group(Self, GroupName, dirty_read_group(GroupName), TxnFun).

join_group(Self, GroupName, {error, not_found}, TxnFun) ->
    join_group(Self, GroupName,
               prune_or_create_group(Self, GroupName, TxnFun), TxnFun);
join_group(Self, _GroupName, #gm_group { members = [Self] } = Group, _TxnFun) ->
    group_to_view(Group);
join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) ->
    case lists:member(Self, Members) of
        true ->
            group_to_view(Group);
        false ->
            case lists:filter(fun is_member_alive/1, Members) of
                [] ->
                    join_group(Self, GroupName,
                               prune_or_create_group(Self, GroupName, TxnFun),
                               TxnFun);
                Alive ->
                    Left = lists:nth(rand:uniform(length(Alive)), Alive),
                    Handler =
                        fun () ->
                                join_group(
                                  Self, GroupName,
                                  record_dead_member_in_group(Self,
                                    Left, GroupName, TxnFun, false),
                                  TxnFun)
                        end,
                    try
                        case neighbour_call(Left, {add_on_right, Self}) of
                            {ok, Group1} -> group_to_view(Group1);
                            not_ready    -> join_group(Self, GroupName, TxnFun)
                        end
                    catch
                        exit:{R, _}
                          when R =:= noproc; R =:= normal; R =:= shutdown ->
                            Handler();
                        exit:{{R, _}, _}
                          when R =:= nodedown; R =:= shutdown ->
                            Handler()
                    end
            end
    end.

dirty_read_group(GroupName) ->
    case mnesia:dirty_read(?GROUP_TABLE, GroupName) of
        []      -> {error, not_found};
        [Group] -> Group
    end.

read_group(GroupName) ->
    case mnesia:read({?GROUP_TABLE, GroupName}) of
        []      -> {error, not_found};
        [Group] -> Group
    end.

write_group(Group) -> mnesia:write(?GROUP_TABLE, Group, write), Group.

prune_or_create_group(Self, GroupName, TxnFun) ->
    TxnFun(
      fun () ->
              GroupNew = #gm_group { name    = GroupName,
                                     members = [Self],
                                     version = get_version(Self) },
              case read_group(GroupName) of
                  {error, not_found} ->
                      write_group(GroupNew);
                  Group = #gm_group { members = Members } ->
                      case lists:any(fun is_member_alive/1, Members) of
                          true  -> Group;
                          false -> write_group(GroupNew)
                      end
              end
      end).

record_dead_member_in_group(Self, Member, GroupName, TxnFun, Verify) ->
    Fun =
        fun () ->
                try
                    Group = #gm_group { members = Members, version = Ver } =
                        case Verify of
                            true ->
                                check_membership(Self, read_group(GroupName));
                            false ->
                                check_group(read_group(GroupName))
                        end,
                    case lists:splitwith(
                           fun (Member1) -> Member1 =/= Member end, Members) of
                        {_Members1, []} -> %% not found - already recorded dead
                            Group;
                        {Members1, [Member | Members2]} ->
                            Members3 = Members1 ++ [{dead, Member} | Members2],
                            write_group(Group #gm_group { members = Members3,
                                                          version = Ver + 1 })
                    end
                catch
                    lost_membership ->
                        %% The transaction must not crash abruptly; instead,
                        %% let the gen_server stop normally
                        {error, lost_membership}
                end
        end,
    handle_lost_membership_in_txn(TxnFun, Fun).

handle_lost_membership_in_txn(TxnFun, Fun) ->
    case TxnFun(Fun) of
        {error, lost_membership = T} ->
            throw(T);
        Any ->
            Any
    end.

record_new_member_in_group(NewMember, Left, GroupName, TxnFun) ->
    Fun =
        fun () ->
                try
                    Group = #gm_group { members = Members, version = Ver } =
                        check_membership(Left, read_group(GroupName)),
                    case lists:member(NewMember, Members) of
                        true ->
                            %% This avoids duplicates during partial partitions,
                            %% as inconsistent views might occur during them
                            rabbit_log:warning("(~p) GM avoiding duplicate of ~p",
                                               [self(), NewMember]),
                            Group;
                        false ->
                            {Prefix, [Left | Suffix]} =
                                lists:splitwith(fun (M) -> M =/= Left end, Members),
                            write_group(Group #gm_group {
                                          members = Prefix ++ [Left, NewMember | Suffix],
                                          version = Ver + 1 })
                    end
                catch
                    lost_membership ->
                        %% The transaction must not crash abruptly; instead,
                        %% let the gen_server stop normally
                        {error, lost_membership}
                end
        end,
    handle_lost_membership_in_txn(TxnFun, Fun).

erase_members_in_group(Self, Members, GroupName, TxnFun) ->
    DeadMembers = [{dead, Id} || Id <- Members],
    Fun =
        fun () ->
                try
                    Group = #gm_group { members = [_|_] = Members1, version = Ver } =
                        check_membership(Self, read_group(GroupName)),
                    case Members1 -- DeadMembers of
                        Members1 -> Group;
                        Members2 -> write_group(
                                      Group #gm_group { members = Members2,
                                                        version = Ver + 1 })
                    end
                catch
                    lost_membership ->
                        %% The transaction must not crash abruptly; instead,
                        %% let the gen_server stop normally
                        {error, lost_membership}
                end
        end,
    handle_lost_membership_in_txn(TxnFun, Fun).
1233
1234maybe_erase_aliases(State = #state { self          = Self,
1235                                     group_name    = GroupName,
1236                                     members_state = MembersState,
1237                                     txn_executor  = TxnFun }, View) ->
1238    #view_member { aliases = Aliases } = fetch_view_member(Self, View),
1239    {Erasable, MembersState1}
1240        = ?SETS:fold(
1241             fun (Id, {ErasableAcc, MembersStateAcc} = Acc) ->
1242                     #member { last_pub = LP, last_ack = LA } =
1243                         find_member_or_blank(Id, MembersState),
1244                     case can_erase_view_member(Self, Id, LA, LP) of
1245                         true  -> {[Id | ErasableAcc],
1246                                   erase_member(Id, MembersStateAcc)};
1247                         false -> Acc
1248                     end
1249             end, {[], MembersState}, Aliases),
1250    View1 = case Erasable of
1251                [] -> View;
1252                _  -> group_to_view(
1253                        erase_members_in_group(Self, Erasable, GroupName, TxnFun))
1254            end,
1255    change_view(View1, State #state { members_state = MembersState1 }).
1256
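%% A member other than ourselves can be erased once all of its
%% publications have been acked, i.e. its last_pub equals its
%% last_ack. We never erase our own entry.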
can_erase_view_member(Self, Self, _LA, _LP) -> false;
can_erase_view_member(_Self, _Id,   N,   N) -> true;
can_erase_view_member(_Self, _Id, _LA, _LP) -> false.

neighbour_cast(N, Msg) -> ?INSTR_MOD:cast(get_pid(N), Msg).
neighbour_call(N, Msg) -> ?INSTR_MOD:call(get_pid(N), Msg, infinity).

%% ---------------------------------------------------------------------------
%% View monitoring and maintenance
%% ---------------------------------------------------------------------------

ensure_neighbour(_Ver, Self, {Self, undefined}, Self) ->
    {Self, undefined};
ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) ->
    ok = neighbour_cast(RealNeighbour, {?TAG, Ver, check_neighbours}),
    {RealNeighbour, maybe_monitor(RealNeighbour, Self)};
ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) ->
    {RealNeighbour, MRef};
ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
    true = ?INSTR_MOD:demonitor(MRef),
    Msg = {?TAG, Ver, check_neighbours},
    ok = neighbour_cast(RealNeighbour, Msg),
    ok = case Neighbour of
             Self -> ok;
             _    -> neighbour_cast(Neighbour, Msg)
         end,
    {Neighbour, maybe_monitor(Neighbour, Self)}.

maybe_monitor( Self,  Self) -> undefined;
maybe_monitor(Other, _Self) -> ?INSTR_MOD:monitor(get_pid(Other)).

check_neighbours(State = #state { self             = Self,
                                  left             = Left,
                                  right            = Right,
                                  view             = View,
                                  broadcast_buffer = Buffer }) ->
    #view_member { left = VLeft, right = VRight }
        = fetch_view_member(Self, View),
    Ver = view_version(View),
    Left1 = ensure_neighbour(Ver, Self, Left, VLeft),
    Right1 = ensure_neighbour(Ver, Self, Right, VRight),
    Buffer1 = case Right1 of
                  {Self, undefined} -> [];
                  _                 -> Buffer
              end,
    State1 = State #state { left = Left1, right = Right1,
                            broadcast_buffer = Buffer1 },
    ok = maybe_send_catchup(Right, State1),
    State1.

maybe_send_catchup(Right, #state { right = Right }) ->
    ok;
maybe_send_catchup(_Right, #state { self  = Self,
                                    right = {Self, undefined} }) ->
    ok;
maybe_send_catchup(_Right, #state { members_state = undefined }) ->
    ok;
maybe_send_catchup(_Right, #state { self          = Self,
                                    right         = {Right, _MRef},
                                    view          = View,
                                    members_state = MembersState }) ->
    send_right(Right, View,
               {catchup, Self, prepare_members_state(MembersState)}).


%% ---------------------------------------------------------------------------
%% Catch-up delta detection
%% ---------------------------------------------------------------------------

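%% Compare two queues of {PubNum, Msg} pairs and return
%% {Prefix, Common, Suffix}: the elements of A that precede the first
%% element of B, the run shared by both, and the remaining elements
%% of B.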
find_prefix_common_suffix(A, B) ->
    {Prefix, A1} = find_prefix(A, B, queue:new()),
    {Common, Suffix} = find_common(A1, B, queue:new()),
    {Prefix, Common, Suffix}.

%% Returns the elements of A that occur before the first element of B,
%% plus the remainder of A.
find_prefix(A, B, Prefix) ->
    case {queue:out(A), queue:out(B)} of
        {{{value, Val}, _A1}, {{value, Val}, _B1}} ->
            {Prefix, A};
        {{empty, A1}, {{value, _A}, _B1}} ->
            {Prefix, A1};
        {{{value, {NumA, _MsgA} = Val}, A1},
         {{value, {NumB, _MsgB}}, _B1}} when NumA < NumB ->
            find_prefix(A1, B, queue:in(Val, Prefix));
        {_, {empty, _B1}} ->
            {A, Prefix} %% Prefix will be empty here
    end.
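
%% Illustrative example (hypothetical pub numbers and payloads):
%%   A = queue:from_list([{1, a}, {2, b}, {3, c}]),
%%   B = queue:from_list([{3, c}, {4, d}]),
%%   find_prefix(A, B, queue:new())
%% returns {<queue of [{1, a}, {2, b}]>, <queue of [{3, c}]>}.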

%% A should be a prefix of B. Returns the commonality plus the
%% remainder of B.
find_common(A, B, Common) ->
    case {queue:out(A), queue:out(B)} of
        {{{value, Val}, A1}, {{value, Val}, B1}} ->
            find_common(A1, B1, queue:in(Val, Common));
        {{empty, _A}, _} ->
            {Common, B};
        %% Drop value from B.
        %% Match value to avoid infinite loop, since {empty, B} = queue:out(B).
        {_, {{value, _}, B1}} ->
            find_common(A, B1, Common);
        %% Drop value from A. Empty A should be matched by the second clause.
        {{{value, _}, A1}, _} ->
            find_common(A1, B, Common)
    end.
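
%% Continuing the example above (hypothetical values):
%%   find_common(queue:from_list([{3, c}]), B, queue:new())
%% returns {<queue of [{3, c}]>, <queue of [{4, d}]>}, so
%% find_prefix_common_suffix(A, B) gives the prefix [{1, a}, {2, b}],
%% the common run [{3, c}] and the suffix [{4, d}].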


%% ---------------------------------------------------------------------------
%% Members helpers
%% ---------------------------------------------------------------------------

with_member(Fun, Id, MembersState) ->
    store_member(
      Id, Fun(find_member_or_blank(Id, MembersState)), MembersState).

with_member_acc(Fun, Id, {MembersState, Acc}) ->
    {MemberState, Acc1} = Fun(find_member_or_blank(Id, MembersState), Acc),
    {store_member(Id, MemberState, MembersState), Acc1}.

find_member_or_blank(Id, MembersState) ->
    case maps:find(Id, MembersState) of
        {ok, Result} -> Result;
        error        -> blank_member()
    end.

erase_member(Id, MembersState) -> maps:remove(Id, MembersState).

blank_member() ->
    #member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }.

blank_member_state() -> maps:new().

store_member(Id, MemberState, MembersState) ->
    maps:put(Id, MemberState, MembersState).

prepare_members_state(MembersState) -> maps:to_list(MembersState).

build_members_state(MembersStateList) -> maps:from_list(MembersStateList).

make_member(GroupName) ->
   {case dirty_read_group(GroupName) of
        #gm_group { version = Version } -> Version;
        {error, not_found}              -> ?VERSION_START
    end, self()}.

remove_erased_members(MembersState, View) ->
    lists:foldl(fun (Id, MembersState1) ->
                    store_member(Id, find_member_or_blank(Id, MembersState),
                                 MembersState1)
                end, blank_member_state(), all_known_members(View)).

get_version({Version, _Pid}) -> Version.

get_pid({_Version, Pid}) -> Pid.

get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].

%% ---------------------------------------------------------------------------
%% Activity assembly
%% ---------------------------------------------------------------------------

activity_nil() -> queue:new().

activity_cons(   _Id,   [],   [], Tail) -> Tail;
activity_cons(Sender, Pubs, Acks, Tail) -> queue:in({Sender, Pubs, Acks}, Tail).

activity_finalise(Activity) -> queue:to_list(Activity).

maybe_send_activity([], _State) ->
    ok;
maybe_send_activity(Activity, #state { self  = Self,
                                       right = {Right, _MRefR},
                                       view  = View }) ->
    send_right(Right, View, {activity, Self, Activity}).

send_right(Right, View, Msg) ->
    ok = neighbour_cast(Right, {?TAG, view_version(View), Msg}).

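%% For each {Id, Pubs, Acks} entry in the incoming activity: if Id is
%% ourselves or one of our aliases, the Pubs are our own messages that
%% have travelled the whole ring, so we ack them and confirm any
%% callers waiting in confirmed_broadcast/2; otherwise we record the
%% Pubs and Acks against that member and forward the entry in the
%% outgoing activity.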
calculate_activity(MembersState, Confirms, Activity, Self, View) ->
    lists:foldl(
      fun ({Id, Pubs, Acks}, MembersStateConfirmsActivity) ->
              with_member_acc(
                fun (Member = #member { pending_ack = PA,
                                        last_pub    = LP,
                                        last_ack    = LA },
                     {Confirms2, Activity2}) ->
                        case is_member_alias(Id, Self, View) of
                            true ->
                                {ToAck, PA1} =
                                    find_common(queue_from_pubs(Pubs), PA,
                                                queue:new()),
                                LA1 = last_ack(Acks, LA),
                                AckNums = acks_from_queue(ToAck),
                                Confirms3 = maybe_confirm(
                                              Self, Id, Confirms2, AckNums),
                                {Member #member { pending_ack = PA1,
                                                  last_ack    = LA1 },
                                 {Confirms3,
                                  activity_cons(
                                    Id, [], AckNums, Activity2)}};
                            false ->
                                PA1 = apply_acks(Acks, join_pubs(PA, Pubs)),
                                LA1 = last_ack(Acks, LA),
                                LP1 = last_pub(Pubs, LP),
                                {Member #member { pending_ack = PA1,
                                                  last_pub    = LP1,
                                                  last_ack    = LA1 },
                                 {Confirms2,
                                  activity_cons(Id, Pubs, Acks, Activity2)}}
                        end
                end, Id, MembersStateConfirmsActivity)
      end, {MembersState, {Confirms, activity_nil()}}, Activity).

callback(Args, Module, Activity) ->
    Result =
      lists:foldl(
        fun ({Id, Pubs, _Acks}, {Args1, Module1, ok}) ->
                lists:foldl(fun ({_PubNum, Pub}, Acc = {Args2, Module2, ok}) ->
                                    case Module2:handle_msg(
                                           Args2, get_pid(Id), Pub) of
                                        ok ->
                                            Acc;
                                        {become, Module3, Args3} ->
                                            {Args3, Module3, ok};
                                        {stop, _Reason} = Error ->
                                            Error
                                    end;
                                (_, Error = {stop, _Reason}) ->
                                    Error
                            end, {Args1, Module1, ok}, Pubs);
            (_, Error = {stop, _Reason}) ->
                Error
        end, {Args, Module, ok}, Activity),
    case Result of
        {Args, Module, ok}      -> ok;
        {Args1, Module1, ok}    -> {become, Module1, Args1};
        {stop, _Reason} = Error -> Error
    end.

change_view(View, State = #state { view          = View0,
                                   module        = Module,
                                   callback_args = Args }) ->
    OldMembers = all_known_members(View0),
    NewMembers = all_known_members(View),
    Births = NewMembers -- OldMembers,
    Deaths = OldMembers -- NewMembers,
    Result = case {Births, Deaths} of
                 {[], []} -> ok;
                 _        -> Module:members_changed(
                               Args, get_pids(Births), get_pids(Deaths))
             end,
    {Result, check_neighbours(State #state { view = View })}.

handle_callback_result({Result, State}) ->
    if_callback_success(
      Result, fun no_reply_true/3, fun no_reply_false/3, undefined, State);
handle_callback_result({Result, Reply, State}) ->
    if_callback_success(
      Result, fun reply_true/3, fun reply_false/3, Reply, State).

no_reply_true (_Result,        _Undefined, State) -> noreply(State).
no_reply_false({stop, Reason}, _Undefined, State) -> {stop, Reason, State}.

reply_true (_Result,        Reply, State) -> reply(Reply, State).
reply_false({stop, Reason}, Reply, State) -> {stop, Reason, Reply, State}.

handle_msg_true (_Result, Msg, State) -> handle_msg(Msg, State).
handle_msg_false(Result, _Msg, State) -> {Result, State}.

activity_true(_Result, Activity, State = #state { module        = Module,
                                                  callback_args = Args }) ->
    {callback(Args, Module, Activity), State}.
activity_false(Result, _Activity, State) ->
    {Result, State}.

if_callback_success(Result, True, False, Arg, State) ->
    {NewResult, NewState} = maybe_stop(Result, State),
    if_callback_success1(NewResult, True, False, Arg, NewState).

if_callback_success1(ok, True, _False, Arg, State) ->
    True(ok, Arg, State);
if_callback_success1(
  {become, Module, Args} = Result, True, _False, Arg, State) ->
    True(Result, Arg, State #state { module        = Module,
                                     callback_args = Args });
if_callback_success1({stop, _Reason} = Result, _True, False, Arg, State) ->
    False(Result, Arg, State).

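%% Defer stopping while messages are still in flight: remember the
%% stop reason in shutting_down and only return a stop tuple once
%% has_pending_messages/1 reports that the broadcast buffer and all
%% pending acks have drained.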
maybe_stop({stop, Reason}, #state{ shutting_down = false } = State) ->
    ShuttingDown = {true, Reason},
    case has_pending_messages(State) of
        true  -> {ok, State #state{ shutting_down = ShuttingDown }};
        false -> {{stop, Reason}, State #state{ shutting_down = ShuttingDown }}
    end;
maybe_stop(Result, #state{ shutting_down = false } = State) ->
    {Result, State};
maybe_stop(Result, #state{ shutting_down = {true, Reason} } = State) ->
    case has_pending_messages(State) of
        true  -> {Result, State};
        false -> {{stop, Reason}, State}
    end.

has_pending_messages(#state{ broadcast_buffer = Buffer })
  when Buffer =/= [] ->
    true;
has_pending_messages(#state{ members_state = MembersState }) ->
    MembersWithPubAckMismatches =
        maps:filter(fun (_Id, #member { last_pub = LP, last_ack = LA }) ->
                            LP =/= LA
                    end, MembersState),
    0 =/= maps:size(MembersWithPubAckMismatches).

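%% Only the member that published a message replies to the caller
%% blocked in confirmed_broadcast/2. Confirms is a queue of
%% {PubNum, From} pairs in publication order; pop it as the
%% corresponding publication numbers come back acked.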
maybe_confirm(_Self, _Id, Confirms, []) ->
    Confirms;
maybe_confirm(Self, Self, Confirms, [PubNum | PubNums]) ->
    case queue:out(Confirms) of
        {empty, _Confirms} ->
            Confirms;
        {{value, {PubNum, From}}, Confirms1} ->
            gen_server2:reply(From, ok),
            maybe_confirm(Self, Self, Confirms1, PubNums);
        {{value, {PubNum1, _From}}, _Confirms} when PubNum1 > PubNum ->
            maybe_confirm(Self, Self, Confirms, PubNums)
    end;
maybe_confirm(_Self, _Id, Confirms, _PubNums) ->
    Confirms.

purge_confirms(Confirms) ->
    _ = [gen_server2:reply(From, ok) || {_PubNum, From} <- queue:to_list(Confirms)],
    queue:new().


%% ---------------------------------------------------------------------------
%% Msg transformation
%% ---------------------------------------------------------------------------

acks_from_queue(Q) -> [PubNum || {PubNum, _Msg} <- queue:to_list(Q)].

pubs_from_queue(Q) -> queue:to_list(Q).

queue_from_pubs(Pubs) -> queue:from_list(Pubs).

apply_acks(  [], Pubs) -> Pubs;
apply_acks(List, Pubs) -> {_, Pubs1} = queue:split(length(List), Pubs),
                          Pubs1.

join_pubs(Q, [])   -> Q;
join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)).

last_ack(  [], LA) -> LA;
last_ack(List, LA) -> LA1 = lists:last(List),
                      true = LA1 > LA, %% ASSERTION
                      LA1.

last_pub(  [], LP) -> LP;
last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List),
                      true = PubNum > LP, %% ASSERTION
                      PubNum.

%% ---------------------------------------------------------------------------

%% Uninstrumented versions

call(Pid, Msg, Timeout) -> gen_server2:call(Pid, Msg, Timeout).
cast(Pid, Msg)          -> gen_server2:cast(Pid, Msg).
monitor(Pid)            -> erlang:monitor(process, Pid).
demonitor(MRef)         -> erlang:demonitor(MRef).

check_membership(Self, #gm_group{members = M} = Group) ->
    case lists:member(Self, M) of
        true ->
            Group;
        false ->
            throw(lost_membership)
    end;
check_membership(_Self, {error, not_found}) ->
    throw(lost_membership).

check_membership(GroupName) ->
    case dirty_read_group(GroupName) of
        #gm_group{members = M} ->
            case lists:keymember(self(), 2, M) of
                true ->
                    ok;
                false ->
                    throw(lost_membership)
            end;
        {error, not_found} ->
            throw(lost_membership)
    end.

check_group({error, not_found}) ->
    throw(lost_membership);
check_group(Any) ->
    Any.
