%% Shared macros, types and records for the stream coordinator.

%% Term tagged with the pid of the process that expands the macro.
-define(STREAM_COORDINATOR_STARTUP, {stream_coordinator_startup, self()}).
%% NOTE(review): the timeouts below are presumably in milliseconds -
%% confirm at the use sites.
-define(TICK_TIMEOUT, 30000).
-define(RESTART_TIMEOUT, 1000).
-define(PHASE_RETRY_TIMEOUT, 10000).
-define(CMD_TIMEOUT, 30000).
-define(RA_SYSTEM, coordination).

-type stream_id() :: string().
%% A map that always carries `conf`; any other atom key is optional.
-type stream() :: #{conf := osiris:config(),
                    atom() => term()}.
-type monitor_role() :: member | listener.
-type queue_ref() :: rabbit_types:r(queue).
-type tail() :: {osiris:offset(), osiris:epoch()} | empty.

-record(member,
        {state = {down, 0} :: {down, osiris:epoch()}
                              | {stopped, osiris:epoch(), tail()}
                              | {ready, osiris:epoch()}
                              %% when a replica disconnects
                              | {running | disconnected, osiris:epoch(), pid()}
                              | deleted,
         role :: {writer | replica, osiris:epoch()},
         node :: node(),
         %% the currently running action, if any
         current :: undefined |
                    {updating |
                     stopping |
                     starting |
                     deleting, ra:index()} |
                    {sleeping, nodeup | non_neg_integer()},
         %% record the "current" config used
         conf :: undefined | osiris:config(),
         target = running :: running | stopped | deleted}).

%% member lifecycle
%% down -> stopped(tail) -> running | disconnected -> deleted
%%
%% split the handling of incoming events (down, success | fail of operations)
%% and the actioning of current state (i.e. member A is down but the cluster
%% target is `running` - start a current action to turn member A -> running)

-type from() :: {pid(), reference()}.

-record(stream, {id :: stream_id(),
                 epoch = 0 :: osiris:epoch(),
                 queue_ref :: queue_ref(),
                 conf :: osiris:config(),
                 nodes :: [node()],
                 %% `=>` (optional association) rather than `:=`: both maps
                 %% default to `#{}`, which a mandatory association would
                 %% not admit.
                 members = #{} :: #{node() => #member{}},
                 listeners = #{} :: #{pid() => LeaderPid :: pid()},
                 reply_to :: undefined | from(),
                 mnesia = {updated, 0} :: {updated | updating, osiris:epoch()},
                 target = running :: running | deleted
                }).

%% Top-level state record, named after the module that includes this header.
-record(?MODULE,
        {%% stream id -> per-stream record
         streams = #{} :: #{stream_id() => #stream{}},
         %% pid -> {stream id, role the pid has for that stream}
         monitors = #{} :: #{pid() => {stream_id(), monitor_role()}},
         %% stream id -> (pid -> queue reference)
         listeners = #{} :: #{stream_id() => #{pid() := queue_ref()}},
         %% future extensibility
         reserved_1,
         reserved_2
        }).