1%% This file is a copy of gen_server.erl from the R13B-1 Erlang/OTP 2%% distribution, with the following modifications: 3%% 4%% 1) the module name is gen_server2 5%% 6%% 2) more efficient handling of selective receives in callbacks 7%% gen_server2 processes drain their message queue into an internal 8%% buffer before invoking any callback module functions. Messages are 9%% dequeued from the buffer for processing. Thus the effective message 10%% queue of a gen_server2 process is the concatenation of the internal 11%% buffer and the real message queue. 12%% As a result of the draining, any selective receive invoked inside a 13%% callback is less likely to have to scan a large message queue. 14%% 15%% 3) gen_server2:cast is guaranteed to be order-preserving 16%% The original code could reorder messages when communicating with a 17%% process on a remote node that was not currently connected. 18%% 19%% 4) The callback module can optionally implement prioritise_call/4, 20%% prioritise_cast/3 and prioritise_info/3. These functions take 21%% Message, From, Length and State or just Message, Length and State 22%% (where Length is the current number of messages waiting to be 23%% processed) and return a single integer representing the priority 24%% attached to the message, or 'drop' to ignore it (for 25%% prioritise_cast/3 and prioritise_info/3 only). Messages with 26%% higher priorities are processed before requests with lower 27%% priorities. The default priority is 0. 28%% 29%% 5) The callback module can optionally implement 30%% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be 31%% called immediately prior to and post hibernation, respectively. If 32%% handle_pre_hibernate returns {hibernate, NewState} then the process 33%% will hibernate. If the module does not implement 34%% handle_pre_hibernate/1 then the default action is to hibernate. 35%% 36%% 6) init can return a 4th arg, {backoff, InitialTimeout, 37%% MinimumTimeout, DesiredHibernatePeriod} (all in milliseconds, 38%% 'infinity' does not make sense here). Then, on all callbacks which 39%% can return a timeout (including init), timeout can be 40%% 'hibernate'. When this is the case, the current timeout value will 41%% be used (initially, the InitialTimeout supplied from init). After 42%% this timeout has occurred, hibernation will occur as normal. Upon 43%% awaking, a new current timeout value will be calculated. 44%% 45%% The purpose is that the gen_server2 takes care of adjusting the 46%% current timeout value such that the process will increase the 47%% timeout value repeatedly if it is unable to sleep for the 48%% DesiredHibernatePeriod. If it is able to sleep for the 49%% DesiredHibernatePeriod it will decrease the current timeout down to 50%% the MinimumTimeout, so that the process is put to sleep sooner (and 51%% hopefully stays asleep for longer). In short, should a process 52%% using this receive a burst of messages, it should not hibernate 53%% between those messages, but as the messages become less frequent, 54%% the process will not only hibernate, it will do so sooner after 55%% each message. 56%% 57%% When using this backoff mechanism, normal timeout values (i.e. not 58%% 'hibernate') can still be used, and if they are used then the 59%% handle_info(timeout, State) will be called as normal. In this case, 60%% returning 'hibernate' from handle_info(timeout, State) will not 61%% hibernate the process immediately, as it would if backoff wasn't 62%% being used. Instead it'll wait for the current timeout as described 63%% above. 64%% 65%% 7) The callback module can return from any of the handle_* 66%% functions, a {become, Module, State} triple, or a {become, Module, 67%% State, Timeout} quadruple. This allows the gen_server to 68%% dynamically change the callback module. The State is the new state 69%% which will be passed into any of the callback functions in the new 70%% module. Note there is no form also encompassing a reply, thus if 71%% you wish to reply in handle_call/3 and change the callback module, 72%% you need to use gen_server2:reply/2 to issue the reply 73%% manually. The init function can similarly return a 5th argument, 74%% Module, in order to dynamically decide the callback module on init. 75%% 76%% 8) The callback module can optionally implement 77%% format_message_queue/2 which is the equivalent of format_status/2 78%% but where the second argument is specifically the priority_queue 79%% which contains the prioritised message_queue. 80%% 81%% 9) The function with_state/2 can be used to debug a process with 82%% heavyweight state (without needing to copy the entire state out of 83%% process as sys:get_status/1 would). Pass through a function which 84%% can be invoked on the state, get back the result. The state is not 85%% modified. 86%% 87%% 10) an mcall/1 function has been added for performing multiple 88%% call/3 in parallel. Unlike multi_call, which sends the same request 89%% to same-named processes residing on a supplied list of nodes, it 90%% operates on name/request pairs, where name is anything accepted by 91%% call/3, i.e. a pid, global name, local name, or local name on a 92%% particular node. 93%% 94%% 11) Internal buffer length is emitted as a core [RabbitMQ] metric. 95 96%% All modifications are (C) 2009-2021 VMware, Inc. or its affiliates. 97 98%% ``The contents of this file are subject to the Erlang Public License, 99%% Version 1.1, (the "License"); you may not use this file except in 100%% compliance with the License. You should have received a copy of the 101%% Erlang Public License along with this software. If not, it can be 102%% retrieved via the world wide web at https://www.erlang.org/. 103%% 104%% Software distributed under the License is distributed on an "AS IS" 105%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See 106%% the License for the specific language governing rights and limitations 107%% under the License. 108%% 109%% The Initial Developer of the Original Code is Ericsson Utvecklings AB. 110%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings 111%% AB. All Rights Reserved.'' 112%% 113%% $Id$ 114%% 115-module(gen_server2). 116 117-ifdef(OTP_RELEASE). 118-if(?OTP_RELEASE >= 22). 119-compile(nowarn_deprecated_function). 120-endif. 121-endif. 122 123%%% --------------------------------------------------- 124%%% 125%%% The idea behind THIS server is that the user module 126%%% provides (different) functions to handle different 127%%% kind of inputs. 128%%% If the Parent process terminates the Module:terminate/2 129%%% function is called. 130%%% 131%%% The user module should export: 132%%% 133%%% init(Args) 134%%% ==> {ok, State} 135%%% {ok, State, Timeout} 136%%% {ok, State, Timeout, Backoff} 137%%% {ok, State, Timeout, Backoff, Module} 138%%% ignore 139%%% {stop, Reason} 140%%% 141%%% handle_call(Msg, {From, Tag}, State) 142%%% 143%%% ==> {reply, Reply, State} 144%%% {reply, Reply, State, Timeout} 145%%% {noreply, State} 146%%% {noreply, State, Timeout} 147%%% {stop, Reason, Reply, State} 148%%% Reason = normal | shutdown | Term terminate(State) is called 149%%% 150%%% handle_cast(Msg, State) 151%%% 152%%% ==> {noreply, State} 153%%% {noreply, State, Timeout} 154%%% {stop, Reason, State} 155%%% Reason = normal | shutdown | Term terminate(State) is called 156%%% 157%%% handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ... 158%%% 159%%% ==> {noreply, State} 160%%% {noreply, State, Timeout} 161%%% {stop, Reason, State} 162%%% Reason = normal | shutdown | Term, terminate(State) is called 163%%% 164%%% terminate(Reason, State) Let the user module clean up 165%%% Reason = normal | shutdown | {shutdown, Term} | Term 166%%% always called when server terminates 167%%% 168%%% ==> ok | Term 169%%% 170%%% handle_pre_hibernate(State) 171%%% 172%%% ==> {hibernate, State} 173%%% {stop, Reason, State} 174%%% Reason = normal | shutdown | Term, terminate(State) is called 175%%% 176%%% handle_post_hibernate(State) 177%%% 178%%% ==> {noreply, State} 179%%% {stop, Reason, State} 180%%% Reason = normal | shutdown | Term, terminate(State) is called 181%%% 182%%% The work flow (of the server) can be described as follows: 183%%% 184%%% User module Generic 185%%% ----------- ------- 186%%% start -----> start 187%%% init <----- . 188%%% 189%%% loop 190%%% handle_call <----- . 191%%% -----> reply 192%%% 193%%% handle_cast <----- . 194%%% 195%%% handle_info <----- . 196%%% 197%%% terminate <----- . 198%%% 199%%% -----> reply 200%%% 201%%% 202%%% --------------------------------------------------- 203 204%% API 205-export([start/3, start/4, 206 start_link/3, start_link/4, 207 stop/1, stop/3, 208 call/2, call/3, 209 cast/2, reply/2, 210 abcast/2, abcast/3, 211 multi_call/2, multi_call/3, multi_call/4, 212 mcall/1, 213 with_state/2, 214 enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]). 215 216%% System exports 217-export([system_continue/3, 218 system_terminate/4, 219 system_code_change/4, 220 format_status/2]). 221 222%% Internal exports 223-export([init_it/6]). 224 225-import(error_logger, [format/2]). 226 227%% State record 228-record(gs2_state, {parent, name, state, mod, time, 229 timeout_state, queue, debug, prioritisers, 230 timer, emit_stats_fun, stop_stats_fun}). 231 232%%%========================================================================= 233%%% Specs. These exist only to shut up dialyzer's warnings 234%%%========================================================================= 235 236-type gs2_state() :: #gs2_state{}. 237 238-spec handle_common_termination(any(), atom(), gs2_state()) -> no_return(). 239-spec hibernate(gs2_state()) -> no_return(). 240-spec pre_hibernate(gs2_state()) -> no_return(). 241-spec system_terminate(_, _, _, gs2_state()) -> no_return(). 242 243-type millis() :: non_neg_integer(). 244 245-dialyzer({nowarn_function, do_multi_call/4}). 246 247%%%========================================================================= 248%%% API 249%%%========================================================================= 250 251-callback init(Args :: term()) -> 252 {ok, State :: term()} | 253 {ok, State :: term(), timeout() | hibernate} | 254 {ok, State :: term(), timeout() | hibernate, 255 {backoff, millis(), millis(), millis()}} | 256 {ok, State :: term(), timeout() | hibernate, 257 {backoff, millis(), millis(), millis()}, atom()} | 258 ignore | 259 {stop, Reason :: term()}. 260-callback handle_call(Request :: term(), From :: {pid(), Tag :: term()}, 261 State :: term()) -> 262 {reply, Reply :: term(), NewState :: term()} | 263 {reply, Reply :: term(), NewState :: term(), timeout() | hibernate} | 264 {noreply, NewState :: term()} | 265 {noreply, NewState :: term(), timeout() | hibernate} | 266 {stop, Reason :: term(), 267 Reply :: term(), NewState :: term()}. 268-callback handle_cast(Request :: term(), State :: term()) -> 269 {noreply, NewState :: term()} | 270 {noreply, NewState :: term(), timeout() | hibernate} | 271 {stop, Reason :: term(), NewState :: term()}. 272-callback handle_info(Info :: term(), State :: term()) -> 273 {noreply, NewState :: term()} | 274 {noreply, NewState :: term(), timeout() | hibernate} | 275 {stop, Reason :: term(), NewState :: term()}. 276-callback terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), 277 State :: term()) -> 278 ok | term(). 279-callback code_change(OldVsn :: (term() | {down, term()}), State :: term(), 280 Extra :: term()) -> 281 {ok, NewState :: term()} | {error, Reason :: term()}. 282 283%% It's not possible to define "optional" -callbacks, so putting specs 284%% for handle_pre_hibernate/1 and handle_post_hibernate/1 will result 285%% in warnings (the same applied for the behaviour_info before). 286 287%%% ----------------------------------------------------------------- 288%%% Starts a generic server. 289%%% start(Mod, Args, Options) 290%%% start(Name, Mod, Args, Options) 291%%% start_link(Mod, Args, Options) 292%%% start_link(Name, Mod, Args, Options) where: 293%%% Name ::= {local, atom()} | {global, atom()} 294%%% Mod ::= atom(), callback module implementing the 'real' server 295%%% Args ::= term(), init arguments (to Mod:init/1) 296%%% Options ::= [{timeout, Timeout} | {debug, [Flag]}] 297%%% Flag ::= trace | log | {logfile, File} | statistics | debug 298%%% (debug == log && statistics) 299%%% Returns: {ok, Pid} | 300%%% {error, {already_started, Pid}} | 301%%% {error, Reason} 302%%% ----------------------------------------------------------------- 303start(Mod, Args, Options) -> 304 gen:start(?MODULE, nolink, Mod, Args, Options). 305 306start(Name, Mod, Args, Options) -> 307 gen:start(?MODULE, nolink, Name, Mod, Args, Options). 308 309start_link(Mod, Args, Options) -> 310 gen:start(?MODULE, link, Mod, Args, Options). 311 312start_link(Name, Mod, Args, Options) -> 313 gen:start(?MODULE, link, Name, Mod, Args, Options). 314 315%% ----------------------------------------------------------------- 316%% Stop a generic server and wait for it to terminate. 317%% If the server is located at another node, that node will 318%% be monitored. 319%% ----------------------------------------------------------------- 320stop(Name) -> 321 gen:stop(Name). 322 323stop(Name, Reason, Timeout) -> 324 gen:stop(Name, Reason, Timeout). 325 326%% ----------------------------------------------------------------- 327%% Make a call to a generic server. 328%% If the server is located at another node, that node will 329%% be monitored. 330%% If the client is trapping exits and is linked server termination 331%% is handled here (? Shall we do that here (or rely on timeouts) ?). 332%% ----------------------------------------------------------------- 333call(Name, Request) -> 334 case catch gen:call(Name, '$gen_call', Request) of 335 {ok,Res} -> 336 Res; 337 {'EXIT',Reason} -> 338 exit({Reason, {?MODULE, call, [Name, Request]}}) 339 end. 340 341call(Name, Request, Timeout) -> 342 case catch gen:call(Name, '$gen_call', Request, Timeout) of 343 {ok,Res} -> 344 Res; 345 {'EXIT',Reason} -> 346 exit({Reason, {?MODULE, call, [Name, Request, Timeout]}}) 347 end. 348 349%% ----------------------------------------------------------------- 350%% Make a cast to a generic server. 351%% ----------------------------------------------------------------- 352cast({global,Name}, Request) -> 353 catch global:send(Name, {'$gen_cast', Request}), 354 ok; 355cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) -> 356 catch (Dest ! {'$gen_cast', Request}), 357 ok; 358cast(Dest, Request) when is_atom(Dest); is_pid(Dest) -> 359 catch (Dest ! {'$gen_cast', Request}), 360 ok. 361 362%% ----------------------------------------------------------------- 363%% Send a reply to the client. 364%% ----------------------------------------------------------------- 365reply({To, Tag}, Reply) -> 366 catch To ! {Tag, Reply}. 367 368%% ----------------------------------------------------------------- 369%% Asynchronous broadcast, returns nothing, it's just send'n pray 370%% ----------------------------------------------------------------- 371abcast(Name, Request) when is_atom(Name) -> 372 do_abcast([node() | nodes()], Name, {'$gen_cast', Request}). 373 374abcast(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) -> 375 do_abcast(Nodes, Name, {'$gen_cast', Request}). 376 377do_abcast([Node|Nodes], Name, Msg) when is_atom(Node) -> 378 catch ({Name, Node} ! Msg), 379 do_abcast(Nodes, Name, Msg); 380do_abcast([], _,_) -> abcast. 381 382%%% ----------------------------------------------------------------- 383%%% Make a call to servers at several nodes. 384%%% Returns: {[Replies],[BadNodes]} 385%%% A Timeout can be given 386%%% 387%%% A middleman process is used in case late answers arrives after 388%%% the timeout. If they would be allowed to glog the callers message 389%%% queue, it would probably become confused. Late answers will 390%%% now arrive to the terminated middleman and so be discarded. 391%%% ----------------------------------------------------------------- 392multi_call(Name, Req) 393 when is_atom(Name) -> 394 do_multi_call([node() | nodes()], Name, Req, infinity). 395 396multi_call(Nodes, Name, Req) 397 when is_list(Nodes), is_atom(Name) -> 398 do_multi_call(Nodes, Name, Req, infinity). 399 400multi_call(Nodes, Name, Req, infinity) -> 401 do_multi_call(Nodes, Name, Req, infinity); 402multi_call(Nodes, Name, Req, Timeout) 403 when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 -> 404 do_multi_call(Nodes, Name, Req, Timeout). 405 406%%% ----------------------------------------------------------------- 407%%% Make multiple calls to multiple servers, given pairs of servers 408%%% and messages. 409%%% Returns: {[{Dest, Reply}], [{Dest, Error}]} 410%%% 411%%% Dest can be pid() | RegName :: atom() | 412%%% {Name :: atom(), Node :: atom()} | {global, Name :: atom()} 413%%% 414%%% A middleman process is used to avoid clogging up the callers 415%%% message queue. 416%%% ----------------------------------------------------------------- 417mcall(CallSpecs) -> 418 Tag = make_ref(), 419 {_, MRef} = spawn_monitor( 420 fun() -> 421 Refs = lists:foldl( 422 fun ({Dest, _Request}=S, Dict) -> 423 dict:store(do_mcall(S), Dest, Dict) 424 end, dict:new(), CallSpecs), 425 collect_replies(Tag, Refs, [], []) 426 end), 427 receive 428 {'DOWN', MRef, _, _, {Tag, Result}} -> Result; 429 {'DOWN', MRef, _, _, Reason} -> exit(Reason) 430 end. 431 432do_mcall({{global,Name}=Dest, Request}) -> 433 %% whereis_name is simply an ets lookup, and is precisely what 434 %% global:send/2 does, yet we need a Ref to put in the call to the 435 %% server, so invoking whereis_name makes a lot more sense here. 436 case global:whereis_name(Name) of 437 Pid when is_pid(Pid) -> 438 MRef = erlang:monitor(process, Pid), 439 catch msend(Pid, MRef, Request), 440 MRef; 441 undefined -> 442 Ref = make_ref(), 443 self() ! {'DOWN', Ref, process, Dest, noproc}, 444 Ref 445 end; 446do_mcall({{Name,Node}=Dest, Request}) when is_atom(Name), is_atom(Node) -> 447 {_Node, MRef} = start_monitor(Node, Name), %% NB: we don't handle R6 448 catch msend(Dest, MRef, Request), 449 MRef; 450do_mcall({Dest, Request}) when is_atom(Dest); is_pid(Dest) -> 451 MRef = erlang:monitor(process, Dest), 452 catch msend(Dest, MRef, Request), 453 MRef. 454 455msend(Dest, MRef, Request) -> 456 erlang:send(Dest, {'$gen_call', {self(), MRef}, Request}, [noconnect]). 457 458collect_replies(Tag, Refs, Replies, Errors) -> 459 case dict:size(Refs) of 460 0 -> exit({Tag, {Replies, Errors}}); 461 _ -> receive 462 {MRef, Reply} -> 463 {Refs1, Replies1} = handle_call_result(MRef, Reply, 464 Refs, Replies), 465 collect_replies(Tag, Refs1, Replies1, Errors); 466 {'DOWN', MRef, _, _, Reason} -> 467 Reason1 = case Reason of 468 noconnection -> nodedown; 469 _ -> Reason 470 end, 471 {Refs1, Errors1} = handle_call_result(MRef, Reason1, 472 Refs, Errors), 473 collect_replies(Tag, Refs1, Replies, Errors1) 474 end 475 end. 476 477handle_call_result(MRef, Result, Refs, AccList) -> 478 %% we avoid the mailbox scanning cost of a call to erlang:demonitor/{1,2} 479 %% here, so we must cope with MRefs that we've already seen and erased 480 case dict:find(MRef, Refs) of 481 {ok, Pid} -> {dict:erase(MRef, Refs), [{Pid, Result}|AccList]}; 482 _ -> {Refs, AccList} 483 end. 484 485%% ----------------------------------------------------------------- 486%% Apply a function to a generic server's state. 487%% ----------------------------------------------------------------- 488with_state(Name, Fun) -> 489 case catch gen:call(Name, '$with_state', Fun, infinity) of 490 {ok,Res} -> 491 Res; 492 {'EXIT',Reason} -> 493 exit({Reason, {?MODULE, with_state, [Name, Fun]}}) 494 end. 495 496%%----------------------------------------------------------------- 497%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_ 498%% 499%% Description: Makes an existing process into a gen_server. 500%% The calling process will enter the gen_server receive 501%% loop and become a gen_server process. 502%% The process *must* have been started using one of the 503%% start functions in proc_lib, see proc_lib(3). 504%% The user is responsible for any initialization of the 505%% process, including registering a name for it. 506%%----------------------------------------------------------------- 507enter_loop(Mod, Options, State) -> 508 enter_loop(Mod, Options, State, self(), infinity, undefined). 509 510enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) -> 511 enter_loop(Mod, Options, State, self(), infinity, Backoff); 512 513enter_loop(Mod, Options, State, ServerName = {_, _}) -> 514 enter_loop(Mod, Options, State, ServerName, infinity, undefined); 515 516enter_loop(Mod, Options, State, Timeout) -> 517 enter_loop(Mod, Options, State, self(), Timeout, undefined). 518 519enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) -> 520 enter_loop(Mod, Options, State, ServerName, infinity, Backoff); 521 522enter_loop(Mod, Options, State, ServerName, Timeout) -> 523 enter_loop(Mod, Options, State, ServerName, Timeout, undefined). 524 525enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) -> 526 Name = get_proc_name(ServerName), 527 Parent = get_parent(), 528 Debug = debug_options(Name, Options), 529 Queue = priority_queue:new(), 530 Backoff1 = extend_backoff(Backoff), 531 {EmitStatsFun, StopStatsFun} = stats_funs(), 532 loop(init_stats(find_prioritisers( 533 #gs2_state { parent = Parent, name = Name, state = State, 534 mod = Mod, time = Timeout, timeout_state = Backoff1, 535 queue = Queue, debug = Debug, 536 emit_stats_fun = EmitStatsFun, 537 stop_stats_fun = StopStatsFun }))). 538 539%%%======================================================================== 540%%% Gen-callback functions 541%%%======================================================================== 542 543%%% --------------------------------------------------- 544%%% Initiate the new process. 545%%% Register the name using the Rfunc function 546%%% Calls the Mod:init/Args function. 547%%% Finally an acknowledge is sent to Parent and the main 548%%% loop is entered. 549%%% --------------------------------------------------- 550init_it(Starter, self, Name, Mod, Args, Options) -> 551 init_it(Starter, self(), Name, Mod, Args, Options); 552init_it(Starter, Parent, Name0, Mod, Args, Options) -> 553 Name = name(Name0), 554 Debug = debug_options(Name, Options), 555 Queue = priority_queue:new(), 556 {EmitStatsFun, StopStatsFun} = stats_funs(), 557 GS2State = find_prioritisers( 558 #gs2_state { parent = Parent, 559 name = Name, 560 mod = Mod, 561 queue = Queue, 562 debug = Debug, 563 emit_stats_fun = EmitStatsFun, 564 stop_stats_fun = StopStatsFun }), 565 case catch Mod:init(Args) of 566 {ok, State} -> 567 proc_lib:init_ack(Starter, {ok, self()}), 568 loop(init_stats(GS2State#gs2_state { state = State, 569 time = infinity, 570 timeout_state = undefined })); 571 {ok, State, Timeout} -> 572 proc_lib:init_ack(Starter, {ok, self()}), 573 loop(init_stats( 574 GS2State#gs2_state { state = State, 575 time = Timeout, 576 timeout_state = undefined })); 577 {ok, State, Timeout, Backoff = {backoff, _, _, _}} -> 578 Backoff1 = extend_backoff(Backoff), 579 proc_lib:init_ack(Starter, {ok, self()}), 580 loop(init_stats(GS2State#gs2_state { state = State, 581 time = Timeout, 582 timeout_state = Backoff1 })); 583 {ok, State, Timeout, Backoff = {backoff, _, _, _}, Mod1} -> 584 Backoff1 = extend_backoff(Backoff), 585 proc_lib:init_ack(Starter, {ok, self()}), 586 loop(init_stats(find_prioritisers( 587 GS2State#gs2_state { mod = Mod1, 588 state = State, 589 time = Timeout, 590 timeout_state = Backoff1 }))); 591 {stop, Reason} -> 592 %% For consistency, we must make sure that the 593 %% registered name (if any) is unregistered before 594 %% the parent process is notified about the failure. 595 %% (Otherwise, the parent process could get 596 %% an 'already_started' error if it immediately 597 %% tried starting the process again.) 598 unregister_name(Name0), 599 proc_lib:init_ack(Starter, {error, Reason}), 600 exit(Reason); 601 ignore -> 602 unregister_name(Name0), 603 proc_lib:init_ack(Starter, ignore), 604 exit(normal); 605 {'EXIT', Reason} -> 606 unregister_name(Name0), 607 proc_lib:init_ack(Starter, {error, Reason}), 608 exit(Reason); 609 Else -> 610 Error = {bad_return_value, Else}, 611 proc_lib:init_ack(Starter, {error, Error}), 612 exit(Error) 613 end. 614 615name({local,Name}) -> Name; 616name({global,Name}) -> Name; 617%% name(Pid) when is_pid(Pid) -> Pid; 618%% when R12 goes away, drop the line beneath and uncomment the line above 619name(Name) -> Name. 620 621unregister_name({local,Name}) -> 622 _ = (catch unregister(Name)); 623unregister_name({global,Name}) -> 624 _ = global:unregister_name(Name); 625unregister_name(Pid) when is_pid(Pid) -> 626 Pid; 627%% Under R12 let's just ignore it, as we have a single term as Name. 628%% On R13 it will never get here, as we get tuple with 'local/global' atom. 629unregister_name(_Name) -> ok. 630 631extend_backoff(undefined) -> 632 undefined; 633extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) -> 634 {backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod, 635 rand:seed(exsplus)}. 636 637%%%======================================================================== 638%%% Internal functions 639%%%======================================================================== 640%%% --------------------------------------------------- 641%%% The MAIN loop. 642%%% --------------------------------------------------- 643loop(GS2State = #gs2_state { time = hibernate, 644 timeout_state = undefined, 645 queue = Queue }) -> 646 case priority_queue:is_empty(Queue) of 647 true -> 648 pre_hibernate(GS2State); 649 false -> 650 process_next_msg(GS2State) 651 end; 652 653loop(GS2State) -> 654 process_next_msg(drain(GS2State)). 655 656drain(GS2State) -> 657 receive 658 Input -> drain(in(Input, GS2State)) 659 after 0 -> GS2State 660 end. 661 662process_next_msg(GS2State0 = #gs2_state { time = Time, 663 timeout_state = TimeoutState, 664 queue = Queue }) -> 665 case priority_queue:out(Queue) of 666 {{value, Msg}, Queue1} -> 667 GS2State = ensure_stats_timer(GS2State0), 668 process_msg(Msg, GS2State#gs2_state { queue = Queue1 }); 669 {empty, Queue1} -> 670 {Time1, HibOnTimeout, GS2State} 671 = case {Time, TimeoutState} of 672 {hibernate, {backoff, Current, _Min, _Desired, _RSt}} -> 673 {Current, true, stop_stats_timer(GS2State0)}; 674 {hibernate, _} -> 675 %% wake_hib/7 will set Time to hibernate. If 676 %% we were woken and didn't receive a msg 677 %% then we will get here and need a sensible 678 %% value for Time1, otherwise we crash. 679 %% R13B1 always waits infinitely when waking 680 %% from hibernation, so that's what we do 681 %% here too. 682 {infinity, false, GS2State0}; 683 _ -> {Time, false, GS2State0} 684 end, 685 receive 686 Input -> 687 %% Time could be 'hibernate' here, so *don't* call loop 688 process_next_msg( 689 drain(in(Input, GS2State #gs2_state { queue = Queue1 }))) 690 after Time1 -> 691 case HibOnTimeout of 692 true -> 693 pre_hibernate( 694 GS2State #gs2_state { queue = Queue1 }); 695 false -> 696 process_msg(timeout, 697 GS2State #gs2_state { queue = Queue1 }) 698 end 699 end 700 end. 701 702wake_hib(GS2State = #gs2_state { timeout_state = TS }) -> 703 TimeoutState1 = case TS of 704 undefined -> 705 undefined; 706 {SleptAt, TimeoutState} -> 707 adjust_timeout_state(SleptAt, 708 erlang:monotonic_time(), 709 TimeoutState) 710 end, 711 post_hibernate( 712 drain(GS2State #gs2_state { timeout_state = TimeoutState1 })). 713 714hibernate(GS2State = #gs2_state { timeout_state = TimeoutState }) -> 715 TS = case TimeoutState of 716 undefined -> undefined; 717 {backoff, _, _, _, _} -> {erlang:monotonic_time(), 718 TimeoutState} 719 end, 720 proc_lib:hibernate(?MODULE, wake_hib, 721 [GS2State #gs2_state { timeout_state = TS }]). 722 723pre_hibernate(GS2State0 = #gs2_state { state = State, 724 mod = Mod, 725 emit_stats_fun = EmitStatsFun }) -> 726 GS2State = EmitStatsFun(stop_stats_timer(GS2State0)), 727 case erlang:function_exported(Mod, handle_pre_hibernate, 1) of 728 true -> 729 case catch Mod:handle_pre_hibernate(State) of 730 {hibernate, NState} -> 731 hibernate(GS2State #gs2_state { state = NState } ); 732 Reply -> 733 handle_common_termination(Reply, pre_hibernate, GS2State) 734 end; 735 false -> 736 hibernate(GS2State) 737 end. 738 739post_hibernate(GS2State0 = #gs2_state { state = State, 740 mod = Mod }) -> 741 GS2State = ensure_stats_timer(GS2State0), 742 case erlang:function_exported(Mod, handle_post_hibernate, 1) of 743 true -> 744 case catch Mod:handle_post_hibernate(State) of 745 {noreply, NState} -> 746 process_next_msg(GS2State #gs2_state { state = NState, 747 time = infinity }); 748 {noreply, NState, Time} -> 749 process_next_msg(GS2State #gs2_state { state = NState, 750 time = Time }); 751 Reply -> 752 handle_common_termination(Reply, post_hibernate, GS2State) 753 end; 754 false -> 755 %% use hibernate here, not infinity. This matches 756 %% R13B. The key is that we should be able to get through 757 %% to process_msg calling sys:handle_system_msg with Time 758 %% still set to hibernate, iff that msg is the very msg 759 %% that woke us up (or the first msg we receive after 760 %% waking up). 761 process_next_msg(GS2State #gs2_state { time = hibernate }) 762 end. 763 764adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO, 765 DesiredHibPeriod, RandomState}) -> 766 NapLengthMicros = erlang:convert_time_unit(AwokeAt - SleptAt, 767 native, micro_seconds), 768 CurrentMicros = CurrentTO * 1000, 769 MinimumMicros = MinimumTO * 1000, 770 DesiredHibMicros = DesiredHibPeriod * 1000, 771 GapBetweenMessagesMicros = NapLengthMicros + CurrentMicros, 772 Base = 773 %% If enough time has passed between the last two messages then we 774 %% should consider sleeping sooner. Otherwise stay awake longer. 775 case GapBetweenMessagesMicros > (MinimumMicros + DesiredHibMicros) of 776 true -> lists:max([MinimumTO, CurrentTO div 2]); 777 false -> CurrentTO 778 end, 779 {Extra, RandomState1} = rand:uniform_s(Base, RandomState), 780 CurrentTO1 = Base + Extra, 781 {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}. 782 783in({'$gen_cast', Msg} = Input, 784 GS2State = #gs2_state { prioritisers = {_, F, _} }) -> 785 in(Input, F(Msg, GS2State), GS2State); 786in({'$gen_call', From, Msg} = Input, 787 GS2State = #gs2_state { prioritisers = {F, _, _} }) -> 788 in(Input, F(Msg, From, GS2State), GS2State); 789in({'$with_state', _From, _Fun} = Input, GS2State) -> 790 in(Input, 0, GS2State); 791in({'EXIT', Parent, _R} = Input, GS2State = #gs2_state { parent = Parent }) -> 792 in(Input, infinity, GS2State); 793in({system, _From, _Req} = Input, GS2State) -> 794 in(Input, infinity, GS2State); 795in(emit_gen_server2_stats, GS2State = #gs2_state{ emit_stats_fun = EmitStatsFun}) -> 796 next_stats_timer(EmitStatsFun(GS2State)); 797in(Input, GS2State = #gs2_state { prioritisers = {_, _, F} }) -> 798 in(Input, F(Input, GS2State), GS2State). 799 800in(_Input, drop, GS2State) -> 801 GS2State; 802 803in(Input, Priority, GS2State = #gs2_state { queue = Queue }) -> 804 GS2State # gs2_state { queue = priority_queue:in(Input, Priority, Queue) }. 805 806process_msg({system, From, Req}, 807 GS2State = #gs2_state { parent = Parent, debug = Debug }) -> 808 case Req of 809 %% This clause will match only in R16B03. 810 %% Since 17.0 replace_state is not a system message. 811 {replace_state, StateFun} -> 812 GS2State1 = StateFun(GS2State), 813 _ = gen:reply(From, GS2State1), 814 system_continue(Parent, Debug, GS2State1); 815 _ -> 816 %% gen_server puts Hib on the end as the 7th arg, but that version 817 %% of the fun seems not to be documented so leaving out for now. 818 sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, GS2State) 819 end; 820process_msg({'$with_state', From, Fun}, 821 GS2State = #gs2_state{state = State}) -> 822 reply(From, catch Fun(State)), 823 loop(GS2State); 824process_msg({'EXIT', Parent, Reason} = Msg, 825 GS2State = #gs2_state { parent = Parent }) -> 826 terminate(Reason, Msg, GS2State); 827process_msg(Msg, GS2State = #gs2_state { debug = [] }) -> 828 handle_msg(Msg, GS2State); 829process_msg(Msg, GS2State = #gs2_state { name = Name, debug = Debug }) -> 830 Debug1 = sys:handle_debug(Debug, fun print_event/3, Name, {in, Msg}), 831 handle_msg(Msg, GS2State #gs2_state { debug = Debug1 }). 832 833%%% --------------------------------------------------- 834%%% Send/recive functions 835%%% --------------------------------------------------- 836 837do_multi_call(Nodes, Name, Req, infinity) -> 838 Tag = make_ref(), 839 Monitors = send_nodes(Nodes, Name, Tag, Req), 840 rec_nodes(Tag, Monitors, Name, undefined); 841do_multi_call(Nodes, Name, Req, Timeout) -> 842 Tag = make_ref(), 843 Caller = self(), 844 Receiver = 845 spawn( 846 fun () -> 847 %% Middleman process. Should be unsensitive to regular 848 %% exit signals. The synchronization is needed in case 849 %% the receiver would exit before the caller started 850 %% the monitor. 851 process_flag(trap_exit, true), 852 Mref = erlang:monitor(process, Caller), 853 receive 854 {Caller,Tag} -> 855 Monitors = send_nodes(Nodes, Name, Tag, Req), 856 TimerId = erlang:start_timer(Timeout, self(), ok), 857 Result = rec_nodes(Tag, Monitors, Name, TimerId), 858 exit({self(),Tag,Result}); 859 {'DOWN',Mref,_,_,_} -> 860 %% Caller died before sending us the go-ahead. 861 %% Give up silently. 862 exit(normal) 863 end 864 end), 865 Mref = erlang:monitor(process, Receiver), 866 Receiver ! {self(),Tag}, 867 receive 868 {'DOWN',Mref,_,_,{Receiver,Tag,Result}} -> 869 Result; 870 {'DOWN',Mref,_,_,Reason} -> 871 %% The middleman code failed. Or someone did 872 %% exit(_, kill) on the middleman process => Reason==killed 873 exit(Reason) 874 end. 875 876send_nodes(Nodes, Name, Tag, Req) -> 877 send_nodes(Nodes, Name, Tag, Req, []). 878 879send_nodes([Node|Tail], Name, Tag, Req, Monitors) 880 when is_atom(Node) -> 881 Monitor = start_monitor(Node, Name), 882 %% Handle non-existing names in rec_nodes. 883 catch {Name, Node} ! {'$gen_call', {self(), {Tag, Node}}, Req}, 884 send_nodes(Tail, Name, Tag, Req, [Monitor | Monitors]); 885send_nodes([_Node|Tail], Name, Tag, Req, Monitors) -> 886 %% Skip non-atom Node 887 send_nodes(Tail, Name, Tag, Req, Monitors); 888send_nodes([], _Name, _Tag, _Req, Monitors) -> 889 Monitors. 890 891%% Against old nodes: 892%% If no reply has been delivered within 2 secs. (per node) check that 893%% the server really exists and wait for ever for the answer. 894%% 895%% Against contemporary nodes: 896%% Wait for reply, server 'DOWN', or timeout from TimerId. 897 898rec_nodes(Tag, Nodes, Name, TimerId) -> 899 rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId). 900 901rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) -> 902 receive 903 {'DOWN', R, _, _, _} -> 904 rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId); 905 {{Tag, N}, Reply} -> %% Tag is bound !!! 906 unmonitor(R), 907 rec_nodes(Tag, Tail, Name, Badnodes, 908 [{N,Reply}|Replies], Time, TimerId); 909 {timeout, TimerId, _} -> 910 unmonitor(R), 911 %% Collect all replies that already have arrived 912 rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) 913 end; 914rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) -> 915 %% R6 node 916 receive 917 {nodedown, N} -> 918 monitor_node(N, false), 919 rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId); 920 {{Tag, N}, Reply} -> %% Tag is bound !!! 921 receive {nodedown, N} -> ok after 0 -> ok end, 922 monitor_node(N, false), 923 rec_nodes(Tag, Tail, Name, Badnodes, 924 [{N,Reply}|Replies], 2000, TimerId); 925 {timeout, TimerId, _} -> 926 receive {nodedown, N} -> ok after 0 -> ok end, 927 monitor_node(N, false), 928 %% Collect all replies that already have arrived 929 rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) 930 after Time -> 931 case rpc:call(N, erlang, whereis, [Name]) of 932 Pid when is_pid(Pid) -> % It exists try again. 933 rec_nodes(Tag, [N|Tail], Name, Badnodes, 934 Replies, infinity, TimerId); 935 _ -> % badnode 936 receive {nodedown, N} -> ok after 0 -> ok end, 937 monitor_node(N, false), 938 rec_nodes(Tag, Tail, Name, [N|Badnodes], 939 Replies, 2000, TimerId) 940 end 941 end; 942rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) -> 943 case catch erlang:cancel_timer(TimerId) of 944 false -> % It has already sent it's message 945 receive 946 {timeout, TimerId, _} -> ok 947 after 0 -> 948 ok 949 end; 950 _ -> % Timer was cancelled, or TimerId was 'undefined' 951 ok 952 end, 953 {Replies, Badnodes}. 954 955%% Collect all replies that already have arrived 956rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) -> 957 receive 958 {'DOWN', R, _, _, _} -> 959 rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); 960 {{Tag, N}, Reply} -> %% Tag is bound !!! 961 unmonitor(R), 962 rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) 963 after 0 -> 964 unmonitor(R), 965 rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) 966 end; 967rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) -> 968 %% R6 node 969 receive 970 {nodedown, N} -> 971 monitor_node(N, false), 972 rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); 973 {{Tag, N}, Reply} -> %% Tag is bound !!! 974 receive {nodedown, N} -> ok after 0 -> ok end, 975 monitor_node(N, false), 976 rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) 977 after 0 -> 978 receive {nodedown, N} -> ok after 0 -> ok end, 979 monitor_node(N, false), 980 rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) 981 end; 982rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) -> 983 {Replies, Badnodes}. 984 985 986%%% --------------------------------------------------- 987%%% Monitor functions 988%%% --------------------------------------------------- 989 990start_monitor(Node, Name) when is_atom(Node), is_atom(Name) -> 991 if node() =:= nonode@nohost, Node =/= nonode@nohost -> 992 Ref = make_ref(), 993 self() ! {'DOWN', Ref, process, {Name, Node}, noconnection}, 994 {Node, Ref}; 995 true -> 996 case catch erlang:monitor(process, {Name, Node}) of 997 {'EXIT', _} -> 998 %% Remote node is R6 999 monitor_node(Node, true), 1000 Node; 1001 Ref when is_reference(Ref) -> 1002 {Node, Ref} 1003 end 1004 end. 1005 1006%% Cancels a monitor started with Ref=erlang:monitor(_, _). 1007unmonitor(Ref) when is_reference(Ref) -> 1008 erlang:demonitor(Ref), 1009 receive 1010 {'DOWN', Ref, _, _, _} -> 1011 true 1012 after 0 -> 1013 true 1014 end. 1015 1016%%% --------------------------------------------------- 1017%%% Message handling functions 1018%%% --------------------------------------------------- 1019 1020dispatch({'$gen_cast', Msg}, Mod, State) -> 1021 Mod:handle_cast(Msg, State); 1022dispatch(Info, Mod, State) -> 1023 Mod:handle_info(Info, State). 1024 1025common_reply(_Name, From, Reply, _NState, [] = _Debug) -> 1026 reply(From, Reply), 1027 []; 1028common_reply(Name, {To, _Tag} = From, Reply, NState, Debug) -> 1029 reply(From, Reply), 1030 sys:handle_debug(Debug, fun print_event/3, Name, {out, Reply, To, NState}). 1031 1032common_noreply(_Name, _NState, [] = _Debug) -> 1033 []; 1034common_noreply(Name, NState, Debug) -> 1035 sys:handle_debug(Debug, fun print_event/3, Name, {noreply, NState}). 1036 1037common_become(_Name, _Mod, _NState, [] = _Debug) -> 1038 []; 1039common_become(Name, Mod, NState, Debug) -> 1040 sys:handle_debug(Debug, fun print_event/3, Name, {become, Mod, NState}). 1041 1042handle_msg({'$gen_call', From, Msg}, GS2State = #gs2_state { mod = Mod, 1043 state = State, 1044 name = Name, 1045 debug = Debug }) -> 1046 case catch Mod:handle_call(Msg, From, State) of 1047 {reply, Reply, NState} -> 1048 Debug1 = common_reply(Name, From, Reply, NState, Debug), 1049 loop(GS2State #gs2_state { state = NState, 1050 time = infinity, 1051 debug = Debug1 }); 1052 {reply, Reply, NState, Time1} -> 1053 Debug1 = common_reply(Name, From, Reply, NState, Debug), 1054 loop(GS2State #gs2_state { state = NState, 1055 time = Time1, 1056 debug = Debug1}); 1057 {stop, Reason, Reply, NState} -> 1058 {'EXIT', R} = 1059 (catch terminate(Reason, Msg, 1060 GS2State #gs2_state { state = NState })), 1061 _ = common_reply(Name, From, Reply, NState, Debug), 1062 exit(R); 1063 Other -> 1064 handle_common_reply(Other, Msg, GS2State) 1065 end; 1066handle_msg(Msg, GS2State = #gs2_state { mod = Mod, state = State }) -> 1067 Reply = (catch dispatch(Msg, Mod, State)), 1068 handle_common_reply(Reply, Msg, GS2State). 1069 1070handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name, 1071 debug = Debug}) -> 1072 case Reply of 1073 {noreply, NState} -> 1074 Debug1 = common_noreply(Name, NState, Debug), 1075 loop(GS2State #gs2_state {state = NState, 1076 time = infinity, 1077 debug = Debug1}); 1078 {noreply, NState, Time1} -> 1079 Debug1 = common_noreply(Name, NState, Debug), 1080 loop(GS2State #gs2_state {state = NState, 1081 time = Time1, 1082 debug = Debug1}); 1083 {become, Mod, NState} -> 1084 Debug1 = common_become(Name, Mod, NState, Debug), 1085 loop(find_prioritisers( 1086 GS2State #gs2_state { mod = Mod, 1087 state = NState, 1088 time = infinity, 1089 debug = Debug1 })); 1090 {become, Mod, NState, Time1} -> 1091 Debug1 = common_become(Name, Mod, NState, Debug), 1092 loop(find_prioritisers( 1093 GS2State #gs2_state { mod = Mod, 1094 state = NState, 1095 time = Time1, 1096 debug = Debug1 })); 1097 _ -> 1098 handle_common_termination(Reply, Msg, GS2State) 1099 end. 1100 1101handle_common_termination(Reply, Msg, GS2State) -> 1102 case Reply of 1103 {stop, Reason, NState} -> 1104 terminate(Reason, Msg, GS2State #gs2_state { state = NState }); 1105 {'EXIT', What} -> 1106 terminate(What, Msg, GS2State); 1107 _ -> 1108 terminate({bad_return_value, Reply}, Msg, GS2State) 1109 end. 1110 1111%%----------------------------------------------------------------- 1112%% Callback functions for system messages handling. 1113%%----------------------------------------------------------------- 1114system_continue(Parent, Debug, GS2State) -> 1115 loop(GS2State #gs2_state { parent = Parent, debug = Debug }). 1116 1117system_terminate(Reason, _Parent, Debug, GS2State) -> 1118 terminate(Reason, [], GS2State #gs2_state { debug = Debug }). 1119 1120system_code_change(GS2State = #gs2_state { mod = Mod, 1121 state = State }, 1122 _Module, OldVsn, Extra) -> 1123 case catch Mod:code_change(OldVsn, State, Extra) of 1124 {ok, NewState} -> 1125 NewGS2State = find_prioritisers( 1126 GS2State #gs2_state { state = NewState }), 1127 {ok, [NewGS2State]}; 1128 Else -> 1129 Else 1130 end. 1131 1132%%----------------------------------------------------------------- 1133%% Format debug messages. Print them as the call-back module sees 1134%% them, not as the real erlang messages. Use trace for that. 1135%%----------------------------------------------------------------- 1136print_event(Dev, {in, Msg}, Name) -> 1137 case Msg of 1138 {'$gen_call', {From, _Tag}, Call} -> 1139 io:format(Dev, "*DBG* ~p got call ~p from ~w~n", 1140 [Name, Call, From]); 1141 {'$gen_cast', Cast} -> 1142 io:format(Dev, "*DBG* ~p got cast ~p~n", 1143 [Name, Cast]); 1144 _ -> 1145 io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg]) 1146 end; 1147print_event(Dev, {out, Msg, To, State}, Name) -> 1148 io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n", 1149 [Name, Msg, To, State]); 1150print_event(Dev, {noreply, State}, Name) -> 1151 io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]); 1152print_event(Dev, Event, Name) -> 1153 io:format(Dev, "*DBG* ~p dbg ~p~n", [Name, Event]). 1154 1155 1156%%% --------------------------------------------------- 1157%%% Terminate the server. 1158%%% --------------------------------------------------- 1159 1160-spec terminate(_, _, _) -> no_return(). 1161 1162terminate(Reason, Msg, #gs2_state { name = Name, 1163 mod = Mod, 1164 state = State, 1165 debug = Debug, 1166 stop_stats_fun = StopStatsFun 1167 } = GS2State) -> 1168 StopStatsFun(stop_stats_timer(GS2State)), 1169 case catch Mod:terminate(Reason, State) of 1170 {'EXIT', R} -> 1171 error_info(R, Reason, Name, Msg, State, Debug), 1172 exit(R); 1173 _ -> 1174 case Reason of 1175 normal -> 1176 exit(normal); 1177 shutdown -> 1178 exit(shutdown); 1179 {shutdown,_}=Shutdown -> 1180 exit(Shutdown); 1181 _ -> 1182 error_info(Reason, undefined, Name, Msg, State, Debug), 1183 exit(Reason) 1184 end 1185 end. 1186 1187error_info(_Reason, _RootCause, application_controller, _Msg, _State, _Debug) -> 1188 %% OTP-5811 Don't send an error report if it's the system process 1189 %% application_controller which is terminating - let init take care 1190 %% of it instead 1191 ok; 1192error_info(Reason, RootCause, Name, Msg, State, Debug) -> 1193 Reason1 = error_reason(Reason), 1194 Fmt = 1195 "** Generic server ~p terminating~n" 1196 "** Last message in was ~p~n" 1197 "** When Server state == ~p~n" 1198 "** Reason for termination == ~n** ~p~n", 1199 case RootCause of 1200 undefined -> format(Fmt, [Name, Msg, State, Reason1]); 1201 _ -> format(Fmt ++ "** In 'terminate' callback " 1202 "with reason ==~n** ~p~n", 1203 [Name, Msg, State, Reason1, 1204 error_reason(RootCause)]) 1205 end, 1206 sys:print_log(Debug), 1207 ok. 1208 1209error_reason({undef,[{M,F,A}|MFAs]} = Reason) -> 1210 case code:is_loaded(M) of 1211 false -> {'module could not be loaded',[{M,F,A}|MFAs]}; 1212 _ -> case erlang:function_exported(M, F, length(A)) of 1213 true -> Reason; 1214 false -> {'function not exported',[{M,F,A}|MFAs]} 1215 end 1216 end; 1217error_reason(Reason) -> 1218 Reason. 1219 1220%%% --------------------------------------------------- 1221%%% Misc. functions. 1222%%% --------------------------------------------------- 1223 1224opt(Op, [{Op, Value}|_]) -> 1225 {ok, Value}; 1226opt(Op, [_|Options]) -> 1227 opt(Op, Options); 1228opt(_, []) -> 1229 false. 1230 1231debug_options(Name, Opts) -> 1232 case opt(debug, Opts) of 1233 {ok, Options} -> dbg_options(Name, Options); 1234 _ -> dbg_options(Name, []) 1235 end. 1236 1237dbg_options(Name, []) -> 1238 Opts = 1239 case init:get_argument(generic_debug) of 1240 error -> 1241 []; 1242 _ -> 1243 [log, statistics] 1244 end, 1245 dbg_opts(Name, Opts); 1246dbg_options(Name, Opts) -> 1247 dbg_opts(Name, Opts). 1248 1249dbg_opts(Name, Opts) -> 1250 case catch sys:debug_options(Opts) of 1251 {'EXIT',_} -> 1252 format("~p: ignoring erroneous debug options - ~p~n", 1253 [Name, Opts]), 1254 []; 1255 Dbg -> 1256 Dbg 1257 end. 1258 1259get_proc_name(Pid) when is_pid(Pid) -> 1260 Pid; 1261get_proc_name({local, Name}) -> 1262 case process_info(self(), registered_name) of 1263 {registered_name, Name} -> 1264 Name; 1265 {registered_name, _Name} -> 1266 exit(process_not_registered); 1267 [] -> 1268 exit(process_not_registered) 1269 end; 1270get_proc_name({global, Name}) -> 1271 case whereis_name(Name) of 1272 undefined -> 1273 exit(process_not_registered_globally); 1274 Pid when Pid =:= self() -> 1275 Name; 1276 _Pid -> 1277 exit(process_not_registered_globally) 1278 end. 1279 1280get_parent() -> 1281 case get('$ancestors') of 1282 [Parent | _] when is_pid(Parent)-> 1283 Parent; 1284 [Parent | _] when is_atom(Parent)-> 1285 name_to_pid(Parent); 1286 _ -> 1287 exit(process_was_not_started_by_proc_lib) 1288 end. 1289 1290name_to_pid(Name) -> 1291 case whereis(Name) of 1292 undefined -> 1293 case whereis_name(Name) of 1294 undefined -> 1295 exit(could_not_find_registered_name); 1296 Pid -> 1297 Pid 1298 end; 1299 Pid -> 1300 Pid 1301 end. 1302 1303whereis_name(Name) -> 1304 case ets:lookup(global_names, Name) of 1305 [{_Name, Pid, _Method, _RPid, _Ref}] -> 1306 if node(Pid) == node() -> 1307 case is_process_alive(Pid) of 1308 true -> Pid; 1309 false -> undefined 1310 end; 1311 true -> 1312 Pid 1313 end; 1314 [] -> undefined 1315 end. 1316 1317find_prioritisers(GS2State = #gs2_state { mod = Mod }) -> 1318 PCall = function_exported_or_default(Mod, 'prioritise_call', 4, 1319 fun (_Msg, _From, _State) -> 0 end), 1320 PCast = function_exported_or_default(Mod, 'prioritise_cast', 3, 1321 fun (_Msg, _State) -> 0 end), 1322 PInfo = function_exported_or_default(Mod, 'prioritise_info', 3, 1323 fun (_Msg, _State) -> 0 end), 1324 GS2State #gs2_state { prioritisers = {PCall, PCast, PInfo} }. 1325 1326function_exported_or_default(Mod, Fun, Arity, Default) -> 1327 case erlang:function_exported(Mod, Fun, Arity) of 1328 true -> case Arity of 1329 3 -> fun (Msg, GS2State = #gs2_state { queue = Queue, 1330 state = State }) -> 1331 Length = priority_queue:len(Queue), 1332 case catch Mod:Fun(Msg, Length, State) of 1333 drop -> 1334 drop; 1335 Res when is_integer(Res) -> 1336 Res; 1337 Err -> 1338 handle_common_termination(Err, Msg, GS2State) 1339 end 1340 end; 1341 4 -> fun (Msg, From, GS2State = #gs2_state { queue = Queue, 1342 state = State }) -> 1343 Length = priority_queue:len(Queue), 1344 case catch Mod:Fun(Msg, From, Length, State) of 1345 Res when is_integer(Res) -> 1346 Res; 1347 Err -> 1348 handle_common_termination(Err, Msg, GS2State) 1349 end 1350 end 1351 end; 1352 false -> Default 1353 end. 1354 1355%%----------------------------------------------------------------- 1356%% Status information 1357%%----------------------------------------------------------------- 1358format_status(Opt, StatusData) -> 1359 [PDict, SysState, Parent, Debug, 1360 #gs2_state{name = Name, state = State, mod = Mod, queue = Queue}] = 1361 StatusData, 1362 NameTag = if is_pid(Name) -> 1363 pid_to_list(Name); 1364 is_atom(Name) -> 1365 Name 1366 end, 1367 Header = lists:concat(["Status for generic server ", NameTag]), 1368 Log = sys:get_log(Debug), 1369 Specfic = callback(Mod, format_status, [Opt, [PDict, State]], 1370 fun () -> [{data, [{"State", State}]}] end), 1371 Messages = callback(Mod, format_message_queue, [Opt, Queue], 1372 fun () -> priority_queue:to_list(Queue) end), 1373 [{header, Header}, 1374 {data, [{"Status", SysState}, 1375 {"Parent", Parent}, 1376 {"Logged events", Log}, 1377 {"Queued messages", Messages}]} | 1378 Specfic]. 1379 1380callback(Mod, FunName, Args, DefaultThunk) -> 1381 case erlang:function_exported(Mod, FunName, length(Args)) of 1382 true -> case catch apply(Mod, FunName, Args) of 1383 {'EXIT', _} -> DefaultThunk(); 1384 Success -> Success 1385 end; 1386 false -> DefaultThunk() 1387 end. 1388 1389stats_funs() -> 1390 case ets:info(gen_server2_metrics) of 1391 undefined -> 1392 {fun(GS2State) -> GS2State end, 1393 fun(GS2State) -> GS2State end}; 1394 _ -> 1395 {fun emit_stats/1, fun stop_stats/1} 1396 end. 1397 1398init_stats(State = #gs2_state{ emit_stats_fun = EmitStatsFun }) -> 1399 StateWithInitTimer = rabbit_event:init_stats_timer(State, #gs2_state.timer), 1400 next_stats_timer(EmitStatsFun(StateWithInitTimer)). 1401 1402next_stats_timer(State) -> 1403 ensure_stats_timer(rabbit_event:reset_stats_timer(State, #gs2_state.timer)). 1404 1405ensure_stats_timer(State) -> 1406 rabbit_event:ensure_stats_timer(State, 1407 #gs2_state.timer, 1408 emit_gen_server2_stats). 1409 1410stop_stats_timer(State) -> 1411 rabbit_event:stop_stats_timer(State, #gs2_state.timer). 1412 1413emit_stats(State = #gs2_state{queue = Queue}) -> 1414 rabbit_core_metrics:gen_server2_stats(self(), priority_queue:len(Queue)), 1415 State. 1416 1417stop_stats(State) -> 1418 rabbit_core_metrics:gen_server2_deleted(self()), 1419 State. 1420