%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2017-2020. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
-module(logger_olp).
-behaviour(gen_server).

-include("logger_olp.hrl").
-include("logger_internal.hrl").

%% API
-export([start_link/4, load/2, info/1, reset/1, stop/1, restart/1,
         set_opts/2, get_opts/1, get_default_opts/0, get_pid/1,
         call/2, cast/2, get_ref/0, get_ref/1]).

%% gen_server and proc_lib callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%% Keys of the overload protection options. The list is used to pick
%% the option subset out of the server state map (see the set_opts and
%% get_opts requests in handle_call/3).
-define(OPT_KEYS,[sync_mode_qlen,
                  drop_mode_qlen,
                  flush_qlen,
                  burst_limit_enable,
                  burst_limit_max_count,
                  burst_limit_window_time,
                  overload_kill_enable,
                  overload_kill_qlen,
                  overload_kill_mem_size,
                  overload_kill_restart_after]).

-export_type([olp_ref/0, options/0]).

%% Reference to an olp process: {Name, Pid, ModeRef}. ModeRef is the
%% persistent_term key {?MODULE,Name} built in init/1 and read by
%% get_mode/1 - it is not an ets table identifier.
-opaque olp_ref() :: {atom(),pid(),{?MODULE,atom()}}.

-type options() :: logger:olp_config().

%%%-----------------------------------------------------------------
%%% API

-spec start_link(Name,Module,Args,Options) -> {ok,Pid,Olp} | {error,Reason} when
      Name :: atom(),
      Module :: module(),
      Args :: term(),
      Options :: options(),
      Pid :: pid(),
      Olp :: olp_ref(),
      Reason :: term().
%% Start and link an olp process. Options supplied by the caller take
%% precedence over the defaults; the merged result is validated before
%% any process is started.
start_link(Name,Module,Args,UserOpts) when is_map(UserOpts) ->
    Options = maps:merge(get_default_opts(),UserOpts),
    Validation = check_opts(Options),
    if
        Validation =:= ok ->
            proc_lib:start_link(?MODULE,init,[[Name,Module,Args,Options]]);
        true ->
            Validation
    end.

-spec load(Olp, Msg) -> ok when
      Olp :: olp_ref(),
      Msg :: term().
%% Hand a load message over to the olp process. The current mode
%% decides how:
%%   drop  - the message is never sent, only counted as dropped
%%   sync  - the message is sent as a call, which slows down the
%%           sender while the olp process is under pressure
%%   async - the message is sent as a cast
load({_Name,Pid,ModeRef},Msg) ->
    case get_mode(ModeRef) of
        drop ->
            ?observe(_Name,{dropped,1});
        sync ->
            case call(Pid, {'$olp_load',Msg}) of
                ok ->
                    ok;
                _DroppedOrBusy ->
                    %% the reply was 'dropped', or the call timed out
                    %% and call/2 returned {error,busy}
                    ?observe(_Name,{dropped,1})
            end;
        async ->
            gen_server:cast(Pid, {'$olp_load',Msg})
    end.

-spec info(Olp) -> map() | {error, busy} when
      Olp :: atom() | pid() | olp_ref().
%% Fetch the complete server state map.
info(Olp) ->
    call(Olp, info).

-spec reset(Olp) -> ok | {error, busy} when
      Olp :: atom() | pid() | olp_ref().
%% Ask the olp process to reset its state (see the reset request in
%% handle_call/3).
reset(Olp) ->
    call(Olp, reset).

-spec stop(Olp) -> ok when
      Olp :: atom() | pid() | olp_ref().
%% Stop the olp process. The reply of the stop request is ignored.
stop({_Name,Server,_ModeRef}) ->
    stop(Server);
stop(Server) ->
    _ = gen_server:call(Server, stop),
    ok.

-spec set_opts(Olp, Opts) -> ok | {error,term()} | {error, busy} when
      Olp :: atom() | pid() | olp_ref(),
      Opts :: options().
%% Update overload protection options in the running olp process.
set_opts(Olp, Opts) ->
    call(Olp, {set_opts,Opts}).

-spec get_opts(Olp) -> options() | {error, busy} when
      Olp :: atom() | pid() | olp_ref().
%% Read the overload protection options currently in use.
get_opts(Olp) ->
    call(Olp, get_opts).

-spec get_default_opts() -> options().
%% Default overload protection options; the values come from the
%% macros in logger_olp.hrl.
get_default_opts() ->
    #{
      %% message queue levels
      sync_mode_qlen              => ?SYNC_MODE_QLEN,
      drop_mode_qlen              => ?DROP_MODE_QLEN,
      flush_qlen                  => ?FLUSH_QLEN,
      %% burst limiting
      burst_limit_enable          => ?BURST_LIMIT_ENABLE,
      burst_limit_max_count       => ?BURST_LIMIT_MAX_COUNT,
      burst_limit_window_time     => ?BURST_LIMIT_WINDOW_TIME,
      %% termination on overload
      overload_kill_enable        => ?OVERLOAD_KILL_ENABLE,
      overload_kill_qlen          => ?OVERLOAD_KILL_QLEN,
      overload_kill_mem_size      => ?OVERLOAD_KILL_MEM_SIZE,
      overload_kill_restart_after => ?OVERLOAD_KILL_RESTART_AFTER
     }.

-spec restart(fun(() -> any())) -> ok.
%% Run the restart fun (scheduled from terminate/2 after an overload
%% kill). An exception raised by the fun is caught and turned into an
%% error term; the outcome is logged internally on debug level and ok
%% is always returned.
restart(Fun) ->
    Outcome =
        try
            Fun()
        catch
            Class:Reason:Stacktrace ->
                {error,{restart_failed,Fun,Class,Reason,Stacktrace}}
        end,
    ?LOG_INTERNAL(debug,#{},[{logger_olp,restart},
                             {result,Outcome}]),
    ok.

-spec get_ref() -> olp_ref().
%% Return the olp reference stored in the process dictionary of the
%% calling process (init/1 stores it there for the olp process).
get_ref() ->
    erlang:get(olp_ref).

-spec get_ref(PidOrName) -> olp_ref() | {error, busy} when
      PidOrName :: pid() | atom().
%% Ask the given olp process for its olp reference.
get_ref(PidOrName) ->
    call(PidOrName,get_ref).

-spec get_pid(olp_ref()) -> pid().
%% Extract the pid from an olp reference.
get_pid({_Name,OlpPid,_ModeRef}) ->
    OlpPid.

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%% Entry point used by proc_lib:start_link/3 in start_link/4. Registers
%% the process, initialises the callback module and, on success, enters
%% the gen_server loop. On failure the name is unregistered again and
%% the failure is passed to the starter through proc_lib:init_ack/1.
init([Name,Module,Args,Options]) ->
    register(Name, self()),
    process_flag(message_queue_data, off_heap),

    ?start_observation(Name),

    ModeRef = {?MODULE, Name},
    OlpRef = {Name,self(),ModeRef},
    put(olp_ref,OlpRef),

    %% shared by both failure paths below
    AckFailure = fun(Failure) ->
                         unregister(Name),
                         proc_lib:init_ack(Failure)
                 end,

    try Module:init(Args) of
        {ok,CBState} ->
            set_mode(ModeRef, async),
            StartTs = ?timestamp(),
            proc_lib:init_ack({ok,self(),OlpRef}),
            %% The options are kept in the state so that the option
            %% data does not have to be copied (sent) along with each
            %% message
            InitState = ?merge_with_stats(
                           Options#{id => Name,
                                    idle => true,
                                    module => Module,
                                    mode_ref => ModeRef,
                                    mode => async,
                                    last_qlen => 0,
                                    last_load_ts => StartTs,
                                    burst_win_ts => StartTs,
                                    burst_msg_count => 0,
                                    cb_state => CBState}),
            LoopState = reset_restart_flag(InitState),
            gen_server:enter_loop(?MODULE, [], LoopState);
        InitError ->
            AckFailure(InitError)
    catch
        _:Reason ->
            AckFailure(Reason)
    end.

%% This is the synchronous load event.
handle_call({'$olp_load', Msg}, _From, State) ->
    %% the reply is either ok or dropped
    {Reply,NewState} = do_load(Msg, call, State#{idle=>false}),
    reply_return(Reply,NewState);

%% Hand out the olp reference of this process.
handle_call(get_ref,_From,#{id:=Name,mode_ref:=ModeRef}=State) ->
    OlpRef = {Name,self(),ModeRef},
    reply_return(OlpRef,State);

%% Merge the new options over the current ones; the state is only
%% updated if the merged set passes validation.
handle_call({set_opts,NewOpts},_From,State) ->
    CurrentOpts = maps:with(?OPT_KEYS,State),
    MergedOpts = maps:merge(CurrentOpts,NewOpts),
    {Reply,NewState} =
        case check_opts(MergedOpts) of
            ok ->
                {ok,maps:merge(State,MergedOpts)};
            Error ->
                {Error,State}
        end,
    reply_return(Reply,NewState);

%% Return only the option part of the state.
handle_call(get_opts,_From,State) ->
    CurrentOpts = maps:with(?OPT_KEYS,State),
    reply_return(CurrentOpts, State);

%% Return the complete state map.
handle_call(info, _From, State) ->
    reply_return(State, State);

%% Reset statistics and load bookkeeping, and let the callback module
%% reset its own state if it exports reset_state/1.
handle_call(reset, _From, #{module:=Module,cb_state:=CBState}=State) ->
    StatsState = ?merge_with_stats(State),
    NewCBState = try_callback_call(Module,reset_state,[CBState],CBState),
    reply_return(ok, StatsState#{idle => true,
                                 last_qlen => 0,
                                 last_load_ts => ?timestamp(),
                                 cb_state => NewCBState});

handle_call(stop, _From, State) ->
    {stop, {shutdown,stopped}, ok, State};

%% Any other request is forwarded to the callback module.
handle_call(Request, From, #{module:=Module,cb_state:=CBState}=State) ->
    case try_callback_call(Module,handle_call,[Request, From, CBState]) of
        {reply,Reply,NewCBState} ->
            reply_return(Reply,State#{cb_state=>NewCBState});
        {noreply,NewCBState} ->
            noreply_return(State#{cb_state=>NewCBState});
        {stop, Reason, Reply, NewCBState} ->
            {stop, Reason, Reply, State#{cb_state=>NewCBState}};
        {stop, Reason, NewCBState} ->
            {stop, Reason, State#{cb_state=>NewCBState}}
    end.

%% This is the asynchronous load event.
handle_cast({'$olp_load', Msg}, State) ->
    {_Result,NewState} = do_load(Msg, cast, State#{idle=>false}),
    noreply_return(NewState);

%% Any other cast is forwarded to the callback module.
handle_cast(Msg, #{module:=Module, cb_state:=CBState} = State) ->
    case try_callback_call(Module,handle_cast,[Msg, CBState]) of
        {noreply,NewCBState} ->
            noreply_return(State#{cb_state=>NewCBState});
        {stop, Reason, NewCBState} ->
            {stop, Reason, State#{cb_state=>NewCBState}}
    end.

%% The gen_server timeout (armed by noreply_return/1 and
%% reply_return/2 while not idle) fired: notify the callback module,
%% go back to async mode and clear the burst counter.
handle_info(timeout, #{mode_ref:=ModeRef} = State) ->
    IdleState = notify(idle,State),
    NotifiedState = maybe_notify_mode_change(async,IdleState),
    NewMode = set_mode(ModeRef, async),
    {noreply, NotifiedState#{idle => true,
                             mode => NewMode,
                             burst_msg_count => 0}};
%% Any other message is offered to the callback module, which may
%% request that it is treated as a load message by returning
%% {load,CBState}.
handle_info(Msg, #{module := Module, cb_state := CBState} = State) ->
    case try_callback_call(Module,handle_info,[Msg, CBState]) of
        {noreply,NewCBState} ->
            noreply_return(State#{cb_state=>NewCBState});
        {stop, Reason, NewCBState} ->
            {stop, Reason, State#{cb_state=>NewCBState}};
        {load,NewCBState} ->
            {_,LoadedState} = do_load(Msg, cast,
                                      State#{idle=>false,
                                             cb_state=>NewCBState}),
            noreply_return(LoadedState)
    end.

terminate({shutdown,{overloaded,_QLen,_Mem}},
          #{id:=Name, module := Module, cb_state := CBState,
            overload_kill_restart_after := RestartAfter} = State) ->
    %% The process is going down because of an overload situation
    %% (see kill_if_choked/3).
    %% Unregister first, to avoid an error printout about the callback
    %% having crashed on stop.
    unregister(Name),
    case try_callback_call(Module,terminate,[overloaded,CBState],ok) of
        {ok,RestartFun} when is_function(RestartFun,0),
                             is_integer(RestartAfter) ->
            %% the callback wants a restart: raise the restart flag and
            %% schedule restart/1
            set_restart_flag(State),
            _ = timer:apply_after(RestartAfter,?MODULE,restart,[RestartFun]),
            ok;
        _ ->
            ok
    end;
terminate(Reason, #{id:=Name, module:=Module, cb_state:=CBState}) ->
    _ = try_callback_call(Module,terminate,[Reason,CBState],ok),
    unregister(Name),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.


%%%-----------------------------------------------------------------
%%% Internal functions
-spec call(Olp, term()) -> term() | {error,busy} when
      Olp :: atom() | pid() | olp_ref().
%% gen_server:call/2 towards the olp process, where a call timeout is
%% reported as {error,busy} instead of an exit.
call({_Name, Pid, _ModeRef},Request) ->
    call(Pid, Request);
call(Server, Request) ->
    try
        gen_server:call(Server, Request)
    catch
        _:{timeout,_} ->
            {error,busy}
    end.

-spec cast(olp_ref(),term()) -> ok.
%% gen_server:cast/2 towards the process of an olp reference.
cast({_Name, Pid, _ModeRef},Request) ->
    gen_server:cast(Pid, Request).

%% Process one load message. The overload check is made for every
%% event: it selects the mode (async, sync, drop or flush) and may
%% terminate the process if it is choked.
do_load(Msg, CallOrCast, State) ->
    Now = ?timestamp(),
    TimedState = ?update_time(Now,State),

    %% Find out whether the process is getting overloaded or is
    %% recovering from overload. Checking on each event makes it
    %% possible to react quickly to large bursts, and ensures that
    %% the handler can never be left in drop mode with an empty
    %% mailbox (which would stop operation).
    {Mode,QLen,Mem,CheckedState} = check_load(TimedState),

    %% terminate the handler if it can't keep up with the load
    kill_if_choked(QLen, Mem, CheckedState),

    case Mode == flush of
        true ->
            flush(Now, CheckedState);
        false ->
            handle_load(Mode, Now, Msg, CallOrCast, CheckedState)
    end.

%% Flush load messages from the mailbox.
%%
%% Called by do_load/3 when check_load/1 has selected flush mode, i.e.
%% when QLen >= FlushQLen. The load message currently being handled is
%% discarded as well: the function returns {dropped,State} without
%% invoking the callback module's handle_load.
%%
%% T1 is the timestamp taken by do_load/3 for the current event.
%% The returned state has mode set to async (also written through
%% set_mode/2) and last_qlen/last_load_ts refreshed.
flush(T1, State=#{id := _Name, last_load_ts := _T0, mode_ref := ModeRef}) ->
    %% flush load messages in the mailbox (a limited number in order
    %% to not cause long delays)
    NewFlushed = flush_load(?FLUSH_MAX_N),

    %% let the callback module know how many messages were flushed
    %% (e.g. so that it can write info about it in the log)
    State1=notify({flushed,NewFlushed},State),

    %% because of the receive loop when flushing messages, the
    %% handler will be scheduled out often and the mailbox could
    %% grow very large, so we'd better check the queue again here
    {_,QLen1} = process_info(self(), message_queue_len),
    ?observe(_Name,{max_qlen,QLen1}),

    %% Add 1 for the current log event
    ?observe(_Name,{flushed,NewFlushed+1}),

    %% statistics updates; _T0 is the timestamp of the previous load
    %% event
    State2 = ?update_max_time(?diff_time(T1,_T0),State1),
    State3 = ?update_max_qlen(QLen1,State2),
    %% after a flush the mode is always async - notify if that is a
    %% change away from drop mode
    State4 = maybe_notify_mode_change(async,State3),
    {dropped,?update_other(flushed,FLUSHED,NewFlushed,
                           State4#{mode => set_mode(ModeRef,async),
                                   last_qlen => QLen1,
                                   last_load_ts => T1})}.

%% Take care of one load message in a non-flush mode. Unless the burst
%% limit says otherwise, the message is passed to the callback
%% module's handle_load/2. Returns {ok,State} if the message was
%% handed to the callback and {dropped,State} if the burst limit
%% stopped it.
handle_load(Mode, T1, Msg, _CallOrCast,
            State = #{id := _Name,
                      module := Module,
                      cb_state := CBState,
                      last_qlen := LastQLen,
                      last_load_ts := _T0}) ->
    %% find out if writes must be held back due to a burst of events
    {DoWrite,BurstState} = limit_burst(State),

    {Result,LastQLen1,CBState1} =
        case DoWrite of
            true ->
                ?observe(_Name,{_CallOrCast,1}),
                NewCBState =
                    try_callback_call(Module,handle_load,[Msg,CBState]),
                %% sample the queue length after the callback has run
                {message_queue_len,CurQLen} =
                    process_info(self(), message_queue_len),
                {ok,CurQLen,NewCBState};
            _ ->
                ?observe(_Name,{flushed,1}),
                {dropped,LastQLen,CBState}
        end,

    ModeState = BurstState#{cb_state => CBState1,
                            mode => Mode},
    CountedState = ?update_calls_or_casts(_CallOrCast,1,ModeState),
    QLenState = ?update_max_qlen(LastQLen1,CountedState),
    TimedState =
        ?update_max_time(?diff_time(T1,_T0),
                         QLenState#{last_qlen := LastQLen1,
                                    last_load_ts => T1}),
    %% write statistics are only touched when the message was written
    FinalState =
        case Result of
            ok ->
                FreqState = ?update_freq(T1,TimedState),
                ?update_other(writes,WRITES,1,FreqState);
            _ ->
                TimedState
        end,
    {Result,FinalState}.


%%%-----------------------------------------------------------------
%%% Check that the options are valid
%% First every single option is checked, then the relation between
%% the queue length levels.
check_opts(Options) when is_map(Options) ->
    case do_check_opts(maps:to_list(Options)) of
        {error,Key,Value} ->
            {error,{invalid_olp_config,#{Key=>Value}}};
        ok ->
            check_olp_levels(Options)
    end.

%% Verify the relation between sync_mode_qlen, drop_mode_qlen and
%% flush_qlen; the three values are reported back on failure.
check_olp_levels(Options) ->
    case overload_levels_ok(Options) of
        false ->
            LevelKeys = [sync_mode_qlen,
                         drop_mode_qlen,
                         flush_qlen],
            {error,{invalid_olp_levels,maps:with(LevelKeys,Options)}};
        true ->
            ok
    end.

%% Walk through the options in list order. Returns ok if all of them
%% are valid, otherwise {error,Key,Value} for the first option that is
%% unknown or has a value of the wrong type.
do_check_opts([{Key,Value}|Rest]) ->
    case is_valid_opt(Key,Value) of
        true ->
            do_check_opts(Rest);
        false ->
            {error,Key,Value}
    end;
do_check_opts([]) ->
    ok.

%% Type check of a single option; unknown keys are invalid.
is_valid_opt(sync_mode_qlen,Value) ->
    is_integer(Value);
is_valid_opt(drop_mode_qlen,Value) ->
    is_integer(Value);
is_valid_opt(flush_qlen,Value) ->
    is_integer(Value);
is_valid_opt(burst_limit_enable,Value) ->
    is_boolean(Value);
is_valid_opt(burst_limit_max_count,Value) ->
    is_integer(Value);
is_valid_opt(burst_limit_window_time,Value) ->
    is_integer(Value);
is_valid_opt(overload_kill_enable,Value) ->
    is_boolean(Value);
is_valid_opt(overload_kill_qlen,Value) ->
    is_integer(Value);
is_valid_opt(overload_kill_mem_size,Value) ->
    is_integer(Value);
is_valid_opt(overload_kill_restart_after,Value) ->
    is_integer(Value) orelse Value == infinity;
is_valid_opt(_Key,_Value) ->
    false.

%% Raise the restart flag: a separate process registered under the
%% flag name, which stays alive until it is killed by
%% reset_restart_flag/1. It is deliberately not linked to the caller,
%% since it is started from terminate/2 and must outlive this process.
set_restart_flag(#{id := Name, module := Module}) ->
    FlagName = restart_flag_name(Module,Name),
    spawn(fun() ->
                  register(FlagName, self()),
                  timer:sleep(infinity)
          end),
    ok.

%% Lower the restart flag if it is raised. A raised flag means that
%% this is a restart after an overload kill, which the callback module
%% is notified about.
reset_restart_flag(#{id := Name, module := Module} = State) ->
    FlagName = restart_flag_name(Module,Name),
    case whereis(FlagName) of
        undefined ->
            State;
        FlagPid ->
            exit(FlagPid, kill),
            notify(restart,State)
    end.

%% Registered name of the restart flag process.
restart_flag_name(Module,Name) ->
    list_to_atom(lists:concat([Module,"_",Name,"_restarting"])).

%% Sample the memory usage and message queue length of this process
%% and select the mode from the queue length.
%%
%% Returns {Mode, QLen, Mem, State} where Mode is flush, drop, sync or
%% async. For drop, sync and async the mode is also written through
%% set_mode/2, which is where load/2 reads it on the client side;
%% flush is only returned to the caller and not stored.
check_load(State = #{id:=_Name, mode_ref := ModeRef, mode := Mode,
                     sync_mode_qlen := SyncModeQLen,
                     drop_mode_qlen := DropModeQLen,
                     flush_qlen := FlushQLen}) ->
    {_,Mem} = process_info(self(), memory),
    ?observe(_Name,{max_mem,Mem}),
    {_,QLen} = process_info(self(), message_queue_len),
    ?observe(_Name,{max_qlen,QLen}),
    %% When the handler process gets scheduled in, it's impossible
    %% to predict the QLen. We could jump "up" arbitrarily from say
    %% async to sync, async to drop, sync to flush, etc. However, when
    %% the handler process manages the log events (without flushing),
    %% one after the other, we will move "down" from drop to sync and
    %% from sync to async. This way we don't risk getting stuck in
    %% drop or sync mode with an empty mailbox.
    %%
    %% _NewDrops and _NewFlushes are increments for the statistics
    %% macros below.
    {Mode1,_NewDrops,_NewFlushes} =
        if
            QLen >= FlushQLen ->
                {flush, 0,1};
            QLen >= DropModeQLen ->
                %% Note that drop mode will force load messages to
                %% be dropped on the client side (never sent to
                %% the olp process).
                %% Only the transition into drop mode is counted.
                IncDrops = if Mode == drop -> 0; true -> 1 end,
                {set_mode(ModeRef, drop), IncDrops,0};
            QLen >= SyncModeQLen ->
                {set_mode(ModeRef, sync), 0,0};
            true ->
                {set_mode(ModeRef, async), 0,0}
        end,
    State1 = ?update_other(drops,DROPS,_NewDrops,State),
    State2 = ?update_max_qlen(QLen,State1),
    State3 = ?update_max_mem(Mem,State2),
    %% the mode key of the state still holds the previous mode here;
    %% it is compared against Mode1 to detect changes to/from drop
    State4 = maybe_notify_mode_change(Mode1,State3),
    {Mode1, QLen, Mem,
     ?update_other(flushes,FLUSHES,_NewFlushes,
                   State4#{last_qlen => QLen})}.

%% Decide whether the current message may be written, given the burst
%% limit. Returns {DoWrite,State}, where DoWrite is false only when
%% burst limiting is enabled, burst_limit_max_count messages have
%% already been counted and the burst window has not yet passed.
limit_burst(#{burst_limit_enable := false}=State) ->
    {true,State};
limit_burst(#{burst_win_ts := BurstWinT0,
              burst_msg_count := BurstMsgCount,
              burst_limit_window_time := BurstLimitWinTime,
              burst_limit_max_count := BurstLimitMaxCnt} = State) ->
    if (BurstMsgCount >= BurstLimitMaxCnt) ->
            %% the limit for allowed messages has been reached
            BurstWinT1 = ?timestamp(),
            %% NOTE(review): the factor 1000 presumably converts a
            %% window time in ms to the unit of ?diff_time - confirm
            %% against logger_olp.hrl
            case ?diff_time(BurstWinT1,BurstWinT0) of
                BurstCheckTime when BurstCheckTime < (BurstLimitWinTime*1000) ->
                    %% we're still within the burst time frame
                    {false,?update_other(burst_drops,BURSTS,1,State)};
                _BurstCheckTime ->
                    %% burst time frame passed, reset counters
                    {true,State#{burst_win_ts => BurstWinT1,
                                 burst_msg_count => 0}}
            end;
       true ->
            %% the limit for allowed messages not yet reached
            {true,State#{burst_win_ts => BurstWinT0,
                         burst_msg_count => BurstMsgCount+1}}
    end.

%% Terminate the calling process with reason
%% {shutdown,{overloaded,QLen,Mem}} if overload kill is enabled and
%% either the queue length or the memory usage is above its limit.
%% Otherwise ok is returned.
kill_if_choked(QLen, Mem, #{overload_kill_enable := KillIfOL,
                            overload_kill_qlen := OLKillQLen,
                            overload_kill_mem_size := OLKillMem}) ->
    if KillIfOL andalso
       ((QLen > OLKillQLen) orelse (Mem > OLKillMem)) ->
            exit({shutdown,{overloaded,QLen,Mem}});
       true ->
            ok
    end.

%% Remove up to Limit load messages from the mailbox and return the
%% number of removed messages. The process runs with high priority
%% while flushing; afterwards the priority it had before is restored
%% (process_flag/2 returns the old value), instead of assuming that
%% it was normal.
flush_load(Limit) ->
    OldPriority = process_flag(priority, high),
    try
        flush_load(0, Limit)
    after
        process_flag(priority, OldPriority)
    end.

flush_load(Limit, Limit) ->
    Limit;
flush_load(N, Limit) ->
    %% flush log events but leave other events, such as info, reset
    %% and stop, so that these have a chance to be processed even
    %% under heavy load
    receive
        {'$gen_cast',{'$olp_load',_}} ->
            flush_load(N+1, Limit);
        {'$gen_call',{_Pid,_Tag}=From,{'$olp_load',_}} ->
            %% a client is waiting for this one: answer through
            %% gen_server:reply/2 rather than building the reply
            %% message by hand
            gen_server:reply(From, dropped),
            flush_load(N+1, Limit);
        {log,_,_,_,_} ->
            flush_load(N+1, Limit);
        {log,_,_,_} ->
            flush_load(N+1, Limit)
    after
        0 -> N
    end.

%% The queue length levels are consistent when
%% sync_mode_qlen =< drop_mode_qlen =< flush_qlen and
%% drop_mode_qlen > 1. Missing options fall back on the defaults.
overload_levels_ok(Options) ->
    SyncQLen = maps:get(sync_mode_qlen, Options, ?SYNC_MODE_QLEN),
    DropQLen = maps:get(drop_mode_qlen, Options, ?DROP_MODE_QLEN),
    FlushQLen = maps:get(flush_qlen, Options, ?FLUSH_QLEN),
    (DropQLen > 1)
        andalso (SyncQLen =< DropQLen)
        andalso (DropQLen =< FlushQLen).

%% Read the mode stored for the given reference; async if nothing is
%% stored.
get_mode(ModeRef) ->
    persistent_term:get(ModeRef, async).

%% Store the mode (an atom) for the given reference and return it.
set_mode(ModeRef, Mode) ->
    true = is_atom(Mode),
    persistent_term:put(ModeRef, Mode),
    Mode.

%% Notify the callback module when drop mode is entered or left.
%% Changes between sync and async are not reported.
maybe_notify_mode_change(drop,#{mode:=OldMode}=State)
  when OldMode=/=drop ->
    notify({mode_change,OldMode,drop},State);
maybe_notify_mode_change(async,#{mode:=drop}=State) ->
    notify({mode_change,drop,async},State);
maybe_notify_mode_change(sync,#{mode:=drop}=State) ->
    notify({mode_change,drop,sync},State);
maybe_notify_mode_change(_NewMode,State) ->
    State.

%% Pass a note to the optional notify/2 callback. If the callback is
%% not exported the callback state is left unchanged.
notify(Note,#{module:=Module,cb_state:=CBState}=State) ->
    NewCBState = try_callback_call(Module,notify,[Note,CBState],CBState),
    State#{cb_state=>NewCBState}.

%% Call a mandatory callback function: an undef error is not caught.
try_callback_call(Module, Function, Args) ->
    try_callback_call(Module, Function, Args, '$no_default_return').

%% Call a callback function. A value thrown by the callback is used as
%% its return value. If a default return value is given and the
%% callback function itself is undefined, the default is returned; an
%% undef error originating from any other function is re-raised.
try_callback_call(Module, Function, Args, DefRet) ->
    try
        apply(Module, Function, Args)
    catch
        throw:Thrown ->
            Thrown;
        error:undef:Stacktrace when DefRet=/='$no_default_return' ->
            case Stacktrace of
                [{Module,Function,Args,_}|_] ->
                    DefRet;
                _ ->
                    erlang:raise(error,undef,Stacktrace)
            end
    end.

%% gen_server return tuples. While the process is not idle, the
%% gen_server timeout is armed with ?IDLE_DETECT_TIME; it is handled
%% by the timeout clause of handle_info/2.
noreply_return(#{idle:=true}=IdleState) ->
    {noreply,IdleState};
noreply_return(#{idle:=false}=BusyState) ->
    {noreply,BusyState,?IDLE_DETECT_TIME}.

reply_return(Reply,#{idle:=true}=IdleState) ->
    {reply,Reply,IdleState};
reply_return(Reply,#{idle:=false}=BusyState) ->
    {reply,Reply,BusyState,?IDLE_DETECT_TIME}.