1%% ``Licensed under the Apache License, Version 2.0 (the "License");
2%% you may not use this file except in compliance with the License.
3%% You may obtain a copy of the License at
4%%
5%%     http://www.apache.org/licenses/LICENSE-2.0
6%%
7%% Unless required by applicable law or agreed to in writing, software
8%% distributed under the License is distributed on an "AS IS" BASIS,
9%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10%% See the License for the specific language governing permissions and
11%% limitations under the License.
12%%
13%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
14%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
15%% AB. All Rights Reserved.''
16%%
17%%     $Id: mnesia_loader.erl,v 1.2 2010/03/04 13:54:19 maria Exp $
18%%% Purpose : Loads tables from local disc or from remote node
19
20-module(mnesia_loader).
21
22%% Mnesia internal stuff
23-export([disc_load_table/2,
24	 net_load_table/4,
25	 send_table/3]).
26
27-export([old_node_init_table/6]). %% Spawned old node protocol conversion hack
28-export([spawned_receiver/8]).    %% Spawned lock taking process
29
30-import(mnesia_lib, [set/2, fatal/2, verbose/2, dbg_out/2]).
31
32-include("mnesia.hrl").
33
34val(Var) ->
35    case ?catch_val(Var) of
36	{'EXIT', Reason} -> mnesia_lib:other_val(Var, Reason);
37	Value -> Value
38    end.
39
40%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
41%% Load a table from local disc
42
43disc_load_table(Tab, Reason) ->
44    Storage =  val({Tab, storage_type}),
45    Type = val({Tab, setorbag}),
46    dbg_out("Getting table ~p (~p) from disc: ~p~n",
47	    [Tab, Storage, Reason]),
48    ?eval_debug_fun({?MODULE, do_get_disc_copy},
49		    [{tab, Tab},
50		     {reason, Reason},
51		     {storage, Storage},
52		     {type, Type}]),
53    do_get_disc_copy2(Tab, Reason, Storage, Type).
54
55do_get_disc_copy2(Tab, _Reason, Storage, _Type) when Storage == unknown ->
56    verbose("Local table copy of ~p has recently been deleted, ignored.~n",
57	    [Tab]),
58    {loaded, ok};  %% ?
59do_get_disc_copy2(Tab, Reason, Storage, Type) when Storage == disc_copies ->
60    %% NOW we create the actual table
61    Repair = mnesia_monitor:get_env(auto_repair),
62    Args = [{keypos, 2}, public, named_table, Type],
63    case Reason of
64	{dumper, _} -> %% Resources already allocated
65	    ignore;
66	_ ->
67	    mnesia_monitor:mktab(Tab, Args),
68	    Count = mnesia_log:dcd2ets(Tab, Repair),
69	    case ets:info(Tab, size) of
70		X when X < Count * 4 ->
71		    ok = mnesia_log:ets2dcd(Tab);
72		_ ->
73		    ignore
74	    end
75    end,
76    mnesia_index:init_index(Tab, Storage),
77    snmpify(Tab, Storage),
78    set({Tab, load_node}, node()),
79    set({Tab, load_reason}, Reason),
80    {loaded, ok};
81
82do_get_disc_copy2(Tab, Reason, Storage, Type) when Storage == ram_copies ->
83    Args = [{keypos, 2}, public, named_table, Type],
84    case Reason of
85	{dumper, _} -> %% Resources already allocated
86	    ignore;
87	_ ->
88	    mnesia_monitor:mktab(Tab, Args),
89	    Fname = mnesia_lib:tab2dcd(Tab),
90	    Datname = mnesia_lib:tab2dat(Tab),
91	    Repair = mnesia_monitor:get_env(auto_repair),
92	    case mnesia_monitor:use_dir() of
93		true ->
94		    case mnesia_lib:exists(Fname) of
95			true -> mnesia_log:dcd2ets(Tab, Repair);
96			false ->
97			    case mnesia_lib:exists(Datname) of
98				true ->
99				    mnesia_lib:dets_to_ets(Tab, Tab, Datname,
100							   Type, Repair, no);
101				false ->
102				    false
103			    end
104		    end;
105		false ->
106		    false
107	    end
108    end,
109    mnesia_index:init_index(Tab, Storage),
110    snmpify(Tab, Storage),
111    set({Tab, load_node}, node()),
112    set({Tab, load_reason}, Reason),
113    {loaded, ok};
114
115do_get_disc_copy2(Tab, Reason, Storage, Type) when Storage == disc_only_copies ->
116    Args = [{file, mnesia_lib:tab2dat(Tab)},
117	    {type, mnesia_lib:disk_type(Tab, Type)},
118	    {keypos, 2},
119	    {repair, mnesia_monitor:get_env(auto_repair)}],
120    case Reason of
121	{dumper, _} ->
122	    mnesia_index:init_index(Tab, Storage),
123	    snmpify(Tab, Storage),
124	    set({Tab, load_node}, node()),
125	    set({Tab, load_reason}, Reason),
126	    {loaded, ok};
127	_ ->
128	    case mnesia_monitor:open_dets(Tab, Args) of
129		{ok, _} ->
130		    mnesia_index:init_index(Tab, Storage),
131		    snmpify(Tab, Storage),
132		    set({Tab, load_node}, node()),
133		    set({Tab, load_reason}, Reason),
134		    {loaded, ok};
135		{error, Error} ->
136		    {not_loaded, {"Failed to create dets table", Error}}
137	    end
138    end.
139
140%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
141%% Load a table from a remote node
142%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
143%%
144%% Receiver                             Sender
145%% --------                             ------
146%% Grab schema lock on table
147%%                                      Determine table size
148%% Create empty pre-grown table
149%%                                      Grab read lock on table
150%%                                      Let receiver subscribe on updates done on sender node
151%%                                      Disable rehashing of table
152%%                                      Release read lock on table
153%%                                      Send table to receiver in chunks
154%%
155%%                                      Grab read lock on table
156%% Block dirty updates
157%%                                      Update wherabouts
158%%
159%%                                      Cancel the update subscription
160%% Process the subscription events
161%% Optionally dump to disc
162%% Unblock dirty updates
163%%                                      Release read lock on table
164%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
165
166-define(MAX_TRANSFER_SIZE, 7500).
167-define(MAX_RAM_FILE_SIZE, 1000000).
168-define(MAX_RAM_TRANSFERS, (?MAX_RAM_FILE_SIZE div ?MAX_TRANSFER_SIZE) + 1).
169-define(MAX_NOPACKETS, 20).
170
171net_load_table(Tab, Reason, Ns, Cs)
172        when Reason == {dumper,add_table_copy} ->
173    try_net_load_table(Tab, Reason, Ns, Cs);
174net_load_table(Tab, Reason, Ns, _Cs) ->
175    try_net_load_table(Tab, Reason, Ns, val({Tab, cstruct})).
176
177try_net_load_table(Tab, _Reason, [], _Cs) ->
178    verbose("Copy failed. No active replicas of ~p are available.~n", [Tab]),
179    {not_loaded, none_active};
180try_net_load_table(Tab, Reason, Ns, Cs) ->
181    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
182    do_get_network_copy(Tab, Reason, Ns, Storage, Cs).
183
184do_get_network_copy(Tab, _Reason, _Ns, unknown, _Cs) ->
185    verbose("Local table copy of ~p has recently been deleted, ignored.~n", [Tab]),
186    {not_loaded, storage_unknown};
187do_get_network_copy(Tab, Reason, Ns, Storage, Cs) ->
188    [Node | Tail] = Ns,
189    dbg_out("Getting table ~p (~p) from node ~p: ~p~n",
190	    [Tab, Storage, Node, Reason]),
191    ?eval_debug_fun({?MODULE, do_get_network_copy},
192		    [{tab, Tab}, {reason, Reason},
193		     {nodes, Ns}, {storage, Storage}]),
194    mnesia_controller:start_remote_sender(Node, Tab, self(), Storage),
195    put(mnesia_table_sender_node, {Tab, Node}),
196    case init_receiver(Node, Tab, Storage, Cs, Reason) of
197	ok ->
198	    set({Tab, load_node}, Node),
199	    set({Tab, load_reason}, Reason),
200	    mnesia_controller:i_have_tab(Tab),
201	    dbg_out("Table ~p copied from ~p to ~p~n", [Tab, Node, node()]),
202	    {loaded, ok};
203	Err = {error, _} when element(1, Reason) == dumper ->
204	    {not_loaded,Err};
205	restart ->
206	    try_net_load_table(Tab, Reason, Tail, Cs);
207	down ->
208	    try_net_load_table(Tab, Reason, Tail, Cs)
209    end.
210
211snmpify(Tab, Storage) ->
212    do_snmpify(Tab, val({Tab, snmp}), Storage).
213
214do_snmpify(_Tab, [], _Storage) ->
215    ignore;
216do_snmpify(Tab, Us, Storage) ->
217    Snmp = mnesia_snmp_hook:create_table(Us, Tab, Storage),
218    set({Tab, {index, snmp}}, Snmp).
219
220%% Start the recieiver
221%% Sender should be started first, so we don't have the schema-read
222%% lock to long (or get stuck in a deadlock)
223init_receiver(Node, Tab, Storage, Cs, Reason) ->
224    receive
225	{SenderPid, {first, TabSize}} ->
226	    spawn_receiver(Tab,Storage,Cs,SenderPid,
227			   TabSize,false,Reason);
228	{SenderPid, {first, TabSize, DetsData}} ->
229	    spawn_receiver(Tab,Storage,Cs,SenderPid,
230			   TabSize,DetsData,Reason);
231	%% Protocol conversion hack
232	{copier_done, Node} ->
233	    dbg_out("Sender of table ~p crashed on node ~p ~n", [Tab, Node]),
234	    down(Tab, Storage)
235    end.
236
237
238table_init_fun(SenderPid) ->
239    PConv = mnesia_monitor:needs_protocol_conversion(node(SenderPid)),
240    MeMyselfAndI = self(),
241    fun(read) ->
242	    Receiver =
243		if
244		    PConv == true ->
245			MeMyselfAndI ! {actual_tabrec, self()},
246			MeMyselfAndI; %% Old mnesia
247		    PConv == false -> self()
248		end,
249	    SenderPid ! {Receiver, more},
250	    get_data(SenderPid, Receiver)
251    end.
252
253
254%% Add_table_copy get's it's own locks.
255spawn_receiver(Tab,Storage,Cs,SenderPid,TabSize,DetsData,{dumper,add_table_copy}) ->
256    Init = table_init_fun(SenderPid),
257    case do_init_table(Tab,Storage,Cs,SenderPid,TabSize,DetsData,self(), Init) of
258	Err = {error, _} ->
259	    SenderPid ! {copier_done, node()},
260	    Err;
261	Else ->
262	    Else
263    end;
264
265spawn_receiver(Tab,Storage,Cs,SenderPid,
266	       TabSize,DetsData,Reason) ->
267    %% Grab a schema lock to avoid deadlock between table_loader and schema_commit dumping.
268    %% Both may grab tables-locks in different order.
269    Load = fun() ->
270		   {_,Tid,Ts} = get(mnesia_activity_state),
271		   mnesia_locker:rlock(Tid, Ts#tidstore.store,
272				       {schema, Tab}),
273		   Init = table_init_fun(SenderPid),
274		   Pid = spawn_link(?MODULE, spawned_receiver,
275				    [self(),Tab,Storage,Cs,
276				     SenderPid,TabSize,DetsData,
277				     Init]),
278		   put(mnesia_real_loader, Pid),
279		   wait_on_load_complete(Pid)
280	   end,
281    Res = case mnesia:transaction(Load, 20) of
282	      {'atomic', {error,Result}} when element(1,Reason) == dumper ->
283		  SenderPid ! {copier_done, node()},
284		  {error,Result};
285	      {'atomic', {error,Result}} ->
286		  SenderPid ! {copier_done, node()},
287		  fatal("Cannot create table ~p: ~p~n",
288			[[Tab, Storage], Result]);
289	      {'atomic', Result} -> Result;
290	      {aborted, nomore} ->
291		  SenderPid ! {copier_done, node()},
292		  restart;
293	      {aborted, _ } ->
294		  SenderPid ! {copier_done, node()},
295		  down  %% either this node or sender is dying
296	  end,
297    unlink(whereis(mnesia_tm)),  %% Avoid late unlink from tm
298    Res.
299
300spawned_receiver(ReplyTo,Tab,Storage,Cs,
301		 SenderPid,TabSize,DetsData, Init) ->
302    process_flag(trap_exit, true),
303    Done = do_init_table(Tab,Storage,Cs,
304			 SenderPid,TabSize,DetsData,
305			 ReplyTo, Init),
306    ReplyTo ! {self(),Done},
307    unlink(ReplyTo),
308    unlink(whereis(mnesia_controller)),
309    exit(normal).
310
311wait_on_load_complete(Pid) ->
312    receive
313	{Pid, Res} ->
314	    Res;
315	{'EXIT', Pid, Reason} ->
316	    exit(Reason);
317	Else ->
318	    Pid ! Else,
319	    wait_on_load_complete(Pid)
320    end.
321
322tab_receiver(Node, Tab, Storage, Cs, PConv, OrigTabRec) ->
323    receive
324	{SenderPid, {no_more, DatBin}} when PConv == false ->
325	    finish_copy(Storage,Tab,Cs,SenderPid,DatBin,OrigTabRec);
326
327	%% Protocol conversion hack
328	{SenderPid, {no_more, DatBin}} when pid(PConv) ->
329	    PConv ! {SenderPid, no_more},
330	    receive
331		{old_init_table_complete, ok} ->
332		    finish_copy(Storage, Tab, Cs, SenderPid, DatBin,OrigTabRec);
333		{old_init_table_complete, Reason} ->
334		    Msg = "OLD: [d]ets:init table failed",
335		    dbg_out("~s: ~p: ~p~n", [Msg, Tab, Reason]),
336		    down(Tab, Storage)
337	    end;
338
339	{actual_tabrec, Pid} ->
340	    tab_receiver(Node, Tab, Storage, Cs, Pid,OrigTabRec);
341
342	{SenderPid, {more, [Recs]}} when pid(PConv) ->
343	    PConv ! {SenderPid, {more, Recs}}, %% Forward Msg to OldNodes
344	    tab_receiver(Node, Tab, Storage, Cs, PConv,OrigTabRec);
345
346	{'EXIT', PConv, Reason} ->  %% [d]ets:init process crashed
347	    Msg = "Receiver crashed",
348	    dbg_out("~s: ~p: ~p~n", [Msg, Tab, Reason]),
349	    down(Tab, Storage);
350
351	%% Protocol conversion hack
352	{copier_done, Node} ->
353	    dbg_out("Sender of table ~p crashed on node ~p ~n", [Tab, Node]),
354	    down(Tab, Storage);
355
356	{'EXIT', Pid, Reason} ->
357	    handle_exit(Pid, Reason),
358	    tab_receiver(Node, Tab, Storage, Cs, PConv,OrigTabRec)
359    end.
360
361create_table(Tab, TabSize, Storage, Cs) ->
362    if
363	Storage == disc_only_copies ->
364	    mnesia_lib:lock_table(Tab),
365	    Tmp = mnesia_lib:tab2tmp(Tab),
366	    Size = lists:max([TabSize, 256]),
367	    Args = [{file, Tmp},
368		    {keypos, 2},
369%%		    {ram_file, true},
370		    {estimated_no_objects, Size},
371		    {repair, mnesia_monitor:get_env(auto_repair)},
372		    {type, mnesia_lib:disk_type(Tab, Cs#cstruct.type)}],
373	    file:delete(Tmp),
374	    case mnesia_lib:dets_sync_open(Tab, Args) of
375		{ok, _} ->
376		    mnesia_lib:unlock_table(Tab),
377		    {Storage, Tab};
378		Else ->
379		    mnesia_lib:unlock_table(Tab),
380		    Else
381	    end;
382	(Storage == ram_copies) or (Storage == disc_copies) ->
383	    Args = [{keypos, 2}, public, named_table, Cs#cstruct.type],
384	    case mnesia_monitor:unsafe_mktab(Tab, Args) of
385		Tab ->
386		    {Storage, Tab};
387		Else ->
388		    Else
389	    end
390    end.
391
392do_init_table(Tab,Storage,Cs,SenderPid,
393	      TabSize,DetsInfo,OrigTabRec,Init) ->
394    case create_table(Tab, TabSize, Storage, Cs) of
395	{Storage,Tab} ->
396	    %% Debug info
397	    Node = node(SenderPid),
398	    put(mnesia_table_receiver, {Tab, Node, SenderPid}),
399	    mnesia_tm:block_tab(Tab),
400	    PConv = mnesia_monitor:needs_protocol_conversion(Node),
401
402	    case init_table(Tab,Storage,Init,PConv,DetsInfo,SenderPid) of
403		ok ->
404		    tab_receiver(Node,Tab,Storage,Cs,PConv,OrigTabRec);
405		Reason ->
406		    Msg = "[d]ets:init table failed",
407		    dbg_out("~s: ~p: ~p~n", [Msg, Tab, Reason]),
408		    down(Tab, Storage)
409	    end;
410	Error ->
411	    Error
412    end.
413
414make_table_fun(Pid, TabRec) ->
415    fun(close) ->
416	    ok;
417       (read) ->
418	    get_data(Pid, TabRec)
419    end.
420
421get_data(Pid, TabRec) ->
422    receive
423	{Pid, {more, Recs}} ->
424	    Pid ! {TabRec, more},
425	    {Recs, make_table_fun(Pid,TabRec)};
426	{Pid, no_more} ->
427	    end_of_input;
428	{copier_done, Node} ->
429	    case node(Pid) of
430		Node ->
431		    {copier_done, Node};
432		_ ->
433		    get_data(Pid, TabRec)
434	    end;
435	{'EXIT', Pid, Reason} ->
436	    handle_exit(Pid, Reason),
437	    get_data(Pid, TabRec)
438    end.
439
440init_table(Tab, disc_only_copies, Fun, false, DetsInfo,Sender) ->
441    ErtsVer = erlang:system_info(version),
442    case DetsInfo of
443	{ErtsVer, DetsData}  ->
444	    Res = (catch dets:is_compatible_bchunk_format(Tab, DetsData)),
445	    case Res of
446		{'EXIT',{undef,[{dets,_,_}|_]}} ->
447		    Sender ! {self(), {old_protocol, Tab}},
448		    dets:init_table(Tab, Fun);  %% Old dets version
449		{'EXIT', What} ->
450		    exit(What);
451		false ->
452		    Sender ! {self(), {old_protocol, Tab}},
453		    dets:init_table(Tab, Fun);  %% Old dets version
454		true ->
455		    dets:init_table(Tab, Fun, [{format, bchunk}])
456	    end;
457	Old when Old /= false ->
458	    Sender ! {self(), {old_protocol, Tab}},
459	    dets:init_table(Tab, Fun);  %% Old dets version
460	_ ->
461	    dets:init_table(Tab, Fun)
462    end;
463init_table(Tab, _, Fun, false, _DetsInfo,_) ->
464    case catch ets:init_table(Tab, Fun) of
465	true ->
466	    ok;
467	{'EXIT', Else} -> Else
468    end;
469init_table(Tab, Storage, Fun, true, _DetsInfo, Sender) ->  %% Old Nodes
470    spawn_link(?MODULE, old_node_init_table,
471	       [Tab, Storage, Fun, self(), false, Sender]),
472    ok.
473
474old_node_init_table(Tab, Storage, Fun, TabReceiver, DetsInfo,Sender) ->
475    Res = init_table(Tab, Storage, Fun, false, DetsInfo,Sender),
476    TabReceiver ! {old_init_table_complete, Res},
477    unlink(TabReceiver),
478    ok.
479
480finish_copy(Storage,Tab,Cs,SenderPid,DatBin,OrigTabRec) ->
481    TabRef = {Storage, Tab},
482    subscr_receiver(TabRef, Cs#cstruct.record_name),
483    case handle_last(TabRef, Cs#cstruct.type, DatBin) of
484	ok ->
485	    mnesia_index:init_index(Tab, Storage),
486	    snmpify(Tab, Storage),
487	    %% OrigTabRec must not be the spawned tab-receiver
488	    %% due to old protocol.
489	    SenderPid ! {OrigTabRec, no_more},
490	    mnesia_tm:unblock_tab(Tab),
491	    ok;
492	{error, Reason} ->
493	    Msg = "Failed to handle last",
494	    dbg_out("~s: ~p: ~p~n", [Msg, Tab, Reason]),
495	    down(Tab, Storage)
496    end.
497
498subscr_receiver(TabRef = {_, Tab}, RecName) ->
499    receive
500	{mnesia_table_event, {Op, Val, _Tid}} ->
501	    if
502		Tab == RecName ->
503		    handle_event(TabRef, Op, Val);
504		true ->
505		    handle_event(TabRef, Op, setelement(1, Val, RecName))
506	    end,
507	    subscr_receiver(TabRef, RecName);
508
509	{'EXIT', Pid, Reason} ->
510	    handle_exit(Pid, Reason),
511	    subscr_receiver(TabRef, RecName)
512    after 0 ->
513	    ok
514    end.
515
516handle_event(TabRef, write, Rec) ->
517    db_put(TabRef, Rec);
518handle_event(TabRef, delete, {_Tab, Key}) ->
519    db_erase(TabRef, Key);
520handle_event(TabRef, delete_object, OldRec) ->
521    db_match_erase(TabRef, OldRec);
522handle_event(TabRef, clear_table, {_Tab, _Key}) ->
523    db_match_erase(TabRef, '_').
524
525handle_last({disc_copies, Tab}, _Type, nobin) ->
526    Ret = mnesia_log:ets2dcd(Tab),
527    Fname = mnesia_lib:tab2dat(Tab),
528    case mnesia_lib:exists(Fname) of
529	true ->  %% Remove old .DAT files.
530	    file:delete(Fname);
531	false ->
532	    ok
533    end,
534    Ret;
535
536handle_last({disc_only_copies, Tab}, Type, nobin) ->
537    case mnesia_lib:swap_tmp_files([Tab]) of
538	[] ->
539	    Args = [{file, mnesia_lib:tab2dat(Tab)},
540		    {type, mnesia_lib:disk_type(Tab, Type)},
541		    {keypos, 2},
542		    {repair, mnesia_monitor:get_env(auto_repair)}],
543	    mnesia_monitor:open_dets(Tab, Args),
544	    ok;
545	L when list(L) ->
546	    {error, {"Cannot swap tmp files", Tab, L}}
547    end;
548
549handle_last({ram_copies, _Tab}, _Type, nobin) ->
550    ok;
551handle_last({ram_copies, Tab}, _Type, DatBin) ->
552    case mnesia_monitor:use_dir() of
553	true ->
554	    mnesia_lib:lock_table(Tab),
555	    Tmp = mnesia_lib:tab2tmp(Tab),
556	    ok = file:write_file(Tmp, DatBin),
557	    ok = file:rename(Tmp, mnesia_lib:tab2dcd(Tab)),
558	    mnesia_lib:unlock_table(Tab),
559	    ok;
560	false ->
561	    ok
562    end.
563
564down(Tab, Storage) ->
565    case Storage of
566	ram_copies ->
567	    catch ?ets_delete_table(Tab);
568	disc_copies ->
569	    catch ?ets_delete_table(Tab);
570	disc_only_copies ->
571	    mnesia_lib:cleanup_tmp_files([Tab])
572    end,
573    mnesia_checkpoint:tm_del_copy(Tab, node()),
574    mnesia_controller:sync_del_table_copy_whereabouts(Tab, node()),
575    mnesia_tm:unblock_tab(Tab),
576    flush_subcrs(),
577    down.
578
579flush_subcrs() ->
580    receive
581	{mnesia_table_event, _} ->
582	    flush_subcrs();
583
584	{'EXIT', Pid, Reason} ->
585	    handle_exit(Pid, Reason),
586	    flush_subcrs()
587    after 0 ->
588	    done
589    end.
590
591db_erase({ram_copies, Tab}, Key) ->
592    true = ?ets_delete(Tab, Key);
593db_erase({disc_copies, Tab}, Key) ->
594    true = ?ets_delete(Tab, Key);
595db_erase({disc_only_copies, Tab}, Key) ->
596    ok = dets:delete(Tab, Key).
597
598db_match_erase({ram_copies, Tab} , Pat) ->
599    true = ?ets_match_delete(Tab, Pat);
600db_match_erase({disc_copies, Tab} , Pat) ->
601    true = ?ets_match_delete(Tab, Pat);
602db_match_erase({disc_only_copies, Tab}, Pat) ->
603    ok = dets:match_delete(Tab, Pat).
604
605db_put({ram_copies, Tab}, Val) ->
606    true = ?ets_insert(Tab, Val);
607db_put({disc_copies, Tab}, Val) ->
608    true = ?ets_insert(Tab, Val);
609db_put({disc_only_copies, Tab}, Val) ->
610    ok = dets:insert(Tab, Val).
611
612%% This code executes at the remote site where the data is
613%% executes in a special copier process.
614
615calc_nokeys(Storage, Tab) ->
616    %% Calculate #keys per transfer
617    Key = mnesia_lib:db_first(Storage, Tab),
618    Recs = mnesia_lib:db_get(Storage, Tab, Key),
619    BinSize = size(term_to_binary(Recs)),
620    (?MAX_TRANSFER_SIZE div BinSize) + 1.
621
622send_table(Pid, Tab, RemoteS) ->
623    case ?catch_val({Tab, storage_type}) of
624	{'EXIT', _} ->
625	    {error, {no_exists, Tab}};
626	unknown ->
627	    {error, {no_exists, Tab}};
628	Storage ->
629	    %% Send first
630	    TabSize = mnesia:table_info(Tab, size),
631	    Pconvert = mnesia_monitor:needs_protocol_conversion(node(Pid)),
632	    KeysPerTransfer = calc_nokeys(Storage, Tab),
633	    ChunkData = dets:info(Tab, bchunk_format),
634
635	    UseDetsChunk =
636		Storage == RemoteS andalso
637		Storage == disc_only_copies andalso
638		ChunkData /= undefined andalso
639		Pconvert == false,
640	    if
641		UseDetsChunk == true ->
642		    DetsInfo = erlang:system_info(version),
643		    Pid ! {self(), {first, TabSize, {DetsInfo, ChunkData}}};
644		true  ->
645		    Pid ! {self(), {first, TabSize}}
646	    end,
647
648	    %% Debug info
649	    put(mnesia_table_sender, {Tab, node(Pid), Pid}),
650	    {Init, Chunk} = reader_funcs(UseDetsChunk, Tab, Storage, KeysPerTransfer),
651
652	    SendIt = fun() ->
653			     prepare_copy(Pid, Tab, Storage),
654			     send_more(Pid, 1, Chunk, Init(), Tab, Pconvert),
655			     finish_copy(Pid, Tab, Storage, RemoteS)
656		     end,
657
658	    case catch SendIt() of
659		receiver_died ->
660		    cleanup_tab_copier(Pid, Storage, Tab),
661		    unlink(whereis(mnesia_tm)),
662		    ok;
663		{_, receiver_died} ->
664		    unlink(whereis(mnesia_tm)),
665		    ok;
666		{'atomic', no_more} ->
667		    unlink(whereis(mnesia_tm)),
668		    ok;
669		Reason ->
670		    cleanup_tab_copier(Pid, Storage, Tab),
671		    unlink(whereis(mnesia_tm)),
672		    {error, Reason}
673	    end
674    end.
675
676prepare_copy(Pid, Tab, Storage) ->
677    Trans =
678	fun() ->
679		mnesia:write_lock_table(Tab),
680		mnesia_subscr:subscribe(Pid, {table, Tab}),
681		update_where_to_write(Tab, node(Pid)),
682		mnesia_lib:db_fixtable(Storage, Tab, true),
683		ok
684	end,
685    case mnesia:transaction(Trans) of
686	{'atomic', ok} ->
687	    ok;
688	{aborted, Reason} ->
689	    exit({tab_copier_prepare, Tab, Reason})
690    end.
691
692update_where_to_write(Tab, Node) ->
693    case val({Tab, access_mode}) of
694	read_only ->
695	    ignore;
696	read_write ->
697	    Current = val({current, db_nodes}),
698	    Ns =
699		case lists:member(Node, Current) of
700		    true -> Current;
701		    false -> [Node | Current]
702		end,
703	    update_where_to_write(Ns, Tab, Node)
704    end.
705
706update_where_to_write([], _, _) ->
707    ok;
708update_where_to_write([H|T], Tab, AddNode) ->
709    rpc:call(H,  mnesia_controller, call,
710	     [{update_where_to_write, [add, Tab, AddNode], self()}]),
711    update_where_to_write(T, Tab, AddNode).
712
713send_more(Pid, N, Chunk, DataState, Tab, OldNode) ->
714    receive
715	{NewPid, more} ->
716	    case send_packet(N - 1, NewPid, Chunk, DataState, OldNode) of
717		New when integer(New) ->
718		    New - 1;
719		NewData ->
720		    send_more(NewPid, ?MAX_NOPACKETS, Chunk, NewData, Tab, OldNode)
721	    end;
722	{_NewPid, {old_protocol, Tab}} ->
723	    Storage =  val({Tab, storage_type}),
724	    {Init, NewChunk} =
725		reader_funcs(false, Tab, Storage, calc_nokeys(Storage, Tab)),
726	    send_more(Pid, 1, NewChunk, Init(), Tab, OldNode);
727
728	{copier_done, Node} when Node == node(Pid)->
729	    verbose("Receiver of table ~p crashed on ~p (more)~n", [Tab, Node]),
730	    throw(receiver_died)
731    end.
732
733reader_funcs(UseDetsChunk, Tab, Storage, KeysPerTransfer) ->
734    case UseDetsChunk of
735	false ->
736	    {fun() -> mnesia_lib:db_init_chunk(Storage, Tab, KeysPerTransfer) end,
737	     fun(Cont) -> mnesia_lib:db_chunk(Storage, Cont) end};
738	true ->
739	    {fun() -> dets_bchunk(Tab, start) end,
740	     fun(Cont) -> dets_bchunk(Tab, Cont) end}
741    end.
742
743dets_bchunk(Tab, Chunk) -> %% Arrg
744    case dets:bchunk(Tab, Chunk) of
745	{Cont, Data} -> {Data, Cont};
746	Else -> Else
747    end.
748
749send_packet(N, Pid, _Chunk, '$end_of_table', OldNode) ->
750    case OldNode of
751	true -> ignore; %% Old nodes can't handle the new no_more
752	false ->  Pid ! {self(), no_more}
753    end,
754    N;
755send_packet(N, Pid, Chunk, {[], Cont}, OldNode) ->
756    send_packet(N, Pid, Chunk, Chunk(Cont), OldNode);
757send_packet(N, Pid, Chunk, {Recs, Cont}, OldNode) when N < ?MAX_NOPACKETS ->
758    case OldNode of
759	true -> Pid ! {self(), {more, [Recs]}}; %% Old need's wrapping list
760	false -> Pid ! {self(), {more, Recs}}
761    end,
762    send_packet(N+1, Pid, Chunk, Chunk(Cont), OldNode);
763send_packet(_N, _Pid, _Chunk, DataState, _OldNode) ->
764    DataState.
765
766finish_copy(Pid, Tab, Storage, RemoteS) ->
767    RecNode = node(Pid),
768    DatBin = dat2bin(Tab, Storage, RemoteS),
769    Trans =
770	fun() ->
771		mnesia:read_lock_table(Tab),
772		A = val({Tab, access_mode}),
773		mnesia_controller:sync_and_block_table_whereabouts(Tab, RecNode, RemoteS, A),
774		cleanup_tab_copier(Pid, Storage, Tab),
775		mnesia_checkpoint:tm_add_copy(Tab, RecNode),
776		Pid ! {self(), {no_more, DatBin}},
777		receive
778		    {Pid, no_more} -> % Dont bother about the spurious 'more' message
779			no_more;
780		    {copier_done, Node} when Node == node(Pid)->
781			verbose("Tab receiver ~p crashed (more): ~p~n", [Tab, Node]),
782			receiver_died
783		end
784	end,
785    mnesia:transaction(Trans).
786
787cleanup_tab_copier(Pid, Storage, Tab) ->
788    mnesia_lib:db_fixtable(Storage, Tab, false),
789    mnesia_subscr:unsubscribe(Pid, {table, Tab}).
790
791dat2bin(Tab, ram_copies, ram_copies) ->
792    mnesia_lib:lock_table(Tab),
793    Res = file:read_file(mnesia_lib:tab2dcd(Tab)),
794    mnesia_lib:unlock_table(Tab),
795    case Res of
796	{ok, DatBin} -> DatBin;
797	_ -> nobin
798    end;
799dat2bin(_Tab, _LocalS, _RemoteS) ->
800    nobin.
801
802handle_exit(Pid, Reason) when node(Pid) == node() ->
803    exit(Reason);
804handle_exit(_Pid, _Reason) ->  %% Not from our node, this will be handled by
805    ignore.                  %% mnesia_down soon.
806