%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(rabbitmq_peer_discovery_etcd_v3_client).

-behaviour(gen_statem).

%% process lifecycle
-export([start_link/1, start/1, stop/0]).
%% gen_statem callbacks
-export([init/1, callback_mode/0, terminate/3]).
%% node registration and listing
-export([register/1, register/0, unregister/1, unregister/0, list_nodes/0, list_nodes/1]).
%% registration lock
-export([lock/0, lock/1, lock/2, unlock/0, unlock/1, unlock/2]).
%% gen_statem state functions
-export([recover/3, connected/3, disconnected/3]).

%% for tests
-export([extract_node/1, filter_node/1, registration_value/1, node_key_base/1, node_key/1, lock_key_base/1]).

-import(rabbit_data_coercion, [to_binary/1, to_list/1]).

-compile(nowarn_unused_function).

-include("rabbit_peer_discovery_etcd.hrl").

%%
%% Constants
%%

%% The etcd connection is registered under the module name.
-define(ETCD_CONN_NAME, ?MODULE).
%% 60s matches the default heartbeat timeout; 1s is added on top
%% for state machine bookkeeping.
-define(DEFAULT_NODE_KEY_LEASE_TTL, 61).
%% Lower bound for the node key lease TTL: overly low values
%% can cause annoying timeouts in etcd client operations.
-define(MINIMUM_NODE_KEY_LEASE_TTL, 15).
%% The default randomized delay range was 5s to 60s, so this value
%% produces a comparable delay.
-define(DEFAULT_LOCK_WAIT_TTL, 70).
%% Lower bound for the lock lease TTL: overly low values
%% can cause annoying timeouts in etcd client operations.
-define(MINIMUM_LOCK_WAIT_TTL, 30).

%% Timeout, in milliseconds, applied to every gen_statem:call/3 made by the API functions.
-define(CALL_TIMEOUT, 15000).

%%
%% API
%%

%% Starts the client state machine without linking it to the caller.
%% The process is registered locally under the module name.
start(Conf) ->
    RegisteredName = {local, ?MODULE},
    gen_statem:start(RegisteredName, ?MODULE, Conf, []).

%% Starts the client state machine linked to the caller.
%% The process is registered locally under the module name.
start_link(Conf) ->
    RegisteredName = {local, ?MODULE},
    gen_statem:start_link(RegisteredName, ?MODULE, Conf, []).

%% Stops the locally registered client state machine.
stop() ->
    gen_statem:stop(?MODULE).

%% gen_statem init callback.
%%
%% Makes sure the eetcd application is started, normalizes the settings map
%% and builds the initial #statem_data{}. The machine starts in the `recover`
%% state with an internal `start` event queued, which triggers the first
%% connection attempt.
%%
%% NOTE(review): normalize_settings/1, as written in this file, removes
%% etcd_prefix, etcd_node_ttl and lock_wait_time from the map, so the three
%% lookups of those keys below appear to always fall back to their defaults.
%% Confirm whether that is intended.
init(Args) ->
    ok = application:ensure_started(eetcd),
    Settings = normalize_settings(Args),
    %% TTLs are clamped from below by the MINIMUM_* constants
    NodeKeyTTL = erlang:max(?MINIMUM_NODE_KEY_LEASE_TTL,
                            maps:get(etcd_node_ttl, Settings, ?DEFAULT_NODE_KEY_LEASE_TTL)),
    LockTTL = erlang:max(?MINIMUM_LOCK_WAIT_TTL,
                         maps:get(lock_wait_time, Settings, ?DEFAULT_LOCK_WAIT_TTL)),
    InitialData = #statem_data{
        endpoints = maps:get(endpoints, Settings),
        tls_options = maps:get(ssl_options, Settings, []),
        username = maps:get(etcd_username, Settings, undefined),
        %% the password is only ever kept in its obfuscated form
        obfuscated_password = obfuscate(maps:get(etcd_password, Settings, undefined)),
        key_prefix = maps:get(etcd_prefix, Settings, <<"rabbitmq">>),
        node_key_ttl_in_seconds = NodeKeyTTL,
        cluster_name = maps:get(cluster_name, Settings, <<"default">>),
        lock_ttl_in_seconds = LockTTL
    },
    {ok, recover, InitialData, [{next_event, internal, start}]}.

%% One callback function per state, with state enter calls enabled.
callback_mode() -> [state_functions, state_enter].

%% gen_statem terminate callback: drops the connection monitor, closes the
%% etcd connection and logs how many eetcd connections remain.
terminate(Reason, State, Data) ->
    rabbit_log:debug("etcd v3 API client will terminate in state ~p, reason: ~p",
                     [State, Reason]),
    disconnect(?ETCD_CONN_NAME, Data),
    rabbit_log:debug("etcd v3 API client has disconnected"),
    ConnectionCount = length(eetcd_conn_sup:info()),
    rabbit_log:debug("etcd v3 API client: total number of connections to etcd is ~p", [ConnectionCount]),
    ok.

%% Issues a synchronous request to the client process, bounded by ?CALL_TIMEOUT.
sync_call(ServerRef, Request) ->
    gen_statem:call(ServerRef, Request, ?CALL_TIMEOUT).

%% Asks the locally registered client to register this node.
register() ->
    sync_call(?MODULE, register).

%% Asks the client identified by ServerRef to register this node.
register(ServerRef) ->
    sync_call(ServerRef, register).

%% Asks the locally registered client to unregister this node.
unregister() ->
    sync_call(?MODULE, unregister).

%% Asks the client identified by ServerRef to unregister this node.
unregister(ServerRef) ->
    sync_call(ServerRef, unregister).

%% Lists the nodes known to the locally registered client.
list_nodes() ->
    sync_call(?MODULE, list_keys).

%% Lists the nodes known to the client identified by ServerRef.
%% The request sent to the state machine is `list_keys`.
list_nodes(ServerRef) ->
    sync_call(ServerRef, list_keys).

%% Requests the registration lock on behalf of the local node.
lock() ->
    sync_call(?MODULE, {lock, node()}).

%% Requests the registration lock on behalf of Node.
lock(Node) ->
    sync_call(?MODULE, {lock, Node}).

%% Requests the registration lock from the client identified by ServerRef.
lock(ServerRef, Node) ->
    sync_call(ServerRef, {lock, Node}).

%% Releases the registration lock via the locally registered client,
%% passing node() as the lock key.
unlock() ->
    unlock(?MODULE, node()).

%% Releases the lock identified by LockKey via the locally registered client.
unlock(LockKey) ->
    unlock(?MODULE, LockKey).

%% Releases the lock identified by LockKey via the client ServerRef.
%% The call is bounded by ?CALL_TIMEOUT.
unlock(ServerRef, LockKey) ->
    gen_statem:call(ServerRef, {unlock, LockKey}, ?CALL_TIMEOUT).

%%
%% States
%%

%% `recover` state: the client is not connected to etcd.
%%
%%  * enter:          only logs the configured endpoints
%%  * internal start: attempts to connect; moves to `connected` on success,
%%                    otherwise schedules a state timeout
%%  * state_timeout:  cleans up and queues another connection attempt
%%  * any call:       replies {error, not_connected}
recover(enter, _PrevState, #statem_data{endpoints = Endpoints}) ->
    rabbit_log:debug("etcd v3 API client has entered recovery state, endpoints: ~s",
                     [string:join(Endpoints, ",")]),
    keep_state_and_data;
recover(internal, start, Data = #statem_data{endpoints = Endpoints, connection_monitor = Ref}) ->
    rabbit_log:debug("etcd v3 API client will attempt to connect, endpoints: ~s",
                     [string:join(Endpoints, ",")]),
    maybe_demonitor(Ref),
    {Transport, TransportOpts} = pick_transport(Data),
    case Transport of
        tcp -> rabbit_log:info("etcd v3 API client is configured to connect over plain TCP, without using TLS");
        tls -> rabbit_log:info("etcd v3 API client is configured to use TLS")
    end,
    ConnName = ?ETCD_CONN_NAME,
    case connect(ConnName, Endpoints, Transport, TransportOpts, Data) of
        {ok, Pid} ->
            rabbit_log:debug("etcd v3 API client connection: ~p", [Pid]),
            rabbit_log:debug("etcd v3 API client: total number of connections to etcd is ~p", [length(eetcd_conn_sup:info())]),
            {next_state, connected, Data#statem_data{
                connection_name = ConnName,
                connection_pid = Pid,
                connection_monitor = monitor(process, Pid)
            }};
        {error, Errors} ->
            [rabbit_log:error("etcd peer discovery: failed to connect to endpoint ~p: ~p", [Endpoint, Err]) || {Endpoint, Err} <- Errors],
            ensure_disconnected(?ETCD_CONN_NAME, Data),
            %% try again after the reconnection interval
            Actions = [{state_timeout, reconnection_interval(), recover}],
            {keep_state, reset_statem_data(Data), Actions}
    end;
recover(state_timeout, _TimeoutContent, Data) ->
    rabbit_log:debug("etcd peer discovery: connection entered a reconnection delay state"),
    ensure_disconnected(?ETCD_CONN_NAME, Data),
    %% A next_state transition to the state we are already in neither repeats
    %% the state enter call nor produces a `start` event on its own, so the
    %% connection attempt has to be queued explicitly. Without this action
    %% the client would remain in `recover` indefinitely after the first failure.
    {next_state, recover, reset_statem_data(Data), [{next_event, internal, start}]};
recover({call, From}, Req, _Data) ->
    rabbit_log:error("etcd v3 API: client received a call ~p while not connected, will do nothing", [Req]),
    gen_statem:reply(From, {error, not_connected}),
    keep_state_and_data.


%% `connected` state: the client holds a monitored etcd connection.
%%
%%  * enter:              acquires the node key lease and starts its keepalive
%%  * 'DOWN' for the connection: moves back to `recover`
%%  * {lock, Node}:       grants a lease and acquires the registration lock;
%%                        replies {ok, GeneratedKey} | {error, _}. Node is ignored.
%%  * {unlock, Key}:      releases the lock; replies ok | {error, _}
%%  * register:           puts the node key under the node key lease; replies ok
%%  * unregister:         deletes the node key and revokes the lease; replies ok
%%  * list_keys:          replies with the sorted, de-duplicated list of node
%%                        names extracted from the values under the node key prefix
connected(enter, _PrevState, Data) ->
    rabbit_log:info("etcd peer discovery: successfully connected to etcd"),

    {keep_state, acquire_node_key_lease_grant(Data)};
connected(info, {'DOWN', ConnRef, process, ConnPid, Reason}, Data = #statem_data{
                                                               connection_pid = ConnPid,
                                                               connection_monitor = ConnRef
                                                             }) ->
    rabbit_log:debug("etcd peer discovery: connection to etcd ~p is down: ~p", [ConnPid, Reason]),
    maybe_demonitor(ConnRef),
    %% NOTE(review): this transition does not queue a connection attempt and
    %% recover(enter, ...) only logs; confirm whether a reconnect should be
    %% triggered from here as well.
    {next_state, recover, reset_statem_data(Data)};
connected({call, From}, {lock, _Node}, Data = #statem_data{connection_name = Conn, lock_ttl_in_seconds = TTL}) ->
    case eetcd_lease:grant(eetcd_kv:new(Conn), TTL) of
        {ok, #{'ID' := LeaseID}} ->
            Key = lock_key_base(Data),
            rabbit_log:debug("etcd peer discovery: granted a lease ~p for registration lock ~s with TTL = ~p", [LeaseID, Key, TTL]),
            case eetcd_lock:lock(lock_context(Conn, Data), Key, LeaseID) of
                {ok, #{key := GeneratedKey}} ->
                    rabbit_log:debug("etcd peer discovery: successfully acquired a lock, lock owner key: ~s", [GeneratedKey]),
                    reply_and_retain_state(From, {ok, GeneratedKey});
                {error, _} = Error ->
                    rabbit_log:debug("etcd peer discovery: failed to acquire a lock using key ~s: ~p", [Key, Error]),
                    reply_and_retain_state(From, Error)
            end;
        {error, _} = Error ->
            rabbit_log:debug("etcd peer discovery: failed to get a lease for registration lock: ~p", [Error]),
            reply_and_retain_state(From, Error)
    end;
connected({call, From}, {unlock, GeneratedKey}, Data = #statem_data{connection_name = Conn}) ->
    Ctx = unlock_context(Conn, Data),
    case eetcd_lock:unlock(Ctx, GeneratedKey) of
        {ok, _} ->
            rabbit_log:debug("etcd peer discovery: successfully released lock, lock owner key: ~s", [GeneratedKey]),
            reply_and_retain_state(From, ok);
        {error, _} = Error ->
            rabbit_log:debug("etcd peer discovery: failed to release registration lock, lock owner key: ~s, error ~p",
                             [GeneratedKey, Error]),
            reply_and_retain_state(From, Error)
    end;
connected({call, From}, register, Data = #statem_data{connection_name = Conn}) ->
    Ctx = registration_context(Conn, Data),
    Key = node_key(Data),
    %% NOTE(review): the result of the put is not inspected; the caller
    %% always receives `ok`.
    eetcd_kv:put(Ctx, Key, registration_value(Data)),
    rabbit_log:debug("etcd peer discovery: put key ~p, done with registration", [Key]),
    gen_statem:reply(From, ok),
    keep_state_and_data;
connected({call, From}, unregister, Data = #statem_data{connection_name = Conn}) ->
    unregister(Conn, Data),
    gen_statem:reply(From, ok),
    {keep_state, Data#statem_data{
        node_key_lease_id = undefined
    }};
connected({call, From}, list_keys, Data = #statem_data{connection_name = Conn}) ->
    Prefix = node_key_base(Data),
    C1 = eetcd_kv:new(Conn),
    C2 = eetcd_kv:with_prefix(eetcd_kv:with_key(C1, Prefix)),
    rabbit_log:debug("etcd peer discovery: will use prefix ~s to query for node keys", [Prefix]),
    {ok, #{kvs := Result}} = eetcd_kv:get(C2),
    rabbit_log:debug("etcd peer discovery returned keys: ~p", [Result]),
    %% a comprehension always produces a list, so no further shape check is needed
    Values = [maps:get(value, M) || M <- Result],
    rabbit_log:debug("etcd peer discovery: listing node keys returned ~b results", [length(Values)]),
    ParsedNodes = lists:map(fun extract_node/1, Values),
    {Successes, Failures} = lists:partition(fun filter_node/1, ParsedNodes),
    Nodes = lists:usort(Successes),
    JoinedString = lists:join(",", [rabbit_data_coercion:to_list(Node) || Node <- Nodes]),
    %% a successful extraction is not an error condition
    rabbit_log:info("etcd peer discovery: successfully extracted nodes: ~s", [JoinedString]),
    lists:foreach(fun(Val) ->
        rabbit_log:error("etcd peer discovery: failed to extract node name from etcd value ~p", [Val])
    end, Failures),
    gen_statem:reply(From, Nodes),
    keep_state_and_data.

%% `disconnected` state: only logs on entry.
disconnected(enter, _PrevState, _Data) ->
    rabbit_log:info("etcd peer discovery: successfully disconnected from etcd"),
    keep_state_and_data.


%%
%% Implementation
%%

%% Grants a lease with the node key TTL, starts a keepalive process for it
%% and returns Data with the lease ID and keepalive pid recorded.
%% Crashes with badmatch if either etcd operation does not return {ok, _}.
acquire_node_key_lease_grant(Data = #statem_data{connection_name = Name, node_key_ttl_in_seconds = TTL}) ->
    %% acquire a lease for TTL
    {ok, #{'ID' := LeaseID}} = eetcd_lease:grant(Name, TTL),
    {ok, KeepalivePid} = eetcd_lease:keep_alive(Name, LeaseID),
    rabbit_log:debug("etcd peer discovery: acquired a lease ~p for node key ~s with TTL = ~p", [LeaseID, node_key(Data), TTL]),
    Data#statem_data{
        node_key_lease_id = LeaseID,
        node_lease_keepalive_pid = KeepalivePid
    }.

%% KV context used for registration: attaches the node key lease.
registration_context(ConnName, #statem_data{node_key_lease_id = LeaseID}) ->
    Ctx1 = eetcd_kv:new(ConnName),
    eetcd_kv:with_lease(Ctx1, LeaseID).

%% KV context used for unregistration: a plain context with no lease attached.
unregistration_context(ConnName, _Data) ->
    eetcd_kv:new(ConnName).

%% Lock context whose timeout equals the lock TTL.
lock_context(ConnName, #statem_data{lock_ttl_in_seconds = LeaseTTL}) ->
    %% LeaseTTL is in seconds, eetcd_lock:with_timeout/2 expects milliseconds
    eetcd_lock:with_timeout(eetcd_lock:new(ConnName), LeaseTTL * 1000).

%% Unlock context whose timeout equals the lock TTL.
unlock_context(ConnName, #statem_data{lock_ttl_in_seconds = Timeout}) ->
    %% caps the timeout here using the lock TTL value, it makes more
    %% sense than picking an arbitrary number. MK.
    eetcd_lock:with_timeout(eetcd_lock:new(ConnName), Timeout * 1000).

%% Key prefix (as a binary) under which all node keys of this cluster live.
node_key_base(#statem_data{cluster_name = ClusterName, key_prefix = Prefix}) ->
    to_binary(rabbit_misc:format("/rabbitmq/discovery/~s/clusters/~s/nodes", [Prefix, ClusterName])).

%% Key (as a binary) of the local node: the node key prefix followed by node().
node_key(Data) ->
    to_binary(rabbit_misc:format("~s/~s", [node_key_base(Data), node()])).

%% Key (as a binary) of the registration lock for this prefix and cluster.
lock_key_base(#statem_data{key_prefix = Prefix, cluster_name = ClusterName}) ->
    to_binary(rabbit_misc:format("/rabbitmq/locks/~s/clusters/~s/registration",
                                 [Prefix, ClusterName])).

%% JSON document stored under the node key. Only the <<"node">> field is
%% read back (by extract_node/1); the remaining fields merely give the
%% operator additional context.
registration_value(#statem_data{node_key_lease_id = LeaseID, node_key_ttl_in_seconds = TTL}) ->
    Document = #{
        <<"node">> => to_binary(node()),
        <<"lease_id">> => LeaseID,
        <<"ttl">> => TTL
    },
    to_binary(rabbit_json:encode(Document)).

-spec extract_node(binary()) -> atom() | {error, any()}.

%% Decodes a registration value and returns the node name as an atom,
%% `undefined` when the document has no <<"node">> key, or the error tuple
%% returned by the decoder.
extract_node(Payload) ->
    case rabbit_json:decode(Payload) of
        {error, _} = DecodingError ->
            DecodingError;
        Document ->
            node_field_to_atom(maps:get(<<"node">>, Document, undefined))
    end.

%% Converts the <<"node">> field to an atom, passing `undefined` through.
node_field_to_atom(undefined) -> undefined;
node_field_to_atom(NodeName) -> rabbit_data_coercion:to_atom(NodeName).

%% Predicate over extract_node/1 results: true only for successfully
%% extracted node names.
filter_node(Extracted) ->
    case Extracted of
        undefined -> false;
        {error, _} -> false;
        _NodeName -> true
    end.


%% True when a per-endpoint connection error is `already_started`.
error_is_already_started({_Endpoint, Reason}) ->
    Reason =:= already_started.

%% Returns the pid of the connection registered as Name when one exists,
%% otherwise opens a new connection.
connect(Name, Endpoints, Transport, TransportOpts, Data) ->
    case eetcd_conn:lookup(Name) of
        {ok, ExistingPid} = Found when is_pid(ExistingPid) ->
            Found;
        {error, eetcd_conn_unavailable} ->
            do_connect(Name, Endpoints, Transport, TransportOpts, Data)
    end.

%% Opens a connection named Name to the given endpoints.
%%
%% Returns {ok, Pid} on success. When every reported error is
%% `already_started`, the result of looking the connection up by name is
%% returned instead; any other failure yields {error, Errors} with Errors
%% always being a list.
do_connect(Name, Endpoints, Transport, TransportOpts, Data = #statem_data{username = Username}) ->
    log_authentication_mode(Username),
    case eetcd:open(Name, Endpoints, connection_options(Data), Transport, TransportOpts) of
        {ok, Pid} ->
            {ok, Pid};
        {error, Reported} ->
            Errors = ensure_error_list(Reported),
            rabbit_log:debug("etcd peer discovery: connection errors: ~p",
                             [Errors]),
            %% If all errors are already_started we can ignore them.
            %% eetcd registers connections under a name
            AllBenign = lists:all(fun error_is_already_started/1, Errors),
            rabbit_log:debug("etcd peer discovery: are all connection errors benign?: ~p",
                             [AllBenign]),
            if
                AllBenign -> eetcd_conn:lookup(Name);
                true -> {error, Errors}
            end
    end.

%% Logs whether the connection will be authenticated.
log_authentication_mode(undefined) ->
    rabbit_log:info("etcd peer discovery: will connect to etcd without authentication (no credentials configured)");
log_authentication_mode(Username) ->
    rabbit_log:info("etcd peer discovery: will connect to etcd as user '~s'", [Username]).

%% Wraps a single error term in a list; lists are returned untouched.
ensure_error_list(Errors) when is_list(Errors) -> Errors;
ensure_error_list(Error) -> [Error].

%% Options passed to eetcd:open/5. Credentials are included only when both
%% a username and a password are configured.
connection_options(#statem_data{username = undefined}) ->
    [{mode, random}];
connection_options(#statem_data{obfuscated_password = undefined}) ->
    [{mode, random}];
connection_options(#statem_data{username = Username, obfuscated_password = Obfuscated}) ->
    [{name, Username}, {password, to_list(deobfuscate(Obfuscated))}, {mode, random}].


%% Encrypts a password with credentials_obfuscation; `undefined` is passed through.
obfuscate(undefined) ->
    undefined;
obfuscate(Password) ->
    Plain = to_binary(Password),
    credentials_obfuscation:encrypt(Plain).

%% Decrypts a password with credentials_obfuscation; `undefined` is passed through.
deobfuscate(undefined) ->
    undefined;
deobfuscate(Password) ->
    Encrypted = to_binary(Password),
    credentials_obfuscation:decrypt(Encrypted).

%% Drops the connection monitor (if any) and closes the named connection.
disconnect(ConnName, #statem_data{connection_monitor = MonitorRef}) ->
    true = maybe_demonitor(MonitorRef),
    do_disconnect(ConnName).

%% Deletes the local node's key and revokes its lease. Always returns ok;
%% the results of the etcd operations are not inspected.
unregister(Conn, Data = #statem_data{node_key_lease_id = LeaseID, node_lease_keepalive_pid = KAPid}) ->
    Ctx = unregistration_context(Conn, Data),
    Key = node_key(Data),
    eetcd_kv:delete(Ctx, Key),
    rabbit_log:debug("etcd peer discovery: deleted key ~s, done with unregistration", [Key]),
    eetcd_lease:revoke(Ctx, LeaseID),
    %% NOTE(review): an exit signal with reason `normal` sent to another
    %% process does not terminate it unless it traps exits; confirm that the
    %% keepalive process actually stops here.
    exit(KAPid, normal),
    rabbit_log:debug("etcd peer discovery: revoked a lease ~p for node key ~s", [LeaseID, Key]),
    ok.

%% Replies to a gen_statem call and keeps both state and data unchanged.
reply_and_retain_state(From, Value) ->
    gen_statem:reply(From, Value),
    keep_state_and_data.

%% Removes a monitor if one is set. Always returns true.
%%
%% The `flush` option also removes a 'DOWN' message for Ref that may already
%% have been delivered, so a stale notification cannot reach a state that
%% has no clause for it (recover/3 does not handle `info` events).
maybe_demonitor(undefined) ->
    true;
maybe_demonitor(Ref) when is_reference(Ref) ->
    erlang:demonitor(Ref, [flush]).

%% Clears the connection pid and monitor from the state data, removing the
%% monitor first. Only defined for data with endpoints set.
reset_statem_data(Data0 = #statem_data{endpoints = Es, connection_monitor = Ref}) when Es =/= undefined ->
    maybe_demonitor(Ref),
    Data0#statem_data{
        connection_pid = undefined,
        connection_monitor = undefined
    }.

%% Drops the connection monitor (if any) and closes the named connection.
ensure_disconnected(Name, #statem_data{connection_monitor = Ref}) ->
    maybe_demonitor(Ref),
    do_disconnect(Name).

%% Best-effort close: any exception raised by eetcd:close/1 is deliberately
%% ignored, since this runs on cleanup paths where the connection may
%% already be gone.
do_disconnect(Name) ->
    try
        eetcd:close(Name)
    catch _:_ ->
        ok
    end.

%% Delay between connection attempts; used as a state_timeout value,
%% i.e. in milliseconds.
reconnection_interval() ->
    3000.

%% Produces the settings map used by init/1: `endpoints` becomes the
%% configured endpoint list followed by the legacy etcd_host/etcd_port pair
%% (port defaults to 2379) when a host is configured.
%%
%% NOTE(review): etcd_prefix, etcd_node_ttl and lock_wait_time are removed
%% from the result, yet init/1 reads those keys with defaults, so configured
%% values for them look like they are never used. Confirm whether this is
%% intentional before changing it.
normalize_settings(Map) when is_map(Map) ->
    Endpoints = maps:get(endpoints, Map, []),
    LegacyEndpoints = case maps:get(etcd_host, Map, undefined) of
        undefined -> [];
        Hostname ->
            Port = maps:get(etcd_port, Map, 2379),
            [rabbit_misc:format("~s:~p", [Hostname, Port])]
    end,

    AllEndpoints = Endpoints ++ LegacyEndpoints,
    maps:merge(maps:without([etcd_prefix, etcd_node_ttl, lock_wait_time], Map),
               #{endpoints => AllEndpoints}).

%% Selects the transport: plain TCP when no TLS options are configured,
%% TLS with the configured options otherwise.
pick_transport(#statem_data{tls_options = []}) ->
    {tcp, []};
pick_transport(#statem_data{tls_options = Opts}) ->
    {tls, Opts}.