%%--------------------------------------------------------------------
%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2001-2016. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
%%
%%----------------------------------------------------------------------
%% File        : oe_CosEventComm_CAdmin_impl.erl
%% Description : Callback module for the consumer-admin server. On request
%%               it starts push/pull proxy suppliers, remembers them as
%%               {ObjRef, Pid} pairs and forwards every received event to
%%               all of them.
%%
%%----------------------------------------------------------------------
-module(oe_CosEventComm_CAdmin_impl).

%%----------------------------------------------------------------------
%% Include files
%%----------------------------------------------------------------------
-include_lib("orber/include/corba.hrl").
-include("cosEventApp.hrl").

%%----------------------------------------------------------------------
%% External exports
%%----------------------------------------------------------------------
%% Server callbacks.
-export([init/1,
         terminate/2,
         code_change/3,
         handle_info/2]).

%% Exports from "CosEventChannelAdmin::ConsumerAdmin"
-export([obtain_push_supplier/3,
         obtain_pull_supplier/3]).


%%----------------------------------------------------------------------
%% Internal exports
%%----------------------------------------------------------------------
%% Exports from "oe_CosEventComm::Event"
-export([send/3, send_sync/4]).
%%----------------------------------------------------------------------
%% Records
%%----------------------------------------------------------------------
%% Server state.
%%   channel_pid    - pid of the owning channel; when it exits this
%%                    server stops as well (see handle_info/2).
%%   typecheck      - passed on to every proxy that is started.
%%   maxevents      - passed on to every pull proxy that is started.
%%   proxies        - list of {ObjRef, Pid}, one entry per started proxy.
%%   server_options - option list appended to the options given to
%%                    oe_create_link/2 when a proxy is started.
-record(state, {channel_pid,
                typecheck,
                maxevents,
                proxies = [],
                server_options}).

%%----------------------------------------------------------------------
%% Macros
%%----------------------------------------------------------------------


%%======================================================================
%% External functions
%%======================================================================
%%----------------------------------------------------------------------
%% Function   : init/1
%% Arguments  : [ChannelPid, TypeCheck, MaxEvents, ServerOpts]
%% Returns    : {ok, State}
%% Description: Initiates the server. Exits are trapped so that the
%%              termination of the channel and of the proxies arrives
%%              as 'EXIT' messages in handle_info/2.
%%----------------------------------------------------------------------
init([ChannelPid, TypeCheck, MaxEvents, ServerOpts]) ->
    process_flag(trap_exit, true),
    InitialState = #state{server_options = ServerOpts,
                          maxevents      = MaxEvents,
                          typecheck      = TypeCheck,
                          channel_pid    = ChannelPid},
    {ok, InitialState}.

%%----------------------------------------------------------------------
%% Function   : terminate/2
%% Returns    : ok (ignored by gen_server)
%% Description: Shutdown the server. Only emits a debug trace.
%%----------------------------------------------------------------------
terminate(_Why, _State) ->
    %% The underscore prefix is kept because the variable is only used
    %% inside the ?DBG macro.
    ?DBG("Terminating ~p~n", [_Why]),
    ok.

%%----------------------------------------------------------------------
%% Function   : code_change/3
%% Returns    : {ok, NewState}
%% Description: Convert process state when code is changed. The state
%%              is returned untouched.
%%----------------------------------------------------------------------
code_change(_OldVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
%%----------------------------------------------------------------------
%% Function   : handle_info/2
%% Arguments  : Info  - any message that is not a regular request
%%              State - #state{}
%% Returns    : {noreply, State} |
%%              {stop, Reason, State}
%% Description: Functions demanded by the gen_server module.
%%              * 'EXIT' from the owning channel: stop with the same
%%                reason.
%%              * 'EXIT' from any other process: drop the proxy entry
%%                registered for that pid, if there is one.
%%              * Anything else is ignored.
%%----------------------------------------------------------------------
handle_info({'EXIT', Pid, Reason}, #state{channel_pid = Pid} = State) ->
    ?DBG("Parent Channel terminated ~p~n", [Reason]),
    %% The message now names this module; it used to claim to come from
    %% oe_CosEventComm_PullerS_impl.
    orber:dbg("[~p] oe_CosEventComm_CAdmin:handle_info(~p);~n"
              "My Channel terminated and so will I which will cause"
              " my children to do the same thing.",
              [?LINE, Reason], ?DEBUG_LEVEL),
    {stop, Reason, State};
handle_info({'EXIT', Pid, _Reason}, #state{proxies = Proxies} = State) ->
    %% A child terminated which is normal. Hence, no logging.
    ?DBG("Probably a child terminated ~p~n", [_Reason]),
    {noreply, State#state{proxies = lists:keydelete(Pid, 2, Proxies)}};
handle_info(_Info, State) ->
    ?DBG("Unknown Info ~p~n", [_Info]),
    {noreply, State}.

%%----------------------------------------------------------------------
%% Function   : obtain_push_supplier/3
%% Arguments  : The first two arguments are ignored (presumably the
%%              object reference and the caller - TODO confirm).
%%              State - #state{}
%% Returns    : {reply, Proxy, NewState}
%% Raises     : 'INTERNAL' (via corba:raise/1) when the proxy could not
%%              be started.
%% Description: Starts a linked oe_CosEventComm_PusherS proxy and stores
%%              it as {Proxy, Pid} at the head of the proxy list.
%%----------------------------------------------------------------------
obtain_push_supplier(_, _, #state{server_options = ServerOpts} = State) ->
    case catch 'oe_CosEventComm_PusherS':oe_create_link([self(),
                                                         State#state.typecheck],
                                                        [{sup_child, true}|ServerOpts]) of
        {ok, Pid, Proxy} ->
            ?DBG("Started a new oe_CosEventComm_PusherS.~n", []),
            {reply, Proxy, State#state{proxies = [{Proxy, Pid}|State#state.proxies]}};
        Other ->
            %% Covers both unexpected return values and anything the
            %% 'catch' intercepted.
            orber:dbg("[~p] oe_CosEventComm_CAdmin:obtain_push_supplier();~nError: ~p",
                      [?LINE, Other], ?DEBUG_LEVEL),
            corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO})
    end.
%%----------------------------------------------------------------------
%% Function   : obtain_pull_supplier/3
%% Arguments  : The first two arguments are ignored (presumably the
%%              object reference and the caller - TODO confirm).
%%              State - #state{}
%% Returns    : {reply, Proxy, NewState}
%% Raises     : 'INTERNAL' (via corba:raise/1) when the proxy could not
%%              be started.
%% Description: Starts a linked oe_CosEventComm_PullerS proxy and stores
%%              it as {Proxy, Pid} at the head of the proxy list.
%%----------------------------------------------------------------------
obtain_pull_supplier(_, _, #state{server_options = ServerOpts,
                                  typecheck      = TypeCheck,
                                  maxevents      = MaxEvents,
                                  proxies        = Known} = State) ->
    StartArgs = [self(), TypeCheck, MaxEvents],
    StartOpts = [{sup_child, true} | ServerOpts],
    case catch 'oe_CosEventComm_PullerS':oe_create_link(StartArgs, StartOpts) of
        {ok, Pid, Proxy} ->
            ?DBG("Started a new oe_CosEventComm_PullerS.~n", []),
            {reply, Proxy, State#state{proxies = [{Proxy, Pid} | Known]}};
        Failure ->
            %% Anything but {ok, Pid, Proxy}, including what the 'catch'
            %% intercepted, is reported to the caller as 'INTERNAL'.
            orber:dbg("[~p] oe_CosEventComm_CAdmin:obtain_pull_supplier();~nError: ~p",
                      [?LINE, Failure], ?DEBUG_LEVEL),
            corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO})
    end.


%%----------------------------------------------------------------------
%% Function   : send/3
%% Arguments  : The first argument is ignored.
%%              State - #state{}
%%              Any   - the event to forward
%% Returns    : {noreply, NewState}
%% Description: Forwards the event to every known proxy using the
%%              non-sync send. Proxies for which forwarding failed are
%%              removed from the state.
%%----------------------------------------------------------------------
send(_, #state{proxies = Proxies} = State, Any) ->
    ?DBG("Received Event ~p~n", [Any]),
    NextState =
        case send_helper(Proxies, Any, [], false) of
            ok ->
                ?DBG("Received Event and forwarded it successfully.~n", []),
                State;
            {error, Dropped} ->
                ?DBG("Received Event but forward failed to: ~p~n", [Dropped]),
                State#state{proxies = delete_proxies(Dropped, Proxies)}
        end,
    {noreply, NextState}.
%%----------------------------------------------------------------------
%% Function   : send_sync/4
%% Arguments  : The first argument is ignored.
%%              OE_From - the caller, answered through corba:reply/2
%%              State   - #state{}
%%              Any     - the event to forward
%% Returns    : {noreply, NewState}
%% Description: Replies 'ok' to the caller first and then forwards the
%%              event to every known proxy using the sync send. Proxies
%%              for which forwarding failed are removed from the state.
%%----------------------------------------------------------------------
send_sync(_, OE_From, #state{proxies = Proxies} = State, Any) ->
    ?DBG("Received Event ~p~n", [Any]),
    %% The caller is released before the event is passed on.
    corba:reply(OE_From, ok),
    NextState =
        case send_helper(Proxies, Any, [], true) of
            ok ->
                ?DBG("Received Event and forwarded (sync) it successfully.~n", []),
                State;
            {error, Dropped} ->
                ?DBG("Received Event but forward (sync) failed to: ~p~n", [Dropped]),
                State#state{proxies = delete_proxies(Dropped, Proxies)}
        end,
    {noreply, NextState}.


%%======================================================================
%% Internal functions
%%======================================================================
%%----------------------------------------------------------------------
%% Function   : send_helper/4
%% Arguments  : Proxies - list of {ObjRef, Pid}
%%              Event   - the event to forward
%%              Dropped - accumulator of proxies that failed so far
%%              Sync    - false selects 'send', any other value selects
%%                        'send_sync'
%% Returns    : ok | {error, Dropped}
%% Description: Forwards the event to each proxy in turn. A proxy whose
%%              call does not return 'ok' (including anything the
%%              'catch' intercepted) is logged and put at the head of
%%              the accumulator.
%%----------------------------------------------------------------------
send_helper([{ObjRef, _Pid} = Proxy | Rest], Event, Dropped, false) ->
    StillDropped =
        case catch 'oe_CosEventComm_Event':send(ObjRef, Event) of
            ok ->
                Dropped;
            Unexpected ->
                orber:dbg("[~p] oe_CosEventComm_CAdmin:send_helper(~p, ~p);~n"
                          "Bad return value ~p. Closing connection.",
                          [?LINE, ObjRef, Event, Unexpected], ?DEBUG_LEVEL),
                [Proxy | Dropped]
        end,
    send_helper(Rest, Event, StillDropped, false);
send_helper([{ObjRef, _Pid} = Proxy | Rest], Event, Dropped, Sync) ->
    StillDropped =
        case catch 'oe_CosEventComm_Event':send_sync(ObjRef, Event) of
            ok ->
                Dropped;
            Unexpected ->
                orber:dbg("[~p] oe_CosEventComm_CAdmin:send_helper(~p, ~p);~n"
                          "Bad return value ~p. Closing connection.",
                          [?LINE, ObjRef, Event, Unexpected], ?DEBUG_LEVEL),
                [Proxy | Dropped]
        end,
    send_helper(Rest, Event, StillDropped, Sync);
send_helper([], _, [], _) ->
    ok;
send_helper([], _, Dropped, _) ->
    {error, Dropped}.
%%----------------------------------------------------------------------
%% Function   : delete_proxies/2
%% Arguments  : Dropped - list of {ObjRef, Pid} to remove
%%              Proxies - list of {ObjRef, Pid} currently known
%% Returns    : Proxies without the entries of the dropped pids
%% Description: For each dropped proxy, removes the first entry of
%%              Proxies whose second element is that proxy's pid. The
%%              order of the remaining entries is preserved.
%%----------------------------------------------------------------------
delete_proxies(Dropped, Proxies) ->
    lists:foldl(fun({_, Pid}, Remaining) ->
                        lists:keydelete(Pid, 2, Remaining)
                end,
                Proxies,
                Dropped).

%%======================================================================
%% END OF MODULE
%%======================================================================