1
%% Compile-time constants.
%%
%% STREAM_COORDINATOR_STARTUP expands to a tuple tagged
%% `stream_coordinator_startup` that carries the pid of whichever process
%% evaluates the macro (self() is evaluated at the use site, not here).
2-define(STREAM_COORDINATOR_STARTUP, {stream_coordinator_startup, self()}).
%% NOTE(review): the *_TIMEOUT values below are presumably milliseconds, as is
%% conventional for Erlang timers - confirm at the use sites.
3-define(TICK_TIMEOUT, 30000).
4-define(RESTART_TIMEOUT, 1000).
5-define(PHASE_RETRY_TIMEOUT, 10000).
6-define(CMD_TIMEOUT, 30000).
%% NOTE(review): presumably the name of the Ra system this module runs in -
%% confirm against the code that uses it.
7-define(RA_SYSTEM, coordination).
8
%% Type aliases.
%%
%% stream_id()    - a stream identifier, represented as a string().
%% stream()       - a map that must contain `conf` (an osiris:config()) and may
%%                  contain any other atom keys.
%% monitor_role() - either `member` or `listener`.
%% queue_ref()    - a queue resource, rabbit_types:r(queue).
%% tail()         - an {Offset, Epoch} pair, or `empty`.
9-type stream_id() :: string().
10-type stream() :: #{conf := osiris:config(),
11                    atom() => term()}.
12-type monitor_role() :: member | listener.
13-type queue_ref() :: rabbit_types:r(queue).
14-type tail() :: {osiris:offset(), osiris:epoch()} | empty.
15
%% State kept for a single stream member.
%%
%% Fields:
%%   state   - the member's last known state; defaults to {down, 0}.
%%             `stopped` also carries the member's tail(), while `running`
%%             and `disconnected` carry a pid().
%%   role    - `writer` or `replica`, paired with an epoch.
%%   node    - an Erlang node name (presumably the node hosting this member -
%%             confirm against callers).
%%   current - the action in flight, if any: an action tag paired with a
%%             ra:index(), or {sleeping, nodeup | non_neg_integer()}.
%%             No default is declared, so it starts as `undefined`.
%%   conf    - the osiris:config() in use, or `undefined`.
%%   target  - the state the member should reach; defaults to `running`.
16-record(member,
17        {state = {down, 0} :: {down, osiris:epoch()}
18                              | {stopped, osiris:epoch(), tail()}
19                              | {ready, osiris:epoch()}
20                              %% when a replica disconnects
21                              | {running | disconnected, osiris:epoch(), pid()}
22                              | deleted,
23         role :: {writer | replica, osiris:epoch()},
24         node :: node(),
25         %% the currently running action, if any
26         current :: undefined |
27                    {updating |
28                     stopping |
29                     starting |
30                     deleting, ra:index()} |
31                    {sleeping, nodeup | non_neg_integer()},
32         %% record the "current" config used
33         conf :: undefined | osiris:config(),
34         target = running :: running | stopped | deleted}).
35
36%% member lifecycle
37%% down -> stopped(tail) -> running | disconnected -> deleted
38%%
39%% split the handling of incoming events (down, success | fail of operations)
40%% from the actioning of current state (e.g. member A is down but its target
41%% is `running` - start a current action to turn member A -> running)
42
%% A {Pid, Reference} pair.
%% NOTE(review): presumably identifies a caller awaiting a reply, in the shape
%% used by gen-style calls - confirm against the code that fills `reply_to`.
43-type from() :: {pid(), reference()}.
44
%% State kept for a single stream.
%%
%% Fields:
%%   id        - the stream's stream_id().
%%   epoch     - an osiris epoch; defaults to 0.
%%   queue_ref - the queue resource associated with the stream.
%%   conf      - the stream's osiris:config().
%%   nodes     - a list of node names (presumably the nodes expected to host
%%               members - confirm against callers).
%%   members   - one #member{} per node; defaults to an empty map.
%%   listeners - pid-to-pid map whose value the type spec labels LeaderPid;
%%               defaults to an empty map.
%%   reply_to  - `undefined` or a from().
%%   mnesia    - {updated | updating, Epoch}; defaults to {updated, 0}.
%%               NOTE(review): presumably tracks whether the mnesia record is
%%               in sync for that epoch - confirm.
%%   target    - `running` or `deleted`; defaults to `running`.
45-record(stream, {id :: stream_id(),
46                 epoch = 0 :: osiris:epoch(),
47                 queue_ref :: queue_ref(),
48                 conf :: osiris:config(),
49                 nodes :: [node()],
50                 members = #{} :: #{node() := #member{}},
51                 listeners = #{} :: #{pid() := LeaderPid :: pid()},
52                 reply_to :: undefined | from(),
53                 mnesia = {updated, 0} :: {updated | updating, osiris:epoch()},
54                 target = running :: running | deleted
55                }).
56
%% Top-level state record; its name is whatever ?MODULE expands to in the
%% module being compiled.
%%
%% Fields:
%%   streams    - stream_id() to #stream{}; defaults to an empty map.
%%   monitors   - pid() to {stream_id(), monitor_role()}; defaults to an
%%                empty map.
%%   listeners  - stream_id() to a map of pid() to queue_ref(); defaults to an
%%                empty map.
%%   reserved_1,
%%   reserved_2 - untyped placeholders with no declared default.
57-record(?MODULE, {streams = #{} :: #{stream_id() => #stream{}},
58                  monitors = #{} :: #{pid() => {stream_id(), monitor_role()}},
59                  listeners = #{} :: #{stream_id() =>
60                                       #{pid() := queue_ref()}},
61                  %% future extensibility
62                  reserved_1,
63                  reserved_2}).
64