%% ``Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
%%
%%     $Id: mnesia_loader.erl,v 1.2 2010/03/04 13:54:19 maria Exp $
%%% Purpose : Loads tables from local disc or from remote node

-module(mnesia_loader).

%% Mnesia internal stuff
-export([disc_load_table/2,
         net_load_table/4,
         send_table/3]).

-export([old_node_init_table/6]). %% Spawned old node protocol conversion hack
-export([spawned_receiver/8]).    %% Spawned lock taking process

-import(mnesia_lib, [set/2, fatal/2, verbose/2, dbg_out/2]).

-include("mnesia.hrl").

%% Look up Var through the ?catch_val macro. When the lookup yields
%% {'EXIT', Reason} the decision is delegated to mnesia_lib:other_val/2;
%% any other result is returned unchanged.
val(Var) ->
    case ?catch_val(Var) of
        {'EXIT', Reason} -> mnesia_lib:other_val(Var, Reason);
        Value -> Value
    end.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Load a table from local disc

%% Entry point for loading Tab from the local disc.
%%
%% Reads the table's storage type and set/bag type, logs the attempt,
%% evaluates the do_get_disc_copy debug hook and then hands over to
%% do_get_disc_copy2/4, whose result is returned as is.
disc_load_table(Tab, Reason) ->
    Storage = val({Tab, storage_type}),
    Type = val({Tab, setorbag}),
    dbg_out("Getting table ~p (~p) from disc: ~p~n",
            [Tab, Storage, Reason]),
    ?eval_debug_fun({?MODULE, do_get_disc_copy},
                    [{tab, Tab},
                     {reason, Reason},
                     {storage, Storage},
                     {type, Type}]),
    do_get_disc_copy2(Tab, Reason, Storage, Type).
%% Load Tab from local disc according to its storage type.
%%
%% Returns {loaded, ok} on success (also when the local copy has been
%% deleted in the meantime) or {not_loaded, {Message, Error}} when the
%% dets file of a disc_only_copies table cannot be opened.
%% A Reason of the form {dumper, _} means that the table resources are
%% already allocated, so no table is created or opened here.
do_get_disc_copy2(Tab, _Reason, Storage, _Type) when Storage == unknown ->
    verbose("Local table copy of ~p has recently been deleted, ignored.~n",
            [Tab]),
    {loaded, ok};   %% ?
do_get_disc_copy2(Tab, Reason, Storage, Type) when Storage == disc_copies ->
    %% NOW we create the actual table
    Repair = mnesia_monitor:get_env(auto_repair),
    Args = [{keypos, 2}, public, named_table, Type],
    case Reason of
        {dumper, _} -> %% Resources already allocated
            ignore;
        _ ->
            mnesia_monitor:mktab(Tab, Args),
            Count = mnesia_log:dcd2ets(Tab, Repair),
            %% Rewrite the dcd file when the value returned by dcd2ets
            %% is large compared to the resulting table size.
            case ets:info(Tab, size) of
                X when X < Count * 4 ->
                    ok = mnesia_log:ets2dcd(Tab);
                _ ->
                    ignore
            end
    end,
    finish_disc_load(Tab, Reason, Storage);

do_get_disc_copy2(Tab, Reason, Storage, Type) when Storage == ram_copies ->
    Args = [{keypos, 2}, public, named_table, Type],
    case Reason of
        {dumper, _} -> %% Resources already allocated
            ignore;
        _ ->
            mnesia_monitor:mktab(Tab, Args),
            load_ram_copy_from_disc(Tab, Type)
    end,
    finish_disc_load(Tab, Reason, Storage);

do_get_disc_copy2(Tab, Reason, Storage, Type) when Storage == disc_only_copies ->
    Args = [{file, mnesia_lib:tab2dat(Tab)},
            {type, mnesia_lib:disk_type(Tab, Type)},
            {keypos, 2},
            {repair, mnesia_monitor:get_env(auto_repair)}],
    case Reason of
        {dumper, _} ->
            finish_disc_load(Tab, Reason, Storage);
        _ ->
            case mnesia_monitor:open_dets(Tab, Args) of
                {ok, _} ->
                    finish_disc_load(Tab, Reason, Storage);
                {error, Error} ->
                    {not_loaded, {"Failed to create dets table", Error}}
            end
    end.

%% Fill a freshly created ram_copies table from its local files, if any.
%% The .DCD file is preferred over the .DAT file; nothing is read when
%% Mnesia runs without a directory. The return value is ignored by the
%% caller.
load_ram_copy_from_disc(Tab, Type) ->
    Fname = mnesia_lib:tab2dcd(Tab),
    Datname = mnesia_lib:tab2dat(Tab),
    Repair = mnesia_monitor:get_env(auto_repair),
    case mnesia_monitor:use_dir() of
        true ->
            case mnesia_lib:exists(Fname) of
                true ->
                    mnesia_log:dcd2ets(Tab, Repair);
                false ->
                    case mnesia_lib:exists(Datname) of
                        true ->
                            mnesia_lib:dets_to_ets(Tab, Tab, Datname,
                                                   Type, Repair, no);
                        false ->
                            false
                    end
            end;
        false ->
            false
    end.

%% Common epilogue of a successful local load: build indexes, create the
%% SNMP structure (if configured) and record where and why the table
%% was loaded.
finish_disc_load(Tab, Reason, Storage) ->
    mnesia_index:init_index(Tab, Storage),
    snmpify(Tab, Storage),
    set({Tab, load_node}, node()),
    set({Tab, load_reason}, Reason),
    {loaded, ok}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Load a table from a remote node
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
%% Receiver                             Sender
%% --------                             ------
%% Grab schema lock on table
%%                                      Determine table size
%% Create empty pre-grown table
%%                                      Grab read lock on table
%%                                      Let receiver subscribe on updates done on sender node
%%                                      Disable rehashing of table
%%                                      Release read lock on table
%%                                      Send table to receiver in chunks
%%
%%                                      Grab read lock on table
%% Block dirty updates
%% Update whereabouts
%%
%%                                      Cancel the update subscription
%% Process the subscription events
%% Optionally dump to disc
%% Unblock dirty updates
%%                                      Release read lock on table
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-define(MAX_TRANSFER_SIZE, 7500).
-define(MAX_RAM_FILE_SIZE, 1000000).
-define(MAX_RAM_TRANSFERS, (?MAX_RAM_FILE_SIZE div ?MAX_TRANSFER_SIZE) + 1).
-define(MAX_NOPACKETS, 20).

%% Load Tab from one of the nodes in Ns.
%%
%% For {dumper, add_table_copy} the cstruct supplied by the caller is
%% used; for every other reason the cstruct is read with val/1.
%% Returns {loaded, ok} or {not_loaded, Why}.
net_load_table(Tab, Reason, Ns, Cs)
  when Reason == {dumper,add_table_copy} ->
    try_net_load_table(Tab, Reason, Ns, Cs);
net_load_table(Tab, Reason, Ns, _Cs) ->
    try_net_load_table(Tab, Reason, Ns, val({Tab, cstruct})).

%% Try the candidate nodes one at a time; gives up with
%% {not_loaded, none_active} when the list is exhausted.
try_net_load_table(Tab, _Reason, [], _Cs) ->
    verbose("Copy failed. No active replicas of ~p are available.~n", [Tab]),
    {not_loaded, none_active};
try_net_load_table(Tab, Reason, Ns, Cs) ->
    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
    do_get_network_copy(Tab, Reason, Ns, Storage, Cs).

%% Ask the first node in Ns to send Tab and act as receiver.
%% On `restart' or `down' from the receiver the remaining nodes are tried.
do_get_network_copy(Tab, _Reason, _Ns, unknown, _Cs) ->
    verbose("Local table copy of ~p has recently been deleted, ignored.~n", [Tab]),
    {not_loaded, storage_unknown};
do_get_network_copy(Tab, Reason, Ns, Storage, Cs) ->
    [Node | Tail] = Ns,
    dbg_out("Getting table ~p (~p) from node ~p: ~p~n",
            [Tab, Storage, Node, Reason]),
    ?eval_debug_fun({?MODULE, do_get_network_copy},
                    [{tab, Tab}, {reason, Reason},
                     {nodes, Ns}, {storage, Storage}]),
    mnesia_controller:start_remote_sender(Node, Tab, self(), Storage),
    put(mnesia_table_sender_node, {Tab, Node}),
    case init_receiver(Node, Tab, Storage, Cs, Reason) of
        ok ->
            set({Tab, load_node}, Node),
            set({Tab, load_reason}, Reason),
            mnesia_controller:i_have_tab(Tab),
            dbg_out("Table ~p copied from ~p to ~p~n", [Tab, Node, node()]),
            {loaded, ok};
        Err = {error, _} when element(1, Reason) == dumper ->
            {not_loaded,Err};
        restart ->
            try_net_load_table(Tab, Reason, Tail, Cs);
        down ->
            try_net_load_table(Tab, Reason, Tail, Cs)
    end.

%% Create the SNMP structure for Tab when the table has an snmp
%% property; otherwise do nothing.
snmpify(Tab, Storage) ->
    do_snmpify(Tab, val({Tab, snmp}), Storage).

do_snmpify(_Tab, [], _Storage) ->
    ignore;
do_snmpify(Tab, Us, Storage) ->
    Snmp = mnesia_snmp_hook:create_table(Us, Tab, Storage),
    set({Tab, {index, snmp}}, Snmp).
%% Start the receiver
%% Sender should be started first, so we don't have the schema-read
%% lock too long (or get stuck in a deadlock)
%%
%% Waits for the sender's {first, ...} message and then starts the
%% receiving side. Returns whatever spawn_receiver/7 returns, or `down'
%% when the sender reports {copier_done, Node} before sending anything.
init_receiver(Node, Tab, Storage, Cs, Reason) ->
    receive
        {SenderPid, {first, TabSize}} ->
            spawn_receiver(Tab,Storage,Cs,SenderPid,
                           TabSize,false,Reason);
        {SenderPid, {first, TabSize, DetsData}} ->
            spawn_receiver(Tab,Storage,Cs,SenderPid,
                           TabSize,DetsData,Reason);
        %% Protocol conversion hack
        {copier_done, Node} ->
            dbg_out("Sender of table ~p crashed on node ~p ~n", [Tab, Node]),
            down(Tab, Storage)
    end.


%% Build the initial `read' fun handed to [d]ets:init_table.
%% When the sender needs protocol conversion the process that created
%% the fun stays the official receiver and is told which process
%% actually runs init_table ({actual_tabrec, Pid}); otherwise the
%% process evaluating the fun is the receiver.
table_init_fun(SenderPid) ->
    PConv = mnesia_monitor:needs_protocol_conversion(node(SenderPid)),
    MeMyselfAndI = self(),
    fun(read) ->
            Receiver =
                if
                    PConv == true ->
                        MeMyselfAndI ! {actual_tabrec, self()},
                        MeMyselfAndI; %% Old mnesia
                    PConv == false -> self()
                end,
            SenderPid ! {Receiver, more},
            get_data(SenderPid, Receiver)
    end.


%% Add_table_copy gets its own locks.
spawn_receiver(Tab,Storage,Cs,SenderPid,TabSize,DetsData,{dumper,add_table_copy}) ->
    Init = table_init_fun(SenderPid),
    case do_init_table(Tab,Storage,Cs,SenderPid,TabSize,DetsData,self(), Init) of
        Err = {error, _} ->
            SenderPid ! {copier_done, node()},
            Err;
        Else ->
            Else
    end;

spawn_receiver(Tab,Storage,Cs,SenderPid,
               TabSize,DetsData,Reason) ->
    %% Grab a schema lock to avoid deadlock between table_loader and schema_commit dumping.
    %% Both may grab tables-locks in different order.
    Load = fun() ->
                   {_,Tid,Ts} = get(mnesia_activity_state),
                   mnesia_locker:rlock(Tid, Ts#tidstore.store,
                                       {schema, Tab}),
                   Init = table_init_fun(SenderPid),
                   Pid = spawn_link(?MODULE, spawned_receiver,
                                    [self(),Tab,Storage,Cs,
                                     SenderPid,TabSize,DetsData,
                                     Init]),
                   put(mnesia_real_loader, Pid),
                   wait_on_load_complete(Pid)
           end,
    Res = case mnesia:transaction(Load, 20) of
              {'atomic', {error,Result}} when element(1,Reason) == dumper ->
                  SenderPid ! {copier_done, node()},
                  {error,Result};
              {'atomic', {error,Result}} ->
                  SenderPid ! {copier_done, node()},
                  fatal("Cannot create table ~p: ~p~n",
                        [[Tab, Storage], Result]);
              {'atomic', Result} -> Result;
              {aborted, nomore} ->
                  SenderPid ! {copier_done, node()},
                  restart;
              {aborted, _ } ->
                  SenderPid ! {copier_done, node()},
                  down  %% either this node or sender is dying
          end,
    unlink(whereis(mnesia_tm)),     %% Avoid late unlink from tm
    Res.

%% Body of the process spawned by spawn_receiver/7. Runs the table
%% initialisation, reports the outcome to ReplyTo as {self(), Done} and
%% terminates normally.
spawned_receiver(ReplyTo,Tab,Storage,Cs,
                 SenderPid,TabSize,DetsData, Init) ->
    process_flag(trap_exit, true),
    Done = do_init_table(Tab,Storage,Cs,
                         SenderPid,TabSize,DetsData,
                         ReplyTo, Init),
    ReplyTo ! {self(),Done},
    unlink(ReplyTo),
    unlink(whereis(mnesia_controller)),
    exit(normal).

%% Wait for the result of the spawned receiver Pid. Exits with the same
%% reason if Pid exits; every other message is forwarded to Pid.
wait_on_load_complete(Pid) ->
    receive
        {Pid, Res} ->
            Res;
        {'EXIT', Pid, Reason} ->
            exit(Reason);
        Else ->
            Pid ! Else,
            wait_on_load_complete(Pid)
    end.

%% Receiver loop entered once [d]ets:init_table has been started.
%% PConv is `false' for the current protocol, `true' until the process
%% running init_table for an old node has announced itself, and the pid
%% of that process afterwards. Returns the result of finish_copy/6 or
%% `down'.
tab_receiver(Node, Tab, Storage, Cs, PConv, OrigTabRec) ->
    receive
        {SenderPid, {no_more, DatBin}} when PConv == false ->
            finish_copy(Storage,Tab,Cs,SenderPid,DatBin,OrigTabRec);

        %% Protocol conversion hack
        {SenderPid, {no_more, DatBin}} when is_pid(PConv) ->
            PConv ! {SenderPid, no_more},
            receive
                {old_init_table_complete, ok} ->
                    finish_copy(Storage, Tab, Cs, SenderPid, DatBin,OrigTabRec);
                {old_init_table_complete, Reason} ->
                    Msg = "OLD: [d]ets:init table failed",
                    dbg_out("~s: ~p: ~p~n", [Msg, Tab, Reason]),
                    down(Tab, Storage)
            end;

        {actual_tabrec, Pid} ->
            tab_receiver(Node, Tab, Storage, Cs, Pid,OrigTabRec);

        {SenderPid, {more, [Recs]}} when is_pid(PConv) ->
            PConv ! {SenderPid, {more, Recs}}, %% Forward Msg to OldNodes
            tab_receiver(Node, Tab, Storage, Cs, PConv,OrigTabRec);

        {'EXIT', PConv, Reason} ->  %% [d]ets:init process crashed
            Msg = "Receiver crashed",
            dbg_out("~s: ~p: ~p~n", [Msg, Tab, Reason]),
            down(Tab, Storage);

        %% Protocol conversion hack
        {copier_done, Node} ->
            dbg_out("Sender of table ~p crashed on node ~p ~n", [Tab, Node]),
            down(Tab, Storage);

        {'EXIT', Pid, Reason} ->
            handle_exit(Pid, Reason),
            tab_receiver(Node, Tab, Storage, Cs, PConv,OrigTabRec)
    end.

%% Create the empty local table that will receive the copy.
%% disc_only_copies tables are opened as a dets table on the temporary
%% file; ram_copies and disc_copies tables become ets tables.
%% Returns {Storage, Tab} on success, otherwise whatever the failing
%% create/open call returned.
create_table(Tab, TabSize, Storage, Cs) ->
    if
        Storage == disc_only_copies ->
            mnesia_lib:lock_table(Tab),
            Tmp = mnesia_lib:tab2tmp(Tab),
            Size = lists:max([TabSize, 256]),
            Args = [{file, Tmp},
                    {keypos, 2},
%%                  {ram_file, true},
                    {estimated_no_objects, Size},
                    {repair, mnesia_monitor:get_env(auto_repair)},
                    {type, mnesia_lib:disk_type(Tab, Cs#cstruct.type)}],
            file:delete(Tmp),
            case mnesia_lib:dets_sync_open(Tab, Args) of
                {ok, _} ->
                    mnesia_lib:unlock_table(Tab),
                    {Storage, Tab};
                Else ->
                    mnesia_lib:unlock_table(Tab),
                    Else
            end;
        Storage == ram_copies; Storage == disc_copies ->
            Args = [{keypos, 2}, public, named_table, Cs#cstruct.type],
            case mnesia_monitor:unsafe_mktab(Tab, Args) of
                Tab ->
                    {Storage, Tab};
                Else ->
                    Else
            end
    end.
%% Create the local table, block it for updates and run the initial
%% bulk load followed by the receiver loop.
%% Returns the result of tab_receiver/6, `down' when init_table fails,
%% or the error term from create_table/4.
do_init_table(Tab,Storage,Cs,SenderPid,
              TabSize,DetsInfo,OrigTabRec,Init) ->
    case create_table(Tab, TabSize, Storage, Cs) of
        {Storage,Tab} ->
            %% Debug info
            Node = node(SenderPid),
            put(mnesia_table_receiver, {Tab, Node, SenderPid}),
            mnesia_tm:block_tab(Tab),
            PConv = mnesia_monitor:needs_protocol_conversion(Node),

            case init_table(Tab,Storage,Init,PConv,DetsInfo,SenderPid) of
                ok ->
                    tab_receiver(Node,Tab,Storage,Cs,PConv,OrigTabRec);
                Reason ->
                    Msg = "[d]ets:init table failed",
                    dbg_out("~s: ~p: ~p~n", [Msg, Tab, Reason]),
                    down(Tab, Storage)
            end;
        Error ->
            Error
    end.

%% Continuation fun in the format expected by [d]ets:init_table:
%% `read' fetches the next chunk, `close' is a no-op.
make_table_fun(Pid, TabRec) ->
    fun(close) ->
            ok;
       (read) ->
            get_data(Pid, TabRec)
    end.

%% Wait for the next chunk from the sender Pid.
%% Acknowledges each chunk with {TabRec, more}. Returns
%% {Recs, NextFun}, `end_of_input', or {copier_done, Node} when the
%% sender's node reports that the copier is done.
get_data(Pid, TabRec) ->
    receive
        {Pid, {more, Recs}} ->
            Pid ! {TabRec, more},
            {Recs, make_table_fun(Pid,TabRec)};
        {Pid, no_more} ->
            end_of_input;
        {copier_done, Node} ->
            case node(Pid) of
                Node ->
                    {copier_done, Node};
                _ ->
                    get_data(Pid, TabRec)
            end;
        {'EXIT', Pid, Reason} ->
            handle_exit(Pid, Reason),
            get_data(Pid, TabRec)
    end.

%% Run the bulk initialisation of Tab with Fun as data source.
%% The fourth argument tells whether the sender needs protocol
%% conversion. For disc_only_copies the bchunk format is used only when
%% the sender runs the same ERTS version and dets accepts the format;
%% in the other cases the sender is told to fall back to the old
%% protocol.
init_table(Tab, disc_only_copies, Fun, false, DetsInfo,Sender) ->
    ErtsVer = erlang:system_info(version),
    case DetsInfo of
        {ErtsVer, DetsData} ->
            Res = (catch dets:is_compatible_bchunk_format(Tab, DetsData)),
            case Res of
                {'EXIT',{undef,[{dets,_,_}|_]}} ->
                    %% Old dets version (three-element stacktrace entry)
                    init_dets_old_protocol(Tab, Fun, Sender);
                {'EXIT',{undef,[{dets,_,_,_}|_]}} ->
                    %% Old dets version (stacktrace entry carrying a
                    %% location element)
                    init_dets_old_protocol(Tab, Fun, Sender);
                {'EXIT', What} ->
                    exit(What);
                false ->
                    init_dets_old_protocol(Tab, Fun, Sender);
                true ->
                    dets:init_table(Tab, Fun, [{format, bchunk}])
            end;
        Old when Old /= false ->
            init_dets_old_protocol(Tab, Fun, Sender);
        _ ->
            dets:init_table(Tab, Fun)
    end;
init_table(Tab, _, Fun, false, _DetsInfo,_) ->
    case catch ets:init_table(Tab, Fun) of
        true ->
            ok;
        {'EXIT', Else} -> Else
    end;
init_table(Tab, Storage, Fun, true, _DetsInfo, Sender) ->  %% Old Nodes
    spawn_link(?MODULE, old_node_init_table,
               [Tab, Storage, Fun, self(), false, Sender]),
    ok.

%% Tell the sender to restart the transfer with the old (non-bchunk)
%% protocol and initialise the dets table from plain records.
init_dets_old_protocol(Tab, Fun, Sender) ->
    Sender ! {self(), {old_protocol, Tab}},
    dets:init_table(Tab, Fun).  %% Old dets version

%% Spawned by init_table/6 for senders that need protocol conversion.
%% Runs the normal initialisation and reports the result to TabReceiver
%% as {old_init_table_complete, Res}.
old_node_init_table(Tab, Storage, Fun, TabReceiver, DetsInfo,Sender) ->
    Res = init_table(Tab, Storage, Fun, false, DetsInfo,Sender),
    TabReceiver ! {old_init_table_complete, Res},
    unlink(TabReceiver),
    ok.

%% Receiver side end of the copy: apply the pending subscription
%% events, finalise the storage, build indexes, acknowledge to the
%% sender and unblock the table. Returns `ok' or `down'.
finish_copy(Storage,Tab,Cs,SenderPid,DatBin,OrigTabRec) ->
    TabRef = {Storage, Tab},
    subscr_receiver(TabRef, Cs#cstruct.record_name),
    case handle_last(TabRef, Cs#cstruct.type, DatBin) of
        ok ->
            mnesia_index:init_index(Tab, Storage),
            snmpify(Tab, Storage),
            %% OrigTabRec must not be the spawned tab-receiver
            %% due to old protocol.
            SenderPid ! {OrigTabRec, no_more},
            mnesia_tm:unblock_tab(Tab),
            ok;
        {error, Reason} ->
            Msg = "Failed to handle last",
            dbg_out("~s: ~p: ~p~n", [Msg, Tab, Reason]),
            down(Tab, Storage)
    end.

%% Apply every table event that is already in the mailbox to the local
%% copy; returns `ok' as soon as no matching message is queued.
%% When the record name differs from the table name, the first element
%% of the event value is replaced by the record name before it is
%% applied.
subscr_receiver(TabRef = {_, Tab}, RecName) ->
    receive
        {mnesia_table_event, {Op, Val, _Tid}} ->
            if
                Tab == RecName ->
                    handle_event(TabRef, Op, Val);
                true ->
                    handle_event(TabRef, Op, setelement(1, Val, RecName))
            end,
            subscr_receiver(TabRef, RecName);

        {'EXIT', Pid, Reason} ->
            handle_exit(Pid, Reason),
            subscr_receiver(TabRef, RecName)
    after 0 ->
            ok
    end.
%% Apply one subscription event to the local copy identified by
%% TabRef = {Storage, Tab}.
handle_event(TabRef, write, Rec) ->
    db_put(TabRef, Rec);
handle_event(TabRef, delete, {_Tab, Key}) ->
    db_erase(TabRef, Key);
handle_event(TabRef, delete_object, OldRec) ->
    db_match_erase(TabRef, OldRec);
handle_event(TabRef, clear_table, {_Tab, _Key}) ->
    db_match_erase(TabRef, '_').

%% Final, storage specific step on the receiver once all data has
%% arrived. The third argument is the binary file content shipped by
%% the sender, or `nobin'. Returns `ok' or {error, Reason}.
handle_last({disc_copies, Tab}, _Type, nobin) ->
    Ret = mnesia_log:ets2dcd(Tab),
    Fname = mnesia_lib:tab2dat(Tab),
    case mnesia_lib:exists(Fname) of
        true ->  %% Remove old .DAT files.
            file:delete(Fname);
        false ->
            ok
    end,
    Ret;

handle_last({disc_only_copies, Tab}, Type, nobin) ->
    case mnesia_lib:swap_tmp_files([Tab]) of
        [] ->
            Args = [{file, mnesia_lib:tab2dat(Tab)},
                    {type, mnesia_lib:disk_type(Tab, Type)},
                    {keypos, 2},
                    {repair, mnesia_monitor:get_env(auto_repair)}],
            %% NOTE(review): the result of open_dets/2 is ignored here,
            %% so a failed open is still reported as `ok' - confirm
            %% that this is intended.
            mnesia_monitor:open_dets(Tab, Args),
            ok;
        L when is_list(L) ->
            {error, {"Cannot swap tmp files", Tab, L}}
    end;

handle_last({ram_copies, _Tab}, _Type, nobin) ->
    ok;
handle_last({ram_copies, Tab}, _Type, DatBin) ->
    case mnesia_monitor:use_dir() of
        true ->
            mnesia_lib:lock_table(Tab),
            Tmp = mnesia_lib:tab2tmp(Tab),
            ok = file:write_file(Tmp, DatBin),
            ok = file:rename(Tmp, mnesia_lib:tab2dcd(Tab)),
            mnesia_lib:unlock_table(Tab),
            ok;
        false ->
            ok
    end.

%% Abort a table load on the receiver: remove the partially loaded
%% table (or its temporary files), withdraw the local copy from the
%% checkpoint and whereabouts bookkeeping, unblock the table and drop
%% pending subscription events. Always returns `down'.
down(Tab, Storage) ->
    case Storage of
        ram_copies ->
            catch ?ets_delete_table(Tab);
        disc_copies ->
            catch ?ets_delete_table(Tab);
        disc_only_copies ->
            mnesia_lib:cleanup_tmp_files([Tab])
    end,
    mnesia_checkpoint:tm_del_copy(Tab, node()),
    mnesia_controller:sync_del_table_copy_whereabouts(Tab, node()),
    mnesia_tm:unblock_tab(Tab),
    flush_subcrs(),
    down.

%% Discard all table events currently in the mailbox; returns `done'
%% when no matching message is queued.
flush_subcrs() ->
    receive
        {mnesia_table_event, _} ->
            flush_subcrs();

        {'EXIT', Pid, Reason} ->
            handle_exit(Pid, Reason),
            flush_subcrs()
    after 0 ->
            done
    end.
%% Delete Key from the local copy; crashes unless the delete succeeds.
db_erase({ram_copies, Tab}, Key) ->
    true = ?ets_delete(Tab, Key);
db_erase({disc_copies, Tab}, Key) ->
    true = ?ets_delete(Tab, Key);
db_erase({disc_only_copies, Tab}, Key) ->
    ok = dets:delete(Tab, Key).

%% Delete every object matching Pat from the local copy.
db_match_erase({ram_copies, Tab} , Pat) ->
    true = ?ets_match_delete(Tab, Pat);
db_match_erase({disc_copies, Tab} , Pat) ->
    true = ?ets_match_delete(Tab, Pat);
db_match_erase({disc_only_copies, Tab}, Pat) ->
    ok = dets:match_delete(Tab, Pat).

%% Insert Val into the local copy.
db_put({ram_copies, Tab}, Val) ->
    true = ?ets_insert(Tab, Val);
db_put({disc_copies, Tab}, Val) ->
    true = ?ets_insert(Tab, Val);
db_put({disc_only_copies, Tab}, Val) ->
    ok = dets:insert(Tab, Val).

%% This code executes at the remote site where the data is;
%% it executes in a special copier process.

%% Estimate how many keys fit in one transfer, based on the external
%% term size of the records stored under the first key of the table.
calc_nokeys(Storage, Tab) ->
    %% Calculate #keys per transfer
    Key = mnesia_lib:db_first(Storage, Tab),
    Recs = mnesia_lib:db_get(Storage, Tab, Key),
    BinSize = byte_size(term_to_binary(Recs)),
    (?MAX_TRANSFER_SIZE div BinSize) + 1.

%% Sender side entry point: send table Tab to the receiver process Pid,
%% whose copy will have storage type RemoteS.
%% Returns `ok' when the copy completed or the receiver died,
%% {error, {no_exists, Tab}} when there is no local copy, and
%% {error, Reason} for any other failure.
send_table(Pid, Tab, RemoteS) ->
    case ?catch_val({Tab, storage_type}) of
        {'EXIT', _} ->
            {error, {no_exists, Tab}};
        unknown ->
            {error, {no_exists, Tab}};
        Storage ->
            %% Send first
            TabSize = mnesia:table_info(Tab, size),
            Pconvert = mnesia_monitor:needs_protocol_conversion(node(Pid)),
            KeysPerTransfer = calc_nokeys(Storage, Tab),
            ChunkData = dets:info(Tab, bchunk_format),

            UseDetsChunk =
                Storage == RemoteS andalso
                Storage == disc_only_copies andalso
                ChunkData /= undefined andalso
                Pconvert == false,
            if
                UseDetsChunk == true ->
                    DetsInfo = erlang:system_info(version),
                    Pid ! {self(), {first, TabSize, {DetsInfo, ChunkData}}};
                true ->
                    Pid ! {self(), {first, TabSize}}
            end,

            %% Debug info
            put(mnesia_table_sender, {Tab, node(Pid), Pid}),
            {Init, Chunk} = reader_funcs(UseDetsChunk, Tab, Storage, KeysPerTransfer),

            SendIt = fun() ->
                             prepare_copy(Pid, Tab, Storage),
                             send_more(Pid, 1, Chunk, Init(), Tab, Pconvert),
                             finish_copy(Pid, Tab, Storage, RemoteS)
                     end,

            Res =
                case catch SendIt() of
                    receiver_died ->
                        cleanup_tab_copier(Pid, Storage, Tab),
                        ok;
                    {_, receiver_died} ->
                        ok;
                    {'atomic', no_more} ->
                        ok;
                    Reason ->
                        cleanup_tab_copier(Pid, Storage, Tab),
                        {error, Reason}
                end,
            %% Done on every path, after any cleanup.
            unlink(whereis(mnesia_tm)),
            Res
    end.

%% Under a table write lock: subscribe the receiver to table events,
%% register the receiving node as a write target and fix the table.
%% Exits with {tab_copier_prepare, Tab, Reason} if the transaction
%% aborts.
prepare_copy(Pid, Tab, Storage) ->
    Trans =
        fun() ->
                mnesia:write_lock_table(Tab),
                mnesia_subscr:subscribe(Pid, {table, Tab}),
                update_where_to_write(Tab, node(Pid)),
                mnesia_lib:db_fixtable(Storage, Tab, true),
                ok
        end,
    case mnesia:transaction(Trans) of
        {'atomic', ok} ->
            ok;
        {aborted, Reason} ->
            exit({tab_copier_prepare, Tab, Reason})
    end.

%% Ask every current db node, and Node itself, to add Node to the
%% where_to_write list of Tab. Read-only tables are left alone.
update_where_to_write(Tab, Node) ->
    case val({Tab, access_mode}) of
        read_only ->
            ignore;
        read_write ->
            Current = val({current, db_nodes}),
            Ns =
                case lists:member(Node, Current) of
                    true -> Current;
                    false -> [Node | Current]
                end,
            update_where_to_write(Ns, Tab, Node)
    end.

%% Issue the update on each node in turn; the rpc results are ignored.
update_where_to_write(Nodes, Tab, AddNode) ->
    lists:foreach(
      fun(N) ->
              rpc:call(N, mnesia_controller, call,
                       [{update_where_to_write, [add, Tab, AddNode], self()}])
      end,
      Nodes).
%% Sender loop: wait for the receiver to ask for more and send the next
%% batch of packets. N is the packet count handed to send_packet/5,
%% DataState the current chunk or continuation state.
%% Restarts from the beginning with the plain record reader when the
%% receiver asks for the old protocol, and throws `receiver_died' when
%% the receiver's node reports copier_done.
send_more(Pid, N, Chunk, DataState, Tab, OldNode) ->
    receive
        {NewPid, more} ->
            case send_packet(N - 1, NewPid, Chunk, DataState, OldNode) of
                New when is_integer(New) ->
                    %% An integer means the end of the table was reached
                    New - 1;
                NewData ->
                    send_more(NewPid, ?MAX_NOPACKETS, Chunk, NewData, Tab, OldNode)
            end;
        {_NewPid, {old_protocol, Tab}} ->
            Storage = val({Tab, storage_type}),
            {Init, NewChunk} =
                reader_funcs(false, Tab, Storage, calc_nokeys(Storage, Tab)),
            send_more(Pid, 1, NewChunk, Init(), Tab, OldNode);

        {copier_done, Node} when Node == node(Pid)->
            verbose("Receiver of table ~p crashed on ~p (more)~n", [Tab, Node]),
            throw(receiver_died)
    end.

%% Return {InitFun, ChunkFun} used to traverse the table: either plain
%% record chunks via mnesia_lib or binary chunks via dets:bchunk/2,
%% depending on the first argument.
reader_funcs(false, Tab, Storage, KeysPerTransfer) ->
    {fun() -> mnesia_lib:db_init_chunk(Storage, Tab, KeysPerTransfer) end,
     fun(Cont) -> mnesia_lib:db_chunk(Storage, Cont) end};
reader_funcs(true, Tab, _Storage, _KeysPerTransfer) ->
    {fun() -> dets_bchunk(Tab, start) end,
     fun(Cont) -> dets_bchunk(Tab, Cont) end}.

%% Wrapper around dets:bchunk/2 that swaps the result pair into the
%% {Data, Cont} order used by send_packet/5.
dets_bchunk(Tab, Chunk) -> %% Arrg
    case dets:bchunk(Tab, Chunk) of
        {Cont, Data} -> {Data, Cont};
        Else -> Else
    end.

%% Send chunks to Pid until ?MAX_NOPACKETS packets have been sent or
%% the table is exhausted. Returns the packet counter N when the end of
%% the table was reached, otherwise the data state to continue from.
send_packet(N, Pid, _Chunk, '$end_of_table', OldNode) ->
    case OldNode of
        true -> ignore; %% Old nodes can't handle the new no_more
        false -> Pid ! {self(), no_more}
    end,
    N;
send_packet(N, Pid, Chunk, {[], Cont}, OldNode) ->
    %% Empty chunk, nothing to send; fetch the next one
    send_packet(N, Pid, Chunk, Chunk(Cont), OldNode);
send_packet(N, Pid, Chunk, {Recs, Cont}, OldNode) when N < ?MAX_NOPACKETS ->
    case OldNode of
        true -> Pid ! {self(), {more, [Recs]}}; %% Old nodes need a wrapping list
        false -> Pid ! {self(), {more, Recs}}
    end,
    send_packet(N+1, Pid, Chunk, Chunk(Cont), OldNode);
send_packet(_N, _Pid, _Chunk, DataState, _OldNode) ->
    DataState.
%% Sender side end of the copy. Under a table read lock: update and
%% block the table whereabouts for the receiving node, release the
%% table fix and the subscription, add the receiver to the checkpoint
%% bookkeeping, send the final {no_more, DatBin} message and wait for
%% the receiver's acknowledgement.
%% Returns the result of mnesia:transaction/1; on success that is
%% {atomic, no_more} or {atomic, receiver_died}.
finish_copy(Pid, Tab, Storage, RemoteS) ->
    RecNode = node(Pid),
    DatBin = dat2bin(Tab, Storage, RemoteS),
    Trans =
        fun() ->
                mnesia:read_lock_table(Tab),
                A = val({Tab, access_mode}),
                mnesia_controller:sync_and_block_table_whereabouts(Tab, RecNode, RemoteS, A),
                cleanup_tab_copier(Pid, Storage, Tab),
                mnesia_checkpoint:tm_add_copy(Tab, RecNode),
                Pid ! {self(), {no_more, DatBin}},
                receive
                    {Pid, no_more} -> % Don't bother about the spurious 'more' message
                        no_more;
                    {copier_done, Node} when Node == node(Pid)->
                        verbose("Tab receiver ~p crashed (more): ~p~n", [Tab, Node]),
                        receiver_died
                end
        end,
    mnesia:transaction(Trans).

%% Undo what prepare_copy/3 set up on the sender: release the table fix
%% and cancel the receiver's subscription.
cleanup_tab_copier(Pid, Storage, Tab) ->
    mnesia_lib:db_fixtable(Storage, Tab, false),
    mnesia_subscr:unsubscribe(Pid, {table, Tab}).

%% Content of the table's .DCD file as a binary, shipped only when both
%% the local and the remote copy are ram_copies; `nobin' in every other
%% case, including when the file cannot be read.
dat2bin(Tab, ram_copies, ram_copies) ->
    mnesia_lib:lock_table(Tab),
    Res = file:read_file(mnesia_lib:tab2dcd(Tab)),
    mnesia_lib:unlock_table(Tab),
    case Res of
        {ok, DatBin} -> DatBin;
        _ -> nobin
    end;
dat2bin(_Tab, _LocalS, _RemoteS) ->
    nobin.

%% React to an 'EXIT' message: exit with the same reason when the
%% exiting process is local, otherwise ignore it.
handle_exit(Pid, Reason) when node(Pid) == node() ->
    exit(Reason);
handle_exit(_Pid, _Reason) ->  %% Not from our node, this will be handled by
    ignore.                    %% mnesia_down soon.