1%%--------------------------------------------------------------------
2%%
3%% %CopyrightBegin%
4%%
5%% Copyright Ericsson AB 2000-2015. All Rights Reserved.
6%%
7%% Licensed under the Apache License, Version 2.0 (the "License");
8%% you may not use this file except in compliance with the License.
9%% You may obtain a copy of the License at
10%%
11%%     http://www.apache.org/licenses/LICENSE-2.0
12%%
13%% Unless required by applicable law or agreed to in writing, software
14%% distributed under the License is distributed on an "AS IS" BASIS,
15%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16%% See the License for the specific language governing permissions and
17%% limitations under the License.
18%%
19%% %CopyrightEnd%
20%%
21%%
22%%----------------------------------------------------------------------
23%% File    : cosNotification_eventDB.erl
24%% Purpose :
25%% Purpose : This module is supposed to centralize Event storage.
26%% Comments:
27%% * Setting Order Policy to AnyOrder eq. Priority order
28%%
29%% * Setting Discard Policy to AnyOrder eq. RejectNewEvents.
30%%
31%% * DB ordering: Since the deliver- and discard-order may differ we need
32%%   two ets-tables, both of type 'ordered_set'. They contain:
33%%   - table 1 (T1): deliver order key and the associated event.
34%%   - table 2 (T2): discard order key.
35%%
36%%   When adding a new event we add, if necessary, related keys in T2.
37%%   For example, if we should discard events in FIFO order, the delivery
38%%   order may be set to Priority order. If the Max Event limit is reached
%%   we first look in T2 to find out which event to discard and then
%%   reorder the key elements to find that event in T1. T2 gives
%%   {TimeStamp, Priority}, which is used to lookup in T1 as
%%   {Priority, TimeStamp}.
%%   A TimeStamp is always included in the DB keys, even if FIFO or LIFO
%%   is used, since lots of events probably will have the same priority and
%%   with a little bit of bad luck some events will never be delivered.
45%%
%%   Note: deliver order AnyOrder and PriorityOrder are equal since the latter
%%         is defined as default.
%%         discard order AnyOrder and RejectNewEvents are equal since the latter
%%         is defined as default.
%%   The keys used are ('-' indicates T2 is not needed and, thus, not instantiated):
51%%
52%%   T1 policy         T1 Key             T2 Policy       T2 Key
53%%   ------------------------------------------------------------------
54%%   DeadlineOrder     {DL, Key, Prio}    PriorityOrder   {Prio, Key, DL}
55%%   DeadlineOrder     {DL, Key}          FifoOrder       {Key, DL}
56%%   DeadlineOrder     {DL, Key}          LifoOrder       {Key, DL}
57%%   DeadlineOrder     {DL, Key}          RejectNewEvents     -
58%%   DeadlineOrder     {DL, Key}          DeadlineOrder       -
59%%   FifoOrder         {Key, DL}          DeadlineOrder   {DL, Key}
60%%   FifoOrder         {Key, Prio}        PriorityOrder   {Prio, Key}
61%%   FifoOrder         Key                RejectNewEvents     -
62%%   FifoOrder         Key                Fifo                -
63%%   FifoOrder         Key                Lifo                -
64%%   PriorityOrder     {Prio, Key, DL}    DeadlineOrder   {DL, Key, Prio}
65%%   PriorityOrder     {Prio, Key}        Fifo            {Key, Prio}
66%%   PriorityOrder     {Prio, Key}        Lifo            {Key, Prio}
67%%   PriorityOrder     {Prio, Key}        RejectNewEvents     -
68%%   ------------------------------------------------------------------
69%%   DL == Deadline, Key == TimeStamp, Prio == Priority
70%%
71%%   NOTE: If defined, the Discard DB Keys are the same as in Event DB, except
72%%   that the first and last Key change place. {K1,K2}<->{K2,K1} and
73%%   {K1,K2,K3}<->{K3,K2,K1}.
74%%----------------------------------------------------------------------
75-module(cosNotification_eventDB).
76
77%%--------------- INCLUDES -----------------------------------
78-include_lib("orber/include/corba.hrl").
79-include_lib("orber/include/ifr_types.hrl").
80-include_lib("cosTime/include/TimeBase.hrl").
81
82%% Application files
83-include("CosNotification.hrl").
84-include("CosNotifyChannelAdmin.hrl").
85-include("CosNotifyComm.hrl").
86-include("CosNotifyFilter.hrl").
87
88-include("CosNotification_Definitions.hrl").
89
90%%--------------- EXPORTS ------------------------------------
91%% Internal Filter Functions
92-export([validate_event/5,
93	 create_db/4,
94	 destroy_db/1,
95	 get_event/1,
96	 get_event/2,
97	 get_events/2,
98	 get_events/3,
99	 delete_events/1,
100	 update/2,
101	 update/4,
102	 add_event/2,
103	 add_event/4,
104	 add_and_get_event/2,
105	 add_and_get_event/3,
106	 add_and_get_event/4,
107	 add_and_get_event/5,
108	 gc_events/2,
109	 gc_events_local/4,
110	 gc_start/2,
111	 filter_events/2,
112	 filter_events/3,
113	 status/2]).
114
115%%--------------- DATA STRUCTURES ----------------------------
%% Reference to one event database.
%%   orderRef      - ets table holding the deliver-order keys and the events.
%%   discardRef    - ets table holding the discard-order keys, or 'undefined'
%%                   when no separate discard table is used.
%%   orderPolicy   - deliver order policy.
%%   discardPolicy - discard order policy.
%%   defPriority   - default event priority.
%%   maxEvents     - max number of stored events.
%%   defStopT      - default timeout; 0 means timeouts are not used.
%%   startTsupport - false when StartTime is not supported.
%%   stopTsupport  - false when StopTime is not supported.
%%   gcTime        - GC interval (see gc_events/2).
%%   gcLimit       - GC limit, stored as a number of events (see ?CreateRef).
%%   timeRef       - reference to the time service.
-record(dbRef,
        {orderRef,
         discardRef,
         orderPolicy,
         discardPolicy,
         defPriority,
         maxEvents,
         defStopT,
         startTsupport,
         stopTsupport,
         gcTime,
         gcLimit,
         timeRef}).
119
120
121
122%%--------------- DEFINES ------------------------------------
123
%% Builds a #dbRef{} from its parts. GL is a percentage of ME (max events);
%% the stored gcLimit is the corresponding (rounded) number of events.
-define(CreateRef(OR, DR, O, D, DP, ME, DS, StaT, StoT, GT, GL, TR),
        #dbRef{orderRef      = OR,
               discardRef    = DR,
               orderPolicy   = O,
               discardPolicy = D,
               defPriority   = DP,
               maxEvents     = ME,
               defStopT      = DS,
               startTsupport = StaT,
               stopTsupport  = StoT,
               gcTime        = GT,
               gcLimit       = round(ME*GL/100),
               timeRef       = TR}).
129
130
%% Field getters for #dbRef{}.
-define(get_OrderP(DR),            DR#dbRef.orderPolicy).
-define(get_DiscardP(DR),          DR#dbRef.discardPolicy).
-define(get_OrderRef(DR),          DR#dbRef.orderRef).
%% Must read the discard table reference, not the order table reference;
%% otherwise code operating on the "discard DB" would hit the order table.
-define(get_DiscardRef(DR),        DR#dbRef.discardRef).
-define(get_DefPriority(DR),       DR#dbRef.defPriority).
-define(get_MaxEvents(DR),         DR#dbRef.maxEvents).
-define(get_DefStopT(DR),          DR#dbRef.defStopT).
-define(get_StartTsupport(DR),     DR#dbRef.startTsupport).
-define(get_StopTsupport(DR),      DR#dbRef.stopTsupport).
-define(get_GCTime(DR),            DR#dbRef.gcTime).
-define(get_GCLimit(DR),           DR#dbRef.gcLimit).
-define(get_TimeRef(DR),           DR#dbRef.timeRef).
143
%% Field setters for #dbRef{}; each returns an updated copy of the record.
-define(set_OrderP(DR, O),         DR#dbRef{orderPolicy = O}).
-define(set_DiscardP(DR, D),       DR#dbRef{discardPolicy = D}).
-define(set_OrderRef(DR, E),       DR#dbRef{orderRef = E}).
%% Must update the discard table reference; writing orderRef here would
%% silently replace the order table reference instead.
-define(set_DiscardRef(DR, E),     DR#dbRef{discardRef = E}).
-define(set_DefPriority(DR, DP),   DR#dbRef{defPriority = DP}).
-define(set_MaxEvents(DR, ME),     DR#dbRef{maxEvents = ME}).
-define(set_DefStopT(DR, DS),      DR#dbRef{defStopT = DS}).
-define(set_StartTsupport(DR, B),  DR#dbRef{startTsupport = B}).
-define(set_StopTsupport(DR, B),   DR#dbRef{stopTsupport = B}).
153
%% Boolean tests on #dbRef{}; usable in guards. The bodies are parenthesised
%% so they can be combined safely with other operators at the call site.
-define(is_StartTNotSupported(DR), (DR#dbRef.startTsupport == false)).
-define(is_StopTNotSupported(DR),  (DR#dbRef.stopTsupport  == false)).
-define(is_TimeoutNotUsed(DR),     (DR#dbRef.defStopT  == 0)).
157
158
%%------------------------------------------------------------
%% function : status
%% Arguments: DBRef
%%            Key - which information we want:
%%                  eventCounter
%%                  {batchLimit, Limit}
%%                  {batchLimit, Limit, TemporaryMax}
%% Returns  : eventCounter - the result of ets:info(Table, size) for
%%                           the order table.
%%            batchLimit   - true if the number of stored events has
%%                           reached Limit (or TemporaryMax), false
%%                           otherwise (also if the size is unknown).
%%            other Key    - error
%%------------------------------------------------------------
status(DBRef, eventCounter) ->
    ets:info(?get_OrderRef(DBRef), size);
status(DBRef, {batchLimit, Limit}) ->
    Size = ets:info(?get_OrderRef(DBRef), size),
    if
        is_integer(Size), Size >= Limit ->
            ?debug_print("BATCH LIMIT (~p) REACHED, CONTAINS: ~p~n", [Limit, Size]),
            true;
        true ->
            ?debug_print("BATCH LIMIT (~p) NOT REACHED, CONTAINS: ~p~n",
                         [Limit, Size]),
            false
    end;
status(DBRef, {batchLimit, Limit, TemporaryMax}) ->
    Size = ets:info(?get_OrderRef(DBRef), size),
    if
        is_integer(Size), Size >= TemporaryMax ->
            ?debug_print("MAX LIMIT (~p) REACHED, CONTAINS: ~p~n",
                         [TemporaryMax, Size]),
            true;
        is_integer(Size), Size >= Limit ->
            ?debug_print("BATCH LIMIT (~p) REACHED, CONTAINS: ~p~n", [Limit, Size]),
            true;
        true ->
            ?debug_print("BATCH LIMIT (~p) NOT REACHED, CONTAINS: ~p~n",
                         [Limit, Size]),
            false
    end;
status(_DBRef, _UnknownKey) ->
    error.
193
194
%%------------------------------------------------------------
%% function : gc_events_local
%% Arguments: ORef - order table
%%            DRef - discard table or 'undefined'
%%            Arg3 - 'false' disables the GC (presumably the
%%                   StopTime-supported flag; confirm against caller)
%%            Arg4 - 0 disables the GC (presumably the default
%%                   timeout; confirm against caller)
%% Returns  : ok
%% Comment  : This function is intended for "emergency" GC, i.e.,
%%            when the DB must discard events we should first try
%%            to remove events with expired deadlines. The GC runs
%%            in the calling process.
%%------------------------------------------------------------
gc_events_local(_ORef, _DRef, Flag, Timeout)
  when Flag =:= false; Timeout =:= 0 ->
    ok;
gc_events_local(ORef, DRef, _Flag, _Timeout) ->
    gc_loop(ets:first(ORef), ORef, DRef).
209
%%------------------------------------------------------------
%% function : gc_events
%% Arguments: DBRef
%%            Priority - 'low', 'medium' or 'high'; will determine
%%            how important a fast gc is.
%% Returns  : ok if no GC was started, otherwise the pid of the
%%            spawned (and linked) GC process.
%% Comment  : This function is intended for background GC. The
%%            earliest time for the next GC is kept in the process
%%            dictionary under the key oe_GC_timestamp.
%%------------------------------------------------------------
gc_events(DBRef, _Priority) when ?is_TimeoutNotUsed(DBRef) ->
    ok;
gc_events(DBRef, _Priority) when ?is_StopTNotSupported(DBRef) ->
    ok;
gc_events(DBRef, Priority) ->
    Now = erlang:monotonic_time(),
    case get(oe_GC_timestamp) of
        NextGC when is_number(NextGC), Now =< NextGC ->
            %% The GC interval has not elapsed yet.
            ok;
        _ ->
            %% Either the interval has elapsed, or this process has not
            %% scheduled any GC yet. The latter case must be handled
            %% explicitly: get/1 then returns 'undefined', and since a
            %% number never compares greater than an atom a plain
            %% "Now > Stored" test would never start the first GC.
            %% NOTE(review): the OS clock resolution is used as the number
            %% of monotonic_time() ticks per second - confirm that it equals
            %% the native time unit on all supported platforms.
            {resolution, TicksPerSec} =
                lists:keyfind(resolution, 1,
                              erlang:system_info(os_monotonic_time_source)),
            put(oe_GC_timestamp, Now + ?get_GCTime(DBRef) * TicksPerSec),
            spawn_link(?MODULE, gc_start, [DBRef, Priority])
    end.
232
%%------------------------------------------------------------
%% function : gc_start
%% Arguments: DBRef
%%            Priority - process priority to run the GC with.
%% Returns  : ok
%% Comment  : Entry point for the process spawned by gc_events/2.
%%------------------------------------------------------------
gc_start(#dbRef{orderRef = OrderTab, discardRef = DiscardTab}, Priority) ->
    process_flag(priority, Priority),
    FirstKey = ets:first(OrderTab),
    gc_loop(FirstKey, OrderTab, DiscardTab).
241
%% Walks the order table from Key to the end and removes every event whose
%% deadline has expired, together with its key in the discard table (if any).
gc_loop('$end_of_table', _ORef, _DRef) ->
    ok;
gc_loop(Key, ORef, DRef) ->
    [{OrderKey, Deadline, _, _, _}] = ets:lookup(ORef, Key),
    Expired = check_deadline(Deadline),
    if
        Expired, DRef == undefined ->
            ets:delete(ORef, Key);
        Expired ->
            ets:delete(ORef, Key),
            gc_discard_DB(OrderKey, DRef);
        true ->
            ok
    end,
    %% ets:next/2 is called after a possible delete of Key, exactly as before.
    gc_loop(ets:next(ORef, Key), ORef, DRef).
256
%% Deletes the discard-table entry that corresponds to an order-table key.
%% The discard key is the order key with its elements reversed; only keys
%% of size 2 and 3 are accepted.
gc_discard_DB(OrderKey, DRef)
  when tuple_size(OrderKey) =:= 2; tuple_size(OrderKey) =:= 3 ->
    DiscardKey = list_to_tuple(lists:reverse(tuple_to_list(OrderKey))),
    ets:delete(DRef, DiscardKey).
261
%%------------------------------------------------------------
%% function : create_FIFO_Key
%% Arguments: -
%% Returns  : The current erlang:timestamp() expressed as a negated
%%            number of microseconds (an integer =< 0).
%%------------------------------------------------------------
create_FIFO_Key() ->
    {MegaSecs, Secs, MicroSecs} = erlang:timestamp(),
    -((MegaSecs * 1000000 + Secs) * 1000000 + MicroSecs).
270
%%------------------------------------------------------------
%% function : convert_FIFO_Key
%% Arguments: Key - integer as returned by create_FIFO_Key/0.
%% Returns  : A timestamp tuple {MegaSecs, Secs, MicroSecs}
%% Comment  : Used when we must reuse a timestamp, i.e., only
%%            when we must reorder the DB.
%%            Pure integer arithmetic (div/rem) is used so that the
%%            conversion is exact for any key size; going via
%%            floating point division loses precision for large
%%            integers.
%%------------------------------------------------------------
convert_FIFO_Key(Key) ->
    K = abs(Key),
    MegaSecs  = K div 1000000000000,
    Secs      = (K div 1000000) rem 1000000,
    MicroSecs = K rem 1000000,
    {MegaSecs, Secs, MicroSecs}.
285
%%------------------------------------------------------------
%% function : extract_priority
%% Arguments: Event
%%            Default Value
%%            Mapping Filter Value
%%             - false  value not needed (depends on QoS type)
%%             - undefined value needed but no filter associated.
%% Returns  : false | the priority to use for the event.
%%------------------------------------------------------------
extract_priority(_Event, _DefPriority, false) ->
    false;
extract_priority(Event, DefPriority, undefined) ->
    case Event of
        #'CosNotification_StructuredEvent'
        {header = #'CosNotification_EventHeader'
         {variable_header = VarHeader}} ->
            %% Use the priority given in the event header, if any.
            extract_value(VarHeader, ?not_Priority, DefPriority);
        _ ->
            %% Maybe an unstructured event.
            DefPriority
    end;
extract_priority(_Event, _DefPriority, PriorityOverride) ->
    %% Must have an associated MappingFilter for Priority.
    PriorityOverride.
307
%%------------------------------------------------------------
%% function : extract_start_time
%% Arguments: Event
%%            StartTSupported - false if StartTime shall be ignored.
%%            TRef - reference to TimeService
%% Returns  : false | the result of convert_time/3 for the StartTime
%%            found in the event header.
%%------------------------------------------------------------
extract_start_time(_Event, false, _TRef) ->
    false;
extract_start_time(#'CosNotification_StructuredEvent'
                   {header = #'CosNotification_EventHeader'
                    {variable_header = VarHeader}}, _StartTSupported, TRef) ->
    case extract_value(VarHeader, ?not_StartTime, undefined) of
        #'TimeBase_UtcT'{} = UTC ->
            convert_time(UTC, TRef, erlang:timestamp());
        _NoUsableStartTime ->
            %% Missing, or not a UtcT value.
            false
    end;
extract_start_time(_Event, _StartTSupported, _TRef) ->
    false.
327
%%------------------------------------------------------------
%% function : extract_deadline
%% Arguments: Structured Event
%%            Default Timeout Value - TimeT or UtcT (see cosTime).
%%            StopTSupported - boolean().
%%            TRef - reference to TimeService
%%            Mapping Filter Value
%%             - false eq. value not needed (depends on QoS type)
%%             - undefined eq. value needed but no filter associated.
%%            Now - used when we want to reuse old TimeStamp which
%%                  must be done when changing QoS.
%% Returns  : false | a modified return from erlang:timestamp().
%% Comment  : The 5-argument version uses the current time as Now.
%%------------------------------------------------------------
extract_deadline(Event, DefaultT, StopTSupported, TRef, MappingVal) ->
    case MappingVal of
        false ->
            false;
        _ ->
            extract_deadline(Event, DefaultT, StopTSupported, TRef,
                             MappingVal, erlang:timestamp())
    end.
345
%% extract_deadline/6: as extract_deadline/5, but Now is the timestamp the
%% deadline is calculated from.
extract_deadline(_Event, _DefaultT, _StopTSupported, _TRef, false, _Now) ->
    false;
extract_deadline(#'CosNotification_StructuredEvent'
                 {header = #'CosNotification_EventHeader'
                  {variable_header = VarHeader}}, DefaultT, StopTSupported,
                 TRef, undefined, Now) ->
    Timeout = extract_value(VarHeader, ?not_Timeout, undefined),
    Deadline =
        if
            Timeout =/= undefined ->
                %% An explicit Timeout in the header always wins.
                Timeout;
            StopTSupported == true, TRef =/= undefined ->
                %% No Timeout; fall back on StopTime and then the default.
                case extract_value(VarHeader, ?not_StopTime, undefined) of
                    undefined ->
                        DefaultT;
                    StopTime ->
                        StopTime
                end;
            true ->
                DefaultT
        end,
    convert_time(Deadline, TRef, Now);
%% Maybe an unstructured event.
extract_deadline(_Event, DefaultT, _StopTSupported, TRef, undefined, Now) ->
    convert_time(DefaultT, TRef, Now);
extract_deadline(_Event, _DefaultT, _StopTSupported, TRef, DOverride, Now) ->
    %% Must have an associated MappingFilter defining a Deadline.
    convert_time(DOverride, TRef, Now).
372
%% Converts a time value to the negated timestamp tuple stored in the DB.
%%   0                  - no timeout; returns false.
%%   #'TimeBase_UtcT'{} - the distance to that time is fetched via
%%                        get_time_diff/2; false if that call fails.
%%   integer()          - used as is.
%%   anything else      - false.
%% The third argument is the {MegaSecs, Secs, MicroSecs} to start from.
convert_time(0, _TRef, _Now) ->
    false;
convert_time(UTC, TRef, {_, _, _} = Now) when is_record(UTC, 'TimeBase_UtcT') ->
    case catch get_time_diff(UTC, TRef) of
        {'EXCEPTION', _} ->
            false;
        {'EXIT', _} ->
            false;
        Diff ->
            convert_time_offset(Diff, Now)
    end;
convert_time(DL, _TRef, {_, _, _} = Now) when is_integer(DL) ->
    convert_time_offset(DL, Now);
convert_time(_Time, _TRef, _Now) ->
    false.

%% Adds the offset DL (DL/10 is treated as microseconds) to the given
%% timestamp and negates each element of the result.
convert_time_offset(DL, {M, S, U}) ->
    MicroSecs = round(DL/10),
    Secs      = round(MicroSecs/1000000),
    MegaSecs  = round(Secs/1000000),
    {-M-MegaSecs, -S-Secs+MegaSecs, -U-MicroSecs+Secs}.
394
395
%% Asks the time service (TRef) for the interval between the current
%% universal time and UTC, and returns the size of that interval
%% (upper bound minus lower bound).
get_time_diff(UTC, TRef) ->
    NowUTO    = 'CosTime_TimeService':universal_time(TRef),
    TargetUTO = 'CosTime_TimeService':uto_from_utc(TRef, UTC),
    TIO       = 'CosTime_UTO':time_to_interval(NowUTO, TargetUTO),
    #'TimeBase_IntervalT'{lower_bound = Lower, upper_bound = Upper} =
        'CosTime_TIO':'_get_time_interval'(TIO),
    Upper - Lower.
403
%% Returns true if the deadline DL (a negated timestamp tuple) has expired,
%% i.e., if it is not smaller than the negated current time.
check_deadline(DL) when is_tuple(DL) ->
    {MegaSecs, Secs, MicroSecs} = erlang:timestamp(),
    NegatedNow = {-MegaSecs, -Secs, -MicroSecs},
    NegatedNow =< DL;
check_deadline(_NoDeadline) ->
    %% This case will cover if no timeout is set.
    false.
410
%% Returns true if an event with start time ST (a negated timestamp tuple)
%% may be delivered now, i.e., if ST is not smaller than the negated
%% current time.
check_start_time(ST) when is_tuple(ST) ->
    {MegaSecs, Secs, MicroSecs} = erlang:timestamp(),
    NegatedNow = {-MegaSecs, -Secs, -MicroSecs},
    NegatedNow =< ST;
check_start_time(_NoStartTime) ->
    %% This case will cover if no earliest delivery time is set.
    true.
417
%%------------------------------------------------------------
%% function : extract_value
%% Arguments: A Property Sequence
%%            ID - wanted property string()
%%            Other - default-value.
%% Returns  : Value associated with given ID or default value.
%% Comment  : The first property with a matching name is used; its
%%            value is unpacked with any:get_value/1.
%%------------------------------------------------------------
extract_value([#'CosNotification_Property'{name = Name, value = Any} | _Rest],
              ID, _Other) when Name =:= ID ->
    any:get_value(Any);
extract_value([_NoMatch | Rest], ID, Other) ->
    extract_value(Rest, ID, Other);
extract_value([], _ID, Other) ->
    Other.
431
%%------------------------------------------------------------
%% function : get_event
%% Arguments: DBRef
%%            Delete - if true (default) a returned event is also
%%                     removed from the DB.
%% Returns  : The same as get_events/3 asked for one event, but with
%%            the single event returned as is instead of in a list:
%%            {Event, Bool} | {Event, Bool, Keys}, or {[], false} |
%%            {[], false, Keys} if no event was found.
%%------------------------------------------------------------
get_event(DBRef) ->
    get_event(DBRef, true).
get_event(DBRef, Delete) ->
    case get_events(DBRef, 1, Delete) of
        {[Event], Found} ->
            {Event, Found};
        {[Event], Found, Keys} ->
            {Event, Found, Keys};
        {[], false} = Nothing ->
            Nothing;
        {[], false, _Keys} = Nothing ->
            Nothing
    end.
450
%%------------------------------------------------------------
%% function : get_events
%% Arguments: DBRef
%%            Max    - max number of events to extract.
%%            Delete - if true (default) returned events are also
%%                     removed from the DB.
%% Returns  : A list of events (possibly empty) and a boolean
%%            indicating if event found. If Delete is not true, a
%%            third element with {Table, Key} pairs is included; it
%%            can be passed to delete_events/1.
%% Comments : Try to extract Max events from the database, starting
%%            with the last key in the order table.
%%------------------------------------------------------------
get_events(#dbRef{} = DBRef, Max) ->
    get_events(DBRef, Max, true).

get_events(#dbRef{orderRef = OrderTab, discardRef = DiscardTab}, Max, Delete) ->
    LastKey = ets:last(OrderTab),
    event_loop(LastKey, OrderTab, DiscardTab, Max, [], [], Delete).
463
%% event_loop(Key, ORef, DRef, Left, Accum, Keys, Delete)
%% Walks the order table backwards from Key and collects up to Left events.
%%  - Events with an expired deadline are always removed and skipped.
%%  - Events whose start time has not been reached are skipped.
%%  - Other events are collected; they are removed at once if Delete is
%%    true, otherwise their {Table, Key} pairs are collected in Keys.
%% DRef is the discard table, or 'undefined' if there is none. When it is
%% defined the order keys must be tuples of size 2 or 3.
event_loop('$end_of_table', _ORef, _DRef, _Left, Accum, Keys, Delete) ->
    event_loop_result(Accum, Keys, Delete, end_of_table);
event_loop(_Key, _ORef, _DRef, 0, Accum, Keys, Delete) ->
    %% An empty Accum is only possible here if someone tries to pull a
    %% sequence of 0 events.
    event_loop_result(Accum, Keys, Delete, limit);
event_loop(Key, ORef, DRef, Left, Accum, Keys, Delete)
  when DRef =:= undefined; tuple_size(Key) =:= 2; tuple_size(Key) =:= 3 ->
    [{_, Deadline, StartTime, _PO, Event}] = ets:lookup(ORef, Key),
    case check_deadline(Deadline) of
        true ->
            %% Expired; remove it and look at the previous event.
            event_loop_purge(Key, ORef, DRef),
            event_loop(ets:prev(ORef, Key), ORef, DRef,
                       Left, Accum, Keys, Delete);
        false ->
            case check_start_time(StartTime) of
                true when Delete == true ->
                    event_loop_purge(Key, ORef, DRef),
                    event_loop(ets:prev(ORef, Key), ORef, DRef,
                               Left-1, [Event|Accum], Keys, Delete);
                true ->
                    %% Keep the event in the DB but remember its keys.
                    Pending = event_loop_refs(Key, ORef, DRef) ++ Keys,
                    event_loop(ets:prev(ORef, Key), ORef, DRef,
                               Left-1, [Event|Accum], Pending, Delete);
                false ->
                    %% Not yet deliverable.
                    event_loop(ets:prev(ORef, Key), ORef, DRef,
                               Left, Accum, Keys, Delete)
            end
    end.

%% Shapes the return value of event_loop/7. The last argument tells why
%% the loop stopped.
event_loop_result([], _Keys, true, _Why) ->
    {[], false};
event_loop_result(Accum, _Keys, true, _Why) ->
    {lists:reverse(Accum), true};
event_loop_result([], [], _Delete, end_of_table) ->
    {[], false, []};
event_loop_result([], Keys, _Delete, limit) ->
    {[], false, Keys};
event_loop_result(Accum, Keys, _Delete, _Why) ->
    {lists:reverse(Accum), true, Keys}.

%% Removes the event stored under Key, and its discard key if a discard
%% table is used.
event_loop_purge(Key, ORef, undefined) ->
    ets:delete(ORef, Key);
event_loop_purge(Key, ORef, DRef) ->
    ets:delete(ORef, Key),
    ets:delete(DRef, event_loop_mirror(Key)).

%% The {Table, Key} pairs needed to remove the event later on.
event_loop_refs(Key, ORef, undefined) ->
    [{ORef, Key}];
event_loop_refs(Key, ORef, DRef) ->
    [{ORef, Key}, {DRef, event_loop_mirror(Key)}].

%% Order key -> discard key (the elements in reverse order).
event_loop_mirror({Key1, Key2}) ->
    {Key2, Key1};
event_loop_mirror({Key1, Key2, Key3}) ->
    {Key3, Key2, Key1}.
553
%%------------------------------------------------------------
%% function : delete_events
%% Arguments: EventList - the list of {Table, Key} pairs returned by
%%                        get_event, get_events and add_and_get_event.
%% Returns  : ok
%% Comment  : Shall be invoked when it's safe to permanently remove
%%            the events found in the EventList.
%%
%%------------------------------------------------------------
delete_events(EventList) ->
    lists:foreach(fun({Tab, Key}) -> ets:delete(Tab, Key) end, EventList).
568
%%------------------------------------------------------------
%% function : update
%% Arguments: DBRef - 'undefined' if there is no DB to update.
%%            QoS
%% Returns  : ok if DBRef is 'undefined', otherwise the result of
%%            update/4 called without any mapping filters.
%% Comment  : As default we shall deliver Events in Priority order.
%%            Hence, if AnyOrder set we will still deliver in
%%            Priority order.
%%------------------------------------------------------------
update(DBRef, QoS) ->
    case DBRef of
        undefined ->
            ok;
        _ ->
            update(DBRef, QoS, undefined, undefined)
    end.
581
%% update(DBRef, QoS, LifeFilter, PrioFilter) -> NewDBRef
%% Applies new QoS settings to the DB.
%%  - If neither the order policy nor the discard policy has changed the
%%    existing tables are kept; only the settings are updated and, if the
%%    DB now holds more than MaxEvents events, the surplus is discarded.
%%  - Otherwise a new DB is created and all events are moved to it.
update(DBRef, QoS, LifeFilter, PrioFilter) ->
    PolicyChanged =
        updated_order(DBRef, ?not_GetOrderPolicy(QoS)) orelse
        updated_discard(DBRef, ?not_GetDiscardPolicy(QoS)),
    case PolicyChanged of
        true ->
            destroy_discard_db(DBRef),
            %% NOTE(review): ?get_GCLimit/1 yields the limit as stored by
            %% ?CreateRef (already scaled by maxEvents) - confirm that this
            %% is what create_db/4 expects.
            NewDBRef = create_db(QoS, ?get_GCTime(DBRef), ?get_GCLimit(DBRef),
                                 ?get_TimeRef(DBRef)),
            move_events(DBRef, NewDBRef, ets:first(?get_OrderRef(DBRef)),
                        LifeFilter, PrioFilter);
        false ->
            WithPrio    = ?set_DefPriority(DBRef, ?not_GetPriority(QoS)),
            WithMax     = ?set_MaxEvents(WithPrio, ?not_GetMaxEventsPerConsumer(QoS)),
            WithTimeout = ?set_DefStopT(WithMax, ?not_GetTimeout(QoS)),
            WithStart   = ?set_StartTsupport(WithTimeout, ?not_GetStartTimeSupported(QoS)),
            Updated     = ?set_StopTsupport(WithStart, ?not_GetStopTimeSupported(QoS)),
            MaxEvents   = ?get_MaxEvents(Updated),
            case ets:info(?get_OrderRef(Updated), size) of
                Size when Size =< MaxEvents ->
                    %% Even if the QoS MaxEvents have been changed
                    %% we don't reach the limit.
                    Updated;
                Size ->
                    %% The QoS MaxEvents must have been decreased.
                    discard_events(Updated, Size-MaxEvents),
                    Updated
            end
    end.
616
%% Returns false if the requested order policy is equivalent to the one in
%% use (AnyOrder and PriorityOrder are treated as equal), otherwise true.
updated_order(#dbRef{orderPolicy = Current}, Requested)
  when Current =:= Requested;
       Current =:= ?not_PriorityOrder, Requested =:= ?not_AnyOrder;
       Current =:= ?not_AnyOrder, Requested =:= ?not_PriorityOrder ->
    false;
updated_order(_DBRef, _Requested) ->
    true.
621
%% Returns false if the requested discard policy is equivalent to the one in
%% use (AnyOrder and RejectNewEvents are treated as equal), otherwise true.
updated_discard(#dbRef{discardPolicy = Current}, Requested)
  when Current =:= Requested;
       Current =:= ?not_RejectNewEvents, Requested =:= ?not_AnyOrder;
       Current =:= ?not_AnyOrder, Requested =:= ?not_RejectNewEvents ->
    false;
updated_discard(_DBRef, _Requested) ->
    true.
626
%% move_events(DBRef, NewDBRef, Key, LifeFilter, PrioFilter) -> NewDBRef
%% Moves the events in the old DB (DBRef), starting with Key, to the new
%% DB (NewDBRef). Events with an expired deadline are dropped. Every
%% visited event is removed from the old order table. When all events
%% have been handled the old order table is destroyed and, if the new DB
%% holds more than its MaxEvents, the surplus is discarded.
move_events(DBRef, NewDBRef, '$end_of_table', _, _) ->
    destroy_order_db(DBRef),
    case ets:info(?get_OrderRef(NewDBRef), size) of
        N when N =< ?get_MaxEvents(NewDBRef) ->
            %% Even if the QoS MaxEvents have been changed
            %% we don't reach the limit.
            NewDBRef;
        N ->
            %% The QoS MaxEvents must have been decreased.
            %% The surplus must be discarded from the new DB: that is
            %% where the events (and the size N) are, and the order table
            %% of the old DB was destroyed above.
            discard_events(NewDBRef, N-?get_MaxEvents(NewDBRef)),
            NewDBRef
    end;
move_events(DBRef, NewDBRef, Key, LifeFilter, PrioFilter) ->
    [{Keys, DeadLine, StartTime, PriorityOverride, Event}] =
        ets:lookup(?get_OrderRef(DBRef), Key),
    case check_deadline(DeadLine) of
        true ->
            %% Expired; do not move it.
            ok;
        _->
            write_event(?get_OrderP(DBRef),
                        {Keys, DeadLine, StartTime, PriorityOverride, Event},
                        DBRef, NewDBRef, Key, LifeFilter, PrioFilter)
    end,
    ets:delete(?get_OrderRef(DBRef), Key),
    move_events(DBRef, NewDBRef, ets:next(?get_OrderRef(DBRef), Key),
                LifeFilter, PrioFilter).
653
%% write_event(OldOrderPolicy, DBEntry, DBRef, NewDBRef, Key,
%%             LifeFilter, PrioFilter)
%% Stores an event, read from the old DB, in the new DB. DBEntry is the
%% old table entry {Keys, DeadLine, StartTime, PriorityOverride, Event};
%% the shape of Keys depends on the old order policy.
%% We cannot use do_add_event directly since we MUST lookup the timestamp (TS).
%% Values which were extracted when the event was first stored are reused
%% (see update_priority/4, update_deadline/5 and update_starttime/3).
write_event(?not_DeadlineOrder, {{_, TS, _Prio}, DL, ST, PO, Event}, _DBRef, NewDBRef,
            _Key, _LifeFilter, _PrioFilter) ->
    StartT = update_starttime(NewDBRef, Event, ST),
    %% Deadline and Priority previously extracted.
    do_add_event(NewDBRef, Event, TS, DL, StartT, PO);
write_event(?not_DeadlineOrder, {{_, TS}, DL, ST, PO, Event}, _DBRef, NewDBRef,
            _Key, _LifeFilter, PrioFilter) ->
    %% Priority not previously extracted.
    POverride = update_priority(NewDBRef, PrioFilter, Event, PO),
    %% Reuse a previously extracted start time, as all other clauses do,
    %% instead of always extracting it from the event again.
    StartT    = update_starttime(NewDBRef, Event, ST),
    do_add_event(NewDBRef, Event, TS, DL, StartT, POverride);
write_event(?not_FifoOrder, {{TS, _PorD}, DL, ST, PO, Event}, _DBRef, NewDBRef,
            _Key, LifeFilter, PrioFilter) ->
    %% Priority or Deadline have been extracted before but we cannot tell which.
    POverride = update_priority(NewDBRef, PrioFilter, Event, PO),
    DeadL     = update_deadline(NewDBRef, LifeFilter, Event, TS, DL),
    StartT    = update_starttime(NewDBRef, Event, ST),
    do_add_event(NewDBRef, Event, TS, DeadL, StartT, POverride);
write_event(?not_FifoOrder, {TS, DL, ST, PO, Event}, _DBRef, NewDBRef,
            _Key, LifeFilter, PrioFilter) ->
    %% Priority and Deadline not extracted before. Do it now.
    POverride = update_priority(NewDBRef, PrioFilter, Event, PO),
    DeadL     = update_deadline(NewDBRef, LifeFilter, Event, TS, DL),
    StartT    = update_starttime(NewDBRef, Event, ST),
    do_add_event(NewDBRef, Event, TS, DeadL, StartT, POverride);
%% Order Policy must be AnyOrder or PriorityOrder.
write_event(_, {{_Prio, TS}, DL, ST, PO, Event}, _DBRef, NewDBRef,
            _Key, LifeFilter, _PrioFilter) ->
    DeadL  = update_deadline(NewDBRef, LifeFilter, Event, TS, DL),
    StartT = update_starttime(NewDBRef, Event, ST),
    do_add_event(NewDBRef, Event, TS, DeadL, StartT, PO);
write_event(_, {{_Prio, TS, DL}, DL, ST, PO, Event}, _DBRef, NewDBRef, _Key, _, _) ->
    %% Both Deadline and Priority have been extracted before.
    StartT = update_starttime(NewDBRef, Event, ST),
    do_add_event(NewDBRef, Event, TS, DL, StartT, PO).
691
692
%%------------------------------------------------------------
%% function : update_priority
%% Arguments: DBRef
%%            PrioFilter
%%            Event
%%            OldPrio - the priority stored with the event.
%% Returns  : The priority to store with the event.
%% Comment  : The purpose with this function is to avoid
%%            calling the MappingFilter for priority again (it
%%            may require intra-ORB communication). An old value
%%            which is an atom is treated as "not extracted
%%            before". Use only when doing an update.
%%------------------------------------------------------------
update_priority(DBRef, PrioFilter, Event, OldPrio) ->
    case is_atom(OldPrio) of
        true ->
            get_prio_mapping_value(DBRef, PrioFilter, Event);
        false ->
            OldPrio
    end.
707
%%------------------------------------------------------------
%% function : update_deadline
%% Arguments: DBRef
%%            LifeFilter
%%            Event
%%            TS - the event's FIFO key (see create_FIFO_Key/0).
%%            OldDeadL - the deadline stored with the event.
%% Returns  : false | the deadline to store with the event.
%% Comment  : The purpose with this function is to avoid
%%            calling MappingFilter or parsing the events for
%%            deadline again (we especially want to avoid calling
%%            the MappingFilter since it may require intra-ORB
%%            communication). Use only when doing an update.
%%------------------------------------------------------------
update_deadline(#dbRef{discardPolicy = DiscardP, orderPolicy = OrderP,
                       stopTsupport = false},
                _LifeFilter, _Event, _TS, _OldDeadL)
  when DiscardP =/= ?not_DeadlineOrder, OrderP =/= ?not_DeadlineOrder ->
    %% We do not need to extract the Deadline since it will not be used.
    false;
update_deadline(DBRef, LifeFilter, Event, TS, OldDeadL) when is_atom(OldDeadL) ->
    %% We need the Deadline and it has not been extracted before.
    DOverride = get_life_mapping_value(DBRef, LifeFilter, Event),
    %% The deadline must be based on the time the event was stored, not on
    %% the current time, since we cannot tell for how long the event has
    %% been waiting. That time is recovered from the FIFO key.
    StoredAt = convert_FIFO_Key(TS),
    extract_deadline(Event, ?get_DefStopT(DBRef), ?get_StopTsupport(DBRef),
                     ?get_TimeRef(DBRef), DOverride, StoredAt);
update_deadline(_DBRef, _LifeFilter, _Event, _TS, OldDeadL) ->
    %% We need the Deadline and it has been extracted before.
    OldDeadL.
736
%%------------------------------------------------------------
%% function : update_starttime
%% Arguments: DBRef
%%            Event
%%            OldStartT - the start time stored with the event.
%% Returns  : The start time to store with the event.
%% Comment  : The purpose with this function is to avoid
%%            parsing the events for starttime again.
%%            Use only when doing an update.
%%------------------------------------------------------------
update_starttime(DBRef, Event, OldStartT) ->
    case is_atom(OldStartT) of
        true ->
            %% Probably not previously extracted; try to get it.
            extract_start_time(Event, ?get_StartTsupport(DBRef),
                               ?get_TimeRef(DBRef));
        false ->
            %% Previously extracted.
            OldStartT
    end.
751
%%------------------------------------------------------------
%% function : discard_events
%% Arguments: DBRef
%%            N - number of events we must discard.
%% Returns  : The result of index_loop_backward/4 or
%%            index_loop_forward/4.
%% Comment  : As default we shall Reject New Events when the limit
%%            is reached. Any discard order will do the same.
%%
%%            This function can only be used for the discard policies
%%            Fifo, Priority and Deadline. Any or RejectNewEvents
%%            will not allow events to be stored at all, i.e., no events
%%            to discard. Lifo will not be stored either since when
%%            trying to add an event it is definitely the last event in.
%%
%%            If there is no discard table the order table is walked
%%            instead; in that case the order policy must be the same
%%            as the discard policy.
%%------------------------------------------------------------
discard_events(#dbRef{orderRef = ORef, discardRef = DRef,
                      discardPolicy = Policy}, N) ->
    %% The table which decides in which order events are discarded.
    IndexTab = case DRef of
                   undefined -> ORef;
                   _ -> DRef
               end,
    case Policy of
        ?not_DeadlineOrder ->
            ?debug_print("Discarding ~p events Deadline Order.",[N]),
            index_loop_backward(ets:last(IndexTab), DRef, ORef, N);
        ?not_FifoOrder ->
            ?debug_print("Discarding ~p events Fifo Order.",[N]),
            index_loop_backward(ets:last(IndexTab), DRef, ORef, N);
        _ ->
            %% Lifo- or Priority-Order
            ?debug_print("Discarding ~p events Lifo- or Priority-Order.",[N]),
            index_loop_forward(ets:first(IndexTab), DRef, ORef, N)
    end.
791
792
%% Delete up to Left events, walking the index table from Key towards
%% the end of the table. Always returns ok.
%%
%% When DRef is 'undefined' the order table is its own index and Key is
%% deleted from it directly. Otherwise Key is a 2- or 3-element tuple
%% taken from the discard table, and the matching order-table key is
%% that tuple with its elements in reverse order.
index_loop_forward('$end_of_table', _DRef, _ORef, _Left) ->
    ok;
index_loop_forward(_Key, _DRef, _ORef, 0) ->
    ok;
index_loop_forward(OKey, undefined, ORef, Left) ->
    ets:delete(ORef, OKey),
    index_loop_forward(ets:next(ORef, OKey), undefined, ORef, Left-1);
index_loop_forward(DKey, DRef, ORef, Left)
  when tuple_size(DKey) =:= 2; tuple_size(DKey) =:= 3 ->
    ets:delete(DRef, DKey),
    ets:delete(ORef, list_to_tuple(lists:reverse(tuple_to_list(DKey)))),
    index_loop_forward(ets:next(DRef, DKey), DRef, ORef, Left-1).
813
%% Delete up to Left events, walking the index table from Key towards
%% the beginning of the table. Always returns ok.
%%
%% When DRef is 'undefined' the order table is its own index and Key is
%% deleted from it directly. Otherwise Key is a 2- or 3-element tuple
%% taken from the discard table, and the matching order-table key is
%% that tuple with its elements in reverse order.
index_loop_backward('$end_of_table', _DRef, _ORef, _Left) ->
    ok;
index_loop_backward(_Key, _DRef, _ORef, 0) ->
    ok;
index_loop_backward(OKey, undefined, ORef, Left) ->
    ets:delete(ORef, OKey),
    index_loop_backward(ets:prev(ORef, OKey), undefined, ORef, Left-1);
index_loop_backward(DKey, DRef, ORef, Left)
  when tuple_size(DKey) =:= 2; tuple_size(DKey) =:= 3 ->
    ets:delete(DRef, DKey),
    ets:delete(ORef, list_to_tuple(lists:reverse(tuple_to_list(DKey)))),
    index_loop_backward(ets:prev(DRef, DKey), DRef, ORef, Left-1).
832
%%------------------------------------------------------------
%% function : add_and_get_event
%% Arguments: DBRef and Event; optionally the lifetime and priority
%%            mapping filters and the Delete flag (defaults: no
%%            filters, Delete = true).
%% Returns  : When the stored-event lookup is bypassed:
%%              {[], bool()} | {Event, bool()}          Delete == true
%%              {[], bool(), []} | {Event, bool(), []}  otherwise
%%            where the boolean tells whether the event was stored.
%%            Otherwise whatever get_event(DBRef, Delete) returns.
%% Comment  : This function is a mixture of add and get events.
%%            The intended use is to avoid storing an event when
%%            not necessary.
%%------------------------------------------------------------
add_and_get_event(DBRef, Event) ->
    add_and_get_event(DBRef, Event, true).

add_and_get_event(DBRef, Event, Delete) ->
    add_and_get_event(DBRef, Event, undefined, undefined, Delete).

add_and_get_event(DBRef, Event, LifeFilter, PrioFilter) ->
    add_and_get_event(DBRef, Event, LifeFilter, PrioFilter, true).

add_and_get_event(DBRef, Event, LifeFilter, PrioFilter, Delete) ->
    %% Reply shape used whenever get_event/2 is not consulted.
    Reply = fun(Ev, Stored) when Delete == true -> {Ev, Stored};
               (Ev, Stored) -> {Ev, Stored, []}
            end,
    %% Look up the overrides and the deadline, then store the event
    %% under a fresh FIFO key.
    Store = fun(StartT) ->
                    LifeOv = get_life_mapping_value(DBRef, LifeFilter, Event),
                    PrioOv = get_prio_mapping_value(DBRef, PrioFilter, Event),
                    Deadline = extract_deadline(Event, ?get_DefStopT(DBRef),
                                                ?get_StopTsupport(DBRef),
                                                ?get_TimeRef(DBRef), LifeOv),
                    do_add_event(DBRef, Event, create_FIFO_Key(), Deadline,
                                 StartT, PrioOv)
            end,
    case ets:info(?get_OrderRef(DBRef), size) of
        0 when ?is_StartTNotSupported(DBRef), ?is_StopTNotSupported(DBRef) ->
            %% No stored events and no timeouts used; just return the event.
            Reply(Event, false);
        0 when ?is_StartTNotSupported(DBRef) ->
            %% Only deadline supported; look up the values and check them.
            DOverride = get_life_mapping_value(DBRef, LifeFilter, Event),
            DL = extract_deadline(Event, ?get_DefStopT(DBRef),
                                  ?get_StopTsupport(DBRef), ?get_TimeRef(DBRef),
                                  DOverride),
            case check_deadline(DL) of
                true ->
                    %% Expired, just discard the event.
                    Reply([], false);
                _ ->
                    %% Not expired, we can safely return the event.
                    Reply(Event, false)
            end;
        0 when ?is_StopTNotSupported(DBRef) ->
            %% Only starttime allowed; test if we can deliver the event now.
            ST = extract_start_time(Event, ?get_StartTsupport(DBRef),
                                    ?get_TimeRef(DBRef)),
            case check_start_time(ST) of
                false ->
                    %% Not deliverable yet; store it for later.
                    Store(ST),
                    Reply([], true);
                _ ->
                    %% Starttime ok, just return the event.
                    Reply(Event, false)
            end;
        _ ->
            %% Events already stored, just have to accept the overhead.
            Store(extract_start_time(Event, ?get_StartTsupport(DBRef),
                                     ?get_TimeRef(DBRef))),
            get_event(DBRef, Delete)
    end.
918
%%------------------------------------------------------------
%% function : add_event
%% Arguments: DBRef and Event; optionally the lifetime and priority
%%            mapping filters (default: undefined, i.e. no filter).
%% Returns  : true (or whatever 'ets:insert' returns) |
%%            {'EXCEPTION',  #'IMP_LIMIT'{}}
%% Comment  : As default we shall deliver Events in Priority order.
%%            Hence, if AnyOrder is set we will still deliver in
%%            Priority order. But we cannot use only the Priority
%%            value, since if "all" events have the same priority
%%            there is a risk that some will never be delivered if
%%            the EventDB always contains events.
%%
%%            When discard and order policy are equal we only use one
%%            DB, since all we have to do is to "read from the other
%%            end" to discard the correct event(s).
%%
%%            In the discard DB we must also store the keys necessary
%%            to look up the event in the order DB.
%%
%%            If the event limit is reached 'IMP_LIMIT' is raised if
%%            the discard policy is RejectNewEvents or AnyOrder.
%%            These two policies we currently define to be equal.
%%------------------------------------------------------------

add_event(DBRef, Event) ->
    %% No mapping filters supplied.
    add_event(DBRef, Event, undefined, undefined).

add_event(DBRef, Event, LifeFilter, PrioFilter) ->
    %% Save overhead by first checking if we really need to extract
    %% Deadline and/or Priority.
    DOverride = get_life_mapping_value(DBRef, LifeFilter, Event),
    POverride = get_prio_mapping_value(DBRef, PrioFilter, Event),
    add_event_helper(DBRef, Event, DOverride, POverride).
956
%% Store Event, honouring the max-event limit of the DB.
%%
%% Below the limit the event is stored, after a low-priority GC pass if
%% the table has grown past the GC limit. At or above the limit the
%% discard policy decides: RejectNewEvents, AnyOrder and LifoOrder raise
%% 'IMP_LIMIT'; any other policy stores the event and then discards one
%% event according to that policy. An event whose deadline has already
%% passed is never stored; 'true' is returned for it.
add_event_helper(DBRef, Event, DOverride, POverride) ->
    StoreUnlessExpired =
        fun() ->
                Deadline = extract_deadline(Event, ?get_DefStopT(DBRef),
                                            ?get_StopTsupport(DBRef),
                                            ?get_TimeRef(DBRef), DOverride),
                case check_deadline(Deadline) of
                    true ->
                        expired;
                    _ ->
                        StartT = extract_start_time(Event,
                                                    ?get_StartTsupport(DBRef),
                                                    ?get_TimeRef(DBRef)),
                        {stored, do_add_event(DBRef, Event, create_FIFO_Key(),
                                              Deadline, StartT, POverride)}
                end
        end,
    case ets:info(?get_OrderRef(DBRef), size) of
        N when N < ?get_MaxEvents(DBRef) ->
            if
                N > ?get_GCLimit(DBRef) ->
                    gc_events(DBRef, low);
                true ->
                    ok
            end,
            case StoreUnlessExpired() of
                expired -> true;
                {stored, Result} -> Result
            end;
        _N when ?get_DiscardP(DBRef) == ?not_RejectNewEvents;
                ?get_DiscardP(DBRef) == ?not_AnyOrder;
                ?get_DiscardP(DBRef) == ?not_LifoOrder ->
            gc_events(DBRef, low),
            corba:raise(#'IMP_LIMIT'{completion_status=?COMPLETED_NO});
        _N ->
            gc_events(DBRef, low),
            %% Other discard policy; we must first store the event
            %% and then look up in the Discard DB which event we
            %% should remove.
            case StoreUnlessExpired() of
                expired -> true;
                {stored, _Result} -> discard_events(DBRef, 1)
            end
    end.
1011
1012
%% Insert Event into the order table and, for the policy combinations
%% that use one, the mirrored key into the discard table.
%%
%% The order-table object is {OrderKey, Deadline, StartT, PrioOverride,
%% Event}; the discard-table object is {DiscardKey}, where DiscardKey is
%% OrderKey with its elements in reverse order.
%%
%% Clause order matters: for each order policy the specific discard
%% policies are listed first and the fall-through clause last.

%% --- Order policy: DeadlineOrder ---
do_add_event(#dbRef{orderRef = OrderTab, orderPolicy = ?not_DeadlineOrder,
                    discardRef = DiscardTab, discardPolicy = ?not_PriorityOrder,
                    defPriority = DefPrio},
             Event, FifoKey, Deadline, StartT, PrioOverride) ->
    Prio = extract_priority(Event, DefPrio, PrioOverride),
    ets:insert(OrderTab, {{Deadline, FifoKey, Prio},
                          Deadline, StartT, PrioOverride, Event}),
    ets:insert(DiscardTab, {{Prio, FifoKey, Deadline}});
do_add_event(#dbRef{orderRef = OrderTab, orderPolicy = ?not_DeadlineOrder,
                    discardRef = DiscardTab, discardPolicy = ?not_FifoOrder},
             Event, FifoKey, Deadline, StartT, PrioOverride) ->
    ets:insert(OrderTab, {{Deadline, FifoKey},
                          Deadline, StartT, PrioOverride, Event}),
    ets:insert(DiscardTab, {{FifoKey, Deadline}});
do_add_event(#dbRef{orderRef = OrderTab, orderPolicy = ?not_DeadlineOrder,
                    discardRef = DiscardTab, discardPolicy = ?not_LifoOrder},
             Event, FifoKey, Deadline, StartT, PrioOverride) ->
    ets:insert(OrderTab, {{Deadline, FifoKey},
                          Deadline, StartT, PrioOverride, Event}),
    ets:insert(DiscardTab, {{FifoKey, Deadline}});
%% Discard policy is either the same (DeadlineOrder), RejectNewEvents or
%% AnyOrder. Nothing is stored in the discard table: if the policies are
%% the same we just read "from the other end", and AnyOrder and
%% RejectNewEvents are treated as equal.
do_add_event(#dbRef{orderRef = OrderTab, orderPolicy = ?not_DeadlineOrder},
             Event, FifoKey, Deadline, StartT, PrioOverride) ->
    ets:insert(OrderTab, {{Deadline, FifoKey},
                          Deadline, StartT, PrioOverride, Event});

%% --- Order policy: FifoOrder ---
do_add_event(#dbRef{orderRef = OrderTab, orderPolicy = ?not_FifoOrder,
                    discardRef = DiscardTab, discardPolicy = ?not_DeadlineOrder},
             Event, FifoKey, Deadline, StartT, PrioOverride) ->
    ets:insert(OrderTab, {{FifoKey, Deadline},
                          Deadline, StartT, PrioOverride, Event}),
    ets:insert(DiscardTab, {{Deadline, FifoKey}});
do_add_event(#dbRef{orderRef = OrderTab, orderPolicy = ?not_FifoOrder,
                    discardRef = DiscardTab, discardPolicy = ?not_PriorityOrder,
                    defPriority = DefPrio},
             Event, FifoKey, Deadline, StartT, PrioOverride) ->
    Prio = extract_priority(Event, DefPrio, PrioOverride),
    ets:insert(OrderTab, {{FifoKey, Prio},
                          Deadline, StartT, PrioOverride, Event}),
    ets:insert(DiscardTab, {{Prio, FifoKey}});
%% The discard policy must be RejectNewEvents, AnyOrder, Fifo or Lifo
%% order; the bare FIFO key is used as order key.
do_add_event(#dbRef{orderRef = OrderTab, orderPolicy = ?not_FifoOrder},
             Event, FifoKey, Deadline, StartT, PrioOverride) ->
    ets:insert(OrderTab, {FifoKey, Deadline, StartT, PrioOverride, Event});

%% --- Order policy must be AnyOrder or PriorityOrder ---
do_add_event(#dbRef{orderRef = OrderTab,
                    discardRef = DiscardTab, discardPolicy = ?not_DeadlineOrder,
                    defPriority = DefPrio},
             Event, FifoKey, Deadline, StartT, PrioOverride) ->
    Prio = extract_priority(Event, DefPrio, PrioOverride),
    ets:insert(OrderTab, {{Prio, FifoKey, Deadline},
                          Deadline, StartT, PrioOverride, Event}),
    ets:insert(DiscardTab, {{Deadline, FifoKey, Prio}});
do_add_event(#dbRef{orderRef = OrderTab,
                    discardRef = DiscardTab, discardPolicy = ?not_FifoOrder,
                    defPriority = DefPrio},
             Event, FifoKey, Deadline, StartT, PrioOverride) ->
    Prio = extract_priority(Event, DefPrio, PrioOverride),
    ets:insert(OrderTab, {{Prio, FifoKey},
                          Deadline, StartT, PrioOverride, Event}),
    ets:insert(DiscardTab, {{FifoKey, Prio}});
do_add_event(#dbRef{orderRef = OrderTab,
                    discardRef = DiscardTab, discardPolicy = ?not_LifoOrder,
                    defPriority = DefPrio},
             Event, FifoKey, Deadline, StartT, PrioOverride) ->
    Prio = extract_priority(Event, DefPrio, PrioOverride),
    ets:insert(OrderTab, {{Prio, FifoKey},
                          Deadline, StartT, PrioOverride, Event}),
    ets:insert(DiscardTab, {{FifoKey, Prio}});
%% Order policy must be AnyOrder or PriorityOrder and the discard policy
%% one that needs no discard table.
do_add_event(#dbRef{orderRef = OrderTab, defPriority = DefPrio},
             Event, FifoKey, Deadline, StartT, PrioOverride) ->
    Prio = extract_priority(Event, DefPrio, PrioOverride),
    ets:insert(OrderTab, {{Prio, FifoKey},
                          Deadline, StartT, PrioOverride, Event}).
1079
%%------------------------------------------------------------
%% function : destroy_db
%% Arguments: A DB reference
%% Returns  : true (the result of the last ets:delete/1)
%% Comment  : Deletes the order table and, if one exists, the
%%            discard table.
%%------------------------------------------------------------
destroy_db(#dbRef{orderRef = OrderTab, discardRef = DiscardTab}) ->
    OrderDeleted = ets:delete(OrderTab),
    case DiscardTab of
        undefined ->
            %% No separate discard table was ever created.
            OrderDeleted;
        _ ->
            ets:delete(DiscardTab)
    end.
1090
%%------------------------------------------------------------
%% function : destroy_discard_db
%% Arguments: A DB reference
%% Returns  : ok when there is no discard table, otherwise the
%%            result of ets:delete/1.
%%------------------------------------------------------------
destroy_discard_db(#dbRef{discardRef = DiscardTab}) ->
    case DiscardTab of
        undefined ->
            ok;
        _ ->
            ets:delete(DiscardTab)
    end.
1100
%%------------------------------------------------------------
%% function : destroy_order_db
%% Arguments: A DB reference
%% Returns  : The result of ets:delete/1 on the order table.
%% Comment  : The discard table, if any, is left untouched.
%%------------------------------------------------------------
destroy_order_db(#dbRef{orderRef = OrderTab}) ->
    ets:delete(OrderTab).
1108
%%------------------------------------------------------------
%% function : create_db
%% Arguments: QoS     - QoS (local representation).
%%            GCTime  - time until the first garbage collection;
%%                      presumably seconds, since it is scaled by a
%%                      ticks-per-second factor (verify against the
%%                      callers).
%%            GCLimit - stored in the DB reference.
%%            TimeRef - stored in the DB reference.
%% Returns  : A DB reference
%% Comment  : A separate discard table is only created when the
%%            discard policy cannot be served by the order table,
%%            see the case expression below.
%%
%%            Unless timeouts are unused and stop time is not
%%            supported, the time of the first GC is stored in the
%%            process dictionary under 'oe_GC_timestamp', expressed
%%            in the native time unit of erlang:monotonic_time/0.
%%------------------------------------------------------------
create_db(QoS, GCTime, GCLimit, TimeRef) ->
    DiscardRef =
        case {?not_GetDiscardPolicy(QoS), ?not_GetOrderPolicy(QoS)} of
            {Equal, Equal} ->
                %% Same policy: discard by reading from the other end.
                undefined;
            {?not_PriorityOrder, ?not_AnyOrder} ->
                %% NOTE: Any- and Priority-Order delivery policy is equal.
                undefined;
            {?not_RejectNewEvents, _} ->
                undefined;
            {?not_AnyOrder, _} ->
                undefined;
            {?not_LifoOrder, ?not_FifoOrder} ->
                undefined;
            _ ->
                ets:new(oe_ets, [set, public, ordered_set])
        end,
    DBRef = ?CreateRef(ets:new(oe_ets, [set, public, ordered_set]),
                       DiscardRef,
                       ?not_GetOrderPolicy(QoS), ?not_GetDiscardPolicy(QoS),
                       ?not_GetPriority(QoS), ?not_GetMaxEventsPerConsumer(QoS),
                       ?not_GetTimeout(QoS), ?not_GetStartTimeSupported(QoS),
                       ?not_GetStopTimeSupported(QoS), GCTime, GCLimit, TimeRef),
    if
        ?is_TimeoutNotUsed(DBRef), ?is_StopTNotSupported(DBRef) ->
            ok;
        true ->
            TS = erlang:monotonic_time(),
            %% Number of native time units per second. The integer 1 as
            %% source unit means "1 part per second", i.e. seconds. The
            %% resolution reported for the OS monotonic clock must not be
            %% used here: it need not equal the native unit returned by
            %% erlang:monotonic_time/0, and the info list is empty when no
            %% OS monotonic time source is available.
            TR = erlang:convert_time_unit(1, 1, native),
            put(oe_GC_timestamp, TS+GCTime*TR)
    end,
    DBRef.
1147
%%------------------------------------------------------------
%% function : get_prio_mapping_value
%% Arguments: DBRef
%%            A MappingFilter reference | undefined
%%            Event (Any or Structured)
%% Returns  : false     - priority is used neither for ordering nor
%%                        for discarding; nothing was looked up.
%%            undefined - no filter supplied, or the filter call did
%%                        not yield a usable result.
%%            Data      - the value carried by the #any{} that the
%%                        mapping filter returned.
%%------------------------------------------------------------
get_prio_mapping_value(DBRef, _MFilter, _Event)
  when ?get_DiscardP(DBRef) =/= ?not_PriorityOrder,
       ?get_OrderP(DBRef) =/= ?not_AnyOrder,
       ?get_OrderP(DBRef) =/= ?not_PriorityOrder ->
    false;
get_prio_mapping_value(_DBRef, undefined, _Event) ->
    undefined;
get_prio_mapping_value(_DBRef, MFilter, Event) ->
    %% Untyped (#any{}) and structured events use different match calls.
    Outcome =
        if
            is_record(Event, 'any') ->
                catch 'CosNotifyFilter_MappingFilter':match(MFilter, Event);
            true ->
                catch 'CosNotifyFilter_MappingFilter':match_structured(MFilter, Event)
        end,
    case Outcome of
        {Matched, Value} when is_boolean(Matched), is_record(Value, 'any') ->
            %% Either the matched value or the filter's default value.
            any:get_value(Value);
        _ ->
            undefined
    end.
1178
%%------------------------------------------------------------
%% function : get_life_mapping_value
%% Arguments: DBRef
%%            A MappingFilter reference | undefined
%%            Event (Any or Structured)
%% Returns  : false     - deadlines are used neither for ordering nor
%%                        for discarding and stop time is not
%%                        supported; nothing was looked up.
%%            undefined - no filter supplied, or the filter call did
%%                        not yield a usable result.
%%            Data      - the value carried by the #any{} that the
%%                        mapping filter returned.
%%------------------------------------------------------------
get_life_mapping_value(DBRef, _MFilter, _Event)
  when ?get_DiscardP(DBRef) =/= ?not_DeadlineOrder,
       ?get_OrderP(DBRef) =/= ?not_DeadlineOrder,
       ?is_StopTNotSupported(DBRef) ->
    false;
get_life_mapping_value(_DBRef, undefined, _Event) ->
    undefined;
get_life_mapping_value(_DBRef, MFilter, Event) ->
    %% Untyped (#any{}) and structured events use different match calls.
    Outcome =
        if
            is_record(Event, 'any') ->
                catch 'CosNotifyFilter_MappingFilter':match(MFilter, Event);
            true ->
                catch 'CosNotifyFilter_MappingFilter':match_structured(MFilter, Event)
        end,
    case Outcome of
        {Matched, Value} when is_boolean(Matched), is_record(Value, 'any') ->
            %% Either the matched value or the filter's default value.
            any:get_value(Value);
        _ ->
            undefined
    end.
1209
%%------------------------------------------------------------
%% function : validate_event
%% Arguments: Subscribe data; 'true' or {Which, WC}
%%            A sequence of Events, 'structured' or an 'any' record
%%            A list of filter references
%%            DBRef, the table holding the subscriptions
%%            Status, i.e., do we have to filter the events ('MATCH')
%%            or just check the subscription.
%% Returns  : A tuple of two lists; list1 the events that passed
%%            and list2 the events that didn't pass.
%%------------------------------------------------------------
validate_event(true, Events, Filters, _DBRef, Status) ->
    %% No subscription check needed.
    case Status of
        'MATCH' ->
            filter_events(Events, Filters, false);
        _ ->
            {Events, []}
    end;
validate_event({_Which, _WC}, Event, Filters, _DBRef, Status)
  when is_record(Event, any) ->
    %% A single #any{} event; the subscription is not checked for it.
    case Status of
        'MATCH' ->
            filter_events(Event, Filters, false);
        _ ->
            {Event, []}
    end;
validate_event({Which, WC}, Events, Filters, DBRef, Status) ->
    %% validate_event2/5 returns the passing events in reverse order;
    %% both branches below restore the original order.
    Passed = validate_event2(DBRef, Events, Which, WC, []),
    case Status of
        'MATCH' ->
            filter_events(Passed, Filters, true);
        _ ->
            {lists:reverse(Passed), []}
    end.
1233
%% Collect the structured events whose event type is subscribed to.
%% An event passes if one of the candidate event types is found in the
%% subscription table DBRef, or failing that, if
%% cosNotification_Filter:match_types/3 returns true for the wildcard
%% data WC. The result is accumulated in reverse order.
validate_event2(_DBRef, [], _Which, _WC, Acc) ->
    Acc;
validate_event2(DBRef, [Event|Rest], Which, WC, Acc) ->
    Header = Event#'CosNotification_StructuredEvent'.header,
    Fixed = Header#'CosNotification_EventHeader'.fixed_header,
    ET = Fixed#'CosNotification_FixedEventHeader'.event_type,
    %% Which tells which wildcard variants ("" and "*") of the event
    %% type must be looked up besides the type itself.
    Candidates =
        case Which of
            both ->
                [ET];
            domain ->
                [ET,
                 ET#'CosNotification_EventType'{type_name=""},
                 ET#'CosNotification_EventType'{type_name="*"}];
            type ->
                [ET,
                 ET#'CosNotification_EventType'{domain_name=""},
                 ET#'CosNotification_EventType'{domain_name="*"}];
            _ ->
                [ET,
                 ET#'CosNotification_EventType'{type_name=""},
                 ET#'CosNotification_EventType'{type_name="*"},
                 ET#'CosNotification_EventType'{domain_name=""},
                 ET#'CosNotification_EventType'{domain_name="*"}]
        end,
    Subscribed =
        check_subscription(DBRef, Candidates) =:= true
        orelse
        (catch cosNotification_Filter:match_types(
                 ET#'CosNotification_EventType'.domain_name,
                 ET#'CosNotification_EventType'.type_name,
                 WC)) =:= true,
    NewAcc = case Subscribed of
                 true -> [Event|Acc];
                 false -> Acc
             end,
    validate_event2(DBRef, Rest, Which, WC, NewAcc).
1275
%% Return true if at least one of Keys is present in the ets table
%% DBRef, otherwise false. The keys are tried in list order and the
%% search stops at the first hit.
check_subscription(DBRef, Keys) ->
    lists:any(fun(Key) -> ets:member(DBRef, Key) end, Keys).
1285
1286
%%------------------------------------------------------------
%% function : filter_events
%% Arguments: A sequence of structured Events or #any
%%            A list of filters
%%            Reversed - anything but 'false' tells that the event
%%            sequence is in reverse order (default: false).
%% Returns  : A tuple of two lists; list1 the events that passed
%%            and list2 the events that didn't pass. Both lists are
%%            returned in the original event order. A single event
%%            (not a list) is returned as is in place of list1 or
%%            list2.
%% Comment  : With no filters every event passes.
%%------------------------------------------------------------

filter_events(Events, Filters) ->
    filter_events(Events, Filters, false).

filter_events(Events, [], Reversed) ->
    %% Nothing to filter on; only the ordering may need restoring.
    case Reversed of
        false ->
            {Events, []};
        _ ->
            {lists:reverse(Events), []}
    end;
filter_events(Events, Filters, Reversed) ->
    filter_events(Events, Filters, [], [], Reversed).

filter_events([], _Filters, Passed, Failed, Reversed) ->
    case Reversed of
        false ->
            %% The accumulators were built backwards; turn them around.
            {lists:reverse(Passed), lists:reverse(Failed)};
        _ ->
            %% The input was reversed, so accumulating restored the order.
            {Passed, Failed}
    end;
filter_events([Event|Rest], Filters, Passed, Failed, Reversed) ->
    case call_filters(Filters, Event) of
        true ->
            filter_events(Rest, Filters, [Event|Passed], Failed, Reversed);
        _ ->
            filter_events(Rest, Filters, Passed, [Event|Failed], Reversed)
    end;
filter_events(Single, Filters, _Passed, _Failed, _Reversed) ->
    %% Not a list, i.e. one single event.
    case call_filters(Filters, Single) of
        true ->
            {Single, []};
        _ ->
            {[], Single}
    end.
1324
%% Return true as soon as one of the filters matches Event, false when
%% the filter list is exhausted. The list elements are 2-tuples whose
%% second element is the filter reference. A filter call that fails or
%% returns anything but 'true' counts as "no match".
call_filters([], _Event) ->
    false;
call_filters([{_Id, Filter}|Rest], Event) ->
    Verdict =
        if
            is_record(Event, any) ->
                catch 'CosNotifyFilter_Filter':match(Filter, Event);
            ?not_isConvertedAny(Event) ->
                %% A converted any is matched on the remainder_of_body
                %% field of the structured event.
                catch 'CosNotifyFilter_Filter':match(
                        Filter,
                        Event#'CosNotification_StructuredEvent'.remainder_of_body);
            true ->
                catch 'CosNotifyFilter_Filter':match_structured(Filter, Event)
        end,
    case Verdict of
        true ->
            true;
        _ ->
            call_filters(Rest, Event)
    end.
1349
1350
1351%%--------------- END OF MODULE ------------------------------
1352