1%% This Source Code Form is subject to the terms of the Mozilla Public 2%% License, v. 2.0. If a copy of the MPL was not distributed with this 3%% file, You can obtain one at https://mozilla.org/MPL/2.0/. 4%% 5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved. 6%% 7 8-module(rabbit_channel_interceptor). 9 10-include_lib("rabbit_common/include/rabbit_framing.hrl"). 11-include_lib("rabbit_common/include/rabbit.hrl"). 12 13-export([init/1, intercept_in/3]). 14 15-behaviour(rabbit_registry_class). 16 17-export([added_to_rabbit_registry/2, removed_from_rabbit_registry/1]). 18 19-type(method_name() :: rabbit_framing:amqp_method_name()). 20-type(original_method() :: rabbit_framing:amqp_method_record()). 21-type(processed_method() :: rabbit_framing:amqp_method_record()). 22-type(original_content() :: rabbit_types:maybe(rabbit_types:content())). 23-type(processed_content() :: rabbit_types:maybe(rabbit_types:content())). 24-type(interceptor_state() :: term()). 25 26-callback description() -> [proplists:property()]. 27%% Derive some initial state from the channel. This will be passed back 28%% as the third argument of intercept/3. 29-callback init(rabbit_channel:channel()) -> interceptor_state(). 30-callback intercept(original_method(), original_content(), 31 interceptor_state()) -> 32 {processed_method(), processed_content()} | rabbit_types:amqp_error() | 33 rabbit_misc:channel_or_connection_exit(). 34-callback applies_to() -> list(method_name()). 35 36added_to_rabbit_registry(_Type, _ModuleName) -> 37 rabbit_channel:refresh_interceptors(). 38removed_from_rabbit_registry(_Type) -> 39 rabbit_channel:refresh_interceptors(). 40 41init(Ch) -> 42 Mods = [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor)], 43 check_no_overlap(Mods), 44 [{Mod, Mod:init(Ch)} || Mod <- Mods]. 45 46check_no_overlap(Mods) -> 47 check_no_overlap1([sets:from_list(Mod:applies_to()) || Mod <- Mods]). 48 49%% Check no non-empty pairwise intersection in a list of sets 50check_no_overlap1(Sets) -> 51 lists:foldl(fun(Set, Union) -> 52 Is = sets:intersection(Set, Union), 53 case sets:size(Is) of 54 0 -> ok; 55 _ -> 56 internal_error("Interceptor: more than one module handles ~p", [Is]) 57 end, 58 sets:union(Set, Union) 59 end, 60 sets:new(), 61 Sets), 62 ok. 63 64intercept_in(M, C, Mods) -> 65 lists:foldl(fun({Mod, ModState}, {M1, C1}) -> 66 call_module(Mod, ModState, M1, C1) 67 end, 68 {M, C}, 69 Mods). 70 71call_module(Mod, St, M, C) -> 72 % this little dance is because Mod might be unloaded at any point 73 case (catch {ok, Mod:intercept(M, C, St)}) of 74 {ok, R} -> validate_response(Mod, M, C, R); 75 {'EXIT', {undef, [{Mod, intercept, _, _} | _]}} -> {M, C} 76 end. 77 78validate_response(Mod, M1, C1, R = {M2, C2}) -> 79 case {validate_method(M1, M2), validate_content(C1, C2)} of 80 {true, true} -> R; 81 {false, _} -> 82 internal_error("Interceptor: ~p expected to return " 83 "method: ~p but returned: ~p", 84 [Mod, rabbit_misc:method_record_type(M1), 85 rabbit_misc:method_record_type(M2)]); 86 {_, false} -> 87 internal_error("Interceptor: ~p expected to return " 88 "content iff content is provided but " 89 "content in = ~p; content out = ~p", 90 [Mod, C1, C2]) 91 end; 92validate_response(_Mod, _M1, _C1, AMQPError = #amqp_error{}) -> 93 internal_error(AMQPError). 94 95validate_method(M, M2) -> 96 rabbit_misc:method_record_type(M) =:= rabbit_misc:method_record_type(M2). 97 98validate_content(none, none) -> true; 99validate_content(#content{}, #content{}) -> true; 100validate_content(_, _) -> false. 101 102%% keep dialyzer happy 103-spec internal_error(rabbit_types:amqp_error()) -> 104 rabbit_misc:channel_or_connection_exit(). 105internal_error(AMQPError = #amqp_error{}) -> 106 rabbit_misc:protocol_error(AMQPError). 107 108-spec internal_error(string(), [any()]) -> 109 rabbit_misc:channel_or_connection_exit(). 110internal_error(Format, Args) -> 111 rabbit_misc:protocol_error(internal_error, Format, Args). 112