%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2016-2021 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(rabbit_mgmt_data).

-include("rabbit_mgmt_records.hrl").
-include("rabbit_mgmt_metrics.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_core_metrics.hrl").

-export([empty/2, pick_range/2]).

% delegate api
-export([overview_data/4,
         consumer_data/2,
         all_list_queue_data/3,
         all_detail_queue_data/3,
         all_exchange_data/3,
         all_connection_data/3,
         all_list_channel_data/3,
         all_detail_channel_data/3,
         all_vhost_data/3,
         all_node_data/3,
         augmented_created_stats/2,
         augmented_created_stats/3,
         augment_channel_pids/2,
         augment_details/2,
         lookup_element/2,
         lookup_element/3
        ]).

-import(rabbit_misc, [pget/2]).

-type maybe_slide() :: exometer_slide:slide() | not_found.
-type ranges() :: {maybe_range(), maybe_range(), maybe_range(), maybe_range()}.
-type maybe_range() :: no_range | #range{}.

%%----------------------------------------------------------------------------
%% Internal, query-time - node-local operations
%%----------------------------------------------------------------------------

%% Returns the third element of the single 3-tuple in ETS table `Type' whose
%% second element compares equal (==) to `Name', or `not_found' when no entry
%% matches. More than one matching entry is not expected and crashes with a
%% case_clause error.
created_stats(Name, Type) ->
    %% `Name' is wrapped in {const, _} so that it is always treated as a
    %% literal term inside the match-spec guard. Without the wrapper a tuple
    %% value would be parsed as a guard function call and ets:select/2 would
    %% fail with badarg.
    MatchSpec = [{{'_', '$2', '$3'},
                  [{'==', {const, Name}, '$2'}],
                  ['$3']}],
    case ets:select(Type, MatchSpec) of
        []     -> not_found;
        [Elem] -> Elem
    end.

%% Returns the third element of every 3-tuple stored in ETS table `Type'.
created_stats(Type) ->
    %% TODO better tab2list?
    ets:select(Type, [{{'_', '_', '$3'}, [], ['$3']}]).

%% Builds a map from each id in `Ids' to its detailed queue data
%% (see detail_queue_data/2). The first argument is ignored.
-spec all_detail_queue_data(pid(), [any()], ranges()) -> #{atom() => any()}.
all_detail_queue_data(_Pid, Ids, Ranges) ->
    maps:from_list([{Id, detail_queue_data(Ranges, Id)} || Id <- Ids]).
%% Builds a map from each id in `Ids' to its list-view queue data.
%% The first argument is ignored.
all_list_queue_data(_Pid, Ids, Ranges) ->
    maps:from_list([{Id, list_queue_data(Ranges, Id)} || Id <- Ids]).

%% Builds a map from each id in `Ids' to its detailed channel data.
%% The first argument is ignored.
all_detail_channel_data(_Pid, Ids, Ranges) ->
    maps:from_list([{Id, detail_channel_data(Ranges, Id)} || Id <- Ids]).

%% Builds a map from each id in `Ids' to its list-view channel data.
%% The first argument is ignored.
all_list_channel_data(_Pid, Ids, Ranges) ->
    maps:from_list([{Id, list_channel_data(Ranges, Id)} || Id <- Ids]).

%% Collects the coarse connection samples and the `connection_stats' element
%% for one connection id into a map.
connection_data(Ranges, Id) ->
    CoarseRange = pick_range(coarse_conn_stats, Ranges),
    Coarse = raw_message_data(connection_stats_coarse_conn_stats,
                              CoarseRange, Id),
    Stats = {connection_stats, lookup_element(connection_stats, Id)},
    maps:from_list([Coarse, Stats]).

%% Collects the per-channel / per-queue detail samples plus the publish in/out
%% samples for one exchange id into a map.
exchange_data(Ranges, Id) ->
    Detail = exchange_raw_detail_stats_data(Ranges, Id),
    PublishOut = raw_message_data(exchange_stats_publish_out,
                                  pick_range(fine_stats, Ranges), Id),
    PublishIn = raw_message_data(exchange_stats_publish_in,
                                 pick_range(fine_stats, Ranges), Id),
    maps:from_list(Detail ++ [PublishOut, PublishIn]).

%% Collects the coarse connection, message, fine and deliver samples for one
%% vhost id into a map keyed by table name.
vhost_data(Ranges, Id) ->
    Sources = [{vhost_stats_coarse_conn_stats, coarse_conn_stats},
               {vhost_msg_stats, queue_msg_rates},
               {vhost_stats_fine_stats, fine_stats},
               {vhost_stats_deliver_stats, deliver_get}],
    maps:from_list([raw_message_data(Table, pick_range(Kind, Ranges), Id)
                    || {Table, Kind} <- Sources]).

%% Collects everything reported for one node id into a map: management GC
%% queue lengths, the node_node_metrics table, node-to-node detail samples,
%% coarse and persister samples, the `node_stats' element and the connection
%% churn samples.
node_data(Ranges, Id) ->
    Mgmt = {mgmt_stats, mgmt_queue_length_stats(Id)},
    NodeNode = {node_node_metrics, node_node_metrics()},
    Detail = node_raw_detail_stats_data(Ranges, Id),
    Coarse = raw_message_data(node_coarse_stats,
                              pick_range(coarse_node_stats, Ranges), Id),
    Persister = raw_message_data(node_persister_stats,
                                 pick_range(coarse_node_stats, Ranges), Id),
    NodeStats = {node_stats, lookup_element(node_stats, Id)},
    Churn = node_connection_churn_rates_data(Ranges, Id),
    maps:from_list([Mgmt, NodeNode | Detail] ++
                   [Coarse, Persister, NodeStats | Churn]).
%% Builds the overview map: vhost-level samples summed over `VHosts', the
%% connection churn samples of the local node, and the connection, channel
%% and consumer counts. The first argument is ignored.
overview_data(_Pid, User, Ranges, VHosts) ->
    MsgStats = raw_all_message_data(vhost_msg_stats,
                                    pick_range(queue_msg_counts, Ranges),
                                    VHosts),
    FineStats = raw_all_message_data(vhost_stats_fine_stats,
                                     pick_range(fine_stats, Ranges),
                                     VHosts),
    MsgRates = raw_all_message_data(vhost_msg_rates,
                                    pick_range(queue_msg_rates, Ranges),
                                    VHosts),
    DeliverStats = raw_all_message_data(vhost_stats_deliver_stats,
                                        pick_range(deliver_get, Ranges),
                                        VHosts),
    Churn = raw_message_data(connection_churn_rates,
                             pick_range(queue_msg_rates, Ranges),
                             node()),
    Raw = [MsgStats, FineStats, MsgRates, DeliverStats, Churn],
    Counts = [{connections_count,
               count_created_stats(connection_created_stats, User)},
              {channels_count,
               count_created_stats(channel_created_stats, User)},
              {consumers_count, ets:info(consumer_stats, size)}],
    maps:from_list(Raw ++ Counts).

%% Maps every consumer_stats entry selected for `VHost' to its augmented
%% property list. The first argument is ignored.
consumer_data(_Pid, VHost) ->
    Augment = fun (C) -> {C, augment_msg_stats(augment_consumer(C))} end,
    maps:from_list(lists:map(Augment, consumers_by_vhost(VHost))).

%% Builds a map from each id in `Ids' to its connection data.
all_connection_data(_Pid, Ids, Ranges) ->
    maps:from_list(lists:map(fun (Id) -> {Id, connection_data(Ranges, Id)} end,
                             Ids)).

%% Builds a map from each id in `Ids' to its exchange data.
all_exchange_data(_Pid, Ids, Ranges) ->
    maps:from_list(lists:map(fun (Id) -> {Id, exchange_data(Ranges, Id)} end,
                             Ids)).

%% Builds a map from each id in `Ids' to its vhost data.
all_vhost_data(_Pid, Ids, Ranges) ->
    maps:from_list(lists:map(fun (Id) -> {Id, vhost_data(Ranges, Id)} end,
                             Ids)).

%% Builds a map from each id in `Ids' to its node data.
all_node_data(_Pid, Ids, Ranges) ->
    maps:from_list(lists:map(fun (Id) -> {Id, node_data(Ranges, Id)} end,
                             Ids)).

%% Returns the fine, deliver and process samples of one channel as a list of
%% {Table, {SmallSample, RangeSample}} pairs.
channel_raw_message_data(Ranges, Id) ->
    Sources = [{channel_stats_fine_stats, fine_stats},
               {channel_stats_deliver_stats, deliver_get},
               {channel_process_stats, process_stats}],
    [raw_message_data(Table, pick_range(Kind, Ranges), Id)
     || {Table, Kind} <- Sources].
%% Returns the publish, deliver, process and message-count samples of one
%% queue as a list of {Table, {SmallSample, RangeSample}} pairs.
queue_raw_message_data(Ranges, Id) ->
    [raw_message_data(queue_stats_publish, pick_range(fine_stats, Ranges), Id),
     raw_message_data(queue_stats_deliver_stats, pick_range(deliver_get, Ranges), Id),
     raw_message_data(queue_process_stats, pick_range(process_stats, Ranges), Id),
     raw_message_data(queue_msg_stats, pick_range(queue_msg_counts, Ranges), Id)].

%% Returns the per-channel deliver samples and per-exchange publish samples
%% of one queue. The queue id is the second component of the
%% channel_queue_stats_deliver_stats keys and the first component of the
%% queue_exchange_stats_publish keys.
queue_raw_deliver_stats_data(Ranges, Id) ->
    %% The ranges do not depend on the key, so pick them once.
    DeliverRange = pick_range(deliver_get, Ranges),
    PublishRange = pick_range(fine_stats, Ranges),
    [raw_message_data2(channel_queue_stats_deliver_stats, DeliverRange, Key)
     || Key <- get_table_keys(channel_queue_stats_deliver_stats, second(Id))] ++
    [raw_message_data2(queue_exchange_stats_publish, PublishRange, Key)
     || Key <- get_table_keys(queue_exchange_stats_publish, first(Id))].

%% Returns the node_node_coarse_stats samples for every key whose first
%% component is `Id'.
node_raw_detail_stats_data(Ranges, Id) ->
    Range = pick_range(coarse_node_node_stats, Ranges),
    [raw_message_data2(node_node_coarse_stats, Range, Key)
     || Key <- get_table_keys(node_node_coarse_stats, first(Id))].

%% Returns the connection churn samples of one node as a single-element list.
node_connection_churn_rates_data(Ranges, Id) ->
    [raw_message_data(connection_churn_rates,
                      pick_range(churn_rates, Ranges), Id)].

%% Returns the per-channel fine samples and per-queue publish samples of one
%% exchange. The exchange id is the second component of the keys in both
%% tables.
exchange_raw_detail_stats_data(Ranges, Id) ->
    FineRange = pick_range(fine_stats, Ranges),
    [raw_message_data2(channel_exchange_stats_fine_stats, FineRange, Key)
     || Key <- get_table_keys(channel_exchange_stats_fine_stats, second(Id))] ++
    [raw_message_data2(queue_exchange_stats_publish, FineRange, Key)
     || Key <- get_table_keys(queue_exchange_stats_publish, second(Id))].

%% Returns the per-exchange fine samples and per-queue deliver samples of one
%% channel. The channel id is the first component of the keys in both tables.
channel_raw_detail_stats_data(Ranges, Id) ->
    FineRange = pick_range(fine_stats, Ranges),
    %% Deliver stats use the `deliver_get' range, as in
    %% queue_raw_deliver_stats_data/2. pick_range/2 currently maps
    %% `fine_stats' and `deliver_get' to the same range, so the result is
    %% unchanged; this only keeps the two call sites consistent.
    DeliverRange = pick_range(deliver_get, Ranges),
    [raw_message_data2(channel_exchange_stats_fine_stats, FineRange, Key)
     || Key <- get_table_keys(channel_exchange_stats_fine_stats, first(Id))] ++
    [raw_message_data2(channel_queue_stats_deliver_stats, DeliverRange, Key)
     || Key <- get_table_keys(channel_queue_stats_deliver_stats, first(Id))].
%% Same lookups as raw_message_data/3, but the result is keyed by
%% {Table, Id} instead of just Table.
raw_message_data2(Table, Range, Id) ->
    {Table, Samples} = raw_message_data(Table, Range, Id),
    {{Table, Id}, Samples}.

%% Map of everything reported for one queue in the detail view: the
%% list-view data plus the queue's consumers under `consumer_stats'.
detail_queue_data(Ranges, Id) ->
    ListData = list_queue_data(Ranges, Id),
    ListData#{consumer_stats => get_queue_consumer_stats(Id)}.

%% Map of everything reported for one queue in the list view: raw message
%% samples, per-channel / per-exchange samples and the `queue_stats' element.
list_queue_data(Ranges, Id) ->
    Messages = queue_raw_message_data(Ranges, Id),
    Delivers = queue_raw_deliver_stats_data(Ranges, Id),
    Stats = {queue_stats, lookup_element(queue_stats, Id)},
    maps:from_list(Messages ++ Delivers ++ [Stats]).

%% Map of everything reported for one channel in the detail view: the
%% list-view data plus the channel's consumers under `consumer_stats'.
detail_channel_data(Ranges, Id) ->
    ListData = list_channel_data(Ranges, Id),
    ListData#{consumer_stats => get_consumer_stats(Id)}.

%% Map of everything reported for one channel in the list view: raw message
%% samples, per-exchange / per-queue samples and the `channel_stats' element.
list_channel_data(Ranges, Id) ->
    Messages = channel_raw_message_data(Ranges, Id),
    Details = channel_raw_detail_stats_data(Ranges, Id),
    Stats = {channel_stats, lookup_element(channel_stats, Id)},
    maps:from_list(Messages ++ Details ++ [Stats]).

%% Looks up the smallest-interval sample for `Id' in `Table' and, unless the
%% range is `no_range', the sample selected for `Range'. A missing sample is
%% reported as `not_found'.
-spec raw_message_data(atom(), maybe_range(), any()) ->
    {atom(), {maybe_slide(), maybe_slide()}}.
raw_message_data(Table, Range, Id) ->
    SmallSample = lookup_smaller_sample(Table, Id),
    Samples = case Range of
                  no_range -> not_found;
                  _        -> lookup_samples(Table, Id, Range)
              end,
    {Table, {SmallSample, Samples}}.

%% Like raw_message_data/3, but each sample is the sum over all `VHosts'
%% as computed by lookup_all/3.
raw_all_message_data(Table, Range, VHosts) ->
    SmallSample = lookup_all(Table, VHosts, select_smaller_sample(Table)),
    {Table, {SmallSample, all_range_sample(Table, Range, VHosts)}}.

%% Range part of raw_all_message_data/3; no range means no range sample.
all_range_sample(_Table, no_range, _VHosts) ->
    not_found;
all_range_sample(Table, Range, VHosts) ->
    lookup_all(Table, VHosts, select_range_sample(Table, Range)).
%% Returns the augmented consumer_stats entries whose key's first component
%% (the queue) equals `Id'.
get_queue_consumer_stats(Id) ->
    Matches = ets:select(consumer_stats, match_queue_consumer_spec(Id)),
    lists:map(fun augment_consumer/1, Matches).

%% Returns the augmented consumer_stats entries whose key's second component
%% (the channel) equals `Id'.
get_consumer_stats(Id) ->
    Matches = ets:select(consumer_stats, match_consumer_spec(Id)),
    lists:map(fun augment_consumer/1, Matches).

%% Counts the entries of table `Type'. For `all' this is the table size;
%% otherwise only the entries that filter_user/2 keeps for `User' are counted.
count_created_stats(Type, all) ->
    ets:info(Type, size);
count_created_stats(Type, User) ->
    Visible = filter_user(created_stats(Type), User),
    length(Visible).

%% Turns a consumer_stats entry {{Queue, Channel, ConsumerTag}, Props} into a
%% property list by prepending queue, channel and consumer tag information
%% to `Props'.
augment_consumer({{Q, Ch, CTag}, Props}) ->
    Header = [{queue, format_resource(Q)},
              {channel_details, augment_channel_pid(Ch)},
              {channel_pid, Ch},
              {consumer_tag, CTag}],
    Header ++ Props.

%% Selects the consumer_stats entries whose queue resource belongs to
%% `VHost'; the atom `all' selects every entry.
consumers_by_vhost(VHost) ->
    QueuePattern = #resource{virtual_host = '$1', _ = '_'},
    Guard = {'orelse', {'==', 'all', VHost}, {'==', VHost, '$1'}},
    ets:select(consumer_stats,
               [{{{QueuePattern, '_', '_'}, '_'}, [Guard], ['$_']}]).

%% Prepends the detail properties derived by augment_details/2 to `Props'.
augment_msg_stats(Props) ->
    Details = augment_details(Props, []),
    Details ++ Props.

%% Walks a property list and, for each `connection', `channel' or `owner_pid'
%% property, pushes the matching `*_details' property onto `Acc'. Properties
%% whose value is `none' or `unknown', and every other element, are skipped.
%% Details end up in reverse order of appearance.
augment_details(Props, Acc) ->
    lists:foldl(fun augment_detail/2, Acc, Props).

%% Single step of augment_details/2.
augment_detail({_, none}, Acc) ->
    Acc;
augment_detail({_, unknown}, Acc) ->
    Acc;
augment_detail({connection, Value}, Acc) ->
    [{connection_details, augment_connection_pid(Value)} | Acc];
augment_detail({channel, Value}, Acc) ->
    [{channel_details, augment_channel_pid(Value)} | Acc];
augment_detail({owner_pid, Value}, Acc) ->
    [{owner_pid_details, augment_connection_pid(Value)} | Acc];
augment_detail(_Other, Acc) ->
    Acc.

%% Returns the channel details of every pid in `ChPids', in order.
%% The first argument is ignored.
augment_channel_pids(_Pid, ChPids) ->
    [augment_channel_pid(ChPid) || ChPid <- ChPids].
%% Returns a property list describing the channel `Pid' and its connection,
%% or [] when no connection_created_stats entry is found for the channel's
%% connection.
augment_channel_pid(Pid) ->
    Ch = lookup_channel_with_fallback_to_connection(Pid),
    ConnPid = pget(connection, Ch),
    channel_details(Ch, lookup_element(connection_created_stats, ConnPid, 3)).

%% If the connection has just been opened, we might not yet have the data
channel_details(_Ch, []) ->
    [];
channel_details(Ch, Conn) ->
    [{name,            pget(name, Ch)},
     {pid,             pget(pid, Ch)},
     {number,          pget(number, Ch)},
     {user,            pget(user, Ch)},
     {connection_name, pget(name, Conn)},
     {peer_port,       pget(peer_port, Conn)},
     {peer_host,       pget(peer_host, Conn)}].

%% Returns the channel_created_stats properties for the pid. When there are
%% none, the pid is tried as a connection instead.
lookup_channel_with_fallback_to_connection(ChannelOrConnectionPid) ->
    % stream consumers report a stream connection PID for their channel PID,
    % so we adapt to this here
    case lookup_element(channel_created_stats, ChannelOrConnectionPid, 3) of
        [] -> connection_as_channel(ChannelOrConnectionPid);
        Ch -> Ch
    end.

%% Builds channel-shaped properties from a connection pid.
connection_as_channel(Pid) ->
    case lookup_element(connection_created_stats, Pid, 3) of
        [] ->
            % not a channel and not a connection, not much we can do here
            [{pid, Pid}];
        Conn ->
            [{name,       <<"">>},
             {pid,        Pid},
             {number,     0},
             {user,       pget(user, Conn)},
             {connection, Pid}]
    end.

%% Returns the name, peer port and peer host of connection `Pid', or [] when
%% no connection_created_stats entry is found.
augment_connection_pid(Pid) ->
    connection_details(lookup_element(connection_created_stats, Pid, 3)).

%% If the connection has just been opened, we might not yet have the data
connection_details([]) ->
    [];
connection_details(Conn) ->
    [{name,      pget(name, Conn)},
     {peer_port, pget(peer_port, Conn)},
     {peer_host, pget(peer_host, Conn)}].

%% Returns the augmented created-stats entry of table `Type' matching `Key',
%% or `not_found'. The first argument is ignored.
augmented_created_stats(_Pid, Key, Type) ->
    case created_stats(Key, Type) of
        not_found -> not_found;
        Stats     -> augment_msg_stats(Stats)
    end.

%% Returns every created-stats entry of table `Type', augmented.
%% The first argument is ignored.
augmented_created_stats(_Pid, Type) ->
    lists:map(fun augment_msg_stats/1, created_stats(Type)).

%% Match spec selecting whole consumer_stats entries whose key's second
%% component equals `Id'.
match_consumer_spec(Id) ->
    KeyPattern = {'_', '$1', '_'},
    [{{KeyPattern, '_'}, [{'==', Id, '$1'}], ['$_']}].

%% Match spec selecting whole consumer_stats entries whose key's first
%% component equals `Id'. `Id' is wrapped in a 1-tuple, the match-spec
%% notation for a literal tuple.
match_queue_consumer_spec(Id) ->
    KeyPattern = {'$1', '_', '_'},
    [{{KeyPattern, '_'}, [{'==', {Id}, '$1'}], ['$_']}].
%% Returns the second element of the `Table' entry stored under `Key',
%% or [] when the lookup fails.
lookup_element(Table, Key) ->
    lookup_element(Table, Key, 2).

%% Returns element `Pos' of the `Table' entry stored under `Key'.
%% A badarg error from ets:lookup_element/3 is reported as [].
lookup_element(Table, Key, Pos) ->
    try
        ets:lookup_element(Table, Key, Pos)
    catch
        error:badarg -> []
    end.

%% Returns the slide stored for `Id' under the interval chosen by
%% select_smaller_sample/1, optimized and converted for compatibility,
%% or `not_found'.
-spec lookup_smaller_sample(atom(), any()) -> maybe_slide().
lookup_smaller_sample(Table, Id) ->
    lookup_slide(Table, {Id, select_smaller_sample(Table)}).

%% Returns the slide stored for `Id' under the interval chosen by
%% select_range_sample/2 for `Range', optimized and converted for
%% compatibility, or `not_found'.
-spec lookup_samples(atom(), any(), #range{}) -> maybe_slide().
lookup_samples(Table, Id, Range) ->
    lookup_slide(Table, {Id, select_range_sample(Table, Range)}).

%% Shared lookup behind lookup_smaller_sample/2 and lookup_samples/3.
lookup_slide(Table, Key) ->
    case ets:lookup(Table, Key) of
        [{_, Slide}] ->
            Optimized = exometer_slide:optimize(Slide),
            maybe_convert_for_compatibility(Table, Optimized);
        [] ->
            not_found
    end.

%% Sums the slides stored in `Table' under {Id, SecondKey} for every id in
%% `Ids'. Ids without an entry are skipped; `not_found' is returned when no
%% id has one.
lookup_all(Table, Ids, SecondKey) ->
    case collect_slides(Table, SecondKey, Ids, []) of
        [] ->
            not_found;
        Slides ->
            Total = exometer_slide:sum(Slides, empty(Table, 0)),
            maybe_convert_for_compatibility(Table, Total)
    end.

%% Accumulates the slides found for `Ids'; the result is in reverse id order.
collect_slides(_Table, _SecondKey, [], Acc) ->
    Acc;
collect_slides(Table, SecondKey, [Id | Rest], Acc) ->
    case ets:lookup(Table, {Id, SecondKey}) of
        [] ->
            collect_slides(Table, SecondKey, Rest, Acc);
        [{_, Slide}] ->
            collect_slides(Table, SecondKey, Rest, [Slide | Acc])
    end.
%% Applies a rabbit_mgmt_data_compat conversion to a slide while the related
%% feature flag is disabled. Fine-stats tables are tied to the
%% `drop_unroutable_metric' flag, deliver-stats tables to the
%% `empty_basic_get_metric' flag; slides of any other table are returned
%% untouched.
maybe_convert_for_compatibility(Table, Slide)
  when Table =:= channel_stats_fine_stats;
       Table =:= channel_exchange_stats_fine_stats;
       Table =:= vhost_stats_fine_stats ->
    case rabbit_feature_flags:is_disabled(drop_unroutable_metric) of
        true ->
            %% drop_drop because the metric is named "drop_unroutable"
            rabbit_mgmt_data_compat:drop_drop_unroutable_metric(Slide);
        false ->
            Slide
    end;
maybe_convert_for_compatibility(Table, Slide)
  when Table =:= channel_queue_stats_deliver_stats;
       Table =:= channel_stats_deliver_stats;
       Table =:= queue_stats_deliver_stats;
       Table =:= vhost_stats_deliver_stats ->
    case rabbit_feature_flags:is_disabled(empty_basic_get_metric) of
        true ->
            rabbit_mgmt_data_compat:drop_get_empty_queue_metric(Slide);
        false ->
            Slide
    end;
maybe_convert_for_compatibility(_Table, Slide) ->
    Slide.

%% Returns the {First, Second} id pairs of all `Table' entries matching the
%% half-open id pattern `Id0' (as built by first/1 or second/1).
get_table_keys(Table, Id0) ->
    MatchSpec = match_spec_keys(Id0),
    ets:select(Table, MatchSpec).

%% Builds a match spec over entries keyed by {{First, Second}, _} that
%% returns the {First, Second} pair of every entry satisfying the condition
%% derived from `Id'.
match_spec_keys(Id) ->
    Head = {{{'$1', '$2'}, '_'}, '_'},
    Guards = [to_match_condition(Id)],
    Result = [{{'$1', '$2'}}],
    [{Head, Guards, Result}].

%% Translates {'_', Id} into a guard on the second key component and
%% {Id, '_'} into a guard on the first one.
to_match_condition({'_', Id1}) ->
    {'==', match_operand(Id1), '$2'};
to_match_condition({Id0, '_'}) ->
    {'==', match_operand(Id0), '$1'}.

%% Tuples are wrapped in a 1-tuple, the match-spec notation for a literal
%% tuple; any other term is used as is.
match_operand(Term) when is_tuple(Term) -> {Term};
match_operand(Term)                     -> Term.
%% Returns the message queue length of every management metrics GC process
%% (one per entry of ?GC_EVENTS) when `Id' is the local node, and an empty
%% list for any other node.
mgmt_queue_length_stats(Id) when Id =:= node() ->
    GCsQueueLengths = [gc_queue_length(T) || T <- ?GC_EVENTS],
    [{metrics_gc_queue_length, GCsQueueLengths}];
mgmt_queue_length_stats(_Id) ->
    % if it isn't for the current node just return an empty list
    [].

%% Returns {T, Len} for the GC process registered for `T'. A process that is
%% not registered, or that exits before it can be inspected, is reported
%% with length 0.
gc_queue_length(T) ->
    case whereis(rabbit_mgmt_metrics_gc:name(T)) of
        P when is_pid(P) ->
            %% process_info/2 returns `undefined' when the process has died
            %% since whereis/1; treat that like an unregistered process
            %% instead of crashing the whole query with a badmatch.
            case erlang:process_info(P, message_queue_len) of
                {message_queue_len, Len} -> {T, Len};
                undefined                -> {T, 0}
            end;
        _ ->
            {T, 0}
    end.

%% Returns the contents of the node_node_metrics table as a map.
node_node_metrics() ->
    maps:from_list(ets:tab2list(node_node_metrics)).

%% Picks the sample identifier to query for `Range' in `Table': the second
%% element of the retention policy entry chosen by select_smallest_above/3
%% for the width of the range (Last - First).
select_range_sample(Table, #range{first = First, last = Last}) ->
    Range = Last - First,
    Policies = rabbit_mgmt_agent_config:get_env(sample_retention_policies),
    Policy = retention_policy(Table),
    [T | _] = TablePolicies = lists:sort(proplists:get_value(Policy, Policies)),
    {_, Sample} = select_smallest_above(T, TablePolicies, Range),
    Sample.

%% Returns the smallest second element among the retention policy entries
%% configured for `Table'.
select_smaller_sample(Table) ->
    Policies = rabbit_mgmt_agent_config:get_env(sample_retention_policies),
    Policy = retention_policy(Table),
    TablePolicies = proplists:get_value(Policy, Policies),
    [V | _] = lists:sort([I || {_, I} <- TablePolicies]),
    V.

%% Returns the first entry {H, _} of the list for which H * 1000 exceeds
%% `Interval'. When no entry qualifies, the last entry examined is returned
%% (or the initial value for an empty list).
%% NOTE(review): the factor 1000 suggests H is in seconds and Interval in
%% milliseconds - confirm against the retention policy configuration.
select_smallest_above(V, [], _) ->
    V;
select_smallest_above(_, [{H, _} = S | _T], Interval) when (H * 1000) > Interval ->
    S;
select_smallest_above(_, [H | T], Interval) ->
    select_smallest_above(H, T, Interval).

%% Selects the element of the 4-tuple of ranges that applies to a stats kind:
%% the first for queue_msg_counts, the second for fine_stats, deliver_get and
%% queue_msg_rates, the third for coarse_conn_stats and process_stats, and
%% the fourth for coarse_node_stats, coarse_node_node_stats and churn_rates.
pick_range(queue_msg_counts, {RangeL, _RangeM, _RangeD, _RangeN}) ->
    RangeL;
pick_range(K, {_RangeL, RangeM, _RangeD, _RangeN}) when K == fine_stats;
                                                        K == deliver_get;
                                                        K == queue_msg_rates ->
    RangeM;
pick_range(K, {_RangeL, _RangeM, RangeD, _RangeN}) when K == coarse_conn_stats;
                                                        K == process_stats ->
    RangeD;
pick_range(K, {_RangeL, _RangeM, _RangeD, RangeN})
  when K == coarse_node_stats;
       K == coarse_node_node_stats;
       K == churn_rates ->
    RangeN.

%% Builds an id pattern with `Id' as the first component and a wildcard as
%% the second.
first(Id) ->
    {Id, '_'}.
%% Builds an id pattern with a wildcard as the first component and `Id' as
%% the second.
second(Id) ->
    {'_', Id}.

%% Returns a tuple with every element set to `V', sized for the given stats
%% table. Unknown table names raise function_clause.
empty(Type, V) when Type =:= connection_stats_coarse_conn_stats orelse
                    Type =:= queue_msg_stats orelse
                    Type =:= vhost_msg_stats ->
    erlang:make_tuple(3, V);
empty(Type, V) when Type =:= channel_stats_fine_stats orelse
                    Type =:= channel_exchange_stats_fine_stats orelse
                    Type =:= vhost_stats_fine_stats ->
    erlang:make_tuple(4, V);
empty(Type, V) when Type =:= channel_queue_stats_deliver_stats orelse
                    Type =:= queue_stats_deliver_stats orelse
                    Type =:= vhost_stats_deliver_stats orelse
                    Type =:= channel_stats_deliver_stats ->
    erlang:make_tuple(8, V);
empty(Type, V) when Type =:= channel_process_stats orelse
                    Type =:= queue_process_stats orelse
                    Type =:= queue_stats_publish orelse
                    Type =:= queue_exchange_stats_publish orelse
                    Type =:= exchange_stats_publish_out orelse
                    Type =:= exchange_stats_publish_in ->
    erlang:make_tuple(1, V);
empty(node_coarse_stats, V) ->
    erlang:make_tuple(8, V);
empty(node_persister_stats, V) ->
    erlang:make_tuple(20, V);
empty(Type, V) when Type =:= node_node_coarse_stats orelse
                    Type =:= vhost_stats_coarse_conn_stats orelse
                    Type =:= queue_msg_rates orelse
                    Type =:= vhost_msg_rates ->
    erlang:make_tuple(2, V);
empty(connection_churn_rates, V) ->
    erlang:make_tuple(7, V).
%% Maps a stats table name to the name of the retention policy (`basic',
%% `detailed' or `global') whose configuration applies to it. Unknown table
%% names raise function_clause.
retention_policy(Table)
  when Table =:= connection_stats_coarse_conn_stats;
       Table =:= channel_stats_fine_stats;
       Table =:= channel_process_stats;
       Table =:= channel_stats_deliver_stats;
       Table =:= queue_stats_deliver_stats;
       Table =:= queue_stats_publish;
       Table =:= queue_exchange_stats_publish;
       Table =:= exchange_stats_publish_out;
       Table =:= exchange_stats_publish_in;
       Table =:= queue_process_stats;
       Table =:= queue_msg_stats;
       Table =:= queue_msg_rates ->
    basic;
retention_policy(Table)
  when Table =:= channel_queue_stats_deliver_stats;
       Table =:= channel_exchange_stats_fine_stats ->
    detailed;
retention_policy(Table)
  when Table =:= vhost_stats_fine_stats;
       Table =:= vhost_stats_deliver_stats;
       Table =:= vhost_stats_coarse_conn_stats;
       Table =:= vhost_msg_rates;
       Table =:= vhost_msg_stats;
       Table =:= node_coarse_stats;
       Table =:= node_persister_stats;
       Table =:= node_node_coarse_stats;
       Table =:= connection_churn_rates ->
    global.

%% Formats a resource as [{name, Name}, {vhost, VHost}]; `unknown' is passed
%% through unchanged.
format_resource(Res) ->
    format_resource(name, Res).

%% Formats a resource as [{NameAs, Name}, {vhost, VHost}]; `unknown' is
%% passed through unchanged.
format_resource(_NameAs, unknown) ->
    unknown;
format_resource(NameAs, #resource{name = Name, virtual_host = VHost}) ->
    [{NameAs, Name}, {vhost, VHost}].

%% Returns the whole list for users that pass is_monitor/1; for everyone
%% else only the entries whose `user' property equals the user's name.
filter_user(List, #user{username = Username, tags = Tags}) ->
    case is_monitor(Tags) of
        true ->
            List;
        false ->
            OwnedByUser = fun (I) -> pget(user, I) == Username end,
            lists:filter(OwnedByUser, List)
    end.

%% True when the tag list contains `administrator' or `monitoring'.
is_monitor(T) ->
    intersects(T, [administrator, monitoring]).

%% True when at least one element of `A' is a member of `B'.
intersects(A, B) ->
    InB = fun (I) -> lists:member(I, B) end,
    lists:any(InB, A).