%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1996-2018. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%

%%
%% The mnesia_init process loads tables from local disc or from
%% other nodes. It also coordinates updates of the info about
%% where we can read and write tables.
%%
%% Tables may need to be loaded initially at startup of the local
%% node or when other nodes announce that they already have loaded
%% tables that we also want.
%%
%% Initially we set the load request queue to those tables that we
%% can safely load locally, i.e. tables where we have the last
%% consistent replica and we have received mnesia_down from all
%% other nodes holding the table. Then we let the mnesia_init
%% process enter its normal working state.
%%
%% When we need to load a table we append a request to the load
%% request queue. All other requests are regarded as high priority
%% and are processed immediately (e.g. update table whereabouts).
%% We process the load request queue as a "background" job.

-module(mnesia_controller).

-behaviour(gen_server).

%% Mnesia internal stuff
-export([
         start/0,
         i_have_tab/1,
         info/0,
         get_info/1,
         get_workers/1,
         force_load_table/1,
         async_dump_log/1,
         sync_dump_log/1,
         snapshot_dcd/1,
         connect_nodes/1,
         connect_nodes/2,
         wait_for_schema_commit_lock/0,
         release_schema_commit_lock/0,
         create_table/1,
         get_disc_copy/1,
         get_remote_cstructs/0,  % new function
         get_cstructs/0,         % old function
         sync_and_block_table_whereabouts/4,
         sync_del_table_copy_whereabouts/2,
         block_table/1,
         unblock_table/1,
         block_controller/0,
         unblock_controller/0,
         unannounce_add_table_copy/2,
         master_nodes_updated/2,
         mnesia_down/1,
         add_active_replica/2,
         add_active_replica/3,
         add_active_replica/4,
         update/1,
         change_table_access_mode/1,
         change_table_majority/1,
         del_active_replica/2,
         wait_for_tables/2,
         get_network_copy/3,
         merge_schema/0,
         start_remote_sender/4,
         schedule_late_disc_load/2
        ]).

%% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3]).

%% Module internal stuff
-export([call/1,
         cast/1,
         dump_and_reply/2,
         load_and_reply/2,
         send_and_reply/2,
         wait_for_tables_init/2,
         connect_nodes2/3
        ]).

-compile({no_auto_import,[error/2]}).

-import(mnesia_lib, [set/2, add/2]).
-import(mnesia_lib, [fatal/2, error/2, verbose/2, dbg_out/2]).

-include("mnesia.hrl").

-define(SERVER_NAME, ?MODULE).

-record(state, {supervisor,
                schema_is_merged = false,
                early_msgs = [],
                loader_pid = [],     %% Was a pid, is now a list [{Pid,Work}|..]
                loader_queue,        %% Was a list, is now a gb_tree
                sender_pid = [],     %% Was a pid or undefined, is now [{Pid,Work}|..]
                sender_queue = [],
                late_loader_queue,   %% Was a list, is now a gb_tree
                dumper_pid,          %% Dumper or schema commit pid
                dumper_queue = [],   %% Dumper or schema commit queue
                others = [],         %% Processes that need the copier_done msg
                dump_log_timer_ref,
                is_stopping = false
               }).

%% Backwards compat: sender_pid is now a list of senders.
get_senders(#state{sender_pid = Pids}) when is_list(Pids) -> Pids.
%% Backwards compat: loader_pid is now a list of loaders.
get_loaders(#state{loader_pid = Pids}) when is_list(Pids) -> Pids.

max_loaders() ->
    case ?catch_val(no_table_loaders) of
        {'EXIT', _} ->
            mnesia_lib:set(no_table_loaders, 1),
            1;
        Val -> Val
    end.
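%% Note (illustration, not part of the original module): no_table_loaders is
%% an ordinary Mnesia application parameter, so the number of parallel table
%% loaders can be raised at startup, e.g.:
%%
%%   erl -mnesia no_table_loaders 4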
-record(schema_commit_lock, {owner}).
-record(block_controller, {owner}).

-record(dump_log, {initiated_by,
                   opt_reply_to,
                   operation = dump_log
                  }).

-record(net_load, {table,
                   reason,
                   opt_reply_to,
                   cstruct = unknown
                  }).

-record(send_table, {table,
                     receiver_pid,
                     remote_storage
                    }).

-record(disc_load, {table,
                    reason,
                    opt_reply_to
                   }).

-record(late_load, {table,
                    reason,
                    opt_reply_to,
                    loaders
                   }).

-record(loader_done, {worker_pid,
                      is_loaded,
                      table_name,
                      needs_announce,
                      needs_sync,
                      needs_reply,
                      reply_to,
                      reply}).

-record(sender_done, {worker_pid,
                      worker_res,
                      table_name
                     }).

-record(dumper_done, {worker_pid,
                      worker_res
                     }).

%% Local function in order to avoid an external function call
val(Var) ->
    case ?catch_val_and_stack(Var) of
        {'EXIT', Stacktrace} -> mnesia_lib:other_val(Var, Stacktrace);
        Value -> Value
    end.

start() ->
    gen_server:start_link({local, ?SERVER_NAME}, ?MODULE, [self()],
                          [{timeout, infinity}
                           %% ,{debug, [trace]}
                          ]).

sync_dump_log(InitBy) ->
    call({sync_dump_log, InitBy}).

async_dump_log(InitBy) ->
    ?SERVER_NAME ! {async_dump_log, InitBy},
    ok.

snapshot_dcd(Tables) when is_list(Tables) ->
    case [T || T <- Tables,
               mnesia_lib:storage_type_at_node(node(), T) =/= disc_copies] of
        [] ->
            call({snapshot_dcd, Tables});
        BadTabs ->
            {error, {not_disc_copies, BadTabs}}
    end.

%% Wait for tables to be active.
%% If needed, we will wait for Mnesia to start.
%% If Mnesia stops, we will wait for Mnesia to restart.
%% We will wait even if the list of tables is empty.
%%
wait_for_tables(Tabs, Timeout) when is_list(Tabs), Timeout == infinity ->
    do_wait_for_tables(Tabs, Timeout);
wait_for_tables(Tabs, Timeout) when is_list(Tabs),
                                    is_integer(Timeout), Timeout >= 0 ->
    do_wait_for_tables(Tabs, Timeout);
wait_for_tables(Tabs, Timeout) ->
    {error, {badarg, Tabs, Timeout}}.

do_wait_for_tables(Tabs, 0) ->
    reply_wait(Tabs);
do_wait_for_tables(Tabs, Timeout) ->
    Pid = spawn_link(?MODULE, wait_for_tables_init, [self(), Tabs]),
    receive
        {?SERVER_NAME, Pid, Res} ->
            Res;
        {'EXIT', Pid, _} ->
            reply_wait(Tabs)
    after Timeout ->
            unlink(Pid),
            exit(Pid, timeout),
            reply_wait(Tabs)
    end.

reply_wait(Tabs) ->
    try mnesia_lib:active_tables() of
        Active when is_list(Active) ->
            case Tabs -- Active of
                [] ->
                    ok;
                BadTabs ->
                    {timeout, BadTabs}
            end
    catch exit:_ -> {error, {node_not_running, node()}}
    end.
wait_for_tables_init(From, Tabs) ->
    process_flag(trap_exit, true),
    Res = wait_for_init(From, Tabs, whereis(?SERVER_NAME)),
    From ! {?SERVER_NAME, self(), Res},
    unlink(From),
    exit(normal).

wait_for_init(From, Tabs, Init) ->
    try link(Init) of
        true when is_pid(Init) ->
            cast({sync_tabs, Tabs, self()}),
            rec_tabs(Tabs, Tabs, From, Init)
    catch error:_ -> %% Mnesia is not started
            {error, {node_not_running, node()}}
    end.

sync_reply(Waiter, Tab) ->
    Waiter ! {?SERVER_NAME, {tab_synced, Tab}}.

rec_tabs([Tab | Tabs], AllTabs, From, Init) ->
    receive
        {?SERVER_NAME, {tab_synced, Tab}} ->
            rec_tabs(Tabs, AllTabs, From, Init);

        {'EXIT', From, _} ->
            %% This will trigger an exit signal
            %% to mnesia_init
            exit(wait_for_tables_timeout);

        {'EXIT', Init, _} ->
            %% Oops, mnesia_init stopped
            exit(mnesia_stopped)
    end;
rec_tabs([], _, _, Init) ->
    unlink(Init),
    ok.

get_remote_cstructs() ->
    get_cstructs().  %% Sigh, not forward compatible; always check the version

%% Old function kept for backwards compatibility; converts cstructs before sending.
get_cstructs() ->
    {cstructs, Cstructs, Running} = call(get_cstructs),
    Node = node(group_leader()),
    {cstructs, mnesia_schema:normalize_cs(Cstructs, Node), Running}.

update(Fun) ->
    call({update,Fun}).


mnesia_down(Node) ->
    case whereis(?SERVER_NAME) of
        undefined -> mnesia_monitor:mnesia_down(?SERVER_NAME, Node);
        Pid -> gen_server:cast(Pid, {mnesia_down, Node})
    end.

wait_for_schema_commit_lock() ->
    try
        Pid = whereis(?SERVER_NAME),
        link(Pid), %% Keep the link until release_schema_commit_lock
        gen_server:call(Pid, wait_for_schema_commit_lock, infinity)
    catch _:_ ->
            mnesia:abort({node_not_running, node()})
    end.

block_controller() ->
    call(block_controller).

unblock_controller() ->
    cast(unblock_controller).

release_schema_commit_lock() ->
    cast({release_schema_commit_lock, self()}),
    unlink(whereis(?SERVER_NAME)).

%% Special for preparation of add table copy
get_network_copy(Tid, Tab, Cs) ->
    %% We can't let the controller queue this one,
    %% because that may cause a deadlock between schema operations
    %% and initial table loadings, which both take schema locks.
    %% But we have to get copier_done msgs when the other side
    %% goes down.
    call({add_other, self()}),
    Reason = {dumper,{add_table_copy, Tid}},
    Work = #net_load{table = Tab, reason = Reason, cstruct = Cs},
    %% We need this since the loader is linked through the subscriber;
    %% it might be solved by using a monitor in the subscriber instead.
    process_flag(trap_exit, true),
    Load = load_table_fun(Work),
    Res = ?CATCH(Load()),
    process_flag(trap_exit, false),
    call({del_other, self()}),
    case Res of
        #loader_done{is_loaded = true} ->
            Tab = Res#loader_done.table_name,
            case Res#loader_done.needs_announce of
                true ->
                    i_have_tab(Tab);
                false ->
                    ignore
            end,
            Res#loader_done.reply;
        #loader_done{} ->
            Res#loader_done.reply;
        Else ->
            {not_loaded, Else}
    end.
%% This function is invoked from the dumper.
%%
%% There are two cases here:
%% startup ->
%%   no need for sync, since mnesia_controller is not started yet
%% schema_trans ->
%%   already synced with mnesia_controller since the dumper
%%   is synchronously started from mnesia_controller

create_table(Tab) ->
    {loaded, ok} = mnesia_loader:disc_load_table(Tab, {dumper,create_table}).

get_disc_copy(Tab) ->
    disc_load_table(Tab, {dumper,change_table_copy_type}, undefined).

%% Returns ok instead of yes
force_load_table(Tab) when is_atom(Tab), Tab /= schema ->
    case ?catch_val({Tab, storage_type}) of
        ram_copies ->
            do_force_load_table(Tab);
        disc_copies ->
            do_force_load_table(Tab);
        disc_only_copies ->
            do_force_load_table(Tab);
        {ext, _, _} ->
            do_force_load_table(Tab);
        unknown ->
            set({Tab, load_by_force}, true),
            cast({force_load_updated, Tab}),
            wait_for_tables([Tab], infinity);
        {'EXIT', _} ->
            {error, {no_exists, Tab}}
    end;
force_load_table(Tab) ->
    {error, {bad_type, Tab}}.

do_force_load_table(Tab) ->
    Loaded = ?catch_val({Tab, load_reason}),
    case Loaded of
        unknown ->
            set({Tab, load_by_force}, true),
            mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user),
            wait_for_tables([Tab], infinity);
        {'EXIT', _} ->
            set({Tab, load_by_force}, true),
            mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user),
            wait_for_tables([Tab], infinity);
        _ ->
            ok
    end.

master_nodes_updated(schema, _Masters) ->
    ignore;
master_nodes_updated(Tab, Masters) ->
    cast({master_nodes_updated, Tab, Masters}).

schedule_late_disc_load(Tabs, Reason) ->
    MsgTag = late_disc_load,
    try_schedule_late_disc_load(Tabs, Reason, MsgTag).

try_schedule_late_disc_load(Tabs, _Reason, MsgTag)
  when Tabs == [], MsgTag /= schema_is_merged ->
    ignore;
try_schedule_late_disc_load(Tabs, Reason, MsgTag) ->
    GetIntents =
        fun() ->
                Item = mnesia_late_disc_load,
                Nodes = val({current, db_nodes}),
                mnesia:lock({global, Item, Nodes}, write),
                case multicall(Nodes -- [node()], disc_load_intents) of
                    {Replies, []} ->
                        call({MsgTag, Tabs, Reason, Replies}),
                        done;
                    {_, BadNodes} ->
                        %% Some nodes did not respond, let's try again
                        {retry, BadNodes}
                end
        end,
    case mnesia:transaction(GetIntents) of
        {atomic, done} ->
            done;
        {atomic, {retry, BadNodes}} ->
            verbose("Retry late_load_tables because bad nodes: ~p~n",
                    [BadNodes]),
            try_schedule_late_disc_load(Tabs, Reason, MsgTag);
        {aborted, AbortReason} ->
            fatal("Cannot late_load_tables ~tp: ~tp~n",
                  [[Tabs, Reason, MsgTag], AbortReason])
    end.

connect_nodes(Ns) ->
    connect_nodes(Ns, fun default_merge/1).

connect_nodes(Ns, UserFun) ->
    case mnesia:system_info(is_running) of
        no ->
            {error, {node_not_running, node()}};
        yes ->
            Pid = spawn_link(?MODULE, connect_nodes2, [self(), Ns, UserFun]),
            receive
                {?MODULE, Pid, Res, New} ->
                    case Res of
                        ok ->
                            mnesia_lib:add_list(extra_db_nodes, New),
                            {ok, New};
                        {aborted, {throw, Str}} when is_list(Str) ->
                            %%mnesia_recover:disconnect_nodes(New),
                            {error, {merge_schema_failed, lists:flatten(Str)}};
                        Else ->
                            {error, Else}
                    end;
                {'EXIT', Pid, Reason} ->
                    {error, Reason}
            end
    end.
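%% For illustration, this is what runs underneath the public API when extra
%% db nodes are connected at runtime (node names are hypothetical):
%%
%%   {ok, ReallyConnected} =
%%       mnesia:change_config(extra_db_nodes, ['a@host1', 'b@host2'])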
connect_nodes2(Father, Ns, UserFun) ->
    Current = val({current, db_nodes}),
    abcast([node()|Ns], {merging_schema, node()}),
    {NewC, OldC} = mnesia_recover:connect_nodes(Ns),
    Connected = NewC ++ OldC,
    New1 = mnesia_lib:intersect(Ns, Connected),
    New = New1 -- Current,
    process_flag(trap_exit, true),
    Res = try_merge_schema(New, [], UserFun),
    Msg = {schema_is_merged, [], late_merge, []},
    _ = multicall([node()|Ns], Msg),
    After = val({current, db_nodes}),
    Father ! {?MODULE, self(), Res, mnesia_lib:intersect(Ns, After)},
    unlink(Father),
    ok.

%% Merge the local schema with the schema on other nodes.
%% But first we must let all processes that want to force
%% load tables wait until the schema merge is done.

merge_schema() ->
    AllNodes = mnesia_lib:all_nodes(),
    case try_merge_schema(AllNodes, [node()], fun default_merge/1) of
        ok ->
            schema_is_merged();
        {aborted, {throw, Str}} when is_list(Str) ->
            fatal("Failed to merge schema: ~s~n", [Str]);
        Else ->
            fatal("Failed to merge schema: ~p~n", [Else])
    end.

default_merge(F) ->
    F([]).

try_merge_schema(Nodes, Told0, UserFun) ->
    case mnesia_schema:merge_schema(UserFun) of
        {atomic, not_merged} ->
            %% No more nodes that we need to merge the schema with.
            %% Ensure we have told everybody that we are running.
            case val({current,db_nodes}) -- mnesia_lib:uniq(Told0) of
                [] -> ok;
                Tell ->
                    im_running(Tell, [node()]),
                    ok
            end;
        {atomic, {merged, OldFriends, NewFriends}} ->
            %% Check if new nodes have been added to the schema
            Diff = mnesia_lib:all_nodes() -- [node() | Nodes],
            mnesia_recover:connect_nodes(Diff),

            %% Tell everybody to adopt orphan tables
            im_running(OldFriends, NewFriends),
            im_running(NewFriends, OldFriends),
            Told = case lists:member(node(), NewFriends) of
                       true -> Told0 ++ OldFriends;
                       false -> Told0 ++ NewFriends
                   end,
            try_merge_schema(Nodes, Told, UserFun);
        {atomic, {"Cannot get cstructs", Node, Reason}} ->
            dbg_out("Cannot get cstructs, Node ~p ~tp~n", [Node, Reason]),
            timer:sleep(300), %% Avoid busy-looping
            try_merge_schema(Nodes, Told0, UserFun);
        {aborted, {shutdown, _}} -> %% One of the nodes is going down
            timer:sleep(300), %% Avoid busy-looping
            try_merge_schema(Nodes, Told0, UserFun);
        Other ->
            Other
    end.

im_running(OldFriends, NewFriends) ->
    abcast(OldFriends, {im_running, node(), NewFriends}).

schema_is_merged() ->
    MsgTag = schema_is_merged,
    SafeLoads = initial_safe_loads(),

    %% At this point we do not know anything about
    %% which tables the other nodes have already
    %% loaded, so we let the normal processing of
    %% the loader_queue take care of it, since by
    %% then we will know the whereabouts. We rely
    %% on the fact that all nodes tell each other
    %% directly when they have loaded a table and
    %% are willing to share it.

    try_schedule_late_disc_load(SafeLoads, initial, MsgTag).


cast(Msg) ->
    case whereis(?SERVER_NAME) of
        undefined -> ok;
        Pid -> gen_server:cast(Pid, Msg)
    end.

abcast(Nodes, Msg) ->
    gen_server:abcast(Nodes, ?SERVER_NAME, Msg).
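%% Note on call/1 below: the caller deliberately links to the server around
%% the gen_server call, so a crash of the controller while we wait is
%% noticed. A trapping caller finds the 'EXIT' message in the zero-timeout
%% receive and maps it to {error, {node_not_running, node()}}.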
call(Msg) ->
    case whereis(?SERVER_NAME) of
        undefined ->
            {error, {node_not_running, node()}};
        Pid ->
            link(Pid),
            Res = gen_server:call(Pid, Msg, infinity),
            unlink(Pid),

            %% We get an exit signal if the server dies
            receive
                {'EXIT', Pid, _Reason} ->
                    {error, {node_not_running, node()}}
            after 0 ->
                    Res
            end
    end.

remote_call(Node, Func, Args) ->
    try gen_server:call({?MODULE, Node}, {Func, Args, self()}, infinity)
    catch exit:Error -> {error, Error}
    end.

multicall(Nodes, Msg) ->
    {Good, Bad} = gen_server:multi_call(Nodes, ?MODULE, Msg, infinity),
    PatchedGood = [Reply || {_Node, Reply} <- Good],
    {PatchedGood, Bad}.  %% Make the replies look like those of rpc:multicall
%%  rpc:multicall(Nodes, ?MODULE, call, [Msg]).

next_async_dump_log() ->
    Interval = mnesia_monitor:get_env(dump_log_time_threshold),
    Msg = {next_async_dump_log, time_threshold},
    Ref = erlang:send_after(Interval, self(), Msg),
    Ref.
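%% The interval above is the Mnesia configuration parameter
%% dump_log_time_threshold (milliseconds between automatic log dumps,
%% 180000 by default), e.g.:
%%
%%   erl -mnesia dump_log_time_threshold 60000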
%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State} |
%%          {ok, State, Timeout} |
%%          {stop, Reason}
%%----------------------------------------------------------------------
init([Parent]) ->
    process_flag(trap_exit, true),
    mnesia_lib:verbose("~p starting: ~p~n", [?SERVER_NAME, self()]),

    %% Handshake and initialize transaction recovery
    %% for new nodes detected in the schema
    All = mnesia_lib:all_nodes(),
    Diff = All -- [node() | val(original_nodes)],
    mnesia_lib:unset(original_nodes),
    mnesia_recover:connect_nodes(Diff),

    Ref = next_async_dump_log(),
    mnesia_dumper:start_regulator(),

    Empty = gb_trees:empty(),
    {ok, #state{supervisor = Parent, dump_log_timer_ref = Ref,
                loader_queue = Empty,
                late_loader_queue = Empty}}.

%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State} |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State} |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, Reply, State} | (terminate/2 is called)
%%          {stop, Reason, State}          (terminate/2 is called)
%%----------------------------------------------------------------------

handle_call({sync_dump_log, InitBy}, From, State) ->
    Worker = #dump_log{initiated_by = InitBy,
                       opt_reply_to = From
                      },
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_call({snapshot_dcd, Tables}, From, State) ->
    Worker = #dump_log{initiated_by = user,
                       opt_reply_to = From,
                       operation = fun() ->
                                           mnesia_dumper:snapshot_dcd(Tables)
                                   end},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_call(wait_for_schema_commit_lock, From, State) ->
    Worker = #schema_commit_lock{owner = From},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_call(block_controller, From, State) ->
    Worker = #block_controller{owner = From},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_call({update,Fun}, From, State) ->
    Res = ?CATCH(Fun()),
    reply(From, Res),
    noreply(State);

handle_call(get_cstructs, From, State) ->
    Tabs = val({schema, tables}),
    Cstructs = [val({T, cstruct}) || T <- Tabs],
    Running = val({current, db_nodes}),
    reply(From, {cstructs, Cstructs, Running}),
    noreply(State);

handle_call({schema_is_merged, [], late_merge, []}, From,
            State = #state{schema_is_merged = Merged}) ->
    case Merged of
        {false, Node} when Node == node(From) ->
            Msgs = State#state.early_msgs,
            State1 = State#state{early_msgs = [], schema_is_merged = true},
            handle_early_msgs(lists:reverse(Msgs), State1);
        _ ->
            %% Oops, this came too early (before we have merged),
            %% or too late, or from a node we don't care about
            reply(From, ignore),
            noreply(State)
    end;

handle_call({schema_is_merged, TabsR, Reason, RemoteLoaders}, From, State) ->
    State2 = late_disc_load(TabsR, Reason, RemoteLoaders, From, State),

    %% Handle early messages
    Msgs = State2#state.early_msgs,
    State3 = State2#state{early_msgs = [], schema_is_merged = true},
    handle_early_msgs(lists:reverse(Msgs), State3);

handle_call(disc_load_intents, From, State = #state{loader_queue = LQ, late_loader_queue = LLQ}) ->
    LQTabs = gb_trees:keys(LQ),
    LLQTabs = gb_trees:keys(LLQ),
    ActiveTabs = lists:sort(mnesia_lib:local_active_tables()),
    reply(From, {ok, node(), ordsets:union([LQTabs, LLQTabs, ActiveTabs])}),
    noreply(State);

handle_call({update_where_to_write, [add, Tab, AddNode], _From}, _Dummy, State) ->
    Current = val({current, db_nodes}),
    Res =
        case lists:member(AddNode, Current) and
            (State#state.schema_is_merged == true) of
            true ->
                mnesia_lib:add_lsort({Tab, where_to_write}, AddNode),
                update_where_to_wlock(Tab);
            false ->
                ignore
        end,
    {reply, Res, State};

handle_call({add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From},
            ReplyTo, State) ->
    KnownNode = lists:member(ToNode, val({current, db_nodes})),
    Merged = State#state.schema_is_merged,
    if
        KnownNode == false ->
            reply(ReplyTo, ignore),
            noreply(State);
        Merged == true ->
            Res = case ?catch_val({Tab, cstruct}) of
                      {'EXIT', _} -> %% Tab deleted
                          deleted;
                      _ ->
                          add_active_replica(Tab, ToNode, RemoteS, AccessMode)
                  end,
            reply(ReplyTo, Res),
            noreply(State);
        true -> %% Schema is not merged
            Msg = {add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From},
            Msgs = State#state.early_msgs,
            reply(ReplyTo, ignore), %% Reply ignore and add data after schema merge
            noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]})
    end;

handle_call({unannounce_add_table_copy, [Tab, Node], From}, ReplyTo, State) ->
    KnownNode = lists:member(node(From), val({current, db_nodes})),
    Merged = State#state.schema_is_merged,
    if
        KnownNode == false ->
            reply(ReplyTo, ignore),
            noreply(State);
        Merged == true ->
            Res = unannounce_add_table_copy(Tab, Node),
            reply(ReplyTo, Res),
            noreply(State);
        true -> %% Schema is not merged
            Msg = {unannounce_add_table_copy, [Tab, Node], From},
            Msgs = State#state.early_msgs,
            reply(ReplyTo, ignore), %% Reply ignore and add data after schema merge
            %% Set ReplyTo to undefined so we don't reply twice
            noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]})
    end;

handle_call(Msg, From, State) when State#state.schema_is_merged /= true ->
    %% Buffer early messages
    Msgs = State#state.early_msgs,
    noreply(State#state{early_msgs = [{call, Msg, From} | Msgs]});

handle_call({late_disc_load, Tabs, Reason, RemoteLoaders}, From, State) ->
    State2 = late_disc_load(Tabs, Reason, RemoteLoaders, From, State),
    noreply(State2);

handle_call({unblock_table, Tab}, _Dummy, State) ->
    Var = {Tab, where_to_commit},
    case val(Var) of
        {blocked, List} ->
            set(Var, List); %% where_to_commit
        _ ->
            ignore
    end,
    {reply, ok, State};

handle_call({block_table, [Tab], From}, _Dummy, State) ->
    case lists:member(node(From), val({current, db_nodes})) of
        true ->
            block_table(Tab);
        false ->
            ignore
    end,
    {reply, ok, State};

handle_call({check_w2r, _Node, Tab}, _From, State) ->
    {reply, val({Tab, where_to_read}), State};

handle_call({add_other, Who}, _From, State = #state{others = Others0}) ->
    Others = [Who | Others0],
    {reply, ok, State#state{others = Others}};
handle_call({del_other, Who}, _From, State = #state{others = Others0}) ->
    Others = lists:delete(Who, Others0),
    {reply, ok, State#state{others = Others}};

handle_call(Msg, _From, State) ->
    error("~p got unexpected call: ~tp~n", [?SERVER_NAME, Msg]),
    noreply(State).
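%% Note on the buffering above: every call, cast, or info message that
%% arrives before the schema merge has completed is stashed in
%% #state.early_msgs and replayed, in arrival order, by handle_early_msgs/2
%% as soon as the {schema_is_merged, ...} call is handled.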
late_disc_load(TabsR, Reason, RemoteLoaders, From,
               State = #state{loader_queue = LQ, late_loader_queue = LLQ}) ->
    verbose("Intend to load tables: ~tp~n", [TabsR]),
    ?eval_debug_fun({?MODULE, late_disc_load},
                    [{tabs, TabsR},
                     {reason, Reason},
                     {loaders, RemoteLoaders}]),

    reply(From, queued),
    %% RemoteLoaders is a list of {ok, Node, Tabs} tuples

    %% Remove deleted tabs and those already queued or loaded
    LocalTabs = gb_sets:from_ordset(lists:sort(mnesia_lib:val({schema, local_tables}))),
    Filter = fun(TabInfo0, Acc) ->
                     TabInfo = {Tab, _} =
                         case TabInfo0 of
                             {_, _} -> TabInfo0;
                             TabN -> {TabN, Reason}
                         end,
                     case gb_sets:is_member(Tab, LocalTabs) of
                         true ->
                             case ?catch_val({Tab, where_to_read}) == node() of
                                 true -> Acc;
                                 false ->
                                     case gb_trees:is_defined(Tab, LQ) of
                                         true -> Acc;
                                         false -> [TabInfo | Acc]
                                     end
                             end;
                         false -> Acc
                     end
             end,

    Tabs = lists:foldl(Filter, [], TabsR),

    Nodes = val({current, db_nodes}),
    LateQueue = late_loaders(Tabs, RemoteLoaders, Nodes, LLQ),
    State#state{late_loader_queue = LateQueue}.

late_loaders([{Tab, Reason} | Tabs], RemoteLoaders, Nodes, LLQ) ->
    case gb_trees:is_defined(Tab, LLQ) of
        false ->
            LoadNodes = late_load_filter(RemoteLoaders, Tab, Nodes, []),
            case LoadNodes of
                [] -> cast({disc_load, Tab, Reason}); %% Ugly cast
                _ -> ignore
            end,
            LateLoad = #late_load{table = Tab, loaders = LoadNodes, reason = Reason},
            late_loaders(Tabs, RemoteLoaders, Nodes, gb_trees:insert(Tab, LateLoad, LLQ));
        true ->
            late_loaders(Tabs, RemoteLoaders, Nodes, LLQ)
    end;
late_loaders([], _RemoteLoaders, _Nodes, LLQ) ->
    LLQ.

late_load_filter([{error, _} | RemoteLoaders], Tab, Nodes, Acc) ->
    late_load_filter(RemoteLoaders, Tab, Nodes, Acc);
late_load_filter([{badrpc, _} | RemoteLoaders], Tab, Nodes, Acc) ->
    late_load_filter(RemoteLoaders, Tab, Nodes, Acc);
late_load_filter([RL | RemoteLoaders], Tab, Nodes, Acc) ->
    {ok, Node, Intents} = RL,
    Access = val({Tab, access_mode}),
    LocalC = val({Tab, local_content}),
    StillActive = lists:member(Node, Nodes),
    RemoteIntent = lists:member(Tab, Intents),
    if
        Access == read_write,
        LocalC == false,
        StillActive == true,
        RemoteIntent == true ->
            Masters = mnesia_recover:get_master_nodes(Tab),
            case lists:member(Node, Masters) of
                true ->
                    %% The other node is a master node for
                    %% the table, accept its load intent
                    late_load_filter(RemoteLoaders, Tab, Nodes, [Node | Acc]);
                false when Masters == [] ->
                    %% The table has no master nodes,
                    %% accept its load intent
                    late_load_filter(RemoteLoaders, Tab, Nodes, [Node | Acc]);
                false ->
                    %% Someone else is a master node for
                    %% the table, ignore its load intent
                    late_load_filter(RemoteLoaders, Tab, Nodes, Acc)
            end;
        true ->
            late_load_filter(RemoteLoaders, Tab, Nodes, Acc)
    end;
late_load_filter([], _Tab, _Nodes, Acc) ->
    Acc.
%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State} |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------

handle_cast({release_schema_commit_lock, _Owner}, State) ->
    if
        State#state.is_stopping == true ->
            {stop, shutdown, State};
        true ->
            case State#state.dumper_queue of
                [#schema_commit_lock{} | Rest] ->
                    State2 = State#state{dumper_pid = undefined,
                                         dumper_queue = Rest},
                    State3 = opt_start_worker(State2),
                    noreply(State3);
                _ ->
                    noreply(State)
            end
    end;

handle_cast(unblock_controller, State) ->
    if
        State#state.is_stopping == true ->
            {stop, shutdown, State};
        is_record(hd(State#state.dumper_queue), block_controller) ->
            [_Worker | Rest] = State#state.dumper_queue,
            State2 = State#state{dumper_pid = undefined,
                                 dumper_queue = Rest},
            State3 = opt_start_worker(State2),
            noreply(State3)
    end;

handle_cast({mnesia_down, Node}, State) ->
    maybe_log_mnesia_down(Node),
    mnesia_lib:del({current, db_nodes}, Node),
    mnesia_lib:unset({node_up, Node}),
    mnesia_checkpoint:tm_mnesia_down(Node),
    Alltabs = val({schema, tables}),
    reconfigure_tables(Node, Alltabs),
    %% Done (from an external point of view)
    mnesia_monitor:mnesia_down(?SERVER_NAME, Node),

    %% Fix if we are late_merging against the node that went down
    case State#state.schema_is_merged of
        {false, Node} ->
            spawn(?MODULE, call, [{schema_is_merged, [], late_merge, []}]);
        _ ->
            ignore
    end,

    %% Fix internal stuff
    LateQ = remove_loaders(Alltabs, Node, State#state.late_loader_queue),

    case get_senders(State) ++ get_loaders(State) of
        [] -> ignore;
        Senders ->
            lists:foreach(fun({Pid, _}) -> Pid ! {copier_done, Node} end,
                          Senders)
    end,
    lists:foreach(fun(Pid) -> Pid ! {copier_done, Node} end,
                  State#state.others),

    Remove = fun(ST) ->
                     node(ST#send_table.receiver_pid) /= Node
             end,
    NewSenders = lists:filter(Remove, State#state.sender_queue),
    Early = remove_early_messages(State#state.early_msgs, Node),
    noreply(State#state{sender_queue = NewSenders,
                        early_msgs = Early,
                        late_loader_queue = LateQ
                       });

handle_cast({merging_schema, Node}, State) ->
    case State#state.schema_is_merged of
        false ->
            %% This comes from a dynamic connect_nodes which is made
            %% after mnesia:start() and the schema merge.
            ImANewKidInTheBlock =
                (val({schema, storage_type}) == ram_copies)
                andalso (mnesia_lib:val({schema, local_tables}) == [schema]),
            case ImANewKidInTheBlock of
                true -> %% I'm a newly started ram node
                    noreply(State#state{schema_is_merged = {false, Node}});
                false ->
                    noreply(State)
            end;
        _ -> %% Already merging schema
            noreply(State)
    end;

handle_cast(Msg, State) when State#state.schema_is_merged /= true ->
    %% Buffer early messages
    Msgs = State#state.early_msgs,
    noreply(State#state{early_msgs = [{cast, Msg} | Msgs]});

%% This must be done after schema_is_merged; otherwise adopt_orphans
%% might trigger a table load from the wrong nodes, since we would not
%% yet know which tables we can safely load first.
handle_cast({im_running, Node, NewFriends}, State) ->
    LocalTabs = mnesia_lib:local_active_tables() -- [schema],
    RemoveLocalOnly = fun(Tab) -> not val({Tab, local_content}) end,
    Tabs = lists:filter(RemoveLocalOnly, LocalTabs),
    Nodes = mnesia_lib:union([Node], val({current, db_nodes})),
    Ns = mnesia_lib:intersect(NewFriends, Nodes),
    abcast(Ns, {adopt_orphans, node(), Tabs}),
    noreply(State);

handle_cast({disc_load, Tab, Reason}, State) ->
    Worker = #disc_load{table = Tab, reason = Reason},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_cast(Worker = #send_table{}, State) ->
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_cast({sync_tabs, Tabs, From}, State) ->
    %% User-initiated wait_for_tables
    handle_sync_tabs(Tabs, From),
    noreply(State);

handle_cast({i_have_tab, Tab, Node}, State) ->
    case lists:member(Node, val({current, db_nodes})) of
        true ->
            State2 = node_has_tabs([Tab], Node, State),
            noreply(State2);
        false ->
            noreply(State)
    end;

handle_cast({force_load_updated, Tab}, State) ->
    case val({Tab, active_replicas}) of
        [] ->
            %% No valid replicas
            noreply(State);
        [SomeNode | _] ->
            State2 = node_has_tabs([Tab], SomeNode, State),
            noreply(State2)
    end;

handle_cast({master_nodes_updated, Tab, Masters}, State) ->
    Active = val({Tab, active_replicas}),
    Valid =
        case val({Tab, load_by_force}) of
            true ->
                Active;
            false ->
                if
                    Masters == [] ->
                        Active;
                    true ->
                        mnesia_lib:intersect(Masters, Active)
                end
        end,
    case Valid of
        [] ->
            %% No valid replicas
            noreply(State);
        [SomeNode | _] ->
            State2 = node_has_tabs([Tab], SomeNode, State),
            noreply(State2)
    end;

handle_cast({adopt_orphans, Node, Tabs}, State) ->
    State2 = node_has_tabs(Tabs, Node, State),

    case ?catch_val({node_up, Node}) of
        true -> ignore;
        _ ->
            %% Register the other node as up and running
            set({node_up, Node}, true),
            mnesia_recover:log_mnesia_up(Node),
            verbose("Logging mnesia_up ~w~n", [Node]),
            mnesia_lib:report_system_event({mnesia_up, Node}),
            %% Load orphan tables
            LocalTabs = val({schema, local_tables}) -- [schema],
            Nodes = val({current, db_nodes}),
            {LocalOrphans, RemoteMasters} =
                orphan_tables(LocalTabs, Node, Nodes, [], []),
            Reason = {adopt_orphan, node()},
            mnesia_late_loader:async_late_disc_load(node(), LocalOrphans, Reason),

            Fun =
                fun(N) ->
                        RemoteOrphans =
                            [Tab || {Tab, Ns} <- RemoteMasters,
                                    lists:member(N, Ns)],
                        mnesia_late_loader:maybe_async_late_disc_load(N, RemoteOrphans, Reason)
                end,
            lists:foreach(Fun, Nodes)
    end,
    noreply(State2);

handle_cast(Msg, State) ->
    error("~p got unexpected cast: ~tp~n", [?SERVER_NAME, Msg]),
    noreply(State).

handle_sync_tabs([Tab | Tabs], From) ->
    case val({Tab, where_to_read}) of
        nowhere ->
            case get({sync_tab, Tab}) of
                undefined ->
                    put({sync_tab, Tab}, [From]);
                Pids ->
                    put({sync_tab, Tab}, [From | Pids])
            end;
        _ ->
            sync_reply(From, Tab)
    end,
    handle_sync_tabs(Tabs, From);
handle_sync_tabs([], _From) ->
    ok.
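%% Waiters for tables that are not yet readable are kept in the controller's
%% process dictionary under {sync_tab, Tab}. They are answered by
%% user_sync_tab/1 once the table becomes active, or cleaned out by
%% sync_tab_timeout/2 when the waiting process gives up.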
%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State} |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------

handle_info({next_async_dump_log, InitBy}, State) ->
    async_dump_log(InitBy),
    Ref = next_async_dump_log(),
    noreply(State#state{dump_log_timer_ref = Ref});

handle_info({async_dump_log, InitBy}, State) ->
    Worker = #dump_log{initiated_by = InitBy},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_info(#dumper_done{worker_pid = Pid, worker_res = Res}, State) ->
    if
        State#state.is_stopping == true ->
            {stop, shutdown, State};
        Res == dumped, Pid == State#state.dumper_pid ->
            [Worker | Rest] = State#state.dumper_queue,
            reply(Worker#dump_log.opt_reply_to, Res),
            State2 = State#state{dumper_pid = undefined,
                                 dumper_queue = Rest},
            State3 = opt_start_worker(State2),
            noreply(State3);
        true ->
            fatal("Dumper failed: ~p~n state: ~tp~n", [Res, State]),
            {stop, fatal, State}
    end;

handle_info(Done = #loader_done{worker_pid = WPid, table_name = Tab}, State0) ->
    LateQueue0 = State0#state.late_loader_queue,
    State1 = State0#state{loader_pid = lists:keydelete(WPid, 1, get_loaders(State0))},

    State2 =
        case Done#loader_done.is_loaded of
            true ->
                %% Optional table announcement
                if
                    Done#loader_done.needs_announce == true,
                    Done#loader_done.needs_reply == true ->
                        i_have_tab(Tab),
                        %% Should be {dumper,{add_table_copy, _}} only
                        reply(Done#loader_done.reply_to,
                              Done#loader_done.reply);
                    Done#loader_done.needs_reply == true ->
                        %% Should be {dumper,{add_table_copy,_}} only
                        reply(Done#loader_done.reply_to,
                              Done#loader_done.reply);
                    Done#loader_done.needs_announce == true, Tab == schema ->
                        i_have_tab(Tab);
                    Done#loader_done.needs_announce == true ->
                        i_have_tab(Tab),
                        %% Local node needs to perform user_sync_tab/1
                        Ns = val({current, db_nodes}),
                        abcast(Ns, {i_have_tab, Tab, node()});
                    Tab == schema ->
                        ignore;
                    true ->
                        %% Local node needs to perform user_sync_tab/1
                        Ns = val({current, db_nodes}),
                        AlreadyKnows = val({Tab, active_replicas}),
                        abcast(Ns -- AlreadyKnows, {i_have_tab, Tab, node()})
                end,
                %% Optional user sync
                case Done#loader_done.needs_sync of
                    true -> user_sync_tab(Tab);
                    false -> ignore
                end,
                State1#state{late_loader_queue = gb_trees:delete_any(Tab, LateQueue0)};
            false ->
                %% Either the node went down or the table was not
                %% loaded remotely yet
                case Done#loader_done.needs_reply of
                    true ->
                        reply(Done#loader_done.reply_to,
                              Done#loader_done.reply);
                    false ->
                        ignore
                end,
                case ?catch_val({Tab, active_replicas}) of
                    [_ | _] -> %% Still available elsewhere
                        {value, {_, Worker}} = lists:keysearch(WPid, 1, get_loaders(State0)),
                        add_loader(Tab, Worker, State1);
                    _ ->
                        DelState = State1#state{late_loader_queue = gb_trees:delete_any(Tab, LateQueue0)},
                        case ?catch_val({Tab, storage_type}) of
                            ram_copies ->
                                cast({disc_load, Tab, ram_only}),
                                DelState;
                            _ ->
                                DelState
                        end
                end
        end,
    State3 = opt_start_worker(State2),
    noreply(State3);
handle_info(#sender_done{worker_pid = Pid, worker_res = Res}, State) ->
    Senders = get_senders(State),
    {value, {Pid, _Worker}} = lists:keysearch(Pid, 1, Senders),
    if
        Res == ok ->
            State2 = State#state{sender_pid = lists:keydelete(Pid, 1, Senders)},
            State3 = opt_start_worker(State2),
            noreply(State3);
        true ->
            %% No need to send any message to the table receiver
            %% since it will soon get a mnesia_down anyway
            fatal("Sender failed: ~p~n state: ~tp~n", [Res, State]),
            {stop, fatal, State}
    end;

handle_info({'EXIT', Pid, R}, State) when Pid == State#state.supervisor ->
    ?SAFE(set(mnesia_status, stopping)),
    case State#state.dumper_pid of
        undefined ->
            dbg_out("~p was ~tp~n", [?SERVER_NAME, R]),
            {stop, shutdown, State};
        _ ->
            noreply(State#state{is_stopping = true})
    end;

handle_info({'EXIT', Pid, R}, State) when Pid == State#state.dumper_pid ->
    case State#state.dumper_queue of
        [#schema_commit_lock{} | Workers] -> %% Schema trans crashed or was killed
            dbg_out("WARNING: Dumper ~p exited ~tp~n", [Pid, R]),
            State2 = State#state{dumper_queue = Workers, dumper_pid = undefined},
            State3 = opt_start_worker(State2),
            noreply(State3);
        _Other ->
            fatal("Dumper or schema commit crashed: ~p~n state: ~tp~n", [R, State]),
            {stop, fatal, State}
    end;

handle_info(Msg = {'EXIT', Pid, R}, State) when R /= wait_for_tables_timeout ->
    case lists:keymember(Pid, 1, get_senders(State)) of
        true ->
            %% No need to send any message to the table receiver
            %% since it will soon get a mnesia_down anyway
            fatal("Sender crashed: ~p~n state: ~tp~n", [{Pid, R}, State]),
            {stop, fatal, State};
        false ->
            case lists:keymember(Pid, 1, get_loaders(State)) of
                true ->
                    fatal("Loader crashed: ~p~n state: ~tp~n", [R, State]),
                    {stop, fatal, State};
                false ->
                    error("~p got unexpected info: ~tp~n", [?SERVER_NAME, Msg]),
                    noreply(State)
            end
    end;

handle_info({From, get_state}, State) ->
    From ! {?SERVER_NAME, State},
    noreply(State);

%% No real need for buffering
handle_info(Msg, State) when State#state.schema_is_merged /= true ->
    %% Buffer early messages
    Msgs = State#state.early_msgs,
    noreply(State#state{early_msgs = [{info, Msg} | Msgs]});

handle_info({'EXIT', Pid, wait_for_tables_timeout}, State) ->
    sync_tab_timeout(Pid, get()),
    noreply(State);

handle_info(Msg, State) ->
    error("~p got unexpected info: ~tp~n", [?SERVER_NAME, Msg]),
    noreply(State).

sync_tab_timeout(Pid, [{{sync_tab, Tab}, Pids} | Tail]) ->
    case lists:delete(Pid, Pids) of
        [] ->
            erase({sync_tab, Tab});
        Pids2 ->
            put({sync_tab, Tab}, Pids2)
    end,
    sync_tab_timeout(Pid, Tail);
sync_tab_timeout(Pid, [_ | Tail]) ->
    sync_tab_timeout(Pid, Tail);
sync_tab_timeout(_Pid, []) ->
    ok.

%% Pick the load record that has the highest load order.
%% Returns {BestLoad, RemainingQueue} or {none, EmptyQueue} if the queue is empty.
pick_next(Queue) ->
    List = gb_trees:values(Queue),
    case pick_next(List, none, none) of
        none -> {none, gb_trees:empty()};
        {Tab, Worker} -> {Worker, gb_trees:delete(Tab, Queue)}
    end.

pick_next([Head = #net_load{table = Tab} | Tail], Load, Order) ->
    select_best(Head, Tail, ?catch_val({Tab, load_order}), Load, Order);
pick_next([Head = #disc_load{table = Tab} | Tail], Load, Order) ->
    select_best(Head, Tail, ?catch_val({Tab, load_order}), Load, Order);
pick_next([], none, _Order) ->
    none;
pick_next([], Load, _Order) ->
    {element(2, Load), Load}.

select_best(_Head, Tail, {'EXIT', _WHAT}, Load, Order) ->
    %% The table has been deleted, drop it.
    pick_next(Tail, Load, Order);
select_best(Load, Tail, Order, none, none) ->
    pick_next(Tail, Load, Order);
select_best(Load, Tail, Order, _OldLoad, OldOrder) when Order > OldOrder ->
    pick_next(Tail, Load, Order);
select_best(_Load, Tail, _Order, OldLoad, OldOrder) ->
    pick_next(Tail, OldLoad, OldOrder).
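%% The order compared above is the table's load_order property: tables with
%% a higher value are picked before tables with the default value 0. For
%% illustration (hypothetical table), a table created with
%%
%%   mnesia:create_table(important_tab,
%%                       [{load_order, 10}, {disc_copies, [node()]}])
%%
%% is loaded before ordinary tables when Mnesia restarts.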
%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(Reason, State) ->
    mnesia_monitor:terminate_proc(?SERVER_NAME, Reason, State).

%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Upgrade process when its code is to be changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
code_change(_OldVsn, State0, _Extra) ->
    %% Loader queue
    State1 = case State0#state.loader_pid of
                 Pids when is_list(Pids) -> State0;
                 undefined -> State0#state{loader_pid = [], loader_queue = gb_trees:empty()};
                 Pid when is_pid(Pid) ->
                     [Loader | Rest] = State0#state.loader_queue,
                     LQ0 = [{element(2, Rec), Rec} || Rec <- Rest],
                     LQ1 = lists:sort(LQ0),
                     LQ = gb_trees:from_orddict(LQ1),
                     State0#state{loader_pid = [{Pid, Loader}], loader_queue = LQ}
             end,
    %% Late loader queue
    State = if is_list(State1#state.late_loader_queue) ->
                    LLQ0 = State1#state.late_loader_queue,
                    LLQ1 = lists:sort([{element(2, Rec), Rec} || Rec <- LLQ0]),
                    LLQ = gb_trees:from_orddict(LLQ1),
                    State1#state{late_loader_queue = LLQ};
               true ->
                    State1
            end,
    {ok, State}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

maybe_log_mnesia_down(N) ->
    %% We use mnesia_down when deciding which tables to load locally,
    %% so if we are not running (i.e. haven't decided which tables
    %% to load locally), don't log mnesia_down yet.
    case mnesia_lib:is_running() of
        yes ->
            verbose("Logging mnesia_down ~w~n", [N]),
            mnesia_recover:log_mnesia_down(N),
            ok;
        _ ->
            Filter = fun(Tab) ->
                             inactive_copy_holders(Tab, N)
                     end,
            HalfLoadedTabs = lists:any(Filter, val({schema, local_tables}) -- [schema]),
            if
                HalfLoadedTabs == true ->
                    verbose("Logging mnesia_down ~w~n", [N]),
                    mnesia_recover:log_mnesia_down(N),
                    ok;
                true ->
                    %% Unfortunately we have not loaded some common
                    %% tables yet, so we cannot rely on the nodedown
                    log_later %% BUGBUG: handle this case!!!
            end
    end.
inactive_copy_holders(Tab, Node) ->
    Cs = val({Tab, cstruct}),
    case mnesia_lib:cs_to_storage_type(Node, Cs) of
        unknown ->
            false;
        _Storage ->
            mnesia_lib:not_active_here(Tab)
    end.

orphan_tables([Tab | Tabs], Node, Ns, Local, Remote) ->
    Cs = val({Tab, cstruct}),
    CopyHolders = mnesia_lib:copy_holders(Cs),
    RamCopyHolders = Cs#cstruct.ram_copies,
    DiscCopyHolders = CopyHolders -- RamCopyHolders,
    DiscNodes = val({schema, disc_copies}),
    LocalContent = Cs#cstruct.local_content,
    RamCopyHoldersOnDiscNodes = mnesia_lib:intersect(RamCopyHolders, DiscNodes),
    Active = val({Tab, active_replicas}),
    BeingCreated = (?catch_val({Tab, create_table}) == true),
    Read = val({Tab, where_to_read}),
    case lists:member(Node, DiscCopyHolders) of
        _ when BeingCreated == true ->
            orphan_tables(Tabs, Node, Ns, Local, Remote);
        _ when Read == node() -> %% Already loaded
            orphan_tables(Tabs, Node, Ns, Local, Remote);
        true when Active == [] ->
            case DiscCopyHolders -- Ns of
                [] ->
                    %% We're last up and the other nodes have not
                    %% loaded the table. Let's load it if we are
                    %% the smallest node.
                    case lists:min(DiscCopyHolders) of
                        Min when Min == node() ->
                            case mnesia_recover:get_master_nodes(Tab) of
                                [] ->
                                    L = [Tab | Local],
                                    orphan_tables(Tabs, Node, Ns, L, Remote);
                                Masters ->
                                    %% Do not disc_load table from RamCopyHolders
                                    R = [{Tab, Masters -- RamCopyHolders} | Remote],
                                    orphan_tables(Tabs, Node, Ns, Local, R)
                            end;
                        _ ->
                            orphan_tables(Tabs, Node, Ns, Local, Remote)
                    end;
                _ ->
                    orphan_tables(Tabs, Node, Ns, Local, Remote)
            end;
        false when Active == [], DiscCopyHolders == [], RamCopyHoldersOnDiscNodes == [] ->
            %% Special case when all replicas reside on discless nodes
            orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote);
        _ when LocalContent == true ->
            orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote);
        _ ->
            orphan_tables(Tabs, Node, Ns, Local, Remote)
    end;
orphan_tables([], _, _, LocalOrphans, RemoteMasters) ->
    {LocalOrphans, RemoteMasters}.

node_has_tabs([Tab | Tabs], Node, State) when Node /= node() ->
    State2 =
        try update_whereabouts(Tab, Node, State) of
            State1 = #state{} -> State1
        catch exit:R -> %% Tab was just deleted?
                case ?catch_val({Tab, cstruct}) of
                    {'EXIT', _} -> State; %% Yes
                    _ -> erlang:error(R)
                end
        end,
    node_has_tabs(Tabs, Node, State2);
node_has_tabs([Tab | Tabs], Node, State) ->
    user_sync_tab(Tab),
    node_has_tabs(Tabs, Node, State);
node_has_tabs([], _Node, State) ->
    State.

update_whereabouts(Tab, Node, State) ->
    Storage = val({Tab, storage_type}),
    Read = val({Tab, where_to_read}),
    LocalC = val({Tab, local_content}),
    BeingCreated = (?catch_val({Tab, create_table}) == true),
    Masters = mnesia_recover:get_master_nodes(Tab),
    ByForce = val({Tab, load_by_force}),
    GoGetIt =
        if
            ByForce == true ->
                true;
            Masters == [] ->
                true;
            true ->
                lists:member(Node, Masters)
        end,
    dbg_out("Table ~w is loaded on ~w. s=~w, r=~w, lc=~w, f=~w, m=~w~n",
            [Tab, Node, Storage, Read, LocalC, ByForce, GoGetIt]),
    if
        LocalC == true ->
            %% Local content, don't care about the other node
            State;
        BeingCreated == true ->
            %% The table is currently being created.
            %% It will be handled elsewhere.
            State;
        Storage == unknown, Read == nowhere ->
            %% No own copy, time to read remotely
            %% if the other node is a good node
            add_active_replica(Tab, Node),
            case GoGetIt of
                true ->
                    set({Tab, where_to_read}, Node),
                    user_sync_tab(Tab),
                    State;
                false ->
                    State
            end;
        Storage == unknown ->
            %% No own copy, continue to read remotely
            add_active_replica(Tab, Node),
            NodeST = mnesia_lib:semantics(mnesia_lib:storage_type_at_node(Node, Tab), storage),
            ReadST = mnesia_lib:semantics(mnesia_lib:storage_type_at_node(Read, Tab), storage),
            if %% Avoid reading from disc_only_copies
                NodeST == disc_only_copies ->
                    ignore;
                ReadST == disc_only_copies ->
                    mnesia_lib:set_remote_where_to_read(Tab);
                true ->
                    ignore
            end,
            user_sync_tab(Tab),
            State;
        Read == nowhere ->
            %% Own copy, go and get a copy of the table
            %% if the other node is a master or if there
            %% are no masters at all
            add_active_replica(Tab, Node),
            case GoGetIt of
                true ->
                    Worker = #net_load{table = Tab,
                                       reason = {active_remote, Node}},
                    add_worker(Worker, State);
                false ->
                    State
            end;
        true ->
            %% We already have our own copy
            add_active_replica(Tab, Node),
            user_sync_tab(Tab),
            State
    end.

initial_safe_loads() ->
    case val({schema, storage_type}) of
        ram_copies ->
            Downs = [],
            Tabs = val({schema, local_tables}) -- [schema],
            LastC = fun(T) -> last_consistent_replica(T, Downs) end,
            lists:zf(LastC, Tabs);

        disc_copies ->
            Downs = mnesia_recover:get_mnesia_downs(),
            dbg_out("mnesia_downs = ~p~n", [Downs]),

            Tabs = val({schema, local_tables}) -- [schema],
            LastC = fun(T) -> last_consistent_replica(T, Downs) end,
            lists:zf(LastC, Tabs)
    end.

last_consistent_replica(Tab, Downs) ->
    Cs = val({Tab, cstruct}),
    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
    Ram = Cs#cstruct.ram_copies,
    Disc = Cs#cstruct.disc_copies,
    DiscOnly = Cs#cstruct.disc_only_copies,
    Ext = Cs#cstruct.external_copies,
    BetterCopies0 = mnesia_lib:remote_copy_holders(Cs) -- Downs,
    BetterCopies = BetterCopies0 -- Ram,
    AccessMode = Cs#cstruct.access_mode,
    Copies = mnesia_lib:copy_holders(Cs),
    Masters = mnesia_recover:get_master_nodes(Tab),
    LocalMaster0 = lists:member(node(), Masters),
    LocalContent = Cs#cstruct.local_content,
    RemoteMaster =
        if
            Masters == [] -> false;
            true -> not LocalMaster0
        end,
    LocalMaster =
        if
            Masters == [] -> false;
            true -> LocalMaster0
        end,
    if
        Copies == [node()] ->
            %% Only one copy holder and it is local.
            %% It may also be a local content table.
            {true, {Tab, local_only}};
        LocalContent == true ->
            {true, {Tab, local_content}};
        LocalMaster == true ->
            %% We have a local master
            {true, {Tab, local_master}};
        RemoteMaster == true ->
            %% Wait for remote master copy
            false;
        Storage == ram_copies ->
            if
                Disc == [], DiscOnly == [], Ext == [] ->
                    %% Nobody has a copy on disc
                    {true, {Tab, ram_only}};
                true ->
                    %% Some other node has a copy on disc
                    false
            end;
        AccessMode == read_only ->
            %% No one has been able to update the table,
            %% i.e. all disc-resident copies are equal
            {true, {Tab, read_only}};
        BetterCopies /= [], Masters /= [node()] ->
            %% There are better copies on other nodes
            %% and we do not have the only master copy
            false;
        true ->
            {true, {Tab, initial}}
    end.

reconfigure_tables(N, [Tab | Tail]) ->
    del_active_replica(Tab, N),
    case val({Tab, where_to_read}) of
        N -> mnesia_lib:set_remote_where_to_read(Tab);
        _ -> ignore
    end,
    reconfigure_tables(N, Tail);
reconfigure_tables(_, []) ->
    ok.

remove_loaders([Tab | Tabs], N, Loaders) ->
    LateQ = drop_loaders(Tab, N, Loaders),
    remove_loaders(Tabs, N, LateQ);
remove_loaders([], _, LateQ) -> LateQ.

remove_early_messages([], _Node) ->
    [];
remove_early_messages([{call, {add_active_replica, [_, Node, _, _], _}, _} | R], Node) ->
    remove_early_messages(R, Node); %% Replies before queuing
remove_early_messages([{call, {block_table, _, From}, ReplyTo} | R], Node)
  when node(From) == Node ->
    reply(ReplyTo, ok), %% Unblock the waiting gen_server caller
    remove_early_messages(R, Node);
remove_early_messages([{cast, {i_have_tab, _Tab, Node}} | R], Node) ->
    remove_early_messages(R, Node);
remove_early_messages([{cast, {adopt_orphans, Node, _Tabs}} | R], Node) ->
    remove_early_messages(R, Node);
remove_early_messages([M | R], Node) ->
    [M | remove_early_messages(R, Node)].

%% Drop a loader from the late load queue and possibly trigger a disc_load
drop_loaders(Tab, Node, LLQ) ->
    case gb_trees:lookup(Tab, LLQ) of
        none ->
            LLQ;
        {value, H} ->
            %% Check if it is time to issue a disc_load request
            case H#late_load.loaders of
                [Node] ->
                    Reason = {H#late_load.reason, last_loader_down, Node},
                    cast({disc_load, Tab, Reason}); %% Ugly cast
                _ ->
                    ignore
            end,
            %% Drop the node from the list of loaders
            H2 = H#late_load{loaders = H#late_load.loaders -- [Node]},
            gb_trees:update(Tab, H2, LLQ)
    end.

add_active_replica(Tab, Node) ->
    add_active_replica(Tab, Node, val({Tab, cstruct})).

add_active_replica(Tab, Node, Cs = #cstruct{}) ->
    Storage = mnesia_lib:schema_cs_to_storage_type(Node, Cs),
    AccessMode = Cs#cstruct.access_mode,
    add_active_replica(Tab, Node, Storage, AccessMode).

%% Block table primitives

block_table(Tab) ->
    Var = {Tab, where_to_commit},
    case is_tab_blocked(val(Var)) of
        {true, _} -> ok;
        {false, W2C} -> set(Var, mark_blocked_tab(true, W2C))
    end.

unblock_table(Tab) ->
    call({unblock_table, Tab}).

is_tab_blocked(W2C) when is_list(W2C) ->
    {false, W2C};
is_tab_blocked({blocked, W2C}) when is_list(W2C) ->
    {true, W2C}.

mark_blocked_tab(true, Value) ->
    {blocked, Value};
mark_blocked_tab(false, Value) ->
    Value.
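%% While a table is blocked, its {Tab, where_to_commit} value is wrapped as
%% {blocked, CommitList}. is_tab_blocked/1 and mark_blocked_tab/2 above do
%% the wrapping and unwrapping, so the replica bookkeeping below can keep
%% updating the commit list without losing the blocked flag.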
%%

add_active_replica(Tab, Node, Storage, AccessMode) ->
    Var = {Tab, where_to_commit},
    {Blocked, Old} = is_tab_blocked(val(Var)),
    Del = lists:keydelete(Node, 1, Old),
    case AccessMode of
        read_write ->
            New = lists:sort([{Node, Storage} | Del]),
            set(Var, mark_blocked_tab(Blocked, New)), %% where_to_commit
            mnesia_lib:add_lsort({Tab, where_to_write}, Node);
        read_only ->
            set(Var, mark_blocked_tab(Blocked, Del)),
            mnesia_lib:del({Tab, where_to_write}, Node)
    end,

    update_where_to_wlock(Tab),
    add({Tab, active_replicas}, Node).

del_active_replica(Tab, Node) ->
    Var = {Tab, where_to_commit},
    {Blocked, Old} = is_tab_blocked(val(Var)),
    Del = lists:keydelete(Node, 1, Old),
    New = lists:sort(Del),
    set(Var, mark_blocked_tab(Blocked, New)), %% where_to_commit
    mnesia_lib:del({Tab, active_replicas}, Node),
    mnesia_lib:del({Tab, where_to_write}, Node),
    update_where_to_wlock(Tab).

change_table_access_mode(Cs) ->
    W = fun() ->
                Tab = Cs#cstruct.name,
                lists:foreach(fun(N) -> add_active_replica(Tab, N, Cs) end,
                              val({Tab, active_replicas}))
        end,
    update(W).

change_table_majority(Cs) ->
    W = fun() ->
                Tab = Cs#cstruct.name,
                set({Tab, majority}, Cs#cstruct.majority),
                update_where_to_wlock(Tab)
        end,
    update(W).

update_where_to_wlock(Tab) ->
    WNodes = val({Tab, where_to_write}),
    Majority = ?catch_val({Tab, majority}) == true,
    set({Tab, where_to_wlock}, {WNodes, Majority}).

%% Node To now has the tab loaded, but this must be undone.
%% This code is rpc:call'ed from the tab_copier process
%% when it has *not* released its table lock.
unannounce_add_table_copy(Tab, To) ->
    ?SAFE(del_active_replica(Tab, To)),
    try To = val({Tab, where_to_read}),
         mnesia_lib:set_remote_where_to_read(Tab)
    catch _:_ -> ignore
    end.

user_sync_tab(Tab) ->
    case val(debug) of
        trace ->
            mnesia_subscr:subscribe(whereis(mnesia_event), {table, Tab});
        _ ->
            ignore
    end,

    case erase({sync_tab, Tab}) of
        undefined ->
            ok;
        Pids ->
            lists:foreach(fun(Pid) -> sync_reply(Pid, Tab) end, Pids)
    end.

i_have_tab(Tab) ->
    case val({Tab, local_content}) of
        true ->
            mnesia_lib:set_local_content_whereabouts(Tab);
        false ->
            set({Tab, where_to_read}, node())
    end,
    add_active_replica(Tab, node()).

sync_and_block_table_whereabouts(Tab, ToNode, RemoteS, AccessMode) when Tab /= schema ->
    Current = val({current, db_nodes}),
    Ns =
        case lists:member(ToNode, Current) of
            true -> Current -- [ToNode];
            false -> Current
        end,
    _ = remote_call(ToNode, block_table, [Tab]),
    [remote_call(Node, add_active_replica, [Tab, ToNode, RemoteS, AccessMode]) ||
        Node <- [ToNode | Ns]],
    ok.

sync_del_table_copy_whereabouts(Tab, ToNode) when Tab /= schema ->
    Current = val({current, db_nodes}),
    Ns =
        case lists:member(ToNode, Current) of
            true -> Current;
            false -> [ToNode | Current]
        end,
    Args = [Tab, ToNode],
    [remote_call(Node, unannounce_add_table_copy, Args) || Node <- Ns],
    ok.

get_info(Timeout) ->
    case whereis(?SERVER_NAME) of
        undefined ->
            {timeout, Timeout};
        Pid ->
            Pid ! {self(), get_state},
            receive
                {?SERVER_NAME, State = #state{loader_queue = LQ, late_loader_queue = LLQ}} ->
                    {info, State#state{loader_queue = gb_trees:to_list(LQ),
                                       late_loader_queue = gb_trees:to_list(LLQ)}}
            after Timeout ->
                    {timeout, Timeout}
            end
    end.

get_workers(Timeout) ->
    case whereis(?SERVER_NAME) of
        undefined ->
            {timeout, Timeout};
        Pid ->
            Pid ! {self(), get_state},
            receive
                {?SERVER_NAME, State = #state{}} ->
                    {workers, get_loaders(State), get_senders(State), State#state.dumper_pid}
            after Timeout ->
                    {timeout, Timeout}
            end
    end.

info() ->
    Tabs = mnesia_lib:local_active_tables(),
    io:format("---> Active tables <--- ~n", []),
    info(Tabs).

info([Tab | Tail]) ->
    case val({Tab, storage_type}) of
        disc_only_copies ->
            info_format(Tab,
                        dets:info(Tab, size),
                        dets:info(Tab, file_size),
                        "bytes on disc");
        {ext, Alias, Mod} ->
            info_format(Tab,
                        Mod:info(Alias, Tab, size),
                        Mod:info(Alias, Tab, memory),
                        "words of mem");
        _ ->
            info_format(Tab,
                        ?ets_info(Tab, size),
                        ?ets_info(Tab, memory),
                        "words of mem")
    end,
    info(Tail);
info([]) -> ok.

info_format(Tab, Size, Mem, Media) ->
    StrT = mnesia_lib:pad_name(atom_to_list(Tab), 15, []),
    StrS = mnesia_lib:pad_name(integer_to_list(Size), 8, []),
    StrM = mnesia_lib:pad_name(integer_to_list(Mem), 8, []),
    io:format("~ts: with ~s records occupying ~s ~s~n",
              [StrT, StrS, StrM, Media]).

%% Handle messages that arrived early
handle_early_msgs([Msg | Msgs], State) ->
    %% The messages are in reverse order
    case handle_early_msg(Msg, State) of
%%      {stop, Reason, Reply, State2} -> % Will not happen according to dialyzer
%%          {stop, Reason, Reply, State2};
        {stop, Reason, State2} ->
            {stop, Reason, State2};
        {noreply, State2} ->
            handle_early_msgs(Msgs, State2);
        {reply, Reply, State2} ->
            {call, _Call, From} = Msg,
            reply(From, Reply),
            handle_early_msgs(Msgs, State2)
    end;
handle_early_msgs([], State) ->
    noreply(State).

handle_early_msg({call, Msg, From}, State) ->
    handle_call(Msg, From, State);
handle_early_msg({cast, Msg}, State) ->
    handle_cast(Msg, State);
handle_early_msg({info, Msg}, State) ->
    handle_info(Msg, State).

noreply(State) ->
    {noreply, State}.

reply(undefined, Reply) ->
    Reply;
reply(ReplyTo, Reply) ->
    gen_server:reply(ReplyTo, Reply),
    Reply.
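
%% Illustrative sketch (not part of the original module): get_info/1 and
%% get_workers/1 above share a raw message protocol with the controller;
%% a caller could use the same handshake directly like this:
get_state_example(Timeout) ->
    case whereis(?SERVER_NAME) of
        undefined ->
            {timeout, Timeout};
        Pid ->
            Pid ! {self(), get_state},
            receive
                {?SERVER_NAME, State = #state{}} -> {ok, State}
            after Timeout ->
                    {timeout, Timeout}
            end
    end.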

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Worker management

%% Returns a new State
add_worker(Worker = #dump_log{}, State) ->
    InitBy = Worker#dump_log.initiated_by,
    Queue = State#state.dumper_queue,
    Status =
        case lists:keymember(InitBy, #dump_log.initiated_by, Queue) of
            true when Worker#dump_log.opt_reply_to == undefined ->
                %% The same threshold has been exceeded again,
                %% before we have had the possibility to
                %% process the older request.
                DetectedBy = {dump_log, InitBy},
                Event = {mnesia_overload, DetectedBy},
                mnesia_lib:report_system_event(Event),
                true;
            _ ->
                false
        end,
    mnesia_recover:log_dump_overload(Status),
    Queue2 = Queue ++ [Worker],
    State2 = State#state{dumper_queue = Queue2},
    opt_start_worker(State2);
add_worker(Worker = #schema_commit_lock{}, State) ->
    Queue = State#state.dumper_queue,
    Queue2 = Queue ++ [Worker],
    State2 = State#state{dumper_queue = Queue2},
    opt_start_worker(State2);
add_worker(Worker = #net_load{}, State) ->
    opt_start_worker(add_loader(Worker#net_load.table, Worker, State));
add_worker(Worker = #send_table{}, State) ->
    Queue = State#state.sender_queue,
    State2 = State#state{sender_queue = Queue ++ [Worker]},
    opt_start_worker(State2);
add_worker(Worker = #disc_load{}, State) ->
    opt_start_worker(add_loader(Worker#disc_load.table, Worker, State));
%% The block_controller request is used when upgrading mnesia.
add_worker(Worker = #block_controller{}, State) ->
    Queue = State#state.dumper_queue,
    Queue2 = [Worker | Queue],
    State2 = State#state{dumper_queue = Queue2},
    opt_start_worker(State2).

add_loader(Tab, Worker, State = #state{loader_queue = LQ0}) ->
    case gb_trees:is_defined(Tab, LQ0) of
        true -> State;
        false ->
            LQ = gb_trees:insert(Tab, Worker, LQ0),
            State#state{loader_queue = LQ}
    end.

%% Optionally start a worker
%%
%% Dumpers and loaders may run simultaneously,
%% but neither of them may run during schema commit.
%% Loaders may not start if a schema commit is enqueued.
opt_start_worker(State) when State#state.is_stopping == true ->
    State;
opt_start_worker(State) ->
    %% Prioritize dumpers and schema commits
    %% by checking them first
    case State#state.dumper_queue of
        [Worker | _Rest] when State#state.dumper_pid == undefined ->
            %% Great, we have a worker in the queue and
            %% neither a schema transaction is being
            %% committed nor a dumper is running

            %% Start the worker but keep it in the queue
            if
                is_record(Worker, schema_commit_lock) ->
                    ReplyTo = Worker#schema_commit_lock.owner,
                    reply(ReplyTo, granted),
                    {Owner, _Tag} = ReplyTo,
                    opt_start_loader(State#state{dumper_pid = Owner});

                is_record(Worker, dump_log) ->
                    Pid = spawn_link(?MODULE, dump_and_reply, [self(), Worker]),
                    State2 = State#state{dumper_pid = Pid},

                    %% If the worker was a dumper, we may
                    %% possibly be able to start a loader
                    %% or a sender as well
                    State3 = opt_start_sender(State2),
                    opt_start_loader(State3);

                is_record(Worker, block_controller) ->
                    case {get_senders(State), get_loaders(State)} of
                        {[], []} ->
                            ReplyTo = Worker#block_controller.owner,
                            reply(ReplyTo, granted),
                            {Owner, _Tag} = ReplyTo,
                            State#state{dumper_pid = Owner};
                        _ ->
                            State
                    end
            end;
        _ ->
            %% Bad luck, try with a loader or a sender instead
            State2 = opt_start_sender(State),
            opt_start_loader(State2)
    end.

opt_start_sender(State) ->
    case State#state.sender_queue of
        [] -> State; %% No need
        SenderQ ->
            {NewS, Kept} = opt_start_sender2(SenderQ, get_senders(State),
                                             [], get_loaders(State)),
            State#state{sender_pid = NewS, sender_queue = Kept}
    end.
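
%% Illustrative sketch (not part of the original module): add_worker/2
%% above appends dump_log and schema_commit_lock requests to the dumper
%% queue, but lets a block_controller request jump to the front. The
%% workers below are made up.
dumper_queue_order_example() ->
    D = #dump_log{initiated_by = user},
    B = #block_controller{owner = self()},
    Q1 = [] ++ [D],       %% a dump_log is appended
    [B, D] = [B | Q1],    %% a block_controller is prepended
    ok.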

opt_start_sender2([], Pids, Kept, _) -> {Pids, Kept};
opt_start_sender2([Sender|R], Pids, Kept, LoaderQ) ->
    Tab = Sender#send_table.table,
    Active = val({Tab, active_replicas}),
    IgotIt = lists:member(node(), Active),
    IsLoading = lists:any(fun({_Pid, Loader}) ->
                                  Tab == element(#net_load.table, Loader)
                          end, LoaderQ),
    if
        IgotIt, IsLoading ->
            %% We are currently finishing the load of the table,
            %% let the receiver wait
            opt_start_sender2(R, Pids, [Sender|Kept], LoaderQ);
        IgotIt ->
            %% Start the worker but keep it in the queue
            Pid = spawn_link(?MODULE, send_and_reply, [self(), Sender]),
            opt_start_sender2(R, [{Pid, Sender}|Pids], Kept, LoaderQ);
        true ->
            verbose("Send table failed, ~tp not active on this node~n", [Tab]),
            Sender#send_table.receiver_pid ! {copier_done, node()},
            opt_start_sender2(R, Pids, Kept, LoaderQ)
    end.

opt_start_loader(State = #state{loader_queue = LoaderQ}) ->
    Current = get_loaders(State),
    Max = max_loaders(),
    case gb_trees:is_empty(LoaderQ) of
        true ->
            State;
        _ when length(Current) >= Max ->
            State;
        false ->
            SchemaQueue = State#state.dumper_queue,
            case lists:keymember(schema_commit_lock, 1, SchemaQueue) of
                false ->
                    case pick_next(LoaderQ) of
                        {none, Rest} ->
                            State#state{loader_queue = Rest};
                        {Worker, Rest} ->
                            case already_loading(Worker, get_loaders(State)) of
                                true ->
                                    opt_start_loader(State#state{loader_queue = Rest});
                                false ->
                                    %% Start the worker but keep it in the queue
                                    Pid = load_and_reply(self(), Worker),
                                    State#state{loader_pid = [{Pid, Worker}|get_loaders(State)],
                                                loader_queue = Rest}
                            end
                    end;
                true ->
                    %% Bad luck, we must wait for the schema commit
                    State
            end
    end.

already_loading(#net_load{table = Tab}, Loaders) ->
    already_loading2(Tab, Loaders);
already_loading(#disc_load{table = Tab}, Loaders) ->
    already_loading2(Tab, Loaders).

already_loading2(Tab, [{_, #net_load{table = Tab}}|_]) -> true;
already_loading2(Tab, [{_, #disc_load{table = Tab}}|_]) -> true;
already_loading2(Tab, [_|Rest]) -> already_loading2(Tab, Rest);
already_loading2(_, []) -> false.

start_remote_sender(Node, Tab, Receiver, Storage) ->
    Msg = #send_table{table = Tab,
                      receiver_pid = Receiver,
                      remote_storage = Storage},
    gen_server:cast({?SERVER_NAME, Node}, Msg).

dump_and_reply(ReplyTo, Worker) ->
    %% No trap_exit, die intentionally instead
    Res = case Worker#dump_log.operation of
              dump_log ->
                  mnesia_dumper:opt_dump_log(Worker#dump_log.initiated_by);
              F when is_function(F, 0) ->
                  F()
          end,
    ReplyTo ! #dumper_done{worker_pid = self(),
                           worker_res = Res},
    unlink(ReplyTo),
    exit(normal).

send_and_reply(ReplyTo, Worker) ->
    %% No trap_exit, die intentionally instead
    Res = mnesia_loader:send_table(Worker#send_table.receiver_pid,
                                   Worker#send_table.table,
                                   Worker#send_table.remote_storage),
    ReplyTo ! #sender_done{worker_pid = self(),
                           worker_res = Res},
    unlink(ReplyTo),
    exit(normal).

load_and_reply(ReplyTo, Worker) ->
    Load = load_table_fun(Worker),
    SendAndReply =
        fun() ->
                process_flag(trap_exit, true),
                Done = Load(),
                ReplyTo ! Done#loader_done{worker_pid = self()},
                unlink(ReplyTo),
                exit(normal)
        end,
    spawn_link(SendAndReply).
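
%% Illustrative sketch (not part of the original module): every worker
%% above follows the same handshake; it reports back with a *_done
%% record and then exits normally. A minimal dumper-style worker, with
%% Fun supplied by the caller:
worker_skeleton_example(ReplyTo, Fun) ->
    Res = Fun(),
    ReplyTo ! #dumper_done{worker_pid = self(),
                           worker_res = Res},
    unlink(ReplyTo),
    exit(normal).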

%% Now it is time to load the table,
%% but first we must check if it still is necessary
load_table_fun(#net_load{cstruct = Cs, table = Tab, reason = Reason, opt_reply_to = ReplyTo}) ->
    LocalC = val({Tab, local_content}),
    AccessMode = val({Tab, access_mode}),
    ReadNode = val({Tab, where_to_read}),
    Active = filter_active(Tab),
    Done = #loader_done{is_loaded = true,
                        table_name = Tab,
                        needs_announce = false,
                        needs_sync = false,
                        needs_reply = (ReplyTo /= undefined),
                        reply_to = ReplyTo,
                        reply = {loaded, ok}
                       },
    AddTableCopy = case Reason of
                       {dumper, {add_table_copy, _}} -> true;
                       _ -> false
                   end,

    OnlyRamCopies = case Cs of
                        #cstruct{disc_copies = DC,
                                 disc_only_copies = DOC,
                                 external_copies = Ext} ->
                            [] =:= (DC ++ (DOC ++ Ext)) -- [node()];
                        _ ->
                            false
                    end,
    if
        ReadNode == node() ->
            %% Already loaded locally
            fun() -> Done end;
        LocalC == true ->
            fun() ->
                    Res = mnesia_loader:disc_load_table(Tab, load_local_content),
                    Done#loader_done{reply = Res, needs_announce = true, needs_sync = true}
            end;
        AccessMode == read_only, not AddTableCopy ->
            fun() -> disc_load_table(Tab, Reason, ReplyTo) end;
        Active =:= [], AddTableCopy, OnlyRamCopies ->
            fun() -> disc_load_table(Tab, Reason, ReplyTo) end;
        true ->
            fun() ->
                    %% Either we cannot read the table yet
                    %% or someone is moving a replica between
                    %% two nodes
                    Res = mnesia_loader:net_load_table(Tab, Reason, Active, Cs),
                    case Res of
                        {loaded, ok} ->
                            Done#loader_done{needs_sync = true,
                                             reply = Res};
                        {not_loaded, _} ->
                            Done#loader_done{is_loaded = false,
                                             reply = Res}
                    end
            end
    end;
load_table_fun(#disc_load{table = Tab, reason = Reason, opt_reply_to = ReplyTo}) ->
    ReadNode = val({Tab, where_to_read}),
    Active = filter_active(Tab),
    Done = #loader_done{is_loaded = true,
                        table_name = Tab,
                        needs_announce = false,
                        needs_sync = false,
                        needs_reply = false
                       },
    if
        Active == [], ReadNode == nowhere ->
            %% Not loaded anywhere, let's load it from disc
            fun() -> disc_load_table(Tab, Reason, ReplyTo) end;
        ReadNode == nowhere ->
            %% Already loaded on another node, let's fetch it from there
            Cs = val({Tab, cstruct}),
            fun() ->
                    case mnesia_loader:net_load_table(Tab, Reason, Active, Cs) of
                        {loaded, ok} ->
                            Done#loader_done{needs_sync = true};
                        {not_loaded, storage_unknown} ->
                            Done#loader_done{is_loaded = false};
                        {not_loaded, ErrReason} ->
                            Done#loader_done{is_loaded = false,
                                             reply = {not_loaded, ErrReason}}
                    end
            end;
        true ->
            %% Already readable, do not worry, be happy
            fun() -> Done end
    end.

disc_load_table(Tab, Reason, ReplyTo) ->
    Done = #loader_done{is_loaded = true,
                        table_name = Tab,
                        needs_announce = false,
                        needs_sync = false,
                        needs_reply = ReplyTo /= undefined,
                        reply_to = ReplyTo,
                        reply = {loaded, ok}
                       },
    Res = mnesia_loader:disc_load_table(Tab, Reason),
    if
        Res == {loaded, ok} ->
            Done#loader_done{needs_announce = true,
                             needs_sync = true,
                             reply = Res};
        ReplyTo /= undefined ->
            Done#loader_done{is_loaded = false,
                             reply = Res};
        true ->
            fatal("Cannot load table ~tp from disc: ~tp~n", [Tab, Res])
    end.
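
%% Illustrative sketch (not part of the original module): the
%% OnlyRamCopies test in load_table_fun/1 above holds exactly when no
%% node except the local one keeps the table on disc. The replica
%% lists are made up.
only_ram_copies_example() ->
    DC = [node()],
    DOC = [],
    Ext = [],
    true = ([] =:= (DC ++ (DOC ++ Ext)) -- [node()]),
    ok.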

filter_active(Tab) ->
    ByForce = val({Tab, load_by_force}),
    Active = val({Tab, active_replicas}),
    Masters = mnesia_recover:get_master_nodes(Tab),
    Ns = do_filter_active(ByForce, Active, Masters),
    %% Reorder the list so that we load from the fastest node first
    LS = ?catch_val({Tab, storage_type}),
    DOC = val({Tab, disc_only_copies}),
    {Good, Worse} =
        case LS of
            disc_only_copies ->
                G = mnesia_lib:intersect(Ns, DOC),
                {G, Ns -- G};
            _ ->
                G = Ns -- DOC,
                {G, Ns -- G}
        end,
    %% Pick a random node among the fastest ones
    Len = length(Good),
    if
        Len > 0 ->
            R = erlang:phash(node(), Len + 1),
            random(R - 1, Good, Worse);
        true ->
            Worse
    end.

random(N, [H|R], Acc) when N > 0 ->
    random(N - 1, R, [H|Acc]);
random(0, L, Acc) ->
    L ++ Acc.

do_filter_active(true, Active, _Masters) ->
    Active;
do_filter_active(false, Active, []) ->
    Active;
do_filter_active(false, Active, Masters) ->
    mnesia_lib:intersect(Active, Masters).
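
%% Illustrative sketch (not part of the original module): random/3
%% rotates the preferred list by N positions (reversing the skipped
%% prefix) and appends the slower nodes, so different nodes spread
%% their load requests over the fast replicas. The atoms are made-up
%% node placeholders.
random_rotation_example() ->
    [b, c, a, d] = random(1, [a, b, c], [d]),
    [a, b, c, d] = random(0, [a, b, c], [d]),
    ok.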