%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
%%

%% gen_event handler, installed on the `rabbit_event' manager, that forwards
%% matching internal events to a consumer process as
%% `{Ref, Data, confinue}' messages and signals the end of the stream with
%% `{Ref, <<>>, finished}'.

-module(rabbit_event_consumer).

-behaviour(gen_event).

-include_lib("rabbit_common/include/rabbit.hrl").

-export([register/4]).
-export([init/1, handle_call/2, handle_event/2, handle_info/2,
         terminate/2, code_change/3]).

%% pid     - process that receives the forwarded events
%% ref     - term supplied by the caller; tags every message sent to pid
%% monitor - monitor reference on pid
%% pattern - regular expression (as accepted by re:run/3) applied to event keys
-record(state, {pid, ref, monitor, pattern}).

%%----------------------------------------------------------------------------

%% Installs this module as a handler on the `rabbit_event' manager.
%%
%% Pid      - process that will receive the events.
%% Ref      - tag placed in every message sent to Pid.
%% Duration - `infinity', or the number of seconds after which the stream
%%            is finished and the handler removes itself.
%% Pattern  - regular expression matched against the dotted event key.
%%
%% Returns `{ok, Ref}' on success, otherwise whatever
%% gen_event:add_handler/3 returned.
register(Pid, Ref, Duration, Pattern) ->
    Args = [Pid, Ref, Duration, Pattern],
    case gen_event:add_handler(rabbit_event, ?MODULE, Args) of
        ok    -> {ok, Ref};
        Error -> Error
    end.

%%----------------------------------------------------------------------------

%% gen_event callback: monitors the consumer and, unless Duration is
%% `infinity', arms the timer that ends the stream.
init([Pid, Ref, Duration, Pattern]) ->
    MRef = erlang:monitor(process, Pid),
    ok = schedule_timeout(Duration, Ref),
    {ok, #state{pid = Pid, ref = Ref, monitor = MRef, pattern = Pattern}}.

%% gen_event callback: no synchronous requests are supported.
handle_call(_Request, State) -> {ok, not_understood, State}.

%% gen_event callback: forwards an event to the consumer when its key
%% matches the consumer's pattern. Only events whose `reference' field is
%% `none' are considered; everything else is dropped silently.
handle_event(#event{type      = Type,
                    props     = Props,
                    timestamp = TS,
                    reference = none},
             #state{pid = Pid, ref = Ref, pattern = Pattern} = State) ->
    case key(Type) of
        ignore ->
            ok;
        Key ->
            case re:run(Key, Pattern, [{capture, none}]) of
                match ->
                    Data = [{'event', Key}] ++
                        fmt_proplist([{'timestamp_in_ms', TS} | Props]),
                    %% NOTE(review): `confinue' (sic) is part of the message
                    %% delivered to Pid, so it is deliberately left as is;
                    %% presumably receivers match on this exact atom -
                    %% confirm before renaming.
                    Pid ! {Ref, Data, confinue};
                _ ->
                    ok
            end
    end,
    {ok, State};
handle_event(_Event, State) ->
    {ok, State}.

%% gen_event callback. The event manager hands every message it receives to
%% every installed handler, so each clause must make sure the message really
%% belongs to this handler instance.
handle_info({'DOWN', MRef, _, _, _}, #state{monitor = MRef}) ->
    %% The consumer died: nothing left to deliver to.
    remove_handler;
handle_info({rabbit_event_consumer_timeout, Ref},
            #state{pid = Pid, ref = Ref}) ->
    %% Only the handler whose own timer fired reacts (Ref is already bound by
    %% the message pattern). A timer left behind by a handler that has since
    %% been removed matches nobody and falls through to the clause below.
    Pid ! {Ref, <<>>, finished},
    remove_handler;
handle_info(_Info, State) ->
    {ok, State}.

%% gen_event callback: drops the monitor on the consumer.
terminate(_Arg, #state{monitor = MRef}) ->
    erlang:demonitor(MRef),
    ok.

%% gen_event callback: the state needs no conversion.
code_change(_OldVsn, State, _Extra) -> {ok, State}.

%%----------------------------------------------------------------------------

%% Arms the timer that ends the stream after Duration seconds; a Duration of
%% `infinity' arms nothing. The message is sent to self(), i.e. the process
%% running init/1, and is tagged with Ref so that it can only be claimed by
%% the handler that scheduled it.
schedule_timeout(infinity, _Ref) ->
    ok;
schedule_timeout(Duration, Ref) ->
    erlang:send_after(Duration * 1000, self(),
                      {rabbit_event_consumer_timeout, Ref}),
    ok.

%% Maps an event type atom to its dotted binary key, or to `ignore' for
%% events that must not be forwarded.
%%
%% Pattern matching is way more efficient than the string operations,
%% let's use all the keys we're aware of to speed up the handler.
%% Any unknown or new one will be processed as before (see last function clause).
key(queue_deleted) ->
    <<"queue.deleted">>;
key(queue_created) ->
    <<"queue.created">>;
key(exchange_created) ->
    <<"exchange.created">>;
key(exchange_deleted) ->
    <<"exchange.deleted">>;
key(binding_created) ->
    <<"binding.created">>;
key(connection_created) ->
    <<"connection.created">>;
key(connection_closed) ->
    <<"connection.closed">>;
key(channel_created) ->
    <<"channel.created">>;
key(channel_closed) ->
    <<"channel.closed">>;
key(consumer_created) ->
    <<"consumer.created">>;
key(consumer_deleted) ->
    <<"consumer.deleted">>;
key(queue_stats) ->
    ignore;
key(connection_stats) ->
    ignore;
key(policy_set) ->
    <<"policy.set">>;
key(policy_cleared) ->
    <<"policy.cleared">>;
key(parameter_set) ->
    <<"parameter.set">>;
key(parameter_cleared) ->
    <<"parameter.cleared">>;
key(vhost_created) ->
    <<"vhost.created">>;
key(vhost_deleted) ->
    <<"vhost.deleted">>;
key(vhost_limits_set) ->
    <<"vhost.limits.set">>;
key(vhost_limits_cleared) ->
    <<"vhost.limits.cleared">>;
key(user_authentication_success) ->
    <<"user.authentication.success">>;
key(user_authentication_failure) ->
    <<"user.authentication.failure">>;
key(user_created) ->
    <<"user.created">>;
key(user_deleted) ->
    <<"user.deleted">>;
key(user_password_changed) ->
    <<"user.password.changed">>;
key(user_password_cleared) ->
    <<"user.password.cleared">>;
key(user_tags_set) ->
    <<"user.tags.set">>;
key(permission_created) ->
    <<"permission.created">>;
key(permission_deleted) ->
    <<"permission.deleted">>;
key(topic_permission_created) ->
    <<"topic.permission.created">>;
key(topic_permission_deleted) ->
    <<"topic.permission.deleted">>;
key(alarm_set) ->
    <<"alarm.set">>;
key(alarm_cleared) ->
    <<"alarm.cleared">>;
key(shovel_worker_status) ->
    <<"shovel.worker.status">>;
key(shovel_worker_removed) ->
    <<"shovel.worker.removed">>;
key(federation_link_status) ->
    <<"federation.link.status">>;
key(federation_link_removed) ->
    <<"federation.link.removed">>;
key(S) ->
    %% Generic path: split the atom on "_" and re-join with "."; a two-token
    %% name ending in "stats" is ignored like the explicit *_stats clauses.
    case string:tokens(atom_to_list(S), "_") of
        [_, "stats"] -> ignore;
        Tokens       -> list_to_binary(string:join(Tokens, "."))
    end.

%% Formats every `{Key, Value}' pair of Props with fmt/2. Results are
%% prepended to the accumulator, so the output is in reverse input order;
%% a value that formats to a list of pairs (a #resource{}) contributes all
%% of them.
fmt_proplist(Props) ->
    lists:foldl(fun({K, V}, Acc) ->
                        case fmt(K, V) of
                            L when is_list(L) -> lists:append(L, Acc);
                            T                 -> [T | Acc]
                        end
                end, [], Props).

%% Formats one property. A #resource{} yields two pairs (its name under K
%% plus its virtual host under 'vhost'); every other value yields a single
%% `{K, Formatted}' pair. Booleans, numbers and binaries pass through,
%% other atoms and pids become binaries, lists are formatted element-wise
%% (or as a nested proplist when the head is a 2-tuple), and anything else
%% is pretty-printed to a binary.
fmt(K, #resource{virtual_host = VHost,
                 name         = Name}) -> [{K, Name},
                                           {'vhost', VHost}];
fmt(K, V) when is_boolean(V)           -> {K, V};
fmt(K, V) when is_atom(V)              -> {K, atom_to_binary(V, utf8)};
fmt(K, V) when is_number(V)            -> {K, V};
fmt(K, V) when is_binary(V)            -> {K, V};
fmt(K, [{_, _}|_] = Vs)                -> {K, fmt_proplist(Vs)};
fmt(K, Vs) when is_list(Vs)            -> {K, [fmt(V) || V <- Vs]};
fmt(K, V) when is_pid(V)               -> {K, list_to_binary(rabbit_misc:pid_to_string(V))};
fmt(K, V)                              -> {K,
                                           list_to_binary(
                                             rabbit_misc:format("~1000000000p", [V]))}.

%% Value-only counterpart of fmt/2: applies the same conversions but returns
%% the bare formatted value instead of a `{Key, Value}' pair. The logic is
%% duplicated rather than shared purely for performance reasons.
%%
%% Booleans, numbers and binaries are returned untouched; any other atom and
%% any pid become binaries; a list whose head is a 2-tuple is handed to
%% fmt_proplist/1, any other list is formatted element by element; all
%% remaining terms are pretty-printed into a binary.
fmt(Bool) when is_boolean(Bool) ->
    Bool;
fmt(Atom) when is_atom(Atom) ->
    atom_to_binary(Atom, utf8);
fmt(Number) when is_number(Number) ->
    Number;
fmt(Bin) when is_binary(Bin) ->
    Bin;
fmt([{_, _} | _] = Proplist) ->
    fmt_proplist(Proplist);
fmt(Items) when is_list(Items) ->
    [fmt(Item) || Item <- Items];
fmt(Pid) when is_pid(Pid) ->
    list_to_binary(rabbit_misc:pid_to_string(Pid));
fmt(Term) ->
    Printed = rabbit_misc:format("~1000000000p", [Term]),
    list_to_binary(Printed).