1%%
2%% %CopyrightBegin%
3%%
4%% Copyright Ericsson AB 2017-2020. All Rights Reserved.
5%%
6%% Licensed under the Apache License, Version 2.0 (the "License");
7%% you may not use this file except in compliance with the License.
8%% You may obtain a copy of the License at
9%%
10%%     http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing, software
13%% distributed under the License is distributed on an "AS IS" BASIS,
14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15%% See the License for the specific language governing permissions and
16%% limitations under the License.
17%%
18%% %CopyrightEnd%
19%%
20-module(logger_olp).
21-behaviour(gen_server).
22
23-include("logger_olp.hrl").
24-include("logger_internal.hrl").
25
26%% API
27-export([start_link/4, load/2, info/1, reset/1, stop/1, restart/1,
28         set_opts/2, get_opts/1, get_default_opts/0, get_pid/1,
29         call/2, cast/2, get_ref/0, get_ref/1]).
30
31%% gen_server and proc_lib callbacks
32-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
33         terminate/2, code_change/3]).
34
%% Keys of the overload protection options. These are the only keys
%% that are extracted from the server state when the options are read
%% (get_opts) or merged with new values (set_opts).
-define(OPT_KEYS,[sync_mode_qlen, drop_mode_qlen, flush_qlen,
                  burst_limit_enable, burst_limit_max_count,
                  burst_limit_window_time,
                  overload_kill_enable, overload_kill_qlen,
                  overload_kill_mem_size, overload_kill_restart_after]).
45
-export_type([olp_ref/0, options/0]).

%% Reference handed out to clients: {RegisteredName, ServerPid, ModeRef}.
%% ModeRef is the persistent_term key ({?MODULE, Name}, built in init/1)
%% under which the server publishes its current mode; it is not an ets
%% table identifier.
-opaque olp_ref() :: {atom(),pid(),{module(),atom()}}.

-type options() :: logger:olp_config().
51
52%%%-----------------------------------------------------------------
53%%% API
54
%% Start an overload protected process, linked to the caller.
%% The given options are laid on top of the defaults and validated
%% before the process is spawned; a validation error is returned as is.
-spec start_link(Name,Module,Args,Options) -> {ok,Pid,Olp} | {error,Reason} when
      Name :: atom(),
      Module :: module(),
      Args :: term(),
      Options :: options(),
      Pid :: pid(),
      Olp :: olp_ref(),
      Reason :: term().
start_link(Name,Module,Args,UserOpts) when is_map(UserOpts) ->
    Opts = maps:merge(get_default_opts(),UserOpts),
    Checked = check_opts(Opts),
    if
        Checked =:= ok ->
            proc_lib:start_link(?MODULE,init,[[Name,Module,Args,Opts]]);
        true ->
            Checked
    end.
71
%% Hand a load message over to the olp process, honouring its current
%% mode:
%%   async - the message is cast
%%   sync  - the message is sent with a call, which slows down a client
%%           that produces much load
%%   drop  - nothing is sent at all, the message is dropped on the
%%           client side
-spec load(Olp, Msg) -> ok when
      Olp :: olp_ref(),
      Msg :: term().
load({_Name,Pid,ModeRef},Msg) ->
    LoadMsg = {'$olp_load',Msg},
    case get_mode(ModeRef) of
        drop ->
            ?observe(_Name,{dropped,1});
        sync ->
            Reply = call(Pid, LoadMsg),
            if
                Reply =:= ok ->
                    ok;
                true ->
                    %% dropped or {error,busy}
                    ?observe(_Name,{dropped,1})
            end;
        async ->
            gen_server:cast(Pid, LoadMsg)
    end.
94
%% Ask the olp process for its state map. Returns {error,busy} if the
%% call times out.
-spec info(Olp) -> map() | {error, busy} when
      Olp :: atom() | pid() | olp_ref().
info(Olp) ->
    Request = info,
    call(Olp, Request).
99
%% Ask the olp process to reset its bookkeeping (and the callback
%% state, if the callback module implements reset_state/1). Returns
%% {error,busy} if the call times out.
-spec reset(Olp) -> ok | {error, busy} when
      Olp :: atom() | pid() | olp_ref().
reset(Olp) ->
    Request = reset,
    call(Olp, Request).
104
%% Stop the olp process. Accepts an olp reference as well as a pid or
%% a registered name. The reply to the stop request is ignored.
-spec stop(Olp) -> ok when
      Olp :: atom() | pid() | olp_ref().
stop({_Name,Pid,_ModeRef}) ->
    stop(Pid);
stop(Server) ->
    _Reply = gen_server:call(Server, stop),
    ok.
112
%% Merge Opts into the current options of the olp process. The merged
%% options are validated by the server; on failure the old options are
%% kept and the error is returned.
-spec set_opts(Olp, Opts) -> ok | {error,term()} | {error, busy} when
      Olp :: atom() | pid() | olp_ref(),
      Opts :: options().
set_opts(Olp, Opts) ->
    Request = {set_opts,Opts},
    call(Olp, Request).
118
%% Read the current overload protection options of the olp process.
-spec get_opts(Olp) -> options() | {error, busy} when
      Olp :: atom() | pid() | olp_ref().
get_opts(Olp) ->
    Request = get_opts,
    call(Olp, Request).
123
%% The default value of every overload protection option, as defined
%% by the corresponding macros.
-spec get_default_opts() -> options().
get_default_opts() ->
    maps:from_list(
      [{sync_mode_qlen,              ?SYNC_MODE_QLEN},
       {drop_mode_qlen,              ?DROP_MODE_QLEN},
       {flush_qlen,                  ?FLUSH_QLEN},
       {burst_limit_enable,          ?BURST_LIMIT_ENABLE},
       {burst_limit_max_count,       ?BURST_LIMIT_MAX_COUNT},
       {burst_limit_window_time,     ?BURST_LIMIT_WINDOW_TIME},
       {overload_kill_enable,        ?OVERLOAD_KILL_ENABLE},
       {overload_kill_qlen,          ?OVERLOAD_KILL_QLEN},
       {overload_kill_mem_size,      ?OVERLOAD_KILL_MEM_SIZE},
       {overload_kill_restart_after, ?OVERLOAD_KILL_RESTART_AFTER}]).
136
%% Run the restart fun (scheduled from terminate/2 after an overload
%% kill). The outcome - or the exception, if the fun fails - is only
%% logged internally; the function always returns ok.
-spec restart(fun(() -> any())) -> ok.
restart(Fun) ->
    Outcome =
        try Fun() of
            Value ->
                Value
        catch
            Class:Reason:Stacktrace ->
                {error,{restart_failed,Fun,Class,Reason,Stacktrace}}
        end,
    ?LOG_INTERNAL(debug,#{},[{logger_olp,restart},
                             {result,Outcome}]),
    ok.
147
%% Return the olp reference of the calling process, as stored in the
%% process dictionary by init/1.
-spec get_ref() -> olp_ref().
get_ref() ->
    erlang:get(olp_ref).
151
%% Ask the olp process identified by a pid or registered name for its
%% olp reference.
-spec get_ref(PidOrName) -> olp_ref() | {error, busy} when
      PidOrName :: pid() | atom().
get_ref(PidOrName) ->
    Request = get_ref,
    call(PidOrName,Request).
156
%% Extract the pid of the olp process from an olp reference.
-spec get_pid(olp_ref()) -> pid().
get_pid({_, OlpPid, _}) ->
    OlpPid.
160
161%%%===================================================================
162%%% gen_server callbacks
163%%%===================================================================
164
%% Entry point of the process spawned by start_link/4 (through
%% proc_lib). Registers the process as Name, stores the olp reference
%% in the process dictionary and runs Module:init(Args).
%%
%% On {ok,CBState} the mode is published as async, the parent is
%% acknowledged with {ok,Pid,OlpRef} and the gen_server loop is
%% entered. Any other return value, and the reason of any exception,
%% is passed to the parent unchanged after the name has been
%% unregistered.
init([Name,Module,Args,Options]) ->
    Self = self(),
    register(Name, Self),
    process_flag(message_queue_data, off_heap),

    ?start_observation(Name),

    ModeRef = {?MODULE, Name},
    OlpRef = {Name,Self,ModeRef},
    put(olp_ref,OlpRef),
    try Module:init(Args) of
        {ok,CBState} ->
            set_mode(ModeRef, async),
            Now = ?timestamp(),
            proc_lib:init_ack({ok,Self,OlpRef}),
            %% The options are kept in the state, so that the option
            %% data never has to be copied (sent) with each message
            Initial = Options#{id => Name,
                               idle => true,
                               module => Module,
                               mode_ref => ModeRef,
                               mode => async,
                               last_qlen => 0,
                               last_load_ts => Now,
                               burst_win_ts => Now,
                               burst_msg_count => 0,
                               cb_state => CBState},
            State = reset_restart_flag(?merge_with_stats(Initial)),
            gen_server:enter_loop(?MODULE, [], State);
        NotOk ->
            unregister(Name),
            proc_lib:init_ack(NotOk)
    catch
        _Class:Reason ->
            unregister(Name),
            proc_lib:init_ack(Reason)
    end.
202
%% gen_server call handling.
%%
%% The first clause is the synchronous load event. The following
%% clauses serve the requests of the API of this module (get_ref,
%% set_opts, get_opts, info, reset and stop). Everything else is
%% forwarded to the handle_call/3 function of the callback module.
handle_call({'$olp_load', Msg}, _From, State) ->
    %% Result is ok | dropped
    {Result,NewState} = do_load(Msg, call, State#{idle=>false}),
    reply_return(Result,NewState);

handle_call(get_ref,_From,#{id:=Name,mode_ref:=ModeRef}=State) ->
    OlpRef = {Name,self(),ModeRef},
    reply_return(OlpRef,State);

handle_call({set_opts,NewOpts},_From,State) ->
    %% Validate the complete set of options, i.e. the current ones
    %% with the new ones on top
    Merged = maps:merge(maps:with(?OPT_KEYS,State),NewOpts),
    {Reply,NewState} =
        case check_opts(Merged) of
            ok ->
                {ok,maps:merge(State,Merged)};
            Error ->
                {Error,State}
        end,
    reply_return(Reply, NewState);

handle_call(get_opts,_From,State) ->
    CurrentOpts = maps:with(?OPT_KEYS,State),
    reply_return(CurrentOpts, State);

handle_call(info, _From, State) ->
    reply_return(State, State);

handle_call(reset, _From, #{module:=Module,cb_state:=CBState}=State) ->
    Cleared = ?merge_with_stats(State),
    NewCBState = try_callback_call(Module,reset_state,[CBState],CBState),
    reply_return(ok, Cleared#{idle => true,
                              last_qlen => 0,
                              last_load_ts => ?timestamp(),
                              cb_state => NewCBState});

handle_call(stop, _From, State) ->
    {stop, {shutdown,stopped}, ok, State};

handle_call(Request, From, #{module:=Module,cb_state:=CBState}=State) ->
    case try_callback_call(Module,handle_call,[Request, From, CBState]) of
        {reply,Reply,NewCBState} ->
            reply_return(Reply,State#{cb_state=>NewCBState});
        {noreply,NewCBState} ->
            noreply_return(State#{cb_state=>NewCBState});
        {stop, Reason, Reply, NewCBState} ->
            {stop, Reason, Reply, State#{cb_state=>NewCBState}};
        {stop, Reason, NewCBState} ->
            {stop, Reason, State#{cb_state=>NewCBState}}
    end.
249
%% gen_server cast handling. The first clause is the asynchronous
%% load event; all other casts are forwarded to the handle_cast/2
%% function of the callback module.
handle_cast({'$olp_load', Msg}, State) ->
    {_Result,NewState} = do_load(Msg, cast, State#{idle=>false}),
    noreply_return(NewState);

handle_cast(Request, #{module:=Module, cb_state:=CBState} = State) ->
    case try_callback_call(Module,handle_cast,[Request, CBState]) of
        {stop, Reason, NewCBState} ->
            {stop, Reason, State#{cb_state=>NewCBState}};
        {noreply,NewCBState} ->
            noreply_return(State#{cb_state=>NewCBState})
    end.
262
%% gen_server info handling.
%%
%% A timeout means that no message has arrived within the idle
%% detection time: the callback module is notified, the mode is set
%% back to async and the burst counter is cleared.
%%
%% Any other message is forwarded to the handle_info/2 function of
%% the callback module, which may return {load,CBState} to have the
%% message treated as an asynchronous load event.
handle_info(timeout, #{mode_ref:=ModeRef} = State) ->
    Notified = maybe_notify_mode_change(async,notify(idle,State)),
    Mode = set_mode(ModeRef, async),
    {noreply, Notified#{idle => true,
                        mode => Mode,
                        burst_msg_count => 0}};
handle_info(Info, #{module := Module, cb_state := CBState} = State) ->
    case try_callback_call(Module,handle_info,[Info, CBState]) of
        {load,NewCBState} ->
            {_Result,NewState} =
                do_load(Info, cast, State#{idle=>false,
                                           cb_state=>NewCBState}),
            noreply_return(NewState);
        {noreply,NewCBState} ->
            noreply_return(State#{cb_state=>NewCBState});
        {stop, Reason, NewCBState} ->
            {stop, Reason, State#{cb_state=>NewCBState}}
    end.
280
%% gen_server termination.
%%
%% The first clause handles termination caused by an overload
%% situation (see kill_if_choked/3). If the callback module returns
%% {ok,Fun} from terminate(overloaded,CBState), and the option
%% overload_kill_restart_after is an integer, a restart flag is set
%% and Fun is scheduled to run after that many milliseconds.
%%
%% In both clauses the published mode is erased as the last step, so
%% that clients holding an old olp reference never see a stale mode
%% once this process is gone: get_mode/1 then falls back to async and
%% load/2 casts instead of making a call towards a dead process.
terminate({shutdown,{overloaded,_QLen,_Mem}},
          #{id:=Name, module := Module, cb_state := CBState,
            mode_ref := ModeRef,
            overload_kill_restart_after := RestartAfter} = State) ->
    %% Unregister first, to avoid an error printout of the callback
    %% crashing on stop
    unregister(Name),
    CBResult = try_callback_call(Module,terminate,[overloaded,CBState],ok),
    case CBResult of
        {ok,Fun} when is_function(Fun,0), is_integer(RestartAfter) ->
            set_restart_flag(State),
            _ = timer:apply_after(RestartAfter,?MODULE,restart,[Fun]),
            ok;
        _ ->
            ok
    end,
    _ = persistent_term:erase(ModeRef),
    ok;
terminate(Reason, #{id:=Name, module:=Module, cb_state:=CBState,
                    mode_ref:=ModeRef}) ->
    _ = try_callback_call(Module,terminate,[Reason,CBState],ok),
    unregister(Name),
    _ = persistent_term:erase(ModeRef),
    ok.
299
%% No state conversion is needed on code change.
code_change(_FromVsn, State, _Extra) ->
    {ok, State}.
302
303
304%%%-----------------------------------------------------------------
305%%% Internal functions
%% Make a gen_server call to the olp process, given as an olp
%% reference, a pid or a registered name. A call that times out is
%% turned into {error,busy}; all other failures are propagated.
-spec call(Olp, term()) -> term() | {error,busy} when
      Olp :: atom() | pid() | olp_ref().
call({_Name, Pid, _ModeRef},Request) ->
    call(Pid, Request);
call(Server, Request) ->
    try gen_server:call(Server, Request) of
        Reply ->
            Reply
    catch
        _Class:{timeout,_Details} ->
            {error,busy}
    end.
316
%% Send an asynchronous request to the olp process of the given olp
%% reference.
-spec cast(olp_ref(),term()) -> ok.
cast({_, OlpPid, _},Request) ->
    gen_server:cast(OlpPid, Request).
320
%% Handle one load event. The load is checked between every event
%% (and the mode is set to async, sync or drop accordingly). Depending
%% on the outcome, the mailbox is flushed or the message is handled.
do_load(Msg, CallOrCast, State0) ->
    T1 = ?timestamp(),
    State1 = ?update_time(T1,State0),

    %% Check if the process is getting overloaded, or if it is
    %% recovering from overload. The check is done for each event in
    %% order to react quickly to large bursts of events, and to make
    %% sure that the handler can never end up in drop mode with an
    %% empty mailbox, which would stop operation.
    {Mode,QLen,Mem,State2} = check_load(State1),

    %% Kill the handler if it can't keep up with the load
    kill_if_choked(QLen, Mem, State2),

    case Mode of
        flush ->
            flush(T1, State2);
        _ ->
            handle_load(Mode, T1, Msg, CallOrCast, State2)
    end.
343
%% Called by do_load/3 when the overload check came out as flush,
%% i.e. QLen >= FlushQLen. Removes load messages from the mailbox,
%% notifies the callback module about it and returns to async mode.
%% The current load message is dropped as well.
flush(T1, #{id := _Name, last_load_ts := _T0, mode_ref := ModeRef} = State0) ->
    %% Flush load messages from the mailbox (a limited number, in
    %% order to not cause long delays)
    NumFlushed = flush_load(?FLUSH_MAX_N),

    %% Let the callback module know about the flushed messages
    State1 = notify({flushed,NumFlushed},State0),

    %% The receive loop used when flushing causes the handler to be
    %% scheduled out often, so the mailbox may have grown a lot in
    %% the meantime - check the queue length again
    {message_queue_len,QLenNow} = process_info(self(), message_queue_len),
    ?observe(_Name,{max_qlen,QLenNow}),

    %% One more than the flushed messages, for the current log event
    ?observe(_Name,{flushed,NumFlushed+1}),

    State2 = ?update_max_time(?diff_time(T1,_T0),State1),
    State3 = ?update_max_qlen(QLenNow,State2),
    State4 = maybe_notify_mode_change(async,State3),
    Mode = set_mode(ModeRef,async),
    State5 = State4#{mode => Mode,
                     last_qlen => QLenNow,
                     last_load_ts => T1},
    {dropped,?update_other(flushed,FLUSHED,NumFlushed,State5)}.
370
%% Actually handle a load message: unless the burst limit says no,
%% the message is passed to the handle_load/2 function of the callback
%% module. Returns {ok,State} if the message was handled and
%% {dropped,State} if it was dropped due to the burst limit.
handle_load(Mode, T1, Msg, _CallOrCast,
            #{id := _Name,
              module := Module,
              cb_state := CBState0,
              last_qlen := PrevQLen,
              last_load_ts := _T0} = State0) ->
    %% Check if the number of writes must be limited during a burst
    %% of log events
    {DoWrite,State1} = limit_burst(State0),

    {Result,QLen,CBState1} =
        case DoWrite of
            true ->
                ?observe(_Name,{_CallOrCast,1}),
                NewCBState =
                    try_callback_call(Module,handle_load,[Msg,CBState0]),
                {message_queue_len,QLenNow} =
                    process_info(self(), message_queue_len),
                {ok,QLenNow,NewCBState};
            _ ->
                ?observe(_Name,{flushed,1}),
                {dropped,PrevQLen,CBState0}
        end,

    State3 = State1#{cb_state => CBState1, mode => Mode},
    State4 = ?update_calls_or_casts(_CallOrCast,1,State3),
    State5 = ?update_max_qlen(QLen,State4),
    State6 =
        ?update_max_time(?diff_time(T1,_T0),
                         State5#{last_qlen := QLen,
                                 last_load_ts => T1}),
    case Result of
        ok ->
            S = ?update_freq(T1,State6),
            {Result,?update_other(writes,WRITES,1,S)};
        _ ->
            {Result,State6}
    end.
408
409
410%%%-----------------------------------------------------------------
411%%% Check that the options are valid
%% Validate a map of options: first each key and value on its own,
%% then the relation between the three queue length levels.
%% Returns ok, {error,{invalid_olp_config,#{Key=>Value}}} for the
%% first faulty option found, or {error,{invalid_olp_levels,Levels}}
%% if the levels are not consistent with each other.
check_opts(Options) when is_map(Options) ->
    case do_check_opts(maps:to_list(Options)) of
        {error,Key,Value} ->
            {error,{invalid_olp_config,#{Key=>Value}}};
        ok ->
            case overload_levels_ok(Options) of
                false ->
                    LevelKeys = [sync_mode_qlen,
                                 drop_mode_qlen,
                                 flush_qlen],
                    {error,{invalid_olp_levels,maps:with(LevelKeys,Options)}};
                true ->
                    ok
            end
    end.
427
%% Check a list of {Key,Value} options one by one. Returns ok if all
%% of them are known and have a value of the right type, otherwise
%% {error,Key,Value} for the first one that fails.
do_check_opts([{Key,Value}|Options]) ->
    case valid_opt(Key,Value) of
        true ->
            do_check_opts(Options);
        false ->
            {error,Key,Value}
    end;
do_check_opts([]) ->
    ok.

%% true if Key is a known option and Value is of the type required
%% for it, false otherwise (including for unknown keys).
valid_opt(Key,Value)
  when Key =:= sync_mode_qlen;
       Key =:= drop_mode_qlen;
       Key =:= flush_qlen;
       Key =:= burst_limit_max_count;
       Key =:= burst_limit_window_time;
       Key =:= overload_kill_qlen;
       Key =:= overload_kill_mem_size ->
    is_integer(Value);
valid_opt(Key,Value)
  when Key =:= burst_limit_enable;
       Key =:= overload_kill_enable ->
    is_boolean(Value);
valid_opt(overload_kill_restart_after,Value) ->
    is_integer(Value) orelse Value == infinity;
valid_opt(_Key,_Value) ->
    false.
453
%% Mark that a restart is pending. The flag is a separate process,
%% registered as <Module>_<Name>_restarting, which does nothing but
%% sleep. It is removed again by reset_restart_flag/1.
set_restart_flag(#{id := Name, module := Module}) ->
    FlagName = list_to_atom(lists:concat([Module,"_",Name,"_restarting"])),
    Holder = fun() ->
                     register(FlagName, self()),
                     timer:sleep(infinity)
             end,
    _FlagPid = spawn(Holder),
    ok.
461
%% Remove the restart flag, if there is one. If the flag process
%% exists, it is killed and the callback module is notified about the
%% restart; otherwise the state is returned unchanged.
reset_restart_flag(#{id := Name, module := Module} = State) ->
    FlagName = list_to_atom(lists:concat([Module,"_",Name,"_restarting"])),
    FlagPid = whereis(FlagName),
    if
        FlagPid =:= undefined ->
            State;
        true ->
            exit(FlagPid, kill),
            notify(restart,State)
    end.
471
%% Compare the current message queue length with the configured
%% levels, and publish the resulting mode. Returns
%% {Mode,QLen,Mem,State}, where Mode is flush, drop, sync or async.
%% Note that the flush mode is only returned, never published.
check_load(#{id:=_Name, mode_ref := ModeRef, mode := Mode,
             sync_mode_qlen := SyncModeQLen,
             drop_mode_qlen := DropModeQLen,
             flush_qlen := FlushQLen} = State) ->
    {memory,Mem} = process_info(self(), memory),
    ?observe(_Name,{max_mem,Mem}),
    {message_queue_len,QLen} = process_info(self(), message_queue_len),
    ?observe(_Name,{max_qlen,QLen}),
    %% It is impossible to predict the queue length when the handler
    %% process gets scheduled in, so the mode may jump "up"
    %% arbitrarily (async to sync, async to drop, sync to flush, and
    %% so on). When the log events are handled one after the other
    %% (without flushing), the mode moves "down" from drop to sync
    %% and from sync to async. This way there is no risk of getting
    %% stuck in drop or sync mode with an empty mailbox.
    {Mode1,_NewDrops,_NewFlushes} =
        if
            QLen >= FlushQLen ->
                {flush,0,1};
            %% In drop mode, load messages are dropped on the client
            %% side (never sent to the olp process). A drop is only
            %% counted when the drop mode is entered.
            QLen >= DropModeQLen, Mode == drop ->
                {set_mode(ModeRef, drop),0,0};
            QLen >= DropModeQLen ->
                {set_mode(ModeRef, drop),1,0};
            QLen >= SyncModeQLen ->
                {set_mode(ModeRef, sync),0,0};
            true ->
                {set_mode(ModeRef, async),0,0}
        end,
    State1 = ?update_other(drops,DROPS,_NewDrops,State),
    State2 = ?update_max_qlen(QLen,State1),
    State3 = ?update_max_mem(Mem,State2),
    State4 = maybe_notify_mode_change(Mode1,State3),
    State5 = State4#{last_qlen => QLen},
    {Mode1, QLen, Mem, ?update_other(flushes,FLUSHES,_NewFlushes,State5)}.
509
%% Decide if the current message may be written, with regard to the
%% burst limit. Returns {DoWrite,State}. With the burst limit
%% disabled, the answer is always true.
limit_burst(#{burst_limit_enable := false}=State) ->
    {true,State};
limit_burst(#{burst_win_ts := WinStart,
              burst_msg_count := MsgCount,
              burst_limit_window_time := WinTime,
              burst_limit_max_count := MaxCount} = State) ->
    case MsgCount >= MaxCount of
        false ->
            %% The limit for allowed messages is not yet reached
            {true,State#{burst_win_ts => WinStart,
                         burst_msg_count => MsgCount+1}};
        true ->
            %% The limit for allowed messages has been reached
            Now = ?timestamp(),
            Elapsed = ?diff_time(Now,WinStart),
            if
                Elapsed < (WinTime*1000) ->
                    %% Still within the burst time frame
                    {false,?update_other(burst_drops,BURSTS,1,State)};
                true ->
                    %% The burst time frame has passed, start over
                    {true,State#{burst_win_ts => Now,
                                 burst_msg_count => 0}}
            end
    end.
533
%% Terminate the process with reason {shutdown,{overloaded,QLen,Mem}}
%% if overload kill is enabled and either the queue length or the
%% memory size exceeds its limit. Otherwise ok is returned.
kill_if_choked(QLen, Mem, #{overload_kill_enable   := true,
                            overload_kill_qlen     := MaxQLen,
                            overload_kill_mem_size := MaxMem})
  when QLen > MaxQLen; Mem > MaxMem ->
    exit({shutdown,{overloaded,QLen,Mem}});
kill_if_choked(_QLen, _Mem, #{overload_kill_enable   := _,
                              overload_kill_qlen     := _,
                              overload_kill_mem_size := _}) ->
    ok.
543
%% Remove at most Limit load messages from the mailbox of the calling
%% process and return the number of messages removed. The flushing is
%% done with high priority; afterwards the priority that the process
%% had before the call is restored (process_flag/2 returns the old
%% value), rather than assuming that it was normal.
flush_load(Limit) ->
    PrevPriority = process_flag(priority, high),
    Flushed = flush_load(0, Limit),
    _ = process_flag(priority, PrevPriority),
    Flushed.

%% Flush loop: N is the number of messages flushed so far.
flush_load(Limit, Limit) ->
    Limit;
flush_load(N, Limit) ->
    %% Flush log events but leave other events, such as info, reset
    %% and stop, so that these have a chance to be processed even
    %% under heavy load
    receive
        {'$gen_cast',{'$olp_load',_}} ->
            flush_load(N+1, Limit);
        {'$gen_call',{Pid,MRef},{'$olp_load',_}} ->
            %% A synchronous client is waiting - tell it that the
            %% message was dropped
            Pid ! {MRef, dropped},
            flush_load(N+1, Limit);
        {log,_,_,_,_} ->
            flush_load(N+1, Limit);
        {log,_,_,_} ->
            flush_load(N+1, Limit)
    after
        0 -> N
    end.
569
%% true if the three queue length levels are consistent, i.e.
%% 1 < drop_mode_qlen and sync_mode_qlen =< drop_mode_qlen =< flush_qlen.
%% Levels missing from Options are taken from the defaults.
overload_levels_ok(Options) ->
    SyncLevel = maps:get(sync_mode_qlen, Options, ?SYNC_MODE_QLEN),
    DropLevel = maps:get(drop_mode_qlen, Options, ?DROP_MODE_QLEN),
    FlushLevel = maps:get(flush_qlen, Options, ?FLUSH_QLEN),
    if
        DropLevel > 1, SyncLevel =< DropLevel, DropLevel =< FlushLevel ->
            true;
        true ->
            false
    end.
575
%% Read the mode published under Ref; async if nothing is published.
get_mode(ModeRef) ->
    Default = async,
    persistent_term:get(ModeRef, Default).
577
%% Publish Mode (which must be an atom) under Ref, and return it.
set_mode(ModeRef, Mode) ->
    true = is_atom(Mode),
    ok = persistent_term:put(ModeRef, Mode),
    Mode.
580
%% Notify the callback module when drop mode is entered, or left for
%% async or sync mode. All other mode changes go unnoticed, and the
%% state is returned as is.
maybe_notify_mode_change(NewMode,#{mode:=OldMode}=State) ->
    case {NewMode,OldMode} of
        {drop,drop} ->
            State;
        {drop,_} ->
            notify({mode_change,OldMode,drop},State);
        {async,drop} ->
            notify({mode_change,drop,async},State);
        {sync,drop} ->
            notify({mode_change,drop,sync},State);
        _ ->
            State
    end;
maybe_notify_mode_change(_,State) ->
    State.
589
%% Pass a note to the notify/2 function of the callback module, if it
%% has one, and store the callback state it returns. Without such a
%% function the callback state is left unchanged.
notify(Note,#{module:=Module,cb_state:=CBState0}=State) ->
    State#{cb_state => try_callback_call(Module,notify,[Note,CBState0],CBState0)}.
593
%% Call a mandatory callback function. A thrown value is returned as
%% the result of the call; every other exception is propagated.
try_callback_call(Module, Function, Args) ->
    NoDefault = '$no_default_return',
    try_callback_call(Module, Function, Args, NoDefault).

%% Call an optional callback function. As above, but DefRet is
%% returned if the callback function itself is not defined. An undef
%% error raised from anywhere else is re-raised with its stacktrace.
try_callback_call(Module, Function, Args, DefRet) ->
    try
        apply(Module, Function, Args)
    catch
        throw:Thrown ->
            Thrown;
        error:undef:Stacktrace when DefRet =/= '$no_default_return' ->
            case Stacktrace of
                [{Module,Function,Args,_Location}|_] ->
                    DefRet;
                _ ->
                    erlang:raise(error,undef,Stacktrace)
            end
    end.
609
%% Build a noreply return value for the gen_server. Unless the
%% process is already idle, the idle detection timeout is included.
noreply_return(#{idle:=false}=State) ->
    {noreply,State,?IDLE_DETECT_TIME};
noreply_return(#{idle:=true}=State) ->
    {noreply,State}.
614
%% Build a reply return value for the gen_server. Unless the process
%% is already idle, the idle detection timeout is included.
reply_return(Reply,#{idle:=false}=State) ->
    {reply,Reply,State,?IDLE_DETECT_TIME};
reply_return(Reply,#{idle:=true}=State) ->
    {reply,Reply,State}.
619