%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1996-2018. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%

%%
%% The mnesia_init process loads tables from local disc or from
%% other nodes. It also coordinates updates of the info about
%% where we can read and write tables.
%%
%% Tables may need to be loaded initially at startup of the local
%% node or when other nodes announce that they already have loaded
%% tables that we also want.
%%
%% Initially we set the load request queue to those tables that we
%% can safely load locally, i.e. tables where we have the last
%% consistent replica and we have received mnesia_down from all
%% other nodes holding the table. Then we let the mnesia_init
%% process enter its normal working state.
%%
%% When we need to load a table we append a request to the load
%% request queue. All other requests are regarded as high priority
%% and are processed immediately (e.g. update table whereabouts).
%% We process the load request queue as a "background" job.
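%%
%% For example (purely illustrative): consider a disc_copies table t
%% replicated on nodes a and b. If a received mnesia_down from b after
%% its own last update of t, then a holds the last consistent replica
%% and may put t directly on its initial load request queue, while b
%% must wait and fetch t over the network from a.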

-module(mnesia_controller).

-behaviour(gen_server).

%% Mnesia internal stuff
-export([
         start/0,
         i_have_tab/1,
         info/0,
         get_info/1,
         get_workers/1,
         force_load_table/1,
         async_dump_log/1,
         sync_dump_log/1,
         snapshot_dcd/1,
         connect_nodes/1,
         connect_nodes/2,
         wait_for_schema_commit_lock/0,
         release_schema_commit_lock/0,
         create_table/1,
         get_disc_copy/1,
         get_remote_cstructs/0,  % new function
         get_cstructs/0,         % old function
         sync_and_block_table_whereabouts/4,
         sync_del_table_copy_whereabouts/2,
         block_table/1,
         unblock_table/1,
         block_controller/0,
         unblock_controller/0,
         unannounce_add_table_copy/2,
         master_nodes_updated/2,
         mnesia_down/1,
         add_active_replica/2,
         add_active_replica/3,
         add_active_replica/4,
         update/1,
         change_table_access_mode/1,
         change_table_majority/1,
         del_active_replica/2,
         wait_for_tables/2,
         get_network_copy/3,
         merge_schema/0,
         start_remote_sender/4,
         schedule_late_disc_load/2
        ]).

%% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3]).

%% Module internal stuff
-export([call/1,
         cast/1,
         dump_and_reply/2,
         load_and_reply/2,
         send_and_reply/2,
         wait_for_tables_init/2,
         connect_nodes2/3
        ]).

-compile({no_auto_import,[error/2]}).

-import(mnesia_lib, [set/2, add/2]).
-import(mnesia_lib, [fatal/2, error/2, verbose/2, dbg_out/2]).

-include("mnesia.hrl").

-define(SERVER_NAME, ?MODULE).

-record(state, {supervisor,
                schema_is_merged = false,
                early_msgs = [],
                loader_pid = [],     %% Was a pid, is now [{Pid,Work}|..]
                loader_queue,        %% Was a list, is now a gb_tree
                sender_pid = [],     %% Was a pid or undefined, is now [{Pid,Work}|..]
                sender_queue =  [],
                late_loader_queue,   %% Was a list, is now a gb_tree
                dumper_pid,          %% Dumper or schema commit pid
                dumper_queue = [],   %% Dumper or schema commit queue
                others = [],         %% Processes that need the copier_done msg
                dump_log_timer_ref,
                is_stopping = false
               }).
%% Backwards compatibility: sender_pid is now a list of senders.
get_senders(#state{sender_pid = Pids}) when is_list(Pids) -> Pids.
%% Backwards compatibility: loader_pid is now a list of loaders.
get_loaders(#state{loader_pid = Pids}) when is_list(Pids) -> Pids.
max_loaders() ->
    case ?catch_val(no_table_loaders) of
        {'EXIT', _} ->
            mnesia_lib:set(no_table_loaders,1),
            1;
        Val -> Val
    end.
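
%% The limit above comes from the no_table_loaders configuration
%% parameter (default 1), i.e. how many tables may be loaded in
%% parallel. A sketch of raising it, assuming it is set before
%% mnesia:start() (the value 4 is hypothetical):
%%
%%   application:set_env(mnesia, no_table_loaders, 4),
%%   ok = mnesia:start().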

-record(schema_commit_lock, {owner}).
-record(block_controller, {owner}).

-record(dump_log, {initiated_by,
                   opt_reply_to,
                   operation = dump_log
                  }).

-record(net_load, {table,
                   reason,
                   opt_reply_to,
                   cstruct = unknown
                  }).

-record(send_table, {table,
                     receiver_pid,
                     remote_storage
                    }).

-record(disc_load, {table,
                    reason,
                    opt_reply_to
                   }).

-record(late_load, {table,
                    reason,
                    opt_reply_to,
                    loaders
                   }).

-record(loader_done, {worker_pid,
                      is_loaded,
                      table_name,
                      needs_announce,
                      needs_sync,
                      needs_reply,
                      reply_to,
                      reply}).

-record(sender_done, {worker_pid,
                      worker_res,
                      table_name
                     }).

-record(dumper_done, {worker_pid,
                      worker_res
                     }).

%% Local function in order to avoid external function call
val(Var) ->
    case ?catch_val_and_stack(Var) of
        {'EXIT', Stacktrace} -> mnesia_lib:other_val(Var, Stacktrace);
        Value -> Value
    end.

start() ->
    gen_server:start_link({local, ?SERVER_NAME}, ?MODULE, [self()],
                          [{timeout, infinity}
                           %% ,{debug, [trace]}
                          ]).

sync_dump_log(InitBy) ->
    call({sync_dump_log, InitBy}).

async_dump_log(InitBy) ->
    ?SERVER_NAME ! {async_dump_log, InitBy},
    ok.

snapshot_dcd(Tables) when is_list(Tables) ->
    case [T || T <- Tables,
               mnesia_lib:storage_type_at_node(node(), T) =/= disc_copies] of
        [] ->
            call({snapshot_dcd, Tables});
        BadTabs ->
            {error, {not_disc_copies, BadTabs}}
    end.
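
%% Example (hypothetical table name): snapshot_dcd([person]) asks the
%% dumper to write a fresh .DCD snapshot of the table, but only if
%% person is a disc_copies table on this node; otherwise the guard
%% above yields {error, {not_disc_copies, [person]}}.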

%% Wait for tables to be active
%% If needed, we will wait for Mnesia to start
%% If Mnesia stops, we will wait for Mnesia to restart
%% We will wait even if the list of tables is empty
%%
wait_for_tables(Tabs, Timeout) when is_list(Tabs), Timeout == infinity ->
    do_wait_for_tables(Tabs, Timeout);
wait_for_tables(Tabs, Timeout) when is_list(Tabs),
                                    is_integer(Timeout), Timeout >= 0 ->
    do_wait_for_tables(Tabs, Timeout);
wait_for_tables(Tabs, Timeout) ->
    {error, {badarg, Tabs, Timeout}}.
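
%% Typical usage goes through the documented wrapper, e.g.
%% (person and account are hypothetical table names):
%%
%%   case mnesia:wait_for_tables([person, account], 5000) of
%%       ok -> ready;
%%       {timeout, BadTabs} -> {still_missing, BadTabs};
%%       {error, Reason} -> exit(Reason)
%%   end.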

do_wait_for_tables(Tabs, 0) ->
    reply_wait(Tabs);
do_wait_for_tables(Tabs, Timeout) ->
    Pid = spawn_link(?MODULE, wait_for_tables_init, [self(), Tabs]),
    receive
        {?SERVER_NAME, Pid, Res} ->
            Res;
        {'EXIT', Pid, _} ->
            reply_wait(Tabs)
    after Timeout ->
            unlink(Pid),
            exit(Pid, timeout),
            reply_wait(Tabs)
    end.

reply_wait(Tabs) ->
    try mnesia_lib:active_tables() of
        Active when is_list(Active) ->
            case Tabs -- Active of
                [] ->
                    ok;
                BadTabs ->
                    {timeout, BadTabs}
            end
    catch exit:_ -> {error, {node_not_running, node()}}
    end.

wait_for_tables_init(From, Tabs) ->
    process_flag(trap_exit, true),
    Res = wait_for_init(From, Tabs, whereis(?SERVER_NAME)),
    From ! {?SERVER_NAME, self(), Res},
    unlink(From),
    exit(normal).

wait_for_init(From, Tabs, Init) ->
    try link(Init) of
        true when is_pid(Init) ->
            cast({sync_tabs, Tabs, self()}),
            rec_tabs(Tabs, Tabs, From, Init)
    catch error:_ -> %% Mnesia is not started
            {error, {node_not_running, node()}}
    end.

sync_reply(Waiter, Tab) ->
    Waiter ! {?SERVER_NAME, {tab_synced, Tab}}.

rec_tabs([Tab | Tabs], AllTabs, From, Init) ->
    receive
        {?SERVER_NAME, {tab_synced, Tab}} ->
            rec_tabs(Tabs, AllTabs, From, Init);

        {'EXIT', From, _} ->
            %% This will trigger an exit signal
            %% to mnesia_init
            exit(wait_for_tables_timeout);

        {'EXIT', Init, _} ->
            %% Oops, mnesia_init stopped
            exit(mnesia_stopped)
    end;
rec_tabs([], _, _, Init) ->
    unlink(Init),
    ok.

get_remote_cstructs() ->
    get_cstructs().  %% Sigh, not forward compatible; always check the version

%% Old function kept for backwards compatibility; converts cstructs before sending.
get_cstructs() ->
    {cstructs, Cstructs, Running} = call(get_cstructs),
    Node = node(group_leader()),
    {cstructs, mnesia_schema:normalize_cs(Cstructs, Node), Running}.

update(Fun) ->
    call({update,Fun}).


mnesia_down(Node) ->
    case whereis(?SERVER_NAME) of
        undefined -> mnesia_monitor:mnesia_down(?SERVER_NAME, Node);
        Pid -> gen_server:cast(Pid, {mnesia_down, Node})
    end.

wait_for_schema_commit_lock() ->
    try
        Pid = whereis(?SERVER_NAME),
        link(Pid), %% Keep the link until release_schema_commit_lock
        gen_server:call(Pid, wait_for_schema_commit_lock, infinity)
    catch _:_ ->
            mnesia:abort({node_not_running, node()})
    end.

block_controller() ->
    call(block_controller).

unblock_controller() ->
    cast(unblock_controller).

release_schema_commit_lock() ->
    cast({release_schema_commit_lock, self()}),
    unlink(whereis(?SERVER_NAME)).

%% Special for preparation of add table copy
get_network_copy(Tid, Tab, Cs) ->
%   We can't let the controller queue this one
%   because that may cause a deadlock between schema operations
%   and initial table loadings, which both take schema locks.
%   But we have to get copier_done msgs when the other side
%   goes down.
    call({add_other, self()}),
    Reason = {dumper,{add_table_copy, Tid}},
    Work = #net_load{table = Tab,reason = Reason,cstruct = Cs},
    %% We need this because the loader is linked through the subscriber;
    %% might be solved by using a monitor in the subscriber instead.
    process_flag(trap_exit, true),
    Load = load_table_fun(Work),
    Res = ?CATCH(Load()),
    process_flag(trap_exit, false),
    call({del_other, self()}),
    case Res of
        #loader_done{is_loaded = true} ->
            Tab = Res#loader_done.table_name,
            case Res#loader_done.needs_announce of
                true ->
                    i_have_tab(Tab);
                false ->
                    ignore
            end,
            Res#loader_done.reply;
        #loader_done{} ->
            Res#loader_done.reply;
        Else ->
            {not_loaded, Else}
    end.

%% This function is invoked from the dumper
%%
%% There are two cases here:
%% startup ->
%%   no need for sync, since mnesia_controller is not started yet
%% schema_trans ->
%%   already synced with mnesia_controller since the dumper
%%   is synchronously started from mnesia_controller

create_table(Tab) ->
    {loaded, ok} = mnesia_loader:disc_load_table(Tab, {dumper,create_table}).

get_disc_copy(Tab) ->
    disc_load_table(Tab, {dumper,change_table_copy_type}, undefined).

%% Returns ok instead of yes
force_load_table(Tab) when is_atom(Tab), Tab /= schema ->
    case ?catch_val({Tab, storage_type}) of
        ram_copies ->
            do_force_load_table(Tab);
        disc_copies ->
            do_force_load_table(Tab);
        disc_only_copies ->
            do_force_load_table(Tab);
        {ext, _, _} ->
            do_force_load_table(Tab);
        unknown ->
            set({Tab, load_by_force}, true),
            cast({force_load_updated, Tab}),
            wait_for_tables([Tab], infinity);
        {'EXIT', _} ->
            {error, {no_exists, Tab}}
    end;
force_load_table(Tab) ->
    {error, {bad_type, Tab}}.
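
%% Example (hypothetical table name): force_load_table(person) loads
%% person from the best replica we can reach, without waiting for all
%% copy holders, and returns ok; the documented wrapper
%% mnesia:force_load_table/1 returns yes on success instead.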

do_force_load_table(Tab) ->
    Loaded = ?catch_val({Tab, load_reason}),
    case Loaded of
        unknown ->
            set({Tab, load_by_force}, true),
            mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user),
            wait_for_tables([Tab], infinity);
        {'EXIT', _} ->
            set({Tab, load_by_force}, true),
            mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user),
            wait_for_tables([Tab], infinity);
        _ ->
            ok
    end.

master_nodes_updated(schema, _Masters) ->
    ignore;
master_nodes_updated(Tab, Masters) ->
    cast({master_nodes_updated, Tab, Masters}).

schedule_late_disc_load(Tabs, Reason) ->
    MsgTag = late_disc_load,
    try_schedule_late_disc_load(Tabs, Reason, MsgTag).

try_schedule_late_disc_load(Tabs, _Reason, MsgTag)
  when Tabs == [], MsgTag /= schema_is_merged ->
    ignore;
try_schedule_late_disc_load(Tabs, Reason, MsgTag) ->
    GetIntents =
        fun() ->
                Item = mnesia_late_disc_load,
                Nodes = val({current, db_nodes}),
                mnesia:lock({global, Item, Nodes}, write),
                case multicall(Nodes -- [node()], disc_load_intents) of
                    {Replies, []} ->
                        call({MsgTag, Tabs, Reason, Replies}),
                        done;
                    {_, BadNodes} ->
                        %% Some nodes did not respond, let's try again
                        {retry, BadNodes}
                end
        end,
    case mnesia:transaction(GetIntents) of
        {atomic, done} ->
            done;
        {atomic, {retry, BadNodes}} ->
            verbose("Retry late_load_tables because bad nodes: ~p~n",
                    [BadNodes]),
            try_schedule_late_disc_load(Tabs, Reason, MsgTag);
        {aborted, AbortReason} ->
            fatal("Cannot late_load_tables ~tp: ~tp~n",
                  [[Tabs, Reason, MsgTag], AbortReason])
    end.

connect_nodes(Ns) ->
    connect_nodes(Ns, fun default_merge/1).

connect_nodes(Ns, UserFun) ->
    case mnesia:system_info(is_running) of
        no ->
            {error, {node_not_running, node()}};
        yes ->
            Pid = spawn_link(?MODULE,connect_nodes2,[self(),Ns, UserFun]),
            receive
                {?MODULE, Pid, Res, New} ->
                    case Res of
                        ok ->
                            mnesia_lib:add_list(extra_db_nodes, New),
                            {ok, New};
                        {aborted, {throw, Str}} when is_list(Str) ->
                            %%mnesia_recover:disconnect_nodes(New),
                            {error, {merge_schema_failed, lists:flatten(Str)}};
                        Else ->
                            {error, Else}
                    end;
                {'EXIT', Pid, Reason} ->
                    {error, Reason}
            end
    end.
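
%% Normally reached via the documented mnesia:change_config/2, e.g.
%% (hypothetical node names):
%%
%%   {ok, New} = mnesia:change_config(extra_db_nodes, ['a@host', 'b@host'])
%%
%% where New is the subset of nodes that actually were connected and
%% schema merged, as returned by this function.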

connect_nodes2(Father, Ns, UserFun) ->
    Current = val({current, db_nodes}),
    abcast([node()|Ns], {merging_schema, node()}),
    {NewC, OldC} = mnesia_recover:connect_nodes(Ns),
    Connected = NewC ++ OldC,
    New1 = mnesia_lib:intersect(Ns, Connected),
    New = New1 -- Current,
    process_flag(trap_exit, true),
    Res = try_merge_schema(New, [], UserFun),
    Msg = {schema_is_merged, [], late_merge, []},
    _ = multicall([node()|Ns], Msg),
    After = val({current, db_nodes}),
    Father ! {?MODULE, self(), Res, mnesia_lib:intersect(Ns,After)},
    unlink(Father),
    ok.

%% Merge the local schema with the schema on other nodes.
%% But first we must let all processes that want to force
%% load tables wait until the schema merge is done.

merge_schema() ->
    AllNodes = mnesia_lib:all_nodes(),
    case try_merge_schema(AllNodes, [node()], fun default_merge/1) of
        ok ->
            schema_is_merged();
        {aborted, {throw, Str}} when is_list(Str) ->
            fatal("Failed to merge schema: ~s~n", [Str]);
        Else ->
            fatal("Failed to merge schema: ~p~n", [Else])
    end.

default_merge(F) ->
    F([]).

try_merge_schema(Nodes, Told0, UserFun) ->
    case mnesia_schema:merge_schema(UserFun) of
        {atomic, not_merged} ->
            %% No more nodes that we need to merge the schema with.
            %% Ensure we have told everybody that we are running.
            case val({current,db_nodes}) -- mnesia_lib:uniq(Told0) of
                [] ->  ok;
                Tell ->
                    im_running(Tell, [node()]),
                    ok
            end;
        {atomic, {merged, OldFriends, NewFriends}} ->
            %% Check if new nodes have been added to the schema
            Diff = mnesia_lib:all_nodes() -- [node() | Nodes],
            mnesia_recover:connect_nodes(Diff),

            %% Tell everybody to adopt orphan tables
            im_running(OldFriends, NewFriends),
            im_running(NewFriends, OldFriends),
            Told = case lists:member(node(), NewFriends) of
                       true -> Told0 ++ OldFriends;
                       false -> Told0 ++ NewFriends
                   end,
            try_merge_schema(Nodes, Told, UserFun);
        {atomic, {"Cannot get cstructs", Node, Reason}} ->
            dbg_out("Cannot get cstructs, Node ~p ~tp~n", [Node, Reason]),
            timer:sleep(300), % Avoid an endless loop
            try_merge_schema(Nodes, Told0, UserFun);
        {aborted, {shutdown, _}} ->  %% One of the nodes is going down
            timer:sleep(300), % Avoid an endless loop
            try_merge_schema(Nodes, Told0, UserFun);
        Other ->
            Other
    end.

im_running(OldFriends, NewFriends) ->
    abcast(OldFriends, {im_running, node(), NewFriends}).

schema_is_merged() ->
    MsgTag = schema_is_merged,
    SafeLoads = initial_safe_loads(),

    %% At this point we do not know anything about
    %% which tables the other nodes have already
    %% loaded, so we let the normal processing of
    %% the loader_queue take care of it, since by
    %% then we will know the whereabouts. We rely
    %% on the fact that all nodes tell each other
    %% directly when they have loaded a table and
    %% are willing to share it.

    try_schedule_late_disc_load(SafeLoads, initial, MsgTag).


cast(Msg) ->
    case whereis(?SERVER_NAME) of
        undefined -> ok;
        Pid ->  gen_server:cast(Pid, Msg)
    end.

abcast(Nodes, Msg) ->
    gen_server:abcast(Nodes, ?SERVER_NAME, Msg).

call(Msg) ->
    case whereis(?SERVER_NAME) of
        undefined ->
            {error, {node_not_running, node()}};
        Pid ->
            link(Pid),
            Res = gen_server:call(Pid, Msg, infinity),
            unlink(Pid),

            %% We get an exit signal if server dies
            receive
                {'EXIT', Pid, _Reason} ->
                    {error, {node_not_running, node()}}
            after 0 ->
                    Res
            end
    end.
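
%% Note on the pattern above: the temporary link means a crash of the
%% server reaches the caller as an exit signal, and the zero-timeout
%% receive consumes any leftover 'EXIT' message when the caller happens
%% to trap exits, turning it into {error, {node_not_running, node()}}.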

remote_call(Node, Func, Args) ->
    try gen_server:call({?MODULE, Node}, {Func, Args, self()}, infinity)
    catch exit:Error -> {error, Error}
    end.

multicall(Nodes, Msg) ->
    {Good, Bad} = gen_server:multi_call(Nodes, ?MODULE, Msg, infinity),
    PatchedGood = [Reply || {_Node, Reply} <- Good],
    {PatchedGood, Bad}.  %% Make the replies look like rpc:multicall results
%%    rpc:multicall(Nodes, ?MODULE, call, [Msg]).

next_async_dump_log() ->
    Interval = mnesia_monitor:get_env(dump_log_time_threshold),
    Msg = {next_async_dump_log, time_threshold},
    Ref = erlang:send_after(Interval, self(), Msg),
    Ref.
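
%% Interval is the documented dump_log_time_threshold parameter, in
%% milliseconds (default 180000). A sketch, assuming it is set before
%% mnesia:start() (the value 60000 is hypothetical):
%%
%%   application:set_env(mnesia, dump_log_time_threshold, 60000)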

%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State}          |
%%          {ok, State, Timeout} |
%%          {stop, Reason}
%%----------------------------------------------------------------------
init([Parent]) ->
    process_flag(trap_exit, true),
    mnesia_lib:verbose("~p starting: ~p~n", [?SERVER_NAME, self()]),

    %% Handshake and initialize transaction recovery
    %% for new nodes detected in the schema
    All = mnesia_lib:all_nodes(),
    Diff = All -- [node() | val(original_nodes)],
    mnesia_lib:unset(original_nodes),
    mnesia_recover:connect_nodes(Diff),

    Ref = next_async_dump_log(),
    mnesia_dumper:start_regulator(),

    Empty = gb_trees:empty(),
    {ok, #state{supervisor = Parent, dump_log_timer_ref = Ref,
                loader_queue = Empty,
                late_loader_queue = Empty}}.

%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State}          |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------

handle_call({sync_dump_log, InitBy}, From, State) ->
    Worker = #dump_log{initiated_by = InitBy,
                       opt_reply_to = From
                      },
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_call({snapshot_dcd, Tables}, From, State) ->
    Worker = #dump_log{initiated_by = user,
                       opt_reply_to = From,
                       operation = fun() ->
                                           mnesia_dumper:snapshot_dcd(Tables)
                                   end},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_call(wait_for_schema_commit_lock, From, State) ->
    Worker = #schema_commit_lock{owner = From},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_call(block_controller, From, State) ->
    Worker = #block_controller{owner = From},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_call({update,Fun}, From, State) ->
    Res = ?CATCH(Fun()),
    reply(From, Res),
    noreply(State);

handle_call(get_cstructs, From, State) ->
    Tabs = val({schema, tables}),
    Cstructs = [val({T, cstruct}) || T <- Tabs],
    Running = val({current, db_nodes}),
    reply(From, {cstructs, Cstructs, Running}),
    noreply(State);

handle_call({schema_is_merged, [], late_merge, []}, From,
            State = #state{schema_is_merged = Merged}) ->
    case Merged of
        {false, Node} when Node == node(From) ->
            Msgs = State#state.early_msgs,
            State1 = State#state{early_msgs = [], schema_is_merged = true},
            handle_early_msgs(lists:reverse(Msgs), State1);
        _ ->
            %% Oops, this came too early (before we have merged),
            %% or too late, or from a node we don't care about
            reply(From, ignore),
            noreply(State)
    end;

handle_call({schema_is_merged, TabsR, Reason, RemoteLoaders}, From, State) ->
    State2 = late_disc_load(TabsR, Reason, RemoteLoaders, From, State),

    %% Handle early messages
    Msgs = State2#state.early_msgs,
    State3 = State2#state{early_msgs = [], schema_is_merged = true},
    handle_early_msgs(lists:reverse(Msgs), State3);

handle_call(disc_load_intents,From,State = #state{loader_queue=LQ,late_loader_queue=LLQ}) ->
    LQTabs  = gb_trees:keys(LQ),
    LLQTabs = gb_trees:keys(LLQ),
    ActiveTabs = lists:sort(mnesia_lib:local_active_tables()),
    reply(From, {ok, node(), ordsets:union([LQTabs,LLQTabs,ActiveTabs])}),
    noreply(State);

handle_call({update_where_to_write, [add, Tab, AddNode], _From}, _Dummy, State) ->
    Current = val({current, db_nodes}),
    Res =
        case lists:member(AddNode, Current) and
            (State#state.schema_is_merged == true) of
            true ->
                mnesia_lib:add_lsort({Tab, where_to_write}, AddNode),
                update_where_to_wlock(Tab);
            false ->
                ignore
        end,
    {reply, Res, State};

handle_call({add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From},
            ReplyTo, State) ->
    KnownNode = lists:member(ToNode, val({current, db_nodes})),
    Merged = State#state.schema_is_merged,
    if
        KnownNode == false ->
            reply(ReplyTo, ignore),
            noreply(State);
        Merged == true ->
            Res = case ?catch_val({Tab, cstruct}) of
                      {'EXIT', _} ->  %% Tab deleted
                          deleted;
                      _ ->
                          add_active_replica(Tab, ToNode, RemoteS, AccessMode)
                  end,
            reply(ReplyTo, Res),
            noreply(State);
        true -> %% Schema is not merged
            Msg = {add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From},
            Msgs = State#state.early_msgs,
            reply(ReplyTo, ignore),   %% Reply ignore and add data after schema merge
            noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]})
    end;

handle_call({unannounce_add_table_copy, [Tab, Node], From}, ReplyTo, State) ->
    KnownNode = lists:member(node(From), val({current, db_nodes})),
    Merged = State#state.schema_is_merged,
    if
        KnownNode == false ->
            reply(ReplyTo, ignore),
            noreply(State);
        Merged == true ->
            Res = unannounce_add_table_copy(Tab, Node),
            reply(ReplyTo, Res),
            noreply(State);
        true -> %% Schema is not merged
            Msg = {unannounce_add_table_copy, [Tab, Node], From},
            Msgs = State#state.early_msgs,
            reply(ReplyTo, ignore),   %% Reply ignore and add data after schema merge
            %% Set ReplyTo to undefined so we don't reply twice
            noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]})
    end;

handle_call(Msg, From, State) when State#state.schema_is_merged /= true ->
    %% Buffer early messages
    Msgs = State#state.early_msgs,
    noreply(State#state{early_msgs = [{call, Msg, From} | Msgs]});

handle_call({late_disc_load, Tabs, Reason, RemoteLoaders}, From, State) ->
    State2 = late_disc_load(Tabs, Reason, RemoteLoaders, From, State),
    noreply(State2);

handle_call({unblock_table, Tab}, _Dummy, State) ->
    Var = {Tab, where_to_commit},
    case val(Var) of
        {blocked, List} ->
            set(Var, List); % where_to_commit
        _ ->
            ignore
    end,
    {reply, ok, State};

handle_call({block_table, [Tab], From}, _Dummy, State) ->
    case lists:member(node(From), val({current, db_nodes})) of
        true ->
            block_table(Tab);
        false ->
            ignore
    end,
    {reply, ok, State};

handle_call({check_w2r, _Node, Tab}, _From, State) ->
    {reply, val({Tab, where_to_read}), State};

handle_call({add_other, Who}, _From, State = #state{others=Others0}) ->
    Others = [Who|Others0],
    {reply, ok, State#state{others=Others}};
handle_call({del_other, Who}, _From, State = #state{others=Others0}) ->
    Others = lists:delete(Who, Others0),
    {reply, ok, State#state{others=Others}};

handle_call(Msg, _From, State) ->
    error("~p got unexpected call: ~tp~n", [?SERVER_NAME, Msg]),
    noreply(State).

late_disc_load(TabsR, Reason, RemoteLoaders, From,
               State = #state{loader_queue = LQ, late_loader_queue = LLQ}) ->
    verbose("Intend to load tables: ~tp~n", [TabsR]),
    ?eval_debug_fun({?MODULE, late_disc_load},
                    [{tabs, TabsR},
                     {reason, Reason},
                     {loaders, RemoteLoaders}]),

    reply(From, queued),
    %% RemoteLoaders is a list of {ok, Node, Tabs} tuples

    %% Remove deleted tabs and tabs that are already queued or loaded
    LocalTabs = gb_sets:from_ordset(lists:sort(mnesia_lib:val({schema,local_tables}))),
    Filter = fun(TabInfo0, Acc) ->
                     TabInfo = {Tab,_} =
                         case TabInfo0 of
                             {_,_} -> TabInfo0;
                             TabN -> {TabN,Reason}
                         end,
                     case gb_sets:is_member(Tab, LocalTabs) of
                         true ->
                             case ?catch_val({Tab, where_to_read}) == node() of
                                 true -> Acc;
                                 false ->
                                     case gb_trees:is_defined(Tab,LQ) of
                                         true ->  Acc;
                                         false -> [TabInfo | Acc]
                                     end
                             end;
                         false -> Acc
                     end
             end,

    Tabs = lists:foldl(Filter, [], TabsR),

    Nodes = val({current, db_nodes}),
    LateQueue = late_loaders(Tabs, RemoteLoaders, Nodes, LLQ),
    State#state{late_loader_queue = LateQueue}.

late_loaders([{Tab, Reason} | Tabs], RemoteLoaders, Nodes, LLQ) ->
    case gb_trees:is_defined(Tab, LLQ) of
        false ->
            LoadNodes = late_load_filter(RemoteLoaders, Tab, Nodes, []),
            case LoadNodes of
                [] ->  cast({disc_load, Tab, Reason}); % Ugly cast
                _ ->   ignore
            end,
            LateLoad = #late_load{table=Tab,loaders=LoadNodes,reason=Reason},
            late_loaders(Tabs, RemoteLoaders, Nodes, gb_trees:insert(Tab,LateLoad,LLQ));
        true ->
            late_loaders(Tabs, RemoteLoaders, Nodes, LLQ)
    end;
late_loaders([], _RemoteLoaders, _Nodes, LLQ) ->
    LLQ.

late_load_filter([{error, _} | RemoteLoaders], Tab, Nodes, Acc) ->
    late_load_filter(RemoteLoaders, Tab, Nodes, Acc);
late_load_filter([{badrpc, _} | RemoteLoaders], Tab, Nodes, Acc) ->
    late_load_filter(RemoteLoaders, Tab, Nodes, Acc);
late_load_filter([RL | RemoteLoaders], Tab, Nodes, Acc) ->
    {ok, Node, Intents} = RL,
    Access = val({Tab, access_mode}),
    LocalC = val({Tab, local_content}),
    StillActive = lists:member(Node, Nodes),
    RemoteIntent = lists:member(Tab, Intents),
    if
        Access == read_write,
        LocalC == false,
        StillActive == true,
        RemoteIntent == true ->
            Masters = mnesia_recover:get_master_nodes(Tab),
            case lists:member(Node, Masters) of
                true ->
                    %% The other node is a master node for
                    %% the table, accept its load intent
                    late_load_filter(RemoteLoaders, Tab, Nodes, [Node | Acc]);
                false when Masters == [] ->
                    %% The table has no master nodes,
                    %% accept the load intent
                    late_load_filter(RemoteLoaders, Tab, Nodes, [Node | Acc]);
                false ->
                    %% Someone else is master node for
                    %% the table, ignore the load intent
                    late_load_filter(RemoteLoaders, Tab, Nodes, Acc)
            end;
        true ->
            late_load_filter(RemoteLoaders, Tab, Nodes, Acc)
    end;
late_load_filter([], _Tab, _Nodes, Acc) ->
    Acc.
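
%% Illustration (hypothetical input): with Nodes = [n2] and
%% RemoteLoaders = [{ok, n2, [t]}, {badrpc, nodedown}], a read_write,
%% non-local_content table t without master nodes gives [n2]; the
%% badrpc entry is skipped and n2's load intent is accepted.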

%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------

handle_cast({release_schema_commit_lock, _Owner}, State) ->
    if
        State#state.is_stopping == true ->
            {stop, shutdown, State};
        true ->
            case State#state.dumper_queue of
                [#schema_commit_lock{}|Rest] ->
                    [_Worker | Rest] = State#state.dumper_queue,
                    State2 = State#state{dumper_pid = undefined,
                                         dumper_queue = Rest},
                    State3 = opt_start_worker(State2),
                    noreply(State3);
                _ ->
                    noreply(State)
            end
    end;

handle_cast(unblock_controller, State) ->
    if
        State#state.is_stopping == true ->
            {stop, shutdown, State};
        is_record(hd(State#state.dumper_queue), block_controller) ->
            [_Worker | Rest] = State#state.dumper_queue,
            State2 = State#state{dumper_pid = undefined,
                                 dumper_queue = Rest},
            State3 = opt_start_worker(State2),
            noreply(State3)
    end;

handle_cast({mnesia_down, Node}, State) ->
    maybe_log_mnesia_down(Node),
    mnesia_lib:del({current, db_nodes}, Node),
    mnesia_lib:unset({node_up, Node}),
    mnesia_checkpoint:tm_mnesia_down(Node),
    Alltabs = val({schema, tables}),
    reconfigure_tables(Node, Alltabs),
    %% Done (from an external point of view)
    mnesia_monitor:mnesia_down(?SERVER_NAME, Node),

    %% Fix if we are late_merging against the node that went down
    case State#state.schema_is_merged of
        {false, Node} ->
            spawn(?MODULE, call, [{schema_is_merged, [], late_merge, []}]);
        _ ->
            ignore
    end,

    %% Fix internal stuff
    LateQ = remove_loaders(Alltabs, Node, State#state.late_loader_queue),

    case get_senders(State) ++ get_loaders(State) of
        [] -> ignore;
        Senders ->
            lists:foreach(fun({Pid,_}) -> Pid ! {copier_done, Node} end,
                          Senders)
    end,
    lists:foreach(fun(Pid) -> Pid ! {copier_done,Node} end,
                  State#state.others),

    Remove = fun(ST) ->
                     node(ST#send_table.receiver_pid) /= Node
             end,
    NewSenders = lists:filter(Remove, State#state.sender_queue),
    Early = remove_early_messages(State#state.early_msgs, Node),
    noreply(State#state{sender_queue = NewSenders,
                        early_msgs = Early,
                        late_loader_queue = LateQ
                       });

handle_cast({merging_schema, Node}, State) ->
    case State#state.schema_is_merged of
        false ->
            %% This comes from a dynamic connect_nodes which is made
            %% after mnesia:start() and the schema merge.
            ImANewKidInTheBlock =
                (val({schema, storage_type}) == ram_copies)
                andalso (mnesia_lib:val({schema, local_tables}) == [schema]),
            case ImANewKidInTheBlock of
                true ->  %% I'm a newly started ram node
                    noreply(State#state{schema_is_merged = {false, Node}});
                false ->
                    noreply(State)
            end;
        _ -> %% Already merging schema.
            noreply(State)
    end;

handle_cast(Msg, State) when State#state.schema_is_merged /= true ->
    %% Buffer early messages
    Msgs = State#state.early_msgs,
    noreply(State#state{early_msgs = [{cast, Msg} | Msgs]});

%% This must be done after schema_is_merged, otherwise adopt_orphan
%% might trigger a table load from the wrong nodes, since we would
%% not yet know which tables we can safely load first.
handle_cast({im_running, Node, NewFriends}, State) ->
    LocalTabs = mnesia_lib:local_active_tables() -- [schema],
    RemoveLocalOnly = fun(Tab) -> not val({Tab, local_content}) end,
    Tabs = lists:filter(RemoveLocalOnly, LocalTabs),
    Nodes = mnesia_lib:union([Node],val({current, db_nodes})),
    Ns = mnesia_lib:intersect(NewFriends, Nodes),
    abcast(Ns, {adopt_orphans, node(), Tabs}),
    noreply(State);

handle_cast({disc_load, Tab, Reason}, State) ->
    Worker = #disc_load{table = Tab, reason = Reason},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_cast(Worker = #send_table{}, State) ->
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_cast({sync_tabs, Tabs, From}, State) ->
    %% user initiated wait_for_tables
    handle_sync_tabs(Tabs, From),
    noreply(State);

handle_cast({i_have_tab, Tab, Node}, State) ->
    case lists:member(Node, val({current, db_nodes})) of
        true ->
            State2 = node_has_tabs([Tab], Node, State),
            noreply(State2);
        false ->
            noreply(State)
    end;

handle_cast({force_load_updated, Tab}, State) ->
    case val({Tab, active_replicas}) of
        [] ->
            %% No valid replicas
            noreply(State);
        [SomeNode | _] ->
            State2 = node_has_tabs([Tab], SomeNode, State),
            noreply(State2)
    end;

handle_cast({master_nodes_updated, Tab, Masters}, State) ->
    Active = val({Tab, active_replicas}),
    Valid =
        case val({Tab, load_by_force}) of
            true ->
                Active;
            false ->
                if
                    Masters == [] ->
                        Active;
                    true ->
                        mnesia_lib:intersect(Masters, Active)
                end
        end,
    case Valid of
        [] ->
            %% No valid replicas
            noreply(State);
        [SomeNode | _] ->
            State2 = node_has_tabs([Tab], SomeNode, State),
            noreply(State2)
    end;

handle_cast({adopt_orphans, Node, Tabs}, State) ->
    State2 = node_has_tabs(Tabs, Node, State),

    case ?catch_val({node_up,Node}) of
        true -> ignore;
        _ ->
            %% Register the other node as up and running
            set({node_up, Node}, true),
            mnesia_recover:log_mnesia_up(Node),
            verbose("Logging mnesia_up ~w~n",[Node]),
            mnesia_lib:report_system_event({mnesia_up, Node}),
            %% Load orphan tables
            LocalTabs = val({schema, local_tables}) -- [schema],
            Nodes = val({current, db_nodes}),
            {LocalOrphans, RemoteMasters} =
                orphan_tables(LocalTabs, Node, Nodes, [], []),
            Reason = {adopt_orphan, node()},
            mnesia_late_loader:async_late_disc_load(node(), LocalOrphans, Reason),

            Fun =
                fun(N) ->
                        RemoteOrphans =
                            [Tab || {Tab, Ns} <- RemoteMasters,
                                    lists:member(N, Ns)],
                        mnesia_late_loader:maybe_async_late_disc_load(N, RemoteOrphans, Reason)
                end,
            lists:foreach(Fun, Nodes)
    end,
    noreply(State2);

handle_cast(Msg, State) ->
    error("~p got unexpected cast: ~tp~n", [?SERVER_NAME, Msg]),
    noreply(State).

handle_sync_tabs([Tab | Tabs], From) ->
    case val({Tab, where_to_read}) of
        nowhere ->
            case get({sync_tab, Tab}) of
                undefined ->
                    put({sync_tab, Tab}, [From]);
                Pids ->
                    put({sync_tab, Tab}, [From | Pids])
            end;
        _ ->
            sync_reply(From, Tab)
    end,
    handle_sync_tabs(Tabs, From);
handle_sync_tabs([], _From) ->
    ok.

%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------

handle_info({next_async_dump_log, InitBy}, State) ->
    async_dump_log(InitBy),
    Ref = next_async_dump_log(),
    noreply(State#state{dump_log_timer_ref=Ref});

handle_info({async_dump_log, InitBy}, State) ->
    Worker = #dump_log{initiated_by = InitBy},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_info(#dumper_done{worker_pid=Pid, worker_res=Res}, State) ->
    if
        State#state.is_stopping == true ->
            {stop, shutdown, State};
        Res == dumped, Pid == State#state.dumper_pid ->
            [Worker | Rest] = State#state.dumper_queue,
            reply(Worker#dump_log.opt_reply_to, Res),
            State2 = State#state{dumper_pid = undefined,
                                 dumper_queue = Rest},
            State3 = opt_start_worker(State2),
            noreply(State3);
        true ->
            fatal("Dumper failed: ~p~n state: ~tp~n", [Res, State]),
            {stop, fatal, State}
    end;

handle_info(Done = #loader_done{worker_pid=WPid, table_name=Tab}, State0) ->
    LateQueue0 = State0#state.late_loader_queue,
    State1 = State0#state{loader_pid = lists:keydelete(WPid,1,get_loaders(State0))},

    State2 =
        case Done#loader_done.is_loaded of
            true ->
                %% Optional table announcement
                if
                    Done#loader_done.needs_announce == true,
                    Done#loader_done.needs_reply == true ->
                        i_have_tab(Tab),
                        %% Should be {dumper,{add_table_copy, _}} only
                        reply(Done#loader_done.reply_to,
                              Done#loader_done.reply);
                    Done#loader_done.needs_reply == true ->
                        %% Should be {dumper,{add_table_copy,_}} only
                        reply(Done#loader_done.reply_to,
                              Done#loader_done.reply);
                    Done#loader_done.needs_announce == true, Tab == schema ->
                        i_have_tab(Tab);
                    Done#loader_done.needs_announce == true ->
                        i_have_tab(Tab),
                        %% Local node needs to perform user_sync_tab/1
                        Ns = val({current, db_nodes}),
                        abcast(Ns, {i_have_tab, Tab, node()});
                    Tab == schema ->
                        ignore;
                    true ->
                        %% Local node needs to perform user_sync_tab/1
                        Ns = val({current, db_nodes}),
                        AlreadyKnows = val({Tab, active_replicas}),
                        abcast(Ns -- AlreadyKnows, {i_have_tab, Tab, node()})
                end,
                %% Optional user sync
                case Done#loader_done.needs_sync of
                    true -> user_sync_tab(Tab);
                    false -> ignore
                end,
                State1#state{late_loader_queue=gb_trees:delete_any(Tab, LateQueue0)};
            false ->
                %% Either the node went down or the table was not
                %% loaded remotely yet
                case Done#loader_done.needs_reply of
                    true ->
                        reply(Done#loader_done.reply_to,
                              Done#loader_done.reply);
                    false ->
                        ignore
                end,
                case ?catch_val({Tab, active_replicas}) of
                    [_|_] -> % still available elsewhere
                        {value,{_,Worker}} = lists:keysearch(WPid,1,get_loaders(State0)),
                        add_loader(Tab,Worker,State1);
                    _ ->
                        DelState = State1#state{late_loader_queue=gb_trees:delete_any(Tab, LateQueue0)},
                        case ?catch_val({Tab, storage_type}) of
                            ram_copies ->
                                cast({disc_load, Tab, ram_only}),
                                DelState;
                            _ ->
                                DelState
                        end
                end
        end,
    State3 = opt_start_worker(State2),
    noreply(State3);

handle_info(#sender_done{worker_pid=Pid, worker_res=Res}, State)  ->
    Senders = get_senders(State),
    {value, {Pid,_Worker}} = lists:keysearch(Pid, 1, Senders),
    if
        Res == ok ->
            State2 = State#state{sender_pid = lists:keydelete(Pid, 1, Senders)},
            State3 = opt_start_worker(State2),
            noreply(State3);
        true ->
            %% No need to send any message to the table receiver
            %% since it will soon get a mnesia_down anyway
            fatal("Sender failed: ~p~n state: ~tp~n", [Res, State]),
            {stop, fatal, State}
    end;

handle_info({'EXIT', Pid, R}, State) when Pid == State#state.supervisor ->
    ?SAFE(set(mnesia_status, stopping)),
    case State#state.dumper_pid of
        undefined ->
            dbg_out("~p was ~tp~n", [?SERVER_NAME, R]),
            {stop, shutdown, State};
        _ ->
            noreply(State#state{is_stopping = true})
    end;

handle_info({'EXIT', Pid, R}, State) when Pid == State#state.dumper_pid ->
    case State#state.dumper_queue of
        [#schema_commit_lock{}|Workers] -> %% Schema trans crashed or was killed
            dbg_out("WARNING: Dumper ~p exited ~tp~n", [Pid, R]),
            State2 = State#state{dumper_queue = Workers, dumper_pid = undefined},
            State3 = opt_start_worker(State2),
            noreply(State3);
        _Other ->
            fatal("Dumper or schema commit crashed: ~p~n state: ~tp~n", [R, State]),
            {stop, fatal, State}
    end;

handle_info(Msg = {'EXIT', Pid, R}, State) when R /= wait_for_tables_timeout ->
    case lists:keymember(Pid, 1, get_senders(State)) of
        true ->
            %% No need to send any message to the table receiver
            %% since it will soon get a mnesia_down anyway
            fatal("Sender crashed: ~p~n state: ~tp~n", [{Pid,R}, State]),
            {stop, fatal, State};
        false ->
            case lists:keymember(Pid, 1, get_loaders(State)) of
                true ->
                    fatal("Loader crashed: ~p~n state: ~tp~n", [R, State]),
                    {stop, fatal, State};
                false ->
                    error("~p got unexpected info: ~tp~n", [?SERVER_NAME, Msg]),
                    noreply(State)
            end
    end;

handle_info({From, get_state}, State) ->
    From ! {?SERVER_NAME, State},
    noreply(State);

%% No real need for buffering
handle_info(Msg, State) when State#state.schema_is_merged /= true ->
    %% Buffer early messages
    Msgs = State#state.early_msgs,
    noreply(State#state{early_msgs = [{info, Msg} | Msgs]});

handle_info({'EXIT', Pid, wait_for_tables_timeout}, State) ->
    sync_tab_timeout(Pid, get()),
    noreply(State);

handle_info(Msg, State) ->
    error("~p got unexpected info: ~tp~n", [?SERVER_NAME, Msg]),
    noreply(State).

sync_tab_timeout(Pid, [{{sync_tab, Tab}, Pids} | Tail]) ->
    case lists:delete(Pid, Pids) of
        [] ->
            erase({sync_tab, Tab});
        Pids2 ->
            put({sync_tab, Tab}, Pids2)
    end,
    sync_tab_timeout(Pid, Tail);
sync_tab_timeout(Pid, [_ | Tail]) ->
    sync_tab_timeout(Pid, Tail);
sync_tab_timeout(_Pid, []) ->
    ok.

%% Pick the load record that has the highest load order.
%% Returns {BestLoad, RemainingQueue} or {none, EmptyQueue} if the
%% queue is empty.
pick_next(Queue) ->
    List = gb_trees:values(Queue),
    case pick_next(List, none, none) of
        none -> {none, gb_trees:empty()};
        {Tab, Worker} -> {Worker, gb_trees:delete(Tab,Queue)}
    end.

pick_next([Head = #net_load{table=Tab}| Tail], Load, Order) ->
    select_best(Head, Tail, ?catch_val({Tab, load_order}), Load, Order);
pick_next([Head = #disc_load{table=Tab}| Tail], Load, Order) ->
    select_best(Head, Tail, ?catch_val({Tab, load_order}), Load, Order);
pick_next([], none, _Order) ->
    none;
pick_next([], Load, _Order) ->
    {element(2,Load), Load}.

select_best(_Head, Tail, {'EXIT', _WHAT}, Load, Order) ->
    %% The table has been deleted, drop it.
    pick_next(Tail, Load, Order);
select_best(Load, Tail, Order, none, none) ->
    pick_next(Tail, Load, Order);
select_best(Load, Tail, Order, _OldLoad, OldOrder) when Order > OldOrder ->
    pick_next(Tail, Load, Order);
select_best(_Load, Tail, _Order, OldLoad, OldOrder) ->
    pick_next(Tail, OldLoad, OldOrder).
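
%% The order used above is the documented load_order table property
%% (default 0, higher loads earlier), e.g. (hypothetical table):
%%
%%   mnesia:create_table(config, [{disc_copies, [node()]},
%%                                {load_order, 10}])
%%
%% makes config win over default-order tables in pick_next/1 and thus
%% be loaded earlier at startup.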

%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(Reason, State) ->
    mnesia_monitor:terminate_proc(?SERVER_NAME, Reason, State).

%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Upgrade process when its code is to be changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
code_change(_OldVsn, State0, _Extra) ->
    %% Loader Queue
    State1 = case State0#state.loader_pid of
                 Pids when is_list(Pids) -> State0;
                 undefined -> State0#state{loader_pid = [],loader_queue=gb_trees:empty()};
                 Pid when is_pid(Pid) ->
                     [Loader|Rest] = State0#state.loader_queue,
                     LQ0 = [{element(2,Rec),Rec} || Rec <- Rest],
                     LQ1 = lists:sort(LQ0),
                     LQ  = gb_trees:from_orddict(LQ1),
                     State0#state{loader_pid=[{Pid,Loader}], loader_queue=LQ}
             end,
    %% LateLoaderQueue
    State = if is_list(State1#state.late_loader_queue) ->
                    LLQ0 = State1#state.late_loader_queue,
                    LLQ1 = lists:sort([{element(2,Rec),Rec} || Rec <- LLQ0]),
                    LLQ  = gb_trees:from_orddict(LLQ1),
                    State1#state{late_loader_queue=LLQ};
               true ->
                    State1
            end,
    {ok, State}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

maybe_log_mnesia_down(N) ->
    %% We use mnesia_down when deciding which tables to load locally,
    %% so if we are not running (i.e. haven't decided which tables
    %% to load locally), don't log mnesia_down yet.
    case mnesia_lib:is_running() of
        yes ->
            verbose("Logging mnesia_down ~w~n", [N]),
            mnesia_recover:log_mnesia_down(N),
            ok;
        _ ->
            Filter = fun(Tab) ->
                             inactive_copy_holders(Tab, N)
                     end,
            HalfLoadedTabs = lists:any(Filter, val({schema, local_tables}) -- [schema]),
            if
                HalfLoadedTabs == true ->
                    verbose("Logging mnesia_down ~w~n", [N]),
                    mnesia_recover:log_mnesia_down(N),
                    ok;
                true ->
                    %% Unfortunately we have not loaded some common
                    %% tables yet, so we cannot rely on the nodedown
                    log_later   %% BUGBUG handle this case!!!
            end
    end.

inactive_copy_holders(Tab, Node) ->
    Cs = val({Tab, cstruct}),
    case mnesia_lib:cs_to_storage_type(Node, Cs) of
        unknown ->
            false;
        _Storage ->
            mnesia_lib:not_active_here(Tab)
    end.

orphan_tables([Tab | Tabs], Node, Ns, Local, Remote) ->
    Cs = val({Tab, cstruct}),
    CopyHolders = mnesia_lib:copy_holders(Cs),
    RamCopyHolders = Cs#cstruct.ram_copies,
    DiscCopyHolders = CopyHolders -- RamCopyHolders,
    DiscNodes = val({schema, disc_copies}),
    LocalContent = Cs#cstruct.local_content,
    RamCopyHoldersOnDiscNodes = mnesia_lib:intersect(RamCopyHolders, DiscNodes),
    Active = val({Tab, active_replicas}),
    BeingCreated = (?catch_val({Tab, create_table}) == true),
    Read = val({Tab, where_to_read}),
    case lists:member(Node, DiscCopyHolders) of
        _ when BeingCreated == true ->
            orphan_tables(Tabs, Node, Ns, Local, Remote);
        _ when Read == node() -> %% Already loaded
            orphan_tables(Tabs, Node, Ns, Local, Remote);
        true when Active == [] ->
            case DiscCopyHolders -- Ns of
                [] ->
                    %% We're last up and the other nodes have not
                    %% loaded the table. Let's load it if we are
                    %% the smallest node.
                    case lists:min(DiscCopyHolders) of
                        Min when Min == node() ->
                            case mnesia_recover:get_master_nodes(Tab) of
                                [] ->
                                    L = [Tab | Local],
                                    orphan_tables(Tabs, Node, Ns, L, Remote);
                                Masters ->
                                    %% Do not disc_load table from RamCopyHolders
                                    R = [{Tab, Masters -- RamCopyHolders} | Remote],
                                    orphan_tables(Tabs, Node, Ns, Local, R)
                            end;
                        _ ->
                            orphan_tables(Tabs, Node, Ns, Local, Remote)
                    end;
                _ ->
                    orphan_tables(Tabs, Node, Ns, Local, Remote)
            end;
        false when Active == [], DiscCopyHolders == [], RamCopyHoldersOnDiscNodes == [] ->
            %% Special case when all replicas reside on discless nodes
            orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote);
        _ when LocalContent == true ->
            orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote);
        _ ->
            orphan_tables(Tabs, Node, Ns, Local, Remote)
    end;
orphan_tables([], _, _, LocalOrphans, RemoteMasters) ->
    {LocalOrphans, RemoteMasters}.

node_has_tabs([Tab | Tabs], Node, State) when Node /= node() ->
    State2 =
        try update_whereabouts(Tab, Node, State) of
            State1 = #state{} -> State1
        catch exit:R -> %% Tab was just deleted?
                case ?catch_val({Tab, cstruct}) of
                    {'EXIT', _} -> State; % yes
                    _ ->  erlang:error(R)
                end
        end,
    node_has_tabs(Tabs, Node, State2);
node_has_tabs([Tab | Tabs], Node, State) ->
    user_sync_tab(Tab),
    node_has_tabs(Tabs, Node, State);
node_has_tabs([], _Node, State) ->
    State.

update_whereabouts(Tab, Node, State) ->
    Storage = val({Tab, storage_type}),
    Read = val({Tab, where_to_read}),
    LocalC = val({Tab, local_content}),
    BeingCreated = (?catch_val({Tab, create_table}) == true),
    Masters = mnesia_recover:get_master_nodes(Tab),
    ByForce = val({Tab, load_by_force}),
    GoGetIt =
        if
            ByForce == true ->
                true;
            Masters == [] ->
                true;
            true ->
                lists:member(Node, Masters)
        end,

    dbg_out("Table ~w is loaded on ~w. s=~w, r=~w, lc=~w, f=~w, m=~w~n",
            [Tab, Node, Storage, Read, LocalC, ByForce, GoGetIt]),
    if
        LocalC == true ->
            %% Local content; we don't care about the other node
            State;
        BeingCreated == true ->
            %% The table is currently being created
            %% It will be handled elsewhere
            State;
        Storage == unknown, Read == nowhere ->
            %% No own copy, time to read remotely
            %% if the other node is a good node
            add_active_replica(Tab, Node),
            case GoGetIt of
                true ->
                    set({Tab, where_to_read}, Node),
                    user_sync_tab(Tab),
                    State;
                false ->
                    State
            end;
        Storage == unknown ->
            %% No own copy, continue to read remotely
            add_active_replica(Tab, Node),
            NodeST = mnesia_lib:semantics(mnesia_lib:storage_type_at_node(Node, Tab), storage),
            ReadST = mnesia_lib:semantics(mnesia_lib:storage_type_at_node(Read, Tab), storage),
            if   %% Avoid reading from disc_only_copies
                NodeST == disc_only_copies ->
                    ignore;
                ReadST == disc_only_copies ->
                    mnesia_lib:set_remote_where_to_read(Tab);
                true ->
                    ignore
            end,
            user_sync_tab(Tab),
            State;
        Read == nowhere ->
            %% Own copy, go and get a copy of the table
            %% if the other node is a master or if there
            %% is no master at all
            add_active_replica(Tab, Node),
            case GoGetIt of
                true ->
                    Worker = #net_load{table = Tab,
                                       reason = {active_remote, Node}},
                    add_worker(Worker, State);
                false ->
                    State
            end;
        true ->
            %% We already have our own copy
            add_active_replica(Tab, Node),
            user_sync_tab(Tab),
            State
    end.

initial_safe_loads() ->
    case val({schema, storage_type}) of
        ram_copies ->
            Downs = [],
            Tabs = val({schema, local_tables}) -- [schema],
            LastC = fun(T) -> last_consistent_replica(T, Downs) end,
            lists:zf(LastC, Tabs);

        disc_copies ->
            Downs = mnesia_recover:get_mnesia_downs(),
            dbg_out("mnesia_downs = ~p~n", [Downs]),

            Tabs = val({schema, local_tables}) -- [schema],
            LastC = fun(T) -> last_consistent_replica(T, Downs) end,
            lists:zf(LastC, Tabs)
    end.

last_consistent_replica(Tab, Downs) ->
    Cs = val({Tab, cstruct}),
    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
    Ram = Cs#cstruct.ram_copies,
    Disc = Cs#cstruct.disc_copies,
    DiscOnly = Cs#cstruct.disc_only_copies,
    Ext = Cs#cstruct.external_copies,
    BetterCopies0 = mnesia_lib:remote_copy_holders(Cs) -- Downs,
    BetterCopies = BetterCopies0 -- Ram,
    AccessMode = Cs#cstruct.access_mode,
    Copies = mnesia_lib:copy_holders(Cs),
    Masters = mnesia_recover:get_master_nodes(Tab),
    LocalMaster0 = lists:member(node(), Masters),
    LocalContent = Cs#cstruct.local_content,
    RemoteMaster =
        if
            Masters == [] -> false;
            true -> not LocalMaster0
        end,
    LocalMaster =
        if
            Masters == [] -> false;
            true -> LocalMaster0
        end,
    if
        Copies == [node()]  ->
            %% Only one copy holder and it is local.
            %% It may also be a local content table
            {true, {Tab, local_only}};
        LocalContent == true ->
            {true, {Tab, local_content}};
        LocalMaster == true ->
            %% We have a local master
            {true, {Tab, local_master}};
        RemoteMaster == true ->
            %% Wait for remote master copy
            false;
        Storage == ram_copies ->
            if
                Disc == [], DiscOnly == [], Ext == [] ->
                    %% Nobody has a copy on disc
                    {true, {Tab, ram_only}};
                true ->
                    %% Some other node has a copy on disc
                    false
            end;
        AccessMode == read_only ->
            %% No one has been able to update the table,
            %% i.e. all disc resident copies are equal
            {true, {Tab, read_only}};
        BetterCopies /= [], Masters /= [node()] ->
            %% There are better copies on other nodes
            %% and we do not have the only master copy
            false;
        true ->
            {true, {Tab, initial}}
    end.

reconfigure_tables(N, [Tab |Tail]) ->
    del_active_replica(Tab, N),
    case val({Tab, where_to_read}) of
        N ->  mnesia_lib:set_remote_where_to_read(Tab);
        _ ->  ignore
    end,
    reconfigure_tables(N, Tail);
reconfigure_tables(_, []) ->
    ok.

remove_loaders([Tab| Tabs], N, Loaders) ->
    LateQ = drop_loaders(Tab, N, Loaders),
    remove_loaders(Tabs, N, LateQ);
remove_loaders([],_, LateQ) -> LateQ.
1645
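%% Throw away queued early messages that originate from a node that has
%% gone down, replying to blocked callers where needed so that they do
%% not hang.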
remove_early_messages([], _Node) ->
    [];
remove_early_messages([{call, {add_active_replica, [_, Node, _, _], _}, _}|R], Node) ->
    remove_early_messages(R, Node); %% The reply is sent before the call is queued
remove_early_messages([{call, {block_table, _, From}, ReplyTo}|R], Node)
  when node(From) == Node ->
    reply(ReplyTo, ok),  %% Reply so that no gen_server caller is left waiting
    remove_early_messages(R, Node);
remove_early_messages([{cast, {i_have_tab, _Tab, Node}}|R], Node) ->
    remove_early_messages(R, Node);
remove_early_messages([{cast, {adopt_orphans, Node, _Tabs}}|R], Node) ->
    remove_early_messages(R, Node);
remove_early_messages([M|R], Node) ->
    [M | remove_early_messages(R, Node)].

%% Drop a loader from the late load queue and possibly trigger a disc_load
drop_loaders(Tab, Node, LLQ) ->
    case gb_trees:lookup(Tab, LLQ) of
	none ->
	    LLQ;
	{value, H} ->
	    %% Check if it is time to issue a disc_load request
	    case H#late_load.loaders of
		[Node] ->
		    Reason = {H#late_load.reason, last_loader_down, Node},
		    cast({disc_load, Tab, Reason});  % Ugly cast
		_ ->
		    ignore
	    end,
	    %% Drop the node from the list of loaders
	    H2 = H#late_load{loaders = H#late_load.loaders -- [Node]},
	    gb_trees:update(Tab, H2, LLQ)
    end.

add_active_replica(Tab, Node) ->
    add_active_replica(Tab, Node, val({Tab, cstruct})).

add_active_replica(Tab, Node, Cs = #cstruct{}) ->
    Storage = mnesia_lib:schema_cs_to_storage_type(Node, Cs),
    AccessMode = Cs#cstruct.access_mode,
    add_active_replica(Tab, Node, Storage, AccessMode).

%% Block table primitives

block_table(Tab) ->
    Var = {Tab, where_to_commit},
    case is_tab_blocked(val(Var)) of
        {true, _} -> ok;
        {false, W2C} -> set(Var, mark_blocked_tab(true, W2C))
    end.

unblock_table(Tab) ->
    call({unblock_table, Tab}).

is_tab_blocked(W2C) when is_list(W2C) ->
    {false, W2C};
is_tab_blocked({blocked, W2C}) when is_list(W2C) ->
    {true, W2C}.

mark_blocked_tab(true, Value) ->
    {blocked, Value};
mark_blocked_tab(false, Value) ->
    Value.

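%% Illustration (hypothetical node names): an unblocked where_to_commit
%% value is a plain list of {Node, Storage} pairs, while a blocked one
%% is wrapped in a tagged tuple:
%%
%%   is_tab_blocked([{n1@host, ram_copies}])            -> {false, [{n1@host, ram_copies}]}
%%   is_tab_blocked({blocked, [{n1@host, ram_copies}]}) -> {true, [{n1@host, ram_copies}]}
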
%%

add_active_replica(Tab, Node, Storage, AccessMode) ->
    Var = {Tab, where_to_commit},
    {Blocked, Old} = is_tab_blocked(val(Var)),
    Del = lists:keydelete(Node, 1, Old),
    case AccessMode of
	read_write ->
	    New = lists:sort([{Node, Storage} | Del]),
	    set(Var, mark_blocked_tab(Blocked, New)), % where_to_commit
	    mnesia_lib:add_lsort({Tab, where_to_write}, Node);
	read_only ->
	    set(Var, mark_blocked_tab(Blocked, Del)),
	    mnesia_lib:del({Tab, where_to_write}, Node)
    end,

    update_where_to_wlock(Tab),
    add({Tab, active_replicas}, Node).

del_active_replica(Tab, Node) ->
    Var = {Tab, where_to_commit},
    {Blocked, Old} = is_tab_blocked(val(Var)),
    Del = lists:keydelete(Node, 1, Old),
    New = lists:sort(Del),
    set(Var, mark_blocked_tab(Blocked, New)),      % where_to_commit
    mnesia_lib:del({Tab, active_replicas}, Node),
    mnesia_lib:del({Tab, where_to_write}, Node),
    update_where_to_wlock(Tab).

change_table_access_mode(Cs) ->
    W = fun() ->
		Tab = Cs#cstruct.name,
		lists:foreach(fun(N) -> add_active_replica(Tab, N, Cs) end,
			      val({Tab, active_replicas}))
	end,
    update(W).

change_table_majority(Cs) ->
    W = fun() ->
		Tab = Cs#cstruct.name,
		set({Tab, majority}, Cs#cstruct.majority),
		update_where_to_wlock(Tab)
	end,
    update(W).

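%% Refresh the cached {WriteNodes, Majority} pair that is stored
%% under {Tab, where_to_wlock} and consulted when write locks are
%% acquired.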
update_where_to_wlock(Tab) ->
    WNodes = val({Tab, where_to_write}),
    Majority = ?catch_val({Tab, majority}) == true,
    set({Tab, where_to_wlock}, {WNodes, Majority}).

%% Node To has now loaded Tab, but the load must be undone.
%% This code is rpc:call'ed from the tab_copier process
%% when it has *not* released its table lock.
unannounce_add_table_copy(Tab, To) ->
    ?SAFE(del_active_replica(Tab, To)),
    try To = val({Tab, where_to_read}),
	 mnesia_lib:set_remote_where_to_read(Tab)
    catch _:_ -> ignore
    end.

user_sync_tab(Tab) ->
    case val(debug) of
	trace ->
	    mnesia_subscr:subscribe(whereis(mnesia_event), {table, Tab});
	_ ->
	    ignore
    end,

    case erase({sync_tab, Tab}) of
	undefined ->
	    ok;
	Pids ->
	    lists:foreach(fun(Pid) -> sync_reply(Pid, Tab) end, Pids)
    end.

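%% Announce that the local node holds a loaded copy of Tab:
%% update the read whereabouts and add ourselves as an active replica.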
i_have_tab(Tab) ->
    case val({Tab, local_content}) of
	true ->
	    mnesia_lib:set_local_content_whereabouts(Tab);
	false ->
	    set({Tab, where_to_read}, node())
    end,
    add_active_replica(Tab, node()).

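%% Make a new replica of Tab on ToNode known on all current db_nodes.
%% The table is blocked on ToNode before the replica is announced.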
sync_and_block_table_whereabouts(Tab, ToNode, RemoteS, AccessMode) when Tab /= schema ->
    Current = val({current, db_nodes}),
    Ns =
	case lists:member(ToNode, Current) of
	    true -> Current -- [ToNode];
	    false -> Current
	end,
    _ = remote_call(ToNode, block_table, [Tab]),
    [remote_call(Node, add_active_replica, [Tab, ToNode, RemoteS, AccessMode]) ||
	Node <- [ToNode | Ns]],
    ok.

sync_del_table_copy_whereabouts(Tab, ToNode) when Tab /= schema ->
    Current = val({current, db_nodes}),
    Ns =
	case lists:member(ToNode, Current) of
	    true -> Current;
	    false -> [ToNode | Current]
	end,
    Args = [Tab, ToNode],
    [remote_call(Node, unannounce_add_table_copy, Args) || Node <- Ns],
    ok.

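%% Debug helpers that fetch a snapshot of the controller state.
%% A minimal usage sketch (the 5000 ms timeout is only an example):
%%
%%   case mnesia_controller:get_info(5000) of
%%       {info, State}   -> State;
%%       {timeout, 5000} -> no_reply_from_controller
%%   end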
get_info(Timeout) ->
    case whereis(?SERVER_NAME) of
	undefined ->
	    {timeout, Timeout};
	Pid ->
	    Pid ! {self(), get_state},
	    receive
		{?SERVER_NAME, State = #state{loader_queue=LQ,late_loader_queue=LLQ}} ->
		    {info, State#state{loader_queue=gb_trees:to_list(LQ),
				       late_loader_queue=gb_trees:to_list(LLQ)}}
	    after Timeout ->
		    {timeout, Timeout}
	    end
    end.

get_workers(Timeout) ->
    case whereis(?SERVER_NAME) of
	undefined ->
	    {timeout, Timeout};
	Pid ->
	    Pid ! {self(), get_state},
	    receive
		{?SERVER_NAME, State = #state{}} ->
		    {workers, get_loaders(State), get_senders(State), State#state.dumper_pid}
	    after Timeout ->
		    {timeout, Timeout}
	    end
    end.

info() ->
    Tabs = mnesia_lib:local_active_tables(),
    io:format("---> Active tables <--- ~n", []),
    info(Tabs).

info([Tab | Tail]) ->
    case val({Tab, storage_type}) of
	disc_only_copies ->
	    info_format(Tab,
			dets:info(Tab, size),
			dets:info(Tab, file_size),
			"bytes on disc");
        {ext, Alias, Mod} ->
            info_format(Tab,
                        Mod:info(Alias, Tab, size),
                        Mod:info(Alias, Tab, memory),
                        "words of mem");
	_ ->
	    info_format(Tab,
			?ets_info(Tab, size),
			?ets_info(Tab, memory),
			"words of mem")
    end,
    info(Tail);
info([]) -> ok.

info_format(Tab, Size, Mem, Media) ->
    StrT = mnesia_lib:pad_name(atom_to_list(Tab), 15, []),
    StrS = mnesia_lib:pad_name(integer_to_list(Size), 8, []),
    StrM = mnesia_lib:pad_name(integer_to_list(Mem), 8, []),
    io:format("~ts: with ~s records occupying ~s ~s~n",
	      [StrT, StrS, StrM, Media]).

%% Handle early-arrived messages
handle_early_msgs([Msg | Msgs], State) ->
    %% The messages are in reverse order
    case handle_early_msg(Msg, State) of
%%      {stop, Reason, Reply, State2} ->  % Will not happen according to dialyzer
%%          {stop, Reason, Reply, State2};
	{stop, Reason, State2} ->
	    {stop, Reason, State2};
	{noreply, State2} ->
	    handle_early_msgs(Msgs, State2);
	{reply, Reply, State2} ->
	    {call, _Call, From} = Msg,
	    reply(From, Reply),
	    handle_early_msgs(Msgs, State2)
    end;
handle_early_msgs([], State) ->
    noreply(State).

handle_early_msg({call, Msg, From}, State) ->
    handle_call(Msg, From, State);
handle_early_msg({cast, Msg}, State) ->
    handle_cast(Msg, State);
handle_early_msg({info, Msg}, State) ->
    handle_info(Msg, State).

noreply(State) ->
    {noreply, State}.

reply(undefined, Reply) ->
    Reply;
reply(ReplyTo, Reply) ->
    gen_server:reply(ReplyTo, Reply),
    Reply.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Worker management

%% Returns a new State
add_worker(Worker = #dump_log{}, State) ->
    InitBy = Worker#dump_log.initiated_by,
    Queue = State#state.dumper_queue,
    Status =
        case lists:keymember(InitBy, #dump_log.initiated_by, Queue) of
            true when Worker#dump_log.opt_reply_to == undefined ->
                %% The same threshold has been exceeded again,
                %% before we have had the possibility to
                %% process the older request.
                DetectedBy = {dump_log, InitBy},
                Event = {mnesia_overload, DetectedBy},
                mnesia_lib:report_system_event(Event),
                true;
            _ ->
                false
        end,
    mnesia_recover:log_dump_overload(Status),
    Queue2 = Queue ++ [Worker],
    State2 = State#state{dumper_queue = Queue2},
    opt_start_worker(State2);
add_worker(Worker = #schema_commit_lock{}, State) ->
    Queue = State#state.dumper_queue,
    Queue2 = Queue ++ [Worker],
    State2 = State#state{dumper_queue = Queue2},
    opt_start_worker(State2);
add_worker(Worker = #net_load{}, State) ->
    opt_start_worker(add_loader(Worker#net_load.table, Worker, State));
add_worker(Worker = #send_table{}, State) ->
    Queue = State#state.sender_queue,
    State2 = State#state{sender_queue = Queue ++ [Worker]},
    opt_start_worker(State2);
add_worker(Worker = #disc_load{}, State) ->
    opt_start_worker(add_loader(Worker#disc_load.table, Worker, State));
%% The block_controller worker is used when upgrading mnesia.
add_worker(Worker = #block_controller{}, State) ->
    Queue = State#state.dumper_queue,
    Queue2 = [Worker | Queue],
    State2 = State#state{dumper_queue = Queue2},
    opt_start_worker(State2).

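%% Queue a load request for Tab, unless a request for the same table
%% is already pending; there is at most one queued loader per table.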
add_loader(Tab, Worker, State = #state{loader_queue = LQ0}) ->
    case gb_trees:is_defined(Tab, LQ0) of
	true -> State;
	false ->
	    LQ = gb_trees:insert(Tab, Worker, LQ0),
	    State#state{loader_queue = LQ}
    end.

%% Optionally start a worker
%%
%% Dumpers and loaders may run simultaneously,
%% but neither of them may run during a schema commit.
%% Loaders may not start if a schema commit is enqueued.
opt_start_worker(State) when State#state.is_stopping == true ->
    State;
opt_start_worker(State) ->
    %% Prioritize dumpers and schema commits
    %% by checking them first
    case State#state.dumper_queue of
	[Worker | _Rest] when State#state.dumper_pid == undefined ->
	    %% Good, a worker is queued and neither a schema
	    %% transaction is being committed nor a dumper
	    %% is running

	    %% Start the worker but keep it in the queue
	    if
		is_record(Worker, schema_commit_lock) ->
		    ReplyTo = Worker#schema_commit_lock.owner,
		    reply(ReplyTo, granted),
		    {Owner, _Tag} = ReplyTo,
		    opt_start_loader(State#state{dumper_pid = Owner});

		is_record(Worker, dump_log) ->
		    Pid = spawn_link(?MODULE, dump_and_reply, [self(), Worker]),
		    State2 = State#state{dumper_pid = Pid},

		    %% If the worker was a dumper we may
		    %% possibly be able to start a loader
		    %% or sender
		    State3 = opt_start_sender(State2),
		    opt_start_loader(State3);

		is_record(Worker, block_controller) ->
		    case {get_senders(State), get_loaders(State)} of
			{[], []} ->
			    ReplyTo = Worker#block_controller.owner,
			    reply(ReplyTo, granted),
			    {Owner, _Tag} = ReplyTo,
			    State#state{dumper_pid = Owner};
			_ ->
			    State
		    end
	    end;
	_ ->
	    %% Bad luck, try with a loader or sender instead
	    State2 = opt_start_sender(State),
	    opt_start_loader(State2)
    end.

opt_start_sender(State) ->
    case State#state.sender_queue of
	[] -> State;	    %% Nothing to do
	SenderQ ->
	    {NewS, Kept} = opt_start_sender2(SenderQ, get_senders(State),
					     [], get_loaders(State)),
	    State#state{sender_pid = NewS, sender_queue = Kept}
    end.

opt_start_sender2([], Pids, Kept, _) -> {Pids, Kept};
opt_start_sender2([Sender|R], Pids, Kept, LoaderQ) ->
    Tab = Sender#send_table.table,
    Active = val({Tab, active_replicas}),
    IgotIt = lists:member(node(), Active),
    IsLoading = lists:any(fun({_Pid, Loader}) ->
				  Tab == element(#net_load.table, Loader)
			  end, LoaderQ),
    if
	IgotIt, IsLoading ->
	    %% We are still finishing the load of the table;
	    %% let the sender wait
	    opt_start_sender2(R, Pids, [Sender|Kept], LoaderQ);
	IgotIt ->
	    %% Start the worker but keep it in the queue
	    Pid = spawn_link(?MODULE, send_and_reply, [self(), Sender]),
	    opt_start_sender2(R, [{Pid,Sender}|Pids], Kept, LoaderQ);
	true ->
	    verbose("Send table failed, ~tp is not active on this node~n", [Tab]),
	    Sender#send_table.receiver_pid ! {copier_done, node()},
	    opt_start_sender2(R, Pids, Kept, LoaderQ)
    end.

opt_start_loader(State = #state{loader_queue = LoaderQ}) ->
    Current = get_loaders(State),
    Max = max_loaders(),
    case gb_trees:is_empty(LoaderQ) of
	true ->
	    State;
	_ when length(Current) >= Max ->
	    State;
	false ->
	    SchemaQueue = State#state.dumper_queue,
	    case lists:keymember(schema_commit_lock, 1, SchemaQueue) of
		false ->
		    case pick_next(LoaderQ) of
			{none, Rest} ->
			    State#state{loader_queue = Rest};
			{Worker, Rest} ->
			    case already_loading(Worker, get_loaders(State)) of
				true ->
				    opt_start_loader(State#state{loader_queue = Rest});
				false ->
				    %% Start the worker but keep it in the queue
				    Pid = load_and_reply(self(), Worker),
				    State#state{loader_pid = [{Pid,Worker}|get_loaders(State)],
						loader_queue = Rest}
			    end
		    end;
		true ->
		    %% Bad luck, we must wait for the schema commit
		    State
	    end
    end.

already_loading(#net_load{table=Tab}, Loaders) ->
    already_loading2(Tab, Loaders);
already_loading(#disc_load{table=Tab}, Loaders) ->
    already_loading2(Tab, Loaders).

already_loading2(Tab, [{_, #net_load{table=Tab}}|_]) -> true;
already_loading2(Tab, [{_, #disc_load{table=Tab}}|_]) -> true;
already_loading2(Tab, [_|Rest]) -> already_loading2(Tab, Rest);
already_loading2(_, []) -> false.

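%% Ask the controller on Node to start a sender that pushes a copy of
%% Tab to the given receiver process.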
start_remote_sender(Node, Tab, Receiver, Storage) ->
    Msg = #send_table{table = Tab,
		      receiver_pid = Receiver,
		      remote_storage = Storage},
    gen_server:cast({?SERVER_NAME, Node}, Msg).

dump_and_reply(ReplyTo, Worker) ->
    %% No trap_exit, die intentionally instead
    Res = case Worker#dump_log.operation of
	      dump_log ->
		  mnesia_dumper:opt_dump_log(Worker#dump_log.initiated_by);
	      F when is_function(F, 0) ->
		  F()
	  end,
    ReplyTo ! #dumper_done{worker_pid = self(),
			   worker_res = Res},
    unlink(ReplyTo),
    exit(normal).

send_and_reply(ReplyTo, Worker) ->
    %% No trap_exit, die intentionally instead
    Res = mnesia_loader:send_table(Worker#send_table.receiver_pid,
				   Worker#send_table.table,
				   Worker#send_table.remote_storage),
    ReplyTo ! #sender_done{worker_pid = self(),
			   worker_res = Res},
    unlink(ReplyTo),
    exit(normal).

load_and_reply(ReplyTo, Worker) ->
    Load = load_table_fun(Worker),
    SendAndReply =
	fun() ->
		process_flag(trap_exit, true),
		Done = Load(),
		ReplyTo ! Done#loader_done{worker_pid = self()},
		unlink(ReplyTo),
		exit(normal)
	end,
    spawn_link(SendAndReply).

%% Now it is time to load the table,
%% but first we must check if it still is necessary.
load_table_fun(#net_load{cstruct=Cs, table=Tab, reason=Reason, opt_reply_to=ReplyTo}) ->
    LocalC = val({Tab, local_content}),
    AccessMode = val({Tab, access_mode}),
    ReadNode = val({Tab, where_to_read}),
    Active = filter_active(Tab),
    Done = #loader_done{is_loaded = true,
			table_name = Tab,
			needs_announce = false,
			needs_sync = false,
			needs_reply = (ReplyTo /= undefined),
			reply_to = ReplyTo,
			reply = {loaded, ok}
		       },
    AddTableCopy = case Reason of
		       {dumper, {add_table_copy, _}} -> true;
		       _ -> false
		   end,

    OnlyRamCopies = case Cs of
                        #cstruct{disc_copies = DC,
                                 disc_only_copies = DOC,
                                 external_copies = Ext} ->
                            [] =:= (DC ++ (DOC ++ Ext)) -- [node()];
                        _ ->
                            false
                    end,
    if
	ReadNode == node() ->
	    %% Already loaded locally
	    fun() -> Done end;
	LocalC == true ->
	    fun() ->
		    Res = mnesia_loader:disc_load_table(Tab, load_local_content),
		    Done#loader_done{reply = Res, needs_announce = true, needs_sync = true}
	    end;
	AccessMode == read_only, not AddTableCopy ->
	    fun() -> disc_load_table(Tab, Reason, ReplyTo) end;
	Active =:= [], AddTableCopy, OnlyRamCopies ->
	    fun() -> disc_load_table(Tab, Reason, ReplyTo) end;
	true ->
	    fun() ->
		    %% Either we cannot read the table yet
		    %% or someone is moving a replica between
		    %% two nodes
		    Res = mnesia_loader:net_load_table(Tab, Reason, Active, Cs),
		    case Res of
			{loaded, ok} ->
			    Done#loader_done{needs_sync = true,
					     reply = Res};
			{not_loaded, _} ->
			    Done#loader_done{is_loaded = false,
					     reply = Res}
		    end
	    end
    end;
load_table_fun(#disc_load{table=Tab, reason=Reason, opt_reply_to=ReplyTo}) ->
    ReadNode = val({Tab, where_to_read}),
    Active = filter_active(Tab),
    Done = #loader_done{is_loaded = true,
			table_name = Tab,
			needs_announce = false,
			needs_sync = false,
			needs_reply = false
		       },
    if
	Active == [], ReadNode == nowhere ->
	    %% Not loaded anywhere, let us load it from disc
	    fun() -> disc_load_table(Tab, Reason, ReplyTo) end;
	ReadNode == nowhere ->
	    %% Already loaded on another node, let us fetch it
	    Cs = val({Tab, cstruct}),
	    fun() ->
		    case mnesia_loader:net_load_table(Tab, Reason, Active, Cs) of
			{loaded, ok} ->
			    Done#loader_done{needs_sync = true};
			{not_loaded, storage_unknown} ->
			    Done#loader_done{is_loaded = false};
			{not_loaded, ErrReason} ->
			    Done#loader_done{is_loaded = false,
					     reply = {not_loaded, ErrReason}}
		    end
	    end;
	true ->
	    %% Already readable, do not worry, be happy
	    fun() -> Done end
    end.

disc_load_table(Tab, Reason, ReplyTo) ->
    Done = #loader_done{is_loaded = true,
			table_name = Tab,
			needs_announce = false,
			needs_sync = false,
			needs_reply = ReplyTo /= undefined,
			reply_to = ReplyTo,
			reply = {loaded, ok}
		       },
    Res = mnesia_loader:disc_load_table(Tab, Reason),
    if
	Res == {loaded, ok} ->
	    Done#loader_done{needs_announce = true,
			     needs_sync = true,
			     reply = Res};
	ReplyTo /= undefined ->
	    Done#loader_done{is_loaded = false,
			     reply = Res};
	true ->
	    fatal("Cannot load table ~tp from disc: ~tp~n", [Tab, Res])
    end.

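%% Compute the nodes that Tab may be loaded from, with the presumably
%% fastest replicas placed first. disc_only_copies holders are
%% preferred only when the local storage type is disc_only_copies too.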
filter_active(Tab) ->
    ByForce = val({Tab, load_by_force}),
    Active  = val({Tab, active_replicas}),
    Masters = mnesia_recover:get_master_nodes(Tab),
    Ns = do_filter_active(ByForce, Active, Masters),
    %% Reorder the list so that we load from the fastest replicas first
    LS = ?catch_val({Tab, storage_type}),
    DOC = val({Tab, disc_only_copies}),
    {Good, Worse} =
	case LS of
	    disc_only_copies ->
		G = mnesia_lib:intersect(Ns, DOC),
		{G, Ns -- G};
	    _ ->
		G = Ns -- DOC,
		{G, Ns -- G}
	end,
    %% Pick a pseudo-random node among the fastest ones
    Len = length(Good),
    if
	Len > 0 ->
	    R = erlang:phash(node(), Len+1),
	    random(R-1, Good, Worse);
	true ->
	    Worse
    end.

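%% random(N, List, Tail) moves the first N elements of List to the
%% back (in reverse order), just in front of Tail, so that load
%% attempts start at different replicas on different nodes.
%% For example: random(1, [a, b, c], [w]) -> [b, c, a, w].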
random(N, [H|R], Acc) when N > 0 ->
    random(N-1, R, [H|Acc]);
random(0, L, Acc) ->
    L ++ Acc.

do_filter_active(true, Active, _Masters) ->
    Active;
do_filter_active(false, Active, []) ->
    Active;
do_filter_active(false, Active, Masters) ->
    mnesia_lib:intersect(Active, Masters).
