1%%--------------------------------------------------------------------
2%%
3%% %CopyrightBegin%
4%%
5%% Copyright Ericsson AB 1999-2016. All Rights Reserved.
6%%
7%% Licensed under the Apache License, Version 2.0 (the "License");
8%% you may not use this file except in compliance with the License.
9%% You may obtain a copy of the License at
10%%
11%%     http://www.apache.org/licenses/LICENSE-2.0
12%%
13%% Unless required by applicable law or agreed to in writing, software
14%% distributed under the License is distributed on an "AS IS" BASIS,
15%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16%% See the License for the specific language governing permissions and
17%% limitations under the License.
18%%
19%% %CopyrightEnd%
20%%
21%%
22%%-------------------------------------------------------------------
23%% File    : CosNotifyChannelAdmin_ConsumerAdmin_impl.erl
24%% Purpose :
25%%-------------------------------------------------------------------
26
27-module('CosNotifyChannelAdmin_ConsumerAdmin_impl').
28
29%%--------------- INCLUDES -----------------------------------
30-include_lib("orber/include/corba.hrl").
31-include_lib("orber/include/ifr_types.hrl").
32%% Application files
33-include("CosNotification.hrl").
34-include("CosNotifyChannelAdmin.hrl").
35-include("CosNotifyComm.hrl").
36-include("CosNotifyFilter.hrl").
37-include("CosNotification_Definitions.hrl").
38
39%%--------------- EXPORTS ------------------------------------
40%%--------------- External -----------------------------------
41%%----- CosNotifyChannelAdmin::ConsumerAdmin -----------------
42-export([get_proxy_supplier/4,
43	 obtain_notification_pull_supplier/4,
44	 obtain_notification_push_supplier/4,
45	 destroy/3]).
46
47%%----- Inherit from CosNotification::QoSAdmin ---------------
48-export([get_qos/3,
49	 set_qos/4,
50	 validate_qos/4]).
51
52%%----- Inherit from CosNotifyComm::NotifySubscribe ----------
53-export([subscription_change/5]).
54
55%%----- Inherit from CosNotifyFilter::FilterAdmin ------------
56-export([add_filter/4,
57	 remove_filter/4,
58	 get_filter/4,
59	 get_all_filters/3,
60	 remove_all_filters/3]).
61
62%%----- Inherit from CosEventChannelAdmin::ConsumerAdmin -----
63-export([obtain_push_supplier/3,
64	 obtain_pull_supplier/3]).
65
66%% Attributes (external)
67-export(['_get_MyID'/3,
68	 '_get_MyChannel'/3,
69	 '_get_MyOperator'/3,
70	 '_get_priority_filter'/3,
71	 '_set_priority_filter'/4,
72	 '_get_lifetime_filter'/3,
73	 '_set_lifetime_filter'/4,
74	 '_get_pull_suppliers'/3,
75	 '_get_push_suppliers'/3]).
76
77%%--------------- Internal -----------------------------------
78%%----- Inherit from cosNotificationComm ---------------------
79-export([callAny/5,
80	 callSeq/5]).
81
82%%--------------- gen_server specific exports ----------------
83-export([handle_info/2, code_change/3]).
84-export([init/1, terminate/2]).
85
86%%--------------- LOCAL DEFINITIONS --------------------------
87%% Data structures
88-record(state, {myId,
89	        myChannel,
90		myChannelPid,
91		myOperator,
92		myFilters = [],
93		mySuppliers = [],
94		idCounter = 0,
95		priorityFilter,
96		lifetimeFilter,
97		etsR,
98		qosGlobal,
99		qosLocal,
100		options}).
101
102%% Data structures constructors
103-define(get_InitState(_MyID, _MyCh, _MyChP, _MyOp, _PFil, _LFil, _QoS, _LQS, _O),
104	#state{myId           = _MyID,
105	       myChannel      = _MyCh,
106	       myChannelPid   = _MyChP,
107	       myOperator     = _MyOp,
108	       priorityFilter = _PFil,
109	       lifetimeFilter = _LFil,
110	       qosGlobal      = _QoS,
111	       qosLocal       = _LQS,
112	       options        = _O,
113	       etsR = ets:new(oe_ets, [set, protected])}).
114
115%% Data structures selectors
116-define(get_PushSupplierIDs(S),   find_ids(S#state.mySuppliers, pusher)).
117-define(get_PullSupplierIDs(S),   find_ids(S#state.mySuppliers, puller)).
118-define(get_AllSuppliers(S),      S#state.mySuppliers).
119-define(get_AllSupplierRefs(S),   find_refs(S#state.mySuppliers)).
120-define(get_Supplier(S, I),       find_obj(lists:keysearch(I,1,S#state.mySuppliers),
121					   supplier)).
122
123-define(get_MyID(S),              S#state.myId).
124-define(get_MyChannel(S),         S#state.myChannel).
125-define(get_MyChannelPid(S),      S#state.myChannelPid).
126-define(get_MyOperator(S),        S#state.myOperator).
127-define(get_PrioFilter(S),        S#state.priorityFilter).
128-define(get_LifeFilter(S),        S#state.lifetimeFilter).
129-define(get_GlobalQoS(S),         S#state.qosGlobal).
130-define(get_LocalQoS(S),          S#state.qosLocal).
131-define(get_BothQoS(S),           {S#state.qosGlobal, S#state.qosLocal}).
132-define(get_Filter(S, I),         find_obj(lists:keysearch(I, 1, S#state.myFilters),
133					   filter)).
134-define(get_AllFilter(S),         S#state.myFilters).
135-define(get_AllFilterID(S),       find_ids(S#state.myFilters)).
136-define(get_Options(S),           S#state.options).
137-define(get_IdCounter(S),         S#state.idCounter).
138
139%% Data structures modifiers
140-define(set_PrioFilter(S,D),      S#state{priorityFilter=D}).
141-define(set_LifeFilter(S,D),      S#state{lifetimeFilter=D}).
142-define(set_LocalQoS(S,D),        S#state{qosLocal=D}).
143-define(set_GlobalQoS(S,D),       S#state{qosGlobal=D}).
144-define(set_BothQoS(S,GD,LD),     S#state{qosGlobal=GD, qosLocal=LD}).
145-define(add_PushSupplier(S,I,R,P),S#state{mySuppliers=[{I,R,P,pusher}|S#state.mySuppliers]}).
146-define(add_PullSupplier(S,I,R,P),S#state{mySuppliers=[{I,R,P,puller}|S#state.mySuppliers]}).
147-define(del_Supplier(S,I),        S#state{mySuppliers=
148					  lists:keydelete(I, 1,
149							  S#state.mySuppliers)}).
150-define(del_SupplierRef(S,O),     S#state{mySuppliers=
151					  lists:keydelete(O, 2,
152							  S#state.mySuppliers)}).
153-define(del_SupplierPid(S,P),     S#state{mySuppliers=
154					  lists:keydelete(P, 3,
155							  S#state.mySuppliers)}).
156-define(add_Filter(S,I,O),        S#state{myFilters=[{I,O}|S#state.myFilters]}).
157-define(del_Filter(S,I),          S#state{myFilters=
158					  delete_filter(lists:keydelete(I, 1,
159									S#state.myFilters),
160							S#state.myFilters)}).
161-define(del_AllFilter(S),         S#state{myFilters=[]}).
162-define(set_IdCounter(S,V),       S#state{idCounter=V}).
163-define(new_Id(S),               'CosNotification_Common':create_id(
164							   S#state.idCounter)).
165
166%% MISC
167-define(is_PersistentConnection(S),
168	?not_GetConnectionReliability((S#state.qosLocal)) == ?not_Persistent).
169-define(is_PersistentEvent(S),
170	?not_GetEventReliability((S#state.qosLocal)) == ?not_Persistent).
171-define(is_ANDOP(S),              S#state.myOperator == 'AND_OP').
172
173%%----------------------------------------------------------%
174%% function : handle_info, code_change
175%% Arguments:
176%% Returns  :
177%% Effect   : Functions demanded by the gen_server module.
178%%-----------------------------------------------------------
179
180code_change(_OldVsn, State, _Extra) ->
181    {ok, State}.
182
183handle_info(Info, State) ->
184    case Info of
185        {'EXIT', Pid, Reason} when ?get_MyChannelPid(State) == Pid ->
186            ?DBG("PARENT CHANNEL: ~p  TERMINATED.~n",[Reason]),
187	    {stop, Reason, State};
188        {'EXIT', Pid, normal} ->
189            {noreply, ?del_SupplierPid(State, Pid)};
190        _Other ->
191            {noreply, State}
192    end.
193
194%%----------------------------------------------------------%
195%% function : init, terminate
196%% Arguments:
197%%-----------------------------------------------------------
198
199init([MyId, MyChannel, MyChannelPid, MyOperator, InitQoS, LQS, Options]) ->
200    process_flag(trap_exit, true),
201    PriorityFilter = corba:create_nil_objref(),
202    LifeTimeFilter = corba:create_nil_objref(),
203    {ok, ?get_InitState(MyId, MyChannel, MyChannelPid, MyOperator,
204			PriorityFilter, LifeTimeFilter, InitQoS, LQS, Options)}.
205
206terminate(_Reason, _State) ->
207    ok.
208
209%%-----------------------------------------------------------
210%%----- CosNotifyChannelAdmin_ConsumerAdmin attributes ------
211%%-----------------------------------------------------------
212%%----------------------------------------------------------%
213%% Attribute: '_get_MyID'
214%% Type     : readonly
215%% Returns  :
216%%-----------------------------------------------------------
217'_get_MyID'(_OE_THIS, _OE_FROM, State) ->
218    {reply, ?get_MyID(State), State}.
219
220%%----------------------------------------------------------%
221%% Attribute: '_get_MyChannel'
222%% Type     : readonly
223%% Returns  :
224%%-----------------------------------------------------------
225'_get_MyChannel'(_OE_THIS, _OE_FROM, State) ->
226    {reply, ?get_MyChannel(State), State}.
227
228%%----------------------------------------------------------%
229%% Attribute: '_get_MyOperator'
230%% Type     : readonly
231%% Returns  :
232%%-----------------------------------------------------------
233'_get_MyOperator'(_OE_THIS, _OE_FROM, State) ->
234    {reply, ?get_MyOperator(State), State}.
235
236%%----------------------------------------------------------%
237%% Attribute: '_get_priority_filter'
238%% Type     : read and write
239%% Returns  :
240%%-----------------------------------------------------------
241'_get_priority_filter'(_OE_THIS, _OE_FROM, State) ->
242    {reply, ?get_PrioFilter(State), State}.
243
244'_set_priority_filter'(_OE_THIS, _OE_FROM, State, PrioFilter) ->
245    {reply, ok, ?set_PrioFilter(State, PrioFilter)}.
246
247%%----------------------------------------------------------%
248%% Attribute: '_get_lifetime_filter'
249%% Type     : read and write
250%% Returns  :
251%%-----------------------------------------------------------
252'_get_lifetime_filter'(_OE_THIS, _OE_FROM, State) ->
253    {reply, ?get_LifeFilter(State), State}.
254'_set_lifetime_filter'(_OE_THIS, _OE_FROM, State, LifeFilter) ->
255    {reply, ok, ?set_LifeFilter(State, LifeFilter)}.
256
257%%----------------------------------------------------------%
258%% Attribute: '_get_pull_suppliers'
259%% Type     : readonly
260%% Returns  : ProxyIDSeq
261%%-----------------------------------------------------------
262'_get_pull_suppliers'(_OE_THIS, _OE_FROM, State) ->
263    {reply, ?get_PullSupplierIDs(State), State}.
264
265%%----------------------------------------------------------%
266%% Attribute: '_get_push_suppliers'
267%% Type     : readonly
268%% Returns  : ProxyIDSeq
269%%-----------------------------------------------------------
270'_get_push_suppliers'(_OE_THIS, _OE_FROM, State) ->
271    {reply, ?get_PushSupplierIDs(State), State}.
272
273%%-----------------------------------------------------------
274%%------- Exported external functions -----------------------
275%%-----------------------------------------------------------
276%%----------------------------------------------------------%
277%% function : get_proxy_supplier
278%% Arguments: ProxyID - unique identifier (long)
279%% Returns  : ObjRef | {'EXCEPTION', #'ProxyNotFound'{}}
280%%-----------------------------------------------------------
281get_proxy_supplier(_OE_THIS, _OE_FROM, State, Proxy_id) ->
282    {reply, ?get_Supplier(State, Proxy_id), State}.
283
284%%----------------------------------------------------------%
285%% function : obtain_notification_pull_supplier
286%% Arguments: Ctype - 'ANY_EVENT' | 'STRUCTURED_EVENT' | 'SEQUENCE_EVENT'
287%% Returns  : A Proxy of the requested type.
288%%-----------------------------------------------------------
289obtain_notification_pull_supplier(OE_THIS, _OE_FROM, State, Ctype) ->
290    %% Choose which module to use.
291    {Mod, Type} =
292	case Ctype of
293	    'ANY_EVENT' ->
294		{'CosNotifyChannelAdmin_ProxyPullSupplier', 'PULL_ANY'};
295	    'STRUCTURED_EVENT' ->
296		{'CosNotifyChannelAdmin_StructuredProxyPullSupplier', 'PULL_STRUCTURED'};
297	    'SEQUENCE_EVENT' ->
298		{'CosNotifyChannelAdmin_SequenceProxyPullSupplier', 'PULL_SEQUENCE'};
299	    _ ->
300		orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:"
301			  "obtain_notification_pull_supplier(~p);~n"
302			  "Incorrect enumerant",
303			  [?LINE, Ctype], ?DEBUG_LEVEL),
304		corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO})
305	end,
306    SO = 'CosNotification_Common':get_option(server_options, ?get_Options(State),
307					     ?not_DEFAULT_SETTINGS),
308    case Mod:oe_create_link([Type, OE_THIS, self(), ?get_GlobalQoS(State),
309			     ?get_LocalQoS(State), ?get_MyChannel(State),
310			     ?get_Options(State), ?get_MyOperator(State)],
311			    [{sup_child, true}|SO]) of
312	{ok, Pid, PrRef} ->
313	    ProxyID = ?new_Id(State),
314	    NewState = ?add_PullSupplier(State, ProxyID, PrRef, Pid),
315	    {reply, {PrRef, ProxyID},  ?set_IdCounter(NewState, ProxyID)};
316	What ->
317	    orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:"
318		      "obtain_notification_pull_supplier();~n"
319		      "Unable to create: ~p/~p~n"
320		      "Reason: ~p", [?LINE, Mod, Type, What], ?DEBUG_LEVEL),
321	    corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO})
322    end.
323
324%%----------------------------------------------------------%
325%% function : obtain_notification_push_supplier
326%% Arguments: Ctype - 'ANY_EVENT' | 'STRUCTURED_EVENT' | 'SEQUENCE_EVENT'
327%% Returns  : A Proxy of the requested type.
328%%-----------------------------------------------------------
329obtain_notification_push_supplier(OE_THIS, _OE_FROM, State, Ctype) ->
330    %% Choose which module to use.
331    {Mod, Type} =
332	case Ctype of
333	    'ANY_EVENT' ->
334		{'CosNotifyChannelAdmin_ProxyPushSupplier', 'PUSH_ANY'};
335	    'STRUCTURED_EVENT' ->
336		{'CosNotifyChannelAdmin_StructuredProxyPushSupplier', 'PUSH_STRUCTURED'};
337	    'SEQUENCE_EVENT' ->
338		{'CosNotifyChannelAdmin_SequenceProxyPushSupplier', 'PUSH_SEQUENCE'};
339	    _ ->
340		orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:"
341			  "obtain_notification_push_supplier(~p);~n"
342			  "Incorrect enumerant", [?LINE, Ctype], ?DEBUG_LEVEL),
343		corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO})
344	end,
345    SO = 'CosNotification_Common':get_option(server_options, ?get_Options(State),
346					     ?not_DEFAULT_SETTINGS),
347    case Mod:oe_create_link([Type, OE_THIS, self(), ?get_GlobalQoS(State),
348			     ?get_LocalQoS(State), ?get_MyChannel(State),
349			     ?get_Options(State), ?get_MyOperator(State)],
350			    [{sup_child, true}|SO]) of
351	{ok, Pid, PrRef} ->
352	    ProxyID = ?new_Id(State),
353	    NewState = ?add_PushSupplier(State, ProxyID, PrRef, Pid),
354	    {reply, {PrRef, ProxyID}, ?set_IdCounter(NewState, ProxyID)};
355	What ->
356	    orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:obtain_notification_push_supplier();~n"
357		      "Unable to create: ~p/~p~n"
358		      "Reason: ~p",
359		      [?LINE, Mod, Type, What], ?DEBUG_LEVEL),
360	    corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO})
361    end.
362
363
364%%----------------------------------------------------------%
365%% function : destroy
366%% Arguments: -
367%% Returns  : ok
368%%------------------------------------------------------------
369destroy(_OE_THIS, _OE_FROM, State) ->
370    {stop, normal, ok, State}.
371
372%%----- Inherit from CosNotification::QoSAdmin --------------
373%%----------------------------------------------------------%
374%% function : get_qos
375%% Arguments:
376%% Returns  :
377%%-----------------------------------------------------------
378get_qos(_OE_THIS, _OE_FROM, State) ->
379    {reply, ?get_GlobalQoS(State), State}.
380
381%%----------------------------------------------------------%
382%% function : set_qos
383%% Arguments: QoS - CosNotification::QoSProperties, i.e.,
384%%            [#'Property'{name, value}, ...] where name eq. string()
385%%            and value eq. any().
386%% Returns  : ok | {'EXCEPTION', CosNotification::UnsupportedQoS}
387%%-----------------------------------------------------------
388set_qos(_OE_THIS, _OE_FROM, State, QoS) ->
389    {NewQoS, LQS} = 'CosNotification_Common':set_qos(QoS, ?get_BothQoS(State),
390						     admin, ?get_MyChannel(State),
391						     ?get_AllSupplierRefs(State)),
392    {reply, ok, ?set_BothQoS(State, NewQoS, LQS)}.
393
394%%----------------------------------------------------------%
395%% function : validate_qos
396%% Arguments: Required_qos - CosNotification::QoSProperties
397%%            [#'Property'{name, value}, ...] where name eq. string()
398%%            and value eq. any().
399%% Returns  : {'EXCEPTION', CosNotification::UnsupportedQoS}
400%%-----------------------------------------------------------
401validate_qos(_OE_THIS, _OE_FROM, State, Required_qos) ->
402    QoS = 'CosNotification_Common':validate_qos(Required_qos, ?get_BothQoS(State),
403						admin, ?get_MyChannel(State),
404						?get_AllSupplierRefs(State)),
405    {reply, {ok, QoS}, State}.
406
407%%----- Inherit from CosNotifyComm::NotifySubscribe ---------
408%%----------------------------------------------------------*
409%% function : subscription_change
410%% Arguments:
411%% Returns  : ok |
412%%            {'EXCEPTION', #'CosNotifyComm_InvalidEventType'{type}}
413%%-----------------------------------------------------------
414subscription_change(_OE_THIS, _OE_FROM, State, _Added, _Removed) ->
415    ?DBG("CALLBACK INFORMED:  ~p   ~p~n",[_Added, _Removed]),
416    {reply, ok, State}.
417
418%%----- Inherit from CosNotifyFilter::FilterAdmin -----------
419%%----------------------------------------------------------%
420%% function : add_filter
421%% Arguments: Filter - CosNotifyFilter::Filter
422%% Returns  : FilterID - long
423%%-----------------------------------------------------------
424add_filter(_OE_THIS, _OE_FROM, State, Filter) ->
425    'CosNotification_Common':type_check(Filter, 'CosNotifyFilter_Filter'),
426    FilterID = ?new_Id(State),
427    NewState = ?set_IdCounter(State, FilterID),
428    {reply, FilterID, ?add_Filter(NewState, FilterID, Filter)}.
429
430%%----------------------------------------------------------%
431%% function : remove_filter
432%% Arguments: FilterID - long
433%% Returns  : ok
434%%-----------------------------------------------------------
435remove_filter(_OE_THIS, _OE_FROM, State, FilterID) when is_integer(FilterID) ->
436    {reply, ok, ?del_Filter(State, FilterID)};
437remove_filter(_,_,_,_) ->
438    corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}).
439
440%%----------------------------------------------------------%
441%% function : get_filter
442%% Arguments: FilterID - long
443%% Returns  : Filter - CosNotifyFilter::Filter |
444%%            {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}}
445%%-----------------------------------------------------------
446get_filter(_OE_THIS, _OE_FROM, State, FilterID) when is_integer(FilterID) ->
447    {reply, ?get_Filter(State, FilterID), State};
448get_filter(_,_,_,_) ->
449    corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}).
450
451%%----------------------------------------------------------%
452%% function : get_all_filters
453%% Arguments: -
454%% Returns  : Filter - CosNotifyFilter::FilterIDSeq
455%%-----------------------------------------------------------
456get_all_filters(_OE_THIS, _OE_FROM, State) ->
457    {reply, ?get_AllFilterID(State), State}.
458
459%%----------------------------------------------------------%
460%% function : remove_all_filters
461%% Arguments: -
462%% Returns  : ok
463%%-----------------------------------------------------------
464remove_all_filters(_OE_THIS, _OE_FROM, State) ->
465    {reply, ok, ?del_AllFilter(State)}.
466
467%%----- Inherit from CosEventChannelAdmin::ConsumerAdmin ----
468%%----------------------------------------------------------%
469%% function : obtain_push_supplier
470%% Arguments: -
471%% Returns  : ProxyPushSupplier
472%%-----------------------------------------------------------
473obtain_push_supplier(OE_THIS, _OE_FROM, State) ->
474    SO = 'CosNotification_Common':get_option(server_options, ?get_Options(State),
475					     ?not_DEFAULT_SETTINGS),
476    case 'CosNotifyChannelAdmin_ProxyPushSupplier':oe_create_link(['PUSH_ANY', OE_THIS,
477								   self(),
478								   ?get_GlobalQoS(State),
479								   ?get_LocalQoS(State),
480								   ?get_MyChannel(State),
481								   ?get_Options(State),
482								   ?get_MyOperator(State)],
483								  [{sup_child, true}|SO]) of
484	{ok, Pid, PrRef} ->
485	    ProxyID = ?new_Id(State),
486	    NewState = ?add_PushSupplier(State, ProxyID, PrRef, Pid),
487	    {reply, PrRef, ?set_IdCounter(NewState, ProxyID)};
488	What ->
489	    orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:obtain_push_supplier();~n"
490		      "Unable to create: CosNotifyChannelAdmin_ProxyPushSupplier~n"
491		      "Reason: ~p", [?LINE, What], ?DEBUG_LEVEL),
492	    corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO})
493    end.
494
495%%----------------------------------------------------------%
496%% function : obtain_pull_supplier
497%% Arguments: -
498%% Returns  : ProxyPullSupplier
499%%-----------------------------------------------------------
500obtain_pull_supplier(OE_THIS, _OE_FROM, State) ->
501    SO = 'CosNotification_Common':get_option(server_options, ?get_Options(State),
502					     ?not_DEFAULT_SETTINGS),
503    case 'CosNotifyChannelAdmin_ProxyPullSupplier':oe_create_link(['PULL_ANY', OE_THIS,
504								   self(),
505								   ?get_GlobalQoS(State),
506								   ?get_LocalQoS(State),
507								   ?get_MyChannel(State),
508								   ?get_Options(State),
509								   ?get_MyOperator(State)],
510								  [{sup_child, true}|SO]) of
511	{ok, Pid, PrRef} ->
512	    ProxyID = ?new_Id(State),
513	    NewState = ?add_PullSupplier(State, ProxyID, PrRef, Pid),
514	    {reply, PrRef, ?set_IdCounter(NewState, ProxyID)};
515	What ->
516	    orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:obtain_pull_supplier();~n"
517		      "Unable to create: CosNotifyChannelAdmin_ProxyPullSupplier~n"
518		      "Reason: ~p", [?LINE, What], ?DEBUG_LEVEL),
519	    corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO})
520    end.
521
522%%--------------- LOCAL FUNCTIONS ----------------------------
523%% To match suppliers
524find_obj({value, {_,Obj,_,_}},_) -> Obj;
525%% To match filters
526find_obj({value, {_,Obj}},_) -> Obj;
527find_obj(_, supplier) -> {'EXCEPTION', #'CosNotifyChannelAdmin_ProxyNotFound'{}};
528find_obj(_, filter) -> {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}}.
529
530find_ids(List) ->
531    find_ids(List, [], false).
532find_ids(List, Type) ->
533    find_ids(List, [], Type).
534
535find_ids([], Acc, _) ->
536    Acc;
537find_ids([{I,_}|T], Acc, Type) ->
538    find_ids(T, [I|Acc], Type);
539find_ids([{I,_,_,Type}|T], Acc, Type) ->
540    find_ids(T, [I|Acc], Type);
541find_ids([{_I,_,_,_}|T], Acc, Type) ->
542    find_ids(T, Acc, Type);
543find_ids(What, _, _) ->
544    orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:find_ids();~n"
545	      "Id corrupt: ~p", [?LINE, What], ?DEBUG_LEVEL),
546    corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}).
547
548find_refs(List) ->
549    find_refs(List, []).
550
551find_refs([], Acc) ->
552    Acc;
553find_refs([{_,R,_,_}|T], Acc) ->
554    find_refs(T, [R|Acc]);
555find_refs(What, _) ->
556    orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:find_refs();~n"
557	      "Reference corrupt: ~p", [?LINE, What], ?DEBUG_LEVEL),
558    corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}).
559
560%% Delete a single filter.
561%% The list do not differ, i.e., no filter removed, raise exception.
562delete_filter(List,List) -> corba:raise(#'CosNotifyFilter_FilterNotFound'{});
563delete_filter(List, _) -> List.
564
565
566%%-----------------------------------------------------------
567%% function : callSeq
568%% Arguments:
569%% Returns  :
570%%-----------------------------------------------------------
571callSeq(_OE_THIS, OE_FROM, State, Events, _Status) ->
572    corba:reply(OE_FROM, ok),
573    case cosNotification_eventDB:filter_events(Events, ?get_AllFilter(State)) of
574	{[], _}  when ?is_ANDOP(State) ->
575	    %% Since AND it doesn't matter what the proxies 'think'. Done.
576	    {noreply, State};
577	{[], Failed} ->
578	    %% Is OR but the Proxy may allow the events; pass on.
579	    forward(seq, State, Failed, 'MATCH');
580	{Passed, _}  when ?is_ANDOP(State) ->
581	    %% Since AND we only forward those who passed this objects filters.
582	    forward(seq, State, Passed, 'MATCH');
583	{Passed, []} ->
584	    %% Since OR we forward and tell the proxy to do no filtering.
585	    forward(seq, State, Passed, 'MATCHED');
586	{Passed, Failed} ->
587	    %% Since OR we forward both and instruct the proxy to check only
588	    %% the ones that failed.
589	    forward(seq, State, Passed, 'MATCHED'),
590	    forward(seq, State, Failed, 'MATCH')
591    end.
592
593
594%%-----------------------------------------------------------
595%% function : callAny
596%% Arguments:
597%% Returns  :
598%%-----------------------------------------------------------
599callAny(_OE_THIS, OE_FROM, State, Event, _Status) ->
600    corba:reply(OE_FROM, ok),
601    case cosNotification_eventDB:filter_events([Event], ?get_AllFilter(State)) of
602	{[], _}  when ?is_ANDOP(State) ->
603	    %% Since AND it doesn't matter what the proxies 'think'. Done.
604	    {noreply, State};
605	{[], [Failed]} ->
606	    %% Is OR but the Proxy may allow the event; pass on.
607	    forward(any, State, Failed, 'MATCH');
608	{[Passed], _}  when ?is_ANDOP(State) ->
609	    %% Since AND we only forward those who passed this objects filters.
610	    forward(any, State, Passed, 'MATCH');
611	{[Passed], _} ->
612	    %% Since OR we forward and instruct the proxy to do no checks.
613	    forward(any, State, Passed, 'MATCHED')
614    end.
615
616
617
618%% Forward events
619forward(Type, State, Event, Status) ->
620    forward(Type, ?get_AllSuppliers(State), State, Event, Status).
621forward(_, [], State, _, _) ->
622    {noreply, State};
623forward(any, [{_,H,_,_}|T], State, Event, Status) ->
624    case catch oe_CosNotificationComm_Event:callAny(H, Event, Status) of
625	ok ->
626	    ?DBG("CONSUMERADM FORWARD ANY: ~p~n",[Event]),
627	    forward(any, T, State, Event, Status);
628	{'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') ->
629	    orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:forward();~n"
630		      "Proxy no longer exists; dropping it: ~p",
631		      [?LINE, H], ?DEBUG_LEVEL),
632	    NewState = ?del_SupplierRef(State,H),
633	    forward(any, T, NewState, Event, Status);
634	R when ?is_PersistentConnection(State) ->
635	    orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:forward();~n"
636		      "Proxy behaves badly: ~p/~p",
637		      [?LINE, R, H], ?DEBUG_LEVEL),
638	    forward(any, T, State, Event, Status);
639	R ->
640	    orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:forward();~n"
641		      "Proxy behaves badly: ~p~n"
642		      "Dropping it: ~p", [?LINE, R, H], ?DEBUG_LEVEL),
643	    NewState = ?del_SupplierRef(State, H),
644	    forward(any, T, NewState, Event, Status)
645    end;
646forward(seq, [{_,H,_,_}|T], State, Event, Status) ->
647    case catch oe_CosNotificationComm_Event:callSeq(H, Event, Status) of
648	ok ->
649	    ?DBG("CONSUMERADM FORWARD SEQUENCE: ~p~n",[Event]),
650	    forward(seq, T, State, Event, Status);
651	{'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') ->
652	    orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:forward();~n"
653		      "Proxy no longer exists; dropping it: ~p",
654		      [?LINE, H], ?DEBUG_LEVEL),
655	    NewState = ?del_SupplierRef(State,H),
656	    forward(seq, T, NewState, Event, Status);
657	R when ?is_PersistentConnection(State) ->
658	    orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:forward();~n"
659		      "Proxy behaves badly: ~p/~p", [?LINE, R, H], ?DEBUG_LEVEL),
660	    forward(seq, T, State, Event, Status);
661	R ->
662	    orber:dbg("[~p] CosNotifyChannelAdmin_ConsumerAdmin:forward();~n"
663		      "Proxy behaves badly: ~p~n"
664		      "Dropping it: ~p", [?LINE, R, H], ?DEBUG_LEVEL),
665	    NewState = ?del_SupplierRef(State, H),
666	    forward(seq, T, NewState, Event, Status)
667    end.
668
669
670%%--------------- MISC FUNCTIONS, E.G. DEBUGGING -------------
671%%--------------- END OF MODULE ------------------------------
672