%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%

%% @private
-module(amqp_channels_manager).

-include("amqp_client_internal.hrl").

-behaviour(gen_server).

-export([start_link/3, open_channel/4, set_channel_max/2, is_empty/1,
         num_channels/1, pass_frame/3, signal_connection_closing/3,
         process_channel_frame/4]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
         handle_info/2]).

-record(state, {connection,
                channel_sup_sup,
                map_num_pa = gb_trees:empty(), %% Number -> {Pid, AState}
                map_pid_num = #{},             %% Pid -> Number
                channel_max = ?MAX_CHANNEL_NUMBER,
                closing = false}).

%%---------------------------------------------------------------------------
%% Interface
%%---------------------------------------------------------------------------

%% Starts an unregistered manager process linked to the caller. The three
%% arguments are handed to init/1 as a list.
start_link(Connection, ConnName, ChSupSup) ->
    InitArgs = [Connection, ConnName, ChSupSup],
    gen_server:start_link(?MODULE, InitArgs, []).

%% Synchronously asks the manager to open a channel. ProposedNumber is
%% either 'none' or a preferred channel number. The reply is {ok, ChannelPid}
%% on success or an {error, Reason} tuple.
open_channel(ChMgr, ProposedNumber, Consumer, InfraArgs) ->
    call_manager(ChMgr, {open_channel, ProposedNumber, Consumer, InfraArgs}).

%% Asynchronously replaces the upper bound used when picking channel numbers.
set_channel_max(ChMgr, ChannelMax) ->
    Msg = {set_channel_max, ChannelMax},
    gen_server:cast(ChMgr, Msg).

%% Synchronously reports whether no channels are currently registered.
is_empty(ChMgr) ->
    call_manager(ChMgr, is_empty).

%% Synchronously reports how many channels are currently registered.
num_channels(ChMgr) ->
    call_manager(ChMgr, num_channels).

%% Asynchronously hands Frame to the manager for the channel numbered
%% ChNumber.
pass_frame(ChMgr, ChNumber, Frame) ->
    Msg = {pass_frame, ChNumber, Frame},
    gen_server:cast(ChMgr, Msg).

%% Asynchronously tells the manager that the connection is closing.
signal_connection_closing(ChMgr, ChannelCloseType, Reason) ->
    Msg = {connection_closing, ChannelCloseType, Reason},
    gen_server:cast(ChMgr, Msg).

%% Shared path for the synchronous requests above: every call uses the
%% timeout supplied by amqp_util:call_timeout/0.
call_manager(ChMgr, Request) ->
    gen_server:call(ChMgr, Request, amqp_util:call_timeout()).
%% Runs Frame through rabbit_command_assembler:process/2 using the channel's
%% assembler state AState and returns the assembler state to keep.
%%
%% - If the assembler needs more frames, only the new state is returned.
%% - If a method (with or without content) is complete, it is handed to
%%   ChPid via rabbit_channel_common:do/2,3.
%% - On an assembler error, ChPid is sent {channel_exit, Channel, Reason}
%%   and the previous assembler state is kept.
process_channel_frame(Frame, Channel, ChPid, AState) ->
    case rabbit_command_assembler:process(Frame, AState) of
        {ok, NewAState} ->
            NewAState;
        {ok, Method, NewAState} ->
            rabbit_channel_common:do(ChPid, Method),
            NewAState;
        {ok, Method, Content, NewAState} ->
            rabbit_channel_common:do(ChPid, Method, Content),
            NewAState;
        {error, Reason} ->
            ChPid ! {channel_exit, Channel, Reason},
            AState
    end.

%%---------------------------------------------------------------------------
%% gen_server callbacks
%%---------------------------------------------------------------------------

%% NOTE(review): ?store_proc_name comes from an included header; presumably
%% it records ConnName for this process - confirm against the macro.
init([Connection, ConnName, ChSupSup]) ->
    ?store_proc_name(ConnName),
    {ok, #state{connection = Connection, channel_sup_sup = ChSupSup}}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% Synchronous requests.
%%
%% open_channel is only honoured while the connection is not closing. A
%% request that arrives after connection_closing has been processed is
%% answered with {error, closing} rather than left unmatched: an unmatched
%% call would crash the manager with function_clause and lose the channel
%% bookkeeping needed to finish the orderly shutdown.
handle_call({open_channel, ProposedNumber, Consumer, InfraArgs}, _,
            State = #state{closing = false}) ->
    handle_open_channel(ProposedNumber, Consumer, InfraArgs, State);
handle_call({open_channel, _ProposedNumber, _Consumer, _InfraArgs}, _,
            State = #state{closing = true}) ->
    {reply, {error, closing}, State};
handle_call(is_empty, _, State) ->
    {reply, internal_is_empty(State), State};
handle_call(num_channels, _, State) ->
    {reply, internal_num_channels(State), State}.

%% Asynchronous requests: update the channel limit, route a frame, or begin
%% closing all channels.
handle_cast({set_channel_max, ChannelMax}, State) ->
    {noreply, State#state{channel_max = ChannelMax}};
handle_cast({pass_frame, ChNumber, Frame}, State) ->
    {noreply, internal_pass_frame(ChNumber, Frame, State)};
handle_cast({connection_closing, ChannelCloseType, Reason}, State) ->
    handle_connection_closing(ChannelCloseType, Reason, State).

%% Monitor notifications for channel processes (monitors are set up when a
%% channel is opened).
handle_info({'DOWN', _, process, Pid, Reason}, State) ->
    handle_down(Pid, Reason, State).
%%---------------------------------------------------------------------------
%% Internal plumbing
%%---------------------------------------------------------------------------

%% Picks a channel number, starts the channel's supervision tree under
%% ChSupSup, registers the new channel and monitors it.
%%
%% Replies {ok, ChannelPid} on success, or the {error, Reason} produced by
%% new_number/2 when no number is available. A failure to start the channel
%% supervisor is not handled here: the assertive match crashes the manager.
handle_open_channel(ProposedNumber, Consumer, InfraArgs,
                    State = #state{channel_sup_sup = ChSupSup}) ->
    case new_number(ProposedNumber, State) of
        {ok, Number} ->
            {ok, _ChSup, {Ch, AState}} =
                amqp_channel_sup_sup:start_channel_sup(ChSupSup, InfraArgs,
                                                       Number, Consumer),
            NewState = internal_register(Number, Ch, AState, State),
            erlang:monitor(process, Ch),
            {reply, {ok, Ch}, NewState};
        {error, _} = Error ->
            {reply, Error, State}
    end.

%% Chooses the number for a new channel.
%%
%% With 'none', a free number is allocated automatically. With a proposed
%% number, that number is used if it lies in 1..ChannelMax and is not taken;
%% otherwise allocation falls back to the automatic path.
%%
%% Returns {ok, Number} or {error, out_of_channel_numbers}.
new_number(none, #state{channel_max = ChannelMax, map_num_pa = MapNPA}) ->
    case gb_trees:is_empty(MapNPA) of
        true  -> {ok, 1};
        false -> next_number(ChannelMax, MapNPA)
    end;
new_number(Proposed, State = #state{channel_max = ChannelMax,
                                    map_num_pa = MapNPA}) ->
    IsValid = Proposed > 0 andalso Proposed =< ChannelMax andalso
        not gb_trees:is_defined(Proposed, MapNPA),
    case IsValid of
        true  -> {ok, Proposed};
        false -> new_number(none, State)
    end.

%% Automatic allocation for a non-empty registry. Two cheap shortcuts are
%% tried first (just below the smallest used number, just above the largest);
%% only when neither applies is the registry scanned for a gap.
%%
%% Every result is checked against ChannelMax. The limit can be lowered via
%% set_channel_max while higher-numbered channels are still open, in which
%% case "Smallest - 1" alone could exceed the limit.
next_number(ChannelMax, MapNPA) ->
    {Smallest, _} = gb_trees:smallest(MapNPA),
    {Largest, _} = gb_trees:largest(MapNPA),
    if
        Smallest > 1, Smallest - 1 =< ChannelMax -> {ok, Smallest - 1};
        Largest < ChannelMax                     -> {ok, Largest + 1};
        true                                     -> find_free(MapNPA, ChannelMax)
    end.

%% Returns {ok, N} for the lowest unused number N in 1..ChannelMax, or
%% {error, out_of_channel_numbers} when every number in that range is taken.
find_free(MapNPA, ChannelMax) ->
    find_free(gb_trees:iterator(MapNPA), 1, ChannelMax).

%% Walks the used numbers in ascending order alongside a running Candidate.
%% While the next used number equals Candidate the range is still dense; the
%% first mismatch (or the end of the tree) means Candidate itself is free.
%% Stopping once Candidate passes ChannelMax keeps the answer inside the
%% limit even if some registered numbers lie above it.
find_free(_It, Candidate, ChannelMax) when Candidate > ChannelMax ->
    {error, out_of_channel_numbers};
find_free(It, Candidate, ChannelMax) ->
    case gb_trees:next(It) of
        {Candidate, _, It1} -> find_free(It1, Candidate + 1, ChannelMax);
        {_Higher, _, _}     -> {ok, Candidate};
        none                -> {ok, Candidate}
    end.

%% Handles a 'DOWN' notification. A pid that is not a registered channel is
%% treated as fatal and stops the manager with {error, unexpected_down}.
handle_down(Pid, Reason, State) ->
    case internal_lookup_pn(Pid, State) of
        undefined -> {stop, {error, unexpected_down}, State};
        Number    -> handle_channel_down(Pid, Number, Reason, State)
    end.
%% A registered channel process has gone down: report the exit reason to the
%% connection if it warrants it, drop the channel from the registry, and, if
%% the connection is closing, check whether that was the last channel.
handle_channel_down(Pid, Number, Reason, State) ->
    maybe_report_down(Pid, unwrap_shutdown(Reason), State),
    Remaining = internal_unregister(Number, Pid, State),
    check_all_channels_terminated(Remaining),
    {noreply, Remaining}.

%% Strips a {shutdown, _} wrapper from an exit reason, if present.
unwrap_shutdown({shutdown, Inner}) -> Inner;
unwrap_shutdown(Reason)            -> Reason.

%% Decides whether a channel's exit reason must be reported to the
%% connection process. Expected terminations are ignored; a misbehaving
%% server and any other reason are forwarded.
maybe_report_down(_Pid, Reason, _State)
  when Reason =:= normal; Reason =:= shutdown ->
    ok;
maybe_report_down(_Pid, {CloseKind, _, _}, _State)
  when CloseKind =:= app_initiated_close;
       CloseKind =:= server_initiated_close ->
    ok;
maybe_report_down(_Pid, {connection_closing, _}, _State) ->
    ok;
maybe_report_down(_Pid, {server_misbehaved, AmqpError},
                  #state{connection = Connection}) ->
    amqp_gen_connection:server_misbehaved(Connection, AmqpError);
maybe_report_down(Pid, Other, #state{connection = Connection}) ->
    amqp_gen_connection:channel_internal_error(Connection, Pid, Other).

%% While the connection is closing, tells it once the registry has become
%% empty. Does nothing when the connection is not closing.
check_all_channels_terminated(State = #state{closing = Closing,
                                             connection = Connection})
  when is_boolean(Closing) ->
    case Closing andalso internal_is_empty(State) of
        true  -> amqp_gen_connection:channels_terminated(Connection);
        false -> ok
    end.

%% Marks the manager as closing. With no channels open the connection is told
%% immediately that all channels have terminated; otherwise every channel is
%% told that the connection is closing.
handle_connection_closing(ChannelCloseType, Reason, State = #state{}) ->
    start_closing(internal_is_empty(State), ChannelCloseType, Reason, State),
    {noreply, State#state{closing = true}}.

%% First argument: whether the channel registry is empty.
start_closing(true, _ChannelCloseType, _Reason,
              #state{connection = Connection}) ->
    amqp_gen_connection:channels_terminated(Connection);
start_closing(false, ChannelCloseType, Reason, State) ->
    signal_channels_connection_closing(ChannelCloseType, Reason, State).
%%---------------------------------------------------------------------------

%% Routes Frame to the channel registered under Number and stores the
%% assembler state that results. Frames for unknown numbers are logged and
%% dropped, leaving the state untouched.
internal_pass_frame(Number, Frame, State) ->
    case internal_lookup_npa(Number, State) of
        {ChPid, AState} ->
            NextAState = process_channel_frame(Frame, Number, ChPid, AState),
            internal_update_npa(Number, ChPid, NextAState, State);
        undefined ->
            ?LOG_INFO("Dropping frame ~p for invalid or closed "
                      "channel number ~p", [Frame, Number]),
            State
    end.

%% Records a channel in both indexes: Number -> {Pid, AState} and
%% Pid -> Number.
internal_register(Number, Pid, AState,
                  State = #state{map_num_pa = ByNumber,
                                 map_pid_num = ByPid}) ->
    State#state{map_num_pa  = gb_trees:enter(Number, {Pid, AState}, ByNumber),
                map_pid_num = ByPid#{Pid => Number}}.

%% Removes a channel from both indexes. Number must currently be registered
%% (gb_trees:delete/2 is used, which requires the key to exist).
internal_unregister(Number, Pid,
                    State = #state{map_num_pa = ByNumber,
                                   map_pid_num = ByPid}) ->
    State#state{map_num_pa  = gb_trees:delete(Number, ByNumber),
                map_pid_num = maps:remove(Pid, ByPid)}.

%% True when no channels are registered.
internal_is_empty(#state{map_num_pa = ByNumber}) ->
    gb_trees:is_empty(ByNumber).

%% Number of registered channels.
internal_num_channels(#state{map_num_pa = ByNumber}) ->
    gb_trees:size(ByNumber).

%% Looks up {Pid, AState} by channel number; 'undefined' if not registered.
internal_lookup_npa(Number, #state{map_num_pa = ByNumber}) ->
    case gb_trees:lookup(Number, ByNumber) of
        none                  -> undefined;
        {value, PidAndAState} -> PidAndAState
    end.

%% Looks up the channel number by pid; 'undefined' if not registered.
internal_lookup_pn(Pid, #state{map_pid_num = ByPid}) ->
    maps:get(Pid, ByPid, undefined).

%% Replaces the {Pid, AState} entry of an already registered channel number.
internal_update_npa(Number, Pid, AState,
                    State = #state{map_num_pa = ByNumber}) ->
    Updated = gb_trees:update(Number, {Pid, AState}, ByNumber),
    State#state{map_num_pa = Updated}.

%% Tells every registered channel process that the connection is closing.
%% Returns the list of results from amqp_channel:connection_closing/3.
signal_channels_connection_closing(ChannelCloseType, Reason,
                                   #state{map_pid_num = ByPid}) ->
    Notify = fun(Pid) ->
                     amqp_channel:connection_closing(Pid, ChannelCloseType,
                                                     Reason)
             end,
    lists:map(Notify, maps:keys(ByPid)).