1%% This Source Code Form is subject to the terms of the Mozilla Public 2%% License, v. 2.0. If a copy of the MPL was not distributed with this 3%% file, You can obtain one at https://mozilla.org/MPL/2.0/. 4%% 5%% The Initial Developer of the Original Code is AWeber Communications. 6%% Copyright (c) 2015-2016 AWeber Communications 7%% Copyright (c) 2016-2021 VMware, Inc. or its affiliates. All rights reserved. 8%% 9 10-module(rabbit_peer_discovery_consul). 11-behaviour(rabbit_peer_discovery_backend). 12 13-include_lib("kernel/include/logger.hrl"). 14-include_lib("rabbit_common/include/rabbit.hrl"). 15-include_lib("rabbitmq_peer_discovery_common/include/rabbit_peer_discovery.hrl"). 16-include("rabbit_peer_discovery_consul.hrl"). 17 18-export([init/0, list_nodes/0, supports_registration/0, register/0, unregister/0, 19 post_registration/0, lock/1, unlock/1]). 20-export([send_health_check_pass/0]). 21-export([session_ttl_update_callback/1]). 22%% useful for debugging from the REPL with RABBITMQ_ALLOW_INPUT 23-export([service_id/0, service_address/0]). 24%% for tests 25-ifdef(TEST). 26-compile(export_all). 27-endif. 28 29-define(CONFIG_MODULE, rabbit_peer_discovery_config). 30-define(UTIL_MODULE, rabbit_peer_discovery_util). 31 32-define(CONSUL_CHECK_NOTES, "RabbitMQ Consul-based peer discovery plugin TTL check"). 33 34%% 35%% API 36%% 37 38init() -> 39 ?LOG_DEBUG( 40 "Peer discovery Consul: initialising...", 41 #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), 42 ok = application:ensure_started(inets), 43 %% we cannot start this plugin yet since it depends on the rabbit app, 44 %% which is in the process of being started by the time this function is called 45 application:load(rabbitmq_peer_discovery_common), 46 rabbit_peer_discovery_httpc:maybe_configure_proxy(), 47 rabbit_peer_discovery_httpc:maybe_configure_inet6(). 48 49-spec list_nodes() -> {ok, {Nodes :: list(), NodeType :: rabbit_types:node_type()}} | {error, Reason :: string()}. 50 51list_nodes() -> 52 Fun0 = fun() -> {ok, {[], disc}} end, 53 Fun1 = fun() -> 54 ?LOG_WARNING( 55 "Peer discovery backend is set to ~s but final " 56 "config does not contain " 57 "rabbit.cluster_formation.peer_discovery_consul. " 58 "Cannot discover any nodes because Consul cluster " 59 "details are not configured!", 60 [?MODULE], 61 #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), 62 {ok, {[], disc}} 63 end, 64 Fun2 = fun(Proplist) -> 65 M = maps:from_list(Proplist), 66 case rabbit_peer_discovery_httpc:get(get_config_key(consul_scheme, M), 67 get_config_key(consul_host, M), 68 get_integer_config_key(consul_port, M), 69 rabbit_peer_discovery_httpc:build_path([v1, health, service, get_config_key(consul_svc, M)]), 70 list_nodes_query_args(), 71 maybe_add_acl([]), 72 []) of 73 {ok, Nodes} -> 74 IncludeWithWarnings = get_config_key(consul_include_nodes_with_warnings, M), 75 Result = extract_nodes( 76 filter_nodes(Nodes, IncludeWithWarnings)), 77 {ok, {Result, disc}}; 78 {error, _} = Error -> 79 Error 80 end 81 end, 82 rabbit_peer_discovery_util:maybe_backend_configured(?BACKEND_CONFIG_KEY, Fun0, Fun1, Fun2). 83 84 85-spec supports_registration() -> boolean(). 86 87supports_registration() -> 88 true. 89 90 91-spec register() -> ok | {error, Reason :: string()}. 92register() -> 93 M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), 94 case registration_body() of 95 {ok, Body} -> 96 ?LOG_DEBUG( 97 "Consul registration body: ~s", [Body], 98 #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), 99 case rabbit_peer_discovery_httpc:put(get_config_key(consul_scheme, M), 100 get_config_key(consul_host, M), 101 get_integer_config_key(consul_port, M), 102 rabbit_peer_discovery_httpc:build_path([v1, agent, service, register]), 103 [], 104 maybe_add_acl([]), 105 Body) of 106 {ok, _} -> ok; 107 Error -> Error 108 end; 109 Error -> Error 110 end. 111 112 113-spec unregister() -> ok | {error, Reason :: string()}. 114unregister() -> 115 M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), 116 ID = service_id(), 117 ?LOG_DEBUG( 118 "Unregistering with Consul using service ID '~s'", [ID], 119 #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), 120 case rabbit_peer_discovery_httpc:put(get_config_key(consul_scheme, M), 121 get_config_key(consul_host, M), 122 get_integer_config_key(consul_port, M), 123 rabbit_peer_discovery_httpc:build_path([v1, agent, service, deregister, ID]), 124 [], 125 maybe_add_acl([]), 126 []) of 127 {ok, Response} -> 128 ?LOG_INFO( 129 "Consul's response to the unregistration attempt: ~p", 130 [Response], 131 #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), 132 ok; 133 Error -> 134 ?LOG_INFO( 135 "Failed to unregister service with ID '~s` with Consul: ~p", 136 [ID, Error], 137 #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), 138 Error 139 end. 140 141-spec post_registration() -> ok. 142 143post_registration() -> 144 %% don't wait for one full interval, make 145 %% sure we let Consul know the service is healthy 146 %% right after registration. See rabbitmq/rabbitmq_peer_discovery_consul#8. 147 send_health_check_pass(), 148 ok. 149 150-spec lock(Node :: atom()) -> {ok, Data :: term()} | {error, Reason :: string()}. 151 152lock(Node) -> 153 M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), 154 ?LOG_DEBUG( 155 "Effective Consul peer discovery configuration: ~p", [M], 156 #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), 157 case create_session(Node, get_config_key(consul_svc_ttl, M)) of 158 {ok, SessionId} -> 159 TRef = start_session_ttl_updater(SessionId), 160 Now = erlang:system_time(seconds), 161 EndTime = Now + get_config_key(lock_wait_time, M), 162 lock(TRef, SessionId, Now, EndTime); 163 {error, Reason} -> 164 {error, lists:flatten(io_lib:format("Error while creating a session, reason: ~s", 165 [Reason]))} 166 end. 167 168-spec unlock({SessionId :: string(), TRef :: timer:tref()}) -> ok. 169 170unlock({SessionId, TRef}) -> 171 timer:cancel(TRef), 172 ?LOG_DEBUG( 173 "Stopped session renewal", 174 #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), 175 case release_lock(SessionId) of 176 {ok, true} -> 177 ok; 178 {ok, false} -> 179 {error, lists:flatten(io_lib:format("Error while releasing the lock, session ~s may have been invalidated", [SessionId]))}; 180 {error, _} = Err -> 181 Err 182 end. 183 184%% 185%% Implementation 186%% 187 188-spec get_config_key(Key :: atom(), Map :: #{atom() => peer_discovery_config_value()}) 189 -> peer_discovery_config_value(). 190 191get_config_key(Key, Map) -> 192 ?CONFIG_MODULE:get(Key, ?CONFIG_MAPPING, Map). 193 194-spec get_integer_config_key(Key :: atom(), Map :: #{atom() => peer_discovery_config_value()}) 195 -> integer(). 196 197get_integer_config_key(Key, Map) -> 198 ?CONFIG_MODULE:get_integer(Key, ?CONFIG_MAPPING, Map). 199 200 201-spec filter_nodes(ConsulResult :: list(), AllowWarning :: atom()) -> list(). 202filter_nodes(Nodes, Warn) -> 203 case Warn of 204 true -> 205 lists:filter(fun(Node) -> 206 Checks = maps:get(<<"Checks">>, Node), 207 lists:all(fun(Check) -> 208 lists:member(maps:get(<<"Status">>, Check), 209 [<<"passing">>, <<"warning">>]) 210 end, 211 Checks) 212 end, 213 Nodes); 214 false -> Nodes 215 end. 216 217-spec extract_nodes(ConsulResult :: list()) -> list(). 218extract_nodes(Data) -> extract_nodes(Data, []). 219 220-spec extract_nodes(ConsulResult :: list(), Nodes :: list()) 221 -> list(). 222extract_nodes([], Nodes) -> Nodes; 223extract_nodes([H | T], Nodes) -> 224 Service = maps:get(<<"Service">>, H), 225 Value = maps:get(<<"Address">>, Service), 226 NodeName = case ?UTIL_MODULE:as_string(Value) of 227 "" -> 228 NodeData = maps:get(<<"Node">>, H), 229 Node = maps:get(<<"Node">>, NodeData), 230 maybe_add_domain(?UTIL_MODULE:node_name(Node)); 231 Address -> 232 ?UTIL_MODULE:node_name(Address) 233 end, 234 extract_nodes(T, lists:merge(Nodes, [NodeName])). 235 236-spec maybe_add_acl(QArgs :: list()) -> list(). 237maybe_add_acl(QArgs) -> 238 M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), 239 case get_config_key(consul_acl_token, M) of 240 "undefined" -> QArgs; 241 ACL -> lists:append(QArgs, [{"X-Consul-Token", ACL}]) 242 end. 243 244-spec list_nodes_query_args() -> list(). 245list_nodes_query_args() -> 246 M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), 247 list_nodes_query_args(get_config_key(cluster_name, M)). 248 249-spec list_nodes_query_args(ClusterName :: string()) -> list(). 250list_nodes_query_args(Cluster) -> 251 ClusterTag = case Cluster of 252 "default" -> []; 253 _ -> [{tag, Cluster}] 254 end, 255 M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), 256 list_nodes_query_args(ClusterTag, get_config_key(consul_include_nodes_with_warnings, M)). 257 258-spec list_nodes_query_args(Args :: list(), AllowWarn :: atom()) -> list(). 259list_nodes_query_args(Value, Warn) -> 260 case Warn of 261 true -> Value; 262 false -> [passing | Value] 263 end. 264 265-spec registration_body() -> {ok, Body :: binary()} | {error, atom()}. 266registration_body() -> 267 Payload = build_registration_body(), 268 registration_body(rabbit_json:try_encode(Payload)). 269 270-spec registration_body(Response :: {ok, Body :: string()} | 271 {error, Reason :: atom()}) 272 -> {ok, Body :: binary()} | {error, Reason :: atom()}. 273registration_body({ok, Body}) -> 274 {ok, rabbit_data_coercion:to_binary(Body)}; 275registration_body({error, Reason}) -> 276 ?LOG_ERROR( 277 "Error serializing the request body: ~p", 278 [Reason], 279 #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), 280 {error, Reason}. 281 282 283-spec build_registration_body() -> list(). 284build_registration_body() -> 285 Payload1 = registration_body_add_id(), 286 Payload2 = registration_body_add_name(Payload1), 287 Payload3 = registration_body_maybe_add_address(Payload2), 288 Payload4 = registration_body_add_port(Payload3), 289 Payload5 = registration_body_maybe_add_check(Payload4), 290 Payload6 = registration_body_maybe_add_tag(Payload5), 291 registration_body_maybe_add_meta(Payload6). 292 293-spec registration_body_add_id() -> list(). 294registration_body_add_id() -> 295 [{'ID', rabbit_data_coercion:to_atom(service_id())}]. 296 297-spec registration_body_add_name(Payload :: list()) -> list(). 298registration_body_add_name(Payload) -> 299 M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), 300 Name = rabbit_data_coercion:to_atom(get_config_key(consul_svc, M)), 301 lists:append(Payload, [{'Name', Name}]). 302 303-spec registration_body_maybe_add_address(Payload :: list()) 304 -> list(). 305registration_body_maybe_add_address(Payload) -> 306 registration_body_maybe_add_address(Payload, service_address()). 307 308-spec registration_body_maybe_add_address(Payload :: list(), string()) 309 -> list(). 310registration_body_maybe_add_address(Payload, "undefined") -> Payload; 311registration_body_maybe_add_address(Payload, Address) -> 312 lists:append(Payload, [{'Address', rabbit_data_coercion:to_atom(Address)}]). 313 314registration_body_maybe_add_check(Payload) -> 315 M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), 316 TTL = get_config_key(consul_svc_ttl, M), 317 registration_body_maybe_add_check(Payload, TTL). 318 319-spec registration_body_maybe_add_check(Payload :: list(), 320 TTL :: integer() | undefined) 321 -> list(). 322registration_body_maybe_add_check(Payload, undefined) -> 323 case registration_body_maybe_add_deregister([]) of 324 [{'DeregisterCriticalServiceAfter', _}]-> 325 ?LOG_WARNING( 326 "Can't use Consul's service deregistration feature without " 327 "using TTL. The parameter will be ignored", 328 #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), 329 Payload; 330 331 _ -> Payload 332 end; 333registration_body_maybe_add_check(Payload, TTL) -> 334 CheckItems = [{'Notes', rabbit_data_coercion:to_atom(?CONSUL_CHECK_NOTES)}, 335 {'TTL', rabbit_data_coercion:to_atom(service_ttl(TTL))}, 336 {'Status', 'passing'}], 337 Check = [{'Check', registration_body_maybe_add_deregister(CheckItems)}], 338 lists:append(Payload, Check). 339 340-spec registration_body_add_port(Payload :: list()) -> list(). 341registration_body_add_port(Payload) -> 342 M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), 343 lists:append(Payload, 344 [{'Port', get_config_key(consul_svc_port, M)}]). 345 346registration_body_maybe_add_deregister(Payload) -> 347 M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), 348 Deregister = get_config_key(consul_deregister_after, M), 349 registration_body_maybe_add_deregister(Payload, Deregister). 350 351-spec registration_body_maybe_add_deregister(Payload :: list(), 352 TTL :: integer() | undefined) 353 -> list(). 354registration_body_maybe_add_deregister(Payload, undefined) -> Payload; 355registration_body_maybe_add_deregister(Payload, Deregister_After) -> 356 Deregister = {'DeregisterCriticalServiceAfter', 357 rabbit_data_coercion:to_atom(service_ttl(Deregister_After))}, 358 Payload ++ [Deregister]. 359 360-spec registration_body_maybe_add_tag(Payload :: list()) -> list(). 361registration_body_maybe_add_tag(Payload) -> 362 M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), 363 Value = get_config_key(cluster_name, M), 364 Tags = ?UTIL_MODULE:as_list(get_config_key(consul_svc_tags, M)), 365 registration_body_maybe_add_tag(Payload, Value, Tags). 366 367-spec registration_body_maybe_add_tag(Payload :: list(), 368 ClusterName :: string(), 369 Tags :: list()) 370 -> list(). 371registration_body_maybe_add_tag(Payload, "default", []) -> Payload; 372registration_body_maybe_add_tag(Payload, "default", Tags) -> 373 lists:append(Payload, [{'Tags', [rabbit_data_coercion:to_atom(X) || X <- Tags]}]); 374registration_body_maybe_add_tag(Payload, Cluster, []) -> 375 lists:append(Payload, [{'Tags', [rabbit_data_coercion:to_atom(Cluster)]}]); 376registration_body_maybe_add_tag(Payload, Cluster, Tags) -> 377 lists:append(Payload, [{'Tags', [rabbit_data_coercion:to_atom(Cluster)] ++ [rabbit_data_coercion:to_atom(X) || X <- Tags]}]). 378 379 380-spec registration_body_maybe_add_meta(Payload :: list()) -> list(). 381registration_body_maybe_add_meta(Payload) -> 382 M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), 383 ClusterName = get_config_key(cluster_name, M), 384 Meta = ?UTIL_MODULE:as_list(get_config_key(consul_svc_meta, M)), 385 registration_body_maybe_add_meta(Payload, ClusterName, Meta). 386 387-spec registration_body_maybe_add_meta(Payload :: list(), 388 ClusterName :: string(), 389 Meta :: list()) -> list(). 390registration_body_maybe_add_meta(Payload, "default", []) -> 391 Payload; 392registration_body_maybe_add_meta(Payload, "default", Meta) -> 393 lists:append(Payload, [{<<"meta">>, Meta}]); 394registration_body_maybe_add_meta(Payload, _ClusterName, []) -> 395 Payload; 396registration_body_maybe_add_meta(Payload, ClusterName, Meta) -> 397 Merged = maps:to_list(maps:merge(#{<<"cluster">> => rabbit_data_coercion:to_binary(ClusterName)}, maps:from_list(Meta))), 398 lists:append(Payload, [{<<"meta">>, Merged}]). 399 400 401-spec validate_addr_parameters(false | true, false | true) -> false | true. 402validate_addr_parameters(false, true) -> 403 ?LOG_WARNING( 404 "The parameter CONSUL_SVC_ADDR_NODENAME" 405 " can be used only if CONSUL_SVC_ADDR_AUTO is true." 406 " CONSUL_SVC_ADDR_NODENAME value will be ignored.", 407 #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), 408 false; 409validate_addr_parameters(_, _) -> 410 true. 411 412 413-spec service_address() -> string(). 414service_address() -> 415 M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), 416 validate_addr_parameters(get_config_key(consul_svc_addr_auto, M), 417 get_config_key(consul_svc_addr_nodename, M)), 418 service_address(get_config_key(consul_svc_addr, M), 419 get_config_key(consul_svc_addr_auto, M), 420 get_config_key(consul_svc_addr_nic, M), 421 get_config_key(consul_svc_addr_nodename, M)). 422 423 424-spec service_address(Static :: string(), 425 Auto :: boolean(), 426 AutoNIC :: string(), 427 FromNodename :: boolean()) -> string(). 428service_address(_, true, "undefined", FromNodename) -> 429 rabbit_peer_discovery_util:node_hostname(FromNodename); 430service_address(Value, false, "undefined", _) -> 431 Value; 432service_address(_, true, NIC, _) -> 433 %% TODO: support IPv6 434 {ok, Addr} = rabbit_peer_discovery_util:nic_ipv4(NIC), 435 Addr; 436%% this combination makes no sense but this is what rabbitmq-autocluster 437%% and this plugin have been allowing for a couple of years, so we keep 438%% this clause around for backwards compatibility. 439%% See rabbitmq/rabbitmq-peer-discovery-consul#12 for details. 440service_address(_, false, NIC, _) -> 441 {ok, Addr} = rabbit_peer_discovery_util:nic_ipv4(NIC), 442 Addr. 443 444 445-spec service_id() -> string(). 446service_id() -> 447 M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), 448 service_id(get_config_key(consul_svc, M), 449 service_address()). 450 451-spec service_id(Name :: string(), Address :: string()) -> string(). 452service_id(Service, "undefined") -> Service; 453service_id(Service, Address) -> 454 string:join([Service, Address], ":"). 455 456-spec service_ttl(TTL :: integer()) -> string(). 457service_ttl(Value) -> 458 rabbit_peer_discovery_util:as_string(Value) ++ "s". 459 460-spec maybe_add_domain(Domain :: atom()) -> atom(). 461maybe_add_domain(Value) -> 462 M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), 463 case get_config_key(consul_use_longname, M) of 464 true -> 465 rabbit_data_coercion:to_atom(string:join([atom_to_list(Value), 466 "node", 467 get_config_key(consul_domain, M)], 468 ".")); 469 false -> Value 470 end. 471 472 473%%-------------------------------------------------------------------- 474%% @doc 475%% Let Consul know that this node is still around 476%% @end 477%%-------------------------------------------------------------------- 478 479-spec send_health_check_pass() -> ok. 480 481send_health_check_pass() -> 482 Service = string:join(["service", service_id()], ":"), 483 M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), 484 ?LOG_DEBUG( 485 "Running Consul health check", 486 #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), 487 case rabbit_peer_discovery_httpc:put(get_config_key(consul_scheme, M), 488 get_config_key(consul_host, M), 489 get_integer_config_key(consul_port, M), 490 rabbit_peer_discovery_httpc:build_path([v1, agent, check, pass, Service]), 491 [], 492 maybe_add_acl([]), 493 []) of 494 {ok, []} -> ok; 495 {error, "429"} -> 496 %% Too Many Requests, see https://www.consul.io/docs/agent/checks.html 497 ?LOG_WARNING( 498 "Consul responded to a health check with 429 Too Many Requests", 499 #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), 500 ok; 501 {error, "500"} -> 502 ?LOG_WARNING( 503 "Consul responded to a health check with a 500 status, will " 504 "wait and try re-registering", 505 #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), 506 maybe_re_register(wait_for_list_nodes()), 507 ok; 508 {error, Reason} -> 509 ?LOG_ERROR( 510 "Error running Consul health check: ~p", 511 [Reason], 512 #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), 513 ok 514 end. 515 516maybe_re_register({error, Reason}) -> 517 ?LOG_ERROR( 518 "Internal error in Consul while updating health check. " 519 "Cannot obtain list of nodes registered in Consul either: ~p", 520 [Reason], 521 #{domain => ?RMQLOG_DOMAIN_PEER_DIS}); 522maybe_re_register({ok, {Members, _NodeType}}) -> 523 maybe_re_register(Members); 524maybe_re_register({ok, Members}) -> 525 maybe_re_register(Members); 526maybe_re_register(Members) -> 527 case lists:member(node(), Members) of 528 true -> 529 ?LOG_ERROR( 530 "Internal error in Consul while updating health check", 531 #{domain => ?RMQLOG_DOMAIN_PEER_DIS}); 532 false -> 533 ?LOG_ERROR( 534 "Internal error in Consul while updating health check, " 535 "node is not registered. Re-registering", 536 #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), 537 register() 538 end. 539 540wait_for_list_nodes() -> 541 wait_for_list_nodes(60). 542 543wait_for_list_nodes(N) -> 544 case {list_nodes(), N} of 545 {Reply, 0} -> 546 Reply; 547 {{ok, _} = Reply, _} -> 548 Reply; 549 {{error, _}, _} -> 550 timer:sleep(1000), 551 wait_for_list_nodes(N - 1) 552 end. 553 554%%-------------------------------------------------------------------- 555%% @private 556%% @doc 557%% Create a session to be acquired for a common key 558%% @end 559%%-------------------------------------------------------------------- 560-spec create_session(string(), pos_integer()) -> {ok, string()} | {error, Reason::string()}. 561create_session(Name, TTL) -> 562 case consul_session_create([], maybe_add_acl([]), 563 [{'Name', Name}, 564 {'TTL', rabbit_data_coercion:to_atom(service_ttl(TTL))}]) of 565 {ok, Response} -> 566 {ok, get_session_id(Response)}; 567 {error, _} = Err -> 568 Err 569 end. 570 571%%-------------------------------------------------------------------- 572%% @private 573%% @doc 574%% Create session 575%% @end 576%%-------------------------------------------------------------------- 577-spec consul_session_create(Query, Headers, Body) -> {ok, string()} | {error, any()} when 578 Query :: list(), 579 Headers :: [{string(), string()}], 580 Body :: term(). 581consul_session_create(Query, Headers, Body) -> 582 M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), 583 case serialize_json_body(Body) of 584 {ok, Serialized} -> 585 rabbit_peer_discovery_httpc:put(get_config_key(consul_scheme, M), 586 get_config_key(consul_host, M), 587 get_integer_config_key(consul_port, M), 588 "v1/session/create", 589 Query, 590 Headers, 591 Serialized); 592 {error, _} = Err -> 593 Err 594 end. 595 596%%-------------------------------------------------------------------- 597%% @private 598%% @doc 599%% Process the result of JSON encoding the request body payload, 600%% returning the body as a binary() value or the error returned by 601%% the JSON serialization library. 602%% @end 603%%-------------------------------------------------------------------- 604-spec serialize_json_body(term()) -> {ok, Payload :: binary()} | {error, atom()}. 605serialize_json_body([]) -> {ok, []}; 606serialize_json_body(Payload) -> 607 case rabbit_json:try_encode(Payload) of 608 {ok, Body} -> {ok, Body}; 609 {error, Reason} -> {error, Reason} 610 end. 611 612%%-------------------------------------------------------------------- 613%% @private 614%% @doc 615%% Extract session ID from Consul response 616%% @end 617%%-------------------------------------------------------------------- 618-spec get_session_id(term()) -> string(). 619get_session_id(#{<<"ID">> := ID}) -> binary:bin_to_list(ID). 620 621%%-------------------------------------------------------------------- 622%% @private 623%% @doc 624%% Start periodically renewing an existing session ttl 625%% @end 626%%-------------------------------------------------------------------- 627-spec start_session_ttl_updater(string()) -> ok. 628start_session_ttl_updater(SessionId) -> 629 M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), 630 Interval = get_config_key(consul_svc_ttl, M), 631 ?LOG_DEBUG( 632 "Starting session renewal", 633 #{domain => ?RMQLOG_DOMAIN_PEER_DIS}), 634 {ok, TRef} = timer:apply_interval(Interval * 500, ?MODULE, 635 session_ttl_update_callback, [SessionId]), 636 TRef. 637 638%% 639%% @doc 640%% Tries to acquire lock. If the lock is held by someone else, waits until it 641%% is released, or too much time has passed 642%% @end 643-spec lock(timer:tref(), string(), pos_integer(), pos_integer()) -> {ok, string()} | {error, string()}. 644lock(TRef, _, Now, EndTime) when EndTime < Now -> 645 timer:cancel(TRef), 646 {error, "Acquiring lock taking too long, bailing out"}; 647lock(TRef, SessionId, _, EndTime) -> 648 case acquire_lock(SessionId) of 649 {ok, true} -> 650 {ok, {SessionId, TRef}}; 651 {ok, false} -> 652 case get_lock_status() of 653 {ok, {SessionHeld, ModifyIndex}} -> 654 Wait = max(EndTime - erlang:system_time(seconds), 0), 655 case wait_for_lock_release(SessionHeld, ModifyIndex, Wait) of 656 ok -> 657 lock(TRef, SessionId, erlang:system_time(seconds), EndTime); 658 {error, Reason} -> 659 timer:cancel(TRef), 660 {error, lists:flatten(io_lib:format("Error waiting for lock release, reason: ~s",[Reason]))} 661 end; 662 {error, Reason} -> 663 timer:cancel(TRef), 664 {error, lists:flatten(io_lib:format("Error obtaining lock status, reason: ~s", [Reason]))} 665 end; 666 {error, Reason} -> 667 timer:cancel(TRef), 668 {error, lists:flatten(io_lib:format("Error while acquiring lock, reason: ~s", [Reason]))} 669 end. 670 671%%-------------------------------------------------------------------- 672%% @private 673%% @doc 674%% Acquire session for a key 675%% @end 676%%-------------------------------------------------------------------- 677-spec acquire_lock(string()) -> {ok, any()} | {error, string()}. 678acquire_lock(SessionId) -> 679 consul_kv_write(startup_lock_path(), [{acquire, SessionId}], maybe_add_acl([]), []). 680 681%%-------------------------------------------------------------------- 682%% @private 683%% @doc 684%% Release a previously acquired lock held by a given session 685%% @end 686%%-------------------------------------------------------------------- 687-spec release_lock(string()) -> {ok, any()} | {error, string()}. 688release_lock(SessionId) -> 689 consul_kv_write(startup_lock_path(), [{release, SessionId}], maybe_add_acl([]), []). 690 691%%-------------------------------------------------------------------- 692%% @private 693%% @doc 694%% Write KV store key value 695%% @end 696%%-------------------------------------------------------------------- 697-spec consul_kv_write(Path, Query, Headers, Body) -> {ok, any()} | {error, string()} when 698 Path :: string(), 699 Query :: [{string(), string()}], 700 Headers :: [{string(), string()}], 701 Body :: term(). 702consul_kv_write(Path, Query, Headers, Body) -> 703 M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), 704 case serialize_json_body(Body) of 705 {ok, Serialized} -> 706 rabbit_peer_discovery_httpc:put(get_config_key(consul_scheme, M), 707 get_config_key(consul_host, M), 708 get_integer_config_key(consul_port, M), 709 "v1/kv/" ++ Path, 710 Query, 711 Headers, 712 Serialized); 713 {error, _} = Err -> 714 Err 715 end. 716 717%%-------------------------------------------------------------------- 718%% @private 719%% @doc 720%% Read KV store key value 721%% @end 722%%-------------------------------------------------------------------- 723-spec consul_kv_read(Path, Query, Headers) -> {ok, term()} | {error, string()} when 724 Path :: string(), 725 Query :: [{string(), string()}], 726 Headers :: [{string(), string()}]. 727consul_kv_read(Path, Query, Headers) -> 728 M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), 729 rabbit_peer_discovery_httpc:get(get_config_key(consul_scheme, M), 730 get_config_key(consul_host, M), 731 get_integer_config_key(consul_port, M), 732 "v1/kv/" ++ Path, 733 Query, 734 Headers, 735 []). 736 737%%-------------------------------------------------------------------- 738%% @private 739%% @doc 740%% Get lock status 741%% XXX: probably makes sense to wrap output in a record to be 742%% more future-proof 743%% @end 744%%-------------------------------------------------------------------- 745-spec get_lock_status() -> {ok, term()} | {error, string()}. 746get_lock_status() -> 747 case consul_kv_read(startup_lock_path(), [], maybe_add_acl([])) of 748 {ok, [KeyData | _]} -> 749 SessionHeld = maps:get(<<"Session">>, KeyData, undefined) =/= undefined, 750 ModifyIndex = maps:get(<<"ModifyIndex">>, KeyData), 751 {ok, {SessionHeld, ModifyIndex}}; 752 {error, _} = Err -> 753 Err 754 end. 755 756%%-------------------------------------------------------------------- 757%% @private 758%% @doc 759%% Returns consul path for startup lock 760%% @end 761%%-------------------------------------------------------------------- 762-spec startup_lock_path() -> string(). 763startup_lock_path() -> 764 base_path() ++ "/" ++ "startup_lock". 765 766%%-------------------------------------------------------------------- 767%% @private 768%% @doc Return a list of path segments that are the base path for all 769%% consul kv keys related to current cluster. 770%% @end 771%%-------------------------------------------------------------------- 772-spec base_path() -> string(). 773base_path() -> 774 M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), 775 Segments = [get_config_key(consul_lock_prefix, M), get_config_key(cluster_name, M)], 776 rabbit_peer_discovery_httpc:build_path(Segments). 777 778%%-------------------------------------------------------------------- 779%% @private 780%% @doc 781%% Wait for lock to be released if it has been acquired by another node 782%% @end 783%%-------------------------------------------------------------------- 784-spec wait_for_lock_release(atom(), pos_integer(), pos_integer()) -> ok | {error, string()}. 785wait_for_lock_release(false, _, _) -> ok; 786wait_for_lock_release(_, Index, Wait) -> 787 case consul_kv_read(startup_lock_path(), 788 [{index, Index}, {wait, service_ttl(Wait)}], 789 maybe_add_acl([])) of 790 {ok, _} -> ok; 791 {error, _} = Err -> Err 792 end. 793 794%%-------------------------------------------------------------------- 795%% @doc 796%% Renew an existing session 797%% @end 798%%-------------------------------------------------------------------- 799-spec session_ttl_update_callback(string()) -> string(). 800session_ttl_update_callback(SessionId) -> 801 _ = consul_session_renew(SessionId, [], maybe_add_acl([])), 802 SessionId. 803 804%%-------------------------------------------------------------------- 805%% @private 806%% @doc 807%% Renew session TTL 808%% @end 809%%-------------------------------------------------------------------- 810-spec consul_session_renew(string(), [{string(), string()}], [{string(), string()}]) -> {ok, term()} | {error, string()}. 811consul_session_renew(SessionId, Query, Headers) -> 812 M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), 813 rabbit_peer_discovery_httpc:put(get_config_key(consul_scheme, M), 814 get_config_key(consul_host, M), 815 get_integer_config_key(consul_port, M), 816 rabbit_peer_discovery_httpc:build_path([v1, session, renew, rabbit_data_coercion:to_atom(SessionId)]), 817 Query, 818 Headers, 819 []). 820