%% ``Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
%%
%%     $Id: mnesia_controller.erl,v 1.3 2010/03/04 13:54:19 maria Exp $
%%
%% The mnesia_init process loads tables from local disc or from
%% other nodes. It also coordinates updates of the info about
%% where we can read and write tables.
%%
%% Tables may need to be loaded initially at startup of the local
%% node or when other nodes announce that they already have loaded
%% tables that we also want.
%%
%% Initially we set the load request queue to those tables that we
%% can safely load locally, i.e. tables where we have the last
%% consistent replica and we have received mnesia_down from all
%% other nodes holding the table. Then we let the mnesia_init
%% process enter its normal working state.
%%
%% When we need to load a table we append a request to the load
%% request queue. All other requests are regarded as high priority
%% and are processed immediately (e.g. update table whereabouts).
%% We process the load request queue as a "background" job.

-module(mnesia_controller).

-behaviour(gen_server).

%% Mnesia internal stuff
-export([
         start/0,
         i_have_tab/1,
         info/0,
         get_info/1,
         get_workers/1,
         force_load_table/1,
         async_dump_log/1,
         sync_dump_log/1,
         connect_nodes/1,
         wait_for_schema_commit_lock/0,
         release_schema_commit_lock/0,
         create_table/1,
         get_disc_copy/1,
         get_cstructs/0,
         sync_and_block_table_whereabouts/4,
         sync_del_table_copy_whereabouts/2,
         block_table/1,
         unblock_table/1,
         block_controller/0,
         unblock_controller/0,
         unannounce_add_table_copy/2,
         master_nodes_updated/2,
         mnesia_down/1,
         add_active_replica/2,
         add_active_replica/3,
         add_active_replica/4,
         change_table_access_mode/1,
         del_active_replica/2,
         wait_for_tables/2,
         get_network_copy/2,
         merge_schema/0,
         start_remote_sender/4,
         schedule_late_disc_load/2
        ]).

%% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3]).

%% Module internal stuff
-export([call/1,
         cast/1,
         dump_and_reply/2,
         load_and_reply/2,
         send_and_reply/2,
         wait_for_tables_init/2
        ]).

%% NOTE: unqualified calls to set/2, add/2, fatal/2, error/2, verbose/2
%% and dbg_out/2 in this module resolve to mnesia_lib through these imports.
-import(mnesia_lib, [set/2, add/2]).
-import(mnesia_lib, [fatal/2, error/2, verbose/2, dbg_out/2]).

-include("mnesia.hrl").

%% The server is registered locally under the module name.
-define(SERVER_NAME, ?MODULE).

%% gen_server state.
-record(state, {supervisor,               % Pid given to init/1; its 'EXIT' stops the server
                schema_is_merged = false, % Set to true by the schema_is_merged call
                early_msgs = [],          % Buffered {call,Msg,From} | {cast,Msg} | {info,Msg}, newest first
                loader_pid,
                loader_queue = [],
                sender_pid,
                sender_queue = [],
                late_loader_queue = [],   % #late_load{} records appended by late_disc_load/5
                dumper_pid,               % Dumper or schema commit pid
                dumper_queue = [],        % Dumper or schema commit queue
                dump_log_timer_ref,       % Ref returned by timer:send_interval/2 in init/1
                is_stopping = false       % Set when the supervisor exits while dumper_pid is set
               }).

-record(worker_reply, {what,
                       pid,
                       result
                      }).

%% Queue entries placed in the dumper queue; owner is the
%% gen_server From of the requesting call.
-record(schema_commit_lock, {owner}).
-record(block_controller, {owner}).

%% Request to dump the log; opt_reply_to is only set for the
%% synchronous variant (sync_dump_log).
-record(dump_log, {initiated_by,
                   opt_reply_to
                  }).

%% Request to load a table from another node.
-record(net_load, {table,
                   reason,
                   opt_reply_to,
                   cstruct = unknown
                  }).

%% Request to send a table to a receiver process on another node.
-record(send_table, {table,
                     receiver_pid,
                     remote_storage
                    }).

%% Request to load a table from local disc.
-record(disc_load, {table,
                    reason,
                    opt_reply_to
                   }).

%% Pending late load; loaders holds the nodes whose load intent
%% was accepted by late_load_filter/4.
-record(late_load, {table,
                    reason,
                    opt_reply_to,
                    loaders
                   }).

%% Completion messages sent by the worker processes and handled
%% in handle_info/2.
-record(loader_done, {worker_pid,
                      is_loaded,
                      table_name,
                      needs_announce,
                      needs_sync,
                      needs_reply,
                      reply_to,
                      reply}).

-record(sender_done, {worker_pid,
                      worker_res,
                      table_name
                     }).

-record(dumper_done, {worker_pid,
                      worker_res
                     }).

%% Look up Var via ?catch_val; on failure delegate to
%% mnesia_lib:other_val/2 with the exit reason.
val(Var) ->
    case ?catch_val(Var) of
        {'EXIT', Reason} -> mnesia_lib:other_val(Var, Reason);
        Value -> Value
    end.

%% Start the controller as a locally registered gen_server linked to
%% the caller. The caller's pid is handed to init/1.
start() ->
    gen_server:start_link({local, ?SERVER_NAME}, ?MODULE, [self()],
                          [{timeout, infinity}
                           %% ,{debug, [trace]}
                          ]).

%% Request a log dump and wait for the reply of the server.
sync_dump_log(InitBy) ->
    call({sync_dump_log, InitBy}).

%% Request a log dump without waiting for the result.
async_dump_log(InitBy) ->
    ?SERVER_NAME ! {async_dump_log, InitBy}.

%% Wait for tables to be active
%% If needed, we will wait for Mnesia to start
%% If Mnesia stops, we will wait for Mnesia to restart
%% We will wait even if the list of tables is empty
%%
%% Timeout is either 'infinity' or a non-negative integer (it is used
%% directly in a receive ... after). Any other argument combination
%% returns {error, {badarg, Tabs, Timeout}}.
wait_for_tables(Tabs, Timeout) when is_list(Tabs), Timeout == infinity ->
    do_wait_for_tables(Tabs, Timeout);
wait_for_tables(Tabs, Timeout) when is_list(Tabs),
                                    is_integer(Timeout), Timeout >= 0 ->
    do_wait_for_tables(Tabs, Timeout);
wait_for_tables(Tabs, Timeout) ->
    {error, {badarg, Tabs, Timeout}}.

%% A zero timeout only inspects the currently active tables. Otherwise
%% a linked helper process (wait_for_tables_init/2) does the waiting,
%% so that the timeout can be enforced from here.
do_wait_for_tables(Tabs, 0) ->
    reply_wait(Tabs);
do_wait_for_tables(Tabs, Timeout) ->
    Pid = spawn_link(?MODULE, wait_for_tables_init, [self(), Tabs]),
    receive
        {?SERVER_NAME, Pid, Res} ->
            Res;

        {'EXIT', Pid, _} ->
            reply_wait(Tabs)

    after Timeout ->
            %% Unlink first so that killing the helper does not
            %% propagate an exit signal back to us.
            unlink(Pid),
            exit(Pid, timeout),
            reply_wait(Tabs)
    end.

%% Compare the wanted tables with the currently active ones.
%% Returns ok, {timeout, MissingTabs} or {error, {node_not_running, Node}}
%% when the active tables cannot be determined.
reply_wait(Tabs) ->
    case catch mnesia_lib:active_tables() of
        {'EXIT', _} ->
            {error, {node_not_running, node()}};
        Active when is_list(Active) ->
            case Tabs -- Active of
                [] ->
                    ok;
                BadTabs ->
                    {timeout, BadTabs}
            end
    end.

%% Entry point of the helper process spawned by do_wait_for_tables/2.
%% Sends the result back to From tagged with the server name and the
%% helper's own pid, then terminates normally.
wait_for_tables_init(From, Tabs) ->
    process_flag(trap_exit, true),
    Res = wait_for_init(From, Tabs, whereis(?SERVER_NAME)),
    From ! {?SERVER_NAME, self(), Res},
    unlink(From),
    exit(normal).

%% Link to the controller (Init) and ask it to report when each table
%% has been synced. If the link fails (e.g. Init is undefined because
%% the server is not registered) the node is reported as not running.
wait_for_init(From, Tabs, Init) ->
    case catch link(Init) of
        {'EXIT', _} ->
            %% Mnesia is not started
            {error, {node_not_running, node()}};
        true when is_pid(Init) ->
            cast({sync_tabs, Tabs, self()}),
            rec_tabs(Tabs, Tabs, From, Init)
    end.

%% Tell a waiting process that Tab is now available.
sync_reply(Waiter, Tab) ->
    Waiter ! {?SERVER_NAME, {tab_synced, Tab}}.

%% Collect one tab_synced message per table, in list order.
%% Exits if either the original caller (From) or the controller (Init)
%% terminates while we are waiting.
rec_tabs([Tab | Tabs], AllTabs, From, Init) ->
    receive
        {?SERVER_NAME, {tab_synced, Tab}} ->
            rec_tabs(Tabs, AllTabs, From, Init);

        {'EXIT', From, _} ->
            %% This will trigger an exit signal
            %% to mnesia_init
            exit(wait_for_tables_timeout);

        {'EXIT', Init, _} ->
            %% Oops, mnesia_init stopped,
            exit(mnesia_stopped)
    end;
rec_tabs([], _, _, Init) ->
    unlink(Init),
    ok.

%% Fetch {cstructs, Cstructs, RunningNodes} from the controller.
get_cstructs() ->
    call(get_cstructs).

%% Notify the controller that Node is down. If the controller is not
%% running the notification is passed on to mnesia_monitor directly.
mnesia_down(Node) ->
    case cast({mnesia_down, Node}) of
        {error, _} -> mnesia_monitor:mnesia_down(?SERVER_NAME, Node);
        _Pid ->  ok
    end.

%% Link to the controller and block until the schema commit lock is
%% granted. The link stays in place until release_schema_commit_lock/0.
wait_for_schema_commit_lock() ->
    link(whereis(?SERVER_NAME)),
    unsafe_call(wait_for_schema_commit_lock).

block_controller() ->
    call(block_controller).

unblock_controller() ->
    cast(unblock_controller).

%% Release the schema commit lock and remove the link that was set up
%% by wait_for_schema_commit_lock/0.
release_schema_commit_lock() ->
    cast({release_schema_commit_lock, self()}),
    unlink(whereis(?SERVER_NAME)).

%% Special for preparation of add table copy
%%
%% Loads Tab over the network in the calling process, announces the
%% table if the loader asks for it, flushes a possible copier_done
%% message and returns the reply field of the loader result.
%%
%% NOTE(review): the result of load_table/1 is wrapped in a catch, but
%% an {'EXIT', _} value is not handled; the final record access would
%% then fail with badrecord. Confirm whether callers rely on that.
get_network_copy(Tab, Cs) ->
    Work = #net_load{table = Tab,
                     reason = {dumper, add_table_copy},
                     cstruct = Cs
                    },
    Res = (catch load_table(Work)),
    if Res#loader_done.is_loaded == true ->
            Tab = Res#loader_done.table_name,
            case Res#loader_done.needs_announce of
                true ->
                    i_have_tab(Tab);
                false ->
                    ignore
            end;
       true -> ignore
    end,

    receive %% Flush copier done message
        {copier_done, _Node} ->
            ok
    after 500 ->  %% avoid hanging if something is wrong and we shall fail.
            ignore
    end,
    Res#loader_done.reply.

%% This function is invoked from the dumper
%%
%% There are two cases here:
%% startup ->
%%   no need for sync, since mnesia_controller not started yet
%% schema_trans ->
%%   already synced with mnesia_controller since the dumper
%%   is synchronously started from mnesia_controller

create_table(Tab) ->
    {loaded, ok} = mnesia_loader:disc_load_table(Tab, {dumper,create_table}).

get_disc_copy(Tab) ->
    disc_load_table(Tab, {dumper,change_table_copy_type}, undefined).

%% Returns ok instead of yes
%%
%% Only atoms other than 'schema' are accepted as table names; anything
%% else returns {error, {bad_type, Tab}}. A table without a known
%% storage_type value returns {error, {no_exists, Tab}}.
force_load_table(Tab) when is_atom(Tab), Tab /= schema ->
    case ?catch_val({Tab, storage_type}) of
        ram_copies ->
            do_force_load_table(Tab);
        disc_copies ->
            do_force_load_table(Tab);
        disc_only_copies ->
            do_force_load_table(Tab);
        unknown ->
            %% No local storage type: mark the table as force loaded,
            %% let the controller re-evaluate it and wait for it.
            set({Tab, load_by_force}, true),
            cast({force_load_updated, Tab}),
            wait_for_tables([Tab], infinity);
        {'EXIT', _} ->
            {error, {no_exists, Tab}}
    end;
force_load_table(Tab) ->
    {error, {bad_type, Tab}}.

%% Force a local load of Tab unless a load reason is already recorded
%% for it. Both a missing value and the value 'unknown' count as
%% "not loaded yet".
do_force_load_table(Tab) ->
    ForceLoad =
        fun() ->
                set({Tab, load_by_force}, true),
                mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user),
                wait_for_tables([Tab], infinity)
        end,
    case ?catch_val({Tab, load_reason}) of
        unknown ->
            ForceLoad();
        {'EXIT', _} ->
            ForceLoad();
        _AlreadyLoaded ->
            ok
    end.

%% Forward changed master nodes to the controller; the schema table
%% is never affected.
master_nodes_updated(schema, _MasterNodes) ->
    ignore;
master_nodes_updated(Tab, MasterNodes) ->
    cast({master_nodes_updated, Tab, MasterNodes}).

%% Schedule Tabs for a late disc load using the late_disc_load request.
schedule_late_disc_load(Tabs, Reason) ->
    try_schedule_late_disc_load(Tabs, Reason, late_disc_load).

%% Collect the disc load intents of all other running db nodes inside
%% a transaction holding a global write lock, then hand them to the
%% controller together with Tabs. The attempt is repeated for as long
%% as some node fails to answer. An empty table list is only passed on
%% for the schema_is_merged request.
try_schedule_late_disc_load([], _Reason, MsgTag)
  when MsgTag /= schema_is_merged ->
    ignore;
try_schedule_late_disc_load(Tabs, Reason, MsgTag) ->
    AskPeers =
        fun() ->
                LockItem = mnesia_late_disc_load,
                DbNodes = val({current, db_nodes}),
                mnesia:lock({global, LockItem, DbNodes}, write),
                case multicall(DbNodes -- [node()], disc_load_intents) of
                    {Intents, []} ->
                        call({MsgTag, Tabs, Reason, Intents}),
                        done;
                    {_, Unreachable} ->
                        %% Some nodes did not respond, lets try again
                        {retry, Unreachable}
                end
        end,
    case mnesia:transaction(AskPeers) of
        {'atomic', done} ->
            done;
        {'atomic', {retry, Silent}} ->
            verbose("Retry late_load_tables because bad nodes: ~p~n",
                    [Silent]),
            try_schedule_late_disc_load(Tabs, Reason, MsgTag);
        {aborted, Why} ->
            fatal("Cannot late_load_tables~p: ~p~n",
                  [[Tabs, Reason, MsgTag], Why])
    end.

%% Connect to the nodes in Ns and merge the schema with those that
%% were actually connected and are not already current db nodes.
%% Returns {ok, NewNodes} or {error, Reason}.
connect_nodes(Ns) ->
    case mnesia:system_info(is_running) of
        no ->
            {error, {node_not_running, node()}};
        yes ->
            {NewC, OldC} = mnesia_recover:connect_nodes(Ns),
            Connected = NewC ++ OldC,
            New1 = mnesia_lib:intersect(Ns, Connected),
            New = New1 -- val({current, db_nodes}),

            case try_merge_schema(New) of
                ok ->
                    mnesia_lib:add_list(extra_db_nodes, New),
                    {ok, New};
                {aborted, {throw, Str}} when is_list(Str) ->
                    %%mnesia_recover:disconnect_nodes(New),
                    {error, {merge_schema_failed, lists:flatten(Str)}};
                Else ->
                    %% Unconnect nodes where merge failed!!
                    %% mnesia_recover:disconnect_nodes(New),
                    {error, Else}
            end
    end.

%% Merge the local schema with the schema on other nodes.
%% But first we must let all processes that want to force
%% load tables wait until the schema merge is done.

merge_schema() ->
    AllNodes = mnesia_lib:all_nodes(),
    case try_merge_schema(AllNodes) of
        ok ->
            schema_is_merged();
        {aborted, {throw, Str}} when is_list(Str) ->
            fatal("Failed to merge schema: ~s~n", [Str]);
        Else ->
            fatal("Failed to merge schema: ~p~n", [Else])
    end.

%% Repeat mnesia_schema:merge_schema/0 until it reports not_merged.
%% Returns ok, or the unexpected result of the merge attempt.
try_merge_schema(Nodes) ->
    case mnesia_schema:merge_schema() of
        {'atomic', not_merged} ->
            %% No more nodes that we need to merge the schema with
            ok;
        {'atomic', {merged, OldFriends, NewFriends}} ->
            %% Check if new nodes has been added to the schema
            Diff = mnesia_lib:all_nodes() -- [node() | Nodes],
            mnesia_recover:connect_nodes(Diff),

            %% Tell everybody to adopt orphan tables
            im_running(OldFriends, NewFriends),
            im_running(NewFriends, OldFriends),

            try_merge_schema(Nodes);
        {'atomic', {"Cannot get cstructs", Node, Reason}} ->
            dbg_out("Cannot get cstructs, Node ~p ~p~n", [Node, Reason]),
            timer:sleep(1000), % Avoid a endless loop look alike
            try_merge_schema(Nodes);
        Other ->
            Other
    end.

%% Tell the controllers on OldFriends that this node is running and
%% which nodes are its new friends.
im_running(OldFriends, NewFriends) ->
    abcast(OldFriends, {im_running, node(), NewFriends}).

schema_is_merged() ->
    MsgTag = schema_is_merged,
    SafeLoads = initial_safe_loads(),

    %% At this point we do not know anything about
    %% which tables that the other nodes already
    %% has loaded and therefore we let the normal
    %% processing of the loader_queue take care
    %% of it, since we at that time point will
    %% know the whereabouts. We rely on the fact
    %% that all nodes tells each other directly
    %% when they have loaded a table and are
    %% willing to share it.

    try_schedule_late_disc_load(SafeLoads, initial, MsgTag).


%% Asynchronous request to the local controller.
%% Returns {error, {node_not_running, Node}} if it is not registered.
cast(Msg) ->
    case whereis(?SERVER_NAME) of
        undefined -> {error, {node_not_running, node()}};
        Pid ->  gen_server:cast(Pid, Msg)
    end.

%% Asynchronous request to the controllers on Nodes.
abcast(Nodes, Msg) ->
    gen_server:abcast(Nodes, ?SERVER_NAME, Msg).

%% Synchronous request without the temporary link used by call/1.
unsafe_call(Msg) ->
    case whereis(?SERVER_NAME) of
        undefined -> {error, {node_not_running, node()}};
        Pid -> gen_server:call(Pid, Msg, infinity)
    end.

%% Synchronous request to the local controller. The caller is linked
%% to the server for the duration of the call.
%%
%% Returns the reply of the server, or {error, {node_not_running, Node}}
%% if the server is not registered or if an exit message from it is
%% found in the mailbox after the call.
call(Msg) ->
    case whereis(?SERVER_NAME) of
        undefined ->
            {error, {node_not_running, node()}};
        Pid ->
            link(Pid),
            Res = gen_server:call(Pid, Msg, infinity),
            unlink(Pid),

            %% We get an exit signal if server dies. The receive must be
            %% the last expression: previously its value was discarded,
            %% so the error tuple below could never be returned.
            receive
                {'EXIT', Pid, _Reason} ->
                    {error, {node_not_running, node()}}
            after 0 ->
                    Res
            end
    end.

%% Synchronous request to the controller on Node. Exits raised by the
%% call are converted to {error, Reason}.
remote_call(Node, Func, Args) ->
    case catch gen_server:call({?MODULE, Node}, {Func, Args, self()}, infinity) of
        {'EXIT', Error} ->
            {error, Error};
        Else ->
            Else
    end.

%% Synchronous request to the controllers on Nodes.
%% Returns {Replies, BadNodes} with the node names stripped from the
%% replies.
multicall(Nodes, Msg) ->
    {Good, Bad} = gen_server:multi_call(Nodes, ?MODULE, Msg, infinity),
    PatchedGood = [Reply || {_Node, Reply} <- Good],
    {PatchedGood, Bad}.  %% Make the replies look like rpc:multicalls..
%%    rpc:multicall(Nodes, ?MODULE, call, [Msg]).

%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State}          |
%%          {ok, State, Timeout} |
%%          {stop, Reason}
%%
%% Traps exits, connects to nodes that appear in the schema but not
%% among the original nodes, and starts the periodic log dump timer
%% and the dumper regulator. Parent is stored as the supervisor pid.
%%----------------------------------------------------------------------
init([Parent]) ->
    process_flag(trap_exit, true),
    mnesia_lib:verbose("~p starting: ~p~n", [?SERVER_NAME, self()]),

    %% Handshake and initialize transaction recovery
    %% for new nodes detected in the schema
    SchemaNodes = mnesia_lib:all_nodes(),
    NewNodes = SchemaNodes -- [node() | val(original_nodes)],
    mnesia_lib:unset(original_nodes),
    mnesia_recover:connect_nodes(NewNodes),

    %% Periodic log dump, delivered to handle_info/2
    Threshold = mnesia_monitor:get_env(dump_log_time_threshold),
    {ok, TimerRef} =
        timer:send_interval(Threshold, {async_dump_log, time_threshold}),
    mnesia_dumper:start_regulator(),

    InitialState = #state{supervisor = Parent,
                          dump_log_timer_ref = TimerRef},
    {ok, InitialState}.

%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State}          |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%          {stop, Reason, State}            (terminate/2 is called)
%%
%% Clause order matters: the requests above the schema_is_merged == false
%% clause are served even before the schema has been merged, everything
%% below it is buffered in early_msgs until then.
%%----------------------------------------------------------------------

handle_call({sync_dump_log, InitBy}, From, State) ->
    Worker = #dump_log{initiated_by = InitBy,
                       opt_reply_to = From
                      },
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_call(wait_for_schema_commit_lock, From, State) ->
    Worker = #schema_commit_lock{owner = From},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_call(block_controller, From, State) ->
    Worker = #block_controller{owner = From},
    State2 = add_worker(Worker, State),
    noreply(State2);


handle_call(get_cstructs, From, State) ->
    Tabs = val({schema, tables}),
    Cstructs = [val({T, cstruct}) || T <- Tabs],
    Running = val({current, db_nodes}),
    reply(From, {cstructs, Cstructs, Running}),
    noreply(State);

handle_call({schema_is_merged, TabsR, Reason, RemoteLoaders}, From, State) ->
    State2 = late_disc_load(TabsR, Reason, RemoteLoaders, From, State),

    %% Handle early messages
    Msgs = State2#state.early_msgs,
    State3 = State2#state{early_msgs = [], schema_is_merged = true},
    Ns = val({current, db_nodes}),
    dbg_out("Schema is merged ~w, State ~w~n", [Ns, State3]),
%%    dbg_out("handle_early_msgs ~p ~n", [Msgs]), % qqqq
    %% early_msgs is kept newest first, so replay in arrival order
    handle_early_msgs(lists:reverse(Msgs), State3);

handle_call(disc_load_intents, From, State) ->
    Tabs = disc_load_intents(State#state.loader_queue) ++
        disc_load_intents(State#state.late_loader_queue),
    ActiveTabs = mnesia_lib:local_active_tables(),
    reply(From, {ok, node(), mnesia_lib:union(Tabs, ActiveTabs)}),
    noreply(State);

handle_call({update_where_to_write, [add, Tab, AddNode], _From}, _Dummy, State) ->
%%%    dbg_out("update_w2w ~p", [[add, Tab, AddNode]]), %%% qqqq
    Current = val({current, db_nodes}),
    Res =
        case lists:member(AddNode, Current) and
            State#state.schema_is_merged == true of
            true ->
                mnesia_lib:add({Tab, where_to_write}, AddNode);
            false ->
                ignore
        end,
    {reply, Res, State};

handle_call({add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From},
            ReplyTo, State) ->
    KnownNode = lists:member(ToNode, val({current, db_nodes})),
    Merged = State#state.schema_is_merged,
    if
        KnownNode == false ->
            reply(ReplyTo, ignore),
            noreply(State);
        Merged == true ->
            Res = add_active_replica(Tab, ToNode, RemoteS, AccessMode),
            reply(ReplyTo, Res),
            noreply(State);
        true -> %% Schema is not merged
            Msg = {add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From},
            Msgs = State#state.early_msgs,
            reply(ReplyTo, ignore),   %% Reply ignore and add data after schema merge
            noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]})
    end;

handle_call({unannounce_add_table_copy, [Tab, Node], From}, ReplyTo, State) ->
    KnownNode = lists:member(node(From), val({current, db_nodes})),
    Merged = State#state.schema_is_merged,
    if
        KnownNode == false ->
            reply(ReplyTo, ignore),
            noreply(State);
        Merged == true ->
            Res = unannounce_add_table_copy(Tab, Node),
            reply(ReplyTo, Res),
            noreply(State);
        true -> %% Schema is not merged
            Msg = {unannounce_add_table_copy, [Tab, Node], From},
            Msgs = State#state.early_msgs,
            reply(ReplyTo, ignore),   %% Reply ignore and add data after schema merge
            %% Set ReplyTo to undefined so we don't reply twice
            noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]})
    end;

handle_call(Msg, From, State) when State#state.schema_is_merged == false ->
    %% Buffer early messages
%%    dbg_out("Buffered early msg ~p ~n", [Msg]), %% qqqq
    Msgs = State#state.early_msgs,
    noreply(State#state{early_msgs = [{call, Msg, From} | Msgs]});

handle_call({net_load, Tab, Cs}, From, State) ->
    Worker = #net_load{table = Tab,
                       opt_reply_to = From,
                       reason = add_table_copy,
                       cstruct = Cs
                      },
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_call({late_disc_load, Tabs, Reason, RemoteLoaders}, From, State) ->
    State2 = late_disc_load(Tabs, Reason, RemoteLoaders, From, State),
    noreply(State2);

handle_call({block_table, [Tab], From}, _Dummy, State) ->
    case lists:member(node(From), val({current, db_nodes})) of
        true ->
            block_table(Tab);
        false ->
            ignore
    end,
    {reply, ok, State};

handle_call({check_w2r, _Node, Tab}, _From, State) ->
    {reply, val({Tab, where_to_read}), State};

handle_call(Msg, _From, State) ->
    %% NOTE(review): no reply is sent here, so the caller keeps waiting.
    error("~p got unexpected call: ~p~n", [?SERVER_NAME, Msg]),
    noreply(State).

%% Names of the tables that the given queue intends to load from disc.
%% #disc_load{} and #late_load{} entries contribute their table,
%% #net_load{} entries are skipped.
disc_load_intents([H | T]) when is_record(H, disc_load) ->
    [H#disc_load.table | disc_load_intents(T)];
disc_load_intents([H | T]) when is_record(H, late_load) ->
    [H#late_load.table | disc_load_intents(T)];
disc_load_intents([H | T]) when is_record(H, net_load) ->
    disc_load_intents(T);
disc_load_intents([]) ->
    [].

%% Queue a late load for every table in TabsR that is still a local
%% table. The client is answered with 'queued' right away; the new
%% #late_load{} entries are appended to the late loader queue of State.
%% TabsR may contain plain table names or {Tab, Reason} tuples.
late_disc_load(TabsR, Reason, RemoteLoaders, From, State) ->
    verbose("Intend to load tables: ~p~n", [TabsR]),
    ?eval_debug_fun({?MODULE, late_disc_load},
                    [{tabs, TabsR},
                     {reason, Reason},
                     {loaders, RemoteLoaders}]),

    reply(From, queued),
    %% RemoteLoaders is a list of {ok, Node, Tabs} tuples

    %% Remove deleted tabs
    Local = mnesia_lib:val({schema, local_tables}),
    IsLocal = fun({Tab, _Reas}) -> lists:member(Tab, Local);
                 (Tab) -> lists:member(Tab, Local)
              end,
    %% The surviving entries are kept in reverse order of TabsR
    Kept = lists:reverse(lists:filter(IsLocal, TabsR)),

    DbNodes = val({current, db_nodes}),
    NewEntries = late_loaders(Kept, Reason, RemoteLoaders, DbNodes),
    State#state{late_loader_queue = State#state.late_loader_queue ++ NewEntries}.

%% Build one #late_load{} per table, in list order. An element of the
%% form {Tab, Reason} carries its own reason, a plain table name gets
%% the reason passed in the second argument.
late_loaders([{Tab, Reason} | Rest], DefaultReason, RemoteLoaders, Nodes) ->
    Entry = new_late_load(Tab, Reason, RemoteLoaders, Nodes),
    [Entry | late_loaders(Rest, DefaultReason, RemoteLoaders, Nodes)];
late_loaders([Tab | Rest], Reason, RemoteLoaders, Nodes) ->
    Entry = new_late_load(Tab, Reason, RemoteLoaders, Nodes),
    [Entry | late_loaders(Rest, Reason, RemoteLoaders, Nodes)];
late_loaders([], _Reason, _RemoteLoaders, _Nodes) ->
    [].

%% Create the #late_load{} for Tab. When no remote node has an accepted
%% load intent for the table, a local disc load is requested as well.
new_late_load(Tab, Reason, RemoteLoaders, Nodes) ->
    Loaders = late_load_filter(RemoteLoaders, Tab, Nodes, []),
    case Loaders of
        [] ->
            cast({disc_load, Tab, Reason});  % Ugly cast
        _ ->
            ignore
    end,
    #late_load{table = Tab, loaders = Loaders, reason = Reason}.

%% Collect (in reverse reply order) the nodes whose intent to load Tab
%% is accepted. {error, _} and {badrpc, _} replies are skipped. A reply
%% {ok, Node, Intents} is accepted when the table is read_write, does
%% not have local content, Node is among Nodes, Tab is among Intents,
%% and Node is either a master node for Tab or Tab has no master nodes.
late_load_filter([{error, _} | Rest], Tab, Nodes, Acc) ->
    late_load_filter(Rest, Tab, Nodes, Acc);
late_load_filter([{badrpc, _} | Rest], Tab, Nodes, Acc) ->
    late_load_filter(Rest, Tab, Nodes, Acc);
late_load_filter([Reply | Rest], Tab, Nodes, Acc) ->
    {ok, Node, Intents} = Reply,
    Access = val({Tab, access_mode}),
    LocalC = val({Tab, local_content}),
    StillActive = lists:member(Node, Nodes),
    RemoteIntent = lists:member(Tab, Intents),
    Eligible = Access == read_write
        andalso LocalC == false
        andalso StillActive
        andalso RemoteIntent,
    Acc2 =
        case Eligible of
            true ->
                %% Accept the intent if the other node is a master
                %% node for the table, or if the table has no master
                %% nodes at all. If someone else is master, ignore it.
                Masters = mnesia_recover:get_master_nodes(Tab),
                case lists:member(Node, Masters) orelse Masters == [] of
                    true -> [Node | Acc];
                    false -> Acc
                end;
            false ->
                Acc
        end,
    late_load_filter(Rest, Tab, Nodes, Acc2);
late_load_filter([], _Tab, _Nodes, Acc) ->
    Acc.

%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%
%% Clause order matters: the requests above the schema_is_merged == false
%% clause are served even before the schema has been merged, everything
%% below it is buffered in early_msgs until then.
%%----------------------------------------------------------------------

handle_cast({release_schema_commit_lock, _Owner}, State) ->
    if
        State#state.is_stopping == true ->
            {stop, shutdown, State};
        true ->
            case State#state.dumper_queue of
                [#schema_commit_lock{} | Rest] ->
                    State2 = State#state{dumper_pid = undefined,
                                         dumper_queue = Rest},
                    State3 = opt_start_worker(State2),
                    noreply(State3);
                _ ->
                    noreply(State)
            end
    end;

handle_cast(unblock_controller, State) ->
    %% There is deliberately no fall-through branch: if the head of the
    %% dumper queue is not a block_controller entry this crashes.
    if
        State#state.is_stopping == true ->
            {stop, shutdown, State};
        is_record(hd(State#state.dumper_queue), block_controller) ->
            [_Worker | Rest] = State#state.dumper_queue,
            State2 = State#state{dumper_pid = undefined,
                                 dumper_queue = Rest},
            State3 = opt_start_worker(State2),
            noreply(State3)
    end;

handle_cast({mnesia_down, Node}, State) ->
    maybe_log_mnesia_down(Node),
    mnesia_lib:del({current, db_nodes}, Node),
    mnesia_checkpoint:tm_mnesia_down(Node),
    Alltabs = val({schema, tables}),
    State2 = reconfigure_tables(Node, State, Alltabs),
    %% Notify the active sender and loader workers, if any
    case State#state.sender_pid of
        undefined -> ignore;
        Pid when is_pid(Pid) -> Pid ! {copier_done, Node}
    end,
    case State#state.loader_pid of
        undefined -> ignore;
        Pid2 when is_pid(Pid2) -> Pid2 ! {copier_done, Node}
    end,
    %% Drop queued send requests whose receiver lives on the dead node
    NewSenders =
        case State#state.sender_queue of
            [OldSender | RestSenders] ->
                Remove = fun(ST) ->
                                 node(ST#send_table.receiver_pid) /= Node
                         end,
                NewS = lists:filter(Remove, RestSenders),
                %% Keep old sender it will be removed by sender_done
                [OldSender | NewS];
            [] ->
                []
        end,
    Early = remove_early_messages(State2#state.early_msgs, Node),
    mnesia_monitor:mnesia_down(?SERVER_NAME, Node),
    noreply(State2#state{sender_queue = NewSenders, early_msgs = Early});

handle_cast({im_running, _Node, NewFriends}, State) ->
    Tabs = mnesia_lib:local_active_tables() -- [schema],
    Ns = mnesia_lib:intersect(NewFriends, val({current, db_nodes})),
    abcast(Ns, {adopt_orphans, node(), Tabs}),
    noreply(State);

handle_cast(Msg, State) when State#state.schema_is_merged == false ->
    %% Buffer early messages
    Msgs = State#state.early_msgs,
    noreply(State#state{early_msgs = [{cast, Msg} | Msgs]});

handle_cast({disc_load, Tab, Reason}, State) ->
    Worker = #disc_load{table = Tab, reason = Reason},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_cast(Worker, State) when is_record(Worker, send_table) ->
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_cast({sync_tabs, Tabs, From}, State) ->
    %% user initiated wait_for_tables
    handle_sync_tabs(Tabs, From),
    noreply(State);

handle_cast({i_have_tab, Tab, Node}, State) ->
    case lists:member(Node, val({current, db_nodes})) of
        true ->
            State2 = node_has_tabs([Tab], Node, State),
            noreply(State2);
        false ->
            noreply(State)
    end;

handle_cast({force_load_updated, Tab}, State) ->
    case val({Tab, active_replicas}) of
        [] ->
            %% No valid replicas
            noreply(State);
        [SomeNode | _] ->
            State2 = node_has_tabs([Tab], SomeNode, State),
            noreply(State2)
    end;

handle_cast({master_nodes_updated, Tab, Masters}, State) ->
    Active = val({Tab, active_replicas}),
    Valid =
        case val({Tab, load_by_force}) of
            true ->
                Active;
            false ->
                if
                    Masters == [] ->
                        Active;
                    true ->
                        mnesia_lib:intersect(Masters, Active)
                end
        end,
    case Valid of
        [] ->
            %% No valid replicas
            noreply(State);
        [SomeNode | _] ->
            State2 = node_has_tabs([Tab], SomeNode, State),
            noreply(State2)
    end;

handle_cast({adopt_orphans, Node, Tabs}, State) ->

    State2 = node_has_tabs(Tabs, Node, State),

    %% Register the other node as up and running
    mnesia_recover:log_mnesia_up(Node),
    verbose("Logging mnesia_up ~w~n", [Node]),
    mnesia_lib:report_system_event({mnesia_up, Node}),

    %% Load orphan tables
    LocalTabs = val({schema, local_tables}) -- [schema],
    Nodes = val({current, db_nodes}),
    {LocalOrphans, RemoteMasters} =
        orphan_tables(LocalTabs, Node, Nodes, [], []),
    Reason = {adopt_orphan, node()},
    mnesia_late_loader:async_late_disc_load(node(), LocalOrphans, Reason),

    Fun =
        fun(N) ->
                RemoteOrphans =
                    [Tab || {Tab, Ns} <- RemoteMasters,
                            lists:member(N, Ns)],
                mnesia_late_loader:maybe_async_late_disc_load(N, RemoteOrphans, Reason)
        end,
    lists:foreach(Fun, Nodes),

    %% The loader queue is left exactly as node_has_tabs/3 returned it
    noreply(State2);

handle_cast(Msg, State) ->
    error("~p got unexpected cast: ~p~n", [?SERVER_NAME, Msg]),
    noreply(State).

%% Serve a wait_for_tables request from the process From. Tables that
%% can be read somewhere are acknowledged at once; for the others From
%% is remembered in the process dictionary under {sync_tab, Tab}.
handle_sync_tabs([Tab | Tabs], From) ->
    case val({Tab, where_to_read}) of
        nowhere ->
            case get({sync_tab, Tab}) of
                undefined ->
                    put({sync_tab, Tab}, [From]);
                Pids ->
                    put({sync_tab, Tab}, [From | Pids])
            end;
        _ ->
            sync_reply(From, Tab)
    end,
    handle_sync_tabs(Tabs, From);
handle_sync_tabs([], _From) ->
    ok.

%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%
%% Handles the periodic dump request, the completion records sent by
%% the dumper, loader and sender workers, and exit messages from the
%% supervisor and the workers.
%%----------------------------------------------------------------------

handle_info({async_dump_log, InitBy}, State) ->
    Worker = #dump_log{initiated_by = InitBy},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_info(Done, State) when is_record(Done, dumper_done) ->
    Pid = Done#dumper_done.worker_pid,
    Res = Done#dumper_done.worker_res,
    if
        State#state.is_stopping == true ->
            {stop, shutdown, State};
        Res == dumped, Pid == State#state.dumper_pid ->
            [Worker | Rest] = State#state.dumper_queue,
            reply(Worker#dump_log.opt_reply_to, Res),
            State2 = State#state{dumper_pid = undefined,
                                 dumper_queue = Rest},
            State3 = opt_start_worker(State2),
            noreply(State3);
        true ->
            fatal("Dumper failed: ~p~n state: ~p~n", [Res, State]),
            {stop, fatal, State}
    end;

handle_info(Done, State) when is_record(Done, loader_done) ->
    if
        %% Assertion
        Done#loader_done.worker_pid == State#state.loader_pid -> ok
    end,

    [_Worker | Rest] = LoadQ0 = State#state.loader_queue,
    LateQueue0 = State#state.late_loader_queue,
    {LoadQ, LateQueue} =
        case Done#loader_done.is_loaded of
            true ->
                Tab = Done#loader_done.table_name,

                %% Optional user sync
                case Done#loader_done.needs_sync of
                    true -> user_sync_tab(Tab);
                    false -> ignore
                end,

                %% Optional table announcement
                case Done#loader_done.needs_announce of
                    true ->
                        i_have_tab(Tab),
                        case Tab of
                            schema ->
                                ignore;
                            _ ->
                                %% Local node needs to perform user_sync_tab/1
                                Ns = val({current, db_nodes}),
                                abcast(Ns, {i_have_tab, Tab, node()})
                        end;
                    false ->
                        case Tab of
                            schema ->
                                ignore;
                            _ ->
                                %% Local node needs to perform user_sync_tab/1
                                Ns = val({current, db_nodes}),
                                AlreadyKnows = val({Tab, active_replicas}),
                                abcast(Ns -- AlreadyKnows, {i_have_tab, Tab, node()})
                        end
                end,

                %% Optional client reply
                case Done#loader_done.needs_reply of
                    true ->
                        reply(Done#loader_done.reply_to,
                              Done#loader_done.reply);
                    false ->
                        ignore
                end,
                {Rest, reply_late_load(Tab, LateQueue0)};
            false ->
                case Done#loader_done.reply of
                    restart ->
                        %% Keep the request at the head of the queue
                        {LoadQ0, LateQueue0};
                    _ ->
                        {Rest, LateQueue0}
                end
        end,

    State2 = State#state{loader_pid = undefined,
                         loader_queue = LoadQ,
                         late_loader_queue = LateQueue},

    State3 = opt_start_worker(State2),
    noreply(State3);

handle_info(Done, State) when is_record(Done, sender_done) ->
    Pid = Done#sender_done.worker_pid,
    Res = Done#sender_done.worker_res,
    if
        Res == ok, Pid == State#state.sender_pid ->
            [Worker | Rest] = State#state.sender_queue,
            Worker#send_table.receiver_pid ! {copier_done, node()},
            State2 = State#state{sender_pid = undefined,
                                 sender_queue = Rest},
            State3 = opt_start_worker(State2),
            noreply(State3);
        true ->
            %% No need to send any message to the table receiver
            %% since it will soon get a mnesia_down anyway
            fatal("Sender failed: ~p~n state: ~p~n", [Res, State]),
            {stop, fatal, State}
    end;

handle_info({'EXIT', Pid, R}, State) when Pid == State#state.supervisor ->
    catch set(mnesia_status, stopping),
    case State#state.dumper_pid of
        undefined ->
            dbg_out("~p was ~p~n", [?SERVER_NAME, R]),
            {stop, shutdown, State};
        _ ->
            %% A dumper or schema commit is active; the stop is
            %% deferred until it reports back.
            noreply(State#state{is_stopping = true})
    end;

handle_info({'EXIT', Pid, R}, State) when Pid == State#state.dumper_pid ->
    case State#state.dumper_queue of
        [#schema_commit_lock{}|Workers] ->  %% Schema trans crashed or was killed
            State2 = State#state{dumper_queue = Workers, dumper_pid = undefined},
            State3 = opt_start_worker(State2),
            noreply(State3);
        _Other ->
            fatal("Dumper or schema commit crashed: ~p~n state: ~p~n", [R, State]),
            {stop, fatal, State}
    end;

handle_info({'EXIT', Pid, R}, State) when Pid == State#state.loader_pid ->
    fatal("Loader crashed: ~p~n state: ~p~n", [R, State]),
    {stop, fatal, State};

handle_info({'EXIT', Pid, R}, State) when Pid == State#state.sender_pid ->
    %% No need to send any message to the table receiver
    %% since it will soon get a mnesia_down anyway
    fatal("Sender crashed: ~p~n state: ~p~n", [R, State]),
    {stop, fatal, State};

handle_info({From, get_state}, State) ->
    From ! {?SERVER_NAME, State},
    noreply(State);

%% No real need for buffering
handle_info(Msg, State) when State#state.schema_is_merged == false ->
    %% Buffer early messages
    Msgs = State#state.early_msgs,
    noreply(State#state{early_msgs = [{info, Msg} | Msgs]});

handle_info({'EXIT', Pid, wait_for_tables_timeout}, State) ->
    %% A wait_for_tables helper gave up; forget it as a waiter
    sync_tab_timeout(Pid, get()),
    noreply(State);

handle_info(Msg, State) ->
    error("~p got unexpected info: ~p~n", [?SERVER_NAME, Msg]),
    noreply(State).

%% Answer 'ok' to every late load request for Tab and return the
%% queue without those requests.
reply_late_load(Tab, [H | T]) when H#late_load.table == Tab ->
    reply(H#late_load.opt_reply_to, ok),
    reply_late_load(Tab, T);
reply_late_load(Tab, [H | T])  ->
    [H | reply_late_load(Tab, T)];
reply_late_load(_Tab, []) ->
    [].

%% Remove Pid from every {sync_tab, Tab} waiter list found in the given
%% process dictionary contents; empty waiter lists are erased.
sync_tab_timeout(Pid, [{{sync_tab, Tab}, Pids} | Tail]) ->
    case lists:delete(Pid, Pids) of
        [] ->
            erase({sync_tab, Tab});
        Pids2 ->
            put({sync_tab, Tab}, Pids2)
    end,
    sync_tab_timeout(Pid, Tail);
sync_tab_timeout(Pid, [_ | Tail]) ->
    sync_tab_timeout(Pid, Tail);
sync_tab_timeout(_Pid, []) ->
    ok.

%% Pick the load record that has the highest load order
%% Returns {BestLoad, RemainingQueue} or {none, []} if queue is empty
pick_next(Queue) ->
    pick_next(Queue, none, none, []).

pick_next([Head | Tail], Load, Order, Rest) when is_record(Head, net_load) ->
    Tab = Head#net_load.table,
    select_best(Head, Tail, val({Tab, load_order}), Load, Order, Rest);
pick_next([Head | Tail], Load, Order, Rest) when is_record(Head, disc_load) ->
    Tab = Head#disc_load.table,
    select_best(Head, Tail, val({Tab, load_order}), Load, Order, Rest);
pick_next([], Load, _Order, Rest) ->
    {Load, Rest}.
%% Compare a candidate load request with the best one found so far
%% and continue the scan in pick_next/4. The loser ends up among the
%% skipped requests. A candidate only wins if its order is strictly
%% greater, so on a tie the earlier request is kept.
select_best(Candidate, Queue, Order, none, none, Skipped) ->
    pick_next(Queue, Candidate, Order, Skipped);
select_best(Candidate, Queue, Order, Best, BestOrder, Skipped) ->
    case Order > BestOrder of
        true ->
            pick_next(Queue, Candidate, Order, [Best | Skipped]);
        false ->
            pick_next(Queue, Best, BestOrder, [Candidate | Skipped])
    end.

%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(Reason, State) ->
    %% The actual work is delegated to mnesia_monitor
    mnesia_monitor:terminate_proc(?SERVER_NAME, Reason, State).

%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Upgrade process when its code is to be changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    %% The state is carried over unchanged
    {ok, State}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

%% Log mnesia_down for node N if that is appropriate right now.
%% Returns ok when the event was logged and log_later otherwise.
maybe_log_mnesia_down(N) ->
    %% We use mnesia_down when deciding which tables to load locally,
    %% so if we are not running (i.e haven't decided which tables
    %% to load locally), don't log mnesia_down yet.
    ShouldLog =
        case mnesia_lib:is_running() of
            yes ->
                true;
            _ ->
                %% Log anyway if N holds a copy of some local table
                %% that is not active here
                LocalTabs = val({schema, local_tables}) -- [schema],
                lists:any(fun(Tab) -> inactive_copy_holders(Tab, N) end,
                          LocalTabs)
        end,
    case ShouldLog of
        true ->
            mnesia_lib:verbose("Logging mnesia_down ~w~n", [N]),
            mnesia_recover:log_mnesia_down(N),
            ok;
        false ->
            %% Unfortunately we have not loaded some common
            %% tables yet, so we cannot rely on the nodedown
            log_later   %% BUGBUG handle this case!!!
    end.

%% Node holds a copy of Tab according to the cstruct
%% and mnesia_lib:not_active_here/1 holds for Tab.
inactive_copy_holders(Tab, Node) ->
    Cstruct = val({Tab, cstruct}),
    case mnesia_lib:cs_to_storage_type(Node, Cstruct) of
        unknown ->
            false;
        _HasCopy ->
            mnesia_lib:not_active_here(Tab)
    end.

%% Classify the given tables with respect to Node and the node list Ns.
%% Returns {LocalOrphans, RemoteMasters} where LocalOrphans is a list
%% of tables and RemoteMasters a list of {Tab, MasterNodes}.
orphan_tables([Tab | Tabs], Node, Ns, Local, Remote) ->
    Cs = val({Tab, cstruct}),
    AllHolders = mnesia_lib:copy_holders(Cs),
    RamHolders = Cs#cstruct.ram_copies,
    DiscHolders = AllHolders -- RamHolders,
    DiscNodes = val({schema, disc_copies}),
    IsLocalContent = Cs#cstruct.local_content,
    RamHoldersOnDiscNodes = mnesia_lib:intersect(RamHolders, DiscNodes),
    Active = val({Tab, active_replicas}),
    {Local2, Remote2} =
        case lists:member(Node, DiscHolders) of
            true when Active == [] ->
                %% We're last up and the other nodes have not
                %% loaded the table. Lets load it if we are
                %% the smallest node.
                IsOurs = (DiscHolders -- Ns == [])
                    andalso (lists:min(DiscHolders) == node()),
                case IsOurs of
                    true ->
                        case mnesia_recover:get_master_nodes(Tab) of
                            [] ->
                                {[Tab | Local], Remote};
                            Masters ->
                                {Local, [{Tab, Masters} | Remote]}
                        end;
                    false ->
                        {Local, Remote}
                end;
            false when Active == [], DiscHolders == [], RamHoldersOnDiscNodes == [] ->
                %% Special case when all replicas resides on disc less nodes
                {[Tab | Local], Remote};
            _ when IsLocalContent == true ->
                {[Tab | Local], Remote};
            _ ->
                {Local, Remote}
        end,
    orphan_tables(Tabs, Node, Ns, Local2, Remote2);
orphan_tables([], _, _, LocalOrphans, RemoteMasters) ->
    {LocalOrphans, RemoteMasters}.

%% Node has the given tables loaded. For a remote node the whereabouts
%% of each table are updated; for the local node only the processes
%% waiting for the table are notified. Returns the new state.
node_has_tabs(Tabs, Node, State) ->
    IsRemote = Node /= node(),
    lists:foldl(fun(Tab, Acc) ->
                        case IsRemote of
                            true ->
                                update_whereabouts(Tab, Node, Acc);
                            false ->
                                user_sync_tab(Tab),
                                Acc
                        end
                end, State, Tabs).

%% Tab is loaded on the (remote) node Node. Register the node as an
%% active replica where appropriate and decide whether we shall start
%% reading remotely, fetch a copy of our own or do nothing.
%% Returns the new state.
update_whereabouts(Tab, Node, State) ->
    LocalStorage = val({Tab, storage_type}),
    ReadNode = val({Tab, where_to_read}),
    IsLocalContent = val({Tab, local_content}),
    BeingCreated = (?catch_val({Tab, create_table}) == true),
    Masters = mnesia_recover:get_master_nodes(Tab),
    ByForce = val({Tab, load_by_force}),
    %% The other node is good enough if the load is forced, if there
    %% are no master nodes at all or if it is one of the masters
    GoGetIt = (ByForce == true)
        orelse (Masters == [])
        orelse lists:member(Node, Masters),

    mnesia_lib:dbg_out("Table ~w is loaded on ~w. s=~w, r=~w, lc=~w, f=~w, m=~w~n",
                       [Tab, Node, LocalStorage, ReadNode, IsLocalContent, ByForce, GoGetIt]),
    if
        IsLocalContent == true ->
            %% Local contents, don't care about other node
            State;
        LocalStorage == unknown, ReadNode == nowhere ->
            %% No own copy, time to read remotely
            %% if the other node is a good node
            add_active_replica(Tab, Node),
            case GoGetIt of
                true ->
                    mnesia_lib:set({Tab, where_to_read}, Node),
                    user_sync_tab(Tab);
                false ->
                    ignore
            end,
            State;
        LocalStorage == unknown ->
            %% No own copy, continue to read remotely
            add_active_replica(Tab, Node),
            NodeST = mnesia_lib:storage_type_at_node(Node, Tab),
            ReadST = mnesia_lib:storage_type_at_node(ReadNode, Tab),
            %% Avoid reading from disc_only_copies
            case {NodeST, ReadST} of
                {disc_only_copies, _} ->
                    ignore;
                {_, disc_only_copies} ->
                    mnesia_lib:set_remote_where_to_read(Tab);
                _ ->
                    ignore
            end,
            user_sync_tab(Tab),
            State;
        BeingCreated == true ->
            %% The table is currently being created
            %% and we shall have an own copy of it.
            %% We will load the (empty) table locally.
            add_active_replica(Tab, Node),
            State;
        ReadNode == nowhere ->
            %% Own copy, go and get a copy of the table
            %% if the other node is master or if there
            %% are no master at all
            add_active_replica(Tab, Node),
            case GoGetIt of
                true ->
                    NetLoad = #net_load{table = Tab,
                                        reason = {active_remote, Node}},
                    add_worker(NetLoad, State);
                false ->
                    State
            end;
        true ->
            %% We already have an own copy
            add_active_replica(Tab, Node),
            user_sync_tab(Tab),
            State
    end.
%% Find the local tables (schema excluded) that can be loaded at once.
%% Returns a list of {Tab, Reason}, see last_consistent_replica/2.
%% Only ram_copies and disc_copies are handled as schema storage type.
initial_safe_loads() ->
    Downs =
        case val({schema, storage_type}) of
            ram_copies ->
                %% No logged mnesia_down events are consulted here
                [];
            disc_copies ->
                Logged = mnesia_recover:get_mnesia_downs(),
                mnesia_lib:dbg_out("mnesia_downs = ~p~n", [Logged]),
                Logged
        end,
    LocalTabs = val({schema, local_tables}) -- [schema],
    lists:zf(fun(Tab) -> last_consistent_replica(Tab, Downs) end, LocalTabs).

%% Decide if Tab may be loaded locally right away, given the list of
%% nodes (Downs) that we have seen go down.
%% Returns {true, {Tab, Reason}} or false.
last_consistent_replica(Tab, Downs) ->
    Cs = val({Tab, cstruct}),
    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
    RamHolders = Cs#cstruct.ram_copies,
    DiscHolders = Cs#cstruct.disc_copies,
    DiscOnlyHolders = Cs#cstruct.disc_only_copies,
    %% Remote non-ram holders that we have not seen go down
    BetterCopies = (mnesia_lib:remote_copy_holders(Cs) -- Downs) -- RamHolders,
    AccessMode = Cs#cstruct.access_mode,
    Copies = mnesia_lib:copy_holders(Cs),
    Masters = mnesia_recover:get_master_nodes(Tab),
    IsLocalContent = Cs#cstruct.local_content,
    HasMasters = Masters /= [],
    IsLocalMaster = HasMasters andalso lists:member(node(), Masters),
    IsRemoteMaster = HasMasters andalso not IsLocalMaster,
    if
        Copies == [node()] ->
            %% Only one copy holder and it is local.
            %% It may also be a local contents table
            {true, {Tab, local_only}};
        IsLocalContent == true ->
            {true, {Tab, local_content}};
        IsLocalMaster == true ->
            %% We have a local master
            {true, {Tab, local_master}};
        IsRemoteMaster == true ->
            %% Wait for remote master copy
            false;
        Storage == ram_copies, DiscHolders == [], DiscOnlyHolders == [] ->
            %% Nobody has copy on disc
            {true, {Tab, ram_only}};
        Storage == ram_copies ->
            %% Some other node has copy on disc
            false;
        AccessMode == read_only ->
            %% No one has been able to update the table,
            %% i.e. all disc resident copies are equal
            {true, {Tab, read_only}};
        BetterCopies /= [], Masters /= [node()] ->
            %% There are better copies on other nodes
            %% and we do not have the only master copy
            false;
        true ->
            {true, {Tab, initial}}
    end.

%% Node N is no longer a replica of the given tables. Drop it from the
%% whereabouts, pick a new node to read from if we were reading from N
%% and drop it from the late load queue. Returns the new state.
reconfigure_tables(N, State, Tabs) ->
    lists:foldl(fun(Tab, Acc) ->
                        del_active_replica(Tab, N),
                        case val({Tab, where_to_read}) of
                            N -> mnesia_lib:set_remote_where_to_read(Tab);
                            _ -> ignore
                        end,
                        LateQ = drop_loaders(Tab, N, Acc#state.late_loader_queue),
                        Acc#state{late_loader_queue = LateQ}
                end, State, Tabs).

%% Throw away the buffered early messages that concern Node and
%% return the remaining ones in unchanged order.
remove_early_messages(Msgs, Node) ->
    Keep =
        fun(Msg) ->
                case Msg of
                    {call, {add_active_replica, [_, Node, _, _], _}, _} ->
                        %% Does a reply before queuing
                        false;
                    {call, {block_table, _, From}, ReplyTo} when node(From) == Node ->
                        %% Remove gen:server waits..
                        reply(ReplyTo, ok),
                        false;
                    {cast, {i_have_tab, _Tab, Node}} ->
                        false;
                    {cast, {adopt_orphans, Node, _Tabs}} ->
                        false;
                    _ ->
                        true
                end
        end,
    [Msg || Msg <- Msgs, Keep(Msg)].

%% Drop loader from late load queue and possibly trigger a disc_load
drop_loaders(Tab, Node, LateQueue) ->
    Drop =
        fun(#late_load{table = T, loaders = Loaders} = Late) when T == Tab ->
                %% Check if it is time to issue a disc_load request
                case Loaders of
                    [Node] ->
                        Reason = {Late#late_load.reason, last_loader_down, Node},
                        cast({disc_load, Tab, Reason});  % Ugly cast
                    _ ->
                        ignore
                end,
                %% Drop the node from the list of loaders
                Late#late_load{loaders = Loaders -- [Node]};
           (Other) ->
                Other
        end,
    [Drop(Entry) || Entry <- LateQueue].

%% Register Node as active replica of Tab, using the table
%% definition that is currently stored for Tab.
add_active_replica(Tab, Node) ->
    Cstruct = val({Tab, cstruct}),
    add_active_replica(Tab, Node, Cstruct).

%% As add_active_replica/2 but with an explicit table definition, from
%% which the storage type at Node and the access mode are taken.
add_active_replica(Tab, Node, #cstruct{access_mode = AccessMode} = Cs) ->
    Storage = mnesia_lib:schema_cs_to_storage_type(Node, Cs),
    add_active_replica(Tab, Node, Storage, AccessMode).

%% Block table primitives

%% Mark where_to_commit of Tab as blocked by wrapping the current value
block_table(Tab) ->
    Key = {Tab, where_to_commit},
    mnesia_lib:set(Key, {blocked, val(Key)}). % where_to_commit

%% Remove the blocked marker (if any) from where_to_commit of Tab
unblock_table(Tab) ->
    Key = {Tab, where_to_commit},
    Unwrapped =
        case val(Key) of
            {blocked, Nodes} -> Nodes;
            Nodes -> Nodes
        end,
    mnesia_lib:set(Key, Unwrapped). % where_to_commit

%% Split a where_to_commit value into {IsBlocked, NodeList}
is_tab_blocked({blocked, W2C}) when is_list(W2C) ->
    {true, W2C};
is_tab_blocked(W2C) when is_list(W2C) ->
    {false, W2C}.

%% Inverse of is_tab_blocked/1
mark_blocked_tab(false, Value) ->
    Value;
mark_blocked_tab(true, Value) ->
    {blocked, Value}.
%% Register Node as an active replica of Tab.
%% For a read_write table the node is entered (with its storage type)
%% in where_to_commit and in where_to_write; for a read_only table it
%% is removed from both. A blocked where_to_commit stays blocked.
add_active_replica(Tab, Node, Storage, AccessMode) ->
    CommitKey = {Tab, where_to_commit},
    {IsBlocked, Current} = is_tab_blocked(val(CommitKey)),
    Others = lists:keydelete(Node, 1, Current),
    case AccessMode of
        read_write ->
            Sorted = lists:sort([{Node, Storage} | Others]),
            mnesia_lib:set(CommitKey, mark_blocked_tab(IsBlocked, Sorted)), % where_to_commit
            mnesia_lib:add({Tab, where_to_write}, Node);
        read_only ->
            mnesia_lib:set(CommitKey, mark_blocked_tab(IsBlocked, Others)),
            mnesia_lib:del({Tab, where_to_write}, Node)
    end,
    mnesia_lib:add({Tab, active_replicas}, Node).

%% Remove Node from where_to_commit, active_replicas and
%% where_to_write of Tab. A blocked where_to_commit stays blocked.
del_active_replica(Tab, Node) ->
    CommitKey = {Tab, where_to_commit},
    {IsBlocked, Current} = is_tab_blocked(val(CommitKey)),
    Remaining = lists:sort(lists:keydelete(Node, 1, Current)),
    mnesia_lib:set(CommitKey, mark_blocked_tab(IsBlocked, Remaining)), % where_to_commit
    mnesia_lib:del({Tab, active_replicas}, Node),
    mnesia_lib:del({Tab, where_to_write}, Node).

%% Re-register every active replica of the table described by Cs,
%% so that the whereabouts reflect the access mode found in Cs.
change_table_access_mode(Cs) ->
    Tab = Cs#cstruct.name,
    Replicas = val({Tab, active_replicas}),
    lists:foreach(fun(Node) -> add_active_replica(Tab, Node, Cs) end,
                  Replicas).

%% node To now has tab loaded, but this must be undone
%% This code is rpc:call'ed from the tab_copier process
%% when it has *not* released it's table lock
unannounce_add_table_copy(Tab, To) ->
    del_active_replica(Tab, To),
    case val({Tab, where_to_read}) of
        To ->
            %% We were reading from To, pick another node
            mnesia_lib:set_remote_where_to_read(Tab);
        _ ->
            ignore
    end.

%% Notify the processes registered under {sync_tab, Tab} in the
%% process dictionary and forget about them. When the debug level is
%% trace the mnesia_event process is subscribed to the table first.
user_sync_tab(Tab) ->
    case val(debug) of
        trace ->
            mnesia_subscr:subscribe(whereis(mnesia_event), {table, Tab});
        _ ->
            ignore
    end,

    case erase({sync_tab, Tab}) of
        undefined ->
            ok;
        Waiters ->
            lists:foreach(fun(Waiter) -> sync_reply(Waiter, Tab) end, Waiters)
    end.
%% The local node has Tab. Set up where to read the table
%% and register the local node as an active replica.
i_have_tab(Tab) ->
    case val({Tab, local_content}) of
        true ->
            mnesia_lib:set_local_content_whereabouts(Tab);
        false ->
            mnesia_lib:set({Tab, where_to_read}, node())
    end,
    add_active_replica(Tab, node()).

%% Block Tab at ToNode and then register ToNode as an active replica
%% of Tab, first at ToNode itself and then at every other current
%% db_node. Must not be used for the schema table.
sync_and_block_table_whereabouts(Tab, ToNode, RemoteS, AccessMode) when Tab /= schema ->
    Current = val({current, db_nodes}),
    Others =
        case lists:member(ToNode, Current) of
            true -> Current -- [ToNode];
            false -> Current
        end,
    remote_call(ToNode, block_table, [Tab]),
    ReplicaArgs = [Tab, ToNode, RemoteS, AccessMode],
    [remote_call(Node, add_active_replica, ReplicaArgs) || Node <- [ToNode | Others]],
    ok.

%% Undo the announcement of ToNode as replica of Tab at every current
%% db_node and at ToNode itself, even if it is not a current db_node.
%% Must not be used for the schema table.
sync_del_table_copy_whereabouts(Tab, ToNode) when Tab /= schema ->
    Current = val({current, db_nodes}),
    Targets =
        case lists:member(ToNode, Current) of
            true -> Current;
            false -> [ToNode | Current]
        end,
    [remote_call(Node, unannounce_add_table_copy, [Tab, ToNode]) || Node <- Targets],
    ok.

%% Ask the controller process for its state with a raw message, so
%% the request is served by handle_info/2 and not by handle_call/3.
%% Returns {info, State}, or {timeout, Timeout} when the server is
%% not registered or does not answer within Timeout.
get_info(Timeout) ->
    case whereis(?SERVER_NAME) of
        undefined ->
            {timeout, Timeout};
        Controller ->
            Controller ! {self(), get_state},
            receive
                {?SERVER_NAME, State} when is_record(State, state) ->
                    {info, State}
            after Timeout ->
                    {timeout, Timeout}
            end
    end.

%% As get_info/1, but only the worker pids are returned:
%% {workers, LoaderPid, SenderPid, DumperPid} | {timeout, Timeout}
get_workers(Timeout) ->
    case get_info(Timeout) of
        {info, State} ->
            {workers,
             State#state.loader_pid,
             State#state.sender_pid,
             State#state.dumper_pid};
        {timeout, _} = NoAnswer ->
            NoAnswer
    end.

%% Print size information about all locally active tables
info() ->
    ActiveTabs = mnesia_lib:local_active_tables(),
    io:format( "---> Active tables <--- ~n", []),
    info(ActiveTabs).
%% Print one line of size information per table. A single
%% table name is accepted as well as a list of table names.
info([Tab | Tail]) ->
    {Size, Mem, Media} =
        case val({Tab, storage_type}) of
            disc_only_copies ->
                {dets:info(Tab, size),
                 dets:info(Tab, file_size),
                 "bytes on disc"};
            _ ->
                {?ets_info(Tab, size),
                 ?ets_info(Tab, memory),
                 "words of mem"}
        end,
    info_format(Tab, Size, Mem, Media),
    info(Tail);
info([]) ->
    ok;
info(Tab) ->
    info([Tab]).

%% Print the table name, the number of records and the
%% occupied space, each padded with mnesia_lib:pad_name/3.
info_format(Tab, Size, Mem, Media) ->
    Name = mnesia_lib:pad_name(atom_to_list(Tab), 15, []),
    Records = mnesia_lib:pad_name(integer_to_list(Size), 8, []),
    Space = mnesia_lib:pad_name(integer_to_list(Mem), 8, []),
    io:format("~s: with ~s records occupying ~s ~s~n",
              [Name, Records, Space, Media]).

%% Handle early arrived messages
%% The messages are handled one by one in the order given, each one
%% with the state produced by the previous one. A stop result aborts
%% the processing and is returned as is. Any other result than stop
%% or noreply raises an error.
handle_early_msgs([Msg | Msgs], State) ->
    %% The messages are in reverse order
    %% NOTE(review): presumably the caller has arranged the list so
    %% that this is the intended processing order - confirm
    case handle_early_msg(Msg, State) of
        {stop, _Reason, _Reply, _State2} = Stop ->
            Stop;
        {stop, _Reason, _State2} = Stop ->
            Stop;
        {noreply, State2} ->
            handle_early_msgs(Msgs, State2);
        {noreply, State2, _Timeout} ->
            handle_early_msgs(Msgs, State2);
        Else ->
            mnesia_lib:dbg_out("handle_early_msgs case clause ~p ~n", [Else]),
            erlang:error(Else, [[Msg | Msgs], State])
    end;
handle_early_msgs([], State) ->
    noreply(State).

%% Dispatch a buffered message to the callback it was meant for
handle_early_msg({info, Msg}, State) ->
    handle_info(Msg, State);
handle_early_msg({cast, Msg}, State) ->
    handle_cast(Msg, State);
handle_early_msg({call, Msg, From}, State) ->
    handle_call(Msg, From, State).

%% Build the gen_server return value for "no reply, new state"
noreply(State) ->
    {noreply, State}.

%% Send Reply to the client ReplyTo, unless nobody is
%% waiting (undefined). Always returns Reply.
reply(ReplyTo, Reply) ->
    case ReplyTo of
        undefined ->
            ignore;
        _ ->
            gen_server:reply(ReplyTo, Reply)
    end,
    Reply.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Worker management

%% Enqueue a worker request and start it at once if possible.
%% Dump log, schema commit lock and block controller requests share
%% the dumper queue; net and disc loads share the loader queue; table
%% senders have a queue of their own.
%% Returns new State
add_worker(#dump_log{initiated_by = InitBy} = Worker, State) ->
    Queue = State#state.dumper_queue,
    case lists:keymember(InitBy, #dump_log.initiated_by, Queue) of
        true when Worker#dump_log.opt_reply_to == undefined ->
            %% The same threshold has been exceeded again,
            %% before we have had the possibility to
            %% process the older one.
            DetectedBy = {dump_log, InitBy},
            Event = {mnesia_overload, DetectedBy},
            mnesia_lib:report_system_event(Event);
        _ ->
            %% Either nothing is queued for this initiator, or a client
            %% is waiting for the reply. The latter case had no branch
            %% before and crashed the server with case_clause. It is
            %% not an overload situation, just enqueue the request.
            ignore
    end,
    State2 = State#state{dumper_queue = Queue ++ [Worker]},
    opt_start_worker(State2);
add_worker(#schema_commit_lock{} = Worker, State) ->
    Queue = State#state.dumper_queue,
    State2 = State#state{dumper_queue = Queue ++ [Worker]},
    opt_start_worker(State2);
add_worker(#net_load{} = Worker, State) ->
    Queue = State#state.loader_queue,
    State2 = State#state{loader_queue = Queue ++ [Worker]},
    opt_start_worker(State2);
add_worker(#send_table{} = Worker, State) ->
    Queue = State#state.sender_queue,
    State2 = State#state{sender_queue = Queue ++ [Worker]},
    opt_start_worker(State2);
add_worker(#disc_load{} = Worker, State) ->
    Queue = State#state.loader_queue,
    State2 = State#state{loader_queue = Queue ++ [Worker]},
    opt_start_worker(State2);
% Block controller should be used for upgrading mnesia.
add_worker(#block_controller{} = Worker, State) ->
    %% Unlike the other requests this one is put first in the queue
    Queue = State#state.dumper_queue,
    State2 = State#state{dumper_queue = [Worker | Queue]},
    opt_start_worker(State2).
%% Optionally start a worker
%%
%% Dumpers and loaders may run simultaneously
%% but neither of them may run during schema commit.
%% Loaders may not start if a schema commit is enqueued.
%%
%% Returns the new state. Nothing is started when the server is about
%% to stop. A started worker stays at the head of its queue until it
%% reports that it is done.
opt_start_worker(State) when State#state.is_stopping == true ->
    State;
opt_start_worker(State) ->
    %% Prioritize dumper and schema commit
    %% by checking them first
    case State#state.dumper_queue of
        [Next | _Queued] when State#state.dumper_pid == undefined ->
            %% Great, a worker in queue and neither
            %% a schema transaction is being
            %% committed and nor a dumper is running

            %% Start worker but keep him in the queue
            if
                is_record(Next, schema_commit_lock) ->
                    %% The lock owner occupies the dumper slot
                    Client = Next#schema_commit_lock.owner,
                    reply(Client, granted),
                    {OwnerPid, _Tag} = Client,
                    State#state{dumper_pid = OwnerPid};

                is_record(Next, dump_log) ->
                    Dumper = spawn_link(?MODULE, dump_and_reply, [self(), Next]),

                    %% If the worker was a dumper we may
                    %% possibly be able to start a loader
                    %% or sender
                    opt_start_loader(
                      opt_start_sender(State#state{dumper_pid = Dumper}));

                is_record(Next, block_controller) ->
                    %% Only granted when neither a sender
                    %% nor a loader is running
                    case {State#state.sender_pid, State#state.loader_pid} of
                        {undefined, undefined} ->
                            Client = Next#block_controller.owner,
                            reply(Client, granted),
                            {OwnerPid, _Tag} = Client,
                            State#state{dumper_pid = OwnerPid};
                        _ ->
                            State
                    end
            end;
        _ ->
            %% Bad luck, try with a loader or sender instead
            opt_start_loader(opt_start_sender(State))
    end.
%% Start the first sender in the queue, unless the queue is empty, a
%% sender is already running or the running loader works on the very
%% same table. Returns the new state.
opt_start_sender(State) ->
    case State#state.sender_queue of
        []->
            %% No need
            State;

        _ when State#state.sender_pid /= undefined ->
            %% Bad luck, a sender is already running
            State;

        [Sender | _SenderRest] ->
            case State#state.loader_queue of
                [Loader | _LoaderRest]
                when State#state.loader_pid /= undefined,
                     Loader#net_load.table == Sender#send_table.table ->
                    %% A conflicting loader is running
                    State;
                _ ->
                    SchemaQueue = State#state.dumper_queue,
                    %% NOTE(review): add_worker/2 only puts dump_log,
                    %% schema_commit_lock and block_controller records
                    %% in this queue, so the key schema_commit never
                    %% matches and senders are never held back here.
                    %% Deliberately left as it is: a sender serves a
                    %% loader on another node, and holding it back
                    %% during a schema commit might stall that node.
                    %% Confirm before changing.
                    case lists:keymember(schema_commit, 1, SchemaQueue) of
                        false ->

                            %% Start worker but keep him in the queue
                            Pid = spawn_link(?MODULE, send_and_reply,
                                             [self(), Sender]),
                            State#state{sender_pid = Pid};
                        true ->
                            %% Bad luck, we must wait for the schema commit
                            State
                    end
            end
    end.

%% Start the loader with the highest load order, unless the queue is
%% empty, a loader is already running or a schema commit is enqueued
%% or in progress. Returns the new state.
opt_start_loader(State) ->
    LoaderQueue = State#state.loader_queue,
    if
        LoaderQueue == [] ->
            %% No need
            State;

        State#state.loader_pid /= undefined ->
            %% Bad luck, an loader is already running
            State;

        true ->
            SchemaQueue = State#state.dumper_queue,
            %% Schema commits are queued as #schema_commit_lock{}
            %% records, so that is the tag to look for. The former key
            %% schema_commit matched nothing, which let loaders start
            %% although a schema commit was enqueued.
            case lists:keymember(schema_commit_lock, 1, SchemaQueue) of
                false ->
                    {Worker, Rest} = pick_next(LoaderQueue),

                    %% Start worker but keep him in the queue
                    Pid = spawn_link(?MODULE, load_and_reply, [self(), Worker]),
                    State#state{loader_pid = Pid,
                                loader_queue = [Worker | Rest]};
                true ->
                    %% Bad luck, we must wait for the schema commit
                    State
            end
    end.

%% Ask the controller at Node to send table Tab to the process
%% Receiver. The request is a cast, so nothing is awaited here.
start_remote_sender(Node, Tab, Receiver, Storage) ->
    Msg = #send_table{table = Tab,
                      receiver_pid = Receiver,
                      remote_storage = Storage},
    gen_server:cast({?SERVER_NAME, Node}, Msg).

%% Body of the dumper process: dump the log and report
%% the result to ReplyTo as a dumper_done record.
dump_and_reply(ReplyTo, Worker) ->
    %% No trap_exit, die intentionally instead
    Res = mnesia_dumper:opt_dump_log(Worker#dump_log.initiated_by),
    ReplyTo ! #dumper_done{worker_pid = self(),
                           worker_res = Res},
    unlink(ReplyTo),
    exit(normal).

%% Body of the sender process: send the table to the receiver
%% and report the result to ReplyTo as a sender_done record.
send_and_reply(ReplyTo, Worker) ->
    %% No trap_exit, die intentionally instead
    Res = mnesia_loader:send_table(Worker#send_table.receiver_pid,
                                   Worker#send_table.table,
                                   Worker#send_table.remote_storage),
    ReplyTo ! #sender_done{worker_pid = self(),
                           worker_res = Res},
    unlink(ReplyTo),
    exit(normal).


%% Body of the loader process: perform the load and report the
%% outcome to ReplyTo as a loader_done record. In contrast to the
%% dumper and the sender this process traps exits.
load_and_reply(ReplyTo, Worker) ->
    process_flag(trap_exit, true),
    Done = load_table(Worker),
    ReplyTo ! Done#loader_done{worker_pid = self()},
    unlink(ReplyTo),
    exit(normal).

%% Now it is time to load the table
%% but first we must check if it still is neccessary
%%
%% Takes a net_load or disc_load request and returns a loader_done
%% record (without worker_pid) that tells the controller whether the
%% table got loaded and which follow-up actions are needed.
load_table(Load) when is_record(Load, net_load) ->
    Tab = Load#net_load.table,
    ReplyTo = Load#net_load.opt_reply_to,
    Reason =  Load#net_load.reason,
    LocalC = val({Tab, local_content}),
    AccessMode = val({Tab, access_mode}),
    ReadNode = val({Tab, where_to_read}),
    Active = filter_active(Tab),
    Done = #loader_done{is_loaded = true,
                        table_name = Tab,
                        needs_announce = false,
                        needs_sync = false,
                        needs_reply = true,
                        reply_to = ReplyTo,
                        reply = {loaded, ok}
                       },
    if
        ReadNode == node() ->
            %% Already loaded locally
            Done;
        LocalC == true ->
            Res = mnesia_loader:disc_load_table(Tab, load_local_content),
            Done#loader_done{reply = Res, needs_announce = true, needs_sync = true};
        AccessMode == read_only ->
            disc_load_table(Tab, Reason, ReplyTo);
        true ->
            %% Either we cannot read the table yet
            %% or someone is moving a replica between
            %% two nodes
            Cs = Load#net_load.cstruct,
            Res = mnesia_loader:net_load_table(Tab, Reason, Active, Cs),
            case Res of
                {loaded, ok} ->
                    Done#loader_done{needs_sync = true,
                                     reply = Res};
                {not_loaded, storage_unknown} ->
                    Done#loader_done{reply = Res};
                {not_loaded, _} ->
                    Done#loader_done{is_loaded = false,
                                     needs_reply = false,
                                     reply = Res}
            end
    end;

load_table(Load) when is_record(Load, disc_load) ->
    Tab = Load#disc_load.table,
    Reason =  Load#disc_load.reason,
    ReplyTo = Load#disc_load.opt_reply_to,
    ReadNode = val({Tab, where_to_read}),
    Active = filter_active(Tab),
    Done = #loader_done{is_loaded = true,
                        table_name = Tab,
                        needs_announce = false,
                        needs_sync = false,
                        needs_reply = false
                       },
    if
        Active == [], ReadNode == nowhere ->
            %% Not loaded anywhere, lets load it from disc
            disc_load_table(Tab, Reason, ReplyTo);
        ReadNode == nowhere ->
            %% Already loaded on other node, lets get it
            Cs = val({Tab, cstruct}),
            case mnesia_loader:net_load_table(Tab, Reason, Active, Cs) of
                {loaded, ok} ->
                    Done#loader_done{needs_sync = true};
                {not_loaded, storage_unknown} ->
                    Done#loader_done{is_loaded = false};
                {not_loaded, ErrReason} ->
                    Done#loader_done{is_loaded = false,
                                     reply = {not_loaded,ErrReason}}
            end;
        true ->
            %% Already readable, do not worry be happy
            Done
    end.

%% Load Tab from disc and return a loader_done record. A failure is
%% reported back when a client waits for the reply (ReplyTo is set),
%% otherwise it is handed to fatal/2.
disc_load_table(Tab, Reason, ReplyTo) ->
    Done = #loader_done{is_loaded = true,
                        table_name = Tab,
                        needs_announce = false,
                        needs_sync = false,
                        needs_reply = true,
                        reply_to = ReplyTo,
                        reply = {loaded, ok}
                       },
    Res = mnesia_loader:disc_load_table(Tab, Reason),
    if
        Res == {loaded, ok} ->
            Done#loader_done{needs_announce = true,
                             needs_sync = true,
                             reply = Res};
        ReplyTo /= undefined ->
            Done#loader_done{is_loaded = false,
                             reply = Res};
        true ->
            fatal("Cannot load table ~p from disc: ~p~n", [Tab, Res])
    end.
%% Return the active replicas of Tab that we may load the table from:
%% all of them when the load is forced or when no master nodes are
%% defined, otherwise only those that are master nodes.
filter_active(Tab) ->
    ByForce = val({Tab, load_by_force}),
    Active = val({Tab, active_replicas}),
    Masters = mnesia_recover:get_master_nodes(Tab),
    do_filter_active(ByForce, Active, Masters).

%% do_filter_active(ByForce, Active, Masters) -> UsableNodes
do_filter_active(true, Active, _Masters) ->
    Active;
do_filter_active(false, Active, Masters) ->
    case Masters of
        [] ->
            Active;
        _ ->
            mnesia_lib:intersect(Active, Masters)
    end.