%%--------------------------------------------------------------------
%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2000-2015. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
%%
%%----------------------------------------------------------------------
%% File    : cosNotification_eventDB.erl
%% Purpose : This module is supposed to centralize Event storage.
%% Comments:
%% * Setting Order Policy to AnyOrder eq. Priority order
%%
%% * Setting Discard Policy to AnyOrder eq. RejectNewEvents.
%%
%% * DB ordering: Since the deliver- and discard-order may differ we need
%%   two ets-tables, both of type 'ordered_set'. They contain:
%%   - table 1 (T1): deliver order key and the associated event.
%%   - table 2 (T2): discard order key.
%%
%%   When adding a new event we add, if necessary, related keys in T2.
%%   For example, if we should discard events in FIFO order, the delivery
%%   order may be set to Priority order. If the Max Event limit is reached
%%   we first look in T2 to find out which event to discard, and then
%%   reorder the key elements. T2 gives {TimeStamp, Priority}, which is used
%%   to lookup in T1 as {Priority, TimeStamp}.
%%   A TimeStamp is always included in the DB keys, even if FIFO or LIFO
%%   is used, since lots of events probably will have the same priority and
%%   with a little bit of bad luck some events will never be delivered.
45%% 46%% Note: deliver order AnyOrder and PriorityOrder is equal since the later 47%% is defined as default. 48%% discard order AnyOrder and RejectNewEvents is equal since the later 49%% is defined as default. 50%% The keys used is ('-' indicates T2 is not needed and, thus, not instantiated): 51%% 52%% T1 policy T1 Key T2 Policy T2 Key 53%% ------------------------------------------------------------------ 54%% DeadlineOrder {DL, Key, Prio} PriorityOrder {Prio, Key, DL} 55%% DeadlineOrder {DL, Key} FifoOrder {Key, DL} 56%% DeadlineOrder {DL, Key} LifoOrder {Key, DL} 57%% DeadlineOrder {DL, Key} RejectNewEvents - 58%% DeadlineOrder {DL, Key} DeadlineOrder - 59%% FifoOrder {Key, DL} DeadlineOrder {DL, Key} 60%% FifoOrder {Key, Prio} PriorityOrder {Prio, Key} 61%% FifoOrder Key RejectNewEvents - 62%% FifoOrder Key Fifo - 63%% FifoOrder Key Lifo - 64%% PriorityOrder {Prio, Key, DL} DeadlineOrder {DL, Key, Prio} 65%% PriorityOrder {Prio, Key} Fifo {Key, Prio} 66%% PriorityOrder {Prio, Key} Lifo {Key, Prio} 67%% PriorityOrder {Prio, Key} RejectNewEvents - 68%% ------------------------------------------------------------------ 69%% DL == Deadline, Key == TimeStamp, Prio == Priority 70%% 71%% NOTE: If defined, the Discard DB Keys are the same as in Event DB, except 72%% that the first and last Key change place. {K1,K2}<->{K2,K1} and 73%% {K1,K2,K3}<->{K3,K2,K1}. 74%%---------------------------------------------------------------------- 75-module(cosNotification_eventDB). 76 77%%--------------- INCLUDES ----------------------------------- 78-include_lib("orber/include/corba.hrl"). 79-include_lib("orber/include/ifr_types.hrl"). 80-include_lib("cosTime/include/TimeBase.hrl"). 81 82%% Application files 83-include("CosNotification.hrl"). 84-include("CosNotifyChannelAdmin.hrl"). 85-include("CosNotifyComm.hrl"). 86-include("CosNotifyFilter.hrl"). 87 88-include("CosNotification_Definitions.hrl"). 

%%--------------- EXPORTS ------------------------------------
%% Internal Filter Functions
-export([validate_event/5,
         create_db/4,
         destroy_db/1,
         get_event/1,
         get_event/2,
         get_events/2,
         get_events/3,
         delete_events/1,
         update/2,
         update/4,
         add_event/2,
         add_event/4,
         add_and_get_event/2,
         add_and_get_event/3,
         add_and_get_event/4,
         add_and_get_event/5,
         gc_events/2,
         gc_events_local/4,
         gc_start/2,
         filter_events/2,
         filter_events/3,
         status/2]).

%%--------------- DATA STRUCTURES ----------------------------
%% Handle describing one event DB.
%%  orderRef   - ets table holding the events, keyed in deliver order.
%%  discardRef - ets table holding the mirrored discard-order keys, or
%%               'undefined' when no separate discard table is used.
%% The remaining fields cache the QoS settings the DB was created with.
-record(dbRef, {orderRef, discardRef, orderPolicy, discardPolicy,
                defPriority, maxEvents, defStopT, startTsupport,
                stopTsupport, gcTime, gcLimit, timeRef}).



%%--------------- DEFINES ------------------------------------

%% Builds a #dbRef{}; note that GL is given in percent of ME and is stored
%% as an absolute number of events.
-define(CreateRef(OR, DR, O, D, DP, ME, DS, StaT, StoT, GT, GL, TR),
        #dbRef{orderRef=OR, discardRef=DR, orderPolicy=O, discardPolicy=D,
               defPriority=DP, maxEvents=ME, defStopT=DS, startTsupport=StaT,
               stopTsupport=StoT, gcTime=GT, gcLimit=round(ME*GL/100),
               timeRef=TR}).


-define(get_OrderP(DR),         DR#dbRef.orderPolicy).
-define(get_DiscardP(DR),       DR#dbRef.discardPolicy).
-define(get_OrderRef(DR),       DR#dbRef.orderRef).
%% Must read the discard table reference, not the order table reference.
-define(get_DiscardRef(DR),     DR#dbRef.discardRef).
-define(get_DefPriority(DR),    DR#dbRef.defPriority).
-define(get_MaxEvents(DR),      DR#dbRef.maxEvents).
-define(get_DefStopT(DR),       DR#dbRef.defStopT).
-define(get_StartTsupport(DR),  DR#dbRef.startTsupport).
-define(get_StopTsupport(DR),   DR#dbRef.stopTsupport).
-define(get_GCTime(DR),         DR#dbRef.gcTime).
-define(get_GCLimit(DR),        DR#dbRef.gcLimit).
-define(get_TimeRef(DR),        DR#dbRef.timeRef).

-define(set_OrderP(DR, O),      DR#dbRef{orderPolicy = O}).
-define(set_DiscardP(DR, D),    DR#dbRef{discardPolicy = D}).
-define(set_OrderRef(DR, E),    DR#dbRef{orderRef = E}).
%% Must update the discard table reference, not the order table reference.
-define(set_DiscardRef(DR, E),  DR#dbRef{discardRef = E}).
-define(set_DefPriority(DR, DP),   DR#dbRef{defPriority = DP}).
-define(set_MaxEvents(DR, ME),     DR#dbRef{maxEvents = ME}).
-define(set_DefStopT(DR, DS),      DR#dbRef{defStopT = DS}).
-define(set_StartTsupport(DR, B),  DR#dbRef{startTsupport = B}).
-define(set_StopTsupport(DR, B),   DR#dbRef{stopTsupport = B}).

-define(is_StartTNotSupported(DR), DR#dbRef.startTsupport == false).
-define(is_StopTNotSupported(DR),  DR#dbRef.stopTsupport == false).
-define(is_TimeoutNotUsed(DR),     DR#dbRef.defStopT == 0).


%%------------------------------------------------------------
%% function : status
%% Arguments: DBRef
%%            Key - selects the information wanted:
%%              eventCounter                      - number of stored events
%%              {batchLimit, Limit}               - is Limit reached?
%%              {batchLimit, Limit, TemporaryMax} - is either limit reached?
%% Returns  : The size reported by ets for 'eventCounter', a boolean()
%%            for the batchLimit keys and 'error' for any other key.
%%------------------------------------------------------------
status(DBRef, eventCounter) ->
    ets:info(?get_OrderRef(DBRef), size);
status(DBRef, {batchLimit, Limit}) ->
    Stored = ets:info(?get_OrderRef(DBRef), size),
    if
        is_integer(Stored) andalso Stored >= Limit ->
            ?debug_print("BATCH LIMIT (~p) REACHED, CONTAINS: ~p~n", [Limit, Stored]),
            true;
        true ->
            %% Also taken when ets did not report an integer size.
            ?debug_print("BATCH LIMIT (~p) NOT REACHED, CONTAINS: ~p~n",
                         [Limit, Stored]),
            false
    end;
status(DBRef, {batchLimit, Limit, TemporaryMax}) ->
    Stored = ets:info(?get_OrderRef(DBRef), size),
    if
        is_integer(Stored) andalso Stored >= TemporaryMax ->
            ?debug_print("MAX LIMIT (~p) REACHED, CONTAINS: ~p~n",
                         [TemporaryMax, Stored]),
            true;
        is_integer(Stored) andalso Stored >= Limit ->
            ?debug_print("BATCH LIMIT (~p) REACHED, CONTAINS: ~p~n", [Limit, Stored]),
            true;
        true ->
            ?debug_print("BATCH LIMIT (~p) NOT REACHED, CONTAINS: ~p~n",
                         [Limit, Stored]),
            false
    end;
status(_, _) ->
    error.


%%------------------------------------------------------------
%% function : gc_events_local
%% Arguments: ORef - the order DB (ets table)
%%            DRef - the discard DB (ets table) or 'undefined'
%%            StopTSupported - 'false' disables the sweep
%%            DefStopT - 0 disables the sweep
%% Returns  : ok
%% Comment  : This function is intended for "emergency" GC, i.e.,
%%            when the DB must discard events we should first try
%%            to remove events with expired deadlines. The sweep runs
%%            in the calling process.
%%------------------------------------------------------------
gc_events_local(_, _, false, _) ->
    ok;
gc_events_local(_, _, _, 0) ->
    ok;
gc_events_local(ORef, DRef, _, _) ->
    gc_loop(ets:first(ORef), ORef, DRef).

%%------------------------------------------------------------
%% function : gc_events
%% Arguments: DBRef
%%            Priority - 'low', 'medium' or 'high'; will determine
%%                       how important a fast gc is.
%% Returns  : ok | pid() of the spawned GC process
%% Comment  : This function is intended for background GC. The time
%%            for the next allowed GC is kept in the process dictionary
%%            under the key 'oe_GC_timestamp'.
%%------------------------------------------------------------
gc_events(DBRef, _Priority) when ?is_TimeoutNotUsed(DBRef) ->
    ok;
gc_events(DBRef, _Priority) when ?is_StopTNotSupported(DBRef) ->
    ok;
gc_events(DBRef, Priority) ->
    TS = erlang:monotonic_time(),
    {resolution, TR} = lists:keyfind(resolution, 1,
                                     erlang:system_info(os_monotonic_time_source)),
    case get(oe_GC_timestamp) of
        Num when Num =:= undefined; TS > Num ->
            %% 'undefined' means no GC has been scheduled by this process
            %% yet. An integer never compares greater than an atom, so
            %% without the explicit test the first GC would never start.
            put(oe_GC_timestamp, TS + ?get_GCTime(DBRef) * TR),
            spawn_link(?MODULE, gc_start, [DBRef, Priority]);
        _->
            ok
    end.

%%------------------------------------------------------------
%% function : gc_start
%% Arguments: DBRef
%%            Priority - process priority to run the sweep with
%% Returns  : ok
%% Comment  : Entry point of the process spawned by gc_events/2.
%%------------------------------------------------------------
gc_start(#dbRef{orderRef = ORef, discardRef = DRef}, Priority) ->
    process_flag(priority, Priority),
    gc_loop(ets:first(ORef), ORef, DRef).

%% Walks the order DB from Key onwards and deletes every event whose
%% deadline has expired. When a discard DB exists the mirrored key is
%% removed from it as well.
gc_loop('$end_of_table', _, _) ->
    ok;
gc_loop(Key, ORef, DRef) ->
    [{Keys,DL,_,_,_}]=ets:lookup(ORef, Key),
    case check_deadline(DL) of
        true when DRef == undefined ->
            ets:delete(ORef, Key);
        true ->
            ets:delete(ORef, Key),
            gc_discard_DB(Keys, DRef);
        _ ->
            ok
    end,
    gc_loop(ets:next(ORef, Key), ORef, DRef).

%% The discard DB key is the order DB key with first and last element swapped.
gc_discard_DB({Key1, Key2}, DRef) ->
    ets:delete(DRef, {Key2, Key1});
gc_discard_DB({Key1, Key2, Key3}, DRef) ->
    ets:delete(DRef, {Key3, Key2, Key1}).

%%------------------------------------------------------------
%% function : create_FIFO_Key
%% Arguments: -
%% Returns  : The current erlang:timestamp() expressed as a negated
%%            number of microseconds (integer()).
%%------------------------------------------------------------
create_FIFO_Key() ->
    {M, S, U} = erlang:timestamp(),
    -M*1000000000000 - S*1000000 - U.

%%------------------------------------------------------------
%% function : convert_FIFO_Key
%% Arguments: Key - as returned by create_FIFO_Key/0
%% Returns  : A timestamp tuple {MegaSecs, Secs, MicroSecs}
%% Comment  : Used when we must reuse a timestamp, i.e., only
%%            when we must reorder the DB.
%%            Integer div/rem is used so that the split is exact for
%%            any integer; no float rounding is involved.
%%------------------------------------------------------------
convert_FIFO_Key(Key) ->
    K = abs(Key),
    M = K div 1000000000000,
    S = (K div 1000000) rem 1000000,
    U = K rem 1000000,
    {M, S, U}.

%%------------------------------------------------------------
%% function : extract_priority
%% Arguments: Event
%%            Default Value
%%            Mapping Filter Value
%%             - false        value not needed (depends on QoS type)
%%             - undefined    value needed but no filter associated.
%% Returns  : false | the priority to use for the event
%%------------------------------------------------------------
extract_priority(_, _, false) ->
    false;
extract_priority(#'CosNotification_StructuredEvent'
                 {header = #'CosNotification_EventHeader'
                  {variable_header = VH}}, DefPriority, undefined) ->
    extract_value(VH, ?not_Priority, DefPriority);
%% Maybe a unstructured event.
extract_priority(_, DefPriority, undefined) ->
    DefPriority;
extract_priority(_, _, PriorityOverride) ->
    %% Must have an associated MappingFilter for Priority.
    PriorityOverride.

%%------------------------------------------------------------
%% function : extract_start_time
%% Arguments: Event
%%            StartTSupported - 'false' means start times are ignored
%%            TRef - reference to TimeService
%% Returns  : false | the value returned by convert_time/3
%% Comment  : Only structured events carrying a 'TimeBase_UtcT' start
%%            time property can yield anything but 'false'.
%%------------------------------------------------------------
extract_start_time(_, false, _) ->
    false;
extract_start_time(#'CosNotification_StructuredEvent'
                   {header = #'CosNotification_EventHeader'
                    {variable_header = VH}}, _, TRef) ->
    ST = case extract_value(VH, ?not_StartTime, undefined) of
             UTC when is_record(UTC, 'TimeBase_UtcT') ->
                 UTC;
             _ ->
                 false
         end,
    convert_time(ST, TRef, erlang:timestamp());
extract_start_time(_, _, _) ->
    false.

%%------------------------------------------------------------
%% function : extract_deadline
%% Arguments: Structured Event
%%            Default Timeout Value - TimeT or UtcT (see cosTime).
%%            StopTSupported - boolean().
%%            TRef - reference to TimeService
%%            Mapping Filter Value
%%             - false      eq. value not needed (depends on QoS type)
%%             - undefined  eq. value needed but no filter associated.
%%            Now - used when we want to reuse old TimeStamp which
%%                  must be done when changing QoS.
%% Returns  : A modified return from erlang:timestamp().
%%------------------------------------------------------------
extract_deadline(_, _, _, _, false) ->
    false;
extract_deadline(Event, DefaultT, StopTSupported, TRef, MappingVal) ->
    extract_deadline(Event, DefaultT, StopTSupported, TRef, MappingVal, erlang:timestamp()).

%% extract_deadline/6: as extract_deadline/5 but with an explicit base
%% timestamp (Now) to which the timeout is added.
extract_deadline(_, _, _, _, false, _) ->
    false;
extract_deadline(#'CosNotification_StructuredEvent'
                 {header = #'CosNotification_EventHeader'
                  {variable_header = VH}}, DefaultT, StopTSupported,
                 TRef, undefined, Now) ->
    DL = case extract_value(VH, ?not_Timeout, undefined) of
             undefined when StopTSupported == true, TRef =/= undefined ->
                 %% No timeout given; fall back on an explicit stop time.
                 case extract_value(VH, ?not_StopTime, undefined) of
                     undefined ->
                         DefaultT;
                     DefinedTime ->
                         DefinedTime
                 end;
             undefined ->
                 DefaultT;
             DefinedTime ->
                 DefinedTime
         end,
    convert_time(DL, TRef, Now);
%% Maybe a unstructured event.
extract_deadline(_, Time, _, TRef, undefined, Now) ->
    convert_time(Time, TRef, Now);
extract_deadline(_, _, _, TRef, DOverride, Now) ->
    %% Must have an associated MappingFilter defining a Deadline.
    convert_time(DOverride, TRef, Now).

%% convert_time(Time, TRef, Now) -> false | {-MegaSecs, -Secs, -MicroSecs}
%%  Time - 0 (no timeout), an integer interval, or a 'TimeBase_UtcT'
%%         record which is turned into an interval via the TimeService.
%%         The interval is divided by 10 to obtain microseconds, i.e. it
%%         is treated as a count of 100 ns units.
%%  Now  - the base timestamp {MegaSecs, Secs, MicroSecs}.
%% The result is Now plus the interval, with every element negated, which
%% is the form check_deadline/1 and check_start_time/1 compare against.
convert_time(0, _, _) ->
    false;
convert_time(UTC, TRef, {_, _, _} = Now) when is_record(UTC, 'TimeBase_UtcT') ->
    case catch get_time_diff(UTC, TRef) of
        {'EXCEPTION', _} ->
            false;
        {'EXIT', _} ->
            false;
        DL ->
            oe_negated_timestamp(DL, Now)
    end;
convert_time(DL, _, {_, _, _} = Now) when is_integer(DL) ->
    oe_negated_timestamp(DL, Now);
convert_time(_, _, _) ->
    false.

%% Adds the interval DL to the timestamp and returns the normalised,
%% negated result. The sum is computed in microseconds and split with
%% div/rem so that overflow is carried between the three fields; adding
%% the pieces field by field would double count and leave the tuple
%% unnormalised, which breaks the tuple comparison against "now".
oe_negated_timestamp(DL, {M, S, U}) ->
    MicroSecs = round(DL/10),
    Total = (M*1000000 + S)*1000000 + U + MicroSecs,
    {-(Total div 1000000000000),
     -((Total div 1000000) rem 1000000),
     -(Total rem 1000000)}.


%% Returns the width (upper bound - lower bound) of the time interval
%% between the TimeService's current time and UTC.
get_time_diff(UTC, TRef) ->
    UTO  = 'CosTime_TimeService':universal_time(TRef),
    UTO2 = 'CosTime_TimeService':uto_from_utc(TRef, UTC),
    TIO  = 'CosTime_UTO':time_to_interval(UTO, UTO2),
    #'TimeBase_IntervalT'{lower_bound=LB, upper_bound = UB} =
        'CosTime_TIO':'_get_time_interval'(TIO),
    UB-LB.

%% true when the (negated) deadline tuple has been reached; anything that
%% is not a tuple means that no timeout is set and never expires.
check_deadline(DL) when is_tuple(DL) ->
    {Mega, Sec, Micro} = erlang:timestamp(),
    NegatedNow = {-Mega, -Sec, -Micro},
    DL >= NegatedNow;
check_deadline(_DL) ->
    false.

%% true when the (negated) start time tuple has been reached; anything
%% that is not a tuple means that no earliest delivery time is set, so
%% the event may be delivered at once.
check_start_time(ST) when is_tuple(ST) ->
    {Mega, Sec, Micro} = erlang:timestamp(),
    NegatedNow = {-Mega, -Sec, -Micro},
    ST >= NegatedNow;
check_start_time(_ST) ->
    true.

%%------------------------------------------------------------
%% function : extract_value
%% Arguments: A Property Sequence
%%            ID - wanted property string()
%%            Other - default-value.
%% Returns  : Value associated with given ID or default value.
%%------------------------------------------------------------
extract_value([#'CosNotification_Property'{name=ID, value=Any}|_], ID, _) ->
    any:get_value(Any);
extract_value([_Skipped|Rest], ID, Default) ->
    extract_value(Rest, ID, Default);
extract_value([], _, Default) ->
    Default.

%%------------------------------------------------------------
%% function : get_event
%% Arguments: DBRef
%%            Delete - passed on to get_events/3 (default: true)
%% Returns  : Same as get_events/3 asked for one event, but with the
%%            single event unwrapped from its list.
%%------------------------------------------------------------
get_event(DBRef) ->
    get_event(DBRef, true).
get_event(DBRef, Delete) ->
    case get_events(DBRef, 1, Delete) of
        {[Event], Found} ->
            {Event, Found};
        {[Event], Found, Keys} ->
            {Event, Found, Keys};
        {[], false} = Nothing ->
            Nothing;
        {[], false, _Keys} = Nothing ->
            Nothing
    end.

%%------------------------------------------------------------
%% function : get_events
%% Arguments: DBRef
%%            Max - the largest number of events wanted
%% Returns  : A list of events (possibly empty) and a boolean
%%            indicating if event found.
%% Comments : Try to extract Max events from the database; the
%%            returned events are deleted from the DB.
%%------------------------------------------------------------
get_events(DBRef, Max) ->
    get_events(DBRef, Max, true).

%% As get_events/2, but when Delete is anything but 'true' the events are
%% left in the DB and a third element, a list of {Table, Key} pairs
%% suitable for delete_events/1, is added to the reply.
get_events(#dbRef{orderRef = OrderTab, discardRef = DiscardTab}, Max, Delete) ->
    Start = ets:last(OrderTab),
    event_loop(Start, OrderTab, DiscardTab, Max, [], [], Delete).

%% Traverses the order DB backwards from the last key, collecting at most
%% Left deliverable events. Expired events are deleted on the way and
%% events whose start time has not been reached are skipped.
event_loop('$end_of_table', _, _, _, [], _, true) ->
    {[], false};
event_loop('$end_of_table', _, _, _, [], [], _) ->
    {[], false, []};
event_loop('$end_of_table', _ORef, _, _, Accum, _Keys, true) ->
    {lists:reverse(Accum), true};
event_loop('$end_of_table', _ORef, _, _, Accum, Keys, _) ->
    {lists:reverse(Accum), true, Keys};
event_loop(_, _ORef, _, 0, [], _Keys, true) ->
    %% Only possible if some tries to pull a sequence of 0 events.
    %% Should we really test for this case?
    {[], false};
event_loop(_, _ORef, _, 0, [], Keys, _) ->
    {[], false, Keys};
event_loop(_, _ORef, _, 0, Accum, _Keys, true) ->
    {lists:reverse(Accum), true};
event_loop(_, _ORef, _, 0, Accum, Keys, _) ->
    {lists:reverse(Accum), true, Keys};
%% No discard DB; the key may have any shape.
event_loop(Key, ORef, undefined, Left, Accum, Keys, Delete) ->
    event_loop_step(Key, unused, ORef, undefined, Left, Accum, Keys, Delete);
%% Discard DB in use; its key is the order key with the outer elements swapped.
event_loop({Key1, Key2} = Key, ORef, DRef, Left, Accum, Keys, Delete) ->
    event_loop_step(Key, {Key2, Key1}, ORef, DRef, Left, Accum, Keys, Delete);
event_loop({Key1, Key2, Key3} = Key, ORef, DRef, Left, Accum, Keys, Delete) ->
    event_loop_step(Key, {Key3, Key2, Key1}, ORef, DRef, Left, Accum, Keys, Delete).

%% Handles the event stored under Key and continues with the previous key.
event_loop_step(Key, DKey, ORef, DRef, Left, Accum, Keys, Delete) ->
    [{_,DL,ST,_PO,Event}] = ets:lookup(ORef, Key),
    case check_deadline(DL) of
        true ->
            %% Expired; drop it without delivering.
            event_loop_delete(Key, DKey, ORef, DRef),
            event_loop(ets:prev(ORef, Key), ORef, DRef,
                       Left, Accum, Keys, Delete);
        false ->
            case check_start_time(ST) of
                true when Delete == true ->
                    event_loop_delete(Key, DKey, ORef, DRef),
                    event_loop(ets:prev(ORef, Key), ORef, DRef,
                               Left-1, [Event|Accum], Keys, Delete);
                true ->
                    %% Keep the event stored, but remember where it lives.
                    event_loop(ets:prev(ORef, Key), ORef, DRef,
                               Left-1, [Event|Accum],
                               event_loop_keys(Key, DKey, ORef, DRef, Keys),
                               Delete);
                false ->
                    %% Not yet allowed to deliver; leave it for later.
                    event_loop(ets:prev(ORef, Key), ORef, DRef,
                               Left, Accum, Keys, Delete)
            end
    end.

%% Removes the event from the order DB and, if used, the discard DB.
event_loop_delete(Key, _DKey, ORef, undefined) ->
    ets:delete(ORef, Key);
event_loop_delete(Key, DKey, ORef, DRef) ->
    ets:delete(ORef, Key),
    ets:delete(DRef, DKey).

%% Prepends the {Table, Key} pairs identifying the event to Keys.
event_loop_keys(Key, _DKey, ORef, undefined, Keys) ->
    [{ORef, Key}|Keys];
event_loop_keys(Key, DKey, ORef, DRef, Keys) ->
    [{ORef, Key}, {DRef, DKey}|Keys].

%%------------------------------------------------------------
%% function : delete_events
%% Arguments: EventList - the {Table, Key} list returned by get_event,
%%                        get_events and add_and_get_event.
%% Returns  : ok
%% Comment  : Shall be invoked when it's safe to permanently remove
%%            the events found in the EventList.
%%
%%------------------------------------------------------------
delete_events(EventList) ->
    lists:foreach(fun({DB, Key}) -> ets:delete(DB, Key) end, EventList).

%%------------------------------------------------------------
%% function : update
%% Arguments: DBRef - the current DB handle or 'undefined'
%%            QoS - the new QoS settings
%%            LifeFilter, PrioFilter - mapping filters (default: undefined)
%% Returns  : ok (no DB) | the DB handle to use from now on
%% Comment  : As default we shall deliver Events in Priority order.
%%            Hence, if AnyOrder set we will still deliver in
%%            Priority order.
%%            If the order or discard policy changes the DB is rebuilt
%%            and all events are moved; otherwise only the cached QoS
%%            values of the handle are refreshed.
%%------------------------------------------------------------
update(undefined, _QoS) ->
    ok;
update(DBRef, QoS) ->
    update(DBRef, QoS, undefined, undefined).

update(DBRef, QoS, LifeFilter, PrioFilter) ->
    PolicyChanged =
        updated_order(DBRef, ?not_GetOrderPolicy(QoS)) orelse
        updated_discard(DBRef, ?not_GetDiscardPolicy(QoS)),
    case PolicyChanged of
        true ->
            destroy_discard_db(DBRef),
            NewDBRef = create_db(QoS, ?get_GCTime(DBRef), ?get_GCLimit(DBRef),
                                 ?get_TimeRef(DBRef)),
            move_events(DBRef, NewDBRef, ets:first(?get_OrderRef(DBRef)),
                        LifeFilter, PrioFilter);
        false ->
            Refreshed =
                DBRef#dbRef{defPriority   = ?not_GetPriority(QoS),
                            maxEvents     = ?not_GetMaxEventsPerConsumer(QoS),
                            defStopT      = ?not_GetTimeout(QoS),
                            startTsupport = ?not_GetStartTimeSupported(QoS),
                            stopTsupport  = ?not_GetStopTimeSupported(QoS)},
            Stored = ets:info(?get_OrderRef(Refreshed), size),
            Allowed = ?get_MaxEvents(Refreshed),
            if
                Stored =< Allowed ->
                    %% Even if the QoS MaxEvents have been changed
                    %% we don't reach the limit.
                    Refreshed;
                true ->
                    %% The QoS MaxEvents must have been decreased.
                    discard_events(Refreshed, Stored-Allowed),
                    Refreshed
            end
    end.

%% true when the requested order policy differs from the one in use.
%% AnyOrder and PriorityOrder are treated as the same policy.
updated_order(#dbRef{orderPolicy = Equal}, Equal) -> false;
updated_order(#dbRef{orderPolicy = ?not_PriorityOrder}, ?not_AnyOrder) -> false;
updated_order(#dbRef{orderPolicy = ?not_AnyOrder}, ?not_PriorityOrder) -> false;
updated_order(_, _) -> true.

%% true when the requested discard policy differs from the one in use.
%% AnyOrder and RejectNewEvents are treated as the same policy.
updated_discard(#dbRef{discardPolicy = Equal}, Equal) -> false;
updated_discard(#dbRef{discardPolicy = ?not_RejectNewEvents}, ?not_AnyOrder) -> false;
updated_discard(#dbRef{discardPolicy = ?not_AnyOrder}, ?not_RejectNewEvents) -> false;
updated_discard(_, _) -> true.

%% Moves every non-expired event from the old DB (DBRef) to the new DB
%% (NewDBRef), starting at Key. When the old order DB is exhausted it is
%% destroyed and the new handle is returned.
move_events(DBRef, NewDBRef, '$end_of_table', _, _) ->
    destroy_order_db(DBRef),
    case ets:info(?get_OrderRef(NewDBRef), size) of
        N when N =< ?get_MaxEvents(NewDBRef) ->
            %% Even if the QoS MaxEvents have been changed
            %% we don't reach the limit.
            NewDBRef;
        N ->
            %% The QoS MaxEvents must have been decreased.
            %% The surplus lives in the new DB; the old order DB has just
            %% been destroyed and must not be touched.
            discard_events(NewDBRef, N-?get_MaxEvents(NewDBRef)),
            NewDBRef
    end;
move_events(DBRef, NewDBRef, Key, LifeFilter, PrioFilter) ->
    [{Keys, DeadLine, StartTime, PriorityOverride, Event}] =
        ets:lookup(?get_OrderRef(DBRef), Key),
    case check_deadline(DeadLine) of
        true ->
            %% Expired; do not carry it over.
            ok;
        _->
            write_event(?get_OrderP(DBRef),
                        {Keys, DeadLine, StartTime, PriorityOverride, Event},
                        DBRef, NewDBRef, Key, LifeFilter, PrioFilter)
    end,
    ets:delete(?get_OrderRef(DBRef), Key),
    move_events(DBRef, NewDBRef, ets:next(?get_OrderRef(DBRef), Key),
                LifeFilter, PrioFilter).

%% Stores one event, read from the old DB, in the new DB. The first
%% argument is the order policy of the old DB, which tells how the old key
%% is laid out and hence which values have already been extracted.
%% We cannot use do_add_event directly since we MUST lookup the timestamp (TS).
write_event(?not_DeadlineOrder, {{_, TS, _Prio}, DL, ST, PO, Event}, _DBRef, NewDBRef,
            _Key, _LifeFilter, _PrioFilter) ->
    StartT = update_starttime(NewDBRef, Event, ST),
    %% Deadline and Priority previously extracted.
    do_add_event(NewDBRef, Event, TS, DL, StartT, PO);
write_event(?not_DeadlineOrder, {{_, TS}, DL, ST, PO, Event}, _DBRef, NewDBRef,
            _Key, _LifeFilter, PrioFilter) ->
    %% Priority not previously extracted.
    POverride = update_priority(NewDBRef, PrioFilter, Event, PO),
    %% Reuse a previously extracted start time, as the other clauses do,
    %% instead of always extracting it again.
    StartT = update_starttime(NewDBRef, Event, ST),
    do_add_event(NewDBRef, Event, TS, DL, StartT, POverride);
write_event(?not_FifoOrder, {{TS, _PorD}, DL, ST, PO, Event}, _DBRef, NewDBRef,
            _Key, LifeFilter, PrioFilter) ->
    %% Priority or Deadline have been extracted before but we cannot tell which.
    POverride = update_priority(NewDBRef, PrioFilter, Event, PO),
    DeadL = update_deadline(NewDBRef, LifeFilter, Event, TS, DL),
    StartT = update_starttime(NewDBRef, Event, ST),
    do_add_event(NewDBRef, Event, TS, DeadL, StartT, POverride);
write_event(?not_FifoOrder, {TS, DL, ST, PO, Event}, _DBRef, NewDBRef,
            _Key, LifeFilter, PrioFilter) ->
    %% Priority and Deadline not extracted before. Do it now.
    POverride = update_priority(NewDBRef, PrioFilter, Event, PO),
    DeadL = update_deadline(NewDBRef, LifeFilter, Event, TS, DL),
    StartT = update_starttime(NewDBRef, Event, ST),
    do_add_event(NewDBRef, Event, TS, DeadL, StartT, POverride);
%% Order Policy must be AnyOrder or PriorityOrder.
write_event(_, {{_Prio, TS}, DL, ST, PO, Event}, _DBRef, NewDBRef,
            _Key, LifeFilter, _PrioFilter) ->
    DeadL = update_deadline(NewDBRef, LifeFilter, Event, TS, DL),
    StartT = update_starttime(NewDBRef, Event, ST),
    do_add_event(NewDBRef, Event, TS, DeadL, StartT, PO);
write_event(_, {{_Prio, TS, DL}, DL, ST, PO, Event}, _DBRef, NewDBRef, _Key, _, _) ->
    %% Both Deadline and Priority have been extracted before.
    StartT = update_starttime(NewDBRef, Event, ST),
    do_add_event(NewDBRef, Event, TS, DL, StartT, PO).


%%------------------------------------------------------------
%% function : update_priority
%% Arguments: DBRef, PrioFilter, Event
%%            OldPrio - the priority override stored with the event
%% Returns  : The priority override to store with the event.
%% Comment  : The purpose with this function is to avoid calling the
%%            MappingFilter for the priority again, since that may
%%            require intra-ORB communication. The filter is consulted
%%            only when the stored value is an atom.
%%            Use only when doing an update.
%%------------------------------------------------------------
update_priority(DBRef, PrioFilter, Event, OldPrio) ->
    case is_atom(OldPrio) of
        true ->
            get_prio_mapping_value(DBRef, PrioFilter, Event);
        false ->
            OldPrio
    end.

%%------------------------------------------------------------
%% function : update_deadline
%% Arguments: DBRef, LifeFilter, Event
%%            TS - the FIFO key (timestamp) stored with the event
%%            OldDeadL - the deadline stored with the event
%% Returns  : false | the deadline to store with the event
%% Comment  : The purpose with this function is to avoid
%%            calling MappingFilter or parsing the events for
%%            deadline again (we especially want to avoid calling
%%            the MappingFilter since it may require intra-ORB
%%            communication). Use only when doing an update.
%%------------------------------------------------------------
update_deadline(DBRef, _LifeFilter, _Event, _TS, _OldDeadL) when
      ?get_DiscardP(DBRef) =/= ?not_DeadlineOrder,
      ?get_OrderP(DBRef) =/= ?not_DeadlineOrder,
      ?is_StopTNotSupported(DBRef) ->
    %% We do not need to extract the Deadline since it will not be used.
    false;
update_deadline(DBRef, LifeFilter, Event, TS, OldDeadL) ->
    case is_atom(OldDeadL) of
        true ->
            %% We need the Deadline and it has not been extracted before.
            DOverride = get_life_mapping_value(DBRef, LifeFilter, Event),
            %% We must find out when the event was delivered; setting a
            %% deadline using a new timestamp would not be accurate since
            %% we cannot tell for how long the event has been waiting.
            OldNow = convert_FIFO_Key(TS),
            extract_deadline(Event, ?get_DefStopT(DBRef), ?get_StopTsupport(DBRef),
                             ?get_TimeRef(DBRef), DOverride, OldNow);
        false ->
            %% We need the Deadline and it has been extracted before.
            OldDeadL
    end.

%%------------------------------------------------------------
%% function : update_starttime
%% Arguments: DBRef, Event
%%            OldStartT - the start time stored with the event
%% Returns  : The start time to store with the event.
%% Comment  : The purpose with this function is to avoid
%%            parsing the events for starttime again.
%%            Use only when doing an update.
%%------------------------------------------------------------
update_starttime(DBRef, Event, OldStartT) ->
    case is_atom(OldStartT) of
        true ->
            %% Probably not previously extracted; try to get it.
            extract_start_time(Event, ?get_StartTsupport(DBRef), ?get_TimeRef(DBRef));
        false ->
            %% Previously extracted.
            OldStartT
    end.

%%------------------------------------------------------------
%% function : discard_events
%% Arguments: DBRef
%%            N - number of events we must discard.
%% Returns  :
%% Comment  : As default we shall Reject New Events when the limit
%%            is reached. Any discard order will do the same.
%%
%%            This function can only be used for the discard policies
%%            Fifo, Priority and Deadline. Any or RejectNewEvents
%%            will not allow events to be stored at all, i.e., no events
%%            to discard. Lifo will not be stored either since when
%%            trying to add an event it is definitely the last event in.
%%------------------------------------------------------------
%% When there is no Discard DB the discard order is the same as the Order policy.
discard_events(#dbRef{orderRef = ORef, discardRef = DRef,
                      discardPolicy = Policy}, N) ->
    %% The table that decides which events go first: the discard DB when
    %% there is one, otherwise the order DB itself.
    Index = case DRef of
                undefined -> ORef;
                _ -> DRef
            end,
    case Policy of
        ?not_DeadlineOrder ->
            ?debug_print("Discarding ~p events Deadline Order.",[N]),
            index_loop_backward(ets:last(Index), DRef, ORef, N);
        ?not_FifoOrder ->
            %% Fifo.
            ?debug_print("Discarding ~p events Fifo Order.",[N]),
            index_loop_backward(ets:last(Index), DRef, ORef, N);
        _ ->
            %% Lifo- or Priority-Order
            ?debug_print("Discarding ~p events Lifo- or Priority-Order.",[N]),
            index_loop_forward(ets:first(Index), DRef, ORef, N)
    end.


%% Deletes up to Left events, walking the index table from Key towards the
%% end of the table. With no discard DB (DRef == undefined) the order DB is
%% its own index; otherwise the discard DB is walked and the order DB entry
%% is found by swapping the first and last key element.
index_loop_forward(Key, DRef, ORef, Left) ->
    index_loop_step(Key, DRef, ORef, Left, fun ets:next/2).

%% As index_loop_forward/4, but walking towards the start of the table.
index_loop_backward(Key, DRef, ORef, Left) ->
    index_loop_step(Key, DRef, ORef, Left, fun ets:prev/2).

%% Shared traversal; Step is ets:next/2 or ets:prev/2.
index_loop_step('$end_of_table', _, _, _Left, _Step) ->
    ok;
index_loop_step(_, _, _, 0, _Step) ->
    ok;
index_loop_step(Key, undefined, ORef, Left, Step) ->
    ets:delete(ORef, Key),
    index_loop_step(Step(ORef, Key), undefined, ORef, Left-1, Step);
index_loop_step({Key1, Key2, Key3} = DKey, DRef, ORef, Left, Step) ->
    ets:delete(DRef, DKey),
    ets:delete(ORef, {Key3, Key2, Key1}),
    index_loop_step(Step(DRef, DKey), DRef, ORef, Left-1, Step);
index_loop_step({Key1, Key2} = DKey, DRef, ORef, Left, Step) ->
    ets:delete(DRef, DKey),
    ets:delete(ORef, {Key2, Key1}),
    index_loop_step(Step(DRef, DKey), DRef, ORef, Left-1, Step).

%%------------------------------------------------------------
%% function : add_and_get_event
%% Arguments: DBRef and Event
%% Returns  : {[], bool()} | {Event, bool()}
%% Comment  : This function is a mixture of add and get events.
%%            The intended use is to avoid storing an event when
%%            not necessary.
%%------------------------------------------------------------
add_and_get_event(DBRef, Event) ->
    add_and_get_event(DBRef, Event, undefined, undefined, true).

add_and_get_event(DBRef, Event, Delete) ->
    add_and_get_event(DBRef, Event, undefined, undefined, Delete).

add_and_get_event(DBRef, Event, LifeFilter, PrioFilter) ->
    add_and_get_event(DBRef, Event, LifeFilter, PrioFilter, true).

%% When the order table is empty the event can often be handed straight
%% back without being stored. The result is a 2-tuple when Delete is true
%% and a 3-tuple (with [] as last element) otherwise. When events are
%% already stored, the new event is added and get_event/2 decides the result.
add_and_get_event(DBRef, Event, LifeFilter, PrioFilter, Delete) ->
    case ets:info(?get_OrderRef(DBRef), size) of
        0 when ?is_StartTNotSupported(DBRef), ?is_StopTNotSupported(DBRef) ->
            %% No stored events and no timeouts used; just return the event.
            add_and_get_reply(Event, false, Delete);
        0 when ?is_StartTNotSupported(DBRef) ->
            %% Only deadline supported, lookup values and check if ok.
            DOverride = get_life_mapping_value(DBRef, LifeFilter, Event),
            DL = extract_deadline(Event, ?get_DefStopT(DBRef),
                                  ?get_StopTsupport(DBRef),
                                  ?get_TimeRef(DBRef), DOverride),
            case check_deadline(DL) of
                true ->
                    %% Expired, just discard the event.
                    add_and_get_reply([], false, Delete);
                _ ->
                    %% Not expired, we can safely return the event.
                    add_and_get_reply(Event, false, Delete)
            end;
        0 when ?is_StopTNotSupported(DBRef) ->
            %% Only starttime allowed, test if we can deliver the event now.
            ST = extract_start_time(Event, ?get_StartTsupport(DBRef),
                                    ?get_TimeRef(DBRef)),
            case check_start_time(ST) of
                false ->
                    %% Not deliverable yet; store it for later.
                    add_and_get_store(DBRef, Event, LifeFilter, PrioFilter, ST),
                    add_and_get_reply([], true, Delete);
                _ ->
                    %% Starttime ok, just return the event.
                    add_and_get_reply(Event, false, Delete)
            end;
        _ ->
            %% Event already stored, just have to accept the overhead.
            ST = extract_start_time(Event, ?get_StartTsupport(DBRef),
                                    ?get_TimeRef(DBRef)),
            add_and_get_store(DBRef, Event, LifeFilter, PrioFilter, ST),
            get_event(DBRef, Delete)
    end.

%% Builds the return value of add_and_get_event/5: {Event, Stored} when
%% Delete is true, {Event, Stored, []} for any other Delete value.
add_and_get_reply(Event, Stored, true) ->
    {Event, Stored};
add_and_get_reply(Event, Stored, _Delete) ->
    {Event, Stored, []}.

%% Resolves the mapping-filter overrides and the deadline for Event and
%% stores it; ST is the already extracted start time.
add_and_get_store(DBRef, Event, LifeFilter, PrioFilter, ST) ->
    DOverride = get_life_mapping_value(DBRef, LifeFilter, Event),
    POverride = get_prio_mapping_value(DBRef, PrioFilter, Event),
    DL = extract_deadline(Event, ?get_DefStopT(DBRef),
                          ?get_StopTsupport(DBRef),
                          ?get_TimeRef(DBRef), DOverride),
    do_add_event(DBRef, Event, create_FIFO_Key(), DL, ST, POverride).

%%------------------------------------------------------------
%% function : add_event
%% Arguments: DBRef and Event
%% Returns  : true (or whatever 'ets:insert' returns) |
%%            {'EXCEPTION', #'IMP_LIMIT'{}}
%% Comment  : As default we shall deliver Events in Priority order.
%%            Hence, if AnyOrder set we will still deliver in
%%            Priority order.
%%            But we cannot use only the Priority
%%            value since if "all" events have the same priority
%%            there is a risk that some never will be delivered if
%%            the EventDB always contain events.
%%
%%            When discard and order policy is equal we only use one
%%            DB since all we have to do is to "read from the other
%%            end" to discard the correct event(s).
%%
%%            In the discard DB we must also store keys necessary to
%%            lookup the event in the order DB.
%%
%%            If the event limit is reached 'IMP_LIMIT' is raised if
%%            the discard policy is RejectNewEvents or AnyOrder.
%%            These two policies we currently define to be equal.
%%            (add_event_helper/4 raises it for LifoOrder as well.)
%%------------------------------------------------------------

add_event(DBRef, Event) ->
    %% No mapping filters supplied.
    add_event(DBRef, Event, undefined, undefined).

add_event(DBRef, Event, LifeFilter, PrioFilter) ->
    %% The mapping lookups return 'false' when the DB settings make the
    %% deadline/priority irrelevant, which saves the extraction overhead.
    DOverride = get_life_mapping_value(DBRef, LifeFilter, Event),
    POverride = get_prio_mapping_value(DBRef, PrioFilter, Event),
    add_event_helper(DBRef, Event, DOverride, POverride).

%% add_event_helper(DBRef, Event, DOverride, POverride)
%% Stores Event unless its deadline has already expired. What happens
%% depends on the current size of the order table:
%%  * below the max limit but above the GC limit: run a 'low' GC first;
%%  * below the max limit: just store;
%%  * limit reached and discard policy RejectNewEvents, AnyOrder or
%%    LifoOrder: run GC and raise 'IMP_LIMIT';
%%  * limit reached, other discard policy: run GC, store the event and
%%    then discard one event.
add_event_helper(DBRef, Event, DOverride, POverride) ->
    case ets:info(?get_OrderRef(DBRef), size) of
        N when N < ?get_MaxEvents(DBRef), N > ?get_GCLimit(DBRef) ->
            gc_events(DBRef, low),
            add_event_result(add_unless_expired(DBRef, Event,
                                                DOverride, POverride));
        N when N < ?get_MaxEvents(DBRef) ->
            add_event_result(add_unless_expired(DBRef, Event,
                                                DOverride, POverride));
        _N when ?get_DiscardP(DBRef) == ?not_RejectNewEvents;
                ?get_DiscardP(DBRef) == ?not_AnyOrder;
                ?get_DiscardP(DBRef) == ?not_LifoOrder ->
            gc_events(DBRef, low),
            corba:raise(#'IMP_LIMIT'{completion_status=?COMPLETED_NO});
        _N ->
            gc_events(DBRef, low),
            %% Other discard policy; we must first store the event
            %% and then look up in the Discard DB which event we
            %% should remove.
            case add_unless_expired(DBRef, Event, DOverride, POverride) of
                expired ->
                    true;
                {stored, _} ->
                    discard_events(DBRef, 1)
            end
    end.

%% Returns 'expired' when check_deadline/1 reports the event's deadline as
%% passed; otherwise stores the event and returns {stored, Result}, where
%% Result is the value returned by do_add_event/6.
add_unless_expired(DBRef, Event, DOverride, POverride) ->
    DL = extract_deadline(Event, ?get_DefStopT(DBRef),
                          ?get_StopTsupport(DBRef), ?get_TimeRef(DBRef),
                          DOverride),
    case check_deadline(DL) of
        true ->
            expired;
        _ ->
            ST = extract_start_time(Event, ?get_StartTsupport(DBRef),
                                    ?get_TimeRef(DBRef)),
            {stored,
             do_add_event(DBRef, Event, create_FIFO_Key(), DL, ST, POverride)}
    end.

%% Maps the outcome of add_unless_expired/4 to add_event_helper's result.
add_event_result(expired) ->
    true;
add_event_result({stored, Result}) ->
    Result.


%% do_add_event(DBRef, Event, Key, DL, ST, PO)
%% Inserts {OrderKey, DL, ST, PO, Event} into the order table. OrderKey is
%% built from the deadline (DL), the time-stamp key (Key) and/or the event
%% priority, depending on the order policy. When the policies require a
%% separate discard table, the same key elements are stored there in
%% reversed order so that the index loops can map a discard key back to
%% its order key.
%% Returns the result of the last ets:insert/2.

%% --- Order policy: DeadlineOrder ---
do_add_event(#dbRef{orderRef = OrderTab, orderPolicy = ?not_DeadlineOrder,
                    discardRef = DiscardTab,
                    discardPolicy = ?not_PriorityOrder,
                    defPriority = DefPrio}, Event, Key, DL, ST, PO) ->
    Prio = extract_priority(Event, DefPrio, PO),
    ets:insert(OrderTab, {{DL, Key, Prio}, DL, ST, PO, Event}),
    ets:insert(DiscardTab, {{Prio, Key, DL}});
do_add_event(#dbRef{orderRef = OrderTab, orderPolicy = ?not_DeadlineOrder,
                    discardRef = DiscardTab, discardPolicy = DiscardP},
             Event, Key, DL, ST, PO)
  when DiscardP =:= ?not_FifoOrder; DiscardP =:= ?not_LifoOrder ->
    ets:insert(OrderTab, {{DL, Key}, DL, ST, PO, Event}),
    ets:insert(DiscardTab, {{Key, DL}});
%% Either the same (DeadlineOrder), RejectNewEvents or AnyOrder. No need
%% to store anything in the discard DB, i.e., if the same we'll just
%% read "from the other end" and AnyOrder and RejectNewEvents are equal.
do_add_event(#dbRef{orderRef = OrderTab, orderPolicy = ?not_DeadlineOrder},
             Event, Key, DL, ST, PO) ->
    ets:insert(OrderTab, {{DL, Key}, DL, ST, PO, Event});

%% --- Order policy: FifoOrder ---
do_add_event(#dbRef{orderRef = OrderTab, orderPolicy = ?not_FifoOrder,
                    discardRef = DiscardTab,
                    discardPolicy = ?not_DeadlineOrder},
             Event, Key, DL, ST, PO) ->
    ets:insert(OrderTab, {{Key, DL}, DL, ST, PO, Event}),
    ets:insert(DiscardTab, {{DL, Key}});
do_add_event(#dbRef{orderRef = OrderTab, orderPolicy = ?not_FifoOrder,
                    discardRef = DiscardTab,
                    discardPolicy = ?not_PriorityOrder,
                    defPriority = DefPrio}, Event, Key, DL, ST, PO) ->
    Prio = extract_priority(Event, DefPrio, PO),
    ets:insert(OrderTab, {{Key, Prio}, DL, ST, PO, Event}),
    ets:insert(DiscardTab, {{Prio, Key}});
%% The discard policy must be RejectNewEvents, AnyOrder, Fifo or Lifo order.
%% Note that the order key is the bare time-stamp key here.
do_add_event(#dbRef{orderRef = OrderTab, orderPolicy = ?not_FifoOrder},
             Event, Key, DL, ST, PO) ->
    ets:insert(OrderTab, {Key, DL, ST, PO, Event});

%% --- Order Policy must be AnyOrder or PriorityOrder ---
do_add_event(#dbRef{orderRef = OrderTab,
                    discardRef = DiscardTab,
                    discardPolicy = ?not_DeadlineOrder,
                    defPriority = DefPrio}, Event, Key, DL, ST, PO) ->
    Prio = extract_priority(Event, DefPrio, PO),
    ets:insert(OrderTab, {{Prio, Key, DL}, DL, ST, PO, Event}),
    ets:insert(DiscardTab, {{DL, Key, Prio}});
do_add_event(#dbRef{orderRef = OrderTab,
                    discardRef = DiscardTab, discardPolicy = DiscardP,
                    defPriority = DefPrio}, Event, Key, DL, ST, PO)
  when DiscardP =:= ?not_FifoOrder; DiscardP =:= ?not_LifoOrder ->
    Prio = extract_priority(Event, DefPrio, PO),
    ets:insert(OrderTab, {{Prio, Key}, DL, ST, PO, Event}),
    ets:insert(DiscardTab, {{Key, Prio}});
%% Order Policy must be AnyOrder or PriorityOrder and Discard Policy must be
%% AnyOrder or RejectNewEvents
do_add_event(#dbRef{orderRef = OrderTab, defPriority = DefPrio},
             Event, Key, DL, ST, PO) ->
    Prio = extract_priority(Event, DefPrio, PO),
    ets:insert(OrderTab, {{Prio, Key}, DL, ST, PO, Event}).

%%------------------------------------------------------------
%% function : destroy_db
%% Arguments: A DB reference
%% Returns  : The result of the last ets:delete/1 call
%% Comment  : Deletes the order table and, if one exists, the
%%            discard table.
%%------------------------------------------------------------
destroy_db(#dbRef{orderRef = OrderTab, discardRef = DiscardTab}) ->
    OrderDeleted = ets:delete(OrderTab),
    case DiscardTab of
        undefined ->
            OrderDeleted;
        _ ->
            ets:delete(DiscardTab)
    end.

%%------------------------------------------------------------
%% function : destroy_discard_db
%% Arguments: A DB reference
%% Returns  : ok when there is no discard table, otherwise the
%%            result of ets:delete/1
%%------------------------------------------------------------
destroy_discard_db(#dbRef{discardRef = undefined}) ->
    ok;
destroy_discard_db(#dbRef{discardRef = DRef}) ->
    ets:delete(DRef).

%%------------------------------------------------------------
%% function : destroy_order_db
%% Arguments: A DB reference
%% Returns  : The result of ets:delete/1
%%------------------------------------------------------------
destroy_order_db(#dbRef{orderRef = ORef}) ->
    ets:delete(ORef).

%%------------------------------------------------------------
%% function : create_db
%% Arguments: QoS (local representation).
%%            GCTime, GCLimit and TimeRef are stored in the DB
%%            reference; GCTime is also used to set the first GC
%%            time stamp.
%% Returns  : A DB reference
%% Comment  : A separate discard table is only created when the
%%            discard order cannot be derived from the order table.
%%            Unless timeouts are unused and stop time is not
%%            supported, 'oe_GC_timestamp' is written to the process
%%            dictionary of the calling process.
%%------------------------------------------------------------
create_db(QoS, GCTime, GCLimit, TimeRef) ->
    DiscardRef =
        case {?not_GetDiscardPolicy(QoS), ?not_GetOrderPolicy(QoS)} of
            {Equal, Equal} ->
                %% Same policy; the order table is read "from the other end".
                undefined;
            {?not_PriorityOrder, ?not_AnyOrder} ->
                %% NOTE: Any- and Priority-Order delivery policy is equal.
                undefined;
            {?not_RejectNewEvents, _} ->
                undefined;
            {?not_AnyOrder, _} ->
                undefined;
            {?not_LifoOrder, ?not_FifoOrder} ->
                undefined;
            _ ->
                %% Must be ordered_set: the index loops call ets:next/2 and
                %% ets:prev/2 on keys they have just deleted. The former
                %% option list also contained a contradictory 'set'.
                ets:new(oe_ets, [ordered_set, public])
        end,
    DBRef = ?CreateRef(ets:new(oe_ets, [ordered_set, public]),
                       DiscardRef,
                       ?not_GetOrderPolicy(QoS), ?not_GetDiscardPolicy(QoS),
                       ?not_GetPriority(QoS), ?not_GetMaxEventsPerConsumer(QoS),
                       ?not_GetTimeout(QoS), ?not_GetStartTimeSupported(QoS),
                       ?not_GetStopTimeSupported(QoS), GCTime, GCLimit, TimeRef),
    if
        ?is_TimeoutNotUsed(DBRef), ?is_StopTNotSupported(DBRef) ->
            ok;
        true ->
            %% NOTE(review): GCTime is scaled by the resolution of the OS
            %% monotonic time source; presumably GCTime is given in
            %% seconds - confirm against the code reading oe_GC_timestamp.
            TS = erlang:monotonic_time(),
            {resolution, TR} = lists:keyfind(resolution, 1,
                                             erlang:system_info(os_monotonic_time_source)),
            put(oe_GC_timestamp, TS+GCTime*TR)
    end,
    DBRef.

%%------------------------------------------------------------
%% function : get_prio_mapping_value
%% Arguments: A DB reference
%%            A MappingFilter reference | undefined
%%            Event (Any or Structured)
%% Returns  : false | undefined | Data
%% Comment  : 'false' is returned, without consulting the filter,
%%            when neither the discard policy is PriorityOrder nor
%%            the order policy is AnyOrder or PriorityOrder.
%%------------------------------------------------------------
get_prio_mapping_value(DBRef, _, _) when ?get_DiscardP(DBRef) =/= ?not_PriorityOrder,
                                         ?get_OrderP(DBRef) =/= ?not_AnyOrder,
                                         ?get_OrderP(DBRef) =/= ?not_PriorityOrder ->
    false;
get_prio_mapping_value(_, MFilter, Event) ->
    mapping_filter_value(MFilter, Event).

%%------------------------------------------------------------
%% function : get_life_mapping_value
%% Arguments: A DB reference
%%            A MappingFilter reference | undefined
%%            Event (Any or Structured)
%% Returns  : false | undefined | Data
%% Comment  : 'false' is returned, without consulting the filter,
%%            when neither policy is DeadlineOrder and stop time is
%%            not supported.
%%------------------------------------------------------------
get_life_mapping_value(DBRef, _, _) when ?get_DiscardP(DBRef) =/= ?not_DeadlineOrder,
                                         ?get_OrderP(DBRef) =/= ?not_DeadlineOrder,
                                         ?is_StopTNotSupported(DBRef) ->
    false;
get_life_mapping_value(_, MFilter, Event) ->
    mapping_filter_value(MFilter, Event).

%% Asks the mapping filter for a value for Event. Shared by the priority
%% and lifetime lookups, which only differ in when they skip the filter.
%% Returns 'undefined' when no filter is given or when the call does not
%% yield {true | false, #any{}} (including when it fails).
mapping_filter_value(undefined, _Event) ->
    undefined;
mapping_filter_value(MFilter, Event) when is_record(Event, 'any') ->
    Result = (catch 'CosNotifyFilter_MappingFilter':match(MFilter, Event)),
    mapping_match_value(Result);
mapping_filter_value(MFilter, Event) ->
    Result = (catch 'CosNotifyFilter_MappingFilter':match_structured(MFilter,
                                                                       Event)),
    mapping_match_value(Result).

%% Unwraps the #any{} carried by a mapping filter match result.
mapping_match_value({false, DefVal}) when is_record(DefVal, 'any') ->
    any:get_value(DefVal);
mapping_match_value({true, Matched}) when is_record(Matched, 'any') ->
    any:get_value(Matched);
mapping_match_value(_) ->
    undefined.

%%------------------------------------------------------------
%% function : validate_event
%% Arguments: Subscribe data
%%            A sequence of Events, 'structured' or an 'any' record
%%            A list of filter references
%%            Status, i.e., do we have to filter the events or just check subscr.
%% Returns  : A tuple of two lists; list1 the events that passed
%%            and list2 the events that didn't pass.
%%------------------------------------------------------------
%% The first argument is 'true' (no subscription check needed) or
%% {Which, WC}. The fourth argument is handed to check_subscription/2 as
%% an ets table. Events are only run through the filters when the last
%% argument is 'MATCH'.
validate_event(true, Events, Filters, _Tab, 'MATCH') ->
    filter_events(Events, Filters, false);
validate_event(true, Events, _Filters, _Tab, _Status) ->
    {Events, []};
validate_event({_Which, _WC}, Any, Filters, _Tab, 'MATCH') when is_record(Any, any) ->
    filter_events(Any, Filters, false);
validate_event({_Which, _WC}, Any, _Filters, _Tab, _Status) when is_record(Any, any) ->
    {Any, []};
validate_event({Which, WC}, Events, Filters, Tab, 'MATCH') ->
    %% validate_event2/5 returns the survivors in reversed order; tell
    %% filter_events/3 so that the original order is restored.
    Survivors = validate_event2(Tab, Events, Which, WC, []),
    filter_events(Survivors, Filters, true);
validate_event({Which, WC}, Events, _Filters, Tab, _Status) ->
    Survivors = validate_event2(Tab, Events, Which, WC, []),
    {lists:reverse(Survivors), []}.

%% Keeps the structured events whose event type is found in the table or
%% matches the wildcard list WC. The result is accumulated in reverse.
validate_event2(_Tab, [], _Which, _WC, Acc) ->
    Acc;
validate_event2(Tab, [Event | Rest], Which, WC, Acc) ->
    Header = Event#'CosNotification_StructuredEvent'.header,
    FixedHeader = Header#'CosNotification_EventHeader'.fixed_header,
    ET = FixedHeader#'CosNotification_FixedEventHeader'.event_type,
    case event_type_accepted(Tab, subscription_keys(Which, ET), ET, WC) of
        true ->
            validate_event2(Tab, Rest, Which, WC, [Event | Acc]);
        false ->
            validate_event2(Tab, Rest, Which, WC, Acc)
    end.

%% The event-type keys to look for in the table: the type itself plus, as
%% selected by Which, variants with the type and/or domain name replaced
%% by "" and "*".
subscription_keys(both, ET) ->
    [ET];
subscription_keys(domain, ET) ->
    [ET,
     ET#'CosNotification_EventType'{type_name=""},
     ET#'CosNotification_EventType'{type_name="*"}];
subscription_keys(type, ET) ->
    [ET,
     ET#'CosNotification_EventType'{domain_name=""},
     ET#'CosNotification_EventType'{domain_name="*"}];
subscription_keys(_, ET) ->
    [ET,
     ET#'CosNotification_EventType'{type_name=""},
     ET#'CosNotification_EventType'{type_name="*"},
     ET#'CosNotification_EventType'{domain_name=""},
     ET#'CosNotification_EventType'{domain_name="*"}].

%% true when one of Keys is present in Tab or, failing that, when
%% cosNotification_Filter:match_types/3 returns true; a failing match call
%% counts as no match.
event_type_accepted(Tab, Keys, ET, WC) ->
    case check_subscription(Tab, Keys) of
        true ->
            true;
        _ ->
            (catch cosNotification_Filter:match_types(
                     ET#'CosNotification_EventType'.domain_name,
                     ET#'CosNotification_EventType'.type_name,
                     WC)) =:= true
    end.

%% true if any of the keys exists in the ets table Tab, otherwise false.
check_subscription(_Tab, []) ->
    false;
check_subscription(Tab, [Key | Rest]) ->
    ets:member(Tab, Key) orelse check_subscription(Tab, Rest).


%%------------------------------------------------------------
%% function : filter_events
%% Arguments: A sequence of structured Events or #any
%%            A list of filters
%%            (optional) true if the sequence is in reversed order
%% Returns  : A tuple of two lists; list1 the events that passed
%%            and list2 the events that didn't pass.
%%            For a single non-list event the event itself (or [])
%%            takes the place of the lists.
%%------------------------------------------------------------

filter_events(Events, Filters) ->
    filter_events(Events, Filters, false).

filter_events(Events, [], false) ->
    {Events, []};
filter_events(Events, [], _Reversed) ->
    {lists:reverse(Events), []};
filter_events(Events, Filters, Reversed) ->
    filter_events(Events, Filters, [], [], Reversed).

filter_events([], _Filters, Passed, Failed, false) ->
    {lists:reverse(Passed), lists:reverse(Failed)};
filter_events([], _Filters, Passed, Failed, _Reversed) ->
    %% The input was reversed, so the accumulators are already in order.
    {Passed, Failed};
filter_events([Event | Rest], Filters, Passed, Failed, Reversed) ->
    case call_filters(Filters, Event) of
        true ->
            filter_events(Rest, Filters, [Event | Passed], Failed, Reversed);
        _ ->
            filter_events(Rest, Filters, Passed, [Event | Failed], Reversed)
    end;
filter_events(Single, Filters, _Passed, _Failed, _Reversed) ->
    %% Not a list, i.e., one single event.
    case call_filters(Filters, Single) of
        true ->
            {Single, []};
        _ ->
            {[], Single}
    end.

%% call_filters(Filters, Event)
%% Filters is a list of 2-tuples whose second element is the filter
%% reference. Returns true as soon as one filter matches Event and false
%% when none does or the list is empty. A filter call that fails or
%% returns anything but true counts as no match.
call_filters([], _Event) ->
    false;
call_filters([{_, Filter} | Rest], Event) ->
    case filter_match(Filter, Event) of
        true ->
            true;
        _ ->
            call_filters(Rest, Event)
    end.

%% Invokes the match operation that fits the kind of event: an #any{}, a
%% structured event flagged by ?not_isConvertedAny (matched on its
%% remainder_of_body), or any other event via match_structured.
filter_match(Filter, Event) when is_record(Event, any) ->
    catch 'CosNotifyFilter_Filter':match(Filter, Event);
filter_match(Filter, Event) when ?not_isConvertedAny(Event) ->
    catch 'CosNotifyFilter_Filter':match(
            Filter,
            Event#'CosNotification_StructuredEvent'.remainder_of_body);
filter_match(Filter, Event) ->
    catch 'CosNotifyFilter_Filter':match_structured(Filter, Event).


%%--------------- END OF MODULE ------------------------------