%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(gm).

%% Guaranteed Multicast
%% ====================
%%
%% This module provides the ability to create named groups of
%% processes to which members can be dynamically added and removed,
%% and for messages to be broadcast within the group that are
%% guaranteed to reach all members of the group during the lifetime of
%% the message. The lifetime of a message is defined as being, at a
%% minimum, the time from which the message is first sent to any
%% member of the group, up until the time at which it is known by the
%% member who published the message that the message has reached all
%% group members.
%%
%% The guarantee given is that provided a message, once sent, makes it
%% to members who do not all leave the group, the message will
%% continue to propagate to all group members.
%%
%% Another way of stating the guarantee is that if member P publishes
%% messages m and m', then for all members P', if P' is a member of
%% the group prior to the publication of m, and P' receives m', then
%% P' will receive m.
%%
%% Note that only local ordering is enforced: i.e. if member P sends
%% message m and then message m', then for all members P', if P'
%% receives m and m', then they will receive m' after m. Causal
%% ordering is _not_ enforced: i.e. if member P receives message m
%% and as a result publishes message m', there is no guarantee that
%% other members P' will receive m before m'.
%%
%%
%% API Use
%% -------
%%
%% Mnesia must be started. Use the idempotent create_tables/0 function
%% to create the tables required.
%%
%% start_link/4
%% Provide the group name, the callback module name, any arguments
%% you wish to be passed into the callback module's functions, and a
%% transaction executor function used to run the Mnesia transactions
%% that modify the group. The joined/2 function will be called when we
%% have joined the group, with the arguments passed to start_link and
%% a list of the current members of the group. See the callback specs
%% and the comments below for further details of the callback
%% functions.
%%
%% leave/1
%% Provide the Pid. Removes the Pid from the group. The callback
%% handle_terminate/2 function will be called.
%%
%% broadcast/2
%% Provide the Pid and a Message. The message will be sent to all
%% members of the group as per the guarantees given above. This is a
%% cast and the function call will return immediately. There is no
%% guarantee that the message will reach any member of the group.
%%
%% confirmed_broadcast/2
%% Provide the Pid and a Message. As per broadcast/2 except that this
%% is a call, not a cast, and only returns 'ok' once the Message has
%% reached every member of the group. Do not call
%% confirmed_broadcast/2 directly from the callback module, otherwise
%% you will deadlock the entire group.
%%
%% info/1
%% Provide the Pid. Returns a proplist with various facts, including
%% the group name and the current group members.
%%
%% validate_members/2
%% Check whether a given member list agrees with the chosen member's
%% view. Any differences will be communicated via the members_changed
%% callback. If there are no differences then there will be no reply.
%% Note that members will not necessarily share the same view.
%%
%% forget_group/1
%% Provide the group name. Removes its mnesia record. Makes no attempt
%% to ensure the group is empty.
%%
%% Implementation Overview
%% -----------------------
%%
%% One possible means of implementation would be a fan-out from the
%% sender to every member of the group. This would require that the
%% group is fully connected, and, in the event that the original
%% sender of the message disappears from the group before the message
%% has made it to every member of the group, raises questions as to
%% who is responsible for sending on the message to new group members.
%% In particular, the issue is with [ Pid ! Msg || Pid <- Members ] -
%% if the sender dies part way through, who is responsible for
%% ensuring that the remaining Members receive the Msg? In the event
%% that, within the group, messages are broadcast from only a subset
%% of the members, the fan-out arrangement has the potential to
%% substantially impact the CPU and network workload of such members,
%% as they would have to accommodate the cost of sending each message
%% to every group member.
%%
%% Instead, if the members of the group are arranged in a chain, then
%% it becomes easier to reason about who within the group has received
%% each message and who has not. It eases issues of responsibility: in
%% the event of a group member disappearing, the nearest upstream
%% member of the chain is responsible for ensuring that messages
%% continue to propagate down the chain. It also results in equal
%% distribution of sending and receiving workload, even if all
%% messages are being sent from just a single group member. This
%% configuration has the further advantage that it is not necessary
%% for every group member to know of every other group member, and
%% even that a group member does not have to be accessible from all
%% other group members.
%%
%% Performance is kept high by permitting pipelining: all
%% communication between joined group members is asynchronous. In the
%% chain A -> B -> C -> D, if A sends a message to the group, it will
%% not directly contact C or D. However, it must know that D receives
%% the message (in addition to B and C) before it can consider the
%% message fully sent. A simplistic implementation would require that
%% D replies to C, C replies to B and B then replies to A. This would
%% result in a propagation delay of twice the length of the chain. It
%% would also require, in the event of the failure of C, that D knows
%% to directly contact B and issue the necessary replies. Instead, the
%% chain forms a ring: D sends the message on to A. D does not
%% distinguish A as the sender, merely as the next member (downstream)
%% within the chain (which has now become a ring). When A receives
%% from D messages that A sent, it knows that all members have
%% received the message. However, the message is not dead yet: if C
%% died as B was sending to C, then B would need to detect the death
%% of C and forward the message on to D instead: thus every member has
%% to remember every message published until it is told that it can
%% forget about the message. This is essential not just for dealing
%% with failure of members, but also for the addition of new members.
%%
%% Thus once A receives the message back again, it then sends to B an
%% acknowledgement for the message, indicating that B can now forget
%% about the message. B does so, and forwards the ack to C. C forgets
%% the message, and forwards the ack to D, which forgets the message
%% and finally forwards the ack back to A. At this point, A takes no
%% further action: the message and its acknowledgement have made it to
%% every member of the group. The message is now dead, and any new
%% member joining the group at this point will not receive the
%% message.
%%
%% We therefore have two roles:
%%
%% 1. The sender, who, upon receiving their own messages back, must
%% then send out acknowledgements, and upon receiving their own
%% acknowledgements back perform no further action.
%%
%% 2. The other group members, who, upon receiving messages and
%% acknowledgements, must update their own internal state accordingly
%% (the sending member must also do this in order to be able to
%% accommodate failures), and forward messages on to their downstream
%% neighbours.
%%
%%
%% Implementation: It gets trickier
%% --------------------------------
%%
%% Chain A -> B -> C -> D
%%
%% A publishes a message which B receives. A now dies. B and D will
%% detect the death of A, and will link up, thus the chain is now B ->
%% C -> D. B forwards A's message on to C, who forwards it to D, who
%% forwards it to B. Thus B is now responsible for A's messages - both
%% publications and acknowledgements that were in flight at the point
%% at which A died. Even worse is that this is transitive: after B
%% forwards A's message to C, B dies as well. Now C is not only
%% responsible for B's in-flight messages, but is also responsible for
%% A's in-flight messages.
%%
%% Lemma 1: A member can only determine which dead members it has
%% inherited responsibility for if there is a total ordering on the
%% conflicting additions and subtractions of members from the group.
%%
%% Consider the simultaneous death of B and addition of B' that
%% transitions a chain from A -> B -> C to A -> B' -> C. Either B' or
%% C is responsible for in-flight messages from B. It is easy to
%% ensure that at least one of them thinks they have inherited B, but
%% if we do not ensure that exactly one of them inherits B, then we
%% could have B' converting publishes to acks, which then will crash C
%% as C does not believe it has issued acks for those messages.
%%
%% More complex scenarios are easy to concoct: A -> B -> C -> D -> E
%% becoming A -> C' -> E. Who has inherited which of B, C and D?
%%
%% However, for non-conflicting membership changes, only a partial
%% ordering is required. For example, A -> B -> C becoming A -> A' ->
%% B. The addition of A', between A and B, can have no conflicts with
%% the death of C: it is clear that A has inherited C's messages.
%%
%% For ease of implementation, we adopt the simple solution of
%% imposing a total order on all membership changes.
%%
%% On the death of a member, it is ensured that the dead member's
%% neighbours become aware of the death, and the upstream neighbour
%% now sends to its new downstream neighbour its state, including the
%% messages pending acknowledgement. The downstream neighbour can then
%% use this to calculate which publishes and acknowledgements it has
%% missed out on, due to the death of its old upstream. Thus the
%% downstream can catch up, and continue the propagation of messages
%% through the group.
%%
%% Lemma 2: When a member is joining, it must synchronously
%% communicate with its upstream member in order to receive its
%% starting state atomically with its addition to the group.
%%
%% A new member must start with the same state as its nearest
%% upstream neighbour. This ensures that it is not surprised by
%% acknowledgements it is sent, and that, should its downstream
%% neighbour die, it is able to send the correct state to its new
%% downstream neighbour to ensure it can catch up. Thus in the
%% transition A -> B -> C becomes A -> A' -> B -> C becomes A -> A' ->
%% C, A' must start with the state of A, so that it can send C the
%% correct state when B dies, allowing C to detect any missed
%% messages.
%%
%% If A' starts by adding itself to the group membership, A could then
%% die, without A' having received the necessary state from A. This
%% would leave A' responsible for in-flight messages from A, but
%% having the least knowledge of those messages of any member. Thus A'
%% must start by synchronously calling A, which then immediately sends
%% A' back its state. A then adds A' to the group. If A dies at this
%% point then A' will be able to see this (as A' will fail to appear
%% in the group membership), and thus A' will ignore the state it
%% receives from A, and will simply repeat the process, trying to
%% join downstream from some other member. This ensures that should
%% the upstream die as soon as the new member has been joined, the new
%% member is guaranteed to receive the correct state, allowing it to
%% correctly process messages inherited due to the death of its
%% upstream neighbour.
%%
%% The canonical definition of the group membership is held by a
%% distributed database. Whilst this allows the total ordering of
%% changes to be achieved, it is nevertheless undesirable to have to
%% query this database for the current view upon receiving each
%% message. Instead, we wish for members to be able to cache a view of
%% the group membership, which then requires a cache invalidation
%% mechanism. Each member maintains its own view of the group
%% membership. Thus when the group's membership changes, members may
%% need to become aware of such changes in order to be able to
%% accurately process messages they receive. Because of the
%% requirement of a total ordering of conflicting membership changes,
%% it is not possible to use the guaranteed broadcast mechanism to
%% communicate these changes: to achieve the necessary ordering, it
%% would be necessary for such messages to be published by exactly one
%% member, which cannot be guaranteed given that such a member could
%% die.
%%
%% The total ordering we enforce on membership changes gives rise to a
%% view version number: every change to the membership creates a
%% different view, and the total ordering permits a simple
%% monotonically increasing view version number.
%%
%% Lemma 3: If a message is sent from a member that holds view version
%% N, it can be correctly processed by any member receiving the
%% message with a view version >= N.
%%
%% Initially, let us suppose that each view contains the ordering of
%% every member that was ever part of the group. Dead members are
%% marked as such. Thus we have a ring of members, some of which are
%% dead, and are thus inherited by the nearest alive downstream
%% member.
%%
%% In the chain A -> B -> C, all three members initially have view
%% version 1, which reflects reality. B publishes a message, which is
%% forwarded by C to A. B now dies, which A notices very quickly. Thus
%% A updates the view, creating version 2. It now forwards B's
%% publication, sending that message to its new downstream neighbour,
%% C. This happens before C is aware of the death of B. C must become
%% aware of the view change before it interprets the message it has
%% received, otherwise it will fail to learn of the death of B, and
%% thus will not realise it has inherited B's messages (and will
%% likely crash).
%%
%% Thus, very simply, we have that each subsequent view contains more
%% information than the preceding view.
%%
%% However, to avoid the views growing indefinitely, we need to be
%% able to delete members which have died _and_ for which no messages
%% are in flight. This requires that upon inheriting a dead member, we
%% know the last publication sent by the dead member (this is easy: we
%% inherit a member because we are the nearest downstream member,
%% which implies that we know at least as much as everyone else about
%% the publications of the dead member), and we know the earliest
%% message for which the acknowledgement is still in flight.
%%
%% In the chain A -> B -> C, when B dies, A will send to C its state
%% (as C is the new downstream from A), allowing C to calculate which
%% messages it has missed out on (described above). At this point, C
%% also inherits B's messages. If that state from A also includes the
%% last message published by B for which an acknowledgement has been
%% seen, then C knows exactly which further acknowledgements it must
%% receive (also including issuing acknowledgements for publications
%% still in flight that it receives), after which it is known there
%% are no more messages in flight for B, thus all evidence that B was
%% ever part of the group can be safely removed from the canonical
%% group membership.
%%
%% Thus, for every message that a member sends, it includes with that
%% message its view version. When a member receives a message it will
%% update its view from the canonical copy, should its view be older
%% than the view version included in the message it has received.
%%
%% The state held by each member therefore includes the messages from
%% each publisher pending acknowledgement, the last publication seen
%% from that publisher, and the last acknowledgement from that
%% publisher. In the case of the member's own publications or
%% inherited members, this last-acknowledgement-seen state indicates
%% the last acknowledgement retired, rather than sent.
%%
%%
%% Proof sketch
%% ------------
%%
%% We need to prove that with the provided operational semantics, we
%% can never reach a state that is not well formed from a well-formed
%% starting state.
%%
%% Operational semantics (small step): straightforward message
%% sending, process monitoring, state updates.
%%
%% Well-formed state: dead members are inherited by exactly one
%% non-dead member; for every entry in anyone's pending-acks, either
%% (the publication of the message is in flight downstream from the
%% member and upstream from the publisher) or (the acknowledgement of
%% the message is in flight downstream from the publisher and upstream
%% from the member).
%%
%% Proof by induction on the applicable operational semantics.
%%
%%
%% Related work
%% ------------
%%
%% The ring configuration and double traversal of messages around the
%% ring is similar (though developed independently) to the LCR
%% protocol by [Levy 2008]. However, LCR differs in several
%% ways. Firstly, by using vector clocks, it enforces a total order of
%% message delivery, which is unnecessary for our purposes. More
%% significantly, it is built on top of a "group communication system"
%% which performs the group management functions, taking
%% responsibility away from the protocol as to how to cope with safely
%% adding and removing members. When membership changes do occur, the
%% protocol stipulates that every member must perform communication
%% with every other member of the group, to ensure all outstanding
%% deliveries complete, before the entire group transitions to the new
%% view. This, in total, requires two sets of all-to-all synchronous
%% communications.
%%
%% This is not only rather inefficient, but also does not explain what
%% happens upon the failure of a member during this process. It does,
%% though, entirely avoid the need for inheritance of responsibility
%% for dead members that our protocol incorporates.
%%
%% In [Marandi et al 2010], a Paxos-based protocol is described. This
%% work explicitly focuses on the efficiency of communication. LCR
%% (and our protocol too) are more efficient, but at the cost of
%% higher latency. The Ring-Paxos protocol is itself built on top of
%% IP multicast, which rules it out for many applications where
%% point-to-point communication is all that can be relied upon. They
%% also have an excellent related work section which I really ought to
%% read...
%%
%%
%% [Levy 2008] The Complexity of Reliable Distributed Storage, 2008.
%% [Marandi et al 2010] Ring Paxos: A High-Throughput Atomic Broadcast
%% Protocol


-behaviour(gen_server2).

-export([create_tables/0, start_link/4, leave/1, broadcast/2, broadcast/3,
         confirmed_broadcast/2, info/1, validate_members/2, forget_group/1]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3, prioritise_info/3]).

%% For INSTR_MOD callbacks
-export([call/3, cast/2, monitor/1, demonitor/1]).

-export([table_definitions/0]).

-define(GROUP_TABLE, gm_group).
-define(MAX_BUFFER_SIZE, 100000000). %% 100MB
-define(BROADCAST_TIMER, 25).
-define(FORCE_GC_TIMER, 250).
-define(VERSION_START, 0).
-define(SETS, ordsets).

-record(state,
        { self,
          left,
          right,
          group_name,
          module,
          view,
          pub_count,
          members_state,
          callback_args,
          confirms,
          broadcast_buffer,
          broadcast_buffer_sz,
          broadcast_timer,
          force_gc_timer,
          txn_executor,
          shutting_down
        }).

-record(gm_group, { name, version, members }).

-record(view_member, { id, aliases, left, right }).

-record(member, { pending_ack, last_pub, last_ack }).
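
%% A note on the records above: #member{} tracks, for each known
%% publisher, the queue of its publications pending acknowledgement,
%% together with the last publication and last acknowledgement seen
%% from it (see "The state held by each member" in the overview).
%% #view_member{} is one entry in the cached ring view: the member's
%% left and right neighbours, plus the set of dead members whose
%% in-flight messages it has inherited (its aliases).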

-define(TABLE, {?GROUP_TABLE, [{record_name, gm_group},
                               {attributes, record_info(fields, gm_group)}]}).
-define(TABLE_MATCH, {match, #gm_group { _ = '_' }}).

-define(TAG, '$gm').

-export_type([group_name/0]).

-type group_name() :: any().
-type txn_fun() :: fun((fun(() -> any())) -> any()).

%% The joined, members_changed and handle_msg callbacks can all return
%% any of the following terms:
%%
%% 'ok' - the callback function returns normally
%%
%% {'stop', Reason} - the callback indicates the member should stop
%% with reason Reason and should leave the group.
%%
%% {'become', Module, Args} - the callback indicates that the callback
%% module should be changed to Module and that the callback functions
%% should now be passed the arguments Args. This allows the callback
%% module to be dynamically changed.

%% Called when we've successfully joined the group. Supplied with Args
%% provided in start_link, plus current group members.
-callback joined(Args :: term(), Members :: [pid()]) ->
    ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.

%% Supplied with Args provided in start_link, the list of new members
%% and the list of members previously known to us that have since
%% died. Note that if a member joins and dies very quickly, it's
%% possible that we will never see that member appear in either births
%% or deaths. However we are guaranteed that (1) we will see a member
%% joining either in the births here, or in the members passed to
%% joined/2 before receiving any messages from it; and (2) we will not
%% see members die that we have not seen born (or supplied in the
%% members to joined/2).
-callback members_changed(Args :: term(),
                          Births :: [pid()], Deaths :: [pid()]) ->
    ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.

%% Supplied with Args provided in start_link, the sender, and the
%% message. This does get called for messages injected by this member;
%% however, in such cases, there is no special significance to this
%% invocation: it does not indicate that the message has made it to
%% any other members, let alone all other members.
-callback handle_msg(Args :: term(), From :: pid(), Message :: term()) ->
    ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.

%% Called on gm member termination as per the rules in gen_server,
%% with the Args provided in start_link plus the termination Reason.
-callback handle_terminate(Args :: term(), Reason :: term()) ->
    ok | term().

-spec create_tables() -> 'ok' | {'aborted', any()}.

create_tables() ->
    create_tables([?TABLE]).

create_tables([]) ->
    ok;
create_tables([{Table, Attributes} | Tables]) ->
    case mnesia:create_table(Table, Attributes) of
        {atomic, ok}                       -> create_tables(Tables);
        {aborted, {already_exists, Table}} -> create_tables(Tables);
        Err                                -> Err
    end.

table_definitions() ->
    {Name, Attributes} = ?TABLE,
    [{Name, [?TABLE_MATCH | Attributes]}].

-spec start_link(group_name(), atom(), any(), txn_fun()) ->
          rabbit_types:ok_pid_or_error().

start_link(GroupName, Module, Args, TxnFun) ->
    gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun],
                           [{spawn_opt, [{fullsweep_after, 0}]}]).

-spec leave(pid()) -> 'ok'.

leave(Server) ->
    gen_server2:cast(Server, leave).
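
%% What follows is an illustrative sketch (it is not part of this
%% module) of a minimal callback module and of starting a member. The
%% module name my_gm_cb, the group name my_group and the choice of
%% mnesia:sync_transaction/1 as the transaction executor are
%% assumptions made for the example, not requirements of this API.
%% Mnesia must already be started, as noted in "API Use" above.
%%
%%   -module(my_gm_cb).
%%   -behaviour(gm).
%%   -export([start/0, joined/2, members_changed/3, handle_msg/3,
%%            handle_terminate/2]).
%%
%%   joined(_Args, Members) ->
%%       io:format("joined; members: ~p~n", [Members]),
%%       ok.
%%   members_changed(_Args, Births, Deaths) ->
%%       io:format("births: ~p deaths: ~p~n", [Births, Deaths]),
%%       ok.
%%   handle_msg(_Args, From, Msg) ->
%%       io:format("from ~p: ~p~n", [From, Msg]),
%%       ok.
%%   handle_terminate(_Args, _Reason) ->
%%       ok.
%%
%%   start() ->
%%       ok = gm:create_tables(),
%%       TxnFun = fun (F) ->
%%                        {atomic, Result} = mnesia:sync_transaction(F),
%%                        Result
%%                end,
%%       {ok, Pid} = gm:start_link(my_group, my_gm_cb, [], TxnFun),
%%       ok = gm:broadcast(Pid, hello),
%%       {ok, Pid}.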

-spec broadcast(pid(), any()) -> 'ok'.

broadcast(Server, Msg) -> broadcast(Server, Msg, 0).

broadcast(Server, Msg, SizeHint) ->
    gen_server2:cast(Server, {broadcast, Msg, SizeHint}).

-spec confirmed_broadcast(pid(), any()) -> 'ok'.

confirmed_broadcast(Server, Msg) ->
    gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity).

-spec info(pid()) -> rabbit_types:infos().

info(Server) ->
    gen_server2:call(Server, info, infinity).

-spec validate_members(pid(), [pid()]) -> 'ok'.

validate_members(Server, Members) ->
    gen_server2:cast(Server, {validate_members, Members}).

-spec forget_group(group_name()) -> 'ok'.

forget_group(GroupName) ->
    {atomic, ok} = mnesia:sync_transaction(
                     fun () ->
                             mnesia:delete({?GROUP_TABLE, GroupName})
                     end),
    ok.

init([GroupName, Module, Args, TxnFun]) ->
    put(process_name, {?MODULE, GroupName}),
    Self = make_member(GroupName),
    gen_server2:cast(self(), join),
    {ok, #state { self                = Self,
                  left                = {Self, undefined},
                  right               = {Self, undefined},
                  group_name          = GroupName,
                  module              = Module,
                  view                = undefined,
                  pub_count           = -1,
                  members_state       = undefined,
                  callback_args       = Args,
                  confirms            = queue:new(),
                  broadcast_buffer    = [],
                  broadcast_buffer_sz = 0,
                  broadcast_timer     = undefined,
                  force_gc_timer      = undefined,
                  txn_executor        = TxnFun,
                  shutting_down       = false }}.


handle_call({confirmed_broadcast, _Msg}, _From,
            State = #state { shutting_down = {true, _} }) ->
    reply(shutting_down, State);

handle_call({confirmed_broadcast, _Msg}, _From,
            State = #state { members_state = undefined }) ->
    reply(not_joined, State);

handle_call({confirmed_broadcast, Msg}, _From,
            State = #state { self          = Self,
                             right         = {Self, undefined},
                             module        = Module,
                             callback_args = Args }) ->
    handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
                            ok, State});

handle_call({confirmed_broadcast, Msg}, From, State) ->
    {Result, State1 = #state { pub_count = PubCount, confirms = Confirms }} =
        internal_broadcast(Msg, 0, State),
    Confirms1 = queue:in({PubCount, From}, Confirms),
    handle_callback_result({Result, flush_broadcast_buffer(
                                      State1 #state { confirms = Confirms1 })});

handle_call(info, _From,
            State = #state { members_state = undefined }) ->
    reply(not_joined, State);

handle_call(info, _From, State = #state { group_name = GroupName,
                                          module     = Module,
                                          view       = View }) ->
    reply([{group_name,    GroupName},
           {module,        Module},
           {group_members, get_pids(alive_view_members(View))}], State);

handle_call({add_on_right, _NewMember}, _From,
            State = #state { members_state = undefined }) ->
    reply(not_ready, State);

handle_call({add_on_right, NewMember}, _From,
            State = #state { self          = Self,
                             group_name    = GroupName,
                             members_state = MembersState,
                             txn_executor  = TxnFun }) ->
    try
        Group = record_new_member_in_group(
                  NewMember, Self, GroupName, TxnFun),
        View1 = group_to_view(check_membership(Self, Group)),
        MembersState1 = remove_erased_members(MembersState, View1),
        ok = send_right(NewMember, View1,
                        {catchup, Self, prepare_members_state(MembersState1)}),
        {Result, State1} = change_view(View1, State #state {
                                                members_state = MembersState1 }),
        handle_callback_result({Result, {ok, Group}, State1})
    catch
        lost_membership ->
            {stop, shutdown, State}
    end.

%% add_on_right causes a catchup to be sent immediately from the left,
%% so we can never see this from the left neighbour. However, it's
%% possible for the right neighbour to send us a check_neighbours
%% immediately before that. We can't possibly handle it, but if we're
%% in this state we know a catchup is coming imminently anyway. So
%% just ignore it.
handle_cast({?TAG, _ReqVer, check_neighbours},
            State = #state { members_state = undefined }) ->
    noreply(State);

handle_cast({?TAG, ReqVer, Msg},
            State = #state { view          = View,
                             self          = Self,
                             members_state = MembersState,
                             group_name    = GroupName }) ->
    try
        {Result, State1} =
            case needs_view_update(ReqVer, View) of
                true  ->
                    View1 = group_to_view(
                              check_membership(Self,
                                               dirty_read_group(GroupName))),
                    MemberState1 = remove_erased_members(MembersState, View1),
                    change_view(View1, State #state {
                                         members_state = MemberState1 });
                false -> {ok, State}
            end,
        handle_callback_result(
          if_callback_success(
            Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1))
    catch
        lost_membership ->
            {stop, shutdown, State}
    end;

handle_cast({broadcast, _Msg, _SizeHint},
            State = #state { shutting_down = {true, _} }) ->
    noreply(State);

handle_cast({broadcast, _Msg, _SizeHint},
            State = #state { members_state = undefined }) ->
    noreply(State);

handle_cast({broadcast, Msg, _SizeHint},
            State = #state { self          = Self,
                             right         = {Self, undefined},
                             module        = Module,
                             callback_args = Args }) ->
    handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
                            State});

handle_cast({broadcast, Msg, SizeHint}, State) ->
    {Result, State1} = internal_broadcast(Msg, SizeHint, State),
    handle_callback_result({Result, maybe_flush_broadcast_buffer(State1)});

handle_cast(join, State = #state { self          = Self,
                                   group_name    = GroupName,
                                   members_state = undefined,
                                   module        = Module,
                                   callback_args = Args,
                                   txn_executor  = TxnFun }) ->
    try
        View = join_group(Self, GroupName, TxnFun),
        MembersState =
            case alive_view_members(View) of
                [Self] -> blank_member_state();
                _      -> undefined
            end,
        State1 = check_neighbours(State #state { view          = View,
                                                 members_state = MembersState }),
        handle_callback_result(
          {Module:joined(Args, get_pids(all_known_members(View))), State1})
    catch
        lost_membership ->
            {stop, shutdown, State}
    end;

handle_cast({validate_members, OldMembers},
            State = #state { view          = View,
                             module        = Module,
                             callback_args = Args }) ->
    NewMembers = get_pids(all_known_members(View)),
    Births = NewMembers -- OldMembers,
    Deaths = OldMembers -- NewMembers,
    case {Births, Deaths} of
        {[], []} -> noreply(State);
        _        -> Result = Module:members_changed(Args, Births, Deaths),
                    handle_callback_result({Result, State})
    end;

handle_cast(leave, State) ->
    {stop, normal, State}.


handle_info(force_gc, State) ->
    garbage_collect(),
    noreply(State #state { force_gc_timer = undefined });

handle_info(flush, State) ->
    noreply(
      flush_broadcast_buffer(State #state { broadcast_timer = undefined }));

handle_info(timeout, State) ->
    noreply(flush_broadcast_buffer(State));

handle_info({'DOWN', _MRef, process, _Pid, _Reason},
            State = #state { shutting_down =
                                 {true, {shutdown, ring_shutdown}} }) ->
    noreply(State);
handle_info({'DOWN', MRef, process, _Pid, Reason},
            State = #state { self         = Self,
                             left         = Left,
                             right        = Right,
                             group_name   = GroupName,
                             confirms     = Confirms,
                             txn_executor = TxnFun }) ->
    try
        check_membership(GroupName),
        Member = case {Left, Right} of
                     {{Member1, MRef}, _} -> Member1;
                     {_, {Member1, MRef}} -> Member1;
                     _                    -> undefined
                 end,
        case {Member, Reason} of
            {undefined, _} ->
                noreply(State);
            {_, {shutdown, ring_shutdown}} ->
                noreply(State);
            _ ->
                %% In the event of a partial partition we could see another member
                %% go down and then remove them from Mnesia. While they can
                %% recover from this they'd have to restart the queue - not
                %% ideal. So let's sleep here briefly just in case this was caused
                %% by a partial partition; in which case by the time we record the
                %% member death in Mnesia we will probably be in a full
                %% partition and will not be assassinating another member.
                timer:sleep(100),
                View1 = group_to_view(record_dead_member_in_group(Self,
                                        Member, GroupName, TxnFun, true)),
                handle_callback_result(
                  case alive_view_members(View1) of
                      [Self] -> maybe_erase_aliases(
                                  State #state {
                                    members_state = blank_member_state(),
                                    confirms      = purge_confirms(Confirms) },
                                  View1);
                      _      -> change_view(View1, State)
                  end)
        end
    catch
        lost_membership ->
            {stop, shutdown, State}
    end;
handle_info(_, State) ->
    %% Discard any unexpected messages, such as late replies from neighbour_call/2
    %% TODO: For #gm_group{} related info messages, it could be worthwhile to
    %% change_view/2, as this might reflect an alteration in the gm group, meaning
    %% we now need to update our state. See rabbitmq-server#914.
    noreply(State).

terminate(Reason, #state { module = Module, callback_args = Args }) ->
    Module:handle_terminate(Args, Reason).

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

prioritise_info(flush, _Len, _State) ->
    1;
%% DOWN messages should not overtake initial catchups; if they do we
%% will receive a DOWN we do not know what to do with.
prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _Len,
                #state { members_state = undefined }) ->
    0;
%% We should not prioritise DOWN messages from our left since
%% otherwise the DOWN can overtake any last activity from the left,
%% causing that activity to be lost.
prioritise_info({'DOWN', _MRef, process, LeftPid, _Reason}, _Len,
                #state { left = {{_LeftVer, LeftPid}, _MRef2} }) ->
    0;
%% But prioritise all other DOWNs - we want to make sure we are not
%% sending activity into the void for too long because our right is
%% down but we don't know it.
prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _Len, _State) ->
    1;
prioritise_info(_, _Len, _State) ->
    0.


handle_msg(check_neighbours, State) ->
    %% no-op - it's already been done by the calling handle_cast
    {ok, State};

handle_msg({catchup, Left, MembersStateLeft},
           State = #state { self          = Self,
                            left          = {Left, _MRefL},
                            right         = {Right, _MRefR},
                            view          = View,
                            members_state = undefined }) ->
    ok = send_right(Right, View, {catchup, Self, MembersStateLeft}),
    MembersStateLeft1 = build_members_state(MembersStateLeft),
    {ok, State #state { members_state = MembersStateLeft1 }};

handle_msg({catchup, Left, MembersStateLeft},
           State = #state { self          = Self,
                            left          = {Left, _MRefL},
                            view          = View,
                            members_state = MembersState })
  when MembersState =/= undefined ->
    MembersStateLeft1 = build_members_state(MembersStateLeft),
    AllMembers = lists:usort(maps:keys(MembersState) ++
                                 maps:keys(MembersStateLeft1)),
    {MembersState1, Activity} =
        lists:foldl(
          fun (Id, MembersStateActivity) ->
                  #member { pending_ack = PALeft, last_ack = LA } =
                      find_member_or_blank(Id, MembersStateLeft1),
                  with_member_acc(
                    fun (#member { pending_ack = PA } = Member, Activity1) ->
                            case is_member_alias(Id, Self, View) of
                                true ->
                                    {_AcksInFlight, Pubs, _PA1} =
                                        find_prefix_common_suffix(PALeft, PA),
                                    {Member #member { last_ack = LA },
                                     activity_cons(Id, pubs_from_queue(Pubs),
                                                   [], Activity1)};
                                false ->
                                    {Acks, _Common, Pubs} =
                                        find_prefix_common_suffix(PA, PALeft),
                                    {Member,
                                     activity_cons(Id, pubs_from_queue(Pubs),
                                                   acks_from_queue(Acks),
                                                   Activity1)}
                            end
                    end, Id, MembersStateActivity)
          end, {MembersState, activity_nil()}, AllMembers),
    handle_msg({activity, Left, activity_finalise(Activity)},
               State #state { members_state = MembersState1 });

handle_msg({catchup, _NotLeft, _MembersState}, State) ->
    {ok, State};

handle_msg({activity, Left, Activity},
           State = #state { self          = Self,
                            group_name    = GroupName,
                            left          = {Left, _MRefL},
                            view          = View,
                            members_state = MembersState,
                            confirms      = Confirms })
  when MembersState =/= undefined ->
    try
        %% If we have to stop, do it as soon as possible so we avoid
        %% issuing any ack confirmation. Membership must be checked
        %% again by erase_members_in_group, as the node can be marked
        %% as dead in the meantime.
        check_membership(GroupName),
        {MembersState1, {Confirms1, Activity1}} =
            calculate_activity(MembersState, Confirms, Activity, Self, View),
        State1 = State #state { members_state = MembersState1,
                                confirms      = Confirms1 },
        Activity3 = activity_finalise(Activity1),
        ok = maybe_send_activity(Activity3, State1),
        {Result, State2} = maybe_erase_aliases(State1, View),
        if_callback_success(
          Result, fun activity_true/3, fun activity_false/3, Activity3, State2)
    catch
        lost_membership ->
            {{stop, shutdown}, State}
    end;

handle_msg({activity, _NotLeft, _Activity}, State) ->
    {ok, State}.


noreply(State) ->
    {noreply, ensure_timers(State), flush_timeout(State)}.

reply(Reply, State) ->
    {reply, Reply, ensure_timers(State), flush_timeout(State)}.

ensure_timers(State) ->
    ensure_force_gc_timer(ensure_broadcast_timer(State)).

flush_timeout(#state{broadcast_buffer = []}) -> infinity;
flush_timeout(_)                             -> 0.
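
%% Note on the timeouts above: returning 0 from flush_timeout/1
%% whenever the broadcast buffer is non-empty makes gen_server2
%% deliver a 'timeout' info as soon as there are no pending messages,
%% which handle_info(timeout, ...) above uses to flush the buffer;
%% with an empty buffer we return 'infinity' and rely on the
%% broadcast timer below instead.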

ensure_force_gc_timer(State = #state { force_gc_timer = TRef })
  when is_reference(TRef) ->
    State;
ensure_force_gc_timer(State = #state { force_gc_timer = undefined }) ->
    TRef = erlang:send_after(?FORCE_GC_TIMER, self(), force_gc),
    State #state { force_gc_timer = TRef }.

ensure_broadcast_timer(State = #state { broadcast_buffer = [],
                                        broadcast_timer  = undefined }) ->
    State;
ensure_broadcast_timer(State = #state { broadcast_buffer = [],
                                        broadcast_timer  = TRef }) ->
    _ = erlang:cancel_timer(TRef),
    State #state { broadcast_timer = undefined };
ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) ->
    TRef = erlang:send_after(?BROADCAST_TIMER, self(), flush),
    State #state { broadcast_timer = TRef };
ensure_broadcast_timer(State) ->
    State.

internal_broadcast(Msg, SizeHint,
                   State = #state { self                = Self,
                                    pub_count           = PubCount,
                                    module              = Module,
                                    callback_args       = Args,
                                    broadcast_buffer    = Buffer,
                                    broadcast_buffer_sz = BufferSize }) ->
    PubCount1 = PubCount + 1,
    {Module:handle_msg(Args, get_pid(Self), Msg),
     State #state { pub_count           = PubCount1,
                    broadcast_buffer    = [{PubCount1, Msg} | Buffer],
                    broadcast_buffer_sz = BufferSize + SizeHint}}.

%% The Erlang distribution mechanism has an interesting quirk - it
%% will kill the VM cold with "Absurdly large distribution output data
%% buffer" if you attempt to send a message which serialises out to
%% more than 2^31 bytes in size. It's therefore a very good idea to
%% make sure that we don't exceed that size!
%%
%% Now, we could figure out the size of messages as they come in using
%% size(term_to_binary(Msg)) or similar. The trouble is, that requires
%% us to serialise the message only to throw the serialised form
%% away. Hard to believe that's a sensible thing to do. So instead we
%% accept a size hint from the application, via broadcast/3. This size
%% hint can be the size of anything in the message which we expect
%% could be large, and we just ignore the size of any small bits of
%% the message term. Therefore MAX_BUFFER_SIZE is set somewhat
%% conservatively at 100MB - but the buffer is only to allow us to
%% buffer tiny messages anyway, so 100MB is plenty.

maybe_flush_broadcast_buffer(State = #state{broadcast_buffer_sz = Size}) ->
    case Size > ?MAX_BUFFER_SIZE of
        true  -> flush_broadcast_buffer(State);
        false -> State
    end.

flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) ->
    State;
flush_broadcast_buffer(State = #state { self             = Self,
                                        members_state    = MembersState,
                                        broadcast_buffer = Buffer,
                                        pub_count        = PubCount }) ->
    [{PubCount, _Msg}|_] = Buffer, %% ASSERTION match on PubCount
    Pubs = lists:reverse(Buffer),
    Activity = activity_cons(Self, Pubs, [], activity_nil()),
    ok = maybe_send_activity(activity_finalise(Activity), State),
    MembersState1 = with_member(
                      fun (Member = #member { pending_ack = PA }) ->
                              PA1 = queue:join(PA, queue:from_list(Pubs)),
                              Member #member { pending_ack = PA1,
                                               last_pub    = PubCount }
                      end, Self, MembersState),
    State #state { members_state       = MembersState1,
                   broadcast_buffer    = [],
                   broadcast_buffer_sz = 0 }.
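
%% As a usage illustration of the size hint described above (the
%% message shape and variable names here are hypothetical), a caller
%% whose messages are dominated by one large binary might do:
%%
%%   Payload = term_to_binary(Data),
%%   ok = gm:broadcast(GmPid, {deliver, Payload}, byte_size(Payload)).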

%% ---------------------------------------------------------------------------
%% View construction and inspection
%% ---------------------------------------------------------------------------

needs_view_update(ReqVer, {Ver, _View}) -> Ver < ReqVer.

view_version({Ver, _View}) -> Ver.

is_member_alive({dead, _Member}) -> false;
is_member_alive(_)               -> true.

is_member_alias(Self, Self, _View) ->
    true;
is_member_alias(Member, Self, View) ->
    ?SETS:is_element(Member,
                     ((fetch_view_member(Self, View)) #view_member.aliases)).

dead_member_id({dead, Member}) -> Member.

store_view_member(VMember = #view_member { id = Id }, {Ver, View}) ->
    {Ver, maps:put(Id, VMember, View)}.

with_view_member(Fun, View, Id) ->
    store_view_member(Fun(fetch_view_member(Id, View)), View).

fetch_view_member(Id, {_Ver, View}) -> maps:get(Id, View).

find_view_member(Id, {_Ver, View}) -> maps:find(Id, View).

blank_view(Ver) -> {Ver, maps:new()}.

alive_view_members({_Ver, View}) -> maps:keys(View).

all_known_members({_Ver, View}) ->
    maps:fold(
      fun (Member, #view_member { aliases = Aliases }, Acc) ->
              ?SETS:to_list(Aliases) ++ [Member | Acc]
      end, [], View).

group_to_view(#gm_group { members = Members, version = Ver }) ->
    Alive = lists:filter(fun is_member_alive/1, Members),
    [_|_] = Alive, %% ASSERTION - can't have all dead members
    add_aliases(link_view(Alive ++ Alive ++ Alive, blank_view(Ver)), Members).

link_view([Left, Middle, Right | Rest], View) ->
    case find_view_member(Middle, View) of
        error ->
            link_view(
              [Middle, Right | Rest],
              store_view_member(#view_member { id      = Middle,
                                               aliases = ?SETS:new(),
                                               left    = Left,
                                               right   = Right }, View));
        {ok, _} ->
            View
    end;
link_view(_, View) ->
    View.

add_aliases(View, Members) ->
    Members1 = ensure_alive_suffix(Members),
    {EmptyDeadSet, View1} =
        lists:foldl(
          fun (Member, {DeadAcc, ViewAcc}) ->
                  case is_member_alive(Member) of
                      true ->
                          {?SETS:new(),
                           with_view_member(
                             fun (VMember =
                                      #view_member { aliases = Aliases }) ->
                                     VMember #view_member {
                                       aliases = ?SETS:union(Aliases, DeadAcc) }
                             end, ViewAcc, Member)};
                      false ->
                          {?SETS:add_element(dead_member_id(Member), DeadAcc),
                           ViewAcc}
                  end
          end, {?SETS:new(), View}, Members1),
    0 = ?SETS:size(EmptyDeadSet), %% ASSERTION
    View1.

ensure_alive_suffix(Members) ->
    queue:to_list(ensure_alive_suffix1(queue:from_list(Members))).

ensure_alive_suffix1(MembersQ) ->
    {{value, Member}, MembersQ1} = queue:out_r(MembersQ),
    case is_member_alive(Member) of
        true  -> MembersQ;
        false -> ensure_alive_suffix1(queue:in_r(Member, MembersQ1))
    end.


%% ---------------------------------------------------------------------------
%% View modification
%% ---------------------------------------------------------------------------

join_group(Self, GroupName, TxnFun) ->
    join_group(Self, GroupName, dirty_read_group(GroupName), TxnFun).

join_group(Self, GroupName, {error, not_found}, TxnFun) ->
    join_group(Self, GroupName,
               prune_or_create_group(Self, GroupName, TxnFun), TxnFun);
join_group(Self, _GroupName, #gm_group { members = [Self] } = Group, _TxnFun) ->
    group_to_view(Group);
join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) ->
    case lists:member(Self, Members) of
        true ->
            group_to_view(Group);
        false ->
            case lists:filter(fun is_member_alive/1, Members) of
                [] ->
                    join_group(Self, GroupName,
                               prune_or_create_group(Self, GroupName, TxnFun),
                               TxnFun);
                Alive ->
                    Left = lists:nth(rand:uniform(length(Alive)), Alive),
                    Handler =
                        fun () ->
                                join_group(
                                  Self, GroupName,
                                  record_dead_member_in_group(Self,
                                    Left, GroupName, TxnFun, false),
                                  TxnFun)
                        end,
                    try
                        case neighbour_call(Left, {add_on_right, Self}) of
                            {ok, Group1} -> group_to_view(Group1);
                            not_ready    -> join_group(Self, GroupName, TxnFun)
                        end
                    catch
                        exit:{R, _}
                          when R =:= noproc; R =:= normal; R =:= shutdown ->
                            Handler();
                        exit:{{R, _}, _}
                          when R =:= nodedown; R =:= shutdown ->
                            Handler()
                    end
            end
    end.

dirty_read_group(GroupName) ->
    case mnesia:dirty_read(?GROUP_TABLE, GroupName) of
        []      -> {error, not_found};
        [Group] -> Group
    end.

read_group(GroupName) ->
    case mnesia:read({?GROUP_TABLE, GroupName}) of
        []      -> {error, not_found};
        [Group] -> Group
    end.

write_group(Group) -> mnesia:write(?GROUP_TABLE, Group, write), Group.

prune_or_create_group(Self, GroupName, TxnFun) ->
    TxnFun(
      fun () ->
              GroupNew = #gm_group { name    = GroupName,
                                     members = [Self],
                                     version = get_version(Self) },
              case read_group(GroupName) of
                  {error, not_found} ->
                      write_group(GroupNew);
                  Group = #gm_group { members = Members } ->
                      case lists:any(fun is_member_alive/1, Members) of
                          true  -> Group;
                          false -> write_group(GroupNew)
                      end
              end
      end).

record_dead_member_in_group(Self, Member, GroupName, TxnFun, Verify) ->
    Fun =
        fun () ->
                try
                    Group = #gm_group { members = Members, version = Ver } =
                        case Verify of
                            true ->
                                check_membership(Self, read_group(GroupName));
                            false ->
                                check_group(read_group(GroupName))
                        end,
                    case lists:splitwith(
                           fun (Member1) -> Member1 =/= Member end, Members) of
                        {_Members1, []} -> %% not found - already recorded dead
                            Group;
                        {Members1, [Member | Members2]} ->
                            Members3 = Members1 ++ [{dead, Member} | Members2],
                            write_group(Group #gm_group { members = Members3,
                                                          version = Ver + 1 })
                    end
                catch
                    lost_membership ->
                        %% The transaction must not crash abruptly; instead,
                        %% let the gen_server stop normally
                        {error, lost_membership}
                end
        end,
    handle_lost_membership_in_txn(TxnFun, Fun).

handle_lost_membership_in_txn(TxnFun, Fun) ->
    case TxnFun(Fun) of
        {error, lost_membership = T} ->
            throw(T);
        Any ->
            Any
    end.
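
%% The contract assumed of the transaction executor (see txn_fun()):
%% it runs the supplied fun inside an mnesia transaction and returns
%% that fun's result, so the {error, lost_membership} marker above
%% survives the transaction and can be re-raised as a throw here. A
%% plausible executor - an illustration, not necessarily what callers
%% actually pass in - would be:
%%
%%   fun (F) ->
%%       {atomic, Result} = mnesia:sync_transaction(F),
%%       Result
%%   end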

record_new_member_in_group(NewMember, Left, GroupName, TxnFun) ->
    Fun =
        fun () ->
                try
                    Group = #gm_group { members = Members, version = Ver } =
                        check_membership(Left, read_group(GroupName)),
                    case lists:member(NewMember, Members) of
                        true ->
                            %% This avoids duplicates during partial
                            %% partitions, as inconsistent views might
                            %% occur during them
                            rabbit_log:warning("(~p) GM avoiding duplicate of ~p",
                                               [self(), NewMember]),
                            Group;
                        false ->
                            {Prefix, [Left | Suffix]} =
                                lists:splitwith(fun (M) -> M =/= Left end, Members),
                            write_group(Group #gm_group {
                                          members = Prefix ++ [Left, NewMember | Suffix],
                                          version = Ver + 1 })
                    end
                catch
                    lost_membership ->
                        %% The transaction must not crash abruptly; instead,
                        %% let the gen_server stop normally
                        {error, lost_membership}
                end
        end,
    handle_lost_membership_in_txn(TxnFun, Fun).

erase_members_in_group(Self, Members, GroupName, TxnFun) ->
    DeadMembers = [{dead, Id} || Id <- Members],
    Fun =
        fun () ->
                try
                    Group = #gm_group { members = [_|_] = Members1, version = Ver } =
                        check_membership(Self, read_group(GroupName)),
                    case Members1 -- DeadMembers of
                        Members1 -> Group;
                        Members2 -> write_group(
                                      Group #gm_group { members = Members2,
                                                        version = Ver + 1 })
                    end
                catch
                    lost_membership ->
                        %% The transaction must not crash abruptly; instead,
                        %% let the gen_server stop normally
                        {error, lost_membership}
                end
        end,
    handle_lost_membership_in_txn(TxnFun, Fun).

maybe_erase_aliases(State = #state { self          = Self,
                                     group_name    = GroupName,
                                     members_state = MembersState,
                                     txn_executor  = TxnFun }, View) ->
    #view_member { aliases = Aliases } = fetch_view_member(Self, View),
    {Erasable, MembersState1}
        = ?SETS:fold(
            fun (Id, {ErasableAcc, MembersStateAcc} = Acc) ->
                    #member { last_pub = LP, last_ack = LA } =
                        find_member_or_blank(Id, MembersState),
                    case can_erase_view_member(Self, Id, LA, LP) of
                        true  -> {[Id | ErasableAcc],
                                  erase_member(Id, MembersStateAcc)};
                        false -> Acc
                    end
            end, {[], MembersState}, Aliases),
    View1 = case Erasable of
                [] -> View;
                _  -> group_to_view(
                        erase_members_in_group(Self, Erasable, GroupName, TxnFun))
            end,
    change_view(View1, State #state { members_state = MembersState1 }).

can_erase_view_member(Self, Self, _LA, _LP) -> false;
can_erase_view_member(_Self, _Id,   N,   N) -> true;
can_erase_view_member(_Self, _Id, _LA, _LP) -> false.

neighbour_cast(N, Msg) -> ?INSTR_MOD:cast(get_pid(N), Msg).
neighbour_call(N, Msg) -> ?INSTR_MOD:call(get_pid(N), Msg, infinity).
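
%% All neighbour communication is routed through ?INSTR_MOD. In the
%% uninstrumented build this is expected to resolve to this module's
%% own call/3, cast/2, monitor/1 and demonitor/1 (exported above "For
%% INSTR_MOD callbacks" and defined at the end of this file), which
%% simply delegate to gen_server2 and erlang; an instrumented module
%% could be substituted to intercept this traffic.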

%% ---------------------------------------------------------------------------
%% View monitoring and maintenance
%% ---------------------------------------------------------------------------

ensure_neighbour(_Ver, Self, {Self, undefined}, Self) ->
    {Self, undefined};
ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) ->
    ok = neighbour_cast(RealNeighbour, {?TAG, Ver, check_neighbours}),
    {RealNeighbour, maybe_monitor(RealNeighbour, Self)};
ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) ->
    {RealNeighbour, MRef};
ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
    true = ?INSTR_MOD:demonitor(MRef),
    Msg = {?TAG, Ver, check_neighbours},
    ok = neighbour_cast(RealNeighbour, Msg),
    ok = case Neighbour of
             Self -> ok;
             _    -> neighbour_cast(Neighbour, Msg)
         end,
    {Neighbour, maybe_monitor(Neighbour, Self)}.

maybe_monitor( Self,  Self) -> undefined;
maybe_monitor(Other, _Self) -> ?INSTR_MOD:monitor(get_pid(Other)).

check_neighbours(State = #state { self             = Self,
                                  left             = Left,
                                  right            = Right,
                                  view             = View,
                                  broadcast_buffer = Buffer }) ->
    #view_member { left = VLeft, right = VRight }
        = fetch_view_member(Self, View),
    Ver = view_version(View),
    Left1 = ensure_neighbour(Ver, Self, Left, VLeft),
    Right1 = ensure_neighbour(Ver, Self, Right, VRight),
    Buffer1 = case Right1 of
                  {Self, undefined} -> [];
                  _                 -> Buffer
              end,
    State1 = State #state { left = Left1, right = Right1,
                            broadcast_buffer = Buffer1 },
    ok = maybe_send_catchup(Right, State1),
    State1.

maybe_send_catchup(Right, #state { right = Right }) ->
    ok;
maybe_send_catchup(_Right, #state { self  = Self,
                                    right = {Self, undefined} }) ->
    ok;
maybe_send_catchup(_Right, #state { members_state = undefined }) ->
    ok;
maybe_send_catchup(_Right, #state { self          = Self,
                                    right         = {Right, _MRef},
                                    view          = View,
                                    members_state = MembersState }) ->
    send_right(Right, View,
               {catchup, Self, prepare_members_state(MembersState)}).


%% ---------------------------------------------------------------------------
%% Catch-up delta detection
%% ---------------------------------------------------------------------------

find_prefix_common_suffix(A, B) ->
    {Prefix, A1} = find_prefix(A, B, queue:new()),
    {Common, Suffix} = find_common(A1, B, queue:new()),
    {Prefix, Common, Suffix}.

%% Returns the elements of A that occur before the first element of B,
%% plus the remainder of A.
find_prefix(A, B, Prefix) ->
    case {queue:out(A), queue:out(B)} of
        {{{value, Val}, _A1}, {{value, Val}, _B1}} ->
            {Prefix, A};
        {{empty, A1}, {{value, _A}, _B1}} ->
            {Prefix, A1};
        {{{value, {NumA, _MsgA} = Val}, A1},
         {{value, {NumB, _MsgB}}, _B1}} when NumA < NumB ->
            find_prefix(A1, B, queue:in(Val, Prefix));
        {_, {empty, _B1}} ->
            {A, Prefix} %% Prefix will be empty here
    end.

%% A should be a prefix of B. Returns the commonality plus the
%% remainder of B.
find_common(A, B, Common) ->
    case {queue:out(A), queue:out(B)} of
        {{{value, Val}, A1}, {{value, Val}, B1}} ->
            find_common(A1, B1, queue:in(Val, Common));
        {{empty, _A}, _} ->
            {Common, B};
        %% Drop value from B.
        %% Match value to avoid infinite loop, since {empty, B} = queue:out(B).
        {_, {{value, _}, B1}} ->
            find_common(A, B1, Common);
        %% Drop value from A. Empty A should be matched by the second
        %% clause above.
        {{{value, _}, A1}, _} ->
            find_common(A1, B, Common)
    end.


%% ---------------------------------------------------------------------------
%% Members helpers
%% ---------------------------------------------------------------------------

with_member(Fun, Id, MembersState) ->
    store_member(
      Id, Fun(find_member_or_blank(Id, MembersState)), MembersState).

with_member_acc(Fun, Id, {MembersState, Acc}) ->
    {MemberState, Acc1} = Fun(find_member_or_blank(Id, MembersState), Acc),
    {store_member(Id, MemberState, MembersState), Acc1}.

find_member_or_blank(Id, MembersState) ->
    case maps:find(Id, MembersState) of
        {ok, Result} -> Result;
        error        -> blank_member()
    end.

erase_member(Id, MembersState) -> maps:remove(Id, MembersState).

blank_member() ->
    #member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }.

blank_member_state() -> maps:new().

store_member(Id, MemberState, MembersState) ->
    maps:put(Id, MemberState, MembersState).

prepare_members_state(MembersState) -> maps:to_list(MembersState).

build_members_state(MembersStateList) -> maps:from_list(MembersStateList).

make_member(GroupName) ->
    {case dirty_read_group(GroupName) of
         #gm_group { version = Version } -> Version;
         {error, not_found}              -> ?VERSION_START
     end, self()}.

remove_erased_members(MembersState, View) ->
    lists:foldl(fun (Id, MembersState1) ->
                        store_member(Id, find_member_or_blank(Id, MembersState),
                                     MembersState1)
                end, blank_member_state(), all_known_members(View)).

get_version({Version, _Pid}) -> Version.

get_pid({_Version, Pid}) -> Pid.

get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].

%% ---------------------------------------------------------------------------
%% Activity assembly
%% ---------------------------------------------------------------------------

activity_nil() -> queue:new().

activity_cons(   _Id,   [],   [], Tail) -> Tail;
activity_cons(Sender, Pubs, Acks, Tail) -> queue:in({Sender, Pubs, Acks}, Tail).

activity_finalise(Activity) -> queue:to_list(Activity).

maybe_send_activity([], _State) ->
    ok;
maybe_send_activity(Activity, #state { self  = Self,
                                       right = {Right, _MRefR},
                                       view  = View }) ->
    send_right(Right, View, {activity, Self, Activity}).

send_right(Right, View, Msg) ->
    ok = neighbour_cast(Right, {?TAG, view_version(View), Msg}).
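
%% A finalised activity, as built above, is a list of {Sender, Pubs,
%% Acks} triples, where Pubs is a list of {PubNum, Msg} pairs and Acks
%% is a list of publication numbers being retired. calculate_activity/5
%% below folds such a list received from our left neighbour into our
%% members_state: per sender, if we are the publisher (or its alias),
%% pubs arriving back at us have completed the ring and are converted
%% into acks; otherwise the pubs and acks are recorded against the
%% sender and passed on downstream.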

calculate_activity(MembersState, Confirms, Activity, Self, View) ->
    lists:foldl(
      fun ({Id, Pubs, Acks}, MembersStateConfirmsActivity) ->
              with_member_acc(
                fun (Member = #member { pending_ack = PA,
                                        last_pub    = LP,
                                        last_ack    = LA },
                     {Confirms2, Activity2}) ->
                        case is_member_alias(Id, Self, View) of
                            true ->
                                {ToAck, PA1} =
                                    find_common(queue_from_pubs(Pubs), PA,
                                                queue:new()),
                                LA1 = last_ack(Acks, LA),
                                AckNums = acks_from_queue(ToAck),
                                Confirms3 = maybe_confirm(
                                              Self, Id, Confirms2, AckNums),
                                {Member #member { pending_ack = PA1,
                                                  last_ack    = LA1 },
                                 {Confirms3,
                                  activity_cons(
                                    Id, [], AckNums, Activity2)}};
                            false ->
                                PA1 = apply_acks(Acks, join_pubs(PA, Pubs)),
                                LA1 = last_ack(Acks, LA),
                                LP1 = last_pub(Pubs, LP),
                                {Member #member { pending_ack = PA1,
                                                  last_pub    = LP1,
                                                  last_ack    = LA1 },
                                 {Confirms2,
                                  activity_cons(Id, Pubs, Acks, Activity2)}}
                        end
                end, Id, MembersStateConfirmsActivity)
      end, {MembersState, {Confirms, activity_nil()}}, Activity).

callback(Args, Module, Activity) ->
    Result =
        lists:foldl(
          fun ({Id, Pubs, _Acks}, {Args1, Module1, ok}) ->
                  lists:foldl(fun ({_PubNum, Pub}, Acc = {Args2, Module2, ok}) ->
                                      case Module2:handle_msg(
                                             Args2, get_pid(Id), Pub) of
                                          ok ->
                                              Acc;
                                          {become, Module3, Args3} ->
                                              {Args3, Module3, ok};
                                          {stop, _Reason} = Error ->
                                              Error
                                      end;
                                  (_, Error = {stop, _Reason}) ->
                                      Error
                              end, {Args1, Module1, ok}, Pubs);
              (_, Error = {stop, _Reason}) ->
                  Error
          end, {Args, Module, ok}, Activity),
    case Result of
        {Args, Module, ok}      -> ok;
        {Args1, Module1, ok}    -> {become, Module1, Args1};
        {stop, _Reason} = Error -> Error
    end.

change_view(View, State = #state { view          = View0,
                                   module        = Module,
                                   callback_args = Args }) ->
    OldMembers = all_known_members(View0),
    NewMembers = all_known_members(View),
    Births = NewMembers -- OldMembers,
    Deaths = OldMembers -- NewMembers,
    Result = case {Births, Deaths} of
                 {[], []} -> ok;
                 _        -> Module:members_changed(
                               Args, get_pids(Births), get_pids(Deaths))
             end,
    {Result, check_neighbours(State #state { view = View })}.

handle_callback_result({Result, State}) ->
    if_callback_success(
      Result, fun no_reply_true/3, fun no_reply_false/3, undefined, State);
handle_callback_result({Result, Reply, State}) ->
    if_callback_success(
      Result, fun reply_true/3, fun reply_false/3, Reply, State).

no_reply_true (_Result,        _Undefined, State) -> noreply(State).
no_reply_false({stop, Reason}, _Undefined, State) -> {stop, Reason, State}.

reply_true (_Result,        Reply, State) -> reply(Reply, State).
reply_false({stop, Reason}, Reply, State) -> {stop, Reason, Reply, State}.

handle_msg_true (_Result, Msg, State) -> handle_msg(Msg, State).
handle_msg_false(Result, _Msg, State) -> {Result, State}.

activity_true(_Result, Activity, State = #state { module        = Module,
                                                  callback_args = Args }) ->
    {callback(Args, Module, Activity), State}.
activity_false(Result, _Activity, State) ->
    {Result, State}.

if_callback_success(Result, True, False, Arg, State) ->
    {NewResult, NewState} = maybe_stop(Result, State),
    if_callback_success1(NewResult, True, False, Arg, NewState).

if_callback_success1(ok, True, _False, Arg, State) ->
    True(ok, Arg, State);
if_callback_success1(
  {become, Module, Args} = Result, True, _False, Arg, State) ->
    True(Result, Arg, State #state { module        = Module,
                                     callback_args = Args });
if_callback_success1({stop, _Reason} = Result, _True, False, Arg, State) ->
    False(Result, Arg, State).

maybe_stop({stop, Reason}, #state{ shutting_down = false } = State) ->
    ShuttingDown = {true, Reason},
    case has_pending_messages(State) of
        true  -> {ok, State #state{ shutting_down = ShuttingDown }};
        false -> {{stop, Reason}, State #state{ shutting_down = ShuttingDown }}
    end;
maybe_stop(Result, #state{ shutting_down = false } = State) ->
    {Result, State};
maybe_stop(Result, #state{ shutting_down = {true, Reason} } = State) ->
    case has_pending_messages(State) of
        true  -> {Result, State};
        false -> {{stop, Reason}, State}
    end.

has_pending_messages(#state{ broadcast_buffer = Buffer })
  when Buffer =/= [] ->
    true;
has_pending_messages(#state{ members_state = MembersState }) ->
    MembersWithPubAckMismatches =
        maps:filter(fun (_Id, #member{last_pub = LP, last_ack = LA}) ->
                            LP =/= LA
                    end, MembersState),
    0 =/= maps:size(MembersWithPubAckMismatches).

maybe_confirm(_Self, _Id, Confirms, []) ->
    Confirms;
maybe_confirm(Self, Self, Confirms, [PubNum | PubNums]) ->
    case queue:out(Confirms) of
        {empty, _Confirms} ->
            Confirms;
        {{value, {PubNum, From}}, Confirms1} ->
            gen_server2:reply(From, ok),
            maybe_confirm(Self, Self, Confirms1, PubNums);
        {{value, {PubNum1, _From}}, _Confirms} when PubNum1 > PubNum ->
            maybe_confirm(Self, Self, Confirms, PubNums)
    end;
maybe_confirm(_Self, _Id, Confirms, _PubNums) ->
    Confirms.

purge_confirms(Confirms) ->
    _ = [gen_server2:reply(From, ok) || {_PubNum, From} <- queue:to_list(Confirms)],
    queue:new().


%% ---------------------------------------------------------------------------
%% Msg transformation
%% ---------------------------------------------------------------------------

acks_from_queue(Q) -> [PubNum || {PubNum, _Msg} <- queue:to_list(Q)].

pubs_from_queue(Q) -> queue:to_list(Q).

queue_from_pubs(Pubs) -> queue:from_list(Pubs).

apply_acks(  [], Pubs) -> Pubs;
apply_acks(List, Pubs) -> {_, Pubs1} = queue:split(length(List), Pubs),
                          Pubs1.

join_pubs(Q, [])   -> Q;
join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)).

last_ack(  [], LA) -> LA;
last_ack(List, LA) -> LA1 = lists:last(List),
                      true = LA1 > LA, %% ASSERTION
                      LA1.

last_pub(  [], LP) -> LP;
last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List),
                      true = PubNum > LP, %% ASSERTION
                      PubNum.

%% ---------------------------------------------------------------------------

%% Uninstrumented versions

call(Pid, Msg, Timeout) -> gen_server2:call(Pid, Msg, Timeout).
cast(Pid, Msg)          -> gen_server2:cast(Pid, Msg).
monitor(Pid)            -> erlang:monitor(process, Pid).
demonitor(MRef)         -> erlang:demonitor(MRef).

check_membership(Self, #gm_group{members = M} = Group) ->
    case lists:member(Self, M) of
        true ->
            Group;
        false ->
            throw(lost_membership)
    end;
check_membership(_Self, {error, not_found}) ->
    throw(lost_membership).

check_membership(GroupName) ->
    case dirty_read_group(GroupName) of
        #gm_group{members = M} ->
            case lists:keymember(self(), 2, M) of
                true ->
                    ok;
                false ->
                    throw(lost_membership)
            end;
        {error, not_found} ->
            throw(lost_membership)
    end.

check_group({error, not_found}) ->
    throw(lost_membership);
check_group(Any) ->
    Any.