1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbit_event_consumer).
9
10-include_lib("rabbit_common/include/rabbit.hrl").
11
12-export([register/4]).
13-export([init/1, handle_call/2, handle_event/2, handle_info/2,
14         terminate/2, code_change/3]).
15
16-record(state, {pid, ref, monitor, pattern}).
17
18%%----------------------------------------------------------------------------
19
20register(Pid, Ref, Duration, Pattern) ->
21    case gen_event:add_handler(rabbit_event, ?MODULE, [Pid, Ref, Duration, Pattern]) of
22        ok ->
23            {ok, Ref};
24        Error ->
25            Error
26    end.
27
28%%----------------------------------------------------------------------------
29
30init([Pid, Ref, Duration, Pattern]) ->
31    MRef = erlang:monitor(process, Pid),
32    case Duration of
33        infinity -> infinity;
34        _        -> erlang:send_after(Duration * 1000, self(), rabbit_event_consumer_timeout)
35    end,
36    {ok, #state{pid = Pid, ref = Ref, monitor = MRef, pattern = Pattern}}.
37
38handle_call(_Request, State) -> {ok, not_understood, State}.
39
40handle_event(#event{type      = Type,
41                    props     = Props,
42                    timestamp = TS,
43                    reference = none}, #state{pid = Pid,
44                                              ref = Ref,
45                                              pattern = Pattern} = State) ->
46    case key(Type) of
47        ignore -> ok;
48        Key    -> case re:run(Key, Pattern, [{capture, none}]) of
49                      match ->
50                          Data = [{'event', Key}] ++
51                              fmt_proplist([{'timestamp_in_ms', TS} | Props]),
52                          Pid ! {Ref, Data, confinue};
53                      _ ->
54                          ok
55                  end
56    end,
57    {ok, State};
58handle_event(_Event, State) ->
59    {ok, State}.
60
61handle_info({'DOWN', MRef, _, _, _}, #state{monitor = MRef}) ->
62    remove_handler;
63handle_info(rabbit_event_consumer_timeout, #state{pid = Pid, ref = Ref}) ->
64    Pid ! {Ref, <<>>, finished},
65    remove_handler;
66handle_info(_Info, State) ->
67    {ok, State}.
68
69terminate(_Arg, #state{monitor = MRef}) ->
70    erlang:demonitor(MRef),
71    ok.
72
73code_change(_OldVsn, State, _Extra) -> {ok, State}.
74
75%%----------------------------------------------------------------------------
76
77%% pattern matching is way more efficient that the string operations,
78%% let's use all the keys we're aware of to speed up the handler.
79%% Any unknown or new one will be processed as before (see last function clause).
80key(queue_deleted) ->
81    <<"queue.deleted">>;
82key(queue_created) ->
83    <<"queue.created">>;
84key(exchange_created) ->
85    <<"exchange.created">>;
86key(exchange_deleted) ->
87    <<"exchange.deleted">>;
88key(binding_created) ->
89    <<"binding.created">>;
90key(connection_created) ->
91    <<"connection.created">>;
92key(connection_closed) ->
93    <<"connection.closed">>;
94key(channel_created) ->
95    <<"channel.created">>;
96key(channel_closed) ->
97    <<"channel.closed">>;
98key(consumer_created) ->
99    <<"consumer.created">>;
100key(consumer_deleted) ->
101    <<"consumer.deleted">>;
102key(queue_stats) ->
103    ignore;
104key(connection_stats) ->
105    ignore;
106key(policy_set) ->
107    <<"policy.set">>;
108key(policy_cleared) ->
109    <<"policy.cleared">>;
110key(parameter_set) ->
111    <<"parameter.set">>;
112key(parameter_cleared) ->
113    <<"parameter.cleared">>;
114key(vhost_created) ->
115    <<"vhost.created">>;
116key(vhost_deleted) ->
117    <<"vhost.deleted">>;
118key(vhost_limits_set) ->
119    <<"vhost.limits.set">>;
120key(vhost_limits_cleared) ->
121    <<"vhost.limits.cleared">>;
122key(user_authentication_success) ->
123    <<"user.authentication.success">>;
124key(user_authentication_failure) ->
125    <<"user.authentication.failure">>;
126key(user_created) ->
127    <<"user.created">>;
128key(user_deleted) ->
129    <<"user.deleted">>;
130key(user_password_changed) ->
131    <<"user.password.changed">>;
132key(user_password_cleared) ->
133    <<"user.password.cleared">>;
134key(user_tags_set) ->
135    <<"user.tags.set">>;
136key(permission_created) ->
137    <<"permission.created">>;
138key(permission_deleted) ->
139    <<"permission.deleted">>;
140key(topic_permission_created) ->
141    <<"topic.permission.created">>;
142key(topic_permission_deleted) ->
143    <<"topic.permission.deleted">>;
144key(alarm_set) ->
145    <<"alarm.set">>;
146key(alarm_cleared) ->
147    <<"alarm.cleared">>;
148key(shovel_worker_status) ->
149    <<"shovel.worker.status">>;
150key(shovel_worker_removed) ->
151    <<"shovel.worker.removed">>;
152key(federation_link_status) ->
153    <<"federation.link.status">>;
154key(federation_link_removed) ->
155    <<"federation.link.removed">>;
156key(S) ->
157    case string:tokens(atom_to_list(S), "_") of
158        [_, "stats"] -> ignore;
159        Tokens       -> list_to_binary(string:join(Tokens, "."))
160    end.
161
162fmt_proplist(Props) ->
163    lists:foldl(fun({K, V}, Acc) ->
164                        case fmt(K, V) of
165                            L when is_list(L) -> lists:append(L, Acc);
166                            T -> [T | Acc]
167                        end
168                end, [], Props).
169
170fmt(K, #resource{virtual_host = VHost,
171                 name         = Name}) -> [{K,           Name},
172                                           {'vhost', VHost}];
173fmt(K, true)                 -> {K, true};
174fmt(K, false)                -> {K, false};
175fmt(K, V) when is_atom(V)    -> {K, atom_to_binary(V, utf8)};
176fmt(K, V) when is_integer(V) -> {K, V};
177fmt(K, V) when is_number(V)  -> {K, V};
178fmt(K, V) when is_binary(V)  -> {K, V};
179fmt(K, [{_, _}|_] = Vs)      -> {K, fmt_proplist(Vs)};
180fmt(K, Vs) when is_list(Vs)  -> {K,  [fmt(V) || V <- Vs]};
181fmt(K, V) when is_pid(V)     -> {K, list_to_binary(rabbit_misc:pid_to_string(V))};
182fmt(K, V)                    -> {K,
183                                 list_to_binary(
184                                   rabbit_misc:format("~1000000000p", [V]))}.
185
186%% Exactly the same as fmt/2, duplicated only for performance issues
187fmt(true)                 -> true;
188fmt(false)                -> false;
189fmt(V) when is_atom(V)    -> atom_to_binary(V, utf8);
190fmt(V) when is_integer(V) -> V;
191fmt(V) when is_number(V)  -> V;
192fmt(V) when is_binary(V)  -> V;
193fmt([{_, _}|_] = Vs)      -> fmt_proplist(Vs);
194fmt(Vs) when is_list(Vs)  -> [fmt(V) || V <- Vs];
195fmt(V) when is_pid(V)     -> list_to_binary(rabbit_misc:pid_to_string(V));
196fmt(V)                    -> list_to_binary(
197                                rabbit_misc:format("~1000000000p", [V])).
198