1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2016-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbit_mgmt_data).
9
10-include("rabbit_mgmt_records.hrl").
11-include("rabbit_mgmt_metrics.hrl").
12-include_lib("rabbit_common/include/rabbit.hrl").
13-include_lib("rabbit_common/include/rabbit_core_metrics.hrl").
14
15-export([empty/2, pick_range/2]).
16
17% delegate api
18-export([overview_data/4,
19         consumer_data/2,
20         all_list_queue_data/3,
21         all_detail_queue_data/3,
22         all_exchange_data/3,
23         all_connection_data/3,
24         all_list_channel_data/3,
25         all_detail_channel_data/3,
26         all_vhost_data/3,
27         all_node_data/3,
28         augmented_created_stats/2,
29         augmented_created_stats/3,
30         augment_channel_pids/2,
31         augment_details/2,
32         lookup_element/2,
33         lookup_element/3
34        ]).
35
36-import(rabbit_misc, [pget/2]).
37
38-type maybe_slide() :: exometer_slide:slide() | not_found.
39-type ranges() :: {maybe_range(), maybe_range(), maybe_range(), maybe_range()}.
40-type maybe_range() :: no_range | #range{}.
41
42%%----------------------------------------------------------------------------
43%% Internal, query-time - node-local operations
44%%----------------------------------------------------------------------------
45
46created_stats(Name, Type) ->
47    case ets:select(Type, [{{'_', '$2', '$3'}, [{'==', Name, '$2'}], ['$3']}]) of
48        [] -> not_found;
49        [Elem] -> Elem
50    end.
51
52created_stats(Type) ->
53    %% TODO better tab2list?
54    ets:select(Type, [{{'_', '_', '$3'}, [], ['$3']}]).
55
56-spec all_detail_queue_data(pid(), [any()], ranges())  -> #{atom() => any()}.
57all_detail_queue_data(_Pid, Ids, Ranges) ->
58    lists:foldl(fun (Id, Acc) ->
59                        Data = detail_queue_data(Ranges, Id),
60                        maps:put(Id, Data, Acc)
61                end, #{}, Ids).
62
63all_list_queue_data(_Pid, Ids, Ranges) ->
64    lists:foldl(fun (Id, Acc) ->
65                        Data = list_queue_data(Ranges, Id),
66                        maps:put(Id, Data, Acc)
67                end, #{}, Ids).
68
69all_detail_channel_data(_Pid, Ids, Ranges) ->
70    lists:foldl(fun (Id, Acc) ->
71                        Data = detail_channel_data(Ranges, Id),
72                        maps:put(Id, Data, Acc)
73                end, #{}, Ids).
74
75all_list_channel_data(_Pid, Ids, Ranges) ->
76    lists:foldl(fun (Id, Acc) ->
77                        Data = list_channel_data(Ranges, Id),
78                        maps:put(Id, Data, Acc)
79                end, #{}, Ids).
80
81connection_data(Ranges, Id) ->
82    maps:from_list([raw_message_data(connection_stats_coarse_conn_stats,
83                                     pick_range(coarse_conn_stats, Ranges), Id),
84                    {connection_stats, lookup_element(connection_stats, Id)}]).
85
86exchange_data(Ranges, Id) ->
87    maps:from_list(
88      exchange_raw_detail_stats_data(Ranges, Id) ++
89      [raw_message_data(exchange_stats_publish_out,
90                        pick_range(fine_stats, Ranges), Id),
91       raw_message_data(exchange_stats_publish_in,
92                        pick_range(fine_stats, Ranges), Id)]).
93
94vhost_data(Ranges, Id) ->
95    maps:from_list([raw_message_data(vhost_stats_coarse_conn_stats,
96                                     pick_range(coarse_conn_stats, Ranges), Id),
97                    raw_message_data(vhost_msg_stats,
98                                     pick_range(queue_msg_rates, Ranges), Id),
99                    raw_message_data(vhost_stats_fine_stats,
100                                     pick_range(fine_stats, Ranges), Id),
101                    raw_message_data(vhost_stats_deliver_stats,
102                                     pick_range(deliver_get, Ranges), Id)]).
103
104node_data(Ranges, Id) ->
105    maps:from_list(
106      [{mgmt_stats, mgmt_queue_length_stats(Id)}] ++
107      [{node_node_metrics, node_node_metrics()}] ++
108      node_raw_detail_stats_data(Ranges, Id) ++
109      [raw_message_data(node_coarse_stats,
110                        pick_range(coarse_node_stats, Ranges), Id),
111       raw_message_data(node_persister_stats,
112                        pick_range(coarse_node_stats, Ranges), Id),
113       {node_stats, lookup_element(node_stats, Id)}] ++
114      node_connection_churn_rates_data(Ranges, Id)).
115
116overview_data(_Pid, User, Ranges, VHosts) ->
117    Raw = [raw_all_message_data(vhost_msg_stats, pick_range(queue_msg_counts, Ranges), VHosts),
118           raw_all_message_data(vhost_stats_fine_stats, pick_range(fine_stats, Ranges), VHosts),
119           raw_all_message_data(vhost_msg_rates, pick_range(queue_msg_rates, Ranges), VHosts),
120           raw_all_message_data(vhost_stats_deliver_stats, pick_range(deliver_get, Ranges), VHosts),
121           raw_message_data(connection_churn_rates, pick_range(queue_msg_rates, Ranges), node())],
122    maps:from_list(Raw ++
123                   [{connections_count, count_created_stats(connection_created_stats, User)},
124                    {channels_count, count_created_stats(channel_created_stats, User)},
125                    {consumers_count, ets:info(consumer_stats, size)}]).
126
127consumer_data(_Pid, VHost) ->
128    maps:from_list(
129    [{C, augment_msg_stats(augment_consumer(C))}
130     || C <- consumers_by_vhost(VHost)]).
131
132all_connection_data(_Pid, Ids, Ranges) ->
133    maps:from_list([{Id, connection_data(Ranges, Id)} || Id <- Ids]).
134
135all_exchange_data(_Pid, Ids, Ranges) ->
136    maps:from_list([{Id, exchange_data(Ranges, Id)} || Id <- Ids]).
137
138all_vhost_data(_Pid, Ids, Ranges) ->
139    maps:from_list([{Id, vhost_data(Ranges, Id)} || Id <- Ids]).
140
141all_node_data(_Pid, Ids, Ranges) ->
142    maps:from_list([{Id, node_data(Ranges, Id)} || Id <- Ids]).
143
144channel_raw_message_data(Ranges, Id) ->
145    [raw_message_data(channel_stats_fine_stats, pick_range(fine_stats, Ranges), Id),
146     raw_message_data(channel_stats_deliver_stats, pick_range(deliver_get, Ranges), Id),
147     raw_message_data(channel_process_stats, pick_range(process_stats, Ranges), Id)].
148
149queue_raw_message_data(Ranges, Id) ->
150    [raw_message_data(queue_stats_publish, pick_range(fine_stats, Ranges), Id),
151     raw_message_data(queue_stats_deliver_stats, pick_range(deliver_get, Ranges), Id),
152     raw_message_data(queue_process_stats, pick_range(process_stats, Ranges), Id),
153     raw_message_data(queue_msg_stats, pick_range(queue_msg_counts, Ranges), Id)].
154
155queue_raw_deliver_stats_data(Ranges, Id) ->
156     [raw_message_data2(channel_queue_stats_deliver_stats,
157                        pick_range(deliver_get, Ranges), Key)
158      || Key <- get_table_keys(channel_queue_stats_deliver_stats, second(Id))] ++
159     [raw_message_data2(queue_exchange_stats_publish,
160                        pick_range(fine_stats, Ranges), Key)
161      || Key <- get_table_keys(queue_exchange_stats_publish, first(Id))].
162
163node_raw_detail_stats_data(Ranges, Id) ->
164     [raw_message_data2(node_node_coarse_stats,
165                        pick_range(coarse_node_node_stats, Ranges), Key)
166      || Key <- get_table_keys(node_node_coarse_stats, first(Id))].
167
168node_connection_churn_rates_data(Ranges, Id) ->
169    [raw_message_data(connection_churn_rates,
170                      pick_range(churn_rates, Ranges), Id)].
171
172exchange_raw_detail_stats_data(Ranges, Id) ->
173     [raw_message_data2(channel_exchange_stats_fine_stats,
174                        pick_range(fine_stats, Ranges), Key)
175      || Key <- get_table_keys(channel_exchange_stats_fine_stats, second(Id))] ++
176     [raw_message_data2(queue_exchange_stats_publish,
177                        pick_range(fine_stats, Ranges), Key)
178      || Key <- get_table_keys(queue_exchange_stats_publish, second(Id))].
179
180channel_raw_detail_stats_data(Ranges, Id) ->
181     [raw_message_data2(channel_exchange_stats_fine_stats,
182                        pick_range(fine_stats, Ranges), Key)
183      || Key <- get_table_keys(channel_exchange_stats_fine_stats, first(Id))] ++
184     [raw_message_data2(channel_queue_stats_deliver_stats,
185                        pick_range(fine_stats, Ranges), Key)
186      || Key <- get_table_keys(channel_queue_stats_deliver_stats, first(Id))].
187
188raw_message_data2(Table, no_range, Id) ->
189    SmallSample = lookup_smaller_sample(Table, Id),
190    {{Table, Id}, {SmallSample, not_found}};
191raw_message_data2(Table, Range, Id) ->
192    SmallSample = lookup_smaller_sample(Table, Id),
193    Samples = lookup_samples(Table, Id, Range),
194    {{Table, Id}, {SmallSample, Samples}}.
195
196detail_queue_data(Ranges, Id) ->
197    maps:from_list(queue_raw_message_data(Ranges, Id) ++
198                   queue_raw_deliver_stats_data(Ranges, Id) ++
199                   [{queue_stats, lookup_element(queue_stats, Id)},
200                    {consumer_stats, get_queue_consumer_stats(Id)}]).
201
202list_queue_data(Ranges, Id) ->
203    maps:from_list(queue_raw_message_data(Ranges, Id) ++
204                   queue_raw_deliver_stats_data(Ranges, Id) ++
205                   [{queue_stats, lookup_element(queue_stats, Id)}]).
206
207detail_channel_data(Ranges, Id) ->
208    maps:from_list(channel_raw_message_data(Ranges, Id) ++
209                   channel_raw_detail_stats_data(Ranges, Id) ++
210                   [{channel_stats, lookup_element(channel_stats, Id)},
211                    {consumer_stats, get_consumer_stats(Id)}]).
212
213list_channel_data(Ranges, Id) ->
214    maps:from_list(channel_raw_message_data(Ranges, Id) ++
215                   channel_raw_detail_stats_data(Ranges, Id) ++
216                   [{channel_stats, lookup_element(channel_stats, Id)}]).
217
218-spec raw_message_data(atom(), maybe_range(), any()) ->
219    {atom(), {maybe_slide(), maybe_slide()}}.
220raw_message_data(Table, no_range, Id) ->
221    SmallSample = lookup_smaller_sample(Table, Id),
222    {Table, {SmallSample, not_found}};
223raw_message_data(Table, Range, Id) ->
224    SmallSample = lookup_smaller_sample(Table, Id),
225    Samples = lookup_samples(Table, Id, Range),
226    {Table, {SmallSample, Samples}}.
227
228raw_all_message_data(Table, Range, VHosts) ->
229    SmallSample = lookup_all(Table, VHosts, select_smaller_sample(Table)),
230    RangeSample = case Range of
231                      no_range -> not_found;
232                      _ ->
233                          lookup_all(Table, VHosts, select_range_sample(Table,
234                                                                        Range))
235                  end,
236    {Table, {SmallSample, RangeSample}}.
237
238get_queue_consumer_stats(Id) ->
239    Consumers = ets:select(consumer_stats, match_queue_consumer_spec(Id)),
240    [augment_consumer(C) || C <- Consumers].
241
242get_consumer_stats(Id) ->
243    Consumers = ets:select(consumer_stats, match_consumer_spec(Id)),
244    [augment_consumer(C) || C <- Consumers].
245
246count_created_stats(Type, all) ->
247    ets:info(Type, size);
248count_created_stats(Type, User) ->
249    length(filter_user(created_stats(Type), User)).
250
251augment_consumer({{Q, Ch, CTag}, Props}) ->
252    [{queue, format_resource(Q)},
253     {channel_details, augment_channel_pid(Ch)},
254     {channel_pid, Ch},
255     {consumer_tag, CTag} | Props].
256
257consumers_by_vhost(VHost) ->
258    ets:select(consumer_stats,
259               [{{{#resource{virtual_host = '$1', _ = '_'}, '_', '_'}, '_'},
260                 [{'orelse', {'==', 'all', VHost}, {'==', VHost, '$1'}}],
261                 ['$_']}]).
262
263augment_msg_stats(Props) ->
264    augment_details(Props, []) ++ Props.
265
266augment_details([{_, none} | T], Acc) ->
267    augment_details(T, Acc);
268augment_details([{_, unknown} | T], Acc) ->
269    augment_details(T, Acc);
270augment_details([{connection, Value} | T], Acc) ->
271    augment_details(T, [{connection_details, augment_connection_pid(Value)} | Acc]);
272augment_details([{channel, Value} | T], Acc) ->
273    augment_details(T, [{channel_details, augment_channel_pid(Value)} | Acc]);
274augment_details([{owner_pid, Value} | T], Acc) ->
275    augment_details(T, [{owner_pid_details, augment_connection_pid(Value)} | Acc]);
276augment_details([_ | T], Acc) ->
277    augment_details(T, Acc);
278augment_details([], Acc) ->
279    Acc.
280
281augment_channel_pids(_Pid, ChPids) ->
282    lists:map(fun (ChPid) -> augment_channel_pid(ChPid) end, ChPids).
283
284augment_channel_pid(Pid) ->
285    Ch = lookup_channel_with_fallback_to_connection(Pid),
286    Conn = lookup_element(connection_created_stats, pget(connection, Ch), 3),
287    case Conn of
288    [] -> %% If the connection has just been opened, we might not yet have the data
289        [];
290    _ ->
291        [{name,            pget(name,   Ch)},
292         {pid,             pget(pid,    Ch)},
293         {number,          pget(number, Ch)},
294         {user,            pget(user,   Ch)},
295         {connection_name, pget(name,         Conn)},
296         {peer_port,       pget(peer_port,    Conn)},
297         {peer_host,       pget(peer_host,    Conn)}]
298    end.
299
300lookup_channel_with_fallback_to_connection(ChannelOrConnectionPid) ->
301    % stream consumers report a stream connection PID for their channel PID,
302    % so we adapt to this here
303    case lookup_element(channel_created_stats, ChannelOrConnectionPid, 3) of
304        [] ->
305            case lookup_element(connection_created_stats, ChannelOrConnectionPid, 3) of
306                [] ->
307                    % not a channel and not a connection, not much we can do here
308                    [{pid, ChannelOrConnectionPid}];
309                Conn ->
310                    [{name, <<"">>},
311                     {pid, ChannelOrConnectionPid},
312                     {number, 0},
313                     {user, pget(user, Conn)},
314                     {connection, ChannelOrConnectionPid}]
315            end;
316        Ch ->
317            Ch
318    end.
319
320augment_connection_pid(Pid) ->
321    Conn = lookup_element(connection_created_stats, Pid, 3),
322    case Conn of
323    [] -> %% If the connection has just been opened, we might not yet have the data
324        [];
325    _ ->
326        [{name,         pget(name,         Conn)},
327         {peer_port,    pget(peer_port,    Conn)},
328         {peer_host,    pget(peer_host,    Conn)}]
329    end.
330
331augmented_created_stats(_Pid, Key, Type) ->
332    case created_stats(Key, Type) of
333        not_found -> not_found;
334        S -> augment_msg_stats(S)
335    end.
336
337augmented_created_stats(_Pid, Type) ->
338    [ augment_msg_stats(S) || S <- created_stats(Type) ].
339
340match_consumer_spec(Id) ->
341    [{{{'_', '$1', '_'}, '_'}, [{'==', Id, '$1'}], ['$_']}].
342
343match_queue_consumer_spec(Id) ->
344    [{{{'$1', '_', '_'}, '_'}, [{'==', {Id}, '$1'}], ['$_']}].
345
346lookup_element(Table, Key) -> lookup_element(Table, Key, 2).
347
348lookup_element(Table, Key, Pos) ->
349    try ets:lookup_element(Table, Key, Pos)
350    catch error:badarg -> []
351    end.
352
353-spec lookup_smaller_sample(atom(), any()) -> maybe_slide().
354lookup_smaller_sample(Table, Id) ->
355    case ets:lookup(Table, {Id, select_smaller_sample(Table)}) of
356        [] ->
357            not_found;
358        [{_, Slide}] ->
359            Slide1 = exometer_slide:optimize(Slide),
360            maybe_convert_for_compatibility(Table, Slide1)
361    end.
362
363-spec lookup_samples(atom(), any(), #range{}) -> maybe_slide().
364lookup_samples(Table, Id, Range) ->
365    case ets:lookup(Table, {Id, select_range_sample(Table, Range)}) of
366        [] ->
367            not_found;
368        [{_, Slide}] ->
369            Slide1 = exometer_slide:optimize(Slide),
370            maybe_convert_for_compatibility(Table, Slide1)
371    end.
372
373lookup_all(Table, Ids, SecondKey) ->
374    Slides = lists:foldl(fun(Id, Acc) ->
375                                 case ets:lookup(Table, {Id, SecondKey}) of
376                                     [] ->
377                                         Acc;
378                                     [{_, Slide}] ->
379                                         [Slide | Acc]
380                                 end
381                         end, [], Ids),
382    case Slides of
383        [] ->
384            not_found;
385        _ ->
386            Slide = exometer_slide:sum(Slides, empty(Table, 0)),
387            maybe_convert_for_compatibility(Table, Slide)
388    end.
389
390maybe_convert_for_compatibility(Table, Slide)
391  when Table =:= channel_stats_fine_stats orelse
392       Table =:= channel_exchange_stats_fine_stats orelse
393       Table =:= vhost_stats_fine_stats ->
394     ConversionNeeded = rabbit_feature_flags:is_disabled(
395                          drop_unroutable_metric),
396     case ConversionNeeded of
397         false ->
398             Slide;
399         true ->
400             %% drop_drop because the metric is named "drop_unroutable"
401             rabbit_mgmt_data_compat:drop_drop_unroutable_metric(Slide)
402     end;
403maybe_convert_for_compatibility(Table, Slide)
404  when Table =:= channel_queue_stats_deliver_stats orelse
405       Table =:= channel_stats_deliver_stats orelse
406       Table =:= queue_stats_deliver_stats orelse
407       Table =:= vhost_stats_deliver_stats ->
408    ConversionNeeded = rabbit_feature_flags:is_disabled(
409                         empty_basic_get_metric),
410    case ConversionNeeded of
411        false ->
412            Slide;
413        true ->
414            rabbit_mgmt_data_compat:drop_get_empty_queue_metric(Slide)
415    end;
416maybe_convert_for_compatibility(_, Slide) ->
417    Slide.
418
419get_table_keys(Table, Id0) ->
420    ets:select(Table, match_spec_keys(Id0)).
421
422match_spec_keys(Id) ->
423    MatchCondition = to_match_condition(Id),
424    MatchHead = {{{'$1', '$2'}, '_'}, '_'},
425    [{MatchHead, [MatchCondition], [{{'$1', '$2'}}]}].
426
427to_match_condition({'_', Id1}) when is_tuple(Id1) ->
428    {'==', {Id1}, '$2'};
429to_match_condition({'_', Id1}) ->
430    {'==', Id1, '$2'};
431to_match_condition({Id0, '_'}) when is_tuple(Id0) ->
432    {'==', {Id0}, '$1'};
433to_match_condition({Id0, '_'}) ->
434    {'==', Id0, '$1'}.
435
436mgmt_queue_length_stats(Id) when Id =:= node() ->
437    GCsQueueLengths = lists:map(fun (T) ->
438        case whereis(rabbit_mgmt_metrics_gc:name(T)) of
439            P when is_pid(P) ->
440                {message_queue_len, Len} =
441                    erlang:process_info(P, message_queue_len),
442                    {T, Len};
443            _ -> {T, 0}
444        end
445    end,
446    ?GC_EVENTS),
447    [{metrics_gc_queue_length, GCsQueueLengths}];
448mgmt_queue_length_stats(_Id) ->
449    % if it isn't for the current node just return an empty list
450    [].
451
452node_node_metrics() ->
453    maps:from_list(ets:tab2list(node_node_metrics)).
454
455select_range_sample(Table, #range{first = First, last = Last}) ->
456    Range = Last - First,
457    Policies = rabbit_mgmt_agent_config:get_env(sample_retention_policies),
458    Policy = retention_policy(Table),
459    [T | _] = TablePolicies = lists:sort(proplists:get_value(Policy, Policies)),
460    {_, Sample} = select_smallest_above(T, TablePolicies, Range),
461    Sample.
462
463select_smaller_sample(Table) ->
464    Policies = rabbit_mgmt_agent_config:get_env(sample_retention_policies),
465    Policy = retention_policy(Table),
466    TablePolicies = proplists:get_value(Policy, Policies),
467    [V | _] = lists:sort([I || {_, I} <- TablePolicies]),
468    V.
469
470select_smallest_above(V, [], _) ->
471    V;
472select_smallest_above(_, [{H, _} = S | _T], Interval) when (H * 1000) > Interval ->
473    S;
474select_smallest_above(_, [H | T], Interval) ->
475    select_smallest_above(H, T, Interval).
476
477pick_range(queue_msg_counts, {RangeL, _RangeM, _RangeD, _RangeN}) ->
478    RangeL;
479pick_range(K, {_RangeL, RangeM, _RangeD, _RangeN}) when K == fine_stats;
480                                                        K == deliver_get;
481                                                        K == queue_msg_rates ->
482    RangeM;
483pick_range(K, {_RangeL, _RangeM, RangeD, _RangeN}) when K == coarse_conn_stats;
484                                                        K == process_stats ->
485    RangeD;
486pick_range(K, {_RangeL, _RangeM, _RangeD, RangeN})
487  when K == coarse_node_stats;
488       K == coarse_node_node_stats;
489       K == churn_rates ->
490    RangeN.
491
492first(Id)  ->
493    {Id, '_'}.
494
495second(Id) ->
496    {'_', Id}.
497
498empty(Type, V) when Type =:= connection_stats_coarse_conn_stats;
499            Type =:= queue_msg_stats;
500            Type =:= vhost_msg_stats ->
501    {V, V, V};
502empty(Type, V) when Type =:= channel_stats_fine_stats;
503            Type =:= channel_exchange_stats_fine_stats;
504            Type =:= vhost_stats_fine_stats ->
505    {V, V, V, V};
506empty(Type, V) when Type =:= channel_queue_stats_deliver_stats;
507            Type =:= queue_stats_deliver_stats;
508            Type =:= vhost_stats_deliver_stats;
509            Type =:= channel_stats_deliver_stats ->
510    {V, V, V, V, V, V, V, V};
511empty(Type, V) when Type =:= channel_process_stats;
512            Type =:= queue_process_stats;
513            Type =:= queue_stats_publish;
514            Type =:= queue_exchange_stats_publish;
515            Type =:= exchange_stats_publish_out;
516            Type =:= exchange_stats_publish_in ->
517    {V};
518empty(node_coarse_stats, V) ->
519    {V, V, V, V, V, V, V, V};
520empty(node_persister_stats, V) ->
521    {V, V, V, V, V, V, V, V, V, V, V, V, V, V, V, V, V, V, V, V};
522empty(Type, V) when Type =:= node_node_coarse_stats;
523            Type =:= vhost_stats_coarse_conn_stats;
524            Type =:= queue_msg_rates;
525            Type =:= vhost_msg_rates ->
526    {V, V};
527empty(connection_churn_rates, V) ->
528    {V, V, V, V, V, V, V}.
529
530retention_policy(connection_stats_coarse_conn_stats) ->
531    basic;
532retention_policy(channel_stats_fine_stats) ->
533    basic;
534retention_policy(channel_queue_stats_deliver_stats) ->
535    detailed;
536retention_policy(channel_exchange_stats_fine_stats) ->
537    detailed;
538retention_policy(channel_process_stats) ->
539    basic;
540retention_policy(vhost_stats_fine_stats) ->
541    global;
542retention_policy(vhost_stats_deliver_stats) ->
543    global;
544retention_policy(vhost_stats_coarse_conn_stats) ->
545    global;
546retention_policy(vhost_msg_rates) ->
547    global;
548retention_policy(channel_stats_deliver_stats) ->
549    basic;
550retention_policy(queue_stats_deliver_stats) ->
551    basic;
552retention_policy(queue_stats_publish) ->
553    basic;
554retention_policy(queue_exchange_stats_publish) ->
555    basic;
556retention_policy(exchange_stats_publish_out) ->
557    basic;
558retention_policy(exchange_stats_publish_in) ->
559    basic;
560retention_policy(queue_process_stats) ->
561    basic;
562retention_policy(queue_msg_stats) ->
563    basic;
564retention_policy(queue_msg_rates) ->
565    basic;
566retention_policy(vhost_msg_stats) ->
567    global;
568retention_policy(node_coarse_stats) ->
569    global;
570retention_policy(node_persister_stats) ->
571    global;
572retention_policy(node_node_coarse_stats) ->
573    global;
574retention_policy(connection_churn_rates) ->
575    global.
576
577format_resource(unknown) -> unknown;
578format_resource(Res)     -> format_resource(name, Res).
579
580format_resource(_, unknown) ->
581    unknown;
582format_resource(NameAs, #resource{name = Name, virtual_host = VHost}) ->
583    [{NameAs, Name}, {vhost, VHost}].
584
585filter_user(List, #user{username = Username, tags = Tags}) ->
586    case is_monitor(Tags) of
587        true  -> List;
588        false -> [I || I <- List, pget(user, I) == Username]
589    end.
590
591is_monitor(T)     -> intersects(T, [administrator, monitoring]).
592intersects(A, B) -> lists:any(fun(I) -> lists:member(I, B) end, A).
593