1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbit_misc).
9
10-ignore_xref([{maps, get, 2}]).
11
12-include("rabbit.hrl").
13-include("rabbit_framing.hrl").
14-include("rabbit_misc.hrl").
15
16-ifdef(TEST).
17-export([decompose_pid/1, compose_pid/4]).
18-endif.
19
20-export([method_record_type/1, polite_pause/0, polite_pause/1]).
21-export([die/1, frame_error/2, amqp_error/4, quit/1,
22         protocol_error/3, protocol_error/4, protocol_error/1]).
23-export([type_class/1, assert_args_equivalence/4, assert_field_equivalence/4]).
24-export([dirty_read/1]).
25-export([table_lookup/2, set_table_value/4, amqp_table/1, to_amqp_table/1]).
26-export([r/3, r/2, r_arg/4, rs/1]).
27-export([enable_cover/0, report_cover/0]).
28-export([enable_cover/1, report_cover/1]).
29-export([start_cover/1]).
30-export([throw_on_error/2, with_exit_handler/2, is_abnormal_exit/1,
31         filter_exit_map/2]).
32-export([with_user/2]).
33-export([execute_mnesia_transaction/1]).
34-export([execute_mnesia_transaction/2]).
35-export([execute_mnesia_tx_with_tail/1]).
36-export([ensure_ok/2]).
37-export([tcp_name/3, format_inet_error/1]).
38-export([upmap/2, map_in_order/2, utf8_safe/1]).
39-export([table_filter/3]).
40-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
41-export([format/2, format_many/1, format_stderr/2]).
42-export([unfold/2, ceil/1, queue_fold/3]).
43-export([sort_field_table/1]).
44-export([atom_to_binary/1, parse_bool/1, parse_int/1]).
45-export([pid_to_string/1, string_to_pid/1,
46         pid_change_node/2, node_to_fake_pid/1]).
47-export([version_compare/2, version_compare/3]).
48-export([version_minor_equivalent/2, strict_version_minor_equivalent/2]).
49-export([dict_cons/3, orddict_cons/3, maps_cons/3, gb_trees_cons/3]).
50-export([gb_trees_fold/3, gb_trees_foreach/2]).
51-export([all_module_attributes/1,
52         rabbitmq_related_module_attributes/1,
53         module_attributes_from_apps/2,
54         build_acyclic_graph/3]).
55-export([const/1]).
56-export([ntoa/1, ntoab/1]).
57-export([is_process_alive/1]).
58-export([pget/2, pget/3, pupdate/3, pget_or_die/2, pmerge/3, pset/3, plmerge/2]).
59-export([format_message_queue/2]).
60-export([append_rpc_all_nodes/4, append_rpc_all_nodes/5]).
61-export([os_cmd/1]).
62-export([is_os_process_alive/1]).
63-export([gb_sets_difference/2]).
64-export([version/0, otp_release/0, platform_and_version/0, otp_system_version/0,
65         rabbitmq_and_erlang_versions/0, which_applications/0]).
66-export([sequence_error/1]).
67-export([check_expiry/1]).
68-export([base64url/1]).
69-export([interval_operation/5]).
70-export([ensure_timer/4, stop_timer/2, send_after/3, cancel_timer/1]).
71-export([get_parent/0]).
72-export([store_proc_name/1, store_proc_name/2, get_proc_name/0]).
73-export([moving_average/4]).
74-export([escape_html_tags/1, b64decode_or_throw/1]).
75-export([get_env/3]).
76-export([get_channel_operation_timeout/0]).
77-export([random/1]).
78-export([rpc_call/4, rpc_call/5]).
79-export([get_gc_info/1]).
80-export([group_proplists_by/2]).
81
%% Horrible macro to use in guards
%%
%% Expands to a sequence of alternative guards separated by ';', so it
%% succeeds when R is exactly one of noproc, noconnection, nodedown,
%% normal or shutdown. Because the expansion contains top-level ';'
%% separators, a test joined to it with ',' only binds to the adjacent
%% alternative; use the macro as the entire guard of a clause.
-define(IS_BENIGN_EXIT(R),
        R =:= noproc; R =:= noconnection; R =:= nodedown; R =:= normal;
            R =:= shutdown).
86
87%%----------------------------------------------------------------------------
88
%% Types made available to other modules.
-export_type([resource_name/0, thunk/1, channel_or_connection_exit/0]).

%% Shorthand for rabbit_types:ok_or_error/1 with an unconstrained argument.
-type ok_or_error() :: rabbit_types:ok_or_error(any()).
%% A zero-arity fun producing a T.
-type thunk(T) :: fun(() -> T).
%% Resource names are binaries.
-type resource_name() :: binary().
%% Either of the two exit types declared in rabbit_types.
-type channel_or_connection_exit()
      :: rabbit_types:channel_exit() | rabbit_types:connection_exit().
%% Any term may be used as a digraph label.
-type digraph_label() :: term().
%% Maps an {atom(), [term()]} pair to the list of labelled vertices it
%% contributes.
-type graph_vertex_fun() ::
        fun (({atom(), [term()]}) -> [{digraph:vertex(), digraph_label()}]).
%% Maps an {atom(), [term()]} pair to the list of edges (vertex pairs) it
%% contributes.
-type graph_edge_fun() ::
        fun (({atom(), [term()]}) -> [{digraph:vertex(), digraph:vertex()}]).
%% A timer reference tagged with the module that owns it.
-type tref() :: {'erlang', reference()} | {timer, timer:tref()}.
102
103-spec method_record_type(rabbit_framing:amqp_method_record()) ->
104          rabbit_framing:amqp_method_name().
105-spec polite_pause() -> 'done'.
106-spec polite_pause(non_neg_integer()) -> 'done'.
107-spec die(rabbit_framing:amqp_exception()) -> channel_or_connection_exit().
108
109-spec quit(integer()) -> no_return().
110
111-spec frame_error(rabbit_framing:amqp_method_name(), binary()) ->
112          rabbit_types:connection_exit().
113-spec amqp_error
114        (rabbit_framing:amqp_exception(), string(), [any()],
115         rabbit_framing:amqp_method_name()) ->
116            rabbit_types:amqp_error().
117-spec protocol_error(rabbit_framing:amqp_exception(), string(), [any()]) ->
118          channel_or_connection_exit().
119-spec protocol_error
120        (rabbit_framing:amqp_exception(), string(), [any()],
121         rabbit_framing:amqp_method_name()) ->
122            channel_or_connection_exit().
123-spec protocol_error(rabbit_types:amqp_error()) ->
124          channel_or_connection_exit().
125-spec type_class(rabbit_framing:amqp_field_type()) -> atom().
126-spec assert_args_equivalence
127        (rabbit_framing:amqp_table(), rabbit_framing:amqp_table(),
128         rabbit_types:r(any()), [binary()]) ->
129            'ok' | rabbit_types:connection_exit().
130-spec assert_field_equivalence
131        (any(), any(), rabbit_types:r(any()), atom() | binary()) ->
132            'ok' | rabbit_types:connection_exit().
133-spec equivalence_fail
134        (any(), any(), rabbit_types:r(any()), atom() | binary()) ->
135            rabbit_types:connection_exit().
136-spec dirty_read({atom(), any()}) ->
137          rabbit_types:ok_or_error2(any(), 'not_found').
138-spec table_lookup(rabbit_framing:amqp_table(), binary()) ->
139          'undefined' | {rabbit_framing:amqp_field_type(), any()}.
140-spec set_table_value
141        (rabbit_framing:amqp_table(), binary(), rabbit_framing:amqp_field_type(),
142         rabbit_framing:amqp_value()) ->
143            rabbit_framing:amqp_table().
144-spec r(rabbit_types:vhost(), K) ->
145          rabbit_types:r3(rabbit_types:vhost(), K, '_')
146          when is_subtype(K, atom()).
147-spec r(rabbit_types:vhost() | rabbit_types:r(atom()), K, resource_name()) ->
148          rabbit_types:r3(rabbit_types:vhost(), K, resource_name())
149          when is_subtype(K, atom()).
150-spec r_arg
151        (rabbit_types:vhost() | rabbit_types:r(atom()), K,
152         rabbit_framing:amqp_table(), binary()) ->
153            undefined |
154            rabbit_types:error(
155              {invalid_type, rabbit_framing:amqp_field_type()}) |
156            rabbit_types:r(K) when is_subtype(K, atom()).
157-spec rs(rabbit_types:r(atom())) -> string().
158-spec enable_cover() -> ok_or_error().
159-spec start_cover([{string(), string()} | string()]) -> 'ok'.
160-spec report_cover() -> 'ok'.
161-spec enable_cover([file:filename() | atom()]) -> ok_or_error().
162-spec report_cover([file:filename() | atom()]) -> 'ok'.
163-spec throw_on_error
164        (atom(), thunk(rabbit_types:error(any()) | {ok, A} | A)) -> A.
165-spec with_exit_handler(thunk(A), thunk(A)) -> A.
166-spec is_abnormal_exit(any()) -> boolean().
167-spec filter_exit_map(fun ((A) -> B), [A]) -> [B].
168-spec with_user(rabbit_types:username(), thunk(A)) -> A.
169-spec execute_mnesia_transaction(thunk(A)) -> A.
170-spec execute_mnesia_transaction(thunk(A), fun ((A, boolean()) -> B)) -> B.
171-spec execute_mnesia_tx_with_tail
172        (thunk(fun ((boolean()) -> B))) -> B | (fun ((boolean()) -> B)).
173-spec ensure_ok(ok_or_error(), atom()) -> 'ok'.
174-spec tcp_name(atom(), inet:ip_address(), rabbit_net:ip_port()) ->
175          atom().
176-spec format_inet_error(atom()) -> string().
177-spec upmap(fun ((A) -> B), [A]) -> [B].
178-spec map_in_order(fun ((A) -> B), [A]) -> [B].
179-spec table_filter
180        (fun ((A) -> boolean()), fun ((A, boolean()) -> 'ok'), atom()) -> [A].
181-spec dirty_read_all(atom()) -> [any()].
182-spec dirty_foreach_key(fun ((any()) -> any()), atom()) ->
183          'ok' | 'aborted'.
184-spec dirty_dump_log(file:filename()) -> ok_or_error().
185-spec format(string(), [any()]) -> string().
186-spec format_many([{string(), [any()]}]) -> string().
187-spec format_stderr(string(), [any()]) -> 'ok'.
188-spec unfold (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}.
189-spec ceil(number()) -> integer().
190-spec queue_fold(fun ((any(), B) -> B), B, queue:queue()) -> B.
191-spec sort_field_table(rabbit_framing:amqp_table()) ->
192          rabbit_framing:amqp_table().
193-spec pid_to_string(pid()) -> string().
194-spec string_to_pid(string()) -> pid().
195-spec pid_change_node(pid(), node()) -> pid().
196-spec node_to_fake_pid(atom()) -> pid().
197-spec version_compare(string(), string()) -> 'lt' | 'eq' | 'gt'.
198-spec version_compare
199        (rabbit_semver:version_string(), rabbit_semver:version_string(),
200         ('lt' | 'lte' | 'eq' | 'gte' | 'gt')) -> boolean().
201-spec version_minor_equivalent(rabbit_semver:version_string(), rabbit_semver:version_string()) -> boolean().
202-spec dict_cons(any(), any(), dict:dict()) -> dict:dict().
203-spec orddict_cons(any(), any(), orddict:orddict()) -> orddict:orddict().
204-spec gb_trees_cons(any(), any(), gb_trees:tree()) -> gb_trees:tree().
205-spec gb_trees_fold(fun ((any(), any(), A) -> A), A, gb_trees:tree()) -> A.
206-spec gb_trees_foreach(fun ((any(), any()) -> any()), gb_trees:tree()) ->
207          'ok'.
208-spec all_module_attributes(atom()) -> [{atom(), atom(), [term()]}].
209-spec build_acyclic_graph
210        (graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}]) ->
211            rabbit_types:ok_or_error2(
212              digraph:graph(),
213              {'vertex', 'duplicate', digraph:vertex()} |
214              {'edge',
215                ({bad_vertex, digraph:vertex()} |
216                 {bad_edge, [digraph:vertex()]}),
217                digraph:vertex(), digraph:vertex()}).
218-spec const(A) -> thunk(A).
219-spec ntoa(inet:ip_address()) -> string().
220-spec ntoab(inet:ip_address()) -> string().
221-spec is_process_alive(pid()) -> boolean().
222
223-spec pmerge(term(), term(), [term()]) -> [term()].
224-spec plmerge([term()], [term()]) -> [term()].
225-spec pset(term(), term(), [term()]) -> [term()].
226-spec format_message_queue(any(), priority_queue:q()) -> term().
227-spec os_cmd(string()) -> string().
228-spec is_os_process_alive(non_neg_integer()) -> boolean().
229-spec gb_sets_difference(gb_sets:set(), gb_sets:set()) -> gb_sets:set().
230-spec version() -> string().
231-spec otp_release() -> string().
232-spec otp_system_version() -> string().
233-spec platform_and_version() -> string().
234-spec rabbitmq_and_erlang_versions() -> {string(), string()}.
235-spec which_applications() -> [{atom(), string(), string()}].
236-spec sequence_error([({'error', any()} | any())]) ->
237          {'error', any()} | any().
238-spec check_expiry(integer()) -> rabbit_types:ok_or_error(any()).
239-spec base64url(binary()) -> string().
240-spec interval_operation
241        ({atom(), atom(), any()}, float(), non_neg_integer(), non_neg_integer(),
242         non_neg_integer()) ->
243            {any(), non_neg_integer()}.
244-spec ensure_timer(A, non_neg_integer(), non_neg_integer(), any()) -> A.
245-spec stop_timer(A, non_neg_integer()) -> A.
246-spec send_after(non_neg_integer(), pid(), any()) -> tref().
247-spec cancel_timer(tref()) -> 'ok'.
248-spec get_parent() -> pid().
249-spec store_proc_name(atom(), rabbit_types:proc_name()) -> ok.
250-spec store_proc_name(rabbit_types:proc_type_and_name()) -> ok.
251-spec get_proc_name() -> rabbit_types:proc_name().
252-spec moving_average(float(), float(), float(), float() | 'undefined') ->
253          float().
254-spec get_env(atom(), atom(), term())  -> term().
255-spec get_channel_operation_timeout() -> non_neg_integer().
256-spec random(non_neg_integer()) -> non_neg_integer().
257-spec get_gc_info(pid()) -> [any()].
258-spec group_proplists_by(fun((proplists:proplist()) -> any()),
259                         list(proplists:proplist())) -> list(list(proplists:proplist())).
260
261
262%%----------------------------------------------------------------------------
263
%% Returns the first element of the given record tuple, i.e. its
%% record tag.
method_record_type(MethodRecord) ->
    RecordTag = element(1, MethodRecord),
    RecordTag.
266
%% Pauses the calling process for 3000 milliseconds.
polite_pause() ->
    DefaultMillis = 3000,
    polite_pause(DefaultMillis).

%% Pauses the calling process for Millis milliseconds and then returns
%% 'done'. The receive has no message clauses, so nothing is ever taken
%% out of the mailbox.
polite_pause(Millis) ->
    receive after Millis -> done end.
274
%% Raises a protocol error named Error whose explanation is the ~w
%% rendering of Error itself.
die(Error) ->
    ExplanationFormat = "~w",
    protocol_error(Error, ExplanationFormat, [Error]).
277
%% Raises a frame_error protocol error for MethodName, reporting the
%% fields that could not be decoded.
frame_error(MethodName, BinaryFields) ->
    Params = [BinaryFields],
    protocol_error(frame_error, "cannot decode ~w", Params, MethodName).
280
%% Builds an #amqp_error{} record. The explanation is produced by
%% formatting ExplanationFormat with Params via format/2.
amqp_error(Name, ExplanationFormat, Params, Method) ->
    #amqp_error{name        = Name,
                explanation = format(ExplanationFormat, Params),
                method      = Method}.
284
%% Same as protocol_error/4 with the method set to 'none'.
protocol_error(Name, ExplanationFormat, Params) ->
    protocol_error(amqp_error(Name, ExplanationFormat, Params, none)).

%% Builds an #amqp_error{} from the arguments and exits with it.
protocol_error(Name, ExplanationFormat, Params, Method) ->
    Error = amqp_error(Name, ExplanationFormat, Params, Method),
    protocol_error(Error).

%% Exits the calling process with the given #amqp_error{} as the reason.
protocol_error(Error) when is_record(Error, amqp_error) ->
    exit(Error).
293
%% Collapses an AMQP field type into its class: the integer-like types
%% (including decimal) map to 'int', float and double map to 'float',
%% and any other term is returned unchanged.
type_class(Type) ->
    IntTypes   = [byte, short, signedint, long, decimal,
                  unsignedbyte, unsignedshort, unsignedint],
    FloatTypes = [float, double],
    case lists:member(Type, IntTypes) of
        true ->
            int;
        false ->
            case lists:member(Type, FloatTypes) of
                true  -> float;
                false -> Type
            end
    end.
305
%% Checks, for every key in Keys, that the Orig and New argument tables
%% hold equivalent values. Returns 'ok' when all checks pass; a failed
%% check does not return (see assert_args_equivalence1/4).
assert_args_equivalence(Orig, New, Name, Keys) ->
    lists:foreach(
      fun (Key) -> assert_args_equivalence1(Orig, New, Name, Key) end,
      Keys).
309
%% Compares the field stored under Key in the two tables.
%%
%% Identical lookups are accepted outright. Two typed values are also
%% accepted when their type classes are equal and the values compare
%% equal with '=='. Everything else is handed on to
%% assert_field_equivalence/4.
assert_args_equivalence1(Orig, New, Name, Key) ->
    OrigField = table_lookup(Orig, Key),
    NewField  = table_lookup(New, Key),
    case {OrigField, NewField} of
        {Field, Field} ->
            ok;
        {{OrigType, OrigVal}, {NewType, NewVal}} ->
            SameClass = type_class(OrigType) == type_class(NewType),
            if
                SameClass, OrigVal == NewVal ->
                    ok;
                true ->
                    assert_field_equivalence(OrigVal, NewVal, Name, Key)
            end;
        {_, _} ->
            assert_field_equivalence(OrigField, NewField, Name, Key)
    end.
324
%% Returns 'ok' when the two field values are equivalent and raises an
%% equivalence failure otherwise.
%%
%% Classic queues do not necessarily have an x-queue-type field
%% associated with them, so a missing x-queue-type is treated as
%% equivalent to an explicit "classic" one (in either direction).
%%
%% Fixes rabbitmq/rabbitmq-common#341
%%
assert_field_equivalence(Orig, New, Name, Key) ->
    case {Orig, New, Key} of
        {Same, Same, _} ->
            ok;
        {undefined, {longstr, <<"classic">>}, <<"x-queue-type">>} ->
            ok;
        {{longstr, <<"classic">>}, undefined, <<"x-queue-type">>} ->
            ok;
        _ ->
            equivalence_fail(Orig, New, Name, Key)
    end.
338
%% Raises a precondition_failed protocol error describing which
%% argument differs, the value received and the current value.
equivalence_fail(Orig, New, Name, Key) ->
    Format = "inequivalent arg '~s' for ~s: received ~s but current is ~s",
    Args   = [Key, rs(Name), val(New), val(Orig)],
    protocol_error(precondition_failed, Format, Args).
343
%% Renders a field for use in an equivalence error message.
%% 'undefined' becomes "none"; a {Type, Value} pair is described with
%% both parts; any other term is quoted. Binaries are printed with ~s,
%% everything else with ~p.
val(undefined) ->
    "none";
val({Type, Value}) when is_binary(Value) ->
    format("the value '~s' of type '~s'", [Value, Type]);
val({Type, Value}) ->
    format("the value '~p' of type '~s'", [Value, Type]);
val(Value) when is_binary(Value) ->
    format("'~s'", [Value]);
val(Value) ->
    format("'~p'", [Value]).
357
%% Looks up Key in Table directly through ets and returns {ok, Record}
%% or {error, not_found}.
%%
%% Normally we'd call mnesia:dirty_read/1 here, but that is quite
%% expensive due to general mnesia overheads (figuring out table types
%% and locations, etc). We get away with bypassing these because we
%% know that the tables we are looking at here
%% - are not the schema table
%% - have a local ram copy
%% - do not have any indices
dirty_read({Table, Key}) ->
    Found = ets:lookup(Table, Key),
    case Found of
        []       -> {error, not_found};
        [Record] -> {ok, Record}
    end.
370
371%%
372%% Attribute Tables
373%%
374
%% Finds the {Key, Type, Value} entry for Key in an AMQP table and
%% returns {Type, Value}, or 'undefined' when there is no such entry.
table_lookup(Table, Key) ->
    case lists:keyfind(Key, 1, Table) of
        {_Key, Type, Value} -> {Type, Value};
        false               -> undefined
    end.
380
%% Stores {Key, Type, Value} in Table, replacing an existing entry for
%% Key if there is one, and returns the table passed through
%% sort_field_table/1.
set_table_value(Table, Key, Type, Value) ->
    Field   = {Key, Type, Value},
    Updated = lists:keystore(Key, 1, Table, Field),
    sort_field_table(Updated).
384
%% Converts a map into a list of {Key, Type, Value} AMQP table rows.
%% A list argument is returned unchanged.
to_amqp_table(Map) when is_map(Map) ->
    Prepend = fun (Key, Value, Rows) ->
                      [to_amqp_table_row(Key, Value) | Rows]
              end,
    lists:reverse(maps:fold(Prepend, [], Map));
to_amqp_table(List) when is_list(List) ->
    List.

%% Builds a single {Key, Type, Value} row from a map entry.
to_amqp_table_row(Key, Value) ->
    {Type, Converted} = type_val(Value),
    {Key, Type, Converted}.

%% Converts every element of a list into a {Type, Value} pair.
to_amqp_array(Items) ->
    [type_val(Item) || Item <- Items].

%% Picks the AMQP type for a term and converts nested maps and lists.
%% Throws {error, null_not_allowed} for 'null' and
%% {error, {unhandled_type, Term}} for anything not covered.
type_val(Term) ->
    if
        is_map(Term)     -> {table,   to_amqp_table(Term)};
        is_list(Term)    -> {array,   to_amqp_array(Term)};
        is_binary(Term)  -> {longstr, Term};
        is_integer(Term) -> {long,    Term};
        is_number(Term)  -> {double,  Term};
        Term =:= true; Term =:= false ->
            {bool, Term};
        Term =:= null    -> throw({error, null_not_allowed});
        true             -> throw({error, {unhandled_type, Term}})
    end.
407
%% Converts an AMQP table (a list of {Name, Type, Value} rows) into a
%% map from Name to the value converted by amqp_value/2.
%%
%% 'unknown' is passed through, and 'undefined' is treated as an empty
%% table. A map argument is returned unchanged: the previous clause
%% head `#{}` matches every map, not just the empty one, so a non-empty
%% map used to be silently replaced by an empty map.
amqp_table(unknown)   -> unknown;
amqp_table(undefined) -> amqp_table([]);
amqp_table([])        -> #{};
amqp_table(Map) when is_map(Map) ->
    Map;
amqp_table(Table)     -> maps:from_list([{Name, amqp_value(Type, Value)} ||
                                            {Name, Type, Value} <- Table]).
414
%% Converts one typed AMQP value.
%% Arrays are converted element by element, tables go through
%% amqp_table/1, a decimal pair is formatted as "Before.After" and
%% parsed as a float, binaries are passed through utf8_safe/1, and any
%% other value is returned as is.
amqp_value(array, Elements) ->
    [amqp_value(Type, Value) || {Type, Value} <- Elements];
amqp_value(table, Table) ->
    amqp_table(Table);
amqp_value(decimal, {Before, After}) ->
    Text = lists:flatten(io_lib:format("~p.~p", [Before, After])),
    erlang:list_to_float(Text);
amqp_value(_Type, Bin) when is_binary(Bin) ->
    utf8_safe(Bin);
amqp_value(_Type, Other) ->
    Other.
422
423
424%%
425%% Resources
426%%
427
%% Builds a #resource{} of the given kind and name. The first argument
%% is either a virtual host or an existing #resource{} whose virtual
%% host is reused.
r(VHostOrResource, Kind, Name) ->
    VHostPath = case VHostOrResource of
                    #resource{virtual_host = Inherited} -> Inherited;
                    Plain                               -> Plain
                end,
    #resource{virtual_host = VHostPath, kind = Kind, name = Name}.

%% Builds a #resource{} whose name is the atom '_'.
r(VHostPath, Kind) ->
    #resource{kind = Kind, name = '_', virtual_host = VHostPath}.
435
%% Builds a resource whose name is taken from the longstr stored under
%% Key in Table. Returns 'undefined' when the key is absent and
%% {error, {invalid_type, Type}} when the entry is not a longstr.
r_arg(#resource{virtual_host = VHostPath}, Kind, Table, Key) ->
    r_arg(VHostPath, Kind, Table, Key);
r_arg(VHostPath, Kind, Table, Key) ->
    Field = table_lookup(Table, Key),
    case Field of
        undefined          -> undefined;
        {longstr, NameBin} -> r(VHostPath, Kind, NameBin);
        {OtherType, _}     -> {error, {invalid_type, OtherType}}
    end.
444
%% Renders a resource as a human-readable string. Resources of kind
%% 'topic' are printed without the kind prefix.
rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) ->
    case Kind of
        topic -> format("'~s' in vhost '~s'", [Name, VHostPath]);
        _     -> format("~s '~s' in vhost '~s'", [Kind, Name, VHostPath])
    end.
449
%% Cover-compiles the beam files under "./ebin".
enable_cover() -> enable_cover(["."]).

%% Cover-compiles the "ebin" directory below each entry of Dirs, in
%% order. Stops compiling at the first {error, _} result and returns
%% it; returns 'ok' otherwise.
enable_cover([]) ->
    ok;
enable_cover([Dir | Rest]) ->
    EbinDir = filename:join(lists:concat([Dir]), "ebin"),
    case cover:compile_beam_directory(EbinDir) of
        {error, _} = Err -> Err;
        _                -> enable_cover(Rest)
    end.
462
%% Starts cover on the nodes obtained by passing each entry of NodesS
%% through rabbit_nodes_common:make/1. Crashes unless cover:start/1
%% returns {ok, _}.
start_cover(NodesS) ->
    Nodes = [rabbit_nodes_common:make(NodeS) || NodeS <- NodesS],
    {ok, _Started} = cover:start(Nodes),
    ok.
466
%% Writes a coverage report rooted at the current directory.
report_cover() -> report_cover(["."]).

%% Writes a coverage report for each root in Dirs and returns 'ok'.
report_cover(Dirs) ->
    lists:foreach(fun (Dir) -> report_cover1(lists:concat([Dir])) end,
                  Dirs).
470
%% Writes the coverage report into Root/cover: deletes existing *.html
%% files there, then for every cover-compiled module (in sorted order)
%% appends a line to summary.txt and writes an HTML analysis, and
%% finally appends a 'TOTAL' line.
report_cover1(Root) ->
    CoverDir = filename:join(Root, "cover"),
    %% ensure_dir/1 creates the parents of the given path, so name a
    %% file inside CoverDir to get CoverDir itself created.
    ok = filelib:ensure_dir(filename:join(CoverDir, "junk")),
    StaleReports = filelib:wildcard(filename:join(CoverDir, "*.html")),
    lists:foreach(fun (Report) -> file:delete(Report) end, StaleReports),
    {ok, SummaryFile} = file:open(filename:join(CoverDir, "summary.txt"),
                                  [write]),
    ReportModule =
        fun (Mod, {CoveredSoFar, UncoveredSoFar}) ->
                {ok, {Mod, {Covered, Uncovered}}} = cover:analyze(Mod, module),
                ok = report_coverage_percentage(SummaryFile,
                                                Covered, Uncovered, Mod),
                HtmlFile = filename:join(CoverDir, atom_to_list(Mod) ++ ".html"),
                {ok, _} = cover:analyze_to_file(Mod, HtmlFile, [html]),
                {CoveredSoFar + Covered, UncoveredSoFar + Uncovered}
        end,
    {TotalCovered, TotalUncovered} =
        lists:foldl(ReportModule, {0, 0}, lists:sort(cover:modules())),
    ok = report_coverage_percentage(SummaryFile,
                                    TotalCovered, TotalUncovered, 'TOTAL'),
    ok = file:close(SummaryFile).
494
%% Writes one "<percentage> <module>" line to File. The percentage is
%% 100.0 when Cov + NotCov is not greater than zero.
report_coverage_percentage(File, Cov, NotCov, Mod) ->
    Percentage = case {Cov, NotCov} of
                     {C, N} when C + N > 0 -> 100.0 * C / (C + N);
                     _                     -> 100.0
                 end,
    io:fwrite(File, "~6.2f ~p~n", [Percentage, Mod]).
502
%% @doc Halts the emulator returning the given status code to the os.
%% On unix this calls halt/1 directly. On Windows it calls init:stop/1
%% and then blocks indefinitely so as to give the io subsystem time to
%% flush stdout completely.
quit(Status) ->
    {OsFamily, _OsName} = os:type(),
    case OsFamily of
        unix ->
            halt(Status);
        win32 ->
            init:stop(Status),
            receive after infinity -> ok end
    end.
514
%% Runs Thunk and unwraps its result: {ok, Res} yields Res,
%% {error, Reason} is thrown as {E, Reason}, and any other value is
%% returned unchanged.
throw_on_error(E, Thunk) ->
    Outcome = Thunk(),
    case Outcome of
        {ok, Value}     -> Value;
        {error, Reason} -> throw({E, Reason});
        _               -> Outcome
    end.
521
%% Runs Thunk and returns its result. If Thunk exits with {R, _} or
%% {{R, _}, _} where R satisfies ?IS_BENIGN_EXIT, the result of
%% Handler() is returned instead. Any other exception propagates.
with_exit_handler(Handler, Thunk) ->
    try Thunk() of
        Result -> Result
    catch
        exit:{Reason, _}      when ?IS_BENIGN_EXIT(Reason) -> Handler();
        exit:{{Reason, _}, _} when ?IS_BENIGN_EXIT(Reason) -> Handler()
    end.
529
%% Returns false when the exit reason is R or {R, _} with R satisfying
%% ?IS_BENIGN_EXIT, and true for every other term.
is_abnormal_exit(Reason) ->
    case Reason of
        R      when ?IS_BENIGN_EXIT(R) -> false;
        {R, _} when ?IS_BENIGN_EXIT(R) -> false;
        _                              -> true
    end.
533
%% Maps F over L, dropping the results for elements on which F exited
%% in a way that with_exit_handler/2 handles. A fresh reference is used
%% as the marker for dropped elements.
filter_exit_map(F, L) ->
    Dropped = make_ref(),
    OnHandledExit = fun () -> Dropped end,
    Results = [with_exit_handler(OnHandledExit, fun () -> F(Item) end)
               || Item <- L],
    [Result || Result <- Results, Result =/= Dropped].
540
541
%% Returns a fun that reads Username from the rabbit_user table and
%% runs Thunk when exactly one record is found; when none is found it
%% aborts with {no_such_user, Username}. The returned fun uses
%% mnesia:read/1 and mnesia:abort/1.
with_user(Username, Thunk) ->
    fun () ->
            Users = mnesia:read({rabbit_user, Username}),
            case Users of
                [_User] -> Thunk();
                []      -> mnesia:abort({no_such_user, Username})
            end
    end.
551
%% Runs TxFun as a synchronous mnesia transaction on the worker pool.
%% Returns the transaction result, or throws {error, Reason} when the
%% transaction aborts.
execute_mnesia_transaction(TxFun) ->
    %% Making this a sync_transaction allows us to use dirty_read
    %% elsewhere and get a consistent result even when that read
    %% executes on a different node.
    Job =
        fun () ->
                case mnesia:is_transaction() of
                    true ->
                        mnesia:sync_transaction(TxFun);
                    false ->
                        %% Compare the dumper's log-write counter
                        %% before and after to tell whether this
                        %% transaction wrote to the disk log.
                        WritesBefore = mnesia_dumper:get_log_writes(),
                        TxResult = mnesia:sync_transaction(TxFun),
                        WritesAfter = mnesia_dumper:get_log_writes(),
                        if
                            WritesAfter == WritesBefore ->
                                file_handle_cache_stats:update(mnesia_ram_tx),
                                TxResult;
                            true ->
                                file_handle_cache_stats:update(mnesia_disk_tx),
                                {sync, TxResult}
                        end
                end
        end,
    case worker_pool:submit(Job, single) of
        {sync, {atomic, Result}} ->
            mnesia_sync:sync(),
            Result;
        {sync, {aborted, Reason}} ->
            throw({error, Reason});
        {atomic, Result} ->
            Result;
        {aborted, Reason} ->
            throw({error, Reason})
    end.
578
%% Like execute_mnesia_transaction/1, with an additional function that
%% is called twice: inside the transaction with 'true' as its second
%% argument, and again after the transaction with 'false'. The result
%% of the second call is returned. Throws unexpected_transaction when
%% called from within a transaction.
execute_mnesia_transaction(TxFun, PrePostCommitFun) ->
    case mnesia:is_transaction() of
        false -> ok;
        true  -> throw(unexpected_transaction)
    end,
    WrappedTx = fun () ->
                        TxResult = TxFun(),
                        PrePostCommitFun(TxResult, true),
                        TxResult
                end,
    Committed = execute_mnesia_transaction(WrappedTx),
    PrePostCommitFun(Committed, false).
592
%% Like execute_mnesia_transaction/2, but TxFun is expected to return a
%% TailFun which gets called (only) immediately after the tx commit.
%% When already inside a transaction the TailFun is returned uncalled.
execute_mnesia_tx_with_tail(TxFun) ->
    AlreadyInTx = mnesia:is_transaction(),
    TailFun = execute_mnesia_transaction(TxFun),
    case AlreadyInTx of
        true  -> TailFun;
        false -> TailFun()
    end.
601
%% Returns 'ok' for 'ok'; for {error, Reason} throws
%% {error, {ErrorTag, Reason}}.
ensure_ok(ok, _ErrorTag) ->
    ok;
ensure_ok({error, Reason}, ErrorTag) ->
    Tagged = {ErrorTag, Reason},
    throw({error, Tagged}).
604
%% Builds an atom of the form Prefix_Address:Port from the prefix, the
%% textual form of the IP address and the port number.
tcp_name(Prefix, IPAddress, Port) when is_atom(Prefix), is_number(Port) ->
    Address = inet_parse:ntoa(IPAddress),
    Name = format("~w_~s:~w", [Prefix, Address, Port]),
    list_to_atom(Name).
609
%% Renders an inet error as "Error (description)".
format_inet_error(E) ->
    Description = format_inet_error0(E),
    format("~w (~s)", [E, Description]).

%% Describes an inet error; 'address' and 'timeout' get custom texts
%% and everything else is delegated to inet:format_error/1.
format_inet_error0(Error) ->
    case Error of
        address -> "cannot connect to host/port";
        timeout -> "timed out";
        _       -> inet:format_error(Error)
    end.
615
%% base64:decode throws lots of weird errors. Catch and convert to one
%% that will cause a bad_request: any error raised while decoding is
%% rethrown as {error, {not_base64, B64}}.
b64decode_or_throw(B64) ->
    try base64:decode(B64) of
        Decoded -> Decoded
    catch
        error:_ -> throw({error, {not_base64, B64}})
    end.
624
%% Returns V unchanged when xmerl_ucs accepts it as UTF-8. Otherwise
%% returns a binary that starts with "Not UTF-8, base64 is: " followed
%% by the base64 encoding of V broken into lines.
utf8_safe(V) ->
    try xmerl_ucs:from_utf8(V) of
        _Decoded -> V
    catch
        exit:{ucs, _} ->
            Encoded = split_lines(base64:encode(V)),
            <<"Not UTF-8, base64 is: ", Encoded/binary>>
    end.

%% MIME enforces a limit on line length of base 64-encoded data to 76 characters.
%% A newline is inserted after every complete 76-byte run.
split_lines(<<Line:76/binary, Rest/binary>>) ->
    Tail = split_lines(Rest),
    <<Line/binary, $\n, Tail/binary>>;
split_lines(Remainder) ->
    Remainder.
639
640
%% This is a modified version of Luke Gorrie's pmap -
%% https://lukego.livejournal.com/6753.html - that doesn't care about
%% the order in which results are received.
%%
%% One process is spawned per element; results are collected in
%% arrival order, so the output order need not match the input order.
%%
%% WARNING: This is deliberately lightweight rather than robust -- if F
%% throws, upmap will hang forever, so make sure F doesn't throw!
upmap(F, L) ->
    Collector = self(),
    Tag = make_ref(),
    Workers = [spawn(fun () -> Collector ! {Tag, F(X)} end) || X <- L],
    [receive {Tag, Result} -> Result end || _Worker <- Workers].
652
%% Maps F over L, applying F to the elements strictly from first to
%% last, and returns the results in input order.
map_in_order(F, L) ->
    map_in_order(F, L, []).

%% Accumulates results in reverse and flips them at the end.
map_in_order(_F, [], Acc) ->
    lists:reverse(Acc);
map_in_order(F, [E | Rest], Acc) ->
    map_in_order(F, Rest, [F(E) | Acc]).
656
%% Apply a pre-post-commit function to all entries in a table that
%% satisfy a predicate, and return those entries.
%%
%% The entries are first read with dirty_read_all/1; each one is then
%% re-checked inside its own transaction, so we ignore entries that
%% have been modified or removed in the meantime.
table_filter(Pred, PrePostCommitFun, TableName) ->
    %% Transaction body: is the entry still present and wanted?
    CheckFor =
        fun (Entry) ->
                fun () ->
                        mnesia:match_object(TableName, Entry, read) =/= []
                            andalso Pred(Entry)
                end
        end,
    %% Pre/post-commit hook: only notify for entries that passed.
    NotifyFor =
        fun (Entry) ->
                fun (false, _Tx) -> false;
                    (true,   Tx) -> PrePostCommitFun(Entry, Tx), true
                end
        end,
    lists:foldl(
      fun (Entry, Kept) ->
              case execute_mnesia_transaction(CheckFor(Entry),
                                              NotifyFor(Entry)) of
                  true  -> [Entry | Kept];
                  false -> Kept
              end
      end, [], dirty_read_all(TableName)).
674
%% Returns every object in TableName using a dirty select whose match
%% specification matches, and returns, the whole object.
dirty_read_all(TableName) ->
    MatchEverything = [{'$1', [], ['$1']}],
    mnesia:dirty_select(TableName, MatchEverything).
677
%% Calls F on every key of TableName, walking the table with dirty
%% operations. Returns 'ok' once the end of the table is reached, or
%% 'aborted' if fetching the next key fails part-way through.
dirty_foreach_key(F, TableName) ->
    dirty_foreach_key1(F, TableName, mnesia:dirty_first(TableName)).

dirty_foreach_key1(_F, _TableName, '$end_of_table') ->
    ok;
dirty_foreach_key1(F, TableName, K) ->
    %% The next key is fetched before F(K) runs, as before. Only the
    %% dirty_next call is protected: the old-style `catch` it replaces
    %% would also have treated a thrown term as the next key. The `of`
    %% section is outside the protected part, so the recursion stays a
    %% tail call and exceptions raised by F propagate to the caller.
    try mnesia:dirty_next(TableName, K) of
        NextKey ->
            F(K),
            dirty_foreach_key1(F, TableName, NextKey)
    catch
        exit:_  -> aborted;
        error:_ -> aborted
    end.
691
%% Prints every chunk of the disk_log file FileName to standard output.
%%
%% Returns the result of closing the log when the whole file was read,
%% or the {error, Reason} reported by disk_log:chunk/2 when reading
%% failed. The log is closed on every path, including when printing
%% raises an exception.
dirty_dump_log(FileName) ->
    {ok, LH} = disk_log:open([{name, dirty_dump_log},
                              {mode, read_only},
                              {file, FileName}]),
    try dirty_dump_log1(LH, disk_log:chunk(LH, start)) of
        ok ->
            disk_log:close(LH);
        {error, _} = ReadError ->
            _ = disk_log:close(LH),
            ReadError
    catch
        Class:Reason:Stacktrace ->
            _ = disk_log:close(LH),
            erlang:raise(Class, Reason, Stacktrace)
    end.

dirty_dump_log1(_LH, eof) ->
    io:format("Done.~n");
%% This clause must come before the {K, Terms} one: an {error, Reason}
%% pair would otherwise match it and 'error' would be passed back to
%% disk_log:chunk/2 as if it were a continuation.
dirty_dump_log1(_LH, {error, _} = Error) ->
    Error;
dirty_dump_log1(LH, {K, Terms}) ->
    io:format("Chunk: ~p~n", [Terms]),
    dirty_dump_log1(LH, disk_log:chunk(LH, K));
dirty_dump_log1(LH, {K, Terms, BadBytes}) ->
    io:format("Bad Chunk, ~p: ~p~n", [BadBytes, Terms]),
    dirty_dump_log1(LH, disk_log:chunk(LH, K)).
707
%% Formats Args according to Fmt (as io_lib:format/2 does) and returns
%% the result as a flat string.
format(Fmt, Args) ->
    Chars = io_lib:format(Fmt, Args),
    lists:flatten(Chars).
709
%% Formats each {Format, Args} pair on its own line and returns the
%% concatenation as a flat string.
format_many(List) ->
    Lines = [io_lib:format(Fmt ++ "~n", Args) || {Fmt, Args} <- List],
    lists:flatten(Lines).
712
%% Writes formatted output to standard_error and returns 'ok'.
format_stderr(Fmt, Args) ->
    ok = io:format(standard_error, Fmt, Args).
716
%% Repeatedly applies Fun to a state, starting with Init. Each
%% {true, Elem, NextState} result contributes Elem and continues with
%% NextState; 'false' stops. Returns {Elems, FinalState}, where Elems
%% lists the most recently produced element first.
unfold(Fun, Init) ->
    unfold(Fun, [], Init).

unfold(Fun, Acc, State) ->
    Step = Fun(State),
    case Step of
        false                   -> {Acc, State};
        {true, Elem, NextState} -> unfold(Fun, [Elem | Acc], NextState)
    end.
725
%% Returns the smallest integer that is not less than N.
%%
%% trunc/1 rounds towards zero, so one is only added when N lies
%% strictly above its truncation. Adding one whenever N was not an
%% integer gave 0 for -1.5 instead of -1.
ceil(N) ->
    T = trunc(N),
    case N > T of
        true  -> T + 1;
        false -> T
    end.
732
%% Converts the binaries <<"true">> and <<"false">> to booleans.
%% Booleans and 'undefined' are returned unchanged; any other value
%% causes {error, {not_boolean, V}} to be thrown.
parse_bool(V) when is_boolean(V) -> V;
parse_bool(V) ->
    case V of
        <<"true">>  -> true;
        <<"false">> -> false;
        undefined   -> undefined;
        _           -> throw({error, {not_boolean, V}})
    end.
739
%% Converts a value to an integer. Integers are returned unchanged,
%% other numbers are truncated, and anything else is parsed as a
%% binary holding an integer in decimal notation. Throws
%% {error, {not_integer, S}} when that parse fails with badarg.
parse_int(I) when is_integer(I) ->
    I;
parse_int(F) when is_number(F) ->
    trunc(F);
parse_int(S) ->
    try list_to_integer(binary_to_list(S)) of
        Parsed -> Parsed
    catch
        error:badarg -> throw({error, {not_integer, S}})
    end.
747
748
%% Folds Fun over the items of queue Q from front to rear, starting
%% with Init as the accumulator.
queue_fold(Fun, Init, Q) ->
    Items = queue:to_list(Q),
    lists:foldl(Fun, Init, Items).
754
%% Sorts a list of AMQP 0-9-1 table fields as per the AMQP 0-9-1 spec,
%% i.e. by the first element of each field tuple. A map is converted to
%% a list of its entries first; empty input yields [].
sort_field_table(Arguments) when is_map(Arguments) ->
    lists:keysort(1, maps:to_list(Arguments));
sort_field_table(Arguments) ->
    lists:keysort(1, Arguments).
764
%% Returns the text of atom A as a UTF-8 encoded binary.
%%
%% Going through atom_to_list/1 and list_to_binary/1 raised badarg for
%% atoms containing code points above 255, because list_to_binary/1
%% only accepts bytes. For atoms made of ASCII characters the result
%% is unchanged.
atom_to_binary(A) ->
    erlang:atom_to_binary(A, utf8).
767
%% This provides a string representation of a pid that is the same
%% regardless of what node we are running on. The representation also
%% permits easy identification of the pid's node: it has the form
%% <Node.Creation.Id.Serial>.
pid_to_string(Pid) when is_pid(Pid) ->
    {Node, Cre, Id, Ser} = decompose_pid(Pid),
    Parts = [Node, Cre, Id, Ser],
    format("<~s.~B.~B.~B>", Parts).
774
%% Inverse of pid_to_string/1: parses "<Node.Creation.Id.Serial>" back
%% into a pid. Throws {error, {invalid_pid_syntax, Str}} when Str does
%% not have that shape.
%% NOTE: the node part is turned into an atom with list_to_atom/1.
string_to_pid(Str) ->
    %% The \ before the trailing $ is only there to keep emacs
    %% font-lock from getting confused.
    Pattern = "^<(.*)\\.(\\d+)\\.(\\d+)\\.(\\d+)>\$",
    case re:run(Str, Pattern, [{capture,all_but_first,list}]) of
        nomatch ->
            throw({error, {invalid_pid_syntax, Str}});
        {match, [NodeStr, CreStr, IdStr, SerStr]} ->
            [Cre, Id, Ser] = [list_to_integer(Digits)
                              || Digits <- [CreStr, IdStr, SerStr]],
            compose_pid(list_to_atom(NodeStr), Cre, Id, Ser)
    end.
789
%% Returns a pid with the same creation, id and serial as Pid, but
%% whose node is NewNode.
pid_change_node(Pid, NewNode) ->
    {Node, Cre, Id, Ser} = setelement(1, decompose_pid(Pid), NewNode),
    compose_pid(Node, Cre, Id, Ser).
793
%% Builds a placeholder pid on Node with creation, id and serial all
%% set to zero, so that node(node_to_fake_pid(Node)) =:= Node.
node_to_fake_pid(Node) ->
    Zero = 0,
    compose_pid(Node, Zero, Zero, Zero).
797
%% Splits a pid into {Node, Creation, Id, Serial} by inspecting its
%% external term format encoding; see
%% http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and 8.7).
decompose_pid(Pid) when is_pid(Pid) ->
    <<131, Tag, Payload/binary>> = term_to_binary(Pid),
    %% After the version byte and the tag, the payload is the encoded
    %% node name followed by 32-bit Id and Serial fields and a Creation
    %% field whose width depends on the tag.
    CreBits = case Tag of
                  88  -> 32; %% NEW_PID_EXT
                  103 -> 8   %% PID_EXT
              end,
    NodeByteSize = byte_size(Payload) - 8 - (CreBits div 8),
    <<_NodePrefix:NodeByteSize/binary, Id:32, Ser:32, Cre:CreBits>> = Payload,
    {node(Pid), Cre, Id, Ser}.
815
%% Builds a pid from its parts by assembling an external term format
%% binary by hand: the version byte 131, tag 88 (the NEW_PID_EXT
%% layout also recognised by decompose_pid/1), the encoded node name,
%% then Id, Serial and Creation as 32-bit fields.
compose_pid(Node, Cre, Id, Ser) ->
    <<131, EncodedNode/binary>> = term_to_binary(Node),
    Encoded = <<131, 88, EncodedNode/binary, Id:32, Ser:32, Cre:32>>,
    binary_to_term(Encoded).
819
%% Compares versions A and B with the given operator (eq, lt, lte, gt
%% or gte) by delegating to the matching rabbit_semver predicate.
%% Any other operator fails with function_clause.
version_compare(A, B, Op) when Op =:= eq; Op =:= lt; Op =:= lte;
                               Op =:= gt; Op =:= gte ->
    case Op of
        eq  -> rabbit_semver:eql(A, B);
        lt  -> rabbit_semver:lt(A, B);
        lte -> rabbit_semver:lte(A, B);
        gt  -> rabbit_semver:gt(A, B);
        gte -> rabbit_semver:gte(A, B)
    end.
825
%% Three-way comparison of versions A and B: returns lt, gt or eq.
%% The gt check is only performed when A is not lower than B.
version_compare(A, B) ->
    IsLower = version_compare(A, B, lt),
    IsGreater = not IsLower andalso version_compare(A, B, gt),
    if
        IsLower   -> lt;
        IsGreater -> gt;
        true      -> eq
    end.
834
%% Decides whether versions A and B may be compatible, judging by
%% their version numbers alone.
%%
%% When both versions are 3.7.x or later:
%% Versions are considered compatible. The real compatibility check
%% is deferred to the rabbit_feature_flags module in rabbitmq-server.
%%
%% When at least one version is 3.6.x or earlier:
%% a.b.c and a.b.d match, but a.b.c and a.d.e don't.
%%
%% Special case for 3.6.6 because it introduced a change to the schema,
%% e.g. 3.6.6 is not compatible with 3.6.5: two 3.6.x versions match
%% only if their patch numbers are on the same side of 6.
%% This special case can be removed once 3.6.x reaches EOL
version_minor_equivalent(A, B) ->
    {{MajA, MinA, PatchA, _}, _} = rabbit_semver:normalize(rabbit_semver:parse(A)),
    {{MajB, MinB, PatchB, _}, _} = rabbit_semver:normalize(rabbit_semver:parse(B)),

    Pre37A = MajA < 3 orelse (MajA =:= 3 andalso MinA =< 6),
    Pre37B = MajB < 3 orelse (MajB =:= 3 andalso MinB =< 6),
    if
        MajA =:= 3, MinA =:= 6, MajB =:= 3, MinB =:= 6 ->
            %% Both patch numbers must be below 6, or both at least 6.
            (PatchA >= 6) =:= (PatchB >= 6);
        Pre37A orelse Pre37B ->
            MajA =:= MajB andalso MinA =:= MinB;
        true ->
            %% Starting with RabbitMQ 3.7.x, we consider this
            %% minor release series and all subsequent series to
            %% be possibly compatible, based on just the version.
            true
    end.
871
%% Strict variant of version_minor_equivalent/2: versions from
%% different minor series (e.g. 3.7.x and 3.8.x) are always considered
%% incompatible, as if there were no feature flags. This is useful to
%% check plugin compatibility (`broker_versions_requirement` field in
%% plugins). Two 3.6.x versions additionally need their patch numbers
%% on the same side of 6 (3.6.6 is not compatible with 3.6.5).

strict_version_minor_equivalent(A, B) ->
    {{MajA, MinA, PatchA, _}, _} = rabbit_semver:normalize(rabbit_semver:parse(A)),
    {{MajB, MinB, PatchB, _}, _} = rabbit_semver:normalize(rabbit_semver:parse(B)),

    SameSeries = MajA =:= MajB andalso MinA =:= MinB,
    case {MajA, MinA} of
        {3, 6} when SameSeries -> (PatchA >= 6) =:= (PatchB >= 6);
        _                      -> SameSeries
    end.
889
%% Prepends Value to the list stored under Key in Dict, storing
%% [Value] if Key is not present yet.
dict_cons(Key, Value, Dict) ->
    Values = case dict:find(Key, Dict) of
                 {ok, Existing} -> [Value | Existing];
                 error          -> [Value]
             end,
    dict:store(Key, Values, Dict).
892
%% Prepends Value to the list stored under Key in the orddict Dict,
%% storing [Value] if Key is not present yet.
orddict_cons(Key, Value, Dict) ->
    Values = case orddict:find(Key, Dict) of
                 {ok, Existing} -> [Value | Existing];
                 error          -> [Value]
             end,
    orddict:store(Key, Values, Dict).
895
%% Prepends Value to the list stored under Key in Map, storing
%% [Value] if Key is not present yet.
maps_cons(Key, Value, Map) ->
    Existing = maps:get(Key, Map, []),
    maps:put(Key, [Value | Existing], Map).
898
%% Prepends Value to the list stored under Key in the gb_tree Tree,
%% inserting [Value] if Key is not present yet.
gb_trees_cons(Key, Value, Tree) ->
    Values = case gb_trees:lookup(Key, Tree) of
                 {value, Existing} -> [Value | Existing];
                 none              -> [Value]
             end,
    gb_trees:enter(Key, Values, Tree).
904
%% Folds Fun(Key, Value, Acc) over every entry of the gb_tree Tree,
%% in the order produced by gb_trees:iterator/1, and returns the
%% final accumulator.
gb_trees_fold(Fun, Acc, Tree) ->
    FirstStep = gb_trees:next(gb_trees:iterator(Tree)),
    gb_trees_fold1(Fun, Acc, FirstStep).

%% Worker for gb_trees_fold/3. Step is the result of gb_trees:next/1:
%% either none or {Key, Value, NextIterator}.
gb_trees_fold1(Fun, Acc, Step) ->
    case Step of
        none ->
            Acc;
        {Key, Val, It} ->
            gb_trees_fold1(Fun, Fun(Key, Val, Acc), gb_trees:next(It))
    end.
912
%% Calls Fun(Key, Value) for its side effects on every entry of the
%% gb_tree Tree, via gb_trees_fold/3. Always returns ok.
gb_trees_foreach(Fun, Tree) ->
    Visit = fun (Key, Val, ok) ->
                    Fun(Key, Val),
                    ok
            end,
    gb_trees_fold(Visit, ok, Tree).
915
%% Returns the attribute list of Module (Module:module_info(attributes)).
%% If that call fails with undef, a warning is printed to standard
%% output and an empty list is returned instead.
module_attributes(Module) ->
    try Module:module_info(attributes) of
        Attributes ->
            Attributes
    catch
        _:undef ->
            io:format("WARNING: module ~p not found, so not scanned for boot steps.~n",
                      [Module]),
            []
    end.
925
%% Collects the module attributes called Name from the modules of
%% every loaded application; see module_attributes_from_apps/2 for
%% the shape of the result.
all_module_attributes(Name) ->
    module_attributes_from_apps(
      Name,
      [App || {App, _Description, _Vsn} <- application:loaded_applications()]).
929
%% Like all_module_attributes/1, but restricted to the applications
%% returned by rabbitmq_related_apps/0.
rabbitmq_related_module_attributes(Name) ->
    module_attributes_from_apps(Name, rabbitmq_related_apps()).
933
%% Returns the loaded applications that are RabbitMQ-related: the
%% rabbit_common, rabbitmq_prelaunch and rabbit applications
%% themselves, plus every loaded application that lists rabbit in its
%% `applications` key.
rabbitmq_related_apps() ->
    IsRelated =
        fun (rabbit_common)      -> true;
            (rabbitmq_prelaunch) -> true;
            (rabbit)             -> true;
            (App) ->
                lists:member(
                  rabbit,
                  element(2, application:get_key(App, applications)))
        end,
    [App || {App, _, _} <- application:loaded_applications(),
            IsRelated(App)].
944
%% For every module of every application in Apps (as listed under the
%% application's `modules` key), gathers the values of all module
%% attributes called Name. Returns a list of {App, Module, Values}
%% tuples for the modules that have at least one such value.
%% Applications for which the `modules` key cannot be read are skipped.
module_attributes_from_apps(Name, Apps) ->
    Targets =
        lists:usort(
          [{App, Module} ||
              App           <- Apps,
              {ok, Modules} <- [application:get_key(App, modules)],
              Module        <- Modules]),
    %% The result is reversed so that it comes out in descending
    %% {App, Module} order.
    lists:reverse(
      [{App, Module, Values} ||
          {App, Module} <- Targets,
          Values <- [lists:append([Atts || {N, Atts} <- module_attributes(Module),
                                           N =:= Name])],
          Values =/= []]).
960
%% Builds an acyclic digraph out of Graph, a list of arbitrary
%% elements. VertexFun(Elem) returns the {Vertex, Label} pairs to add
%% for an element and EdgeFun(Elem) the {From, To} edges. All vertices
%% are added before any edge.
%%
%% Returns {ok, Digraph} on success; the digraph is not deleted here,
%% so the caller is responsible for calling digraph:delete/1 on it.
%% Returns {error, {vertex, duplicate, Vertex}} if the same vertex is
%% produced twice, and {error, {edge, Reason, From, To}} if
%% digraph:add_edge/3 rejects an edge.
%%
%% The digraph is deleted before an error is returned. It is also
%% deleted if VertexFun or EdgeFun raises; that exception is then
%% re-raised unchanged.
build_acyclic_graph(VertexFun, EdgeFun, Graph) ->
    G = digraph:new([acyclic]),
    try
        _ = [case digraph:vertex(G, Vertex) of
                 false -> digraph:add_vertex(G, Vertex, Label);
                 _     -> throw({graph_error, {vertex, duplicate, Vertex}})
             end || GraphElem       <- Graph,
                    {Vertex, Label} <- VertexFun(GraphElem)],
        _ = [case digraph:add_edge(G, From, To) of
                 {error, E} -> throw({graph_error, {edge, E, From, To}});
                 _          -> ok
             end || GraphElem  <- Graph,
                    {From, To} <- EdgeFun(GraphElem)],
        {ok, G}
    catch
        throw:{graph_error, Reason} ->
            true = digraph:delete(G),
            {error, Reason};
        Class:Error:Stacktrace ->
            %% Do not leak the digraph (and its ETS tables) when one
            %% of the callbacks fails.
            true = digraph:delete(G),
            erlang:raise(Class, Error, Stacktrace)
    end.
979
%% Returns a zero-arity fun that always evaluates to Value.
const(Value) ->
    fun () -> Value end.
981
%% Formats an IP address tuple as a string. IPv4-mapped IPv6 addresses
%% (::ffff:a.b.c.d) are formatted as plain IPv4, since they're what we
%% see when IPv6 is enabled but not used (i.e. 99% of the time).
ntoa({0,0,0,0,0,16#ffff,High,Low}) ->
    %% Each 16-bit group carries two octets of the IPv4 address.
    A = High bsr 8,
    B = High rem 256,
    C = Low bsr 8,
    D = Low rem 256,
    inet_parse:ntoa({A, B, C, D});
ntoa(IP) ->
    inet_parse:ntoa(IP).
988
%% Like ntoa/1, but wraps the result in square brackets when it
%% contains a colon (i.e. when it is formatted as an IPv6 address).
ntoab(IP) ->
    Str = ntoa(IP),
    case lists:member($:, Str) of
        true  -> "[" ++ Str ++ "]";
        false -> Str
    end.
995
%% Tells whether the process Pid is alive, whichever node it lives on.
%%
%% We try to avoid reconnecting to down nodes here; this is used in a
%% loop in rabbit_amqqueue:on_node_down/1 and any delays we incur
%% would be bad news. A remote node is therefore only asked when it is
%% already connected; otherwise the answer is false.
%%
%% See also rabbit_mnesia:is_process_alive/1 which also requires the
%% process be in the same running cluster as us (i.e. not partitioned
%% or some random node).
is_process_alive(Pid) ->
    case node(Pid) of
        Local when Local =:= node() ->
            erlang:is_process_alive(Pid);
        Remote ->
            lists:member(Remote, [node() | nodes(connected)]) andalso
                rpc:call(Remote, erlang, is_process_alive, [Pid]) =:= true
    end.
1009
%% Looks up key K in a map or in a list of {Key, Value} pairs and
%% returns the associated value, or undefined when there is none.
-spec pget(term(), list() | map()) -> term().
pget(K, M) when is_map(M) ->
    maps:get(K, M, undefined);

pget(K, P) ->
    case lists:keyfind(K, 1, P) of
        {K, V} -> V;
        _      -> undefined
    end.
1026
%% Looks up key K in a map or in a list of {Key, Value} pairs and
%% returns the associated value, or the default D when there is none.
-spec pget(term(), list() | map(), term()) -> term().
pget(K, M, D) when is_map(M) ->
    maps:get(K, M, D);

pget(K, P, D) ->
    case lists:keyfind(K, 1, P) of
        {K, V} -> V;
        _      -> D
    end.
1043
%% Looks up key K in a map or a property list and returns its value.
%% If the key is missing, the calling process exits with
%% {error, key_missing, K}. For property lists the lookup is done with
%% proplists:get_value/2, so a stored value of undefined is treated
%% like a missing key.
-spec pget_or_die(term(), list() | map()) -> term() | no_return().
pget_or_die(K, M) when is_map(M) ->
    case M of
        #{K := V} -> V;
        _         -> exit({error, key_missing, K})
    end;

pget_or_die(K, P) ->
    case proplists:get_value(K, P) of
        V when V =/= undefined -> V;
        undefined              -> exit({error, key_missing, K})
    end.
1056
%% Replaces the value V stored under key K in the list P with
%% UpdateFun(V), using pset/3. Returns the atom undefined (not the
%% list) when P has no {K, V} pair.
pupdate(K, UpdateFun, P) ->
    case lists:keyfind(K, 1, P) of
        {K, Current} ->
            Updated = UpdateFun(Current),
            pset(K, Updated, P);
        _ ->
            undefined
    end.
1064
%% Property merge: adds {Key, Val} to the front of List unless Key is
%% already defined in it (as per proplists:is_defined/2), in which
%% case List is returned unchanged.
pmerge(Key, Val, List) ->
    case proplists:is_defined(Key, List) of
        false -> [{Key, Val} | List];
        _     -> List
    end.
1071
%% Merges two lists of {Key, Value} pairs into one with a single entry
%% per key. When a key occurs in both, the value from P1 suppresses
%% the value from P2. The result is whatever maps:to_list/1 produces
%% for the merged map.
plmerge(P1, P2) ->
    Preferred = maps:from_list(P1),
    Fallback  = maps:from_list(P2),
    maps:to_list(maps:merge(Fallback, Preferred)).
1077
%% Groups a list of proplists by a key function: proplists for which
%% KeyFun returns the same key end up in the same group. Returns the
%% list of groups (without their keys). Within a group, proplists
%% appear in reverse input order, because each one is prepended.
group_proplists_by(KeyFun, ListOfPropLists) ->
    Groups = lists:foldl(
               fun (PropList, Acc) ->
                       maps:update_with(
                         KeyFun(PropList),
                         fun (Members) -> [PropList | Members] end,
                         [PropList], Acc)
               end, #{}, ListOfPropLists),
    [Members || {_Key, Members} <- maps:to_list(Groups)].
1089
%% Sets Key to Value in a property list: every existing entry for Key
%% is removed and {Key, Value} is put at the front.
pset(Key, Value, List) ->
    WithoutKey = proplists:delete(Key, List),
    [{Key, Value} | WithoutKey].
1091
%% Formats a priority_queue of messages as {Length, Contents}. Up to
%% 100 entries, Contents is the full priority_queue:to_list/1 result.
%% Above that, it is {summary, Counts}, where Counts lists
%% {{Priority, Shape}, Occurrences} pairs and Shape is the entry
%% reduced by format_message_queue_entry/1. The first argument is
%% ignored.
format_message_queue(_Opt, MQ) ->
    Len = priority_queue:len(MQ),
    Contents =
        if
            Len > 100 ->
                Counts =
                    lists:foldl(
                      fun ({P, V}, Acc) ->
                              Key = {P, format_message_queue_entry(V)},
                              maps:put(Key, maps:get(Key, Acc, 0) + 1, Acc)
                      end, #{}, priority_queue:to_list(MQ)),
                {summary, maps:to_list(Counts)};
            true ->
                priority_queue:to_list(MQ)
        end,
    {Len, Contents}.
1106
%% Reduces a term to its "shape": atoms are kept, tuples are processed
%% element by element (recursively), and anything else is replaced by
%% the atom '_'.
format_message_queue_entry(V) ->
    if
        is_atom(V) ->
            V;
        is_tuple(V) ->
            list_to_tuple(lists:map(fun format_message_queue_entry/1,
                                    tuple_to_list(V)));
        true ->
            '_'
    end.
1113
%% Same as rpc:multicall/4 but concatenates all results.
%% M, F, A is expected to return a list. If it does not,
%% its return value will be wrapped in a list.
%% The call is made with ?RPC_INFINITE_TIMEOUT as its timeout.
-spec append_rpc_all_nodes([node()], atom(), atom(), [any()]) -> [any()].
append_rpc_all_nodes(Nodes, M, F, A) ->
    Timeout = ?RPC_INFINITE_TIMEOUT,
    do_append_rpc_all_nodes(Nodes, M, F, A, Timeout).
1120
%% Variant of append_rpc_all_nodes/4 with an explicit timeout; the
%% work is delegated to do_append_rpc_all_nodes/5.
-spec append_rpc_all_nodes([node()], atom(), atom(), [any()], timeout()) -> [any()].
append_rpc_all_nodes(Nodes, Module, Function, Args, Timeout) ->
    do_append_rpc_all_nodes(Nodes, Module, Function, Args, Timeout).
1124
%% Runs rpc:multicall/5 on Nodes and hands the list of replies to
%% process_rpc_multicall_result/1; the list of bad nodes is ignored.
%% With a timeout other than ?RPC_INFINITE_TIMEOUT, an
%% error:internal_error raised by the multicall is treated as "no
%% replies at all".
do_append_rpc_all_nodes(Nodes, M, F, A, Timeout)
  when Timeout =:= (?RPC_INFINITE_TIMEOUT) ->
    {Replies, _BadNodes} = rpc:multicall(Nodes, M, F, A, Timeout),
    process_rpc_multicall_result(Replies);
do_append_rpc_all_nodes(Nodes, M, F, A, Timeout) ->
    Replies = try rpc:multicall(Nodes, M, F, A, Timeout) of
                  {ResL, _BadNodes} -> ResL
              catch
                  error:internal_error -> []
              end,
    process_rpc_multicall_result(Replies).
1135
%% Concatenates a list of per-node results into one flat list:
%% {badrpc, _} results are dropped, list results are appended as they
%% are, and any other result is wrapped in a list first.
process_rpc_multicall_result(ResL) ->
    lists:flatmap(
      fun ({badrpc, _})         -> [];
          (Xs) when is_list(Xs) -> Xs;
          (Other)               -> [Other]
      end, ResL).
1143
%% Runs Command through os:cmd/1 and returns its output.
%% On Windows the command is prefixed with a space (Clink workaround;
%% see https://code.google.com/p/clink/issues/detail?id=141).
%% On other systems the executable (the first space-separated token of
%% Command) is looked up first and {command_not_found, Exec} is thrown
%% if it cannot be found, so that we don't just return
%% "/bin/sh: <cmd>: not found".
os_cmd(Command) ->
    case os:type() of
        {win32, _} ->
            os:cmd(" " ++ Command);
        _NotWindows ->
            Exec = hd(string:tokens(Command, " ")),
            Found = os:find_executable(Exec),
            if
                Found =:= false -> throw({command_not_found, Exec});
                true            -> os:cmd(Command)
            end
    end.
1158
%% Tells whether the operating system process Pid is alive.
%% On Unix, this is the case when the command run by run_ps/1 exits
%% with status 0. On Windows, tasklist.exe is used when available and
%% the process counts as alive if its output mentions erl.exe;
%% otherwise PowerShell is asked for the process name, which must be
%% "erl" or "werl". Other OS families are rejected by with_os/1.
is_os_process_alive(Pid) ->
    OnUnix = fun () -> run_ps(Pid) =:= 0 end,
    ViaPowerShell =
        fun (PidS) ->
                Cmd = format("PowerShell -Command "
                             "\"(Get-Process -Id ~s).ProcessName\"",
                             [PidS]),
                Name = os_cmd(Cmd ++ " 2>&1") -- [$\r, $\n],
                lists:member(Name, ["erl", "werl"])
        end,
    ViaTasklist =
        fun (PidS) ->
                Cmd = "tasklist /nh /fi "
                      "\"pid eq " ++ PidS ++ "\"",
                Output = os_cmd(Cmd ++ " 2>&1"),
                match =:= re:run(Output, "erl\\.exe", [{capture, none}])
        end,
    OnWin32 =
        fun () ->
                PidS = rabbit_data_coercion:to_list(Pid),
                case os:find_executable("tasklist.exe") of
                    false -> ViaPowerShell(PidS);
                    _     -> ViaTasklist(PidS)
                end
        end,
    with_os([{unix, OnUnix}, {win32, OnWin32}]).
1189
%% Runs the handler registered for the current OS family. Handlers is
%% a property list of {OsFamily, Fun} pairs, with OsFamily being the
%% first element of os:type(). Returns the result of the zero-arity
%% Fun, or throws {unsupported_os, OsFamily} if none is registered.
with_os(Handlers) ->
    OsFamily = element(1, {_, _} = os:type()),
    case proplists:get_value(OsFamily, Handlers) of
        undefined -> throw({unsupported_os, OsFamily});
        Handler   -> Handler()
    end.
1196
%% Runs "ps -p <Pid>" in a port and returns the exit status of the
%% command, as collected by exit_loop/1.
run_ps(Pid) ->
    PortOptions = [exit_status, {line, 16384},
                   use_stdio, stderr_to_stdout],
    Command = "ps -p " ++ rabbit_data_coercion:to_list(Pid),
    exit_loop(erlang:open_port({spawn, Command}, PortOptions)).
1203
%% Consumes messages sent by Port until its {exit_status, Rc} message
%% arrives, then returns Rc. Every other message from Port is
%% discarded. Blocks until the exit status is received.
exit_loop(Port) ->
    receive
        {Port, Msg} ->
            case Msg of
                {exit_status, Rc} -> Rc;
                _                 -> exit_loop(Port)
            end
    end.
1209
%% Returns S1 without the elements of S2, by deleting each element of
%% S2 from S1 (elements that are not in S1 are simply skipped).
gb_sets_difference(S1, S2) ->
    lists:foldl(fun gb_sets:delete_any/2, S1, gb_sets:to_list(S2)).
1212
%% Returns the version (`vsn` key) of the rabbit application. Fails
%% with a badmatch if that key cannot be read, e.g. because the
%% application is not loaded.
version() ->
    {ok, Vsn} = application:get_key(rabbit, vsn),
    Vsn.
1216
%% Returns the Erlang/OTP version as a string.
%% See https://www.erlang.org/doc/system_principles/versions.html
otp_release() ->
    Release = erlang:system_info(otp_release),
    VersionFile = filename:join([code:root_dir(), "releases",
                                 Release, "OTP_VERSION"]),
    case file:read_file(VersionFile) of
        {ok, VerBin} ->
            %% 17.0 or later, we need the file for the minor version
            string:strip(binary_to_list(VerBin), both, $\n);
        {error, _} ->
            %% R16B03 or earlier (no file, otp_release is correct)
            %% or we couldn't read the file (so this is best we can do)
            Release
    end.
1230
%% Returns "Erlang/OTP " followed by the result of otp_release/0.
platform_and_version() ->
    "Erlang/OTP" ++ " " ++ otp_release().
1233
%% Returns the emulator's system version string
%% (erlang:system_info(system_version)) with leading and trailing
%% newlines removed.
otp_system_version() ->
    SystemVersion = erlang:system_info(system_version),
    string:strip(SystemVersion, both, $\n).
1236
%% Returns {RabbitMQVersion, ErlangVersion}, as given by version/0 and
%% otp_release/0.
rabbitmq_and_erlang_versions() ->
    RabbitMQ = version(),
    Erlang = otp_release(),
    {RabbitMQ, Erlang}.
1239
%% Returns the running applications, or an empty list if the
%% application controller does not answer in time.
%%
%% application:which_applications(infinity) is dangerous, since it can
%% cause deadlocks on shutdown. So we have to use a timeout variant,
%% but w/o creating spurious timeout errors. The timeout value is twice
%% that of gen_server:call/2.
which_applications() ->
    try application:which_applications(10000) of
        Applications -> Applications
    catch
        exit:{timeout, _} -> []
    end.
1250
%% Scans a non-empty list of results and returns the first
%% {error, _} tuple found before the last element, or the last
%% element itself if there is none. An empty list fails with
%% function_clause.
sequence_error([_ | _] = Results) ->
    case Results of
        [Last]                  -> Last;
        [{error, _} = Error | _] -> Error;
        [_ | Rest]              -> sequence_error(Rest)
    end.
1254
%% Validates an expiry value: returns {error, {value_negative, N}} if
%% N is below zero and ok otherwise.
check_expiry(N) ->
    case N < 0 of
        true  -> {error, {value_negative, N}};
        false -> ok
    end.
1257
%% Base64-encodes In (with base64:encode_to_string/1) and makes the
%% result URL-safe: '+' becomes '-', '/' becomes '_' and the '='
%% padding is dropped. Returns a string.
base64url(In) ->
    [case Chr of
         $\+ -> $\-;
         $\/ -> $\_;
         _   -> Chr
     end || Chr <- base64:encode_to_string(In), Chr =/= $\=].
1264
%% Runs M:F(A), times it, and suggests how long to wait before
%% running it again. Returns {Result, NextInterval}.
%%
%% Ideally, you'd want Fun to run every IdealInterval. but you don't
%% want it to take more than MaxRatio of IdealInterval. So if it takes
%% more then you want to run it less often. So we time how long it
%% takes to run, and then suggest how long you should wait before
%% running it again with a user specified max interval. Times are in millis.
%%
%%   * too slow for both the ideal and the last interval: back off to
%%     1.5 times the last interval, capped at MaxInterval;
%%   * too slow for the ideal interval only: keep the last interval;
%%   * fast enough for the ideal interval: shrink the last interval by
%%     a factor of 1.5, but never below IdealInterval. This includes
%%     the case where LastInterval is lower than IdealInterval.
interval_operation({M, F, A}, MaxRatio, MaxInterval, IdealInterval, LastInterval) ->
    {Micros, Res} = timer:tc(M, F, A),
    %% Intervals are in milliseconds, timer:tc/3 reports microseconds.
    TooSlowForIdeal = Micros > 1000 * (MaxRatio * IdealInterval),
    TooSlowForLast  = Micros > 1000 * (MaxRatio * LastInterval),
    NextInterval =
        case {TooSlowForIdeal, TooSlowForLast} of
            {true,  true}  -> lists:min([MaxInterval,
                                         round(LastInterval * 1.5)]);
            {true,  false} -> LastInterval;
            {false, _}     -> lists:max([IdealInterval,
                                         round(LastInterval / 1.5)])
        end,
    {Res, NextInterval}.
1280
%% Makes sure a timer is set in element Idx of the tuple State. If
%% that element is undefined, a timer delivering Msg to the calling
%% process after After milliseconds is started with send_after/3 and
%% stored there; otherwise State is returned unchanged.
ensure_timer(State, Idx, After, Msg) ->
    case element(Idx, State) =:= undefined of
        true  -> setelement(Idx, State, send_after(After, self(), Msg));
        false -> State
    end.
1287
%% Cancels the timer stored in element Idx of the tuple State, if
%% any, with cancel_timer/1, and resets that element to undefined.
%% Returns the resulting state.
stop_timer(State, Idx) ->
    case element(Idx, State) =:= undefined of
        true ->
            State;
        false ->
            cancel_timer(element(Idx, State)),
            setelement(Idx, State, undefined)
    end.
1294
%% Sends Msg to Pid after Millis milliseconds and returns a tagged
%% timer reference suitable for cancel_timer/1.
%%
%% timer:send_after/3 goes through a single timer process but allows
%% long delays. erlang:send_after/3 does not have a bottleneck but
%% only allows max 2^32-1 millis. So the former is only used for
%% delays above that limit.
-define(MAX_ERLANG_SEND_AFTER, 4294967295).
send_after(Millis, Pid, Msg) ->
    case Millis > ?MAX_ERLANG_SEND_AFTER of
        true ->
            {ok, Ref} = timer:send_after(Millis, Pid, Msg),
            {timer, Ref};
        false ->
            {erlang, erlang:send_after(Millis, Pid, Msg)}
    end.
1304
%% Cancels a timer started with send_after/3. The tag of the
%% reference selects the API to use: erlang:cancel_timer/1 (whose
%% result is ignored) or timer:cancel/1 (which must succeed).
%% Returns ok.
cancel_timer({Kind, Ref}) when Kind =:= erlang; Kind =:= timer ->
    case Kind of
        erlang -> _ = erlang:cancel_timer(Ref);
        timer  -> {ok, cancel} = timer:cancel(Ref)
    end,
    ok.
1309
%% Records a {Type, ProcName} pair under the process_name key of the
%% calling process' dictionary. Returns the value previously stored
%% under that key (undefined if there was none).
store_proc_name(Type, ProcName) ->
    put(process_name, {Type, ProcName}).

%% Same as store_proc_name/2, for callers that already have the
%% {Type, ProcName} pair.
store_proc_name(TypeProcName) ->
    put(process_name, TypeProcName).
1312
%% Reads the process_name entry of the calling process' dictionary
%% (see store_proc_name/1,2). Returns {ok, Name} for a stored
%% {Type, Name} pair, or undefined if nothing was stored.
get_proc_name() ->
    Stored = get(process_name),
    case Stored of
        undefined     -> undefined;
        {_Type, Name} -> {ok, Name}
    end.
1320
%% Returns the value of the configuration parameter Key of
%% Application, or Def if it is not set. This is what
%% application:get_env/3 (available from R16B01 onwards) does.
get_env(Application, Key, Def) ->
    application:get_env(Application, Key, Def).
1327
%% Returns the channel operation timeout in milliseconds: the value of
%% the rabbit application's channel_operation_timeout parameter if it
%% is set, and a default derived from the net tick time otherwise.
get_channel_operation_timeout() ->
    case application:get_env(rabbit, channel_operation_timeout) of
        {ok, Timeout} ->
            Timeout;
        undefined ->
            %% Default channel_operation_timeout set to net_ticktime + 10s to
            %% give allowance for any down messages to be received first,
            %% whenever it is used for cross-node calls with timeouts.
            TickTime =
                case net_kernel:get_net_ticktime() of
                    %% A change of tick time is in progress: use the
                    %% value that is being switched to.
                    {ongoing_change_to, NewValue} ->
                        NewValue;
                    %% The local node is not alive: fall back to the
                    %% configured kernel value (60 seconds by default).
                    ignored ->
                        application:get_env(kernel, net_ticktime, 60);
                    Seconds ->
                        Seconds
                end,
            (TickTime + 10) * 1000
    end.
1334
%% Exponentially decaying moving average. Given the Time elapsed since
%% the last measurement, a HalfLife (in the same unit), the Next
%% sample and the Current average, returns the new average. When
%% there is no current average yet (undefined), Next is returned.
%%
%% We want the Weight to decrease as Time goes up (since Weight is the
%% weight for the current sample, not the new one), so that the moving
%% average decays at the same speed regardless of how long the time is
%% between samplings. So we want Weight = math:exp(Something), where
%% Something turns out to be negative.
%%
%% We want to determine Something here in terms of the Time taken
%% since the last measurement, and a HalfLife. So we want Weight =
%% math:exp(Time * Constant / HalfLife). What should Constant be? We
%% want Weight to be 0.5 when Time = HalfLife.
%%
%% Plug those numbers in and you get 0.5 = math:exp(Constant). Take
%% the log of each side and you get math:log(0.5) = Constant.
moving_average(Time, HalfLife, Next, Current) ->
    case Current of
        undefined ->
            Next;
        _ ->
            Weight = math:exp(Time * math:log(0.5) / HalfLife),
            Next * (1 - Weight) + Current * Weight
    end.
1353
%% Returns a random integer as produced by rand:uniform/1, i.e.
%% uniformly distributed between 1 and Max (inclusive).
random(Max) ->
    rand:uniform(Max).
1356
%% Escapes the characters '<', '>' and '&' in S and returns the result
%% as a binary. S is converted with rabbit_data_coercion:to_list/1
%% first.
-spec escape_html_tags(string()) -> binary().

escape_html_tags(S) ->
    Chars = rabbit_data_coercion:to_list(S),
    escape_html_tags(Chars, []).
1361
1362
%% Worker for escape_html_tags/1. Walks the input one character at a
%% time, pushing the (possibly escaped) characters onto Acc in reverse
%% order; at the end Acc is reversed and converted to a binary.
-spec escape_html_tags(string(), string()) -> binary().

escape_html_tags([], Acc) ->
    rabbit_data_coercion:to_binary(lists:reverse(Acc));
escape_html_tags([Chr | Rest], Acc) ->
    Escaped = case Chr of
                  $< -> "&lt;";
                  $> -> "&gt;";
                  $& -> "&amp;";
                  _  -> [Chr]
              end,
    %% lists:reverse/2 pushes Escaped onto Acc back to front.
    escape_html_tags(Rest, lists:reverse(Escaped, Acc)).
1375
%% Same as rpc_call/5 with ?RPC_INFINITE_TIMEOUT as the timeout.
%%
%% If the server we are talking to has non-standard net_ticktime, and
%% our connection lasts a while, we could get disconnected because of
%% a timeout unless we set our ticktime to be the same. So let's do
%% that.
%% TODO: do not use an infinite timeout!
-spec rpc_call(node(), atom(), atom(), [any()]) -> any() | {badrpc, term()}.
rpc_call(Node, Mod, Fun, Args) ->
    Timeout = ?RPC_INFINITE_TIMEOUT,
    rpc_call(Node, Mod, Fun, Args, Timeout).
1384
%% Calls Mod:Fun(Args) on Node with rpc:call/5. Before that, Node is
%% asked for its net tick time and, unless it answers `ignored`, the
%% local tick time is set to the same value (or to the value Node is
%% currently changing to). If the tick time cannot be fetched, that
%% {badrpc, _} error is returned and the actual call is not made.
-spec rpc_call(node(), atom(), atom(), [any()], infinity | non_neg_integer()) -> any() | {badrpc, term()}.
rpc_call(Node, Mod, Fun, Args, Timeout) ->
    case rpc:call(Node, net_kernel, get_net_ticktime, [], Timeout) of
        {badrpc, _} = E ->
            E;
        TickTimeReply ->
            case TickTimeReply of
                ignored ->
                    ok;
                {ongoing_change_to, NewValue} ->
                    _ = net_kernel:set_net_ticktime(NewValue, 0),
                    ok;
                Time ->
                    _ = net_kernel:set_net_ticktime(Time, 0),
                    ok
            end,
            rpc:call(Node, Mod, Fun, Args, Timeout)
    end.
1398
%% Returns garbage collection information about the process Pid, as
%% provided by rabbit_runtime:get_gc_info/1.
get_gc_info(Pid) ->
    rabbit_runtime:get_gc_info(Pid).
1401
1402%% -------------------------------------------------------------------------
1403%% Begin copypasta from gen_server2.erl
1404
%% Returns the pid of the calling process' parent, taken from the head
%% of the '$ancestors' entry of the process dictionary. A registered
%% name is resolved with name_to_pid/1. Exits with
%% process_was_not_started_by_proc_lib if there is no such entry.
get_parent() ->
    Ancestors = get('$ancestors'),
    if
        is_list(Ancestors), is_pid(hd(Ancestors)) ->
            hd(Ancestors);
        is_list(Ancestors), is_atom(hd(Ancestors)) ->
            name_to_pid(hd(Ancestors));
        true ->
            exit(process_was_not_started_by_proc_lib)
    end.
1411
%% Resolves a registered name: the local registry (whereis/1) is
%% tried first, then whereis_name/1. Exits with
%% could_not_find_registered_name if neither knows the name.
name_to_pid(Name) ->
    Local = whereis(Name),
    Found = case Local of
                undefined -> whereis_name(Name);
                _         -> Local
            end,
    case Found of
        undefined -> exit(could_not_find_registered_name);
        _         -> Found
    end.
1420
%% Returns the pid globally registered under Name, or undefined if
%% there is none.
%%
%% This used to read the global_names ETS table directly. That table
%% is private to the `global` module and the layout of its entries is
%% not stable across OTP releases, so a fixed tuple pattern can fail
%% with a case_clause error. global:whereis_name/1 is the public API
%% for this lookup.
whereis_name(Name) ->
    global:whereis_name(Name).
1432
1433%% End copypasta from gen_server2.erl
1434%% -------------------------------------------------------------------------
1435