%% ``Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
%%
%%     $Id: mnesia_controller.erl,v 1.3 2010/03/04 13:54:19 maria Exp $
%% The mnesia_init process loads tables from local disc or from
%% other nodes. It also coordinates updates of the info about
%% where we can read and write tables.
%%
%% Tables may need to be loaded initially at startup of the local
%% node or when other nodes announce that they already have loaded
%% tables that we also want.
%%
%% Initially we set the load request queue to those tables that we
%% safely can load locally, i.e. tables where we have the last
%% consistent replica and we have received mnesia_down from all
%% other nodes holding the table. Then we let the mnesia_init
%% process enter its normal working state.
%%
%% When we need to load a table we append a request to the load
%% request queue. All other requests are regarded as high priority
%% and are processed immediately (e.g. update table whereabouts).
%% We process the load request queue as a "background" job.

-module(mnesia_controller).

-behaviour(gen_server).

%% Mnesia internal stuff
-export([
         start/0,
         i_have_tab/1,
         info/0,
         get_info/1,
         get_workers/1,
         force_load_table/1,
         async_dump_log/1,
         sync_dump_log/1,
         connect_nodes/1,
         wait_for_schema_commit_lock/0,
         release_schema_commit_lock/0,
         create_table/1,
         get_disc_copy/1,
         get_cstructs/0,
         sync_and_block_table_whereabouts/4,
         sync_del_table_copy_whereabouts/2,
         block_table/1,
         unblock_table/1,
         block_controller/0,
         unblock_controller/0,
         unannounce_add_table_copy/2,
         master_nodes_updated/2,
         mnesia_down/1,
         add_active_replica/2,
         add_active_replica/3,
         add_active_replica/4,
         change_table_access_mode/1,
         del_active_replica/2,
         wait_for_tables/2,
         get_network_copy/2,
         merge_schema/0,
         start_remote_sender/4,
         schedule_late_disc_load/2
        ]).

%% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3]).

%% Module internal stuff
-export([call/1,
         cast/1,
         dump_and_reply/2,
         load_and_reply/2,
         send_and_reply/2,
         wait_for_tables_init/2
        ]).

-import(mnesia_lib, [set/2, add/2]).
-import(mnesia_lib, [fatal/2, error/2, verbose/2, dbg_out/2]).

-include("mnesia.hrl").

-define(SERVER_NAME, ?MODULE).

-record(state, {supervisor,
                schema_is_merged = false,
                early_msgs = [],
                loader_pid,
                loader_queue = [],
                sender_pid,
                sender_queue = [],
                late_loader_queue = [],
                dumper_pid,          % Dumper or schema commit pid
                dumper_queue = [],   % Dumper or schema commit queue
                dump_log_timer_ref,
                is_stopping = false
               }).

-record(worker_reply, {what,
                       pid,
                       result
                      }).

-record(schema_commit_lock, {owner}).
-record(block_controller, {owner}).

-record(dump_log, {initiated_by,
                   opt_reply_to
                  }).

-record(net_load, {table,
                   reason,
                   opt_reply_to,
                   cstruct = unknown
                  }).

-record(send_table, {table,
                     receiver_pid,
                     remote_storage
                    }).

-record(disc_load, {table,
                    reason,
                    opt_reply_to
                   }).

-record(late_load, {table,
                    reason,
                    opt_reply_to,
                    loaders
                   }).

-record(loader_done, {worker_pid,
                      is_loaded,
                      table_name,
                      needs_announce,
                      needs_sync,
                      needs_reply,
                      reply_to,
                      reply}).

-record(sender_done, {worker_pid,
                      worker_res,
                      table_name
                     }).

-record(dumper_done, {worker_pid,
                      worker_res
                     }).

val(Var) ->
    case ?catch_val(Var) of
        {'EXIT', Reason} -> mnesia_lib:other_val(Var, Reason);
        Value -> Value
    end.

start() ->
    gen_server:start_link({local, ?SERVER_NAME}, ?MODULE, [self()],
                          [{timeout, infinity}
                           %% ,{debug, [trace]}
                          ]).

sync_dump_log(InitBy) ->
    call({sync_dump_log, InitBy}).

async_dump_log(InitBy) ->
    ?SERVER_NAME ! {async_dump_log, InitBy}.
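
%% Both of the dump requests above end up as #dump_log{} workers in
%% the dumper queue; the only difference is how the caller waits.
%% A sketch (InitBy user is what mnesia:dump_log/0 uses; the sync
%% variant returns the dumper's result, normally dumped):
%%
%%   dumped = mnesia_controller:sync_dump_log(user),  %% blocks until done
%%   mnesia_controller:async_dump_log(user)           %% fire and forget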

%% Wait for tables to be active
%% If needed, we will wait for Mnesia to start
%% If Mnesia stops, we will wait for Mnesia to restart
%% We will wait even if the list of tables is empty
%%
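%% Example (a sketch; mytab is a made-up table name):
%%
%%   case mnesia_controller:wait_for_tables([mytab], 5000) of
%%       ok              -> ready;
%%       {timeout, Bad}  -> {still_waiting, Bad};
%%       {error, Reason} -> {error, Reason}
%%   end.
%%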
wait_for_tables(Tabs, Timeout) when list(Tabs), Timeout == infinity ->
    do_wait_for_tables(Tabs, Timeout);
wait_for_tables(Tabs, Timeout) when list(Tabs),
                                    integer(Timeout), Timeout >= 0 ->
    do_wait_for_tables(Tabs, Timeout);
wait_for_tables(Tabs, Timeout) ->
    {error, {badarg, Tabs, Timeout}}.

do_wait_for_tables(Tabs, 0) ->
    reply_wait(Tabs);
do_wait_for_tables(Tabs, Timeout) ->
    Pid = spawn_link(?MODULE, wait_for_tables_init, [self(), Tabs]),
    receive
        {?SERVER_NAME, Pid, Res} ->
            Res;

        {'EXIT', Pid, _} ->
            reply_wait(Tabs)

    after Timeout ->
            unlink(Pid),
            exit(Pid, timeout),
            reply_wait(Tabs)
    end.

reply_wait(Tabs) ->
    case catch mnesia_lib:active_tables() of
        {'EXIT', _} ->
            {error, {node_not_running, node()}};
        Active when list(Active) ->
            case Tabs -- Active of
                [] ->
                    ok;
                BadTabs ->
                    {timeout, BadTabs}
            end
    end.

wait_for_tables_init(From, Tabs) ->
    process_flag(trap_exit, true),
    Res = wait_for_init(From, Tabs, whereis(?SERVER_NAME)),
    From ! {?SERVER_NAME, self(), Res},
    unlink(From),
    exit(normal).

wait_for_init(From, Tabs, Init) ->
    case catch link(Init) of
        {'EXIT', _} ->
            %% Mnesia is not started
            {error, {node_not_running, node()}};
        true when pid(Init) ->
            cast({sync_tabs, Tabs, self()}),
            rec_tabs(Tabs, Tabs, From, Init)
    end.

sync_reply(Waiter, Tab) ->
    Waiter ! {?SERVER_NAME, {tab_synced, Tab}}.

rec_tabs([Tab | Tabs], AllTabs, From, Init) ->
    receive
        {?SERVER_NAME, {tab_synced, Tab}} ->
            rec_tabs(Tabs, AllTabs, From, Init);

        {'EXIT', From, _} ->
            %% This will trigger an exit signal
            %% to mnesia_init
            exit(wait_for_tables_timeout);

        {'EXIT', Init, _} ->
            %% Oops, mnesia_init stopped
            exit(mnesia_stopped)
    end;
rec_tabs([], _, _, Init) ->
    unlink(Init),
    ok.
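
%% The sync protocol between the waiter helper above and the server
%% looks like this (a sketch):
%%
%%   waiter                                        mnesia_controller
%%     |  --- cast {sync_tabs, Tabs, self()} --->        |
%%     |  <-- {mnesia_controller, {tab_synced, Tab}} --- |
%%     |      (one message per table, sent when the      |
%%     |       table becomes active; see sync_reply/2)   |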

get_cstructs() ->
    call(get_cstructs).

mnesia_down(Node) ->
    case cast({mnesia_down, Node}) of
        {error, _} -> mnesia_monitor:mnesia_down(?SERVER_NAME, Node);
        _Pid -> ok
    end.

wait_for_schema_commit_lock() ->
    link(whereis(?SERVER_NAME)),
    unsafe_call(wait_for_schema_commit_lock).

block_controller() ->
    call(block_controller).

unblock_controller() ->
    cast(unblock_controller).

release_schema_commit_lock() ->
    cast({release_schema_commit_lock, self()}),
    unlink(whereis(?SERVER_NAME)).

%% Special for preparation of add table copy
get_network_copy(Tab, Cs) ->
    Work = #net_load{table = Tab,
                     reason = {dumper, add_table_copy},
                     cstruct = Cs
                    },
    Res = (catch load_table(Work)),
    if Res#loader_done.is_loaded == true ->
            Tab = Res#loader_done.table_name,
            case Res#loader_done.needs_announce of
                true ->
                    i_have_tab(Tab);
                false ->
                    ignore
            end;
       true -> ignore
    end,

    receive %% Flush copier done message
        {copier_done, _Node} ->
            ok
    after 500 ->  %% avoid hanging if something is wrong and we shall fail
            ignore
    end,
    Res#loader_done.reply.

%% This function is invoked from the dumper
%%
%% There are two cases here:
%% startup ->
%%   no need for sync, since mnesia_controller is not started yet
%% schema_trans ->
%%   already synced with mnesia_controller since the dumper
%%   is synchronously started from mnesia_controller

create_table(Tab) ->
    {loaded, ok} = mnesia_loader:disc_load_table(Tab, {dumper,create_table}).

get_disc_copy(Tab) ->
    disc_load_table(Tab, {dumper,change_table_copy_type}, undefined).

%% Returns ok instead of yes
force_load_table(Tab) when atom(Tab), Tab /= schema ->
    case ?catch_val({Tab, storage_type}) of
        ram_copies ->
            do_force_load_table(Tab);
        disc_copies ->
            do_force_load_table(Tab);
        disc_only_copies ->
            do_force_load_table(Tab);
        unknown ->
            set({Tab, load_by_force}, true),
            cast({force_load_updated, Tab}),
            wait_for_tables([Tab], infinity);
        {'EXIT', _} ->
            {error, {no_exists, Tab}}
    end;
force_load_table(Tab) ->
    {error, {bad_type, Tab}}.
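
%% Example (a sketch; mytab is a made-up table name). The public
%% wrapper mnesia:force_load_table/1 maps our ok onto its documented
%% yes return value:
%%
%%   ok = mnesia_controller:force_load_table(mytab)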

do_force_load_table(Tab) ->
    Loaded = ?catch_val({Tab, load_reason}),
    case Loaded of
        unknown ->
            set({Tab, load_by_force}, true),
            mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user),
            wait_for_tables([Tab], infinity);
        {'EXIT', _} ->
            set({Tab, load_by_force}, true),
            mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user),
            wait_for_tables([Tab], infinity);
        _ ->
            ok
    end.

master_nodes_updated(schema, _Masters) ->
    ignore;
master_nodes_updated(Tab, Masters) ->
    cast({master_nodes_updated, Tab, Masters}).

schedule_late_disc_load(Tabs, Reason) ->
    MsgTag = late_disc_load,
    try_schedule_late_disc_load(Tabs, Reason, MsgTag).

try_schedule_late_disc_load(Tabs, _Reason, MsgTag)
  when Tabs == [], MsgTag /= schema_is_merged ->
    ignore;
try_schedule_late_disc_load(Tabs, Reason, MsgTag) ->
    GetIntents =
        fun() ->
                Item = mnesia_late_disc_load,
                Nodes = val({current, db_nodes}),
                mnesia:lock({global, Item, Nodes}, write),
                case multicall(Nodes -- [node()], disc_load_intents) of
                    {Replies, []} ->
                        call({MsgTag, Tabs, Reason, Replies}),
                        done;
                    {_, BadNodes} ->
                        %% Some nodes did not respond, let's try again
                        {retry, BadNodes}
                end
        end,
    case mnesia:transaction(GetIntents) of
        {'atomic', done} ->
            done;
        {'atomic', {retry, BadNodes}} ->
            verbose("Retry late_load_tables because bad nodes: ~p~n",
                    [BadNodes]),
            try_schedule_late_disc_load(Tabs, Reason, MsgTag);
        {aborted, AbortReason} ->
            fatal("Cannot late_load_tables ~p: ~p~n",
                  [[Tabs, Reason, MsgTag], AbortReason])
    end.

connect_nodes(Ns) ->
    case mnesia:system_info(is_running) of
        no ->
            {error, {node_not_running, node()}};
        yes ->
            {NewC, OldC} = mnesia_recover:connect_nodes(Ns),
            Connected = NewC ++ OldC,
            New1 = mnesia_lib:intersect(Ns, Connected),
            New = New1 -- val({current, db_nodes}),

            case try_merge_schema(New) of
                ok ->
                    mnesia_lib:add_list(extra_db_nodes, New),
                    {ok, New};
                {aborted, {throw, Str}} when list(Str) ->
                    %%mnesia_recover:disconnect_nodes(New),
                    {error, {merge_schema_failed, lists:flatten(Str)}};
                Else ->
                    %% Disconnect nodes where merge failed!!
                    %% mnesia_recover:disconnect_nodes(New),
                    {error, Else}
            end
    end.

%% Merge the local schema with the schema on other nodes.
%% But first we must let all processes that want to force
%% load tables wait until the schema merge is done.

merge_schema() ->
    AllNodes = mnesia_lib:all_nodes(),
    case try_merge_schema(AllNodes) of
        ok ->
            schema_is_merged();
        {aborted, {throw, Str}} when list(Str) ->
            fatal("Failed to merge schema: ~s~n", [Str]);
        Else ->
            fatal("Failed to merge schema: ~p~n", [Else])
    end.

try_merge_schema(Nodes) ->
    case mnesia_schema:merge_schema() of
        {'atomic', not_merged} ->
            %% No more nodes that we need to merge the schema with
            ok;
        {'atomic', {merged, OldFriends, NewFriends}} ->
            %% Check if new nodes have been added to the schema
            Diff = mnesia_lib:all_nodes() -- [node() | Nodes],
            mnesia_recover:connect_nodes(Diff),

            %% Tell everybody to adopt orphan tables
            im_running(OldFriends, NewFriends),
            im_running(NewFriends, OldFriends),

            try_merge_schema(Nodes);
        {'atomic', {"Cannot get cstructs", Node, Reason}} ->
            dbg_out("Cannot get cstructs, Node ~p ~p~n", [Node, Reason]),
            timer:sleep(1000), % Avoid an endless-loop lookalike
            try_merge_schema(Nodes);
        Other ->
            Other
    end.

im_running(OldFriends, NewFriends) ->
    abcast(OldFriends, {im_running, node(), NewFriends}).

schema_is_merged() ->
    MsgTag = schema_is_merged,
    SafeLoads = initial_safe_loads(),

    %% At this point we do not know anything about
    %% which tables the other nodes have already
    %% loaded, and therefore we let the normal
    %% processing of the loader_queue take care
    %% of it, since by then we will know the
    %% whereabouts. We rely on the fact that all
    %% nodes tell each other directly when they
    %% have loaded a table and are willing to
    %% share it.

    try_schedule_late_disc_load(SafeLoads, initial, MsgTag).


cast(Msg) ->
    case whereis(?SERVER_NAME) of
        undefined -> {error, {node_not_running, node()}};
        Pid -> gen_server:cast(Pid, Msg)
    end.

abcast(Nodes, Msg) ->
    gen_server:abcast(Nodes, ?SERVER_NAME, Msg).

unsafe_call(Msg) ->
    case whereis(?SERVER_NAME) of
        undefined -> {error, {node_not_running, node()}};
        Pid -> gen_server:call(Pid, Msg, infinity)
    end.

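%% As unsafe_call/1, but we link to the server for the duration of the
%% call so that we notice if it dies. The receive ... after 0 at the end
%% merely flushes a possible 'EXIT' message left over from the link, so
%% that it does not leak into the caller's mailbox.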
call(Msg) ->
    case whereis(?SERVER_NAME) of
        undefined ->
            {error, {node_not_running, node()}};
        Pid ->
            link(Pid),
            Res = gen_server:call(Pid, Msg, infinity),
            unlink(Pid),

            %% We get an exit signal if server dies
            receive
                {'EXIT', Pid, _Reason} ->
                    {error, {node_not_running, node()}}
            after 0 ->
                    ignore
            end,
            Res
    end.

remote_call(Node, Func, Args) ->
    case catch gen_server:call({?MODULE, Node}, {Func, Args, self()}, infinity) of
        {'EXIT', Error} ->
            {error, Error};
        Else ->
            Else
    end.

multicall(Nodes, Msg) ->
    {Good, Bad} = gen_server:multi_call(Nodes, ?MODULE, Msg, infinity),
    PatchedGood = [Reply || {_Node, Reply} <- Good],
    {PatchedGood, Bad}.  %% Make the replies look like rpc:multicalls..
%%    rpc:multicall(Nodes, ?MODULE, call, [Msg]).
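
%% For the disc_load_intents request used by try_schedule_late_disc_load/3,
%% each good reply has the form {ok, Node, Tabs}, so a multicall result
%% might look like this (a sketch with made-up node names):
%%
%%   {[{ok, 'n1@host', [a, b]}, {ok, 'n2@host', [b]}], ['bad@host']}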

%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State}          |
%%          {ok, State, Timeout} |
%%          {stop, Reason}
%%----------------------------------------------------------------------
init([Parent]) ->
    process_flag(trap_exit, true),
    mnesia_lib:verbose("~p starting: ~p~n", [?SERVER_NAME, self()]),

    %% Handshake and initialize transaction recovery
    %% for new nodes detected in the schema
    All = mnesia_lib:all_nodes(),
    Diff = All -- [node() | val(original_nodes)],
    mnesia_lib:unset(original_nodes),
    mnesia_recover:connect_nodes(Diff),

    Interval = mnesia_monitor:get_env(dump_log_time_threshold),
    Msg = {async_dump_log, time_threshold},
    {ok, Ref} = timer:send_interval(Interval, Msg),
    mnesia_dumper:start_regulator(),

    {ok, #state{supervisor = Parent, dump_log_timer_ref = Ref}}.

%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State}          |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------

handle_call({sync_dump_log, InitBy}, From, State) ->
    Worker = #dump_log{initiated_by = InitBy,
                       opt_reply_to = From
                      },
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_call(wait_for_schema_commit_lock, From, State) ->
    Worker = #schema_commit_lock{owner = From},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_call(block_controller, From, State) ->
    Worker = #block_controller{owner = From},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_call(get_cstructs, From, State) ->
    Tabs = val({schema, tables}),
    Cstructs = [val({T, cstruct}) || T <- Tabs],
    Running = val({current, db_nodes}),
    reply(From, {cstructs, Cstructs, Running}),
    noreply(State);

handle_call({schema_is_merged, TabsR, Reason, RemoteLoaders}, From, State) ->
    State2 = late_disc_load(TabsR, Reason, RemoteLoaders, From, State),

    %% Handle early messages
    Msgs = State2#state.early_msgs,
    State3 = State2#state{early_msgs = [], schema_is_merged = true},
    Ns = val({current, db_nodes}),
    dbg_out("Schema is merged ~w, State ~w~n", [Ns, State3]),
%%    dbg_out("handle_early_msgs ~p ~n", [Msgs]), % qqqq
    handle_early_msgs(lists:reverse(Msgs), State3);

handle_call(disc_load_intents, From, State) ->
    Tabs = disc_load_intents(State#state.loader_queue) ++
           disc_load_intents(State#state.late_loader_queue),
    ActiveTabs = mnesia_lib:local_active_tables(),
    reply(From, {ok, node(), mnesia_lib:union(Tabs, ActiveTabs)}),
    noreply(State);

handle_call({update_where_to_write, [add, Tab, AddNode], _From}, _Dummy, State) ->
%%%    dbg_out("update_w2w ~p", [[add, Tab, AddNode]]), %%% qqqq
    Current = val({current, db_nodes}),
    Res =
        case lists:member(AddNode, Current) andalso
             State#state.schema_is_merged == true of
            true ->
                mnesia_lib:add({Tab, where_to_write}, AddNode);
            false ->
                ignore
        end,
    {reply, Res, State};

handle_call({add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From},
            ReplyTo, State) ->
    KnownNode = lists:member(ToNode, val({current, db_nodes})),
    Merged = State#state.schema_is_merged,
    if
        KnownNode == false ->
            reply(ReplyTo, ignore),
            noreply(State);
        Merged == true ->
            Res = add_active_replica(Tab, ToNode, RemoteS, AccessMode),
            reply(ReplyTo, Res),
            noreply(State);
        true -> %% Schema is not merged
            Msg = {add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From},
            Msgs = State#state.early_msgs,
            reply(ReplyTo, ignore),   %% Reply ignore and add data after schema merge
            noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]})
    end;

handle_call({unannounce_add_table_copy, [Tab, Node], From}, ReplyTo, State) ->
    KnownNode = lists:member(node(From), val({current, db_nodes})),
    Merged = State#state.schema_is_merged,
    if
        KnownNode == false ->
            reply(ReplyTo, ignore),
            noreply(State);
        Merged == true ->
            Res = unannounce_add_table_copy(Tab, Node),
            reply(ReplyTo, Res),
            noreply(State);
        true -> %% Schema is not merged
            Msg = {unannounce_add_table_copy, [Tab, Node], From},
            Msgs = State#state.early_msgs,
            reply(ReplyTo, ignore),   %% Reply ignore and add data after schema merge
            %% Set ReplyTo to undefined so we don't reply twice
            noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]})
    end;

handle_call(Msg, From, State) when State#state.schema_is_merged == false ->
    %% Buffer early messages
%%    dbg_out("Buffered early msg ~p ~n", [Msg]),   %% qqqq
    Msgs = State#state.early_msgs,
    noreply(State#state{early_msgs = [{call, Msg, From} | Msgs]});

handle_call({net_load, Tab, Cs}, From, State) ->
    Worker = #net_load{table = Tab,
                       opt_reply_to = From,
                       reason = add_table_copy,
                       cstruct = Cs
                      },
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_call({late_disc_load, Tabs, Reason, RemoteLoaders}, From, State) ->
    State2 = late_disc_load(Tabs, Reason, RemoteLoaders, From, State),
    noreply(State2);

handle_call({block_table, [Tab], From}, _Dummy, State) ->
    case lists:member(node(From), val({current, db_nodes})) of
        true ->
            block_table(Tab);
        false ->
            ignore
    end,
    {reply, ok, State};

handle_call({check_w2r, _Node, Tab}, _From, State) ->
    {reply, val({Tab, where_to_read}), State};

handle_call(Msg, _From, State) ->
    error("~p got unexpected call: ~p~n", [?SERVER_NAME, Msg]),
    noreply(State).

disc_load_intents([H | T]) when record(H, disc_load) ->
    [H#disc_load.table | disc_load_intents(T)];
disc_load_intents([H | T]) when record(H, late_load) ->
    [H#late_load.table | disc_load_intents(T)];
disc_load_intents([H | T]) when record(H, net_load) ->
    disc_load_intents(T);
disc_load_intents([]) ->
    [].

late_disc_load(TabsR, Reason, RemoteLoaders, From, State) ->
    verbose("Intend to load tables: ~p~n", [TabsR]),
    ?eval_debug_fun({?MODULE, late_disc_load},
                    [{tabs, TabsR},
                     {reason, Reason},
                     {loaders, RemoteLoaders}]),

    reply(From, queued),
    %% RemoteLoaders is a list of {ok, Node, Tabs} tuples

    %% Remove deleted tabs
    LocalTabs = mnesia_lib:val({schema, local_tables}),
    Filter = fun({Tab, Reas}, Acc) ->
                     case lists:member(Tab, LocalTabs) of
                         true -> [{Tab, Reas} | Acc];
                         false -> Acc
                     end;
                (Tab, Acc) ->
                     case lists:member(Tab, LocalTabs) of
                         true -> [Tab | Acc];
                         false -> Acc
                     end
             end,

    Tabs = lists:foldl(Filter, [], TabsR),

    Nodes = val({current, db_nodes}),
    LateLoaders = late_loaders(Tabs, Reason, RemoteLoaders, Nodes),
    LateQueue = State#state.late_loader_queue ++ LateLoaders,
    State#state{late_loader_queue = LateQueue}.

late_loaders([{Tab, Reason} | Tabs], DefaultReason, RemoteLoaders, Nodes) ->
    LoadNodes = late_load_filter(RemoteLoaders, Tab, Nodes, []),
    case LoadNodes of
        [] ->
            cast({disc_load, Tab, Reason}); % Ugly cast
        _ ->
            ignore
    end,
    LateLoad = #late_load{table = Tab, loaders = LoadNodes, reason = Reason},
    [LateLoad | late_loaders(Tabs, DefaultReason, RemoteLoaders, Nodes)];

late_loaders([Tab | Tabs], Reason, RemoteLoaders, Nodes) ->
    Loaders = late_load_filter(RemoteLoaders, Tab, Nodes, []),
    case Loaders of
        [] ->
            cast({disc_load, Tab, Reason});  % Ugly cast
        _ ->
            ignore
    end,
    LateLoad = #late_load{table = Tab, loaders = Loaders, reason = Reason},
    [LateLoad | late_loaders(Tabs, Reason, RemoteLoaders, Nodes)];
late_loaders([], _Reason, _RemoteLoaders, _Nodes) ->
    [].

late_load_filter([{error, _} | RemoteLoaders], Tab, Nodes, Acc) ->
    late_load_filter(RemoteLoaders, Tab, Nodes, Acc);
late_load_filter([{badrpc, _} | RemoteLoaders], Tab, Nodes, Acc) ->
    late_load_filter(RemoteLoaders, Tab, Nodes, Acc);
late_load_filter([RL | RemoteLoaders], Tab, Nodes, Acc) ->
    {ok, Node, Intents} = RL,
    Access = val({Tab, access_mode}),
    LocalC = val({Tab, local_content}),
    StillActive = lists:member(Node, Nodes),
    RemoteIntent = lists:member(Tab, Intents),
    if
        Access == read_write,
        LocalC == false,
        StillActive == true,
        RemoteIntent == true ->
            Masters = mnesia_recover:get_master_nodes(Tab),
            case lists:member(Node, Masters) of
                true ->
                    %% The other node is master node for
                    %% the table, accept its load intent
                    late_load_filter(RemoteLoaders, Tab, Nodes, [Node | Acc]);
                false when Masters == [] ->
                    %% The table has no master nodes,
                    %% accept the load intent
                    late_load_filter(RemoteLoaders, Tab, Nodes, [Node | Acc]);
                false ->
                    %% Someone else is master node for
                    %% the table, ignore this load intent
                    late_load_filter(RemoteLoaders, Tab, Nodes, Acc)
            end;
        true ->
            late_load_filter(RemoteLoaders, Tab, Nodes, Acc)
    end;
late_load_filter([], _Tab, _Nodes, Acc) ->
    Acc.
%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------

handle_cast({release_schema_commit_lock, _Owner}, State) ->
    if
        State#state.is_stopping == true ->
            {stop, shutdown, State};
        true ->
            case State#state.dumper_queue of
                [#schema_commit_lock{}|Rest] ->
                    [_Worker | Rest] = State#state.dumper_queue,
                    State2 = State#state{dumper_pid = undefined,
                                         dumper_queue = Rest},
                    State3 = opt_start_worker(State2),
                    noreply(State3);
                _ ->
                    noreply(State)
            end
    end;

handle_cast(unblock_controller, State) ->
    if
        State#state.is_stopping == true ->
            {stop, shutdown, State};
        record(hd(State#state.dumper_queue), block_controller) ->
            [_Worker | Rest] = State#state.dumper_queue,
            State2 = State#state{dumper_pid = undefined,
                                 dumper_queue = Rest},
            State3 = opt_start_worker(State2),
            noreply(State3)
    end;

handle_cast({mnesia_down, Node}, State) ->
    maybe_log_mnesia_down(Node),
    mnesia_lib:del({current, db_nodes}, Node),
    mnesia_checkpoint:tm_mnesia_down(Node),
    Alltabs = val({schema, tables}),
    State2 = reconfigure_tables(Node, State, Alltabs),
    case State#state.sender_pid of
        undefined -> ignore;
        Pid when pid(Pid) -> Pid ! {copier_done, Node}
    end,
    case State#state.loader_pid of
        undefined -> ignore;
        Pid2 when pid(Pid2) -> Pid2 ! {copier_done, Node}
    end,
    NewSenders =
        case State#state.sender_queue of
            [OldSender | RestSenders] ->
                Remove = fun(ST) ->
                                 node(ST#send_table.receiver_pid) /= Node
                         end,
                NewS = lists:filter(Remove, RestSenders),
                %% Keep the old sender; it will be removed by sender_done
                [OldSender | NewS];
            [] ->
                []
        end,
    Early = remove_early_messages(State2#state.early_msgs, Node),
    mnesia_monitor:mnesia_down(?SERVER_NAME, Node),
    noreply(State2#state{sender_queue = NewSenders, early_msgs = Early});

handle_cast({im_running, _Node, NewFriends}, State) ->
    Tabs = mnesia_lib:local_active_tables() -- [schema],
    Ns = mnesia_lib:intersect(NewFriends, val({current, db_nodes})),
    abcast(Ns, {adopt_orphans, node(), Tabs}),
    noreply(State);

handle_cast(Msg, State) when State#state.schema_is_merged == false ->
    %% Buffer early messages
    Msgs = State#state.early_msgs,
    noreply(State#state{early_msgs = [{cast, Msg} | Msgs]});

handle_cast({disc_load, Tab, Reason}, State) ->
    Worker = #disc_load{table = Tab, reason = Reason},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_cast(Worker, State) when record(Worker, send_table) ->
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_cast({sync_tabs, Tabs, From}, State) ->
    %% user initiated wait_for_tables
    handle_sync_tabs(Tabs, From),
    noreply(State);

handle_cast({i_have_tab, Tab, Node}, State) ->
    case lists:member(Node, val({current, db_nodes})) of
        true ->
            State2 = node_has_tabs([Tab], Node, State),
            noreply(State2);
        false ->
            noreply(State)
    end;

handle_cast({force_load_updated, Tab}, State) ->
    case val({Tab, active_replicas}) of
        [] ->
            %% No valid replicas
            noreply(State);
        [SomeNode | _] ->
            State2 = node_has_tabs([Tab], SomeNode, State),
            noreply(State2)
    end;

handle_cast({master_nodes_updated, Tab, Masters}, State) ->
    Active = val({Tab, active_replicas}),
    Valid =
        case val({Tab, load_by_force}) of
            true ->
                Active;
            false ->
                if
                    Masters == [] ->
                        Active;
                    true ->
                        mnesia_lib:intersect(Masters, Active)
                end
        end,
    case Valid of
        [] ->
            %% No valid replicas
            noreply(State);
        [SomeNode | _] ->
            State2 = node_has_tabs([Tab], SomeNode, State),
            noreply(State2)
    end;

handle_cast({adopt_orphans, Node, Tabs}, State) ->

    State2 = node_has_tabs(Tabs, Node, State),

    %% Register the other node as up and running
    mnesia_recover:log_mnesia_up(Node),
    verbose("Logging mnesia_up ~w~n", [Node]),
    mnesia_lib:report_system_event({mnesia_up, Node}),

    %% Load orphan tables
    LocalTabs = val({schema, local_tables}) -- [schema],
    Nodes = val({current, db_nodes}),
    {LocalOrphans, RemoteMasters} =
        orphan_tables(LocalTabs, Node, Nodes, [], []),
    Reason = {adopt_orphan, node()},
    mnesia_late_loader:async_late_disc_load(node(), LocalOrphans, Reason),

    Fun =
        fun(N) ->
                RemoteOrphans =
                    [Tab || {Tab, Ns} <- RemoteMasters,
                            lists:member(N, Ns)],
                mnesia_late_loader:maybe_async_late_disc_load(N, RemoteOrphans, Reason)
        end,
    lists:foreach(Fun, Nodes),

    Queue = State2#state.loader_queue,
    State3 = State2#state{loader_queue = Queue},
    noreply(State3);

handle_cast(Msg, State) ->
    error("~p got unexpected cast: ~p~n", [?SERVER_NAME, Msg]),
    noreply(State).

handle_sync_tabs([Tab | Tabs], From) ->
    case val({Tab, where_to_read}) of
        nowhere ->
            case get({sync_tab, Tab}) of
                undefined ->
                    put({sync_tab, Tab}, [From]);
                Pids ->
                    put({sync_tab, Tab}, [From | Pids])
            end;
        _ ->
            sync_reply(From, Tab)
    end,
    handle_sync_tabs(Tabs, From);
handle_sync_tabs([], _From) ->
    ok.

%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------

handle_info({async_dump_log, InitBy}, State) ->
    Worker = #dump_log{initiated_by = InitBy},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_info(Done, State) when record(Done, dumper_done) ->
    Pid = Done#dumper_done.worker_pid,
    Res = Done#dumper_done.worker_res,
    if
        State#state.is_stopping == true ->
            {stop, shutdown, State};
        Res == dumped, Pid == State#state.dumper_pid ->
            [Worker | Rest] = State#state.dumper_queue,
            reply(Worker#dump_log.opt_reply_to, Res),
            State2 = State#state{dumper_pid = undefined,
                                 dumper_queue = Rest},
            State3 = opt_start_worker(State2),
            noreply(State3);
        true ->
            fatal("Dumper failed: ~p~n state: ~p~n", [Res, State]),
            {stop, fatal, State}
    end;

handle_info(Done, State) when record(Done, loader_done) ->
    if
        %% Assertion
        Done#loader_done.worker_pid == State#state.loader_pid -> ok
    end,

    [_Worker | Rest] = LoadQ0 = State#state.loader_queue,
    LateQueue0 = State#state.late_loader_queue,
    {LoadQ, LateQueue} =
        case Done#loader_done.is_loaded of
            true ->
                Tab = Done#loader_done.table_name,

                %% Optional user sync
                case Done#loader_done.needs_sync of
                    true -> user_sync_tab(Tab);
                    false -> ignore
                end,

                %% Optional table announcement
                case Done#loader_done.needs_announce of
                    true ->
                        i_have_tab(Tab),
                        case Tab of
                            schema ->
                                ignore;
                            _ ->
                                %% Local node needs to perform user_sync_tab/1
                                Ns = val({current, db_nodes}),
                                abcast(Ns, {i_have_tab, Tab, node()})
                        end;
                    false ->
                        case Tab of
                            schema ->
                                ignore;
                            _ ->
                                %% Local node needs to perform user_sync_tab/1
                                Ns = val({current, db_nodes}),
                                AlreadyKnows = val({Tab, active_replicas}),
                                abcast(Ns -- AlreadyKnows, {i_have_tab, Tab, node()})
                        end
                end,

                %% Optional client reply
                case Done#loader_done.needs_reply of
                    true ->
                        reply(Done#loader_done.reply_to,
                              Done#loader_done.reply);
                    false ->
                        ignore
                end,
                {Rest, reply_late_load(Tab, LateQueue0)};
            false ->
                case Done#loader_done.reply of
                    restart ->
                        {LoadQ0, LateQueue0};
                    _ ->
                        {Rest, LateQueue0}
                end
        end,

    State2 = State#state{loader_pid = undefined,
                         loader_queue = LoadQ,
                         late_loader_queue = LateQueue},

    State3 = opt_start_worker(State2),
    noreply(State3);

handle_info(Done, State) when record(Done, sender_done) ->
    Pid = Done#sender_done.worker_pid,
    Res = Done#sender_done.worker_res,
    if
        Res == ok, Pid == State#state.sender_pid ->
            [Worker | Rest] = State#state.sender_queue,
            Worker#send_table.receiver_pid ! {copier_done, node()},
            State2 = State#state{sender_pid = undefined,
                                 sender_queue = Rest},
            State3 = opt_start_worker(State2),
            noreply(State3);
        true ->
            %% No need to send any message to the table receiver
            %% since it will soon get a mnesia_down anyway
            fatal("Sender failed: ~p~n state: ~p~n", [Res, State]),
            {stop, fatal, State}
    end;

handle_info({'EXIT', Pid, R}, State) when Pid == State#state.supervisor ->
    catch set(mnesia_status, stopping),
    case State#state.dumper_pid of
        undefined ->
            dbg_out("~p was ~p~n", [?SERVER_NAME, R]),
            {stop, shutdown, State};
        _ ->
            noreply(State#state{is_stopping = true})
    end;

handle_info({'EXIT', Pid, R}, State) when Pid == State#state.dumper_pid ->
    case State#state.dumper_queue of
        [#schema_commit_lock{}|Workers] ->  %% Schema trans crashed or was killed
            State2 = State#state{dumper_queue = Workers, dumper_pid = undefined},
            State3 = opt_start_worker(State2),
            noreply(State3);
        _Other ->
            fatal("Dumper or schema commit crashed: ~p~n state: ~p~n", [R, State]),
            {stop, fatal, State}
    end;

handle_info({'EXIT', Pid, R}, State) when Pid == State#state.loader_pid ->
    fatal("Loader crashed: ~p~n state: ~p~n", [R, State]),
    {stop, fatal, State};

handle_info({'EXIT', Pid, R}, State) when Pid == State#state.sender_pid ->
    %% No need to send any message to the table receiver
    %% since it will soon get a mnesia_down anyway
    fatal("Sender crashed: ~p~n state: ~p~n", [R, State]),
    {stop, fatal, State};

handle_info({From, get_state}, State) ->
    From ! {?SERVER_NAME, State},
    noreply(State);

%% No real need for buffering
handle_info(Msg, State) when State#state.schema_is_merged == false ->
    %% Buffer early messages
    Msgs = State#state.early_msgs,
    noreply(State#state{early_msgs = [{info, Msg} | Msgs]});

handle_info({'EXIT', Pid, wait_for_tables_timeout}, State) ->
    sync_tab_timeout(Pid, get()),
    noreply(State);

handle_info(Msg, State) ->
    error("~p got unexpected info: ~p~n", [?SERVER_NAME, Msg]),
    noreply(State).

reply_late_load(Tab, [H | T]) when H#late_load.table == Tab ->
    reply(H#late_load.opt_reply_to, ok),
    reply_late_load(Tab, T);
reply_late_load(Tab, [H | T]) ->
    [H | reply_late_load(Tab, T)];
reply_late_load(_Tab, []) ->
    [].

sync_tab_timeout(Pid, [{{sync_tab, Tab}, Pids} | Tail]) ->
    case lists:delete(Pid, Pids) of
        [] ->
            erase({sync_tab, Tab});
        Pids2 ->
            put({sync_tab, Tab}, Pids2)
    end,
    sync_tab_timeout(Pid, Tail);
sync_tab_timeout(Pid, [_ | Tail]) ->
    sync_tab_timeout(Pid, Tail);
sync_tab_timeout(_Pid, []) ->
    ok.

%% Pick the load record that has the highest load order
%% Returns {BestLoad, RemainingQueue} or {none, []} if queue is empty
pick_next(Queue) ->
    pick_next(Queue, none, none, []).

pick_next([Head | Tail], Load, Order, Rest) when record(Head, net_load) ->
    Tab = Head#net_load.table,
    select_best(Head, Tail, val({Tab, load_order}), Load, Order, Rest);
pick_next([Head | Tail], Load, Order, Rest) when record(Head, disc_load) ->
    Tab = Head#disc_load.table,
    select_best(Head, Tail, val({Tab, load_order}), Load, Order, Rest);
pick_next([], Load, _Order, Rest) ->
    {Load, Rest}.

select_best(Load, Tail, Order, none, none, Rest) ->
    pick_next(Tail, Load, Order, Rest);
select_best(Load, Tail, Order, OldLoad, OldOrder, Rest) when Order > OldOrder ->
    pick_next(Tail, Load, Order, [OldLoad | Rest]);
select_best(Load, Tail, _Order, OldLoad, OldOrder, Rest) ->
    pick_next(Tail, OldLoad, OldOrder, [Load | Rest]).
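
%% A worked example (a sketch): given a queue
%%
%%   [#disc_load{table = a}, #net_load{table = b}, #disc_load{table = c}]
%%
%% where b has load_order 5 and a and c have load_order 0, pick_next/1
%% returns {#net_load{table = b}, Rest} where Rest holds the two
%% order-0 records. On equal load orders the earliest record wins.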

%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(Reason, State) ->
    mnesia_monitor:terminate_proc(?SERVER_NAME, Reason, State).

%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Upgrade process when its code is to be changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

maybe_log_mnesia_down(N) ->
    %% We use mnesia_down when deciding which tables to load locally,
    %% so if we are not running (i.e. haven't decided which tables
    %% to load locally), don't log mnesia_down yet.
    case mnesia_lib:is_running() of
        yes ->
            verbose("Logging mnesia_down ~w~n", [N]),
            mnesia_recover:log_mnesia_down(N),
            ok;
        _ ->
            Filter = fun(Tab) ->
                             inactive_copy_holders(Tab, N)
                     end,
            HalfLoadedTabs = lists:any(Filter, val({schema, local_tables}) -- [schema]),
            if
                HalfLoadedTabs == true ->
                    verbose("Logging mnesia_down ~w~n", [N]),
                    mnesia_recover:log_mnesia_down(N),
                    ok;
                true ->
                    %% Unfortunately we have not loaded some common
                    %% tables yet, so we cannot rely on the nodedown
                    log_later   %% BUGBUG handle this case!!!
            end
    end.

inactive_copy_holders(Tab, Node) ->
    Cs = val({Tab, cstruct}),
    case mnesia_lib:cs_to_storage_type(Node, Cs) of
        unknown ->
            false;
        _Storage ->
            mnesia_lib:not_active_here(Tab)
    end.

orphan_tables([Tab | Tabs], Node, Ns, Local, Remote) ->
    Cs = val({Tab, cstruct}),
    CopyHolders = mnesia_lib:copy_holders(Cs),
    RamCopyHolders = Cs#cstruct.ram_copies,
    DiscCopyHolders = CopyHolders -- RamCopyHolders,
    DiscNodes = val({schema, disc_copies}),
    LocalContent = Cs#cstruct.local_content,
    RamCopyHoldersOnDiscNodes = mnesia_lib:intersect(RamCopyHolders, DiscNodes),
    Active = val({Tab, active_replicas}),
    case lists:member(Node, DiscCopyHolders) of
        true when Active == [] ->
            case DiscCopyHolders -- Ns of
                [] ->
                    %% We're last up and the other nodes have not
                    %% loaded the table. Let's load it if we are
                    %% the smallest node.
                    case lists:min(DiscCopyHolders) of
                        Min when Min == node() ->
                            case mnesia_recover:get_master_nodes(Tab) of
                                [] ->
                                    L = [Tab | Local],
                                    orphan_tables(Tabs, Node, Ns, L, Remote);
                                Masters ->
                                    R = [{Tab, Masters} | Remote],
                                    orphan_tables(Tabs, Node, Ns, Local, R)
                            end;
                        _ ->
                            orphan_tables(Tabs, Node, Ns, Local, Remote)
                    end;
                _ ->
                    orphan_tables(Tabs, Node, Ns, Local, Remote)
            end;
        false when Active == [], DiscCopyHolders == [], RamCopyHoldersOnDiscNodes == [] ->
            %% Special case when all replicas reside on discless nodes
            orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote);
        _ when LocalContent == true ->
            orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote);
        _ ->
            orphan_tables(Tabs, Node, Ns, Local, Remote)
    end;
orphan_tables([], _, _, LocalOrphans, RemoteMasters) ->
    {LocalOrphans, RemoteMasters}.

node_has_tabs([Tab | Tabs], Node, State) when Node /= node() ->
    State2 = update_whereabouts(Tab, Node, State),
    node_has_tabs(Tabs, Node, State2);
node_has_tabs([Tab | Tabs], Node, State) ->
    user_sync_tab(Tab),
    node_has_tabs(Tabs, Node, State);
node_has_tabs([], _Node, State) ->
    State.

update_whereabouts(Tab, Node, State) ->
    Storage = val({Tab, storage_type}),
    Read = val({Tab, where_to_read}),
    LocalC = val({Tab, local_content}),
    BeingCreated = (?catch_val({Tab, create_table}) == true),
    Masters = mnesia_recover:get_master_nodes(Tab),
    ByForce = val({Tab, load_by_force}),
    GoGetIt =
        if
            ByForce == true ->
                true;
            Masters == [] ->
                true;
            true ->
                lists:member(Node, Masters)
        end,

    dbg_out("Table ~w is loaded on ~w. s=~w, r=~w, lc=~w, f=~w, m=~w~n",
            [Tab, Node, Storage, Read, LocalC, ByForce, GoGetIt]),
    if
        LocalC == true ->
            %% Local content, don't care about the other node
            State;
        Storage == unknown, Read == nowhere ->
            %% No own copy, time to read remotely
            %% if the other node is a good node
            add_active_replica(Tab, Node),
            case GoGetIt of
                true ->
                    set({Tab, where_to_read}, Node),
                    user_sync_tab(Tab),
                    State;
                false ->
                    State
            end;
        Storage == unknown ->
            %% No own copy, continue to read remotely
            add_active_replica(Tab, Node),
            NodeST = mnesia_lib:storage_type_at_node(Node, Tab),
            ReadST = mnesia_lib:storage_type_at_node(Read, Tab),
            if   %% Avoid reading from disc_only_copies
                NodeST == disc_only_copies ->
                    ignore;
                ReadST == disc_only_copies ->
                    mnesia_lib:set_remote_where_to_read(Tab);
                true ->
                    ignore
            end,
            user_sync_tab(Tab),
            State;
        BeingCreated == true ->
            %% The table is currently being created
            %% and we shall have our own copy of it.
            %% We will load the (empty) table locally.
            add_active_replica(Tab, Node),
            State;
        Read == nowhere ->
            %% Own copy, go and get a copy of the table
            %% if the other node is master or if there
            %% is no master at all
            add_active_replica(Tab, Node),
            case GoGetIt of
                true ->
                    Worker = #net_load{table = Tab,
                                       reason = {active_remote, Node}},
                    add_worker(Worker, State);
                false ->
                    State
            end;
        true ->
            %% We already have our own copy
            add_active_replica(Tab, Node),
            user_sync_tab(Tab),
            State
    end.

initial_safe_loads() ->
    case val({schema, storage_type}) of
        ram_copies ->
            Downs = [],
            Tabs = val({schema, local_tables}) -- [schema],
            LastC = fun(T) -> last_consistent_replica(T, Downs) end,
            lists:zf(LastC, Tabs);

        disc_copies ->
            Downs = mnesia_recover:get_mnesia_downs(),
            dbg_out("mnesia_downs = ~p~n", [Downs]),

            Tabs = val({schema, local_tables}) -- [schema],
            LastC = fun(T) -> last_consistent_replica(T, Downs) end,
            lists:zf(LastC, Tabs)
    end.

last_consistent_replica(Tab, Downs) ->
    Cs = val({Tab, cstruct}),
    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
    Ram = Cs#cstruct.ram_copies,
    Disc = Cs#cstruct.disc_copies,
    DiscOnly = Cs#cstruct.disc_only_copies,
    BetterCopies0 = mnesia_lib:remote_copy_holders(Cs) -- Downs,
    BetterCopies = BetterCopies0 -- Ram,
    AccessMode = Cs#cstruct.access_mode,
    Copies = mnesia_lib:copy_holders(Cs),
    Masters = mnesia_recover:get_master_nodes(Tab),
    LocalMaster0 = lists:member(node(), Masters),
    LocalContent = Cs#cstruct.local_content,
    RemoteMaster =
        if
            Masters == [] -> false;
            true -> not LocalMaster0
        end,
    LocalMaster =
        if
            Masters == [] -> false;
            true -> LocalMaster0
        end,
    if
        Copies == [node()] ->
            %% Only one copy holder and it is local.
            %% It may also be a local content table
            {true, {Tab, local_only}};
        LocalContent == true ->
            {true, {Tab, local_content}};
        LocalMaster == true ->
            %% We have a local master
            {true, {Tab, local_master}};
        RemoteMaster == true ->
            %% Wait for remote master copy
            false;
        Storage == ram_copies ->
            if
                Disc == [], DiscOnly == [] ->
                    %% Nobody has copy on disc
                    {true, {Tab, ram_only}};
                true ->
                    %% Some other node has copy on disc
                    false
            end;
        AccessMode == read_only ->
            %% No one has been able to update the table,
            %% i.e. all disc resident copies are equal
            {true, {Tab, read_only}};
        BetterCopies /= [], Masters /= [node()] ->
            %% There are better copies on other nodes
            %% and we do not have the only master copy
            false;
        true ->
            {true, {Tab, initial}}
    end.

reconfigure_tables(N, State, [Tab | Tail]) ->
    del_active_replica(Tab, N),
    case val({Tab, where_to_read}) of
        N -> mnesia_lib:set_remote_where_to_read(Tab);
        _ -> ignore
    end,
    LateQ = drop_loaders(Tab, N, State#state.late_loader_queue),
    reconfigure_tables(N, State#state{late_loader_queue = LateQ}, Tail);

reconfigure_tables(_, State, []) ->
    State.

remove_early_messages([], _Node) ->
    [];
remove_early_messages([{call, {add_active_replica, [_, Node, _, _], _}, _}|R], Node) ->
    remove_early_messages(R, Node); %% Does a reply before queuing
remove_early_messages([{call, {block_table, _, From}, ReplyTo}|R], Node)
  when node(From) == Node ->
    reply(ReplyTo, ok),  %% Release the waiting gen_server caller
    remove_early_messages(R, Node);
remove_early_messages([{cast, {i_have_tab, _Tab, Node}}|R], Node) ->
    remove_early_messages(R, Node);
remove_early_messages([{cast, {adopt_orphans, Node, _Tabs}}|R], Node) ->
    remove_early_messages(R, Node);
remove_early_messages([M | R], Node) ->
    [M | remove_early_messages(R, Node)].

%% Drop loader from late load queue and possibly trigger a disc_load
drop_loaders(Tab, Node, [H | T]) when H#late_load.table == Tab ->
    %% Check if it is time to issue a disc_load request
    case H#late_load.loaders of
        [Node] ->
            Reason = {H#late_load.reason, last_loader_down, Node},
            cast({disc_load, Tab, Reason});  % Ugly cast
        _ ->
            ignore
    end,
    %% Drop the node from the list of loaders
    H2 = H#late_load{loaders = H#late_load.loaders -- [Node]},
    [H2 | drop_loaders(Tab, Node, T)];
drop_loaders(Tab, Node, [H | T]) ->
    [H | drop_loaders(Tab, Node, T)];
drop_loaders(_, _, []) ->
    [].

add_active_replica(Tab, Node) ->
    add_active_replica(Tab, Node, val({Tab, cstruct})).

add_active_replica(Tab, Node, Cs) when record(Cs, cstruct) ->
    Storage = mnesia_lib:schema_cs_to_storage_type(Node, Cs),
    AccessMode = Cs#cstruct.access_mode,
    add_active_replica(Tab, Node, Storage, AccessMode).

%% Block table primitives

block_table(Tab) ->
    Var = {Tab, where_to_commit},
    Old = val(Var),
    New = {blocked, Old},
    set(Var, New). % where_to_commit

unblock_table(Tab) ->
    Var = {Tab, where_to_commit},
    New =
        case val(Var) of
            {blocked, List} ->
                List;
            List ->
                List
        end,
    set(Var, New). % where_to_commit

is_tab_blocked(W2C) when list(W2C) ->
    {false, W2C};
is_tab_blocked({blocked, W2C}) when list(W2C) ->
    {true, W2C}.

mark_blocked_tab(true, Value) ->
    {blocked, Value};
mark_blocked_tab(false, Value) ->
    Value.
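
%% Example of the where_to_commit transitions (a sketch, assuming two
%% made-up replica nodes):
%%
%%   val({Tab, where_to_commit}) %% [{'a@host', ram_copies}, {'b@host', ram_copies}]
%%   block_table(Tab),           %% {blocked, [{'a@host', ram_copies}, ...]}
%%   unblock_table(Tab)          %% [{'a@host', ram_copies}, {'b@host', ram_copies}]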

%%

add_active_replica(Tab, Node, Storage, AccessMode) ->
    Var = {Tab, where_to_commit},
    {Blocked, Old} = is_tab_blocked(val(Var)),
    Del = lists:keydelete(Node, 1, Old),
    case AccessMode of
        read_write ->
            New = lists:sort([{Node, Storage} | Del]),
            set(Var, mark_blocked_tab(Blocked, New)), % where_to_commit
            add({Tab, where_to_write}, Node);
        read_only ->
            set(Var, mark_blocked_tab(Blocked, Del)),
            mnesia_lib:del({Tab, where_to_write}, Node)
    end,
    add({Tab, active_replicas}, Node).

del_active_replica(Tab, Node) ->
    Var = {Tab, where_to_commit},
    {Blocked, Old} = is_tab_blocked(val(Var)),
    Del = lists:keydelete(Node, 1, Old),
    New = lists:sort(Del),
    set(Var, mark_blocked_tab(Blocked, New)),      % where_to_commit
    mnesia_lib:del({Tab, active_replicas}, Node),
    mnesia_lib:del({Tab, where_to_write}, Node).

change_table_access_mode(Cs) ->
    Tab = Cs#cstruct.name,
    lists:foreach(fun(N) -> add_active_replica(Tab, N, Cs) end,
                  val({Tab, active_replicas})).

%% Node To now has Tab loaded, but this must be undone.
%% This code is rpc:call'ed from the tab_copier process
%% when it has *not* released its table lock
unannounce_add_table_copy(Tab, To) ->
    del_active_replica(Tab, To),
    case val({Tab, where_to_read}) of
        To ->
            mnesia_lib:set_remote_where_to_read(Tab);
        _ ->
            ignore
    end.

user_sync_tab(Tab) ->
    case val(debug) of
        trace ->
            mnesia_subscr:subscribe(whereis(mnesia_event), {table, Tab});
        _ ->
            ignore
    end,

    case erase({sync_tab, Tab}) of
        undefined ->
            ok;
        Pids ->
            lists:foreach(fun(Pid) -> sync_reply(Pid, Tab) end, Pids)
    end.

i_have_tab(Tab) ->
    case val({Tab, local_content}) of
        true ->
            mnesia_lib:set_local_content_whereabouts(Tab);
        false ->
            set({Tab, where_to_read}, node())
    end,
    add_active_replica(Tab, node()).

sync_and_block_table_whereabouts(Tab, ToNode, RemoteS, AccessMode) when Tab /= schema ->
    Current = val({current, db_nodes}),
    Ns =
        case lists:member(ToNode, Current) of
            true -> Current -- [ToNode];
            false -> Current
        end,
    remote_call(ToNode, block_table, [Tab]),
    [remote_call(Node, add_active_replica, [Tab, ToNode, RemoteS, AccessMode]) ||
        Node <- [ToNode | Ns]],
    ok.

sync_del_table_copy_whereabouts(Tab, ToNode) when Tab /= schema ->
    Current = val({current, db_nodes}),
    Ns =
        case lists:member(ToNode, Current) of
            true -> Current;
            false -> [ToNode | Current]
        end,
    Args = [Tab, ToNode],
    [remote_call(Node, unannounce_add_table_copy, Args) || Node <- Ns],
    ok.

get_info(Timeout) ->
    case whereis(?SERVER_NAME) of
        undefined ->
            {timeout, Timeout};
        Pid ->
            Pid ! {self(), get_state},
            receive
                {?SERVER_NAME, State} when record(State, state) ->
                    {info, State}
            after Timeout ->
                    {timeout, Timeout}
            end
    end.

get_workers(Timeout) ->
    case whereis(?SERVER_NAME) of
        undefined ->
            {timeout, Timeout};
        Pid ->
            Pid ! {self(), get_state},
            receive
                {?SERVER_NAME, State} when record(State, state) ->
                    {workers, State#state.loader_pid, State#state.sender_pid, State#state.dumper_pid}
            after Timeout ->
                    {timeout, Timeout}
            end
    end.

info() ->
    Tabs = mnesia_lib:local_active_tables(),
    io:format("---> Active tables <--- ~n", []),
    info(Tabs).

info([Tab | Tail]) ->
    case val({Tab, storage_type}) of
        disc_only_copies ->
            info_format(Tab,
                        dets:info(Tab, size),
                        dets:info(Tab, file_size),
                        "bytes on disc");
        _ ->
            info_format(Tab,
                        ?ets_info(Tab, size),
                        ?ets_info(Tab, memory),
                        "words of mem")
    end,
    info(Tail);
info([]) -> ok;
info(Tab) -> info([Tab]).

info_format(Tab, Size, Mem, Media) ->
    StrT = mnesia_lib:pad_name(atom_to_list(Tab), 15, []),
    StrS = mnesia_lib:pad_name(integer_to_list(Size), 8, []),
    StrM = mnesia_lib:pad_name(integer_to_list(Mem), 8, []),
    io:format("~s: with ~s records occupying ~s ~s~n",
              [StrT, StrS, StrM, Media]).

%% Handle early arrived messages
handle_early_msgs([Msg | Msgs], State) ->
    %% The messages are in reverse order
    case handle_early_msg(Msg, State) of
        {stop, Reason, Reply, State2} ->
            {stop, Reason, Reply, State2};
        {stop, Reason, State2} ->
            {stop, Reason, State2};
        {noreply, State2} ->
            handle_early_msgs(Msgs, State2);
        {noreply, State2, _Timeout} ->
            handle_early_msgs(Msgs, State2);
        Else ->
            dbg_out("handle_early_msgs case clause ~p ~n", [Else]),
            erlang:error(Else, [[Msg | Msgs], State])
    end;
handle_early_msgs([], State) ->
    noreply(State).

handle_early_msg({call, Msg, From}, State) ->
    handle_call(Msg, From, State);
handle_early_msg({cast, Msg}, State) ->
    handle_cast(Msg, State);
handle_early_msg({info, Msg}, State) ->
    handle_info(Msg, State).

noreply(State) ->
    {noreply, State}.

reply(undefined, Reply) ->
    Reply;
reply(ReplyTo, Reply) ->
    gen_server:reply(ReplyTo, Reply),
    Reply.
1709
1710%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1711%% Worker management
1712
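%% The controller keeps three worker queues: dumper_queue (dumpers,
%% schema commits and controller blocks), loader_queue and
%% sender_queue. A started worker is kept at the head of its queue
%% until it reports back, so a non-empty queue head together with a
%% worker pid in the state tells us what is currently running.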
%% Returns new State
add_worker(Worker, State) when is_record(Worker, dump_log) ->
    InitBy = Worker#dump_log.initiated_by,
    Queue = State#state.dumper_queue,
    case lists:keymember(InitBy, #dump_log.initiated_by, Queue) of
	true when Worker#dump_log.opt_reply_to == undefined ->
	    %% The same threshold has been exceeded again,
	    %% before we have had the possibility to
	    %% process the older one.
	    DetectedBy = {dump_log, InitBy},
	    Event = {mnesia_overload, DetectedBy},
	    mnesia_lib:report_system_event(Event);
	_ ->
	    ignore
    end,
    Queue2 = Queue ++ [Worker],
    State2 = State#state{dumper_queue = Queue2},
    opt_start_worker(State2);
add_worker(Worker, State) when is_record(Worker, schema_commit_lock) ->
    Queue = State#state.dumper_queue,
    Queue2 = Queue ++ [Worker],
    State2 = State#state{dumper_queue = Queue2},
    opt_start_worker(State2);
add_worker(Worker, State) when is_record(Worker, net_load) ->
    Queue = State#state.loader_queue,
    State2 = State#state{loader_queue = Queue ++ [Worker]},
    opt_start_worker(State2);
add_worker(Worker, State) when is_record(Worker, send_table) ->
    Queue = State#state.sender_queue,
    State2 = State#state{sender_queue = Queue ++ [Worker]},
    opt_start_worker(State2);
add_worker(Worker, State) when is_record(Worker, disc_load) ->
    Queue = State#state.loader_queue,
    State2 = State#state{loader_queue = Queue ++ [Worker]},
    opt_start_worker(State2);
%% Block controller requests should be used for upgrading mnesia and
%% are therefore prepended to the queue as high priority work.
add_worker(Worker, State) when is_record(Worker, block_controller) ->
    Queue = State#state.dumper_queue,
    Queue2 = [Worker | Queue],
    State2 = State#state{dumper_queue = Queue2},
    opt_start_worker(State2).

%% Optionally start a worker
%%
%% Dumpers and loaders may run simultaneously
%% but neither of them may run during schema commit.
%% Loaders may not start if a schema commit is enqueued.
opt_start_worker(State) when State#state.is_stopping == true ->
    State;
opt_start_worker(State) ->
    %% Prioritize dumper and schema commit
    %% by checking them first
    case State#state.dumper_queue of
	[Worker | _Rest] when State#state.dumper_pid == undefined ->
	    %% Great, a worker is in the queue and neither a schema
	    %% transaction is being committed nor a dumper is running

	    %% Start the worker but keep it in the queue
	    if
		is_record(Worker, schema_commit_lock) ->
		    ReplyTo = Worker#schema_commit_lock.owner,
		    reply(ReplyTo, granted),
		    {Owner, _Tag} = ReplyTo,
		    State#state{dumper_pid = Owner};

		is_record(Worker, dump_log) ->
		    Pid = spawn_link(?MODULE, dump_and_reply, [self(), Worker]),
		    State2 = State#state{dumper_pid = Pid},

		    %% If the worker was a dumper we may
		    %% possibly be able to start a loader
		    %% or sender
		    State3 = opt_start_sender(State2),
		    opt_start_loader(State3);

		is_record(Worker, block_controller) ->
		    case {State#state.sender_pid, State#state.loader_pid} of
			{undefined, undefined} ->
			    ReplyTo = Worker#block_controller.owner,
			    reply(ReplyTo, granted),
			    {Owner, _Tag} = ReplyTo,
			    State#state{dumper_pid = Owner};
			_ ->
			    State
		    end
	    end;
	_ ->
	    %% Bad luck, try with a loader or sender instead
	    State2 = opt_start_sender(State),
	    opt_start_loader(State2)
    end.

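%% A sender may not start while another sender is running, while a
%% loader for the same table is running, or while a schema commit
%% is enqueued.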
opt_start_sender(State) ->
    case State#state.sender_queue of
	[] ->
	    %% No need
	    State;

	_ when State#state.sender_pid /= undefined ->
	    %% Bad luck, a sender is already running
	    State;

	[Sender | _SenderRest] ->
	    case State#state.loader_queue of
		[Loader | _LoaderRest]
		when State#state.loader_pid /= undefined,
		     Loader#net_load.table == Sender#send_table.table ->
		    %% A conflicting loader is running
		    State;
		_ ->
		    SchemaQueue = State#state.dumper_queue,
		    case lists:keymember(schema_commit_lock, 1, SchemaQueue) of
			false ->
			    %% Start the sender but keep it in the queue
			    Pid = spawn_link(?MODULE, send_and_reply,
					     [self(), Sender]),
			    State#state{sender_pid = Pid};
			true ->
			    %% Bad luck, we must wait for the schema commit
			    State
		    end
	    end
    end.

opt_start_loader(State) ->
    LoaderQueue = State#state.loader_queue,
    if
	LoaderQueue == [] ->
	    %% No need
	    State;

	State#state.loader_pid /= undefined ->
	    %% Bad luck, a loader is already running
	    State;

	true ->
	    SchemaQueue = State#state.dumper_queue,
	    case lists:keymember(schema_commit_lock, 1, SchemaQueue) of
		false ->
		    {Worker, Rest} = pick_next(LoaderQueue),

		    %% Start the loader but keep it in the queue
		    Pid = spawn_link(?MODULE, load_and_reply, [self(), Worker]),
		    State#state{loader_pid = Pid,
				loader_queue = [Worker | Rest]};
		true ->
		    %% Bad luck, we must wait for the schema commit
		    State
	    end
    end.

start_remote_sender(Node, Tab, Receiver, Storage) ->
    Msg = #send_table{table = Tab,
		      receiver_pid = Receiver,
		      remote_storage = Storage},
    gen_server:cast({?SERVER_NAME, Node}, Msg).

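%% The dump, send and load workers below run in their own processes,
%% linked to the controller. Each worker reports its result with a
%% #dumper_done{}, #sender_done{} or #loader_done{} message and then
%% exits normally.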
dump_and_reply(ReplyTo, Worker) ->
    %% No trap_exit, die intentionally instead
    Res = mnesia_dumper:opt_dump_log(Worker#dump_log.initiated_by),
    ReplyTo ! #dumper_done{worker_pid = self(),
			   worker_res = Res},
    unlink(ReplyTo),
    exit(normal).

send_and_reply(ReplyTo, Worker) ->
    %% No trap_exit, die intentionally instead
    Res = mnesia_loader:send_table(Worker#send_table.receiver_pid,
				   Worker#send_table.table,
				   Worker#send_table.remote_storage),
    ReplyTo ! #sender_done{worker_pid = self(),
			   worker_res = Res},
    unlink(ReplyTo),
    exit(normal).

load_and_reply(ReplyTo, Worker) ->
    process_flag(trap_exit, true),
    Done = load_table(Worker),
    ReplyTo ! Done#loader_done{worker_pid = self()},
    unlink(ReplyTo),
    exit(normal).

%% Now it is time to load the table
%% but first we must check whether it still is necessary
load_table(Load) when is_record(Load, net_load) ->
    Tab = Load#net_load.table,
    ReplyTo = Load#net_load.opt_reply_to,
    Reason = Load#net_load.reason,
    LocalC = val({Tab, local_content}),
    AccessMode = val({Tab, access_mode}),
    ReadNode = val({Tab, where_to_read}),
    Active = filter_active(Tab),
    Done = #loader_done{is_loaded = true,
			table_name = Tab,
			needs_announce = false,
			needs_sync = false,
			needs_reply = true,
			reply_to = ReplyTo,
			reply = {loaded, ok}
		       },
    if
	ReadNode == node() ->
	    %% Already loaded locally
	    Done;
	LocalC == true ->
	    Res = mnesia_loader:disc_load_table(Tab, load_local_content),
	    Done#loader_done{reply = Res, needs_announce = true, needs_sync = true};
	AccessMode == read_only ->
	    disc_load_table(Tab, Reason, ReplyTo);
	true ->
	    %% Either we cannot read the table yet
	    %% or someone is moving a replica between
	    %% two nodes
	    Cs = Load#net_load.cstruct,
	    Res = mnesia_loader:net_load_table(Tab, Reason, Active, Cs),
	    case Res of
		{loaded, ok} ->
		    Done#loader_done{needs_sync = true,
				     reply = Res};
		{not_loaded, storage_unknown} ->
		    Done#loader_done{reply = Res};
		{not_loaded, _} ->
		    Done#loader_done{is_loaded = false,
				     needs_reply = false,
				     reply = Res}
	    end
    end;

load_table(Load) when is_record(Load, disc_load) ->
    Tab = Load#disc_load.table,
    Reason = Load#disc_load.reason,
    ReplyTo = Load#disc_load.opt_reply_to,
    ReadNode = val({Tab, where_to_read}),
    Active = filter_active(Tab),
    Done = #loader_done{is_loaded = true,
			table_name = Tab,
			needs_announce = false,
			needs_sync = false,
			needs_reply = false
		       },
    if
	Active == [], ReadNode == nowhere ->
	    %% Not loaded anywhere, let's load it from disc
	    disc_load_table(Tab, Reason, ReplyTo);
	ReadNode == nowhere ->
	    %% Already loaded on another node, let's fetch it from there
	    Cs = val({Tab, cstruct}),
	    case mnesia_loader:net_load_table(Tab, Reason, Active, Cs) of
		{loaded, ok} ->
		    Done#loader_done{needs_sync = true};
		{not_loaded, storage_unknown} ->
		    Done#loader_done{is_loaded = false};
		{not_loaded, ErrReason} ->
		    Done#loader_done{is_loaded = false,
				     reply = {not_loaded, ErrReason}}
	    end;
	true ->
	    %% Already readable, do not worry, be happy
	    Done
    end.

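%% Load the table from the local disc. If the load fails and nobody
%% is waiting for a reply, we cannot recover and give up with a
%% fatal error.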
disc_load_table(Tab, Reason, ReplyTo) ->
    Done = #loader_done{is_loaded = true,
			table_name = Tab,
			needs_announce = false,
			needs_sync = false,
			needs_reply = true,
			reply_to = ReplyTo,
			reply = {loaded, ok}
		       },
    Res = mnesia_loader:disc_load_table(Tab, Reason),
    if
	Res == {loaded, ok} ->
	    Done#loader_done{needs_announce = true,
			     needs_sync = true,
			     reply = Res};
	ReplyTo /= undefined ->
	    Done#loader_done{is_loaded = false,
			     reply = Res};
	true ->
	    fatal("Cannot load table ~p from disc: ~p~n", [Tab, Res])
    end.

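%% Determine which active replicas we may load the table from.
%% If the table is loaded by force, or if no master nodes are set,
%% all active replicas are candidates; otherwise only the active
%% replicas that also are master nodes may be used.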
filter_active(Tab) ->
    ByForce = val({Tab, load_by_force}),
    Active = val({Tab, active_replicas}),
    Masters = mnesia_recover:get_master_nodes(Tab),
    do_filter_active(ByForce, Active, Masters).

do_filter_active(true, Active, _Masters) ->
    Active;
do_filter_active(false, Active, []) ->
    Active;
do_filter_active(false, Active, Masters) ->
    mnesia_lib:intersect(Active, Masters).
