1%%-------------------------------------------------------------------- 2%% 3%% %CopyrightBegin% 4%% 5%% Copyright Ericsson AB 1999-2016. All Rights Reserved. 6%% 7%% Licensed under the Apache License, Version 2.0 (the "License"); 8%% you may not use this file except in compliance with the License. 9%% You may obtain a copy of the License at 10%% 11%% http://www.apache.org/licenses/LICENSE-2.0 12%% 13%% Unless required by applicable law or agreed to in writing, software 14%% distributed under the License is distributed on an "AS IS" BASIS, 15%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16%% See the License for the specific language governing permissions and 17%% limitations under the License. 18%% 19%% %CopyrightEnd% 20%% 21%% 22%%------------------------------------------------------------------- 23%% File : CosNotifyChannelAdmin_ConsumerAdmin_impl.erl 24%% Purpose : 25%%------------------------------------------------------------------- 26 27-module('CosNotifyChannelAdmin_ConsumerAdmin_impl'). 28 29%%--------------- INCLUDES ----------------------------------- 30-include_lib("orber/include/corba.hrl"). 31-include_lib("orber/include/ifr_types.hrl"). 32%% Application files 33-include("CosNotification.hrl"). 34-include("CosNotifyChannelAdmin.hrl"). 35-include("CosNotifyComm.hrl"). 36-include("CosNotifyFilter.hrl"). 37-include("CosNotification_Definitions.hrl"). 38 39%%--------------- EXPORTS ------------------------------------ 40%%--------------- External ----------------------------------- 41%%----- CosNotifyChannelAdmin::ConsumerAdmin ----------------- 42-export([get_proxy_supplier/4, 43 obtain_notification_pull_supplier/4, 44 obtain_notification_push_supplier/4, 45 destroy/3]). 46 47%%----- Inherit from CosNotification::QoSAdmin --------------- 48-export([get_qos/3, 49 set_qos/4, 50 validate_qos/4]). 51 52%%----- Inherit from CosNotifyComm::NotifySubscribe ---------- 53-export([subscription_change/5]). 
%%----- Inherit from CosNotifyFilter::FilterAdmin ------------
-export([add_filter/4,
         remove_filter/4,
         get_filter/4,
         get_all_filters/3,
         remove_all_filters/3]).

%%----- Inherit from CosEventChannelAdmin::ConsumerAdmin -----
-export([obtain_push_supplier/3,
         obtain_pull_supplier/3]).

%% Attributes (external)
-export(['_get_MyID'/3,
         '_get_MyChannel'/3,
         '_get_MyOperator'/3,
         '_get_priority_filter'/3,
         '_set_priority_filter'/4,
         '_get_lifetime_filter'/3,
         '_set_lifetime_filter'/4,
         '_get_pull_suppliers'/3,
         '_get_push_suppliers'/3]).

%%--------------- Internal -----------------------------------
%%----- Inherit from cosNotificationComm ---------------------
-export([callAny/5,
         callSeq/5]).

%%--------------- gen_server specific exports ----------------
-export([handle_info/2, code_change/3]).
-export([init/1, terminate/2]).

%%--------------- LOCAL DEFINITIONS --------------------------
%% Server state.
%%   myFilters   - list of {FilterId, FilterRef}
%%   mySuppliers - list of {ProxyId, ProxyRef, ProxyPid, pusher | puller}
%%   idCounter   - last id handed out; input to ?new_Id
%%   etsR        - ETS table created when the initial state is built
-record(state, {myId,
                myChannel,
                myChannelPid,
                myOperator,
                myFilters   = [],
                mySuppliers = [],
                idCounter   = 0,
                priorityFilter,
                lifetimeFilter,
                etsR,
                qosGlobal,
                qosLocal,
                options}).

%% Constructor: builds the initial #state{} and creates the ETS table.
-define(get_InitState(Id, Channel, ChannelPid, Operator, PrioFilter,
                      LifeFilter, GlobalQoS, LocalQoS, Opts),
        #state{myId           = Id,
               myChannel      = Channel,
               myChannelPid   = ChannelPid,
               myOperator     = Operator,
               priorityFilter = PrioFilter,
               lifetimeFilter = LifeFilter,
               qosGlobal      = GlobalQoS,
               qosLocal       = LocalQoS,
               options        = Opts,
               etsR           = ets:new(oe_ets, [set, protected])}).

%% Selectors: read-only views of the state.
-define(get_PushSupplierIDs(St), find_ids(St#state.mySuppliers, pusher)).
-define(get_PullSupplierIDs(St), find_ids(St#state.mySuppliers, puller)).
-define(get_AllSuppliers(St),    St#state.mySuppliers).
-define(get_AllSupplierRefs(St), find_refs(St#state.mySuppliers)).
-define(get_Supplier(St, Id),
        find_obj(lists:keysearch(Id, 1, St#state.mySuppliers), supplier)).

-define(get_MyID(St),         St#state.myId).
-define(get_MyChannel(St),    St#state.myChannel).
-define(get_MyChannelPid(St), St#state.myChannelPid).
-define(get_MyOperator(St),   St#state.myOperator).
-define(get_PrioFilter(St),   St#state.priorityFilter).
-define(get_LifeFilter(St),   St#state.lifetimeFilter).
-define(get_GlobalQoS(St),    St#state.qosGlobal).
-define(get_LocalQoS(St),     St#state.qosLocal).
-define(get_BothQoS(St),      {St#state.qosGlobal, St#state.qosLocal}).
-define(get_Filter(St, Id),
        find_obj(lists:keysearch(Id, 1, St#state.myFilters), filter)).
-define(get_AllFilter(St),    St#state.myFilters).
-define(get_AllFilterID(St),  find_ids(St#state.myFilters)).
-define(get_Options(St),      St#state.options).
-define(get_IdCounter(St),    St#state.idCounter).

%% Modifiers: each one evaluates to an updated copy of the state.
-define(set_PrioFilter(St, Value), St#state{priorityFilter = Value}).
-define(set_LifeFilter(St, Value), St#state{lifetimeFilter = Value}).
-define(set_LocalQoS(St, Value),   St#state{qosLocal = Value}).
-define(set_GlobalQoS(St, Value),  St#state{qosGlobal = Value}).
-define(set_BothQoS(St, Global, Local),
        St#state{qosGlobal = Global, qosLocal = Local}).
-define(add_PushSupplier(St, Id, Ref, Pid),
        St#state{mySuppliers = [{Id, Ref, Pid, pusher} | St#state.mySuppliers]}).
-define(add_PullSupplier(St, Id, Ref, Pid),
        St#state{mySuppliers = [{Id, Ref, Pid, puller} | St#state.mySuppliers]}).
%% Suppliers can be removed by id (element 1), object reference
%% (element 2) or pid (element 3).
-define(del_Supplier(St, Id),
        St#state{mySuppliers = lists:keydelete(Id, 1, St#state.mySuppliers)}).
-define(del_SupplierRef(St, Ref),
        St#state{mySuppliers = lists:keydelete(Ref, 2, St#state.mySuppliers)}).
-define(del_SupplierPid(St, Pid),
        St#state{mySuppliers = lists:keydelete(Pid, 3, St#state.mySuppliers)}).
-define(add_Filter(St, Id, Ref),
        St#state{myFilters = [{Id, Ref} | St#state.myFilters]}).
%% delete_filter/2 compares the list after deletion with the list before.
-define(del_Filter(St, Id),
        St#state{myFilters =
                     delete_filter(lists:keydelete(Id, 1, St#state.myFilters),
                                   St#state.myFilters)}).
-define(del_AllFilter(St), St#state{myFilters = []}).
-define(set_IdCounter(St, Value), St#state{idCounter = Value}).
-define(new_Id(St), 'CosNotification_Common':create_id(St#state.idCounter)).

%% MISC
-define(is_PersistentConnection(St),
        ?not_GetConnectionReliability((St#state.qosLocal)) == ?not_Persistent).
-define(is_PersistentEvent(St),
        ?not_GetEventReliability((St#state.qosLocal)) == ?not_Persistent).
-define(is_ANDOP(St), St#state.myOperator == 'AND_OP').

%%----------------------------------------------------------%
%% function : handle_info, code_change
%% Arguments: See gen_server documentation.
%% Returns  : See gen_server documentation.
%% Effect   : Functions demanded by the gen_server module.
%%-----------------------------------------------------------

%% No state conversion is performed on code upgrade.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% init/1 enables trap_exit, so exits of linked processes arrive here
%% as {'EXIT', Pid, Reason} messages.
handle_info({'EXIT', Pid, Reason}, State) when ?get_MyChannelPid(State) == Pid ->
    %% The parent channel is gone; stop with the same reason.
    ?DBG("PARENT CHANNEL: ~p TERMINATED.~n",[Reason]),
    {stop, Reason, State};
handle_info({'EXIT', Pid, _Reason}, State) ->
    %% Forget the proxy owned by Pid whatever the exit reason is. Keeping
    %% the entry after an abnormal exit would leave a dead proxy in the
    %% supplier list. The delete is a no-op if Pid is not a known proxy.
    {noreply, ?del_SupplierPid(State, Pid)};
handle_info(_Other, State) ->
    {noreply, State}.

%%----------------------------------------------------------%
%% function : init, terminate
%% Arguments: init - [MyId, MyChannel, MyChannelPid, MyOperator,
%%                    InitQoS, LQS, Options]
%% Returns  : init - {ok, State}; terminate - ok
%% Effect   : init enables trap_exit and starts with nil object
%%            references for the priority and lifetime filters.
%%-----------------------------------------------------------

init([MyId, MyChannel, MyChannelPid, MyOperator, InitQoS, LQS, Options]) ->
    process_flag(trap_exit, true),
    PriorityFilter = corba:create_nil_objref(),
    LifeTimeFilter = corba:create_nil_objref(),
    {ok, ?get_InitState(MyId, MyChannel, MyChannelPid, MyOperator,
                        PriorityFilter, LifeTimeFilter, InitQoS, LQS, Options)}.

terminate(_Reason, _State) ->
    ok.
%%-----------------------------------------------------------
%%----- CosNotifyChannelAdmin_ConsumerAdmin attributes ------
%%-----------------------------------------------------------
%%----------------------------------------------------------%
%% Attribute: '_get_MyID'
%% Type     : readonly
%% Returns  : The id stored in the state.
%%-----------------------------------------------------------
'_get_MyID'(_OE_THIS, _OE_FROM, State) ->
    MyId = ?get_MyID(State),
    {reply, MyId, State}.

%%----------------------------------------------------------%
%% Attribute: '_get_MyChannel'
%% Type     : readonly
%% Returns  : The channel stored in the state.
%%-----------------------------------------------------------
'_get_MyChannel'(_OE_THIS, _OE_FROM, State) ->
    Channel = ?get_MyChannel(State),
    {reply, Channel, State}.

%%----------------------------------------------------------%
%% Attribute: '_get_MyOperator'
%% Type     : readonly
%% Returns  : The operator stored in the state.
%%-----------------------------------------------------------
'_get_MyOperator'(_OE_THIS, _OE_FROM, State) ->
    Operator = ?get_MyOperator(State),
    {reply, Operator, State}.

%%----------------------------------------------------------%
%% Attribute: 'priority_filter'
%% Type     : read and write
%% Returns  : get - the stored priority filter
%%            set - ok; the new filter replaces the stored one
%%-----------------------------------------------------------
'_get_priority_filter'(_OE_THIS, _OE_FROM, State) ->
    Current = ?get_PrioFilter(State),
    {reply, Current, State}.

'_set_priority_filter'(_OE_THIS, _OE_FROM, State, PrioFilter) ->
    Updated = ?set_PrioFilter(State, PrioFilter),
    {reply, ok, Updated}.

%%----------------------------------------------------------%
%% Attribute: 'lifetime_filter'
%% Type     : read and write
%% Returns  : get - the stored lifetime filter
%%            set - ok; the new filter replaces the stored one
%%-----------------------------------------------------------
'_get_lifetime_filter'(_OE_THIS, _OE_FROM, State) ->
    Current = ?get_LifeFilter(State),
    {reply, Current, State}.

'_set_lifetime_filter'(_OE_THIS, _OE_FROM, State, LifeFilter) ->
    Updated = ?set_LifeFilter(State, LifeFilter),
    {reply, ok, Updated}.
%%----------------------------------------------------------%
%% Attribute: '_get_pull_suppliers'
%% Type     : readonly
%% Returns  : ProxyIDSeq - ids of the suppliers tagged 'puller'
%%-----------------------------------------------------------
'_get_pull_suppliers'(_OE_THIS, _OE_FROM, State) ->
    PullIds = ?get_PullSupplierIDs(State),
    {reply, PullIds, State}.

%%----------------------------------------------------------%
%% Attribute: '_get_push_suppliers'
%% Type     : readonly
%% Returns  : ProxyIDSeq - ids of the suppliers tagged 'pusher'
%%-----------------------------------------------------------
'_get_push_suppliers'(_OE_THIS, _OE_FROM, State) ->
    PushIds = ?get_PushSupplierIDs(State),
    {reply, PushIds, State}.

%%-----------------------------------------------------------
%%------- Exported external functions -----------------------
%%-----------------------------------------------------------
%%----------------------------------------------------------%
%% function : get_proxy_supplier
%% Arguments: ProxyID - unique identifier (long)
%% Returns  : ObjRef | {'EXCEPTION', #'ProxyNotFound'{}}
%%-----------------------------------------------------------
get_proxy_supplier(_OE_THIS, _OE_FROM, State, Proxy_id) ->
    Found = ?get_Supplier(State, Proxy_id),
    {reply, Found, State}.

%%----------------------------------------------------------%
%% function : obtain_notification_pull_supplier
%% Arguments: Ctype - 'ANY_EVENT' | 'STRUCTURED_EVENT' | 'SEQUENCE_EVENT'
%% Returns  : {ProxyRef, ProxyID} for a proxy of the requested type.
%%            Raises BAD_PARAM for an unknown Ctype and INTERNAL
%%            if the proxy cannot be created.
%%-----------------------------------------------------------
obtain_notification_pull_supplier(OE_THIS, _OE_FROM, State, Ctype) ->
    {Mod, Type} = pull_proxy_for(Ctype),
    Options = ?get_Options(State),
    ServerOpts = 'CosNotification_Common':get_option(server_options, Options,
                                                     ?not_DEFAULT_SETTINGS),
    CreateArgs = [Type, OE_THIS, self(), ?get_GlobalQoS(State),
                  ?get_LocalQoS(State), ?get_MyChannel(State),
                  Options, ?get_MyOperator(State)],
    case Mod:oe_create_link(CreateArgs, [{sup_child, true}|ServerOpts]) of
        {ok, Pid, PrRef} ->
            %% Register the proxy and remember the id that was consumed.
            ProxyID = ?new_Id(State),
            Registered = ?add_PullSupplier(State, ProxyID, PrRef, Pid),
            {reply, {PrRef, ProxyID}, ?set_IdCounter(Registered, ProxyID)};
        What ->
            orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:"
                      "obtain_notification_pull_supplier();~n"
                      "Unable to create: ~p/~p~n"
                      "Reason: ~p", [?LINE, Mod, Type, What], ?DEBUG_LEVEL),
            corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO})
    end.

%% Maps the requested client type to {ProxyModule, ProxyType} for pull
%% proxies; any other value is logged and rejected with BAD_PARAM.
pull_proxy_for('ANY_EVENT') ->
    {'CosNotifyChannelAdmin_ProxyPullSupplier', 'PULL_ANY'};
pull_proxy_for('STRUCTURED_EVENT') ->
    {'CosNotifyChannelAdmin_StructuredProxyPullSupplier', 'PULL_STRUCTURED'};
pull_proxy_for('SEQUENCE_EVENT') ->
    {'CosNotifyChannelAdmin_SequenceProxyPullSupplier', 'PULL_SEQUENCE'};
pull_proxy_for(Ctype) ->
    orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:"
              "obtain_notification_pull_supplier(~p);~n"
              "Incorrect enumerant",
              [?LINE, Ctype], ?DEBUG_LEVEL),
    corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}).

%%----------------------------------------------------------%
%% function : obtain_notification_push_supplier
%% Arguments: Ctype - 'ANY_EVENT' | 'STRUCTURED_EVENT' | 'SEQUENCE_EVENT'
%% Returns  : {ProxyRef, ProxyID} for a proxy of the requested type.
%%            Raises BAD_PARAM for an unknown Ctype and INTERNAL
%%            if the proxy cannot be created.
%%-----------------------------------------------------------
obtain_notification_push_supplier(OE_THIS, _OE_FROM, State, Ctype) ->
    {Mod, Type} = push_proxy_for(Ctype),
    Options = ?get_Options(State),
    ServerOpts = 'CosNotification_Common':get_option(server_options, Options,
                                                     ?not_DEFAULT_SETTINGS),
    CreateArgs = [Type, OE_THIS, self(), ?get_GlobalQoS(State),
                  ?get_LocalQoS(State), ?get_MyChannel(State),
                  Options, ?get_MyOperator(State)],
    case Mod:oe_create_link(CreateArgs, [{sup_child, true}|ServerOpts]) of
        {ok, Pid, PrRef} ->
            %% Register the proxy and remember the id that was consumed.
            ProxyID = ?new_Id(State),
            Registered = ?add_PushSupplier(State, ProxyID, PrRef, Pid),
            {reply, {PrRef, ProxyID}, ?set_IdCounter(Registered, ProxyID)};
        What ->
            orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:obtain_notification_push_supplier();~n"
                      "Unable to create: ~p/~p~n"
                      "Reason: ~p",
                      [?LINE, Mod, Type, What], ?DEBUG_LEVEL),
            corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO})
    end.

%% Maps the requested client type to {ProxyModule, ProxyType} for push
%% proxies; any other value is logged and rejected with BAD_PARAM.
push_proxy_for('ANY_EVENT') ->
    {'CosNotifyChannelAdmin_ProxyPushSupplier', 'PUSH_ANY'};
push_proxy_for('STRUCTURED_EVENT') ->
    {'CosNotifyChannelAdmin_StructuredProxyPushSupplier', 'PUSH_STRUCTURED'};
push_proxy_for('SEQUENCE_EVENT') ->
    {'CosNotifyChannelAdmin_SequenceProxyPushSupplier', 'PUSH_SEQUENCE'};
push_proxy_for(Ctype) ->
    orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:"
              "obtain_notification_push_supplier(~p);~n"
              "Incorrect enumerant", [?LINE, Ctype], ?DEBUG_LEVEL),
    corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}).


%%----------------------------------------------------------%
%% function : destroy
%% Arguments: -
%% Returns  : ok; the server is stopped with reason 'normal'.
%%------------------------------------------------------------
destroy(_OE_THIS, _OE_FROM, State) ->
    {stop, normal, ok, State}.

%%----- Inherit from CosNotification::QoSAdmin --------------
%%----------------------------------------------------------%
%% function : get_qos
%% Arguments: -
%% Returns  : The global QoS stored in the state.
%%-----------------------------------------------------------
get_qos(_OE_THIS, _OE_FROM, State) ->
    GlobalQoS = ?get_GlobalQoS(State),
    {reply, GlobalQoS, State}.
%%----------------------------------------------------------%
%% function : set_qos
%% Arguments: QoS - CosNotification::QoSProperties, i.e.,
%%            [#'Property'{name, value}, ...] where name eq. string()
%%            and value eq. any().
%% Returns  : ok | {'EXCEPTION', CosNotification::UnsupportedQoS}
%% Effect   : Stores the global and local QoS returned by
%%            'CosNotification_Common':set_qos/5.
%%-----------------------------------------------------------
set_qos(_OE_THIS, _OE_FROM, State, QoS) ->
    CurrentQoS = ?get_BothQoS(State),
    Channel = ?get_MyChannel(State),
    ProxyRefs = ?get_AllSupplierRefs(State),
    {NewQoS, LQS} = 'CosNotification_Common':set_qos(QoS, CurrentQoS, admin,
                                                     Channel, ProxyRefs),
    {reply, ok, ?set_BothQoS(State, NewQoS, LQS)}.

%%----------------------------------------------------------%
%% function : validate_qos
%% Arguments: Required_qos - CosNotification::QoSProperties
%%            [#'Property'{name, value}, ...] where name eq. string()
%%            and value eq. any().
%% Returns  : {ok, QoS} | {'EXCEPTION', CosNotification::UnsupportedQoS}
%% Effect   : None; the state is left unchanged.
%%-----------------------------------------------------------
validate_qos(_OE_THIS, _OE_FROM, State, Required_qos) ->
    CurrentQoS = ?get_BothQoS(State),
    Channel = ?get_MyChannel(State),
    ProxyRefs = ?get_AllSupplierRefs(State),
    QoS = 'CosNotification_Common':validate_qos(Required_qos, CurrentQoS, admin,
                                                Channel, ProxyRefs),
    {reply, {ok, QoS}, State}.

%%----- Inherit from CosNotifyComm::NotifySubscribe ---------
%%----------------------------------------------------------*
%% function : subscription_change
%% Arguments: _Added, _Removed - only used for debug output.
%% Returns  : ok |
%%            {'EXCEPTION', #'CosNotifyComm_InvalidEventType'{type}}
%% Effect   : None; this implementation always replies ok.
%%-----------------------------------------------------------
subscription_change(_OE_THIS, _OE_FROM, State, _Added, _Removed) ->
    ?DBG("CALLBACK INFORMED: ~p ~p~n",[_Added, _Removed]),
    {reply, ok, State}.
%%----- Inherit from CosNotifyFilter::FilterAdmin -----------
%%----------------------------------------------------------%
%% function : add_filter
%% Arguments: Filter - CosNotifyFilter::Filter
%% Returns  : FilterID - long
%% Effect   : Type checks Filter, then stores it under a new id.
%%-----------------------------------------------------------
add_filter(_OE_THIS, _OE_FROM, State, Filter) ->
    'CosNotification_Common':type_check(Filter, 'CosNotifyFilter_Filter'),
    FilterID = ?new_Id(State),
    Counted = ?set_IdCounter(State, FilterID),
    Registered = ?add_Filter(Counted, FilterID, Filter),
    {reply, FilterID, Registered}.

%%----------------------------------------------------------%
%% function : remove_filter
%% Arguments: FilterID - long
%% Returns  : ok
%%            Raises BAD_PARAM if FilterID is not an integer.
%%-----------------------------------------------------------
remove_filter(_OE_THIS, _OE_FROM, State, FilterID) when is_integer(FilterID) ->
    Remaining = ?del_Filter(State, FilterID),
    {reply, ok, Remaining};
remove_filter(_, _, _, _) ->
    corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}).

%%----------------------------------------------------------%
%% function : get_filter
%% Arguments: FilterID - long
%% Returns  : Filter - CosNotifyFilter::Filter |
%%            {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}}
%%            Raises BAD_PARAM if FilterID is not an integer.
%%-----------------------------------------------------------
get_filter(_OE_THIS, _OE_FROM, State, FilterID) when is_integer(FilterID) ->
    Found = ?get_Filter(State, FilterID),
    {reply, Found, State};
get_filter(_, _, _, _) ->
    corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}).

%%----------------------------------------------------------%
%% function : get_all_filters
%% Arguments: -
%% Returns  : Filter - CosNotifyFilter::FilterIDSeq
%%-----------------------------------------------------------
get_all_filters(_OE_THIS, _OE_FROM, State) ->
    FilterIds = ?get_AllFilterID(State),
    {reply, FilterIds, State}.
%%----------------------------------------------------------%
%% function : remove_all_filters
%% Arguments: -
%% Returns  : ok
%% Effect   : Empties the filter list held in the state.
%%-----------------------------------------------------------
remove_all_filters(_OE_THIS, _OE_FROM, State) ->
    Cleared = ?del_AllFilter(State),
    {reply, ok, Cleared}.

%%----- Inherit from CosEventChannelAdmin::ConsumerAdmin ----
%%----------------------------------------------------------%
%% function : obtain_push_supplier
%% Arguments: -
%% Returns  : ProxyPushSupplier
%%            Raises INTERNAL if the proxy cannot be created.
%% Effect   : Creates a linked 'PUSH_ANY' proxy and registers it
%%            as a 'pusher' under a new id.
%%-----------------------------------------------------------
obtain_push_supplier(OE_THIS, _OE_FROM, State) ->
    Options = ?get_Options(State),
    ServerOpts = 'CosNotification_Common':get_option(server_options, Options,
                                                     ?not_DEFAULT_SETTINGS),
    CreateArgs = ['PUSH_ANY', OE_THIS, self(),
                  ?get_GlobalQoS(State),
                  ?get_LocalQoS(State),
                  ?get_MyChannel(State),
                  Options,
                  ?get_MyOperator(State)],
    case 'CosNotifyChannelAdmin_ProxyPushSupplier':oe_create_link(
           CreateArgs, [{sup_child, true}|ServerOpts]) of
        {ok, Pid, PrRef} ->
            ProxyID = ?new_Id(State),
            Registered = ?add_PushSupplier(State, ProxyID, PrRef, Pid),
            {reply, PrRef, ?set_IdCounter(Registered, ProxyID)};
        What ->
            orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:obtain_push_supplier();~n"
                      "Unable to create: CosNotifyChannelAdmin_ProxyPushSupplier~n"
                      "Reason: ~p", [?LINE, What], ?DEBUG_LEVEL),
            corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO})
    end.
%%----------------------------------------------------------%
%% function : obtain_pull_supplier
%% Arguments: -
%% Returns  : ProxyPullSupplier
%%            Raises INTERNAL if the proxy cannot be created.
%% Effect   : Creates a linked 'PULL_ANY' proxy and registers it
%%            as a 'puller' under a new id.
%%-----------------------------------------------------------
obtain_pull_supplier(OE_THIS, _OE_FROM, State) ->
    Options = ?get_Options(State),
    ServerOpts = 'CosNotification_Common':get_option(server_options, Options,
                                                     ?not_DEFAULT_SETTINGS),
    CreateArgs = ['PULL_ANY', OE_THIS, self(),
                  ?get_GlobalQoS(State),
                  ?get_LocalQoS(State),
                  ?get_MyChannel(State),
                  Options,
                  ?get_MyOperator(State)],
    case 'CosNotifyChannelAdmin_ProxyPullSupplier':oe_create_link(
           CreateArgs, [{sup_child, true}|ServerOpts]) of
        {ok, Pid, PrRef} ->
            ProxyID = ?new_Id(State),
            Registered = ?add_PullSupplier(State, ProxyID, PrRef, Pid),
            {reply, PrRef, ?set_IdCounter(Registered, ProxyID)};
        What ->
            orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:obtain_pull_supplier();~n"
                      "Unable to create: CosNotifyChannelAdmin_ProxyPullSupplier~n"
                      "Reason: ~p", [?LINE, What], ?DEBUG_LEVEL),
            corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO})
    end.

%%--------------- LOCAL FUNCTIONS ----------------------------
%% Turns the result of lists:keysearch/3 into an object reference or
%% an exception tuple. The second argument selects which exception is
%% returned when the search did not find anything.
%% Supplier entries are 4-tuples.
find_obj({value, {_Id, ObjRef, _Pid, _Kind}}, _) -> ObjRef;
%% Filter entries are 2-tuples.
find_obj({value, {_Id, ObjRef}}, _) -> ObjRef;
find_obj(_, supplier) -> {'EXCEPTION', #'CosNotifyChannelAdmin_ProxyNotFound'{}};
find_obj(_, filter) -> {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}}.

%% Collects the ids of Entries; with one argument the type filter is
%% 'false'.
find_ids(Entries) ->
    find_ids(Entries, [], false).
%% Collects the ids of Entries using Type as the type filter.
find_ids(Entries, Type) ->
    find_ids(Entries, [], Type).
%% Accumulates ids from a filter list ({Id, Ref}) or a supplier list
%% ({Id, Ref, Pid, Kind}). Filter entries always contribute; supplier
%% entries only contribute when Kind equals Type. The ids are returned in
%% the reverse order of the input list. Any other element, or an improper
%% list, is logged and raises INTERNAL.
find_ids([], Acc, _) ->
    Acc;
find_ids([{I,_}|T], Acc, Type) ->
    find_ids(T, [I|Acc], Type);
find_ids([{I,_,_,Type}|T], Acc, Type) ->
    find_ids(T, [I|Acc], Type);
find_ids([{_I,_,_,_}|T], Acc, Type) ->
    find_ids(T, Acc, Type);
find_ids(What, _, _) ->
    orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:find_ids();~n"
              "Id corrupt: ~p", [?LINE, What], ?DEBUG_LEVEL),
    corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}).

%% Returns the object references (second element) of all supplier
%% entries, in the reverse order of the input list.
find_refs(List) ->
    find_refs(List, []).

find_refs([], Acc) ->
    Acc;
find_refs([{_,R,_,_}|T], Acc) ->
    find_refs(T, [R|Acc]);
find_refs(What, _) ->
    orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:find_refs();~n"
              "Reference corrupt: ~p", [?LINE, What], ?DEBUG_LEVEL),
    corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}).

%% Delete a single filter.
%% The first argument is the filter list after the deletion, the second
%% the list before it. If the two lists do not differ, no filter was
%% removed and an exception is raised.
delete_filter(List,List) -> corba:raise(#'CosNotifyFilter_FilterNotFound'{});
delete_filter(List, _) -> List.


%%-----------------------------------------------------------
%% function : callSeq
%% Arguments: Events  - list of events to filter and forward
%%            _Status - ignored
%% Returns  : {noreply, State}; the caller is answered with 'ok'
%%            before any filtering is done.
%% Effect   : Runs the events through this object's filters and
%%            forwards them to every proxy. Proxies that are dropped
%%            while forwarding are removed from the returned state.
%%-----------------------------------------------------------
callSeq(_OE_THIS, OE_FROM, State, Events, _Status) ->
    corba:reply(OE_FROM, ok),
    case cosNotification_eventDB:filter_events(Events, ?get_AllFilter(State)) of
        {[], _} when ?is_ANDOP(State) ->
            %% Since AND it doesn't matter what the proxies 'think'. Done.
            {noreply, State};
        {[], Failed} ->
            %% Is OR but the Proxy may allow the events; pass on.
            forward(seq, State, Failed, 'MATCH');
        {Passed, _} when ?is_ANDOP(State) ->
            %% Since AND we only forward those who passed this objects filters.
            forward(seq, State, Passed, 'MATCH');
        {Passed, []} ->
            %% Since OR we forward and tell the proxy to do no filtering.
            forward(seq, State, Passed, 'MATCHED');
        {Passed, Failed} ->
            %% Since OR we forward both and instruct the proxy to check only
            %% the ones that failed. The state from the first pass must be
            %% used for the second one; otherwise proxies dropped in the
            %% first pass are called again and their removal is lost.
            {noreply, AfterPassed} = forward(seq, State, Passed, 'MATCHED'),
            forward(seq, AfterPassed, Failed, 'MATCH')
    end.


%%-----------------------------------------------------------
%% function : callAny
%% Arguments: Event   - a single event to filter and forward
%%            _Status - ignored
%% Returns  : {noreply, State}; the caller is answered with 'ok'
%%            before any filtering is done.
%% Effect   : Runs the event through this object's filters and
%%            forwards it to every proxy.
%%-----------------------------------------------------------
callAny(_OE_THIS, OE_FROM, State, Event, _Status) ->
    corba:reply(OE_FROM, ok),
    case cosNotification_eventDB:filter_events([Event], ?get_AllFilter(State)) of
        {[], _} when ?is_ANDOP(State) ->
            %% Since AND it doesn't matter what the proxies 'think'. Done.
            {noreply, State};
        {[], [Failed]} ->
            %% Is OR but the Proxy may allow the event; pass on.
            forward(any, State, Failed, 'MATCH');
        {[Passed], _} when ?is_ANDOP(State) ->
            %% Since AND we only forward those who passed this objects filters.
            forward(any, State, Passed, 'MATCH');
        {[Passed], _} ->
            %% Since OR we forward and instruct the proxy to do no checks.
            forward(any, State, Passed, 'MATCHED')
    end.



%% Forward events to all proxies currently registered in the state.
%% Returns {noreply, NewState}.
forward(Type, State, Event, Status) ->
    forward(Type, ?get_AllSuppliers(State), State, Event, Status).
%% Walks the supplier list and delivers Event to each proxy, using
%% callAny for 'any' and callSeq for 'seq'. State is threaded through the
%% walk and returned as {noreply, State} once the list is exhausted.
%% For each proxy:
%%   * 'ok' keeps the state as it is;
%%   * an OBJECT_NOT_EXIST exception removes the proxy from the state;
%%   * any other result is logged; the proxy is kept if the connection
%%     is persistent and removed otherwise.
forward(_, [], State, _, _) ->
    {noreply, State};
forward(any, [{_Id, ProxyRef, _Pid, _Kind}|Rest], State, Event, Status) ->
    NextState =
        case catch oe_CosNotificationComm_Event:callAny(ProxyRef, Event, Status) of
            ok ->
                ?DBG("CONSUMERADM FORWARD ANY: ~p~n",[Event]),
                State;
            {'EXCEPTION', #'OBJECT_NOT_EXIST'{}} ->
                orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:forward();~n"
                          "Proxy no longer exists; dropping it: ~p",
                          [?LINE, ProxyRef], ?DEBUG_LEVEL),
                ?del_SupplierRef(State, ProxyRef);
            Reply when ?is_PersistentConnection(State) ->
                orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:forward();~n"
                          "Proxy behaves badly: ~p/~p",
                          [?LINE, Reply, ProxyRef], ?DEBUG_LEVEL),
                State;
            Reply ->
                orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:forward();~n"
                          "Proxy behaves badly: ~p~n"
                          "Dropping it: ~p", [?LINE, Reply, ProxyRef], ?DEBUG_LEVEL),
                ?del_SupplierRef(State, ProxyRef)
        end,
    forward(any, Rest, NextState, Event, Status);
forward(seq, [{_Id, ProxyRef, _Pid, _Kind}|Rest], State, Event, Status) ->
    NextState =
        case catch oe_CosNotificationComm_Event:callSeq(ProxyRef, Event, Status) of
            ok ->
                ?DBG("CONSUMERADM FORWARD SEQUENCE: ~p~n",[Event]),
                State;
            {'EXCEPTION', #'OBJECT_NOT_EXIST'{}} ->
                orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:forward();~n"
                          "Proxy no longer exists; dropping it: ~p",
                          [?LINE, ProxyRef], ?DEBUG_LEVEL),
                ?del_SupplierRef(State, ProxyRef);
            Reply when ?is_PersistentConnection(State) ->
                orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:forward();~n"
                          "Proxy behaves badly: ~p/~p", [?LINE, Reply, ProxyRef], ?DEBUG_LEVEL),
                State;
            Reply ->
                orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:forward();~n"
                          "Proxy behaves badly: ~p~n"
                          "Dropping it: ~p", [?LINE, Reply, ProxyRef], ?DEBUG_LEVEL),
                ?del_SupplierRef(State, ProxyRef)
        end,
    forward(seq, Rest, NextState, Event, Status).


%%--------------- MISC FUNCTIONS, E.G. DEBUGGING -------------
%%--------------- END OF MODULE ------------------------------