%% ``Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
%%
%%     $Id: mnesia_recover.erl,v 1.1 2008/12/17 09:53:39 mikpe Exp $
%%
%% mnesia_recover: a registered gen_server that keeps track of
%% transaction decisions (outcomes such as committed/aborted/unclear).
%% Decisions are held in ETS tables (mnesia_decision plus a rotating set
%% of transient decision tables), logged to the transaction log
%% (latest_log) together with mnesia_up/mnesia_down events and master
%% node settings, exchanged with other nodes, and used to resolve the
%% outcome of transactions whose result is unclear (what_happened/3,
%% wait_for_decision/2). It also maintains the transaction id serial
%% counter (trans_tid_serial).
-module(mnesia_recover).

-behaviour(gen_server).

-export([
         allow_garb/0,
         call/1,
         connect_nodes/1,
         disconnect/1,
         dump_decision_tab/0,
         get_master_node_info/0,
         get_master_node_tables/0,
         get_master_nodes/1,
         get_mnesia_downs/0,
         has_mnesia_down/1,
         incr_trans_tid_serial/0,
         init/0,
         log_decision/1,
         log_master_nodes/3,
         log_mnesia_down/1,
         log_mnesia_up/1,
         mnesia_down/1,
         note_decision/2,
         note_log_decision/2,
         outcome/2,
         start/0,
         start_garb/0,
         still_pending/1,
         sync_trans_tid_serial/1,
         wait_for_decision/2,
         what_happened/3
        ]).

%% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3
        ]).


-include("mnesia.hrl").
-import(mnesia_lib, [set/2, verbose/2, error/2, fatal/2]).

%% Server state.
-record(state, {supervisor,        % pid of the process that called start/0 (linked via start_link)
                unclear_pid,       % From of the pending wait_for_decision call, if any
                unclear_decision,  % #decision{} that wait_for_decision is waiting for
                unclear_waitfor,   % nodes we still expect an answer from for that decision
                tm_queue_len = 0,  % mnesia_tm queue length recorded at the previous overload
                                   % check (0 unless it exceeded the threshold)
                initiated = false, % true once the 'init' call has been handled
                early_msgs = []    % calls/casts received before 'init', newest first
               }).

%%-define(DBG(F, A), mnesia:report_event(list_to_atom(lists:flatten(io_lib:format(F, A))))).
%%-define(DBG(F, A), io:format("DBG: " ++ F, A)).
%% A decision kept only in RAM, stored in one of the transient decision
%% tables (created with {keypos, 2}, i.e. keyed on tid).
-record(transient_decision, {tid, outcome}).

%% Start the locally registered mnesia_recover server, linked to the
%% caller. The caller's pid is passed to init/1 as the supervisor.
start() ->
    Options = [{timeout, infinity}
               %%, {debug, [trace]}
              ],
    gen_server:start_link({local, ?MODULE}, ?MODULE, [self()], Options).

%% Run the server's one-time initialisation synchronously.
init() ->
    call(init).

%% Install the periodic timers of the server: garbage collection of
%% transient decision tables every 2 minutes and an mnesia_tm overload
%% check every 10 seconds. Crashes (badmatch) if a timer cannot be set.
start_garb() ->
    Server = whereis(mnesia_recover),
    GarbInterval = timer:minutes(2),
    OverloadInterval = timer:seconds(10),
    {ok, _} = timer:send_interval(GarbInterval, Server, garb_decisions),
    {ok, _} = timer:send_interval(OverloadInterval, Server, check_overload).

%% Ask the server, asynchronously, to rotate the transient decision tables.
allow_garb() ->
    cast(allow_garb).

%% Rotate the transient decision tables.
%%
%% The transaction log has either been switched (latest -> previous) or
%% there was nothing to dump. The previous transaction log may therefore
%% only contain commit records for transactions noted in the two newest
%% 'previous' tables; all older tables may be removed by
%% do_garb_decisions/0 (every 2 minutes). At most 10 previous tables are
%% kept here, and any beyond that are deleted immediately.
do_allow_garb() ->
    %% Keep this exact order of steps: read both variables, create the
    %% fresh table, delete the overflow, update the 'previous' list and
    %% switch latest_transient_decision last.
    Latest = val(latest_transient_decision),
    Previous = val(previous_transient_decisions),
    Fresh = create_transient_decision(),
    Tabs = [Latest | Previous],
    Kept = lists:sublist(Tabs, 10),
    Overflow = lists:nthtail(length(Kept), Tabs),
    lists:foreach(fun(Tab) -> ?ets_delete_table(Tab) end, Overflow),
    set(previous_transient_decisions, Kept),
    set(latest_transient_decision, Fresh).

%% Garbage collect transient decision tables: keep only the two newest
%% 'previous' tables and delete the rest. Returns 'ignore' when there are
%% fewer than two previous tables.
do_garb_decisions() ->
    case val(previous_transient_decisions) of
        [Newest, SecondNewest | Stale] ->
            set(previous_transient_decisions, [Newest, SecondNewest]),
            [?ets_delete_table(Old) || Old <- Stale];
        _FewerThanTwo ->
            ignore
    end.
%% Try to bring the recovery service on Ns into use.
%%
%% Nodes already present in recover_nodes are skipped, and so is the
%% local node (mnesia_lib:search_delete/2 presumably removes it from Ns).
%% Protocol negotiation is done by mnesia_monitor. The nodes that agree
%% are registered by add_recover_nodes/1.
%% Returns {NewlyAgreedNodes, PreviouslyKnownNodes}, or [] for [].
connect_nodes([]) ->
    [];
connect_nodes(Ns) ->
    Known = val(recover_nodes),
    {_, Remote} = mnesia_lib:search_delete(node(), Ns),
    Candidates = Remote -- Known,
    Agreed = mnesia_monitor:negotiate_protocol(Candidates),
    add_recover_nodes(Agreed),
    {Agreed, Known}.

%% Register freshly negotiated nodes for transaction recovery:
%%  - add them to recover_nodes;
%%  - announce all our decisions to them;
%%  - unless master nodes are set for schema, let mnesia_monitor look for
%%    inconsistencies caused by a partitioned network.
%% Does nothing when no node agreed on a protocol.
add_recover_nodes([]) ->
    ignore;
add_recover_nodes(Agreed) ->
    mnesia_lib:add_list(recover_nodes, Agreed),
    cast({announce_all, Agreed}),
    case get_master_nodes(schema) of
        [] ->
            mnesia_monitor:detect_inconcistency(Agreed,
                                                starting_partitioned_network);
        _MasterNodes ->
            %% Master nodes are set: old inconsistencies are ignored
            ignore
    end.

%% Disconnect Node in mnesia_monitor and drop it from recover_nodes.
disconnect(Node) ->
    mnesia_monitor:disconnect(Node),
    mnesia_lib:del(recover_nodes, Node).

%% Read a mnesia_lib variable; a failed lookup is delegated to
%% mnesia_lib:other_val/2.
val(Var) ->
    case ?catch_val(Var) of
        {'EXIT', Why} ->
            mnesia_lib:other_val(Var, Why);
        Found ->
            Found
    end.

%% Synchronous call to the local server, with no timeout.
%% Returns {error, {node_not_running, node()}} when the server is not
%% registered; otherwise the server's reply. The caller is linked to the
%% server for the duration of the call.
call(Msg) ->
    case whereis(?MODULE) of
        undefined ->
            {error, {node_not_running, node()}};
        Server ->
            link(Server),
            Reply = gen_server:call(Server, Msg, infinity),
            unlink(Server),
            %% Flush an 'EXIT' message from the server if one has already
            %% arrived (it is only a message when the caller traps
            %% exits). The value of this receive is not used.
            receive
                {'EXIT', Server, _Why} ->
                    {error, {node_not_running, node()}}
            after 0 ->
                ignore
            end,
            Reply
    end.

%% Run call(Msg) on every node in Nodes via rpc:multicall/4.
multicall(Nodes, Msg) ->
    CallArgs = [Msg],
    rpc:multicall(Nodes, ?MODULE, call, CallArgs).

%% Asynchronous message to the local server; 'ignore' if not registered.
cast(Msg) ->
    case whereis(?MODULE) of
        undefined ->
            ignore;
        Server ->
            gen_server:cast(Server, Msg)
    end.

%% Asynchronous message to the server on each of Nodes.
abcast(Nodes, Msg) ->
    gen_server:abcast(Nodes, ?MODULE, Msg).

%% Store Outcome for Tid in the current (latest) transient decision table.
note_decision(Tid, Outcome) ->
    Latest = val(latest_transient_decision),
    Entry = #transient_decision{tid = Tid, outcome = Outcome},
    ?ets_insert(Latest, Entry).
%% Forget a previously recorded mnesia_down for Node.
%% Date and Time are not used.
note_up(Node, _Date, _Time) ->
    ?ets_delete(mnesia_decision, Node).

%% Record in mnesia_decision that Node went down at Date/Time.
%% The table uses keypos 2, so the entry is keyed on Node.
note_down(Node, Date, Time) ->
    ?ets_insert(mnesia_decision, {mnesia_down, Node, Date, Time}).

%% Store the master nodes of Tab in mnesia_decision.
%% An empty list removes any stored setting for Tab.
note_master_nodes(Tab, []) ->
    ?ets_delete(mnesia_decision, Tab);
note_master_nodes(Tab, Nodes) when is_list(Nodes) ->
    Master = {master_nodes, Tab, Nodes},
    ?ets_insert(mnesia_decision, Master).

%% Record decision D in RAM.
%% A decision with an empty disc_nodes list goes to the current transient
%% table (with its outcome normalised by filter_outcome/1), and any entry
%% for the same tid in mnesia_decision is removed. Otherwise the whole
%% decision is stored in mnesia_decision.
note_outcome(D) when D#decision.disc_nodes == [] ->
    note_decision(D#decision.tid, filter_outcome(D#decision.outcome)),
    ?ets_delete(mnesia_decision, D#decision.tid);
note_outcome(D) when D#decision.disc_nodes /= [] ->
    ?ets_insert(mnesia_decision, D).

%% Log a decision taken by this node.
%% A decision with a known outcome is first merged with what we already
%% know about the transaction and logged with DoTell = true, so the other
%% participants are told. An unclear decision is logged as is, without
%% telling anybody.
log_decision(D) when D#decision.outcome /= unclear ->
    OldD = decision(D#decision.tid),
    MergedD = merge_decisions(node(), OldD, D),
    do_log_decision(MergedD, true);
log_decision(D) ->
    do_log_decision(D, false).

%% Note decision D in RAM and, when a directory is used, append it to
%% latest_log.
%% For an aborted/committed outcome the local node is first removed from
%% disc_nodes. When a directory is used, DoTell is true and the outcome
%% is not unclear, the remaining disc nodes and the ram nodes are sent an
%% im_certain message.
do_log_decision(D, DoTell) ->
    RamNs = D#decision.ram_nodes,
    DiscNs = D#decision.disc_nodes -- [node()],
    Outcome = D#decision.outcome,
    D2 =
        case Outcome of
            aborted -> D#decision{disc_nodes = DiscNs};
            committed -> D#decision{disc_nodes = DiscNs};
            _ -> D
        end,
    note_outcome(D2),
    case mnesia_monitor:use_dir() of
        true ->
            mnesia_log:append(latest_log, D2),
            if
                DoTell == true, Outcome /= unclear ->
                    tell_im_certain(DiscNs, D2),
                    tell_im_certain(RamNs, D2);
                true ->
                    ignore
            end;
        false ->
            ignore
    end.

%% Tell Nodes (asynchronously) that we are certain about decision D.
tell_im_certain([], _D) ->
    ignore;
tell_im_certain(Nodes, D) ->
    Msg = {im_certain, node(), D},
    abcast(Nodes, Msg).

%% Synchronously have the server log that Node came up.
log_mnesia_up(Node) ->
    call({log_mnesia_up, Node}).

%% Synchronously have the server log that Node went down.
log_mnesia_down(Node) ->
    call({log_mnesia_down, Node}).
%% Nodes for which an mnesia_down is recorded in mnesia_decision.
get_mnesia_downs() ->
    Matches = ?ets_match_object(mnesia_decision, {mnesia_down, '_', '_', '_'}),
    [DownNode || {mnesia_down, DownNode, _, _} <- Matches].

%% true if an mnesia_down from Node is recorded, false if nothing is
%% stored under Node.
has_mnesia_down(Node) ->
    case ?ets_lookup(mnesia_decision, Node) of
        [] ->
            false;
        [{mnesia_down, Node, _, _}] ->
            true
    end.

%% Remove Node from recover_nodes and forward {mnesia_down, Node} to the
%% server. Ignored when recover_nodes is not set (server not started yet).
mnesia_down(Node) ->
    case ?catch_val(recover_nodes) of
        {'EXIT', _NotStarted} ->
            ignore;
        _Started ->
            mnesia_lib:del(recover_nodes, Node),
            cast({mnesia_down, Node})
    end.

%% Log master node settings; Args is a list of {Tab, Nodes}.
%%
%% - Mnesia running (IsRunning == yes): each setting goes through the
%%   server.
%% - Not running and no directory (UseDir == false): nothing to do.
%% - Otherwise: latest_log is opened here, and log_master_nodes2/4 closes
%%   it again. If disk_log reports a repair of a file that did not exist
%%   before, a transaction log header is written first.
%%
%% NOTE(review): when the file did not exist and disk_log:open/1 returns
%% {ok, _}, no header is written -- confirm that this is intended.
log_master_nodes(Args, UseDir, yes = IsRunning) ->
    log_master_nodes2(Args, UseDir, IsRunning, ok);
log_master_nodes(_Args, false, _IsRunning) ->
    ok;
log_master_nodes(Args, UseDir, IsRunning) ->
    LogName = latest_log,
    LogFile = mnesia_log:latest_log_file(),
    Existed = mnesia_lib:exists(LogFile),
    OpenArgs = [{file, LogFile},
                {name, LogName},
                {repair, mnesia:system_info(auto_repair)}],
    case disk_log:open(OpenArgs) of
        {ok, LogName} ->
            log_master_nodes2(Args, UseDir, IsRunning, ok);
        {repaired, LogName, {recovered, _}, {badbytes, _}}
          when Existed == false ->
            mnesia_log:write_trans_log_header(),
            log_master_nodes2(Args, UseDir, IsRunning, ok);
        {repaired, LogName, {recovered, _}, {badbytes, _}}
          when Existed == true ->
            log_master_nodes2(Args, UseDir, IsRunning, ok);
        {error, _Reason} = Error ->
            Error
    end.
%% Log each {Tab, Nodes} entry in turn. WorstRes carries the most recent
%% {error, Reason} seen (ok if none) and is returned at the end. When
%% Mnesia is not running, latest_log is closed once the list is done.
log_master_nodes2([], _UseDir, yes, WorstRes) ->
    WorstRes;
log_master_nodes2([], _UseDir, _IsRunning, WorstRes) ->
    disk_log:close(latest_log),
    WorstRes;
log_master_nodes2([{Tab, Nodes} | Rest], UseDir, IsRunning, WorstRes) ->
    Worst =
        case log_one_master_nodes(Tab, Nodes, UseDir, IsRunning) of
            ok -> WorstRes;
            {error, _} = Error -> Error
        end,
    log_master_nodes2(Rest, UseDir, IsRunning, Worst).

%% Log one master node setting. When running, this goes through the
%% server and mnesia_controller is notified afterwards.
log_one_master_nodes(Tab, Nodes, UseDir, yes = IsRunning) ->
    Reply = call({log_master_nodes, Tab, Nodes, UseDir, IsRunning}),
    mnesia_controller:master_nodes_updated(Tab, Nodes),
    Reply;
log_one_master_nodes(Tab, Nodes, UseDir, IsRunning) ->
    do_log_master_nodes(Tab, Nodes, UseDir, IsRunning).

%% All {master_nodes, Tab, Nodes} entries of mnesia_decision, or [] if
%% the match raises an exception.
get_master_node_info() ->
    Pattern = {master_nodes, '_', '_'},
    case catch mnesia_lib:db_match_object(ram_copies, mnesia_decision, Pattern) of
        {'EXIT', _} -> [];
        Entries -> Entries
    end.

%% Tables that have master nodes set.
get_master_node_tables() ->
    [Tab || {master_nodes, Tab, _} <- get_master_node_info()].

%% Master nodes stored for Tab, or [] if the lookup raises an exception.
get_master_nodes(Tab) ->
    case catch ?ets_lookup_element(mnesia_decision, Tab, 3) of
        {'EXIT', _} -> [];
        Masters -> Masters
    end.

%% Determine what happened to transaction Tid among Nodes.
%% If the local node is one of Nodes it is asked first. Only when its
%% answer is unclear are the other nodes asked.
what_happened(Tid, Protocol, Nodes) ->
    Default = default_outcome(Protocol),
    Local = node(),
    case lists:member(Local, Nodes) of
        false ->
            what_happened_remotely(Tid, Default, Nodes);
        true ->
            {ok, LocalOutcome} = call({what_happened, Default, Tid}),
            case filter_outcome(LocalOutcome) of
                unclear ->
                    what_happened_remotely(Tid, Default, Nodes -- [Local]);
                aborted ->
                    aborted;
                committed ->
                    committed
            end
    end.

%% Outcome assumed when nothing is known: asym_trans presumes abort;
%% sym_trans and sync_sym_trans have no default (unclear).
default_outcome(asym_trans) -> aborted;
default_outcome(_Protocol) -> unclear.

%% Ask the servers on Nodes and combine their answers.
what_happened_remotely(Tid, Default, Nodes) ->
    {Replies, _BadNodes} = multicall(Nodes, {what_happened, Default, Tid}),
    check_what_happened(Replies, 0, 0).
%% Combine the replies collected by what_happened_remotely/3.
%% Returns 'committed' only if at least one node committed and none
%% aborted. In every other case -- any abort, or no definite answer at
%% all -- the result is 'aborted'. Error and badrpc replies count as
%% unclear.
check_what_happened([], 0, Commits) when Commits > 0 ->
    committed;
check_what_happened([], _Aborts, _Commits) ->
    aborted;
check_what_happened([Reply | Rest], Aborts, Commits) ->
    Verdict =
        case Reply of
            {ok, Outcome} -> filter_outcome(Outcome);
            {error, _} -> unclear;
            {badrpc, _} -> unclear
        end,
    case Verdict of
        committed -> check_what_happened(Rest, Aborts, Commits + 1);
        aborted -> check_what_happened(Rest, Aborts + 1, Commits);
        unclear -> check_what_happened(Rest, Aborts, Commits)
    end.

%% Determine the outcome of a transaction, possibly waiting forever.
%% Returns {Tid, Outcome}.
%% - presume_commit (sym_trans) is committed at once.
%% - For presume_abort (asym_trans), a known outcome is returned
%%   directly. An unclear one is either handed to the server at startup
%%   (which may wait for other nodes), or polled again every 200 ms
%%   otherwise.
wait_for_decision(presume_commit, _InitBy) ->
    {{presume_commit, self()}, committed};
wait_for_decision(D, InitBy) when D#decision.outcome == presume_abort ->
    Tid = D#decision.tid,
    case {filter_outcome(outcome(Tid, D#decision.outcome)), InitBy} of
        {unclear, startup} ->
            {ok, Res} = call({wait_for_decision, D}),
            {Tid, Res};
        {unclear, _} ->
            %% Give active transactions time to finish, then retry
            timer:sleep(200),
            wait_for_decision(D, InitBy);
        {Decided, _} ->
            {Tid, Decided}
    end.

%% The Tids whose outcome is still unclear, in their original order.
still_pending(Tids) ->
    [Tid || Tid <- Tids, filter_outcome(outcome(Tid, unclear)) == unclear].

%% Load every item of the decision table file into RAM.
load_decision_tab() ->
    Start = mnesia_log:open_decision_tab(),
    load_decision_tab(Start, load_decision_tab),
    mnesia_log:close_decision_tab().
%% Feed each chunk of the decision table file to note_log_decisions/2.
load_decision_tab(eof, _InitBy) ->
    ok;
load_decision_tab(Cont, InitBy) ->
    case mnesia_log:chunk_decision_tab(Cont) of
        eof ->
            ok;
        {Next, Items} ->
            note_log_decisions(Items, InitBy),
            load_decision_tab(Next, InitBy)
    end.

%% Convert old-style decision logs (DECISION.LOG / PDECISION.LOG).
%% If either the previous or the latest decision log file exists:
%%  - open the decision log and dump it twice (InitBy = startup);
%%  - close it;
%%  - delete the latest decision log file.
%% From then on all decisions are logged in the transaction log.
convert_old() ->
    HavePrevious = mnesia_lib:exists(mnesia_log:previous_decision_log_file()),
    HaveLatest = mnesia_lib:exists(mnesia_log:decision_log_file()),
    case HavePrevious or HaveLatest of
        false ->
            ignore;
        true ->
            mnesia_log:open_decision_log(),
            dump_decision_log(startup),
            dump_decision_log(startup),
            mnesia_log:close_decision_log(),
            ok = file:delete(mnesia_log:decision_log_file())
    end.

%% Dump the decision log. Expected to run in the transaction log dumper
%% process.
dump_decision_log(InitBy) ->
    perform_dump_decision_log(mnesia_log:prepare_decision_log_dump(), InitBy).

%% Only a startup dump reads the log contents; any other dump confirms
%% straight away.
perform_dump_decision_log(eof, _InitBy) ->
    confirm_decision_log_dump();
perform_dump_decision_log(Cont, startup) ->
    case mnesia_log:chunk_decision_log(Cont) of
        eof ->
            confirm_decision_log_dump();
        {Next, Items} ->
            note_log_decisions(Items, startup),
            perform_dump_decision_log(Next, startup)
    end;
perform_dump_decision_log(_Cont, _InitBy) ->
    confirm_decision_log_dump().

%% Save the decision table, then confirm the decision log dump.
confirm_decision_log_dump() ->
    dump_decision_tab(),
    mnesia_log:confirm_decision_log_dump().

%% Save the whole mnesia_decision table as a {decision_list, Items} term.
dump_decision_tab() ->
    Everything = mnesia_lib:db_match_object(ram_copies, mnesia_decision, '_'),
    mnesia_log:save_decision_tab({decision_list, Everything}).

%% Note each item of a log chunk in order. Returns ok.
note_log_decisions(Items, InitBy) ->
    lists:foreach(fun(Item) -> note_log_decision(Item, InitBy) end, Items).
%% Note one item read from a decision log, decision table or transaction
%% log. InitBy is 'startup' when called during startup recovery.
%% Unknown items cause an exit.

%% pre_commit is not a final outcome: treat it as unclear.
note_log_decision(NewD, InitBy) when NewD#decision.outcome == pre_commit ->
    note_log_decision(NewD#decision{outcome = unclear}, InitBy);

%% Merge a logged decision with what we already know and note the result.
note_log_decision(NewD, _InitBy) when is_record(NewD, decision) ->
    Tid = NewD#decision.tid,
    sync_trans_tid_serial(Tid),
    OldD = decision(Tid),
    MergedD = merge_decisions(node(), OldD, NewD),
    note_outcome(MergedD);

%% Logged serial counters are ignored during startup.
note_log_decision({trans_tid, serial, _Serial}, startup) ->
    ignore;

note_log_decision({trans_tid, serial, Serial}, _InitBy) ->
    sync_trans_tid_serial(Serial);

note_log_decision({mnesia_up, Node, Date, Time}, _InitBy) ->
    note_up(Node, Date, Time);

note_log_decision({mnesia_down, Node, Date, Time}, _InitBy) ->
    note_down(Node, Date, Time);

note_log_decision({master_nodes, Tab, Nodes}, _InitBy) ->
    note_master_nodes(Tab, Nodes);

%% Decision log header: accept our own version and the old "2.0" format.
note_log_decision(H, _InitBy) when H#log_header.log_kind == decision_log ->
    V = mnesia_log:decision_log_version(),
    if
        H#log_header.log_version == V ->
            ok;
        H#log_header.log_version == "2.0" ->
            %% Report the version found in the log, not our own version
            verbose("Accepting an old version format of decision log: ~p~n",
                    [H#log_header.log_version]),
            ok;
        true ->
            fatal("Bad version of decision log: ~p~n", [H])
    end;

%% Decision table header: only our own version is accepted.
note_log_decision(H, _InitBy) when H#log_header.log_kind == decision_tab ->
    V = mnesia_log:decision_tab_version(),
    if
        V == H#log_header.log_version ->
            ok;
        true ->
            fatal("Bad version of decision tab: ~p~n", [H])
    end;
note_log_decision({decision_list, ItemList}, InitBy) ->
    note_log_decisions(ItemList, InitBy);
note_log_decision(BadItem, InitBy) ->
    exit({"Bad decision log item", BadItem, InitBy}).

%% Current transaction id serial, read from the {trans_tid, serial, N}
%% entry (key 'serial') in mnesia_decision.
trans_tid_serial() ->
    ?ets_lookup_element(mnesia_decision, serial, 3).

%% Overwrite the transaction id serial with Val.
set_trans_tid_serial(Val) ->
    ?ets_insert(mnesia_decision, {trans_tid, serial, Val}).

%% Increment the transaction id serial by one. This presumably returns
%% the new value; ?ets_update_counter is defined in mnesia.hrl.
incr_trans_tid_serial() ->
    ?ets_update_counter(mnesia_decision, serial, 1).
%% Lamport-clock style synchronisation of the transaction id serial.
%% Takes either a counter value or a #tid{}; if the other counter is
%% ahead of ours, ours is set to one past it. Otherwise 'ignore' is
%% returned.
sync_trans_tid_serial(ThatCounter) when is_integer(ThatCounter) ->
    ThisCounter = trans_tid_serial(),
    if
        ThatCounter > ThisCounter ->
            set_trans_tid_serial(ThatCounter + 1);
        true ->
            ignore
    end;
sync_trans_tid_serial(Tid) ->
    sync_trans_tid_serial(Tid#tid.counter).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Callback functions from gen_server

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State}          |
%%          {ok, State, Timeout} |
%%          {stop, Reason}
%%----------------------------------------------------------------------
%% Trap exits so that an exit of Parent (the supervisor) arrives as a
%% message. Then create the first transient decision table and reset the
%% recovery variables. The heavy initialisation is done later by the
%% 'init' call.
init([Parent]) ->
    process_flag(trap_exit, true),
    mnesia_lib:verbose("~p starting: ~p~n", [?MODULE, self()]),
    set(latest_transient_decision, create_transient_decision()),
    set(previous_transient_decisions, []),
    set(recover_nodes, []),
    State = #state{supervisor = Parent},
    {ok, State}.

%% Create a new public set table keyed on element 2 (the tid). The table
%% is not created with named_table, so every call yields a distinct
%% table.
create_transient_decision() ->
    ?ets_new_table(mnesia_transient_decision, [{keypos, 2}, set, public]).
%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State}          |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%----------------------------------------------------------------------

%% One-time initialisation:
%%  1. Create the named mnesia_decision table and reset the serial.
%%  2. When a directory is used, also load the decision table file (if
%%     present), convert old decision logs and scan the transaction log
%%     for decisions.
%%  3. Replay the messages buffered so far; handle_early_msgs/2 replies
%%     'ok' to the caller of init.
handle_call(init, From, State) when State#state.initiated == false ->
    Args = [{keypos, 2}, set, public, named_table],
    case mnesia_monitor:use_dir() of
        true ->
            ?ets_new_table(mnesia_decision, Args),
            set_trans_tid_serial(0),
            TabFile = mnesia_log:decision_tab_file(),
            case mnesia_lib:exists(TabFile) of
                true ->
                    load_decision_tab();
                false ->
                    ignore
            end,
            convert_old(),
            mnesia_dumper:opt_dump_log(scan_decisions);
        false ->
            ?ets_new_table(mnesia_decision, Args),
            set_trans_tid_serial(0)
    end,
    handle_early_msgs(State, From);

handle_call(Msg, From, State) when State#state.initiated == false ->
    %% Buffer early messages (newest first); they are replayed after init
    Msgs = State#state.early_msgs,
    {noreply, State#state{early_msgs = [{call, Msg, From} | Msgs]}};

%% Local view of a transaction's outcome, Default if unknown.
handle_call({what_happened, Default, Tid}, _From, State) ->
    sync_trans_tid_serial(Tid),
    Outcome = outcome(Tid, Default),
    {reply, {ok, Outcome}, State};

%% Wait for the outcome of an unclear transaction D.
%% If no other ram node (among the recover nodes) and no other disc node
%% is involved, the answer is 'aborted' at once. Otherwise:
%%  - the reachable ram and disc nodes are asked (what_decision);
%%  - unless max_wait_for_decision is infinity, a force_decision timer
%%    message is scheduled;
%%  - the reply is postponed. add_remote_decision/3 sends it once the
%%    outcome is known.
handle_call({wait_for_decision, D}, From, State) ->
    Recov = val(recover_nodes),
    AliveRam = (mnesia_lib:intersect(D#decision.ram_nodes, Recov) -- [node()]),
    RemoteDisc = D#decision.disc_nodes -- [node()],
    if
        AliveRam == [], RemoteDisc == [] ->
            %% Nobody else to wait for, so we may safely abort
            {reply, {ok, aborted}, State};
        true ->
            verbose("Transaction ~p is unclear. "
                    "Wait for disc nodes: ~w ram: ~w~n",
                    [D#decision.tid, RemoteDisc, AliveRam]),
            AliveDisc = mnesia_lib:intersect(RemoteDisc, Recov),
            Msg = {what_decision, node(), D},
            abcast(AliveRam, Msg),
            abcast(AliveDisc, Msg),
            case val(max_wait_for_decision) of
                infinity ->
                    ignore;
                MaxWait ->
                    %% Sent to ourselves; handled in handle_info/2
                    ForceMsg = {force_decision, D#decision.tid},
                    {ok, _} = timer:send_after(MaxWait, ForceMsg)
            end,
            %% All remote disc nodes are waited for, not only the
            %% reachable ones
            State2 = State#state{unclear_pid = From,
                                 unclear_decision = D,
                                 unclear_waitfor = (RemoteDisc ++ AliveRam)},
            {noreply, State2}
    end;

handle_call({log_mnesia_up, Node}, _From, State) ->
    do_log_mnesia_up(Node),
    {reply, ok, State};

handle_call({log_mnesia_down, Node}, _From, State) ->
    do_log_mnesia_down(Node),
    {reply, ok, State};

%% NOTE(review): the result of do_log_master_nodes/4 is discarded and
%% 'ok' is always replied, so log_master_nodes2/4 never sees a logging
%% error while Mnesia is running -- confirm this is intended.
handle_call({log_master_nodes, Tab, Nodes, UseDir, IsRunning}, _From, State) ->
    do_log_master_nodes(Tab, Nodes, UseDir, IsRunning),
    {reply, ok, State};

%% Unexpected calls are logged and never answered (the caller keeps
%% waiting).
handle_call(Msg, _From, State) ->
    error("~p got unexpected call: ~p~n", [?MODULE, Msg]),
    {noreply, State}.

%% Log that Node came up (appended to latest_log and synced when a
%% directory is used), then remove its mnesia_down entry from RAM.
do_log_mnesia_up(Node) ->
    Yoyo = {mnesia_up, Node, Date = date(), Time = time()},
    case mnesia_monitor:use_dir() of
        true ->
            mnesia_log:append(latest_log, Yoyo),
            disk_log:sync(latest_log);
        false ->
            ignore
    end,
    note_up(Node, Date, Time).

%% Log that Node went down (appended to latest_log and synced when a
%% directory is used), then record it in RAM.
do_log_mnesia_down(Node) ->
    Yoyo = {mnesia_down, Node, Date = date(), Time = time()},
    case mnesia_monitor:use_dir() of
        true ->
            mnesia_log:append(latest_log, Yoyo),
            disk_log:sync(latest_log);
        false ->
            ignore
    end,
    note_down(Node, Date, Time).
%% Log one master node setting.
%% When UseDir is true, {master_nodes, Tab, Nodes} is appended to
%% latest_log and the log is synced. When IsRunning == yes, the setting
%% is also noted in mnesia_decision.
%% Returns the mnesia_log:append/2 result, or ok when UseDir is false.
do_log_master_nodes(Tab, Nodes, UseDir, IsRunning) ->
    Master = {master_nodes, Tab, Nodes},
    Res =
        case UseDir of
            true ->
                LogRes = mnesia_log:append(latest_log, Master),
                disk_log:sync(latest_log),
                LogRes;
            false ->
                ok
        end,
    case IsRunning of
        yes ->
            note_master_nodes(Tab, Nodes);
        _NotRunning ->
            ignore
    end,
    Res.

%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------

%% Casts received before the 'init' call are buffered (newest first) and
%% replayed by handle_early_msgs/2.
handle_cast(Msg, State) when State#state.initiated == false ->
    Msgs = State#state.early_msgs,
    {noreply, State#state{early_msgs = [{cast, Msg} | Msgs]}};

%% Node is certain about a transaction: merge with our view and log the
%% result, without telling anyone.
handle_cast({im_certain, Node, NewD}, State) ->
    OldD = decision(NewD#decision.tid),
    MergedD = merge_decisions(Node, OldD, NewD),
    do_log_decision(MergedD, false),
    {noreply, State};

handle_cast(allow_garb, State) ->
    do_allow_garb(),
    {noreply, State};

%% Node announced its decisions: it becomes a recover node, and the
%% decisions are merged into ours.
handle_cast({decisions, Node, Decisions}, State) ->
    mnesia_lib:add(recover_nodes, Node),
    State2 = add_remote_decisions(Node, Decisions, State),
    {noreply, State2};

%% Node asks what we know about a transaction. We answer with our own
%% decision, or echo Node's decision back if we know nothing.
handle_cast({what_decision, Node, OtherD}, State) ->
    Tid = OtherD#decision.tid,
    sync_trans_tid_serial(Tid),
    Decision =
        case decision(Tid) of
            no_decision -> OtherD;
            MyD when is_record(MyD, decision) -> MyD
        end,
    announce([Node], [Decision], [], true),
    {noreply, State};

%% If Node is a ram node of the transaction we are waiting for, our own
%% pending decision is fed through add_remote_decision/3 on its behalf,
%% which removes Node from the nodes we wait for.
handle_cast({mnesia_down, Node}, State) ->
    case State#state.unclear_decision of
        undefined ->
            {noreply, State};
        D ->
            case lists:member(Node, D#decision.ram_nodes) of
                false ->
                    {noreply, State};
                true ->
                    State2 = add_remote_decision(Node, D, State),
                    {noreply, State2}
            end
    end;

handle_cast({announce_all, Nodes}, State) ->
    announce_all(Nodes, tabs()),
    {noreply, State};

handle_cast(Msg, State) ->
    error("~p got unexpected cast: ~p~n", [?MODULE, Msg]),
    {noreply, State}.

%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------

%% Info messages are not buffered before the 'init' call (there is no
%% clause for initiated == false); they are handled immediately.

handle_info(check_overload, S) ->
    %% Time to check if mnesia_tm is overloaded
    case whereis(mnesia_tm) of
        Pid when is_pid(Pid) ->
            %% mnesia_tm may terminate between whereis/1 and
            %% process_info/2, in which case process_info/2 returns
            %% undefined. Skip this check then instead of crashing.
            case process_info(Pid, message_queue_len) of
                {message_queue_len, Len} ->
                    check_tm_queue_len(Len, S);
                undefined ->
                    {noreply, S}
            end;
        undefined ->
            {noreply, S}
    end;

handle_info(garb_decisions, State) ->
    do_garb_decisions(),
    {noreply, State};

handle_info({force_decision, Tid}, State) ->
    %% Enforce a transaction recovery decision,
    %% if we are still waiting for the outcome
    case State#state.unclear_decision of
        U when U#decision.tid == Tid ->
            verbose("Decided to abort transaction ~p since "
                    "max_wait_for_decision has been exceeded~n",
                    [Tid]),
            D = U#decision{outcome = aborted},
            State2 = add_remote_decision(node(), D, State),
            {noreply, State2};
        _ ->
            {noreply, State}
    end;

handle_info({'EXIT', Pid, R}, State) when Pid == State#state.supervisor ->
    mnesia_lib:dbg_out("~p was ~p~n",[?MODULE, R]),
    {stop, shutdown, State};

handle_info(Msg, State) ->
    error("~p got unexpected info: ~p~n", [?MODULE, Msg]),
    {noreply, State}.

%% Report an mnesia_overload system event when the mnesia_tm message
%% queue exceeded the threshold at both this and the previous check.
%% The length is remembered only while it is above the threshold.
check_tm_queue_len(Len, S) ->
    Threshold = 100,
    Prev = S#state.tm_queue_len,
    if
        Len > Threshold, Prev > Threshold ->
            What = {mnesia_tm, message_queue_len, [Prev, Len]},
            mnesia_lib:report_system_event({mnesia_overload, What}),
            {noreply, S#state{tm_queue_len = 0}};
        Len > Threshold ->
            {noreply, S#state{tm_queue_len = Len}};
        true ->
            {noreply, S#state{tm_queue_len = 0}}
    end.

%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------

terminate(Reason, State) ->
    mnesia_monitor:terminate_proc(?MODULE, Reason, State).

%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Upgrade process when its code is to be changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

%% Mark the server as initiated, replay the buffered messages and reply
%% 'ok' to the caller of the 'init' call. Returns the callback result
%% produced by the replay.
handle_early_msgs(State, From) ->
    Res = do_handle_early_msgs(State#state.early_msgs,
                               State#state{early_msgs = [],
                                           initiated = true}),
    gen_server:reply(From, ok),
    Res.

%% Replay buffered messages oldest first. The list is stored newest
%% first, so the tail is replayed before the head. A stop result aborts
%% the replay.
do_handle_early_msgs([Msg | Msgs], State) ->
    case do_handle_early_msgs(Msgs, State) of
        {stop, Reason, Reply, State2} ->
            {stop, Reason, Reply, State2};
        {stop, Reason, State2} ->
            {stop, Reason, State2};
        {noreply, State2} ->
            handle_early_msg(Msg, State2)
    end;

do_handle_early_msgs([], State) ->
    {noreply, State}.
%% Replay one buffered message through the regular callback. A buffered
%% call that yields {reply, R, S} is answered here with
%% gen_server:reply/2, because the original caller was never replied to.
handle_early_msg({call, Msg, From}, State) ->
    case handle_call(Msg, From, State) of
        {reply, R, S} ->
            gen_server:reply(From, R),
            {noreply, S};
        Other ->
            Other
    end;
handle_early_msg({cast, Msg}, State) ->
    handle_cast(Msg, State);
handle_early_msg({info, Msg}, State) ->
    handle_info(Msg, State).

%% All tables that may hold decisions, ordered by hit probability.
%% Both variables are read so that no transaction is missed even if the
%% tables are switched concurrently.
tabs() ->
    Curr = val(latest_transient_decision),
    Prev = val(previous_transient_decisions),
    [Curr, mnesia_decision | Prev].

%% Our decision about Tid as a #decision{}, or no_decision.
decision(Tid) ->
    decision(Tid, tabs()).

%% Search the tables in order. A transient entry is converted to a
%% #decision{} with empty node lists. A table that raises an exception
%% (e.g. a transient table deleted in the meantime) is skipped.
decision(Tid, [Tab | Tabs]) ->
    case catch ?ets_lookup(Tab, Tid) of
        [D] when is_record(D, decision) ->
            D;
        [C] when is_record(C, transient_decision) ->
            #decision{tid = C#transient_decision.tid,
                      outcome = C#transient_decision.outcome,
                      disc_nodes = [],
                      ram_nodes = []
                     };
        [] ->
            decision(Tid, Tabs);
        {'EXIT', _} ->
            %% Recently switched transient decision table
            decision(Tid, Tabs)
    end;
decision(_Tid, []) ->
    no_decision.

%% Raw outcome of Tid (element 3 of a #decision{} or
%% #transient_decision{}), or Default if no table has it.
outcome(Tid, Default) ->
    outcome(Tid, Default, tabs()).

outcome(Tid, Default, [Tab | Tabs]) ->
    case catch ?ets_lookup_element(Tab, Tid, 3) of
        {'EXIT', _} ->
            outcome(Tid, Default, Tabs);
        Val ->
            Val
    end;
outcome(_Tid, Default, []) ->
    Default.

%% Map a raw outcome to unclear | aborted | committed.
%% presume_abort counts as aborted; pre_commit counts as unclear.
%% Any other value crashes with case_clause.
filter_outcome(Val) ->
    case Val of
        unclear -> unclear;
        aborted -> aborted;
        presume_abort -> aborted;
        committed -> committed;
        pre_commit -> unclear
    end.

%% Turn a presume_abort decision into an aborted one.
filter_aborted(D) when D#decision.outcome == presume_abort ->
    D#decision{outcome = aborted};
filter_aborted(D) ->
    D.
%% Merge our decision D (or no_decision) about a transaction with a
%% (probably remote) decision NewD0 reported by Node.
%%
%% presume_abort counts as aborted on both sides (filter_aborted/1).
%% If we knew nothing, NewD is used; its disc_nodes are cleared when it
%% came from another node. Otherwise node() and Node are removed from our
%% disc_nodes, and the outcomes are combined:
%%   unclear   + unclear   -> D unchanged
%%   same outcome          -> ours
%%   committed + aborted   -> aborted, plus an inconsistent_database event
%%   aborted on either side-> aborted
%%   committed + unclear   -> committed (in either order)
merge_decisions(Node, D, NewD0) ->
    NewD = filter_aborted(NewD0),
    if
        D == no_decision, node() /= Node ->
            %% We did not know anything about this txn
            NewD#decision{disc_nodes = []};
        D == no_decision ->
            NewD;
        is_record(D, decision) ->
            DiscNs = D#decision.disc_nodes -- ([node(), Node]),
            OldD = filter_aborted(D#decision{disc_nodes = DiscNs}),
            if
                OldD#decision.outcome == unclear,
                NewD#decision.outcome == unclear ->
                    D;

                OldD#decision.outcome == NewD#decision.outcome ->
                    %% We have come to the same decision
                    OldD;

                OldD#decision.outcome == committed,
                NewD#decision.outcome == aborted ->
                    %% Interesting! We have already committed, but someone
                    %% else has aborted, so now we have an inconsistency.
                    %% The other node (or someone else) has enforced a
                    %% recovery decision after max_wait_for_decision was
                    %% exceeded. We pretend that we obeyed the forced
                    %% decision, but also generate an event in case the
                    %% application wants to do something clever.
                    Msg = {inconsistent_database, bad_decision, Node},
                    mnesia_lib:report_system_event(Msg),
                    OldD#decision{outcome = aborted};

                OldD#decision.outcome == aborted ->
                    %% aborted overrides anything
                    OldD#decision{outcome = aborted};

                NewD#decision.outcome == aborted ->
                    %% aborted overrides anything
                    OldD#decision{outcome = aborted};

                OldD#decision.outcome == committed,
                NewD#decision.outcome == unclear ->
                    %% committed overrides unclear
                    OldD#decision{outcome = committed};

                OldD#decision.outcome == unclear,
                NewD#decision.outcome == committed ->
                    %% committed overrides unclear
                    OldD#decision{outcome = committed}
            end
    end.
%% Merge the items announced by Node into our decisions and return the
%% (possibly updated) server state.
%% - Decisions (full or transient) go through add_remote_decision/3.
%% - mnesia_down items are skipped.
%% - A serial item synchronises our transaction id serial. If we are
%%   waiting for an unclear decision, Node is then asked about it --
%%   unless it is one of that decision's ram nodes.
add_remote_decisions(Node, [D | Tail], State) when is_record(D, decision) ->
    State2 = add_remote_decision(Node, D, State),
    add_remote_decisions(Node, Tail, State2);

add_remote_decisions(Node, [C | Tail], State)
  when is_record(C, transient_decision) ->
    D = #decision{tid = C#transient_decision.tid,
                  outcome = C#transient_decision.outcome,
                  disc_nodes = [],
                  ram_nodes = []},
    State2 = add_remote_decision(Node, D, State),
    add_remote_decisions(Node, Tail, State2);

add_remote_decisions(Node, [{mnesia_down, _, _, _} | Tail], State) ->
    add_remote_decisions(Node, Tail, State);

add_remote_decisions(Node, [{trans_tid, serial, Serial} | Tail], State) ->
    sync_trans_tid_serial(Serial),
    case State#state.unclear_decision of
        undefined ->
            ignore;
        D ->
            case lists:member(Node, D#decision.ram_nodes) of
                true ->
                    ignore;
                false ->
                    abcast([Node], {what_decision, node(), D})
            end
    end,
    add_remote_decisions(Node, Tail, State);

add_remote_decisions(_Node, [], State) ->
    State.
%% Merge decision NewD, reported by Node, with ours, log the result and
%% update the bookkeeping of a pending wait_for_decision call.
%% NewD may also be generated locally: see the force_decision and
%% mnesia_down handling.
%%
%% If we already had a decision and the merged outcome is not unclear,
%% Node is told that we are certain -- but only when the local node is
%% one of NewD's disc or ram nodes.
%%
%% If this is the transaction a wait_for_decision call is waiting for,
%% Node is removed from the wait list, and then:
%%  - if the outcome is certain, the waiting caller gets it;
%%  - if it is still unclear and nobody is left to wait for, the
%%    transaction is aborted, all involved nodes are told, and the
%%    waiting caller gets 'aborted';
%%  - otherwise we keep waiting.
%% Returns the (possibly) updated state.
add_remote_decision(Node, NewD, State) ->
    Tid = NewD#decision.tid,
    OldD = decision(Tid),
    D = merge_decisions(Node, OldD, NewD),
    do_log_decision(D, false),
    Outcome = D#decision.outcome,
    if
        OldD == no_decision ->
            ignore;
        Outcome == unclear ->
            ignore;
        true ->
            case lists:member(node(), NewD#decision.disc_nodes) or
                 lists:member(node(), NewD#decision.ram_nodes) of
                true ->
                    tell_im_certain([Node], D);
                false ->
                    ignore
            end
    end,
    case State#state.unclear_decision of
        U when U#decision.tid == Tid ->
            WaitFor = State#state.unclear_waitfor -- [Node],
            if
                Outcome == unclear, WaitFor == [] ->
                    %% Everybody is uncertain; let's abort
                    NewOutcome = aborted,
                    CertainD = D#decision{outcome = NewOutcome,
                                          disc_nodes = [],
                                          ram_nodes = []},
                    tell_im_certain(D#decision.disc_nodes, CertainD),
                    tell_im_certain(D#decision.ram_nodes, CertainD),
                    do_log_decision(CertainD, false),
                    verbose("Decided to abort transaction ~p "
                            "since everybody are uncertain ~p~n",
                            [Tid, CertainD]),
                    gen_server:reply(State#state.unclear_pid, {ok, NewOutcome}),
                    State#state{unclear_pid = undefined,
                                unclear_decision = undefined,
                                unclear_waitfor = undefined};
                Outcome /= unclear ->
                    %% A certain outcome: answer the waiting caller
                    verbose("~p told us that transaction ~p was ~p~n",
                            [Node, Tid, Outcome]),
                    gen_server:reply(State#state.unclear_pid, {ok, Outcome}),
                    State#state{unclear_pid = undefined,
                                unclear_decision = undefined,
                                unclear_waitfor = undefined};
                Outcome == unclear ->
                    %% Still unclear, but others remain to be heard from
                    State#state{unclear_waitfor = WaitFor}
            end;
        _ ->
            State
    end.
%% Announce the contents of the decision tables Tabs to ToNodes.
%% A table that cannot be read (e.g. one removed by 'garb_decisions' in
%% the meantime) is skipped.
announce_all([], _Tabs) ->
    ok;
announce_all(ToNodes, [Tab | Tabs]) ->
    case catch mnesia_lib:db_match_object(ram_copies, Tab, '_') of
        {'EXIT', _} ->
            %% Oops, we are in the middle of a 'garb_decisions'
            announce_all(ToNodes, Tabs);
        List ->
            announce(ToNodes, List, [], false),
            announce_all(ToNodes, Tabs)
    end;
announce_all(_ToNodes, []) ->
    ok.

%% Sort the items into one batch per destination node (see arrange/4),
%% then send each batch as a single {decisions, node(), Items} message.
%% When ForceSend is true, full decisions go to every node in ToNodes.
announce(ToNodes, [Head | Tail], Acc, ForceSend) ->
    Acc2 = arrange(ToNodes, Head, Acc, ForceSend),
    announce(ToNodes, Tail, Acc2, ForceSend);

announce(_ToNodes, [], Acc, _ForceSend) ->
    send_decisions(Acc).

%% Send one {decisions, node(), Decisions} cast per {Node, Decisions}.
send_decisions([{Node, Decisions} | Tail]) ->
    abcast([Node], {decisions, node(), Decisions}),
    send_decisions(Tail);
send_decisions([]) ->
    ok.

%% Add one item to the batch of each node in ToNodes that should get it:
%%  - a full decision: when ForceSend is true or the node takes part in
%%    the transaction (disc or ram node);
%%  - transient decisions and the serial counter: every node;
%%  - mnesia_down and master_nodes entries: never sent.
arrange([To | ToNodes], D, Acc, ForceSend) when is_record(D, decision) ->
    NeedsAdd = (ForceSend or
                lists:member(To, D#decision.disc_nodes) or
                lists:member(To, D#decision.ram_nodes)),
    case NeedsAdd of
        true ->
            Acc2 = add_decision(To, D, Acc),
            arrange(ToNodes, D, Acc2, ForceSend);
        false ->
            arrange(ToNodes, D, Acc, ForceSend)
    end;

arrange([To | ToNodes], C, Acc, ForceSend)
  when is_record(C, transient_decision) ->
    Acc2 = add_decision(To, C, Acc),
    arrange(ToNodes, C, Acc2, ForceSend);

arrange([_To | _ToNodes], {mnesia_down, _Node, _Date, _Time}, Acc, _ForceSend) ->
    %% The others have their own info about this
    Acc;

arrange([_To | _ToNodes], {master_nodes, _Tab, _Nodes}, Acc, _ForceSend) ->
    %% The others have their own info about this
    Acc;

arrange([To | ToNodes], {trans_tid, serial, Serial}, Acc, ForceSend) ->
    %% Do the Lamport thing, and release the others from uncertainty
    Acc2 = add_decision(To, {trans_tid, serial, Serial}, Acc),
    arrange(ToNodes, {trans_tid, serial, Serial}, Acc2, ForceSend);

arrange([], _Decision, Acc, _ForceSend) ->
    Acc.

%% Prepend Decision to Node's batch in the [{Node, Decisions}] list,
%% creating the batch at the end of the list if Node has none yet.
add_decision(Node, Decision, [{Node, Decisions} | Tail]) ->
    [{Node, [Decision | Decisions]} | Tail];
add_decision(Node, Decision, [Head | Tail]) ->
    [Head | add_decision(Node, Decision, Tail)];
add_decision(Node, Decision, []) ->
    [{Node, [Decision]}].