1%% ``Licensed under the Apache License, Version 2.0 (the "License");
2%% you may not use this file except in compliance with the License.
3%% You may obtain a copy of the License at
4%%
5%%     http://www.apache.org/licenses/LICENSE-2.0
6%%
7%% Unless required by applicable law or agreed to in writing, software
8%% distributed under the License is distributed on an "AS IS" BASIS,
9%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10%% See the License for the specific language governing permissions and
11%% limitations under the License.
12%%
13%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
14%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
15%% AB. All Rights Reserved.''
16%%
17%%     $Id: mnesia_schema.erl,v 1.2 2010/03/04 13:54:20 maria Exp $
18%% In this module we provide a number of explicit functions
19%% to maninpulate the schema. All these functions are called
20%% within a special schema transaction.
21%%
22%% We also have an init/1 function defined here, this func is
23%% used by mnesia:start() to initialize the entire schema.
24
25-module(mnesia_schema).
26
27-export([
28         add_snmp/2,
29         add_table_copy/3,
30         add_table_index/2,
31	 arrange_restore/3,
32         attr_tab_to_pos/2,
33         attr_to_pos/2,
34         change_table_copy_type/3,
35         change_table_access_mode/2,
36         change_table_load_order/2,
37	 change_table_frag/2,
38	 clear_table/1,
39         create_table/1,
40	 cs2list/1,
41         del_snmp/1,
42         del_table_copy/2,
43         del_table_index/2,
44         delete_cstruct/2,
45         delete_schema/1,
46         delete_schema2/0,
47         delete_table/1,
48         delete_table_property/2,
49         dump_tables/1,
50         ensure_no_schema/1,
51	 get_create_list/1,
52         get_initial_schema/2,
53	 get_table_properties/1,
54         info/0,
55         info/1,
56         init/1,
57         insert_cstruct/3,
58	 is_remote_member/1,
59         list2cs/1,
60         lock_schema/0,
61	 lock_del_table/4, % Spawned
62         merge_schema/0,
63         move_table/3,
64         opt_create_dir/2,
65         prepare_commit/3,
66         purge_dir/2,
67         purge_tmp_files/0,
68         ram_delete_table/2,
69%         ram_delete_table/3,
70	 read_cstructs_from_disc/0,
71         read_nodes/0,
72         remote_read_schema/0,
73	 restore/1,
74         restore/2,
75         restore/3,
76	 schema_coordinator/3,
77	 set_where_to_read/3,
78         transform_table/4,
79         undo_prepare_commit/2,
80         unlock_schema/0,
81         version/0,
82         write_table_property/2
83        ]).
84
85%% Exports for mnesia_frag
86-export([
87	 get_tid_ts_and_lock/2,
88	 make_create_table/1,
89         ensure_active/1,
90	 pick/4,
91	 verify/3,
92	 incr_version/1,
93	 check_keys/3,
94	 check_duplicates/2,
95	 make_delete_table/2
96	]).
97
98%% Needed outside to be able to use/set table_properties
99%% from user (not supported)
100-export([schema_transaction/1,
101	 insert_schema_ops/2,
102	 do_create_table/1,
103	 do_delete_table/1,
104	 do_delete_table_property/2,
105	 do_write_table_property/2]).
106
107-include("mnesia.hrl").
108-include_lib("kernel/include/file.hrl").
109
110-import(mnesia_lib, [set/2, del/2, verbose/2, dbg_out/2]).
111
112%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
113%% Here comes the init function which also resides in
114%% this module, it is called upon by the trans server
115%% at startup of the system
116%%
117%% We have a meta table which looks like
118%% {table, schema,
119%%    {type, set},
120%%    {disc_copies, all},
121%%    {arity, 2}
122%%    {attributes, [key, val]}
123%%
124%% This means that we have a series of {schema, Name, Cs} tuples
125%% in a table called schema !!
126
127init(IgnoreFallback) ->
128    Res = read_schema(true, false, IgnoreFallback),
129    {ok, Source, _CreateList} = exit_on_error(Res),
130    verbose("Schema initiated from: ~p~n", [Source]),
131    set({schema, tables}, []),
132    set({schema, local_tables}, []),
133    Tabs = set_schema(?ets_first(schema)),
134    lists:foreach(fun(Tab) -> clear_whereabouts(Tab) end, Tabs),
135    set({schema, where_to_read}, node()),
136    set({schema, load_node}, node()),
137    set({schema, load_reason}, initial),
138    mnesia_controller:add_active_replica(schema, node()).
139
140exit_on_error({error, Reason}) ->
141    exit(Reason);
142exit_on_error(GoodRes) ->
143    GoodRes.
144
145val(Var) ->
146    case ?catch_val(Var) of
147	{'EXIT', Reason} -> mnesia_lib:other_val(Var, Reason);
148	Value -> Value
149    end.
150
151%% This function traverses all cstructs in the schema and
152%% sets all values in mnesia_gvar accordingly for each table/cstruct
153
154set_schema('$end_of_table') ->
155    [];
156set_schema(Tab) ->
157    do_set_schema(Tab),
158    [Tab | set_schema(?ets_next(schema, Tab))].
159
160get_create_list(Tab) ->
161    ?ets_lookup_element(schema, Tab, 3).
162
163do_set_schema(Tab) ->
164    List = get_create_list(Tab),
165    Cs = list2cs(List),
166    do_set_schema(Tab, Cs).
167
168do_set_schema(Tab, Cs) ->
169    Type = Cs#cstruct.type,
170    set({Tab, setorbag}, Type),
171    set({Tab, local_content}, Cs#cstruct.local_content),
172    set({Tab, ram_copies}, Cs#cstruct.ram_copies),
173    set({Tab, disc_copies}, Cs#cstruct.disc_copies),
174    set({Tab, disc_only_copies}, Cs#cstruct.disc_only_copies),
175    set({Tab, load_order}, Cs#cstruct.load_order),
176    set({Tab, access_mode}, Cs#cstruct.access_mode),
177    set({Tab, snmp}, Cs#cstruct.snmp),
178    set({Tab, user_properties}, Cs#cstruct.user_properties),
179    [set({Tab, user_property, element(1, P)}, P) || P <- Cs#cstruct.user_properties],
180    set({Tab, frag_properties}, Cs#cstruct.frag_properties),
181    mnesia_frag:set_frag_hash(Tab, Cs#cstruct.frag_properties),
182    set({Tab, attributes}, Cs#cstruct.attributes),
183    Arity = length(Cs#cstruct.attributes) + 1,
184    set({Tab, arity}, Arity),
185    RecName =  Cs#cstruct.record_name,
186    set({Tab, record_name}, RecName),
187    set({Tab, record_validation}, {RecName, Arity, Type}),
188    set({Tab, wild_pattern}, wild(RecName, Arity)),
189    set({Tab, index}, Cs#cstruct.index),
190    %% create actual index tabs later
191    set({Tab, cookie}, Cs#cstruct.cookie),
192    set({Tab, version}, Cs#cstruct.version),
193    set({Tab, cstruct}, Cs),
194    Storage = mnesia_lib:schema_cs_to_storage_type(node(), Cs),
195    set({Tab, storage_type}, Storage),
196    mnesia_lib:add({schema, tables}, Tab),
197    Ns = mnesia_lib:cs_to_nodes(Cs),
198    case lists:member(node(), Ns) of
199        true ->
200            mnesia_lib:add({schema, local_tables}, Tab);
201        false when Tab == schema ->
202            mnesia_lib:add({schema, local_tables}, Tab);
203        false ->
204            ignore
205    end.
206
207wild(RecName, Arity) ->
208    Wp0 = list_to_tuple(lists:duplicate(Arity, '_')),
209    setelement(1, Wp0, RecName).
210
211%% Temporarily read the local schema and return a list
212%% of all nodes mentioned in the schema.DAT file
213read_nodes() ->
214    %% Ensure that we access the intended Mnesia
215    %% directory. This function may not be called
216    %% during startup since it will cause the
217    %% application_controller to get into deadlock
218    case mnesia_lib:ensure_loaded(?APPLICATION) of
219	ok ->
220	    case read_schema(false, false) of
221		{ok, _Source, CreateList} ->
222		    Cs = list2cs(CreateList),
223		    {ok, Cs#cstruct.disc_copies ++ Cs#cstruct.ram_copies};
224		{error, Reason} ->
225		    {error, Reason}
226	    end;
227	{error, Reason} ->
228	    {error, Reason}
229    end.
230
231%% Returns Version from the tuple {Version,MasterNodes}
232version() ->
233    case read_schema(false, false) of
234        {ok, Source, CreateList} when Source /= default ->
235	    Cs = list2cs(CreateList),
236            {Version, _Details} = Cs#cstruct.version,
237            Version;
238        _ ->
239            case dir_exists(mnesia_lib:dir()) of
240                true -> {1,0};
241                false -> {0,0}
242            end
243    end.
244
245%% Calculate next table version from old cstruct
246incr_version(Cs) ->
247    {{Major, Minor}, _} = Cs#cstruct.version,
248    Nodes = mnesia_lib:intersect(val({schema, disc_copies}),
249                                 mnesia_lib:cs_to_nodes(Cs)),
250    V =
251        case Nodes -- val({Cs#cstruct.name, active_replicas}) of
252            [] -> {Major + 1, 0};    % All replicas are active
253            _ -> {Major, Minor + 1}  % Some replicas are inactive
254        end,
255    Cs#cstruct{version = {V, {node(), now()}}}.
256
257%% Returns table name
258insert_cstruct(Tid, Cs, KeepWhereabouts) ->
259    Tab = Cs#cstruct.name,
260    TabDef = cs2list(Cs),
261    Val = {schema, Tab, TabDef},
262    mnesia_checkpoint:tm_retain(Tid, schema, Tab, write),
263    mnesia_subscr:report_table_event(schema, Tid, Val, write),
264    Active = val({Tab, active_replicas}),
265
266    case KeepWhereabouts of
267        true ->
268            ignore;
269        false when Active == [] ->
270            clear_whereabouts(Tab);
271        false ->
272            %% Someone else has initiated table
273            ignore
274    end,
275    set({Tab, cstruct}, Cs),
276    ?ets_insert(schema, Val),
277    do_set_schema(Tab, Cs),
278    Val.
279
280clear_whereabouts(Tab) ->
281    set({Tab, checkpoints}, []),
282    set({Tab, subscribers}, []),
283    set({Tab, where_to_read}, nowhere),
284    set({Tab, active_replicas}, []),
285    set({Tab, commit_work}, []),
286    set({Tab, where_to_write}, []),
287    set({Tab, where_to_commit}, []),
288    set({Tab, load_by_force}, false),
289    set({Tab, load_node}, unknown),
290    set({Tab, load_reason}, unknown).
291
292%% Returns table name
293delete_cstruct(Tid, Cs) ->
294    Tab = Cs#cstruct.name,
295    TabDef = cs2list(Cs),
296    Val = {schema, Tab, TabDef},
297    mnesia_checkpoint:tm_retain(Tid, schema, Tab, delete),
298    mnesia_subscr:report_table_event(schema, Tid, Val, delete),
299    ?ets_match_delete(mnesia_gvar, {{Tab, '_'}, '_'}),
300    ?ets_match_delete(mnesia_gvar, {{Tab, '_', '_'}, '_'}),
301    del({schema, local_tables}, Tab),
302    del({schema, tables}, Tab),
303    ?ets_delete(schema, Tab),
304    Val.
305
306%% Delete the Mnesia directory on all given nodes
307%% Requires that Mnesia is not running anywhere
308%% Returns ok | {error,Reason}
309delete_schema(Ns) when list(Ns), Ns /= [] ->
310    RunningNs = mnesia_lib:running_nodes(Ns),
311    Reason = "Cannot delete schema on all nodes",
312    if
313        RunningNs == [] ->
314	    case rpc:multicall(Ns, ?MODULE, delete_schema2, []) of
315		{Replies, []} ->
316		    case [R || R <- Replies, R /= ok]  of
317			[] ->
318			    ok;
319			BadReplies ->
320			    verbose("~s: ~p~n", [Reason, BadReplies]),
321			    {error, {"All nodes not running", BadReplies}}
322		    end;
323		{_Replies, BadNs} ->
324                    verbose("~s: ~p~n", [Reason, BadNs]),
325                    {error, {"All nodes not running", BadNs}}
326            end;
327        true ->
328            verbose("~s: ~p~n", [Reason, RunningNs]),
329            {error, {"Mnesia is not stopped everywhere", RunningNs}}
330    end;
331delete_schema(Ns) ->
332    {error, {badarg, Ns}}.
333
334delete_schema2() ->
335    %% Ensure that we access the intended Mnesia
336    %% directory. This function may not be called
337    %% during startup since it will cause the
338    %% application_controller to get into deadlock
339    case mnesia_lib:ensure_loaded(?APPLICATION) of
340	ok ->
341	    case mnesia_lib:is_running() of
342		no ->
343		    Dir = mnesia_lib:dir(),
344		    purge_dir(Dir, []),
345		    ok;
346		_ ->
347		    {error, {"Mnesia still running", node()}}
348	    end;
349	{error, Reason} ->
350	    {error, Reason}
351    end.
352
353ensure_no_schema([H|T]) when atom(H) ->
354    case rpc:call(H, ?MODULE, remote_read_schema, []) of
355        {badrpc, Reason} ->
356            {H, {"All nodes not running", H, Reason}};
357        {ok,Source, _} when Source /= default ->
358            {H, {already_exists, H}};
359        _ ->
360            ensure_no_schema(T)
361    end;
362ensure_no_schema([H|_]) ->
363    {error,{badarg, H}};
364ensure_no_schema([]) ->
365    ok.
366
367remote_read_schema() ->
368    %% Ensure that we access the intended Mnesia
369    %% directory. This function may not be called
370    %% during startup since it will cause the
371    %% application_controller to get into deadlock
372    case mnesia_lib:ensure_loaded(?APPLICATION) of
373	ok ->
374	    case mnesia_monitor:get_env(schema_location) of
375		opt_disc ->
376		    read_schema(false, true);
377		_ ->
378		    read_schema(false, false)
379	    end;
380	{error, Reason} ->
381	    {error, Reason}
382    end.
383
384dir_exists(Dir) ->
385    dir_exists(Dir, mnesia_monitor:use_dir()).
386dir_exists(Dir, true) ->
387    case file:read_file_info(Dir) of
388        {ok, _} -> true;
389        _ -> false
390    end;
391dir_exists(_Dir, false) ->
392    false.
393
394opt_create_dir(UseDir, Dir) when UseDir == true->
395    case dir_exists(Dir, UseDir) of
396        true ->
397            check_can_write(Dir);
398        false ->
399            case file:make_dir(Dir) of
400                ok ->
401                    verbose("Create Directory ~p~n", [Dir]),
402                    ok;
403                {error, Reason} ->
404                    verbose("Cannot create mnesia dir ~p~n", [Reason]),
405                    {error, {"Cannot create Mnesia dir", Dir, Reason}}
406            end
407    end;
408opt_create_dir(false, _) ->
409    {error, {has_no_disc, node()}}.
410
411check_can_write(Dir) ->
412    case file:read_file_info(Dir) of
413        {ok, FI} when FI#file_info.type == directory,
414		      FI#file_info.access == read_write ->
415            ok;
416        {ok, _} ->
417            {error, "Not allowed to write in Mnesia dir", Dir};
418        _ ->
419            {error, "Non existent Mnesia dir", Dir}
420    end.
421
422lock_schema() ->
423    mnesia_lib:lock_table(schema).
424
425unlock_schema() ->
426    mnesia_lib:unlock_table(schema).
427
428read_schema(Keep, _UseDirAnyway) ->
429    read_schema(Keep, false, false).
430
431%% The schema may be read for several reasons.
432%% If Mnesia is not already started the read intention
433%% we normally do not want the ets table named schema
434%% be left around.
435%% If Keep == true, the ets table schema is kept
436%% If Keep == false, the ets table schema is removed
437%%
438%% Returns {ok, Source, SchemaCstruct} or {error, Reason}
439%% Source may be: default | ram | disc | fallback
440
441read_schema(Keep, UseDirAnyway, IgnoreFallback) ->
442    lock_schema(),
443    Res =
444        case mnesia:system_info(is_running) of
445            yes ->
446                {ok, ram, get_create_list(schema)};
447            _IsRunning ->
448                    case mnesia_monitor:use_dir() of
449                        true ->
450                            read_disc_schema(Keep, IgnoreFallback);
451                        false when UseDirAnyway == true ->
452                            read_disc_schema(Keep, IgnoreFallback);
453                        false when Keep == true ->
454                            Args = [{keypos, 2}, public, named_table, set],
455                            mnesia_monitor:mktab(schema, Args),
456                            CreateList = get_initial_schema(ram_copies, []),
457                            ?ets_insert(schema,{schema, schema, CreateList}),
458                            {ok, default, CreateList};
459                        false when Keep == false ->
460			    CreateList = get_initial_schema(ram_copies, []),
461                            {ok, default, CreateList}
462                    end
463        end,
464    unlock_schema(),
465    Res.
466
467read_disc_schema(Keep, IgnoreFallback) ->
468    Running = mnesia:system_info(is_running),
469    case mnesia_bup:fallback_exists() of
470        true when IgnoreFallback == false, Running /= yes ->
471             mnesia_bup:fallback_to_schema();
472        _ ->
473            %% If we're running, we read the schema file even
474            %% if fallback exists
475            Dat = mnesia_lib:tab2dat(schema),
476            case mnesia_lib:exists(Dat) of
477                true ->
478                    do_read_disc_schema(Dat, Keep);
479                false ->
480		    Dmp = mnesia_lib:tab2dmp(schema),
481		    case mnesia_lib:exists(Dmp) of
482			true ->
483			    %% May only happen when toggling of
484			    %% schema storage type has been
485			    %% interrupted
486			    do_read_disc_schema(Dmp, Keep);
487			false ->
488			    {error, "No schema file exists"}
489		    end
490            end
491    end.
492
493do_read_disc_schema(Fname, Keep) ->
494    T =
495        case Keep of
496            false ->
497                Args = [{keypos, 2}, public, set],
498                ?ets_new_table(schema, Args);
499            true ->
500                Args = [{keypos, 2}, public, named_table, set],
501                mnesia_monitor:mktab(schema, Args)
502        end,
503    Repair = mnesia_monitor:get_env(auto_repair),
504    Res =  % BUGBUG Fixa till dcl!
505        case mnesia_lib:dets_to_ets(schema, T, Fname, set, Repair, no) of
506            loaded -> {ok, disc, ?ets_lookup_element(T, schema, 3)};
507            Other -> {error, {"Cannot read schema", Fname, Other}}
508        end,
509    case Keep of
510        true -> ignore;
511        false -> ?ets_delete_table(T)
512    end,
513    Res.
514
515get_initial_schema(SchemaStorage, Nodes) ->
516    Cs = #cstruct{name = schema,
517		  record_name = schema,
518		  attributes = [table, cstruct]},
519    Cs2 =
520	case SchemaStorage of
521        ram_copies -> Cs#cstruct{ram_copies = Nodes};
522        disc_copies -> Cs#cstruct{disc_copies = Nodes}
523    end,
524    cs2list(Cs2).
525
526read_cstructs_from_disc() ->
527    %% Assumptions:
528    %% - local schema lock in global
529    %% - use_dir is true
530    %% - Mnesia is not running
531    %% - Ignore fallback
532
533    Fname = mnesia_lib:tab2dat(schema),
534    case mnesia_lib:exists(Fname) of
535	true ->
536	    Args = [{file, Fname},
537		    {keypos, 2},
538		    {repair, mnesia_monitor:get_env(auto_repair)},
539		    {type, set}],
540	    case dets:open_file(make_ref(), Args) of
541		{ok, Tab} ->
542		    Fun = fun({_, _, List}) ->
543				  {continue, list2cs(List)}
544			  end,
545		    Cstructs = dets:traverse(Tab, Fun),
546		    dets:close(Tab),
547		    {ok, Cstructs};
548		{error, Reason} ->
549		    {error, Reason}
550	    end;
551	false ->
552	    {error, "No schema file exists"}
553    end.
554
555%% We run a very special type of transactions when we
556%% we want to manipulate the schema.
557
558get_tid_ts_and_lock(Tab, Intent) ->
559    TidTs = get(mnesia_activity_state),
560    case TidTs of
561	{_Mod, Tid, Ts} when record(Ts, tidstore)->
562	    Store = Ts#tidstore.store,
563	    case Intent of
564		read -> mnesia_locker:rlock_table(Tid, Store, Tab);
565		write -> mnesia_locker:wlock_table(Tid, Store, Tab);
566		none -> ignore
567	    end,
568	    TidTs;
569	_ ->
570	    mnesia:abort(no_transaction)
571    end.
572
573schema_transaction(Fun) ->
574    case get(mnesia_activity_state) of
575	undefined ->
576	    Args = [self(), Fun, whereis(mnesia_controller)],
577	    Pid = spawn_link(?MODULE, schema_coordinator, Args),
578	    receive
579		{transaction_done, Res, Pid} -> Res;
580		{'EXIT', Pid, R} -> {aborted, {transaction_crashed, R}}
581	    end;
582	_ ->
583            {aborted, nested_transaction}
584    end.
585
586%% This process may dump the transaction log, and should
587%% therefore not be run in an application process
588%%
589schema_coordinator(Client, _Fun, undefined) ->
590    Res = {aborted, {node_not_running, node()}},
591    Client ! {transaction_done, Res, self()},
592    unlink(Client);
593
594schema_coordinator(Client, Fun, Controller) when pid(Controller) ->
595    %% Do not trap exit in order to automatically die
596    %% when the controller dies
597
598    link(Controller),
599    unlink(Client),
600
601    %% Fulfull the transaction even if the client dies
602    Res = mnesia:transaction(Fun),
603    Client ! {transaction_done, Res, self()},
604    unlink(Controller),         % Avoids spurious exit message
605    unlink(whereis(mnesia_tm)), % Avoids spurious exit message
606    exit(normal).
607
608%% The make* rotines return a list of ops, this function
609%% inserts em all in the Store and maintains the local order
610%% of ops.
611
612insert_schema_ops({_Mod, _Tid, Ts}, SchemaIOps) ->
613    do_insert_schema_ops(Ts#tidstore.store, SchemaIOps).
614
615do_insert_schema_ops(Store, [Head | Tail]) ->
616    ?ets_insert(Store, Head),
617    do_insert_schema_ops(Store, Tail);
618do_insert_schema_ops(_Store, []) ->
619    ok.
620
621cs2list(Cs) when record(Cs, cstruct) ->
622    Tags = record_info(fields, cstruct),
623    rec2list(Tags, 2, Cs);
624cs2list(CreateList) when list(CreateList) ->
625    CreateList.
626
627rec2list([Tag | Tags], Pos, Rec) ->
628    Val = element(Pos, Rec),
629    [{Tag, Val} | rec2list(Tags, Pos + 1, Rec)];
630rec2list([], _Pos, _Rec) ->
631    [].
632
633list2cs(List) when list(List) ->
634    Name = pick(unknown, name, List, must),
635    Type = pick(Name, type, List, set),
636    Rc0 = pick(Name, ram_copies, List, []),
637    Dc = pick(Name, disc_copies, List, []),
638    Doc = pick(Name, disc_only_copies, List, []),
639    Rc = case {Rc0, Dc, Doc} of
640             {[], [], []} -> [node()];
641             _ -> Rc0
642         end,
643    LC = pick(Name, local_content, List, false),
644    RecName = pick(Name, record_name, List, Name),
645    Attrs = pick(Name, attributes, List, [key, val]),
646    Snmp = pick(Name, snmp, List, []),
647    LoadOrder = pick(Name, load_order, List, 0),
648    AccessMode = pick(Name, access_mode, List, read_write),
649    UserProps = pick(Name, user_properties, List, []),
650    verify({alt, [nil, list]}, mnesia_lib:etype(UserProps),
651	   {bad_type, Name, {user_properties, UserProps}}),
652    Cookie = pick(Name, cookie, List, ?unique_cookie),
653    Version = pick(Name, version, List, {{2, 0}, []}),
654    Ix = pick(Name, index, List, []),
655    verify({alt, [nil, list]}, mnesia_lib:etype(Ix),
656	   {bad_type, Name, {index, [Ix]}}),
657    Ix2 = [attr_to_pos(I, Attrs) || I <- Ix],
658
659    Frag = pick(Name, frag_properties, List, []),
660    verify({alt, [nil, list]}, mnesia_lib:etype(Frag),
661	   {badarg, Name, {frag_properties, Frag}}),
662
663    Keys = check_keys(Name, List, record_info(fields, cstruct)),
664    check_duplicates(Name, Keys),
665    #cstruct{name = Name,
666             ram_copies = Rc,
667             disc_copies = Dc,
668             disc_only_copies = Doc,
669             type = Type,
670             index = Ix2,
671             snmp = Snmp,
672             load_order = LoadOrder,
673             access_mode = AccessMode,
674             local_content = LC,
675	     record_name = RecName,
676             attributes = Attrs,
677             user_properties = lists:sort(UserProps),
678	     frag_properties = lists:sort(Frag),
679             cookie = Cookie,
680             version = Version};
681list2cs(Other) ->
682    mnesia:abort({badarg, Other}).
683
684pick(Tab, Key, List, Default) ->
685    case lists:keysearch(Key, 1, List) of
686        false  when Default == must ->
687            mnesia:abort({badarg, Tab, "Missing key", Key, List});
688        false ->
689            Default;
690        {value, {Key, Value}} ->
691            Value;
692	{value, BadArg} ->
693	    mnesia:abort({bad_type, Tab, BadArg})
694    end.
695
696%% Convert attribute name to integer if neccessary
697attr_tab_to_pos(_Tab, Pos) when integer(Pos) ->
698    Pos;
699attr_tab_to_pos(Tab, Attr) ->
700    attr_to_pos(Attr, val({Tab, attributes})).
701
702%% Convert attribute name to integer if neccessary
703attr_to_pos(Pos, _Attrs) when integer(Pos) ->
704    Pos;
705attr_to_pos(Attr, Attrs) when atom(Attr) ->
706    attr_to_pos(Attr, Attrs, 2);
707attr_to_pos(Attr, _) ->
708    mnesia:abort({bad_type, Attr}).
709
710attr_to_pos(Attr, [Attr | _Attrs], Pos) ->
711    Pos;
712attr_to_pos(Attr, [_ | Attrs], Pos) ->
713    attr_to_pos(Attr, Attrs, Pos + 1);
714attr_to_pos(Attr, _, _) ->
715    mnesia:abort({bad_type, Attr}).
716
717check_keys(Tab, [{Key, _Val} | Tail], Items) ->
718    case lists:member(Key, Items) of
719        true ->  [Key | check_keys(Tab, Tail, Items)];
720        false -> mnesia:abort({badarg, Tab, Key})
721    end;
722check_keys(_, [], _) ->
723    [];
724check_keys(Tab, Arg, _) ->
725    mnesia:abort({badarg, Tab, Arg}).
726
727check_duplicates(Tab, Keys) ->
728    case has_duplicates(Keys) of
729        false -> ok;
730        true -> mnesia:abort({badarg, Tab, "Duplicate keys", Keys})
731    end.
732
733has_duplicates([H | T]) ->
734    case lists:member(H, T) of
735        true -> true;
736        false -> has_duplicates(T)
737    end;
738has_duplicates([]) ->
739    false.
740
741%% This is the only place where we check the validity of data
742verify_cstruct(Cs) when record(Cs, cstruct) ->
743    verify_nodes(Cs),
744
745    Tab = Cs#cstruct.name,
746    verify(atom, mnesia_lib:etype(Tab), {bad_type, Tab}),
747    Type = Cs#cstruct.type,
748    verify(true, lists:member(Type, [set, bag, ordered_set]),
749	   {bad_type, Tab, {type, Type}}),
750
751    %% Currently ordered_set is not supported for disk_only_copies.
752    if
753	Type == ordered_set, Cs#cstruct.disc_only_copies /= [] ->
754	    mnesia:abort({bad_type, Tab, {not_supported, Type, disc_only_copies}});
755	true ->
756	    ok
757    end,
758
759    RecName = Cs#cstruct.record_name,
760    verify(atom, mnesia_lib:etype(RecName),
761	   {bad_type, Tab, {record_name, RecName}}),
762
763    Attrs = Cs#cstruct.attributes,
764    verify(list, mnesia_lib:etype(Attrs),
765	   {bad_type, Tab, {attributes, Attrs}}),
766
767    Arity = length(Attrs) + 1,
768    verify(true, Arity > 2, {bad_type, Tab, {attributes, Attrs}}),
769
770    lists:foldl(fun(Attr,_Other) when Attr == snmp ->
771                        mnesia:abort({bad_type, Tab, {attributes, [Attr]}});
772                   (Attr,Other) ->
773                        verify(atom, mnesia_lib:etype(Attr),
774                               {bad_type, Tab, {attributes, [Attr]}}),
775                        verify(false, lists:member(Attr, Other),
776                               {combine_error, Tab, {attributes, [Attr | Other]}}),
777                        [Attr | Other]
778                end,
779                [],
780                Attrs),
781
782    Index = Cs#cstruct.index,
783    verify({alt, [nil, list]}, mnesia_lib:etype(Index),
784	   {bad_type, Tab, {index, Index}}),
785
786    IxFun =
787        fun(Pos) ->
788                verify(true, fun() ->
789                                     if
790					 integer(Pos),
791                                         Pos > 2,
792                                         Pos =< Arity ->
793                                             true;
794                                         true -> false
795                                     end
796                             end,
797                       {bad_type, Tab, {index, [Pos]}})
798        end,
799    lists:foreach(IxFun, Index),
800
801    LC = Cs#cstruct.local_content,
802    verify({alt, [true, false]}, LC,
803	   {bad_type, Tab, {local_content, LC}}),
804    Access = Cs#cstruct.access_mode,
805    verify({alt, [read_write, read_only]}, Access,
806	   {bad_type, Tab, {access_mode, Access}}),
807
808    Snmp = Cs#cstruct.snmp,
809    verify(true, mnesia_snmp_hook:check_ustruct(Snmp),
810	   {badarg, Tab, {snmp, Snmp}}),
811
812    CheckProp = fun(Prop) when tuple(Prop), size(Prop) >= 1 -> ok;
813		   (Prop) -> mnesia:abort({bad_type, Tab, {user_properties, [Prop]}})
814		end,
815    lists:foreach(CheckProp, Cs#cstruct.user_properties),
816
817    case Cs#cstruct.cookie of
818	{{MegaSecs, Secs, MicroSecs}, _Node}
819	when integer(MegaSecs), integer(Secs),
820	     integer(MicroSecs), atom(node) ->
821            ok;
822        Cookie ->
823            mnesia:abort({bad_type, Tab, {cookie, Cookie}})
824    end,
825    case Cs#cstruct.version of
826        {{Major, Minor}, _Detail}
827                when integer(Major), integer(Minor) ->
828            ok;
829        Version ->
830            mnesia:abort({bad_type, Tab, {version, Version}})
831    end.
832
833verify_nodes(Cs) ->
834    Tab = Cs#cstruct.name,
835    Ram = Cs#cstruct.ram_copies,
836    Disc = Cs#cstruct.disc_copies,
837    DiscOnly = Cs#cstruct.disc_only_copies,
838    LoadOrder = Cs#cstruct.load_order,
839
840    verify({alt, [nil, list]}, mnesia_lib:etype(Ram),
841	   {bad_type, Tab, {ram_copies, Ram}}),
842    verify({alt, [nil, list]}, mnesia_lib:etype(Disc),
843	   {bad_type, Tab, {disc_copies, Disc}}),
844    case Tab of
845	schema ->
846	    verify([], DiscOnly, {bad_type, Tab, {disc_only_copies, DiscOnly}});
847	_ ->
848	    verify({alt, [nil, list]},
849		   mnesia_lib:etype(DiscOnly),
850		   {bad_type, Tab, {disc_only_copies, DiscOnly}})
851    end,
852    verify(integer, mnesia_lib:etype(LoadOrder),
853	   {bad_type, Tab, {load_order, LoadOrder}}),
854
855    Nodes = Ram ++ Disc ++ DiscOnly,
856    verify(list, mnesia_lib:etype(Nodes),
857	   {combine_error, Tab,
858	    [{ram_copies, []}, {disc_copies, []}, {disc_only_copies, []}]}),
859    verify(false, has_duplicates(Nodes), {combine_error, Tab, Nodes}),
860    AtomCheck = fun(N) -> verify(atom, mnesia_lib:etype(N), {bad_type, Tab, N}) end,
861    lists:foreach(AtomCheck, Nodes).
862
863verify(Expected, Fun, Error) when function(Fun) ->
864    do_verify(Expected, catch Fun(), Error);
865verify(Expected, Actual, Error) ->
866    do_verify(Expected, Actual, Error).
867
868do_verify({alt, Values}, Value, Error) ->
869    case lists:member(Value, Values) of
870        true -> ok;
871        false -> mnesia:abort(Error)
872    end;
873do_verify(Value, Value, _) ->
874    ok;
875do_verify(_Value, _, Error) ->
876     mnesia:abort(Error).
877
878ensure_writable(Tab) ->
879    case val({Tab, where_to_write}) of
880        [] -> mnesia:abort({read_only, Tab});
881        _ -> ok
882    end.
883
884%% Ensure that all replicas on disk full nodes are active
885ensure_active(Cs) ->
886    ensure_active(Cs, active_replicas).
887
888ensure_active(Cs, What) ->
889    Tab = Cs#cstruct.name,
890    case val({Tab, What}) of
891        [] -> mnesia:abort({no_exists, Tab});
892        _ -> ok
893    end,
894    Nodes = mnesia_lib:intersect(val({schema, disc_copies}),
895                                 mnesia_lib:cs_to_nodes(Cs)),
896    W = {Tab, What},
897    case Nodes -- val(W) of
898        [] ->
899            ok;
900        Ns ->
901            Expl = "All replicas on diskfull nodes are not active yet",
902            case val({Tab, local_content}) of
903                true ->
904		    case rpc:multicall(Ns, ?MODULE, is_remote_member, [W]) of
905			{Replies, []} ->
906			    check_active(Replies, Expl, Tab);
907			{_Replies, BadNs} ->
908			    mnesia:abort({not_active, Expl, Tab, BadNs})
909                    end;
910                false ->
911                    mnesia:abort({not_active, Expl, Tab, Ns})
912            end
913    end.
914
915ensure_not_active(schema, Node) ->
916    case lists:member(Node, val({schema, active_replicas})) of
917	false ->
918	    ok;
919	true ->
920	    Expl = "Mnesia is running",
921	    mnesia:abort({active, Expl, Node})
922    end.
923
924is_remote_member(Key) ->
925    IsActive = lists:member(node(), val(Key)),
926    {IsActive, node()}.
927
928check_active([{true, _Node} | Replies], Expl, Tab) ->
929    check_active(Replies, Expl, Tab);
930check_active([{false, Node} | _Replies], Expl, Tab) ->
931    mnesia:abort({not_active, Expl, Tab, [Node]});
932check_active([{badrpc, Reason} | _Replies], Expl, Tab) ->
933    mnesia:abort({not_active, Expl, Tab, Reason});
934check_active([], _Expl, _Tab) ->
935    ok.
936
937%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
938%% Here's the real interface function to create a table
939
940create_table(TabDef) ->
941    schema_transaction(fun() -> do_multi_create_table(TabDef) end).
942
943%% And the corresponding do routines ....
944
945do_multi_create_table(TabDef) ->
946    get_tid_ts_and_lock(schema, write),
947    ensure_writable(schema),
948    Cs = list2cs(TabDef),
949    case Cs#cstruct.frag_properties of
950	[] ->
951	    do_create_table(Cs);
952	_Props ->
953	    CsList = mnesia_frag:expand_cstruct(Cs),
954	    lists:foreach(fun do_create_table/1, CsList)
955    end,
956    ok.
957
958do_create_table(Cs) ->
959    {_Mod, _Tid, Ts} =  get_tid_ts_and_lock(schema, none),
960    Store = Ts#tidstore.store,
961    do_insert_schema_ops(Store, make_create_table(Cs)).
962
963make_create_table(Cs) ->
964    Tab = Cs#cstruct.name,
965    verify('EXIT', element(1, ?catch_val({Tab, cstruct})),
966	   {already_exists, Tab}),
967    unsafe_make_create_table(Cs).
968
969% unsafe_do_create_table(Cs) ->
970%     {_Mod, Tid, Ts} =  get_tid_ts_and_lock(schema, none),
971%     Store = Ts#tidstore.store,
972%     do_insert_schema_ops(Store, unsafe_make_create_table(Cs)).
973
974unsafe_make_create_table(Cs) ->
975    {_Mod, Tid, Ts} =  get_tid_ts_and_lock(schema, none),
976    verify_cstruct(Cs),
977    Tab = Cs#cstruct.name,
978
979    %% Check that we have all disc replica nodes running
980    DiscNodes = Cs#cstruct.disc_copies ++ Cs#cstruct.disc_only_copies,
981    RunningNodes = val({current, db_nodes}),
982    CheckDisc = fun(N) ->
983			verify(true, lists:member(N, RunningNodes),
984			       {not_active, Tab, N})
985		end,
986    lists:foreach(CheckDisc, DiscNodes),
987
988    Nodes = mnesia_lib:intersect(mnesia_lib:cs_to_nodes(Cs), RunningNodes),
989    Store = Ts#tidstore.store,
990    mnesia_locker:wlock_no_exist(Tid, Store, Tab, Nodes),
991    [{op, create_table, cs2list(Cs)}].
992
993%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
994%% Delete a table entirely on all nodes.
995
996delete_table(Tab) ->
997    schema_transaction(fun() -> do_delete_table(Tab) end).
998
999do_delete_table(schema) ->
1000    mnesia:abort({bad_type, schema});
1001do_delete_table(Tab) ->
1002    TidTs = get_tid_ts_and_lock(schema, write),
1003    ensure_writable(schema),
1004    insert_schema_ops(TidTs, make_delete_table(Tab, whole_table)).
1005
1006make_delete_table(Tab, Mode) ->
1007    case Mode of
1008	whole_table ->
1009	    case val({Tab, frag_properties}) of
1010		[] ->
1011		    [make_delete_table2(Tab)];
1012		_Props ->
1013		    %% Check if it is a base table
1014		    mnesia_frag:lookup_frag_hash(Tab),
1015
1016		    %% Check for foreigners
1017		    F = mnesia_frag:lookup_foreigners(Tab),
1018		    verify([], F, {combine_error, Tab, "Too many foreigners", F}),
1019		    [make_delete_table2(T) || T <- mnesia_frag:frag_names(Tab)]
1020	    end;
1021	single_frag ->
1022	    [make_delete_table2(Tab)]
1023    end.
1024
1025make_delete_table2(Tab) ->
1026    get_tid_ts_and_lock(Tab, write),
1027    Cs = val({Tab, cstruct}),
1028    ensure_active(Cs),
1029    ensure_writable(Tab),
1030    {op, delete_table, cs2list(Cs)}.
1031
1032%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1033%% Change fragmentation of a table
1034
1035change_table_frag(Tab, Change) ->
1036    schema_transaction(fun() -> do_change_table_frag(Tab, Change) end).
1037
1038do_change_table_frag(Tab, Change) when atom(Tab), Tab /= schema ->
1039    TidTs = get_tid_ts_and_lock(schema, write),
1040    Ops = mnesia_frag:change_table_frag(Tab, Change),
1041    [insert_schema_ops(TidTs, Op) || Op <- Ops],
1042    ok;
1043do_change_table_frag(Tab, _Change) ->
1044    mnesia:abort({bad_type, Tab}).
1045
1046%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1047%% Clear a table
1048
1049clear_table(Tab) ->
1050    schema_transaction(fun() -> do_clear_table(Tab) end).
1051
1052do_clear_table(schema) ->
1053    mnesia:abort({bad_type, schema});
1054do_clear_table(Tab) ->
1055    TidTs = get_tid_ts_and_lock(schema, write),
1056    get_tid_ts_and_lock(Tab, write),
1057    insert_schema_ops(TidTs, make_clear_table(Tab)).
1058
1059make_clear_table(Tab) ->
1060    ensure_writable(schema),
1061    Cs = val({Tab, cstruct}),
1062    ensure_active(Cs),
1063    ensure_writable(Tab),
1064    [{op, clear_table, cs2list(Cs)}].
1065
1066%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1067
1068add_table_copy(Tab, Node, Storage) ->
1069    schema_transaction(fun() -> do_add_table_copy(Tab, Node, Storage) end).
1070
1071do_add_table_copy(Tab, Node, Storage) when atom(Tab), atom(Node) ->
1072    TidTs = get_tid_ts_and_lock(schema, write),
1073    insert_schema_ops(TidTs, make_add_table_copy(Tab, Node, Storage));
1074do_add_table_copy(Tab,Node,_) ->
1075    mnesia:abort({badarg, Tab, Node}).
1076
1077make_add_table_copy(Tab, Node, Storage) ->
1078    ensure_writable(schema),
1079    Cs = incr_version(val({Tab, cstruct})),
1080    Ns = mnesia_lib:cs_to_nodes(Cs),
1081    verify(false, lists:member(Node, Ns), {already_exists, Tab, Node}),
1082    Cs2 = new_cs(Cs, Node, Storage, add),
1083    verify_cstruct(Cs2),
1084
1085    %% Check storage and if node is running
1086    IsRunning = lists:member(Node, val({current, db_nodes})),
1087    if
1088	Storage == unknown ->
1089	    mnesia:abort({badarg, Tab, Storage});
1090	Tab == schema ->
1091	    if
1092		Storage /= ram_copies ->
1093		    mnesia:abort({badarg, Tab, Storage});
1094		IsRunning == true ->
1095		    mnesia:abort({already_exists, Tab, Node});
1096		true ->
1097		    ignore
1098	    end;
1099	Storage == ram_copies ->
1100	    ignore;
1101	IsRunning == true ->
1102	    ignore;
1103	IsRunning == false ->
1104	    mnesia:abort({not_active, schema, Node})
1105    end,
1106    [{op, add_table_copy, Storage, Node, cs2list(Cs2)}].
1107
1108del_table_copy(Tab, Node) ->
1109    schema_transaction(fun() -> do_del_table_copy(Tab, Node) end).
1110
1111do_del_table_copy(Tab, Node) when atom(Node)  ->
1112    TidTs = get_tid_ts_and_lock(schema, write),
1113%%    get_tid_ts_and_lock(Tab, write),
1114    insert_schema_ops(TidTs, make_del_table_copy(Tab, Node));
1115do_del_table_copy(Tab, Node) ->
1116    mnesia:abort({badarg, Tab, Node}).
1117
1118make_del_table_copy(Tab, Node) ->
1119    ensure_writable(schema),
1120    Cs = incr_version(val({Tab, cstruct})),
1121    Storage = mnesia_lib:schema_cs_to_storage_type(Node, Cs),
1122    Cs2 = new_cs(Cs, Node, Storage, del),
1123    case mnesia_lib:cs_to_nodes(Cs2) of
1124        [] when Tab == schema ->
1125            mnesia:abort({combine_error, Tab, "Last replica"});
1126        [] ->
1127	    ensure_active(Cs),
1128            dbg_out("Last replica deleted in table ~p~n",  [Tab]),
1129            make_delete_table(Tab,  whole_table);
1130        _ when Tab == schema ->
1131	    ensure_active(Cs2),
1132	    ensure_not_active(Tab, Node),
1133            verify_cstruct(Cs2),
1134	    Ops = remove_node_from_tabs(val({schema, tables}), Node),
1135	    [{op, del_table_copy, ram_copies, Node, cs2list(Cs2)} | Ops];
1136        _ ->
1137	    ensure_active(Cs),
1138            verify_cstruct(Cs2),
1139            [{op, del_table_copy, Storage, Node, cs2list(Cs2)}]
1140    end.
1141
1142remove_node_from_tabs([], _Node) ->
1143    [];
1144remove_node_from_tabs([schema|Rest], Node) ->
1145    remove_node_from_tabs(Rest, Node);
1146remove_node_from_tabs([Tab|Rest], Node) ->
1147    {Cs, IsFragModified} =
1148	mnesia_frag:remove_node(Node, incr_version(val({Tab, cstruct}))),
1149    case mnesia_lib:schema_cs_to_storage_type(Node, Cs)  of
1150	unknown ->
1151	    case IsFragModified of
1152		true ->
1153		    [{op, change_table_frag, {del_node, Node}, cs2list(Cs)} |
1154		     remove_node_from_tabs(Rest, Node)];
1155		false ->
1156		    remove_node_from_tabs(Rest, Node)
1157	    end;
1158	Storage ->
1159	    Cs2 = new_cs(Cs, Node, Storage, del),
1160	    case mnesia_lib:cs_to_nodes(Cs2) of
1161		[] ->
1162		    [{op, delete_table, cs2list(Cs)} |
1163		     remove_node_from_tabs(Rest, Node)];
1164		_Ns ->
1165		    verify_cstruct(Cs2),
1166		    [{op, del_table_copy, ram_copies, Node, cs2list(Cs2)}|
1167		     remove_node_from_tabs(Rest, Node)]
1168	    end
1169    end.
1170
1171new_cs(Cs, Node, ram_copies, add) ->
1172    Cs#cstruct{ram_copies = opt_add(Node, Cs#cstruct.ram_copies)};
1173new_cs(Cs, Node, disc_copies, add) ->
1174    Cs#cstruct{disc_copies = opt_add(Node, Cs#cstruct.disc_copies)};
1175new_cs(Cs, Node, disc_only_copies, add) ->
1176    Cs#cstruct{disc_only_copies = opt_add(Node, Cs#cstruct.disc_only_copies)};
1177new_cs(Cs, Node, ram_copies, del) ->
1178    Cs#cstruct{ram_copies = lists:delete(Node , Cs#cstruct.ram_copies)};
1179new_cs(Cs, Node, disc_copies, del) ->
1180    Cs#cstruct{disc_copies = lists:delete(Node , Cs#cstruct.disc_copies)};
1181new_cs(Cs, Node, disc_only_copies, del) ->
1182    Cs#cstruct{disc_only_copies =
1183               lists:delete(Node , Cs#cstruct.disc_only_copies)};
1184new_cs(Cs, _Node, Storage, _Op) ->
1185    mnesia:abort({badarg, Cs#cstruct.name, Storage}).
1186
1187
1188opt_add(N, L) -> [N | lists:delete(N, L)].
1189
1190move_table(Tab, FromNode, ToNode) ->
1191    schema_transaction(fun() -> do_move_table(Tab, FromNode, ToNode) end).
1192
1193do_move_table(schema, _FromNode, _ToNode) ->
1194    mnesia:abort({bad_type, schema});
1195do_move_table(Tab, FromNode, ToNode) when atom(FromNode), atom(ToNode) ->
1196    TidTs = get_tid_ts_and_lock(schema, write),
1197    insert_schema_ops(TidTs, make_move_table(Tab, FromNode, ToNode));
1198do_move_table(Tab, FromNode, ToNode) ->
1199    mnesia:abort({badarg, Tab, FromNode, ToNode}).
1200
1201make_move_table(Tab, FromNode, ToNode) ->
1202    ensure_writable(schema),
1203    Cs = incr_version(val({Tab, cstruct})),
1204    Ns = mnesia_lib:cs_to_nodes(Cs),
1205    verify(false, lists:member(ToNode, Ns), {already_exists, Tab, ToNode}),
1206    verify(true, lists:member(FromNode, val({Tab, where_to_write})),
1207           {not_active, Tab, FromNode}),
1208    verify(false, val({Tab,local_content}),
1209           {"Cannot move table with local content", Tab}),
1210    ensure_active(Cs),
1211    Running = val({current, db_nodes}),
1212    Storage = mnesia_lib:schema_cs_to_storage_type(FromNode, Cs),
1213    verify(true, lists:member(ToNode, Running), {not_active, schema, ToNode}),
1214
1215    Cs2 = new_cs(Cs, ToNode, Storage, add),
1216    Cs3 = new_cs(Cs2, FromNode, Storage, del),
1217    verify_cstruct(Cs3),
1218    [{op, add_table_copy, Storage, ToNode, cs2list(Cs2)},
1219     {op, sync_trans},
1220     {op, del_table_copy, Storage, FromNode, cs2list(Cs3)}].
1221
1222%% end of functions to add and delete nodes to tables
1223%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1224%%
1225
1226change_table_copy_type(Tab, Node, ToS) ->
1227    schema_transaction(fun() -> do_change_table_copy_type(Tab, Node, ToS) end).
1228
1229do_change_table_copy_type(Tab, Node, ToS) when atom(Node) ->
1230    TidTs = get_tid_ts_and_lock(schema, write),
1231    get_tid_ts_and_lock(Tab, write), % ensure global sync
1232    %% get_tid_ts_and_lock(Tab, read),
1233    insert_schema_ops(TidTs, make_change_table_copy_type(Tab, Node, ToS));
1234do_change_table_copy_type(Tab, Node, _ToS) ->
1235    mnesia:abort({badarg, Tab, Node}).
1236
1237make_change_table_copy_type(Tab, Node, unknown) ->
1238    make_del_table_copy(Tab, Node);
1239make_change_table_copy_type(Tab, Node, ToS) ->
1240    ensure_writable(schema),
1241    Cs = incr_version(val({Tab, cstruct})),
1242    FromS = mnesia_lib:storage_type_at_node(Node, Tab),
1243
1244    case compare_storage_type(false, FromS, ToS) of
1245	{same, _} ->
1246	    mnesia:abort({already_exists, Tab, Node, ToS});
1247	{diff, _} ->
1248	    ignore;
1249	incompatible ->
1250	    ensure_active(Cs)
1251    end,
1252
1253    Cs2 = new_cs(Cs, Node, FromS, del),
1254    Cs3 = new_cs(Cs2, Node, ToS, add),
1255    verify_cstruct(Cs3),
1256
1257    if
1258	FromS == unknown ->
1259	    make_add_table_copy(Tab, Node, ToS);
1260	true ->
1261	    ignore
1262    end,
1263
1264    [{op, change_table_copy_type, Node, FromS, ToS, cs2list(Cs3)}].
1265
1266%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1267%% change index functions ....
1268%% Pos is already added by 1 in both of these functions
1269
1270add_table_index(Tab, Pos) ->
1271    schema_transaction(fun() -> do_add_table_index(Tab, Pos) end).
1272
1273do_add_table_index(schema, _Attr) ->
1274    mnesia:abort({bad_type, schema});
1275do_add_table_index(Tab, Attr) ->
1276    TidTs = get_tid_ts_and_lock(schema, write),
1277    get_tid_ts_and_lock(Tab, read),
1278    Pos = attr_tab_to_pos(Tab, Attr),
1279    insert_schema_ops(TidTs, make_add_table_index(Tab, Pos)).
1280
1281make_add_table_index(Tab, Pos) ->
1282    ensure_writable(schema),
1283    Cs = incr_version(val({Tab, cstruct})),
1284    ensure_active(Cs),
1285    Ix = Cs#cstruct.index,
1286    verify(false, lists:member(Pos, Ix), {already_exists, Tab, Pos}),
1287    Ix2 = lists:sort([Pos | Ix]),
1288    Cs2 = Cs#cstruct{index = Ix2},
1289    verify_cstruct(Cs2),
1290    [{op, add_index, Pos, cs2list(Cs2)}].
1291
1292del_table_index(Tab, Pos) ->
1293    schema_transaction(fun() -> do_del_table_index(Tab, Pos) end).
1294
1295do_del_table_index(schema, _Attr) ->
1296    mnesia:abort({bad_type, schema});
1297do_del_table_index(Tab, Attr) ->
1298    TidTs = get_tid_ts_and_lock(schema, write),
1299    get_tid_ts_and_lock(Tab, read),
1300    Pos = attr_tab_to_pos(Tab, Attr),
1301    insert_schema_ops(TidTs, make_del_table_index(Tab, Pos)).
1302
1303make_del_table_index(Tab, Pos) ->
1304    ensure_writable(schema),
1305    Cs = incr_version(val({Tab, cstruct})),
1306    ensure_active(Cs),
1307    Ix = Cs#cstruct.index,
1308    verify(true, lists:member(Pos, Ix), {no_exists, Tab, Pos}),
1309    Cs2 = Cs#cstruct{index = lists:delete(Pos, Ix)},
1310    verify_cstruct(Cs2),
1311    [{op, del_index, Pos, cs2list(Cs2)}].
1312
1313%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1314
1315add_snmp(Tab, Ustruct) ->
1316    schema_transaction(fun() -> do_add_snmp(Tab, Ustruct) end).
1317
1318do_add_snmp(schema, _Ustruct) ->
1319    mnesia:abort({bad_type, schema});
1320do_add_snmp(Tab, Ustruct) ->
1321    TidTs = get_tid_ts_and_lock(schema, write),
1322    get_tid_ts_and_lock(Tab, read),
1323    insert_schema_ops(TidTs, make_add_snmp(Tab, Ustruct)).
1324
1325make_add_snmp(Tab, Ustruct) ->
1326    ensure_writable(schema),
1327    Cs = incr_version(val({Tab, cstruct})),
1328    ensure_active(Cs),
1329    verify([], Cs#cstruct.snmp, {already_exists, Tab, snmp}),
1330    Error = {badarg, Tab, snmp, Ustruct},
1331    verify(true, mnesia_snmp_hook:check_ustruct(Ustruct), Error),
1332    Cs2 = Cs#cstruct{snmp = Ustruct},
1333    verify_cstruct(Cs2),
1334    [{op, add_snmp, Ustruct, cs2list(Cs2)}].
1335
1336del_snmp(Tab) ->
1337    schema_transaction(fun() -> do_del_snmp(Tab) end).
1338
1339do_del_snmp(schema) ->
1340    mnesia:abort({bad_type, schema});
1341do_del_snmp(Tab) ->
1342    TidTs = get_tid_ts_and_lock(schema, write),
1343    get_tid_ts_and_lock(Tab, read),
1344    insert_schema_ops(TidTs, make_del_snmp(Tab)).
1345
1346make_del_snmp(Tab) ->
1347    ensure_writable(schema),
1348    Cs = incr_version(val({Tab, cstruct})),
1349    ensure_active(Cs),
1350    Cs2 = Cs#cstruct{snmp = []},
1351    verify_cstruct(Cs2),
1352    [{op, del_snmp, cs2list(Cs2)}].
1353
1354%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1355%%
1356
1357transform_table(Tab, Fun, NewAttrs, NewRecName)
1358  when function(Fun), list(NewAttrs), atom(NewRecName) ->
1359    schema_transaction(fun() -> do_transform_table(Tab, Fun, NewAttrs, NewRecName) end);
1360
1361transform_table(Tab, ignore, NewAttrs, NewRecName)
1362  when list(NewAttrs), atom(NewRecName) ->
1363    schema_transaction(fun() -> do_transform_table(Tab, ignore, NewAttrs, NewRecName) end);
1364
1365transform_table(Tab, Fun, NewAttrs, NewRecName) ->
1366    {aborted,{bad_type, Tab, Fun, NewAttrs, NewRecName}}.
1367
1368do_transform_table(schema, _Fun, _NewAttrs, _NewRecName) ->
1369    mnesia:abort({bad_type, schema});
1370do_transform_table(Tab, Fun, NewAttrs, NewRecName) ->
1371    TidTs = get_tid_ts_and_lock(schema, write),
1372    get_tid_ts_and_lock(Tab, write),
1373    insert_schema_ops(TidTs, make_transform(Tab, Fun, NewAttrs, NewRecName)).
1374
1375make_transform(Tab, Fun, NewAttrs, NewRecName) ->
1376    ensure_writable(schema),
1377    Cs = incr_version(val({Tab, cstruct})),
1378    ensure_active(Cs),
1379    ensure_writable(Tab),
1380    case mnesia_lib:val({Tab, index}) of
1381	[] ->
1382	    Cs2 = Cs#cstruct{attributes = NewAttrs, record_name = NewRecName},
1383	    verify_cstruct(Cs2),
1384	    [{op, transform, Fun, cs2list(Cs2)}];
1385	PosList ->
1386	    DelIdx = fun(Pos, Ncs) ->
1387			     Ix = Ncs#cstruct.index,
1388			     Ncs1 = Ncs#cstruct{index = lists:delete(Pos, Ix)},
1389			     Op = {op, del_index, Pos, cs2list(Ncs1)},
1390			     {Op, Ncs1}
1391		     end,
1392	    AddIdx = fun(Pos, Ncs) ->
1393			     Ix = Ncs#cstruct.index,
1394			     Ix2 = lists:sort([Pos | Ix]),
1395			     Ncs1 = Ncs#cstruct{index = Ix2},
1396			     Op = {op, add_index, Pos, cs2list(Ncs1)},
1397			     {Op, Ncs1}
1398		     end,
1399            {DelOps, Cs1} = lists:mapfoldl(DelIdx, Cs, PosList),
1400	    Cs2 = Cs1#cstruct{attributes = NewAttrs, record_name = NewRecName},
1401            {AddOps, Cs3} = lists:mapfoldl(AddIdx, Cs2, PosList),
1402	    verify_cstruct(Cs3),
1403	    lists:flatten([DelOps, {op, transform, Fun, cs2list(Cs2)}, AddOps])
1404    end.
1405
1406%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1407%%
1408
1409change_table_access_mode(Tab, Mode) ->
1410    schema_transaction(fun() -> do_change_table_access_mode(Tab, Mode) end).
1411
1412do_change_table_access_mode(Tab, Mode) ->
1413    {_Mod, Tid, Ts} = get_tid_ts_and_lock(schema, write),
1414    Store = Ts#tidstore.store,
1415    mnesia_locker:wlock_no_exist(Tid, Store, schema, val({schema, active_replicas})),
1416    mnesia_locker:wlock_no_exist(Tid, Store, Tab, val({Tab, active_replicas})),
1417    do_insert_schema_ops(Store, make_change_table_access_mode(Tab, Mode)).
1418
1419make_change_table_access_mode(Tab, Mode) ->
1420    ensure_writable(schema),
1421    Cs = incr_version(val({Tab, cstruct})),
1422    ensure_active(Cs),
1423    OldMode = Cs#cstruct.access_mode,
1424    verify(false, OldMode ==  Mode, {already_exists, Tab, Mode}),
1425    Cs2 = Cs#cstruct{access_mode = Mode},
1426    verify_cstruct(Cs2),
1427    [{op, change_table_access_mode, cs2list(Cs2), OldMode, Mode}].
1428
1429%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1430
1431change_table_load_order(Tab, LoadOrder) ->
1432    schema_transaction(fun() -> do_change_table_load_order(Tab, LoadOrder) end).
1433
1434do_change_table_load_order(schema, _LoadOrder) ->
1435    mnesia:abort({bad_type, schema});
1436do_change_table_load_order(Tab, LoadOrder) ->
1437    TidTs = get_tid_ts_and_lock(schema, write),
1438    get_tid_ts_and_lock(Tab, none),
1439    insert_schema_ops(TidTs, make_change_table_load_order(Tab, LoadOrder)).
1440
1441make_change_table_load_order(Tab, LoadOrder) ->
1442    ensure_writable(schema),
1443    Cs = incr_version(val({Tab, cstruct})),
1444    ensure_active(Cs),
1445    OldLoadOrder = Cs#cstruct.load_order,
1446    Cs2 = Cs#cstruct{load_order = LoadOrder},
1447    verify_cstruct(Cs2),
1448    [{op, change_table_load_order, cs2list(Cs2), OldLoadOrder, LoadOrder}].
1449
1450%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1451
1452write_table_property(Tab, Prop) when tuple(Prop), size(Prop) >= 1 ->
1453    schema_transaction(fun() -> do_write_table_property(Tab, Prop) end);
1454write_table_property(Tab, Prop) ->
1455    {aborted, {bad_type, Tab, Prop}}.
1456do_write_table_property(Tab, Prop) ->
1457    TidTs = get_tid_ts_and_lock(schema, write),
1458    {_, _, Ts} = TidTs,
1459    Store = Ts#tidstore.store,
1460    case change_prop_in_existing_op(Tab, Prop, write_property, Store) of
1461	true ->
1462	    dbg_out("change_prop_in_existing_op"
1463		    "(~p,~p,write_property,Store) -> true~n",
1464		    [Tab,Prop]),
1465	    %% we have merged the table prop into the create_table op
1466	    ok;
1467	false ->
1468	    dbg_out("change_prop_in_existing_op"
1469		    "(~p,~p,write_property,Store) -> false~n",
1470		    [Tab,Prop]),
1471	    %% this must be an existing table
1472	    get_tid_ts_and_lock(Tab, none),
1473	    insert_schema_ops(TidTs, make_write_table_properties(Tab, [Prop]))
1474    end.
1475
1476make_write_table_properties(Tab, Props) ->
1477    ensure_writable(schema),
1478    Cs = incr_version(val({Tab, cstruct})),
1479    ensure_active(Cs),
1480    make_write_table_properties(Tab, Props, Cs).
1481
1482make_write_table_properties(Tab, [Prop | Props], Cs) ->
1483    OldProps = Cs#cstruct.user_properties,
1484    PropKey = element(1, Prop),
1485    DelProps = lists:keydelete(PropKey, 1, OldProps),
1486    MergedProps = lists:merge(DelProps, [Prop]),
1487    Cs2 = Cs#cstruct{user_properties = MergedProps},
1488    verify_cstruct(Cs2),
1489    [{op, write_property, cs2list(Cs2), Prop} |
1490     make_write_table_properties(Tab, Props, Cs2)];
1491make_write_table_properties(_Tab, [], _Cs) ->
1492    [].
1493
1494change_prop_in_existing_op(Tab, Prop, How, Store) ->
1495    Ops = ets:match_object(Store, '_'),
1496    case update_existing_op(Ops, Tab, Prop, How, []) of
1497	{true, Ops1} ->
1498	    ets:match_delete(Store, '_'),
1499	    [ets:insert(Store, Op) || Op <- Ops1],
1500	    true;
1501	false ->
1502	    false
1503    end.
1504
1505update_existing_op([{op, Op, L = [{name,Tab}|_], _OldProp}|Ops],
1506		   Tab, Prop, How, Acc) when Op == write_property;
1507					     Op == delete_property ->
1508    %% Apparently, mnesia_dumper doesn't care about OldProp here -- just L,
1509    %% so we will throw away OldProp (not that it matters...) and insert Prop.
1510    %% as element 3.
1511    L1 = insert_prop(Prop, L, How),
1512    NewOp = {op, How, L1, Prop},
1513    {true, lists:reverse(Acc) ++ [NewOp|Ops]};
1514update_existing_op([Op = {op, create_table, L}|Ops], Tab, Prop, How, Acc) ->
1515    case lists:keysearch(name, 1, L) of
1516	{value, {_, Tab}} ->
1517	    %% Tab is being created here -- insert Prop into L
1518	    L1 = insert_prop(Prop, L, How),
1519	    {true, lists:reverse(Acc) ++ [{op, create_table, L1}|Ops]};
1520	_ ->
1521	    update_existing_op(Ops, Tab, Prop, How, [Op|Acc])
1522    end;
1523update_existing_op([Op|Ops], Tab, Prop, How, Acc) ->
1524    update_existing_op(Ops, Tab, Prop, How, [Op|Acc]);
1525update_existing_op([], _, _, _, _) ->
1526    false.
1527
1528%% perhaps a misnomer. How could also be delete_property... never mind.
1529%% Returns the modified L.
1530insert_prop(Prop, L, How) ->
1531    Prev = find_props(L),
1532    MergedProps = merge_with_previous(How, Prop, Prev),
1533    replace_props(L, MergedProps).
1534
1535
1536find_props([{user_properties, P}|_]) -> P;
1537find_props([_H|T]) -> find_props(T).
1538%% we shouldn't reach []
1539
1540replace_props([{user_properties, _}|T], P) -> [{user_properties, P}|T];
1541replace_props([H|T], P) -> [H|replace_props(T, P)].
1542%% again, we shouldn't reach []
1543
1544merge_with_previous(write_property, Prop, Prev) ->
1545    Key = element(1, Prop),
1546    Prev1 = lists:keydelete(Key, 1, Prev),
1547    lists:sort([Prop|Prev1]);
1548merge_with_previous(delete_property, PropKey, Prev) ->
1549    lists:keydelete(PropKey, 1, Prev).
1550
1551delete_table_property(Tab, PropKey) ->
1552    schema_transaction(fun() -> do_delete_table_property(Tab, PropKey) end).
1553
1554do_delete_table_property(Tab, PropKey) ->
1555    TidTs = get_tid_ts_and_lock(schema, write),
1556    {_, _, Ts} = TidTs,
1557    Store = Ts#tidstore.store,
1558    case change_prop_in_existing_op(Tab, PropKey, delete_property, Store) of
1559	true ->
1560	    dbg_out("change_prop_in_existing_op"
1561		    "(~p,~p,delete_property,Store) -> true~n",
1562		    [Tab,PropKey]),
1563	    %% we have merged the table prop into the create_table op
1564	    ok;
1565	false ->
1566	    dbg_out("change_prop_in_existing_op"
1567		    "(~p,~p,delete_property,Store) -> false~n",
1568		    [Tab,PropKey]),
1569	    %% this must be an existing table
1570	    get_tid_ts_and_lock(Tab, none),
1571	    insert_schema_ops(TidTs,
1572			      make_delete_table_properties(Tab, [PropKey]))
1573    end.
1574
1575make_delete_table_properties(Tab, PropKeys) ->
1576    ensure_writable(schema),
1577    Cs = incr_version(val({Tab, cstruct})),
1578    ensure_active(Cs),
1579    make_delete_table_properties(Tab, PropKeys, Cs).
1580
1581make_delete_table_properties(Tab, [PropKey | PropKeys], Cs) ->
1582    OldProps = Cs#cstruct.user_properties,
1583    Props = lists:keydelete(PropKey, 1, OldProps),
1584    Cs2 = Cs#cstruct{user_properties = Props},
1585    verify_cstruct(Cs2),
1586    [{op, delete_property, cs2list(Cs2), PropKey} |
1587     make_delete_table_properties(Tab, PropKeys, Cs2)];
1588make_delete_table_properties(_Tab, [], _Cs) ->
1589    [].
1590
1591%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1592
1593%% Ensure that the transaction can be committed even
1594%% if the node crashes and Mnesia is restarted
1595prepare_commit(Tid, Commit, WaitFor) ->
1596    case Commit#commit.schema_ops of
1597	[] ->
1598	    {false, Commit, optional};
1599	OrigOps ->
1600	    {Modified, Ops, DumperMode} =
1601		prepare_ops(Tid, OrigOps, WaitFor, false, [], optional),
1602	    InitBy = schema_prepare,
1603	    GoodRes = {Modified,
1604		       Commit#commit{schema_ops = lists:reverse(Ops)},
1605		       DumperMode},
1606	    case DumperMode of
1607		optional ->
1608		    dbg_out("Transaction log dump skipped (~p): ~w~n",
1609			    [DumperMode, InitBy]);
1610		mandatory ->
1611		    case mnesia_controller:sync_dump_log(InitBy) of
1612			dumped ->
1613			    GoodRes;
1614			{error, Reason} ->
1615			    mnesia:abort(Reason)
1616		    end
1617	    end,
1618	    case Ops of
1619		[] ->
1620		    ignore;
1621		_ ->
1622		    %% We need to grab a dumper lock here, the log may not
1623		    %% be dumped by others, during the schema commit phase.
1624		    mnesia_controller:wait_for_schema_commit_lock()
1625	    end,
1626	    GoodRes
1627    end.
1628
1629prepare_ops(Tid, [Op | Ops], WaitFor, Changed, Acc, DumperMode) ->
1630    case prepare_op(Tid, Op, WaitFor) of
1631        {true, mandatory} ->
1632	    prepare_ops(Tid, Ops, WaitFor, Changed, [Op | Acc], mandatory);
1633        {true, optional} ->
1634	    prepare_ops(Tid, Ops, WaitFor, Changed, [Op | Acc], DumperMode);
1635        {true, Ops2, mandatory} ->
1636	    prepare_ops(Tid, Ops, WaitFor, true, Ops2 ++ Acc, mandatory);
1637        {true, Ops2, optional} ->
1638	    prepare_ops(Tid, Ops, WaitFor, true, Ops2 ++ Acc, DumperMode);
1639	{false, mandatory} ->
1640	    prepare_ops(Tid, Ops, WaitFor, true, Acc, mandatory);
1641	{false, optional} ->
1642	    prepare_ops(Tid, Ops, WaitFor, true, Acc, DumperMode)
1643    end;
1644prepare_ops(_Tid, [], _WaitFor, Changed, Acc, DumperMode) ->
1645    {Changed, Acc, DumperMode}.
1646
1647%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1648%% Prepare for commit
1649%% returns true if Op should be included, i.e. unmodified
1650%%         {true, Operation} if NewRecs should be included, i.e. modified
1651%%         false if Op should NOT be included, i.e. modified
1652%%
1653prepare_op(_Tid, {op, rec, unknown, Rec}, _WaitFor) ->
1654    {{Tab, Key}, Items, _Op} = Rec,
1655    case val({Tab, storage_type}) of
1656        unknown ->
1657            {false, optional};
1658        Storage ->
1659            mnesia_tm:prepare_snmp(Tab, Key, Items), % May exit
1660            {true, [{op, rec, Storage, Rec}], optional}
1661    end;
1662
1663prepare_op(_Tid, {op, announce_im_running, _Node, SchemaDef, Running, RemoteRunning}, _WaitFor) ->
1664    SchemaCs = list2cs(SchemaDef),
1665    case lists:member(node(), Running) of
1666        true ->
1667            announce_im_running(RemoteRunning -- Running, SchemaCs);
1668        false ->
1669            announce_im_running(Running -- RemoteRunning, SchemaCs)
1670    end,
1671    {false, optional};
1672
1673prepare_op(_Tid, {op, sync_trans}, {part, CoordPid}) ->
1674    CoordPid ! {sync_trans, self()},
1675    receive
1676	{sync_trans, CoordPid} ->
1677	    {false, optional};
1678	Else ->
1679	    mnesia_lib:verbose("sync_op terminated due to ~p~n", [Else]),
1680	    mnesia:abort(Else)
1681    end;
1682
1683prepare_op(_Tid, {op, sync_trans}, {coord, Nodes}) ->
1684    case receive_sync(Nodes, []) of
1685	{abort, Reason} ->
1686	    mnesia_lib:verbose("sync_op terminated due to ~p~n", [Reason]),
1687	    mnesia:abort(Reason);
1688	Pids ->
1689	    [Pid ! {sync_trans, self()} || Pid <- Pids],
1690	    {false, optional}
1691    end;
1692prepare_op(Tid, {op, create_table, TabDef}, _WaitFor) ->
1693    Cs = list2cs(TabDef),
1694    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
1695    UseDir = mnesia_monitor:use_dir(),
1696    Tab = Cs#cstruct.name,
1697    case Storage of
1698        disc_copies when UseDir == false ->
1699	    UseDirReason = {bad_type, Tab, Storage, node()},
1700            mnesia:abort(UseDirReason);
1701        disc_only_copies when UseDir == false ->
1702	    UseDirReason = {bad_type, Tab, Storage, node()},
1703            mnesia:abort(UseDirReason);
1704	ram_copies ->
1705	    create_ram_table(Tab, Cs#cstruct.type),
1706	    insert_cstruct(Tid, Cs, false),
1707	    {true, optional};
1708	disc_copies ->
1709	    create_ram_table(Tab, Cs#cstruct.type),
1710	    create_disc_table(Tab),
1711	    insert_cstruct(Tid, Cs, false),
1712	    {true, optional};
1713	disc_only_copies ->
1714	    create_disc_only_table(Tab,Cs#cstruct.type),
1715	    insert_cstruct(Tid, Cs, false),
1716	    {true, optional};
1717        unknown -> %% No replica on this node
1718	    insert_cstruct(Tid, Cs, false),
1719            {true, optional}
1720    end;
1721
1722prepare_op(Tid, {op, add_table_copy, Storage, Node, TabDef}, _WaitFor) ->
1723    Cs = list2cs(TabDef),
1724    Tab = Cs#cstruct.name,
1725
1726    if
1727	Tab == schema ->
1728	    {true, optional}; % Nothing to prepare
1729	Node == node() ->
1730	    case mnesia_lib:val({schema, storage_type}) of
1731		ram_copies when Storage /= ram_copies ->
1732		    Error = {combine_error, Tab, "has no disc", Node},
1733		    mnesia:abort(Error);
1734		_  ->
1735		    ok
1736	    end,
1737	    %% Tables are created by mnesia_loader get_network code
1738	    insert_cstruct(Tid, Cs, true),
1739	    case mnesia_controller:get_network_copy(Tab, Cs) of
1740		{loaded, ok} ->
1741		    {true, optional};
1742		{not_loaded, ErrReason} ->
1743		    Reason = {system_limit, Tab, {Node, ErrReason}},
1744		    mnesia:abort(Reason)
1745	    end;
1746	Node /= node() ->
1747	    %% Verify that ram table not has been dumped to disc
1748	    if
1749		Storage /= ram_copies ->
1750		    case mnesia_lib:schema_cs_to_storage_type(node(), Cs) of
1751			ram_copies ->
1752			    Dat = mnesia_lib:tab2dcd(Tab),
1753			    case mnesia_lib:exists(Dat) of
1754				true ->
1755				    mnesia:abort({combine_error, Tab, Storage,
1756						  "Table dumped to disc", node()});
1757				false ->
1758				    ok
1759			    end;
1760			_ ->
1761			    ok
1762		    end;
1763		true ->
1764		    ok
1765	    end,
1766	    insert_cstruct(Tid, Cs, true),
1767	    {true, optional}
1768    end;
1769
1770prepare_op(Tid, {op, del_table_copy, _Storage, Node, TabDef}, _WaitFor) ->
1771    Cs = list2cs(TabDef),
1772    Tab = Cs#cstruct.name,
1773
1774    if
1775	%% Schema table lock is always required to run a schema op.
1776	%% No need to look it.
1777	node(Tid#tid.pid) == node(), Tab /= schema ->
1778	    Pid = spawn_link(?MODULE, lock_del_table, [Tab, Node, Cs, self()]),
1779	    receive
1780		{Pid, updated} ->
1781		    {true, optional};
1782		{Pid, FailReason} ->
1783		    mnesia:abort(FailReason);
1784		{'EXIT', Pid, Reason} ->
1785		    mnesia:abort(Reason)
1786	    end;
1787	true ->
1788	    {true, optional}
1789    end;
1790
1791prepare_op(_Tid, {op, change_table_copy_type,  N, FromS, ToS, TabDef}, _WaitFor)
1792  when N == node() ->
1793    Cs = list2cs(TabDef),
1794    Tab = Cs#cstruct.name,
1795
1796    NotActive = mnesia_lib:not_active_here(Tab),
1797
1798    if
1799	NotActive == true ->
1800	    mnesia:abort({not_active, Tab, node()});
1801
1802	Tab == schema ->
1803	    case {FromS, ToS} of
1804		{ram_copies, disc_copies} ->
1805		    case mnesia:system_info(schema_location) of
1806			opt_disc ->
1807			    ignore;
1808			_ ->
1809			    mnesia:abort({combine_error,  Tab, node(),
1810					  "schema_location must be opt_disc"})
1811		    end,
1812		    Dir = mnesia_lib:dir(),
1813		    case opt_create_dir(true, Dir) of
1814			ok ->
1815			    purge_dir(Dir, []),
1816			    mnesia_log:purge_all_logs(),
1817			    set(use_dir, true),
1818			    mnesia_log:init(),
1819			    Ns = val({current, db_nodes}), %mnesia_lib:running_nodes(),
1820			    F = fun(U) -> mnesia_recover:log_mnesia_up(U) end,
1821			    lists:foreach(F, Ns),
1822
1823			    mnesia_dumper:raw_named_dump_table(Tab, dmp),
1824			    mnesia_checkpoint:tm_change_table_copy_type(Tab, FromS, ToS);
1825			{error, Reason} ->
1826			    mnesia:abort(Reason)
1827		    end;
1828		{disc_copies, ram_copies} ->
1829		    Ltabs = val({schema, local_tables}) -- [schema],
1830		    Dtabs = [L || L <- Ltabs,
1831				  val({L, storage_type}) /= ram_copies],
1832		    verify([], Dtabs, {"Disc resident tables", Dtabs, N});
1833		_ ->
1834		    mnesia:abort({combine_error, Tab, ToS})
1835	    end;
1836
1837	FromS == ram_copies ->
1838	    case mnesia_monitor:use_dir() of
1839		true ->
1840		    Dat = mnesia_lib:tab2dcd(Tab),
1841		    case mnesia_lib:exists(Dat) of
1842			true ->
1843			    mnesia:abort({combine_error, Tab, node(),
1844					  "Table dump exists"});
1845			false ->
1846			    case ToS of
1847				disc_copies ->
1848				    mnesia_log:ets2dcd(Tab, dmp);
1849				disc_only_copies ->
1850				    mnesia_dumper:raw_named_dump_table(Tab, dmp)
1851			    end,
1852			    mnesia_checkpoint:tm_change_table_copy_type(Tab, FromS, ToS)
1853		    end;
1854		false ->
1855		    mnesia:abort({has_no_disc, node()})
1856	    end;
1857
1858	FromS == disc_copies, ToS == disc_only_copies ->
1859	    mnesia_dumper:raw_named_dump_table(Tab, dmp);
1860	FromS == disc_only_copies ->
1861	    Type = Cs#cstruct.type,
1862	    create_ram_table(Tab, Type),
1863	    Datname = mnesia_lib:tab2dat(Tab),
1864	    Repair = mnesia_monitor:get_env(auto_repair),
1865	    case mnesia_lib:dets_to_ets(Tab, Tab, Datname, Type, Repair, no) of
1866		loaded -> ok;
1867		Reason ->
1868		    Err = "Failed to copy disc data to ram",
1869		    mnesia:abort({system_limit, Tab, {Err,Reason}})
1870	    end;
1871	true ->
1872	    ignore
1873    end,
1874    {true, mandatory};
1875
1876prepare_op(_Tid, {op, change_table_copy_type,  N, _FromS, _ToS, _TabDef}, _WaitFor)
1877  when N /= node() ->
1878    {true, mandatory};
1879
1880prepare_op(_Tid, {op, delete_table, _TabDef}, _WaitFor) ->
1881    {true, mandatory};
1882
1883prepare_op(_Tid, {op, dump_table, unknown, TabDef}, _WaitFor) ->
1884    Cs = list2cs(TabDef),
1885    Tab = Cs#cstruct.name,
1886    case lists:member(node(), Cs#cstruct.ram_copies) of
1887        true ->
1888	    case mnesia_monitor:use_dir() of
1889		true ->
1890		    mnesia_log:ets2dcd(Tab, dmp),
1891		    Size = mnesia:table_info(Tab, size),
1892		    {true, [{op, dump_table, Size, TabDef}], optional};
1893		false ->
1894		    mnesia:abort({has_no_disc, node()})
1895	    end;
1896        false ->
1897            {false, optional}
1898    end;
1899
1900prepare_op(_Tid, {op, add_snmp, Ustruct, TabDef}, _WaitFor) ->
1901    Cs = list2cs(TabDef),
1902    case mnesia_lib:cs_to_storage_type(node(), Cs) of
1903        unknown ->
1904            {true, optional};
1905        Storage ->
1906            Tab = Cs#cstruct.name,
1907            Stab = mnesia_snmp_hook:create_table(Ustruct, Tab, Storage),
1908            mnesia_lib:set({Tab, {index, snmp}}, Stab),
1909            {true, optional}
1910    end;
1911
1912prepare_op(_Tid, {op, transform, ignore, _TabDef}, _WaitFor) ->
1913    {true, mandatory};   %% Apply schema changes only.
1914prepare_op(_Tid, {op, transform, Fun, TabDef}, _WaitFor) ->
1915    Cs = list2cs(TabDef),
1916    case mnesia_lib:cs_to_storage_type(node(), Cs) of
1917        unknown ->
1918            {true, mandatory};
1919        Storage ->
1920            Tab = Cs#cstruct.name,
1921            RecName = Cs#cstruct.record_name,
1922	    Type = Cs#cstruct.type,
1923            NewArity = length(Cs#cstruct.attributes) + 1,
1924	    mnesia_lib:db_fixtable(Storage, Tab, true),
1925            Key = mnesia_lib:db_first(Tab),
1926	    Op = {op, transform, Fun, TabDef},
1927            case catch transform_objs(Fun, Tab, RecName,
1928				      Key, NewArity, Storage, Type, [Op]) of
1929                {'EXIT', Reason} ->
1930		    mnesia_lib:db_fixtable(Storage, Tab, false),
1931                    exit({"Bad transform function", Tab, Fun, node(), Reason});
1932                Objs ->
1933		    mnesia_lib:db_fixtable(Storage, Tab, false),
1934                    {true, Objs, mandatory}
1935            end
1936    end;
1937
1938prepare_op(_Tid, _Op, _WaitFor) ->
1939    {true, optional}.
1940
1941
1942create_ram_table(Tab, Type) ->
1943    Args = [{keypos, 2}, public, named_table, Type],
1944    case mnesia_monitor:unsafe_mktab(Tab, Args) of
1945	Tab ->
1946	    ok;
1947	{error,Reason} ->
1948	    Err = "Failed to create ets table",
1949	    mnesia:abort({system_limit, Tab, {Err,Reason}})
1950    end.
1951create_disc_table(Tab) ->
1952    File = mnesia_lib:tab2dcd(Tab),
1953    file:delete(File),
1954    FArg = [{file, File}, {name, {mnesia,create}},
1955	    {repair, false}, {mode, read_write}],
1956    case mnesia_monitor:open_log(FArg) of
1957	{ok,Log} ->
1958	    mnesia_monitor:unsafe_close_log(Log),
1959	    ok;
1960	{error,Reason} ->
1961	    Err = "Failed to create disc table",
1962	    mnesia:abort({system_limit, Tab, {Err,Reason}})
1963    end.
1964create_disc_only_table(Tab,Type) ->
1965    File = mnesia_lib:tab2dat(Tab),
1966    file:delete(File),
1967    Args = [{file, mnesia_lib:tab2dat(Tab)},
1968	    {type, mnesia_lib:disk_type(Tab, Type)},
1969	    {keypos, 2},
1970	    {repair, mnesia_monitor:get_env(auto_repair)}],
1971    case mnesia_monitor:unsafe_open_dets(Tab, Args) of
1972	{ok, _} ->
1973	    ok;
1974	{error,Reason} ->
1975	    Err = "Failed to create disc table",
1976	    mnesia:abort({system_limit, Tab, {Err,Reason}})
1977    end.
1978
1979
1980receive_sync([], Pids) ->
1981    Pids;
1982receive_sync(Nodes, Pids) ->
1983    receive
1984	{sync_trans, Pid} ->
1985	    Node = node(Pid),
1986	    receive_sync(lists:delete(Node, Nodes), [Pid | Pids]);
1987	Else ->
1988	    {abort, Else}
1989    end.
1990
1991lock_del_table(Tab, Node, Cs, Father) ->
1992    Ns = val({schema, active_replicas}),
1993    Lock = fun() ->
1994		   mnesia:write_lock_table(Tab),
1995		   {Res, []} = rpc:multicall(Ns, ?MODULE, set_where_to_read, [Tab, Node, Cs]),
1996		   Filter = fun(ok) ->
1997				    false;
1998			       ({badrpc, {'EXIT', {undef, _}}}) ->
1999				    %% This will be the case we talks with elder nodes
2000				    %% than 3.8.2, they will set where_to_read without
2001				    %% getting a lock.
2002				    false;
2003			       (_) ->
2004				    true
2005			    end,
2006		   [] = lists:filter(Filter, Res),
2007		   ok
2008	   end,
2009    case mnesia:transaction(Lock) of
2010	{'atomic', ok} ->
2011	    Father ! {self(), updated};
2012	{aborted, R} ->
2013	    Father ! {self(), R}
2014    end,
2015    unlink(Father),
2016    exit(normal).
2017
2018set_where_to_read(Tab, Node, Cs) ->
2019    case mnesia_lib:val({Tab, where_to_read}) of
2020	Node ->
2021	    case Cs#cstruct.local_content of
2022		true ->
2023		    ok;
2024		false ->
2025		    mnesia_lib:set_remote_where_to_read(Tab, [Node]),
2026		    ok
2027	    end;
2028	_ ->
2029	    ok
2030    end.
2031
2032%% Build up the list in reverse order.
2033transform_objs(_Fun, _Tab, _RT, '$end_of_table', _NewArity, _Storage, _Type, Acc) ->
2034    Acc;
2035transform_objs(Fun, Tab, RecName, Key, A, Storage, Type, Acc) ->
2036    Objs = mnesia_lib:db_get(Tab, Key),
2037    NextKey = mnesia_lib:db_next_key(Tab, Key),
2038    Oid = {Tab, Key},
2039    NewObjs = {Ws, Ds} = transform_obj(Tab, RecName, Key, Fun, Objs, A, Type, [], []),
2040    if
2041	NewObjs == {[], []} ->
2042	    transform_objs(Fun, Tab, RecName, NextKey, A, Storage, Type, Acc);
2043	Type == bag ->
2044	    transform_objs(Fun, Tab, RecName, NextKey, A, Storage, Type,
2045			   [{op, rec, Storage, {Oid, Ws, write}},
2046			    {op, rec, Storage, {Oid, [Oid], delete}} | Acc]);
2047	Ds == [] ->
2048	    %% Type is set or ordered_set, no need to delete the record first
2049	    transform_objs(Fun, Tab, RecName, NextKey, A, Storage, Type,
2050			   [{op, rec, Storage, {Oid, Ws, write}} | Acc]);
2051	Ws == [] ->
2052	    transform_objs(Fun, Tab, RecName, NextKey, A, Storage, Type,
2053			   [{op, rec, Storage, {Oid, Ds, write}} | Acc]);
2054	true ->
2055	    transform_objs(Fun, Tab, RecName, NextKey, A, Storage, Type,
2056			   [{op, rec, Storage, {Oid, Ws, write}},
2057			    {op, rec, Storage, {Oid, Ds, delete}} | Acc])
2058    end.
2059
2060transform_obj(Tab, RecName, Key, Fun, [Obj|Rest], NewArity, Type, Ws, Ds) ->
2061    NewObj = Fun(Obj),
2062    if
2063        size(NewObj) /= NewArity ->
2064            exit({"Bad arity", Obj, NewObj});
2065	NewObj == Obj ->
2066	    transform_obj(Tab, RecName, Key, Fun, Rest, NewArity, Type, Ws, Ds);
2067        RecName == element(1, NewObj), Key == element(2, NewObj) ->
2068            transform_obj(Tab, RecName, Key, Fun, Rest, NewArity,
2069			  Type, [NewObj | Ws], Ds);
2070	NewObj == delete ->
2071	    case Type of
2072		bag -> %% Just don't write that object
2073		   transform_obj(Tab, RecName, Key, Fun, Rest,
2074				 NewArity, Type, Ws, Ds);
2075		_ ->
2076		    transform_obj(Tab, RecName, Key, Fun, Rest, NewArity,
2077				  Type, Ws, [NewObj | Ds])
2078	    end;
2079        true ->
2080            exit({"Bad key or Record Name", Obj, NewObj})
2081    end;
2082transform_obj(_Tab, _RecName, _Key, _Fun, [], _NewArity, _Type, Ws, Ds) ->
2083    {lists:reverse(Ws), lists:reverse(Ds)}.
2084
2085%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
2086%% Undo prepare of commit
2087undo_prepare_commit(Tid, Commit) ->
2088    case Commit#commit.schema_ops of
2089	[] ->
2090	    ignore;
2091	Ops ->
2092	    %% Catch to allow failure mnesia_controller may not be started
2093	    catch mnesia_controller:release_schema_commit_lock(),
2094	    undo_prepare_ops(Tid, Ops)
2095    end,
2096    Commit.
2097
2098%% Undo in reverse order
2099undo_prepare_ops(Tid, [Op | Ops]) ->
2100    case element(1, Op) of
2101	TheOp when TheOp /= op, TheOp /= restore_op ->
2102	    undo_prepare_ops(Tid, Ops);
2103	_ ->
2104	    undo_prepare_ops(Tid, Ops),
2105	    undo_prepare_op(Tid, Op)
2106    end;
2107undo_prepare_ops(_Tid, []) ->
2108    [].
2109
2110undo_prepare_op(_Tid, {op, announce_im_running, _, _, Running, RemoteRunning}) ->
2111    case lists:member(node(), Running) of
2112        true ->
2113            unannounce_im_running(RemoteRunning -- Running);
2114        false ->
2115            unannounce_im_running(Running -- RemoteRunning)
2116    end;
2117
2118undo_prepare_op(_Tid, {op, sync_trans}) ->
2119    ok;
2120
2121undo_prepare_op(Tid, {op, create_table, TabDef}) ->
2122    Cs = list2cs(TabDef),
2123    Tab = Cs#cstruct.name,
2124    mnesia_lib:unset({Tab, create_table}),
2125    delete_cstruct(Tid, Cs),
2126    case mnesia_lib:cs_to_storage_type(node(), Cs) of
2127	unknown ->
2128	    ok;
2129	ram_copies ->
2130	    ram_delete_table(Tab, ram_copies);
2131	disc_copies ->
2132	    ram_delete_table(Tab, disc_copies),
2133	    DcdFile = mnesia_lib:tab2dcd(Tab),
2134	    %%	    disc_delete_table(Tab, Storage),
2135	    file:delete(DcdFile);
2136	disc_only_copies ->
2137	    mnesia_monitor:unsafe_close_dets(Tab),
2138	    Dat = mnesia_lib:tab2dat(Tab),
2139	    %%	    disc_delete_table(Tab, Storage),
2140	    file:delete(Dat)
2141    end;
2142
2143undo_prepare_op(Tid, {op, add_table_copy, Storage, Node, TabDef}) ->
2144    Cs = list2cs(TabDef),
2145    Tab = Cs#cstruct.name,
2146    if
2147	Tab == schema ->
2148	    true; % Nothing to prepare
2149	Node == node() ->
2150	    mnesia_checkpoint:tm_del_copy(Tab, Node),
2151	    mnesia_controller:unannounce_add_table_copy(Tab, Node),
2152	    if
2153		Storage == disc_only_copies; Tab == schema ->
2154		    mnesia_monitor:close_dets(Tab),
2155		    file:delete(mnesia_lib:tab2dat(Tab));
2156		true ->
2157		    file:delete(mnesia_lib:tab2dcd(Tab))
2158	    end,
2159	    ram_delete_table(Tab, Storage),
2160	    Cs2 = new_cs(Cs, Node, Storage, del),
2161	    insert_cstruct(Tid, Cs2, true); % Don't care about the version
2162	Node /= node() ->
2163	    mnesia_controller:unannounce_add_table_copy(Tab, Node),
2164	    Cs2 = new_cs(Cs, Node, Storage, del),
2165	    insert_cstruct(Tid, Cs2, true) % Don't care about the version
2166    end;
2167
2168undo_prepare_op(_Tid, {op, del_table_copy, _, Node, TabDef})
2169  when Node == node() ->
2170    Cs = list2cs(TabDef),
2171    Tab = Cs#cstruct.name,
2172    mnesia_lib:set({Tab, where_to_read}, Node);
2173
2174
2175undo_prepare_op(_Tid, {op, change_table_copy_type, N, FromS, ToS, TabDef})
2176        when N == node() ->
2177    Cs = list2cs(TabDef),
2178    Tab = Cs#cstruct.name,
2179    mnesia_checkpoint:tm_change_table_copy_type(Tab, ToS, FromS),
2180    Dmp = mnesia_lib:tab2dmp(Tab),
2181
2182    case {FromS, ToS} of
2183        {ram_copies, disc_copies} when Tab == schema ->
2184            file:delete(Dmp),
2185            mnesia_log:purge_some_logs(),
2186	    set(use_dir, false);
2187	{ram_copies, disc_copies} ->
2188	    file:delete(Dmp);
2189	{ram_copies, disc_only_copies} ->
2190	    file:delete(Dmp);
2191	{disc_only_copies, _} ->
2192	    ram_delete_table(Tab, ram_copies);
2193	_ ->
2194	    ignore
2195    end;
2196
2197undo_prepare_op(_Tid, {op, dump_table, _Size, TabDef}) ->
2198    Cs = list2cs(TabDef),
2199    case lists:member(node(), Cs#cstruct.ram_copies) of
2200	true ->
2201	    Tab = Cs#cstruct.name,
2202	    Dmp = mnesia_lib:tab2dmp(Tab),
2203	    file:delete(Dmp);
2204	false ->
2205	    ignore
2206    end;
2207
2208undo_prepare_op(_Tid, {op, add_snmp, _Ustruct, TabDef}) ->
2209    Cs = list2cs(TabDef),
2210    case mnesia_lib:cs_to_storage_type(node(), Cs) of
2211	unknown ->
2212	    true;
2213	_Storage ->
2214	    Tab = Cs#cstruct.name,
2215	    case ?catch_val({Tab, {index, snmp}}) of
2216		{'EXIT',_} ->
2217		    ignore;
2218		Stab ->
2219		    mnesia_snmp_hook:delete_table(Tab, Stab),
2220		    mnesia_lib:unset({Tab, {index, snmp}})
2221	    end
2222    end;
2223
2224undo_prepare_op(_Tid, _Op) ->
2225    ignore.
2226
2227%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
2228
2229ram_delete_table(Tab, Storage) ->
2230    case Storage of
2231	unknown ->
2232	    ignore;
2233	disc_only_copies ->
2234	    ignore;
2235	_Else ->
2236	    %% delete possible index files and data .....
2237	    %% Got to catch this since if no info has been set in the
2238	    %% mnesia_gvar it will crash
2239	    catch mnesia_index:del_transient(Tab, Storage),
2240	    case ?catch_val({Tab, {index, snmp}}) of
2241		{'EXIT', _} ->
2242		    ignore;
2243		Etab ->
2244		    catch mnesia_snmp_hook:delete_table(Tab, Etab)
2245	    end,
2246	    catch ?ets_delete_table(Tab)
2247    end.
2248
2249purge_dir(Dir, KeepFiles) ->
2250    Suffixes = known_suffixes(),
2251    purge_dir(Dir, KeepFiles, Suffixes).
2252
2253purge_dir(Dir, KeepFiles, Suffixes) ->
2254    case dir_exists(Dir) of
2255	true ->
2256	    {ok, AllFiles} = file:list_dir(Dir),
2257	    purge_known_files(AllFiles, KeepFiles, Dir, Suffixes);
2258	false ->
2259	    ok
2260    end.
2261
2262purge_tmp_files() ->
2263    case mnesia_monitor:use_dir() of
2264	true ->
2265	    Dir = mnesia_lib:dir(),
2266	    KeepFiles = [],
2267	    Exists = mnesia_lib:exists(mnesia_lib:tab2dat(schema)),
2268	    case Exists of
2269		true ->
2270		    Suffixes = tmp_suffixes(),
2271		    purge_dir(Dir, KeepFiles, Suffixes);
2272		false ->
2273		    %% Interrupted change of storage type
2274		    %% for schema table
2275		    Suffixes = known_suffixes(),
2276		    purge_dir(Dir, KeepFiles, Suffixes),
2277		    mnesia_lib:set(use_dir, false)
2278		end;
2279
2280	false ->
2281	    ok
2282    end.
2283
2284purge_known_files([File | Tail], KeepFiles, Dir, Suffixes) ->
2285    case lists:member(File, KeepFiles) of
2286	true ->
2287	    ignore;
2288	false ->
2289	    case has_known_suffix(File, Suffixes, false) of
2290		false ->
2291		    ignore;
2292		true ->
2293		    AbsFile = filename:join([Dir, File]),
2294		    file:delete(AbsFile)
2295	    end
2296    end,
2297    purge_known_files(Tail, KeepFiles, Dir, Suffixes);
2298purge_known_files([], _KeepFiles, _Dir, _Suffixes) ->
2299    ok.
2300
2301has_known_suffix(_File, _Suffixes, true) ->
2302    true;
2303has_known_suffix(File, [Suffix | Tail], false) ->
2304    has_known_suffix(File, Tail, lists:suffix(Suffix, File));
2305has_known_suffix(_File, [], Bool) ->
2306    Bool.
2307
2308known_suffixes() -> real_suffixes() ++ tmp_suffixes().
2309
2310real_suffixes() ->  [".DAT", ".LOG", ".BUP", ".DCL", ".DCD"].
2311
2312tmp_suffixes() -> [".TMP", ".BUPTMP", ".RET", ".DMP"].
2313
2314info() ->
2315    Tabs = lists:sort(val({schema, tables})),
2316    lists:foreach(fun(T) -> info(T) end, Tabs),
2317    ok.
2318
2319info(Tab) ->
2320    Props = get_table_properties(Tab),
2321    io:format("-- Properties for ~w table --- ~n",[Tab]),
2322    info2(Tab, Props).
2323info2(Tab, [{cstruct, _V} | Tail]) -> % Ignore cstruct
2324    info2(Tab, Tail);
2325info2(Tab, [{frag_hash, _V} | Tail]) -> % Ignore frag_hash
2326    info2(Tab, Tail);
2327info2(Tab, [{P, V} | Tail]) ->
2328    io:format("~-20w -> ~p~n",[P,V]),
2329    info2(Tab, Tail);
2330info2(_, []) ->
2331    io:format("~n", []).
2332
2333get_table_properties(Tab) ->
2334    case catch mnesia_lib:db_match_object(ram_copies,
2335					  mnesia_gvar, {{Tab, '_'}, '_'}) of
2336	{'EXIT', _} ->
2337	    mnesia:abort({no_exists, Tab, all});
2338	RawGvar ->
2339	    case [{Item, Val} || {{_Tab, Item}, Val} <- RawGvar] of
2340		[] ->
2341		    [];
2342		Gvar ->
2343		    Size = {size, mnesia:table_info(Tab, size)},
2344		    Memory = {memory, mnesia:table_info(Tab, memory)},
2345		    Master = {master_nodes, mnesia:table_info(Tab, master_nodes)},
2346		    lists:sort([Size, Memory, Master | Gvar])
2347	    end
2348    end.
2349
2350%%%%%%%%%%% RESTORE %%%%%%%%%%%
2351
2352-record(r, {iter = schema,
2353	    module,
2354	    table_options = [],
2355	    default_op = clear_tables,
2356	    tables = [],
2357	    opaque,
2358	    insert_op = error_fun,
2359	    recs = error_recs
2360	   }).
2361
2362restore(Opaque) ->
2363    restore(Opaque, [], mnesia_monitor:get_env(backup_module)).
2364restore(Opaque, Args) when list(Args) ->
2365    restore(Opaque, Args, mnesia_monitor:get_env(backup_module));
2366restore(_Opaque, BadArg) ->
2367    {aborted, {badarg, BadArg}}.
2368restore(Opaque, Args, Module) when list(Args), atom(Module) ->
2369    InitR = #r{opaque = Opaque, module = Module},
2370    case catch lists:foldl(fun check_restore_arg/2, InitR, Args) of
2371	R when record(R, r) ->
2372	    case mnesia_bup:read_schema(Module, Opaque) of
2373		{error, Reason} ->
2374		    {aborted, Reason};
2375		BupSchema ->
2376		    schema_transaction(fun() -> do_restore(R, BupSchema) end)
2377	    end;
2378	{'EXIT', Reason} ->
2379	    {aborted, Reason}
2380    end;
2381restore(_Opaque, Args, Module) ->
2382    {aborted, {badarg, Args, Module}}.
2383
2384check_restore_arg({module, Mod}, R) when atom(Mod) ->
2385    R#r{module = Mod};
2386
2387check_restore_arg({clear_tables, List}, R) when list(List) ->
2388    case lists:member(schema, List) of
2389	false ->
2390	    TableList = [{Tab, clear_tables} || Tab <- List],
2391	    R#r{table_options = R#r.table_options ++ TableList};
2392	true ->
2393	    exit({badarg, {clear_tables, schema}})
2394    end;
2395check_restore_arg({recreate_tables, List}, R) when list(List) ->
2396    case lists:member(schema, List) of
2397	false ->
2398	    TableList = [{Tab, recreate_tables} || Tab <- List],
2399	    R#r{table_options = R#r.table_options ++ TableList};
2400	true ->
2401	    exit({badarg, {recreate_tables, schema}})
2402    end;
2403check_restore_arg({keep_tables, List}, R) when list(List) ->
2404    TableList = [{Tab, keep_tables} || Tab <- List],
2405    R#r{table_options = R#r.table_options ++ TableList};
2406check_restore_arg({skip_tables, List}, R) when list(List) ->
2407    TableList = [{Tab, skip_tables} || Tab <- List],
2408    R#r{table_options = R#r.table_options ++ TableList};
2409check_restore_arg({default_op, Op}, R) ->
2410    case Op of
2411	clear_tables -> ok;
2412	recreate_tables -> ok;
2413	keep_tables -> ok;
2414	skip_tables -> ok;
2415	Else ->
2416	    exit({badarg, {bad_default_op, Else}})
2417    end,
2418    R#r{default_op = Op};
2419
2420check_restore_arg(BadArg,_) ->
2421    exit({badarg, BadArg}).
2422
2423do_restore(R, BupSchema) ->
2424    TidTs = get_tid_ts_and_lock(schema, write),
2425    R2 = restore_schema(BupSchema, R),
2426    insert_schema_ops(TidTs, [{restore_op, R2}]),
2427    [element(1, TabStruct) || TabStruct <- R2#r.tables].
2428
2429arrange_restore(R, Fun, Recs) ->
2430    R2 = R#r{insert_op = Fun, recs = Recs},
2431    case mnesia_bup:iterate(R#r.module, fun restore_items/4, R#r.opaque, R2) of
2432	{ok, R3} -> R3#r.recs;
2433	{error, Reason} -> mnesia:abort(Reason);
2434	Reason -> mnesia:abort(Reason)
2435    end.
2436
2437restore_items([Rec | Recs], Header, Schema, R) ->
2438    Tab = element(1, Rec),
2439    case lists:keysearch(Tab, 1, R#r.tables) of
2440	{value, {Tab, Where, Snmp, RecName}} ->
2441	    {Rest, NRecs} =
2442		restore_tab_items([Rec | Recs], Tab, RecName, Where, Snmp,
2443				  R#r.recs, R#r.insert_op),
2444	    restore_items(Rest, Header, Schema, R#r{recs = NRecs});
2445	false ->
2446	    Rest = skip_tab_items(Recs, Tab),
2447	    restore_items(Rest, Header, Schema, R)
2448    end;
2449
2450restore_items([], _Header, _Schema, R) ->
2451    R.
2452
2453restore_func(Tab, R) ->
2454    case lists:keysearch(Tab, 1, R#r.table_options) of
2455	{value, {Tab, OP}} ->
2456	    OP;
2457	false ->
2458	    R#r.default_op
2459    end.
2460
2461where_to_commit(Tab, CsList) ->
2462    Ram =   [{N, ram_copies} || N <- pick(Tab, ram_copies, CsList, [])],
2463    Disc =  [{N, disc_copies} || N <- pick(Tab, disc_copies, CsList, [])],
2464    DiscO = [{N, disc_only_copies} || N <- pick(Tab, disc_only_copies, CsList, [])],
2465    Ram ++ Disc ++ DiscO.
2466
2467%% Changes of the Meta info of schema itself is not allowed
2468restore_schema([{schema, schema, _List} | Schema], R) ->
2469    restore_schema(Schema, R);
2470restore_schema([{schema, Tab, List} | Schema], R) ->
2471    case restore_func(Tab, R) of
2472	clear_tables ->
2473	    do_clear_table(Tab),
2474	    Where = val({Tab, where_to_commit}),
2475	    Snmp = val({Tab, snmp}),
2476	    RecName = val({Tab, record_name}),
2477	    R2 = R#r{tables = [{Tab, Where, Snmp, RecName} | R#r.tables]},
2478	    restore_schema(Schema, R2);
2479	recreate_tables ->
2480	    TidTs = get_tid_ts_and_lock(Tab, write),
2481	    NC    = {cookie, ?unique_cookie},
2482	    List2 = lists:keyreplace(cookie, 1, List, NC),
2483	    Where = where_to_commit(Tab, List2),
2484	    Snmp  = pick(Tab, snmp, List2, []),
2485	    RecName = pick(Tab, record_name, List2, Tab),
2486% 	    case ?catch_val({Tab, cstruct}) of
2487% 		{'EXIT', _} ->
2488% 		    ignore;
2489% 		OldCs when record(OldCs, cstruct) ->
2490% 		    do_delete_table(Tab)
2491% 	    end,
2492% 	    unsafe_do_create_table(list2cs(List2)),
2493	    insert_schema_ops(TidTs, [{op, restore_recreate, List2}]),
2494	    R2 = R#r{tables = [{Tab, Where, Snmp, RecName} | R#r.tables]},
2495	    restore_schema(Schema, R2);
2496	keep_tables ->
2497	    get_tid_ts_and_lock(Tab, write),
2498	    Where = val({Tab, where_to_commit}),
2499	    Snmp = val({Tab, snmp}),
2500	    RecName = val({Tab, record_name}),
2501	    R2 = R#r{tables = [{Tab, Where, Snmp, RecName} | R#r.tables]},
2502	    restore_schema(Schema, R2);
2503	skip_tables ->
2504	    restore_schema(Schema, R)
2505    end;
2506
2507restore_schema([{schema, Tab} | Schema], R) ->
2508    do_delete_table(Tab),
2509    Tabs = lists:delete(Tab,R#r.tables),
2510    restore_schema(Schema, R#r{tables = Tabs});
2511restore_schema([], R) ->
2512    R.
2513
2514restore_tab_items([Rec | Rest], Tab, RecName, Where, Snmp, Recs, Op)
2515  when element(1, Rec) == Tab ->
2516    NewRecs = Op(Rec, Recs, RecName, Where, Snmp),
2517    restore_tab_items(Rest, Tab, RecName, Where, Snmp, NewRecs, Op);
2518
2519restore_tab_items(Rest, _Tab, _RecName, _Where, _Snmp, Recs, _Op) ->
2520    {Rest, Recs}.
2521
2522skip_tab_items([Rec| Rest], Tab)
2523  when element(1, Rec) == Tab ->
2524    skip_tab_items(Rest, Tab);
2525skip_tab_items(Recs, _) ->
2526    Recs.
2527
2528%%%%%%%%% Dump tables %%%%%%%%%%%%%
2529dump_tables(Tabs) when list(Tabs) ->
2530    schema_transaction(fun() -> do_dump_tables(Tabs) end);
2531dump_tables(Tabs) ->
2532    {aborted, {bad_type, Tabs}}.
2533
2534do_dump_tables(Tabs) ->
2535    TidTs = get_tid_ts_and_lock(schema, write),
2536    insert_schema_ops(TidTs, make_dump_tables(Tabs)).
2537
2538make_dump_tables([schema | _Tabs]) ->
2539    mnesia:abort({bad_type, schema});
2540make_dump_tables([Tab | Tabs]) ->
2541    get_tid_ts_and_lock(Tab, read),
2542    TabDef = get_create_list(Tab),
2543    DiscResident =  val({Tab, disc_copies}) ++ val({Tab, disc_only_copies}),
2544    verify([], DiscResident,
2545	   {"Only allowed on ram_copies", Tab, DiscResident}),
2546    [{op, dump_table, unknown, TabDef} | make_dump_tables(Tabs)];
2547make_dump_tables([]) ->
2548    [].
2549
2550%% Merge the local schema with the schema on other nodes
2551merge_schema() ->
2552    schema_transaction(fun() -> do_merge_schema() end).
2553
2554do_merge_schema() ->
2555    {_Mod, Tid, Ts} = get_tid_ts_and_lock(schema, write),
2556    Connected = val(recover_nodes),
2557    Running = val({current, db_nodes}),
2558    Store = Ts#tidstore.store,
2559    case Connected -- Running of
2560	[Node | _] ->
2561	    %% Time for a schema merging party!
2562	    mnesia_locker:wlock_no_exist(Tid, Store, schema, [Node]),
2563
2564	    case rpc:call(Node, mnesia_controller, get_cstructs, []) of
2565		{cstructs, Cstructs, RemoteRunning1} ->
2566		    LockedAlready = Running ++ [Node],
2567		    {New, Old} = mnesia_recover:connect_nodes(RemoteRunning1),
2568		    RemoteRunning = mnesia_lib:intersect(New ++ Old, RemoteRunning1),
2569		    if
2570			RemoteRunning /= RemoteRunning1 ->
2571			    mnesia_lib:error("Mnesia on ~p could not connect to node(s) ~p~n",
2572					     [node(), RemoteRunning1 -- RemoteRunning]);
2573			true -> ok
2574		    end,
2575		    NeedsLock = RemoteRunning -- LockedAlready,
2576		    mnesia_locker:wlock_no_exist(Tid, Store, schema, NeedsLock),
2577
2578		    {value, SchemaCs} =
2579			lists:keysearch(schema, #cstruct.name, Cstructs),
2580
2581		    %% Announce that Node is running
2582		    A = [{op, announce_im_running, node(),
2583			  cs2list(SchemaCs), Running, RemoteRunning}],
2584		    do_insert_schema_ops(Store, A),
2585
2586		    %% Introduce remote tables to local node
2587		    do_insert_schema_ops(Store, make_merge_schema(Node, Cstructs)),
2588
2589		    %% Introduce local tables to remote nodes
2590		    Tabs = val({schema, tables}),
2591		    Ops = [{op, merge_schema, get_create_list(T)}
2592			   || T <- Tabs,
2593			      not lists:keymember(T, #cstruct.name, Cstructs)],
2594		    do_insert_schema_ops(Store, Ops),
2595
2596		    %% Ensure that the txn will be committed on all nodes
2597		    announce_im_running(RemoteRunning, SchemaCs),
2598		    {merged, Running, RemoteRunning};
2599		{error, Reason} ->
2600		    {"Cannot get cstructs", Node, Reason};
2601		{badrpc, Reason} ->
2602		    {"Cannot get cstructs", Node, {badrpc, Reason}}
2603	    end;
2604	[] ->
2605	    %% No more nodes to merge schema with
2606	    not_merged
2607    end.
2608
2609make_merge_schema(Node, [Cs | Cstructs]) ->
2610    Ops = do_make_merge_schema(Node, Cs),
2611    Ops ++ make_merge_schema(Node, Cstructs);
2612make_merge_schema(_Node, []) ->
2613    [].
2614
2615%% Merge definitions of schema table
2616do_make_merge_schema(Node, RemoteCs)
2617        when RemoteCs#cstruct.name == schema ->
2618    Cs = val({schema, cstruct}),
2619    Masters = mnesia_recover:get_master_nodes(schema),
2620    HasRemoteMaster = lists:member(Node, Masters),
2621    HasLocalMaster = lists:member(node(), Masters),
2622    Force = HasLocalMaster or HasRemoteMaster,
2623    %% What is the storage types opinions?
2624    StCsLocal   = mnesia_lib:cs_to_storage_type(node(), Cs),
2625    StRcsLocal  = mnesia_lib:cs_to_storage_type(node(), RemoteCs),
2626    StCsRemote  = mnesia_lib:cs_to_storage_type(Node, Cs),
2627    StRcsRemote = mnesia_lib:cs_to_storage_type(Node, RemoteCs),
2628
2629    if
2630	Cs#cstruct.cookie == RemoteCs#cstruct.cookie,
2631	Cs#cstruct.version == RemoteCs#cstruct.version ->
2632	    %% Great, we have the same cookie and version
2633	    %% and do not need to merge cstructs
2634	    [];
2635
2636	Cs#cstruct.cookie /= RemoteCs#cstruct.cookie,
2637	Cs#cstruct.disc_copies /= [],
2638	RemoteCs#cstruct.disc_copies /= [] ->
2639	    %% Both cstructs involves disc nodes
2640	    %% and we cannot merge them
2641	    if
2642		HasLocalMaster == true,
2643		HasRemoteMaster == false ->
2644		    %% Choose local cstruct,
2645		    %% since it's the master
2646		    [{op, merge_schema, cs2list(Cs)}];
2647
2648		HasRemoteMaster == true,
2649		HasLocalMaster == false ->
2650		    %% Choose remote cstruct,
2651		    %% since it's the master
2652		    [{op, merge_schema, cs2list(RemoteCs)}];
2653
2654		true ->
2655		    Str = io_lib:format("Incompatible schema cookies. "
2656					"Please, restart from old backup."
2657					"~w = ~w, ~w = ~w~n",
2658					[Node, cs2list(RemoteCs), node(), cs2list(Cs)]),
2659		    throw(Str)
2660	    end;
2661
2662	StCsLocal /= StRcsLocal, StRcsLocal /= unknown ->
2663	    Str = io_lib:format("Incompatible schema storage types. "
2664				"on ~w storage ~w, on ~w storage ~w~n",
2665				[node(), StCsLocal, Node, StRcsLocal]),
2666	    throw(Str);
2667	StCsRemote /= StRcsRemote, StCsRemote /= unknown ->
2668	    Str = io_lib:format("Incompatible schema storage types. "
2669				"on ~w storage ~w, on ~w storage ~w~n",
2670				[node(), StCsRemote, Node, StRcsRemote]),
2671	    throw(Str);
2672
2673	Cs#cstruct.disc_copies /= [] ->
2674	    %% Choose local cstruct,
2675	    %% since it involves disc nodes
2676	    MergedCs = merge_cstructs(Cs, RemoteCs, Force),
2677	    [{op, merge_schema, cs2list(MergedCs)}];
2678
2679	RemoteCs#cstruct.disc_copies /= [] ->
2680	    %% Choose remote cstruct,
2681	    %% since it involves disc nodes
2682	    MergedCs = merge_cstructs(RemoteCs, Cs, Force),
2683	    [{op, merge_schema, cs2list(MergedCs)}];
2684
2685	Cs > RemoteCs ->
2686	    %% Choose remote cstruct
2687	    MergedCs = merge_cstructs(RemoteCs, Cs, Force),
2688	    [{op, merge_schema, cs2list(MergedCs)}];
2689
2690	true ->
2691	    %% Choose local cstruct
2692	    MergedCs = merge_cstructs(Cs, RemoteCs, Force),
2693	    [{op, merge_schema, cs2list(MergedCs)}]
2694    end;
2695
2696%% Merge definitions of normal table
2697do_make_merge_schema(Node, RemoteCs) ->
2698    Tab = RemoteCs#cstruct.name,
2699    Masters = mnesia_recover:get_master_nodes(schema),
2700    HasRemoteMaster = lists:member(Node, Masters),
2701    HasLocalMaster = lists:member(node(), Masters),
2702    Force = HasLocalMaster or HasRemoteMaster,
2703    case ?catch_val({Tab, cstruct}) of
2704	{'EXIT', _} ->
2705	    %% A completely new table, created while Node was down
2706	    [{op, merge_schema, cs2list(RemoteCs)}];
2707	Cs when Cs#cstruct.cookie == RemoteCs#cstruct.cookie ->
2708	    if
2709		Cs#cstruct.version == RemoteCs#cstruct.version ->
2710		    %% We have exactly the same version of the
2711		    %% table def
2712		    [];
2713
2714		Cs#cstruct.version > RemoteCs#cstruct.version ->
2715		    %% Oops, we have different versions
2716		    %% of the table def, lets merge them.
2717		    %% The only changes that may have occurred
2718		    %% is that new replicas may have been added.
2719		    MergedCs = merge_cstructs(Cs, RemoteCs, Force),
2720		    [{op, merge_schema, cs2list(MergedCs)}];
2721
2722		Cs#cstruct.version < RemoteCs#cstruct.version ->
2723		    %% Oops, we have different versions
2724		    %% of the table def, lets merge them
2725		    MergedCs = merge_cstructs(RemoteCs, Cs, Force),
2726		    [{op, merge_schema, cs2list(MergedCs)}]
2727	    end;
2728	Cs ->
2729	    %% Different cookies, not possible to merge
2730	    if
2731		HasLocalMaster == true,
2732		HasRemoteMaster == false ->
2733		    %% Choose local cstruct,
2734		    %% since it's the master
2735		    [{op, merge_schema, cs2list(Cs)}];
2736
2737		HasRemoteMaster == true,
2738		HasLocalMaster == false ->
2739		    %% Choose remote cstruct,
2740		    %% since it's the master
2741		    [{op, merge_schema, cs2list(RemoteCs)}];
2742
2743		true ->
2744		    Str = io_lib:format("Bad cookie in table definition"
2745					" ~w: ~w = ~w, ~w = ~w~n",
2746					[Tab, node(), Cs, Node, RemoteCs]),
2747		    throw(Str)
2748	    end
2749    end.
2750
2751%% Change of table definitions (cstructs) requires all replicas
2752%% of the table to be active. New replicas, db_nodes and tables
2753%% may however be added even if some replica is inactive. These
2754%% invariants must be enforced in order to allow merge of cstructs.
2755%%
2756%% Returns a new cstruct or issues a fatal error
2757merge_cstructs(Cs, RemoteCs, Force) ->
2758    verify_cstruct(Cs),
2759    case catch do_merge_cstructs(Cs, RemoteCs, Force) of
2760	{'EXIT', {aborted, _Reason}} when Force == true ->
2761	    Cs;
2762	{'EXIT', Reason} ->
2763	    exit(Reason);
2764	MergedCs when record(MergedCs, cstruct) ->
2765	    MergedCs;
2766	Other ->
2767	    throw(Other)
2768    end.
2769
2770do_merge_cstructs(Cs, RemoteCs, Force) ->
2771    verify_cstruct(RemoteCs),
2772    Ns = mnesia_lib:uniq(mnesia_lib:cs_to_nodes(Cs) ++
2773			 mnesia_lib:cs_to_nodes(RemoteCs)),
2774    {AnythingNew, MergedCs} =
2775	merge_storage_type(Ns, false, Cs, RemoteCs, Force),
2776    MergedCs2 = merge_versions(AnythingNew, MergedCs, RemoteCs, Force),
2777    verify_cstruct(MergedCs2),
2778    MergedCs2.
2779
2780merge_storage_type([N | Ns], AnythingNew, Cs, RemoteCs, Force) ->
2781    Local = mnesia_lib:cs_to_storage_type(N, Cs),
2782    Remote = mnesia_lib:cs_to_storage_type(N, RemoteCs),
2783    case compare_storage_type(true, Local, Remote) of
2784	{same, _Storage} ->
2785	    merge_storage_type(Ns, AnythingNew, Cs, RemoteCs, Force);
2786	{diff, Storage} ->
2787	    Cs2 = change_storage_type(N, Storage, Cs),
2788	    merge_storage_type(Ns, true, Cs2, RemoteCs, Force);
2789	incompatible when Force == true ->
2790	    merge_storage_type(Ns, AnythingNew, Cs, RemoteCs, Force);
2791	Other ->
2792	    Str = io_lib:format("Cannot merge storage type for node ~w "
2793				"in cstruct ~w with remote cstruct ~w (~w)~n",
2794				[N, Cs, RemoteCs, Other]),
2795	    throw(Str)
2796    end;
2797merge_storage_type([], AnythingNew, MergedCs, _RemoteCs, _Force) ->
2798    {AnythingNew, MergedCs}.
2799
2800compare_storage_type(_Retry, Any, Any) ->
2801    {same, Any};
2802compare_storage_type(_Retry, unknown, Any) ->
2803    {diff, Any};
2804compare_storage_type(_Retry, ram_copies, disc_copies) ->
2805    {diff, disc_copies};
2806compare_storage_type(_Retry, disc_copies, disc_only_copies) ->
2807    {diff, disc_only_copies};
2808compare_storage_type(true, One, Another) ->
2809    compare_storage_type(false, Another, One);
2810compare_storage_type(false, _One, _Another) ->
2811    incompatible.
2812
2813change_storage_type(N, ram_copies, Cs) ->
2814    Nodes = [N | Cs#cstruct.ram_copies],
2815    Cs#cstruct{ram_copies = mnesia_lib:uniq(Nodes)};
2816change_storage_type(N, disc_copies, Cs) ->
2817    Nodes = [N | Cs#cstruct.disc_copies],
2818    Cs#cstruct{disc_copies = mnesia_lib:uniq(Nodes)};
2819change_storage_type(N, disc_only_copies, Cs) ->
2820    Nodes = [N | Cs#cstruct.disc_only_copies],
2821    Cs#cstruct{disc_only_copies = mnesia_lib:uniq(Nodes)}.
2822
2823%% BUGBUG: Verify match of frag info; equalit demanded for all but add_node
2824
2825merge_versions(AnythingNew, Cs, RemoteCs, Force) ->
2826    if
2827	Cs#cstruct.name == schema ->
2828	    ok;
2829	Cs#cstruct.name /= schema,
2830	Cs#cstruct.cookie == RemoteCs#cstruct.cookie ->
2831	    ok;
2832	Force == true ->
2833	    ok;
2834	true ->
2835	    Str = io_lib:format("Bad cookies. Cannot merge definitions of "
2836				"table ~w. Local = ~w, Remote = ~w~n",
2837				[Cs#cstruct.name, Cs, RemoteCs]),
2838	    throw(Str)
2839    end,
2840    if
2841	Cs#cstruct.name == RemoteCs#cstruct.name,
2842	Cs#cstruct.type == RemoteCs#cstruct.type,
2843	Cs#cstruct.local_content == RemoteCs#cstruct.local_content,
2844	Cs#cstruct.attributes == RemoteCs#cstruct.attributes,
2845	Cs#cstruct.index == RemoteCs#cstruct.index,
2846	Cs#cstruct.snmp == RemoteCs#cstruct.snmp,
2847	Cs#cstruct.access_mode == RemoteCs#cstruct.access_mode,
2848	Cs#cstruct.load_order == RemoteCs#cstruct.load_order,
2849	Cs#cstruct.user_properties == RemoteCs#cstruct.user_properties ->
2850	    do_merge_versions(AnythingNew, Cs, RemoteCs);
2851	Force == true ->
2852	    do_merge_versions(AnythingNew, Cs, RemoteCs);
2853	true ->
2854	    Str1 = io_lib:format("Cannot merge definitions of "
2855				"table ~w. Local = ~w, Remote = ~w~n",
2856				[Cs#cstruct.name, Cs, RemoteCs]),
2857	    throw(Str1)
2858    end.
2859
2860do_merge_versions(AnythingNew, MergedCs, RemoteCs) ->
2861    {{Major1, Minor1}, _Detail1} = MergedCs#cstruct.version,
2862    {{Major2, Minor2}, _Detail2} = RemoteCs#cstruct.version,
2863    if
2864	MergedCs#cstruct.version == RemoteCs#cstruct.version ->
2865	    MergedCs;
2866	AnythingNew == false ->
2867	    MergedCs;
2868	Major1 == Major2 ->
2869	    Minor = lists:max([Minor1, Minor2]),
2870	    V = {{Major1, Minor}, dummy},
2871	    incr_version(MergedCs#cstruct{version = V});
2872	Major1 /= Major2 ->
2873	    Major = lists:max([Major1, Major2]),
2874	    V = {{Major, 0}, dummy},
2875	    incr_version(MergedCs#cstruct{version = V})
2876    end.
2877
2878announce_im_running([N | Ns], SchemaCs) ->
2879    {L1, L2} = mnesia_recover:connect_nodes([N]),
2880    case lists:member(N, L1) or lists:member(N, L2) of
2881	true ->
2882%%	    dbg_out("Adding ~p to {current db_nodes} ~n", [N]),  %% qqqq
2883	    mnesia_lib:add({current, db_nodes}, N),
2884	    mnesia_controller:add_active_replica(schema, N, SchemaCs);
2885	false ->
2886	    ignore
2887    end,
2888    announce_im_running(Ns, SchemaCs);
2889announce_im_running([], _) ->
2890    [].
2891
2892unannounce_im_running([N | Ns]) ->
2893    mnesia_lib:del({current, db_nodes}, N),
2894    mnesia_controller:del_active_replica(schema, N),
2895    mnesia_recover:disconnect(N),
2896    unannounce_im_running(Ns);
2897unannounce_im_running([]) ->
2898    [].
2899