-module(eetcd_election).
-include("eetcd.hrl").

%% Context builders. with_value/2 is part of this list because the campaign and
%% proclaim pipelines documented in this module call it from outside the module.
-export([new/1, with_timeout/2, with_name/2, with_lease/2, with_value/2, with_leader/2]).
-export([campaign/4, proclaim/3, leader/2, resign/2]).
-export([campaign/1, proclaim/1, leader/1, resign/1]).
-export([campaign_request/4, campaign_response/2]).
-export([observe/3, observe_stream/2]).

-export_type([campaign_ctx/0, observe_ctx/0]).

%% Context returned by observe/3 and observe_stream/2. The leader field holds
%% the atom election_no_leader when observe/3 timed out before any leader
%% was reported.
-type observe_ctx() :: #{
    leader => map() | 'election_no_leader',
    http2_pid => pid(),
    monitor_ref => reference(),
    stream_ref => reference()
}.

%% Context returned by campaign_request/4 and campaign_response/2. Once the
%% campaign response has been decoded, the three connection fields are reset
%% to undefined.
-type campaign_ctx() :: #{
    campaign => map() | 'waiting_campaign_response',
    http2_pid => pid() | undefined,
    monitor_ref => reference() | undefined,
    stream_ref => reference() | undefined
}.

%%% @doc Creates a blank context for a request.
-spec new(atom()|reference()) -> context().
new(Ctx) -> eetcd:new(Ctx).

%% @doc Timeout is an integer greater than zero which specifies how many milliseconds to wait for a reply,
%% or the atom infinity to wait indefinitely. Default value is 5000.
%% If no reply is received within the specified time, the function call fails with `{error, timeout}'.
-spec with_timeout(context(), pos_integer()|infinity) -> context().
with_timeout(Ctx, Timeout) -> eetcd:with_timeout(Ctx, Timeout).

%%% @doc name is the election's identifier for the campaign.
-spec with_name(context(), Name :: binary()) -> context().
with_name(Ctx, Name) ->
    maps:put(name, Name, Ctx).

%%% @doc lease is the ID of the lease attached to leadership of the election. If the
%% lease expires or is revoked before resigning leadership, then the
%% leadership is transferred to the next campaigner, if any.
-spec with_lease(context(), LeaseID :: pos_integer()) -> context().
with_lease(Ctx, LeaseID) ->
    maps:put(lease, LeaseID, Ctx).

%%% @doc value is the value set when the campaigner wins the election.
-spec with_value(context(), Value :: binary()) -> context().
with_value(Ctx, Value) ->
    maps:put(value, Value, Ctx).

%%% @doc leader describes the resources used for holding leadership of the election.
%%% It is the map returned in a CampaignResponse:
%%% name is the election identifier that corresponds to the leadership key.
%%% key is an opaque key representing the ownership of the election. If the key is deleted, then leadership is lost.
%%% rev is the creation revision of the key. It can be used to test for ownership of an election during transactions by testing the key's creation revision matches rev.
%%% lease is the lease ID of the election leader.
-spec with_leader(context(), Leader :: map()) -> context().
with_leader(Ctx, Leader) ->
    maps:put(leader, Leader, Ctx).

%%% @doc
%%% Campaign waits to acquire leadership in an election, returning a LeaderKey
%%% representing the leadership if successful. The LeaderKey can then be used
%%% to issue new values on the election, transactionally guard API requests on
%%% leadership still being held, and resign from the election.
%%% <dl>
%%% <dt> 1. base </dt>
%%% <dd> `eetcd_election:campaign(ConnName, Name, LeaseId, Value).' </dd>
%%% <dt> 2. elixir </dt>
%%% <dd>
%%% ```
%%% :eetcd_election.new(connName)
%%% |> :eetcd_election.with_timeout(3000)
%%% |> :eetcd_election.with_name(name)
%%% |> :eetcd_election.with_lease(leaseId)
%%% |> :eetcd_election.with_value(Value)
%%% |> :eetcd_election.campaign()
%%% '''
%%% </dd> </dl>
%%% {@link eetcd_election:with_name/2}, {@link eetcd_election:with_lease/2},
%%% {@link eetcd_election:with_value/2}, {@link eetcd_election:with_timeout/2}
%%% @end
-spec campaign(Ctx :: context()) -> {ok, router_pb:'Etcd.CampaignResponse'()} | {error, eetcd_error()}.
campaign(Ctx) ->
    eetcd_election_gen:campaign(Ctx).

-spec campaign(Ctx :: context()|name(), Name :: binary(), LeaseId :: integer(), Value :: binary()) ->
    {ok, router_pb:'Etcd.CampaignResponse'()} | {error, eetcd_error()}.
campaign(Ctx, Name, LeaseId, Value) ->
    %% Build the request context step by step, then hand it to the generated client.
    Named = with_name(new(Ctx), Name),
    Leased = with_lease(Named, LeaseId),
    eetcd_election_gen:campaign(with_value(Leased, Value)).

%%% @doc Starts a campaign for leadership asynchronously.
%%% When the election already has a leader, campaign/4 blocks until it times out;
%%% campaign_request/4 returns immediately instead, and the eventual
%%% `Etcd.CampaignResponse' is picked up by passing incoming messages to
%%% campaign_response/2.
%%% gen_server example
%%% ```
%%% init(Arg) ->
%%%   ...
%%%   {ok, CCtx} = eetcd_election:campaign_request(connName, Name, LeaseId, Value),
%%%   ...
%%% handle_info(Msg, State=#{ctx := CCtx}) ->
%%%   case eetcd_election:campaign_response(CCtx, Msg) of
%%%        unknown -> do_handle_your_msg(Msg, State);
%%%        {ok, #{campaign := Leader}} -> campaign_win(Leader);
%%%        {error, Reason} -> campaign_error(Reason)
%%%   end.
%%% '''
-spec campaign_request(name(), Name :: binary(), LeaseId :: integer(), Value :: binary()) ->
    {ok, campaign_ctx()} | {error, eetcd_error()}.
campaign_request(ConnName, Name, LeaseId, Value) ->
    %% Same map the with_name/with_lease/with_value builders produce from #{}.
    Payload = #{name => Name, lease => LeaseId, value => Value},
    case eetcd_stream:new(ConnName, <<"/v3electionpb.Election/Campaign">>) of
        {ok, GunPid, Stream} ->
            %% Monitor the connection process so campaign_response/2 can
            %% recognise its 'DOWN' message.
            Monitor = erlang:monitor(process, GunPid),
            eetcd_stream:data(GunPid, Stream, Payload, 'Etcd.CampaignRequest', fin),
            CampaignCtx = #{
                http2_pid => GunPid,
                monitor_ref => Monitor,
                stream_ref => Stream,
                campaign => waiting_campaign_response
            },
            {ok, CampaignCtx};
        Failure ->
            Failure
    end.

-spec campaign_response(campaign_ctx(), term()) ->
    unknown|{ok, campaign_ctx()} | {error, eetcd_error()}.
%%% @doc handle campaign async response `Etcd.CampaignResponse'.
campaign_response(CCtx, Msg) ->
    case resp_stream(CCtx, Msg) of
        {ok, Bin} ->
            %% The campaign answer arrived: stop monitoring the connection and
            %% drop any 'DOWN' message already queued for this monitor.
            #{monitor_ref := MRef} = CCtx,
            erlang:demonitor(MRef, [flush]),
            %% Assertive match: the payload must decode to exactly one
            %% CampaignResponse with no trailing bytes.
            {ok, #{leader := Leader}, <<>>}
                = eetcd_grpc:decode(identity, Bin, 'Etcd.CampaignResponse'),
            {ok, #{
                campaign => Leader,
                http2_pid => undefined,
                monitor_ref => undefined,
                stream_ref => undefined
            }};
        %% unknown | {error, _}: passed through unchanged to the caller.
        Other -> Other
    end.

%%% @doc
%%% Proclaim updates the leader's posted value with a new value.
%%% Leader is the leadership hold on the election.
%%% Value is an update meant to overwrite the leader's current value.
%%% <dl>
%%% <dt> 1. base </dt>
%%% <dd> `eetcd_election:proclaim(ConnName, Leader, Value).' </dd>
%%% <dt> 2. elixir </dt>
%%% <dd>
%%% ```
%%% :eetcd_election.new(connName)
%%% |> :eetcd_election.with_leader(leader)
%%% |> :eetcd_election.with_value(Value)
%%% |> :eetcd_election.proclaim()
%%% '''
%%% </dd> </dl>
%%% {@link eetcd_election:with_leader/2}, {@link eetcd_election:with_value/2}
%%% @end
-spec proclaim(Ctx :: context()) ->
    {ok, router_pb:'Etcd.ProclaimResponse'()} | {error, eetcd_error()}.
proclaim(Ctx) ->
    eetcd_election_gen:proclaim(Ctx).

-spec proclaim(Ctx :: context()|name(), Leader :: map(), Value :: binary()) ->
    {ok, router_pb:'Etcd.ProclaimResponse'()} | {error, eetcd_error()}.
proclaim(Ctx, Leader, Val) ->
    Ctx1 = new(Ctx),
    Ctx2 = with_leader(Ctx1, Leader),
    Ctx3 = with_value(Ctx2, Val),
    eetcd_election_gen:proclaim(Ctx3).

%%% @doc
%%% Resign releases election leadership so other campaigners may acquire
%%% leadership on the election.
%%% Leader is the leadership map, the same value that proclaim/3 takes.
%%% <dl>
%%% <dt> 1. base </dt>
%%% <dd> `eetcd_election:resign(ConnName, Leader).' </dd>
%%% <dt> 2. elixir </dt>
%%% <dd>
%%% ```
%%% :eetcd_election.new(connName)
%%% |> :eetcd_election.with_leader(Leader)
%%% |> :eetcd_election.resign()
%%% '''
%%% </dd> </dl>
%%% {@link eetcd_election:with_leader/2}
%%% @end
-spec resign(Ctx :: context()) ->
    {ok, router_pb:'Etcd.ResignResponse'()} | {error, eetcd_error()}.
resign(Ctx) ->
    eetcd_election_gen:resign(Ctx).

-spec resign(Ctx :: context()|name(), Leader :: map()) ->
    {ok, router_pb:'Etcd.ResignResponse'()} | {error, eetcd_error()}.
resign(Ctx, Leader) ->
    Ctx1 = new(Ctx),
    Ctx2 = with_leader(Ctx1, Leader),
    eetcd_election_gen:resign(Ctx2).

%%% @doc
%%% Leader returns the current election proclamation, if any.
%%% <dl>
%%% <dt> 1. base </dt>
%%% <dd> `eetcd_election:leader(ConnName, Name).' </dd>
%%% <dt> 2. elixir </dt>
%%% <dd>
%%% ```
%%% :eetcd_election.new(connName)
%%% |> :eetcd_election.with_name(name)
%%% |> :eetcd_election.leader()
%%% '''
%%% </dd> </dl>
%%% {@link eetcd_election:with_name/2}
%%% @end
-spec leader(Ctx :: context()) ->
    {ok, router_pb:'Etcd.LeaderResponse'()} | {error, eetcd_error()}.
leader(Ctx) ->
    eetcd_election_gen:leader(Ctx).

-spec leader(Ctx :: context()|name(), Name :: binary()) ->
    {ok, router_pb:'Etcd.LeaderResponse'()} | {error, eetcd_error()}.
leader(Ctx, Name) ->
    Ctx1 = new(Ctx),
    Ctx2 = with_name(Ctx1, Name),
    eetcd_election_gen:leader(Ctx2).

%%% @doc Observe streams election proclamations in-order as made by the election's elected leaders.
%%% Timeout is an integer greater than zero which specifies how many milliseconds to wait for a leader,
%%% or the atom infinity to wait indefinitely. If no leader is received within the specified time,
%%% the returned context carries 'election_no_leader' as its leader; later election proclamations
%%% are delivered, in order, as messages.
-spec observe(name(), binary(), timeout()) -> {ok, observe_ctx()}|{error, eetcd_error()}.
observe(ConnName, Name, Timeout) ->
    %% Opening the stream can fail; return that error as the spec promises
    %% instead of crashing on a badmatch.
    case eetcd_election_gen:observe(ConnName) of
        {ok, Gun, StreamRef} ->
            MRef = erlang:monitor(process, Gun),
            eetcd_stream:data(Gun, StreamRef, #{name => Name}, 'Etcd.LeaderRequest', fin),
            await_observe_headers(Gun, StreamRef, MRef, Timeout);
        {error, _} = Err ->
            Err
    end.

%% Waits for the response headers of a freshly opened observe stream.
await_observe_headers(Gun, StreamRef, MRef, Timeout) ->
    case eetcd_stream:await(Gun, StreamRef, Timeout, MRef) of
        {response, nofin, 200, _Headers} ->
            await_first_leader(Gun, StreamRef, MRef, Timeout);
        {response, fin, 200, RespHeaders} ->
            %% The stream ended with the headers alone: report the gRPC status.
            erlang:demonitor(MRef, [flush]),
            {error, eetcd_grpc:grpc_status(RespHeaders)};
        {error, timeout} ->
            %% Nothing arrived within Timeout: keep the stream and its monitor
            %% so later proclamations can still reach observe_stream/2.
            {ok, new_observe_ctx(Gun, StreamRef, MRef, 'election_no_leader')};
        {error, _} = Err ->
            erlang:demonitor(MRef, [flush]),
            Err
    end.

%% Waits for the first data frame after the headers and decodes the leader from it.
await_first_leader(Gun, StreamRef, MRef, Timeout) ->
    case eetcd_stream:await(Gun, StreamRef, Timeout, MRef) of
        {data, nofin, Body} ->
            %% Assertive match: exactly one LeaderResponse, no trailing bytes.
            {ok, #{kv := KV}, <<>>}
                = eetcd_grpc:decode(identity, Body, 'Etcd.LeaderResponse'),
            {ok, new_observe_ctx(Gun, StreamRef, MRef, KV)};
        {error, _} = Err ->
            erlang:demonitor(MRef, [flush]),
            Err
    end.

%% Builds the context map handed back to the caller of observe/3.
new_observe_ctx(Gun, StreamRef, MRef, Leader) ->
    #{
        http2_pid => Gun,
        monitor_ref => MRef,
        stream_ref => StreamRef,
        leader => Leader
    }.

%%% @doc handle observe stream `Etcd.LeaderResponse'.
%%% Returns `unknown' when Msg does not belong to the observed stream,
%%% `{ok, Ctx}' with the leader field replaced by the newly decoded one,
%%% or `{error, Reason}' when the stream or its connection failed.
-spec observe_stream(observe_ctx(), term()) ->
    unknown|{ok, observe_ctx()} | {error, eetcd_error()}.
observe_stream(OCtx, Msg) ->
    case resp_stream(OCtx, Msg) of
        {ok, Bin} ->
            {ok, #{kv := KV}, <<>>} = eetcd_grpc:decode(identity, Bin, 'Etcd.LeaderResponse'),
            {ok, OCtx#{leader => KV}};
        Other -> Other
    end.

%% Classifies one mailbox message against the stream described by the context.
%% Returns {ok, Payload} for a data frame of that stream, {error, Reason} when
%% the stream or its connection failed, and unknown for every other message.
resp_stream(#{stream_ref := StreamRef, http2_pid := Gun},
            {gun_response, Gun, StreamRef, nofin, 200, _RespHeaders}) ->
    await_stream_data(Gun, StreamRef);
resp_stream(#{stream_ref := StreamRef, http2_pid := Gun},
            {gun_data, Gun, StreamRef, nofin, Payload}) ->
    {ok, Payload};
resp_stream(#{stream_ref := StreamRef, http2_pid := Gun, monitor_ref := MonitorRef},
            {gun_error, Gun, StreamRef, Why}) ->
    %% Error scoped to this stream.
    abort_stream(Gun, StreamRef, MonitorRef, stream_error, Why);
resp_stream(#{http2_pid := Gun, stream_ref := StreamRef, monitor_ref := MonitorRef},
            {gun_error, Gun, Why}) ->
    %% Error reported for the gun connection process as a whole.
    abort_stream(Gun, StreamRef, MonitorRef, conn_error, Why);
resp_stream(#{http2_pid := Gun, monitor_ref := MonitorRef},
            {'DOWN', MonitorRef, process, Gun, Why}) ->
    %% The gun connection process went down.
    erlang:demonitor(MonitorRef, [flush]),
    {error, {http2_down, Why}};
resp_stream(_Ctx, _Unrelated) ->
    unknown.

%% After the response headers, waits up to two seconds for the first data
%% frame of the same stream; gives up with unknown if none shows up in time.
await_stream_data(Gun, StreamRef) ->
    receive
        {gun_data, Gun, StreamRef, nofin, Payload} ->
            {ok, Payload}
    after 2000 ->
        unknown
    end.

%% Drops the connection monitor, cancels the stream and tags the failure reason.
abort_stream(Gun, StreamRef, MonitorRef, Tag, Why) ->
    erlang:demonitor(MonitorRef, [flush]),
    gun:cancel(Gun, StreamRef),
    {error, {Tag, Why}}.