1%% This Source Code Form is subject to the terms of the Mozilla Public 2%% License, v. 2.0. If a copy of the MPL was not distributed with this 3%% file, You can obtain one at https://mozilla.org/MPL/2.0/. 4%% 5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved. 6%% 7 8-module(rabbit_stream_coordinator). 9 10-behaviour(ra_machine). 11 12-export([format_ra_event/2]). 13 14-export([init/1, 15 apply/3, 16 state_enter/2, 17 init_aux/1, 18 handle_aux/6, 19 tick/2]). 20 21-export([recover/0, 22 add_replica/2, 23 delete_replica/2, 24 register_listener/1]). 25 26-export([new_stream/2, 27 delete_stream/2]). 28 29-export([policy_changed/1]). 30 31-export([local_pid/1, 32 members/1]). 33-export([query_local_pid/3, 34 query_members/2]). 35 36 37-export([log_overview/1]). 38-export([replay/1]). 39 40-rabbit_boot_step({?MODULE, 41 [{description, "Restart stream coordinator"}, 42 {mfa, {?MODULE, recover, []}}, 43 {requires, core_initialized}, 44 {enables, recovery}]}). 45 46%% exported for unit tests only 47-ifdef(TEST). 48-export([update_stream/3, 49 evaluate_stream/3]). 50-endif. 51 52-include("rabbit_stream_coordinator.hrl"). 53-include("amqqueue.hrl"). 54 55-define(REPLICA_FRESHNESS_LIMIT_MS, 10 * 1000). %% 10s 56 57-type state() :: #?MODULE{}. 58-type args() :: #{index := ra:index(), 59 node := node(), 60 epoch := osiris:epoch()}. 
%% The full set of commands accepted by this Ra state machine. Most are
%% keyed by stream_id() with a trailing map of command-specific arguments;
%% ra_machine:effect() is included as commands may be issued as effects.
-type command() :: {new_stream, stream_id(), #{leader_node := node(),
                                               queue := amqqueue:amqqueue()}} |
                   {delete_stream, stream_id(), #{}} |
                   {add_replica, stream_id(), #{node := node()}} |
                   {delete_replica, stream_id(), #{node := node()}} |
                   {policy_changed, stream_id(), #{queue := amqqueue:amqqueue()}} |
                   {register_listener, #{pid := pid(),
                                         stream_id := stream_id(),
                                         queue_ref := queue_ref()}} |
                   {action_failed, stream_id(), #{index := ra:index(),
                                                  node := node(),
                                                  epoch := osiris:epoch(),
                                                  action := atom(), %% TODO: refine
                                                  term() => term()}} |
                   {member_started, stream_id(), #{index := ra:index(),
                                                   node := node(),
                                                   epoch := osiris:epoch(),
                                                   pid := pid()}} |
                   {member_stopped, stream_id(), args()} |
                   {retention_updated, stream_id(), args()} |
                   {mnesia_updated, stream_id(), args()} |
                   ra_machine:effect().

-export_type([command/0]).

%% Boot step (see -rabbit_boot_step above): restart this node's coordinator
%% Ra server if one was previously started. On first boot there is nothing
%% to restart, which is fine — the cluster is formed on first declare.
recover() ->
    case erlang:whereis(?MODULE) of
        undefined ->
            case ra:restart_server(?RA_SYSTEM, {?MODULE, node()}) of
                {error, Reason} when Reason == not_started;
                                     Reason == name_not_registered ->
                    %% First boot, do nothing and wait until the first `declare`
                    ok;
                _ ->
                    ok
            end;
        _ ->
            ok
    end.

%% new api

%% Declare a new stream with the given leader node. The stream id and
%% member nodes come from the queue record's type state. Crashes
%% (assertion) if LeaderNode is not one of the configured nodes.
new_stream(Q, LeaderNode)
  when ?is_amqqueue(Q) andalso is_atom(LeaderNode) ->
    #{name := StreamId,
      nodes := Nodes} = amqqueue:get_type_state(Q),
    %% assertion leader is in nodes configuration
    true = lists:member(LeaderNode, Nodes),
    process_command({new_stream, StreamId,
                     #{leader_node => LeaderNode,
                       queue => Q}}).

%% Delete a stream: ask the coordinator to tear down all members, then
%% delete the amqqueue record. Returns {ok, {ok, 0}} on success (0 being
%% the purged message count reported to the caller).
delete_stream(Q, ActingUser)
  when ?is_amqqueue(Q) ->
    #{name := StreamId} = amqqueue:get_type_state(Q),
    case process_command({delete_stream, StreamId, #{}}) of
        {ok, ok, _} ->
            QName = amqqueue:get_name(Q),
            _ = rabbit_amqqueue:internal_delete(QName, ActingUser),
            {ok, {ok, 0}};
        Err ->
            Err
    end.
-spec add_replica(amqqueue:amqqueue(), node()) ->
    ok | {error, term()}.
%% Add a replica on Node, refusing when existing replicas are out of sync:
%% if the spread between the freshest and stalest replica timestamps
%% exceeds ?REPLICA_FRESHNESS_LIMIT_MS the request is disallowed.
add_replica(Q, Node) when ?is_amqqueue(Q) ->
    %% performing safety check
    %% if any replica is stale then we should not allow
    %% further replicas to be added
    Pid = amqqueue:get_pid(Q),
    try
        ReplState0 = osiris_writer:query_replication_state(Pid),
        %% take the writer's own entry out; its timestamp seeds the fold
        {{_, InitTs}, ReplState} = maps:take(node(Pid), ReplState0),
        {MaxTs, MinTs} = maps:fold(fun (_, {_, Ts}, {Max, Min}) ->
                                           {max(Ts, Max), min(Ts, Min)}
                                   end, {InitTs, InitTs}, ReplState),
        case (MaxTs - MinTs) > ?REPLICA_FRESHNESS_LIMIT_MS of
            true ->
                {error, {disallowed, out_of_sync_replica}};
            false ->
                Name = rabbit_misc:rs(amqqueue:get_name(Q)),
                rabbit_log:info("~s : adding replica ~s to ~s Replication State: ~w",
                                [?MODULE, Node, Name, ReplState0]),
                StreamId = maps:get(name, amqqueue:get_type_state(Q)),
                case process_command({add_replica, StreamId, #{node => Node}}) of
                    {ok, Result, _} ->
                        Result;
                    Err ->
                        Err
                end
        end
    catch
        %% deliberate catch-all: any failure querying the writer (e.g. it
        %% is down or unreachable) is surfaced as an error to the caller
        _:Error ->
            {error, Error}
    end.

%% Remove the member on Node from the stream's configuration.
delete_replica(StreamId, Node) ->
    process_command({delete_replica, StreamId, #{node => Node}}).

%% Forward a policy change so the stream's effective configuration can be
%% re-evaluated by the state machine.
policy_changed(Q) when ?is_amqqueue(Q) ->
    StreamId = maps:get(name, amqqueue:get_type_state(Q)),
    process_command({policy_changed, StreamId, #{queue => Q}}).

%% Look up the pid of the stream member running on the local node.
%% Tries a cheap local query first; if the returned pid is dead the local
%% ra replica may be lagging, so fall back to a linearizable consistent
%% query before giving up.
local_pid(StreamId) when is_list(StreamId) ->
    MFA = {?MODULE, query_local_pid, [StreamId, node()]},
    case ra:local_query({?MODULE, node()}, MFA) of
        {ok, {_, {ok, Pid}}, _} ->
            case is_process_alive(Pid) of
                true ->
                    {ok, Pid};
                false ->
                    case ra:consistent_query({?MODULE, node()}, MFA) of
                        {ok, Result, _} ->
                            Result;
                        {error, _} = Err ->
                            Err;
                        {timeout, _} ->
                            {error, timeout}
                    end
            end;
        {ok, {_, Result}, _} ->
            Result;
        {error, _} = Err ->
            Err;
        {timeout, _} ->
            {error, timeout}
    end.
-spec members(stream_id()) ->
    {ok, #{node() := {pid() | undefined, writer | replica}}} |
    {error, not_found}.
%% Return the member pids and roles for a stream, keyed by node.
%% A fast local query is attempted first; a not_found answer may simply
%% mean the local ra replica is lagging, so in that case we retry with a
%% linearizable consistent query before reporting not_found.
members(StreamId) when is_list(StreamId) ->
    QueryFun = {?MODULE, query_members, [StreamId]},
    Server = {?MODULE, node()},
    case ra:local_query(Server, QueryFun) of
        {ok, {_, {ok, _} = Reply}, _} ->
            Reply;
        {ok, {_, {error, not_found}}, _} ->
            %% fall back to consistent query
            consistent_members_query(Server, QueryFun);
        {ok, {_, Reply}, _} ->
            Reply;
        {error, _} = Err ->
            Err;
        {timeout, _} ->
            {error, timeout}
    end.

%% Linearizable fallback used when the local replica may be stale.
consistent_members_query(Server, QueryFun) ->
    case ra:consistent_query(Server, QueryFun) of
        {ok, Reply, _} ->
            Reply;
        {error, _} = Err ->
            Err;
        {timeout, _} ->
            {error, timeout}
    end.

%% ra query function: members of StreamId as #{Node => {Pid | undefined, Role}}.
%% A member that is not in the running state reports an undefined pid.
query_members(StreamId, #?MODULE{streams = Streams}) ->
    case maps:find(StreamId, Streams) of
        {ok, #stream{members = Members}} ->
            Info = fun (#member{state = {running, _, Pid},
                                role = {Role, _}}) ->
                           {Pid, Role};
                       (#member{role = {Role, _}}) ->
                           {undefined, Role}
                   end,
            {ok, maps:map(fun (_Node, Member) -> Info(Member) end, Members)};
        error ->
            {error, not_found}
    end.

%% ra query function: pid of the member running on Node, if any.
query_local_pid(StreamId, Node, #?MODULE{streams = Streams}) ->
    case maps:find(StreamId, Streams) of
        {ok, #stream{members = #{Node := #member{state = {running, _, Pid}}}}} ->
            {ok, Pid};
        _ ->
            {error, not_found}
    end.

-spec register_listener(amqqueue:amqqueue()) ->
    {error, term()} | {ok, ok, atom() | {atom(), atom()}}.
%% Register the calling process to be notified of member changes for the
%% stream backing Q.
register_listener(Q) when ?is_amqqueue(Q)->
    #{name := StreamId} = amqqueue:get_type_state(Q),
    Cmd = {register_listener, #{pid => self(),
                                stream_id => StreamId}},
    process_command(Cmd).

%% Resolve the current coordinator members, then submit the command.
process_command(Cmd) ->
    process_command(ensure_coordinator_started(), Cmd).
%% Try each coordinator member in turn until one accepts the command or
%% the list is exhausted. Timeouts and noproc fall through to the next
%% server in the list.
process_command([], _Cmd) ->
    {error, coordinator_unavailable};
process_command([Server | Servers], Cmd) ->
    case ra:process_command(Server, Cmd, ?CMD_TIMEOUT) of
        {timeout, _} ->
            rabbit_log:warning("Coordinator timeout on server ~w when processing command ~W",
                               [element(2, Server), element(1, Cmd), 10]),
            process_command(Servers, Cmd);
        {error, noproc} ->
            process_command(Servers, Cmd);
        Reply ->
            Reply
    end.

%% Ensure the coordinator Ra cluster is running, (re)starting the local
%% server if needed, and return the server ids to try for commands.
%% Cluster formation is guarded by a global lock so concurrent declares on
%% different nodes do not race to form the cluster twice.
ensure_coordinator_started() ->
    Local = {?MODULE, node()},
    AllNodes = all_coord_members(),
    case whereis(?MODULE) of
        undefined ->
            global:set_lock(?STREAM_COORDINATOR_STARTUP),
            Nodes = case ra:restart_server(?RA_SYSTEM, Local) of
                        {error, Reason} when Reason == not_started orelse
                                             Reason == name_not_registered ->
                            %% nothing to restart locally: either another
                            %% node already runs the cluster, or we must
                            %% form a brand new one
                            OtherNodes = all_coord_members() -- [Local],
                            %% We can't use find_members/0 here as a process that timeouts means the cluster is up
                            case lists:filter(fun(N) -> global:whereis_name(N) =/= undefined end, OtherNodes) of
                                [] ->
                                    start_coordinator_cluster();
                                _ ->
                                    OtherNodes
                            end;
                        ok ->
                            AllNodes;
                        {error, {already_started, _}} ->
                            AllNodes;
                        _ ->
                            AllNodes
                    end,
            global:del_lock(?STREAM_COORDINATOR_STARTUP),
            Nodes;
        _ ->
            AllNodes
    end.

%% Form a brand new coordinator Ra cluster across all currently running
%% rabbit nodes. Returns the started server ids, or [] if the cluster
%% could not be formed.
start_coordinator_cluster() ->
    Nodes = rabbit_mnesia:cluster_nodes(running),
    rabbit_log:debug("Starting stream coordinator on nodes: ~w", [Nodes]),
    case ra:start_cluster(?RA_SYSTEM, [make_ra_conf(Node, Nodes) || Node <- Nodes]) of
        {ok, Started, _} ->
            rabbit_log:debug("Started stream coordinator on ~w", [Started]),
            Started;
        {error, cluster_not_formed} ->
            rabbit_log:warning("Stream coordinator could not be started on nodes ~w",
                               [Nodes]),
            []
    end.

%% Server ids for all running cluster nodes, with the local node first.
all_coord_members() ->
    Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
    [{?MODULE, Node} || Node <- [node() | Nodes]].

%% ra_machine callback: initial, empty machine state.
init(_Conf) ->
    #?MODULE{}.
-spec apply(map(), command(), state()) ->
    {state(), term(), ra_machine:effects()}.
%% ra_machine callback: apply a command to the machine state.
%%
%% First clause handles all stream-scoped commands of shape
%% {Tag, StreamId, ArgsMap}: the stream record is updated, then evaluated
%% to produce the follow-up effects needed to converge on its target state.
apply(#{index := _Idx} = Meta0, {_CmdTag, StreamId, #{}} = Cmd,
      #?MODULE{streams = Streams0,
               monitors = Monitors0} = State0) ->
    Stream0 = maps:get(StreamId, Streams0, undefined),
    %% term and machine_version are not needed downstream; keep Meta small
    Meta = maps:without([term, machine_version], Meta0),
    case filter_command(Meta, Cmd, Stream0) of
        ok ->
            Stream1 = update_stream(Meta, Cmd, Stream0),
            Reply = case Stream1 of
                        #stream{reply_to = undefined} ->
                            ok;
                        _ ->
                            %% reply_to is set so we'll reply later
                            '$ra_no_reply'
                    end,
            case Stream1 of
                undefined ->
                    %% the stream is gone (fully deleted); drop it
                    return(Meta, State0#?MODULE{streams = maps:remove(StreamId, Streams0)},
                           Reply, []);
                _ ->
                    {Stream2, Effects0} = evaluate_stream(Meta, Stream1, []),
                    {Stream3, Effects1} = eval_listeners(Stream2, Effects0),
                    {Stream, Effects2} = eval_retention(Meta, Stream3, Effects1),
                    {Monitors, Effects} = ensure_monitors(Stream, Monitors0, Effects2),
                    return(Meta,
                           State0#?MODULE{streams = Streams0#{StreamId => Stream},
                                          monitors = Monitors}, Reply, Effects)
            end;
        Reply ->
            %% command rejected by filter_command/3
            return(Meta, State0, Reply, [])
    end;
%% A monitored process (stream member or listener) went down.
apply(Meta, {down, Pid, Reason} = Cmd,
      #?MODULE{streams = Streams0,
               listeners = Listeners0,
               monitors = Monitors0} = State) ->

    %% on noconnection, monitor the node so we learn when it comes back
    Effects0 = case Reason of
                   noconnection ->
                       [{monitor, node, node(Pid)}];
                   _ ->
                       []
               end,
    case maps:take(Pid, Monitors0) of
        {{StreamId, listener}, Monitors} ->
            %% prune the dead listener pid from the stream's listener set
            Listeners = case maps:take(StreamId, Listeners0) of
                            error ->
                                Listeners0;
                            {Pids0, Listeners1} ->
                                case maps:remove(Pid, Pids0) of
                                    Pids when map_size(Pids) == 0 ->
                                        Listeners1;
                                    Pids ->
                                        Listeners1#{StreamId => Pids}
                                end
                        end,
            return(Meta, State#?MODULE{listeners = Listeners,
                                       monitors = Monitors}, ok, Effects0);
        {{StreamId, member}, Monitors1} ->
            case Streams0 of
                #{StreamId := Stream0} ->
                    Stream1 = update_stream(Meta, Cmd, Stream0),
                    {Stream, Effects} = evaluate_stream(Meta, Stream1, Effects0),
                    Streams = Streams0#{StreamId => Stream},
                    return(Meta, State#?MODULE{streams = Streams,
                                               monitors = Monitors1}, ok,
                           Effects);
                _ ->
                    %% stream not found, can happen if "late" downs are
                    %% received
                    return(Meta, State#?MODULE{streams = Streams0,
                                               monitors = Monitors1}, ok, Effects0)
            end;
        error ->
            return(Meta, State, ok, Effects0)
    end;
%% A queue process asks to be notified of member (e.g. leader) changes.
apply(Meta, {register_listener, #{pid := Pid,
                                  stream_id := StreamId}},
      #?MODULE{streams = Streams,
               monitors = Monitors0} = State0) ->
    case Streams of
        #{StreamId := #stream{listeners = Listeners0} = Stream0} ->
            Stream1 = Stream0#stream{listeners = maps:put(Pid, undefined, Listeners0)},
            {Stream, Effects} = eval_listeners(Stream1, []),
            Monitors = maps:put(Pid, {StreamId, listener}, Monitors0),
            return(Meta,
                   State0#?MODULE{streams = maps:put(StreamId, Stream, Streams),
                                  monitors = Monitors}, ok,
                   [{monitor, process, Pid} | Effects]);
        _ ->
            return(Meta, State0, stream_not_found, [])
    end;
%% A previously disconnected node is back: re-monitor its members and
%% re-evaluate every stream.
apply(Meta, {nodeup, Node} = Cmd,
      #?MODULE{monitors = Monitors0,
               streams = Streams0} = State) ->
    %% reissue monitors for all disconnected members
    {Effects0, Monitors} =
        maps:fold(
          fun(_, #stream{id = Id,
                         members = M}, {Acc, Mon}) ->
                  case M of
                      #{Node := #member{state = {disconnected, _, P}}} ->
                          {[{monitor, process, P} | Acc],
                           Mon#{P => {Id, member}}};
                      _ ->
                          {Acc, Mon}
                  end
          end, {[], Monitors0}, Streams0),
    {Streams, Effects} =
        maps:fold(fun (Id, S0, {Ss, E0}) ->
                          S1 = update_stream(Meta, Cmd, S0),
                          {S, E} = evaluate_stream(Meta, S1, E0),
                          {Ss#{Id => S}, E}
                  end, {Streams0, Effects0}, Streams0),
    return(Meta, State#?MODULE{monitors = Monitors,
                               streams = Streams}, ok, Effects);
%% Catch-all: log and reject anything unexpected.
apply(Meta, UnkCmd, State) ->
    rabbit_log:debug("~s: unknown command ~W",
                     [?MODULE, UnkCmd, 10]),
    return(Meta, State, {error, unknown_command}, []).

%% Wrap a command result, periodically (every 4096 applied indexes)
%% emitting a release_cursor effect so ra can truncate its log.
return(#{index := Idx}, State, Reply, Effects) ->
    case Idx rem 4096 == 0 of
        true ->
            %% add release cursor effect
            {State, Reply, [{release_cursor, Idx, State} | Effects]};
        false ->
            {State, Reply, Effects}
    end.

%% ra_machine callback: effects emitted on role/state transitions.
%% On becoming leader, (re)monitor every known node and process and ask
%% the aux machine to fail actions whose workers are gone.
state_enter(recover, _) ->
    put('$rabbit_vm_category', ?MODULE),
    [];
state_enter(leader, #?MODULE{streams = Streams,
                             monitors = Monitors}) ->
    Pids = maps:keys(Monitors),
    %% monitor all the known nodes
    Nodes = all_member_nodes(Streams),
    NodeMons = [{monitor, node, N} || N <- Nodes],
    NodeMons ++ [{aux, fail_active_actions} |
                 [{monitor, process, P} || P <- Pids]];
state_enter(_S, _) ->
    [].

%% All nodes hosting at least one member of any stream (members maps are
%% keyed by node, so merging them collects the node set).
all_member_nodes(Streams) ->
    maps:keys(
      maps:fold(
        fun (_, #stream{members = M}, Acc) ->
                maps:merge(Acc, M)
        end, #{}, Streams)).

%% ra_machine callback: periodic tick; delegates cluster-size checks to aux.
tick(_Ts, _State) ->
    [{aux, maybe_resize_coordinator_cluster}].

%% Spawned from the aux handler: grow/shrink the coordinator Ra cluster to
%% match the current set of rabbit nodes. Returns the worker pid so the
%% aux machine can track completion via a down notification.
maybe_resize_coordinator_cluster() ->
    spawn(fun() ->
                  case ra:members({?MODULE, node()}) of
                      {_, Members, _} ->
                          MemberNodes = [Node || {_, Node} <- Members],
                          Running = rabbit_mnesia:cluster_nodes(running),
                          All = rabbit_nodes:all(),
                          case Running -- MemberNodes of
                              [] ->
                                  ok;
                              New ->
                                  rabbit_log:info("~s: New rabbit node(s) detected, "
                                                  "adding : ~w",
                                                  [?MODULE, New]),
                                  add_members(Members, New)
                          end,
                          case MemberNodes -- All of
                              [] ->
                                  ok;
                              Old ->
                                  rabbit_log:info("~s: Rabbit node(s) removed from the cluster, "
                                                  "deleting: ~w", [?MODULE, Old]),
                                  remove_members(Members, Old)
                          end;
                      _ ->
                          ok
                  end
          end).
%% Add each listed node to the coordinator Ra cluster, starting the server
%% on the node first. Failures are logged and skipped so the remaining
%% nodes are still attempted.
add_members(_, []) ->
    ok;
add_members(Members, [Node | Rest]) ->
    MemberNodes = [N || {_, N} <- Members],
    Conf = make_ra_conf(Node, MemberNodes),
    Next =
        case ra:start_server(?RA_SYSTEM, Conf) of
            ok ->
                case ra:add_member(Members, {?MODULE, Node}) of
                    {ok, NewMembers, _} ->
                        %% carry the refreshed membership forward
                        NewMembers;
                    _ ->
                        Members
                end;
            Error ->
                rabbit_log:warning("Stream coordinator failed to start on node ~s : ~W",
                                   [Node, Error, 10]),
                Members
        end,
    add_members(Next, Rest).

%% Remove each listed node from the coordinator Ra cluster; a failed
%% removal keeps the previous membership and continues with the rest.
remove_members(_, []) ->
    ok;
remove_members(Members, [Node | Rest]) ->
    Next = case ra:remove_member(Members, {?MODULE, Node}) of
               {ok, NewMembers, _} ->
                   NewMembers;
               _ ->
                   Members
           end,
    remove_members(Next, Rest).

%% Aux (non-replicated) state: in-flight action workers keyed by pid, plus
%% the pid of a cluster-resize worker when one is running.
%% NOTE(review): run_action/6 stores {StreamId, Action, Args} 3-tuples in
%% 'actions'; the declared value type looks narrower — confirm.
-record(aux, {actions = #{} ::
              #{pid() := {stream_id(), #{node := node(),
                                         index := non_neg_integer(),
                                         epoch := osiris:epoch()}}},
              resizer :: undefined | pid()}).

%% ra_machine aux callback: fresh, empty aux state.
init_aux(_Name) ->
    #aux{}.

%% TODO ensure the dead writer is restarted as a replica at some point in time, increasing timeout?
%% Aux handler: runs the non-deterministic side effects (starting/stopping
%% osiris processes, mnesia updates, cluster resizing) outside the state
%% machine proper. Action workers are spawned by run_action/6 and tracked
%% in #aux.actions keyed by worker pid; their exits arrive as {down, ...}.
handle_aux(leader, _, maybe_resize_coordinator_cluster,
           #aux{resizer = undefined} = Aux, LogState, _) ->
    Pid = maybe_resize_coordinator_cluster(),
    {no_reply, Aux#aux{resizer = Pid}, LogState, [{monitor, process, aux, Pid}]};
handle_aux(leader, _, maybe_resize_coordinator_cluster,
           AuxState, LogState, _) ->
    %% Coordinator resizing is still happening, let's ignore this tick event
    {no_reply, AuxState, LogState};
handle_aux(leader, _, {down, Pid, _},
           #aux{resizer = Pid} = Aux, LogState, _) ->
    %% Coordinator resizing has finished
    {no_reply, Aux#aux{resizer = undefined}, LogState};
handle_aux(leader, _, {start_writer, StreamId,
                       #{epoch := Epoch, node := Node} = Args, Conf},
           Aux, LogState, _) ->
    rabbit_log:debug("~s: running action: 'start_writer'"
                     " for ~s on node ~w in epoch ~b",
                     [?MODULE, StreamId, Node, Epoch]),
    ActionFun = phase_start_writer(StreamId, Args, Conf),
    run_action(starting, StreamId, Args, ActionFun, Aux, LogState);
handle_aux(leader, _, {start_replica, StreamId,
                       #{epoch := Epoch, node := Node} = Args, Conf},
           Aux, LogState, _) ->
    rabbit_log:debug("~s: running action: 'start_replica'"
                     " for ~s on node ~w in epoch ~b",
                     [?MODULE, StreamId, Node, Epoch]),
    ActionFun = phase_start_replica(StreamId, Args, Conf),
    run_action(starting, StreamId, Args, ActionFun, Aux, LogState);
handle_aux(leader, _, {stop, StreamId, #{node := Node,
                                         epoch := Epoch} = Args, Conf},
           Aux, LogState, _) ->
    rabbit_log:debug("~s: running action: 'stop'"
                     " for ~s on node ~w in epoch ~b",
                     [?MODULE, StreamId, Node, Epoch]),
    ActionFun = phase_stop_member(StreamId, Args, Conf),
    run_action(stopping, StreamId, Args, ActionFun, Aux, LogState);
handle_aux(leader, _, {update_mnesia, StreamId, Args, Conf},
           #aux{actions = _Monitors} = Aux, LogState,
           #?MODULE{streams = _Streams}) ->
    rabbit_log:debug("~s: running action: 'update_mnesia'"
                     " for ~s", [?MODULE, StreamId]),
    ActionFun = phase_update_mnesia(StreamId, Args, Conf),
    run_action(updating_mnesia, StreamId, Args, ActionFun, Aux, LogState);
handle_aux(leader, _, {update_retention, StreamId, Args, _Conf},
           #aux{actions = _Monitors} = Aux, LogState,
           #?MODULE{streams = _Streams}) ->
    rabbit_log:debug("~s: running action: 'update_retention'"
                     " for ~s", [?MODULE, StreamId]),
    ActionFun = phase_update_retention(StreamId, Args),
    run_action(update_retention, StreamId, Args, ActionFun, Aux, LogState);
handle_aux(leader, _, {delete_member, StreamId, #{node := Node} = Args, Conf},
           #aux{actions = _Monitors} = Aux, LogState,
           #?MODULE{streams = _Streams}) ->
    rabbit_log:debug("~s: running action: 'delete_member'"
                     " for ~s ~s", [?MODULE, StreamId, Node]),
    ActionFun = phase_delete_member(StreamId, Args, Conf),
    run_action(delete_member, StreamId, Args, ActionFun, Aux, LogState);
handle_aux(leader, _, fail_active_actions,
           #aux{actions = Monitors} = Aux, LogState,
           #?MODULE{streams = Streams}) ->
    %% Issued on leader election: members stuck with a 'current' action
    %% whose worker process is gone need an action_failed so they can
    %% retry. Streams whose action worker is still alive on this node are
    %% excluded from failing.
    %%
    %% Fix: the filter previously read `not is_process_alive(P)', which
    %% excluded exactly the dead workers and failed the streams with live
    %% ones -- the inverse of the intent.
    Exclude = maps:from_list([{S, ok}
                              || {P, {S, _, _}} <- maps:to_list(Monitors),
                                 is_process_alive(P)]),
    rabbit_log:debug("~s: failing actions: ~w", [?MODULE, Exclude]),
    fail_active_actions(Streams, Exclude),
    {no_reply, Aux, LogState, []};
handle_aux(leader, _, {down, Pid, normal},
           #aux{actions = Monitors} = Aux, LogState, _) ->
    %% action process finished normally, just remove from actions map
    {no_reply, Aux#aux{actions = maps:remove(Pid, Monitors)}, LogState, []};
handle_aux(leader, _, {down, Pid, Reason},
           #aux{actions = Monitors0} = Aux, LogState, _) ->
    %% An action has failed - report back to the state machine
    case maps:get(Pid, Monitors0, undefined) of
        {StreamId, Action, #{node := Node, epoch := Epoch} = Args} ->
            rabbit_log:warning("~s: error while executing action for stream queue ~s, "
                               " node ~s, epoch ~b Err: ~w",
                               [?MODULE, StreamId, Node, Epoch, Reason]),
            Cmd = {action_failed, StreamId, Args#{action => Action}},
            send_self_command(Cmd),
            %% the pid used to be removed from the map twice; once suffices
            {no_reply, Aux#aux{actions = maps:remove(Pid, Monitors0)},
             LogState, []};
        undefined ->
            %% should this ever happen?
            {no_reply, Aux, LogState, []}
    end;
handle_aux(_, _, _, AuxState, LogState, _) ->
    {no_reply, AuxState, LogState}.

%% Spawn a worker to execute ActionFun and record it in the actions map so
%% its exit can be correlated back to {StreamId, Action, Args}.
run_action(Action, StreamId, #{node := _Node,
                               epoch := _Epoch} = Args,
           ActionFun, #aux{actions = Actions0} = Aux, Log) ->
    Coordinator = self(),
    Pid = spawn_link(fun() ->
                             ActionFun(),
                             %% success: detach so a later coordinator exit
                             %% does not take the (finished) worker down
                             unlink(Coordinator)
                     end),
    Effects = [{monitor, process, aux, Pid}],
    Actions = Actions0#{Pid => {StreamId, Action, Args}},
    {no_reply, Aux#aux{actions = Actions}, Log, Effects}.

%% Effect helper: reply to a caller with a tagged wrapper.
wrap_reply(From, Reply) ->
    [{reply, From, {wrap_reply, Reply}}].

%% Returns a fun (run via run_action/6) that starts a replica member on
%% Node and reports member_started / action_failed back to the machine.
phase_start_replica(StreamId, #{epoch := Epoch,
                                node := Node} = Args, Conf0) ->
    fun() ->
            try osiris_replica:start(Node, Conf0) of
                {ok, Pid} ->
                    rabbit_log:info("~s: ~s: replica started on ~s in ~b pid ~w",
                                    [?MODULE, StreamId, Node, Epoch, Pid]),
                    send_self_command({member_started, StreamId,
                                       Args#{pid => Pid}});
                {error, already_present} ->
                    %% need to remove child record if this is the case
                    %% can it ever happen?
                    _ = osiris_replica:stop(Node, Conf0),
                    send_action_failed(StreamId, starting, Args);
                {error, {already_started, Pid}} ->
                    %% TODO: we need to check that the current epoch is the same
                    %% before we can be 100% sure it is started in the correct
                    %% epoch, can this happen? who knows...
                    send_self_command({member_started, StreamId,
                                       Args#{pid => Pid}});
                {error, Reason} ->
                    rabbit_log:warning("~s: Error while starting replica for ~s on node ~s in ~b : ~W",
                                       [?MODULE, maps:get(name, Conf0), Node, Epoch, Reason, 10]),
                    maybe_sleep(Reason),
                    send_action_failed(StreamId, starting, Args)
            catch _:Error ->
                    rabbit_log:warning("~s: Error while starting replica for ~s on node ~s in ~b : ~W",
                                       [?MODULE, maps:get(name, Conf0), Node, Epoch, Error, 10]),
                    maybe_sleep(Error),
                    send_action_failed(StreamId, starting, Args)
            end
    end.

%% Report an action failure for StreamId back to the state machine.
send_action_failed(StreamId, Action, Arg) ->
    send_self_command({action_failed, StreamId, Arg#{action => Action}}).

%% Fire-and-forget a command at the local coordinator member.
send_self_command(Cmd) ->
    ra:pipeline_command({?MODULE, node()}, Cmd),
    ok.


%% Returns a fun that deletes the member (process and data) on Node.
phase_delete_member(StreamId, #{node := Node} = Arg, Conf) ->
    fun() ->
            try osiris_server_sup:delete_child(Node, Conf) of
                ok ->
                    send_self_command({member_deleted, StreamId, Arg});
                _ ->
                    send_action_failed(StreamId, deleting, Arg)
            catch _:E ->
                    rabbit_log:warning("~s: Error while deleting member for ~s : on node ~s ~W",
                                       [?MODULE, StreamId, Node, E, 10]),
                    maybe_sleep(E),
                    send_action_failed(StreamId, deleting, Arg)
            end
    end.
%% Returns a fun (run via run_action/6) that stops the member on Node and
%% then queries its log tail — the tail is what later epochs use to pick
%% the most up-to-date member as the next writer. Sends member_stopped on
%% success or action_failed otherwise, sleeping (maybe_sleep/1) before a
%% retryable failure report.
phase_stop_member(StreamId, #{node := Node,
                              epoch := Epoch} = Arg0, Conf) ->
    fun() ->
            try osiris_server_sup:stop_child(Node, StreamId) of
                ok ->
                    %% get tail
                    try get_replica_tail(Node, Conf) of
                        {ok, Tail} ->
                            Arg = Arg0#{tail => Tail},
                            rabbit_log:debug("~s: ~s: member stopped on ~s in ~b Tail ~w",
                                             [?MODULE, StreamId, Node, Epoch, Tail]),
                            send_self_command({member_stopped, StreamId, Arg});
                        Err ->
                            rabbit_log:warning("~s: failed to get tail of member ~s on ~s in ~b Error: ~w",
                                               [?MODULE, StreamId, Node, Epoch, Err]),
                            maybe_sleep(Err),
                            send_action_failed(StreamId, stopping, Arg0)
                    catch _:Err ->
                            rabbit_log:warning("~s: failed to get tail of member ~s on ~s in ~b Error: ~w",
                                               [?MODULE, StreamId, Node, Epoch, Err]),
                            maybe_sleep(Err),
                            send_action_failed(StreamId, stopping, Arg0)
                    end;
                Err ->
                    rabbit_log:warning("~s: failed to stop "
                                       "member ~s ~w Error: ~w",
                                       [?MODULE, StreamId, Node, Err]),
                    maybe_sleep(Err),
                    send_action_failed(StreamId, stopping, Arg0)
            catch _:Err ->
                    rabbit_log:warning("~s: failed to stop member ~s ~w Error: ~w",
                                       [?MODULE, StreamId, Node, Err]),
                    maybe_sleep(Err),
                    send_action_failed(StreamId, stopping, Arg0)
            end
    end.
%% Returns a fun that starts the writer (leader) osiris process from Conf.
%% Unlike the other phases, failures are reported without sleeping so that
%% a new election is triggered as soon as possible.
phase_start_writer(StreamId, #{epoch := Epoch,
                               node := Node} = Args0, Conf) ->
    fun() ->
            try osiris_writer:start(Conf) of
                {ok, Pid} ->
                    Args = Args0#{epoch => Epoch, pid => Pid},
                    rabbit_log:info("~s: started writer ~s on ~w in ~b",
                                    [?MODULE, StreamId, Node, Epoch]),
                    send_self_command({member_started, StreamId, Args});
                Err ->
                    %% no sleep for writer failures as we want to trigger a new
                    %% election asap
                    rabbit_log:warning("~s: failed to start writer ~s on ~s in ~b Error: ~w",
                                       [?MODULE, StreamId, Node, Epoch, Err]),
                    send_action_failed(StreamId, starting, Args0)
            catch _:Err ->
                    rabbit_log:warning("~s: failed to start writer ~s on ~s in ~b Error: ~w",
                                       [?MODULE, StreamId, Node, Epoch, Err]),
                    send_action_failed(StreamId, starting, Args0)
            end
    end.

%% Returns a fun that applies updated retention settings to the member
%% process identified by Pid in Args.
phase_update_retention(StreamId, #{pid := Pid,
                                   retention := Retention} = Args) ->
    fun() ->
            try osiris:update_retention(Pid, Retention) of
                ok ->
                    send_self_command({retention_updated, StreamId, Args});
                {error, Reason} = Err ->
                    rabbit_log:warning("~s: failed to update retention for ~s ~w Reason: ~w",
                                       [?MODULE, StreamId, node(Pid), Reason]),
                    maybe_sleep(Err),
                    send_action_failed(StreamId, update_retention, Args)
            catch _:Err ->
                    rabbit_log:warning("~s: failed to update retention for ~s ~w Error: ~w",
                                       [?MODULE, StreamId, node(Pid), Err]),
                    maybe_sleep(Err),
                    send_action_failed(StreamId, update_retention, Args)
            end
    end.

%% Query the log overview on Node via rpc and reduce it to the tail entry
%% (or 'empty' for an empty log).
get_replica_tail(Node, Conf) ->
    case rpc:call(Node, ?MODULE, log_overview, [Conf]) of
        {badrpc, nodedown} ->
            {error, nodedown};
        {error, _} = Err ->
            Err;
        {_Range, Offsets} ->
            {ok, select_highest_offset(Offsets)}
    end.

%% Assumes the overview list is ordered such that the last element is the
%% highest offset — confirm against osiris_log:overview/1.
select_highest_offset([]) ->
    empty;
select_highest_offset(Offsets) ->
    lists:last(Offsets).
%% Return the osiris log overview ({Range, Offsets}) for the log
%% configured by Config on this node. Called remotely via rpc from
%% get_replica_tail/2, hence the osiris_sup liveness check.
log_overview(Config) ->
    case whereis(osiris_sup) of
        undefined ->
            {error, app_not_running};
        _ ->
            Dir = osiris_log:directory(Config),
            osiris_log:overview(Dir)
    end.


%% Debug helper: replay a list of {Meta, Entry} pairs through apply/3
%% starting from an empty state and return the resulting machine state.
replay(L) when is_list(L) ->
    lists:foldl(
      fun ({M, E}, Acc) ->
              element(1, ?MODULE:apply(M, E, Acc))
      end, init(#{}), L).

%% True when NumAlive is a majority of NumReplicas.
is_quorum(1, 1) ->
    true;
is_quorum(NumReplicas, NumAlive) ->
    NumAlive >= ((NumReplicas div 2) + 1).

%% Returns a fun that writes the stream's new configuration (including the
%% new leader pid) into the amqqueue record, guarding against stale
%% updates from a previous incarnation of the same queue name.
phase_update_mnesia(StreamId, Args, #{reference := QName,
                                      leader_pid := LeaderPid} = Conf) ->
    fun() ->
            rabbit_log:debug("~s: running mnesia update for ~s: ~W",
                             [?MODULE, StreamId, Conf, 10]),
            Fun = fun (Q) ->
                          case amqqueue:get_type_state(Q) of
                              #{name := S} when S == StreamId ->
                                  %% the stream id matches so we can update the
                                  %% amqqueue record
                                  amqqueue:set_type_state(
                                    amqqueue:set_pid(Q, LeaderPid), Conf);
                              Ts ->
                                  S = maps:get(name, Ts, undefined),
                                  rabbit_log:debug("~s: refusing mnesia update for stale stream id ~s, current ~s",
                                                   [?MODULE, StreamId, S]),
                                  %% if the stream id isn't a match this is a stale
                                  %% update from a previous stream incarnation for the
                                  %% same queue name and we ignore it
                                  Q
                          end
                  end,
            try rabbit_misc:execute_mnesia_transaction(
                  fun() ->
                          rabbit_amqqueue:update(QName, Fun)
                  end) of
                not_found ->
                    rabbit_log:debug("~s: resource for stream id ~s not found, "
                                     "recovering from rabbit_durable_queue",
                                     [?MODULE, StreamId]),
                    %% This can happen during recovery
                    %% we need to re-initialise the queue record
                    %% if the stream id is a match
                    [Q] = mnesia:dirty_read(rabbit_durable_queue, QName),
                    case amqqueue:get_type_state(Q) of
                        #{name := S} when S == StreamId ->
                            rabbit_log:debug("~s: initializing queue record for stream id ~s",
                                             [?MODULE, StreamId]),
                            _ = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
                            ok;
                        _ ->
                            ok
                    end,

                    send_self_command({mnesia_updated, StreamId, Args});
                _ ->
                    send_self_command({mnesia_updated, StreamId, Args})
            catch _:E ->
                    rabbit_log:debug("~s: failed to update mnesia for ~s: ~W",
                                     [?MODULE, StreamId, E, 10]),
                    send_action_failed(StreamId, updating_mnesia, Args)
            end
    end.

%% ra event formatter (installed via make_ra_conf/2): tag ra events so
%% receivers can recognise coordinator notifications.
format_ra_event(ServerId, Evt) ->
    {stream_coordinator_event, ServerId, Evt}.

%% Build the ra server configuration for a coordinator member on Node.
make_ra_conf(Node, Nodes) ->
    UId = ra:new_uid(ra_lib:to_binary(?MODULE)),
    Formatter = {?MODULE, format_ra_event, []},
    Members = [{?MODULE, N} || N <- Nodes],
    TickTimeout = application:get_env(rabbit, stream_tick_interval,
                                      ?TICK_TIMEOUT),
    #{cluster_name => ?MODULE,
      id => {?MODULE, Node},
      uid => UId,
      friendly_name => atom_to_list(?MODULE),
      metrics_key => ?MODULE,
      initial_members => Members,
      log_init_args => #{uid => UId},
      tick_timeout => TickTimeout,
      machine => {module, ?MODULE, #{}},
      ra_event_formatter => Formatter}.

%% Pre-validation run before a command mutates a stream: refuse to delete
%% the last remaining not-yet-deleted member. All other commands pass.
filter_command(_Meta, {delete_replica, _, #{node := Node}}, #stream{id = StreamId,
                                                                    members = Members0}) ->
    Members = maps:filter(fun(_, #member{target = S}) when S =/= deleted ->
                                  true;
                             (_, _) ->
                                  false
                          end, Members0),
    case maps:size(Members) =< 1 of
        true ->
            rabbit_log:warning(
              "~s failed to delete replica on node ~s for stream ~s: refusing to delete the only replica",
              [?MODULE, Node, StreamId]),
            {error, last_stream_member};
        false ->
            ok
    end;
filter_command(_, _, _) ->
    ok.

%% Defensive wrapper around update_stream0/3: a crash while updating one
%% stream must not take down the whole coordinator state machine, so log
%% the failure and keep the previous stream state.
update_stream(Meta, Cmd, Stream) ->
    try
        update_stream0(Meta, Cmd, Stream)
    catch
        _:E:Stacktrace ->
            rabbit_log:warning(
              "~s failed to update stream:~n~W~n~W",
              [?MODULE, E, 10, Stacktrace, 10]),
            Stream
    end.
%% update_stream0/3 is the pure state-transition half of the coordinator:
%% given the Ra command meta data, a command and the current #stream{} state
%% (or `undefined' when no stream exists yet), it returns the next #stream{}
%% state (or `undefined' once the last member of a deleted stream is gone).
%% It emits no effects; those are derived afterwards by evaluate_stream/3.
update_stream0(#{system_time := _} = Meta,
               {new_stream, StreamId, #{leader_node := LeaderNode,
                                        queue := Q}}, undefined) ->
    #{nodes := Nodes} = Conf = amqqueue:get_type_state(Q),
    %% this jumps straight to the state where all members
    %% have been stopped and a new writer has been chosen
    E = 1,
    QueueRef = amqqueue:get_name(Q),
    Members = maps:from_list(
                [{N, #member{role = case LeaderNode of
                                        N -> {writer, E};
                                        _ -> {replica, E}
                                    end,
                             node = N,
                             state = {ready, E},
                             %% no members are running actions
                             current = undefined}
                 } || N <- Nodes]),
    #stream{id = StreamId,
            epoch = E,
            nodes = Nodes,
            queue_ref = QueueRef,
            conf = Conf,
            members = Members,
            %% remember the caller (if any) so a reply can be sent once the
            %% writer is actually running
            reply_to = maps:get(from, Meta, undefined)};
update_stream0(#{system_time := _Ts} = _Meta,
               {delete_stream, _StreamId, #{}},
               #stream{members = Members0,
                       target = _} = Stream0) ->
    %% mark every member for deletion; the stream record itself is removed
    %% later, when the last member_deleted command arrives
    Members = maps:map(
                fun (_, M) ->
                        M#member{target = deleted}
                end, Members0),
    Stream0#stream{members = Members,
                   %% reset reply_to here to ensure a reply
                   %% is returned as the command has been accepted
                   reply_to = undefined,
                   target = deleted};
update_stream0(#{system_time := _Ts} = _Meta,
               {add_replica, _StreamId, #{node := Node}},
               #stream{members = Members0,
                       epoch = Epoch,
                       nodes = Nodes,
                       target = _} = Stream0) ->
    case maps:is_key(Node, Members0) of
        true ->
            %% already a member: adding is idempotent
            Stream0;
        false ->
            %% a new member triggers an epoch change: stop all running
            %% members so a new leader election can incorporate the replica
            Members1 = Members0#{Node => #member{role = {replica, Epoch},
                                                 node = Node,
                                                 target = stopped}},
            Members = set_running_to_stopped(Members1),
            Stream0#stream{members = Members,
                           nodes = lists:sort([Node | Nodes])}
    end;
update_stream0(#{system_time := _Ts} = _Meta,
               {delete_replica, _StreamId, #{node := Node}},
               #stream{members = Members0,
                       epoch = _Epoch,
                       nodes = Nodes,
                       target = _} = Stream0) ->
    case maps:is_key(Node, Members0) of
        true ->
            %% TODO: check for duplicate delete_replica commands — presumably
            %% re-deleting an already-deleted member should be a no-op; confirm
            %% the deleted member keeps target = deleted here.
            %% All other members are stopped so that a new election (without
            %% the removed node) can take place.
            Members = maps:map(
                        fun (K, M) when K == Node ->
                                M#member{target = deleted};
                            (_, #member{target = running} = M) ->
                                M#member{target = stopped};
                            (_, M) ->
                                M
                        end, Members0),
            Stream0#stream{members = Members,
                           nodes = lists:delete(Node, Nodes)};
        false ->
            Stream0
    end;
update_stream0(#{system_time := _Ts},
               {member_started, _StreamId,
                #{epoch := E,
                  index := Idx,
                  pid := Pid} = Args}, #stream{epoch = E,
                                               members = Members} = Stream0) ->
    Node = node(Pid),
    case maps:get(Node, Members, undefined) of
        #member{role = {_, E},
                current = {starting, Idx},
                state = _} = Member0 ->
            %% this is what we expect, leader epoch should match overall
            %% epoch
            Member = Member0#member{state = {running, E, Pid},
                                    current = undefined},
            %% TODO: we need to tell the machine to monitor the leader
            Stream0#stream{members =
                           Members#{Node => Member}};
        Member ->
            %% do we just ignore any members started events from unexpected
            %% epochs?
            rabbit_log:warning("~s: member started unexpected ~w ~w",
                               [?MODULE, Args, Member]),
            Stream0
    end;
update_stream0(#{system_time := _Ts},
               {member_deleted, _StreamId, #{node := Node}},
               #stream{nodes = Nodes,
                       members = Members0} = Stream0) ->
    case maps:take(Node, Members0) of
        {_, Members} when map_size(Members) == 0 ->
            %% the last member is gone: drop the whole stream state
            undefined;
        {#member{state = _}, Members} ->
            %% remove the member and its node from the stream
            Stream0#stream{nodes = lists:delete(Node, Nodes),
                           members = Members};
        _ ->
            %% member_deleted for an unknown node — ignore
            Stream0
    end;
update_stream0(#{system_time := _Ts},
               {member_stopped, _StreamId,
                #{node := Node,
                  index := Idx,
                  epoch := StoppedEpoch,
                  tail := Tail}}, #stream{epoch = Epoch,
                                          target = Target,
                                          nodes = Nodes,
                                          members = Members0} = Stream0) ->
    %% true when a writer for the current epoch is already chosen (ready)
    %% or running — in that case late-stopping replicas can skip the
    %% election and go straight to ready
    IsLeaderInCurrent = case find_leader(Members0) of
                            {#member{role = {writer, Epoch},
                                     target = running,
                                     state = {ready, Epoch}}, _} ->
                                true;
                            {#member{role = {writer, Epoch},
                                     target = running,
                                     state = {running, Epoch, _}}, _} ->
                                true;
                            _ ->
                                false
                        end,
    case maps:get(Node, Members0) of
        #member{role = {replica, Epoch},
                current = {stopping, Idx},
                state = _} = Member0
          when IsLeaderInCurrent ->
            %% A leader has already been selected so skip straight to ready state
            Member = update_target(Member0#member{state = {ready, Epoch},
                                                  current = undefined}, Target),
            Members1 = Members0#{Node => Member},
            Stream0#stream{members = Members1};
        #member{role = {_, Epoch},
                current = {stopping, Idx},
                state = _} = Member0 ->
            %% this is what we expect, member epoch should match overall
            %% epoch
            Member = case StoppedEpoch of
                         Epoch ->
                             update_target(Member0#member{state = {stopped, StoppedEpoch, Tail},
                                                          current = undefined}, Target);
                         _ ->
                             %% if stopped epoch is from another epoch
                             %% leave target as is to retry stop in current term
                             Member0#member{state = {stopped, StoppedEpoch, Tail},
                                            current = undefined}
                     end,

            Members1 = Members0#{Node => Member},

            %% collect the log tails of all members stopped in the current
            %% epoch — these are the election candidates
            Offsets = [{N, T}
                       || #member{state = {stopped, E, T},
                                  target = running,
                                  node = N} <- maps:values(Members1),
                          E == Epoch],
            case is_quorum(length(Nodes), length(Offsets)) of
                true ->
                    %% select leader
                    NewWriterNode = select_leader(Offsets),
                    NextEpoch = Epoch + 1,
                    Members = maps:map(
                                fun (N, #member{state = {stopped, E, _}} = M)
                                      when E == Epoch ->
                                        case NewWriterNode of
                                            N ->
                                                %% new leader
                                                M#member{role = {writer, NextEpoch},
                                                         state = {ready, NextEpoch}};
                                            _ ->
                                                M#member{role = {replica, NextEpoch},
                                                         state = {ready, NextEpoch}}
                                        end;
                                    (_N, #member{target = deleted} = M) ->
                                        M;
                                    (_N, M) ->
                                        M#member{role = {replica, NextEpoch}}
                                end, Members1),
                    Stream0#stream{epoch = NextEpoch,
                                   members = Members};
                false ->
                    %% not enough stopped members yet — wait for more
                    Stream0#stream{members = Members1}
            end;
        _Member ->
            %% stale member_stopped (wrong epoch or index) — ignore
            Stream0
    end;
update_stream0(#{system_time := _Ts},
               {mnesia_updated, _StreamId, #{epoch := E}},
               Stream0) ->
    %% reset mnesia state
    case Stream0 of
        undefined ->
            undefined;
        _ ->
            Stream0#stream{mnesia = {updated, E}}
    end;
update_stream0(#{system_time := _Ts},
               {retention_updated, _StreamId, #{node := Node}},
               #stream{members = Members0,
                       conf = Conf} = Stream0) ->
    %% the retention update completed on Node: clear its in-flight action
    %% and record the configuration it now runs with
    Members = maps:update_with(Node, fun (M) ->
                                             M#member{current = undefined,
                                                      conf = Conf}
                                     end, Members0),
    Stream0#stream{members = Members};
update_stream0(#{system_time := _Ts},
               {action_failed, _StreamId, #{action := updating_mnesia}},
               #stream{mnesia = {_, E}} = Stream0) ->
    %% a failed mnesia update is recorded as updated so that
    %% evaluate_stream/3 can schedule a retry for the current epoch
    Stream0#stream{mnesia = {updated, E}};
update_stream0(#{system_time := _Ts},
               {action_failed, _StreamId,
                #{node := Node,
                  index := Idx,
                  action := Action,
                  epoch := _Epoch}}, #stream{members = Members0} = Stream0) ->
    %% clear the in-flight action, but only if it matches the failure
    %% exactly (same action and Raft index) — otherwise it is stale
    Members1 = maps:update_with(Node,
                                fun (#member{current = {C, I}} = M)
                                      when C == Action andalso I == Idx ->
                                        M#member{current = undefined};
                                    (M) ->
                                        M
                                end, Members0),
    case Members0 of
        #{Node := #member{role = {writer, E},
                          state = {ready, E},
                          current = {starting, Idx}}}
          when Action == starting ->
            %% the leader failed to start = we need a new election
            %% stop all members
            Members = set_running_to_stopped(Members1),
            Stream0#stream{members = Members};
        _ ->
            Stream0#stream{members = Members1}
    end;
update_stream0(#{system_time := _Ts},
               {down, Pid, Reason},
               #stream{epoch = E,
                       members = Members0} = Stream0) ->
    DownNode = node(Pid),
    case Members0 of
        #{DownNode := #member{role = {writer, E},
                              state = {running, E, Pid}} = Member} ->
            Members1 = Members0#{DownNode => Member#member{state = {down, E}}},
            %% leader is down, set all members that should be running to stopped
            Members = maps:map(fun (_, #member{target = running} = M) ->
                                       M#member{target = stopped};
                                   (_, M) ->
                                       M
                               end, Members1),
            Stream0#stream{members = Members};
        #{DownNode := #member{role = {replica, _},
                              state = {running, _, Pid}} = Member}
          when Reason == noconnection ->
            %% mark process as disconnected such that we don't set it to down until
            %% the node is back and we can re-monitor
            Members = Members0#{DownNode =>
                                Member#member{state = {disconnected, E, Pid}}},
            Stream0#stream{members = Members};
        #{DownNode := #member{role = {replica, _},
                              state = {S, _, Pid}} = Member}
          when S == running orelse S == disconnected ->
            %% the down process is currently running with the correct Pid
            %% set state to down
            Members = Members0#{DownNode => Member#member{state = {down, E}}},
            Stream0#stream{members = Members};
        _ ->
            %% 'down' for a pid we no longer track — ignore
            Stream0
    end;
update_stream0(#{system_time := _Ts},
               {down, _Pid, _Reason}, undefined) ->
    undefined;
update_stream0(#{system_time := _Ts} = _Meta,
               {nodeup, Node},
               #stream{members = Members0} = Stream0) ->
    %% wake members that were parked waiting for their node to return
    Members = maps:map(
                fun (_, #member{node = N,
                                current = {sleeping, nodeup}} = M)
                      when N == Node ->
                        M#member{current = undefined};
                    (_, M) ->
                        M
                end, Members0),
    Stream0#stream{members = Members};
update_stream0(#{system_time := _Ts},
               {policy_changed, _StreamId, #{queue := Q}},
               #stream{conf = Conf0,
                       members = _Members0} = Stream0) ->
    %% recompute the stream configuration from the queue's current policy;
    %% members pick the new conf up lazily (e.g. via eval_retention/3)
    Conf = rabbit_stream_queue:update_stream_conf(Q, Conf0),
    Stream0#stream{conf = Conf};
update_stream0(_Meta, _Cmd, undefined) ->
    %% any other command against a non-existent stream is a no-op
    undefined.

%% Notify every registered listener that has not yet seen the pid of the
%% currently running leader, emitting one send_msg effect per stale
%% listener. No-op while no leader is running.
eval_listeners(#stream{listeners = Listeners0,
                       queue_ref = QRef,
                       members = Members} = Stream, Effects0) ->
    case find_leader(Members) of
        {#member{state = {running, _, LeaderPid}}, _} ->
            %% a leader is running, check all listeners to see if any of them
            %% has not been notified of the current leader pid
            {Listeners, Effects} =
                maps:fold(
                  fun(_, P, Acc) when P == LeaderPid ->
                          %% already up to date
                          Acc;
                     (LPid, _, {L, Acc}) ->
                          {L#{LPid => LeaderPid},
                           [{send_msg, LPid,
                             {queue_event, QRef,
                              {stream_leader_change, LeaderPid}},
                             cast} | Acc]}
                  end, {Listeners0, Effects0}, Listeners0),
            {Stream#stream{listeners = Listeners}, Effects};
        _ ->
            {Stream, Effects0}
    end.

%% Emit update_retention aux effects for every running, idle member whose
%% last-applied conf does not carry the stream's current retention setting,
%% and mark those members as having an update in flight.
eval_retention(#{index := Idx} = Meta,
               #stream{conf = #{retention := Ret} = Conf,
                       id = StreamId,
                       epoch = Epoch,
                       members = Members} = Stream, Effects0) ->
    %% only running members with no in-flight action and an out-of-date
    %% retention setting need updating
    NeedUpdate = maps:filter(
                   fun (_, #member{state = {running, _, _},
                                   current = undefined,
                                   conf = C}) ->
                           Ret =/= maps:get(retention, C, undefined);
                       (_, _) ->
                           false
                   end, Members),
    Args = Meta#{epoch => Epoch},
    Effs = [{aux, {update_retention, StreamId,
                   Args#{pid => Pid,
                         node => node(Pid),
                         retention => Ret}, Conf}}
            || #member{state = {running, _, Pid}} <- maps:values(NeedUpdate)],
    %% mark the in-flight action so the update is not issued twice
    Updated = maps:map(fun (_, M) -> M#member{current = {updating, Idx}} end,
                       NeedUpdate),
    {Stream#stream{members = maps:merge(Members, Updated)}, Effs ++ Effects0}.
%% this function should be idempotent,
%% it should modify the state such that it won't issue duplicate
%% actions when called again
%%
%% evaluate_stream/3 derives aux effects (start/stop/delete members,
%% mnesia updates, replies) from the current #stream{} state. It first
%% dispatches on the writer's state/target, then folds over the replicas
%% via eval_replicas/5. Clause order matters: more specific writer states
%% are handled before the catch-all.
evaluate_stream(#{index := Idx} = Meta,
                #stream{id = StreamId,
                        reply_to = From,
                        epoch = Epoch,
                        mnesia = {MnesiaTag, MnesiaEpoch},
                        members = Members0} = Stream0, Effs0) ->
    case find_leader(Members0) of
        {#member{state = LState,
                 node = LeaderNode,
                 target = deleted,
                 current = undefined} = Writer0, Replicas}
          when LState =/= deleted ->
            %% writer is marked for deletion and idle: issue the delete
            %% NOTE(review): the third element here is LeaderNode, whereas the
            %% delete_member effect in eval_replica/5 carries an Args map —
            %% confirm the aux handler accepts both shapes.
            Action = {aux, {delete_member, StreamId, LeaderNode,
                            make_writer_conf(Writer0, Stream0)}},
            Writer = Writer0#member{current = {deleting, Idx}},
            Effs = [Action | Effs0],
            Stream = Stream0#stream{reply_to = undefined},
            eval_replicas(Meta, Writer, Replicas, Stream, Effs);
        {#member{state = {down, Epoch},
                 target = stopped,
                 node = LeaderNode,
                 current = undefined} = Writer0, Replicas} ->
            %% leader is down - all replicas need to be stopped
            %% and tail infos retrieved
            %% some replicas may already be in stopping or ready state
            Args = Meta#{epoch => Epoch,
                         node => LeaderNode},
            Conf = make_writer_conf(Writer0, Stream0),
            Action = {aux, {stop, StreamId, Args, Conf}},
            Writer = Writer0#member{current = {stopping, Idx}},
            eval_replicas(Meta, Writer, Replicas, Stream0, [Action | Effs0]);
        {#member{state = {ready, Epoch}, %% writer ready in current epoch
                 target = running,
                 node = LeaderNode,
                 current = undefined} = Writer0, _Replicas} ->
            %% ready check has been completed and a new leader has been chosen
            %% time to start writer,
            %% if leader start fails, revert back to down state for all and re-run
            WConf = make_writer_conf(Writer0, Stream0),
            Members = Members0#{LeaderNode =>
                                Writer0#member{current = {starting, Idx},
                                               conf = WConf}},
            Args = Meta#{node => LeaderNode, epoch => Epoch},
            Actions = [{aux, {start_writer, StreamId, Args, WConf}} | Effs0],
            {Stream0#stream{members = Members}, Actions};
        {#member{state = {running, Epoch, LeaderPid},
                 target = running} = Writer, Replicas} ->
            %% writer is up: reply to any pending caller and, if the mnesia
            %% record lags behind the current epoch, schedule an update
            Effs1 = case From of
                        undefined ->
                            Effs0;
                        _ ->
                            %% we need a reply effect here
                            wrap_reply(From, {ok, LeaderPid}) ++ Effs0
                    end,
            Stream1 = Stream0#stream{reply_to = undefined},
            case MnesiaTag == updated andalso MnesiaEpoch < Epoch of
                true ->
                    Args = Meta#{node => node(LeaderPid), epoch => Epoch},
                    Effs = [{aux,
                             {update_mnesia, StreamId, Args,
                              make_replica_conf(LeaderPid, Stream1)}} | Effs1],
                    Stream = Stream1#stream{mnesia = {updating, MnesiaEpoch}},
                    eval_replicas(Meta, Writer, Replicas, Stream, Effs);
                false ->
                    eval_replicas(Meta, Writer, Replicas, Stream1, Effs1)
            end;
        {#member{state = S,
                 target = stopped,
                 node = LeaderNode,
                 current = undefined} = Writer0, Replicas}
          when element(1, S) =/= stopped ->
            %% leader should be stopped
            Args = Meta#{node => LeaderNode, epoch => Epoch},
            Action = {aux, {stop, StreamId, Args,
                            make_writer_conf(Writer0, Stream0)}},
            Writer = Writer0#member{current = {stopping, Idx}},
            eval_replicas(Meta, Writer, Replicas, Stream0, [Action | Effs0]);
        {Writer, Replicas} ->
            %% nothing to do for the writer (may be undefined or busy);
            %% still evaluate the replicas
            eval_replicas(Meta, Writer, Replicas, Stream0, Effs0)
    end.
%% Fold eval_replica/5 over all replicas, rebuilding the members map and
%% accumulating aux effects. When the stream has no writer (undefined) the
%% replicas are evaluated against the 'deleted' leader state.
eval_replicas(Meta, undefined, Replicas, Stream, Actions0) ->
    {Members, Actions} = lists:foldl(
                           fun (R, Acc) ->
                                   eval_replica(Meta, R, deleted, Stream, Acc)
                           end, {#{}, Actions0},
                           Replicas),
    {Stream#stream{members = Members}, Actions};
eval_replicas(Meta, #member{state = LeaderState,
                            node = WriterNode} = Writer, Replicas,
              Stream, Actions0) ->
    {Members, Actions} = lists:foldl(
                           fun (R, Acc) ->
                                   eval_replica(Meta, R, LeaderState,
                                                Stream, Acc)
                           end, {#{WriterNode => Writer}, Actions0},
                           Replicas),
    {Stream#stream{members = Members}, Actions}.

%% Decide, for a single replica, which aux effect (if any) to issue given
%% the writer's state. Returns the updated {Members, Actions} accumulator.
%% Clause order matters: target = stopped/deleted take precedence over the
%% state-based clauses below them.
eval_replica(#{index := Idx} = Meta,
             #member{state = _State,
                     target = stopped,
                     node = Node,
                     current = undefined} = Replica,
             _LeaderState,
             #stream{id = StreamId,
                     epoch = Epoch,
                     conf = Conf0},
             {Replicas, Actions}) ->
    %% if we're not running anything and we aren't stopped and not caught
    %% by previous clauses we probably should stop
    Args = Meta#{node => Node, epoch => Epoch},

    Conf = Conf0#{epoch => Epoch},
    {Replicas#{Node => Replica#member{current = {stopping, Idx}}},
     [{aux, {stop, StreamId, Args, Conf}} | Actions]};
eval_replica(#{index := Idx} = Meta, #member{state = _,
                                             node = Node,
                                             current = Current,
                                             target = deleted} = Replica,
             _LeaderState, #stream{id = StreamId,
                                   epoch = Epoch,
                                   conf = Conf}, {Replicas, Actions0}) ->
    %% member marked for deletion: issue delete only when idle, otherwise
    %% wait for the in-flight action to finish/fail first
    case Current of
        undefined ->
            Args = Meta#{epoch => Epoch, node => Node},
            Actions = [{aux, {delete_member, StreamId, Args, Conf}} |
                       Actions0],
            {Replicas#{Node => Replica#member{current = {deleting, Idx}}},
             Actions};
        _ ->
            {Replicas#{Node => Replica}, Actions0}
    end;
eval_replica(#{index := Idx} = Meta, #member{state = {State, Epoch},
                                             node = Node,
                                             target = running,
                                             current = undefined} = Replica,
             {running, Epoch, Pid},
             #stream{id = StreamId,
                     epoch = Epoch} = Stream,
             {Replicas, Actions})
  when State == ready; State == down ->
    %% replica is down or ready and the leader is running
    %% time to start it
    Conf = make_replica_conf(Pid, Stream),
    Args = Meta#{node => Node, epoch => Epoch},
    {Replicas#{Node => Replica#member{current = {starting, Idx},
                                      conf = Conf}},
     [{aux, {start_replica, StreamId, Args, Conf}} | Actions]};
eval_replica(_Meta, #member{state = {running, Epoch, _},
                            target = running,
                            node = Node} = Replica,
             {running, Epoch, _}, _Stream, {Replicas, Actions}) ->
    %% both replica and leader are running in the current epoch: steady state
    {Replicas#{Node => Replica}, Actions};
eval_replica(_Meta, #member{state = {stopped, _E, _},
                            node = Node,
                            current = undefined} = Replica,
             _LeaderState, _Stream,
             {Replicas, Actions}) ->
    %% if stopped we should just wait for a quorum to reach stopped and
    %% update_stream will move to ready state
    {Replicas#{Node => Replica}, Actions};
eval_replica(_Meta, #member{state = {ready, E},
                            target = running,
                            node = Node,
                            current = undefined} = Replica,
             {ready, E}, _Stream,
             {Replicas, Actions}) ->
    %% if we're ready and so is the leader we just wait as well
    {Replicas#{Node => Replica}, Actions};
eval_replica(_Meta, #member{node = Node} = Replica, _LeaderState, _Stream,
             {Replicas, Actions}) ->
    %% anything else (e.g. an action in flight): keep the member as-is
    {Replicas#{Node => Replica}, Actions}.

%% For every stream whose id is NOT in Exclude, fail all in-flight member
%% actions by sending action_failed commands (via fail_action/2).
%% Exclude presumably holds ids of streams whose actions must be kept —
%% TODO confirm against callers.
fail_active_actions(Streams, Exclude) ->
    maps:map(
      fun (_, #stream{id = Id, members = Members})
            when not is_map_key(Id, Exclude) ->
              _ = maps:map(fun(_, M) ->
                                   fail_action(Id, M)
                           end, Members);
          (_, _) ->
              %% BUGFIX: funs given to maps:map/2 must match every
              %% association; without this clause any stream present in
              %% Exclude crashed the fold with function_clause instead of
              %% being skipped.
              ok
      end, Streams),

    ok.
1462 1463fail_action(_StreamId, #member{current = undefined}) -> 1464 ok; 1465fail_action(StreamId, #member{role = {_, E}, 1466 current = {Action, Idx}, 1467 node = Node}) -> 1468 rabbit_log:debug("~s: failing active action for ~s node ~w Action ~w", 1469 [?MODULE, StreamId, Node, Action]), 1470 %% if we have an action send failure message 1471 send_self_command({action_failed, StreamId, 1472 #{action => Action, 1473 index => Idx, 1474 node => Node, 1475 epoch => E}}). 1476 1477ensure_monitors(#stream{id = StreamId, 1478 members = Members}, Monitors, Effects) -> 1479 maps:fold( 1480 fun 1481 (_, #member{state = {running, _, Pid}}, {M, E}) 1482 when not is_map_key(Pid, M) -> 1483 {M#{Pid => {StreamId, member}}, 1484 [{monitor, process, Pid}, 1485 %% ensure we're always monitoring the node as well 1486 {monitor, node, node(Pid)} | E]}; 1487 (_, _, Acc) -> 1488 Acc 1489 end, {Monitors, Effects}, Members). 1490 1491make_replica_conf(LeaderPid, 1492 #stream{epoch = Epoch, 1493 nodes = Nodes, 1494 conf = Conf}) -> 1495 LeaderNode = node(LeaderPid), 1496 Conf#{leader_node => LeaderNode, 1497 nodes => Nodes, 1498 leader_pid => LeaderPid, 1499 replica_nodes => lists:delete(LeaderNode, Nodes), 1500 epoch => Epoch}. 1501 1502make_writer_conf(#member{node = Node}, #stream{epoch = Epoch, 1503 nodes = Nodes, 1504 conf = Conf}) -> 1505 Conf#{leader_node => Node, 1506 nodes => Nodes, 1507 replica_nodes => lists:delete(Node, Nodes), 1508 epoch => Epoch}. 1509 1510 1511find_leader(Members) -> 1512 case lists:partition( 1513 fun (#member{target = deleted}) -> 1514 false; 1515 (#member{role = {Role, _}}) -> 1516 Role == writer 1517 end, maps:values(Members)) of 1518 {[Writer], Replicas} -> 1519 {Writer, Replicas}; 1520 {[], Replicas} -> 1521 {undefined, Replicas} 1522 end. 
1523 1524select_leader(Offsets) -> 1525 [{Node, _} | _] = lists:sort(fun({_, {Ao, E}}, {_, {Bo, E}}) -> 1526 Ao >= Bo; 1527 ({_, {_, Ae}}, {_, {_, Be}}) -> 1528 Ae >= Be; 1529 ({_, empty}, _) -> 1530 false; 1531 (_, {_, empty}) -> 1532 true 1533 end, Offsets), 1534 Node. 1535 1536maybe_sleep({{nodedown, _}, _}) -> 1537 timer:sleep(10000); 1538maybe_sleep({noproc, _}) -> 1539 timer:sleep(5000); 1540maybe_sleep({error, nodedown}) -> 1541 timer:sleep(5000); 1542maybe_sleep({error, _}) -> 1543 timer:sleep(5000); 1544maybe_sleep(_) -> 1545 ok. 1546 1547set_running_to_stopped(Members) -> 1548 maps:map(fun (_, #member{target = running} = M) -> 1549 M#member{target = stopped}; 1550 (_, M) -> 1551 M 1552 end, Members). 1553 1554update_target(#member{target = deleted} = Member, _) -> 1555 %% A deleted member can never transition to another state 1556 Member; 1557update_target(Member, Target) -> 1558 Member#member{target = Target}. 1559