1%% This file is a copy of gen_server.erl from the R13B-1 Erlang/OTP
2%% distribution, with the following modifications:
3%%
4%% 1) the module name is gen_server2
5%%
6%% 2) more efficient handling of selective receives in callbacks
7%% gen_server2 processes drain their message queue into an internal
8%% buffer before invoking any callback module functions. Messages are
9%% dequeued from the buffer for processing. Thus the effective message
10%% queue of a gen_server2 process is the concatenation of the internal
11%% buffer and the real message queue.
12%% As a result of the draining, any selective receive invoked inside a
13%% callback is less likely to have to scan a large message queue.
14%%
15%% 3) gen_server2:cast is guaranteed to be order-preserving
16%% The original code could reorder messages when communicating with a
17%% process on a remote node that was not currently connected.
18%%
19%% 4) The callback module can optionally implement prioritise_call/4,
20%% prioritise_cast/3 and prioritise_info/3.  These functions take
21%% Message, From, Length and State or just Message, Length and State
22%% (where Length is the current number of messages waiting to be
23%% processed) and return a single integer representing the priority
24%% attached to the message, or 'drop' to ignore it (for
25%% prioritise_cast/3 and prioritise_info/3 only).  Messages with
26%% higher priorities are processed before requests with lower
27%% priorities. The default priority is 0.
28%%
29%% 5) The callback module can optionally implement
30%% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be
31%% called immediately prior to and post hibernation, respectively. If
32%% handle_pre_hibernate returns {hibernate, NewState} then the process
33%% will hibernate. If the module does not implement
34%% handle_pre_hibernate/1 then the default action is to hibernate.
35%%
36%% 6) init can return a 4th arg, {backoff, InitialTimeout,
37%% MinimumTimeout, DesiredHibernatePeriod} (all in milliseconds,
38%% 'infinity' does not make sense here). Then, on all callbacks which
39%% can return a timeout (including init), timeout can be
40%% 'hibernate'. When this is the case, the current timeout value will
41%% be used (initially, the InitialTimeout supplied from init). After
42%% this timeout has occurred, hibernation will occur as normal. Upon
43%% awaking, a new current timeout value will be calculated.
44%%
45%% The purpose is that the gen_server2 takes care of adjusting the
46%% current timeout value such that the process will increase the
47%% timeout value repeatedly if it is unable to sleep for the
48%% DesiredHibernatePeriod. If it is able to sleep for the
49%% DesiredHibernatePeriod it will decrease the current timeout down to
50%% the MinimumTimeout, so that the process is put to sleep sooner (and
51%% hopefully stays asleep for longer). In short, should a process
52%% using this receive a burst of messages, it should not hibernate
53%% between those messages, but as the messages become less frequent,
54%% the process will not only hibernate, it will do so sooner after
55%% each message.
56%%
57%% When using this backoff mechanism, normal timeout values (i.e. not
58%% 'hibernate') can still be used, and if they are used then the
59%% handle_info(timeout, State) will be called as normal. In this case,
60%% returning 'hibernate' from handle_info(timeout, State) will not
61%% hibernate the process immediately, as it would if backoff wasn't
62%% being used. Instead it'll wait for the current timeout as described
63%% above.
64%%
65%% 7) The callback module can return from any of the handle_*
66%% functions, a {become, Module, State} triple, or a {become, Module,
67%% State, Timeout} quadruple. This allows the gen_server to
68%% dynamically change the callback module. The State is the new state
69%% which will be passed into any of the callback functions in the new
70%% module. Note there is no form also encompassing a reply, thus if
71%% you wish to reply in handle_call/3 and change the callback module,
72%% you need to use gen_server2:reply/2 to issue the reply
73%% manually. The init function can similarly return a 5th argument,
74%% Module, in order to dynamically decide the callback module on init.
75%%
76%% 8) The callback module can optionally implement
77%% format_message_queue/2 which is the equivalent of format_status/2
78%% but where the second argument is specifically the priority_queue
79%% which contains the prioritised message_queue.
80%%
81%% 9) The function with_state/2 can be used to debug a process with
82%% heavyweight state (without needing to copy the entire state out of
83%% process as sys:get_status/1 would). Pass through a function which
84%% can be invoked on the state, get back the result. The state is not
85%% modified.
86%%
87%% 10) an mcall/1 function has been added for performing multiple
88%% call/3 in parallel. Unlike multi_call, which sends the same request
89%% to same-named processes residing on a supplied list of nodes, it
90%% operates on name/request pairs, where name is anything accepted by
91%% call/3, i.e. a pid, global name, local name, or local name on a
92%% particular node.
93%%
94%% 11) Internal buffer length is emitted as a core [RabbitMQ] metric.
95
96%% All modifications are (C) 2009-2021 VMware, Inc. or its affiliates.
97
98%% ``The contents of this file are subject to the Erlang Public License,
99%% Version 1.1, (the "License"); you may not use this file except in
100%% compliance with the License. You should have received a copy of the
101%% Erlang Public License along with this software. If not, it can be
102%% retrieved via the world wide web at https://www.erlang.org/.
103%%
104%% Software distributed under the License is distributed on an "AS IS"
105%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
106%% the License for the specific language governing rights and limitations
107%% under the License.
108%%
109%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
110%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
111%% AB. All Rights Reserved.''
112%%
113%%     $Id$
114%%
115-module(gen_server2).
116
%% When compiling on OTP release 22 or later, switch off compiler
%% warnings about calls to deprecated functions for this module. The
%% outer ifdef guards against compilers that do not define the
%% OTP_RELEASE macro at all.
%% NOTE(review): which deprecated calls this is meant to cover is not
%% visible from this part of the file -- confirm before removing.
-ifdef(OTP_RELEASE).
-if(?OTP_RELEASE >= 22).
-compile(nowarn_deprecated_function).
-endif.
-endif.
122
123%%% ---------------------------------------------------
124%%%
125%%% The idea behind THIS server is that the user module
126%%% provides (different) functions to handle different
127%%% kind of inputs.
128%%% If the Parent process terminates the Module:terminate/2
129%%% function is called.
130%%%
131%%% The user module should export:
132%%%
133%%%   init(Args)
134%%%     ==> {ok, State}
135%%%         {ok, State, Timeout}
136%%%         {ok, State, Timeout, Backoff}
137%%%         {ok, State, Timeout, Backoff, Module}
138%%%         ignore
139%%%         {stop, Reason}
140%%%
141%%%   handle_call(Msg, {From, Tag}, State)
142%%%
143%%%    ==> {reply, Reply, State}
144%%%        {reply, Reply, State, Timeout}
145%%%        {noreply, State}
146%%%        {noreply, State, Timeout}
147%%%        {stop, Reason, Reply, State}
148%%%              Reason = normal | shutdown | Term terminate(State) is called
149%%%
150%%%   handle_cast(Msg, State)
151%%%
152%%%    ==> {noreply, State}
153%%%        {noreply, State, Timeout}
154%%%        {stop, Reason, State}
155%%%              Reason = normal | shutdown | Term terminate(State) is called
156%%%
157%%%   handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ...
158%%%
159%%%    ==> {noreply, State}
160%%%        {noreply, State, Timeout}
161%%%        {stop, Reason, State}
162%%%              Reason = normal | shutdown | Term, terminate(State) is called
163%%%
164%%%   terminate(Reason, State) Let the user module clean up
165%%%              Reason = normal | shutdown | {shutdown, Term} | Term
166%%%        always called when server terminates
167%%%
168%%%    ==> ok | Term
169%%%
170%%%   handle_pre_hibernate(State)
171%%%
172%%%    ==> {hibernate, State}
173%%%        {stop, Reason, State}
174%%%              Reason = normal | shutdown | Term, terminate(State) is called
175%%%
176%%%   handle_post_hibernate(State)
177%%%
178%%%    ==> {noreply, State}
179%%%        {stop, Reason, State}
180%%%              Reason = normal | shutdown | Term, terminate(State) is called
181%%%
182%%% The work flow (of the server) can be described as follows:
183%%%
184%%%   User module                          Generic
185%%%   -----------                          -------
186%%%     start            ----->             start
187%%%     init             <-----              .
188%%%
189%%%                                         loop
190%%%     handle_call      <-----              .
191%%%                      ----->             reply
192%%%
193%%%     handle_cast      <-----              .
194%%%
195%%%     handle_info      <-----              .
196%%%
197%%%     terminate        <-----              .
198%%%
199%%%                      ----->             reply
200%%%
201%%%
202%%% ---------------------------------------------------
203
204%% API
205-export([start/3, start/4,
206         start_link/3, start_link/4,
207         stop/1, stop/3,
208         call/2, call/3,
209         cast/2, reply/2,
210         abcast/2, abcast/3,
211         multi_call/2, multi_call/3, multi_call/4,
212         mcall/1,
213         with_state/2,
214         enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]).
215
216%% System exports
217-export([system_continue/3,
218         system_terminate/4,
219         system_code_change/4,
220         format_status/2]).
221
222%% Internal exports
223-export([init_it/6]).
224
225-import(error_logger, [format/2]).
226
227%% State record
%% Fields:
%%   parent         - the parent process (see get_parent/0 and init_it/6)
%%   name           - the server name with any local/global scope removed
%%   state          - the callback module's own state
%%   mod            - the current callback module
%%   time           - timeout in force: a timeout value, 'infinity' or
%%                    'hibernate'
%%   timeout_state  - 'undefined' when no backoff is configured,
%%                    otherwise the 5-tuple built by extend_backoff/1;
%%                    hibernate/1 wraps it as {SleptAt, BackoffState}
%%                    for the duration of a hibernation
%%   queue          - priority_queue acting as the internal message buffer
%%   debug          - debug options as returned by debug_options/2
%%   prioritisers   - presumably filled in by find_prioritisers/1
%%                    (not visible here; TODO confirm)
%%   timer          - presumably the stats timer managed by
%%                    ensure_stats_timer/1 and stop_stats_timer/1
%%                    (TODO confirm)
%%   emit_stats_fun,
%%   stop_stats_fun - the pair of funs returned by stats_funs/0
-record(gs2_state, {parent, name, state, mod, time,
                    timeout_state, queue, debug, prioritisers,
                    timer, emit_stats_fun, stop_stats_fun}).
231
232%%%=========================================================================
233%%%  Specs. These exist only to shut up dialyzer's warnings
234%%%=========================================================================
235
%% The internal loop state threaded through every function below.
-type gs2_state() :: #gs2_state{}.

%% These functions are declared as never returning to their caller.
-spec handle_common_termination(any(), atom(), gs2_state()) -> no_return().
-spec hibernate(gs2_state()) -> no_return().
-spec pre_hibernate(gs2_state()) -> no_return().
-spec system_terminate(_, _, _, gs2_state()) -> no_return().

%% A duration in milliseconds, as used by the {backoff, _, _, _} tuple.
-type millis() :: non_neg_integer().

%% Suppress dialyzer warnings for do_multi_call/4.
-dialyzer({nowarn_function, do_multi_call/4}).
246
247%%%=========================================================================
248%%%  API
249%%%=========================================================================
250
%% init/1: besides the plain {ok, State[, Timeout]} forms, a backoff
%% specification and, after it, an alternative callback module may be
%% returned (see modifications 6 and 7 in the file header).
-callback init(Args :: term()) ->
    {ok, State :: term()} |
    {ok, State :: term(), timeout() | hibernate} |
    {ok, State :: term(), timeout() | hibernate,
     {backoff, millis(), millis(), millis()}} |
    {ok, State :: term(), timeout() | hibernate,
     {backoff, millis(), millis(), millis()}, atom()} |
    ignore |
    {stop, Reason :: term()}.
%% handle_call/3: handles a synchronous request; From identifies the
%% caller and can be passed to reply/2.
-callback handle_call(Request :: term(), From :: {pid(), Tag :: term()},
                      State :: term()) ->
    {reply, Reply :: term(), NewState :: term()} |
    {reply, Reply :: term(), NewState :: term(), timeout() | hibernate} |
    {noreply, NewState :: term()} |
    {noreply, NewState :: term(), timeout() | hibernate} |
    {stop, Reason :: term(),
     Reply :: term(), NewState :: term()}.
%% handle_cast/2: handles an asynchronous request sent with cast/2 or
%% abcast/2,3.
-callback handle_cast(Request :: term(), State :: term()) ->
    {noreply, NewState :: term()} |
    {noreply, NewState :: term(), timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: term()}.
%% handle_info/2: handles any other message, including 'timeout'.
-callback handle_info(Info :: term(), State :: term()) ->
    {noreply, NewState :: term()} |
    {noreply, NewState :: term(), timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: term()}.
%% terminate/2: lets the callback module clean up when the server stops.
-callback terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
                    State :: term()) ->
    ok | term().
%% code_change/3: converts the state during a code upgrade or downgrade.
-callback code_change(OldVsn :: (term() | {down, term()}), State :: term(),
                      Extra :: term()) ->
    {ok, NewState :: term()} | {error, Reason :: term()}.
282
283%% It's not possible to define "optional" -callbacks, so putting specs
284%% for handle_pre_hibernate/1 and handle_post_hibernate/1 will result
285%% in warnings (the same applied for the behaviour_info before).
286
287%%%  -----------------------------------------------------------------
288%%% Starts a generic server.
289%%% start(Mod, Args, Options)
290%%% start(Name, Mod, Args, Options)
291%%% start_link(Mod, Args, Options)
292%%% start_link(Name, Mod, Args, Options) where:
293%%%    Name ::= {local, atom()} | {global, atom()}
294%%%    Mod  ::= atom(), callback module implementing the 'real' server
295%%%    Args ::= term(), init arguments (to Mod:init/1)
296%%%    Options ::= [{timeout, Timeout} | {debug, [Flag]}]
297%%%      Flag ::= trace | log | {logfile, File} | statistics | debug
298%%%          (debug == log && statistics)
299%%% Returns: {ok, Pid} |
300%%%          {error, {already_started, Pid}} |
301%%%          {error, Reason}
302%%% -----------------------------------------------------------------
%% start/3: starts a server with no name and no link to the caller
%% ('nolink'). All the work is delegated to gen:start/5, with this
%% module as the generic module.
start(Mod, Args, Options) ->
    gen:start(?MODULE, nolink, Mod, Args, Options).

%% start/4: as start/3, but additionally passes Name on to gen:start/6.
start(Name, Mod, Args, Options) ->
    gen:start(?MODULE, nolink, Name, Mod, Args, Options).
308
%% start_link/3: starts a server with no name, linked to the caller
%% ('link'). All the work is delegated to gen:start/5, with this
%% module as the generic module.
start_link(Mod, Args, Options) ->
    gen:start(?MODULE, link, Mod, Args, Options).

%% start_link/4: as start_link/3, but additionally passes Name on to
%% gen:start/6.
start_link(Name, Mod, Args, Options) ->
    gen:start(?MODULE, link, Name, Mod, Args, Options).
314
315%% -----------------------------------------------------------------
316%% Stop a generic server and wait for it to terminate.
317%% If the server is located at another node, that node will
318%% be monitored.
319%% -----------------------------------------------------------------
%% stop/1: delegates to gen:stop/1, i.e. uses whatever exit reason and
%% timeout gen applies by default.
stop(Name) ->
    gen:stop(Name).

%% stop/3: delegates to gen:stop/3 with an explicit Reason and Timeout.
stop(Name, Reason, Timeout) ->
    gen:stop(Name, Reason, Timeout).
325
326%% -----------------------------------------------------------------
327%% Make a call to a generic server.
328%% If the server is located at another node, that node will
329%% be monitored.
330%% If the client is trapping exits and is linked server termination
331%% is handled here (? Shall we do that here (or rely on timeouts) ?).
332%% -----------------------------------------------------------------
%% call/2: sends Request to the server as a '$gen_call' through
%% gen:call/3 (so gen's default timeout applies) and returns the
%% server's reply. If gen:call/3 exits, the exit is re-raised in the
%% caller with the failing call appended:
%%   exit({Reason, {gen_server2, call, [Name, Request]}}).
call(Name, Request) ->
    case catch gen:call(Name, '$gen_call', Request) of
        {ok,Res} ->
            Res;
        {'EXIT',Reason} ->
            exit({Reason, {?MODULE, call, [Name, Request]}})
    end.

%% call/3: as call/2, but with an explicit Timeout handed to
%% gen:call/4; the Timeout is also included in the re-raised exit term.
call(Name, Request, Timeout) ->
    case catch gen:call(Name, '$gen_call', Request, Timeout) of
        {ok,Res} ->
            Res;
        {'EXIT',Reason} ->
            exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
    end.
348
349%% -----------------------------------------------------------------
350%% Make a cast to a generic server.
351%% -----------------------------------------------------------------
%% Sends Request to the server as a '$gen_cast' message and returns ok
%% straight away. The destination may be {global, Name}, {Name, Node},
%% a locally registered name or a pid. Any failure while sending is
%% caught and ignored, so ok is returned whether or not the message
%% could be delivered.
cast({global, GlobalName}, Request) ->
    Msg = {'$gen_cast', Request},
    catch global:send(GlobalName, Msg),
    ok;
cast({RegName, Node} = Dest, Request) when is_atom(RegName), is_atom(Node) ->
    Msg = {'$gen_cast', Request},
    catch erlang:send(Dest, Msg),
    ok;
cast(Dest, Request) when is_atom(Dest); is_pid(Dest) ->
    Msg = {'$gen_cast', Request},
    catch erlang:send(Dest, Msg),
    ok.
361
362%% -----------------------------------------------------------------
363%% Send a reply to the client.
364%% -----------------------------------------------------------------
%% Delivers Reply to the client identified by the {To, Tag} pair that
%% was handed to handle_call/3. The message sent is {Tag, Reply}; it is
%% also the return value. A failing send is caught, in which case the
%% {'EXIT', _} term produced by catch is returned instead.
reply({Client, Tag}, Reply) ->
    Answer = {Tag, Reply},
    catch erlang:send(Client, Answer).
367
368%% -----------------------------------------------------------------
369%% Asynchronous broadcast, returns nothing, it's just send'n pray
370%% -----------------------------------------------------------------
%% abcast/2: casts Request to the process registered as Name on the
%% local node and on every node returned by nodes().
abcast(Name, Request) when is_atom(Name) ->
    do_abcast([node() | nodes()], Name, {'$gen_cast', Request}).

%% abcast/3: casts Request to the process registered as Name on each
%% of the given Nodes.
abcast(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) ->
    do_abcast(Nodes, Name, {'$gen_cast', Request}).
376
%% Walks the node list, sending Msg to {Name, Node} for each entry.
%% Individual send failures are caught and ignored. Returns the atom
%% 'abcast' once the list is exhausted.
do_abcast([], _Name, _Msg) ->
    abcast;
do_abcast([Node | Rest], Name, Msg) when is_atom(Node) ->
    catch erlang:send({Name, Node}, Msg),
    do_abcast(Rest, Name, Msg).
381
382%%% -----------------------------------------------------------------
383%%% Make a call to servers at several nodes.
384%%% Returns: {[Replies],[BadNodes]}
385%%% A Timeout can be given
386%%%
%%% A middleman process is used in case late answers arrive after
%%% the timeout. If they were allowed to clog the caller's message
%%% queue, it would probably become confused. Late answers will
%%% now arrive at the terminated middleman and so be discarded.
391%%% -----------------------------------------------------------------
%% multi_call/2: calls the server registered as Name on the local node
%% and on every node returned by nodes(), without a timeout.
multi_call(Name, Req)
  when is_atom(Name) ->
    do_multi_call([node() | nodes()], Name, Req, infinity).

%% multi_call/3: as multi_call/2, but on the given Nodes only.
multi_call(Nodes, Name, Req)
  when is_list(Nodes), is_atom(Name) ->
    do_multi_call(Nodes, Name, Req, infinity).

%% multi_call/4: as multi_call/3 with an explicit Timeout, which must
%% be 'infinity' or a non-negative integer. Note that the 'infinity'
%% clause performs no checks on Nodes and Name.
multi_call(Nodes, Name, Req, infinity) ->
    do_multi_call(Nodes, Name, Req, infinity);
multi_call(Nodes, Name, Req, Timeout)
  when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 ->
    do_multi_call(Nodes, Name, Req, Timeout).
405
406%%% -----------------------------------------------------------------
407%%% Make multiple calls to multiple servers, given pairs of servers
408%%% and messages.
409%%% Returns: {[{Dest, Reply}], [{Dest, Error}]}
410%%%
411%%% Dest can be pid() | RegName :: atom() |
412%%%             {Name :: atom(), Node :: atom()} | {global, Name :: atom()}
413%%%
414%%% A middleman process is used to avoid clogging up the callers
415%%% message queue.
416%%% -----------------------------------------------------------------
%% Performs all calls in CallSpecs (a list of {Dest, Request} pairs) in
%% parallel and returns {Replies, Errors}.
%%
%% The calls are issued from a monitored middleman process, which
%% keeps a dict from call reference to destination and exits with
%% {Tag, Result} once every call has been accounted for (see
%% collect_replies/4). The caller picks the result out of the
%% middleman's 'DOWN' message. If the middleman dies for any other
%% reason, the caller exits with that reason.
mcall(CallSpecs) ->
    Tag = make_ref(),
    Middleman =
        fun() ->
                IssueCall = fun({Dest, _Request} = Spec, Acc) ->
                                    dict:store(do_mcall(Spec), Dest, Acc)
                            end,
                Pending = lists:foldl(IssueCall, dict:new(), CallSpecs),
                collect_replies(Tag, Pending, [], [])
        end,
    {_Pid, MRef} = spawn_monitor(Middleman),
    receive
        {'DOWN', MRef, _, _, {Tag, Result}} ->
            Result;
        {'DOWN', MRef, _, _, Reason} ->
            exit(Reason)
    end.
431
%% Issues one call of an mcall/1 batch and returns the reference under
%% which the outcome will arrive in the calling (middleman) process:
%% either a {Ref, Reply} message from the server or a 'DOWN' message.
%% Errors raised by the send itself are caught and ignored; presumably
%% the monitor set up beforehand is relied upon to report an
%% unreachable destination (TODO confirm).
do_mcall({{global,Name}=Dest, Request}) ->
    %% whereis_name is simply an ets lookup, and is precisely what
    %% global:send/2 does, yet we need a Ref to put in the call to the
    %% server, so invoking whereis_name makes a lot more sense here.
    case global:whereis_name(Name) of
        Pid when is_pid(Pid) ->
            MRef = erlang:monitor(process, Pid),
            catch msend(Pid, MRef, Request),
            MRef;
        undefined ->
            %% No such global name: fake the 'DOWN' message a monitor
            %% would have produced, so the caller sees a noproc error.
            Ref = make_ref(),
            self() ! {'DOWN', Ref, process, Dest, noproc},
            Ref
    end;
do_mcall({{Name,Node}=Dest, Request}) when is_atom(Name), is_atom(Node) ->
    %% start_monitor/2 is defined elsewhere in this file.
    {_Node, MRef} = start_monitor(Node, Name), %% NB: we don't handle R6
    catch msend(Dest, MRef, Request),
    MRef;
do_mcall({Dest, Request}) when is_atom(Dest); is_pid(Dest) ->
    MRef = erlang:monitor(process, Dest),
    catch msend(Dest, MRef, Request),
    MRef.
454
%% Sends a '$gen_call' for Request to Dest, naming the calling process
%% and MRef as the reply address. The 'noconnect' option is passed to
%% erlang:send/3, whose result is returned.
msend(Dest, MRef, Request) ->
    From = {self(), MRef},
    Call = {'$gen_call', From, Request},
    erlang:send(Dest, Call, [noconnect]).
457
%% Receive loop of the mcall/1 middleman. Refs is a dict of the calls
%% still outstanding, keyed by reference. Each {MRef, Reply} message is
%% filed under Replies and each 'DOWN' message under Errors (with the
%% reason 'noconnection' reported as 'nodedown'). When nothing is
%% outstanding any more the process exits with
%% {Tag, {Replies, Errors}}, which is how the result gets back to the
%% caller of mcall/1.
collect_replies(Tag, Refs, Replies, Errors) ->
    case dict:is_empty(Refs) of
        true ->
            exit({Tag, {Replies, Errors}});
        false ->
            receive
                {MRef, Reply} ->
                    {Refs1, Replies1} =
                        handle_call_result(MRef, Reply, Refs, Replies),
                    collect_replies(Tag, Refs1, Replies1, Errors);
                {'DOWN', MRef, _, _, noconnection} ->
                    {Refs1, Errors1} =
                        handle_call_result(MRef, nodedown, Refs, Errors),
                    collect_replies(Tag, Refs1, Replies, Errors1);
                {'DOWN', MRef, _, _, Reason} ->
                    {Refs1, Errors1} =
                        handle_call_result(MRef, Reason, Refs, Errors),
                    collect_replies(Tag, Refs1, Replies, Errors1)
            end
    end.
476
%% Records the outcome of one outstanding call. If MRef is still in
%% Refs, it is removed and {Dest, Result} is prepended to AccList.
%% Monitors are never cancelled (that would mean scanning the mailbox
%% in erlang:demonitor/1,2), so a reference that has already been dealt
%% with can show up again; in that case both arguments come back
%% untouched.
handle_call_result(MRef, Result, Refs, AccList) ->
    case dict:find(MRef, Refs) of
        error ->
            {Refs, AccList};
        {ok, Dest} ->
            Remaining = dict:erase(MRef, Refs),
            {Remaining, [{Dest, Result} | AccList]}
    end.
484
485%% -----------------------------------------------------------------
486%% Apply a function to a generic server's state.
487%% -----------------------------------------------------------------
%% Sends Fun to the server as a '$with_state' request through
%% gen:call/4, with an infinite timeout, and returns the result the
%% server sends back. If gen:call/4 exits, the exit is re-raised in the
%% caller as
%%   exit({Reason, {gen_server2, with_state, [Name, Fun]}}).
with_state(Name, Fun) ->
    case catch gen:call(Name, '$with_state', Fun, infinity) of
        {ok,Res} ->
            Res;
        {'EXIT',Reason} ->
            exit({Reason, {?MODULE, with_state, [Name, Fun]}})
    end.
495
496%%-----------------------------------------------------------------
497%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_
498%%
499%% Description: Makes an existing process into a gen_server.
500%%              The calling process will enter the gen_server receive
501%%              loop and become a gen_server process.
502%%              The process *must* have been started using one of the
503%%              start functions in proc_lib, see proc_lib(3).
504%%              The user is responsible for any initialization of the
505%%              process, including registering a name for it.
506%%-----------------------------------------------------------------
%% enter_loop/3: no server name (self() is used), infinite timeout, no
%% backoff.
enter_loop(Mod, Options, State) ->
    enter_loop(Mod, Options, State, self(), infinity, undefined).

%% enter_loop/4: the meaning of the fourth argument is decided by its
%% shape, tried in this order: a {backoff, _, _, _} tuple is a backoff
%% specification, any other 2-tuple is a server name, and everything
%% else is a timeout.
enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) ->
    enter_loop(Mod, Options, State, self(), infinity, Backoff);

enter_loop(Mod, Options, State, ServerName = {_, _}) ->
    enter_loop(Mod, Options, State, ServerName, infinity, undefined);

enter_loop(Mod, Options, State, Timeout) ->
    enter_loop(Mod, Options, State, self(), Timeout, undefined).

%% enter_loop/5: the fifth argument is a backoff specification if it is
%% a {backoff, _, _, _} tuple and a timeout otherwise.
enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) ->
    enter_loop(Mod, Options, State, ServerName, infinity, Backoff);

enter_loop(Mod, Options, State, ServerName, Timeout) ->
    enter_loop(Mod, Options, State, ServerName, Timeout, undefined).
524
%% enter_loop/6: the full form that all shorter arities end up in.
%% Builds the initial loop state from the arguments (resolving the
%% server name and parent, debug options, an empty priority queue, the
%% extended backoff state and the stats funs), looks up the callback
%% module's prioritisers, initialises the stats and enters the main
%% loop. Does not return.
enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) ->
    Name = get_proc_name(ServerName),
    Parent = get_parent(),
    Debug = debug_options(Name, Options),
    Queue = priority_queue:new(),
    TimeoutState = extend_backoff(Backoff),
    {EmitStatsFun, StopStatsFun} = stats_funs(),
    Initial = #gs2_state { parent         = Parent,
                           name           = Name,
                           state          = State,
                           mod            = Mod,
                           time           = Timeout,
                           timeout_state  = TimeoutState,
                           queue          = Queue,
                           debug          = Debug,
                           emit_stats_fun = EmitStatsFun,
                           stop_stats_fun = StopStatsFun },
    Prioritised = find_prioritisers(Initial),
    loop(init_stats(Prioritised)).
538
539%%%========================================================================
540%%% Gen-callback functions
541%%%========================================================================
542
543%%% ---------------------------------------------------
544%%% Initiate the new process.
545%%% Register the name using the Rfunc function
546%%% Calls the Mod:init/Args function.
547%%% Finally an acknowledge is sent to Parent and the main
548%%% loop is entered.
549%%% ---------------------------------------------------
%% init_it/6 is the function handed to gen:start/5,6 (via ?MODULE) and
%% runs in the newly started server process.
%%
%%   Starter - the process waiting for the proc_lib:init_ack/2
%%   Parent  - the parent process, or the atom 'self' meaning this
%%             process itself
%%   Name0   - the server name as given to start/start_link
%%   Mod     - the callback module
%%   Args    - the argument for Mod:init/1
%%   Options - the start options (only the debug options are used here)
%%
%% Builds the initial loop state, calls Mod:init/1 and, depending on
%% its result, either acknowledges the start and enters the main loop
%% (never returning) or reports the failure to Starter and exits.
%%
%% On every failure path the registered name (if any) is released
%% before Starter is told about the failure. Otherwise Starter could
%% retry at once and get an 'already_started' error from the name that
%% is still registered to this dying process.
init_it(Starter, self, Name, Mod, Args, Options) ->
    init_it(Starter, self(), Name, Mod, Args, Options);
init_it(Starter, Parent, Name0, Mod, Args, Options) ->
    Name = name(Name0),
    Debug = debug_options(Name, Options),
    Queue = priority_queue:new(),
    {EmitStatsFun, StopStatsFun} = stats_funs(),
    GS2State = find_prioritisers(
                 #gs2_state { parent  = Parent,
                              name    = Name,
                              mod     = Mod,
                              queue   = Queue,
                              debug   = Debug,
                              emit_stats_fun = EmitStatsFun,
                              stop_stats_fun = StopStatsFun }),
    %% Old-style catch on purpose: a value thrown by init/1 is treated
    %% exactly like a returned value, and a crash shows up as
    %% {'EXIT', Reason} below.
    case catch Mod:init(Args) of
        {ok, State} ->
            proc_lib:init_ack(Starter, {ok, self()}),
            loop(init_stats(GS2State#gs2_state { state         = State,
                                                 time          = infinity,
                                                 timeout_state = undefined }));
        {ok, State, Timeout} ->
            proc_lib:init_ack(Starter, {ok, self()}),
            loop(init_stats(GS2State#gs2_state { state         = State,
                                                 time          = Timeout,
                                                 timeout_state = undefined }));
        {ok, State, Timeout, Backoff = {backoff, _, _, _}} ->
            Backoff1 = extend_backoff(Backoff),
            proc_lib:init_ack(Starter, {ok, self()}),
            loop(init_stats(GS2State#gs2_state { state         = State,
                                                 time          = Timeout,
                                                 timeout_state = Backoff1 }));
        {ok, State, Timeout, Backoff = {backoff, _, _, _}, Mod1} ->
            Backoff1 = extend_backoff(Backoff),
            proc_lib:init_ack(Starter, {ok, self()}),
            %% The callback module has changed, so the prioritisers
            %% have to be looked up again for Mod1.
            loop(init_stats(find_prioritisers(
                              GS2State#gs2_state { mod           = Mod1,
                                                   state         = State,
                                                   time          = Timeout,
                                                   timeout_state = Backoff1 })));
        {stop, Reason} ->
            %% For consistency, we must make sure that the
            %% registered name (if any) is unregistered before
            %% the parent process is notified about the failure.
            %% (Otherwise, the parent process could get
            %% an 'already_started' error if it immediately
            %% tried starting the process again.)
            unregister_name(Name0),
            proc_lib:init_ack(Starter, {error, Reason}),
            exit(Reason);
        ignore ->
            unregister_name(Name0),
            proc_lib:init_ack(Starter, ignore),
            exit(normal);
        {'EXIT', Reason} ->
            unregister_name(Name0),
            proc_lib:init_ack(Starter, {error, Reason}),
            exit(Reason);
        Else ->
            %% A malformed return value is a start failure like any
            %% other, so the name must be released before the ack here
            %% too.
            Error = {bad_return_value, Else},
            unregister_name(Name0),
            proc_lib:init_ack(Starter, {error, Error}),
            exit(Error)
    end.
614
%% Strips the registration scope from a server name: {local, Name} and
%% {global, Name} both yield Name. Any other term is returned
%% unchanged.
name({Scope, Name}) when Scope =:= local; Scope =:= global ->
    Name;
name(Other) ->
    Other.
620
%% Releases the name a server was started under, so that it can be
%% registered again at once.
%%   {local, Name}  - unregistered locally; a failure (for instance the
%%                    name not being registered) is caught, not raised
%%   {global, Name} - unregistered through global
%%   a pid          - nothing to release; the pid is returned
%%   anything else  - ignored, returns ok
unregister_name({local, Name}) ->
    catch erlang:unregister(Name);
unregister_name({global, Name}) ->
    global:unregister_name(Name);
unregister_name(Pid) when is_pid(Pid) ->
    Pid;
unregister_name(_Other) ->
    ok.
630
%% Turns the user-facing {backoff, Initial, Minimum, Desired} tuple
%% into the internal 5-tuple by appending the state returned by
%% rand:seed(exsplus). 'undefined' (no backoff configured) is passed
%% through.
extend_backoff(undefined) ->
    undefined;
extend_backoff({backoff, Initial, Minimum, Desired}) ->
    RandState = rand:seed(exsplus),
    {backoff, Initial, Minimum, Desired, RandState}.
636
637%%%========================================================================
638%%% Internal functions
639%%%========================================================================
640%%% ---------------------------------------------------
641%%% The MAIN loop.
642%%% ---------------------------------------------------
%% First clause: the callback asked to hibernate and no backoff is
%% configured. If the internal buffer is empty the process goes
%% straight to pre_hibernate/1; otherwise it carries on processing the
%% buffered messages. Note that the mailbox is not drained in this
%% clause.
loop(GS2State = #gs2_state { time          = hibernate,
                             timeout_state = undefined,
                             queue         = Queue }) ->
    case priority_queue:is_empty(Queue) of
        true  ->
            pre_hibernate(GS2State);
        false ->
            process_next_msg(GS2State)
    end;

%% In every other case, move whatever is in the mailbox into the
%% internal buffer first, then process the next message.
loop(GS2State) ->
    process_next_msg(drain(GS2State)).
655
%% Moves every message currently in the process mailbox into the
%% internal buffer by way of in/2, without ever blocking ('after 0'),
%% and returns the updated state.
drain(GS2State) ->
    receive
        Input -> drain(in(Input, GS2State))
    after 0 -> GS2State
    end.
661
%% Takes the next message out of the internal buffer and processes it.
%% If the buffer is empty, blocks until either a message arrives or
%% the timeout in force expires.
%%
%% Timeout used while waiting (Time1):
%%   - time = hibernate with backoff configured: the backoff's current
%%     timeout; when it expires the process proceeds to hibernate
%%     (HibOnTimeout = true) and the stats timer is stopped first
%%   - time = hibernate without backoff: infinity
%%   - anything else: Time itself; on expiry the atom 'timeout' is
%%     processed like a message
process_next_msg(GS2State0 = #gs2_state { time          = Time,
                                         timeout_state = TimeoutState,
                                         queue         = Queue }) ->
    case priority_queue:out(Queue) of
        {{value, Msg}, Queue1} ->
            GS2State = ensure_stats_timer(GS2State0),
            process_msg(Msg, GS2State#gs2_state { queue = Queue1 });
        {empty, Queue1} ->
            {Time1, HibOnTimeout, GS2State}
                = case {Time, TimeoutState} of
                      {hibernate, {backoff, Current, _Min, _Desired, _RSt}} ->
                          {Current, true, stop_stats_timer(GS2State0)};
                      {hibernate, _} ->
                          %% After waking from hibernation (see
                          %% wake_hib/1) Time can still be hibernate.
                          %% If we were woken and didn't receive a msg
                          %% then we will get here and need a sensible
                          %% value for Time1, otherwise we crash.
                          %% R13B1 always waits infinitely when waking
                          %% from hibernation, so that's what we do
                          %% here too.
                          {infinity, false, GS2State0};
                      _ -> {Time, false, GS2State0}
                  end,
            receive
                Input ->
                    %% Time could be 'hibernate' here, so *don't* call loop
                    process_next_msg(
                      drain(in(Input, GS2State #gs2_state { queue = Queue1 })))
            after Time1 ->
                    case HibOnTimeout of
                        true ->
                            pre_hibernate(
                              GS2State #gs2_state { queue = Queue1 });
                        false ->
                            process_msg(timeout,
                                        GS2State #gs2_state { queue = Queue1 })
                    end
            end
    end.
701
%% Continuation that hibernate/1 hands to proc_lib:hibernate/3; it is
%% exported for that purpose only.
%%
%% If backoff is in use, timeout_state holds {SleptAt, BackoffState} as
%% stored by hibernate/1; the backoff state is recalculated from the
%% time the process went to sleep and the current monotonic time (see
%% adjust_timeout_state/3). The mailbox is then drained into the
%% internal buffer and control passes to post_hibernate/1.
wake_hib(GS2State = #gs2_state { timeout_state = TS }) ->
    TimeoutState1 = case TS of
                        undefined ->
                            undefined;
                        {SleptAt, TimeoutState} ->
                            adjust_timeout_state(SleptAt,
                                                 erlang:monotonic_time(),
                                                 TimeoutState)
                    end,
    post_hibernate(
      drain(GS2State #gs2_state { timeout_state = TimeoutState1 })).
713
%% Puts the process into hibernation, arranging for it to resume in
%% wake_hib/1. When a backoff timeout state is in use it is stored
%% together with the current monotonic time so that wake_hib/1 can work
%% out how long the process slept.
hibernate(GS2State = #gs2_state { timeout_state = Backoff }) ->
    Stashed =
        case Backoff of
            {backoff, _, _, _, _} -> {erlang:monotonic_time(), Backoff};
            undefined             -> undefined
        end,
    WakeArgs = [GS2State #gs2_state { timeout_state = Stashed }],
    proc_lib:hibernate(?MODULE, wake_hib, WakeArgs).
722
%% Runs right before hibernation: stops the stats timer, passes the
%% result through the emit-stats fun, and then calls the optional
%% Mod:handle_pre_hibernate/1 callback. A {hibernate, NewState} return
%% hibernates with the new callback state; any other return is handed
%% to handle_common_termination/3. Without the callback the process
%% simply hibernates.
pre_hibernate(GS2State0 = #gs2_state { state          = CbState,
                                       mod            = Mod,
                                       emit_stats_fun = EmitStatsFun }) ->
    TimerStopped = stop_stats_timer(GS2State0),
    GS2State = EmitStatsFun(TimerStopped),
    case erlang:function_exported(Mod, handle_pre_hibernate, 1) of
        false ->
            hibernate(GS2State);
        true ->
            case catch Mod:handle_pre_hibernate(CbState) of
                {hibernate, CbState1} ->
                    hibernate(GS2State #gs2_state { state = CbState1 });
                Unexpected ->
                    handle_common_termination(Unexpected, pre_hibernate,
                                              GS2State)
            end
    end.
738
%% Runs right after waking from hibernation: makes sure the stats timer
%% is running and then calls the optional Mod:handle_post_hibernate/1
%% callback. {noreply, NewState} and {noreply, NewState, Time} continue
%% with the next message; any other return is handed to
%% handle_common_termination/3.
post_hibernate(GS2State0 = #gs2_state { state = CbState,
                                        mod   = Mod }) ->
    GS2State = ensure_stats_timer(GS2State0),
    case erlang:function_exported(Mod, handle_post_hibernate, 1) of
        false ->
            %% Keep Time as 'hibernate' rather than 'infinity' here;
            %% this matches R13B. The point is that process_msg must be
            %% able to call sys:handle_system_msg with Time still set
            %% to hibernate, iff that msg is the very msg that woke us
            %% up (or the first msg we receive after waking up).
            process_next_msg(GS2State #gs2_state { time = hibernate });
        true ->
            case catch Mod:handle_post_hibernate(CbState) of
                {noreply, CbState1} ->
                    process_next_msg(
                      GS2State #gs2_state { state = CbState1,
                                            time  = infinity });
                {noreply, CbState1, Timeout} ->
                    process_next_msg(
                      GS2State #gs2_state { state = CbState1,
                                            time  = Timeout });
                Unexpected ->
                    handle_common_termination(Unexpected, post_hibernate,
                                              GS2State)
            end
    end.
763
%% Recomputes the backoff state after a hibernation. SleptAt and AwokeAt
%% are native-unit timestamps; the timeouts inside the backoff tuple are
%% multiplied by 1000 for comparison with the nap length in microseconds.
%% The gap between messages is taken to be the nap plus the timeout that
%% preceded it. If that gap exceeds MinimumTO + DesiredHibPeriod the
%% timeout base is halved (but never below MinimumTO); otherwise the
%% base stays at CurrentTO. A random extra in 1..Base is added on top.
adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO,
                                        DesiredHibPeriod, RandomState}) ->
    NapMicros = erlang:convert_time_unit(AwokeAt - SleptAt,
                                         native, micro_seconds),
    GapMicros = NapMicros + CurrentTO * 1000,
    ThresholdMicros = MinimumTO * 1000 + DesiredHibPeriod * 1000,
    Base = if GapMicros > ThresholdMicros ->
                   %% Messages are far apart: consider sleeping sooner.
                   max(MinimumTO, CurrentTO div 2);
              true ->
                   %% Messages are close together: stay awake longer.
                   CurrentTO
           end,
    {Jitter, RandomState1} = rand:uniform_s(Base, RandomState),
    {backoff, Base + Jitter, MinimumTO, DesiredHibPeriod, RandomState1}.
782
%% Adds an incoming message to the internal priority queue.
%% Casts, calls and other (info) messages get their priority from the
%% matching prioritiser fun; '$with_state' requests use priority 0;
%% system messages and the parent's 'EXIT' use priority infinity.
%% emit_gen_server2_stats is never queued: it triggers the emit-stats
%% fun and re-arms the stats timer.
in({'$gen_cast', Cast} = Input,
   GS2State = #gs2_state { prioritisers = {_, PrioritiseCast, _} }) ->
    Priority = PrioritiseCast(Cast, GS2State),
    in(Input, Priority, GS2State);
in({'$gen_call', From, Request} = Input,
   GS2State = #gs2_state { prioritisers = {PrioritiseCall, _, _} }) ->
    Priority = PrioritiseCall(Request, From, GS2State),
    in(Input, Priority, GS2State);
in({'$with_state', _From, _Fun} = Input, GS2State) ->
    in(Input, 0, GS2State);
in({'EXIT', Parent, _Reason} = Input,
   GS2State = #gs2_state { parent = Parent }) ->
    in(Input, infinity, GS2State);
in({system, _From, _Req} = Input, GS2State) ->
    in(Input, infinity, GS2State);
in(emit_gen_server2_stats,
   GS2State = #gs2_state{ emit_stats_fun = EmitStatsFun}) ->
    Emitted = EmitStatsFun(GS2State),
    next_stats_timer(Emitted);
in(Input, GS2State = #gs2_state { prioritisers = {_, _, PrioritiseInfo} }) ->
    Priority = PrioritiseInfo(Input, GS2State),
    in(Input, Priority, GS2State).
799
%% Queues Input at the given Priority. A priority of 'drop' discards
%% the message and returns the state untouched.
in(Input, Priority, GS2State = #gs2_state { queue = Queue })
  when Priority =/= drop ->
    Queue1 = priority_queue:in(Input, Priority, Queue),
    GS2State #gs2_state { queue = Queue1 };
in(_Input, drop, GS2State) ->
    GS2State.
805
%% Handles one dequeued message. System messages, '$with_state'
%% requests and the parent's 'EXIT' are dealt with here; everything else
%% goes to handle_msg/2, after being reported to sys debugging when
%% debug options are active.
process_msg({system, From, {replace_state, StateFun}},
            GS2State = #gs2_state { parent = Parent, debug = Debug }) ->
    %% This clause will match only in R16B03.
    %% Since 17.0 replace_state is not a system message.
    GS2State1 = StateFun(GS2State),
    _ = gen:reply(From, GS2State1),
    system_continue(Parent, Debug, GS2State1);
process_msg({system, From, Req},
            GS2State = #gs2_state { parent = Parent, debug = Debug }) ->
    %% gen_server puts Hib on the end as the 7th arg, but that version
    %% of the fun seems not to be documented so leaving out for now.
    sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, GS2State);
process_msg({'$with_state', From, Fun},
            GS2State = #gs2_state{state = State}) ->
    Result = (catch Fun(State)),
    reply(From, Result),
    loop(GS2State);
process_msg({'EXIT', Parent, Reason} = Msg,
            GS2State = #gs2_state { parent = Parent }) ->
    terminate(Reason, Msg, GS2State);
process_msg(Msg, GS2State = #gs2_state { name = Name, debug = Debug }) ->
    case Debug of
        [] ->
            handle_msg(Msg, GS2State);
        _ ->
            Debug1 = sys:handle_debug(Debug, fun print_event/3, Name,
                                      {in, Msg}),
            handle_msg(Msg, GS2State #gs2_state { debug = Debug1 })
    end.
832
833%%% ---------------------------------------------------
%%% Send/receive functions
835%%% ---------------------------------------------------
836
%% Sends Req as a call to the process registered as Name on each node in
%% Nodes and collects the answers; the result is whatever rec_nodes/4
%% returns, i.e. {Replies, Badnodes}.
%%
%% With Timeout =:= infinity everything happens in the calling process
%% and no timer is used (TimerId is 'undefined').
%%
%% With a finite Timeout the work is done by a spawned middleman
%% process, so that replies arriving late never reach the caller's
%% mailbox. The middleman hands its result back through its exit reason,
%% which the caller observes via a monitor.
do_multi_call(Nodes, Name, Req, infinity) ->
    Tag = make_ref(),
    Monitors = send_nodes(Nodes, Name, Tag, Req),
    rec_nodes(Tag, Monitors, Name, undefined);
do_multi_call(Nodes, Name, Req, Timeout) ->
    Tag = make_ref(),
    Caller = self(),
    Receiver =
        spawn(
          fun () ->
                  %% Middleman process. Should be insensitive to regular
                  %% exit signals. The synchronization is needed in case
                  %% the receiver would exit before the caller started
                  %% the monitor.
                  process_flag(trap_exit, true),
                  Mref = erlang:monitor(process, Caller),
                  receive
                      {Caller,Tag} ->
                          %% Go-ahead from the caller: do the multi call
                          %% and deliver the result as the exit reason.
                          Monitors = send_nodes(Nodes, Name, Tag, Req),
                          TimerId = erlang:start_timer(Timeout, self(), ok),
                          Result = rec_nodes(Tag, Monitors, Name, TimerId),
                          exit({self(),Tag,Result});
                      {'DOWN',Mref,_,_,_} ->
                          %% Caller died before sending us the go-ahead.
                          %% Give up silently.
                          exit(normal)
                  end
          end),
    %% Monitor first, then send the go-ahead, so the middleman cannot
    %% exit before the monitor is in place.
    Mref = erlang:monitor(process, Receiver),
    Receiver ! {self(),Tag},
    receive
        {'DOWN',Mref,_,_,{Receiver,Tag,Result}} ->
            Result;
        {'DOWN',Mref,_,_,Reason} ->
            %% The middleman code failed. Or someone did
            %% exit(_, kill) on the middleman process => Reason==killed
            exit(Reason)
    end.
875
%% Starts a monitor for Name on every node in Nodes and sends each of
%% them the call request. Returns the accumulated monitors, one per
%% atom node (most recently contacted node first); entries in Nodes that
%% are not atoms are skipped.
send_nodes(Nodes, Name, Tag, Req) ->
    send_nodes(Nodes, Name, Tag, Req, []).

send_nodes([], _Name, _Tag, _Req, Monitors) ->
    Monitors;
send_nodes([Node | Rest], Name, Tag, Req, Monitors) when is_atom(Node) ->
    Monitor = start_monitor(Node, Name),
    Request = {'$gen_call', {self(), {Tag, Node}}, Req},
    %% A failing send (e.g. non-existing name) is ignored here and
    %% handled in rec_nodes.
    catch {Name, Node} ! Request,
    send_nodes(Rest, Name, Tag, Req, [Monitor | Monitors]);
send_nodes([_NotANode | Rest], Name, Tag, Req, Monitors) ->
    send_nodes(Rest, Name, Tag, Req, Monitors).
890
891%% Against old nodes:
892%% If no reply has been delivered within 2 secs. (per node) check that
893%% the server really exists and wait for ever for the answer.
894%%
895%% Against contemporary nodes:
896%% Wait for reply, server 'DOWN', or timeout from TimerId.
897
%% Collects one answer per entry in Nodes (the monitor list built by
%% send_nodes/4) and returns {Replies, Badnodes}, where Replies is a
%% list of {Node, Reply} and Badnodes lists the nodes that produced no
%% reply. TimerId is the overall timeout timer, or 'undefined' when no
%% timer is used.
rec_nodes(Tag, Nodes, Name, TimerId) ->
    rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId).

%% Entry of the form {Node, MonitorRef}: wait for the reply, the
%% monitor's 'DOWN' message, or the overall timeout.
rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) ->
    receive
        {'DOWN', R, _, _, _} ->
            rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId);
        {{Tag, N}, Reply} ->  %% Tag is bound !!!
            unmonitor(R),
            rec_nodes(Tag, Tail, Name, Badnodes,
                      [{N,Reply}|Replies], Time, TimerId);
        {timeout, TimerId, _} ->
            unmonitor(R),
            %% Collect all replies that already have arrived
            rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
    end;
%% Entry that is a bare node name: the node is watched with
%% monitor_node/2 instead of a process monitor (see start_monitor/2).
rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) ->
    %% R6 node
    receive
        {nodedown, N} ->
            monitor_node(N, false),
            rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId);
        {{Tag, N}, Reply} ->  %% Tag is bound !!!
            %% Flush a possibly pending nodedown before turning the
            %% node monitor off.
            receive {nodedown, N} -> ok after 0 -> ok end,
            monitor_node(N, false),
            rec_nodes(Tag, Tail, Name, Badnodes,
                      [{N,Reply}|Replies], 2000, TimerId);
        {timeout, TimerId, _} ->
            receive {nodedown, N} -> ok after 0 -> ok end,
            monitor_node(N, false),
            %% Collect all replies that already have arrived
            rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
    after Time ->
            %% Nothing heard within Time ms: check whether the server is
            %% registered on the node at all.
            case rpc:call(N, erlang, whereis, [Name]) of
                Pid when is_pid(Pid) -> % It exists try again.
                    rec_nodes(Tag, [N|Tail], Name, Badnodes,
                              Replies, infinity, TimerId);
                _ -> % badnode
                    receive {nodedown, N} -> ok after 0 -> ok end,
                    monitor_node(N, false),
                    rec_nodes(Tag, Tail, Name, [N|Badnodes],
                              Replies, 2000, TimerId)
            end
    end;
%% All entries handled: cancel the timer and, if it had already fired,
%% remove its timeout message from the mailbox.
rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) ->
    case catch erlang:cancel_timer(TimerId) of
        false ->  % It has already sent its message
            receive
                {timeout, TimerId, _} -> ok
            after 0 ->
                    ok
            end;
        _ -> % Timer was cancelled, or TimerId was 'undefined'
            ok
    end,
    {Replies, Badnodes}.
954
%% Collect all replies that already have arrived.
%% Used once the overall timeout has fired: every receive uses
%% 'after 0', so a remaining node either has its reply (or down
%% notification) in the mailbox already or is counted as a bad node.
%% Returns {Replies, Badnodes}.
rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) ->
    receive
        {'DOWN', R, _, _, _} ->
            rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
        {{Tag, N}, Reply} -> %% Tag is bound !!!
            unmonitor(R),
            rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
    after 0 ->
            unmonitor(R),
            rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
    end;
rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) ->
    %% R6 node
    receive
        {nodedown, N} ->
            monitor_node(N, false),
            rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
        {{Tag, N}, Reply} ->  %% Tag is bound !!!
            %% Flush a possibly pending nodedown before turning the
            %% node monitor off.
            receive {nodedown, N} -> ok after 0 -> ok end,
            monitor_node(N, false),
            rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
    after 0 ->
            receive {nodedown, N} -> ok after 0 -> ok end,
            monitor_node(N, false),
            rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
    end;
rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) ->
    {Replies, Badnodes}.
984
985
986%%% ---------------------------------------------------
987%%% Monitor functions
988%%% ---------------------------------------------------
989
%% Starts monitoring the process registered as Name on Node.
%% Returns {Node, Ref} when a process monitor reference is available,
%% or the bare Node when erlang:monitor/2 fails and monitor_node/2 is
%% used instead. When the local node is not distributed and Node is a
%% different node, no monitor is set up at all: a 'DOWN' message with
%% reason noconnection is sent to self() for a fresh reference.
start_monitor(Node, Name) when is_atom(Node), is_atom(Name) ->
    Unreachable = (node() =:= nonode@nohost)
        andalso (Node =/= nonode@nohost),
    case Unreachable of
        true ->
            FakeRef = make_ref(),
            self() ! {'DOWN', FakeRef, process, {Name, Node}, noconnection},
            {Node, FakeRef};
        false ->
            case catch erlang:monitor(process, {Name, Node}) of
                MonitorRef when is_reference(MonitorRef) ->
                    {Node, MonitorRef};
                {'EXIT', _} ->
                    %% Remote node is R6
                    monitor_node(Node, true),
                    Node
            end
    end.
1005
%% Cancels a monitor started with Ref=erlang:monitor(_, _) and removes
%% any 'DOWN' message for Ref that has already been delivered to the
%% caller's mailbox. Always returns true.
%%
%% The 'flush' option makes demonitor/2 do the mailbox clean-up itself,
%% replacing the hand-written demonitor + 'receive ... after 0' pair.
unmonitor(Ref) when is_reference(Ref) ->
    erlang:demonitor(Ref, [flush]).
1015
1016%%% ---------------------------------------------------
1017%%% Message handling functions
1018%%% ---------------------------------------------------
1019
%% Routes a non-call message to the callback module: casts go to
%% Mod:handle_cast/2 (with the '$gen_cast' wrapper removed), everything
%% else goes to Mod:handle_info/2.
dispatch(Msg, Mod, State) ->
    case Msg of
        {'$gen_cast', Cast} -> Mod:handle_cast(Cast, State);
        Info                -> Mod:handle_info(Info, State)
    end.
1024
%% Sends Reply to the caller and returns the updated debug structure.
%% With debugging off (empty list) nothing is logged and [] is
%% returned; otherwise an 'out' event is reported via sys:handle_debug/4.
common_reply(_Name, From, Reply, _NState, []) ->
    reply(From, Reply),
    [];
common_reply(Name, {To, _Tag} = From, Reply, NState, Debug) ->
    reply(From, Reply),
    Event = {out, Reply, To, NState},
    sys:handle_debug(Debug, fun print_event/3, Name, Event).
1031
%% Reports a 'noreply' event to sys debugging and returns the updated
%% debug structure; a no-op returning [] when debugging is off.
common_noreply(_Name, _NState, []) ->
    [];
common_noreply(Name, NState, Debug) ->
    Event = {noreply, NState},
    sys:handle_debug(Debug, fun print_event/3, Name, Event).
1036
%% Reports a 'become' event (switch of callback module) to sys
%% debugging and returns the updated debug structure; a no-op returning
%% [] when debugging is off.
common_become(_Name, _Mod, _NState, []) ->
    [];
common_become(Name, Mod, NState, Debug) ->
    Event = {become, Mod, NState},
    sys:handle_debug(Debug, fun print_event/3, Name, Event).
1041
%% Invokes the callback module for one message and acts on the result.
%% Calls go to Mod:handle_call/3; the reply-carrying results are handled
%% here and the rest is passed to handle_common_reply/3. All other
%% messages go through dispatch/3.
handle_msg({'$gen_call', From, Request},
           GS2State = #gs2_state { mod   = Mod,
                                   state = CbState,
                                   name  = Name,
                                   debug = Debug }) ->
    case catch Mod:handle_call(Request, From, CbState) of
        {reply, Reply, CbState1} ->
            NewDebug = common_reply(Name, From, Reply, CbState1, Debug),
            Next = GS2State #gs2_state { state = CbState1,
                                         time  = infinity,
                                         debug = NewDebug },
            loop(Next);
        {reply, Reply, CbState1, Timeout} ->
            NewDebug = common_reply(Name, From, Reply, CbState1, Debug),
            Next = GS2State #gs2_state { state = CbState1,
                                         time  = Timeout,
                                         debug = NewDebug },
            loop(Next);
        {stop, Reason, Reply, CbState1} ->
            %% Run termination first, catching its exit, so that the
            %% reply is only sent once the terminate callback has run;
            %% then exit with the reason terminate/3 produced.
            {'EXIT', ExitReason} =
                (catch terminate(Reason, Request,
                                 GS2State #gs2_state { state = CbState1 })),
            _ = common_reply(Name, From, Reply, CbState1, Debug),
            exit(ExitReason);
        Other ->
            handle_common_reply(Other, Request, GS2State)
    end;
handle_msg(Msg, GS2State = #gs2_state { mod = Mod, state = CbState }) ->
    Result = (catch dispatch(Msg, Mod, CbState)),
    handle_common_reply(Result, Msg, GS2State).
1069
%% Acts on the callback results shared by calls, casts and infos:
%% {noreply, ...} continues the loop, {become, ...} switches callback
%% module (re-resolving the prioritisers) and continues, and anything
%% else is passed to handle_common_termination/3. When no timeout is
%% given the loop continues with 'infinity'.
handle_common_reply(Reply, Msg, GS2State = #gs2_state { name  = Name,
                                                        debug = Debug}) ->
    case Reply of
        {noreply, CbState1} ->
            NewDebug = common_noreply(Name, CbState1, Debug),
            Next = GS2State #gs2_state { state = CbState1,
                                         time  = infinity,
                                         debug = NewDebug },
            loop(Next);
        {noreply, CbState1, Timeout} ->
            NewDebug = common_noreply(Name, CbState1, Debug),
            Next = GS2State #gs2_state { state = CbState1,
                                         time  = Timeout,
                                         debug = NewDebug },
            loop(Next);
        {become, NewMod, CbState1} ->
            NewDebug = common_become(Name, NewMod, CbState1, Debug),
            Next = GS2State #gs2_state { mod   = NewMod,
                                         state = CbState1,
                                         time  = infinity,
                                         debug = NewDebug },
            loop(find_prioritisers(Next));
        {become, NewMod, CbState1, Timeout} ->
            NewDebug = common_become(Name, NewMod, CbState1, Debug),
            Next = GS2State #gs2_state { mod   = NewMod,
                                         state = CbState1,
                                         time  = Timeout,
                                         debug = NewDebug },
            loop(find_prioritisers(Next));
        _ ->
            handle_common_termination(Reply, Msg, GS2State)
    end.
1100
%% Terminates the server for any callback result that does not continue
%% the loop: {stop, Reason, NewState} stops with Reason and the new
%% state, {'EXIT', What} stops with What, and anything else stops with
%% {bad_return_value, Reply}.
handle_common_termination(Reply, Msg, GS2State) ->
    {Reason, FinalState} =
        case Reply of
            {stop, StopReason, NState} ->
                {StopReason, GS2State #gs2_state { state = NState }};
            {'EXIT', What} ->
                {What, GS2State};
            _ ->
                {{bad_return_value, Reply}, GS2State}
        end,
    terminate(Reason, Msg, FinalState).
1110
1111%%-----------------------------------------------------------------
1112%% Callback functions for system messages handling.
1113%%-----------------------------------------------------------------
%% sys callback: resumes the main loop after a system message, using
%% the Parent and Debug values handed back by sys.
system_continue(Parent, Debug, GS2State) ->
    Resumed = GS2State #gs2_state { parent = Parent, debug = Debug },
    loop(Resumed).
1116
%% sys callback: terminates the server with Reason; the last message is
%% reported as [].
system_terminate(Reason, _Parent, Debug, GS2State) ->
    Stopping = GS2State #gs2_state { debug = Debug },
    terminate(Reason, [], Stopping).
1119
%% sys callback: runs Mod:code_change/3 on the callback state. On
%% {ok, NewState} the prioritisers are resolved again and the new
%% server state is returned wrapped in a one-element list; any other
%% result (including a caught failure) is returned unchanged.
system_code_change(GS2State = #gs2_state { mod   = Mod,
                                           state = CbState },
                   _Module, OldVsn, Extra) ->
    Outcome = (catch Mod:code_change(OldVsn, CbState, Extra)),
    case Outcome of
        {ok, CbState1} ->
            Upgraded = GS2State #gs2_state { state = CbState1 },
            {ok, [find_prioritisers(Upgraded)]};
        _ ->
            Outcome
    end.
1131
1132%%-----------------------------------------------------------------
1133%% Format debug messages.  Print them as the call-back module sees
1134%% them, not as the real erlang messages.  Use trace for that.
1135%%-----------------------------------------------------------------
%% Formats one sys debug event onto the I/O device Dev. Incoming calls
%% and casts are printed without their '$gen_call' / '$gen_cast'
%% wrappers; events with no dedicated clause are printed as-is with a
%% "dbg" prefix.
print_event(Dev, {in, {'$gen_call', {From, _Tag}, Call}}, Name) ->
    io:format(Dev, "*DBG* ~p got call ~p from ~w~n", [Name, Call, From]);
print_event(Dev, {in, {'$gen_cast', Cast}}, Name) ->
    io:format(Dev, "*DBG* ~p got cast ~p~n", [Name, Cast]);
print_event(Dev, {in, Other}, Name) ->
    io:format(Dev, "*DBG* ~p got ~p~n", [Name, Other]);
print_event(Dev, {out, Reply, To, NState}, Name) ->
    io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n",
              [Name, Reply, To, NState]);
print_event(Dev, {noreply, NState}, Name) ->
    io:format(Dev, "*DBG* ~p new state ~w~n", [Name, NState]);
print_event(Dev, Event, Name) ->
    io:format(Dev, "*DBG* ~p dbg  ~p~n", [Name, Event]).
1154
1155
1156%%% ---------------------------------------------------
1157%%% Terminate the server.
1158%%% ---------------------------------------------------
1159
%% Stops the stats timer, runs the stop-stats fun and the callback
%% module's terminate/2, and then exits the process. If the terminate
%% callback itself fails, that failure is reported and becomes the exit
%% reason. Otherwise the process exits with Reason; an error report is
%% written first unless Reason is normal, shutdown or {shutdown, _}.
-spec terminate(_, _, _) -> no_return().

terminate(Reason, Msg, GS2State = #gs2_state { name           = Name,
                                               mod            = Mod,
                                               state          = CbState,
                                               debug          = Debug,
                                               stop_stats_fun = StopStatsFun }) ->
    StopStatsFun(stop_stats_timer(GS2State)),
    case catch Mod:terminate(Reason, CbState) of
        {'EXIT', CallbackFailure} ->
            error_info(CallbackFailure, Reason, Name, Msg, CbState, Debug),
            exit(CallbackFailure);
        _ ->
            Quiet = case Reason of
                        normal        -> true;
                        shutdown      -> true;
                        {shutdown, _} -> true;
                        _             -> false
                    end,
            Quiet orelse
                error_info(Reason, undefined, Name, Msg, CbState, Debug),
            exit(Reason)
    end.
1186
%% Writes the termination error report. Reason is the reason being
%% reported; RootCause is 'undefined', or - when the terminate callback
%% itself failed - the reason terminate was originally called with,
%% which is then appended to the report. Finishes by printing the sys
%% debug log. Always returns ok.
error_info(_Reason, _RootCause, application_controller, _Msg, _State, _Debug) ->
    %% OTP-5811 Don't send an error report if it's the system process
    %% application_controller which is terminating - let init take care
    %% of it instead
    ok;
error_info(Reason, RootCause, Name, Msg, State, Debug) ->
    Reason1 = error_reason(Reason),
    BaseFmt =
        "** Generic server ~p terminating~n"
        "** Last message in was ~p~n"
        "** When Server state == ~p~n"
        "** Reason for termination == ~n** ~p~n",
    BaseArgs = [Name, Msg, State, Reason1],
    {Fmt, Args} =
        case RootCause of
            undefined ->
                {BaseFmt, BaseArgs};
            _ ->
                {BaseFmt ++ "** In 'terminate' callback "
                 "with reason ==~n** ~p~n",
                 BaseArgs ++ [error_reason(RootCause)]}
        end,
    format(Fmt, Args),
    sys:print_log(Debug),
    ok.
1208
%% Refines an 'undef' termination reason for the error report: if the
%% module of the topmost stack frame is not loaded the reason becomes
%% {'module could not be loaded', Stack}; if the module is loaded but
%% does not export the function it becomes
%% {'function not exported', Stack}. Every other reason is returned
%% unchanged.
%%
%% Stack frames are accepted both in the current four-element form
%% {M, F, ArgsOrArity, Location} and in the legacy three-element form
%% {M, F, ArgsOrArity}; matching only the latter meant the refinement
%% never applied to stack traces carrying location info.
error_reason({undef, [{M, F, A} | _] = Stack} = Reason) ->
    undef_reason(M, F, A, Stack, Reason);
error_reason({undef, [{M, F, A, _Location} | _] = Stack} = Reason) ->
    undef_reason(M, F, A, Stack, Reason);
error_reason(Reason) ->
    Reason.

%% Classifies an undef failure of M:F. A is the frame's third element:
%% normally the argument list, but a plain arity is tolerated as well.
undef_reason(M, F, A, Stack, Reason) ->
    Arity = if is_list(A) -> length(A);
               true       -> A
            end,
    case code:is_loaded(M) of
        false ->
            {'module could not be loaded', Stack};
        _ ->
            case erlang:function_exported(M, F, Arity) of
                true  -> Reason;
                false -> {'function not exported', Stack}
            end
    end.
1219
1220%%% ---------------------------------------------------
1221%%% Misc. functions.
1222%%% ---------------------------------------------------
1223
%% Looks up option Op in an option list. Returns {ok, Value} for the
%% first {Op, Value} pair found, or false when there is none. Elements
%% that are not two-element tuples keyed by Op are skipped.
opt(_Op, []) ->
    false;
opt(Op, [Candidate | Rest]) ->
    case Candidate of
        {Op, Value} -> {ok, Value};
        _           -> opt(Op, Rest)
    end.
1230
%% Builds the sys debug structure from the 'debug' entry of the start
%% options; a missing entry is treated as an empty list of debug
%% options.
debug_options(Name, Opts) ->
    Requested = case opt(debug, Opts) of
                    {ok, Options} -> Options;
                    _             -> []
                end,
    dbg_options(Name, Requested).
1236
%% Resolves the effective debug options. When none are given
%% explicitly, the emulator flag 'generic_debug' (if present) turns on
%% [log, statistics].
dbg_options(Name, Opts) ->
    Effective =
        case Opts of
            [] ->
                case init:get_argument(generic_debug) of
                    error -> [];
                    _     -> [log, statistics]
                end;
            _ ->
                Opts
        end,
    dbg_opts(Name, Effective).
1248
%% Converts debug options into a sys debug structure. Options that
%% sys:debug_options/1 rejects are reported and ignored, giving an
%% empty debug structure.
dbg_opts(Name, Opts) ->
    Outcome = (catch sys:debug_options(Opts)),
    case Outcome of
        {'EXIT', _} ->
            format("~p: ignoring erroneous debug options - ~p~n",
                   [Name, Opts]),
            [];
        _ ->
            Outcome
    end.
1258
%% Returns the name the server is known by: the pid itself, or the
%% local / global name once it has been verified that the calling
%% process really is registered under it. Exits with
%% process_not_registered or process_not_registered_globally otherwise.
get_proc_name(Pid) when is_pid(Pid) ->
    Pid;
get_proc_name({local, Name}) ->
    case process_info(self(), registered_name) of
        [] ->
            exit(process_not_registered);
        {registered_name, Name} ->
            Name;
        {registered_name, _OtherName} ->
            exit(process_not_registered)
    end;
get_proc_name({global, Name}) ->
    %% whereis_name/1 gives a pid or 'undefined'; only our own pid is
    %% acceptable.
    case whereis_name(Name) =:= self() of
        true  -> Name;
        false -> exit(process_not_registered_globally)
    end.
1279
%% Returns the pid of the parent process, taken from the head of the
%% '$ancestors' entry in the process dictionary. A registered name is
%% resolved to a pid. Exits if no usable entry is found.
get_parent() ->
    Ancestors = get('$ancestors'),
    case Ancestors of
        [ParentPid | _] when is_pid(ParentPid) ->
            ParentPid;
        [ParentName | _] when is_atom(ParentName) ->
            name_to_pid(ParentName);
        _ ->
            exit(process_was_not_started_by_proc_lib)
    end.
1289
%% Resolves a registered name, trying the local registry first and the
%% global one second. Exits with could_not_find_registered_name when
%% neither knows the name.
name_to_pid(Name) ->
    case whereis(Name) of
        undefined ->
            Global = whereis_name(Name),
            Global =:= undefined
                andalso exit(could_not_find_registered_name),
            Global;
        Local ->
            Local
    end.
1302
%% Returns the pid globally registered as Name, or 'undefined' if there
%% is none.
%%
%% This delegates to the public global:whereis_name/1 instead of
%% reading the 'global_names' ETS table directly: that table is private
%% to the global module and its row layout is not a stable interface,
%% so pattern-matching on a fixed tuple size breaks as soon as the
%% layout differs.
whereis_name(Name) ->
    global:whereis_name(Name).
1316
%% Resolves the three prioritiser funs (call, cast, info) for the
%% current callback module and stores them in the state. A callback
%% the module does not export is replaced by a fun that always returns
%% priority 0.
find_prioritisers(GS2State = #gs2_state { mod = Mod }) ->
    DefaultCall = fun (_Msg, _From, _State) -> 0 end,
    DefaultCast = fun (_Msg, _State) -> 0 end,
    DefaultInfo = fun (_Msg, _State) -> 0 end,
    Prioritisers =
        {function_exported_or_default(Mod, 'prioritise_call', 4, DefaultCall),
         function_exported_or_default(Mod, 'prioritise_cast', 3, DefaultCast),
         function_exported_or_default(Mod, 'prioritise_info', 3, DefaultInfo)},
    GS2State #gs2_state { prioritisers = Prioritisers }.
1325
%% Returns Default when Mod does not export Fun/Arity. Otherwise
%% returns a wrapper fun that calls the callback with the current queue
%% length and callback state. The arity-3 wrapper (cast/info) accepts
%% 'drop' or an integer; the arity-4 wrapper (call) accepts an integer
%% only. Any other result, including a caught failure, is passed to
%% handle_common_termination/3.
function_exported_or_default(Mod, Fun, Arity, Default) ->
    case erlang:function_exported(Mod, Fun, Arity) of
        false ->
            Default;
        true ->
            case Arity of
                3 ->
                    fun (Msg, GS2State = #gs2_state { queue = Queue,
                                                      state = CbState }) ->
                            Len = priority_queue:len(Queue),
                            case catch Mod:Fun(Msg, Len, CbState) of
                                drop ->
                                    drop;
                                Priority when is_integer(Priority) ->
                                    Priority;
                                Bad ->
                                    handle_common_termination(Bad, Msg,
                                                              GS2State)
                            end
                    end;
                4 ->
                    fun (Msg, From, GS2State = #gs2_state { queue = Queue,
                                                            state = CbState }) ->
                            Len = priority_queue:len(Queue),
                            case catch Mod:Fun(Msg, From, Len, CbState) of
                                Priority when is_integer(Priority) ->
                                    Priority;
                                Bad ->
                                    handle_common_termination(Bad, Msg,
                                                              GS2State)
                            end
                    end
            end
    end.
1354
1355%%-----------------------------------------------------------------
1356%% Status information
1357%%-----------------------------------------------------------------
%% sys callback producing the status report for the server.
%% StatusData is [PDict, SysState, Parent, Debug, GS2State]. The
%% callback module may customise the state section via format_status/2
%% and the queued-messages section via format_message_queue/2; the
%% defaults show the raw callback state and the queue as a list.
format_status(Opt, StatusData) ->
    [PDict, SysState, Parent, Debug,
     #gs2_state{name = Name, state = State, mod = Mod, queue = Queue}] =
        StatusData,
    %% Name is a pid, a locally registered atom, or a globally
    %% registered name - and the latter can be any term, so it needs a
    %% fallback clause rather than failing with if_clause.
    NameTag = if is_pid(Name) ->
                      pid_to_list(Name);
                 is_atom(Name) ->
                      Name;
                 true ->
                      lists:flatten(io_lib:format("~p", [Name]))
              end,
    Header = lists:concat(["Status for generic server ", NameTag]),
    Log = sys:get_log(Debug),
    Specific = callback(Mod, format_status, [Opt, [PDict, State]],
                        fun () -> [{data, [{"State", State}]}] end),
    Messages = callback(Mod, format_message_queue, [Opt, Queue],
                        fun () -> priority_queue:to_list(Queue) end),
    [{header, Header},
     {data, [{"Status", SysState},
             {"Parent", Parent},
             {"Logged events", Log},
             {"Queued messages", Messages}]} |
     Specific].
1379
%% Calls the optional callback Mod:FunName(Args...) and returns its
%% result. DefaultThunk() is used instead when the function is not
%% exported or when the call yields an {'EXIT', _} result.
callback(Mod, FunName, Args, DefaultThunk) ->
    Arity = length(Args),
    case erlang:function_exported(Mod, FunName, Arity) of
        false ->
            DefaultThunk();
        true ->
            Outcome = (catch apply(Mod, FunName, Args)),
            case Outcome of
                {'EXIT', _} -> DefaultThunk();
                _           -> Outcome
            end
    end.
1388
%% Returns the {EmitStatsFun, StopStatsFun} pair. When the
%% gen_server2_metrics ETS table does not exist both funs simply return
%% the state they are given; otherwise emit_stats/1 and stop_stats/1
%% are used.
stats_funs() ->
    Identity = fun(GS2State) -> GS2State end,
    case ets:info(gen_server2_metrics) of
        undefined -> {Identity, Identity};
        _         -> {fun emit_stats/1, fun stop_stats/1}
    end.
1397
%% Thin wrappers around the rabbit_event stats timer API, all operating
%% on the timer stored in the 'timer' field of #gs2_state{}.

%% Initialises the stats timer, emits stats once and arms the timer.
init_stats(State = #gs2_state{ emit_stats_fun = EmitStatsFun }) ->
    WithTimer = rabbit_event:init_stats_timer(State, #gs2_state.timer),
    Emitted = EmitStatsFun(WithTimer),
    next_stats_timer(Emitted).

%% Resets the stats timer and makes sure it is running again.
next_stats_timer(State) ->
    Reset = rabbit_event:reset_stats_timer(State, #gs2_state.timer),
    ensure_stats_timer(Reset).

%% Makes sure the stats timer is running; emit_gen_server2_stats is the
%% message passed to rabbit_event for it.
ensure_stats_timer(State) ->
    TimerField = #gs2_state.timer,
    rabbit_event:ensure_stats_timer(State, TimerField,
                                    emit_gen_server2_stats).

%% Stops the stats timer.
stop_stats_timer(State) ->
    TimerField = #gs2_state.timer,
    rabbit_event:stop_stats_timer(State, TimerField).
1412
%% Reports the current internal queue length for this process to
%% rabbit_core_metrics. Returns the state unchanged.
emit_stats(State = #gs2_state{queue = Queue}) ->
    QueueLen = priority_queue:len(Queue),
    rabbit_core_metrics:gen_server2_stats(self(), QueueLen),
    State.

%% Tells rabbit_core_metrics that this process's gen_server2 stats can
%% be deleted. Returns the state unchanged.
stop_stats(State) ->
    Self = self(),
    rabbit_core_metrics:gen_server2_deleted(Self),
    State.
1420