1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbitmq_peer_discovery_etcd_v3_client).
9
10%% API
11-export([]).
12
13
14-behaviour(gen_statem).
15
16-export([start_link/1, start/1, stop/0]).
17-export([init/1, callback_mode/0, terminate/3]).
18-export([register/1, register/0, unregister/1, unregister/0, list_nodes/0, list_nodes/1]).
19-export([lock/0, lock/1, lock/2, unlock/0, unlock/1, unlock/2]).
20-export([recover/3, connected/3, disconnected/3]).
21
22%% for tests
23-export([extract_node/1, filter_node/1, registration_value/1, node_key_base/1, node_key/1, lock_key_base/1]).
24
25-import(rabbit_data_coercion, [to_binary/1, to_list/1]).
26
27-compile(nowarn_unused_function).
28
29-include("rabbit_peer_discovery_etcd.hrl").
30
%%
%% Defaults, limits and timeouts
%%

%% Name under which the eetcd connection is opened and looked up.
-define(ETCD_CONN_NAME, ?MODULE).
%% 60s by default matches the default heartbeat timeout;
%% we add 1s on top of that for state machine bookkeeping.
-define(DEFAULT_NODE_KEY_LEASE_TTL, 61).
%% don't allow node lease key TTL to be lower than this
%% as overly low values can cause annoying timeouts in etcd client operations
-define(MINIMUM_NODE_KEY_LEASE_TTL, 15).
%% default randomized delay range was 5s to 60s, so this value
%% produces a comparable delay
-define(DEFAULT_LOCK_WAIT_TTL, 70).
%% don't allow lock lease TTL to be lower than this
%% as overly low values can cause annoying timeouts in etcd client operations
-define(MINIMUM_LOCK_WAIT_TTL, 30).

%% Timeout (in milliseconds) for the gen_statem calls made by the public API.
-define(CALL_TIMEOUT, 15000).
50
%% Starts the client as a standalone (unlinked) process registered locally
%% under the module name. Conf is the settings map handed to init/1.
start(Conf) ->
    ServerName = {local, ?MODULE},
    gen_statem:start(ServerName, ?MODULE, Conf, []).

%% Same as start/1, but links the client to the calling process.
start_link(Conf) ->
    ServerName = {local, ?MODULE},
    gen_statem:start_link(ServerName, ?MODULE, Conf, []).

%% Stops the locally registered client; terminate/3 then closes the
%% etcd connection.
stop() ->
    gen_statem:stop(?MODULE).
59
%% gen_statem init callback.
%%
%% Makes sure the eetcd application is running, normalises the settings map
%% and stores connection details, (obfuscated) credentials, key prefix,
%% cluster name and both lease TTLs in #statem_data{}. The machine starts in
%% the `recover' state with an internal `start' event queued, which triggers
%% the first connection attempt.
init(Args) ->
    ok = application:ensure_started(eetcd),
    Settings = normalize_settings(Args),
    Endpoints = maps:get(endpoints, Settings),
    Username = maps:get(etcd_username, Settings, undefined),
    Password = maps:get(etcd_password, Settings, undefined),
    TLSOpts = maps:get(ssl_options, Settings, []),
    %% Both TTLs are clamped from below: overly low values cause timeouts
    %% in etcd client operations.
    NodeKeyTTL = erlang:max(?MINIMUM_NODE_KEY_LEASE_TTL,
                            maps:get(etcd_node_ttl, Settings, ?DEFAULT_NODE_KEY_LEASE_TTL)),
    LockTTL = erlang:max(?MINIMUM_LOCK_WAIT_TTL,
                         maps:get(lock_wait_time, Settings, ?DEFAULT_LOCK_WAIT_TTL)),
    InitialData = #statem_data{
        endpoints = Endpoints,
        tls_options = TLSOpts,
        username = Username,
        obfuscated_password = obfuscate(Password),
        key_prefix = maps:get(etcd_prefix, Settings, <<"rabbitmq">>),
        node_key_ttl_in_seconds = NodeKeyTTL,
        cluster_name = maps:get(cluster_name, Settings, <<"default">>),
        lock_ttl_in_seconds = LockTTL
    },
    {ok, recover, InitialData, [{next_event, internal, start}]}.

%% State functions plus state enter calls (each state has an `enter' clause).
callback_mode() ->
    [state_functions, state_enter].
86
%% gen_statem terminate callback: logs the state and reason, then closes
%% the etcd connection (best effort) and drops the connection monitor.
terminate(Reason, State, Data) ->
    rabbit_log:debug("etcd v3 API client will terminate in state ~p, reason: ~p",
                     [State, Reason]),
    disconnect(?ETCD_CONN_NAME, Data),
    rabbit_log:debug("etcd v3 API client has disconnected"),
    OpenConnections = length(eetcd_conn_sup:info()),
    rabbit_log:debug("etcd v3 API client: total number of connections to etcd is ~p",
                     [OpenConnections]),
    ok.
94
%% Public API. Variants without a ServerRef argument target the locally
%% registered client (?MODULE). Every request is a synchronous gen_statem
%% call bounded by ?CALL_TIMEOUT; while the client is not connected the
%% server answers {error, not_connected}.

%% Registers this node's key in etcd.
register() ->
    register(?MODULE).

register(ServerRef) ->
    call_server(ServerRef, register).

%% Removes this node's key and lease. The fully qualified call avoids the
%% ambiguity with the auto-imported erlang:unregister/1 BIF.
unregister() ->
    ?MODULE:unregister(?MODULE).

unregister(ServerRef) ->
    call_server(ServerRef, unregister).

%% Returns the sorted, de-duplicated list of node names found in etcd.
list_nodes() ->
    list_nodes(?MODULE).

list_nodes(ServerRef) ->
    call_server(ServerRef, list_keys).

%% Acquires the cluster registration lock; on success the reply is
%% {ok, GeneratedKey}, which is what unlock/1,2 expects.
lock() ->
    lock(node()).

lock(Node) ->
    lock(?MODULE, Node).

lock(ServerRef, Node) ->
    call_server(ServerRef, {lock, Node}).

%% Releases a lock identified by its owner key. Note that unlock/0 passes
%% node() as the key.
unlock() ->
    unlock(node()).

unlock(LockKey) ->
    unlock(?MODULE, LockKey).

unlock(ServerRef, LockKey) ->
    call_server(ServerRef, {unlock, LockKey}).

%% Sends a synchronous request to the client using the module-wide timeout.
call_server(ServerRef, Request) ->
    gen_statem:call(ServerRef, Request, ?CALL_TIMEOUT).
130
131%%
132%% States
133%%
134
%% State: recover (not connected).
%%
%% The enter call only logs. The connection attempt is driven by the
%% internal `start' event. On success the machine moves to `connected'. On
%% failure the errors are logged and a state timeout of
%% reconnection_interval/0 ms schedules the next attempt. Calls received in
%% this state are answered with {error, not_connected}.
recover(enter, _PrevState, #statem_data{endpoints = Endpoints}) ->
    rabbit_log:debug("etcd v3 API client has entered recovery state, endpoints: ~s",
                     [string:join(Endpoints, ",")]),
    keep_state_and_data;
recover(internal, start, Data = #statem_data{endpoints = Endpoints, connection_monitor = Ref}) ->
    rabbit_log:debug("etcd v3 API client will attempt to connect, endpoints: ~s",
                     [string:join(Endpoints, ",")]),
    maybe_demonitor(Ref),
    {Transport, TransportOpts} = pick_transport(Data),
    case Transport of
        tcp -> rabbit_log:info("etcd v3 API client is configured to connect over plain TCP, without using TLS");
        tls -> rabbit_log:info("etcd v3 API client is configured to use TLS")
    end,
    ConnName = ?ETCD_CONN_NAME,
    case connect(ConnName, Endpoints, Transport, TransportOpts, Data) of
        {ok, Pid} ->
            rabbit_log:debug("etcd v3 API client connection: ~p", [Pid]),
            rabbit_log:debug("etcd v3 API client: total number of connections to etcd is ~p", [length(eetcd_conn_sup:info())]),
            {next_state, connected, Data#statem_data{
                connection_name = ConnName,
                connection_pid = Pid,
                connection_monitor = monitor(process, Pid)
            }};
        {error, Errors} ->
            [rabbit_log:error("etcd peer discovery: failed to connect to endpoint ~p: ~p", [Endpoint, Err]) || {Endpoint, Err} <- Errors],
            ensure_disconnected(?ETCD_CONN_NAME, Data),
            Actions = [{state_timeout, reconnection_interval(), recover}],
            {keep_state, reset_statem_data(Data), Actions}
    end;
recover(state_timeout, _Content, Data) ->
    rabbit_log:debug("etcd peer discovery: connection entered a reconnection delay state"),
    ensure_disconnected(?ETCD_CONN_NAME, Data),
    %% A transition to the current state neither re-runs the enter call nor
    %% posts any event. Explicitly request another connection attempt;
    %% otherwise the client would stay disconnected forever.
    {next_state, recover, reset_statem_data(Data), [{next_event, internal, start}]};
recover({call, From}, Req, _Data) ->
    rabbit_log:error("etcd v3 API: client received a call ~p while not connected, will do nothing", [Req]),
    gen_statem:reply(From, {error, not_connected}),
    keep_state_and_data.
172
173
%% State: connected.
%%
%% On entry a lease for the node key is granted and kept alive. A 'DOWN'
%% for the monitored connection process sends the machine back to
%% `recover' and immediately schedules a reconnection attempt. Calls:
%%   {lock, Node}      -> {ok, GeneratedKey} | {error, _}
%%   {unlock, Key}     -> ok | {error, _}
%%   register          -> ok (puts the node key bound to the node key lease)
%%   unregister        -> ok (deletes the node key, revokes the lease)
%%   list_keys         -> sorted, de-duplicated list of node names
connected(enter, _PrevState, Data) ->
    rabbit_log:info("etcd peer discovery: successfully connected to etcd"),

    {keep_state, acquire_node_key_lease_grant(Data)};
connected(info, {'DOWN', ConnRef, process, ConnPid, Reason}, Data = #statem_data{
                                                               connection_pid = ConnPid,
                                                               connection_monitor = ConnRef
                                                             }) ->
    rabbit_log:debug("etcd peer discovery: connection to etcd ~p is down: ~p", [ConnPid, Reason]),
    maybe_demonitor(ConnRef),
    %% Entering `recover' only runs its enter call, which does not connect;
    %% the attempt itself is triggered by the internal start event.
    {next_state, recover, reset_statem_data(Data), [{next_event, internal, start}]};
connected({call, From}, {lock, _Node}, Data = #statem_data{connection_name = Conn, lock_ttl_in_seconds = TTL}) ->
    case eetcd_lease:grant(eetcd_kv:new(Conn), TTL) of
        {ok, #{'ID' := LeaseID}} ->
            Key = lock_key_base(Data),
            rabbit_log:debug("etcd peer discovery: granted a lease ~p for registration lock ~s with TTL = ~p", [LeaseID, Key, TTL]),
            case eetcd_lock:lock(lock_context(Conn, Data), Key, LeaseID) of
                {ok, #{key := GeneratedKey}} ->
                    rabbit_log:debug("etcd peer discovery: successfully acquired a lock, lock owner key: ~s", [GeneratedKey]),
                    reply_and_retain_state(From, {ok, GeneratedKey});
                {error, _} = Error ->
                    rabbit_log:debug("etcd peer discovery: failed to acquire a lock using key ~s: ~p", [Key, Error]),
                    reply_and_retain_state(From, Error)
            end;
        {error, _} = Error ->
            rabbit_log:debug("etcd peer discovery: failed to get a lease for registration lock: ~p", [Error]),
            reply_and_retain_state(From, Error)
    end;
connected({call, From}, {unlock, GeneratedKey}, Data = #statem_data{connection_name = Conn}) ->
    Ctx = unlock_context(Conn, Data),
    case eetcd_lock:unlock(Ctx, GeneratedKey) of
        {ok, _} ->
            rabbit_log:debug("etcd peer discovery: successfully released lock, lock owner key: ~s", [GeneratedKey]),
            reply_and_retain_state(From, ok);
        {error, _} = Error ->
            rabbit_log:debug("etcd peer discovery: failed to release registration lock, lock owner key: ~s, error ~p",
                             [GeneratedKey, Error]),
            reply_and_retain_state(From, Error)
    end;
connected({call, From}, register, Data = #statem_data{connection_name = Conn}) ->
    Ctx = registration_context(Conn, Data),
    Key = node_key(Data),
    eetcd_kv:put(Ctx, Key, registration_value(Data)),
    rabbit_log:debug("etcd peer discovery: put key ~p, done with registration", [Key]),
    gen_statem:reply(From, ok),
    keep_state_and_data;
connected({call, From}, unregister, Data = #statem_data{connection_name = Conn}) ->
    unregister(Conn, Data),
    gen_statem:reply(From, ok),
    {keep_state, Data#statem_data{
        node_key_lease_id = undefined
    }};
connected({call, From}, list_keys, Data = #statem_data{connection_name = Conn}) ->
    Prefix = node_key_base(Data),
    C1 = eetcd_kv:new(Conn),
    C2 = eetcd_kv:with_prefix(eetcd_kv:with_key(C1, Prefix)),
    rabbit_log:debug("etcd peer discovery: will use prefix ~s to query for node keys", [Prefix]),
    {ok, #{kvs := Result}} = eetcd_kv:get(C2),
    rabbit_log:debug("etcd peer discovery returned keys: ~p", [Result]),
    Values = [maps:get(value, M) || M <- Result],
    rabbit_log:debug("etcd peer discovery: listing node keys returned ~b results", [length(Values)]),
    ParsedNodes = lists:map(fun extract_node/1, Values),
    {Successes, Failures} = lists:partition(fun filter_node/1, ParsedNodes),
    Nodes = lists:usort(Successes),
    JoinedString = lists:join(",", [rabbit_data_coercion:to_list(Node) || Node <- Nodes]),
    %% A successful extraction is not an error condition.
    rabbit_log:debug("etcd peer discovery: successfully extracted nodes: ~s", [JoinedString]),
    lists:foreach(fun(Val) ->
        rabbit_log:error("etcd peer discovery: failed to extract node name from etcd value ~p", [Val])
    end, Failures),
    gen_statem:reply(From, Nodes),
    keep_state_and_data.
251
%% State: disconnected. Only the enter call is handled: it logs and keeps
%% the current state and data.
disconnected(enter, _PrevState, _Data) ->
    rabbit_log:info("etcd peer discovery: successfully disconnected from etcd"),
    {keep_state_and_data, []}.
255
256
257%%
258%% Implementation
259%%
260
%% Grants a lease of node_key_ttl_in_seconds on the named connection, starts
%% an eetcd keep-alive for it, and returns Data with the lease ID and the
%% keep-alive pid recorded. Crashes with badmatch if either eetcd call fails.
acquire_node_key_lease_grant(#statem_data{connection_name = ConnName,
                                          node_key_ttl_in_seconds = LeaseTTL} = Data) ->
    {ok, #{'ID' := Lease}} = eetcd_lease:grant(ConnName, LeaseTTL),
    {ok, KeepAlive} = eetcd_lease:keep_alive(ConnName, Lease),
    rabbit_log:debug("etcd peer discovery: acquired a lease ~p for node key ~s with TTL = ~p",
                     [Lease, node_key(Data), LeaseTTL]),
    Data#statem_data{node_key_lease_id = Lease,
                     node_lease_keepalive_pid = KeepAlive}.
270
%% KV context used to write the node key: bound to the node key lease so the
%% key goes away together with the lease.
registration_context(ConnName, #statem_data{node_key_lease_id = Lease}) ->
    eetcd_kv:with_lease(eetcd_kv:new(ConnName), Lease).

%% KV context used to delete the node key; no lease is attached.
unregistration_context(ConnName, _Data) ->
    Ctx = eetcd_kv:new(ConnName),
    Ctx.
277
%% Lock context whose timeout equals the lock TTL. The TTL is stored in
%% seconds, while eetcd_lock:with_timeout/2 expects milliseconds.
lock_context(ConnName, #statem_data{lock_ttl_in_seconds = LockTTL}) ->
    TimeoutMs = LockTTL * 1000,
    eetcd_lock:with_timeout(eetcd_lock:new(ConnName), TimeoutMs).

%% Unlock context. The timeout is capped at the lock TTL, which makes more
%% sense than an arbitrary number (seconds converted to milliseconds).
unlock_context(ConnName, #statem_data{lock_ttl_in_seconds = LockTTL}) ->
    TimeoutMs = LockTTL * 1000,
    eetcd_lock:with_timeout(eetcd_lock:new(ConnName), TimeoutMs).
286
%% Key prefix under which all node keys of this cluster live:
%% /rabbitmq/discovery/<prefix>/clusters/<cluster>/nodes
node_key_base(#statem_data{key_prefix = Prefix, cluster_name = Cluster}) ->
    Path = rabbit_misc:format("/rabbitmq/discovery/~s/clusters/~s/nodes", [Prefix, Cluster]),
    to_binary(Path).

%% This node's key: <node_key_base>/<node name>
node_key(Data) ->
    Base = node_key_base(Data),
    to_binary(rabbit_misc:format("~s/~s", [Base, node()])).

%% Key of the cluster-wide registration lock:
%% /rabbitmq/locks/<prefix>/clusters/<cluster>/registration
lock_key_base(#statem_data{key_prefix = Prefix, cluster_name = Cluster}) ->
    to_binary(rabbit_misc:format("/rabbitmq/locks/~s/clusters/~s/registration",
                                 [Prefix, Cluster])).
297
%% JSON payload stored under the node key: the node name, the node key
%% lease ID and its TTL. Only the "node" field is read back (by
%% extract_node/1); the rest gives operators additional context.
registration_value(#statem_data{node_key_lease_id = Lease, node_key_ttl_in_seconds = LeaseTTL}) ->
    Payload = #{<<"node">>     => to_binary(node()),
                <<"lease_id">> => Lease,
                <<"ttl">>      => LeaseTTL},
    to_binary(rabbit_json:encode(Payload)).
306
-spec extract_node(binary()) -> atom() | {error, any()}.

%% Decodes a registration payload and returns its "node" field as an atom.
%% Returns undefined when the field is missing, and passes through an
%% {error, _} returned by the JSON decoder.
extract_node(Payload) ->
    case rabbit_json:decode(Payload) of
        {error, _} = DecodeError ->
            DecodeError;
        Decoded ->
            case maps:find(<<"node">>, Decoded) of
                {ok, NodeName} -> rabbit_data_coercion:to_atom(NodeName);
                error          -> undefined
            end
    end.
318
%% Predicate for lists:partition/2: keeps everything extract_node/1 managed
%% to turn into a node name, i.e. anything but undefined or {error, _}.
filter_node(Parsed) ->
    case Parsed of
        undefined  -> false;
        {error, _} -> false;
        _          -> true
    end.
322
323
%% Returns true when a connection error only says that a connection under
%% the requested name is already running ({Endpoint, already_started}).
%% Any other term is a real failure. This includes error terms that are not
%% {Endpoint, Reason} pairs, which previously crashed lists:all/2 in
%% do_connect/5 with function_clause.
error_is_already_started({_Endpoint, already_started}) ->
    true;
error_is_already_started(_Other) ->
    false.
328
%% Reuses the eetcd connection registered under Name if it is alive;
%% otherwise opens a new one via do_connect/5.
connect(Name, Endpoints, Transport, TransportOpts, Data) ->
    case eetcd_conn:lookup(Name) of
        {error, eetcd_conn_unavailable} ->
            do_connect(Name, Endpoints, Transport, TransportOpts, Data);
        {ok, Existing} when is_pid(Existing) ->
            {ok, Existing}
    end.
336
%% Opens a new eetcd connection registered under Name, after logging whether
%% credentials will be used.
%%
%% Returns {ok, Pid} or {error, Errors}, where Errors is always a list;
%% recover/3 iterates over that list to log one line per endpoint. If every
%% error reports already_started, a connection under Name already exists
%% (eetcd registers connections by name) and it is reused instead.
do_connect(Name, Endpoints, Transport, TransportOpts, Data = #statem_data{username = Username}) ->
    case Username of
        undefined -> rabbit_log:info("etcd peer discovery: will connect to etcd without authentication (no credentials configured)");
        _         -> rabbit_log:info("etcd peer discovery: will connect to etcd as user '~s'", [Username])
    end,
    case eetcd:open(Name, Endpoints, connection_options(Data), Transport, TransportOpts) of
        {ok, Pid} -> {ok, Pid};
        {error, Errors0} ->
            Errors = case is_list(Errors0) of
                         true  -> Errors0;
                         false -> [Errors0]
                     end,
            rabbit_log:debug("etcd peer discovery: connection errors: ~p",
                             [Errors]),
            AllBenign = lists:all(fun error_is_already_started/1, Errors),
            rabbit_log:debug("etcd peer discovery: are all connection errors benign?: ~p",
                             [AllBenign]),
            case AllBenign of
                true  -> lookup_started_connection(Name, Errors);
                false -> {error, Errors}
            end
    end.

%% Returns the already running connection registered under Name. If it has
%% disappeared meanwhile, the original error list is returned, so callers
%% never receive a non-list error such as {error, eetcd_conn_unavailable}.
lookup_started_connection(Name, Errors) ->
    case eetcd_conn:lookup(Name) of
        {ok, _Pid} = Found -> Found;
        {error, _}         -> {error, Errors}
    end.
362
%% eetcd connection options. Random endpoint selection is always used.
%% Credentials are only added when both a username and a password are
%% configured; the password is de-obfuscated only here.
connection_options(#statem_data{username = undefined}) ->
    [{mode, random}];
connection_options(#statem_data{obfuscated_password = undefined}) ->
    [{mode, random}];
connection_options(#statem_data{username = User, obfuscated_password = Obfuscated}) ->
    [{name, User},
     {password, to_list(deobfuscate(Obfuscated))},
     {mode, random}].
371
372
%% Encrypts the password with credentials_obfuscation so the plain value is
%% not kept in the state; undefined passes through unchanged.
obfuscate(undefined) ->
    undefined;
obfuscate(Password) ->
    Plain = to_binary(Password),
    credentials_obfuscation:encrypt(Plain).

%% Inverse of obfuscate/1.
deobfuscate(undefined) ->
    undefined;
deobfuscate(Password) ->
    Encrypted = to_binary(Password),
    credentials_obfuscation:decrypt(Encrypted).
380
%% Drops the connection monitor (if any), then closes the eetcd connection
%% registered under ConnName on a best-effort basis.
disconnect(ConnName, #statem_data{connection_monitor = MonitorRef}) ->
    _ = maybe_demonitor(MonitorRef),
    do_disconnect(ConnName).
384
%% Removes this node's registration. It deletes the node key, revokes the
%% node key lease and stops the lease keep-alive process started by
%% acquire_node_key_lease_grant/1. Always returns ok.
unregister(Conn, Data = #statem_data{node_key_lease_id = LeaseID, node_lease_keepalive_pid = KAPid}) ->
    Ctx = unregistration_context(Conn, Data),
    Key = node_key(Data),
    eetcd_kv:delete(Ctx, Key),
    rabbit_log:debug("etcd peer discovery: deleted key ~s, done with unregistration", [Key]),
    revoke_node_key_lease(Ctx, LeaseID, Key),
    stop_lease_keepalive(KAPid),
    ok.

%% connected/3 clears node_key_lease_id after unregistering, so on a repeated
%% unregister there is no lease left to revoke.
revoke_node_key_lease(_Ctx, undefined, _Key) ->
    ok;
revoke_node_key_lease(Ctx, LeaseID, Key) ->
    eetcd_lease:revoke(Ctx, LeaseID),
    rabbit_log:debug("etcd peer discovery: revoked a lease ~p for node key ~s", [LeaseID, Key]),
    ok.

%% A process that does not trap exits ignores exit(Pid, normal), so the
%% keep-alive would keep running; `shutdown' actually terminates it.
%% A missing pid (undefined) would make exit/2 raise badarg.
stop_lease_keepalive(Pid) when is_pid(Pid) ->
    exit(Pid, shutdown);
stop_lease_keepalive(_NoPid) ->
    true.
394
%% Replies to a pending call and leaves both state and data unchanged.
reply_and_retain_state(From, Value) ->
    _ = gen_statem:reply(From, Value),
    keep_state_and_data.
398
%% Removes the monitor if there is one; always returns true.
%%
%% The flush option also discards a 'DOWN' message for this monitor that may
%% already be queued. Neither recover/3 nor connected/3 has a catch-all info
%% clause, and connected/3 only matches the current connection's monitor,
%% so a stale 'DOWN' left in the mailbox would crash the client.
maybe_demonitor(undefined) ->
    true;
maybe_demonitor(Ref) when is_reference(Ref) ->
    erlang:demonitor(Ref, [flush]).
403
%% Forgets the current connection: drops its monitor and clears the
%% connection pid and monitor fields; all other fields are kept.
reset_statem_data(#statem_data{endpoints = Endpoints, connection_monitor = MonitorRef} = Data)
  when Endpoints =/= undefined ->
    maybe_demonitor(MonitorRef),
    Data#statem_data{connection_monitor = undefined,
                     connection_pid = undefined}.
410
%% Same effect as disconnect/2: drops the connection monitor and closes the
%% named eetcd connection on a best-effort basis.
ensure_disconnected(Name, #statem_data{} = Data) ->
    disconnect(Name, Data).
414
%% Best-effort close of the eetcd connection registered under Name: returns
%% eetcd:close/1's result, or ok if that call raises any exception.
do_disconnect(Name) ->
    try eetcd:close(Name)
    catch
        _Class:_Reason -> ok
    end.
421
%% Delay, in milliseconds, before the next connection attempt after a
%% failed one.
reconnection_interval() ->
    timer:seconds(3).
424
%% Builds the effective settings map for init/1.
%%
%% The legacy etcd_host / etcd_port (default 2379) pair, if present, is
%% turned into a "host:port" endpoint and appended to `endpoints'. Every
%% other key is preserved. init/1 reads etcd_prefix, etcd_node_ttl and
%% lock_wait_time from the result; dropping them here, as the code did
%% before, made the configured prefix and TTLs silently fall back to
%% defaults.
normalize_settings(Map) when is_map(Map) ->
    Endpoints = maps:get(endpoints, Map, []),
    LegacyEndpoints = case maps:get(etcd_host, Map, undefined) of
        undefined -> [];
        Hostname ->
            Port = maps:get(etcd_port, Map, 2379),
            [rabbit_misc:format("~s:~p", [Hostname, Port])]
    end,
    Map#{endpoints => Endpoints ++ LegacyEndpoints}.
437
%% Plain TCP when no TLS options are configured, TLS with the configured
%% options otherwise.
pick_transport(#statem_data{tls_options = TLSOpts}) ->
    case TLSOpts of
        [] -> {tcp, []};
        _  -> {tls, TLSOpts}
    end.
442