1%%
2%% %CopyrightBegin%
3%%
4%% Copyright Ericsson AB 1996-2020. All Rights Reserved.
5%%
6%% Licensed under the Apache License, Version 2.0 (the "License");
7%% you may not use this file except in compliance with the License.
8%% You may obtain a copy of the License at
9%%
10%%     http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing, software
13%% distributed under the License is distributed on an "AS IS" BASIS,
14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15%% See the License for the specific language governing permissions and
16%% limitations under the License.
17%%
18%% %CopyrightEnd%
19%%
20
21%%
22%% In this module we provide a number of explicit functions
23%% to maninpulate the schema. All these functions are called
24%% within a special schema transaction.
25%%
26%% We also have an init/1 function defined here, this func is
27%% used by mnesia:start() to initialize the entire schema.
28
29-module(mnesia_schema).
30
31-export([
32         add_backend_type/2,
33	 do_add_backend_type/2,
34	 delete_backend_type/1,
35	 do_delete_backend_type/1,
36	 backend_types/0,
37	 add_index_plugin/3,
38	 do_add_index_plugin/3,
39	 delete_index_plugin/1,
40	 do_delete_index_plugin/1,
41	 index_plugins/0,
42         add_snmp/2,
43         add_table_copy/3,
44         add_table_index/2,
45	 arrange_restore/3,
46         attr_tab_to_pos/2,
47         attr_to_pos/2,
48         change_table_copy_type/3,
49         change_table_access_mode/2,
50         change_table_load_order/2,
51	 change_table_majority/2,
52	 change_table_frag/2,
53%%	 clear_table/1,  %% removed since it is not a schema op anymore
54         create_table/1,
55	 cs2list/1,
56	 vsn_cs2list/1,
57         del_snmp/1,
58         del_table_copy/2,
59         del_table_index/2,
60         delete_cstruct/2,
61         delete_schema/1,
62         delete_schema2/0,
63         delete_table/1,
64         delete_table_property/2,
65         dump_tables/1,
66         ensure_no_schema/1,
67	 get_create_list/1,
68         get_initial_schema/3,
69	 get_table_properties/1,
70         info/0,
71         info/1,
72         init/1,
73	 init_backends/0,
74         insert_cstruct/3,
75	 is_remote_member/1,
76         list2cs/1,
77         list2cs/2,
78         lock_schema/0,
79         merge_schema/0,
80         merge_schema/1,
81         move_table/3,
82	 normalize_cs/2,
83         opt_create_dir/2,
84         prepare_commit/3,
85         purge_dir/2,
86         purge_tmp_files/0,
87         ram_delete_table/2,
88%         ram_delete_table/3,
89	 read_cstructs_from_disc/0,
90         read_nodes/0,
91         remote_read_schema/0,
92	 restore/1,
93         restore/2,
94         restore/3,
95	 schema_coordinator/3,
96	 set_where_to_read/3,
97         transform_table/4,
98         undo_prepare_commit/2,
99         unlock_schema/0,
100         version/0,
101         write_table_property/2
102        ]).
103
104%% Exports for mnesia_frag
105-export([
106	 get_tid_ts_and_lock/2,
107	 make_create_table/1,
108         ensure_active/1,
109	 pick/4,
110	 verify/3,
111	 incr_version/1,
112	 check_keys/3,
113	 check_duplicates/2,
114	 make_delete_table/2
115	]).
116
117%% Needed outside to be able to use/set table_properties
118%% from user (not supported)
119-export([schema_transaction/1,
120	 insert_schema_ops/2,
121	 do_create_table/1,
122	 do_delete_table/1,
123	 do_read_table_property/2,
124	 do_delete_table_property/2,
125         do_write_table_property/2,
126         do_change_table_copy_type/3]).
127
128-include("mnesia.hrl").
129-include_lib("kernel/include/file.hrl").
130
131-import(mnesia_lib, [set/2, del/2, verbose/2, dbg_out/2]).
132
133%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
134%% Here comes the init function which also resides in
135%% this module, it is called upon by the trans server
136%% at startup of the system
137%%
138%% We have a meta table which looks like
139%% {table, schema,
140%%    {type, set},
141%%    {disc_copies, all},
142%%    {arity, 2}
143%%    {attributes, [key, val]}
144%%
145%% This means that we have a series of {schema, Name, Cs} tuples
146%% in a table called schema !!
147
148init(IgnoreFallback) ->
149    Res = read_schema(true, IgnoreFallback),
150    {ok, Source, _CreateList} = exit_on_error(Res),
151    verbose("Schema initiated from: ~p~n", [Source]),
152    set({schema, tables}, []),
153    set({schema, local_tables}, []),
154    do_set_schema(schema),
155    Tabs = set_schema(?ets_first(schema)),
156    lists:foreach(fun(Tab) -> clear_whereabouts(Tab) end, Tabs),
157    set({schema, where_to_read}, node()),
158    set({schema, load_node}, node()),
159    set({schema, load_reason}, initial),
160    mnesia_controller:add_active_replica(schema, node()),
161    init_backends().
162
163
164init_backends() ->
165    Backends = lists:foldl(fun({Alias, Mod}, Acc) ->
166				   orddict:append(Mod, Alias, Acc)
167			   end, orddict:new(), get_ext_types()),
168    [init_backend(Mod, Aliases) || {Mod, Aliases} <- Backends],
169    ok.
170
171init_backend(Mod, [_|_] = Aliases) ->
172    case Mod:init_backend() of
173	ok ->
174	    Mod:add_aliases(Aliases);
175	Error ->
176	    mnesia:abort({backend_init_error, Error})
177    end.
178
179exit_on_error({error, Reason}) ->
180    exit(Reason);
181exit_on_error(GoodRes) ->
182    GoodRes.
183
184%% Local function in order to avoid external function call
185val(Var) ->
186    case ?catch_val_and_stack(Var) of
187	{'EXIT', Stacktrace} -> mnesia_lib:other_val(Var, Stacktrace);
188	Value -> Value
189    end.
190
191%% This function traverses all cstructs in the schema and
192%% sets all values in mnesia_gvar accordingly for each table/cstruct
193
194set_schema('$end_of_table') ->
195    [];
196set_schema(Tab) ->
197    do_set_schema(Tab),
198    [Tab | set_schema(?ets_next(schema, Tab))].
199
200get_create_list(Tab) ->
201    ?ets_lookup_element(schema, Tab, 3).
202
203do_set_schema(Tab) ->
204    List = get_create_list(Tab),
205    Cs = list2cs(List),
206    do_set_schema(Tab, Cs).
207
208do_set_schema(Tab, Cs) ->
209    Type = Cs#cstruct.type,
210    set({Tab, setorbag}, Type),
211    set({Tab, local_content}, Cs#cstruct.local_content),
212    set({Tab, ram_copies}, Cs#cstruct.ram_copies),
213    set({Tab, disc_copies}, Cs#cstruct.disc_copies),
214    set({Tab, disc_only_copies}, Cs#cstruct.disc_only_copies),
215    set({Tab, external_copies}, Cs#cstruct.external_copies),
216    set({Tab, load_order}, Cs#cstruct.load_order),
217    set({Tab, access_mode}, Cs#cstruct.access_mode),
218    set({Tab, majority}, Cs#cstruct.majority),
219    set({Tab, all_nodes}, mnesia_lib:cs_to_nodes(Cs)),
220    set({Tab, snmp}, Cs#cstruct.snmp),
221    set({Tab, user_properties}, Cs#cstruct.user_properties),
222    [set({Tab, user_property, element(1, P)}, P) || P <- Cs#cstruct.user_properties],
223    set({Tab, frag_properties}, Cs#cstruct.frag_properties),
224    mnesia_frag:set_frag_hash(Tab, Cs#cstruct.frag_properties),
225    set({Tab, storage_properties}, Cs#cstruct.storage_properties),
226    set({Tab, attributes}, Cs#cstruct.attributes),
227    Arity = length(Cs#cstruct.attributes) + 1,
228    set({Tab, arity}, Arity),
229    RecName =  Cs#cstruct.record_name,
230    set({Tab, record_name}, RecName),
231    set({Tab, wild_pattern}, wild(RecName, Arity)),
232    set({Tab, index}, [P || {P,_} <- Cs#cstruct.index]),
233    case Cs#cstruct.index of
234        [] ->
235            set({Tab, index_info}, mnesia_index:index_info(Type, []));
236        _ ->
237            ignore
238    end,
239    %% create actual index tabs later
240    set({Tab, cookie}, Cs#cstruct.cookie),
241    set({Tab, version}, Cs#cstruct.version),
242    set({Tab, cstruct}, Cs),
243    Storage = mnesia_lib:schema_cs_to_storage_type(node(), Cs),
244    set({Tab, storage_type}, Storage),
245    set_record_validation(Tab, Storage, RecName, Arity, Type),
246    mnesia_lib:add({schema, tables}, Tab),
247    Ns = mnesia_lib:cs_to_nodes(Cs),
248    case lists:member(node(), Ns) of
249        true ->
250            mnesia_lib:add({schema, local_tables}, Tab);
251        false when Tab == schema ->
252            mnesia_lib:add({schema, local_tables}, Tab);
253        false ->
254            ignore
255    end,
256    set_ext_types(Tab, get_ext_types(), Cs#cstruct.external_copies).
257
258set_record_validation(Tab, {ext,Alias,Mod}, RecName, Arity, Type) ->
259    set({Tab, record_validation}, {RecName, Arity, Type, Alias, Mod});
260set_record_validation(Tab, _, RecName, Arity, Type) ->
261    set({Tab, record_validation}, {RecName, Arity, Type}).
262
263set_ext_types(Tab, ExtTypes, ExtCopies) ->
264    lists:foreach(
265      fun({Type, _} = Key) ->
266	      Nodes = case lists:keyfind(Key, 1, ExtCopies) of
267			  {_, Ns} -> Ns;
268			  false   -> []
269		      end,
270	      set({Tab, Type}, Nodes)
271      end, ExtTypes).
272
273
274wild(RecName, Arity) ->
275    Wp0 = list_to_tuple(lists:duplicate(Arity, '_')),
276    setelement(1, Wp0, RecName).
277
278%% Temporarily read the local schema and return a list
279%% of all nodes mentioned in the schema.DAT file
280read_nodes() ->
281    %% Ensure that we access the intended Mnesia
282    %% directory. This function may not be called
283    %% during startup since it will cause the
284    %% application_controller to get into deadlock
285    case mnesia_lib:ensure_loaded(?APPLICATION) of
286	ok ->
287	    case read_schema(false) of
288		{ok, _Source, CreateList} ->
289		    Cs = list2cs(CreateList),
290		    {ok, Cs#cstruct.disc_copies ++ Cs#cstruct.ram_copies};
291		{error, Reason} ->
292		    {error, Reason}
293	    end;
294	{error, Reason} ->
295	    {error, Reason}
296    end.
297
298%% Returns Version from the tuple {Version,MasterNodes}
299version() ->
300    case read_schema(false) of
301        {ok, Source, CreateList} when Source /= default ->
302	    Cs = list2cs(CreateList),
303            {Version, _Details} = Cs#cstruct.version,
304            Version;
305        _ ->
306            case dir_exists(mnesia_lib:dir()) of
307                true -> {1,0};
308                false -> {0,0}
309            end
310    end.
311
312%% Calculate next table version from old cstruct
313incr_version(Cs) ->
314    {{Major, Minor}, _} = Cs#cstruct.version,
315    Nodes = mnesia_lib:intersect(val({schema, disc_copies}),
316                                 mnesia_lib:cs_to_nodes(Cs)),
317    V =
318        case Nodes -- val({Cs#cstruct.name, active_replicas}) of
319            [] -> {Major + 1, 0};    % All replicas are active
320            _ -> {Major, Minor + 1}  % Some replicas are inactive
321        end,
322    Cs#cstruct{version = {V, {node(), erlang:timestamp()}}}.
323
324%% Returns table name
325insert_cstruct(Tid, Cs, KeepWhereabouts) ->
326    Tab = Cs#cstruct.name,
327    TabDef = cs2list(Cs),
328    Val = {schema, Tab, TabDef},
329    mnesia_checkpoint:tm_retain(Tid, schema, Tab, write),
330    mnesia_subscr:report_table_event(schema, Tid, Val, write),
331    Active = val({Tab, active_replicas}),
332
333    case KeepWhereabouts of
334        true ->
335            ignore;
336        false when Active == [] ->
337            clear_whereabouts(Tab);
338        false ->
339            %% Someone else has initiated table
340            ignore
341    end,
342    set({Tab, cstruct}, Cs),
343    ?ets_insert(schema, Val),
344    do_set_schema(Tab, Cs),
345    Val.
346
347clear_whereabouts(Tab) ->
348    set({Tab, checkpoints}, []),
349    set({Tab, subscribers}, []),
350    set({Tab, where_to_read}, nowhere),
351    set({Tab, active_replicas}, []),
352    set({Tab, commit_work}, []),
353    set({Tab, where_to_write}, []),
354    set({Tab, where_to_commit}, []),
355    set({Tab, load_by_force}, false),
356    set({Tab, load_node}, unknown),
357    set({Tab, load_reason}, unknown).
358
359%% Returns table name
360delete_cstruct(Tid, Cs) ->
361    Tab = Cs#cstruct.name,
362    TabDef = cs2list(Cs),
363    Val = {schema, Tab, TabDef},
364    mnesia_checkpoint:tm_retain(Tid, schema, Tab, delete),
365    mnesia_subscr:report_table_event(schema, Tid, Val, delete),
366    mnesia_controller:update(
367      fun() ->
368	      ?ets_match_delete(mnesia_gvar, {{Tab, '_'}, '_'}),
369	      ?ets_match_delete(mnesia_gvar, {{Tab, '_', '_'}, '_'}),
370	      del({schema, local_tables}, Tab),
371	      del({schema, tables}, Tab),
372	      ?ets_delete(schema, Tab)
373      end),
374    Val.
375
376%% Delete the Mnesia directory on all given nodes
377%% Requires that Mnesia is not running anywhere
378%% Returns ok | {error,Reason}
379delete_schema(Ns) when is_list(Ns), Ns /= [] ->
380    RunningNs = mnesia_lib:running_nodes(Ns),
381    Reason = "Cannot delete schema on all nodes",
382    if
383        RunningNs == [] ->
384	    case rpc:multicall(Ns, ?MODULE, delete_schema2, []) of
385		{Replies, []} ->
386		    case [R || R <- Replies, R /= ok]  of
387			[] ->
388			    ok;
389			BadReplies ->
390			    verbose("~s: ~tp~n", [Reason, BadReplies]),
391			    {error, {"All nodes not running", BadReplies}}
392		    end;
393		{_Replies, BadNs} ->
394                    verbose("~s: ~p~n", [Reason, BadNs]),
395                    {error, {"All nodes not running", BadNs}}
396            end;
397        true ->
398            verbose("~s: ~p~n", [Reason, RunningNs]),
399            {error, {"Mnesia is not stopped everywhere", RunningNs}}
400    end;
401delete_schema(Ns) ->
402    {error, {badarg, Ns}}.
403
404delete_schema2() ->
405    %% Ensure that we access the intended Mnesia
406    %% directory. This function may not be called
407    %% during startup since it will cause the
408    %% application_controller to get into deadlock
409    case mnesia_lib:ensure_loaded(?APPLICATION) of
410	ok ->
411	    case mnesia_lib:is_running() of
412		no ->
413		    Dir = mnesia_lib:dir(),
414		    purge_dir(Dir, []),
415		    ok;
416		_ ->
417		    {error, {"Mnesia still running", node()}}
418	    end;
419	{error, Reason} ->
420	    {error, Reason}
421    end.
422
423ensure_no_schema([H|T]) when is_atom(H) ->
424    case rpc:call(H, ?MODULE, remote_read_schema, []) of
425        {badrpc, Reason} ->
426            {H, {"All nodes not running", H, Reason}};
427        {ok,Source, _} when Source /= default ->
428            {H, {already_exists, H}};
429        _ ->
430            ensure_no_schema(T)
431    end;
432ensure_no_schema([H|_]) ->
433    {error,{badarg, H}};
434ensure_no_schema([]) ->
435    ok.
436
437remote_read_schema() ->
438    %% Ensure that we access the intended Mnesia
439    %% directory. This function may not be called
440    %% during startup since it will cause the
441    %% application_controller to get into deadlock
442    case mnesia_lib:ensure_loaded(?APPLICATION) of
443	ok ->
444	    case mnesia_monitor:get_env(schema_location) of
445		opt_disc ->
446		    read_schema(false);
447		_ ->
448		    read_schema(false)
449	    end;
450	{error, Reason} ->
451	    {error, Reason}
452    end.
453
454dir_exists(Dir) ->
455    dir_exists(Dir, mnesia_monitor:use_dir()).
456dir_exists(Dir, true) ->
457    case file:read_file_info(Dir) of
458        {ok, _} -> true;
459        _ -> false
460    end;
461dir_exists(_Dir, false) ->
462    false.
463
464opt_create_dir(UseDir, Dir) when UseDir == true->
465    case dir_exists(Dir, UseDir) of
466        true ->
467            check_can_write(Dir);
468        false ->
469            case file:make_dir(Dir) of
470                ok ->
471                    verbose("Create Directory ~tp~n", [Dir]),
472                    ok;
473                {error, Reason} ->
474                    verbose("Cannot create mnesia dir ~tp~n", [Reason]),
475                    {error, {"Cannot create Mnesia dir", Dir, Reason}}
476            end
477    end;
478opt_create_dir(false, _) ->
479    {error, {has_no_disc, node()}}.
480
481check_can_write(Dir) ->
482    case file:read_file_info(Dir) of
483        {ok, FI} when FI#file_info.type == directory,
484		      FI#file_info.access == read_write ->
485            ok;
486        {ok, _} ->
487            {error, "Not allowed to write in Mnesia dir", Dir};
488        _ ->
489            {error, "Non existent Mnesia dir", Dir}
490    end.
491
492lock_schema() ->
493    mnesia_lib:lock_table(schema).
494
495unlock_schema() ->
496    mnesia_lib:unlock_table(schema).
497
498read_schema(Keep) ->
499    read_schema(Keep, false).
500
501%% The schema may be read for several reasons.
502%% If Mnesia is not already started the read intention
503%% we normally do not want the ets table named schema
504%% be left around.
505%% If Keep == true, the ets table schema is kept
506%% If Keep == false, the ets table schema is removed
507%%
508%% Returns {ok, Source, SchemaCstruct} or {error, Reason}
509%% Source may be: default | ram | disc | fallback
510
511read_schema(Keep, IgnoreFallback) ->
512    lock_schema(),
513    Res =
514        case mnesia:system_info(is_running) of
515            yes ->
516                {ok, ram, get_create_list(schema)};
517            _IsRunning ->
518                    case mnesia_monitor:use_dir() of
519                        true ->
520                            read_disc_schema(Keep, IgnoreFallback);
521                        false when Keep == true ->
522                            Args = [{keypos, 2}, public, named_table, set],
523                            mnesia_monitor:mktab(schema, Args),
524                            CreateList = get_initial_schema(ram_copies, []),
525                            ?ets_insert(schema,{schema, schema, CreateList}),
526                            {ok, default, CreateList};
527                        false when Keep == false ->
528			    CreateList = get_initial_schema(ram_copies, []),
529                            {ok, default, CreateList}
530                    end
531        end,
532    unlock_schema(),
533    Res.
534
535read_disc_schema(Keep, IgnoreFallback) ->
536    Running = mnesia:system_info(is_running),
537    case mnesia_bup:fallback_exists() of
538        true when IgnoreFallback == false, Running /= yes ->
539             mnesia_bup:fallback_to_schema();
540        _ ->
541            %% If we're running, we read the schema file even
542            %% if fallback exists
543            Dat = mnesia_lib:tab2dat(schema),
544            case mnesia_lib:exists(Dat) of
545                true ->
546                    do_read_disc_schema(Dat, Keep);
547                false ->
548		    Dmp = mnesia_lib:tab2dmp(schema),
549		    case mnesia_lib:exists(Dmp) of
550			true ->
551			    %% May only happen when toggling of
552			    %% schema storage type has been
553			    %% interrupted
554			    do_read_disc_schema(Dmp, Keep);
555			false ->
556			    {error, "No schema file exists"}
557		    end
558            end
559    end.
560
561do_read_disc_schema(Fname, Keep) ->
562    T =
563        case Keep of
564            false ->
565                Args = [{keypos, 2}, public, set],
566                ?ets_new_table(schema, Args);
567            true ->
568                Args = [{keypos, 2}, public, named_table, set],
569                mnesia_monitor:mktab(schema, Args)
570        end,
571    Repair = mnesia_monitor:get_env(auto_repair),
572    Res =  % BUGBUG Fixa till dcl!
573        case mnesia_lib:dets_to_ets(schema, T, Fname, set, Repair, no) of
574            loaded -> {ok, disc, ?ets_lookup_element(T, schema, 3)};
575            Other -> {error, {"Cannot read schema", Fname, Other}}
576        end,
577    case Keep of
578        true -> ignore;
579        false -> ?ets_delete_table(T)
580    end,
581    Res.
582
583get_initial_schema(SchemaStorage, Nodes) ->
584    get_initial_schema(SchemaStorage, Nodes, []).
585
586get_initial_schema(SchemaStorage, Nodes, Properties) ->	%
587    UserProps = initial_schema_properties(Properties),
588    Cs = #cstruct{name = schema,
589		  record_name = schema,
590		  attributes = [table, cstruct],
591		  user_properties = UserProps},
592    Cs2 =
593	case SchemaStorage of
594        ram_copies -> Cs#cstruct{ram_copies = Nodes};
595        disc_copies -> Cs#cstruct{disc_copies = Nodes}
596    end,
597    cs2list(Cs2).
598
599initial_schema_properties(Props0) ->
600    DefaultProps = remove_duplicates(mnesia_monitor:get_env(schema)),
601    Props = lists:foldl(
602	      fun({K,V}, Acc) ->
603		      lists:keystore(K, 1, Acc, {K,V})
604	      end, DefaultProps, remove_duplicates(Props0)),
605    initial_schema_properties_(Props).
606
607initial_schema_properties_([{backend_types, Types}|Props]) ->
608    lists:foreach(fun({Name, Module}) ->
609			  verify_backend_type(Name, Module)
610		  end, Types),
611    [{mnesia_backend_types, Types}|initial_schema_properties_(Props)];
612initial_schema_properties_([{index_plugins, Plugins}|Props]) ->
613    lists:foreach(fun({Name, Module, Function}) ->
614			  verify_index_plugin(Name, Module, Function)
615		  end, Plugins),
616    [{mnesia_index_plugins, Plugins}|initial_schema_properties_(Props)];
617initial_schema_properties_([P|_Props]) ->
618    mnesia:abort({bad_schema_property, P});
619initial_schema_properties_([]) ->
620    [].
621
622remove_duplicates([{K,_} = H|T]) ->
623    [H | remove_duplicates([X || {K1,_} = X <- T,
624				 K1 =/= K])];
625remove_duplicates([]) ->
626    [].
627
628read_cstructs_from_disc() ->
629    %% Assumptions:
630    %% - local schema lock in global
631    %% - use_dir is true
632    %% - Mnesia is not running
633    %% - Ignore fallback
634
635    Fname = mnesia_lib:tab2dat(schema),
636    case mnesia_lib:exists(Fname) of
637	true ->
638	    Args = [{file, Fname},
639		    {keypos, 2},
640		    {repair, mnesia_monitor:get_env(auto_repair)},
641		    {type, set}],
642	    case dets:open_file(make_ref(), Args) of
643		{ok, Tab} ->
644                    ExtTypes = get_ext_types_disc(),
645		    Fun = fun({_, _, List}) ->
646				  {continue, list2cs(List, ExtTypes)}
647			  end,
648		    Cstructs = dets:traverse(Tab, Fun),
649		    dets:close(Tab),
650		    {ok, Cstructs};
651		{error, Reason} ->
652		    {error, Reason}
653	    end;
654	false ->
655	    {error, "No schema file exists"}
656    end.
657
658%% We run a very special type of transactions when we
659%% we want to manipulate the schema.
660
661get_tid_ts_and_lock(Tab, Intent) ->
662    TidTs = get(mnesia_activity_state),
663    case TidTs of
664	{_Mod, Tid, Ts} when is_record(Ts, tidstore)->
665	    Store = Ts#tidstore.store,
666	    case Intent of
667		read -> mnesia_locker:rlock_table(Tid, Store, Tab);
668		write -> mnesia_locker:wlock_table(Tid, Store, Tab);
669		none -> ignore
670	    end,
671	    TidTs;
672	_ ->
673	    mnesia:abort(no_transaction)
674    end.
675
676schema_transaction(Fun) ->
677    case get(mnesia_activity_state) of
678	undefined ->
679	    Args = [self(), Fun, whereis(mnesia_controller)],
680	    Pid = spawn_link(?MODULE, schema_coordinator, Args),
681	    receive
682		{transaction_done, Res, Pid} -> Res;
683		{'EXIT', Pid, R} -> {aborted, {transaction_crashed, R}}
684	    end;
685	_ ->
686            {aborted, nested_transaction}
687    end.
688
689%% This process may dump the transaction log, and should
690%% therefore not be run in an application process
691%%
692schema_coordinator(Client, _Fun, undefined) ->
693    Res = {aborted, {node_not_running, node()}},
694    Client ! {transaction_done, Res, self()},
695    unlink(Client);
696
697schema_coordinator(Client, Fun, Controller) when is_pid(Controller) ->
698    %% Do not trap exit in order to automatically die
699    %% when the controller dies
700    put(transaction_client, Client), %% debug
701    link(Controller),
702    unlink(Client),
703
704    %% Fulfull the transaction even if the client dies
705    Res = mnesia:transaction(Fun),
706    Client ! {transaction_done, Res, self()},
707    unlink(Controller),         % Avoids spurious exit message
708    unlink(whereis(mnesia_tm)), % Avoids spurious exit message
709    exit(normal).
710
711%% The make* rotines return a list of ops, this function
712%% inserts em all in the Store and maintains the local order
713%% of ops.
714
715insert_schema_ops({_Mod, _Tid, Ts}, SchemaIOps) ->
716    do_insert_schema_ops(Ts#tidstore.store, SchemaIOps).
717
718do_insert_schema_ops(Store, [Head | Tail]) ->
719    ?ets_insert(Store, Head),
720    do_insert_schema_ops(Store, Tail);
721do_insert_schema_ops(_Store, []) ->
722    ok.
723
724api_list2cs(List) when is_list(List) ->
725    Name = pick(unknown, name, List, must),
726    Keys = check_keys(Name, List),
727    check_duplicates(Name, Keys),
728    list2cs(List);
729api_list2cs(Other) ->
730    mnesia:abort({badarg, Other}).
731
732vsn_cs2list(Cs) ->
733    cs2list(Cs).
734
735cs2list(false, Cs) ->
736    cs2list(Cs).
737
738cs2list(Cs) when is_record(Cs, cstruct) ->
739    Tags = record_info(fields, cstruct),
740    rec2list(Tags, Tags, 2, Cs);
741cs2list(CreateList) when is_list(CreateList) ->
742    CreateList;
743
744cs2list(Cs) when element(1, Cs) == cstruct, tuple_size(Cs) == 20 ->
745    Tags = [name,type,
746	    ram_copies,disc_copies,disc_only_copies,external_copies,
747	    load_order,access_mode,majority,index,snmp,local_content,
748	    record_name,attributes,
749	    user_properties,frag_properties,storage_properties,
750	    cookie,version],
751    rec2list(Tags, Tags, 2, Cs);
752%% since vsn-4.6 (protocol 8.2 or older)
753cs2list(Cs) when element(1, Cs) == cstruct, tuple_size(Cs) == 19 ->
754    Tags = [name,type,ram_copies,disc_copies,disc_only_copies,
755	    load_order,access_mode,majority,index,snmp,local_content,
756	    record_name,attributes,
757	    user_properties,frag_properties,storage_properties,
758	    cookie,version],
759    rec2list(Tags, Tags, 2, Cs).
760
761rec2list([index | Tags], [index|Orig], Pos, Rec) ->
762    Val = element(Pos, Rec),
763    [{index, lists:map(
764	       fun({_, _Type}=P) -> P;
765		  (P) when is_integer(P); is_atom(P) -> {P, ordered}
766	       end, Val)} | rec2list(Tags, Orig, Pos + 1, Rec)];
767rec2list([external_copies | Tags], Orig0, Pos, Rec) ->
768    Orig = case Orig0 of
769	       [external_copies|Rest] -> Rest;
770	       _ -> Orig0
771	   end,
772    Val = element(Pos, Rec),
773    [{Alias, Ns} || {{Alias,_}, Ns} <- Val]
774	++ rec2list(Tags, Orig, Pos+1, Rec);
775rec2list([Tag | Tags], [Tag | Orig], Pos, Rec) ->
776    Val = element(Pos, Rec),
777    [{Tag, Val} | rec2list(Tags, Orig, Pos + 1, Rec)];
778rec2list([], _, _Pos, _Rec) ->
779    [];
780rec2list(Tags, [_|Orig], Pos, Rec) ->
781    rec2list(Tags, Orig, Pos+1, Rec).
782
783normalize_cs(Cstructs, _Node) ->
784    Cstructs.
785
786list2cs(List) ->
787    list2cs(List, get_ext_types()).
788
789list2cs(List, ExtTypes) when is_list(List) ->
790    Name = pick(unknown, name, List, must),
791    Type = pick(Name, type, List, set),
792    Rc0 = pick(Name, ram_copies, List, []),
793    Dc = pick(Name, disc_copies, List, []),
794    Doc = pick(Name, disc_only_copies, List, []),
795
796    Ext = pick_external_copies(List, ExtTypes),
797    Rc = case {Rc0, Dc, Doc, Ext} of
798             {[], [], [], []} -> [node()];
799             _ -> Rc0
800         end,
801    LC = pick(Name, local_content, List, false),
802    RecName = pick(Name, record_name, List, Name),
803    Attrs = pick(Name, attributes, List, [key, val]),
804    Snmp = pick(Name, snmp, List, []),
805    LoadOrder = pick(Name, load_order, List, 0),
806    AccessMode = pick(Name, access_mode, List, read_write),
807    Majority = pick(Name, majority, List, false),
808    UserProps = pick(Name, user_properties, List, []),
809    verify({alt, [nil, list]}, mnesia_lib:etype(UserProps),
810	   {bad_type, Name, {user_properties, UserProps}}),
811    Cookie = pick(Name, cookie, List, ?unique_cookie),
812    Version = pick(Name, version, List, {{2, 0}, []}),
813    Ix = pick(Name, index, List, []),
814    verify({alt, [nil, list]}, mnesia_lib:etype(Ix),
815	   {bad_type, Name, {index, [Ix]}}),
816    Frag = pick(Name, frag_properties, List, []),
817    verify({alt, [nil, list]}, mnesia_lib:etype(Frag),
818	   {badarg, Name, {frag_properties, Frag}}),
819
820    BEProps = pick(Name, storage_properties, List, []),
821    verify({alt, [nil, list]}, mnesia_lib:etype(Ix),
822	   {badarg, Name, {storage_properties, BEProps}}),
823    CheckProp = fun(Opt, Opts) when is_atom(Opt) ->
824			lists:member(Opt, Opts)
825			    andalso mnesia:abort({badarg, Name, Opt});
826		   (Tuple, Opts) when is_tuple(Tuple) ->
827			lists:member(element(1,Tuple), Opts)
828			    andalso mnesia:abort({badarg, Name, Tuple});
829		   (What,_) ->
830			mnesia:abort({badarg, Name, What})
831		end,
832    BadEtsOpts = [set, ordered_set, bag, duplicate_bag,
833		  public, private, protected,
834		  keypos, named_table],
835    EtsOpts = proplists:get_value(ets, BEProps, []),
836    is_list(EtsOpts) orelse mnesia:abort({badarg, Name, {ets, EtsOpts}}),
837    [CheckProp(Prop, BadEtsOpts) || Prop <- EtsOpts],
838    BadDetsOpts = [type, keypos, repair, access, file],
839    DetsOpts = proplists:get_value(dets, BEProps, []),
840    is_list(DetsOpts) orelse mnesia:abort({badarg, Name, {dets, DetsOpts}}),
841    [CheckProp(Prop, BadDetsOpts) || Prop <- DetsOpts],
842
843    case whereis(mnesia_controller) of
844        undefined ->
845            %% check_keys/2 cannot be executed when mnesia is not
846            %% running, due to it not being possible to read what ext
847            %% backends are loaded.
848            %% this doesn't work - disabled for now:
849            %%Keys = check_keys(Name, List, record_info(fields, cstruct)),
850            %%check_duplicates(Name, Keys)
851            ignore;
852        Pid when is_pid(Pid) ->
853            Keys = check_keys(Name, List),
854            check_duplicates(Name, Keys)
855    end,
856
857    Cs0 = #cstruct{name = Name,
858		   ram_copies = Rc,
859		   disc_copies = Dc,
860		   disc_only_copies = Doc,
861		   external_copies = Ext,
862		   type = Type,
863		   index = Ix,
864		   snmp = Snmp,
865		   load_order = LoadOrder,
866		   access_mode = AccessMode,
867		   majority = Majority,
868		   local_content = LC,
869		   record_name = RecName,
870		   attributes = Attrs,
871		   user_properties = lists:sort(UserProps),
872		   frag_properties = lists:sort(Frag),
873                   storage_properties = lists:sort(BEProps),
874		   cookie = Cookie,
875		   version = Version},
876    case Ix of
877	[] -> Cs0;
878	[_|_] ->
879	    Ix2 = expand_index_attrs(Cs0),
880	    Cs0#cstruct{index = Ix2}
881    end;
882list2cs(Other, _ExtTypes) ->
883    mnesia:abort({badarg, Other}).
884
885pick(Tab, Key, List, Default) ->
886    case lists:keysearch(Key, 1, List) of
887        false  when Default == must ->
888            mnesia:abort({badarg, Tab, "Missing key", Key, List});
889        false ->
890            Default;
891        {value, {Key, Value}} ->
892            Value;
893	{value, BadArg} ->
894	    mnesia:abort({bad_type, Tab, BadArg})
895    end.
896
897pick_external_copies(_List, []) ->
898    [];
899pick_external_copies(List, ExtTypes) ->
900    lists:foldr(
901      fun({K, Val}, Acc) ->
902	      case lists:keyfind(K, 1, ExtTypes) of
903		  false ->
904		      Acc;
905		  {_, Mod} ->
906		      [{{K,Mod}, Val}|Acc]
907	      end
908      end, [], List).
909
910expand_storage_type(S) when S==ram_copies;
911			    S==disc_copies;
912			    S==disc_only_copies ->
913    S;
914expand_storage_type(S) ->
915    case lists:keyfind(S, 1, get_ext_types()) of
916	false ->
917	    mnesia:abort({bad_type, {storage_type, S}});
918	{Alias, Mod} ->
919	    {ext, Alias, Mod}
920    end.
921
922get_ext_types() ->
923    get_schema_user_property(mnesia_backend_types).
924
925get_index_plugins() ->
926    get_schema_user_property(mnesia_index_plugins).
927
928get_schema_user_property(Key) ->
929    case dirty_read_table_property(schema, Key) of
930	undefined ->  [];
931	{_, Types} -> Types
932    end.
933
934get_ext_types_disc() ->
935    try get_ext_types_disc_()
936    catch
937	error:_ ->[]
938    end.
939
940get_ext_types_disc_() ->
941    case mnesia_schema:remote_read_schema() of
942        {ok, _, Prop} ->
943            K1 = user_properties,
944            case lists:keyfind(K1, 1, Prop) of
945                {K1, UserProp} ->
946                    K2 = mnesia_backend_types,
947                    case lists:keyfind(K2, 1, UserProp) of
948                        {K2, Types} ->
949                            Types;
950                        _ ->
951                            []
952                    end;
953                _ ->
954                    []
955            end;
956        _ ->
957            []
958    end.
959
960%% Convert attribute name to integer if neccessary
961attr_tab_to_pos(_Tab, Pos) when is_integer(Pos) ->
962    Pos;
963attr_tab_to_pos(Tab, Attr) ->
964    attr_to_pos(Attr, val({Tab, attributes})).
965
966%% Convert attribute name to integer if neccessary
967attr_to_pos({_} = P, _) -> P;
968attr_to_pos(Pos, _Attrs) when is_integer(Pos) ->
969    Pos;
970attr_to_pos(Attr, Attrs) when is_atom(Attr) ->
971    attr_to_pos(Attr, Attrs, 2);
972attr_to_pos(Attr, _) ->
973    mnesia:abort({bad_type, Attr}).
974
975attr_to_pos(Attr, [Attr | _Attrs], Pos) ->
976    Pos;
977attr_to_pos(Attr, [_ | Attrs], Pos) ->
978    attr_to_pos(Attr, Attrs, Pos + 1);
979attr_to_pos(Attr, _, _) ->
980    mnesia:abort({bad_type, Attr}).
981
982check_keys(Tab, Attrs) ->
983    Types = [T || {T,_} <- get_ext_types()],
984    check_keys(Tab, Attrs, Types ++ record_info(fields, cstruct)).
985
986check_keys(Tab, [{Key, _Val} | Tail], Items) ->
987    Key1 = if
988               is_tuple(Key) ->
989                   element(1, Key);
990               true ->
991                   Key
992           end,
993    case lists:member(Key1, Items) of
994        true ->  [Key | check_keys(Tab, Tail, Items)];
995        false -> mnesia:abort({badarg, Tab, Key})
996    end;
997check_keys(_, [], _) ->
998    [];
999check_keys(Tab, Arg, _) ->
1000    mnesia:abort({badarg, Tab, Arg}).
1001
1002check_duplicates(Tab, Keys) ->
1003    case has_duplicates(Keys) of
1004        false -> ok;
1005        true -> mnesia:abort({badarg, Tab, "Duplicate keys", Keys})
1006    end.
1007
1008has_duplicates([H | T]) ->
1009    case lists:member(H, T) of
1010        true -> true;
1011        false -> has_duplicates(T)
1012    end;
1013has_duplicates([]) ->
1014    false.
1015
1016%% This is the only place where we check the validity of data
1017
1018verify_cstruct(#cstruct{} = Cs) ->
1019    assert_correct_cstruct(Cs),
1020    Cs1 = verify_external_copies(
1021	    Cs#cstruct{index = expand_index_attrs(Cs)}),
1022    assert_correct_cstruct(Cs1),
1023    Cs1.
1024
1025expand_index_attrs(#cstruct{index = Ix, attributes = Attrs,
1026			    name = Tab} = Cs) ->
1027    Prefered = prefered_index_types(Cs),
1028    expand_index_attrs(Ix, Tab, Attrs, Prefered).
1029
1030expand_index_attrs(Ix, Tab, Attrs, Prefered) ->
1031    lists:map(fun(P) when is_integer(P); is_atom(P) ->
1032		      {attr_to_pos(P, Attrs), Prefered};
1033		 ({A} = P) when is_atom(A) ->
1034		      {P, Prefered};
1035		 ({P, Type}) ->
1036		      {attr_to_pos(P, Attrs), Type};
1037		 (_Other) ->
1038		      mnesia:abort({bad_type, Tab, {index, Ix}})
1039	      end, Ix).
1040
1041prefered_index_types(#cstruct{external_copies = Ext}) ->
1042    ExtTypes = [mnesia_lib:semantics(S, index_types) ||
1043		   {S,Ns} <- Ext, Ns =/= []],
1044    case intersect_types(ExtTypes) of
1045	[] -> ordered;
1046	[Pref|_] -> Pref
1047    end.
1048
1049intersect_types([]) ->
1050    [];
1051intersect_types([S1, S2|Rest]) ->
1052    intersect_types([S1 -- (S1 -- S2)|Rest]);
1053intersect_types([S]) ->
1054    S.
1055
1056verify_external_copies(#cstruct{external_copies = []} = Cs) ->
1057    Cs;
1058verify_external_copies(#cstruct{name = Tab, external_copies = EC} = Cs) ->
1059    Bad = {bad_type, Tab, {external_copies, EC}},
1060    AllECNodes = lists:concat([Ns || {_, Ns} <- EC,
1061                                     is_list(Ns)]),
1062    verify(true, length(lists:usort(AllECNodes)) == length(AllECNodes), Bad),
1063    CsL = cs2list(Cs),
1064    CsL1 = lists:foldl(
1065	     fun({{Alias, Mod}, Ns} = _X, CsLx) ->
1066		     BadTab = fun(Why) ->
1067				      {Why, Tab, {{ext, Alias, Mod},Ns}}
1068			      end,
1069		     verify(atom, mnesia_lib:etype(Mod), BadTab),
1070		     verify(true, fun() ->
1071					  lists:all(fun is_atom/1, Ns)
1072				  end, BadTab),
1073		     check_semantics(Mod, Alias, BadTab, Cs),
1074		     try Mod:check_definition(Alias, Tab, Ns, CsLx) of
1075			 ok ->
1076			     CsLx;
1077			 {ok, CsLx1} ->
1078			     CsLx1;
1079			 {error, Reason} ->
1080			     mnesia:abort(BadTab(Reason))
1081		     catch
1082			 error:E ->
1083			     mnesia:abort(BadTab(E))
1084		     end;
1085		(_, CsLx) ->
1086		     CsLx
1087	     end, CsL, EC),
1088    list2cs(CsL1).
1089
1090check_semantics(Mod, Alias, BadTab, #cstruct{type = Type}) ->
1091    Ext = {ext, Alias, Mod},
1092    case lists:member(mnesia_lib:semantics(Ext, storage), [ram_copies, disc_copies,
1093							   disc_only_copies]) of
1094	false -> mnesia:abort(BadTab(invalid_storage));
1095	true  -> ok
1096    end,
1097    case lists:member(Type, mnesia_lib:semantics(Ext, types)) of
1098	false -> mnesia:abort(BadTab(bad_type));
1099	true  -> ok
1100    end.
1101
1102assert_correct_cstruct(Cs) when is_record(Cs, cstruct) ->
1103    verify_nodes(Cs),
1104
1105    Tab = Cs#cstruct.name,
1106    verify(atom, mnesia_lib:etype(Tab), {bad_type, Tab}),
1107    Type = Cs#cstruct.type,
1108    verify(true, lists:member(Type, [set, bag, ordered_set]),
1109	   {bad_type, Tab, {type, Type}}),
1110
1111    %% Currently ordered_set is not supported for disk_only_copies.
1112    if
1113 	Type == ordered_set, Cs#cstruct.disc_only_copies /= [] ->
1114	    mnesia:abort({bad_type, Tab, {not_supported, Type, disc_only_copies}});
1115	true ->
1116	    ok
1117    end,
1118
1119    RecName = Cs#cstruct.record_name,
1120    verify(atom, mnesia_lib:etype(RecName),
1121	   {bad_type, Tab, {record_name, RecName}}),
1122
1123    Attrs = Cs#cstruct.attributes,
1124    verify(list, mnesia_lib:etype(Attrs),
1125	   {bad_type, Tab, {attributes, Attrs}}),
1126
1127    Arity = length(Attrs) + 1,
1128    verify(true, Arity > 2, {bad_type, Tab, {attributes, Attrs}}),
1129
1130    lists:foldl(fun(Attr,_Other) when Attr == snmp ->
1131                        mnesia:abort({bad_type, Tab, {attributes, [Attr]}});
1132                   (Attr,Other) ->
1133                        verify(atom, mnesia_lib:etype(Attr),
1134                               {bad_type, Tab, {attributes, [Attr]}}),
1135                        verify(false, lists:member(Attr, Other),
1136                               {combine_error, Tab, {attributes, [Attr | Other]}}),
1137                        [Attr | Other]
1138                end,
1139                [],
1140                Attrs),
1141
1142    Index = Cs#cstruct.index,
1143
1144    verify({alt, [nil, list]}, mnesia_lib:etype(Index),
1145	   {bad_type, Tab, {index, Index}}),
1146    IxPlugins = get_index_plugins(),
1147
1148    AllowIndexOnKey = check_if_allow_index_on_key(),
1149    IxFun =
1150	fun(Pos) ->
1151		verify(
1152		  true, fun() ->
1153				I = index_pos(Pos),
1154				case Pos of
1155				    {_, T} ->
1156					(T==bag orelse T==ordered)
1157					    andalso good_ix_pos(
1158						      I, AllowIndexOnKey,
1159						      Arity, IxPlugins);
1160				    _ ->
1161					good_ix_pos(Pos, AllowIndexOnKey,
1162						    Arity, IxPlugins)
1163				end
1164			end,
1165		  {bad_type, Tab, {index, [Pos]}})
1166	end,
1167    lists:foreach(IxFun, Index),
1168
1169    LC = Cs#cstruct.local_content,
1170    verify({alt, [true, false]}, LC,
1171	   {bad_type, Tab, {local_content, LC}}),
1172    Access = Cs#cstruct.access_mode,
1173    verify({alt, [read_write, read_only]}, Access,
1174	   {bad_type, Tab, {access_mode, Access}}),
1175    Majority = Cs#cstruct.majority,
1176    verify({alt, [true, false]}, Majority,
1177	   {bad_type, Tab, {majority, Majority}}),
1178    case Majority of
1179	true ->
1180	    verify(false, LC,
1181		   {combine_error, Tab, [{local_content,true},{majority,true}]});
1182	false ->
1183	    ok
1184    end,
1185    Snmp = Cs#cstruct.snmp,
1186    verify(true, mnesia_snmp_hook:check_ustruct(Snmp),
1187	   {badarg, Tab, {snmp, Snmp}}),
1188
1189    CheckProp = fun(Prop) when is_tuple(Prop), size(Prop) >= 1 -> ok;
1190		   (Prop) ->
1191			mnesia:abort({bad_type, Tab,
1192				      {user_properties, [Prop]}})
1193		end,
1194    lists:foreach(CheckProp, Cs#cstruct.user_properties),
1195
1196    case Cs#cstruct.cookie of
1197	{{MegaSecs, Secs, MicroSecs}, _Node}
1198	when is_integer(MegaSecs), is_integer(Secs),
1199	     is_integer(MicroSecs), is_atom(node) ->
1200            ok;
1201        Cookie ->
1202            mnesia:abort({bad_type, Tab, {cookie, Cookie}})
1203    end,
1204    case Cs#cstruct.version of
1205        {{Major, Minor}, _Detail}
1206                when is_integer(Major), is_integer(Minor) ->
1207            ok;
1208        Version ->
1209            mnesia:abort({bad_type, Tab, {version, Version}})
1210    end.
1211
1212good_ix_pos({_} = P, _, _, Plugins) ->
1213    lists:keymember(P, 1, Plugins);
1214good_ix_pos(I, true, Arity, _) when is_integer(I) ->
1215    I >= 0 andalso I =< Arity;
1216good_ix_pos(I, false, Arity, _) when is_integer(I) ->
1217    I > 2 andalso I =< Arity;
1218good_ix_pos(_, _, _, _) ->
1219    false.
1220
1221
1222check_if_allow_index_on_key() ->
1223    case mnesia_monitor:get_env(allow_index_on_key) of
1224	true ->
1225	    true;
1226	_ ->
1227	    false
1228    end.
1229
1230verify_nodes(Cs) ->
1231    Tab = Cs#cstruct.name,
1232    Ram = Cs#cstruct.ram_copies,
1233    Disc = Cs#cstruct.disc_copies,
1234    DiscOnly = Cs#cstruct.disc_only_copies,
1235    Ext = lists:append([Ns || {_,Ns} <- Cs#cstruct.external_copies]),
1236    LoadOrder = Cs#cstruct.load_order,
1237
1238    verify({alt, [nil, list]}, mnesia_lib:etype(Ram),
1239	   {bad_type, Tab, {ram_copies, Ram}}),
1240    verify({alt, [nil, list]}, mnesia_lib:etype(Disc),
1241	   {bad_type, Tab, {disc_copies, Disc}}),
1242    lists:foreach(
1243      fun({BE, Ns}) ->
1244	      verify({alt, [nil, list]}, mnesia_lib:etype(Ns),
1245		     {bad_type, Tab, {BE, Ns}}),
1246	      lists:foreach(fun(N) ->
1247				    verify(atom, mnesia_lib:etype(N),
1248					   {bad_type, Tab, {BE, Ns}})
1249			    end, Ns)
1250      end, Cs#cstruct.external_copies),
1251    case Tab of
1252	schema ->
1253	    verify([], DiscOnly, {bad_type, Tab, {disc_only_copies, DiscOnly}});
1254	_ ->
1255	    verify({alt, [nil, list]},
1256		   mnesia_lib:etype(DiscOnly),
1257		   {bad_type, Tab, {disc_only_copies, DiscOnly}})
1258    end,
1259    verify(integer, mnesia_lib:etype(LoadOrder),
1260	   {bad_type, Tab, {load_order, LoadOrder}}),
1261
1262    Nodes = Ram ++ Disc ++ DiscOnly ++ Ext,
1263    verify(list, mnesia_lib:etype(Nodes),
1264	   {combine_error, Tab,
1265	    [{ram_copies, []}, {disc_copies, []},
1266	     {disc_only_copies, []}, {external_copies, []}]}),
1267    verify(false, has_duplicates(Nodes), {combine_error, Tab, Nodes}),
1268    AtomCheck = fun(N) ->
1269			verify(atom, mnesia_lib:etype(N), {bad_type, Tab, N})
1270		end,
1271    lists:foreach(AtomCheck, Nodes).
1272
1273verify(Expected, Fun, Error) when is_function(Fun) ->
1274    do_verify(Expected, ?CATCH(Fun()), Error);
1275verify(Expected, Actual, Error) ->
1276    do_verify(Expected, Actual, Error).
1277
1278do_verify({alt, Values}, Value, Error) ->
1279    case lists:member(Value, Values) of
1280        true -> ok;
1281        false -> mnesia:abort(Error)
1282    end;
1283do_verify(Value, Value, _) ->
1284    ok;
1285do_verify(_Value, _, Error) ->
1286     mnesia:abort(Error).
1287
1288ensure_writable(Tab) ->
1289    case val({Tab, where_to_write}) of
1290        [] -> mnesia:abort({read_only, Tab});
1291        _ -> ok
1292    end.
1293
1294%% Ensure that all replicas on disk full nodes are active
1295ensure_active(Cs) ->
1296    ensure_active(Cs, active_replicas).
1297
1298ensure_active(Cs, What) ->
1299    Tab = Cs#cstruct.name,
1300    W = {Tab, What},
1301    ensure_non_empty(W),
1302    Nodes = mnesia_lib:intersect(val({schema, disc_copies}),
1303                                 mnesia_lib:cs_to_nodes(Cs)),
1304    case Nodes -- val(W) of
1305        [] ->
1306            ok;
1307        Ns ->
1308            Expl = "All replicas on diskfull nodes are not active yet",
1309            case val({Tab, local_content}) of
1310                true ->
1311		    case rpc:multicall(Ns, ?MODULE, is_remote_member, [W]) of
1312			{Replies, []} ->
1313			    check_active(Replies, Expl, Tab);
1314			{_Replies, BadNs} ->
1315			    mnesia:abort({not_active, Expl, Tab, BadNs})
1316                    end;
1317                false ->
1318                    mnesia:abort({not_active, Expl, Tab, Ns})
1319            end
1320    end.
1321
1322ensure_non_empty({Tab, Vhat}) ->
1323       case val({Tab, Vhat}) of
1324        [] -> mnesia:abort({no_exists, Tab});
1325        _ -> ok
1326    end.
1327
1328ensure_not_active(Tab = schema, Node) ->
1329    Active = val({Tab, active_replicas}),
1330    case lists:member(Node, Active) of
1331	false when Active =/= [] ->
1332	    ok;
1333	false ->
1334	    mnesia:abort({no_exists, Tab});
1335	true ->
1336	    Expl = "Mnesia is running",
1337	    mnesia:abort({active, Expl, Node})
1338    end.
1339
1340is_remote_member(Key) ->
1341    IsActive = lists:member(node(), val(Key)),
1342    {IsActive, node()}.
1343
1344check_active([{true, _Node} | Replies], Expl, Tab) ->
1345    check_active(Replies, Expl, Tab);
1346check_active([{false, Node} | _Replies], Expl, Tab) ->
1347    mnesia:abort({not_active, Expl, Tab, [Node]});
1348check_active([{badrpc, Reason} | _Replies], Expl, Tab) ->
1349    mnesia:abort({not_active, Expl, Tab, Reason});
1350check_active([], _Expl, _Tab) ->
1351    ok.
1352
1353
1354%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1355%% Function for definining an external backend type
1356
1357add_backend_type(Name, Module) ->
1358    case schema_transaction(fun() -> do_add_backend_type(Name, Module) end) of
1359	{atomic, NeedsInit} ->
1360	    case NeedsInit of
1361		true ->
1362		    Module:init_backend();
1363		false ->
1364		    ignore
1365	    end,
1366	    Module:add_aliases([Name]),
1367	    {atomic, ok};
1368	Other ->
1369	    Other
1370    end.
1371
1372do_add_backend_type(Name, Module) ->
1373    verify_backend_type(Name, Module),
1374    Types = case do_read_table_property(schema, mnesia_backend_types) of
1375		undefined ->
1376		    [];
1377		{_, Ts} ->
1378		    case lists:keymember(Name, 1, Ts) of
1379			true ->
1380			    mnesia:abort({backend_type_already_exists, Name});
1381			false ->
1382			    Ts
1383		    end
1384	    end,
1385    ModuleRegistered = lists:keymember(Module, 2, Types),
1386    do_write_table_property(schema, {mnesia_backend_types,
1387				     [{Name, Module}|Types]}),
1388    not ModuleRegistered.
1389
1390delete_backend_type(Name) ->
1391    schema_transaction(fun() -> do_delete_backend_type(Name) end).
1392
1393do_delete_backend_type(Name) ->
1394    case do_read_table_property(schema, mnesia_backend_types) of
1395	undefined ->
1396	    [];
1397	{_, Ts} ->
1398	    case lists:keyfind(Name, 1, Ts) of
1399		{_, Mod} ->
1400		    case using_backend_type(Name, Mod) of
1401			[_|_] = Tabs ->
1402			    mnesia:abort({backend_in_use, {Name, Tabs}});
1403			[] ->
1404			    do_write_table_property(
1405			      schema, {mnesia_backend_types,
1406				       lists:keydelete(Name, 1, Ts)})
1407		    end;
1408		false ->
1409		    mnesia:abort({no_such_backend, Name})
1410	    end
1411    end.
1412
1413using_backend_type(Name, Mod) ->
1414    Ext = ets:select(mnesia_gvar,
1415		     [{ {{'$1',external_copies},'$2'}, [], [{{'$1','$2'}}] }]),
1416    Entry = {Name, Mod},
1417    [T || {T,C} <- Ext,
1418	  lists:keymember(Entry, 1, C)].
1419
1420verify_backend_type(Name, Module) ->
1421    case legal_backend_name(Name) of
1422	false ->
1423	    mnesia:abort({bad_type, {backend_type,Name,Module}});
1424	true ->
1425	    ok
1426    end,
1427    ExpectedExports = mnesia_backend_type:behaviour_info(callbacks),
1428    Exports = try Module:module_info(exports)
1429              catch
1430                  error:_ ->
1431                      mnesia:abort({undef_backend, Module})
1432              end,
1433    case ExpectedExports -- Exports of
1434        [] ->
1435	    ok;
1436        _Other ->
1437	    io:fwrite(user, "Missing backend_type exports: ~tp~n", [_Other]),
1438            mnesia:abort({bad_type, {backend_type,Name,Module}})
1439    end.
1440
1441legal_backend_name(Name) ->
1442    is_atom(Name) andalso
1443                    (not lists:member(Name, record_info(fields, cstruct))).
1444
1445%% Used e.g. by mnesia:system_info(backend_types).
1446backend_types() ->
1447    [ram_copies, disc_copies, disc_only_copies |
1448     [T || {T,_} <- get_ext_types()]].
1449
1450add_index_plugin(Name, Module, Function) ->
1451    schema_transaction(
1452      fun() -> do_add_index_plugin(Name, Module, Function) end).
1453
1454do_add_index_plugin(Name, Module, Function) ->
1455    verify_index_plugin(Name, Module, Function),
1456    Plugins = case do_read_table_property(schema, mnesia_index_plugins) of
1457		  undefined ->
1458		      [];
1459		  {_, Ps} ->
1460		      case lists:keymember(Name, 1, Ps) of
1461			  true ->
1462			      mnesia:abort({index_plugin_already_exists, Name});
1463			  false ->
1464			      Ps
1465		      end
1466	      end,
1467    do_write_table_property(schema, {mnesia_index_plugins,
1468				     [{Name, Module, Function}|Plugins]}).
1469
1470delete_index_plugin(P) ->
1471    schema_transaction(
1472      fun() -> do_delete_index_plugin(P) end).
1473
1474do_delete_index_plugin({A} = P) when is_atom(A) ->
1475    Plugins = get_index_plugins(),
1476    case lists:keyfind(P, 1, Plugins) of
1477	false ->
1478	    mnesia:abort({no_exists, {index_plugin, P}});
1479	_Found ->
1480	    case ets:select(mnesia_gvar,
1481			    [{ {{'$1',{index,{P,'_'}}},'_'},[],['$1']},
1482			     { {{'$1',{index,P}},'_'},[],['$1']}], 1) of
1483		{[_], _} ->
1484		    mnesia:abort({plugin_in_use, P});
1485		'$end_of_table' ->
1486		    do_write_table_property(
1487		      schema, {mnesia_index_plugins,
1488			       lists:keydelete(P, 1, Plugins)})
1489	    end
1490    end.
1491
1492verify_index_plugin({A} = Name, Module, Function)
1493  when is_atom(A), is_atom(Module), is_atom(Function) ->
1494    case code:ensure_loaded(Module) of
1495	{error, nofile} ->
1496	    mnesia:abort({bad_type, {index_plugin,Name,Module,Function}});
1497	{module,_} ->
1498	    %% Index plugins are called as Module:Function(Tab, Pos, Obj)
1499	    case erlang:function_exported(Module, Function, 3) of
1500		true ->
1501		    ok;
1502		false ->
1503		    mnesia:abort(
1504		      {bad_type, {index_plugin,Name,Module,Function}})
1505	    end
1506    end;
1507verify_index_plugin(Name, Module, Function) ->
1508    mnesia:abort({bad_type, {index_plugin,Name,Module,Function}}).
1509
1510
1511%% Used e.g. by mnesia:system_info(backend_types).
1512index_plugins() ->
1513    get_index_plugins().
1514
1515%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1516%% Here's the real interface function to create a table
1517
1518create_table([_|_] = TabDef) ->
1519    schema_transaction(fun() -> do_multi_create_table(TabDef) end);
1520create_table(Arg) -> {aborted, {badarg, Arg}}.
1521
1522%% And the corresponding do routines ....
1523
1524do_multi_create_table(TabDef) ->
1525    get_tid_ts_and_lock(schema, write),
1526    ensure_writable(schema),
1527    do_create_table(TabDef),
1528    ok.
1529
1530do_create_table(TabDef) when is_list(TabDef) ->
1531    Cs = api_list2cs(TabDef),
1532    case Cs#cstruct.frag_properties of
1533	[] ->
1534	    do_create_table_1(Cs);
1535	_Props ->
1536	    CsList = mnesia_frag:expand_cstruct(Cs),
1537	    lists:foreach(fun do_create_table_1/1, CsList)
1538    end.
1539
1540do_create_table_1(Cs) ->
1541    {_Mod, _Tid, Ts} =  get_tid_ts_and_lock(schema, none),
1542    Store = Ts#tidstore.store,
1543    do_insert_schema_ops(Store, make_create_table(Cs)).
1544
1545make_create_table(Cs) ->
1546    Tab = Cs#cstruct.name,
1547    verify(false, check_if_exists(Tab), {already_exists, Tab}),
1548    unsafe_make_create_table(Cs).
1549
1550unsafe_make_create_table(Cs0) ->
1551    {_Mod, Tid, Ts} =  get_tid_ts_and_lock(schema, none),
1552    Cs = verify_cstruct(Cs0),
1553    Tab = Cs#cstruct.name,
1554
1555    %% Check that we have all disc replica nodes running
1556    DiscNodes = Cs#cstruct.disc_copies ++ Cs#cstruct.disc_only_copies,
1557    RunningNodes = val({current, db_nodes}),
1558    CheckDisc = fun(N) ->
1559			verify(true, lists:member(N, RunningNodes),
1560			       {not_active, Tab, N})
1561		end,
1562    lists:foreach(CheckDisc, DiscNodes),
1563
1564    Nodes = mnesia_lib:intersect(mnesia_lib:cs_to_nodes(Cs), RunningNodes),
1565    Store = Ts#tidstore.store,
1566    mnesia_locker:wlock_no_exist(Tid, Store, Tab, Nodes),
1567    [{op, create_table, vsn_cs2list(Cs)}].
1568
1569check_if_exists(Tab) ->
1570    TidTs = get_tid_ts_and_lock(schema, write),
1571    {_, _, Ts} = TidTs,
1572    Store = Ts#tidstore.store,
1573    ets:foldl(
1574      fun({op, create_table, [{name, T}|_]}, _Acc) when T==Tab ->
1575	      true;
1576	 ({op, delete_table, [{name,T}|_]}, _Acc) when T==Tab ->
1577	      false;
1578	 (_Other, Acc) ->
1579	      Acc
1580      end, existed_before(Tab), Store).
1581
1582existed_before(Tab) ->
1583    ('EXIT' =/= element(1, ?catch_val({Tab,cstruct}))).
1584
1585
1586%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1587%% Delete a table entirely on all nodes.
1588
1589delete_table(Tab) ->
1590    schema_transaction(fun() -> do_delete_table(Tab) end).
1591
1592do_delete_table(schema) ->
1593    mnesia:abort({bad_type, schema});
1594do_delete_table(Tab) ->
1595    TidTs = get_tid_ts_and_lock(schema, write),
1596    ensure_writable(schema),
1597    insert_schema_ops(TidTs, make_delete_table(Tab, whole_table)).
1598
1599-dialyzer({no_improper_lists, make_delete_table/2}).
1600make_delete_table(Tab, Mode) ->
1601    case existed_before(Tab) of
1602	false ->
1603	    %% Deleting a table that was created in this very
1604	    %% schema transaction. Delete all ops in the Store
1605	    %% that operate on this table. We cannot run a normal
1606	    %% delete operation, since that involves checking live
1607	    %% nodes etc.
1608	    TidTs = get_tid_ts_and_lock(schema, write),
1609	    {_, _, Ts} = TidTs,
1610	    Store = Ts#tidstore.store,
1611	    Deleted = ets:select_delete(
1612			Store, [{{op,'$1',[{name,Tab}|'_']},
1613				 [{'or',
1614				   {'==','$1',create_table},
1615				   {'==','$1',delete_table}}], [true]}]),
1616	    ets:select_delete(
1617	      Store, [{{op,'$1',[{name,Tab}|'_'],'_'},
1618		       [{'or',
1619			 {'==','$1',write_table_property},
1620			 {'==','$1',delete_table_property}}],
1621		       [true]}]),
1622	    case Deleted of
1623		0 -> mnesia:abort({no_exists, Tab});
1624		_ -> []
1625	    end;
1626	true ->
1627	    case Mode of
1628		whole_table ->
1629		    case val({Tab, frag_properties}) of
1630			[] ->
1631			    [make_delete_table2(Tab)];
1632			_Props ->
1633			    %% Check if it is a base table
1634			    mnesia_frag:lookup_frag_hash(Tab),
1635
1636			    %% Check for foreigners
1637			    F = mnesia_frag:lookup_foreigners(Tab),
1638			    verify([], F, {combine_error,
1639					   Tab, "Too many foreigners", F}),
1640			    [make_delete_table2(T) ||
1641				T <- mnesia_frag:frag_names(Tab)]
1642		    end;
1643		single_frag ->
1644		    [make_delete_table2(Tab)]
1645	    end
1646    end.
1647
1648make_delete_table2(Tab) ->
1649    get_tid_ts_and_lock(Tab, write),
1650    Cs = val({Tab, cstruct}),
1651    ensure_active(Cs),
1652    ensure_writable(Tab),
1653    {op, delete_table, vsn_cs2list(Cs)}.
1654
1655%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1656%% Change fragmentation of a table
1657
1658change_table_frag(Tab, Change) ->
1659    schema_transaction(fun() -> do_change_table_frag(Tab, Change) end).
1660
1661do_change_table_frag(Tab, Change) when is_atom(Tab), Tab /= schema ->
1662    TidTs = get_tid_ts_and_lock(schema, write),
1663    Ops = mnesia_frag:change_table_frag(Tab, Change),
1664    [insert_schema_ops(TidTs, Op) || Op <- Ops],
1665    ok;
1666do_change_table_frag(Tab, _Change) ->
1667    mnesia:abort({bad_type, Tab}).
1668
1669%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1670%% Clear a table
1671
1672do_clear_table(schema) ->
1673    mnesia:abort({bad_type, schema});
1674do_clear_table(Tab) ->
1675    TidTs = get_tid_ts_and_lock(schema, write),
1676    get_tid_ts_and_lock(Tab, write),
1677    insert_schema_ops(TidTs, make_clear_table(Tab)).
1678
1679make_clear_table(Tab) ->
1680    Cs = val({Tab, cstruct}),
1681    ensure_writable(Tab),
1682    [{op, clear_table, vsn_cs2list(Cs)}].
1683
1684%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1685
1686add_table_copy(Tab, Node, Storage) ->
1687    schema_transaction(fun() -> do_add_table_copy(Tab, Node, Storage) end).
1688
1689do_add_table_copy(Tab, Node, Storage) when is_atom(Tab), is_atom(Node) ->
1690    TidTs = get_tid_ts_and_lock(schema, write),
1691    insert_schema_ops(TidTs, make_add_table_copy(Tab, Node, Storage));
1692do_add_table_copy(Tab,Node,_) ->
1693    mnesia:abort({badarg, Tab, Node}).
1694
1695make_add_table_copy(Tab, Node, Storage) ->
1696    ensure_writable(schema),
1697    Cs = incr_version(val({Tab, cstruct})),
1698    Ns = mnesia_lib:cs_to_nodes(Cs),
1699    verify(false, lists:member(Node, Ns), {already_exists, Tab, Node}),
1700    Cs2 = verify_cstruct(new_cs(Cs, Node, Storage, add)),
1701
1702    %% Check storage and if node is running
1703    IsRunning = lists:member(Node, val({current, db_nodes})),
1704    if
1705	Tab == schema ->
1706	    if
1707		Storage /= ram_copies ->
1708		    mnesia:abort({badarg, Tab, Storage});
1709		IsRunning == true ->
1710		    mnesia:abort({already_exists, Tab, Node});
1711		true ->
1712		    ignore
1713	    end;
1714	Storage == ram_copies ->
1715	    ignore;
1716	IsRunning == true ->
1717	    ignore;
1718	IsRunning == false ->
1719	    mnesia:abort({not_active, schema, Node})
1720    end,
1721    [{op, add_table_copy, Storage, Node, vsn_cs2list(Cs2)}].
1722
1723del_table_copy(Tab, Node) ->
1724    schema_transaction(fun() -> do_del_table_copy(Tab, Node) end).
1725
1726do_del_table_copy(Tab, Node) when is_atom(Node)  ->
1727    TidTs = get_tid_ts_and_lock(schema, write),
1728%%    get_tid_ts_and_lock(Tab, write),
1729    insert_schema_ops(TidTs, make_del_table_copy(Tab, Node));
1730do_del_table_copy(Tab, Node) ->
1731    mnesia:abort({badarg, Tab, Node}).
1732
1733make_del_table_copy(Tab, Node) ->
1734    ensure_writable(schema),
1735    Cs = incr_version(val({Tab, cstruct})),
1736    Storage = mnesia_lib:schema_cs_to_storage_type(Node, Cs),
1737    Cs2 = new_cs(Cs, Node, Storage, del),
1738    case mnesia_lib:cs_to_nodes(Cs2) of
1739        [] when Tab == schema ->
1740            mnesia:abort({combine_error, Tab, "Last replica"});
1741        [] ->
1742	    ensure_active(Cs),
1743            dbg_out("Last replica deleted in table ~tp~n",  [Tab]),
1744            make_delete_table(Tab,  whole_table);
1745        _ when Tab == schema ->
1746	    %% ensure_active(Cs2),
1747	    ensure_not_active(Tab, Node),
1748            Cs3 = verify_cstruct(Cs2),
1749	    Ops = remove_node_from_tabs(val({schema, tables}), Node),
1750	    [{op, del_table_copy, ram_copies, Node, vsn_cs2list(Cs3)} | Ops];
1751        _ ->
1752	    ensure_active(Cs),
1753            Cs3 = verify_cstruct(Cs2),
1754            get_tid_ts_and_lock(Tab, write),
1755            [{op, del_table_copy, Storage, Node, vsn_cs2list(Cs3)}]
1756    end.
1757
1758remove_node_from_tabs([], _Node) ->
1759    [];
1760remove_node_from_tabs([schema|Rest], Node) ->
1761    remove_node_from_tabs(Rest, Node);
1762remove_node_from_tabs([Tab|Rest], Node) ->
1763    {Cs, IsFragModified} =
1764	mnesia_frag:remove_node(Node, incr_version(val({Tab, cstruct}))),
1765    case mnesia_lib:schema_cs_to_storage_type(Node, Cs)  of
1766	unknown ->
1767	    case IsFragModified of
1768		true ->
1769		    [{op, change_table_frag, {del_node, Node}, vsn_cs2list(Cs)} |
1770		     remove_node_from_tabs(Rest, Node)];
1771		false ->
1772		    remove_node_from_tabs(Rest, Node)
1773	    end;
1774	Storage ->
1775	    Cs2 = new_cs(Cs, Node, Storage, del),
1776	    case mnesia_lib:cs_to_nodes(Cs2) of
1777		[] ->
1778		    [{op, delete_table, vsn_cs2list(Cs)} |
1779		     remove_node_from_tabs(Rest, Node)];
1780		_Ns ->
1781		    Cs3 = verify_cstruct(Cs2),
1782		    get_tid_ts_and_lock(Tab, write),
1783		    [{op, del_table_copy, ram_copies, Node, vsn_cs2list(Cs3)}|
1784		     remove_node_from_tabs(Rest, Node)]
1785	    end
1786    end.
1787
1788new_cs(Cs, Node, ram_copies, add) ->
1789    Cs#cstruct{ram_copies = opt_add(Node, Cs#cstruct.ram_copies)};
1790new_cs(Cs, Node, disc_copies, add) ->
1791    Cs#cstruct{disc_copies = opt_add(Node, Cs#cstruct.disc_copies)};
1792new_cs(Cs, Node, disc_only_copies, add) ->
1793    Cs#cstruct{disc_only_copies = opt_add(Node, Cs#cstruct.disc_only_copies)};
1794new_cs(Cs, Node, ram_copies, del) ->
1795    Cs#cstruct{ram_copies = lists:delete(Node , Cs#cstruct.ram_copies)};
1796new_cs(Cs, Node, disc_copies, del) ->
1797    Cs#cstruct{disc_copies = lists:delete(Node , Cs#cstruct.disc_copies)};
1798new_cs(Cs, Node, disc_only_copies, del) ->
1799    Cs#cstruct{disc_only_copies =
1800               lists:delete(Node , Cs#cstruct.disc_only_copies)};
1801new_cs(#cstruct{external_copies = ExtCps} = Cs, Node, Storage0, Op) ->
1802    Storage = case Storage0 of
1803		  {ext, Alias, _} -> Alias;
1804		  Alias -> Alias
1805	      end,
1806    ExtTypes = get_ext_types(),
1807    case lists:keyfind(Storage, 1, ExtTypes) of
1808	false ->
1809	    mnesia:abort({badarg, Cs#cstruct.name, Storage});
1810	{_, Mod} ->
1811	    Key = {Storage, Mod},
1812	    case {lists:keymember(Key, 1, ExtCps), Op} of
1813		{false, del} ->
1814		    Cs;
1815		{false, add} ->
1816		    Cs#cstruct{external_copies = [{Key, [Node]}|ExtCps]};
1817		{true, _} ->
1818		    F = fun({K, Ns}) when K == Key ->
1819				case Op of
1820				    del -> {K, lists:delete(Node, Ns)};
1821				    add -> {K, opt_add(Node, Ns)}
1822				end;
1823			   (X) ->
1824				X
1825			end,
1826		    Cs#cstruct{external_copies = lists:map(F, ExtCps)}
1827	    end
1828    end.
1829
1830
1831opt_add(N, L) -> [N | lists:delete(N, L)].
1832
1833move_table(Tab, FromNode, ToNode) ->
1834    schema_transaction(fun() -> do_move_table(Tab, FromNode, ToNode) end).
1835
1836do_move_table(schema, _FromNode, _ToNode) ->
1837    mnesia:abort({bad_type, schema});
1838do_move_table(Tab, FromNode, ToNode) when is_atom(FromNode), is_atom(ToNode) ->
1839    TidTs = get_tid_ts_and_lock(schema, write),
1840    get_tid_ts_and_lock(Tab, write),
1841    insert_schema_ops(TidTs, make_move_table(Tab, FromNode, ToNode));
1842do_move_table(Tab, FromNode, ToNode) ->
1843    mnesia:abort({badarg, Tab, FromNode, ToNode}).
1844
1845make_move_table(Tab, FromNode, ToNode) ->
1846    ensure_writable(schema),
1847    Cs = incr_version(val({Tab, cstruct})),
1848    Ns = mnesia_lib:cs_to_nodes(Cs),
1849    verify(false, lists:member(ToNode, Ns), {already_exists, Tab, ToNode}),
1850    verify(true, lists:member(FromNode, val({Tab, where_to_write})),
1851           {not_active, Tab, FromNode}),
1852    verify(false, val({Tab,local_content}),
1853           {"Cannot move table with local content", Tab}),
1854    ensure_active(Cs),
1855    Running = val({current, db_nodes}),
1856    Storage = mnesia_lib:schema_cs_to_storage_type(FromNode, Cs),
1857    verify(true, lists:member(ToNode, Running), {not_active, schema, ToNode}),
1858
1859    Cs2 = new_cs(Cs, ToNode, Storage, add),
1860    Cs3 = verify_cstruct(new_cs(Cs2, FromNode, Storage, del)),
1861    [{op, add_table_copy, Storage, ToNode, vsn_cs2list(Cs2)},
1862     {op, sync_trans},
1863     {op, del_table_copy, Storage, FromNode, vsn_cs2list(Cs3)}].
1864
1865%% end of functions to add and delete nodes to tables
1866%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1867%%
1868
1869change_table_copy_type(Tab, Node, ToS) ->
1870    schema_transaction(fun() -> do_change_table_copy_type(Tab, Node, ToS) end).
1871
1872do_change_table_copy_type(Tab, Node, ToS) when is_atom(Node) ->
1873    TidTs = get_tid_ts_and_lock(schema, write),
1874    get_tid_ts_and_lock(Tab, write), % ensure global sync
1875    %% get_tid_ts_and_lock(Tab, read),
1876    insert_schema_ops(TidTs, make_change_table_copy_type(Tab, Node, ToS));
1877do_change_table_copy_type(Tab, Node, _ToS) ->
1878    mnesia:abort({badarg, Tab, Node}).
1879
1880make_change_table_copy_type(Tab, Node, unknown) ->
1881    make_del_table_copy(Tab, Node);
1882make_change_table_copy_type(Tab, Node, ToS) ->
1883    ensure_writable(schema),
1884    Cs = incr_version(val({Tab, cstruct})),
1885    FromS = mnesia_lib:storage_type_at_node(Node, Tab),
1886
1887    ToSExp = expand_storage_type(ToS),
1888
1889    case compare_storage_type(false, FromS, ToSExp) of
1890	{same, _} ->
1891	    mnesia:abort({already_exists, Tab, Node, ToSExp});
1892	{diff, _} ->
1893	    ignore;
1894	incompatible ->
1895	    ensure_active(Cs)
1896    end,
1897
1898    Cs2 = new_cs(Cs, Node, FromS, del),
1899    Cs3 = verify_cstruct(new_cs(Cs2, Node, ToS, add)),
1900    [{op, change_table_copy_type, Node, FromS, ToSExp, vsn_cs2list(Cs3)}].
1901
1902%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1903%% change index functions ....
1904%% Pos is already added by 1 in both of these functions
1905
1906add_table_index(Tab, Pos) ->
1907    schema_transaction(fun() -> do_add_table_index(Tab, Pos) end).
1908
1909do_add_table_index(schema, _Attr) ->
1910    mnesia:abort({bad_type, schema});
1911do_add_table_index(Tab, Attr) ->
1912    TidTs = get_tid_ts_and_lock(schema, write),
1913    get_tid_ts_and_lock(Tab, read),
1914    Pos = attr_tab_to_pos(Tab, Attr),
1915    insert_schema_ops(TidTs, make_add_table_index(Tab, Pos)).
1916
1917make_add_table_index(Tab, Pos) ->
1918    ensure_writable(schema),
1919    Cs = incr_version(val({Tab, cstruct})),
1920    ensure_active(Cs),
1921    Ix = Cs#cstruct.index,
1922    verify(false, lists:keymember(index_pos(Pos), 1, Ix),
1923	   {already_exists, Tab, Pos}),
1924    Ix2 = lists:sort([Pos | Ix]),
1925    Cs2 = verify_cstruct(Cs#cstruct{index = Ix2}),
1926    NewPosInfo = lists:keyfind(Pos, 1, Cs2#cstruct.index),
1927    [{op, add_index, NewPosInfo, vsn_cs2list(Cs2)}].
1928
1929del_table_index(Tab, Pos) ->
1930    schema_transaction(fun() -> do_del_table_index(Tab, Pos) end).
1931
1932do_del_table_index(schema, _Attr) ->
1933    mnesia:abort({bad_type, schema});
1934do_del_table_index(Tab, Attr) ->
1935    TidTs = get_tid_ts_and_lock(schema, write),
1936    get_tid_ts_and_lock(Tab, read),
1937    Pos = attr_tab_to_pos(Tab, Attr),
1938    insert_schema_ops(TidTs, make_del_table_index(Tab, Pos)).
1939
1940make_del_table_index(Tab, Pos) ->
1941    ensure_writable(schema),
1942    Cs = incr_version(val({Tab, cstruct})),
1943    ensure_active(Cs),
1944    Ix = Cs#cstruct.index,
1945    verify(true, lists:keymember(Pos, 1, Ix), {no_exists, Tab, Pos}),
1946    Cs2 = verify_cstruct(Cs#cstruct{index = lists:keydelete(Pos, 1, Ix)}),
1947    [{op, del_index, Pos, vsn_cs2list(Cs2)}].
1948
1949%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1950
1951add_snmp(Tab, Ustruct) ->
1952    schema_transaction(fun() -> do_add_snmp(Tab, Ustruct) end).
1953
1954do_add_snmp(schema, _Ustruct) ->
1955    mnesia:abort({bad_type, schema});
1956do_add_snmp(Tab, Ustruct) ->
1957    TidTs = get_tid_ts_and_lock(schema, write),
1958    get_tid_ts_and_lock(Tab, read),
1959    insert_schema_ops(TidTs, make_add_snmp(Tab, Ustruct)).
1960
1961make_add_snmp(Tab, Ustruct) ->
1962    ensure_writable(schema),
1963    Cs = incr_version(val({Tab, cstruct})),
1964    ensure_active(Cs),
1965    verify([], Cs#cstruct.snmp, {already_exists, Tab, snmp}),
1966    Error = {badarg, Tab, snmp, Ustruct},
1967    verify(true, mnesia_snmp_hook:check_ustruct(Ustruct), Error),
1968    Cs2 = verify_cstruct(Cs#cstruct{snmp = Ustruct}),
1969    [{op, add_snmp, Ustruct, vsn_cs2list(Cs2)}].
1970
1971del_snmp(Tab) ->
1972    schema_transaction(fun() -> do_del_snmp(Tab) end).
1973
1974do_del_snmp(schema) ->
1975    mnesia:abort({bad_type, schema});
1976do_del_snmp(Tab) ->
1977    TidTs = get_tid_ts_and_lock(schema, write),
1978    get_tid_ts_and_lock(Tab, read),
1979    insert_schema_ops(TidTs, make_del_snmp(Tab)).
1980
1981make_del_snmp(Tab) ->
1982    ensure_writable(schema),
1983    Cs = incr_version(val({Tab, cstruct})),
1984    ensure_active(Cs),
1985    Cs2 = verify_cstruct(Cs#cstruct{snmp = []}),
1986    [{op, del_snmp, vsn_cs2list(Cs2)}].
1987
1988%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1989%%
1990
1991transform_table(Tab, Fun, NewAttrs, NewRecName)
1992  when is_function(Fun), is_list(NewAttrs), is_atom(NewRecName) ->
1993    schema_transaction(fun() -> do_transform_table(Tab, Fun, NewAttrs, NewRecName) end);
1994
1995transform_table(Tab, ignore, NewAttrs, NewRecName)
1996  when is_list(NewAttrs), is_atom(NewRecName) ->
1997    schema_transaction(fun() -> do_transform_table(Tab, ignore, NewAttrs, NewRecName) end);
1998
1999transform_table(Tab, Fun, NewAttrs, NewRecName) ->
2000    {aborted,{bad_type, Tab, Fun, NewAttrs, NewRecName}}.
2001
2002do_transform_table(schema, _Fun, _NewAttrs, _NewRecName) ->
2003    mnesia:abort({bad_type, schema});
2004do_transform_table(Tab, Fun, NewAttrs, NewRecName) ->
2005    TidTs = get_tid_ts_and_lock(schema, write),
2006    get_tid_ts_and_lock(Tab, write),
2007    insert_schema_ops(TidTs, make_transform(Tab, Fun, NewAttrs, NewRecName)).
2008
2009make_transform(Tab, Fun, NewAttrs, NewRecName) ->
2010    ensure_writable(schema),
2011    Cs = incr_version(val({Tab, cstruct})),
2012    ensure_active(Cs),
2013    ensure_writable(Tab),
2014    case Cs#cstruct.index of
2015	[] ->
2016	    Cs2 = verify_cstruct(
2017                    Cs#cstruct{attributes = NewAttrs,
2018                               record_name = NewRecName}),
2019	    [{op, transform, Fun, vsn_cs2list(Cs2)}];
2020	PosList ->
2021	    DelIdx = fun({Pos,_}, Ncs) ->
2022			     Ix = Ncs#cstruct.index,
2023			     Ix2 = lists:keydelete(Pos, 1, Ix),
2024			     Ncs1 = Ncs#cstruct{index = Ix2},
2025			     Op = {op, del_index, Pos, vsn_cs2list(Ncs1)},
2026			     {Op, Ncs1}
2027		     end,
2028	    AddIdx = fun({_,_} = Pos, Ncs) ->
2029			     Ix = Ncs#cstruct.index,
2030			     Ix2 = lists:sort([Pos | Ix]),
2031			     Ncs1 = Ncs#cstruct{index = Ix2},
2032			     Op = {op, add_index, Pos, vsn_cs2list(Ncs1)},
2033			     {Op, Ncs1}
2034		     end,
2035            {DelOps, Cs1} = lists:mapfoldl(DelIdx, Cs, PosList),
2036	    Cs2 = Cs1#cstruct{attributes = NewAttrs, record_name = NewRecName},
2037            {AddOps, Cs3} = lists:mapfoldl(AddIdx, Cs2, PosList),
2038	    _ = verify_cstruct(Cs3), % just a sanity check
2039	    lists:flatten([DelOps, {op, transform, Fun, vsn_cs2list(Cs2)},
2040			   AddOps])
2041    end.
2042
2043index_pos({Pos,_}) -> Pos;
2044index_pos(Pos) when is_integer(Pos) -> Pos;
2045index_pos({P} = Pos) when is_atom(P) -> Pos.
2046
2047
2048%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
2049%%
2050
2051change_table_access_mode(Tab, Mode) ->
2052    schema_transaction(fun() -> do_change_table_access_mode(Tab, Mode) end).
2053
2054do_change_table_access_mode(Tab, Mode) ->
2055    {_Mod, Tid, Ts} = get_tid_ts_and_lock(schema, write),
2056    Store = Ts#tidstore.store,
2057    mnesia_locker:wlock_no_exist(Tid, Store, schema, val({schema, active_replicas})),
2058    mnesia_locker:wlock_no_exist(Tid, Store, Tab, val({Tab, active_replicas})),
2059    do_insert_schema_ops(Store, make_change_table_access_mode(Tab, Mode)).
2060
2061make_change_table_access_mode(Tab, Mode) ->
2062    ensure_writable(schema),
2063    Cs = incr_version(val({Tab, cstruct})),
2064    ensure_active(Cs),
2065    OldMode = Cs#cstruct.access_mode,
2066    verify(false, OldMode ==  Mode, {already_exists, Tab, Mode}),
2067    Cs2 = verify_cstruct(Cs#cstruct{access_mode = Mode}),
2068    [{op, change_table_access_mode, vsn_cs2list(Cs2), OldMode, Mode}].
2069
2070%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
2071
2072change_table_load_order(Tab, LoadOrder) ->
2073    schema_transaction(fun() -> do_change_table_load_order(Tab, LoadOrder) end).
2074
2075do_change_table_load_order(schema, _LoadOrder) ->
2076    mnesia:abort({bad_type, schema});
2077do_change_table_load_order(Tab, LoadOrder) ->
2078    TidTs = get_tid_ts_and_lock(schema, write),
2079    get_tid_ts_and_lock(Tab, none),
2080    insert_schema_ops(TidTs, make_change_table_load_order(Tab, LoadOrder)).
2081
2082make_change_table_load_order(Tab, LoadOrder) ->
2083    ensure_writable(schema),
2084    Cs = incr_version(val({Tab, cstruct})),
2085    ensure_active(Cs),
2086    OldLoadOrder = Cs#cstruct.load_order,
2087    Cs2 = verify_cstruct(Cs#cstruct{load_order = LoadOrder}),
2088    [{op, change_table_load_order, vsn_cs2list(Cs2), OldLoadOrder, LoadOrder}].
2089
2090%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
2091
2092change_table_majority(Tab, Majority) when is_boolean(Majority) ->
2093    schema_transaction(fun() -> do_change_table_majority(Tab, Majority) end).
2094
2095do_change_table_majority(schema, _Majority) ->
2096    mnesia:abort({bad_type, schema});
2097do_change_table_majority(Tab, Majority) ->
2098    TidTs = get_tid_ts_and_lock(schema, write),
2099    get_tid_ts_and_lock(Tab, none),
2100    insert_schema_ops(TidTs, make_change_table_majority(Tab, Majority)).
2101
2102make_change_table_majority(Tab, Majority) ->
2103    ensure_writable(schema),
2104    Cs = incr_version(val({Tab, cstruct})),
2105    ensure_active(Cs),
2106    OldMajority = Cs#cstruct.majority,
2107    Cs2 = Cs#cstruct{majority = Majority},
2108    FragOps = case lists:keyfind(base_table, 1, Cs#cstruct.frag_properties) of
2109		  {_, Tab} ->
2110		      FragNames = mnesia_frag:frag_names(Tab) -- [Tab],
2111		      lists:map(
2112			fun(T) ->
2113				get_tid_ts_and_lock(Tab, none),
2114				CsT = incr_version(val({T, cstruct})),
2115				ensure_active(CsT),
2116				CsT2 = CsT#cstruct{majority = Majority},
2117				verify_cstruct(CsT2),
2118				{op, change_table_majority, vsn_cs2list(CsT2),
2119				 OldMajority, Majority}
2120			end, FragNames);
2121		  false    -> [];
2122		  {_, _}   -> mnesia:abort({bad_type, Tab})
2123	      end,
2124    verify_cstruct(Cs2),
2125    [{op, change_table_majority, vsn_cs2list(Cs2), OldMajority, Majority} | FragOps].
2126
2127%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
2128
2129write_table_property(Tab, Prop) when is_tuple(Prop), size(Prop) >= 1 ->
2130    schema_transaction(fun() -> do_write_table_property(Tab, Prop) end);
2131write_table_property(Tab, Prop) ->
2132    {aborted, {bad_type, Tab, Prop}}.
2133
2134do_write_table_property(Tab, Prop) ->
2135    TidTs = get_tid_ts_and_lock(schema, write),
2136    {_, _, Ts} = TidTs,
2137    Store = Ts#tidstore.store,
2138    case change_prop_in_existing_op(Tab, Prop, write_property, Store) of
2139	true ->
2140	    dbg_out("change_prop_in_existing_op"
2141		    "(~tp,~p,write_property,Store) -> true~n",
2142		    [Tab,Prop]),
2143	    %% we have merged the table prop into the create_table op
2144	    ok;
2145	false ->
2146	    dbg_out("change_prop_in_existing_op"
2147		    "(~tp,~p,write_property,Store) -> false~n",
2148		    [Tab,Prop]),
2149	    %% this must be an existing table
2150	    get_tid_ts_and_lock(Tab, none),
2151	    insert_schema_ops(TidTs, make_write_table_properties(Tab, [Prop]))
2152    end.
2153
2154make_write_table_properties(Tab, Props) ->
2155    ensure_writable(schema),
2156    Cs = incr_version(val({Tab, cstruct})),
2157    ensure_active(Cs),
2158    make_write_table_properties(Tab, Props, Cs).
2159
2160make_write_table_properties(Tab, [Prop | Props], Cs) ->
2161    OldProps = Cs#cstruct.user_properties,
2162    PropKey = element(1, Prop),
2163    DelProps = lists:keydelete(PropKey, 1, OldProps),
2164    MergedProps = lists:merge(DelProps, [Prop]),
2165    Cs2 = verify_cstruct(Cs#cstruct{user_properties = MergedProps}),
2166    [{op, write_property, vsn_cs2list(Cs2), Prop} |
2167     make_write_table_properties(Tab, Props, Cs2)];
2168make_write_table_properties(_Tab, [], _Cs) ->
2169    [].
2170
2171change_prop_in_existing_op(Tab, Prop, How, Store) ->
2172    Ops = ets:match_object(Store, '_'),
2173    case update_existing_op(Ops, Tab, Prop, How, []) of
2174	{true, Ops1} ->
2175	    ets:match_delete(Store, '_'),
2176	    [ets:insert(Store, Op) || Op <- Ops1],
2177	    true;
2178	false ->
2179	    false
2180    end.
2181
2182update_existing_op([{op, Op, L = [{name,Tab}|_], _OldProp}|Ops],
2183		   Tab, Prop, How, Acc) when Op == write_property;
2184					     Op == delete_property ->
2185    %% Apparently, mnesia_dumper doesn't care about OldProp here -- just L,
2186    %% so we will throw away OldProp (not that it matters...) and insert Prop.
2187    %% as element 3.
2188    L1 = insert_prop(Prop, L, How),
2189    NewOp = {op, How, L1, Prop},
2190    {true, lists:reverse(Acc) ++ [NewOp|Ops]};
2191update_existing_op([Op = {op, create_table, L}|Ops], Tab, Prop, How, Acc) ->
2192    case lists:keysearch(name, 1, L) of
2193	{value, {_, Tab}} ->
2194	    %% Tab is being created here -- insert Prop into L
2195	    L1 = insert_prop(Prop, L, How),
2196	    {true, lists:reverse(Acc) ++ [{op, create_table, L1}|Ops]};
2197	_ ->
2198	    update_existing_op(Ops, Tab, Prop, How, [Op|Acc])
2199    end;
2200update_existing_op([Op|Ops], Tab, Prop, How, Acc) ->
2201    update_existing_op(Ops, Tab, Prop, How, [Op|Acc]);
2202update_existing_op([], _, _, _, _) ->
2203    false.
2204
2205do_read_table_property(Tab, Key) ->
2206    TidTs = get_tid_ts_and_lock(schema, read),
2207    {_, _, Ts} = TidTs,
2208    Store = Ts#tidstore.store,
2209    Props = ets:foldl(
2210	      fun({op, announce_im_running,_,Opts,_,_}, _Acc) when Tab==schema ->
2211		      find_props(Opts);
2212		 ({op, create_table, [{name, T}|Opts]}, _Acc)
2213		    when T==Tab ->
2214		      find_props(Opts);
2215		 ({op, Op, [{name,T}|Opts], _Prop}, _Acc)
2216		 when T==Tab, Op==write_property;
2217		      T==Tab, Op==delete_property ->
2218		      find_props(Opts);
2219		 ({op, delete_table, [{name,T}|_]}, _Acc)
2220		 when T==Tab ->
2221		      [];
2222		 (_Other, Acc) ->
2223		      Acc
2224	      end, undefined, Store),
2225    case Props of
2226        undefined ->
2227            get_tid_ts_and_lock(Tab, read),
2228	    dirty_read_table_property(Tab, Key);
2229        _ when is_list(Props) ->
2230            case lists:keyfind(Key, 1, Props) of
2231		false ->
2232		    undefined;
2233		Other ->
2234		    Other
2235            end
2236    end.
2237
2238dirty_read_table_property(Tab, Key) ->
2239    try ets:lookup_element(mnesia_gvar, {Tab,user_property,Key}, 2)
2240    catch
2241	error:_ ->
2242	    undefined
2243    end.
2244
2245
2246%% perhaps a misnomer. How could also be delete_property... never mind.
2247%% Returns the modified L.
2248insert_prop(Prop, L, How) ->
2249    Prev = find_props(L),
2250    MergedProps = merge_with_previous(How, Prop, Prev),
2251    replace_props(L, MergedProps).
2252
2253find_props([{user_properties, P}|_]) -> P;
2254find_props([_H|T]) -> find_props(T).
2255%% we shouldn't reach []
2256
2257replace_props([{user_properties, _}|T], P) -> [{user_properties, P}|T];
2258replace_props([H|T], P) -> [H|replace_props(T, P)].
2259%% again, we shouldn't reach []
2260
2261merge_with_previous(write_property, Prop, Prev) ->
2262    Key = element(1, Prop),
2263    Prev1 = lists:keydelete(Key, 1, Prev),
2264    lists:sort([Prop|Prev1]);
2265merge_with_previous(delete_property, PropKey, Prev) ->
2266    lists:keydelete(PropKey, 1, Prev).
2267
2268delete_table_property(Tab, PropKey) ->
2269    schema_transaction(fun() -> do_delete_table_property(Tab, PropKey) end).
2270
2271do_delete_table_property(Tab, PropKey) ->
2272    TidTs = get_tid_ts_and_lock(schema, write),
2273    {_, _, Ts} = TidTs,
2274    Store = Ts#tidstore.store,
2275    case change_prop_in_existing_op(Tab, PropKey, delete_property, Store) of
2276	true ->
2277	    dbg_out("change_prop_in_existing_op"
2278		    "(~tp,~p,delete_property,Store) -> true~n",
2279		    [Tab,PropKey]),
2280	    %% we have merged the table prop into the create_table op
2281	    ok;
2282	false ->
2283	    dbg_out("change_prop_in_existing_op"
2284		    "(~tp,~p,delete_property,Store) -> false~n",
2285		    [Tab,PropKey]),
2286	    %% this must be an existing table
2287	    get_tid_ts_and_lock(Tab, none),
2288	    insert_schema_ops(TidTs,
2289			      make_delete_table_properties(Tab, [PropKey]))
2290    end.
2291
2292make_delete_table_properties(Tab, PropKeys) ->
2293    ensure_writable(schema),
2294    Cs = incr_version(val({Tab, cstruct})),
2295    ensure_active(Cs),
2296    make_delete_table_properties(Tab, PropKeys, Cs).
2297
2298make_delete_table_properties(Tab, [PropKey | PropKeys], Cs) ->
2299    OldProps = Cs#cstruct.user_properties,
2300    Props = lists:keydelete(PropKey, 1, OldProps),
2301    Cs2 = verify_cstruct(Cs#cstruct{user_properties = Props}),
2302    [{op, delete_property, vsn_cs2list(Cs2), PropKey} |
2303     make_delete_table_properties(Tab, PropKeys, Cs2)];
2304make_delete_table_properties(_Tab, [], _Cs) ->
2305    [].
2306
2307%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
2308
2309%% Ensure that the transaction can be committed even
2310%% if the node crashes and Mnesia is restarted
2311prepare_commit(Tid, Commit, WaitFor) ->
2312    case Commit#commit.schema_ops of
2313	[] ->
2314	    {false, Commit, optional};
2315	OrigOps ->
2316	    {Modified, Ops, DumperMode} =
2317		prepare_ops(Tid, OrigOps, WaitFor, false, [], optional),
2318	    InitBy = schema_prepare,
2319	    GoodRes = {Modified,
2320		       Commit#commit{schema_ops = lists:reverse(Ops)},
2321		       DumperMode},
2322	    case DumperMode of
2323		optional ->
2324		    dbg_out("Transaction log dump skipped (~p): ~w~n",
2325			    [DumperMode, InitBy]);
2326		mandatory ->
2327		    case mnesia_controller:sync_dump_log(InitBy) of
2328			dumped ->
2329			    GoodRes;
2330			{error, Reason} ->
2331			    mnesia:abort(Reason)
2332		    end
2333	    end,
2334	    case Ops of
2335		[] ->
2336		    ignore;
2337		_ ->
2338		    %% We need to grab a dumper lock here, the log may not
2339		    %% be dumped by others, during the schema commit phase.
2340		    mnesia_controller:wait_for_schema_commit_lock()
2341	    end,
2342	    GoodRes
2343    end.
2344
2345prepare_ops(Tid, [Op | Ops], WaitFor, Changed, Acc, DumperMode) ->
2346    case prepare_op(Tid, Op, WaitFor) of
2347        {true, mandatory} ->
2348	    prepare_ops(Tid, Ops, WaitFor, Changed, [Op | Acc], mandatory);
2349        {true, optional} ->
2350	    prepare_ops(Tid, Ops, WaitFor, Changed, [Op | Acc], DumperMode);
2351        {true, Ops2, mandatory} ->
2352	    prepare_ops(Tid, Ops, WaitFor, true, Ops2 ++ Acc, mandatory);
2353        {true, Ops2, optional} ->
2354	    prepare_ops(Tid, Ops, WaitFor, true, Ops2 ++ Acc, DumperMode);
2355	{false, optional} ->
2356	    prepare_ops(Tid, Ops, WaitFor, true, Acc, DumperMode)
2357    end;
2358prepare_ops(_Tid, [], _WaitFor, Changed, Acc, DumperMode) ->
2359    {Changed, Acc, DumperMode}.
2360
2361%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
2362%% Prepare for commit
2363%% returns true if Op should be included, i.e. unmodified
2364%%         {true, Operation} if NewRecs should be included, i.e. modified
2365%%         false if Op should NOT be included, i.e. modified
2366%%
2367prepare_op(_Tid, {op, rec, unknown, Rec}, _WaitFor) ->
2368    {{Tab, Key}, Items, _Op} = Rec,
2369    case val({Tab, storage_type}) of
2370        unknown ->
2371            {false, optional};
2372        Storage ->
2373            mnesia_tm:prepare_snmp(Tab, Key, Items), % May exit
2374            {true, [{op, rec, Storage, Rec}], optional}
2375    end;
2376
2377prepare_op(_Tid, {op, announce_im_running, Node, SchemaDef, Running, RemoteRunning}, _WaitFor) ->
2378    SchemaCs = list2cs(SchemaDef),
2379    if
2380	Node == node() -> %% Announce has already run on local node
2381	    ignore;       %% from do_merge_schema
2382	true ->
2383	    %% If a node has restarted it may still linger in db_nodes,
2384	    %% but have been removed from recover_nodes
2385	    Current  = mnesia_lib:intersect(val({current,db_nodes}), [node()|val(recover_nodes)]),
2386	    NewNodes = mnesia_lib:uniq(Running++RemoteRunning) -- Current,
2387	    mnesia_lib:set(prepare_op, {announce_im_running,NewNodes}),
2388	    announce_im_running(NewNodes, SchemaCs)
2389    end,
2390    {false, optional};
2391
2392prepare_op(_Tid, {op, sync_trans}, {part, CoordPid}) ->
2393    CoordPid ! {sync_trans, self()},
2394    receive
2395	{sync_trans, CoordPid} ->
2396	    {false, optional};
2397	{mnesia_down, _Node} = Else ->
2398	    mnesia_lib:verbose("sync_op terminated due to ~tp~n", [Else]),
2399	    mnesia:abort(Else);
2400	{'EXIT', _, _} = Else ->
2401	    mnesia_lib:verbose("sync_op terminated due to ~tp~n", [Else]),
2402	    mnesia:abort(Else)
2403    end;
2404
2405prepare_op(_Tid, {op, sync_trans}, {coord, Nodes}) ->
2406    case receive_sync(Nodes, []) of
2407	{abort, Reason} ->
2408	    mnesia_lib:verbose("sync_op terminated due to ~tp~n", [Reason]),
2409	    mnesia:abort(Reason);
2410	Pids ->
2411	    [Pid ! {sync_trans, self()} || Pid <- Pids],
2412	    {false, optional}
2413    end;
2414prepare_op(Tid, {op, create_table, TabDef}, _WaitFor) ->
2415    Cs = list2cs(TabDef),
2416    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
2417    UseDir = mnesia_monitor:use_dir(),
2418    Tab = Cs#cstruct.name,
2419    case Storage of
2420        disc_copies when UseDir == false ->
2421	    UseDirReason = {bad_type, Tab, Storage, node()},
2422            mnesia:abort(UseDirReason);
2423        disc_only_copies when UseDir == false ->
2424	    UseDirReason = {bad_type, Tab, Storage, node()},
2425            mnesia:abort(UseDirReason);
2426	ram_copies ->
2427	    mnesia_lib:set({Tab, create_table},true),
2428	    create_ram_table(Tab, Cs),
2429	    insert_cstruct(Tid, Cs, false),
2430	    {true, optional};
2431	disc_copies ->
2432	    mnesia_lib:set({Tab, create_table},true),
2433	    create_ram_table(Tab, Cs),
2434	    create_disc_table(Tab),
2435	    insert_cstruct(Tid, Cs, false),
2436	    {true, optional};
2437	disc_only_copies ->
2438	    mnesia_lib:set({Tab, create_table},true),
2439	    create_disc_only_table(Tab,Cs),
2440	    insert_cstruct(Tid, Cs, false),
2441	    {true, optional};
2442	{ext, Alias, Mod} ->
2443	    mnesia_lib:set({Tab, create_table},true),
2444            create_external_table(Alias, Tab, Mod, Cs),
2445	    insert_cstruct(Tid, Cs, false),
2446	    {true, optional};
2447        unknown -> %% No replica on this node
2448	    mnesia_lib:set({Tab, create_table},true),
2449	    insert_cstruct(Tid, Cs, false),
2450            {true, optional}
2451    end;
2452
2453prepare_op(Tid, {op, add_table_copy, Storage, Node, TabDef}, _WaitFor) ->
2454    Cs = list2cs(TabDef),
2455    Tab = Cs#cstruct.name,
2456
2457    if
2458	Tab == schema ->
2459	    {true, optional};
2460
2461	Node == node() ->
2462	    case mnesia_lib:val({schema, storage_type}) of
2463		ram_copies when Storage /= ram_copies ->
2464		    Error = {combine_error, Tab, "has no disc", Node},
2465		    mnesia:abort(Error);
2466		_  ->
2467		    ok
2468	    end,
2469	    %% Tables are created by mnesia_loader get_network code
2470	    insert_cstruct(Tid, Cs, true),
2471	    case mnesia_controller:get_network_copy(Tid, Tab, Cs) of
2472		{loaded, ok} ->
2473		    {true, optional};
2474		{not_loaded, ErrReason} ->
2475		    Reason = {system_limit, Tab, {Node, ErrReason}},
2476		    mnesia:abort(Reason)
2477	    end;
2478	Node /= node() ->
2479	    %% Verify that ram table not has been dumped to disc
2480	    if
2481		Storage /= ram_copies ->
2482		    case mnesia_lib:schema_cs_to_storage_type(node(), Cs) of
2483			ram_copies ->
2484			    Dat = mnesia_lib:tab2dcd(Tab),
2485			    case mnesia_lib:exists(Dat) of
2486				true ->
2487				    mnesia:abort({combine_error, Tab, Storage,
2488						  "Table dumped to disc", node()});
2489				false ->
2490				    ok
2491			    end;
2492			_ ->
2493			    ok
2494		    end;
2495		true ->
2496		    ok
2497	    end,
2498	    insert_cstruct(Tid, Cs, true),
2499	    {true, optional}
2500    end;
2501
2502prepare_op(_Tid, {op, del_table_copy, _Storage, Node, TabDef}, _WaitFor) ->
2503    Cs = list2cs(TabDef),
2504    Tab = Cs#cstruct.name,
2505    set_where_to_read(Tab, Node, Cs),
2506    {true, optional};
2507
2508prepare_op(_Tid, {op, change_table_copy_type,  N, FromS, ToS, TabDef}, _WaitFor)
2509  when N == node() ->
2510    Cs = list2cs(TabDef),
2511    Tab = Cs#cstruct.name,
2512
2513    NotActive = mnesia_lib:not_active_here(Tab),
2514
2515    if Tab =/= schema ->
2516	    check_if_disc_required(FromS, ToS);
2517       true ->
2518	    ok
2519    end,
2520
2521    if
2522	NotActive == true ->
2523	    mnesia:abort({not_active, Tab, node()});
2524
2525	Tab == schema ->
2526	    case {FromS, ToS} of
2527		{ram_copies, disc_copies} ->
2528		    case mnesia:system_info(schema_location) of
2529			opt_disc ->
2530			    ignore;
2531			_ ->
2532			    mnesia:abort({combine_error,  Tab, node(),
2533					  "schema_location must be opt_disc"})
2534		    end,
2535		    Dir = mnesia_lib:dir(),
2536		    case opt_create_dir(true, Dir) of
2537			ok ->
2538			    purge_dir(Dir, []),
2539			    mnesia_log:purge_all_logs(),
2540			    set(use_dir, true),
2541			    mnesia_log:init(),
2542			    Ns = val({current, db_nodes}), %mnesia_lib:running_nodes(),
2543			    F = fun(U) -> mnesia_recover:log_mnesia_up(U) end,
2544			    lists:foreach(F, Ns),
2545
2546			    mnesia_dumper:raw_named_dump_table(Tab, dmp),
2547			    mnesia_checkpoint:tm_change_table_copy_type(Tab, FromS, ToS);
2548			{error, Reason} ->
2549			    mnesia:abort(Reason)
2550		    end;
2551		{disc_copies, ram_copies} ->
2552		    Ltabs = val({schema, local_tables}) -- [schema],
2553		    Dtabs = [L || L <- Ltabs,
2554				  val({L, storage_type}) /= ram_copies],
2555		    verify([], Dtabs, {"Disc resident tables", Dtabs, N});
2556		_ ->
2557		    mnesia:abort({combine_error, Tab, ToS})
2558	    end;
2559
2560	element(1,FromS) == ext; element(1,ToS) == ext ->
2561	    if ToS == ram_copies ->
2562		    create_ram_table(Tab, Cs);
2563	       true ->
2564		    ok
2565	    end,
2566            mnesia_dumper:dump_to_logfile(FromS, Tab),
2567            mnesia_checkpoint:tm_change_table_copy_type(Tab, FromS, ToS);
2568
2569	FromS == ram_copies ->
2570	    case mnesia_monitor:use_dir() of
2571		true ->
2572		    Dat = mnesia_lib:tab2dcd(Tab),
2573		    case mnesia_lib:exists(Dat) of
2574			true ->
2575			    mnesia:abort({combine_error, Tab, node(),
2576					  "Table dump exists"});
2577			false ->
2578			    case ToS of
2579				disc_copies ->
2580				    mnesia_log:ets2dcd(Tab, dmp);
2581				disc_only_copies ->
2582				    mnesia_dumper:raw_named_dump_table(Tab, dmp)
2583			    end,
2584			    mnesia_checkpoint:tm_change_table_copy_type(Tab,
2585                                                                        FromS,
2586                                                                        ToS)
2587		    end;
2588		false ->
2589		    mnesia:abort({has_no_disc, node()})
2590	    end;
2591
2592	FromS == disc_copies, ToS == disc_only_copies ->
2593	    mnesia_dumper:raw_named_dump_table(Tab, dmp);
2594
2595	FromS == disc_only_copies ->
2596	    Type = Cs#cstruct.type,
2597	    create_ram_table(Tab, Cs),
2598	    Datname = mnesia_lib:tab2dat(Tab),
2599	    Repair = mnesia_monitor:get_env(auto_repair),
2600	    case mnesia_lib:dets_to_ets(Tab, Tab, Datname, Type, Repair, no) of
2601		loaded -> ok;
2602		Reason ->
2603		    Err = "Failed to copy disc data to ram",
2604		    mnesia:abort({system_limit, Tab, {Err,Reason}})
2605	    end;
2606
2607	true ->
2608	    ignore
2609    end,
2610    {true, mandatory};
2611
2612prepare_op(_Tid, {op, change_table_copy_type,  N, _FromS, _ToS, _TabDef}, _WaitFor)
2613  when N /= node() ->
2614    {true, mandatory};
2615
2616prepare_op(_Tid, {op, delete_table, _TabDef}, _WaitFor) ->
2617    {true, mandatory};
2618
2619prepare_op(_Tid, {op, dump_table, unknown, TabDef}, _WaitFor) ->
2620    Cs = list2cs(TabDef),
2621    Tab = Cs#cstruct.name,
2622    case lists:member(node(), Cs#cstruct.ram_copies) of
2623        true ->
2624	    case mnesia_monitor:use_dir() of
2625		true ->
2626		    mnesia_log:ets2dcd(Tab, dmp),
2627		    Size = mnesia:table_info(Tab, size),
2628		    {true, [{op, dump_table, Size, TabDef}], optional};
2629		false ->
2630		    mnesia:abort({has_no_disc, node()})
2631	    end;
2632        false ->
2633            {false, optional}
2634    end;
2635
2636prepare_op(_Tid, {op, add_snmp, Ustruct, TabDef}, _WaitFor) ->
2637    Cs = list2cs(TabDef),
2638    case mnesia_lib:cs_to_storage_type(node(), Cs) of
2639        unknown ->
2640            {true, optional};
2641        Storage ->
2642            Tab = Cs#cstruct.name,
2643            Stab = mnesia_snmp_hook:create_table(Ustruct, Tab, Storage),
2644            mnesia_lib:set({Tab, {index, snmp}}, Stab),
2645            {true, optional}
2646    end;
2647
2648prepare_op(_Tid, {op, transform, ignore, _TabDef}, _WaitFor) ->
2649    {true, mandatory};   %% Apply schema changes only.
2650prepare_op(_Tid, {op, transform, Fun, TabDef}, _WaitFor) ->
2651    Cs = list2cs(TabDef),
2652    case mnesia_lib:cs_to_storage_type(node(), Cs) of
2653        unknown ->
2654            {true, mandatory};
2655        Storage ->
2656            Tab = Cs#cstruct.name,
2657            RecName = Cs#cstruct.record_name,
2658	    Type = Cs#cstruct.type,
2659            NewArity = length(Cs#cstruct.attributes) + 1,
2660	    mnesia_lib:db_fixtable(Storage, Tab, true),
2661            Key = mnesia_lib:db_first(Tab),
2662	    Op = {op, transform, Fun, TabDef},
2663            try transform_objs(Fun, Tab, RecName, Key,
2664			       NewArity, Storage, Type, [Op]) of
2665                Objs ->
2666		    mnesia_lib:db_fixtable(Storage, Tab, false),
2667                    {true, Objs, mandatory}
2668	    catch _:Reason:Stacktrace ->
2669		    mnesia_lib:db_fixtable(Storage, Tab, false),
2670		    mnesia_lib:important("Transform function failed: '~tp' in '~tp'",
2671					 [Reason, Stacktrace]),
2672                    exit({"Bad transform function", Tab, Fun, node(), Reason})
2673            end
2674    end;
2675
2676prepare_op(_Tid, {op, merge_schema, TabDef}, _WaitFor) ->
2677    Cs = list2cs(TabDef),
2678    case verify_merge(Cs) of
2679	ok  ->
2680	    {true, optional};
2681	Error ->
2682	    verbose("Merge_Schema ~p failed on ~p: ~tp~n", [_Tid,node(),Error]),
2683	    mnesia:abort({bad_commit, Error})
2684    end;
2685prepare_op(_Tid, _Op, _WaitFor) ->
2686    {true, optional}.
2687
2688check_if_disc_required(FromS, ToS) ->
2689    FromSem = mnesia_lib:semantics(FromS, storage),
2690    ToSem = mnesia_lib:semantics(ToS, storage),
2691    case {FromSem, ToSem} of
2692	{ram_copies, _} when ToSem == disc_copies;
2693			     ToSem == disc_only_copies ->
2694	    case mnesia_monitor:use_dir() of
2695		true ->
2696		    ok;
2697		false ->
2698		    mnesia:abort({has_no_disc, node()})
2699	    end;
2700	_ ->
2701	    ok
2702    end.
2703
2704create_ram_table(Tab, #cstruct{type=Type, storage_properties=Props}) ->
2705    EtsOpts = proplists:get_value(ets, Props, []),
2706    Args = [{keypos, 2}, public, named_table, Type | EtsOpts],
2707    case mnesia_monitor:unsafe_mktab(Tab, Args) of
2708	Tab ->
2709	    ok;
2710	{error,Reason} ->
2711	    Err = "Failed to create ets table",
2712	    mnesia:abort({system_limit, Tab, {Err,Reason}})
2713    end.
2714
2715create_disc_table(Tab) ->
2716    File = mnesia_lib:tab2dcd(Tab),
2717    file:delete(File),
2718    FArg = [{file, File}, {name, {mnesia,create}},
2719	    {repair, false}, {mode, read_write}],
2720    case mnesia_monitor:open_log(FArg) of
2721	{ok,Log} ->
2722	    mnesia_monitor:unsafe_close_log(Log),
2723	    ok;
2724	{error,Reason} ->
2725	    Err = "Failed to create disc table",
2726	    mnesia:abort({system_limit, Tab, {Err,Reason}})
2727    end.
2728create_disc_only_table(Tab, #cstruct{type=Type, storage_properties=Props}) ->
2729    File = mnesia_lib:tab2dat(Tab),
2730    file:delete(File),
2731    DetsOpts = proplists:get_value(dets, Props, []),
2732    Args = [{file, mnesia_lib:tab2dat(Tab)},
2733	    {type, mnesia_lib:disk_type(Tab, Type)},
2734	    {keypos, 2},
2735	    {repair, mnesia_monitor:get_env(auto_repair)}
2736	    | DetsOpts],
2737    case mnesia_monitor:unsafe_open_dets(Tab, Args) of
2738	{ok, _} ->
2739	    ok;
2740	{error,Reason} ->
2741	    Err = "Failed to create disc table",
2742	    mnesia:abort({system_limit, Tab, {Err,Reason}})
2743    end.
2744
2745create_external_table(Alias, Tab, Mod, Cs) ->
2746    case mnesia_monitor:unsafe_create_external(Tab, Alias, Mod, Cs) of
2747	ok ->
2748	    ok;
2749	{error,Reason} ->
2750	    Err = "Failed to create external table",
2751	    mnesia:abort({system_limit, Tab, {Err,Reason}})
2752    end.
2753
2754receive_sync([], Pids) ->
2755    Pids;
2756receive_sync(Nodes, Pids) ->
2757    receive
2758	{sync_trans, Pid} ->
2759	    Node = node(Pid),
2760	    receive_sync(lists:delete(Node, Nodes), [Pid | Pids]);
2761	Else ->
2762	    {abort, Else}
2763    end.
2764
2765set_where_to_read(Tab, Node, Cs) ->
2766    case mnesia_lib:val({Tab, where_to_read}) of
2767	Node ->
2768	    case Cs#cstruct.local_content of
2769		true ->
2770		    ok;
2771		false ->
2772		    mnesia_lib:set_remote_where_to_read(Tab, [Node]),
2773		    ok
2774	    end;
2775	_ ->
2776	    ok
2777    end.
2778
2779%% Build up the list in reverse order.
2780transform_objs(_Fun, _Tab, _RT, '$end_of_table', _NewArity, _Storage, _Type, Acc) ->
2781    Acc;
2782transform_objs(Fun, Tab, RecName, Key, A, Storage, Type, Acc) ->
2783    Objs = mnesia_lib:db_get(Tab, Key),
2784    NextKey = mnesia_lib:db_next_key(Tab, Key),
2785    Oid = {Tab, Key},
2786    NewObjs = {Ws, Ds} = transform_obj(Tab, RecName, Key, Fun, Objs, A, Type, [], []),
2787    if
2788	NewObjs == {[], []} ->
2789	    transform_objs(Fun, Tab, RecName, NextKey, A, Storage, Type, Acc);
2790	Type == bag ->
2791	    transform_objs(Fun, Tab, RecName, NextKey, A, Storage, Type,
2792			   [{op, rec, Storage, {Oid, Ws, write}},
2793			    {op, rec, Storage, {Oid, [Oid], delete}} | Acc]);
2794	Ds == [] ->
2795	    %% Type is set or ordered_set, no need to delete the record first
2796	    transform_objs(Fun, Tab, RecName, NextKey, A, Storage, Type,
2797			   [{op, rec, Storage, {Oid, Ws, write}} | Acc]);
2798	Ws == [] ->
2799	    transform_objs(Fun, Tab, RecName, NextKey, A, Storage, Type,
2800			   [{op, rec, Storage, {Oid, Ds, write}} | Acc]);
2801	true ->
2802	    transform_objs(Fun, Tab, RecName, NextKey, A, Storage, Type,
2803			   [{op, rec, Storage, {Oid, Ws, write}},
2804			    {op, rec, Storage, {Oid, Ds, delete}} | Acc])
2805    end.
2806
2807transform_obj(Tab, RecName, Key, Fun, [Obj|Rest], NewArity, Type, Ws, Ds) ->
2808    NewObj = Fun(Obj),
2809    if
2810        size(NewObj) /= NewArity ->
2811            exit({"Bad arity", Obj, NewObj});
2812	NewObj == Obj ->
2813	    transform_obj(Tab, RecName, Key, Fun, Rest, NewArity, Type, Ws, Ds);
2814        RecName == element(1, NewObj), Key == element(2, NewObj) ->
2815            transform_obj(Tab, RecName, Key, Fun, Rest, NewArity,
2816			  Type, [NewObj | Ws], Ds);
2817	NewObj == delete ->
2818	    case Type of
2819		bag -> %% Just don't write that object
2820		   transform_obj(Tab, RecName, Key, Fun, Rest,
2821				 NewArity, Type, Ws, Ds);
2822		_ ->
2823		    transform_obj(Tab, RecName, Key, Fun, Rest, NewArity,
2824				  Type, Ws, [NewObj | Ds])
2825	    end;
2826        true ->
2827            exit({"Bad key or Record Name", Obj, NewObj})
2828    end;
2829transform_obj(_Tab, _RecName, _Key, _Fun, [], _NewArity, _Type, Ws, Ds) ->
2830    {lists:reverse(Ws), lists:reverse(Ds)}.
2831
2832%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
2833%% Undo prepare of commit
2834undo_prepare_commit(Tid, Commit) ->
2835    case Commit#commit.schema_ops of
2836	[] ->
2837	    ignore;
2838	Ops ->
2839	    %% Catch to allow failure mnesia_controller may not be started
2840	    ?SAFE(mnesia_controller:release_schema_commit_lock()),
2841	    undo_prepare_ops(Tid, Ops)
2842    end,
2843    Commit.
2844
2845%% Undo in reverse order
2846undo_prepare_ops(Tid, [Op | Ops]) ->
2847    case element(1, Op) of
2848	TheOp when TheOp /= op, TheOp /= restore_op ->
2849	    undo_prepare_ops(Tid, Ops);
2850	_ ->
2851	    undo_prepare_ops(Tid, Ops),
2852	    undo_prepare_op(Tid, Op)
2853    end;
2854undo_prepare_ops(_Tid, []) ->
2855    [].
2856
2857undo_prepare_op(_Tid, {op, announce_im_running, _Node, _, _Running, _RemoteRunning}) ->
2858    case ?catch_val(prepare_op) of
2859	{announce_im_running, New} ->
2860            unannounce_im_running(New);
2861	_Else ->
2862	    ok
2863    end;
2864
2865undo_prepare_op(_Tid, {op, sync_trans}) ->
2866    ok;
2867
2868undo_prepare_op(Tid, {op, create_table, TabDef}) ->
2869    Cs = list2cs(TabDef),
2870    Tab = Cs#cstruct.name,
2871    mnesia_lib:unset({Tab, create_table}),
2872    delete_cstruct(Tid, Cs),
2873    case mnesia_lib:cs_to_storage_type(node(), Cs) of
2874	unknown ->
2875	    ok;
2876	ram_copies ->
2877	    ram_delete_table(Tab, ram_copies);
2878	disc_copies ->
2879	    ram_delete_table(Tab, disc_copies),
2880	    DcdFile = mnesia_lib:tab2dcd(Tab),
2881	    %%	    disc_delete_table(Tab, Storage),
2882	    file:delete(DcdFile);
2883	disc_only_copies ->
2884	    mnesia_monitor:unsafe_close_dets(Tab),
2885	    Dat = mnesia_lib:tab2dat(Tab),
2886	    %%	    disc_delete_table(Tab, Storage),
2887	    file:delete(Dat);
2888        {ext, Alias, Mod} ->
2889	    Mod:close_table(Alias, Tab),
2890            Mod:delete_table(Alias, Tab)
2891    end;
2892
2893undo_prepare_op(Tid, {op, add_table_copy, Storage, Node, TabDef}) ->
2894    Cs = list2cs(TabDef),
2895    Tab = Cs#cstruct.name,
2896    if
2897	Tab == schema ->
2898	    true; % Nothing to prepare
2899	Node == node() ->
2900	    mnesia_checkpoint:tm_del_copy(Tab, Node),
2901	    mnesia_controller:unannounce_add_table_copy(Tab, Node),
2902	    if
2903		Storage == disc_only_copies; Tab == schema ->
2904		    mnesia_monitor:close_dets(Tab),
2905		    file:delete(mnesia_lib:tab2dat(Tab));
2906		true ->
2907		    file:delete(mnesia_lib:tab2dcd(Tab))
2908	    end,
2909	    ram_delete_table(Tab, Storage),
2910	    Cs2 = new_cs(Cs, Node, Storage, del),
2911	    insert_cstruct(Tid, Cs2, true); % Don't care about the version
2912	Node /= node() ->
2913	    mnesia_controller:unannounce_add_table_copy(Tab, Node),
2914	    Cs2 = new_cs(Cs, Node, Storage, del),
2915	    insert_cstruct(Tid, Cs2, true) % Don't care about the version
2916    end;
2917
2918undo_prepare_op(_Tid, {op, del_table_copy, _, Node, TabDef}) ->
2919    Cs = list2cs(TabDef),
2920    Tab = Cs#cstruct.name,
2921    if node() =:= Node ->
2922	    mnesia_lib:set({Tab, where_to_read}, Node);
2923       true ->
2924	    case mnesia_lib:val({Tab, where_to_read}) of
2925		nowhere ->
2926		    mnesia_lib:set_remote_where_to_read(Tab);
2927		_ ->
2928		    ignore
2929	    end
2930    end;
2931
2932undo_prepare_op(_Tid, {op, change_table_copy_type, N, FromS, ToS, TabDef})
2933        when N == node() ->
2934    Cs = list2cs(TabDef),
2935    Tab = Cs#cstruct.name,
2936    mnesia_checkpoint:tm_change_table_copy_type(Tab, ToS, FromS),
2937    Dmp = mnesia_lib:tab2dmp(Tab),
2938
2939    case {FromS, ToS} of
2940        {ram_copies, disc_copies} when Tab == schema ->
2941            file:delete(Dmp),
2942            mnesia_log:purge_some_logs(),
2943	    set(use_dir, false);
2944	{ram_copies, disc_copies} ->
2945	    file:delete(Dmp);
2946	{ram_copies, disc_only_copies} ->
2947	    file:delete(Dmp);
2948	{disc_only_copies, _} ->
2949	    ram_delete_table(Tab, ram_copies);
2950	_ ->
2951	    ignore
2952    end;
2953
2954undo_prepare_op(_Tid, {op, dump_table, _Size, TabDef}) ->
2955    Cs = list2cs(TabDef),
2956    case lists:member(node(), Cs#cstruct.ram_copies) of
2957	true ->
2958	    Tab = Cs#cstruct.name,
2959	    Dmp = mnesia_lib:tab2dmp(Tab),
2960	    file:delete(Dmp);
2961	false ->
2962	    ignore
2963    end;
2964
2965undo_prepare_op(_Tid, {op, add_snmp, _Ustruct, TabDef}) ->
2966    Cs = list2cs(TabDef),
2967    case mnesia_lib:cs_to_storage_type(node(), Cs) of
2968	unknown ->
2969	    true;
2970	_Storage ->
2971	    Tab = Cs#cstruct.name,
2972	    case ?catch_val({Tab, {index, snmp}}) of
2973		{'EXIT',_} ->
2974		    ignore;
2975		Stab ->
2976		    mnesia_snmp_hook:delete_table(Tab, Stab),
2977		    mnesia_lib:unset({Tab, {index, snmp}})
2978	    end
2979    end;
2980
2981undo_prepare_op(_Tid, _Op) ->
2982    ignore.
2983
2984%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
2985
2986ram_delete_table(Tab, Storage) ->
2987    case Storage of
2988	unknown ->
2989	    ignore;
2990        {ext, _, _} ->
2991            ignore;
2992	disc_only_copies ->
2993	    ignore;
2994	_Else ->
2995	    %% delete possible index files and data .....
2996	    %% Got to catch this since if no info has been set in the
2997	    %% mnesia_gvar it will crash
2998	    ?CATCH(mnesia_index:del_transient(Tab, Storage)),
2999	    case ?catch_val({Tab, {index, snmp}}) of
3000		{'EXIT', _} ->
3001		    ignore;
3002		Etab ->
3003		    ?SAFE(mnesia_snmp_hook:delete_table(Tab, Etab))
3004	    end,
3005	    ?SAFE(?ets_delete_table(Tab))
3006    end.
3007
3008purge_dir(Dir, KeepFiles) ->
3009    Suffixes = known_suffixes(),
3010    purge_dir(Dir, KeepFiles, Suffixes).
3011
3012purge_dir(Dir, KeepFiles, Suffixes) ->
3013    case dir_exists(Dir) of
3014	true ->
3015	    {ok, AllFiles} = file:list_dir(Dir),
3016	    purge_known_files(AllFiles, KeepFiles, Dir, Suffixes);
3017	false ->
3018	    ok
3019    end.
3020
3021purge_tmp_files() ->
3022    case mnesia_monitor:use_dir() of
3023	true ->
3024	    Dir = mnesia_lib:dir(),
3025	    KeepFiles = [],
3026	    Exists = mnesia_lib:exists(mnesia_lib:tab2dat(schema)),
3027	    case Exists of
3028		true ->
3029		    Suffixes = tmp_suffixes(),
3030		    purge_dir(Dir, KeepFiles, Suffixes);
3031		false ->
3032		    %% Interrupted change of storage type
3033		    %% for schema table
3034		    Suffixes = known_suffixes(),
3035		    purge_dir(Dir, KeepFiles, Suffixes),
3036		    mnesia_lib:set(use_dir, false)
3037		end;
3038
3039	false ->
3040	    ok
3041    end.
3042
3043purge_known_files([File | Tail], KeepFiles, Dir, Suffixes) ->
3044    case lists:member(File, KeepFiles) of
3045	true ->
3046	    ignore;
3047	false ->
3048	    case has_known_suffix(File, Suffixes, false) of
3049		false ->
3050		    ignore;
3051		true ->
3052		    AbsFile = filename:join([Dir, File]),
3053                    delete_recursive(AbsFile)
3054            end
3055    end,
3056    purge_known_files(Tail, KeepFiles, Dir, Suffixes);
3057purge_known_files([], _KeepFiles, _Dir, _Suffixes) ->
3058    ok.
3059
3060%% Removes a directory or file recursively
3061delete_recursive(Path) ->
3062    case filelib:is_dir(Path) of
3063        true ->
3064            {ok, Names} = file:list_dir(Path),
3065            lists:foreach(fun(Name) ->
3066                                  delete_recursive(filename:join(Path, Name))
3067                          end,
3068                          Names),
3069            file:del_dir(Path);
3070        false ->
3071            file:delete(Path)
3072    end.
3073
3074has_known_suffix(_File, _Suffixes, true) ->
3075    true;
3076has_known_suffix(File, [Suffix | Tail], false) ->
3077    has_known_suffix(File, Tail, lists:suffix(Suffix, File));
3078has_known_suffix(_File, [], Bool) ->
3079    Bool.
3080
3081known_suffixes() -> known_suffixes(get_ext_types_disc()).
3082
3083known_suffixes(Ext) -> real_suffixes(Ext) ++ tmp_suffixes(Ext).
3084
3085real_suffixes(Ext) ->  [".DAT", ".LOG", ".BUP", ".DCL", ".DCD"] ++ ext_real_suffixes(Ext).
3086
3087tmp_suffixes() -> tmp_suffixes(get_ext_types_disc()).
3088
3089tmp_suffixes(Ext) -> [".TMP", ".BUPTMP", ".RET", ".DMP", "."] ++ ext_tmp_suffixes(Ext).
3090
3091ext_real_suffixes(Ext) ->
3092    try lists:foldl(fun(Mod, Acc) -> Acc++Mod:real_suffixes() end, [],
3093		    [M || {_,M} <- Ext])
3094    catch
3095        error:E ->
3096            verbose("Cant find real ext suffixes (~tp)~n", [E]),
3097            []
3098    end.
3099
3100ext_tmp_suffixes(Ext) ->
3101    try lists:foldl(fun(Mod, Acc) -> Acc++Mod:tmp_suffixes() end, [],
3102		    [M || {_,M} <- Ext])
3103    catch
3104        error:E ->
3105            verbose("Cant find tmp ext suffixes (~tp)~n", [E]),
3106            []
3107    end.
3108
3109info() ->
3110    Tabs = lists:sort(val({schema, tables})),
3111    lists:foreach(fun(T) -> info(T) end, Tabs),
3112    ok.
3113
3114info(Tab) ->
3115    Props = get_table_properties(Tab),
3116    io:format("-- Properties for ~tw table --- ~n",[Tab]),
3117    info2(Tab, Props).
3118info2(Tab, [{cstruct, _V} | Tail]) -> % Ignore cstruct
3119    info2(Tab, Tail);
3120info2(Tab, [{frag_hash, _V} | Tail]) -> % Ignore frag_hash
3121    info2(Tab, Tail);
3122info2(Tab, [{P, V} | Tail]) ->
3123    io:format("~-20tw -> ~tp~n",[P,V]),
3124    info2(Tab, Tail);
3125info2(_, []) ->
3126    io:format("~n", []).
3127
3128get_table_properties(Tab) ->
3129    try mnesia_lib:db_match_object(ram_copies, mnesia_gvar, {{Tab, '_'}, '_'}) of
3130	RawGvar ->
3131	    case [{Item, Val} || {{_Tab, Item}, Val} <- RawGvar] of
3132		[] ->
3133		    [];
3134		Gvar ->
3135		    Size = {size, mnesia:table_info(Tab, size)},
3136		    Memory = {memory, mnesia:table_info(Tab, memory)},
3137		    Master = {master_nodes, mnesia:table_info(Tab, master_nodes)},
3138		    lists:sort([Size, Memory, Master | Gvar])
3139	    end
3140    catch error:_ ->
3141	    mnesia:abort({no_exists, Tab, all})
3142    end.
3143
3144%%%%%%%%%%% RESTORE %%%%%%%%%%%
3145
3146-record(r, {iter = schema,
3147	    module,
3148	    table_options = [],
3149	    default_op = clear_tables,
3150	    tables = [],
3151	    opaque,
3152	    insert_op = error_fun,
3153	    recs = error_recs
3154	   }).
3155
3156restore(Opaque) ->
3157    restore(Opaque, [], mnesia_monitor:get_env(backup_module)).
3158restore(Opaque, Args) when is_list(Args) ->
3159    restore(Opaque, Args, mnesia_monitor:get_env(backup_module));
3160restore(_Opaque, BadArg) ->
3161    {aborted, {badarg, BadArg}}.
3162restore(Opaque, Args, Module) when is_list(Args), is_atom(Module) ->
3163    InitR = #r{opaque = Opaque, module = Module},
3164    try lists:foldl(fun check_restore_arg/2, InitR, Args) of
3165	R when is_record(R, r) ->
3166	    case mnesia_bup:read_schema(R#r.module, Opaque) of
3167		{error, Reason} ->
3168		    {aborted, Reason};
3169		BupSchema ->
3170		    schema_transaction(fun() -> do_restore(R, BupSchema) end)
3171	    end
3172    catch exit:Reason ->
3173	    {aborted, Reason}
3174    end;
3175restore(_Opaque, Args, Module) ->
3176    {aborted, {badarg, Args, Module}}.
3177
3178check_restore_arg({module, Mod}, R) when is_atom(Mod) ->
3179    R#r{module = Mod};
3180
3181check_restore_arg({clear_tables, List}, R) when is_list(List) ->
3182    case lists:member(schema, List) of
3183	false ->
3184	    TableList = [{Tab, clear_tables} || Tab <- List],
3185	    R#r{table_options = R#r.table_options ++ TableList};
3186	true ->
3187	    exit({badarg, {clear_tables, schema}})
3188    end;
3189check_restore_arg({recreate_tables, List}, R) when is_list(List) ->
3190    case lists:member(schema, List) of
3191	false ->
3192	    TableList = [{Tab, recreate_tables} || Tab <- List],
3193	    R#r{table_options = R#r.table_options ++ TableList};
3194	true ->
3195	    exit({badarg, {recreate_tables, schema}})
3196    end;
3197check_restore_arg({keep_tables, List}, R) when is_list(List) ->
3198    TableList = [{Tab, keep_tables} || Tab <- List],
3199    R#r{table_options = R#r.table_options ++ TableList};
3200check_restore_arg({skip_tables, List}, R) when is_list(List) ->
3201    TableList = [{Tab, skip_tables} || Tab <- List],
3202    R#r{table_options = R#r.table_options ++ TableList};
3203check_restore_arg({default_op, Op}, R) ->
3204    case Op of
3205	clear_tables -> ok;
3206	recreate_tables -> ok;
3207	keep_tables -> ok;
3208	skip_tables -> ok;
3209	Else ->
3210	    exit({badarg, {bad_default_op, Else}})
3211    end,
3212    R#r{default_op = Op};
3213
3214check_restore_arg(BadArg,_) ->
3215    exit({badarg, BadArg}).
3216
3217do_restore(R, BupSchema) ->
3218    TidTs = get_tid_ts_and_lock(schema, write),
3219    R2 = restore_schema(BupSchema, R),
3220    insert_schema_ops(TidTs, [{restore_op, R2}]),
3221    [element(1, TabStruct) || TabStruct <- R2#r.tables].
3222
3223arrange_restore(R, Fun, Recs) ->
3224    R2 = R#r{insert_op = Fun, recs = Recs},
3225    case mnesia_bup:iterate(R#r.module, fun restore_items/5, R#r.opaque, R2) of
3226	{ok, R3} -> R3#r.recs;
3227	{error, Reason} -> mnesia:abort(Reason)
3228    end.
3229
3230restore_items([Rec | Recs], Header, Schema, Ext, R) ->
3231    Tab = element(1, Rec),
3232    case lists:keysearch(Tab, 1, R#r.tables) of
3233	{value, {Tab, Where0, Snmp, RecName}} ->
3234	    Where = case Where0 of
3235			undefined ->
3236			    val({Tab, where_to_commit});
3237			_ ->
3238			    Where0
3239		    end,
3240	    {Rest, NRecs} = restore_tab_items([Rec | Recs], Tab,
3241					      RecName, Where, Snmp,
3242					      R#r.recs, R#r.insert_op),
3243	    restore_items(Rest, Header, Schema, Ext, R#r{recs = NRecs});
3244	false ->
3245	    Rest = skip_tab_items(Recs, Tab),
3246	    restore_items(Rest, Header, Schema, Ext, R)
3247    end;
3248
3249restore_items([], _Header, _Schema, _Ext, R) ->
3250    R.
3251
3252restore_func(Tab, R) ->
3253    case lists:keysearch(Tab, 1, R#r.table_options) of
3254	{value, {Tab, OP}} ->
3255	    OP;
3256	false ->
3257	    R#r.default_op
3258    end.
3259
3260where_to_commit(Tab, CsList) ->
3261    Ram =   [{N, ram_copies} || N <- pick(Tab, ram_copies, CsList, [])],
3262    Disc =  [{N, disc_copies} || N <- pick(Tab, disc_copies, CsList, [])],
3263    DiscO = [{N, disc_only_copies} ||
3264		N <- pick(Tab, disc_only_copies, CsList, [])],
3265    ExtNodes = [{Alias, Mod, pick(Tab, Alias, CsList, [])} ||
3266		   {Alias, Mod} <- get_ext_types()],
3267    Ext = lists:foldl(fun({Alias, Mod, Ns}, Acc) ->
3268			      [{N, {ext, Alias, Mod}} || N <- Ns] ++ Acc
3269		      end, [], ExtNodes),
3270    Ram ++ Disc ++ DiscO ++ Ext.
3271
3272%% Changes of the Meta info of schema itself is not allowed
3273restore_schema([{schema, schema, _List} | Schema], R) ->
3274    restore_schema(Schema, R);
3275restore_schema([{schema, Tab, List} | Schema], R) ->
3276    case restore_func(Tab, R) of
3277	clear_tables ->
3278	    do_clear_table(Tab),
3279	    Snmp = val({Tab, snmp}),
3280	    RecName = val({Tab, record_name}),
3281	    R2 = R#r{tables = [{Tab, undefined, Snmp, RecName} | R#r.tables]},
3282	    restore_schema(Schema, R2);
3283	recreate_tables ->
3284	    TidTs = case ?catch_val({Tab, cstruct}) of
3285			{'EXIT', _} ->
3286			    TTs = {_Mod, Tid, Ts} = get(mnesia_activity_state),
3287			    RunningNodes = val({current, db_nodes}),
3288			    Nodes = mnesia_lib:intersect(mnesia_lib:cs_to_nodes(list2cs(List)),
3289							 RunningNodes),
3290			    mnesia_locker:wlock_no_exist(Tid, Ts#tidstore.store, Tab, Nodes),
3291			    TTs;
3292			_ ->
3293			    get_tid_ts_and_lock(Tab, write)
3294		    end,
3295	    NC    = {cookie, ?unique_cookie},
3296	    List2 = lists:keyreplace(cookie, 1, List, NC),
3297	    Where = where_to_commit(Tab, List2),
3298	    Snmp  = pick(Tab, snmp, List2, []),
3299	    RecName = pick(Tab, record_name, List2, Tab),
3300	    insert_schema_ops(TidTs, [{op, restore_recreate, List2}]),
3301	    R2 = R#r{tables = [{Tab, Where, Snmp, RecName} | R#r.tables]},
3302	    restore_schema(Schema, R2);
3303	keep_tables ->
3304	    get_tid_ts_and_lock(Tab, write),
3305	    Snmp = val({Tab, snmp}),
3306	    RecName = val({Tab, record_name}),
3307	    R2 = R#r{tables = [{Tab, undefined, Snmp, RecName} | R#r.tables]},
3308	    restore_schema(Schema, R2);
3309	skip_tables ->
3310	    restore_schema(Schema, R)
3311    end;
3312
3313restore_schema([{schema, Tab} | Schema], R) ->
3314    do_delete_table(Tab),
3315    Tabs = lists:delete(Tab,R#r.tables),
3316    restore_schema(Schema, R#r{tables = Tabs});
3317restore_schema([], R) ->
3318    R.
3319
3320restore_tab_items([Rec | Rest], Tab, RecName, Where, Snmp, Recs, Op)
3321  when element(1, Rec) == Tab ->
3322    NewRecs = Op(Rec, Recs, RecName, Where, Snmp),
3323    restore_tab_items(Rest, Tab, RecName, Where, Snmp, NewRecs, Op);
3324
3325restore_tab_items(Rest, _Tab, _RecName, _Where, _Snmp, Recs, _Op) ->
3326    {Rest, Recs}.
3327
3328skip_tab_items([Rec| Rest], Tab)
3329  when element(1, Rec) == Tab ->
3330    skip_tab_items(Rest, Tab);
3331skip_tab_items(Recs, _) ->
3332    Recs.
3333
3334%%%%%%%%% Dump tables %%%%%%%%%%%%%
3335dump_tables(Tabs) when is_list(Tabs) ->
3336    schema_transaction(fun() -> do_dump_tables(Tabs) end);
3337dump_tables(Tabs) ->
3338    {aborted, {bad_type, Tabs}}.
3339
3340do_dump_tables(Tabs) ->
3341    TidTs = get_tid_ts_and_lock(schema, write),
3342    insert_schema_ops(TidTs, make_dump_tables(Tabs)).
3343
3344make_dump_tables([schema | _Tabs]) ->
3345    mnesia:abort({bad_type, schema});
3346make_dump_tables([Tab | Tabs]) ->
3347    get_tid_ts_and_lock(Tab, read),
3348    TabDef = get_create_list(Tab),
3349    DiscResident =
3350        val({Tab, disc_copies}) ++
3351        val({Tab, disc_only_copies}) ++
3352        lists:concat([Ns || {{A,M},Ns} <- val({Tab, external_copies}),
3353			    lists:member(
3354			      mnesia_lib:semantics({ext,A,M},storage),
3355			      [disc_copies, disc_only_copies])]),
3356    verify([], DiscResident,
3357	   {"Only allowed on ram_copies", Tab, DiscResident}),
3358    [{op, dump_table, unknown, TabDef} | make_dump_tables(Tabs)];
3359make_dump_tables([]) ->
3360    [].
3361
3362%% Merge the local schema with the schema on other nodes
3363merge_schema() ->
3364    schema_transaction(fun() -> do_merge_schema([]) end).
3365
3366merge_schema(UserFun) ->
3367    schema_transaction(fun() ->
3368                               UserFun(fun(Arg) -> do_merge_schema(Arg) end)
3369                       end).
3370
3371do_merge_schema(LockTabs0) ->
3372    {_Mod, Tid, Ts} = get_tid_ts_and_lock(schema, write),
3373    LockTabs = [{T, tab_to_nodes(T)} || T <- LockTabs0],
3374    [get_tid_ts_and_lock(T,write) || {T,_} <- LockTabs],
3375    Connected = val(recover_nodes),
3376    Running = val({current, db_nodes}),
3377    Store = Ts#tidstore.store,
3378    %% Verify that all nodes are locked that might not be the
3379    %% case, if this trans where queued when new nodes where added.
3380    case Running -- ets:lookup_element(Store, nodes, 2) of
3381	[] -> ok; %% All known nodes are locked
3382	Miss -> %% Abort! We don't want the sideeffects below to be executed
3383	    mnesia:abort({bad_commit, {missing_lock, Miss}})
3384    end,
3385    case Connected -- Running of
3386	[Node | _] = OtherNodes ->
3387	    %% Time for a schema merging party!
3388	    mnesia_locker:wlock_no_exist(Tid, Store, schema, [Node]),
3389            [mnesia_locker:wlock_no_exist(
3390               Tid, Store, T, mnesia_lib:intersect(Ns, OtherNodes))
3391             || {T,Ns} <- LockTabs],
3392	    case fetch_cstructs(Node) of
3393		{cstructs, Cstructs, RemoteRunning1} ->
3394		    LockedAlready = Running ++ [Node],
3395		    {New, Old} = mnesia_recover:connect_nodes(RemoteRunning1),
3396		    RemoteRunning = mnesia_lib:intersect(New ++ Old, RemoteRunning1),
3397		    if
3398			RemoteRunning /= RemoteRunning1 ->
3399			    mnesia_lib:error("Mnesia on ~p could not connect to node(s) ~p~n",
3400					     [node(), RemoteRunning1 -- RemoteRunning]),
3401			    mnesia:abort({node_not_running, RemoteRunning1 -- RemoteRunning});
3402			true -> ok
3403		    end,
3404		    NeedsLock = RemoteRunning -- LockedAlready,
3405		    mnesia_locker:wlock_no_exist(Tid, Store, schema, NeedsLock),
3406                    [mnesia_locker:wlock_no_exist(Tid, Store, T,
3407                                                  mnesia_lib:intersect(Ns,NeedsLock))
3408                     || {T,Ns} <- LockTabs],
3409
3410		    {value, SchemaCs} = lists:keysearch(schema, #cstruct.name, Cstructs),
3411		    SchemaDef = cs2list(false, SchemaCs),
3412		    %% Announce that Node is running
3413		    A = [{op, announce_im_running, node(), SchemaDef, Running, RemoteRunning}],
3414		    do_insert_schema_ops(Store, A),
3415
3416		    %% Introduce remote tables to local node
3417		    do_insert_schema_ops(Store, make_merge_schema(Node, false, Cstructs)),
3418
3419		    %% Introduce local tables to remote nodes
3420		    Tabs = val({schema, tables}),
3421		    Ops = [{op, merge_schema, get_create_list(T)}
3422			   || T <- Tabs,
3423			      not lists:keymember(T, #cstruct.name, Cstructs)],
3424		    do_insert_schema_ops(Store, Ops),
3425
3426		    %% Ensure that the txn will be committed on all nodes
3427		    NewNodes = RemoteRunning -- Running,
3428		    mnesia_lib:set(prepare_op, {announce_im_running,NewNodes}),
3429		    announce_im_running(NewNodes, SchemaCs),
3430		    {merged, Running, RemoteRunning};
3431		{error, Reason} ->
3432		    {"Cannot get cstructs", Node, Reason};
3433		{badrpc, Reason} ->
3434		    {"Cannot get cstructs", Node, {badrpc, Reason}}
3435	    end;
3436	[] ->
3437	    %% No more nodes to merge schema with
3438	    not_merged
3439    end.
3440
3441fetch_cstructs(Node) ->
3442    rpc:call(Node, mnesia_controller, get_remote_cstructs, []).
3443
3444tab_to_nodes(Tab) when is_atom(Tab) ->
3445    Cs = val({Tab, cstruct}),
3446    mnesia_lib:cs_to_nodes(Cs).
3447
3448make_merge_schema(Node, NeedsConv, [Cs | Cstructs]) ->
3449    Ops = do_make_merge_schema(Node, NeedsConv, Cs),
3450    Ops ++ make_merge_schema(Node, NeedsConv, Cstructs);
3451make_merge_schema(_Node, _, []) ->
3452    [].
3453
3454%% Merge definitions of schema table
3455do_make_merge_schema(Node, NeedsConv, RemoteCs = #cstruct{name = schema}) ->
3456    Cs = val({schema, cstruct}),
3457    Masters = mnesia_recover:get_master_nodes(schema),
3458    HasRemoteMaster = lists:member(Node, Masters),
3459    HasLocalMaster = lists:member(node(), Masters),
3460    Force = HasLocalMaster or HasRemoteMaster,
3461    %% What is the storage types opinions?
3462    StCsLocal   = mnesia_lib:cs_to_storage_type(node(), Cs),
3463    StRcsLocal  = mnesia_lib:cs_to_storage_type(node(), RemoteCs),
3464    StCsRemote  = mnesia_lib:cs_to_storage_type(Node, Cs),
3465    StRcsRemote = mnesia_lib:cs_to_storage_type(Node, RemoteCs),
3466
3467    if
3468	Cs#cstruct.cookie == RemoteCs#cstruct.cookie,
3469	Cs#cstruct.version == RemoteCs#cstruct.version ->
3470	    %% Great, we have the same cookie and version
3471	    %% and do not need to merge cstructs
3472	    [];
3473
3474	Cs#cstruct.cookie /= RemoteCs#cstruct.cookie,
3475	Cs#cstruct.disc_copies /= [],
3476	RemoteCs#cstruct.disc_copies /= [] ->
3477	    %% Both cstructs involves disc nodes
3478	    %% and we cannot merge them
3479	    if
3480		HasLocalMaster == true,
3481		HasRemoteMaster == false ->
3482		    %% Choose local cstruct,
3483		    %% since it's the master
3484		    [{op, merge_schema, cs2list(NeedsConv, Cs)}];
3485
3486		HasRemoteMaster == true,
3487		HasLocalMaster == false ->
3488		    %% Choose remote cstruct,
3489		    %% since it's the master
3490		    [{op, merge_schema, cs2list(NeedsConv, RemoteCs)}];
3491
3492		true ->
3493		    Str = io_lib:format("Incompatible schema cookies. "
3494					"Please, restart from old backup."
3495					"~w = ~w, ~w = ~w~n",
3496					[Node, cs2list(RemoteCs), node(), cs2list(Cs)]),
3497		    throw(Str)
3498	    end;
3499
3500	StCsLocal /= StRcsLocal, StRcsLocal /= unknown, StCsLocal /= ram_copies ->
3501	    Str = io_lib:format("Incompatible schema storage types (local). "
3502				"on ~w storage ~w, on ~w storage ~w~n",
3503				[node(), StCsLocal, Node, StRcsLocal]),
3504	    throw(Str);
3505	StCsRemote /= StRcsRemote, StCsRemote /= unknown, StRcsRemote /= ram_copies ->
3506	    Str = io_lib:format("Incompatible schema storage types (remote). "
3507				"on ~w cs ~w, on ~w rcs ~w~n",
3508				[node(), cs2list(Cs), Node, cs2list(RemoteCs)]),
3509	    throw(Str);
3510
3511     	Cs#cstruct.disc_copies /= [] ->
3512	    %% Choose local cstruct,
3513	    %% since it involves disc nodes
3514	    MergedCs = merge_cstructs(Cs, RemoteCs, Force),
3515	    [{op, merge_schema, cs2list(NeedsConv, MergedCs)}];
3516
3517	RemoteCs#cstruct.disc_copies /= [] ->
3518	    %% Choose remote cstruct,
3519	    %% since it involves disc nodes
3520	    MergedCs = merge_cstructs(RemoteCs, Cs, Force),
3521	    [{op, merge_schema, cs2list(NeedsConv, MergedCs)}];
3522
3523	Cs > RemoteCs ->
3524	    %% Choose remote cstruct
3525	    MergedCs = merge_cstructs(RemoteCs, Cs, Force),
3526	    [{op, merge_schema, cs2list(NeedsConv, MergedCs)}];
3527
3528	true ->
3529	    %% Choose local cstruct
3530	    MergedCs = merge_cstructs(Cs, RemoteCs, Force),
3531	    [{op, merge_schema, cs2list(NeedsConv, MergedCs)}]
3532    end;
3533
3534%% Merge definitions of normal table
3535do_make_merge_schema(Node, NeedsConv, RemoteCs = #cstruct{}) ->
3536    Tab = RemoteCs#cstruct.name,
3537    Masters = mnesia_recover:get_master_nodes(schema),
3538    HasRemoteMaster = lists:member(Node, Masters),
3539    HasLocalMaster = lists:member(node(), Masters),
3540    Force = HasLocalMaster or HasRemoteMaster,
3541    case ?catch_val({Tab, cstruct}) of
3542	{'EXIT', _} ->
3543	    %% A completely new table, created while Node was down
3544	    [{op, merge_schema, cs2list(NeedsConv, RemoteCs)}];
3545	Cs when Cs#cstruct.cookie == RemoteCs#cstruct.cookie ->
3546	    if
3547		Cs#cstruct.version == RemoteCs#cstruct.version ->
3548		    %% We have exactly the same version of the
3549		    %% table def
3550		    [];
3551
3552		Cs#cstruct.version > RemoteCs#cstruct.version ->
3553		    %% Oops, we have different versions
3554		    %% of the table def, lets merge them.
3555		    %% The only changes that may have occurred
3556		    %% is that new replicas may have been added.
3557		    MergedCs = merge_cstructs(Cs, RemoteCs, Force),
3558		    [{op, merge_schema, cs2list(NeedsConv, MergedCs)}];
3559
3560		Cs#cstruct.version < RemoteCs#cstruct.version ->
3561		    %% Oops, we have different versions
3562		    %% of the table def, lets merge them
3563		    MergedCs = merge_cstructs(RemoteCs, Cs, Force),
3564		    [{op, merge_schema, cs2list(NeedsConv, MergedCs)}]
3565	    end;
3566	Cs ->
3567	    %% Different cookies, not possible to merge
3568	    if
3569		HasLocalMaster == true,
3570		HasRemoteMaster == false ->
3571		    %% Choose local cstruct,
3572		    %% since it's the master
3573		    [{op, merge_schema, cs2list(NeedsConv, Cs)}];
3574
3575		HasRemoteMaster == true,
3576		HasLocalMaster == false ->
3577		    %% Choose remote cstruct,
3578		    %% since it's the master
3579		    [{op, merge_schema, cs2list(NeedsConv, RemoteCs)}];
3580
3581		true ->
3582		    Str = io_lib:format("Bad cookie in table definition"
3583					" ~w: ~w = ~w, ~w = ~w~n",
3584					[Tab, node(), Cs, Node, RemoteCs]),
3585		    throw(Str)
3586	    end
3587    end.
3588
3589%% Change of table definitions (cstructs) requires all replicas
3590%% of the table to be active. New replicas, db_nodes and tables
3591%% may however be added even if some replica is inactive. These
3592%% invariants must be enforced in order to allow merge of cstructs.
3593%%
3594%% Returns a new cstruct or issues a fatal error
3595merge_cstructs(Cs0, RemoteCs, Force) ->
3596    Cs = verify_cstruct(Cs0),
3597    try do_merge_cstructs(Cs, RemoteCs, Force) of
3598	MergedCs when is_record(MergedCs, cstruct) ->
3599	    MergedCs
3600    catch exit:{aborted, _Reason} when Force == true ->
3601	    Cs;
3602	  exit:Reason -> exit(Reason);
3603	  error:Reason -> exit(Reason)
3604    end.
3605
3606do_merge_cstructs(Cs, RemoteCs0, Force) ->
3607    RemoteCs = verify_cstruct(RemoteCs0),
3608    Ns = mnesia_lib:uniq(mnesia_lib:cs_to_nodes(Cs) ++
3609			 mnesia_lib:cs_to_nodes(RemoteCs)),
3610    {AnythingNew, MergedCs} =
3611	merge_storage_type(Ns, false, Cs, RemoteCs, Force),
3612    verify_cstruct(
3613      merge_versions(AnythingNew, MergedCs, RemoteCs, Force)).
3614
3615
3616merge_storage_type([N | Ns], AnythingNew, Cs, RemoteCs, Force) ->
3617    Local = mnesia_lib:cs_to_storage_type(N, Cs),
3618    Remote = mnesia_lib:cs_to_storage_type(N, RemoteCs),
3619    case compare_storage_type(true, Local, Remote) of
3620	{same, _Storage} ->
3621	    merge_storage_type(Ns, AnythingNew, Cs, RemoteCs, Force);
3622	{diff, Storage} ->
3623	    Cs2 = change_storage_type(N, Storage, Cs),
3624	    merge_storage_type(Ns, true, Cs2, RemoteCs, Force);
3625	incompatible when Force == true ->
3626	    merge_storage_type(Ns, AnythingNew, Cs, RemoteCs, Force);
3627	Other ->
3628	    Str = io_lib:format("Cannot merge storage type for node ~w "
3629				"in cstruct ~w with remote cstruct ~w (~w)~n",
3630				[N, Cs, RemoteCs, Other]),
3631	    throw(Str)
3632    end;
3633merge_storage_type([], AnythingNew, MergedCs, _RemoteCs, _Force) ->
3634    {AnythingNew, MergedCs}.
3635
3636compare_storage_type(_Retry, Any, Any) ->
3637    {same, Any};
3638compare_storage_type(_Retry, unknown, Any) ->
3639    {diff, Any};
3640compare_storage_type(_Retry, ram_copies, disc_copies) ->
3641    {diff, disc_copies};
3642compare_storage_type(_Retry, disc_copies, disc_only_copies) ->
3643    {diff, disc_only_copies};
3644compare_storage_type(true, One, Another) ->
3645    compare_storage_type(false, Another, One);
3646compare_storage_type(false, _One, _Another) ->
3647    incompatible.
3648
3649change_storage_type(N, ram_copies, Cs) ->
3650    Nodes = [N | Cs#cstruct.ram_copies],
3651    Cs#cstruct{ram_copies = mnesia_lib:uniq(Nodes)};
3652change_storage_type(N, disc_copies, Cs) ->
3653    Nodes = [N | Cs#cstruct.disc_copies],
3654    Cs#cstruct{disc_copies = mnesia_lib:uniq(Nodes)};
3655change_storage_type(N, disc_only_copies, Cs) ->
3656    Nodes = [N | Cs#cstruct.disc_only_copies],
3657    Cs#cstruct{disc_only_copies = mnesia_lib:uniq(Nodes)}.
3658
3659%% BUGBUG: Verify match of frag info; equalit demanded for all but add_node
3660
3661merge_versions(AnythingNew, Cs, RemoteCs, Force) ->
3662    if
3663	Cs#cstruct.name == schema ->
3664	    ok;
3665	Cs#cstruct.name /= schema,
3666	Cs#cstruct.cookie == RemoteCs#cstruct.cookie ->
3667	    ok;
3668	Force == true ->
3669	    ok;
3670	true ->
3671	    Str = io_lib:format("Bad cookies. Cannot merge definitions of "
3672				"table ~tw. Local = ~w, Remote = ~w~n",
3673				[Cs#cstruct.name, Cs, RemoteCs]),
3674	    throw(Str)
3675    end,
3676    if
3677	Cs#cstruct.name == RemoteCs#cstruct.name,
3678	Cs#cstruct.type == RemoteCs#cstruct.type,
3679	Cs#cstruct.local_content == RemoteCs#cstruct.local_content,
3680	Cs#cstruct.attributes == RemoteCs#cstruct.attributes,
3681	Cs#cstruct.index == RemoteCs#cstruct.index,
3682	Cs#cstruct.snmp == RemoteCs#cstruct.snmp,
3683	Cs#cstruct.access_mode == RemoteCs#cstruct.access_mode,
3684	Cs#cstruct.majority == RemoteCs#cstruct.majority,
3685	Cs#cstruct.load_order == RemoteCs#cstruct.load_order,
3686	Cs#cstruct.user_properties == RemoteCs#cstruct.user_properties ->
3687	    do_merge_versions(AnythingNew, Cs, RemoteCs);
3688	Force == true ->
3689	    do_merge_versions(AnythingNew, Cs, RemoteCs);
3690	true ->
3691	    Str1 = io_lib:format("Cannot merge definitions of "
3692				"table ~tw. Local = ~w, Remote = ~w~n",
3693				[Cs#cstruct.name, Cs, RemoteCs]),
3694	    throw(Str1)
3695    end.
3696
3697do_merge_versions(AnythingNew, MergedCs, RemoteCs) ->
3698    {{Major1, Minor1}, _Detail1} = MergedCs#cstruct.version,
3699    {{Major2, Minor2}, _Detail2} = RemoteCs#cstruct.version,
3700    if
3701	AnythingNew == false ->
3702	    MergedCs;
3703	MergedCs#cstruct.version == RemoteCs#cstruct.version ->
3704	    V = {{Major1, Minor1}, dummy},
3705	    incr_version(MergedCs#cstruct{version = V});
3706	Major1 == Major2 ->
3707	    Minor = lists:max([Minor1, Minor2]),
3708	    V = {{Major1, Minor}, dummy},
3709	    incr_version(MergedCs#cstruct{version = V});
3710	Major1 /= Major2 ->
3711	    Major = lists:max([Major1, Major2]),
3712	    V = {{Major, 0}, dummy},
3713	    incr_version(MergedCs#cstruct{version = V})
3714    end.
3715
3716%% Verify the basics
3717verify_merge(RemoteCs) ->
3718    Tab = RemoteCs#cstruct.name,
3719    Masters = mnesia_recover:get_master_nodes(schema),
3720    HasRemoteMaster = Masters /= [],
3721    case ?catch_val({Tab, cstruct}) of
3722	{'EXIT', _} ->
3723	    ok;
3724	Cs ->
3725	    StCsLocal   = mnesia_lib:cs_to_storage_type(node(), Cs),
3726	    StRcsLocal  = mnesia_lib:cs_to_storage_type(node(), RemoteCs),
3727	    if
3728		StCsLocal  == StRcsLocal ->   ok;
3729		StCsLocal  == unknown ->      ok;
3730		(StRcsLocal == unknown), (HasRemoteMaster == false) ->
3731		    {merge_error, Cs, RemoteCs};
3732		%%  Trust the merger
3733		true  -> ok
3734	    end
3735    end.
3736
3737announce_im_running([N | Ns], SchemaCs) ->
3738    {L1, L2} = mnesia_recover:connect_nodes([N]),
3739    case lists:member(N, L1) or lists:member(N, L2) of
3740	true ->
3741	    mnesia_lib:add({current, db_nodes}, N),
3742	    mnesia_controller:add_active_replica(schema, N, SchemaCs);
3743	false ->
3744	    mnesia_lib:error("Mnesia on ~p could not connect to node ~p~n",
3745			     [node(), N]),
3746	    mnesia:abort({node_not_running, N})
3747    end,
3748    announce_im_running(Ns, SchemaCs);
3749announce_im_running([], _) ->
3750    [].
3751
3752unannounce_im_running([N | Ns]) ->
3753    mnesia_lib:del({current, db_nodes}, N),
3754    mnesia_controller:del_active_replica(schema, N),
3755    unannounce_im_running(Ns);
3756unannounce_im_running([]) ->
3757    ok.
3758