1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% The Initial Developer of the Original Code is AWeber Communications.
6%% Copyright (c) 2015-2016 AWeber Communications
7%% Copyright (c) 2016-2021 VMware, Inc. or its affiliates. All rights reserved.
8%%
9
10-module(rabbit_peer_discovery_consul).
11-behaviour(rabbit_peer_discovery_backend).
12
13-include_lib("kernel/include/logger.hrl").
14-include_lib("rabbit_common/include/rabbit.hrl").
15-include_lib("rabbitmq_peer_discovery_common/include/rabbit_peer_discovery.hrl").
16-include("rabbit_peer_discovery_consul.hrl").
17
18-export([init/0, list_nodes/0, supports_registration/0, register/0, unregister/0,
19         post_registration/0, lock/1, unlock/1]).
20-export([send_health_check_pass/0]).
21-export([session_ttl_update_callback/1]).
22%% useful for debugging from the REPL with RABBITMQ_ALLOW_INPUT
23-export([service_id/0, service_address/0]).
24%% for tests
25-ifdef(TEST).
26-compile(export_all).
27-endif.
28
29-define(CONFIG_MODULE, rabbit_peer_discovery_config).
30-define(UTIL_MODULE,   rabbit_peer_discovery_util).
31
32-define(CONSUL_CHECK_NOTES, "RabbitMQ Consul-based peer discovery plugin TTL check").
33
34%%
35%% API
36%%
37
%% Backend initialisation callback. Makes sure inets (used for HTTP) is
%% running, loads the common peer discovery application and applies the
%% optional proxy and IPv6 settings of the HTTP client helper.
init() ->
    ?LOG_DEBUG("Peer discovery Consul: initialising...",
               #{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
    ok = application:ensure_started(inets),
    %% The common plugin depends on the rabbit app, which is still being
    %% started when this callback runs, so it is only loaded here.
    _ = application:load(rabbitmq_peer_discovery_common),
    rabbit_peer_discovery_httpc:maybe_configure_proxy(),
    rabbit_peer_discovery_httpc:maybe_configure_inet6().
48
-spec list_nodes() -> {ok, {Nodes :: list(), NodeType :: rabbit_types:node_type()}} | {error, Reason :: string()}.

%% Discovers cluster members by querying Consul's health/service endpoint
%% for the configured service. All nodes are reported as disc nodes.
%% If the backend is not configured at all, or its configuration is
%% missing, an empty node list is returned; in the latter case a warning
%% is logged as well. HTTP errors are returned unchanged.
list_nodes() ->
    WhenUnconfigured = fun() -> {ok, {[], disc}} end,
    WhenMissing =
        fun() ->
                ?LOG_WARNING(
                   "Peer discovery backend is set to ~s but final "
                   "config does not contain "
                   "rabbit.cluster_formation.peer_discovery_consul. "
                   "Cannot discover any nodes because Consul cluster "
                   "details are not configured!",
                   [?MODULE],
                   #{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
                {ok, {[], disc}}
        end,
    WhenConfigured =
        fun(Proplist) ->
                Config = maps:from_list(Proplist),
                Path = rabbit_peer_discovery_httpc:build_path(
                         [v1, health, service, get_config_key(consul_svc, Config)]),
                Response = rabbit_peer_discovery_httpc:get(
                             get_config_key(consul_scheme, Config),
                             get_config_key(consul_host, Config),
                             get_integer_config_key(consul_port, Config),
                             Path,
                             list_nodes_query_args(),
                             maybe_add_acl([]),
                             []),
                case Response of
                    {error, _} = Error ->
                        Error;
                    {ok, Nodes} ->
                        WithWarnings = get_config_key(consul_include_nodes_with_warnings, Config),
                        {ok, {extract_nodes(filter_nodes(Nodes, WithWarnings)), disc}}
                end
        end,
    rabbit_peer_discovery_util:maybe_backend_configured(
      ?BACKEND_CONFIG_KEY, WhenUnconfigured, WhenMissing, WhenConfigured).
83
84
-spec supports_registration() -> boolean().

%% Consul supports service registration, so this backend always opts in.
supports_registration() -> true.
89
90
-spec register() -> ok | {error, Reason :: string()}.
%% Registers this node as a service instance with the Consul agent.
%% Returns ok when the PUT succeeds. A body serialisation error or an HTTP
%% client error is returned unchanged.
register() ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    case registration_body() of
        {ok, Body} ->
            ?LOG_DEBUG(
               "Consul registration body: ~s", [Body],
               #{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
            Path = rabbit_peer_discovery_httpc:build_path([v1, agent, service, register]),
            Result = rabbit_peer_discovery_httpc:put(get_config_key(consul_scheme, Config),
                                                     get_config_key(consul_host, Config),
                                                     get_integer_config_key(consul_port, Config),
                                                     Path,
                                                     [],
                                                     maybe_add_acl([]),
                                                     Body),
            case Result of
                {ok, _} -> ok;
                _ -> Result
            end;
        Failure ->
            Failure
    end.
111
112
-spec unregister() -> ok | {error, Reason :: string()}.
%% Deregisters this node's service instance, identified by service_id/0,
%% from the Consul agent.
%%
%% Returns ok when the HTTP request succeeds. Otherwise the error returned
%% by the HTTP client is logged and passed back to the caller.
unregister() ->
    M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    ID = service_id(),
    ?LOG_DEBUG(
       "Unregistering with Consul using service ID '~s'", [ID],
       #{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
    case rabbit_peer_discovery_httpc:put(get_config_key(consul_scheme, M),
                                         get_config_key(consul_host, M),
                                         get_integer_config_key(consul_port, M),
                                         rabbit_peer_discovery_httpc:build_path([v1, agent, service, deregister, ID]),
                                         [],
                                         maybe_add_acl([]),
                                         []) of
        {ok, Response} ->
            ?LOG_INFO(
               "Consul's response to the unregistration attempt: ~p",
               [Response],
               #{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
            ok;
        Error ->
            %% If this fails, the node is still registered in Consul, so
            %% the failure is reported at warning level rather than info.
            ?LOG_WARNING(
               "Failed to unregister service with ID '~s' with Consul: ~p",
               [ID, Error],
               #{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
            Error
    end.
140
-spec post_registration() -> ok.

%% Marks the TTL health check as passing immediately after registration,
%% rather than waiting one full interval, so Consul learns straight away
%% that the service is healthy.
%% See rabbitmq/rabbitmq_peer_discovery_consul#8.
post_registration() ->
    _ = send_health_check_pass(),
    ok.
149
-spec lock(Node :: atom()) -> {ok, Data :: term()} | {error, Reason :: string()}.

%% Acquires the cluster-wide startup lock on behalf of Node.
%% A Consul session named after Node is created with the configured
%% consul_svc_ttl and kept alive by a periodic renewal timer. Acquisition
%% is then attempted until lock_wait_time seconds have elapsed (see lock/4).
lock(Node) ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    ?LOG_DEBUG(
       "Effective Consul peer discovery configuration: ~p", [Config],
       #{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
    SessionTTL = get_config_key(consul_svc_ttl, Config),
    case create_session(Node, SessionTTL) of
        {error, Reason} ->
            Msg = io_lib:format("Error while creating a session, reason: ~s", [Reason]),
            {error, lists:flatten(Msg)};
        {ok, SessionId} ->
            TRef = start_session_ttl_updater(SessionId),
            Start = erlang:system_time(seconds),
            Deadline = Start + get_config_key(lock_wait_time, Config),
            lock(TRef, SessionId, Start, Deadline)
    end.
167
-spec unlock({SessionId :: string(), TRef :: timer:tref()}) -> ok | {error, Reason :: string()}.

%% Releases the startup lock acquired by lock/1.
%% Takes the {SessionId, TRef} pair returned by lock/1. The session renewal
%% timer is cancelled first, and then the lock is released in Consul's KV
%% store.
%%
%% Returns ok when Consul answers true. Returns {error, Reason} when Consul
%% answers false (the session may have been invalidated) or when the
%% request itself fails.
unlock({SessionId, TRef}) ->
    timer:cancel(TRef),
    ?LOG_DEBUG(
       "Stopped session renewal",
       #{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
    case release_lock(SessionId) of
        {ok, true} ->
            ok;
        {ok, false} ->
            {error, lists:flatten(io_lib:format("Error while releasing the lock, session ~s may have been invalidated", [SessionId]))};
        {error, _} = Err ->
            Err
    end.
183
184%%
185%% Implementation
186%%
187
-spec get_config_key(Key :: atom(), Map :: #{atom() => peer_discovery_config_value()})
                    -> peer_discovery_config_value().

%% Resolves Key through ?CONFIG_MODULE:get/3, using this backend's
%% ?CONFIG_MAPPING together with the supplied configuration map.
get_config_key(Key, Map) ->
    Mapping = ?CONFIG_MAPPING,
    ?CONFIG_MODULE:get(Key, Mapping, Map).
193
-spec get_integer_config_key(Key :: atom(), Map :: #{atom() => peer_discovery_config_value()})
                            -> integer().

%% Integer counterpart of get_config_key/2. Delegates to
%% ?CONFIG_MODULE:get_integer/3 with this backend's ?CONFIG_MAPPING.
get_integer_config_key(Key, Map) ->
    Mapping = ?CONFIG_MAPPING,
    ?CONFIG_MODULE:get_integer(Key, Mapping, Map).
199
200
-spec filter_nodes(ConsulResult :: list(), AllowWarning :: atom()) -> list().
%% Post-filters Consul health/service entries.
%% When warnings are allowed (true), an entry is kept only if every one of
%% its checks has status "passing" or "warning". When false, the list is
%% returned unchanged: list_nodes_query_args/2 then asks Consul for passing
%% instances only.
filter_nodes(Nodes, true) ->
    Acceptable = [<<"passing">>, <<"warning">>],
    IsHealthy = fun(Check) ->
                        lists:member(maps:get(<<"Status">>, Check), Acceptable)
                end,
    [Node || Node <- Nodes, lists:all(IsHealthy, maps:get(<<"Checks">>, Node))];
filter_nodes(Nodes, false) ->
    Nodes.
216
-spec extract_nodes(ConsulResult :: list()) -> list().
%% Turns a Consul health/service response into a list of Erlang node names.
%% Each name is merged into a sorted accumulator, so the result is sorted.
extract_nodes(Data) ->
    extract_nodes(Data, []).
219
-spec extract_nodes(ConsulResult :: list(), Nodes :: list())
    -> list().
%% Accumulator variant of extract_nodes/1.
%% The node name is built from the service's Address when it is non-empty.
%% Otherwise it falls back to the Consul agent's Node name, which
%% maybe_add_domain/1 may qualify with a domain. Each name is merged into
%% the accumulator with lists:merge/2.
extract_nodes([], Acc) ->
    Acc;
extract_nodes([Entry | Rest], Acc) ->
    Address = maps:get(<<"Address">>, maps:get(<<"Service">>, Entry)),
    Name = case ?UTIL_MODULE:as_string(Address) of
               "" ->
                   AgentNode = maps:get(<<"Node">>, maps:get(<<"Node">>, Entry)),
                   maybe_add_domain(?UTIL_MODULE:node_name(AgentNode));
               Host ->
                   ?UTIL_MODULE:node_name(Host)
           end,
    extract_nodes(Rest, lists:merge(Acc, [Name])).
235
-spec maybe_add_acl(QArgs :: list()) -> list().
%% Appends an {"X-Consul-Token", Token} header when an ACL token is
%% configured. An unset token appears to come back from the config layer as
%% the string "undefined" (TODO confirm), and in that case the list is left
%% untouched.
maybe_add_acl(QArgs) ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    Token = get_config_key(consul_acl_token, Config),
    if
        Token =:= "undefined" -> QArgs;
        true -> QArgs ++ [{"X-Consul-Token", Token}]
    end.
243
-spec list_nodes_query_args() -> list().
%% Query arguments for the health/service request, derived from the
%% configured cluster name.
list_nodes_query_args() ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    ClusterName = get_config_key(cluster_name, Config),
    list_nodes_query_args(ClusterName).
248
-spec list_nodes_query_args(ClusterName :: string()) -> list().
%% Adds a {tag, Cluster} filter for any cluster name other than "default".
%% It then defers to list_nodes_query_args/2, which decides whether only
%% passing instances are requested.
list_nodes_query_args(Cluster) ->
    TagArgs = if
                  Cluster =:= "default" -> [];
                  true -> [{tag, Cluster}]
              end,
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    WithWarnings = get_config_key(consul_include_nodes_with_warnings, Config),
    list_nodes_query_args(TagArgs, WithWarnings).
257
-spec list_nodes_query_args(Args :: list(), AllowWarn :: atom()) -> list().
%% Unless nodes with warnings are allowed, prepends Consul's passing query
%% flag so that only instances with passing checks are returned.
list_nodes_query_args(Value, true) -> Value;
list_nodes_query_args(Value, false) -> [passing | Value].
264
-spec registration_body() -> {ok, Body :: binary()} | {error, atom()}.
%% Builds the service registration payload and JSON-encodes it.
registration_body() ->
    registration_body(rabbit_json:try_encode(build_registration_body())).
269
-spec registration_body(Response :: {ok, Body :: string()} |
                                    {error, Reason :: atom()})
  -> {ok, Body :: binary()} | {error, Reason :: atom()}.
%% Normalises the result of JSON encoding.
%% A successful body is coerced to a binary. An encoding error is logged and
%% returned as-is.
registration_body({error, Reason} = Error) ->
    ?LOG_ERROR(
       "Error serializing the request body: ~p",
       [Reason],
       #{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
    Error;
registration_body({ok, Body}) ->
    {ok, rabbit_data_coercion:to_binary(Body)}.
281
282
-spec build_registration_body() -> list().
%% Assembles the registration proplist in a fixed order: ID, Name, optional
%% Address, Port, optional Check, optional Tags and optional meta. Each step
%% receives the payload built so far.
build_registration_body() ->
    Steps = [fun registration_body_add_name/1,
             fun registration_body_maybe_add_address/1,
             fun registration_body_add_port/1,
             fun registration_body_maybe_add_check/1,
             fun registration_body_maybe_add_tag/1,
             fun registration_body_maybe_add_meta/1],
    lists:foldl(fun(Step, Payload) -> Step(Payload) end,
                registration_body_add_id(),
                Steps).
292
-spec registration_body_add_id() -> list().
%% Starts the payload with the service ID (see service_id/0), coerced to an
%% atom. This is presumably so the JSON encoder emits it as a string;
%% confirm against rabbit_json.
registration_body_add_id() ->
    ServiceId = rabbit_data_coercion:to_atom(service_id()),
    [{'ID', ServiceId}].
296
-spec registration_body_add_name(Payload :: list()) -> list().
%% Appends the configured service name (consul_svc).
registration_body_add_name(Payload) ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    ServiceName = rabbit_data_coercion:to_atom(get_config_key(consul_svc, Config)),
    Payload ++ [{'Name', ServiceName}].
302
-spec registration_body_maybe_add_address(Payload :: list())
    -> list().
%% Appends the service address computed by service_address/0, if any.
registration_body_maybe_add_address(Payload) ->
    Address = service_address(),
    registration_body_maybe_add_address(Payload, Address).
307
-spec registration_body_maybe_add_address(Payload :: list(), string())
    -> list().
%% The string "undefined" means no address is configured, so the payload is
%% left unchanged. Any other value is appended as an 'Address' entry.
registration_body_maybe_add_address(Payload, Address) ->
    case Address of
        "undefined" ->
            Payload;
        _ ->
            Payload ++ [{'Address', rabbit_data_coercion:to_atom(Address)}]
    end.
313
%% Appends a TTL health check when consul_svc_ttl is configured
%% (see registration_body_maybe_add_check/2).
registration_body_maybe_add_check(Payload) ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    ServiceTTL = get_config_key(consul_svc_ttl, Config),
    registration_body_maybe_add_check(Payload, ServiceTTL).
318
-spec registration_body_maybe_add_check(Payload :: list(),
                                        TTL :: integer() | undefined)
    -> list().
%% Without a TTL, no check is added. DeregisterCriticalServiceAfter is only
%% sent inside the check, so a configured deregistration timeout cannot be
%% used: a warning naming the ignored parameter is logged.
%% With a TTL, a passing TTL check is appended; it includes
%% DeregisterCriticalServiceAfter when that is configured.
registration_body_maybe_add_check(Payload, undefined) ->
    case registration_body_maybe_add_deregister([]) of
        [{'DeregisterCriticalServiceAfter', _}] ->
            %% The message previously read "The parameter  will be ignored",
            %% omitting which parameter was meant.
            ?LOG_WARNING(
               "Can't use Consul's service deregistration feature without "
               "using TTL. The parameter CONSUL_DEREGISTER_AFTER will be "
               "ignored",
               #{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
            Payload;
        _ ->
            Payload
    end;
registration_body_maybe_add_check(Payload, TTL) ->
    CheckItems = [{'Notes', rabbit_data_coercion:to_atom(?CONSUL_CHECK_NOTES)},
                  {'TTL', rabbit_data_coercion:to_atom(service_ttl(TTL))},
                  {'Status', 'passing'}],
    Check = [{'Check', registration_body_maybe_add_deregister(CheckItems)}],
    lists:append(Payload, Check).
339
-spec registration_body_add_port(Payload :: list()) -> list().
%% Appends the configured service port (consul_svc_port).
registration_body_add_port(Payload) ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    Port = get_config_key(consul_svc_port, Config),
    Payload ++ [{'Port', Port}].
345
%% Appends DeregisterCriticalServiceAfter when consul_deregister_after is
%% configured (see registration_body_maybe_add_deregister/2).
registration_body_maybe_add_deregister(Payload) ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    DeregisterAfter = get_config_key(consul_deregister_after, Config),
    registration_body_maybe_add_deregister(Payload, DeregisterAfter).
350
-spec registration_body_maybe_add_deregister(Payload :: list(),
    TTL :: integer() | undefined)
        -> list().
%% undefined leaves the payload unchanged. Otherwise the timeout (in
%% seconds, formatted by service_ttl/1) is appended.
registration_body_maybe_add_deregister(Payload, undefined) ->
    Payload;
registration_body_maybe_add_deregister(Payload, DeregisterAfter) ->
    Timeout = rabbit_data_coercion:to_atom(service_ttl(DeregisterAfter)),
    lists:append(Payload, [{'DeregisterCriticalServiceAfter', Timeout}]).
359
-spec registration_body_maybe_add_tag(Payload :: list()) -> list().
%% Appends service tags derived from the cluster name and consul_svc_tags.
registration_body_maybe_add_tag(Payload) ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    ClusterName = get_config_key(cluster_name, Config),
    ExtraTags = ?UTIL_MODULE:as_list(get_config_key(consul_svc_tags, Config)),
    registration_body_maybe_add_tag(Payload, ClusterName, ExtraTags).
366
-spec registration_body_maybe_add_tag(Payload :: list(),
                                      ClusterName :: string(),
                                      Tags :: list())
    -> list().
%% Builds the 'Tags' entry. The cluster name is included unless it is
%% "default", followed by any configured tags. Nothing is appended when
%% the cluster is "default" and no tags are configured.
registration_body_maybe_add_tag(Payload, "default", []) ->
    Payload;
registration_body_maybe_add_tag(Payload, ClusterName, Tags) ->
    ClusterTags = case ClusterName of
                      "default" -> [];
                      _ -> [rabbit_data_coercion:to_atom(ClusterName)]
                  end,
    ExtraTags = [rabbit_data_coercion:to_atom(T) || T <- Tags],
    Payload ++ [{'Tags', ClusterTags ++ ExtraTags}].
378
379
-spec registration_body_maybe_add_meta(Payload :: list()) -> list().
%% Appends service metadata derived from the cluster name and
%% consul_svc_meta.
registration_body_maybe_add_meta(Payload) ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    ClusterName = get_config_key(cluster_name, Config),
    UserMeta = ?UTIL_MODULE:as_list(get_config_key(consul_svc_meta, Config)),
    registration_body_maybe_add_meta(Payload, ClusterName, UserMeta).
386
-spec registration_body_maybe_add_meta(Payload :: list(),
                                       ClusterName :: string(),
                                       Meta :: list()) -> list().
%% Without user metadata the payload is unchanged. For the "default"
%% cluster the metadata is used verbatim. Otherwise a <<"cluster">> entry is
%% added, and a user-supplied <<"cluster">> key takes precedence
%% (maps:merge/2 keeps the second map's values).
registration_body_maybe_add_meta(Payload, _ClusterName, []) ->
    Payload;
registration_body_maybe_add_meta(Payload, "default", Meta) ->
    lists:append(Payload, [{<<"meta">>, Meta}]);
registration_body_maybe_add_meta(Payload, ClusterName, Meta) ->
    ClusterMeta = #{<<"cluster">> => rabbit_data_coercion:to_binary(ClusterName)},
    Merged = maps:merge(ClusterMeta, maps:from_list(Meta)),
    lists:append(Payload, [{<<"meta">>, maps:to_list(Merged)}]).
399
400
-spec validate_addr_parameters(false | true, false | true) -> false | true.
%% Nodename-based addressing is only meaningful together with automatic
%% addressing. Returns false and logs a warning when nodename addressing is
%% requested without auto addressing; returns true otherwise.
validate_addr_parameters(Auto, FromNodename) ->
    case {Auto, FromNodename} of
        {false, true} ->
            ?LOG_WARNING(
               "The parameter CONSUL_SVC_ADDR_NODENAME"
               " can be used only if CONSUL_SVC_ADDR_AUTO is true."
               " CONSUL_SVC_ADDR_NODENAME value will be ignored.",
               #{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
            false;
        _ ->
            true
    end.
411
412
-spec service_address() -> string().
%% Computes the address to register for this node from the consul_svc_addr*
%% settings (see service_address/4).
service_address() ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    Auto = get_config_key(consul_svc_addr_auto, Config),
    FromNodename = get_config_key(consul_svc_addr_nodename, Config),
    %% Called only for its warning; the boolean result is not used here.
    _ = validate_addr_parameters(Auto, FromNodename),
    service_address(get_config_key(consul_svc_addr, Config),
                    Auto,
                    get_config_key(consul_svc_addr_nic, Config),
                    FromNodename).
422
423
-spec service_address(Static :: string(),
                      Auto :: boolean(),
                      AutoNIC :: string(),
                      FromNodename :: boolean()) -> string().
%% Without a NIC ("undefined"), automatic addressing uses the node's
%% hostname and manual addressing uses the static value. With a NIC, its
%% IPv4 address is used regardless of Auto.
service_address(Static, Auto, "undefined", FromNodename) when is_boolean(Auto) ->
    case Auto of
        true -> rabbit_peer_discovery_util:node_hostname(FromNodename);
        false -> Static
    end;
%% Auto = false combined with a NIC makes no sense, but rabbitmq-autocluster
%% and this plugin have been allowing it for a couple of years, so it is
%% kept for backwards compatibility.
%% See rabbitmq/rabbitmq-peer-discovery-consul#12 for details.
service_address(_Static, Auto, NIC, _FromNodename) when is_boolean(Auto) ->
    %% TODO: support IPv6
    {ok, Addr} = rabbit_peer_discovery_util:nic_ipv4(NIC),
    Addr.
443
444
-spec service_id() -> string().
%% Service ID for this node: the configured service name, suffixed with the
%% service address when one is known (see service_id/2).
service_id() ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    Name = get_config_key(consul_svc, Config),
    service_id(Name, service_address()).
450
-spec service_id(Name :: string(), Address :: string()) -> string().
%% Returns "Name:Address", or just Name when the address is "undefined".
service_id(Service, Address) ->
    case Address of
        "undefined" -> Service;
        _ -> Service ++ ":" ++ Address
    end.
455
-spec service_ttl(TTL :: integer()) -> string().
%% Formats a number of seconds as a duration string, e.g. 30 -> "30s".
service_ttl(Value) ->
    Seconds = rabbit_peer_discovery_util:as_string(Value),
    Seconds ++ "s".
459
-spec maybe_add_domain(Domain :: atom()) -> atom().
%% When consul_use_longname is true, rewrites the node name as
%% "Value.node.<consul_domain>". Otherwise the name is returned unchanged.
maybe_add_domain(Value) ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    case get_config_key(consul_use_longname, Config) of
        false ->
            Value;
        true ->
            Domain = get_config_key(consul_domain, Config),
            LongName = string:join([atom_to_list(Value), "node", Domain], "."),
            rabbit_data_coercion:to_atom(LongName)
    end.
471
472
%%--------------------------------------------------------------------
%% @doc
%% Let Consul know that this node is still around by marking the
%% check "service:" ++ service_id() as passing.
%%
%% Always returns ok: errors are logged rather than returned. On a
%% 500 response, list_nodes/0 is retried (see wait_for_list_nodes/0),
%% and the node re-registers itself if it is no longer listed.
%% @end
%%--------------------------------------------------------------------

-spec send_health_check_pass() -> ok.

send_health_check_pass() ->
    Service = string:join(["service", service_id()], ":"),
    M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    ?LOG_DEBUG(
       "Running Consul health check",
       #{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
    case rabbit_peer_discovery_httpc:put(get_config_key(consul_scheme, M),
                                         get_config_key(consul_host, M),
                                         get_integer_config_key(consul_port, M),
                                         rabbit_peer_discovery_httpc:build_path([v1, agent, check, pass, Service]),
                                         [],
                                         maybe_add_acl([]),
                                         []) of
        {ok, _} ->
            %% The expected reply is an empty body ([]), but any successful
            %% response means the check was updated. Matching only [] would
            %% turn an unexpected body into a case_clause crash.
            ok;
        {error, "429"} ->
            %% Too Many Requests, see https://www.consul.io/docs/agent/checks.html
            ?LOG_WARNING(
               "Consul responded to a health check with 429 Too Many Requests",
               #{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
            ok;
        {error, "500"} ->
            ?LOG_WARNING(
               "Consul responded to a health check with a 500 status, will "
               "wait and try re-registering",
               #{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
            maybe_re_register(wait_for_list_nodes()),
            ok;
        {error, Reason} ->
            ?LOG_ERROR(
               "Error running Consul health check: ~p",
               [Reason],
               #{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
            ok
    end.
515
%% Called after Consul answered a health check update with a 500.
%% Takes the result of wait_for_list_nodes/0 (or a bare member list). If
%% the node list cannot be obtained, the error is logged. If this node is
%% missing from the members, it re-registers via register/0; otherwise the
%% problem is only logged.
maybe_re_register({error, Reason}) ->
    ?LOG_ERROR(
       "Internal error in Consul while updating health check. "
       "Cannot obtain list of nodes registered in Consul either: ~p",
       [Reason],
       #{domain => ?RMQLOG_DOMAIN_PEER_DIS});
maybe_re_register({ok, Result}) ->
    Members = case Result of
                  {Nodes, _NodeType} -> Nodes;
                  Nodes -> Nodes
              end,
    maybe_re_register(Members);
maybe_re_register(Members) ->
    case lists:member(node(), Members) of
        false ->
            ?LOG_ERROR(
               "Internal error in Consul while updating health check, "
               "node is not registered. Re-registering",
               #{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
            register();
        true ->
            ?LOG_ERROR(
               "Internal error in Consul while updating health check",
               #{domain => ?RMQLOG_DOMAIN_PEER_DIS})
    end.
539
%% Retries list_nodes/0 about once per second until it succeeds, giving up
%% after 60 retries and returning the last reply.
wait_for_list_nodes() ->
    MaxRetries = 60,
    wait_for_list_nodes(MaxRetries).
542
%% Returns the first {ok, _} reply from list_nodes/0. When Retries reaches
%% 0, whatever list_nodes/0 returned is passed back. An {error, _} reply
%% with retries left sleeps one second and tries again.
wait_for_list_nodes(Retries) ->
    Reply = list_nodes(),
    case Reply of
        {ok, _} ->
            Reply;
        _ when Retries =:= 0 ->
            Reply;
        {error, _} ->
            timer:sleep(1000),
            wait_for_list_nodes(Retries - 1)
    end.
553
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Create a Consul session to be used when acquiring the common
%% startup lock key. Name is the session name; lock/1 passes the node
%% name atom here, so the spec accepts atoms as well as strings.
%% TTL is the session TTL in seconds (formatted by service_ttl/1).
%% Returns the new session ID on success.
%% @end
%%--------------------------------------------------------------------
-spec create_session(atom() | string(), pos_integer()) -> {ok, string()} | {error, Reason::string()}.
create_session(Name, TTL) ->
    case consul_session_create([], maybe_add_acl([]),
                               [{'Name', Name},
                                {'TTL', rabbit_data_coercion:to_atom(service_ttl(TTL))}]) of
        {ok, Response} ->
            {ok, get_session_id(Response)};
        {error, _} = Err ->
            Err
    end.
570
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Create a session by PUTting the JSON-encoded Body to
%% v1/session/create on the configured Consul agent. If encoding
%% fails, the error is returned and no request is made.
%% @end
%%--------------------------------------------------------------------
-spec consul_session_create(Query, Headers, Body) -> {ok, string()} | {error, any()} when
      Query :: list(),
      Headers :: [{string(), string()}],
      Body :: term().
consul_session_create(Query, Headers, Body) ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    case serialize_json_body(Body) of
        {error, _} = Err ->
            Err;
        {ok, Serialized} ->
            Scheme = get_config_key(consul_scheme, Config),
            Host = get_config_key(consul_host, Config),
            Port = get_integer_config_key(consul_port, Config),
            rabbit_peer_discovery_httpc:put(Scheme, Host, Port,
                                            "v1/session/create",
                                            Query, Headers, Serialized)
    end.
595
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Process the result of JSON encoding the request body payload,
%% returning the body as a binary() value or the error returned by
%% the JSON serialization library. An empty payload ([]) is passed
%% through unencoded as [], which the spec now reflects.
%% @end
%%--------------------------------------------------------------------
-spec serialize_json_body(term()) -> {ok, Payload :: binary() | []} | {error, atom()}.
serialize_json_body([]) -> {ok, []};
serialize_json_body(Payload) ->
    case rabbit_json:try_encode(Payload) of
        {ok, _} = Ok -> Ok;
        {error, _} = Error -> Error
    end.
611
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Extract the session ID from a decoded Consul response map,
%% converting the binary ID to a string.
%% @end
%%--------------------------------------------------------------------
-spec get_session_id(term()) -> string().
get_session_id(#{<<"ID">> := ID}) ->
    binary_to_list(ID).
620
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Start periodically renewing an existing session's TTL.
%% consul_svc_ttl is in seconds, so an interval of TTL * 500 ms renews
%% the session twice per TTL period. Returns the interval timer
%% reference; the spec previously claimed ok. The caller cancels it
%% with timer:cancel/1, as lock/4 and unlock/1 do.
%% @end
%%--------------------------------------------------------------------
-spec start_session_ttl_updater(string()) -> timer:tref().
start_session_ttl_updater(SessionId) ->
    M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    Interval = get_config_key(consul_svc_ttl, M),
    ?LOG_DEBUG(
       "Starting session renewal",
       #{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
    {ok, TRef} = timer:apply_interval(Interval * 500, ?MODULE,
                                      session_ttl_update_callback, [SessionId]),
    TRef.
637
%%--------------------------------------------------------------------
%% @doc
%% Tries to acquire the startup lock for SessionId. If the lock is held
%% by someone else, waits until it is released (via a read of the lock
%% key using its ModifyIndex and a wait time, presumably a Consul
%% blocking query), then tries again. Gives up once EndTime (in
%% erlang:system_time(seconds)) is earlier than the current time.
%%
%% On success, returns {ok, {SessionId, TRef}}, the value unlock/1
%% expects; the previous spec claimed {ok, string()}. On every error
%% path the renewal timer TRef is cancelled.
%% @end
%%--------------------------------------------------------------------
-spec lock(timer:tref(), string(), pos_integer(), pos_integer()) ->
          {ok, {SessionId :: string(), TRef :: timer:tref()}} | {error, string()}.
lock(TRef, _, Now, EndTime) when EndTime < Now ->
    timer:cancel(TRef),
    {error, "Acquiring lock taking too long, bailing out"};
lock(TRef, SessionId, _, EndTime) ->
    case acquire_lock(SessionId) of
        {ok, true} ->
            {ok, {SessionId, TRef}};
        {ok, false} ->
            case get_lock_status() of
                {ok, {SessionHeld, ModifyIndex}} ->
                    %% Never ask Consul to wait past our own deadline.
                    Wait = max(EndTime - erlang:system_time(seconds), 0),
                    case wait_for_lock_release(SessionHeld, ModifyIndex, Wait) of
                        ok ->
                            lock(TRef, SessionId, erlang:system_time(seconds), EndTime);
                        {error, Reason} ->
                            timer:cancel(TRef),
                            {error, lists:flatten(io_lib:format("Error waiting for lock release, reason: ~s",[Reason]))}
                    end;
                {error, Reason} ->
                    timer:cancel(TRef),
                    {error, lists:flatten(io_lib:format("Error obtaining lock status, reason: ~s", [Reason]))}
            end;
        {error, Reason} ->
            timer:cancel(TRef),
            {error, lists:flatten(io_lib:format("Error while acquiring lock, reason: ~s", [Reason]))}
    end.
670
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Try to acquire the startup lock key for SessionId by writing it
%% with the acquire query parameter.
%% @end
%%--------------------------------------------------------------------
-spec acquire_lock(string()) -> {ok, any()} | {error, string()}.
acquire_lock(SessionId) ->
    Headers = maybe_add_acl([]),
    consul_kv_write(startup_lock_path(), [{acquire, SessionId}], Headers, []).
680
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Release a previously acquired lock held by a given session, by
%% writing the startup lock key with the release query parameter.
%% @end
%%--------------------------------------------------------------------
-spec release_lock(string()) -> {ok, any()} | {error, string()}.
release_lock(SessionId) ->
    Headers = maybe_add_acl([]),
    consul_kv_write(startup_lock_path(), [{release, SessionId}], Headers, []).
690
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Write a KV store key by PUTting the JSON-encoded Body to
%% v1/kv/Path. If encoding fails, the error is returned and no request
%% is made.
%% @end
%%--------------------------------------------------------------------
-spec consul_kv_write(Path, Query, Headers, Body) -> {ok, any()} | {error, string()} when
      Path :: string(),
      Query :: [{string(), string()}],
      Headers :: [{string(), string()}],
      Body :: term().
consul_kv_write(Path, Query, Headers, Body) ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    case serialize_json_body(Body) of
        {error, _} = Err ->
            Err;
        {ok, Serialized} ->
            Scheme = get_config_key(consul_scheme, Config),
            Host = get_config_key(consul_host, Config),
            Port = get_integer_config_key(consul_port, Config),
            rabbit_peer_discovery_httpc:put(Scheme, Host, Port,
                                            "v1/kv/" ++ Path,
                                            Query, Headers, Serialized)
    end.
716
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Read a KV store key with a GET to v1/kv/Path.
%% @end
%%--------------------------------------------------------------------
-spec consul_kv_read(Path, Query, Headers) -> {ok, term()} | {error, string()} when
      Path :: string(),
      Query :: [{string(), string()}],
      Headers :: [{string(), string()}].
consul_kv_read(Path, Query, Headers) ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    Scheme = get_config_key(consul_scheme, Config),
    Host = get_config_key(consul_host, Config),
    Port = get_integer_config_key(consul_port, Config),
    rabbit_peer_discovery_httpc:get(Scheme, Host, Port,
                                    "v1/kv/" ++ Path,
                                    Query, Headers, []).
736
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Get lock status as {SessionHeld, ModifyIndex} from the first entry
%% Consul returns for the startup lock key. SessionHeld is true when the
%% entry has a Session field.
%% XXX: probably makes sense to wrap output in a record to be
%% more future-proof
%% @end
%%--------------------------------------------------------------------
-spec get_lock_status() -> {ok, term()} | {error, string()}.
get_lock_status() ->
    Path = startup_lock_path(),
    case consul_kv_read(Path, [], maybe_add_acl([])) of
        {error, _} = Err ->
            Err;
        {ok, [KeyData | _]} ->
            Held = maps:get(<<"Session">>, KeyData, undefined) =/= undefined,
            Index = maps:get(<<"ModifyIndex">>, KeyData),
            {ok, {Held, Index}}
    end.
755
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Returns the Consul KV path of the startup lock: base_path()
%% followed by "/startup_lock".
%% @end
%%--------------------------------------------------------------------
-spec startup_lock_path() -> string().
startup_lock_path() ->
    base_path() ++ "/startup_lock".
765
%%--------------------------------------------------------------------
%% @private
%% @doc Return the base path for all Consul KV keys related to the
%% current cluster, built from consul_lock_prefix and cluster_name.
%% @end
%%--------------------------------------------------------------------
-spec base_path() -> string().
base_path() ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    Prefix = get_config_key(consul_lock_prefix, Config),
    Cluster = get_config_key(cluster_name, Config),
    rabbit_peer_discovery_httpc:build_path([Prefix, Cluster]).
777
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Wait for the lock to be released if it has been acquired by another
%% node. Returns ok immediately when the lock is not held (false).
%% Otherwise it reads the lock key with index = Index (the key's last
%% ModifyIndex) and wait = Wait seconds, presumably a Consul blocking
%% query, and returns ok once Consul answers. Wait may be 0: lock/4
%% passes max(EndTime - Now, 0), so the spec uses non_neg_integer().
%% @end
%%--------------------------------------------------------------------
-spec wait_for_lock_release(boolean(), pos_integer(), non_neg_integer()) -> ok | {error, string()}.
wait_for_lock_release(false, _, _) -> ok;
wait_for_lock_release(_, Index, Wait) ->
    case consul_kv_read(startup_lock_path(),
                        [{index, Index}, {wait, service_ttl(Wait)}],
                        maybe_add_acl([])) of
        {ok, _}          -> ok;
        {error, _} = Err -> Err
    end.
793
%%--------------------------------------------------------------------
%% @doc
%% Renew an existing session. This is the callback the interval timer
%% in start_session_ttl_updater/1 invokes. The renewal result is
%% ignored, and SessionId is returned unchanged.
%% @end
%%--------------------------------------------------------------------
-spec session_ttl_update_callback(string()) -> string().
session_ttl_update_callback(SessionId) ->
    Headers = maybe_add_acl([]),
    _ = consul_session_renew(SessionId, [], Headers),
    SessionId.
803
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Renew the session TTL with a PUT to v1/session/renew/SessionId.
%% @end
%%--------------------------------------------------------------------
-spec consul_session_renew(string(), [{string(), string()}], [{string(), string()}]) -> {ok, term()} | {error, string()}.
consul_session_renew(SessionId, Query, Headers) ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    Path = rabbit_peer_discovery_httpc:build_path(
             [v1, session, renew, rabbit_data_coercion:to_atom(SessionId)]),
    rabbit_peer_discovery_httpc:put(get_config_key(consul_scheme, Config),
                                    get_config_key(consul_host, Config),
                                    get_integer_config_key(consul_port, Config),
                                    Path,
                                    Query,
                                    Headers,
                                    []).
820