%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
%%

-module(rabbit_stream_coordinator).

-behaviour(ra_machine).

-export([format_ra_event/2]).

-export([init/1,
         apply/3,
         state_enter/2,
         init_aux/1,
         handle_aux/6,
         tick/2]).

-export([recover/0,
         add_replica/2,
         delete_replica/2,
         register_listener/1]).

-export([new_stream/2,
         delete_stream/2]).

-export([policy_changed/1]).

-export([local_pid/1,
         members/1]).
-export([query_local_pid/3,
         query_members/2]).

-export([log_overview/1]).
-export([replay/1]).

-rabbit_boot_step({?MODULE,
                   [{description, "Restart stream coordinator"},
                    {mfa,         {?MODULE, recover, []}},
                    {requires,    core_initialized},
                    {enables,     recovery}]}).

%% exported for unit tests only
-ifdef(TEST).
-export([update_stream/3,
         evaluate_stream/3]).
-endif.

-include("rabbit_stream_coordinator.hrl").
-include("amqqueue.hrl").

-define(REPLICA_FRESHNESS_LIMIT_MS, 10 * 1000). %% 10s

-type state() :: #?MODULE{}.
-type args() :: #{index := ra:index(),
                  node := node(),
                  epoch := osiris:epoch()}.

-type command() :: {new_stream, stream_id(), #{leader_node := node(),
                                               queue := amqqueue:amqqueue()}} |
                   {delete_stream, stream_id(), #{}} |
                   {add_replica, stream_id(), #{node := node()}} |
                   {delete_replica, stream_id(), #{node := node()}} |
                   {policy_changed, stream_id(), #{queue := amqqueue:amqqueue()}} |
                   {register_listener, #{pid := pid(),
                                         stream_id := stream_id(),
                                         queue_ref := queue_ref()}} |
                   {action_failed, stream_id(), #{index := ra:index(),
                                                  node := node(),
                                                  epoch := osiris:epoch(),
                                                  action := atom(), %% TODO: refine
                                                  term() => term()}} |
                   {member_started, stream_id(), #{index := ra:index(),
                                                   node := node(),
                                                   epoch := osiris:epoch(),
                                                   pid := pid()}} |
                   {member_stopped, stream_id(), args()} |
                   {retention_updated, stream_id(), args()} |
                   {mnesia_updated, stream_id(), args()} |
                   ra_machine:effect().

-export_type([command/0]).

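%% Boot step callback (see the rabbit_boot_step attribute above): restart the
%% local coordinator Ra server if it has run before. On a first boot there is
%% nothing to restart; the coordinator cluster is created lazily when the
%% first stream is declared.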
recover() ->
    case erlang:whereis(?MODULE) of
        undefined ->
            case ra:restart_server(?RA_SYSTEM, {?MODULE, node()}) of
                {error, Reason} when Reason == not_started;
                                     Reason == name_not_registered ->
                    %% First boot, do nothing and wait until the first `declare`
                    ok;
                _ ->
                    ok
            end;
        _ ->
            ok
    end.

%% new api

new_stream(Q, LeaderNode)
  when ?is_amqqueue(Q) andalso is_atom(LeaderNode) ->
    #{name := StreamId,
      nodes := Nodes} = amqqueue:get_type_state(Q),
    %% assert that the leader node is one of the configured nodes
    true = lists:member(LeaderNode, Nodes),
    process_command({new_stream, StreamId,
                     #{leader_node => LeaderNode,
                       queue => Q}}).

delete_stream(Q, ActingUser)
  when ?is_amqqueue(Q) ->
    #{name := StreamId} = amqqueue:get_type_state(Q),
    case process_command({delete_stream, StreamId, #{}}) of
        {ok, ok, _} ->
            QName = amqqueue:get_name(Q),
            _ = rabbit_amqqueue:internal_delete(QName, ActingUser),
            {ok, {ok, 0}};
        Err ->
            Err
    end.

-spec add_replica(amqqueue:amqqueue(), node()) ->
    ok | {error, term()}.
add_replica(Q, Node) when ?is_amqqueue(Q) ->
    %% Safety check: if any replica is stale we should not allow
    %% further replicas to be added.
    Pid = amqqueue:get_pid(Q),
    try
        ReplState0 = osiris_writer:query_replication_state(Pid),
        {{_, InitTs}, ReplState} = maps:take(node(Pid), ReplState0),
        {MaxTs, MinTs} = maps:fold(fun (_, {_, Ts}, {Max, Min}) ->
                                           {max(Ts, Max), min(Ts, Min)}
                                   end, {InitTs, InitTs}, ReplState),
        case (MaxTs - MinTs) > ?REPLICA_FRESHNESS_LIMIT_MS of
            true ->
                {error, {disallowed, out_of_sync_replica}};
            false ->
                Name = rabbit_misc:rs(amqqueue:get_name(Q)),
                rabbit_log:info("~s : adding replica ~s to ~s Replication State: ~w",
                                [?MODULE, Node, Name, ReplState0]),
                StreamId = maps:get(name, amqqueue:get_type_state(Q)),
                case process_command({add_replica, StreamId, #{node => Node}}) of
                    {ok, Result, _} ->
                        Result;
                    Err ->
                        Err
                end
        end
    catch
        _:Error ->
            {error, Error}
    end.

delete_replica(StreamId, Node) ->
    process_command({delete_replica, StreamId, #{node => Node}}).

policy_changed(Q) when ?is_amqqueue(Q) ->
    StreamId = maps:get(name, amqqueue:get_type_state(Q)),
    process_command({policy_changed, StreamId, #{queue => Q}}).

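%% Look up the pid of the stream member (writer or replica) running on the
%% local node. A locally cached result that points at a dead process falls
%% back to a consistent query against the coordinator.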
local_pid(StreamId) when is_list(StreamId) ->
    MFA = {?MODULE, query_local_pid, [StreamId, node()]},
    case ra:local_query({?MODULE, node()}, MFA) of
        {ok, {_, {ok, Pid}}, _} ->
            case is_process_alive(Pid) of
                true ->
                    {ok, Pid};
                false ->
                    case ra:consistent_query({?MODULE, node()}, MFA) of
                        {ok, Result, _} ->
                            Result;
                        {error, _} = Err ->
                            Err;
                        {timeout, _} ->
                            {error, timeout}
                    end
            end;
        {ok, {_, Result}, _} ->
            Result;
        {error, _} = Err ->
            Err;
        {timeout, _} ->
            {error, timeout}
    end.

-spec members(stream_id()) ->
    {ok, #{node() := {pid() | undefined, writer | replica}}} |
    {error, not_found}.
members(StreamId) when is_list(StreamId) ->
    MFA = {?MODULE, query_members, [StreamId]},
    case ra:local_query({?MODULE, node()}, MFA) of
        {ok, {_, {ok, _} = Result}, _} ->
            Result;
        {ok, {_, {error, not_found}}, _} ->
            %% fall back to consistent query
            case ra:consistent_query({?MODULE, node()}, MFA) of
                {ok, Result, _} ->
                    Result;
                {error, _} = Err ->
                    Err;
                {timeout, _} ->
                    {error, timeout}
            end;
        {ok, {_, Result}, _} ->
            Result;
        {error, _} = Err ->
            Err;
        {timeout, _} ->
            {error, timeout}
    end.

query_members(StreamId, #?MODULE{streams = Streams}) ->
    case Streams of
        #{StreamId := #stream{members = Members}} ->
            {ok, maps:map(
                   fun (_, #member{state = {running, _, Pid},
                                   role = {Role, _}}) ->
                           {Pid, Role};
                       (_, #member{role = {Role, _}}) ->
                           {undefined, Role}
                   end, Members)};
        _ ->
            {error, not_found}
    end.

query_local_pid(StreamId, Node, #?MODULE{streams = Streams}) ->
    case Streams of
        #{StreamId := #stream{members =
                              #{Node := #member{state =
                                                {running, _, Pid}}}}} ->
            {ok, Pid};
        _ ->
            {error, not_found}
    end.

-spec register_listener(amqqueue:amqqueue()) ->
    {error, term()} | {ok, ok, atom() | {atom(), atom()}}.
register_listener(Q) when ?is_amqqueue(Q) ->
    #{name := StreamId} = amqqueue:get_type_state(Q),
    process_command({register_listener,
                     #{pid => self(),
                       stream_id => StreamId}}).

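%% Send a command to the coordinator, starting the coordinator cluster first
%% if it is not yet running, and retrying against the remaining known members
%% on timeout or if the chosen server is not running.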
process_command(Cmd) ->
    Servers = ensure_coordinator_started(),
    process_command(Servers, Cmd).

process_command([], _Cmd) ->
    {error, coordinator_unavailable};
process_command([Server | Servers], Cmd) ->
    case ra:process_command(Server, Cmd, ?CMD_TIMEOUT) of
        {timeout, _} ->
            rabbit_log:warning("Coordinator timeout on server ~w when processing command ~W",
                               [element(2, Server), element(1, Cmd), 10]),
            process_command(Servers, Cmd);
        {error, noproc} ->
            process_command(Servers, Cmd);
        Reply ->
            Reply
    end.

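%% Ensure the local coordinator Ra server is running. On first use the
%% coordinator cluster is created under the ?STREAM_COORDINATOR_STARTUP
%% global lock. Returns the list of coordinator members to try.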
ensure_coordinator_started() ->
    Local = {?MODULE, node()},
    AllNodes = all_coord_members(),
    case whereis(?MODULE) of
        undefined ->
            global:set_lock(?STREAM_COORDINATOR_STARTUP),
            Nodes = case ra:restart_server(?RA_SYSTEM, Local) of
                {error, Reason} when Reason == not_started orelse
                                     Reason == name_not_registered ->
                    OtherNodes = all_coord_members() -- [Local],
                    %% We can't use find_members/0 here: a process that times
                    %% out still means the cluster is up
                    case lists:filter(fun(N) -> global:whereis_name(N) =/= undefined end, OtherNodes) of
                        [] ->
                            start_coordinator_cluster();
                        _ ->
                            OtherNodes
                    end;
                ok ->
                    AllNodes;
                {error, {already_started, _}} ->
                    AllNodes;
                _ ->
                    AllNodes
            end,
            global:del_lock(?STREAM_COORDINATOR_STARTUP),
            Nodes;
        _ ->
            AllNodes
    end.

start_coordinator_cluster() ->
    Nodes = rabbit_mnesia:cluster_nodes(running),
    rabbit_log:debug("Starting stream coordinator on nodes: ~w", [Nodes]),
    case ra:start_cluster(?RA_SYSTEM, [make_ra_conf(Node, Nodes) || Node <- Nodes]) of
        {ok, Started, _} ->
            rabbit_log:debug("Started stream coordinator on ~w", [Started]),
            Started;
        {error, cluster_not_formed} ->
            rabbit_log:warning("Stream coordinator could not be started on nodes ~w",
                               [Nodes]),
            []
    end.

all_coord_members() ->
    Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
    [{?MODULE, Node} || Node <- [node() | Nodes]].

init(_Conf) ->
    #?MODULE{}.

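%% Ra state machine entry point. Stream commands are validated first
%% (filter_command/3), then applied to the per-stream state (update_stream/3)
%% and the result is evaluated to produce side effects: member actions
%% (evaluate_stream/3), listener notifications (eval_listeners/2), retention
%% updates (eval_retention/3) and process/node monitors (ensure_monitors/3).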
-spec apply(map(), command(), state()) ->
    {state(), term(), ra_machine:effects()}.
apply(#{index := _Idx} = Meta0, {_CmdTag, StreamId, #{}} = Cmd,
      #?MODULE{streams = Streams0,
               monitors = Monitors0} = State0) ->
    Stream0 = maps:get(StreamId, Streams0, undefined),
    Meta = maps:without([term, machine_version], Meta0),
    case filter_command(Meta, Cmd, Stream0) of
        ok ->
            Stream1 = update_stream(Meta, Cmd, Stream0),
            Reply = case Stream1 of
                        #stream{reply_to = undefined} ->
                            ok;
                        _ ->
                            %% reply_to is set so we'll reply later
                            '$ra_no_reply'
                    end,
            case Stream1 of
                undefined ->
                    return(Meta, State0#?MODULE{streams = maps:remove(StreamId, Streams0)},
                           Reply, []);
                _ ->
                    {Stream2, Effects0} = evaluate_stream(Meta, Stream1, []),
                    {Stream3, Effects1} = eval_listeners(Stream2, Effects0),
                    {Stream, Effects2} = eval_retention(Meta, Stream3, Effects1),
                    {Monitors, Effects} = ensure_monitors(Stream, Monitors0, Effects2),
                    return(Meta,
                           State0#?MODULE{streams = Streams0#{StreamId => Stream},
                                          monitors = Monitors}, Reply, Effects)
            end;
        Reply ->
            return(Meta, State0, Reply, [])
    end;
apply(Meta, {down, Pid, Reason} = Cmd,
      #?MODULE{streams = Streams0,
               listeners = Listeners0,
               monitors = Monitors0} = State) ->
    Effects0 = case Reason of
                   noconnection ->
                       [{monitor, node, node(Pid)}];
                   _ ->
                       []
               end,
    case maps:take(Pid, Monitors0) of
        {{StreamId, listener}, Monitors} ->
            Listeners = case maps:take(StreamId, Listeners0) of
                            error ->
                                Listeners0;
                            {Pids0, Listeners1} ->
                                case maps:remove(Pid, Pids0) of
                                    Pids when map_size(Pids) == 0 ->
                                        Listeners1;
                                    Pids ->
                                        Listeners1#{StreamId => Pids}
                                end
                        end,
            return(Meta, State#?MODULE{listeners = Listeners,
                                       monitors = Monitors}, ok, Effects0);
        {{StreamId, member}, Monitors1} ->
            case Streams0 of
                #{StreamId := Stream0} ->
                    Stream1 = update_stream(Meta, Cmd, Stream0),
                    {Stream, Effects} = evaluate_stream(Meta, Stream1, Effects0),
                    Streams = Streams0#{StreamId => Stream},
                    return(Meta, State#?MODULE{streams = Streams,
                                               monitors = Monitors1}, ok,
                           Effects);
                _ ->
                    %% stream not found, can happen if "late" downs are
                    %% received
                    return(Meta, State#?MODULE{streams = Streams0,
                                               monitors = Monitors1}, ok, Effects0)
            end;
        error ->
            return(Meta, State, ok, Effects0)
    end;
apply(Meta, {register_listener, #{pid := Pid,
                                  stream_id := StreamId}},
      #?MODULE{streams = Streams,
               monitors = Monitors0} = State0) ->
    case Streams of
        #{StreamId := #stream{listeners = Listeners0} = Stream0} ->
            Stream1 = Stream0#stream{listeners = maps:put(Pid, undefined, Listeners0)},
            {Stream, Effects} = eval_listeners(Stream1, []),
            Monitors = maps:put(Pid, {StreamId, listener}, Monitors0),
            return(Meta,
                   State0#?MODULE{streams = maps:put(StreamId, Stream, Streams),
                                  monitors = Monitors}, ok,
                   [{monitor, process, Pid} | Effects]);
        _ ->
            return(Meta, State0, stream_not_found, [])
    end;
apply(Meta, {nodeup, Node} = Cmd,
      #?MODULE{monitors = Monitors0,
               streams = Streams0} = State) ->
    %% reissue monitors for all disconnected members
    {Effects0, Monitors} =
        maps:fold(
          fun(_, #stream{id = Id,
                         members = M}, {Acc, Mon}) ->
                  case M of
                      #{Node := #member{state = {disconnected, _, P}}} ->
                          {[{monitor, process, P} | Acc],
                           Mon#{P => {Id, member}}};
                      _ ->
                          {Acc, Mon}
                  end
          end, {[], Monitors0}, Streams0),
    {Streams, Effects} =
        maps:fold(fun (Id, S0, {Ss, E0}) ->
                          S1 = update_stream(Meta, Cmd, S0),
                          {S, E} = evaluate_stream(Meta, S1, E0),
                          {Ss#{Id => S}, E}
                  end, {Streams0, Effects0}, Streams0),
    return(Meta, State#?MODULE{monitors = Monitors,
                               streams = Streams}, ok, Effects);
apply(Meta, UnkCmd, State) ->
    rabbit_log:debug("~s: unknown command ~W",
                     [?MODULE, UnkCmd, 10]),
    return(Meta, State, {error, unknown_command}, []).

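%% Wrap the machine reply, attaching a release_cursor effect every 4096
%% applied commands so Ra can snapshot the state and truncate its log.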
return(#{index := Idx}, State, Reply, Effects) ->
    case Idx rem 4096 == 0 of
        true ->
            %% add release cursor effect
            {State, Reply, [{release_cursor, Idx, State} | Effects]};
        false ->
            {State, Reply, Effects}
    end.

state_enter(recover, _) ->
    put('$rabbit_vm_category', ?MODULE),
    [];
state_enter(leader, #?MODULE{streams = Streams,
                             monitors = Monitors}) ->
    Pids = maps:keys(Monitors),
    %% monitor all the known nodes
    Nodes = all_member_nodes(Streams),
    NodeMons = [{monitor, node, N} || N <- Nodes],
    NodeMons ++ [{aux, fail_active_actions} |
                 [{monitor, process, P} || P <- Pids]];
state_enter(_S, _) ->
    [].

all_member_nodes(Streams) ->
    maps:keys(
      maps:fold(
        fun (_, #stream{members = M}, Acc) ->
                maps:merge(Acc, M)
        end, #{}, Streams)).

tick(_Ts, _State) ->
    [{aux, maybe_resize_coordinator_cluster}].

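%% Spawned from the aux handler on every machine tick: grow the coordinator
%% cluster onto newly running rabbit nodes and remove members that are no
%% longer part of the rabbit cluster.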
maybe_resize_coordinator_cluster() ->
    spawn(fun() ->
                  case ra:members({?MODULE, node()}) of
                      {_, Members, _} ->
                          MemberNodes = [Node || {_, Node} <- Members],
                          Running = rabbit_mnesia:cluster_nodes(running),
                          All = rabbit_nodes:all(),
                          case Running -- MemberNodes of
                              [] ->
                                  ok;
                              New ->
                                  rabbit_log:info("~s: New rabbit node(s) detected, "
                                                  "adding : ~w",
                                                  [?MODULE, New]),
                                  add_members(Members, New)
                          end,
                          case MemberNodes -- All of
                              [] ->
                                  ok;
                              Old ->
                                  rabbit_log:info("~s: Rabbit node(s) removed from the cluster, "
                                                  "deleting: ~w", [?MODULE, Old]),
                                  remove_members(Members, Old)
                          end;
                      _ ->
                          ok
                  end
          end).

add_members(_, []) ->
    ok;
add_members(Members, [Node | Nodes]) ->
    Conf = make_ra_conf(Node, [N || {_, N} <- Members]),
    case ra:start_server(?RA_SYSTEM, Conf) of
        ok ->
            case ra:add_member(Members, {?MODULE, Node}) of
                {ok, NewMembers, _} ->
                    add_members(NewMembers, Nodes);
                _ ->
                    add_members(Members, Nodes)
            end;
        Error ->
            rabbit_log:warning("Stream coordinator failed to start on node ~s : ~W",
                               [Node, Error, 10]),
            add_members(Members, Nodes)
    end.

remove_members(_, []) ->
    ok;
remove_members(Members, [Node | Nodes]) ->
    case ra:remove_member(Members, {?MODULE, Node}) of
        {ok, NewMembers, _} ->
            remove_members(NewMembers, Nodes);
        _ ->
            remove_members(Members, Nodes)
    end.

-record(aux, {actions = #{} ::
              #{pid() := {stream_id(), #{node := node(),
                                         index := non_neg_integer(),
                                         epoch := osiris:epoch()}}},
              resizer :: undefined | pid()}).

init_aux(_Name) ->
    #aux{}.

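%% The aux machine runs the side-effecting "phases" (starting, stopping and
%% deleting members, mnesia and retention updates) in spawned processes and
%% feeds their outcome back into the state machine as commands such as
%% member_started, member_stopped and action_failed.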
%% TODO ensure the dead writer is restarted as a replica at some point in time, increasing timeout?
handle_aux(leader, _, maybe_resize_coordinator_cluster,
           #aux{resizer = undefined} = Aux, LogState, _) ->
    Pid = maybe_resize_coordinator_cluster(),
    {no_reply, Aux#aux{resizer = Pid}, LogState, [{monitor, process, aux, Pid}]};
handle_aux(leader, _, maybe_resize_coordinator_cluster,
           AuxState, LogState, _) ->
    %% Coordinator resizing is still happening, let's ignore this tick event
    {no_reply, AuxState, LogState};
handle_aux(leader, _, {down, Pid, _},
           #aux{resizer = Pid} = Aux, LogState, _) ->
    %% Coordinator resizing has finished
    {no_reply, Aux#aux{resizer = undefined}, LogState};
handle_aux(leader, _, {start_writer, StreamId,
                       #{epoch := Epoch, node := Node} = Args, Conf},
           Aux, LogState, _) ->
    rabbit_log:debug("~s: running action: 'start_writer'"
                     " for ~s on node ~w in epoch ~b",
                     [?MODULE, StreamId, Node, Epoch]),
    ActionFun = phase_start_writer(StreamId, Args, Conf),
    run_action(starting, StreamId, Args, ActionFun, Aux, LogState);
handle_aux(leader, _, {start_replica, StreamId,
                       #{epoch := Epoch, node := Node} = Args, Conf},
           Aux, LogState, _) ->
    rabbit_log:debug("~s: running action: 'start_replica'"
                     " for ~s on node ~w in epoch ~b",
                     [?MODULE, StreamId, Node, Epoch]),
    ActionFun = phase_start_replica(StreamId, Args, Conf),
    run_action(starting, StreamId, Args, ActionFun, Aux, LogState);
handle_aux(leader, _, {stop, StreamId, #{node := Node,
                                         epoch := Epoch} = Args, Conf},
           Aux, LogState, _) ->
    rabbit_log:debug("~s: running action: 'stop'"
                     " for ~s on node ~w in epoch ~b",
                     [?MODULE, StreamId, Node, Epoch]),
    ActionFun = phase_stop_member(StreamId, Args, Conf),
    run_action(stopping, StreamId, Args, ActionFun, Aux, LogState);
handle_aux(leader, _, {update_mnesia, StreamId, Args, Conf},
           #aux{actions = _Monitors} = Aux, LogState,
           #?MODULE{streams = _Streams}) ->
    rabbit_log:debug("~s: running action: 'update_mnesia'"
                     " for ~s", [?MODULE, StreamId]),
    ActionFun = phase_update_mnesia(StreamId, Args, Conf),
    run_action(updating_mnesia, StreamId, Args, ActionFun, Aux, LogState);
handle_aux(leader, _, {update_retention, StreamId, Args, _Conf},
           #aux{actions = _Monitors} = Aux, LogState,
           #?MODULE{streams = _Streams}) ->
    rabbit_log:debug("~s: running action: 'update_retention'"
                     " for ~s", [?MODULE, StreamId]),
    ActionFun = phase_update_retention(StreamId, Args),
    run_action(update_retention, StreamId, Args, ActionFun, Aux, LogState);
handle_aux(leader, _, {delete_member, StreamId, #{node := Node} = Args, Conf},
           #aux{actions = _Monitors} = Aux, LogState,
           #?MODULE{streams = _Streams}) ->
    rabbit_log:debug("~s: running action: 'delete_member'"
                     " for ~s ~s", [?MODULE, StreamId, Node]),
    ActionFun = phase_delete_member(StreamId, Args, Conf),
    run_action(delete_member, StreamId, Args, ActionFun, Aux, LogState);
handle_aux(leader, _, fail_active_actions,
           #aux{actions = Monitors} = Aux, LogState,
           #?MODULE{streams = Streams}) ->
    Exclude = maps:from_list([{S, ok}
                              || {P, {S, _, _}} <- maps:to_list(Monitors),
                                 is_process_alive(P)]),
    rabbit_log:debug("~s: failing actions: ~w", [?MODULE, Exclude]),
    fail_active_actions(Streams, Exclude),
    {no_reply, Aux, LogState, []};
handle_aux(leader, _, {down, Pid, normal},
           #aux{actions = Monitors} = Aux, LogState, _) ->
    %% action process finished normally, just remove from actions map
    {no_reply, Aux#aux{actions = maps:remove(Pid, Monitors)}, LogState, []};
handle_aux(leader, _, {down, Pid, Reason},
           #aux{actions = Monitors0} = Aux, LogState, _) ->
    %% An action has failed - report back to the state machine
    case maps:get(Pid, Monitors0, undefined) of
        {StreamId, Action, #{node := Node, epoch := Epoch} = Args} ->
            rabbit_log:warning("~s: error while executing action for stream queue ~s, "
                               " node ~s, epoch ~b Err: ~w",
                               [?MODULE, StreamId, Node, Epoch, Reason]),
            Monitors = maps:remove(Pid, Monitors0),
            Cmd = {action_failed, StreamId, Args#{action => Action}},
            send_self_command(Cmd),
            {no_reply, Aux#aux{actions = Monitors},
             LogState, []};
        undefined ->
            %% should this ever happen?
            {no_reply, Aux, LogState, []}
    end;
handle_aux(_, _, _, AuxState, LogState, _) ->
    {no_reply, AuxState, LogState}.

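%% Spawn a linked process that runs the given phase fun, monitor it from the
%% aux machine and remember it in the actions map so that an abnormal exit
%% can be reported back as an action_failed command.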
run_action(Action, StreamId, #{node := _Node,
                               epoch := _Epoch} = Args,
           ActionFun, #aux{actions = Actions0} = Aux, Log) ->
    Coordinator = self(),
    Pid = spawn_link(fun() ->
                             ActionFun(),
                             unlink(Coordinator)
                     end),
    Effects = [{monitor, process, aux, Pid}],
    Actions = Actions0#{Pid => {StreamId, Action, Args}},
    {no_reply, Aux#aux{actions = Actions}, Log, Effects}.

wrap_reply(From, Reply) ->
    [{reply, From, {wrap_reply, Reply}}].

phase_start_replica(StreamId, #{epoch := Epoch,
                                node := Node} = Args, Conf0) ->
    fun() ->
            try osiris_replica:start(Node, Conf0) of
                {ok, Pid} ->
                    rabbit_log:info("~s: ~s: replica started on ~s in ~b pid ~w",
                                    [?MODULE, StreamId, Node, Epoch, Pid]),
                    send_self_command({member_started, StreamId,
                                       Args#{pid => Pid}});
                {error, already_present} ->
                    %% need to remove child record if this is the case
                    %% can it ever happen?
                    _ = osiris_replica:stop(Node, Conf0),
                    send_action_failed(StreamId, starting, Args);
                {error, {already_started, Pid}} ->
                    %% TODO: we need to check that the current epoch is the same
                    %% before we can be 100% sure it is started in the correct
                    %% epoch, can this happen? who knows...
                    send_self_command({member_started, StreamId,
                                       Args#{pid => Pid}});
                {error, Reason} ->
                    rabbit_log:warning("~s: Error while starting replica for ~s on node ~s in ~b : ~W",
                                       [?MODULE, maps:get(name, Conf0), Node, Epoch, Reason, 10]),
                    maybe_sleep(Reason),
                    send_action_failed(StreamId, starting, Args)
            catch _:Error ->
                    rabbit_log:warning("~s: Error while starting replica for ~s on node ~s in ~b : ~W",
                                       [?MODULE, maps:get(name, Conf0), Node, Epoch, Error, 10]),
                    maybe_sleep(Error),
                    send_action_failed(StreamId, starting, Args)
            end
    end.

send_action_failed(StreamId, Action, Arg) ->
    send_self_command({action_failed, StreamId, Arg#{action => Action}}).

send_self_command(Cmd) ->
    ra:pipeline_command({?MODULE, node()}, Cmd),
    ok.

phase_delete_member(StreamId, #{node := Node} = Arg, Conf) ->
    fun() ->
            try osiris_server_sup:delete_child(Node, Conf) of
                ok ->
                    send_self_command({member_deleted, StreamId, Arg});
                _ ->
                    send_action_failed(StreamId, deleting, Arg)
            catch _:E ->
                    rabbit_log:warning("~s: Error while deleting member for ~s on node ~s: ~W",
                                       [?MODULE, StreamId, Node, E, 10]),
                    maybe_sleep(E),
                    send_action_failed(StreamId, deleting, Arg)
            end
    end.

phase_stop_member(StreamId, #{node := Node,
                              epoch := Epoch} = Arg0, Conf) ->
    fun() ->
            try osiris_server_sup:stop_child(Node, StreamId) of
                ok ->
                    %% get tail
                    try get_replica_tail(Node, Conf) of
                        {ok, Tail} ->
                            Arg = Arg0#{tail => Tail},
                            rabbit_log:debug("~s: ~s: member stopped on ~s in ~b Tail ~w",
                                             [?MODULE, StreamId, Node, Epoch, Tail]),
                            send_self_command({member_stopped, StreamId, Arg});
                        Err ->
                            rabbit_log:warning("~s: failed to get tail of member ~s on ~s in ~b Error: ~w",
                                               [?MODULE, StreamId, Node, Epoch, Err]),
                            maybe_sleep(Err),
                            send_action_failed(StreamId, stopping, Arg0)
                    catch _:Err ->
                            rabbit_log:warning("~s: failed to get tail of member ~s on ~s in ~b Error: ~w",
                                               [?MODULE, StreamId, Node, Epoch, Err]),
                            maybe_sleep(Err),
                            send_action_failed(StreamId, stopping, Arg0)
                    end;
                Err ->
                    rabbit_log:warning("~s: failed to stop "
                                       "member ~s ~w Error: ~w",
                                       [?MODULE, StreamId, Node, Err]),
                    maybe_sleep(Err),
                    send_action_failed(StreamId, stopping, Arg0)
            catch _:Err ->
                    rabbit_log:warning("~s: failed to stop member ~s ~w Error: ~w",
                                       [?MODULE, StreamId, Node, Err]),
                    maybe_sleep(Err),
                    send_action_failed(StreamId, stopping, Arg0)
            end
    end.

phase_start_writer(StreamId, #{epoch := Epoch,
                               node := Node} = Args0, Conf) ->
    fun() ->
            try osiris_writer:start(Conf) of
                {ok, Pid} ->
                    Args = Args0#{epoch => Epoch, pid => Pid},
                    rabbit_log:info("~s: started writer ~s on ~w in ~b",
                                    [?MODULE, StreamId, Node, Epoch]),
                    send_self_command({member_started, StreamId, Args});
                Err ->
                    %% no sleep for writer failures as we want to trigger a new
                    %% election asap
                    rabbit_log:warning("~s: failed to start writer ~s on ~s in ~b Error: ~w",
                                       [?MODULE, StreamId, Node, Epoch, Err]),
                    send_action_failed(StreamId, starting, Args0)
            catch _:Err ->
                    rabbit_log:warning("~s: failed to start writer ~s on ~s in ~b Error: ~w",
                                       [?MODULE, StreamId, Node, Epoch, Err]),
                    send_action_failed(StreamId, starting, Args0)
            end
    end.

phase_update_retention(StreamId, #{pid := Pid,
                                   retention := Retention} = Args) ->
    fun() ->
            try osiris:update_retention(Pid, Retention) of
                ok ->
                    send_self_command({retention_updated, StreamId, Args});
                {error, Reason} = Err ->
                    rabbit_log:warning("~s: failed to update retention for ~s ~w Reason: ~w",
                                       [?MODULE, StreamId, node(Pid), Reason]),
                    maybe_sleep(Err),
                    send_action_failed(StreamId, update_retention, Args)
            catch _:Err ->
                    rabbit_log:warning("~s: failed to update retention for ~s ~w Error: ~w",
                                       [?MODULE, StreamId, node(Pid), Err]),
                    maybe_sleep(Err),
                    send_action_failed(StreamId, update_retention, Args)
            end
    end.

get_replica_tail(Node, Conf) ->
    case rpc:call(Node, ?MODULE, log_overview, [Conf]) of
        {badrpc, nodedown} ->
            {error, nodedown};
        {error, _} = Err ->
            Err;
        {_Range, Offsets} ->
            {ok, select_highest_offset(Offsets)}
    end.

select_highest_offset([]) ->
    empty;
select_highest_offset(Offsets) ->
    lists:last(Offsets).

log_overview(Config) ->
    case whereis(osiris_sup) of
        undefined ->
            {error, app_not_running};
        _ ->
            Dir = osiris_log:directory(Config),
            osiris_log:overview(Dir)
    end.

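%% Rebuild coordinator state by applying a list of {Meta, Command} pairs in
%% order, e.g. entries extracted from a Ra log when debugging. A minimal
%% (hypothetical) invocation, assuming Meta carries at least the index and
%% system_time keys the handlers match on:
%%
%%   rabbit_stream_coordinator:replay(
%%     [{#{index => 1, system_time => erlang:system_time(millisecond)}, Cmd}]).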
replay(L) when is_list(L) ->
    lists:foldl(
      fun ({M, E}, Acc) ->
              element(1, ?MODULE:apply(M, E, Acc))
      end, init(#{}), L).

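%% A new writer can only be selected once a majority of the stream's members
%% have stopped and reported their tails; a single-member stream is trivially
%% its own quorum.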
is_quorum(1, 1) ->
    true;
is_quorum(NumReplicas, NumAlive) ->
    NumAlive >= ((NumReplicas div 2) + 1).

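%% Update the amqqueue record in mnesia with the new leader pid and stream
%% configuration. Updates carrying a stale stream id (from a previous
%% incarnation of a queue with the same name) are ignored.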
phase_update_mnesia(StreamId, Args, #{reference := QName,
                                      leader_pid := LeaderPid} = Conf) ->
    fun() ->
            rabbit_log:debug("~s: running mnesia update for ~s: ~W",
                             [?MODULE, StreamId, Conf, 10]),
            Fun = fun (Q) ->
                          case amqqueue:get_type_state(Q) of
                              #{name := S} when S == StreamId ->
                                  %% the stream id matches so we can update the
                                  %% amqqueue record
                                  amqqueue:set_type_state(
                                    amqqueue:set_pid(Q, LeaderPid), Conf);
                              Ts ->
                                  S = maps:get(name, Ts, undefined),
                                  rabbit_log:debug("~s: refusing mnesia update for stale stream id ~s, current ~s",
                                                   [?MODULE, StreamId, S]),
                                  %% if the stream id isn't a match this is a stale
                                  %% update from a previous stream incarnation for the
                                  %% same queue name and we ignore it
                                  Q
                          end
                  end,
            try rabbit_misc:execute_mnesia_transaction(
                  fun() ->
                          rabbit_amqqueue:update(QName, Fun)
                  end) of
                not_found ->
                    rabbit_log:debug("~s: resource for stream id ~s not found, "
                                     "recovering from rabbit_durable_queue",
                                     [?MODULE, StreamId]),
                    %% This can happen during recovery
                    %% we need to re-initialise the queue record
                    %% if the stream id is a match
                    [Q] = mnesia:dirty_read(rabbit_durable_queue, QName),
                    case amqqueue:get_type_state(Q) of
                        #{name := S} when S == StreamId ->
                            rabbit_log:debug("~s: initializing queue record for stream id ~s",
                                             [?MODULE, StreamId]),
                            _ = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
                            ok;
                        _ ->
                            ok
                    end,
                    send_self_command({mnesia_updated, StreamId, Args});
                _ ->
                    send_self_command({mnesia_updated, StreamId, Args})
            catch _:E ->
                    rabbit_log:debug("~s: failed to update mnesia for ~s: ~W",
                                     [?MODULE, StreamId, E, 10]),
                    send_action_failed(StreamId, updating_mnesia, Args)
            end
    end.

format_ra_event(ServerId, Evt) ->
    {stream_coordinator_event, ServerId, Evt}.

make_ra_conf(Node, Nodes) ->
    UId = ra:new_uid(ra_lib:to_binary(?MODULE)),
    Formatter = {?MODULE, format_ra_event, []},
    Members = [{?MODULE, N} || N <- Nodes],
    TickTimeout = application:get_env(rabbit, stream_tick_interval,
                                      ?TICK_TIMEOUT),
    #{cluster_name => ?MODULE,
      id => {?MODULE, Node},
      uid => UId,
      friendly_name => atom_to_list(?MODULE),
      metrics_key => ?MODULE,
      initial_members => Members,
      log_init_args => #{uid => UId},
      tick_timeout => TickTimeout,
      machine => {module, ?MODULE, #{}},
      ra_event_formatter => Formatter}.

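%% Reject commands that must not be applied in the current stream state,
%% e.g. deleting the last remaining (non-deleted) member of a stream.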
filter_command(_Meta, {delete_replica, _, #{node := Node}}, #stream{id = StreamId,
                                                                    members = Members0}) ->
    Members = maps:filter(fun(_, #member{target = S}) when S =/= deleted ->
                                  true;
                             (_, _) ->
                                  false
                          end, Members0),
    case maps:size(Members) =< 1 of
        true ->
            rabbit_log:warning(
              "~s failed to delete replica on node ~s for stream ~s: refusing to delete the only replica",
              [?MODULE, Node, StreamId]),
            {error, last_stream_member};
        false ->
            ok
    end;
filter_command(_, _, _) ->
    ok.

update_stream(Meta, Cmd, Stream) ->
    try
        update_stream0(Meta, Cmd, Stream)
    catch
        _:E:Stacktrace ->
            rabbit_log:warning(
              "~s failed to update stream:~n~W~n~W",
              [?MODULE, E, 10, Stacktrace, 10]),
            Stream
    end.

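%% Pure state transition for a single stream: given a command and the current
%% #stream{} record, return the updated record (or undefined once the stream
%% is gone). Side effects are derived separately by evaluate_stream/3.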
update_stream0(#{system_time := _} = Meta,
               {new_stream, StreamId, #{leader_node := LeaderNode,
                                        queue := Q}}, undefined) ->
    #{nodes := Nodes} = Conf = amqqueue:get_type_state(Q),
    %% this jumps straight to the state where all members
    %% have been stopped and a new writer has been chosen
    E = 1,
    QueueRef = amqqueue:get_name(Q),
    Members = maps:from_list(
                [{N, #member{role = case LeaderNode of
                                        N -> {writer, E};
                                        _ -> {replica, E}
                                    end,
                             node = N,
                             state = {ready, E},
                             %% no members are running actions
                             current = undefined}
                 } || N <- Nodes]),
    #stream{id = StreamId,
            epoch = E,
            nodes = Nodes,
            queue_ref = QueueRef,
            conf = Conf,
            members = Members,
            reply_to = maps:get(from, Meta, undefined)};
update_stream0(#{system_time := _Ts} = _Meta,
               {delete_stream, _StreamId, #{}},
               #stream{members = Members0,
                       target = _} = Stream0) ->
    Members = maps:map(
                fun (_, M) ->
                        M#member{target = deleted}
                end, Members0),
    Stream0#stream{members = Members,
                   %% reset reply_to here to ensure a reply
                   %% is returned as the command has been accepted
                   reply_to = undefined,
                   target = deleted};
update_stream0(#{system_time := _Ts} = _Meta,
               {add_replica, _StreamId, #{node := Node}},
               #stream{members = Members0,
                       epoch = Epoch,
                       nodes = Nodes,
                       target = _} = Stream0) ->
    case maps:is_key(Node, Members0) of
        true ->
            Stream0;
        false ->
            Members1 = Members0#{Node => #member{role = {replica, Epoch},
                                                 node = Node,
                                                 target = stopped}},
            Members = set_running_to_stopped(Members1),
            Stream0#stream{members = Members,
                           nodes = lists:sort([Node | Nodes])}
    end;
update_stream0(#{system_time := _Ts} = _Meta,
               {delete_replica, _StreamId, #{node := Node}},
               #stream{members = Members0,
                       epoch = _Epoch,
                       nodes = Nodes,
                       target = _} = Stream0) ->
    case maps:is_key(Node, Members0) of
        true ->
            %% TODO: check for duplicates
            Members = maps:map(
                        fun (K, M) when K == Node ->
                                M#member{target = deleted};
                            (_, #member{target = running} = M) ->
                                M#member{target = stopped};
                            (_, M) ->
                                M
                        end, Members0),
            Stream0#stream{members = Members,
                           nodes = lists:delete(Node, Nodes)};
        false ->
            Stream0
    end;
update_stream0(#{system_time := _Ts},
               {member_started, _StreamId,
                #{epoch := E,
                  index := Idx,
                  pid := Pid} = Args}, #stream{epoch = E,
                                               members = Members} = Stream0) ->
    Node = node(Pid),
    case maps:get(Node, Members, undefined) of
        #member{role = {_, E},
                current = {starting, Idx},
                state = _} = Member0 ->
            %% this is what we expect, leader epoch should match overall
            %% epoch
            Member = Member0#member{state = {running, E, Pid},
                                    current = undefined},
            %% TODO: we need to tell the machine to monitor the leader
            Stream0#stream{members =
                           Members#{Node => Member}};
        Member ->
            %% do we just ignore any member_started events from unexpected
            %% epochs?
            rabbit_log:warning("~s: member started unexpected ~w ~w",
                               [?MODULE, Args, Member]),
            Stream0
    end;
update_stream0(#{system_time := _Ts},
               {member_deleted, _StreamId, #{node := Node}},
               #stream{nodes = Nodes,
                       members = Members0} = Stream0) ->
    case maps:take(Node, Members0) of
        {_, Members} when map_size(Members) == 0 ->
            undefined;
        {#member{state = _}, Members} ->
            %% this is what we expect, leader epoch should match overall
            %% epoch
            Stream0#stream{nodes = lists:delete(Node, Nodes),
                           members = Members};
        _ ->
            %% do we just ignore any writer_started events from unexpected
            %% epochs?
            Stream0
    end;
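%% member_stopped: once a quorum of members have stopped in the current epoch
%% and reported their tails, a new writer is selected from the reported tails
%% (select_leader/1), the epoch is bumped and all members become ready in the
%% new epoch.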
update_stream0(#{system_time := _Ts},
               {member_stopped, _StreamId,
                #{node := Node,
                  index := Idx,
                  epoch := StoppedEpoch,
                  tail := Tail}}, #stream{epoch = Epoch,
                                          target = Target,
                                          nodes = Nodes,
                                          members = Members0} = Stream0) ->
    IsLeaderInCurrent = case find_leader(Members0) of
                            {#member{role = {writer, Epoch},
                                     target = running,
                                     state = {ready, Epoch}}, _} ->
                                true;
                            {#member{role = {writer, Epoch},
                                     target = running,
                                     state = {running, Epoch, _}}, _} ->
                                true;
                            _ ->
                                false
                        end,
    case maps:get(Node, Members0) of
        #member{role = {replica, Epoch},
                current = {stopping, Idx},
                state = _} = Member0
          when IsLeaderInCurrent ->
            %% A leader has already been selected so skip straight to ready state
            Member = update_target(Member0#member{state = {ready, Epoch},
                                                  current = undefined}, Target),
            Members1 = Members0#{Node => Member},
            Stream0#stream{members = Members1};
        #member{role = {_, Epoch},
                current = {stopping, Idx},
                state = _} = Member0 ->
            %% this is what we expect, member epoch should match overall
            %% epoch
            Member = case StoppedEpoch of
                         Epoch ->
                             update_target(Member0#member{state = {stopped, StoppedEpoch, Tail},
                                                          current = undefined}, Target);
                         _ ->
                             %% if stopped epoch is from another epoch
                             %% leave target as is to retry stop in current term
                             Member0#member{state = {stopped, StoppedEpoch, Tail},
                                            current = undefined}
                     end,
            Members1 = Members0#{Node => Member},
            Offsets = [{N, T}
                       || #member{state = {stopped, E, T},
                                  target = running,
                                  node = N} <- maps:values(Members1),
                          E == Epoch],
            case is_quorum(length(Nodes), length(Offsets)) of
                true ->
                    %% select leader
                    NewWriterNode = select_leader(Offsets),
                    NextEpoch = Epoch + 1,
                    Members = maps:map(
                                fun (N, #member{state = {stopped, E, _}} = M)
                                      when E == Epoch ->
                                        case NewWriterNode of
                                            N ->
                                                %% new leader
                                                M#member{role = {writer, NextEpoch},
                                                         state = {ready, NextEpoch}};
                                            _ ->
                                                M#member{role = {replica, NextEpoch},
                                                         state = {ready, NextEpoch}}
                                        end;
                                    (_N, #member{target = deleted} = M) ->
                                        M;
                                    (_N, M) ->
                                        M#member{role = {replica, NextEpoch}}
                                end, Members1),
                    Stream0#stream{epoch = NextEpoch,
                                   members = Members};
                false ->
                    Stream0#stream{members = Members1}
            end;
        _Member ->
            Stream0
    end;
update_stream0(#{system_time := _Ts},
               {mnesia_updated, _StreamId, #{epoch := E}},
               Stream0) ->
    %% reset mnesia state
    case Stream0 of
        undefined ->
            undefined;
        _ ->
            Stream0#stream{mnesia = {updated, E}}
    end;
update_stream0(#{system_time := _Ts},
               {retention_updated, _StreamId, #{node := Node}},
               #stream{members = Members0,
                       conf = Conf} = Stream0) ->
    Members = maps:update_with(Node, fun (M) ->
                                             M#member{current = undefined,
                                                      conf = Conf}
                                     end, Members0),
    Stream0#stream{members = Members};
update_stream0(#{system_time := _Ts},
               {action_failed, _StreamId, #{action := updating_mnesia}},
               #stream{mnesia = {_, E}} = Stream0) ->
    Stream0#stream{mnesia = {updated, E}};
update_stream0(#{system_time := _Ts},
               {action_failed, _StreamId,
                #{node := Node,
                  index := Idx,
                  action := Action,
                  epoch := _Epoch}}, #stream{members = Members0} = Stream0) ->
    Members1 = maps:update_with(Node,
                                fun (#member{current = {C, I}} = M)
                                      when C == Action andalso I == Idx ->
                                        M#member{current = undefined};
                                    (M) ->
                                        M
                                end, Members0),
    case Members0 of
        #{Node := #member{role = {writer, E},
                          state = {ready, E},
                          current = {starting, Idx}}}
          when Action == starting ->
            %% the leader failed to start, so we need a new election:
            %% stop all members
            Members = set_running_to_stopped(Members1),
            Stream0#stream{members = Members};
        _ ->
            Stream0#stream{members = Members1}
    end;
1169update_stream0(#{system_time := _Ts},
1170               {down, Pid, Reason},
1171               #stream{epoch = E,
1172                       members = Members0} = Stream0) ->
1173    DownNode = node(Pid),
1174    case Members0 of
1175        #{DownNode := #member{role = {writer, E},
1176                              state = {running, E, Pid}} = Member} ->
1177            Members1 = Members0#{DownNode => Member#member{state = {down, E}}},
1178            %% leader is down, set all members that should be running to stopped
1179            Members = maps:map(fun (_, #member{target = running} = M) ->
1180                                       M#member{target = stopped};
1181                                   (_, M) ->
1182                                       M
1183                               end, Members1),
1184            Stream0#stream{members = Members};
1185        #{DownNode := #member{role = {replica, _},
1186                              state = {running, _, Pid}} = Member}
1187          when Reason == noconnection ->
1188            %% mark process as disconnected such that we don't set it to down until
1189            %% the node is back and we can re-monitor
1190            Members = Members0#{DownNode =>
1191                                Member#member{state = {disconnected, E, Pid}}},
1192            Stream0#stream{members = Members};
1193        #{DownNode := #member{role = {replica, _},
1194                              state = {S, _, Pid}} = Member}
1195          when S == running orelse S == disconnected ->
1196            %% the down process is currently running with the correct Pid
1197            %% set state to down
1198            Members = Members0#{DownNode => Member#member{state = {down, E}}},
1199            Stream0#stream{members = Members};
1200        _ ->
1201            Stream0
1202    end;
update_stream0(#{system_time := _Ts},
               {down, _Pid, _Reason}, undefined) ->
    undefined;
update_stream0(#{system_time := _Ts} = _Meta,
               {nodeup, Node},
               #stream{members = Members0} = Stream0) ->
    Members = maps:map(
                fun (_, #member{node = N,
                                current = {sleeping, nodeup}} = M)
                      when N == Node ->
                        M#member{current = undefined};
                    (_, M) ->
                        M
                end, Members0),
    Stream0#stream{members = Members};
update_stream0(#{system_time := _Ts},
               {policy_changed, _StreamId, #{queue := Q}},
               #stream{conf = Conf0,
                       members = _Members0} = Stream0) ->
    Conf = rabbit_stream_queue:update_stream_conf(Q, Conf0),
    Stream0#stream{conf = Conf};
update_stream0(_Meta, _Cmd, undefined) ->
    undefined.

eval_listeners(#stream{listeners = Listeners0,
                       queue_ref = QRef,
                       members = Members} = Stream, Effects0) ->
    case find_leader(Members) of
        {#member{state = {running, _, LeaderPid}}, _} ->
            %% a leader is running, check all listeners to see if any of them
            %% has not been notified of the current leader pid
            {Listeners, Effects} =
                maps:fold(
                  fun(_, P, Acc) when P == LeaderPid ->
                          Acc;
                     (LPid, _, {L, Acc}) ->
                          {L#{LPid => LeaderPid},
                           [{send_msg, LPid,
                             {queue_event, QRef,
                              {stream_leader_change, LeaderPid}},
                             cast} | Acc]}
                  end, {Listeners0, Effects0}, Listeners0),
            {Stream#stream{listeners = Listeners}, Effects};
        _ ->
            {Stream, Effects0}
    end.
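%% Example of the notification emitted above (pids are illustrative): a
%% listener that has not yet seen the current leader pid receives
%%   {send_msg, ListenerPid,
%%    {queue_event, QRef, {stream_leader_change, LeaderPid}}, cast}
%% and the listener map is updated to ListenerPid => LeaderPid so the same
%% notification is not sent again for this leader.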

eval_retention(#{index := Idx} = Meta,
               #stream{conf = #{retention := Ret} = Conf,
                       id = StreamId,
                       epoch = Epoch,
                       members = Members} = Stream, Effects0) ->
    NeedUpdate = maps:filter(
                   fun (_, #member{state = {running, _, _},
                                   current = undefined,
                                   conf = C}) ->
                           Ret =/= maps:get(retention, C, undefined);
                       (_, _) ->
                           false
                   end, Members),
    Args = Meta#{epoch => Epoch},
    Effs = [{aux, {update_retention, StreamId,
                   Args#{pid => Pid,
                         node => node(Pid),
                         retention => Ret}, Conf}}
            || #member{state = {running, _, Pid}} <- maps:values(NeedUpdate)],
    Updated = maps:map(fun (_, M) -> M#member{current = {updating, Idx}} end,
                       NeedUpdate),
    {Stream#stream{members = maps:merge(Members, Updated)}, Effs ++ Effects0}.
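%% Example (illustrative): a member that is running, has no action in flight
%% and whose member config still carries an outdated retention spec gets an
%%   {aux, {update_retention, StreamId, Args, Conf}}
%% effect, with Args carrying the pid, node, epoch and new retention, and is
%% marked current = {updating, Idx} so the update is not issued twice.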

%% This function should be idempotent: it should modify the state such that
%% it will not issue duplicate actions when called again.
evaluate_stream(#{index := Idx} = Meta,
                #stream{id = StreamId,
                        reply_to = From,
                        epoch = Epoch,
                        mnesia = {MnesiaTag, MnesiaEpoch},
                        members = Members0} = Stream0, Effs0) ->
     case find_leader(Members0) of
         {#member{state = LState,
                  node = LeaderNode,
                  target = deleted,
                  current = undefined} = Writer0, Replicas}
           when LState =/= deleted ->
             Action = {aux, {delete_member, StreamId, LeaderNode,
                             make_writer_conf(Writer0, Stream0)}},
             Writer = Writer0#member{current = {deleting, Idx}},
             Effs = [Action | Effs0],
             Stream = Stream0#stream{reply_to = undefined},
             eval_replicas(Meta, Writer, Replicas, Stream, Effs);
         {#member{state = {down, Epoch},
                  target = stopped,
                  node = LeaderNode,
                  current = undefined} = Writer0, Replicas} ->
             %% the leader is down: all replicas need to be stopped and their
             %% tail info retrieved; some replicas may already be in the
             %% stopping or ready state
             Args = Meta#{epoch => Epoch,
                          node => LeaderNode},
             Conf = make_writer_conf(Writer0, Stream0),
             Action = {aux, {stop, StreamId, Args, Conf}},
             Writer = Writer0#member{current = {stopping, Idx}},
             eval_replicas(Meta, Writer, Replicas, Stream0, [Action | Effs0]);
         {#member{state = {ready, Epoch}, %% writer ready in current epoch
                  target = running,
                  node = LeaderNode,
                  current = undefined} = Writer0, _Replicas} ->
             %% the ready check has completed and a new leader has been chosen:
             %% time to start the writer; if the leader start fails, revert all
             %% members to the down state and re-run
             WConf = make_writer_conf(Writer0, Stream0),
             Members = Members0#{LeaderNode =>
                                 Writer0#member{current = {starting, Idx},
                                                conf = WConf}},
             Args = Meta#{node => LeaderNode, epoch => Epoch},
             Actions = [{aux, {start_writer, StreamId, Args, WConf}} | Effs0],
             {Stream0#stream{members = Members}, Actions};
         {#member{state = {running, Epoch, LeaderPid},
                  target = running} = Writer, Replicas} ->
             Effs1 = case From of
                         undefined ->
                             Effs0;
                         _ ->
                             %% we need a reply effect here
                             wrap_reply(From, {ok, LeaderPid}) ++ Effs0
                     end,
             Stream1 = Stream0#stream{reply_to = undefined},
             case MnesiaTag == updated andalso MnesiaEpoch < Epoch of
                 true ->
                     Args = Meta#{node => node(LeaderPid), epoch => Epoch},
                     Effs = [{aux,
                              {update_mnesia, StreamId, Args,
                               make_replica_conf(LeaderPid, Stream1)}} | Effs1],
                     Stream = Stream1#stream{mnesia = {updating, MnesiaEpoch}},
                     eval_replicas(Meta, Writer, Replicas, Stream, Effs);
                 false ->
                     eval_replicas(Meta, Writer, Replicas, Stream1, Effs1)
             end;
         {#member{state = S,
                  target = stopped,
                  node = LeaderNode,
                  current = undefined} = Writer0, Replicas}
           when element(1, S) =/= stopped ->
             %% leader should be stopped
             Args = Meta#{node => LeaderNode, epoch => Epoch},
             Action = {aux, {stop, StreamId, Args,
                             make_writer_conf(Writer0, Stream0)}},
             Writer = Writer0#member{current = {stopping, Idx}},
             eval_replicas(Meta, Writer, Replicas, Stream0, [Action | Effs0]);
         {Writer, Replicas} ->
             eval_replicas(Meta, Writer, Replicas, Stream0, Effs0)
     end.
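%% Sketch of the writer-driven transitions above: a writer targeted for
%% deletion produces a delete_member action; a down (or otherwise
%% stopped-targeted) writer produces stop actions so tail info can be
%% gathered; a writer that is ready in the current epoch is started via
%% start_writer; and a running writer replies to any pending caller and, if
%% the mnesia record is behind the current epoch, schedules an update_mnesia
%% action before the replicas are evaluated.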

eval_replicas(Meta, undefined, Replicas, Stream, Actions0) ->
    {Members, Actions} = lists:foldl(
                           fun (R, Acc) ->
                                   eval_replica(Meta, R, deleted, Stream, Acc)
                           end, {#{}, Actions0},
                           Replicas),
    {Stream#stream{members = Members}, Actions};
eval_replicas(Meta, #member{state = LeaderState,
                            node = WriterNode} = Writer, Replicas,
              Stream, Actions0) ->
    {Members, Actions} = lists:foldl(
                           fun (R, Acc) ->
                                   eval_replica(Meta, R, LeaderState,
                                                Stream, Acc)
                           end, {#{WriterNode => Writer}, Actions0},
                           Replicas),
    {Stream#stream{members = Members}, Actions}.

eval_replica(#{index := Idx} = Meta,
             #member{state = _State,
                     target = stopped,
                     node = Node,
                     current = undefined} = Replica,
             _LeaderState,
             #stream{id = StreamId,
                     epoch = Epoch,
                     conf = Conf0},
             {Replicas, Actions}) ->
    %% the member is not running anything, is not already stopped and was not
    %% handled by an earlier clause, so issue a stop
    Args = Meta#{node => Node, epoch => Epoch},

    Conf = Conf0#{epoch => Epoch},
    {Replicas#{Node => Replica#member{current = {stopping, Idx}}},
     [{aux, {stop, StreamId, Args, Conf}} | Actions]};
eval_replica(#{index := Idx} = Meta, #member{state = _,
                                             node = Node,
                                             current = Current,
                                             target = deleted} = Replica,
             _LeaderState, #stream{id = StreamId,
                                   epoch = Epoch,
                                   conf = Conf}, {Replicas, Actions0}) ->

    case Current of
        undefined ->
            Args = Meta#{epoch => Epoch, node => Node},
            Actions = [{aux, {delete_member, StreamId, Args, Conf}} |
                       Actions0],
            {Replicas#{Node => Replica#member{current = {deleting, Idx}}},
             Actions};
        _ ->
            {Replicas#{Node => Replica}, Actions0}
    end;
eval_replica(#{index := Idx} = Meta, #member{state = {State, Epoch},
                                             node = Node,
                                             target = running,
                                             current = undefined} = Replica,
             {running, Epoch, Pid},
             #stream{id = StreamId,
                     epoch = Epoch} = Stream,
             {Replicas, Actions})
  when State == ready; State == down ->
    %% replica is down or ready and the leader is running
    %% time to start it
    Conf = make_replica_conf(Pid, Stream),
    Args = Meta#{node => Node, epoch => Epoch},
    {Replicas#{Node => Replica#member{current = {starting, Idx},
                                      conf = Conf}},
     [{aux, {start_replica, StreamId, Args, Conf}} | Actions]};
eval_replica(_Meta, #member{state = {running, Epoch, _},
                            target = running,
                            node = Node} = Replica,
             {running, Epoch, _}, _Stream, {Replicas, Actions}) ->
    {Replicas#{Node => Replica}, Actions};
eval_replica(_Meta, #member{state = {stopped, _E, _},
                            node = Node,
                            current = undefined} = Replica,
             _LeaderState, _Stream,
             {Replicas, Actions}) ->
    %% when stopped, just wait for a quorum of members to reach the stopped
    %% state; update_stream will then move them to the ready state
    {Replicas#{Node => Replica}, Actions};
eval_replica(_Meta, #member{state = {ready, E},
                            target = running,
                            node = Node,
                            current = undefined} = Replica,
             {ready, E}, _Stream,
             {Replicas, Actions}) ->
    %% if we're ready and so is the leader we just wait as well
    {Replicas#{Node => Replica}, Actions};
eval_replica(_Meta, #member{node = Node} = Replica, _LeaderState, _Stream,
             {Replicas, Actions}) ->
    {Replicas#{Node => Replica}, Actions}.
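%% Example (illustrative): with the leader running in the current epoch, a
%% replica in state {ready, E} or {down, E} with no action in flight gets a
%%   {aux, {start_replica, StreamId, Args, Conf}}
%% effect (Conf built by make_replica_conf/2) and is marked
%% current = {starting, Idx}; replicas targeted stopped or deleted are
%% handled by the earlier clauses with stop and delete_member actions instead.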

fail_active_actions(Streams, Exclude) ->
    maps:map(
      fun (_, #stream{id = Id, members = Members})
            when not is_map_key(Id, Exclude) ->
              _ = maps:map(fun(_, M) ->
                                   fail_action(Id, M)
                           end, Members);
          (_, _) ->
              %% the stream is excluded, leave its in-flight actions alone
              ok
      end, Streams),

    ok.

fail_action(_StreamId, #member{current = undefined}) ->
    ok;
fail_action(StreamId, #member{role = {_, E},
                              current = {Action, Idx},
                              node = Node}) ->
    rabbit_log:debug("~s: failing active action for ~s node ~w Action ~w",
                     [?MODULE, StreamId, Node, Action]),
    %% there is an action in flight; send a failure message for it
    send_self_command({action_failed, StreamId,
                       #{action => Action,
                         index => Idx,
                         node => Node,
                         epoch => E}}).
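%% Example of the command sent above (values are illustrative): a member stuck
%% with current = {stopping, Idx} on node Node produces
%%   {action_failed, StreamId, #{action => stopping, index => Idx,
%%                               node => Node, epoch => E}}
%% which is sent back to the coordinator itself and applied as a regular
%% command.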

ensure_monitors(#stream{id = StreamId,
                        members = Members}, Monitors, Effects) ->
    maps:fold(
      fun
          (_, #member{state = {running, _, Pid}}, {M, E})
            when not is_map_key(Pid, M) ->
              {M#{Pid => {StreamId, member}},
               [{monitor, process, Pid},
                %% ensure we're always monitoring the node as well
                {monitor, node, node(Pid)} | E]};
          (_, _, Acc) ->
              Acc
      end, {Monitors, Effects}, Members).
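%% Example (illustrative): a running member pid that is not yet tracked adds
%% Pid => {StreamId, member} to the monitors map and emits both
%%   {monitor, process, Pid} and {monitor, node, node(Pid)}
%% effects, so both process exits and node down/up events are fed back into
%% the state machine.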

make_replica_conf(LeaderPid,
                  #stream{epoch = Epoch,
                          nodes = Nodes,
                          conf = Conf}) ->
    LeaderNode = node(LeaderPid),
    Conf#{leader_node => LeaderNode,
          nodes => Nodes,
          leader_pid => LeaderPid,
          replica_nodes => lists:delete(LeaderNode, Nodes),
          epoch => Epoch}.

make_writer_conf(#member{node = Node}, #stream{epoch = Epoch,
                                               nodes = Nodes,
                                               conf = Conf}) ->
    Conf#{leader_node => Node,
          nodes => Nodes,
          replica_nodes => lists:delete(Node, Nodes),
          epoch => Epoch}.
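%% Example of the generated stream configuration (only the keys set here are
%% shown; the rest of Conf is carried over unchanged): for nodes [n1, n2, n3]
%% with the writer on n1 and epoch 3, make_writer_conf/2 yields a map
%% containing leader_node => n1, replica_nodes => [n2, n3],
%% nodes => [n1, n2, n3] and epoch => 3; make_replica_conf/2 additionally
%% sets leader_pid.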

find_leader(Members) ->
    case lists:partition(
           fun (#member{target = deleted}) ->
                   false;
               (#member{role = {Role, _}}) ->
                   Role == writer
           end, maps:values(Members)) of
        {[Writer], Replicas} ->
            {Writer, Replicas};
        {[], Replicas} ->
            {undefined, Replicas}
    end.
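%% Example (illustrative): find_leader/1 partitions the members into the
%% single member whose role is {writer, _} and whose target is not deleted,
%% and everything else, returning {Writer, Replicas}; a writer marked for
%% deletion falls into the replica list, so {undefined, Replicas} is returned
%% in that case.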

select_leader(Offsets) ->
    [{Node, _} | _] = lists:sort(fun({_, {Ao, E}}, {_, {Bo, E}}) ->
                                         Ao >= Bo;
                                    ({_, {_, Ae}}, {_, {_, Be}}) ->
                                         Ae >= Be;
                                    ({_, empty}, _) ->
                                         false;
                                    (_, {_, empty}) ->
                                         true
                                 end, Offsets),
    Node.
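%% Example (offsets are illustrative): the candidate with data from the
%% highest epoch wins, ties on epoch are broken by the highest offset and
%% members that reported empty sort last, e.g.
%%   select_leader([{n1, {100, 2}}, {n2, {120, 1}}, {n3, empty}])
%% selects n1 even though n2 has the larger offset, because n1 has data from
%% the higher epoch.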

maybe_sleep({{nodedown, _}, _}) ->
    timer:sleep(10000);
maybe_sleep({noproc, _}) ->
    timer:sleep(5000);
maybe_sleep({error, nodedown}) ->
    timer:sleep(5000);
maybe_sleep({error, _}) ->
    timer:sleep(5000);
maybe_sleep(_) ->
    ok.

set_running_to_stopped(Members) ->
    maps:map(fun (_, #member{target = running} = M) ->
                     M#member{target = stopped};
                 (_, M) ->
                     M
             end, Members).

update_target(#member{target = deleted} = Member, _) ->
    %% A deleted member can never transition to another state
    Member;
update_target(Member, Target) ->
    Member#member{target = Target}.