1%% This Source Code Form is subject to the terms of the Mozilla Public 2%% License, v. 2.0. If a copy of the MPL was not distributed with this 3%% file, You can obtain one at https://mozilla.org/MPL/2.0/. 4%% 5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved. 6%% 7 8-module(rabbit_misc). 9 10-ignore_xref([{maps, get, 2}]). 11 12-include("rabbit.hrl"). 13-include("rabbit_framing.hrl"). 14-include("rabbit_misc.hrl"). 15 16-ifdef(TEST). 17-export([decompose_pid/1, compose_pid/4]). 18-endif. 19 20-export([method_record_type/1, polite_pause/0, polite_pause/1]). 21-export([die/1, frame_error/2, amqp_error/4, quit/1, 22 protocol_error/3, protocol_error/4, protocol_error/1]). 23-export([type_class/1, assert_args_equivalence/4, assert_field_equivalence/4]). 24-export([dirty_read/1]). 25-export([table_lookup/2, set_table_value/4, amqp_table/1, to_amqp_table/1]). 26-export([r/3, r/2, r_arg/4, rs/1]). 27-export([enable_cover/0, report_cover/0]). 28-export([enable_cover/1, report_cover/1]). 29-export([start_cover/1]). 30-export([throw_on_error/2, with_exit_handler/2, is_abnormal_exit/1, 31 filter_exit_map/2]). 32-export([with_user/2]). 33-export([execute_mnesia_transaction/1]). 34-export([execute_mnesia_transaction/2]). 35-export([execute_mnesia_tx_with_tail/1]). 36-export([ensure_ok/2]). 37-export([tcp_name/3, format_inet_error/1]). 38-export([upmap/2, map_in_order/2, utf8_safe/1]). 39-export([table_filter/3]). 40-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). 41-export([format/2, format_many/1, format_stderr/2]). 42-export([unfold/2, ceil/1, queue_fold/3]). 43-export([sort_field_table/1]). 44-export([atom_to_binary/1, parse_bool/1, parse_int/1]). 45-export([pid_to_string/1, string_to_pid/1, 46 pid_change_node/2, node_to_fake_pid/1]). 47-export([version_compare/2, version_compare/3]). 48-export([version_minor_equivalent/2, strict_version_minor_equivalent/2]). 
49-export([dict_cons/3, orddict_cons/3, maps_cons/3, gb_trees_cons/3]). 50-export([gb_trees_fold/3, gb_trees_foreach/2]). 51-export([all_module_attributes/1, 52 rabbitmq_related_module_attributes/1, 53 module_attributes_from_apps/2, 54 build_acyclic_graph/3]). 55-export([const/1]). 56-export([ntoa/1, ntoab/1]). 57-export([is_process_alive/1]). 58-export([pget/2, pget/3, pupdate/3, pget_or_die/2, pmerge/3, pset/3, plmerge/2]). 59-export([format_message_queue/2]). 60-export([append_rpc_all_nodes/4, append_rpc_all_nodes/5]). 61-export([os_cmd/1]). 62-export([is_os_process_alive/1]). 63-export([gb_sets_difference/2]). 64-export([version/0, otp_release/0, platform_and_version/0, otp_system_version/0, 65 rabbitmq_and_erlang_versions/0, which_applications/0]). 66-export([sequence_error/1]). 67-export([check_expiry/1]). 68-export([base64url/1]). 69-export([interval_operation/5]). 70-export([ensure_timer/4, stop_timer/2, send_after/3, cancel_timer/1]). 71-export([get_parent/0]). 72-export([store_proc_name/1, store_proc_name/2, get_proc_name/0]). 73-export([moving_average/4]). 74-export([escape_html_tags/1, b64decode_or_throw/1]). 75-export([get_env/3]). 76-export([get_channel_operation_timeout/0]). 77-export([random/1]). 78-export([rpc_call/4, rpc_call/5]). 79-export([get_gc_info/1]). 80-export([group_proplists_by/2]). 81 82%% Horrible macro to use in guards 83-define(IS_BENIGN_EXIT(R), 84 R =:= noproc; R =:= noconnection; R =:= nodedown; R =:= normal; 85 R =:= shutdown). 86 87%%---------------------------------------------------------------------------- 88 89-export_type([resource_name/0, thunk/1, channel_or_connection_exit/0]). 90 91-type ok_or_error() :: rabbit_types:ok_or_error(any()). 92-type thunk(T) :: fun(() -> T). 93-type resource_name() :: binary(). 94-type channel_or_connection_exit() 95 :: rabbit_types:channel_exit() | rabbit_types:connection_exit(). 96-type digraph_label() :: term(). 
97-type graph_vertex_fun() :: 98 fun (({atom(), [term()]}) -> [{digraph:vertex(), digraph_label()}]). 99-type graph_edge_fun() :: 100 fun (({atom(), [term()]}) -> [{digraph:vertex(), digraph:vertex()}]). 101-type tref() :: {'erlang', reference()} | {timer, timer:tref()}. 102 103-spec method_record_type(rabbit_framing:amqp_method_record()) -> 104 rabbit_framing:amqp_method_name(). 105-spec polite_pause() -> 'done'. 106-spec polite_pause(non_neg_integer()) -> 'done'. 107-spec die(rabbit_framing:amqp_exception()) -> channel_or_connection_exit(). 108 109-spec quit(integer()) -> no_return(). 110 111-spec frame_error(rabbit_framing:amqp_method_name(), binary()) -> 112 rabbit_types:connection_exit(). 113-spec amqp_error 114 (rabbit_framing:amqp_exception(), string(), [any()], 115 rabbit_framing:amqp_method_name()) -> 116 rabbit_types:amqp_error(). 117-spec protocol_error(rabbit_framing:amqp_exception(), string(), [any()]) -> 118 channel_or_connection_exit(). 119-spec protocol_error 120 (rabbit_framing:amqp_exception(), string(), [any()], 121 rabbit_framing:amqp_method_name()) -> 122 channel_or_connection_exit(). 123-spec protocol_error(rabbit_types:amqp_error()) -> 124 channel_or_connection_exit(). 125-spec type_class(rabbit_framing:amqp_field_type()) -> atom(). 126-spec assert_args_equivalence 127 (rabbit_framing:amqp_table(), rabbit_framing:amqp_table(), 128 rabbit_types:r(any()), [binary()]) -> 129 'ok' | rabbit_types:connection_exit(). 130-spec assert_field_equivalence 131 (any(), any(), rabbit_types:r(any()), atom() | binary()) -> 132 'ok' | rabbit_types:connection_exit(). 133-spec equivalence_fail 134 (any(), any(), rabbit_types:r(any()), atom() | binary()) -> 135 rabbit_types:connection_exit(). 136-spec dirty_read({atom(), any()}) -> 137 rabbit_types:ok_or_error2(any(), 'not_found'). 138-spec table_lookup(rabbit_framing:amqp_table(), binary()) -> 139 'undefined' | {rabbit_framing:amqp_field_type(), any()}. 
140-spec set_table_value 141 (rabbit_framing:amqp_table(), binary(), rabbit_framing:amqp_field_type(), 142 rabbit_framing:amqp_value()) -> 143 rabbit_framing:amqp_table(). 144-spec r(rabbit_types:vhost(), K) -> 145 rabbit_types:r3(rabbit_types:vhost(), K, '_') 146 when is_subtype(K, atom()). 147-spec r(rabbit_types:vhost() | rabbit_types:r(atom()), K, resource_name()) -> 148 rabbit_types:r3(rabbit_types:vhost(), K, resource_name()) 149 when is_subtype(K, atom()). 150-spec r_arg 151 (rabbit_types:vhost() | rabbit_types:r(atom()), K, 152 rabbit_framing:amqp_table(), binary()) -> 153 undefined | 154 rabbit_types:error( 155 {invalid_type, rabbit_framing:amqp_field_type()}) | 156 rabbit_types:r(K) when is_subtype(K, atom()). 157-spec rs(rabbit_types:r(atom())) -> string(). 158-spec enable_cover() -> ok_or_error(). 159-spec start_cover([{string(), string()} | string()]) -> 'ok'. 160-spec report_cover() -> 'ok'. 161-spec enable_cover([file:filename() | atom()]) -> ok_or_error(). 162-spec report_cover([file:filename() | atom()]) -> 'ok'. 163-spec throw_on_error 164 (atom(), thunk(rabbit_types:error(any()) | {ok, A} | A)) -> A. 165-spec with_exit_handler(thunk(A), thunk(A)) -> A. 166-spec is_abnormal_exit(any()) -> boolean(). 167-spec filter_exit_map(fun ((A) -> B), [A]) -> [B]. 168-spec with_user(rabbit_types:username(), thunk(A)) -> A. 169-spec execute_mnesia_transaction(thunk(A)) -> A. 170-spec execute_mnesia_transaction(thunk(A), fun ((A, boolean()) -> B)) -> B. 171-spec execute_mnesia_tx_with_tail 172 (thunk(fun ((boolean()) -> B))) -> B | (fun ((boolean()) -> B)). 173-spec ensure_ok(ok_or_error(), atom()) -> 'ok'. 174-spec tcp_name(atom(), inet:ip_address(), rabbit_net:ip_port()) -> 175 atom(). 176-spec format_inet_error(atom()) -> string(). 177-spec upmap(fun ((A) -> B), [A]) -> [B]. 178-spec map_in_order(fun ((A) -> B), [A]) -> [B]. 179-spec table_filter 180 (fun ((A) -> boolean()), fun ((A, boolean()) -> 'ok'), atom()) -> [A]. 
181-spec dirty_read_all(atom()) -> [any()]. 182-spec dirty_foreach_key(fun ((any()) -> any()), atom()) -> 183 'ok' | 'aborted'. 184-spec dirty_dump_log(file:filename()) -> ok_or_error(). 185-spec format(string(), [any()]) -> string(). 186-spec format_many([{string(), [any()]}]) -> string(). 187-spec format_stderr(string(), [any()]) -> 'ok'. 188-spec unfold (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}. 189-spec ceil(number()) -> integer(). 190-spec queue_fold(fun ((any(), B) -> B), B, queue:queue()) -> B. 191-spec sort_field_table(rabbit_framing:amqp_table()) -> 192 rabbit_framing:amqp_table(). 193-spec pid_to_string(pid()) -> string(). 194-spec string_to_pid(string()) -> pid(). 195-spec pid_change_node(pid(), node()) -> pid(). 196-spec node_to_fake_pid(atom()) -> pid(). 197-spec version_compare(string(), string()) -> 'lt' | 'eq' | 'gt'. 198-spec version_compare 199 (rabbit_semver:version_string(), rabbit_semver:version_string(), 200 ('lt' | 'lte' | 'eq' | 'gte' | 'gt')) -> boolean(). 201-spec version_minor_equivalent(rabbit_semver:version_string(), rabbit_semver:version_string()) -> boolean(). 202-spec dict_cons(any(), any(), dict:dict()) -> dict:dict(). 203-spec orddict_cons(any(), any(), orddict:orddict()) -> orddict:orddict(). 204-spec gb_trees_cons(any(), any(), gb_trees:tree()) -> gb_trees:tree(). 205-spec gb_trees_fold(fun ((any(), any(), A) -> A), A, gb_trees:tree()) -> A. 206-spec gb_trees_foreach(fun ((any(), any()) -> any()), gb_trees:tree()) -> 207 'ok'. 208-spec all_module_attributes(atom()) -> [{atom(), atom(), [term()]}]. 209-spec build_acyclic_graph 210 (graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}]) -> 211 rabbit_types:ok_or_error2( 212 digraph:graph(), 213 {'vertex', 'duplicate', digraph:vertex()} | 214 {'edge', 215 ({bad_vertex, digraph:vertex()} | 216 {bad_edge, [digraph:vertex()]}), 217 digraph:vertex(), digraph:vertex()}). 218-spec const(A) -> thunk(A). 219-spec ntoa(inet:ip_address()) -> string(). 
220-spec ntoab(inet:ip_address()) -> string(). 221-spec is_process_alive(pid()) -> boolean(). 222 223-spec pmerge(term(), term(), [term()]) -> [term()]. 224-spec plmerge([term()], [term()]) -> [term()]. 225-spec pset(term(), term(), [term()]) -> [term()]. 226-spec format_message_queue(any(), priority_queue:q()) -> term(). 227-spec os_cmd(string()) -> string(). 228-spec is_os_process_alive(non_neg_integer()) -> boolean(). 229-spec gb_sets_difference(gb_sets:set(), gb_sets:set()) -> gb_sets:set(). 230-spec version() -> string(). 231-spec otp_release() -> string(). 232-spec otp_system_version() -> string(). 233-spec platform_and_version() -> string(). 234-spec rabbitmq_and_erlang_versions() -> {string(), string()}. 235-spec which_applications() -> [{atom(), string(), string()}]. 236-spec sequence_error([({'error', any()} | any())]) -> 237 {'error', any()} | any(). 238-spec check_expiry(integer()) -> rabbit_types:ok_or_error(any()). 239-spec base64url(binary()) -> string(). 240-spec interval_operation 241 ({atom(), atom(), any()}, float(), non_neg_integer(), non_neg_integer(), 242 non_neg_integer()) -> 243 {any(), non_neg_integer()}. 244-spec ensure_timer(A, non_neg_integer(), non_neg_integer(), any()) -> A. 245-spec stop_timer(A, non_neg_integer()) -> A. 246-spec send_after(non_neg_integer(), pid(), any()) -> tref(). 247-spec cancel_timer(tref()) -> 'ok'. 248-spec get_parent() -> pid(). 249-spec store_proc_name(atom(), rabbit_types:proc_name()) -> ok. 250-spec store_proc_name(rabbit_types:proc_type_and_name()) -> ok. 251-spec get_proc_name() -> rabbit_types:proc_name(). 252-spec moving_average(float(), float(), float(), float() | 'undefined') -> 253 float(). 254-spec get_env(atom(), atom(), term()) -> term(). 255-spec get_channel_operation_timeout() -> non_neg_integer(). 256-spec random(non_neg_integer()) -> non_neg_integer(). 257-spec get_gc_info(pid()) -> [any()]. 
-spec group_proplists_by(fun((proplists:proplist()) -> any()),
                         list(proplists:proplist())) -> list(list(proplists:proplist())).


%%----------------------------------------------------------------------------

%% Returns the first element of the given record tuple, i.e. its tag.
method_record_type(MethodRecord) ->
    element(1, MethodRecord).

%% Same as polite_pause(3000).
polite_pause() ->
    polite_pause(3000).

%% Blocks the caller for Millis milliseconds and then returns 'done'.
%% The receive has no message clauses, so the mailbox is left untouched.
polite_pause(Millis) ->
    receive
    after Millis ->
            done
    end.

%% Exits with an #amqp_error{} named Error whose explanation is the
%% printed form of Error and whose method is 'none'.
die(Error) ->
    protocol_error(Error, "~w", [Error], none).

%% Exits with a frame_error for a method whose fields failed to decode.
frame_error(MethodName, BinaryFields) ->
    protocol_error(frame_error, "cannot decode ~w", [BinaryFields], MethodName).

%% Builds (but does not raise) an #amqp_error{}; the explanation is
%% rendered eagerly from the format string and its parameters.
amqp_error(Name, ExplanationFormat, Params, Method) ->
    #amqp_error{name        = Name,
                explanation = format(ExplanationFormat, Params),
                method      = Method}.

%% protocol_error/3: as protocol_error/4 with the method set to 'none'.
protocol_error(Name, ExplanationFormat, Params) ->
    protocol_error(Name, ExplanationFormat, Params, none).

%% protocol_error/4: builds the #amqp_error{} and exits with it.
protocol_error(Name, ExplanationFormat, Params, Method) ->
    AmqpError = amqp_error(Name, ExplanationFormat, Params, Method),
    protocol_error(AmqpError).

%% protocol_error/1: exits the calling process with the given
%% #amqp_error{} as the exit reason.
protocol_error(#amqp_error{} = Error) ->
    exit(Error).

%% Maps an AMQP field type onto a coarser class so that numerically
%% compatible types compare as equal: every integer-like type (and
%% 'decimal') is 'int', 'float' and 'double' are 'float', and any other
%% type is its own class.
type_class(Type) when Type =:= byte;
                      Type =:= short;
                      Type =:= signedint;
                      Type =:= long;
                      Type =:= decimal;
                      Type =:= unsignedbyte;
                      Type =:= unsignedshort;
                      Type =:= unsignedint ->
    int;
type_class(Type) when Type =:= float;
                      Type =:= double ->
    float;
type_class(Other) ->
    Other.

%% Checks, key by key, that the two argument tables agree on every key
%% in Keys. Returns 'ok'; a mismatch surfaces as an exit raised by the
%% per-key check.
assert_args_equivalence(Orig, New, Name, Keys) ->
    lists:foreach(
      fun (Key) -> assert_args_equivalence1(Orig, New, Name, Key) end,
      Keys),
    ok.

%% Compares the entries stored under Key in both tables.
%%
%% Identical lookups (including both being 'undefined') are accepted
%% outright. When both entries exist they are accepted if their type
%% classes match and their values compare equal; otherwise the decision
%% is delegated to assert_field_equivalence/4.
%%
%% NOTE(review): on the type-class-mismatch path only the bare values
%% are forwarded, so two entries with identical values but different
%% type classes are accepted by the first clause of
%% assert_field_equivalence/4 -- confirm this is intended.
assert_args_equivalence1(Orig, New, Name, Key) ->
    case {table_lookup(Orig, Key), table_lookup(New, Key)} of
        {Same, Same} ->
            ok;
        {{OrigType, OrigVal}, {NewType, NewVal}} ->
            SameClass = type_class(OrigType) == type_class(NewType),
            case SameClass andalso OrigVal == NewVal of
                true  -> ok;
                false -> assert_field_equivalence(OrigVal, NewVal, Name, Key)
            end;
        {OrigEntry, NewEntry} ->
            assert_field_equivalence(OrigEntry, NewEntry, Name, Key)
    end.

%% Classic queues do not necessarily have an x-queue-type field associated with them
%% so we special-case that scenario here
%%
%% Fixes rabbitmq/rabbitmq-common#341
%%
assert_field_equivalence(Current, Current, _Name, _Key) ->
    ok;
assert_field_equivalence(undefined, {longstr, <<"classic">>},
                         _Name, <<"x-queue-type">>) ->
    ok;
assert_field_equivalence({longstr, <<"classic">>}, undefined,
                         _Name, <<"x-queue-type">>) ->
    ok;
assert_field_equivalence(Orig, New, Name, Key) ->
    equivalence_fail(Orig, New, Name, Key).

%% Exits with a precondition_failed protocol error describing which
%% argument differs, the value received and the current value.
equivalence_fail(Orig, New, Name, Key) ->
    protocol_error(precondition_failed, "inequivalent arg '~s' "
                   "for ~s: received ~s but current is ~s",
                   [Key, rs(Name), val(New), val(Orig)]).

%% Renders a table entry for use in error messages. Binaries are printed
%% with ~s, every other term with ~p.
val(undefined) ->
    "none";
val({Type, Value}) when is_binary(Value) ->
    format("the value '~s' of type '~s'", [Value, Type]);
val({Type, Value}) ->
    format("the value '~p' of type '~s'", [Value, Type]);
val(Value) when is_binary(Value) ->
    format("'~s'", [Value]);
val(Value) ->
    format("'~p'", [Value]).

%% Normally we'd call mnesia:dirty_read/1 here, but that is quite
%% expensive due to general mnesia overheads (figuring out table types
%% and locations, etc).
%% We get away with bypassing these because we
%% know that the tables we are looking at here
%%   - are not the schema table
%%   - have a local ram copy
%%   - do not have any indices
dirty_read({Table, Key}) ->
    %% Reads straight from the ETS table; exactly one object must be
    %% stored under Key for the lookup to succeed.
    case ets:lookup(Table, Key) of
        [Record] -> {ok, Record};
        []       -> {error, not_found}
    end.

%%
%% Attribute Tables
%%

%% Looks up Key in a list of {Key, Type, Value} triples and returns
%% {Type, Value}, or 'undefined' when the key is absent.
table_lookup(Table, Key) ->
    case lists:keyfind(Key, 1, Table) of
        {_, TypeBin, ValueBin} -> {TypeBin, ValueBin};
        false                  -> undefined
    end.

%% Stores (or replaces) the entry for Key and returns the table sorted
%% by key.
set_table_value(Table, Key, Type, Value) ->
    Updated = lists:keystore(Key, 1, Table, {Key, Type, Value}),
    sort_field_table(Updated).

%% Converts a map into a list of {Key, Type, Value} rows, inferring each
%% type from the Erlang value. Lists are returned unchanged.
to_amqp_table(Map) when is_map(Map) ->
    Rows = maps:fold(fun (Key, Value, Acc) ->
                             [to_amqp_table_row(Key, Value) | Acc]
                     end, [], Map),
    lists:reverse(Rows);
to_amqp_table(List) when is_list(List) ->
    List.

to_amqp_table_row(Key, Value) ->
    {Type, Converted} = type_val(Value),
    {Key, Type, Converted}.

to_amqp_array(Items) ->
    [type_val(Item) || Item <- Items].

%% Infers the AMQP type tag for an Erlang term. Integers are tested
%% before general numbers so that only floats end up as 'double'.
%% 'null' and any unrecognised term are rejected with a throw.
type_val(Map) when is_map(Map)     -> {table,   to_amqp_table(Map)};
type_val(List) when is_list(List)  -> {array,   to_amqp_array(List)};
type_val(Bin) when is_binary(Bin)  -> {longstr, Bin};
type_val(Int) when is_integer(Int) -> {long,    Int};
type_val(Num) when is_number(Num)  -> {double,  Num};
type_val(Bool) when is_boolean(Bool) -> {bool,  Bool};
type_val(null)                     -> throw({error, null_not_allowed});
type_val(Other)                    -> throw({error, {unhandled_type, Other}}).

%% Converts a list of {Name, Type, Value} triples into a map from name
%% to converted value. 'unknown' is passed through and 'undefined' is
%% treated as an empty table.
%%
%% NOTE(review): the pattern #{} matches *any* map, not only the empty
%% one, so a non-empty map argument also yields #{} -- confirm that
%% this is intended.
amqp_table(unknown)   -> unknown;
amqp_table(undefined) -> amqp_table([]);
amqp_table([])        -> #{};
amqp_table(#{})       -> #{};
amqp_table(Table)     ->
    Pairs = [{Name, amqp_value(Type, Value)} || {Name, Type, Value} <- Table],
    maps:from_list(Pairs).

%% Converts a single typed AMQP value into a plain Erlang term: arrays
%% and tables are converted recursively, decimals become floats, and
%% binaries are passed through utf8_safe/1. Anything else is returned
%% unchanged.
amqp_value(array, Items) ->
    [amqp_value(Type, Value) || {Type, Value} <- Items];
amqp_value(table, Table) ->
    amqp_table(Table);
amqp_value(decimal, {Before, After}) ->
    erlang:list_to_float(format("~p.~p", [Before, After]));
amqp_value(_Type, Bin) when is_binary(Bin) ->
    utf8_safe(Bin);
amqp_value(_Type, Other) ->
    Other.


%%
%% Resources
%%

%% r/3: builds a #resource{}. The first argument is either a virtual
%% host or an existing resource whose virtual host is reused.
r(#resource{virtual_host = VHostPath}, Kind, Name) ->
    #resource{name = Name, kind = Kind, virtual_host = VHostPath};
r(VHostPath, Kind, Name) ->
    #resource{name = Name, kind = Kind, virtual_host = VHostPath}.

%% r/2: builds a #resource{} whose name is the wildcard atom '_'.
r(VHostPath, Kind) ->
    #resource{name = '_', kind = Kind, virtual_host = VHostPath}.

%% Builds a resource whose name is read from Key in Table. Returns
%% 'undefined' when the key is absent and {error, {invalid_type, Type}}
%% when the entry is not a longstr.
r_arg(#resource{virtual_host = VHostPath}, Kind, Table, Key) ->
    r_arg(VHostPath, Kind, Table, Key);
r_arg(VHostPath, Kind, Table, Key) ->
    case table_lookup(Table, Key) of
        undefined          -> undefined;
        {longstr, NameBin} -> r(VHostPath, Kind, NameBin);
        {Type, _}          -> {error, {invalid_type, Type}}
    end.

%% Renders a resource as a human-readable string. Resources of kind
%% 'topic' are printed without the kind prefix.
rs(#resource{kind = topic, name = Name, virtual_host = VHostPath}) ->
    format("'~s' in vhost '~s'", [Name, VHostPath]);
rs(#resource{kind = Kind, name = Name, virtual_host = VHostPath}) ->
    format("~s '~s' in vhost '~s'", [Kind, Name, VHostPath]).

enable_cover() -> enable_cover(["."]).

%% Cover-compiles the "ebin" directory under each entry of the list,
%% stopping at (and returning) the first {error, _}.
enable_cover([]) ->
    ok;
enable_cover([Dir | Dirs]) ->
    EbinDir = filename:join(lists:concat([Dir]), "ebin"),
    case cover:compile_beam_directory(EbinDir) of
        {error, _} = Err -> Err;
        _                -> enable_cover(Dirs)
    end.

%% Starts cover on the nodes named by NodesS.
start_cover(NodesS) ->
    Nodes = [rabbit_nodes_common:make(N) || N <- NodesS],
    {ok, _} = cover:start(Nodes),
    ok.

report_cover() -> report_cover(["."]).

%% Writes a coverage report for each directory in turn.
report_cover(Dirs) ->
    lists:foreach(fun (Dir) -> report_cover1(lists:concat([Dir])) end, Dirs),
    ok.

%% Writes the coverage report below Root/cover: old *.html files are
%% deleted, one HTML file per cover-compiled module is generated, and
%% summary.txt receives one percentage line per module plus a TOTAL.
report_cover1(Root) ->
    Dir = filename:join(Root, "cover"),
    ok = filelib:ensure_dir(filename:join(Dir, "junk")),
    StaleReports = filelib:wildcard(filename:join(Dir, "*.html")),
    lists:foreach(fun (F) -> file:delete(F) end, StaleReports),
    {ok, SummaryFile} = file:open(filename:join(Dir, "summary.txt"), [write]),
    ReportModule =
        fun (M, {CovTot, NotCovTot}) ->
                {ok, {M, {Cov, NotCov}}} = cover:analyze(M, module),
                ok = report_coverage_percentage(SummaryFile, Cov, NotCov, M),
                HtmlFile = filename:join(Dir, atom_to_list(M) ++ ".html"),
                {ok, _} = cover:analyze_to_file(M, HtmlFile, [html]),
                {CovTot + Cov, NotCovTot + NotCov}
        end,
    {CT, NCT} = lists:foldl(ReportModule, {0, 0}, lists:sort(cover:modules())),
    ok = report_coverage_percentage(SummaryFile, CT, NCT, 'TOTAL'),
    ok = file:close(SummaryFile),
    ok.

%% Writes one "<percentage> <module>" line. When no lines were counted
%% at all the percentage is reported as 100.0.
report_coverage_percentage(File, Cov, NotCov, Mod) ->
    Percentage = if
                     Cov + NotCov > 0 -> 100.0 * Cov / (Cov + NotCov);
                     true             -> 100.0
                 end,
    io:fwrite(File, "~6.2f ~p~n", [Percentage, Mod]).

%% @doc Halts the emulator returning the given status code to the os.
%% On Windows this function will block indefinitely so as to give the io
%% subsystem time to flush stdout completely.
quit(Status) ->
    case os:type() of
        {win32, _} ->
            init:stop(Status),
            receive
            after infinity -> ok
            end;
        {unix, _} ->
            halt(Status)
    end.

%% Runs Thunk. An {error, Reason} result is thrown as {E, Reason}; an
%% {ok, Res} result is unwrapped; anything else is returned as is.
throw_on_error(E, Thunk) ->
    case Thunk() of
        {ok, Res}       -> Res;
        {error, Reason} -> throw({E, Reason});
        Res             -> Res
    end.

%% Runs Thunk. If it exits with {R, _} or {{R, _}, _} where R is one of
%% the benign reasons listed in ?IS_BENIGN_EXIT, Handler is run instead
%% and its result returned. Every other exception propagates.
with_exit_handler(Handler, Thunk) ->
    try
        Thunk()
    catch
        exit:{Reason, _} when ?IS_BENIGN_EXIT(Reason) ->
            Handler();
        exit:{{Reason, _}, _} when ?IS_BENIGN_EXIT(Reason) ->
            Handler()
    end.

%% false for a benign reason, bare or as the first element of a pair;
%% true for everything else.
is_abnormal_exit(Reason) when ?IS_BENIGN_EXIT(Reason)      -> false;
is_abnormal_exit({Reason, _}) when ?IS_BENIGN_EXIT(Reason) -> false;
is_abnormal_exit(_)                                        -> true.

%% Maps F over L, dropping the elements for which F exited with a
%% reason that with_exit_handler/2 handles. A fresh reference marks the
%% dropped elements so that no legitimate result can be mistaken for
%% the marker.
filter_exit_map(F, L) ->
    Marker = make_ref(),
    Results = [with_exit_handler(fun () -> Marker end,
                                 fun () -> F(Item) end) || Item <- L],
    [Result || Result <- Results, Result =/= Marker].


%% Returns a fun that reads the rabbit_user record for Username via
%% mnesia:read/1, aborts with {no_such_user, Username} if there is none,
%% and otherwise runs Thunk.
with_user(Username, Thunk) ->
    fun () ->
            case mnesia:read({rabbit_user, Username}) of
                [_User] -> Thunk();
                []      -> mnesia:abort({no_such_user, Username})
            end
    end.

execute_mnesia_transaction(TxFun) ->
    %% Making this a sync_transaction allows us to use dirty_read
    %% elsewhere and get a consistent result even when that read
    %% executes on a different node.
    RunTx =
        fun () ->
                case mnesia:is_transaction() of
                    true ->
                        mnesia:sync_transaction(TxFun);
                    false ->
                        %% Compare the dumper's log-write counter before
                        %% and after the transaction; a changed counter
                        %% is tagged with 'sync' so that the caller also
                        %% runs mnesia_sync:sync/0.
                        WritesBefore = mnesia_dumper:get_log_writes(),
                        TxResult = mnesia:sync_transaction(TxFun),
                        WritesAfter = mnesia_dumper:get_log_writes(),
                        case WritesAfter == WritesBefore of
                            true ->
                                file_handle_cache_stats:update(mnesia_ram_tx),
                                TxResult;
                            false ->
                                file_handle_cache_stats:update(mnesia_disk_tx),
                                {sync, TxResult}
                        end
                end
        end,
    case worker_pool:submit(RunTx, single) of
        {sync, {atomic, Result}}  -> mnesia_sync:sync(), Result;
        {sync, {aborted, Reason}} -> throw({error, Reason});
        {atomic, Result}          -> Result;
        {aborted, Reason}         -> throw({error, Reason})
    end.

%% Like execute_mnesia_transaction/1 with additional Pre- and Post-
%% commit function
execute_mnesia_transaction(TxFun, PrePostCommitFun) ->
    case mnesia:is_transaction() of
        false -> ok;
        true  -> throw(unexpected_transaction)
    end,
    %% The hook is called with 'true' inside the transaction, and once
    %% more with 'false' on the value the transaction returned.
    WrappedTx = fun () ->
                        Result = TxFun(),
                        PrePostCommitFun(Result, true),
                        Result
                end,
    Committed = execute_mnesia_transaction(WrappedTx),
    PrePostCommitFun(Committed, false).

%% Like execute_mnesia_transaction/2, but TxFun is expected to return a
%% TailFun which gets called (only) immediately after the tx commit
execute_mnesia_tx_with_tail(TxFun) ->
    case mnesia:is_transaction() of
        false ->
            TailFun = execute_mnesia_transaction(TxFun),
            TailFun();
        true ->
            %% Already inside a transaction: hand the tail fun back
            %% without calling it.
            execute_mnesia_transaction(TxFun)
    end.

%% Passes 'ok' through; an {error, Reason} is thrown, tagged with
%% ErrorTag.
ensure_ok(ok, _ErrorTag) ->
    ok;
ensure_ok({error, Reason}, ErrorTag) ->
    throw({error, {ErrorTag, Reason}}).

%% Builds an atom of the form Prefix_IP:Port.
tcp_name(Prefix, IPAddress, Port)
  when is_atom(Prefix) andalso is_number(Port) ->
    NameStr = format("~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port]),
    list_to_atom(NameStr).

%% Renders an inet error as "Error (description)".
format_inet_error(E) ->
    Description = format_inet_error0(E),
    format("~w (~s)", [E, Description]).

format_inet_error0(address) -> "cannot connect to host/port";
format_inet_error0(timeout) -> "timed out";
format_inet_error0(Error)   -> inet:format_error(Error).

%% base64:decode throws lots of weird errors. Catch and convert to one
%% that will cause a bad_request.
b64decode_or_throw(B64) ->
    try base64:decode(B64) of
        Decoded -> Decoded
    catch
        error:_ -> throw({error, {not_base64, B64}})
    end.

%% Returns V unchanged when xmerl_ucs:from_utf8/1 accepts it. If that
%% call exits with {ucs, _}, returns a placeholder binary carrying the
%% base64 encoding of V, split into lines.
utf8_safe(V) ->
    try xmerl_ucs:from_utf8(V) of
        _ -> V
    catch
        exit:{ucs, _} ->
            Enc = split_lines(base64:encode(V)),
            <<"Not UTF-8, base64 is: ", Enc/binary>>
    end.

%% MIME enforces a limit on line length of base 64-encoded data to 76 characters.
split_lines(<<Line:76/binary, Rest/binary>>) ->
    Tail = split_lines(Rest),
    <<Line/binary, $\n, Tail/binary>>;
split_lines(Text) ->
    Text.


%% This is a modified version of Luke Gorrie's pmap -
%% https://lukego.livejournal.com/6753.html - that doesn't care about
%% the order in which results are received.
%%
%% WARNING: This is deliberately lightweight rather than robust -- if F
%% throws, upmap will hang forever, so make sure F doesn't throw!
%% Unordered parallel map: one process is spawned per element and the
%% results are collected in arrival order. There is no timeout, so a
%% worker that dies before replying makes the caller wait forever.
upmap(F, L) ->
    Parent = self(),
    Ref = make_ref(),
    Workers = [spawn(fun () -> Parent ! {Ref, F(X)} end) || X <- L],
    [receive {Ref, Result} -> Result end || _ <- Workers].

%% Like lists:map/2, but F is guaranteed to be applied to the elements
%% from left to right.
map_in_order(F, L) ->
    lists:reverse(
      lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)).

%% Apply a pre-post-commit function to all entries in a table that
%% satisfy a predicate, and return those entries.
%%
%% We ignore entries that have been modified or removed.
table_filter(Pred, PrePostCommitFun, TableName) ->
    lists:foldl(
      fun (E, Acc) ->
              case execute_mnesia_transaction(
                     fun () -> mnesia:match_object(TableName, E, read) =/= []
                                   andalso Pred(E) end,
                     fun (false, _Tx) -> false;
                         (true,   Tx) -> PrePostCommitFun(E, Tx), true
                     end) of
                  false -> Acc;
                  true  -> [E | Acc]
              end
      end, [], dirty_read_all(TableName)).

%% Returns every object stored in TableName using a dirty select.
dirty_read_all(TableName) ->
    mnesia:dirty_select(TableName, [{'$1',[],['$1']}]).

%% Applies F to every key of TableName, walking the table with dirty
%% operations. Returns 'ok' when the end of the table is reached and
%% 'aborted' as soon as fetching the next key fails.
dirty_foreach_key(F, TableName) ->
    dirty_foreach_key1(F, TableName, mnesia:dirty_first(TableName)).

dirty_foreach_key1(_F, _TableName, '$end_of_table') ->
    ok;
dirty_foreach_key1(F, TableName, K) ->
    %% The next key is fetched *before* F(K) runs, so F(K) is skipped
    %% when the lookup fails. Only the lookup is protected: exceptions
    %% raised by F propagate, and a thrown term is no longer mistaken
    %% for the next key as it was with the old-style `catch`.
    try mnesia:dirty_next(TableName, K) of
        NextKey ->
            F(K),
            dirty_foreach_key1(F, TableName, NextKey)
    catch
        exit:_  -> aborted;
        error:_ -> aborted
    end.

%% Opens FileName as a read-only disk_log, prints every chunk to the
%% standard output and closes the log again. Returns the result of
%% disk_log:close/1.
dirty_dump_log(FileName) ->
    {ok, LH} = disk_log:open([{name, dirty_dump_log},
                              {mode, read_only},
                              {file, FileName}]),
    try
        dirty_dump_log1(LH, disk_log:chunk(LH, start))
    catch
        Class:Reason:Stacktrace ->
            %% Do not leave the log open if dumping fails half-way.
            _ = disk_log:close(LH),
            erlang:raise(Class, Reason, Stacktrace)
    end,
    disk_log:close(LH).

dirty_dump_log1(_LH, eof) ->
    io:format("Done.~n");
dirty_dump_log1(LH, {K, Terms}) ->
    io:format("Chunk: ~p~n", [Terms]),
    dirty_dump_log1(LH, disk_log:chunk(LH, K));
dirty_dump_log1(LH, {K, Terms, BadBytes}) ->
    io:format("Bad Chunk, ~p: ~p~n", [BadBytes, Terms]),
    dirty_dump_log1(LH, disk_log:chunk(LH, K)).

%% Like io_lib:format/2, but returns a flat string.
format(Fmt, Args) -> lists:flatten(io_lib:format(Fmt, Args)).

%% Formats every {Format, Args} pair, each followed by a newline, and
%% returns the concatenation as a flat string.
format_many(List) ->
    lists:flatten([io_lib:format(F ++ "~n", A) || {F, A} <- List]).

%% Like io:format/2, but writes to standard_error.
format_stderr(Fmt, Args) ->
    io:format(standard_error, Fmt, Args),
    ok.

%% Repeatedly applies Fun to a seed until it returns 'false'. Returns
%% {Elements, FinalSeed}; the elements are accumulated by prepending, so
%% the most recently produced element comes first.
unfold(Fun, Init) ->
    unfold(Fun, [], Init).

unfold(Fun, Acc, Init) ->
    case Fun(Init) of
        {true, E, I} -> unfold(Fun, [E | Acc], I);
        false        -> {Acc, Init}
    end.

%% Smallest integer greater than or equal to N.
%%
%% trunc/1 rounds towards zero, so one must only be added when the
%% truncated value is *below* N. Adding it whenever the two merely
%% differ is wrong for negative non-integers (ceil(-1.5) is -1, not 0).
ceil(N) ->
    T = trunc(N),
    case N > T of
        true  -> T + 1;
        false -> T
    end.

%% Accepts the binaries <<"true">> / <<"false">> and the atoms
%% true / false / undefined; anything else is rejected with a throw.
parse_bool(<<"true">>)  -> true;
parse_bool(<<"false">>) -> false;
parse_bool(true)        -> true;
parse_bool(false)       -> false;
parse_bool(undefined)   -> undefined;
parse_bool(V)           -> throw({error, {not_boolean, V}}).

%% Integers are returned as is, other numbers are truncated, and
%% binaries are parsed as decimal integers. Anything that cannot be
%% parsed is rejected with a throw.
parse_int(I) when is_integer(I) -> I;
parse_int(F) when is_number(F)  -> trunc(F);
parse_int(S) ->
    try
        list_to_integer(binary_to_list(S))
    catch error:badarg ->
            throw({error, {not_integer, S}})
    end.


%% Folds Fun over the items of the queue, front to back.
queue_fold(Fun, Init, Q) ->
    case queue:out(Q) of
        {empty, _Q}      -> Init;
        {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
    end.

%% Sorts a list of AMQP 0-9-1 table fields as per the AMQP 0-9-1 spec
%% Maps are converted to a list of pairs first; empty input yields [].
sort_field_table(Arguments) when is_map(Arguments) ->
    sort_field_table(maps:to_list(Arguments));
sort_field_table(Arguments) ->
    lists:keysort(1, Arguments).

%% Converts an atom to a binary via its character list.
atom_to_binary(A) ->
    list_to_binary(atom_to_list(A)).

%% This provides a string representation of a pid that is the same
%% regardless of what node we are running on. The representation also
%% permits easy identification of the pid's node.
pid_to_string(Pid) when is_pid(Pid) ->
    {Node, Cre, Id, Ser} = decompose_pid(Pid),
    format("<~s.~B.~B.~B>", [Node, Cre, Id, Ser]).

%% inverse of above
%% Throws {error, {invalid_pid_syntax, Str}} when Str does not have the
%% form <Node.Creation.Id.Serial>.
%% NOTE(review): the node name is turned into an atom with
%% list_to_atom/1 -- confirm callers never pass untrusted input.
string_to_pid(Str) ->
    %% The \ before the trailing $ is only there to keep emacs
    %% font-lock from getting confused.
    case re:run(Str, "^<(.*)\\.(\\d+)\\.(\\d+)\\.(\\d+)>\$",
                [{capture,all_but_first,list}]) of
        nomatch ->
            throw({error, {invalid_pid_syntax, Str}});
        {match, [NodeStr | NumberStrs = [_CreStr, _IdStr, _SerStr]]} ->
            [Cre, Id, Ser] = [list_to_integer(S) || S <- NumberStrs],
            compose_pid(list_to_atom(NodeStr), Cre, Id, Ser)
    end.

%% Rebuilds Pid with the same creation, id and serial on NewNode.
pid_change_node(Pid, NewNode) ->
    {_OldNode, Cre, Id, Ser} = decompose_pid(Pid),
    compose_pid(NewNode, Cre, Id, Ser).

%% node(node_to_fake_pid(Node)) =:= Node.
node_to_fake_pid(Node) ->
    compose_pid(Node, 0, 0, 0).

%% Splits a pid into {Node, Creation, Id, Serial} by decoding its
%% external term format.
decompose_pid(Pid) when is_pid(Pid) ->
    %% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and
    %% 8.7)
    Node = node(Pid),
    Encoded = term_to_binary(Pid),
    %% The encoded node name has variable length and is skipped by
    %% size: whatever is left once the 2-byte header and the
    %% fixed-width trailing fields have been subtracted.
    case Encoded of
        %% NEW_PID_EXT
        <<131, 88, Payload/bits>> ->
            NodeByteSize = byte_size(Encoded) - 14,
            <<_NodePrefix:NodeByteSize/binary, Id:32, Ser:32, Cre:32>> = Payload,
            {Node, Cre, Id, Ser};
        %% PID_EXT
        <<131, 103, Payload/bits>> ->
            NodeByteSize = byte_size(Encoded) - 11,
            <<_NodePrefix:NodeByteSize/binary, Id:32, Ser:32, Cre:8>> = Payload,
            {Node, Cre, Id, Ser}
    end.

%% Builds a pid from its components by assembling a NEW_PID_EXT binary
%% around the encoded node atom.
compose_pid(Node, Cre, Id, Ser) ->
    <<131, NodeEnc/binary>> = term_to_binary(Node),
    PidBin = <<131, 88, NodeEnc/binary, Id:32, Ser:32, Cre:32>>,
    binary_to_term(PidBin).

%% version_compare/3: true when A stands in the given relation to B,
%% according to rabbit_semver.
version_compare(A, B, eq)  -> rabbit_semver:eql(A, B);
version_compare(A, B, lt)  -> rabbit_semver:lt(A, B);
version_compare(A, B, lte) -> rabbit_semver:lte(A, B);
version_compare(A, B, gt)  -> rabbit_semver:gt(A, B);
version_compare(A, B, gte) -> rabbit_semver:gte(A, B).

%% version_compare/2: three-way comparison; 'lt' is tested first, then
%% 'gt', and 'eq' is what remains.
version_compare(A, B) ->
    case version_compare(A, B, lt) of
        true ->
            lt;
        false ->
            case version_compare(A, B, gt) of
                true  -> gt;
                false -> eq
            end
    end.

%% For versions starting from 3.7.x:
%% Versions are considered compatible (except for special cases; see
%% below). The feature flags will determine if they are actually
%% compatible.
%%
%% For versions up-to 3.7.x:
%% a.b.c and a.b.d match, but a.b.c and a.d.e don't. If
%% versions do not match that pattern, just compare them.
%%
%% Special case for 3.6.6 because it introduced a change to the schema.
%% e.g. 3.6.6 is not compatible with 3.6.5
%% This special case can be removed once 3.6.x reaches EOL
version_minor_equivalent(A, B) ->
    {{MajA, MinA, PatchA, _}, _} = rabbit_semver:normalize(rabbit_semver:parse(A)),
    {{MajB, MinB, PatchB, _}, _} = rabbit_semver:normalize(rabbit_semver:parse(B)),

    %% True when a version is 3.6.x or older.
    OldA = MajA < 3 orelse (MajA =:= 3 andalso MinA =< 6),
    OldB = MajB < 3 orelse (MajB =:= 3 andalso MinB =< 6),

    case {MajA, MinA, MajB, MinB} of
        {3, 6, 3, 6} ->
            %% Within 3.6.x both patch levels must lie on the same side
            %% of 3.6.6.
            (PatchA >= 6) =:= (PatchB >= 6);
        _ when OldA orelse OldB ->
            MajA =:= MajB andalso MinA =:= MinB;
        _ ->
            %% Starting with RabbitMQ 3.7.x, we consider this
            %% minor release series and all subsequent series to
            %% be possibly compatible, based on just the version.
            %% The real compatibility check is deferred to the
            %% rabbit_feature_flags module in rabbitmq-server.
            true
    end.

%% This is the same as above except that e.g. 3.7.x and 3.8.x are
%% considered incompatible (as if there were no feature flags). This is
%% useful to check plugin compatibility (`broker_versions_requirement`
%% field in plugins).

%% Two versions are equivalent when major and minor agree. Within 3.6.x
%% the patch levels must additionally lie on the same side of 3.6.6.
strict_version_minor_equivalent(A, B) ->
    {{MajA, MinA, PatchA, _}, _} = rabbit_semver:normalize(rabbit_semver:parse(A)),
    {{MajB, MinB, PatchB, _}, _} = rabbit_semver:normalize(rabbit_semver:parse(B)),

    case {MajA, MinA, MajB, MinB} of
        {3, 6, 3, 6} -> (PatchA >= 6) =:= (PatchB >= 6);
        _            -> MajA =:= MajB andalso MinA =:= MinB
    end.

%% The *_cons functions prepend Value to the list stored under Key,
%% creating a one-element list when the key is not present yet.
dict_cons(Key, Value, Dict) ->
    Prepend = fun (Values) -> [Value | Values] end,
    dict:update(Key, Prepend, [Value], Dict).

orddict_cons(Key, Value, Dict) ->
    Prepend = fun (Values) -> [Value | Values] end,
    orddict:update(Key, Prepend, [Value], Dict).

maps_cons(Key, Value, Map) ->
    Prepend = fun (Values) -> [Value | Values] end,
    maps:update_with(Key, Prepend, [Value], Map).

gb_trees_cons(Key, Value, Tree) ->
    case gb_trees:lookup(Key, Tree) of
        none            -> gb_trees:insert(Key, [Value], Tree);
        {value, Values} -> gb_trees:update(Key, [Value | Values], Tree)
    end.

%% Folds Fun(Key, Value, Acc) over the tree in iterator order.
gb_trees_fold(Fun, Acc, Tree) ->
    Iterator = gb_trees:iterator(Tree),
    gb_trees_fold1(Fun, Acc, gb_trees:next(Iterator)).

gb_trees_fold1(_Fun, Acc, none) ->
    Acc;
gb_trees_fold1(Fun, Acc, {Key, Val, Iterator}) ->
    gb_trees_fold1(Fun, Fun(Key, Val, Acc), gb_trees:next(Iterator)).

%% Calls Fun(Key, Value) for every entry of the tree; returns 'ok'.
gb_trees_foreach(Fun, Tree) ->
    gb_trees_fold(fun (Key, Val, Acc) -> Fun(Key, Val), Acc end, ok, Tree).

%% Returns the attribute list of Module. If the call fails with 'undef'
%% a warning is printed and [] is returned.
module_attributes(Module) ->
    try Module:module_info(attributes) of
        Attributes -> Attributes
    catch
        _:undef ->
            io:format("WARNING: module ~p not found, so not scanned for boot steps.~n",
                      [Module]),
            []
    end.

%% Collects the attribute Name from the modules of every loaded
%% application.
all_module_attributes(Name) ->
    LoadedApps = [App || {App, _, _} <- application:loaded_applications()],
    module_attributes_from_apps(Name, LoadedApps).

%% As all_module_attributes/1, restricted to the applications returned
%% by rabbitmq_related_apps/0.
rabbitmq_related_module_attributes(Name) ->
    module_attributes_from_apps(Name, rabbitmq_related_apps()).
%% Returns the loaded applications that are rabbit_common,
%% rabbitmq_prelaunch, rabbit, or that list `rabbit` in their
%% `applications` key.
rabbitmq_related_apps() ->
    [App
     || {App, _, _} <- application:loaded_applications(),
        %% Only select RabbitMQ-related applications.
        App =:= rabbit_common orelse
        App =:= rabbitmq_prelaunch orelse
        App =:= rabbit orelse
        %% An application may be unloaded between the two calls, in
        %% which case get_key/2 returns `undefined`: skip it rather
        %% than crash.
        case application:get_key(App, applications) of
            {ok, Deps} -> lists:member(rabbit, Deps);
            undefined  -> false
        end].

%% Returns [{App, Module, Attributes}] for every module of Apps that
%% carries at least one Name attribute. The values of repeated Name
%% attributes are concatenated. Applications whose `modules` key is
%% unavailable are skipped.
module_attributes_from_apps(Name, Apps) ->
    Targets =
        lists:usort(
          [{App, Module}
           || App <- Apps,
              {ok, Modules} <- [application:get_key(App, modules)],
              Module <- Modules]),
    lists:foldl(
      fun ({App, Module}, Acc) ->
              case lists:append([Atts || {N, Atts} <- module_attributes(Module),
                                         N =:= Name]) of
                  []   -> Acc;
                  Atts -> [{App, Module, Atts} | Acc]
              end
      end, [], Targets).

%% Builds an acyclic digraph from Graph.
%%
%% VertexFun(Elem) returns a list of {Vertex, Label} and EdgeFun(Elem)
%% returns a list of {From, To}, for each element of Graph.
%%
%% Returns {ok, Digraph}, or {error, Reason} when a vertex is duplicated
%% or an edge is rejected by digraph:add_edge/3. Whenever this function
%% does not return {ok, Digraph}, the digraph is deleted first,
%% including when VertexFun or EdgeFun raise an unrelated exception
%% (which is then re-raised unchanged).
build_acyclic_graph(VertexFun, EdgeFun, Graph) ->
    G = digraph:new([acyclic]),
    try
        _ = [case digraph:vertex(G, Vertex) of
                 false -> digraph:add_vertex(G, Vertex, Label);
                 _     -> throw({graph_error, {vertex, duplicate, Vertex}})
             end || GraphElem       <- Graph,
                    {Vertex, Label} <- VertexFun(GraphElem)],
        _ = [case digraph:add_edge(G, From, To) of
                 {error, E} -> throw({graph_error, {edge, E, From, To}});
                 _          -> ok
             end || GraphElem  <- Graph,
                    {From, To} <- EdgeFun(GraphElem)],
        {ok, G}
    catch
        throw:{graph_error, Reason} ->
            true = digraph:delete(G),
            {error, Reason};
        Class:Error:Stacktrace ->
            %% Do not leak the digraph when a callback fails.
            true = digraph:delete(G),
            erlang:raise(Class, Error, Stacktrace)
    end.

%% Returns a zero-arity fun that always returns X.
const(X) -> fun () -> X end.

%% Format IPv4-mapped IPv6 addresses as IPv4, since they're what we see
%% when IPv6 is enabled but not used (i.e. 99% of the time).
ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
    inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256});
ntoa(IP) ->
    inet_parse:ntoa(IP).

%% Same as ntoa/1, but wraps the result in square brackets when it
%% contains a colon (i.e. it is formatted as IPv6).
ntoab(IP) ->
    Str = ntoa(IP),
    case lists:member($:, Str) of
        false -> Str;
        true  -> "[" ++ Str ++ "]"
    end.
%% We try to avoid reconnecting to down nodes here; this is used in a
%% loop in rabbit_amqqueue:on_node_down/1 and any delays we incur
%% would be bad news.
%%
%% See also rabbit_mnesia:is_process_alive/1 which also requires the
%% process be in the same running cluster as us (i.e. not partitioned
%% or some random node).
is_process_alive(Pid) when node(Pid) =:= node() ->
    erlang:is_process_alive(Pid);
is_process_alive(Pid) ->
    RemoteNode = node(Pid),
    %% Only ask nodes we are already connected to.
    Reachable = lists:member(RemoteNode, [node() | nodes(connected)]),
    Reachable andalso
        (true =:= rpc:call(RemoteNode, erlang, is_process_alive, [Pid])).

%% Looks K up in a map or in a list of {Key, Value} pairs; returns
%% `undefined` when it is not found.
-spec pget(term(), list() | map()) -> term().
pget(K, PropsOrMap) ->
    pget(K, PropsOrMap, undefined).

%% Same as pget/2, but returns D when K is not found.
-spec pget(term(), list() | map(), term()) -> term().
pget(K, M, D) when is_map(M) ->
    maps:get(K, M, D);
pget(K, P, D) ->
    case lists:keyfind(K, 1, P) of
        {K, V} -> V;
        _      -> D
    end.

%% Same as pget/2, but exits with {error, key_missing, K} when K is
%% absent (or, for a proplist, when its value is `undefined`).
-spec pget_or_die(term(), list() | map()) -> term() | no_return().
pget_or_die(K, M) when is_map(M) ->
    case M of
        #{K := V} -> V;
        _         -> exit({error, key_missing, K})
    end;
pget_or_die(K, P) ->
    case proplists:get_value(K, P) of
        undefined -> exit({error, key_missing, K});
        V         -> V
    end.

%% Replaces the value V stored under K in P with UpdateFun(V) and moves
%% the entry to the head of the list. Returns `undefined` (not P) when
%% K is absent.
pupdate(K, UpdateFun, P) ->
    case lists:keyfind(K, 1, P) of
        {K, V} ->
            NewV = UpdateFun(V),
            [{K, NewV} | proplists:delete(K, P)];
        _ ->
            undefined
    end.

%% property merge: adds {Key, Val} unless Key is already defined.
pmerge(Key, Val, List) ->
    case proplists:is_defined(Key, List) of
        true -> List;
        _    -> [{Key, Val} | List]
    end.
%% proplists merge: returns the union of two lists of {Key, Value}
%% pairs as a list built from a map.
plmerge(P1, P2) ->
    Defaults  = maps:from_list(P2),
    Overrides = maps:from_list(P1),
    %% Value from P1 suppresses value from P2
    maps:to_list(maps:merge(Defaults, Overrides)).

%% groups a list of proplists by a key function; returns the list of
%% groups (without their keys). Within a group, members appear in the
%% reverse of their input order.
group_proplists_by(KeyFun, ListOfPropLists) ->
    Groups =
        lists:foldl(
          fun (PropList, Acc) ->
                  maps:update_with(KeyFun(PropList),
                                   fun (Members) -> [PropList | Members] end,
                                   [PropList],
                                   Acc)
          end, #{}, ListOfPropLists),
    [Members || {_Key, Members} <- maps:to_list(Groups)].

%% Sets Key to Value: existing entries for Key are removed and the new
%% pair is placed at the head of the list.
pset(Key, Value, List) ->
    Others = proplists:delete(Key, List),
    [{Key, Value} | Others].

%% Returns {Len, Contents} for the priority queue MQ. Contents is the
%% full list of entries when there are at most 100 of them; otherwise it
%% is {summary, Counts}, where Counts tallies the occurrences of each
%% {Priority, Shape} pair (see format_message_queue_entry/1).
format_message_queue(_Opt, MQ) ->
    Len = priority_queue:len(MQ),
    Contents =
        if
            Len > 100 ->
                Counts =
                    lists:foldl(
                      fun ({P, V}, Acc) ->
                              Shape = {P, format_message_queue_entry(V)},
                              maps:update_with(Shape,
                                               fun (Count) -> Count + 1 end,
                                               1, Acc)
                      end, maps:new(), priority_queue:to_list(MQ)),
                {summary, maps:to_list(Counts)};
            true ->
                priority_queue:to_list(MQ)
        end,
    {Len, Contents}.

%% Reduces a term to its "shape": atoms are kept, tuples are processed
%% element by element, and anything else becomes '_'.
format_message_queue_entry(V) when is_atom(V) ->
    V;
format_message_queue_entry(V) when is_tuple(V) ->
    Elements = tuple_to_list(V),
    list_to_tuple([format_message_queue_entry(E) || E <- Elements]);
format_message_queue_entry(_V) ->
    '_'.

%% Same as rpc:multicall/4 but concatenates all results.
%% M, F, A is expected to return a list. If it does not,
%% its return value will be wrapped in a list.
-spec append_rpc_all_nodes([node()], atom(), atom(), [any()]) -> [any()].
append_rpc_all_nodes(Nodes, M, F, A) ->
    do_append_rpc_all_nodes(Nodes, M, F, A, ?RPC_INFINITE_TIMEOUT).

%% Same as append_rpc_all_nodes/4, with an explicit timeout.
-spec append_rpc_all_nodes([node()], atom(), atom(), [any()], timeout()) -> [any()].
append_rpc_all_nodes(Nodes, M, F, A, Timeout) ->
    do_append_rpc_all_nodes(Nodes, M, F, A, Timeout).
%% Runs rpc:multicall/5 and concatenates the per-node results. With a
%% finite timeout, an `internal_error` error raised by the multicall is
%% treated as "no results".
do_append_rpc_all_nodes(Nodes, M, F, A, ?RPC_INFINITE_TIMEOUT) ->
    {Results, _BadNodes} = rpc:multicall(Nodes, M, F, A, ?RPC_INFINITE_TIMEOUT),
    process_rpc_multicall_result(Results);
do_append_rpc_all_nodes(Nodes, M, F, A, Timeout) ->
    {Results, _BadNodes} =
        try
            rpc:multicall(Nodes, M, F, A, Timeout)
        catch
            error:internal_error -> {[], Nodes}
        end,
    process_rpc_multicall_result(Results).

%% Concatenates multicall results: {badrpc, _} entries are dropped,
%% lists are spliced in, and any other value is kept as a single
%% element.
process_rpc_multicall_result(ResL) ->
    lists:flatmap(fun ({badrpc, _})        -> [];
                      (Xs) when is_list(Xs) -> Xs;
                      %% wrap it in a list
                      (Other)              -> [Other]
                  end, ResL).

%% Runs Command through os:cmd/1. On non-Windows systems, throws
%% {command_not_found, Exec} when the first space-separated word of
%% Command cannot be found by os:find_executable/1.
os_cmd(Command) ->
    case os:type() of
        {win32, _} ->
            %% Clink workaround; see
            %% https://code.google.com/p/clink/issues/detail?id=141
            os:cmd(" " ++ Command);
        _ ->
            %% Don't just return "/bin/sh: <cmd>: not found" if not found
            Exec = hd(string:tokens(Command, " ")),
            case os:find_executable(Exec) of
                false -> throw({command_not_found, Exec});
                _     -> os:cmd(Command)
            end
    end.

%% Tells whether the OS process Pid is alive.
%%
%% On unix, this is true when `ps -p Pid` exits with status 0. On
%% win32, `tasklist` is used when available and its output must mention
%% "erl.exe"; otherwise PowerShell is queried and the process name must
%% be "erl" or "werl".
is_os_process_alive(Pid) ->
    UnixCheck =
        fun () ->
                run_ps(Pid) =:= 0
        end,
    Win32Check =
        fun () ->
                PidS = rabbit_data_coercion:to_list(Pid),
                case os:find_executable("tasklist.exe") of
                    false ->
                        Cmd =
                            format(
                              "PowerShell -Command "
                              "\"(Get-Process -Id ~s).ProcessName\"",
                              [PidS]),
                        %% Drop the first CR and the first LF from the
                        %% output.
                        Res =
                            os_cmd(Cmd ++ " 2>&1") -- [$\r, $\n],
                        case Res of
                            "erl"  -> true;
                            "werl" -> true;
                            _      -> false
                        end;
                    _ ->
                        Cmd =
                            "tasklist /nh /fi "
                            "\"pid eq " ++ PidS ++ "\"",
                        Res = os_cmd(Cmd ++ " 2>&1"),
                        match =:= re:run(Res,
                                         "erl\\.exe",
                                         [{capture, none}])
                end
        end,
    with_os([{unix, UnixCheck}, {win32, Win32Check}]).

%% Runs the handler registered for the current OS family (the first
%% element of os:type/0); throws {unsupported_os, OsFamily} when there
%% is none.
with_os(Handlers) ->
    {OsFamily, _OsName} = os:type(),
    case proplists:get_value(OsFamily, Handlers) of
        undefined -> throw({unsupported_os, OsFamily});
        Handler   -> Handler()
    end.
%% Spawns `ps -p Pid` through a port and returns its exit status.
run_ps(Pid) ->
    Cmd = "ps -p " ++ rabbit_data_coercion:to_list(Pid),
    PortOpts = [exit_status, {line, 16384}, use_stdio, stderr_to_stdout],
    Port = erlang:open_port({spawn, Cmd}, PortOpts),
    exit_loop(Port).

%% Discards everything Port sends until its exit status arrives, then
%% returns that status.
exit_loop(Port) ->
    receive
        {Port, {exit_status, Rc}} -> Rc;
        {Port, _}                 -> exit_loop(Port)
    end.

%% Returns S1 with every element of S2 removed.
gb_sets_difference(S1, S2) ->
    gb_sets:fold(fun (Elem, Acc) -> gb_sets:delete_any(Elem, Acc) end,
                 S1, S2).

%% Returns the `vsn` key of the `rabbit` application.
version() ->
    {ok, VSN} = application:get_key(rabbit, vsn),
    VSN.

%% See https://www.erlang.org/doc/system_principles/versions.html
otp_release() ->
    Release = erlang:system_info(otp_release),
    VersionFile = filename:join([code:root_dir(), "releases",
                                 Release, "OTP_VERSION"]),
    case file:read_file(VersionFile) of
        {ok, VerBin} ->
            %% 17.0 or later, we need the file for the minor version
            string:strip(binary_to_list(VerBin), both, $\n);
        {error, _} ->
            %% R16B03 or earlier (no file, otp_release is correct)
            %% or we couldn't read the file (so this is best we can do)
            Release
    end.

%% Returns "Erlang/OTP " followed by otp_release/0.
platform_and_version() ->
    "Erlang/OTP" ++ " " ++ otp_release().

%% Returns the emulator's system version string without surrounding
%% newlines.
otp_system_version() ->
    SystemVersion = erlang:system_info(system_version),
    string:strip(SystemVersion, both, $\n).

rabbitmq_and_erlang_versions() ->
    RabbitVersion = version(),
    {RabbitVersion, otp_release()}.

%% application:which_applications(infinity) is dangerous, since it can
%% cause deadlocks on shutdown. So we have to use a timeout variant,
%% but w/o creating spurious timeout errors. The timeout value is twice
%% that of gen_server:call/2.
which_applications() ->
    try application:which_applications(10000) of
        Apps -> Apps
    catch
        exit:{timeout, _} -> []
    end.

%% Returns the first {error, _} found before the last element of the
%% list, or the last element itself when there is none.
sequence_error([Last]) ->
    Last;
sequence_error([Head | Tail]) ->
    case Head of
        {error, _} -> Head;
        _          -> sequence_error(Tail)
    end.

%% Rejects negative expiry values.
check_expiry(N) ->
    if
        N < 0 -> {error, {value_negative, N}};
        true  -> ok
    end.
%% Base64-encodes In using the URL-safe alphabet ('-' and '_' instead of
%% '+' and '/') and without '=' padding. Returns a string.
base64url(In) ->
    [case Chr of
         $+ -> $-;
         $/ -> $_;
         _  -> Chr
     end || Chr <- base64:encode_to_string(In), Chr =/= $=].

%% Ideally, you'd want Fun to run every IdealInterval. but you don't
%% want it to take more than MaxRatio of IdealInterval. So if it takes
%% more then you want to run it less often. So we time how long it
%% takes to run, and then suggest how long you should wait before
%% running it again with a user specified max interval. Times are in millis.
%%
%% Returns {Result, NextInterval} where Result is the value returned by
%% apply(M, F, A).
interval_operation({M, F, A}, MaxRatio, MaxInterval, IdealInterval, LastInterval) ->
    {Micros, Res} = timer:tc(M, F, A),
    TooSlowForIdeal = Micros > 1000 * (MaxRatio * IdealInterval),
    TooSlowForLast  = Micros > 1000 * (MaxRatio * LastInterval),
    NextInterval =
        case {TooSlowForIdeal, TooSlowForLast} of
            {true, true} ->
                %% Too slow even for the current interval: back off.
                lists:min([MaxInterval, round(LastInterval * 1.5)]);
            {true, false} ->
                %% The current interval is adequate: keep it.
                LastInterval;
            {false, _} ->
                %% Fast enough for the ideal interval: move back
                %% towards it. This also covers a LastInterval that is
                %% smaller than IdealInterval, which used to raise a
                %% case_clause error.
                lists:max([IdealInterval, round(LastInterval / 1.5)])
        end,
    {Res, NextInterval}.

%% Starts a timer delivering Msg to the calling process after After
%% milliseconds, unless element Idx of the State tuple already holds
%% something other than `undefined`. The timer reference is stored in
%% that element.
ensure_timer(State, Idx, After, Msg) ->
    case element(Idx, State) of
        undefined -> TRef = send_after(After, self(), Msg),
                     setelement(Idx, State, TRef);
        _         -> State
    end.

%% Cancels the timer stored in element Idx of the State tuple, if any,
%% and resets that element to `undefined`.
stop_timer(State, Idx) ->
    case element(Idx, State) of
        undefined -> State;
        TRef      -> cancel_timer(TRef),
                     setelement(Idx, State, undefined)
    end.

%% timer:send_after/3 goes through a single timer process but allows
%% long delays. erlang:send_after/3 does not have a bottleneck but
%% only allows max 2^32-1 millis.
-define(MAX_ERLANG_SEND_AFTER, 4294967295).
%% Returns {timer, Ref} or {erlang, Ref} depending on which of the two
%% mechanisms was used.
send_after(Millis, Pid, Msg) when Millis > ?MAX_ERLANG_SEND_AFTER ->
    {ok, Ref} = timer:send_after(Millis, Pid, Msg),
    {timer, Ref};
send_after(Millis, Pid, Msg) ->
    {erlang, erlang:send_after(Millis, Pid, Msg)}.
%% Cancels a timer given as {erlang, Ref} (erlang:cancel_timer/1) or
%% {timer, Ref} (timer:cancel/1). Always returns ok.
cancel_timer({erlang, Ref}) -> _ = erlang:cancel_timer(Ref),
                               ok;
cancel_timer({timer, Ref})  -> {ok, cancel} = timer:cancel(Ref),
                               ok.

%% Stores {Type, ProcName} in the process dictionary under the
%% `process_name` key.
store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}).
store_proc_name(TypeProcName)   -> put(process_name, TypeProcName).

%% Returns {ok, Name} when a {Type, Name} pair was stored under
%% `process_name`, or `undefined` when nothing was stored.
get_proc_name() ->
    case get(process_name) of
        undefined ->
            undefined;
        {_Type, Name} ->
            {ok, Name}
    end.

%% application:get_env/3 is only available in R16B01 or later.
get_env(Application, Key, Def) ->
    case application:get_env(Application, Key) of
        {ok, Val} -> Val;
        undefined -> Def
    end.

get_channel_operation_timeout() ->
    %% Default channel_operation_timeout set to net_ticktime + 10s to
    %% give allowance for any down messages to be received first,
    %% whenever it is used for cross-node calls with timeouts.
    %%
    %% NOTE(review): net_kernel:get_net_ticktime/0 may also return
    %% `ignored` or `{ongoing_change_to, T}`, which would make this
    %% arithmetic fail -- confirm this is only called on a distributed
    %% node with no tick time change in progress.
    Default = (net_kernel:get_net_ticktime() + 10) * 1000,
    application:get_env(rabbit, channel_operation_timeout, Default).

%% Exponentially-weighted moving average. Returns Next when there is no
%% Current value yet.
moving_average(_Time,  _HalfLife, Next, undefined) ->
    Next;
%% We want the Weight to decrease as Time goes up (since Weight is the
%% weight for the current sample, not the new one), so that the moving
%% average decays at the same speed regardless of how long the time is
%% between samplings. So we want Weight = math:exp(Something), where
%% Something turns out to be negative.
%%
%% We want to determine Something here in terms of the Time taken
%% since the last measurement, and a HalfLife. So we want Weight =
%% math:exp(Time * Constant / HalfLife). What should Constant be? We
%% want Weight to be 0.5 when Time = HalfLife.
%%
%% Plug those numbers in and you get 0.5 = math:exp(Constant). Take
%% the log of each side and you get math:log(0.5) = Constant.
moving_average(Time,  HalfLife,  Next, Current) ->
    Weight = math:exp(Time * math:log(0.5) / HalfLife),
    Next * (1 - Weight) + Current * Weight.

%% Thin wrapper around rand:uniform/1.
random(N) ->
    rand:uniform(N).

%% Replaces the characters `<`, `>` and `&` with their HTML entities so
%% that the result can be embedded in HTML without being interpreted as
%% markup. The input is coerced to a list first; the result is a binary.
-spec escape_html_tags(string()) -> binary().

escape_html_tags(S) ->
    escape_html_tags(rabbit_data_coercion:to_list(S), []).


%% Acc holds the escaped output in reverse order; each entity is
%% therefore pushed reversed (lists:reverse/2) so that the final
%% lists:reverse/1 restores it.
-spec escape_html_tags(string(), string()) -> binary().

escape_html_tags([], Acc) ->
    rabbit_data_coercion:to_binary(lists:reverse(Acc));
escape_html_tags("<" ++ Rest, Acc) ->
    escape_html_tags(Rest, lists:reverse("&lt;", Acc));
escape_html_tags(">" ++ Rest, Acc) ->
    escape_html_tags(Rest, lists:reverse("&gt;", Acc));
escape_html_tags("&" ++ Rest, Acc) ->
    escape_html_tags(Rest, lists:reverse("&amp;", Acc));
escape_html_tags([C | Rest], Acc) ->
    escape_html_tags(Rest, [C | Acc]).

%% If the server we are talking to has non-standard net_ticktime, and
%% our connection lasts a while, we could get disconnected because of
%% a timeout unless we set our ticktime to be the same. So let's do
%% that.
%% TODO: do not use an infinite timeout!
-spec rpc_call(node(), atom(), atom(), [any()]) -> any() | {badrpc, term()}.
rpc_call(Node, Mod, Fun, Args) ->
    rpc_call(Node, Mod, Fun, Args, ?RPC_INFINITE_TIMEOUT).

%% Same as rpc_call/4 with an explicit timeout, which applies both to
%% the net_ticktime query and to the actual call.
-spec rpc_call(node(), atom(), atom(), [any()], infinity | non_neg_integer()) -> any() | {badrpc, term()}.
rpc_call(Node, Mod, Fun, Args, Timeout) ->
    case rpc:call(Node, net_kernel, get_net_ticktime, [], Timeout) of
        {badrpc, _} = E -> E;
        ignored ->
            rpc:call(Node, Mod, Fun, Args, Timeout);
        {ongoing_change_to, NewValue} ->
            _ = net_kernel:set_net_ticktime(NewValue, 0),
            rpc:call(Node, Mod, Fun, Args, Timeout);
        Time ->
            _ = net_kernel:set_net_ticktime(Time, 0),
            rpc:call(Node, Mod, Fun, Args, Timeout)
    end.

%% Delegates to rabbit_runtime:get_gc_info/1.
get_gc_info(Pid) ->
    rabbit_runtime:get_gc_info(Pid).
%% -------------------------------------------------------------------------
%% Begin copypasta from gen_server2.erl

%% Returns the pid of the first entry of the '$ancestors' process
%% dictionary key, resolving it through name_to_pid/1 when it is an
%% atom. Exits with `process_was_not_started_by_proc_lib` when no such
%% entry exists.
get_parent() ->
    case get('$ancestors') of
        [Pid | _] when is_pid(Pid) ->
            Pid;
        [RegName | _] when is_atom(RegName) ->
            name_to_pid(RegName);
        _ ->
            exit(process_was_not_started_by_proc_lib)
    end.

%% Resolves Name to a pid: local registrations are tried first, then
%% whereis_name/1. Exits with `could_not_find_registered_name` when
%% both fail.
name_to_pid(Name) ->
    Resolved = case whereis(Name) of
                   undefined -> whereis_name(Name);
                   LocalPid  -> LocalPid
               end,
    case Resolved of
        undefined -> exit(could_not_find_registered_name);
        Pid       -> Pid
    end.

%% Looks Name up in the `global_names` ETS table. A pid located on the
%% local node is only returned when the process is still alive; a pid
%% on another node is returned as is.
whereis_name(Name) ->
    case ets:lookup(global_names, Name) of
        [] ->
            undefined;
        [{_Name, Pid, _Method, _RPid, _Ref}] ->
            LocalAndDead = node(Pid) == node()
                andalso not erlang:is_process_alive(Pid),
            case LocalAndDead of
                true  -> undefined;
                false -> Pid
            end
    end.

%% End copypasta from gen_server2.erl
%% -------------------------------------------------------------------------