1%% ``Licensed under the Apache License, Version 2.0 (the "License");
2%% you may not use this file except in compliance with the License.
3%% You may obtain a copy of the License at
4%%
5%%     http://www.apache.org/licenses/LICENSE-2.0
6%%
7%% Unless required by applicable law or agreed to in writing, software
8%% distributed under the License is distributed on an "AS IS" BASIS,
9%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10%% See the License for the specific language governing permissions and
11%% limitations under the License.
12%%
13%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
14%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
15%% AB. All Rights Reserved.''
16%%
17%%     $Id: mnesia_dumper.erl,v 1.1 2008/12/17 09:53:38 mikpe Exp $
18%%
19-module(mnesia_dumper).
20
21%% The InitBy arg may be one of the following:
22%% scan_decisions     Initial scan for decisions
23%% startup	      Initial dump during startup
24%% schema_prepare     Dump initiated during schema transaction preparation
25%% schema_update      Dump initiated during schema transaction commit
26%% fast_schema_update A schema_update, but ignores the log file
27%% user		      Dump initiated by user
28%% write_threshold    Automatic dump caused by too many log writes
29%% time_threshold     Automatic dump caused by timeout
30
31%% Public interface
32-export([
33	 get_log_writes/0,
34	 incr_log_writes/0,
35	 raw_dump_table/2,
36	 raw_named_dump_table/2,
37	 start_regulator/0,
38	 opt_dump_log/1,
39	 update/3
40	]).
41
42 %% Internal stuff
43-export([regulator_init/1]).
44
45-include("mnesia.hrl").
46-include_lib("kernel/include/file.hrl").
47
48-import(mnesia_lib, [fatal/2, dbg_out/2]).
49
%% Registered name under which the load regulator process is looked up
%% (see opt_dump_log/1).
-define(REGULATOR_NAME, mnesia_dumper_load_regulator).
%% Size factor used by open_disc_copies/2: the ets table is dumped to a
%% new dcd file when the dcd file is at most this many times larger
%% than the dcl file.
-define(DumpToEtsMultiplier, 4).

%% NOTE(review): not referenced by any function visible in this part of
%% the file; presumably the state of the regulator process - confirm.
-record(state, {initiated_by = nobody,
		dumper = nopid,
		regulator_pid,
		supervisor_pid,
		queue = [],
		timeout}).
59
%% Returns the accumulated number of transaction log writes: the
%% stored counter plus the writes already consumed from the current
%% threshold window.
get_log_writes() ->
    Threshold = mnesia_monitor:get_env(dump_log_write_threshold),
    Accumulated = mnesia_lib:read_counter(trans_log_writes),
    Remaining = mnesia_lib:read_counter(trans_log_writes_left),
    Accumulated + (Threshold - Remaining).
66
%% Counts one more log write. When the number of writes left before
%% the threshold has run out, the counters are adjusted and a dump is
%% requested.
incr_log_writes() ->
    case mnesia_lib:incr_counter(trans_log_writes_left, -1) of
        Remaining when Remaining > 0 ->
            ignore;
        _Exhausted ->
            adjust_log_writes(true)
    end.
75
%% Resets the "writes left" counter to the configured threshold and
%% moves the consumed writes over to the accumulated counter. The work
%% is done under a node local global lock; if the lock cannot be taken
%% nothing is done. When DoCast is true an asynchronous dump of the
%% log is requested as well.
adjust_log_writes(DoCast) ->
    LockId = {mnesia_adjust_log_writes, self()},
    Nodes = [node()],
    case global:set_lock(LockId, Nodes, 1) of
        true ->
            case DoCast of
                true ->
                    mnesia_controller:async_dump_log(write_threshold);
                false ->
                    ignore
            end,
            Threshold = mnesia_monitor:get_env(dump_log_write_threshold),
            Remaining = mnesia_lib:read_counter(trans_log_writes_left),
            %% Don't care if we lost a few writes
            mnesia_lib:set_counter(trans_log_writes_left, Threshold),
            mnesia_lib:incr_counter(trans_log_writes, Threshold - Remaining),
            global:del_lock(LockId, Nodes);
        false ->
            %% Somebody else is sending a dump request
            ignore
    end.
96
%% Dumps the transaction log on behalf of InitBy (see the list of
%% InitBy values at the top of the file).
%% The load regulator is looked up by its registered name; 'nopid' is
%% passed on when it is not running.
%% Returns whatever perform_dump/2 returns, or exits.
opt_dump_log(InitBy) ->
    Reg = case whereis(?REGULATOR_NAME) of
              undefined ->
                  nopid;
              %% is_pid/1 replaces the obsolete guard test pid/1
              Pid when is_pid(Pid) ->
                  Pid
          end,
    perform_dump(InitBy, Reg).
106
%% Scan the previous and the latest log file for decisions only
perform_dump(scan_decisions = InitBy, Regulator) ->
    ?eval_debug_fun({?MODULE, perform_dump}, [InitBy]),

    dbg_out("Transaction log dump initiated by ~w~n", [InitBy]),
    scan_decisions(mnesia_log:previous_log_file(), InitBy, Regulator),
    scan_decisions(mnesia_log:latest_log_file(), InitBy, Regulator);

%% Propagate the contents of the log into the table files.
%% Returns 'dumped' when there was nothing to do, {error, Reason} on
%% failure and otherwise the result of mnesia_log:confirm_log_dump/1.
perform_dump(InitBy, Regulator) ->
    ?eval_debug_fun({?MODULE, perform_dump}, [InitBy]),
    LogState = mnesia_log:prepare_log_dump(InitBy),
    dbg_out("Transaction log dump initiated by ~w: ~w~n",
            [InitBy, LogState]),
    adjust_log_writes(false),
    mnesia_recover:allow_garb(),
    case LogState of
        {needs_dump, Diff} ->
            InPlace = mnesia_monitor:get_env(dump_log_update_in_place),
            Cont = mnesia_log:init_log_dump(),
            DumpRes = (catch do_perform_dump(Cont, InPlace, InitBy,
                                             Regulator, undefined)),
            case DumpRes of
                ok ->
                    ?eval_debug_fun({?MODULE, post_dump}, [InitBy]),
                    case mnesia_monitor:use_dir() of
                        true ->
                            mnesia_recover:dump_decision_tab();
                        false ->
                            mnesia_log:purge_some_logs()
                    end,
                    %% And now to the crucial point...
                    mnesia_log:confirm_log_dump(Diff);
                {'EXIT', {Desc, Reason}} ->
                    %% With auto_repair the failure is only reported
                    %% and the rest of the log is ignored
                    case mnesia_monitor:get_env(auto_repair) of
                        true ->
                            mnesia_lib:important(Desc, Reason),
                            mnesia_log:confirm_log_dump(Diff);
                        false ->
                            fatal(Desc, Reason)
                    end;
                {error, Reason} ->
                    {error, Reason}
            end;
        already_dumped ->
            dumped;
        {error, Reason} ->
            {error, {"Cannot prepare log dump", Reason}}
    end.
155
%% Opens the log file Fname read only (if it exists), scans it for
%% decisions and closes it again.
%% Returns ok, or {error, Reason} if the scan exited.
scan_decisions(Fname, InitBy, Regulator) ->
    case mnesia_lib:exists(Fname) of
        true ->
            LogName = previous_log,
            Header = mnesia_log:trans_log_header(),
            Repair = mnesia_monitor:get_env(auto_repair),
            mnesia_log:open_log(LogName, Header, Fname, true,
                                Repair, read_only),
            ScanRes = (catch do_perform_dump(start, false, InitBy,
                                             Regulator, undefined)),
            %% The log is closed regardless of the outcome of the scan
            mnesia_log:close_log(LogName),
            case ScanRes of
                {'EXIT', Reason} -> {error, Reason};
                ok -> ok
            end;
        false ->
            ok
    end.
174
%% Reads the log chunk by chunk and inserts the records of each chunk.
%% The log version found so far is threaded through the loop.
%% Returns ok at end of file. If an insert fails, the opened files are
%% closed with an error outcome and the process exits.
do_perform_dump(Cont, InPlace, InitBy, Regulator, OldVersion) ->
    case mnesia_log:chunk_log(Cont) of
        eof ->
            close_files(InPlace, ok, InitBy),
            ok;
        {NextCont, Chunk} ->
            case catch insert_recs(Chunk, InPlace, InitBy, Regulator, OldVersion) of
                {'EXIT', Why} ->
                    Reason = {"Transaction log dump error: ~p~n", [Why]},
                    close_files(InPlace, {error, Reason}, InitBy),
                    exit(Reason);
                NewVersion ->
                    do_perform_dump(NextCont, InPlace, InitBy, Regulator, NewVersion)
            end
    end.
190
%% Inserts the log records one by one, invoking the regulator before
%% each record. When a log header is encountered, its version replaces
%% the version used for the remaining records.
%% Returns the log version in effect after the last record.
insert_recs([Rec | Recs], InPlace, InitBy, Regulator, LogV) ->
    regulate(Regulator),
    case insert_rec(Rec, InPlace, InitBy, LogV) of
        %% A record pattern replaces the obsolete guard test record/2
        #log_header{log_version = NewLogV} ->
            insert_recs(Recs, InPlace, InitBy, Regulator, NewLogV);
        _ ->
            insert_recs(Recs, InPlace, InitBy, Regulator, LogV)
    end;

insert_recs([], _InPlace, _InitBy, _Regulator, Version) ->
    Version.
202
%% Handles one record read from the transaction log.
%%
%% scan_decisions: commit records and log headers are skipped, all
%%                 other records are noted as log decisions.
%% commit record:  waits for the outcome of the transaction and then
%%                 either inserts its operations or undoes the
%%                 prepared commit.
%% log header:     validated and returned, so that the caller can
%%                 pick up the log version.
%% anything else:  ignored (returns ok).
%%
%% The obsolete guard test record/2 has been replaced by record
%% patterns and is_record/2.
insert_rec(Rec, _InPlace, scan_decisions, _LogV) ->
    case Rec of
        #commit{} ->
            ignore;
        #log_header{} ->
            ignore;
        _ ->
            mnesia_recover:note_log_decision(Rec, scan_decisions)
    end;
insert_rec(Rec, InPlace, InitBy, LogV) when is_record(Rec, commit) ->
    %% Determine the Outcome of the transaction and recover it
    D = Rec#commit.decision,
    case mnesia_recover:wait_for_decision(D, InitBy) of
        {Tid, committed} ->
            do_insert_rec(Tid, Rec, InPlace, InitBy, LogV);
        {Tid, aborted} ->
            mnesia_schema:undo_prepare_commit(Tid, Rec)
    end;
insert_rec(H, _InPlace, _InitBy, _LogV) when is_record(H, log_header) ->
    CurrentVersion = mnesia_log:version(),
    if
        H#log_header.log_kind /= trans_log ->
            exit({"Bad kind of transaction log", H});
        H#log_header.log_version == CurrentVersion ->
            ok;
        %% Older log versions that are still accepted
        H#log_header.log_version == "4.2" ->
            ok;
        H#log_header.log_version == "4.1" ->
            ok;
        H#log_header.log_version == "4.0" ->
            ok;
        true ->
            fatal("Bad version of transaction log: ~p~n", [H])
    end,
    H;

insert_rec(_Rec, _InPlace, _InitBy, _LogV) ->
    ok.
241
%% Inserts the operations of a committed transaction: first the schema
%% operations, then the disc_copies operations and, only when the dump
%% is initiated by startup, the disc_only_copies operations.
do_insert_rec(Tid, Rec, InPlace, InitBy, LogV) ->
    case Rec#commit.schema_ops of
        [] ->
            ignore;
        SchemaOps ->
            SchemaStorage = val({schema, storage_type}),
            %% A disc resident schema must have its file opened first
            if
                SchemaStorage =/= ram_copies ->
                    true = open_files(schema, SchemaStorage, InPlace, InitBy);
                true ->
                    ok
            end,
            insert_ops(Tid, schema_ops, SchemaOps, InPlace, InitBy, LogV)
    end,
    insert_ops(Tid, disc_copies, Rec#commit.disc_copies, InPlace, InitBy, LogV),
    if
        InitBy =:= startup ->
            insert_ops(Tid, disc_only_copies, Rec#commit.disc_only_copies,
                       InPlace, InitBy, LogV);
        true ->
            ignore
    end.
264
265
%% Performs the updates caused by the schema operations of a
%% transaction. With no schema operations nothing is done and 'dumped'
%% is returned. Otherwise the update is performed and the schema commit
%% lock is released afterwards.
update(_Tid, [], _DumperMode) ->
    dumped;
update(Tid, SchemaOps, DumperMode) ->
    Result = perform_update(Tid, SchemaOps, DumperMode,
                            mnesia_monitor:use_dir()),
    mnesia_controller:release_schema_commit_lock(),
    Result.
273
%% perform_update(Tid, SchemaOps, DumperMode, UseDir)
%%
%% Mandatory dumper mode with a mnesia directory in use:
%% force a dump of the transaction log in order to let the
%% dumper perform the needed updates.
perform_update(_Tid, _SchemaOps, mandatory, true) ->
    InitBy = schema_update,
    ?eval_debug_fun({?MODULE, dump_schema_op}, [InitBy]),
    opt_dump_log(InitBy);
%% All other cases: no need for a full transaction log dump.
%% Ignore the log file and perform only the updates that
%% correspond to the given schema operations.
perform_update(Tid, SchemaOps, _DumperMode, _UseDir) ->
    InitBy = fast_schema_update,
    InPlace = mnesia_monitor:get_env(dump_log_update_in_place),
    ?eval_debug_fun({?MODULE, dump_schema_op}, [InitBy]),
    InsertRes = (catch insert_ops(Tid, schema_ops, SchemaOps, InPlace, InitBy,
                                  mnesia_log:version())),
    case InsertRes of
        {'EXIT', Reason} ->
            Error = {error, {"Schema update error", Reason}},
            close_files(InPlace, Error, InitBy),
            fatal("Schema update error ~p ~p", [Reason, SchemaOps]);
        _ ->
            ?eval_debug_fun({?MODULE, post_dump}, [InitBy]),
            close_files(InPlace, ok, InitBy),
            ok
    end.
300
%% Inserts a list of operations. For log versions before "4.3" the
%% operations are applied in reverse list order (last element first),
%% for "4.3" and later in list order.
insert_ops(_Tid, _Storage, [], _InPlace, _InitBy, _Ver) ->
    ok;
insert_ops(Tid, Storage, [Op | Ops], InPlace, InitBy, Ver) when Ver < "4.3" ->
    insert_ops(Tid, Storage, Ops, InPlace, InitBy, Ver),
    insert_op(Tid, Storage, Op, InPlace, InitBy);
insert_ops(Tid, Storage, [Op | Ops], InPlace, InitBy, Ver) ->
    %% Ver >= "4.3"
    insert_op(Tid, Storage, Op, InPlace, InitBy),
    insert_ops(Tid, Storage, Ops, InPlace, InitBy, Ver).
311
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Normal ops

%% Propagates one operation to the disc representation of Tab.
%%
%% The files of the table are opened on demand. If open_files/4
%% returns false nothing is written and 'ignore' is returned.
%% For disc_copies tables (except the schema) the operation is appended
%% to the log opened under the name {?MODULE, Tab}; for all other
%% tables it is applied directly to the dets table.
disc_insert(_Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy) ->
    case open_files(Tab, Storage, InPlace, InitBy) of
        true ->
            case Storage of
                disc_copies when Tab /= schema ->
                    mnesia_log:append({?MODULE,Tab}, {{Tab, Key}, Val, Op}),
                    ok;
                _ ->
                    case Op of
                        write ->
                            ok = dets:insert(Tab, Val);
                        delete ->
                            ok = dets:delete(Tab, Key);
                        update_counter ->
                            {RecName, Incr} = Val,
                            case catch dets:update_counter(Tab, Key, Incr) of
                                %% is_integer/1 replaces the obsolete
                                %% guard test integer/1
                                CounterVal when is_integer(CounterVal) ->
                                    ok;
                                _ ->
                                    %% The counter could not be updated;
                                    %% store a zeroed counter record
                                    Zero = {RecName, Key, 0},
                                    ok = dets:insert(Tab, Zero)
                            end;
                        delete_object ->
                            ok = dets:delete_object(Tab, Val);
                        clear_table ->
                            ok = dets:match_delete(Tab, '_')
                    end
            end;
        false ->
            ignore
    end.
346
%% Applies an operation for one value or, recursively, for each value
%% in a list of values.
%%
%% At startup only the disc representation is updated. Otherwise the
%% operation is applied via mnesia_tm (including SNMP handling), and
%% for disc_copies tables it is written to disc first. Nothing is done
%% for tables with unknown storage.
insert(_Tid, _Storage, _Tab, _Key, [], _Op, _InPlace, _InitBy) ->
    ok;

insert(Tid, Storage, Tab, Key, [Val | Tail], Op, InPlace, InitBy) ->
    insert(Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy),
    insert(Tid, Storage, Tab, Key, Tail, Op, InPlace, InitBy);

insert(Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy) ->
    Item = {{Tab, Key}, Val, Op},
    case InitBy of
        startup ->
            disc_insert(Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy);

        _ when Storage == disc_copies ->
            disc_insert(Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy),
            mnesia_tm:do_update_op(Tid, Storage, Item),
            SnmpOps = mnesia_tm:prepare_snmp(Tab, Key, [Item]),
            mnesia_tm:do_snmp(Tid, SnmpOps);

        _ when Storage == ram_copies; Storage == disc_only_copies ->
            mnesia_tm:do_update_op(Tid, Storage, Item),
            SnmpOps = mnesia_tm:prepare_snmp(Tab, Key, [Item]),
            mnesia_tm:do_snmp(Tid, SnmpOps);

        _ when Storage == unknown ->
            ignore
    end.
379
%% Deletes the disc files of Tab, provided that a mnesia directory is
%% in use (otherwise 'ignore' is returned).
%%
%% For disc_only_copies tables and for the schema the dets table is
%% closed and its dat file deleted. For other tables the dcl and dcd
%% files are deleted. Finally the dumper's process dictionary entry
%% for the table is erased; the result of that erase/1 is returned.
disc_delete_table(Tab, Storage) ->
    case mnesia_monitor:use_dir() of
        true ->
            if
                Storage == disc_only_copies; Tab == schema ->
                    mnesia_monitor:unsafe_close_dets(Tab),
                    Dat = mnesia_lib:tab2dat(Tab),
                    file:delete(Dat);
                true ->
                    DclFile = mnesia_lib:tab2dcl(Tab),
                    case get({?MODULE,Tab}) of
                        {opened_dumper, dcl} ->
                            del_opened_tab(Tab),
                            %% The dcl log is opened under the name
                            %% {?MODULE,Tab} (see open_disc_copies/2 and
                            %% open_dcl/1), so it must be closed under
                            %% that name and not under the bare Tab.
                            mnesia_log:unsafe_close_log({?MODULE,Tab});
                        _ ->
                            ok
                    end,
                    file:delete(DclFile),
                    DcdFile = mnesia_lib:tab2dcd(Tab),
                    file:delete(DcdFile),
                    ok
            end,
            erase({?MODULE, Tab});
        false ->
            ignore
    end.
406
%% Deletes the transient indices of a disc_only_copies table.
%% Nothing is done for any other storage type.
disc_delete_indecies(Tab, Cs, disc_only_copies) ->
    mnesia_index:del_transient(Tab, Cs#cstruct.index, disc_only_copies);
disc_delete_indecies(_Tab, _Cs, _OtherStorage) ->
    ignore.
412
%% A plain {Oid, Val, Op} operation
insert_op(Tid, Storage, {{Tab, Key}, Val, Op}, InPlace, InitBy) ->
    %% Propagate to disc only
    disc_insert(Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy);

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% NOTE that all schema operations below will only
%% be performed if the dump is initiated by
%% startup, schema_update or fast_schema_update
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

insert_op(_Tid, schema_ops, _Op, _InPlace, InitBy)
  when InitBy =/= startup,
       InitBy =/= fast_schema_update,
       InitBy =/= schema_update ->
    ignore;

%% A record operation wrapped in a schema operation
insert_op(Tid, _, {op, rec, Storage, Item}, InPlace, InitBy) ->
    {{Tab, Key}, Values, Op} = Item,
    insert(Tid, Storage, Tab, Key, Values, Op, InPlace, InitBy);
432
%% Changes the storage type of the replica on node N from FromS to ToS.
%% The schema is updated in ram first; the files are only touched when
%% N is the local node. Finally the schema record is written to disc.
insert_op(Tid, _, {op, change_table_copy_type, N, FromS, ToS, TabDef}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    %% Update ram only
    {schema, Tab, _} = SchemaRec = mnesia_schema:insert_cstruct(Tid, Cs, true),
    case InitBy of
        startup ->
            ignore;
        _ ->
            mnesia_controller:add_active_replica(Tab, N, Cs)
    end,
    case N == node() of
        true ->
            Dmp = mnesia_lib:tab2dmp(Tab),
            Dat = mnesia_lib:tab2dat(Tab),
            Dcd = mnesia_lib:tab2dcd(Tab),
            Dcl = mnesia_lib:tab2dcl(Tab),
            case {FromS, ToS} of
                {ram_copies, disc_copies} when Tab == schema ->
                    ok = ensure_rename(Dmp, Dat);
                {ram_copies, disc_copies} ->
                    file:delete(Dcl),
                    ok = ensure_rename(Dmp, Dcd);
                {disc_copies, ram_copies} when Tab == schema ->
                    mnesia_lib:set(use_dir, false),
                    mnesia_monitor:unsafe_close_dets(Tab),
                    file:delete(Dat);
                {disc_copies, ram_copies} ->
                    file:delete(Dcl),
                    file:delete(Dcd);
                {ram_copies, disc_only_copies} ->
                    ok = ensure_rename(Dmp, Dat),
                    true = open_files(Tab, disc_only_copies, InPlace, InitBy),
                    %% ram_delete_table must be done before init_indecies,
                    %% it uses info which is reset in init_indecies,
                    %% it doesn't matter, because init_indecies don't use
                    %% the ram replica of the table when creating the disc
                    %% index; Could be improved :)
                    mnesia_schema:ram_delete_table(Tab, FromS),
                    PosList = Cs#cstruct.index,
                    mnesia_index:init_indecies(Tab, disc_only_copies, PosList);
                {disc_only_copies, ram_copies} ->
                    mnesia_monitor:unsafe_close_dets(Tab),
                    disc_delete_indecies(Tab, Cs, disc_only_copies),
                    case InitBy of
                        startup ->
                            ignore;
                        _ ->
                            mnesia_controller:get_disc_copy(Tab)
                    end,
                    disc_delete_table(Tab, disc_only_copies);
                {disc_copies, disc_only_copies} ->
                    ok = ensure_rename(Dmp, Dat),
                    true = open_files(Tab, disc_only_copies, InPlace, InitBy),
                    mnesia_schema:ram_delete_table(Tab, FromS),
                    PosList = Cs#cstruct.index,
                    mnesia_index:init_indecies(Tab, disc_only_copies, PosList),
                    file:delete(Dcl),
                    file:delete(Dcd);
                {disc_only_copies, disc_copies} ->
                    mnesia_monitor:unsafe_close_dets(Tab),
                    disc_delete_indecies(Tab, Cs, disc_only_copies),
                    case InitBy of
                        startup ->
                            ignore;
                        _ ->
                            mnesia_log:ets2dcd(Tab),
                            mnesia_controller:get_disc_copy(Tab),
                            disc_delete_table(Tab, disc_only_copies)
                    end
            end;
        false ->
            %% The replica resides on another node; no local files
            ignore
    end,
    SchemaStorage = val({schema, storage_type}),
    disc_insert(Tid, SchemaStorage, schema, Tab, SchemaRec, write, InPlace, InitBy);
508
%% Table transform: make sure that the dcl log of a local disc_copies
%% replica is open and then store the new table definition.
insert_op(Tid, _, {op, transform, _Fun, TabDef}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    LocalStorage = mnesia_lib:cs_to_storage_type(node(), Cs),
    if
        LocalStorage =:= disc_copies ->
            open_dcl(Cs#cstruct.name);
        true ->
            ignore
    end,
    insert_cstruct(Tid, Cs, true, InPlace, InitBy);
518
%%%  Operations below this are handled without using the log.

%% Recreates a table during restore: the existing files and tables are
%% removed, empty ones are created according to the local storage type
%% (not at startup) and the table is then handled as a create_table.
insert_op(Tid, _, {op, restore_recreate, TabDef}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    Tab = Cs#cstruct.name,
    Type = Cs#cstruct.type,
    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
    %% Delete all possibly existing files and tables
    disc_delete_table(Tab, Storage),
    disc_delete_indecies(Tab, Cs, Storage),
    if
        InitBy =:= startup ->
            ignore;
        true ->
            mnesia_schema:ram_delete_table(Tab, Storage),
            mnesia_checkpoint:tm_del_copy(Tab, node())
    end,
    %%    delete_cstruct(Tid, Cs, InPlace, InitBy),
    %% And create new ones..
    if
        InitBy == startup; Storage == unknown ->
            ignore;
        Storage == ram_copies ->
            EtsArgs = [{keypos, 2}, public, named_table, Type],
            mnesia_monitor:mktab(Tab, EtsArgs);
        Storage == disc_copies ->
            EtsArgs = [{keypos, 2}, public, named_table, Type],
            mnesia_monitor:mktab(Tab, EtsArgs),
            %% Create an empty dcd file by opening and closing the log
            DcdFile = mnesia_lib:tab2dcd(Tab),
            LogArgs = [{file, DcdFile}, {name, {mnesia,create}},
                       {repair, false}, {mode, read_write}],
            {ok, Log} = mnesia_monitor:open_log(LogArgs),
            mnesia_monitor:unsafe_close_log(Log);
        Storage == disc_only_copies ->
            DatFile = mnesia_lib:tab2dat(Tab),
            file:delete(DatFile),
            DetsArgs = [{file, mnesia_lib:tab2dat(Tab)},
                        {type, mnesia_lib:disk_type(Tab, Type)},
                        {keypos, 2},
                        {repair, mnesia_monitor:get_env(auto_repair)}],
            mnesia_monitor:open_dets(Tab, DetsArgs)
    end,
    insert_op(Tid, ignore, {op, create_table, TabDef}, InPlace, InitBy);
562
%% Creates a table. The table definition is stored in the schema.
%% At startup only the disc files of a local disc resident replica are
%% created. Otherwise the active replicas and the whereabouts info are
%% set up, and a table with a local replica is created via
%% mnesia_controller.
insert_op(Tid, _, {op, create_table, TabDef}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    insert_cstruct(Tid, Cs, false, InPlace, InitBy),
    Tab = Cs#cstruct.name,
    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
    case InitBy of
        startup ->
            case Storage of
                NoDisc when NoDisc == unknown; NoDisc == ram_copies ->
                    ignore;
                disc_copies ->
                    Dcd = mnesia_lib:tab2dcd(Tab),
                    case mnesia_lib:exists(Dcd) of
                        true ->
                            ignore;
                        false ->
                            %% Create an empty dcd file
                            mnesia_log:open_log(temp,
                                                mnesia_log:dcl_log_header(),
                                                Dcd,
                                                false,
                                                false,
                                                read_write),
                            mnesia_log:unsafe_close_log(temp)
                    end;
                _ ->
                    DetsArgs = [{file, mnesia_lib:tab2dat(Tab)},
                                {type, mnesia_lib:disk_type(Tab, Cs#cstruct.type)},
                                {keypos, 2},
                                {repair, mnesia_monitor:get_env(auto_repair)}],
                    case mnesia_monitor:open_dets(Tab, DetsArgs) of
                        {ok, _} ->
                            mnesia_monitor:unsafe_close_dets(Tab);
                        {error, Error} ->
                            exit({"Failed to create dets table", Error})
                    end
            end;
        _ ->
            Copies = mnesia_lib:copy_holders(Cs),
            Active = mnesia_lib:intersect(Copies, val({current, db_nodes})),
            lists:foreach(fun(N) ->
                                  mnesia_controller:add_active_replica(Tab, N, Cs)
                          end, Active),

            case Storage of
                unknown ->
                    case Cs#cstruct.local_content of
                        true ->
                            ignore;
                        false ->
                            mnesia_lib:set_remote_where_to_read(Tab)
                    end;
                _ ->
                    case Cs#cstruct.local_content of
                        true ->
                            mnesia_lib:set_local_content_whereabouts(Tab);
                        false ->
                            mnesia_lib:set({Tab, where_to_read}, node())
                    end,
                    if
                        Storage =:= ram_copies ->
                            ignore;
                        true ->
                            %% Indecies are still created by loader
                            disc_delete_indecies(Tab, Cs, Storage)
                            %% disc_delete_table(Tab, Storage)
                    end,

                    %% Update whereabouts and create table
                    mnesia_controller:create_table(Tab)
            end
    end;
633
%% Dump of a table: with an unknown size nothing is done; with size 0
%% both the dmp and the dcd file are deleted; otherwise the dmp file
%% is renamed to the dcd file.
insert_op(_Tid, _, {op, dump_table, unknown, _TabDef}, _InPlace, _InitBy) ->
    ignore;
insert_op(_Tid, _, {op, dump_table, Size, TabDef}, _InPlace, _InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    Tab = Cs#cstruct.name,
    Dmp = mnesia_lib:tab2dmp(Tab),
    Dcd = mnesia_lib:tab2dcd(Tab),
    if
        Size =:= 0 ->
            %% Assume that table files already are closed
            file:delete(Dmp),
            file:delete(Dcd);
        true ->
            ok = ensure_rename(Dmp, Dcd)
    end;
652
%% Deletes a table: the local replica (if any) is removed from disc
%% and, unless at startup, from ram. Then the table definition is
%% removed from the schema.
insert_op(Tid, _, {op, delete_table, TabDef}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    Tab = Cs#cstruct.name,
    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
    if
        Storage =:= unknown ->
            ignore;
        true ->
            disc_delete_table(Tab, Storage),
            disc_delete_indecies(Tab, Cs, Storage),
            if
                InitBy =:= startup ->
                    ignore;
                true ->
                    mnesia_schema:ram_delete_table(Tab, Storage),
                    mnesia_checkpoint:tm_del_copy(Tab, node())
            end
    end,
    delete_cstruct(Tid, Cs, InPlace, InitBy);
671
%% Clears the local replica of a table, if there is one. For a
%% disc_copies replica the dcl log is opened first.
insert_op(Tid, _, {op, clear_table, TabDef}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    Tab = Cs#cstruct.name,
    case mnesia_lib:cs_to_storage_type(node(), Cs) of
        unknown ->
            ignore;
        Storage ->
            case Storage of
                disc_copies ->
                    open_dcl(Tab);
                _ ->
                    ignore
            end,
            %% '_' is used both as key and as value, i.e. everything
            insert(Tid, Storage, Tab, '_', '_', clear_table, InPlace, InitBy)
    end;
687
%% Schema merge: store the table definition; the final 'false' is
%% passed as KeepWhereabouts to insert_cstruct/5
insert_op(Tid, _, {op, merge_schema, TabDef}, InPlace, InitBy) ->
    insert_cstruct(Tid, mnesia_schema:list2cs(TabDef), false, InPlace, InitBy);
691
%% Removes the replica of a table on Node. When the replica is local,
%% the whereabouts info is reset and the table is removed from disc
%% and ram. In both cases the new table definition is stored.
%% NOTE(review): there is no branch for a schema replica with a
%% storage type other than ram_copies; that combination exits with
%% if_clause - presumably it never occurs, confirm.
insert_op(Tid, _, {op, del_table_copy, Storage, Node, TabDef}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    Tab = Cs#cstruct.name,
    if
        Tab == schema, Storage == ram_copies ->
            insert_cstruct(Tid, Cs, true, InPlace, InitBy);
        Tab /= schema ->
            mnesia_controller:del_active_replica(Tab, Node),
            mnesia_lib:del({Tab, Storage}, Node),
            case Node == node() of
                true ->
                    case Cs#cstruct.local_content of
                        true -> mnesia_lib:set({Tab, where_to_read}, nowhere);
                        false -> mnesia_lib:set_remote_where_to_read(Tab)
                    end,
                    mnesia_lib:del({schema, local_tables}, Tab),
                    mnesia_lib:set({Tab, storage_type}, unknown),
                    insert_cstruct(Tid, Cs, true, InPlace, InitBy),
                    disc_delete_table(Tab, Storage),
                    disc_delete_indecies(Tab, Cs, Storage),
                    mnesia_schema:ram_delete_table(Tab, Storage),
                    mnesia_checkpoint:tm_del_copy(Tab, Node);
                false ->
                    %% If the table was read from the removed replica,
                    %% another one must be selected
                    case val({Tab, where_to_read}) of
                        Node ->
                            mnesia_lib:set_remote_where_to_read(Tab);
                        _ ->
                            ignore
                    end,
                    insert_cstruct(Tid, Cs, true, InPlace, InitBy)
            end
    end;
724
%% add_table_copy: during prepare commit, the files were created
%% and the replica was announced; only the definition is stored here
insert_op(Tid, _, {op, add_table_copy, _Storage, _Node, TabDef}, InPlace, InitBy) ->
    insert_cstruct(Tid, mnesia_schema:list2cs(TabDef), true, InPlace, InitBy);

%% add_snmp: only the table definition is stored here
insert_op(Tid, _, {op, add_snmp, _Us, TabDef}, InPlace, InitBy) ->
    insert_cstruct(Tid, mnesia_schema:list2cs(TabDef), true, InPlace, InitBy);
734
%% del_snmp: unless at startup, the snmp table of a local replica is
%% deleted (if there is one). Then the table definition is stored.
insert_op(Tid, _, {op, del_snmp, TabDef}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    Tab = Cs#cstruct.name,
    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
    case (InitBy /= startup) andalso (Storage /= unknown) of
        true ->
            case ?catch_val({Tab, {index, snmp}}) of
                {'EXIT', _} ->
                    ignore;
                Stab ->
                    mnesia_snmp_hook:delete_table(Tab, Stab),
                    mnesia_lib:unset({Tab, {index, snmp}})
            end;
        false ->
            ignore
    end,
    insert_cstruct(Tid, Cs, true, InPlace, InitBy);
753
%% add_index: store the table definition and initiate the new index.
%% At startup this is only done for disc_only_copies tables.
insert_op(Tid, _, {op, add_index, Pos, TabDef}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    Tab = insert_cstruct(Tid, Cs, true, InPlace, InitBy),
    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
    if
        InitBy == startup, Storage /= disc_only_copies ->
            ignore;
        true ->
            mnesia_index:init_indecies(Tab, Storage, [Pos])
    end;
766
%% del_index: delete the index table and store the table definition.
%% At startup the index is only deleted for disc_only_copies tables.
insert_op(Tid, _, {op, del_index, Pos, TabDef}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    Tab = Cs#cstruct.name,
    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
    if
        InitBy == startup, Storage /= disc_only_copies ->
            ignore;
        true ->
            mnesia_index:del_index_table(Tab, Storage, Pos)
    end,
    insert_cstruct(Tid, Cs, true, InPlace, InitBy);
780
%% change_table_access_mode: unless at startup the controller is told
%% about the change; the table definition is always stored
insert_op(Tid, _, {op, change_table_access_mode,TabDef, _OldAccess, _Access}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    if
        InitBy =:= startup -> ignore;
        true -> mnesia_controller:change_table_access_mode(Cs)
    end,
    insert_cstruct(Tid, Cs, true, InPlace, InitBy);

%% change_table_load_order: only the table definition is stored
insert_op(Tid, _, {op, change_table_load_order, TabDef, _OldLevel, _Level}, InPlace, InitBy) ->
    insert_cstruct(Tid, mnesia_schema:list2cs(TabDef), true, InPlace, InitBy);
792
%% delete_property: unset the user property and store the definition
insert_op(Tid, _, {op, delete_property, TabDef, PropKey}, InPlace, InitBy) ->
    Cs = mnesia_schema:list2cs(TabDef),
    mnesia_lib:unset({Cs#cstruct.name, user_property, PropKey}),
    insert_cstruct(Tid, Cs, true, InPlace, InitBy);

%% write_property: only the table definition is stored
insert_op(Tid, _, {op, write_property, TabDef, _Prop}, InPlace, InitBy) ->
    insert_cstruct(Tid, mnesia_schema:list2cs(TabDef), true, InPlace, InitBy);

%% change_table_frag: only the table definition is stored
insert_op(Tid, _, {op, change_table_frag, _Change, TabDef}, InPlace, InitBy) ->
    insert_cstruct(Tid, mnesia_schema:list2cs(TabDef), true, InPlace, InitBy).
806
%% Makes sure that the disc file of Tab is opened by the dumper.
%% The state per table is kept in the process dictionary under the
%% key {?MODULE, Tab}.
%% Returns true if the file is open and operations should be written,
%% false otherwise (no disc storage, unknown table or already dumped).
open_files(_Tab, Storage, _UpdateInPlace, _InitBy)
  when Storage == unknown; Storage == ram_copies ->
    false;
open_files(Tab, Storage, UpdateInPlace, InitBy) ->
    case get({?MODULE, Tab}) of
        {opened_dumper, _} ->
            true;
        already_dumped ->
            false;
        undefined ->
            case ?catch_val({Tab, setorbag}) of
                {'EXIT', _} ->
                    false;
                Type ->
                    case Storage of
                        disc_copies when Tab /= schema ->
                            open_disc_copies(Tab, InitBy);
                        _ ->
                            Fname = prepare_open(Tab, UpdateInPlace),
                            DetsArgs = [{file, Fname},
                                        {keypos, 2},
                                        {repair, mnesia_monitor:get_env(auto_repair)},
                                        {type, mnesia_lib:disk_type(Tab, Type)}],
                            {ok, _} = mnesia_monitor:open_dets(Tab, DetsArgs),
                            put({?MODULE, Tab}, {opened_dumper, dat}),
                            true
                    end
            end
    end.
837
%% Decides how a disc_copies table is to be dumped.
%% Either the dcl log is opened for appending (returns true) or the
%% whole ets table is dumped to a dcd file at once, in which case the
%% table is marked as already_dumped (returns false).
%% The ets dump is chosen when a dcl file exists and the dcd file is
%% missing or at most ?DumpToEtsMultiplier times larger than the dcl
%% file - but never at startup.
open_disc_copies(Tab, InitBy) ->
    DclF = mnesia_lib:tab2dcl(Tab),
    DumpEts =
        case file:read_file_info(DclF) of
            {ok, #file_info{size = DclSize}} ->
                DcdF = mnesia_lib:tab2dcd(Tab),
                case file:read_file_info(DcdF) of
                    {ok, #file_info{size = DcdSize}} ->
                        DcdSize =< (DclSize * ?DumpToEtsMultiplier);
                    {error, Reason} ->
                        mnesia_lib:dbg_out("File ~p info_error ~p ~n",
                                           [DcdF, Reason]),
                        true
                end;
            {error, enoent} ->
                false
        end,
    case (DumpEts == false) orelse (InitBy == startup) of
        true ->
            mnesia_log:open_log({?MODULE,Tab},
                                mnesia_log:dcl_log_header(),
                                DclF,
                                mnesia_lib:exists(DclF),
                                mnesia_monitor:get_env(auto_repair),
                                read_write),
            put({?MODULE, Tab}, {opened_dumper, dcl}),
            true;
        false ->
            mnesia_log:ets2dcd(Tab),
            put({?MODULE, Tab}, already_dumped),
            false
    end.
872
%% Always opens the dcl file for writing, overriding the already_dumped
%% mechanism. Used for schema transactions. Always returns true.
open_dcl(Tab) ->
    DictKey = {?MODULE, Tab},
    case get(DictKey) of
        {opened_dumper, _} ->
            true;
        _UndefinedOrAlreadyDumped ->
            DclF = mnesia_lib:tab2dcl(Tab),
            mnesia_log:open_log({?MODULE,Tab},
                                mnesia_log:dcl_log_header(),
                                DclF,
                                mnesia_lib:exists(DclF),
                                mnesia_monitor:get_env(auto_repair),
                                read_write),
            put(DictKey, {opened_dumper, dcl}),
            true
    end.
890
%% Returns the name of the dets file that the dumper should write to.
%% With update in place this is the dat file itself; otherwise the dat
%% file is copied to a tmp file and the name of the copy is returned.
%% A failing copy is fatal.
prepare_open(Tab, UpdateInPlace) ->
    Dat = mnesia_lib:tab2dat(Tab),
    case UpdateInPlace of
        false ->
            Tmp = mnesia_lib:tab2tmp(Tab),
            case catch mnesia_lib:copy_file(Dat, Tmp) of
                ok ->
                    Tmp;
                Error ->
                    fatal("Cannot copy dets file ~p to ~p: ~p~n",
                          [Dat, Tmp, Error])
            end;
        true ->
            Dat
    end.
906
%% Forgets the dumper state kept for Tab in the process dictionary
del_opened_tab(Tab) ->
    DictKey = {?MODULE, Tab},
    erase(DictKey).
909
%% Closes all files that the dumper has registered in the process
%% dictionary and erases the corresponding entries. Returns ok.
close_files(UpdateInPlace, Outcome, InitBy) ->
    close_files(UpdateInPlace, Outcome, InitBy, get()).

%% The last argument is a list of process dictionary entries; entries
%% that do not belong to the dumper are skipped.
close_files(InPlace, Outcome, InitBy, Entries) ->
    lists:foreach(
      fun({{?MODULE, Tab}, already_dumped}) ->
              erase({?MODULE, Tab});
         ({{?MODULE, Tab}, {opened_dumper, Type}}) ->
              erase({?MODULE, Tab}),
              case val({Tab, storage_type}) of
                  disc_only_copies when InitBy /= startup ->
                      ignore;
                  disc_copies when Tab /= schema ->
                      mnesia_log:close_log({?MODULE,Tab});
                  Storage ->
                      do_close(InPlace, Outcome, Tab, Type, Storage)
              end;
         (_Other) ->
              ok
      end,
      Entries).
932
%% Closes one file opened by the dumper.
%% If storage is unknown during close clean up files, this can happen if timing
%% is right and dirty_write conflicts with schema operations.
do_close(_, _, Tab, dcl, Storage) ->
    mnesia_log:close_log({?MODULE,Tab}),
    case Storage of
        unknown ->
            file:delete(mnesia_lib:tab2dcl(Tab));
        _ ->  %% To be safe, can it happen?
            ok
    end;

do_close(InPlace, Outcome, Tab, dat, Storage) ->
    mnesia_monitor:close_dets(Tab),
    case {InPlace, Outcome} of
        {true, _} when Storage == unknown ->
            file:delete(mnesia_lib:tab2dat(Tab));
        {true, _} ->
            %% Update in place
            ok;
        {_, ok} when Storage /= unknown ->
            %% Success: swap tmp files with dat files
            TabDat = mnesia_lib:tab2dat(Tab),
            ok = file:rename(mnesia_lib:tab2tmp(Tab), TabDat);
        _ ->
            file:delete(mnesia_lib:tab2tmp(Tab))
    end.
956
957
%% Rename From to To, tolerating a rename that has already happened.
%%
%% Returns the result of file:rename/2 when From exists, ok when From
%% is missing but To is present, and {error, {rename_failed, From, To}}
%% when neither file exists.
ensure_rename(From, To) ->
    case mnesia_lib:exists(From) of
        false ->
            case mnesia_lib:exists(To) of
                false ->
                    {error, {rename_failed, From, To}};
                true ->
                    ok
            end;
        true ->
            file:rename(From, To)
    end.
970
%% Insert the cstruct Cs via mnesia_schema:insert_cstruct/3 and write the
%% resulting {schema, Tab, _} record through disc_insert/8, using the
%% storage type of the schema table. Returns the table name.
insert_cstruct(Tid, Cs, KeepWhereabouts, InPlace, InitBy) ->
    {schema, Tab, _} = SchemaRec =
        mnesia_schema:insert_cstruct(Tid, Cs, KeepWhereabouts),
    SchemaStorage = val({schema, storage_type}),
    disc_insert(Tid, SchemaStorage, schema, Tab, SchemaRec, write,
                InPlace, InitBy),
    Tab.
977
%% Delete the cstruct Cs via mnesia_schema:delete_cstruct/2 and pass the
%% resulting {schema, Tab, _} record to disc_insert/8 as a delete, using
%% the storage type of the schema table. Returns the table name.
delete_cstruct(Tid, Cs, InPlace, InitBy) ->
    {schema, Tab, _} = SchemaRec = mnesia_schema:delete_cstruct(Tid, Cs),
    SchemaStorage = val({schema, storage_type}),
    disc_insert(Tid, SchemaStorage, schema, Tab, SchemaRec, delete,
                InPlace, InitBy),
    Tab.
984
985%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
986%% Raw dump of table. Dumper must have unique access to the ets table.
987
%% Dump the ets table Tab into a dets file on disc.
%%
%% Ftype selects the destination: dat -> mnesia_lib:tab2dat(Tab),
%%                                dmp -> mnesia_lib:tab2dmp(Tab).
%%
%% The table is locked for the duration of the dump. The contents are
%% written to the file named by mnesia_lib:tab2tmp/1, which is renamed to
%% the destination only after a successful dump.
%%
%% Returns ok on success. Exits with
%%   {has_no_disc, node()}                               when mnesia_monitor:use_dir() is false,
%%   {"Open of file before dump to disc failed", Reason} when the dets file cannot be opened,
%%   {"Dump of table to disc failed", Reason}            when the dump itself fails.
%%
%% The dump fails either by crashing ({'EXIT', Reason}) or by returning
%% {error, Reason}, which dets:from_ets/2 is documented to do. Both cases
%% run the same cleanup (unfix, close, delete tmp file, unlock) so that a
%% failed dump never leaves the table fixed or locked.
raw_named_dump_table(Tab, Ftype) ->
    case mnesia_monitor:use_dir() of
        true ->
            mnesia_lib:lock_table(Tab),
            TmpFname = mnesia_lib:tab2tmp(Tab),
            Fname =
                case Ftype of
                    dat -> mnesia_lib:tab2dat(Tab);
                    dmp -> mnesia_lib:tab2dmp(Tab)
                end,
            %% Remove leftovers from earlier dumps; errors are ignored
            %% since the files need not exist.
            file:delete(TmpFname),
            file:delete(Fname),
            TabSize = ?ets_info(Tab, size),
            TabRef = Tab,
            DiskType = mnesia_lib:disk_type(Tab),
            Args = [{file, TmpFname},
                    {keypos, 2},
                    {estimated_no_objects, TabSize + 256},
                    {repair, mnesia_monitor:get_env(auto_repair)},
                    {type, DiskType}],
            case mnesia_lib:dets_sync_open(TabRef, Args) of
                {ok, TabRef} ->
                    Storage = ram_copies,
                    mnesia_lib:db_fixtable(Storage, Tab, true),

                    case catch raw_dump_table(TabRef, Tab) of
                        ok ->
                            mnesia_lib:db_fixtable(Storage, Tab, false),
                            mnesia_lib:dets_sync_close(Tab),
                            mnesia_lib:unlock_table(Tab),
                            ok = file:rename(TmpFname, Fname);
                        {Tag, Reason} when Tag =:= 'EXIT'; Tag =:= error ->
                            mnesia_lib:db_fixtable(Storage, Tab, false),
                            mnesia_lib:dets_sync_close(Tab),
                            file:delete(TmpFname),
                            mnesia_lib:unlock_table(Tab),
                            exit({"Dump of table to disc failed", Reason})
                    end;
                {error, Reason} ->
                    mnesia_lib:unlock_table(Tab),
                    exit({"Open of file before dump to disc failed", Reason})
            end;
        false ->
            exit({has_no_disc, node()})
    end.
1034
%% Copy the contents of the ets table EtsRef into the open dets table
%% DetsRef. Thin wrapper that returns the result of dets:from_ets/2
%% unchanged.
raw_dump_table(DetsRef, EtsRef) ->
    DumpResult = dets:from_ets(DetsRef, EtsRef),
    DumpResult.
1037
1038%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1039%% Load regulator
1040%%
%% This is a poor man's substitute for a fair scheduler algorithm
%% in the Erlang emulator. The mnesia_dumper process performs many
%% costly BIF invocations and must pay for this. But since the
%% emulator does not handle this properly we must compensate for
%% this with some form of load regulation of ourselves in order to
%% not steal all computation power in the Erlang emulator and make
%% other processes starve. Hopefully this is a temporary solution.
1048
%% Start the load regulator process if load regulation is enabled.
%%
%% Returns nopid when the dump_log_load_regulation environment parameter
%% is false, otherwise the pid of the regulator started through
%% mnesia_monitor:start_proc/4. A failed start is reported via fatal/2.
start_regulator() ->
    case mnesia_monitor:get_env(dump_log_load_regulation) of
        false ->
            nopid;
        true ->
            N = ?REGULATOR_NAME,
            case mnesia_monitor:start_proc(N, ?MODULE, regulator_init, [self()]) of
                {ok, Pid} ->
                    Pid;
                {error, Reason} ->
                    %% Both arguments need a ~p; the original used ~n
                    %% (newline, consumes no argument) for the name, so
                    %% the format string and argument list did not match.
                    fatal("Failed to start ~p: ~p~n", [N, Reason])
            end
    end.
1062
%% Entry point of the regulator process.
%%
%% Lowers its own priority, registers itself under ?REGULATOR_NAME,
%% acknowledges the start to Parent with {ok, Pid} and then enters the
%% message loop. Exits are not trapped. Running at low priority is what
%% produces the regulating effect: a caller waiting in regulate/1 is not
%% answered until this low-priority process gets to run.
regulator_init(Parent) ->
    process_flag(priority, low),
    Self = self(),
    register(?REGULATOR_NAME, Self),
    proc_lib:init_ack(Parent, {ok, Self}),
    regulator_loop().
1070
%% Message loop of the regulator process.
%%
%%   {regulate, From} -> replies {regulated, self()} and keeps looping
%%   {stop, From}     -> replies {stopped, self()} and exits normally
%%
%% Any other message is left in the mailbox.
regulator_loop() ->
    receive
        {stop, Requester} ->
            Requester ! {stopped, self()},
            exit(normal);
        {regulate, Client} ->
            Client ! {regulated, self()},
            regulator_loop()
    end.
1080
%% Synchronise with the regulator process, if there is one.
%%
%% With nopid (regulation disabled) this returns ok immediately.
%% Otherwise a {regulate, self()} request is sent and the caller blocks
%% until the matching {regulated, Pid} reply arrives. There is no
%% timeout on the wait.
regulate(nopid) ->
    ok;
regulate(Regulator) ->
    Caller = self(),
    Regulator ! {regulate, Caller},
    receive
        {regulated, Regulator} ->
            ok
    end.
1088
%% Look up Var with the ?catch_val macro. A successful lookup returns
%% the value as is; an {'EXIT', Reason} result is delegated to
%% mnesia_lib:other_val/2.
val(Var) ->
    Lookup = (?catch_val(Var)),
    case Lookup of
        {'EXIT', Why} ->
            mnesia_lib:other_val(Var, Why);
        _Found ->
            Lookup
    end.
1094