1-module(eetcd_election).
2-include("eetcd.hrl").
3
4-export([new/1, with_timeout/2, with_name/2, with_lease/2, with_leader/2]).
5-export([campaign/4, proclaim/3, leader/2, resign/2]).
6-export([campaign/1, proclaim/1, leader/1, resign/1]).
7-export([campaign_request/4, campaign_response/2]).
8-export([observe/3, observe_stream/2]).
9
10-export_type([campaign_ctx/0, observe_ctx/0]).
11-type observe_ctx() :: #{leader => map(), http2_pid => pid(), monitor_ref => reference(), stream_ref => reference()}.
12-type campaign_ctx() :: #{campaign => map()|'waiting_campaign_response', http2_pid => pid(), monitor_ref => reference(), stream_ref => reference()}.
13
14%%% @doc Creates a blank context for a request.
15-spec new(atom()|reference()) -> context().
16new(Ctx) -> eetcd:new(Ctx).
17
18%% @doc Timeout is an integer greater than zero which specifies how many milliseconds to wait for a reply,
19%% or the atom infinity to wait indefinitely. Default value is 5000.
20%% If no reply is received within the specified time, the function call fails with `{error, timeout}'.
21-spec with_timeout(context(), pos_integer()|infinity) -> context().
22with_timeout(Ctx, Timeout) -> eetcd:with_timeout(Ctx, Timeout).
23
24%%% @doc name is the election's identifier for the campaign.
25-spec with_name(context(), Name :: binary()) -> context().
26with_name(Ctx, Name) ->
27    maps:put(name, Name, Ctx).
28
29%%% @doc lease is the ID of the lease attached to leadership of the election. If the
30%%  lease expires or is revoked before resigning leadership, then the
31%%  leadership is transferred to the next campaigner, if any.
32-spec with_lease(context(), LeaseID :: pos_integer()) -> context().
33with_lease(Ctx, LeaseID) ->
34    maps:put(lease, LeaseID, Ctx).
35
36%%% @doc value is the value set when the campaigner wins the election.
37-spec with_value(context(), Value :: binary()) -> context().
38with_value(Ctx, Value) ->
39    maps:put(value, Value, Ctx).
40
41%%% @doc  leader describes the resources used for holding leadership of the election.
42%%%  It's a map return from CampaignResponse
43%%% name is the election identifier that corresponds to the leadership key.
44%%% key is an opaque key representing the ownership of the election. If the key is deleted, then leadership is lost.
45%%% rev is the creation revision of the key. It can be used to test for ownership of an election during transactions by testing the key's creation revision matches rev.
46%%% lease is the lease ID of the election leader.
47-spec with_leader(context(), Leader :: binary()) -> context().
48with_leader(Ctx, Leader) ->
49    maps:put(leader, Leader, Ctx).
50
51%%% @doc
52%%% Campaign waits to acquire leadership in an election, returning a LeaderKey
53%%% representing the leadership if successful. The LeaderKey can then be used
54%%% to issue new values on the election, transactionally guard API requests on
55%%% leadership still being held, and resign from the election.
56%%% <dl>
57%%% <dt> 1. base </dt>
58%%% <dd> `eetcd_election:campaign(ConnName, Name, LeaseId, Value).' </dd>
59%%% <dt> 2. elixir </dt>
60%%% <dd>
61%%% ```
62%%% :eetcd_election.new(connName)
63%%% |> :eetcd_election.with_timeout(3000)
64%%% |> :eetcd_election.with_name(name)
65%%% |> :eetcd_election.with_lease(leaseId)
66%%% |> :eetcd_election.with_value(Value)
67%%% |> :eetcd_kv.campaign()
68%%% '''
69%%% </dd> </dl>
70%%% {@link eetcd_election:with_name/2}, {@link eetcd_election:with_lease/2},
71%%% {@link eetcd_election:with_value/2}, {@link eetcd_election:with_timeout/2}
72%%% @end
73-spec campaign(Ctx :: context()) -> {ok, router_pb:'Etcd.CampaignResponse'()} | {error, eetcd_error()}.
74campaign(Ctx) ->
75    eetcd_election_gen:campaign(Ctx).
76
77-spec campaign(Ctx :: context()|name(), Name :: binary(), LeaseId :: integer(), Value :: binary()) ->
78    {ok, router_pb:'Etcd.CampaignResponse'()} | {error, eetcd_error()}.
79campaign(Ctx, Name, LeaseId, Value) ->
80    Ctx1 = new(Ctx),
81    Ctx2 = with_name(Ctx1, Name),
82    Ctx3 = with_lease(Ctx2, LeaseId),
83    Ctx4 = with_value(Ctx3, Value),
84    eetcd_election_gen:campaign(Ctx4).
85
86%%% @doc campaign async to acquire leadership.
87%%% if there is already a leader, campaign/4 will be held(block) forever until timeout.
88%%% the campaign_request/4 will return immediately,
89%%% then your can use campaign_response/2 to handle `Etcd.CampaignResponse`.
90%%% gen_server example
91%%% ```
92%%% init(Arg) ->
93%%%   ...
94%%%   {ok, CCtx} = eetcd_election:campaign_request(connName, Name, LeaderId, Value),
95%%%   ...
96%%% handle_info(Msg, State=#{ctx := CCtx}) ->
97%%%   case eetcd_election:campaign_response(CCtx, Msg) of
98%%%          unknown -> do_handle_your_msg(Msg, State);
99%%%         {ok, #{campaign := Leader}} -> campaign_win(Leader);
100%%%         {error, Reason} -> campaign_error(Reason)
101%%%   end.
102%%% '''
103-spec campaign_request(name(), Name :: binary(), LeaseId :: integer(), Value :: binary()) ->
104    {ok, campaign_ctx()} | {error, eetcd_error()}.
105campaign_request(ConnName, Name, LeaseId, Value) ->
106    Request0 = with_name(#{}, Name),
107    Request1 = with_lease(Request0, LeaseId),
108    Request = with_value(Request1, Value),
109    case eetcd_stream:new(ConnName, <<"/v3electionpb.Election/Campaign">>) of
110        {ok, Gun, StreamRef} ->
111            MRef = erlang:monitor(process, Gun),
112            eetcd_stream:data(Gun, StreamRef, Request, 'Etcd.CampaignRequest', fin),
113            {ok,
114                #{
115                    http2_pid => Gun,
116                    monitor_ref => MRef,
117                    stream_ref => StreamRef,
118                    campaign => waiting_campaign_response
119                }
120            };
121        Err -> Err
122    end.
123
124-spec campaign_response(campaign_ctx(), term()) ->
125    unknown|{ok, campaign_ctx()} | {error, eetcd_error()}.
126%%% @doc handle campaign async response `Etcd.CampaignResponse'.
127campaign_response(CCtx, Msg) ->
128    case resp_stream(CCtx, Msg) of
129        {ok, Bin} ->
130            #{monitor_ref := MRef} = CCtx,
131            erlang:demonitor(MRef, [flush]),
132            {ok, #{leader := Leader}, <<>>}
133                = eetcd_grpc:decode(identity, Bin, 'Etcd.CampaignResponse'),
134            {ok, #{
135                campaign => Leader,
136                http2_pid => undefined,
137                monitor_ref => undefined,
138                stream_ref => undefined
139            }};
140        Other -> Other
141    end.
142
143%%% @doc
144%%% Proclaim updates the leader's posted value with a new value.
145%%% Leader is the leadership hold on the election.
146%%% Value is an update meant to overwrite the leader's current value.
147%%% <dl>
148%%% <dt> 1. base </dt>
149%%% <dd> `eetcd_election:proclaim(ConnName, Leader, Value).' </dd>
150%%% <dt> 2. elixir </dt>
151%%% <dd>
152%%% ```
153%%% :eetcd_election.new(connName)
154%%% |> :eetcd_election.with_leader(name)
155%%% |> :eetcd_election.with_value(Value)
156%%% |> :eetcd_kv.proclaim()
157%%% '''
158%%% </dd> </dl>
159%%% {@link eetcd_election:with_leader/2}, {@link eetcd_election:with_value/2}
160%%% @end
161-spec proclaim(Ctx :: context()) ->
162    {ok, router_pb:'Etcd.ProclaimResponse'()} | {error, eetcd_error()}.
163proclaim(Ctx) ->
164    eetcd_election_gen:proclaim(Ctx).
165
166-spec proclaim(Ctx :: context()|name(), Leader :: map(), Value :: binary()) ->
167    {ok, router_pb:'Etcd.ProclaimResponse'()} | {error, eetcd_error()}.
168proclaim(Ctx, Leader, Val) ->
169    Ctx1 = new(Ctx),
170    Ctx2 = with_leader(Ctx1, Leader),
171    Ctx3 = with_value(Ctx2, Val),
172    eetcd_election_gen:proclaim(Ctx3).
173
174%%% @doc
175%%% Resign releases election leadership so other campaigners may acquire
176%%  leadership on the election.
177%%% <dl>
178%%% <dt> 1. base </dt>
179%%% <dd> `eetcd_election:resign(ConnName, Leader).' </dd>
180%%% <dt> 2. elixir </dt>
181%%% <dd>
182%%% ```
183%%% :eetcd_election.new(connName)
184%%% |> :eetcd_election.with_leader(Leader)
185%%% |> :eetcd_kv.resign()
186%%% '''
187%%% </dd> </dl>
188%%% {@link eetcd_election:with_leader/2}
189%%% @end
190-spec resign(Ctx :: context()) ->
191    {ok, router_pb:'Etcd.ResignResponse'()} | {error, eetcd_error()}.
192resign(Ctx) ->
193    eetcd_election_gen:resign(Ctx).
194
195-spec resign(Ctx :: context()|name(), Leader :: binary()) ->
196    {ok, router_pb:'Etcd.ResignResponse'()} | {error, eetcd_error()}.
197resign(Ctx, Leader) ->
198    Ctx1 = new(Ctx),
199    Ctx2 = with_leader(Ctx1, Leader),
200    eetcd_election_gen:resign(Ctx2).
201
202%%% @doc
203%%% Leader returns the current election proclamation, if any.
204%%% <dl>
205%%% <dt> 1. base </dt>
206%%% <dd> `eetcd_election:leader(ConnName, Name).' </dd>
207%%% <dt> 2. elixir </dt>
208%%% <dd>
209%%% ```
210%%% :eetcd_election.new(connName)
211%%% |> :eetcd_election.with_name(name)
212%%% |> :eetcd_kv.leader()
213%%% '''
214%%% </dd> </dl>
215%%% {@link eetcd_election:with_name/2}
216%%% @end
217-spec leader(Ctx :: context()) ->
218    {ok, router_pb:'Etcd.LeaderResponse'()} | {error, eetcd_error()}.
219leader(Ctx) ->
220    eetcd_election_gen:leader(Ctx).
221
222-spec leader(Ctx :: context()|name(), Name :: binary()) ->
223    {ok, router_pb:'Etcd.LeaderResponse'()} | {error, eetcd_error()}.
224leader(Ctx, Name) ->
225    Ctx1 = new(Ctx),
226    Ctx2 = with_name(Ctx1, Name),
227    eetcd_election_gen:leader(Ctx2).
228
229%%% @doc Observe streams election proclamations in-order as made by the election's elected leaders.
230%%% Timeout is an integer greater than zero which specifies how many milliseconds to wait for a leaders,
231%%% or the atom infinity to wait indefinitely. If no leader is received within the specified time,
232%%% the function call return 'election_no_leader'. and will streams election proclamations by order messages.
233-spec observe(name(), binary(), timeout()) -> {ok, observe_ctx()}|{error, eetcd_error()}.
234observe(ConnName, Name, Timeout) ->
235    Request = #{name => Name},
236    {ok, Gun, StreamRef} = eetcd_election_gen:observe(ConnName),
237    MRef = erlang:monitor(process, Gun),
238    eetcd_stream:data(Gun, StreamRef, Request, 'Etcd.LeaderRequest', fin),
239    case eetcd_stream:await(Gun, StreamRef, Timeout, MRef) of
240        {response, nofin, 200, _Headers} ->
241            case eetcd_stream:await(Gun, StreamRef, Timeout, MRef) of
242                {data, nofin, Body} ->
243                    {ok, #{kv := KV}, <<>>}
244                        = eetcd_grpc:decode(identity, Body, 'Etcd.LeaderResponse'),
245                    {ok,
246                        #{
247                            http2_pid => Gun,
248                            monitor_ref => MRef,
249                            stream_ref => StreamRef,
250                            leader => KV
251                        }
252                    };
253                {error, _} = Err1 ->
254                    erlang:demonitor(MRef, [flush]),
255                    Err1
256            end;
257        {response, fin, 200, RespHeaders} ->
258            erlang:demonitor(MRef, [flush]),
259            {error, eetcd_grpc:grpc_status(RespHeaders)};
260        {error, timeout} ->
261            {ok,
262                #{
263                    http2_pid => Gun,
264                    monitor_ref => MRef,
265                    stream_ref => StreamRef,
266                    leader => 'election_no_leader'
267                }
268            };
269        {error, _} = Err2 ->
270            erlang:demonitor(MRef, [flush]),
271            Err2
272    end.
273
274%%% @doc handle observe stream `Etcd.LeaderResponse'.
275-spec observe_stream(observe_ctx(), term()) ->
276    unknown|{ok, observe_ctx()} | {error, eetcd_error()}.
277observe_stream(OCtx, Msg) ->
278    case resp_stream(OCtx, Msg) of
279        {ok, Bin} ->
280            {ok, #{kv := KV}, <<>>} = eetcd_grpc:decode(identity, Bin, 'Etcd.LeaderResponse'),
281            {ok, OCtx#{leader => KV}};
282        Other -> Other
283    end.
284
285resp_stream(#{stream_ref := Ref, http2_pid := Pid},
286    {gun_response, Pid, Ref, nofin, 200, _Headers}) ->
287    receive {gun_data, Pid, Ref, nofin, Bin} ->
288        {ok, Bin}
289    after 2000 -> unknown
290    end;
291resp_stream(#{stream_ref := Ref, http2_pid := Pid},
292    {gun_data, Pid, Ref, nofin, Bin}) ->
293    {ok, Bin};
294resp_stream(#{stream_ref := SRef, http2_pid := Pid, monitor_ref := MRef},
295    {gun_error, Pid, SRef, Reason}) -> %% stream error
296    erlang:demonitor(MRef, [flush]),
297    gun:cancel(Pid, SRef),
298    {error, {stream_error, Reason}};
299resp_stream(#{http2_pid := Pid, stream_ref := SRef, monitor_ref := MRef},
300    {gun_error, Pid, Reason}) -> %% gun connection process state error
301    erlang:demonitor(MRef, [flush]),
302    gun:cancel(Pid, SRef),
303    {error, {conn_error, Reason}};
304resp_stream(#{http2_pid := Pid, monitor_ref := MRef},
305    {'DOWN', MRef, process, Pid, Reason}) -> %% gun connection down
306    erlang:demonitor(MRef, [flush]),
307    {error, {http2_down, Reason}};
308resp_stream(_OCtx, _UnKnow) -> unknown.
309