1%% 2%% %CopyrightBegin% 3%% 4%% Copyright Ericsson AB 1996-2020. All Rights Reserved. 5%% 6%% Licensed under the Apache License, Version 2.0 (the "License"); 7%% you may not use this file except in compliance with the License. 8%% You may obtain a copy of the License at 9%% 10%% http://www.apache.org/licenses/LICENSE-2.0 11%% 12%% Unless required by applicable law or agreed to in writing, software 13%% distributed under the License is distributed on an "AS IS" BASIS, 14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15%% See the License for the specific language governing permissions and 16%% limitations under the License. 17%% 18%% %CopyrightEnd% 19%% 20 21%% 22-module(mnesia_tm). 23 24-export([ 25 start/0, 26 init/1, 27 non_transaction/5, 28 transaction/6, 29 commit_participant/6, 30 dirty/2, 31 display_info/2, 32 do_update_op/3, 33 get_info/1, 34 get_transactions/0, 35 info/1, 36 mnesia_down/1, 37 prepare_checkpoint/2, 38 prepare_checkpoint/1, % Internal 39 prepare_snmp/3, 40 do_snmp/2, 41 put_activity_id/1, 42 put_activity_id/2, 43 block_tab/1, 44 unblock_tab/1, 45 fixtable/3, 46 new_cr_format/1 47 ]). 48 49%% sys callback functions 50-export([system_continue/3, 51 system_terminate/4, 52 system_code_change/4 53 ]). 54 55-include("mnesia.hrl"). 56 57-import(mnesia_lib, [set/2]). 58-import(mnesia_lib, [fatal/2, verbose/2, dbg_out/2]). 59 60-record(state, {coordinators = gb_trees:empty(), participants = gb_trees:empty(), supervisor, 61 blocked_tabs = [], dirty_queue = [], fixed_tabs = []}). 62%% Format on coordinators is [{Tid, EtsTabList} ..... 63 64-record(prep, {protocol = sym_trans, 65 %% async_dirty | sync_dirty | sym_trans | sync_sym_trans | asym_trans | sync_asym_trans 66 records = [], 67 prev_tab = [], % initiate to a non valid table name 68 prev_types, 69 prev_snmp, 70 types, 71 majority = [], 72 sync = false 73 }). 74 75-record(participant, {tid, pid, commit, disc_nodes = [], 76 ram_nodes = [], protocol = sym_trans}). 
%% Start the mnesia_tm process through mnesia_monitor; the new process
%% runs init/1 with the calling process as its parent.
start() ->
    mnesia_monitor:start_proc(?MODULE, ?MODULE, init, [self()]).

%% Process entry point: register, initialise the schema, do the startup
%% handshake and transaction recovery, ack the parent and enter the main
%% receive loop. The order of the steps below is significant.
init(Parent) ->
    register(?MODULE, self()),
    process_flag(trap_exit, true),
    process_flag(message_queue_data, off_heap),

    %% Schema (and fallback) initialisation
    SkipFallback = mnesia_monitor:get_env(ignore_fallback_at_startup),
    mnesia_bup:tm_fallback_start(SkipFallback),
    mnesia_schema:init(SkipFallback),

    %% Handshake with the other nodes and prepare transaction recovery
    mnesia_recover:init(),
    EarlyNodes = mnesia_monitor:init(),
    OtherNodes = lists:delete(node(),
                              mnesia_lib:uniq(EarlyNodes ++ mnesia_lib:all_nodes())),
    set(original_nodes, OtherNodes),
    mnesia_recover:connect_nodes(OtherNodes),

    %% With a disc directory, dump the previous and the latest log
    %% (transaction recovery; may wait for decisions from other nodes).
    case mnesia_monitor:use_dir() of
        false ->
            ignore;
        true ->
            PrevDump = mnesia_dumper:opt_dump_log(startup),
            LatestDump = mnesia_dumper:opt_dump_log(startup),
            mnesia_lib:verbose("Initial dump of log during startup: ~p~n",
                               [[PrevDump, LatestDump]]),
            mnesia_log:init()
    end,

    mnesia_schema:purge_tmp_files(),
    mnesia_recover:next_garb(),
    mnesia_recover:next_check_overload(),

    ?eval_debug_fun({?MODULE, init}, [{nodes, OtherNodes}]),

    %% In debug/trace mode the mnesia_event process is subscribed to
    %% {table, schema} events.
    case lists:member(val(debug), [debug, trace]) of
        true ->
            mnesia_subscr:subscribe(whereis(mnesia_event), {table, schema});
        false ->
            ignore
    end,
    proc_lib:init_ack(Parent, {ok, self()}),
    doit_loop(#state{supervisor = Parent}).

%% Read a mnesia configuration value; kept as a local function in order
%% to avoid an external function call.
val(Var) ->
    case ?catch_val_and_stack(Var) of
        {'EXIT', Stack} ->
            mnesia_lib:other_val(Var, Stack);
        Found ->
            Found
    end.

%% Send Reply to a requester. A {Pid, Ref} tag (as built by req/1) is
%% answered with that Ref; a plain destination gets node() in its place.
reply({Pid, Ref}, Reply) ->
    Pid ! {?MODULE, Ref, Reply};
reply(To, Reply) ->
    To ! {?MODULE, node(), Reply}.

%% Reply and continue the main loop with State.
reply(From, Reply, State) ->
    reply(From, Reply),
    doit_loop(State).
%% Send Request to the local mnesia_tm process and wait for the reply
%% tagged with a fresh reference. Returns that reply, or
%% {error, {node_not_running, node()}} when mnesia_tm is not registered
%% or an 'EXIT' from it is received while waiting.
req(Request) ->
    case whereis(?MODULE) of
        undefined ->
            {error, {node_not_running, node()}};
        Tm ->
            Tag = make_ref(),
            Tm ! {{self(), Tag}, Request},
            rec(Tm, Tag)
    end.

%% Wait for the next mnesia_tm reply, whatever its tag.
rec() ->
    rec(whereis(?MODULE)).

%% Wait for any {mnesia_tm, _, Reply} message. No registered process, or
%% an 'EXIT' from Tm, gives {error, {node_not_running, node()}}.
rec(undefined) ->
    {error, {node_not_running, node()}};
rec(Tm) when is_pid(Tm) ->
    receive
        {?MODULE, _Tag, Reply} ->
            Reply;
        {'EXIT', Tm, _Why} ->
            {error, {node_not_running, node()}}
    end.

%% Like rec/1, but only accepts the reply carrying Tag.
rec(Tm, Tag) ->
    receive
        {?MODULE, Tag, Reply} ->
            Reply;
        {'EXIT', Tm, _Why} ->
            {error, {node_not_running, node()}}
    end.

%% Link to a requester given either as a Pid or as a {Pid, Ref} tag.
tmlink(From) ->
    Target = case From of
                 {Pid, Ref} when is_reference(Ref) -> Pid;
                 _ -> From
             end,
    link(Target).

%% Extract the pid from a {Pid, Ref} request tag; any other value is
%% returned unchanged.
tmpid(From) ->
    case From of
        {Pid, _Ref} when is_pid(Pid) -> Pid;
        _ -> From
    end.

%% Notify mnesia_tm that mnesia on Node went down.
%% When mnesia_tm is running, the notification is sent as a message and ok
%% is returned; the main loop handles it and then calls
%% mnesia_monitor:mnesia_down/2 itself. Otherwise mnesia_monitor is told
%% directly. Going through mnesia_tm avoids a race with coordinator
%% processes that may restart and acquire new locks; mnesia_monitor takes
%% care of the synchronisation.
mnesia_down(Node) ->
    case whereis(?MODULE) of
        undefined ->
            mnesia_monitor:mnesia_down(?MODULE, Node);
        Tm ->
            Tm ! {mnesia_down, Node},
            ok
    end.

%% Run prepare_checkpoint/1 on every node in Nodes via rpc:multicall/4.
prepare_checkpoint(Nodes, Cp) ->
    rpc:multicall(Nodes, ?MODULE, prepare_checkpoint, [Cp]).

%% Ask the local mnesia_tm to prepare checkpoint Cp (internal).
prepare_checkpoint(Cp) ->
    req({prepare_checkpoint, Cp}).

%% Block dirty updates on Tab in the local mnesia_tm.
block_tab(Tab) ->
    req({block_tab, Tab}).

%% Undo one block_tab/1 on Tab in the local mnesia_tm.
unblock_tab(Tab) ->
    req({unblock_tab, Tab}).
%% Main receive loop of the mnesia_tm process.
%%
%% State (#state{}) carries:
%%   coordinators - gb_tree Tid -> [Etab, ...], the stores of local
%%                  coordinators (add_coord_store/3 prepends nested ones)
%%   participants - gb_tree Tid -> #participant{}
%%   supervisor   - parent process (also used for sys messages)
%%   blocked_tabs - tables blocked via block_tab (a table may repeat)
%%   dirty_queue  - dirty requests postponed while their table is
%%                  blocked, newest first (see process_dirty_queue/2)
%%   fixed_tabs   - [{Node, [Tab]}] fixtable bookkeeping per node
doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=Sup}=State) ->
    receive
        %% Asynchronous dirty update: applied at once, or queued while
        %% Tab is blocked.
        {_From, {async_dirty, Tid, Commit, Tab}} ->
            case lists:member(Tab, State#state.blocked_tabs) of
                false ->
                    do_async_dirty(Tid, new_cr_format(Commit), Tab),
                    doit_loop(State);
                true ->
                    Item = {async_dirty, Tid, new_cr_format(Commit), Tab},
                    State2 = State#state{dirty_queue = [Item | State#state.dirty_queue]},
                    doit_loop(State2)
            end;

        %% Synchronous dirty update: as above, but the caller is sent
        %% {dirty_res, Res} by do_sync_dirty/4.
        {From, {sync_dirty, Tid, Commit, Tab}} ->
            case lists:member(Tab, State#state.blocked_tabs) of
                false ->
                    do_sync_dirty(From, Tid, new_cr_format(Commit), Tab),
                    doit_loop(State);
                true ->
                    Item = {sync_dirty, From, Tid, new_cr_format(Commit), Tab},
                    State2 = State#state{dirty_queue = [Item | State#state.dirty_queue]},
                    doit_loop(State2)
            end;

        {From, start_outer} -> %% Create and associate ets_tab with Tid
            %% New outermost transaction: create its store (a public bag),
            %% link to the caller, allocate a Tid and register the caller
            %% as coordinator.
            try ?ets_new_table(mnesia_trans_store, [bag, public]) of
                Etab ->
                    tmlink(From),
                    C = mnesia_recover:incr_trans_tid_serial(),
                    ?ets_insert(Etab, {nodes, node()}),
                    Tid = #tid{pid = tmpid(From), counter = C},
                    A2 = gb_trees:insert(Tid,[Etab],Coordinators),
                    S2 = State#state{coordinators = A2},
                    reply(From, {new_tid, Tid, Etab}, S2)
            catch error:Reason -> %% system limit
                    Msg = "Cannot create an ets table for the "
                        "local transaction store",
                    reply(From, {error, {system_limit, Msg, Reason}}, State)
            end;

        %% Commit request from a remote coordinator. For the asym
        %% protocols a commit_participant process is spawned; for the sym
        %% protocols we vote yes at once and keep the commit record until
        %% do_commit / do_abort arrives (pid = nopid).
        {From, {ask_commit, Protocol, Tid, Commit0, DiscNs, RamNs}} ->
            ?eval_debug_fun({?MODULE, doit_ask_commit},
                            [{tid, Tid}, {prot, Protocol}]),
            mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
            Commit = new_cr_format(Commit0),
            Pid =
                if
                    node(Tid#tid.pid) =:= node() ->
                        error({internal_error, local_node});
                    Protocol =:= asym_trans orelse Protocol =:= sync_asym_trans ->
                        Args = [Protocol, tmpid(From), Tid, Commit, DiscNs, RamNs],
                        spawn_link(?MODULE, commit_participant, Args);
                    true -> %% *_sym_trans
                        reply(From, {vote_yes, Tid}),
                        nopid
                end,
            P = #participant{tid = Tid,
                             pid = Pid,
                             commit = Commit,
                             disc_nodes = DiscNs,
                             ram_nodes = RamNs,
                             protocol = Protocol},
            State2 = State#state{participants = gb_trees:insert(Tid,P,Participants)},
            doit_loop(State2);

        %% Commit decision for a participant transaction.
        {Tid, do_commit} ->
            case gb_trees:lookup(Tid, Participants) of
                none ->
                    verbose("Tried to commit a non participant transaction ~p~n",[Tid]),
                    doit_loop(State);
                {value, P} ->
                    ?eval_debug_fun({?MODULE,do_commit,pre},[{tid,Tid},{participant,P}]),
                    case P#participant.pid of
                        nopid ->
                            %% sym protocols: log the commit (only if this
                            %% node is one of the disc nodes), note the
                            %% decision and apply it here.
                            Commit = P#participant.commit,
                            Member = lists:member(node(), P#participant.disc_nodes),
                            if Member == false ->
                                    ignore;
                               P#participant.protocol == sym_trans ->
                                    mnesia_log:log(Commit);
                               P#participant.protocol == sync_sym_trans ->
                                    mnesia_log:slog(Commit)
                            end,
                            mnesia_recover:note_decision(Tid, committed),
                            do_commit(Tid, Commit),
                            if
                                P#participant.protocol == sync_sym_trans ->
                                    Tid#tid.pid ! {?MODULE, node(), {committed, Tid}};
                                true ->
                                    ignore
                            end,
                            mnesia_locker:release_tid(Tid),
                            transaction_terminated(Tid),
                            ?eval_debug_fun({?MODULE,do_commit,post},[{tid,Tid},{pid,nopid}]),
                            doit_loop(State#state{participants=
                                                      gb_trees:delete(Tid,Participants)});
                        Pid when is_pid(Pid) ->
                            %% asym protocols: forward to commit_participant
                            Pid ! {Tid, committed},
                            ?eval_debug_fun({?MODULE, do_commit, post}, [{tid, Tid}, {pid, Pid}]),
                            doit_loop(State)
                    end
            end;

        {Tid, simple_commit} ->
            mnesia_recover:note_decision(Tid, committed),
            mnesia_locker:release_tid(Tid),
            transaction_terminated(Tid),
            doit_loop(State);

        %% Abort decision for a participant transaction (locks are
        %% released even when Tid is unknown).
        {Tid, {do_abort, Reason}} ->
            ?eval_debug_fun({?MODULE, do_abort, pre}, [{tid, Tid}]),
            case gb_trees:lookup(Tid, Participants) of
                none ->
                    verbose("Tried to abort a non participant transaction ~p: ~tp~n",
                            [Tid, Reason]),
                    mnesia_locker:release_tid(Tid),
                    doit_loop(State);
                {value, P} ->
                    case P#participant.pid of
                        nopid ->
                            Commit = P#participant.commit,
                            mnesia_recover:note_decision(Tid, aborted),
                            do_abort(Tid, Commit),
                            if
                                P#participant.protocol == sync_sym_trans ->
                                    Tid#tid.pid ! {?MODULE, node(), {aborted, Tid}};
                                true ->
                                    ignore
                            end,
                            transaction_terminated(Tid),
                            mnesia_locker:release_tid(Tid),
                            ?eval_debug_fun({?MODULE, do_abort, post}, [{tid, Tid}, {pid, nopid}]),
                            doit_loop(State#state{participants=
                                                      gb_trees:delete(Tid,Participants)});
                        Pid when is_pid(Pid) ->
                            Pid ! {Tid, {do_abort, Reason}},
                            ?eval_debug_fun({?MODULE, do_abort, post},
                                            [{tid, Tid}, {pid, Pid}]),
                            doit_loop(State)
                    end
            end;

        {From, {add_store, Tid}} -> %% new store for nested transaction
            try ?ets_new_table(mnesia_trans_store, [bag, public]) of
                Etab ->
                    A2 = add_coord_store(Coordinators, Tid, Etab),
                    reply(From, {new_store, Etab},
                          State#state{coordinators = A2})
            catch error:Reason -> %% system limit
                    Msg = "Cannot create an ets table for a nested "
                        "local transaction store",
                    reply(From, {error, {system_limit, Msg, Reason}}, State)
            end;

        %% A nested transaction finished: drop its Obsolete store,
        %% optionally copying nodes/fixtable/friends into Current first.
        {From, {del_store, Tid, Current, Obsolete, PropagateStore}} ->
            opt_propagate_store(Current, Obsolete, PropagateStore),
            A2 = del_coord_store(Coordinators, Tid, Current, Obsolete),
            reply(From, store_erased, State#state{coordinators = A2});

        {'EXIT', Pid, Reason} ->
            handle_exit(Pid, Reason, State);

        %% Transaction restart: keep only Store, unfix its tables and
        %% reset its contents to the initial {nodes, node()} entry.
        {From, {restart, Tid, Store}} ->
            A2 = restore_stores(Coordinators, Tid, Store),
            clear_fixtable([Store]),
            ?ets_match_delete(Store, '_'),
            ?ets_insert(Store, {nodes, node()}),
            reply(From, {restarted, Tid}, State#state{coordinators = A2});

        {delete_transaction, Tid} ->
            %% used to clear transactions which are committed
            %% in coordinator or participant processes
            case gb_trees:is_defined(Tid, Participants) of
                false ->
                    case gb_trees:lookup(Tid, Coordinators) of
                        none ->
                            verbose("** ERROR ** Tried to delete a non transaction ~p~n",
                                    [Tid]),
                            doit_loop(State);
                        {value, Etabs} ->
                            clear_fixtable(Etabs),
                            erase_ets_tabs(Etabs),
                            transaction_terminated(Tid),
                            doit_loop(State#state{coordinators =
                                                      gb_trees:delete(Tid,Coordinators)})
                    end;
                true ->
                    transaction_terminated(Tid),
                    State2 = State#state{participants=gb_trees:delete(Tid,Participants)},
                    doit_loop(State2)
            end;

        {sync_trans_serial, Tid} ->
            %% Do the Lamport thing here
            mnesia_recover:sync_trans_tid_serial(Tid),
            doit_loop(State);

        {From, info} ->
            reply(From, {info, gb_trees:values(Participants),
                          gb_trees:to_list(Coordinators)}, State);

        %% Node N went down: reconfigure coordinators and participants,
        %% drop N's fixtables, inform the locker and ack mnesia_monitor.
        {mnesia_down, N} ->
            verbose("Got mnesia_down from ~p, reconfiguring...~n", [N]),
            reconfigure_coordinators(N, gb_trees:to_list(Coordinators)),

            Tids = gb_trees:keys(Participants),
            reconfigure_participants(N, gb_trees:values(Participants)),
            NewState = clear_fixtable(N, State),

            mnesia_locker:mnesia_down(N, Tids),
            mnesia_monitor:mnesia_down(?MODULE, N),
            doit_loop(NewState);

        %% A dirty operation waits for Tab to be unblocked: answer at
        %% once if Tab is not blocked, otherwise queue the request.
        {From, {unblock_me, Tab}} ->
            case lists:member(Tab, State#state.blocked_tabs) of
                false ->
                    %% Trailing ~n added so this log line is terminated
                    %% like every other verbose/2 message in this loop.
                    verbose("Wrong dirty Op blocked on ~p ~tp ~p~n",
                            [node(), Tab, From]),
                    reply(From, unblocked),
                    doit_loop(State);
                true ->
                    Item = {Tab, unblock_me, From},
                    State2 = State#state{dirty_queue = [Item | State#state.dirty_queue]},
                    doit_loop(State2)
            end;

        {From, {block_tab, Tab}} ->
            State2 = State#state{blocked_tabs = [Tab | State#state.blocked_tabs]},
            reply(From, ok, State2);

        %% Undo one block: only when no other block on Tab remains is the
        %% table unblocked in mnesia_controller and its queue processed.
        {From, {unblock_tab, Tab}} ->
            BlockedTabs2 = State#state.blocked_tabs -- [Tab],
            case lists:member(Tab, BlockedTabs2) of
                false ->
                    mnesia_controller:unblock_table(Tab),
                    Queue = process_dirty_queue(Tab, State#state.dirty_queue),
                    State2 = State#state{blocked_tabs = BlockedTabs2,
                                         dirty_queue = Queue},
                    reply(From, ok, State2);
                true ->
                    State2 = State#state{blocked_tabs = BlockedTabs2},
                    reply(From, ok, State2)
            end;

        {From, {prepare_checkpoint, Cp}} ->
            Res = mnesia_checkpoint:tm_prepare(Cp),
            case Res of
                {ok, _Name, IgnoreNew, _Node} ->
                    prepare_pending_coordinators(gb_trees:to_list(Coordinators), IgnoreNew),
                    prepare_pending_participants(gb_trees:values(Participants), IgnoreNew);
                {error, _Reason} ->
                    ignore
            end,
            reply(From, Res, State);

        %% Fix/unfix Tab locally on behalf of Requester; replies error if
        %% the table's storage type is unknown, else node().
        {From, {fixtable, [Tab,Lock,Requester]}} ->
            case ?catch_val({Tab, storage_type}) of
                {'EXIT', _} ->
                    reply(From, error, State);
                Storage ->
                    mnesia_lib:db_fixtable(Storage,Tab,Lock),
                    NewState = manage_fixtable(Tab,Lock,Requester,State),
                    reply(From, node(), NewState)
            end;

        {system, From, Msg} ->
            dbg_out("~p got {system, ~p, ~tp}~n", [?MODULE, From, Msg]),
            sys:handle_system_msg(Msg, From, Sup, ?MODULE, [], State);

        Msg ->
            verbose("** ERROR ** ~p got unexpected message: ~tp~n", [?MODULE, Msg]),
            doit_loop(State)
    end.

%% Apply a sync dirty commit and send {dirty_res, Res} to From.
do_sync_dirty(From, Tid, Commit, _Tab) ->
    ?eval_debug_fun({?MODULE, sync_dirty, pre}, [{tid, Tid}]),
    Res = do_dirty(Tid, Commit),
    ?eval_debug_fun({?MODULE, sync_dirty, post}, [{tid, Tid}]),
    From ! {?MODULE, node(), {dirty_res, Res}}.

%% Apply an async dirty commit; nobody is notified.
do_async_dirty(Tid, Commit, _Tab) ->
    ?eval_debug_fun({?MODULE, async_dirty, pre}, [{tid, Tid}]),
    do_dirty(Tid, Commit),
    ?eval_debug_fun({?MODULE, async_dirty, post}, [{tid, Tid}]).


%% Process items in fifo order
%% The queue is stored newest first, so recursing before handling the
%% head runs the oldest item first. Items for Tab are executed/answered
%% and removed; items for other tables are kept in their original order.
process_dirty_queue(Tab, [Item | Queue]) ->
    Queue2 = process_dirty_queue(Tab, Queue),
    case Item of
        {async_dirty, Tid, Commit, Tab} ->
            do_async_dirty(Tid, Commit, Tab),
            Queue2;
        {sync_dirty, From, Tid, Commit, Tab} ->
            do_sync_dirty(From, Tid, Commit, Tab),
            Queue2;
        {Tab, unblock_me, From} ->
            reply(From, unblocked),
            Queue2;
        _ ->
            [Item | Queue2]
    end;
process_dirty_queue(_Tab, []) ->
    [].
%% Enter the pending record of every local coordinator into the
%% checkpoint machinery, except for Tids listed in IgnoreNew.
%% Coords is a gb_trees:to_list/1 list of {Tid, [Store | NestedStores]}.
%% A store whose lookup raises an error is skipped.
prepare_pending_coordinators([], _IgnoreNew) ->
    ok;
prepare_pending_coordinators([{Tid, [Store | _]} | Rest], IgnoreNew) ->
    try ?ets_lookup(Store, pending) of
        [Pending] ->
            case lists:member(Tid, IgnoreNew) of
                true -> ignore;
                false -> mnesia_checkpoint:tm_enter_pending(Pending)
            end,
            prepare_pending_coordinators(Rest, IgnoreNew);
        [] ->
            prepare_pending_coordinators(Rest, IgnoreNew)
    catch
        error:_ ->
            prepare_pending_coordinators(Rest, IgnoreNew)
    end.

%% Enter each participant transaction (with its disc and ram nodes) as
%% pending in the checkpoint machinery, except for Tids in IgnoreNew.
prepare_pending_participants(Parts, IgnoreNew) ->
    lists:foreach(
      fun(#participant{tid = Tid, disc_nodes = DiscNs, ram_nodes = RamNs}) ->
              case lists:member(Tid, IgnoreNew) of
                  true -> ignore;
                  false -> mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs)
              end
      end,
      Parts).

%% Handle an 'EXIT' signal received by the main loop, then continue the
%% loop (or stop when the supervisor died).
handle_exit(Pid, _Reason, State) when node(Pid) /= node() ->
    %% Exit from a remote process: nothing to do here
    doit_loop(State);
handle_exit(Pid, _Reason, #state{supervisor = Pid} = State) ->
    %% Our supervisor has died, time to stop
    do_stop(State);
handle_exit(Pid, Reason, #state{coordinators = Coords, participants = Parts} = State) ->
    case pid_search_delete(Pid, gb_trees:to_list(Coords)) of
        {{Tid, Etabs}, RestC} ->
            %% A local coordinator has died: determine the outcome, tell
            %% mnesia_tm on the other nodes and recover locally.
            recover_coordinator(Tid, Etabs),
            doit_loop(State#state{coordinators = RestC});
        {none, _} ->
            case mnesia_lib:key_search_delete(Pid, #participant.pid,
                                              gb_trees:values(Parts)) of
                {#participant{tid = PTid} = P, _RestP} ->
                    fatal("Participant ~p in transaction ~p died ~tp~n",
                          [P#participant.pid, PTid, Reason]),
                    doit_loop(State#state{participants = gb_trees:delete(PTid, Parts)});
                {none, _} ->
                    %% Some other local process we were linked to
                    doit_loop(State)
            end
    end.

%% Recover the transaction Tid whose local coordinator died. The outcome
%% is derived from the coordinator's first store via arrange/3 and
%% tell_outcome/5, then applied locally when this node has a commit
%% record. If anything in arrange/3 raises, the other nodes are told
%% using asym_trans. Finally the stores are deleted, the transaction is
%% terminated and its locks released.
recover_coordinator(Tid, Etabs) ->
    verbose("Coordinator ~p in transaction ~p died.~n", [Tid#tid.pid, Tid]),
    Store = hd(Etabs),
    CheckNodes = get_elements(nodes, Store),
    TellNodes = CheckNodes -- [node()],
    try arrange(Tid, Store, async) of
        {_N, #prep{protocol = Protocol, records = CR}} ->
            %% Tell the participants about the outcome
            Outcome = tell_outcome(Tid, Protocol, node(), CheckNodes, TellNodes),
            %% Recover locally
            {DiscNs, RamNs} = commit_nodes(CR, [], []),
            case lists:keyfind(node(), #commit.node, CR) of
                false ->
                    %% Killed before the store had been copied to the
                    %% new nested transaction store.
                    ok;
                Local ->
                    ?eval_debug_fun({?MODULE, recover_coordinator, pre},
                                    [{tid, Tid}, {outcome, Outcome}, {prot, Protocol}]),
                    recover_coordinator(Tid, Protocol, Outcome, Local, DiscNs, RamNs),
                    ?eval_debug_fun({?MODULE, recover_coordinator, post},
                                    [{tid, Tid}, {outcome, Outcome}, {prot, Protocol}])
            end
    catch
        _:Reason:Stacktrace ->
            dbg_out("Recovery of coordinator ~p failed: ~tp~n",
                    [Tid, {Reason, Stacktrace}]),
            tell_outcome(Tid, asym_trans, node(), CheckNodes, TellNodes)
    end,
    erase_ets_tabs(Etabs),
    transaction_terminated(Tid),
    mnesia_locker:release_tid(Tid).
%% Apply the outcome of a recovered coordinator transaction locally.
%% sym protocols: note the decision; on commit apply Local with do_dirty/2.
%% asym protocols: log a #decision{} with the disc/ram nodes, then
%% do_commit/2 or do_abort/2 Local.
recover_coordinator(Tid, Protocol, committed, Local, _DiscNs, _RamNs)
  when Protocol =:= sym_trans; Protocol =:= sync_sym_trans ->
    mnesia_recover:note_decision(Tid, committed),
    do_dirty(Tid, Local);
recover_coordinator(Tid, Protocol, aborted, _Local, _DiscNs, _RamNs)
  when Protocol =:= sym_trans; Protocol =:= sync_sym_trans ->
    mnesia_recover:note_decision(Tid, aborted);
recover_coordinator(Tid, Protocol, Outcome, Local, DiscNs, RamNs)
  when (Protocol =:= asym_trans orelse Protocol =:= sync_asym_trans),
       (Outcome =:= committed orelse Outcome =:= aborted) ->
    mnesia_recover:log_decision(#decision{tid = Tid, outcome = Outcome,
                                          disc_nodes = DiscNs,
                                          ram_nodes = RamNs}),
    case Outcome of
        committed -> do_commit(Tid, Local);
        aborted -> do_abort(Tid, Local)
    end.

%% Reset Tid's store list to just Store, deleting every other store table.
restore_stores(Coords, Tid, Store) ->
    Stores = gb_trees:get(Tid, Coords),
    erase_ets_tabs(lists:delete(Store, Stores)),
    gb_trees:update(Tid, [Store], Coords).

%% Push a new (nested) store Etab in front of Tid's store list.
add_coord_store(Coords, Tid, Etab) ->
    gb_trees:update(Tid, [Etab | gb_trees:get(Tid, Coords)], Coords).

%% Remove Obsolete from Tid's store list and delete its table. The list
%% must start with Current and Obsolete (in either order); anything else
%% is a case_clause crash.
del_coord_store(Coords, Tid, Current, Obsolete) ->
    Rest = case gb_trees:get(Tid, Coords) of
               [Obsolete, Current | Tail] -> Tail;
               [Current, Obsolete | Tail] -> Tail
           end,
    ?ets_delete_table(Obsolete),
    gb_trees:update(Tid, [Current | Rest], Coords).

%% Delete every ets table in the list; returns ok.
erase_ets_tabs(Tabs) ->
    lists:foreach(fun(Tab) -> ?ets_delete_table(Tab) end, Tabs).
%% Unfix all tables fixed by one transaction. For each {Tab, Node} stored
%% under the 'fixtable' key of the transaction's first store,
%% fixtable(Tab, false, self()) is cast to Node.
clear_fixtable([Store | _]) ->
    Me = self(),
    lists:foreach(
      fun({Tab, Node}) ->
              rpc:cast(Node, ?MODULE, fixtable, [Tab, false, Me])
      end,
      get_elements(fixtable, Store)).

%% Unfix locally every table that Node has fixed and forget Node's entry
%% in fixed_tabs. Tables whose storage type can no longer be read are
%% skipped.
clear_fixtable(Node, #state{fixed_tabs = Fixed} = State) ->
    case mnesia_lib:key_search_delete(Node, 1, Fixed) of
        {{Node, Tabs}, Remaining} ->
            Unfix = fun(Tab) ->
                            case ?catch_val({Tab, storage_type}) of
                                {'EXIT', _} -> ignore;
                                Storage -> mnesia_lib:db_fixtable(Storage, Tab, false)
                            end
                    end,
            lists:foreach(Unfix, Tabs),
            State#state{fixed_tabs = Remaining};
        {none, _} ->
            State
    end.

%% Record (Lock = true) or forget (Lock = false) that the node of
%% Requester has fixed Tab. Fixing the same table twice records it twice;
%% each unfix removes one occurrence, and a node with no tables left is
%% dropped.
manage_fixtable(Tab, true, Requester, #state{fixed_tabs = Fixed} = State) ->
    Node = node(Requester),
    {Prev, Rest} = mnesia_lib:key_search_delete(Node, 1, Fixed),
    Known = case Prev of
                none -> [];
                {Node, Tabs} -> Tabs
            end,
    State#state{fixed_tabs = [{Node, [Tab | Known]} | Rest]};
manage_fixtable(Tab, false, Requester, #state{fixed_tabs = Fixed} = State) ->
    Node = node(Requester),
    case mnesia_lib:key_search_delete(Node, 1, Fixed) of
        {none, _} ->
            %% Nothing recorded for Node; safeguard
            State;
        {{Node, Tabs}, Rest} ->
            case lists:delete(Tab, Tabs) of
                [] -> State#state{fixed_tabs = Rest};
                Left -> State#state{fixed_tabs = [{Node, Left} | Rest]}
            end
    end.

%% Remove from Trs (a sorted {Tid, Stores} list, as from
%% gb_trees:to_list/1) the entries whose Tid belongs to Pid.
%% Returns {none, Tree} or {{Tid, Stores}, Tree}, with the remaining
%% entries rebuilt into a gb_tree. If several entries match, all are
%% removed and the last one is returned.
pid_search_delete(Pid, Trs) ->
    pid_search_delete(Pid, Trs, none, []).

pid_search_delete(_Pid, [], Found, Kept) ->
    {Found, gb_trees:from_orddict(lists:reverse(Kept))};
pid_search_delete(Pid, [{Tid, _Stores} = Tr | Trs], _Found, Kept)
  when Tid#tid.pid == Pid ->
    pid_search_delete(Pid, Trs, Tr, Kept);
pid_search_delete(Pid, [Tr | Trs], Found, Kept) ->
    pid_search_delete(Pid, Trs, Found, [Tr | Kept]).
%% Bookkeeping when transaction Tid ends: leave the checkpoint pending
%% set, then unlink from a local coordinator pid, or (for a remote
%% coordinator) synchronise the local transaction serial
%% ("the Lamport thing").
transaction_terminated(Tid) ->
    mnesia_checkpoint:tm_exit_pending(Tid),
    Pid = Tid#tid.pid,
    case node(Pid) == node() of
        true -> unlink(Pid);
        false -> mnesia_recover:sync_trans_tid_serial(Tid)
    end.

%% Run Fun(Args...) as a non-transactional (dirty) activity.
%% If a surrounding transaction exists, its context is inherited: Fun is
%% run via transaction/6 with infinite retries (sync for sync_dirty, else
%% async), returning the result or exiting with the abort reason.
%% Otherwise Fun runs with {Mod, {ActivityKind, self()}, non_transaction}
%% as activity state, which is restored/erased afterwards.
non_transaction(OldState = {_, _, Trans}, Fun, Args, ActivityKind, Mod)
  when Trans /= non_transaction ->
    Kind = if
               ActivityKind =:= sync_dirty -> sync;
               true -> async
           end,
    case transaction(OldState, Fun, Args, infinity, Mod, Kind) of
        {atomic, Res} -> Res;
        {aborted, Res} -> exit(Res)
    end;
non_transaction(OldState, Fun, Args, ActivityKind, Mod) ->
    put(mnesia_activity_state, {Mod, {ActivityKind, self()}, non_transaction}),
    try apply(Fun, Args) of
        {'EXIT', Reason} -> exit(Reason);
        {aborted, Reason} -> mnesia:abort(Reason);
        Res -> Res
    catch
        throw:Thrown -> throw(Thrown);
        error:Reason:Stack -> exit({Reason, Stack});
        exit:Reason -> exit(Reason)
    after
        case OldState of
            undefined -> erase(mnesia_activity_state);
            _ -> put(mnesia_activity_state, OldState)
        end
    end.

%% Run Fun as a transaction in the context OldTidTs (the caller's
%% mnesia_activity_state):
%%   undefined               -> new outer transaction
%%   {_, _, non_transaction} -> outer transaction inside a dirty
%%                              activity; the old state is put back after
%%   {OldMod, Tid, Ts}       -> nested transaction within Tid
%%   anything else           -> {aborted, nested_transaction}
transaction(OldTidTs, Fun, Args, Retries, Mod, Type) ->
    %% Initial restart delay factor (incremented by restart/9)
    InitFactor = 1,
    case OldTidTs of
        undefined ->
            execute_outer(Mod, Fun, Args, InitFactor, Retries, Type);
        {_, _, non_transaction} ->
            Result = execute_outer(Mod, Fun, Args, InitFactor, Retries, Type),
            put(mnesia_activity_state, OldTidTs),
            Result;
        {OldMod, Tid, Ts} ->
            execute_inner(Mod, Tid, OldMod, Ts, Fun, Args, InitFactor, Retries, Type);
        _ ->
            {aborted, nested_transaction}
    end.
%% Start a new outermost transaction. mnesia_tm is asked for a Tid and a
%% fresh store, {Mod, Tid, #tidstore{}} is installed as activity state
%% and Fun is run. Returns {aborted, Reason} if the request failed.
execute_outer(Mod, Fun, Args, Factor, Retries, Type) ->
    case req(start_outer) of
        {new_tid, Tid, Store} ->
            put(mnesia_activity_state, {Mod, Tid, #tidstore{store = Store}}),
            execute_transaction(Fun, Args, Factor, Retries, Type);
        {error, Reason} ->
            {aborted, Reason}
    end.

%% Start a nested transaction within Tid. A new store is created and
%% filled with a copy of the current store, and the current store is
%% pushed on up_stores (with OldMod). The level is raised by one and Fun
%% is run.
execute_inner(Mod, Tid, OldMod, Ts, Fun, Args, Factor, Retries, Type) ->
    case req({add_store, Tid}) of
        {new_store, Ets} ->
            #tidstore{store = Parent, up_stores = Ups, level = Level} = Ts,
            copy_ets(Parent, Ets),
            Nested = Ts#tidstore{level = Level + 1,
                                 store = Ets,
                                 up_stores = [{OldMod, Parent} | Ups]},
            put(mnesia_activity_state, {Mod, Tid, Nested}),
            execute_transaction(Fun, Args, Factor, Retries, Type);
        {error, Reason} ->
            {aborted, Reason}
    end.

%% Copy every object of ets table From into ets table To.
copy_ets(From, To) ->
    do_copy_ets(?ets_first(From), From, To).

do_copy_ets('$end_of_table', _From, _To) ->
    ok;
do_copy_ets(Key, From, To) ->
    insert_objs(?ets_lookup(From, Key), To),
    do_copy_ets(?ets_next(From, Key), From, To).

%% Insert the objects one at a time into Tab; returns ok.
insert_objs(Objs, Tab) ->
    lists:foreach(fun(Obj) -> ?ets_insert(Tab, Obj) end, Objs).

%% Run Fun in the current activity and commit it via apply_fun/3.
%% An outer commit clears the activity state and unlinks from mnesia_tm.
%% Aborts and exceptions go through check_exit/6, which may restart the
%% transaction; a user throw is turned into {aborted, {throw, Value}}.
execute_transaction(Fun, Args, Factor, Retries, Type) ->
    try apply_fun(Fun, Args, Type) of
        {atomic, Value} ->
            mnesia_lib:incr_counter(trans_commits),
            erase(mnesia_activity_state),
            %% No need to clear locks (already done by the commit); just
            %% flush any unprocessed mnesia_down messages.
            flush_downs(),
            ?SAFE(unlink(whereis(?MODULE))),
            {atomic, Value};
        {nested_atomic, Value} ->
            mnesia_lib:incr_counter(trans_commits),
            {atomic, Value};
        {do_abort, Reason} ->
            check_exit(Fun, Args, Factor, Retries, {aborted, Reason}, Type)
    catch
        throw:Thrown ->
            %% User called throw
            return_abort(Fun, Args, {aborted, {throw, Thrown}});
        error:Reason:Stack ->
            check_exit(Fun, Args, Factor, Retries, {Reason, Stack}, Type);
        _:Reason ->
            check_exit(Fun, Args, Factor, Retries, Reason, Type)
    end.

%% Apply Fun, then commit. The reply of t_commit/1 is mapped to the tagged
%% result that execute_transaction/5 expects.
apply_fun(Fun, Args, Type) ->
    Value = apply(Fun, Args),
    case t_commit(Type) of
        do_commit -> {atomic, Value};
        do_commit_nested -> {nested_atomic, Value};
        {do_abort, {aborted, Reason}} -> {do_abort, Reason};
        {do_abort, _} = Abort -> Abort
    end.

%% Restartable reasons (deadlock #cyclic{}, node_not_running, bad_commit)
%% go to maybe_restart/6; every other reason aborts.
check_exit(Fun, Args, Factor, Retries, Reason, Type) ->
    case Reason of
        {aborted, #cyclic{} = Cyclic} ->
            maybe_restart(Fun, Args, Factor, Retries, Type, Cyclic);
        {aborted, {Tag, _} = Why} when Tag =:= node_not_running; Tag =:= bad_commit ->
            maybe_restart(Fun, Args, Factor, Retries, Type, Why);
        _ ->
            return_abort(Fun, Args, Reason)
    end.

%% Restart only a top-level (level 1) transaction with retries left.
%% A nested transaction aborts with Why so it propagates upwards; with no
%% retries left the result is {aborted, nomore}.
maybe_restart(Fun, Args, Factor, Retries, Type, Why) ->
    {Mod, Tid, Ts} = get(mnesia_activity_state),
    case try_again(Retries) of
        no ->
            return_abort(Fun, Args, {aborted, nomore});
        yes when Ts#tidstore.level == 1 ->
            restart(Mod, Tid, Ts, Fun, Args, Factor, Retries, Type, Why);
        yes ->
            return_abort(Fun, Args, Why)
    end.

%% yes for infinity or a number greater than 1, otherwise no.
try_again(Retries) ->
    case Retries =:= infinity orelse (is_number(Retries) andalso Retries > 1) of
        true -> yes;
        false -> no
    end.

%% Restart a top-level transaction after a random sleep.
%% Only toplevel transactions can be restarted: a deadlock in a nested
%% transaction makes the whole stack restart. The stack is popped by a
%% consecutive series of exit({aborted, #cyclic{}}) calls.
%%   bad_commit / node_not_running: the transaction is aborted and a
%%     brand new outer transaction is started with factor 1 (for
%%     node_not_running this avoids hanging in receive_release_tid_ack).
%%   other reasons: the same Tid is restarted in place with Factor0 + 1,
%%     after releasing its locks. When Factor0 reaches 10, the Tid serial
%%     is synced on all db_nodes.
restart(Mod, Tid, Ts, Fun, Args, Factor0, Retries0, Type, Why) ->
    mnesia_lib:incr_counter(trans_restarts),
    Retries = decr(Retries0),
    case Why of
        {Tag, _N} when Tag =:= bad_commit; Tag =:= node_not_running ->
            return_abort(Fun, Args, Why),
            Factor = 1,
            SleepTime = mnesia_lib:random_time(Factor, Tid#tid.counter),
            dbg_out("Restarting transaction ~w: in ~wms ~w~n", [Tid, SleepTime, Why]),
            timer:sleep(SleepTime),
            execute_outer(Mod, Fun, Args, Factor, Retries, Type);
        _ ->
            SleepTime = mnesia_lib:random_time(Factor0, Tid#tid.counter),
            dbg_out("Restarting transaction ~w: in ~wms ~w~n", [Tid, SleepTime, Why]),
            case Factor0 == 10 of
                true ->
                    %% Our serial may be much larger than other nodes ditto
                    AllNodes = val({current, db_nodes}),
                    verbose("Sync serial ~p~n", [Tid]),
                    rpc:abcast(AllNodes, ?MODULE, {sync_trans_serial, Tid});
                false ->
                    ignore
            end,
            intercept_friends(Tid, Ts),
            Store = Ts#tidstore.store,
            Nodes = get_elements(nodes, Store),
            ?MODULE ! {self(), {restart, Tid, Store}},
            mnesia_locker:send_release_tid(Nodes, Tid),
            timer:sleep(SleepTime),
            mnesia_locker:receive_release_tid_acc(Nodes, Tid),
            case get_restarted(Tid) of
                {restarted, Tid} ->
                    execute_transaction(Fun, Args, Factor0 + 1, Retries, Type);
                {error, Reason} ->
                    mnesia:abort(Reason)
            end
    end.
%% Wait for mnesia_tm's {restarted, Tid} (or {error, _}) reply; any other
%% mnesia_tm reply (e.g. surplus aborts) is discarded.
get_restarted(Tid) ->
    Reply = rec(),
    case Reply of
        {restarted, Tid} -> Reply;
        {error, _} -> Reply;
        _Other -> get_restarted(Tid)
    end.

%% Decrement a retry counter: infinity stays infinity, and anything that
%% is not an integer above 1 becomes 0.
decr(Retries) when Retries =:= infinity ->
    infinity;
decr(Retries) when is_integer(Retries), Retries > 1 ->
    Retries - 1;
decr(_Retries) ->
    0.

%% Abort the current (possibly nested) transaction with Reason.
%% Level 1: calls mnesia_locker:async_release_tid/2 and asks mnesia_tm to
%%   delete the transaction. It then clears the activity state, unlinks
%%   and returns {aborted, FixedReason}.
%% Nested: the nested store is dropped via del_store, the enclosing store
%%   and module become current again, and the level is lowered. Deadlock
%%   (#cyclic{}), node_not_running and bad_commit are re-raised with
%%   exit({aborted, Reason}) so the top level can restart; other reasons
%%   return {aborted, FixedReason}.
return_abort(Fun, Args, Reason) ->
    {_Mod, Tid, Ts} = get(mnesia_activity_state),
    dbg_out("Transaction ~p calling ~tp with ~tp failed: ~n ~tp~n",
            [Tid, Fun, Args, Reason]),
    #tidstore{store = OldStore, level = Level, up_stores = Ups} = Ts,
    Nodes = get_elements(nodes, OldStore),
    intercept_friends(Tid, Ts),
    ?SAFE(mnesia_lib:incr_counter(trans_failures)),
    case Level == 1 of
        true ->
            mnesia_locker:async_release_tid(Nodes, Tid),
            ?SAFE(?MODULE ! {delete_transaction, Tid}),
            erase(mnesia_activity_state),
            flush_downs(),
            ?SAFE(unlink(whereis(?MODULE))),
            {aborted, mnesia_lib:fix_error(Reason)};
        false ->
            [{OldMod, NewStore} | Tail] = Ups,
            req({del_store, Tid, NewStore, OldStore, true}),
            Popped = Ts#tidstore{store = NewStore,
                                 up_stores = Tail,
                                 level = Level - 1},
            put(mnesia_activity_state, {OldMod, Tid, Popped}),
            case Reason of
                #cyclic{} ->
                    exit({aborted, Reason});
                {Tag, _} when Tag =:= node_not_running; Tag =:= bad_commit ->
                    exit({aborted, Reason});
                _ ->
                    {aborted, mnesia_lib:fix_error(Reason)}
            end
    end.

%% Drain, without blocking, all mnesia_tm replies (e.g. late votes) and
%% mnesia_down messages from the mailbox. Returns flushed.
flush_downs() ->
    receive
        {mnesia_down, _Node} -> flush_downs();
        {?MODULE, _, _} -> flush_downs()
    after 0 ->
            flushed
    end.

%% Install MTT as the calling process's activity state (no stop fun).
put_activity_id(MTT) ->
    put_activity_id(MTT, undefined).
%% Install an activity state in the calling process.
%%   undefined        -> erase the activity state
%%   {Mod, Tid, Ts}   -> register a friend in the transaction store: the
%%                       stop fun {stop, Fun} when Fun is a function,
%%                       otherwise this process. Then install the state.
%%   any other state  -> stored as is
put_activity_id(undefined, _) ->
    erase_activity_id();
put_activity_id({Mod, Tid = #tid{}, Ts = #tidstore{}}, Fun) ->
    flush_downs(),
    Friend = case is_function(Fun) of
                 true -> {stop, Fun};
                 false -> self()
             end,
    ?ets_insert(Ts#tidstore.store, {friends, Friend}),
    put(mnesia_activity_state, {Mod, Tid, Ts});
put_activity_id(SimpleState, _) ->
    put(mnesia_activity_state, SimpleState).

%% Flush pending mnesia_tm/mnesia_down messages and erase the activity state.
erase_activity_id() ->
    flush_downs(),
    erase(mnesia_activity_state).

%% Values stored under key Type in Store (entries {Type, Val}). Returns []
%% when the lookup raises an error (e.g. the table is gone).
get_elements(Type, Store) ->
    try
        [Val || {_, Val} <- ?ets_lookup(Store, Type)]
    catch
        error:_ -> []
    end.

%% When the flag is true, copy the nodes, fixtable and friends entries of
%% Obsolete into Current (in that order).
opt_propagate_store(_Current, _Obsolete, false) ->
    ok;
opt_propagate_store(Current, Obsolete, true) ->
    lists:foreach(
      fun(Key) -> propagate_store(Current, Key, get_elements(Key, Obsolete)) end,
      [nodes, fixtable, friends]).

%% Insert {Var, Val} into Store for each Val; returns ok.
propagate_store(Store, Var, Vals) ->
    lists:foreach(fun(Val) -> ?ets_insert(Store, {Var, Val}) end, Vals).

%% Tell all processes that cooperate with the current transaction (the
%% 'friends' entries of its store) that the activity has ended.
intercept_friends(_Tid, Ts) ->
    Friends = get_elements(friends, Ts#tidstore.store),
    intercept_best_friend(Friends, false).

%% Call every {stop, Fun} friend (errors are caught). Only the first
%% process friend is sent {activity_ended, undefined, self()} and waited
%% for; later process friends are skipped.
intercept_best_friend([], _Notified) ->
    ok;
intercept_best_friend([Friend | Rest], Notified) ->
    case Friend of
        {stop, StopFun} ->
            ?CATCH(StopFun()),
            intercept_best_friend(Rest, Notified);
        Pid when Notified =:= false ->
            Pid ! {activity_ended, undefined, self()},
            wait_for_best_friend(Pid, 0),
            intercept_best_friend(Rest, true);
        _ when Notified =:= true ->
            intercept_best_friend(Rest, true)
    end.
%% wait_for_best_friend(Pid, Timeout) -> ok
%%
%% Waits until Pid replies {activity_ended, _, Pid} or an
%% {'EXIT', Pid, _} message arrives. If nothing arrives within Timeout
%% ms, stops when Pid is no longer alive; otherwise keeps waiting in
%% 1000 ms rounds.
wait_for_best_friend(Pid, Timeout) ->
    receive
        {activity_ended, _, Pid} -> ok;
        {'EXIT', Pid, _} -> ok
    after Timeout ->
        case erlang:is_process_alive(Pid) of
            false -> ok;
            true -> wait_for_best_friend(Pid, 1000)
        end
    end.

%% dirty(Protocol, Item) -> Result
%%
%% Performs a single dirty operation Item = {{Tab, Key}, Val, Op}
%% outside a transaction, with Tid = {dirty, self()}.
%%   async_dirty - commit records are sent to all involved nodes, but
%%                 only one node (preferably the local one, i.e. the
%%                 table's where_to_read node) is waited for.
%%   sync_dirty  - commit records are sent and all nodes are waited for.
%% Any other protocol aborts with {bad_activity, Protocol}; the item is
%% still prepared first, exactly as before.
dirty(Protocol, Item) ->
    {{Tab, Key}, _Val, _Op} = Item,
    Tid = {dirty, self()},
    Prep = prepare_items(Tid, Tab, Key, [Item], #prep{protocol = Protocol}),
    CommitRecs = Prep#prep.records,
    {WaitFor, FirstRes} =
        case Protocol of
            async_dirty ->
                ReadNode = val({Tab, where_to_read}),
                async_send_dirty(Tid, CommitRecs, Tab, ReadNode);
            sync_dirty ->
                sync_send_dirty(Tid, CommitRecs, Tab, []);
            _ ->
                mnesia:abort({bad_activity, Protocol})
        end,
    rec_dirty(WaitFor, FirstRes).
%% t_commit(Type) -> do_commit | {do_abort, Reason} | do_commit_nested
%%
%% Commits the transaction held in the caller's mnesia_activity_state.
%% Runs in the context of the user process.
%%
%% At the outermost level (level 1) the transaction's friend processes
%% are told the activity has ended. The shadow store is then turned
%% into per-node commit records by arrange/3. The nodes involved are
%% those the mnesia_locker lock functions recorded under `nodes` in the
%% store. The records are handed to multi_commit/5. A transaction
%% without updates uses the read_only protocol.
%%
%% For a nested transaction, a del_store request is issued for the
%% enclosing level's store. The current store then becomes the store
%% of the enclosing level, and do_commit_nested is returned.
t_commit(Type) ->
    {_Mod, Tid, Ts} = get(mnesia_activity_state),
    Store = Ts#tidstore.store,
    case Ts#tidstore.level of
        1 ->
            intercept_friends(Tid, Ts),
            {Updates, Prep} = arrange(Tid, Store, Type),
            Protocol = if
                           Updates > 0 -> Prep#prep.protocol;
                           Updates == 0 -> read_only
                       end,
            multi_commit(Protocol, majority_attr(Prep),
                         Tid, Prep#prep.records, Store);
        Level ->
            [{OuterMod, Obsolete} | OuterStores] = Ts#tidstore.up_stores,
            req({del_store, Tid, Store, Obsolete, false}),
            OuterTs = Ts#tidstore{store = Store,
                                  up_stores = OuterStores,
                                  level = Level - 1},
            put(mnesia_activity_state, {OuterMod, Tid, OuterTs}),
            do_commit_nested
    end.

%% majority_attr(Prep) -> [{Tab, AllNodes}]
%%
%% The tables (with their replica nodes) that require a majority.
majority_attr(Prep) ->
    Prep#prep.majority.

%% arrange(Tid, Store, Type) -> {Count, #prep{}}
%%
%% Builds one presume_commit record per node recorded in the store.
%% This always includes the local node. Every key of the store is
%% then walked with do_arrange/5, starting from the protocol chosen by
%% Type (async -> sym_trans, sync -> sync_sym_trans). Count is the
%% number of object/schema/restore keys processed. The per-storage
%% item lists of the returned records are restored to insertion order.
%% Important function for the performance of mnesia.
arrange(Tid, Store, Type) ->
    InitialRecs = prep_recs(get_elements(nodes, Store), []),
    FirstKey = ?ets_first(Store),
    Protocol = case Type of
                   async -> sym_trans;
                   sync -> sync_sym_trans
               end,
    Prep0 = #prep{protocol = Protocol, records = InitialRecs},
    {Count, Prep} = do_arrange(Tid, Store, FirstKey, Prep0, 0),
    {Count, Prep#prep{records = reverse(Prep#prep.records)}}.
%% reverse(CommitRecords) -> CommitRecords
%%
%% Items are prepended while commit records are built, so every
%% per-storage list ends up newest-first. This puts ram_copies,
%% disc_copies, disc_only_copies and each list inside ext back into
%% insertion order. The order of the records themselves is unchanged.
reverse(CommitRecs) ->
    lists:map(
      fun(#commit{ram_copies = Ram, disc_copies = Disc,
                  disc_only_copies = DiscOnly, ext = Ext} = Rec) ->
              Rec#commit{ram_copies = lists:reverse(Ram),
                         disc_copies = lists:reverse(Disc),
                         disc_only_copies = lists:reverse(DiscOnly),
                         ext = [{Kind, lists:reverse(Ops)}
                                || {Kind, Ops} <- Ext]}
      end,
      CommitRecs).

%% prep_recs(Nodes, Acc) -> CommitRecords
%%
%% Prepends an empty presume_commit record for each node to Acc, so the
%% result lists the nodes in reverse order.
prep_recs(Nodes, Acc) ->
    lists:foldl(fun(Node, Recs) ->
                        [#commit{decision = presume_commit, node = Node} | Recs]
                end,
                Acc, Nodes).

%% do_arrange(Tid, Store, Key, Prep, Count) -> {Count, #prep{}}
%%
%% Walks the store key by key, starting at Key:
%%   {Tab, Key}        - object updates, added via prepare_items/5
%%   op                - schema operations (forces asym_trans)
%%   restore_op        - a restore; each backup entry becomes a delete
%%                       (bare oid) or a write (record), forces asym_trans
%%   sticky            - marks the transaction as sync (not counted)
%%   anything else     - bookkeeping entries (locks, nodes, ...), skipped
%% At '$end_of_table' a sync asym_trans is upgraded to sync_asym_trans.
%% The placement info (Types) handled downstream is a list of
%% {Node, Storage} tuples, one per active replica.
do_arrange(Tid, Store, {Tab, Key} = Oid, Prep, Count) ->
    %% The store is a bag: all queued ops for this oid come back at once.
    Items = ?ets_lookup(Store, Oid),
    Prep2 = prepare_items(Tid, Tab, Key, Items, Prep),
    do_arrange(Tid, Store, ?ets_next(Store, Oid), Prep2, Count + 1);
do_arrange(Tid, Store, op, Prep, Count) ->
    SchemaOps = ?ets_lookup(Store, op),
    Prep2 = prepare_schema_items(Tid, SchemaOps, Prep),
    do_arrange(Tid, Store, ?ets_next(Store, op), Prep2, Count + 1);
do_arrange(Tid, Store, restore_op, Prep, Count) ->
    [{restore_op, Restore}] = ?ets_lookup(Store, restore_op),
    ToCommit =
        fun({Tab, Key}, CommitRecs, _RecName, Where, Snmp) ->
                Oid = {Tab, Key},
                do_prepare_items(Tid, Tab, Key, Where, Snmp,
                                 [{Oid, Oid, delete}], CommitRecs);
           (BupRec, CommitRecs, RecName, Where, Snmp) ->
                Tab = element(1, BupRec),
                Key = element(2, BupRec),
                %% Records are written under the table's record name.
                Obj = case Tab == RecName of
                          true -> BupRec;
                          false -> setelement(1, BupRec, RecName)
                      end,
                do_prepare_items(Tid, Tab, Key, Where, Snmp,
                                 [{{Tab, Key}, Obj, write}], CommitRecs)
        end,
    Recs2 = mnesia_schema:arrange_restore(Restore, ToCommit, Prep#prep.records),
    Prep2 = Prep#prep{protocol = asym_trans, records = Recs2},
    do_arrange(Tid, Store, ?ets_next(Store, restore_op), Prep2, Count + 1);
do_arrange(_Tid, _Store, '$end_of_table',
           #prep{sync = true, protocol = asym_trans} = Prep, Count) ->
    {Count, Prep#prep{protocol = sync_asym_trans}};
do_arrange(_Tid, _Store, '$end_of_table', Prep, Count) ->
    {Count, Prep};
do_arrange(Tid, Store, sticky, Prep, Count) ->
    do_arrange(Tid, Store, ?ets_next(Store, sticky), Prep#prep{sync = true}, Count);
do_arrange(Tid, Store, OtherKey, Prep, Count) ->
    do_arrange(Tid, Store, ?ets_next(Store, OtherKey), Prep, Count).

%% prepare_schema_items(Tid, Items, Prep) -> #prep{}
%%
%% Adds the schema ops to the commit record of every current db node
%% and switches the protocol to asym_trans. Items are kept in reverse
%% order.
prepare_schema_items(Tid, Items, Prep) ->
    DbNodes = val({current, db_nodes}),
    Types = [{Node, schema_ops} || Node <- DbNodes],
    Recs = prepare_nodes(Tid, Types, Items, Prep#prep.records, schema),
    Prep#prep{protocol = asym_trans, records = Recs}.

%% prepare_items(Tid, Tab, Key, Items, Prep) -> #prep{}
%%
%% Adds Items for Tab/Key to the commit records (in reverse order).
%% Consecutive items for the same table reuse the cached placement
%% (prev_types) and snmp info. For a new table, the placement is
%% looked up (no placement aborts with {no_exists, Tab}; a blocked
%% table is unblocked and retried), its majority requirement recorded
%% and the protocol re-checked with check_prep/2.
prepare_items(Tid, Tab, Key, Items, Prep) when Prep#prep.prev_tab == Tab ->
    #prep{prev_types = Types, prev_snmp = Snmp, records = Recs} = Prep,
    Prep#prep{records = do_prepare_items(Tid, Tab, Key, Types, Snmp, Items, Recs)};
prepare_items(Tid, Tab, Key, Items, Prep) ->
    case val({Tab, where_to_commit}) of
        [] ->
            mnesia:abort({no_exists, Tab});
        {blocked, _} ->
            unblocked = req({unblock_me, Tab}),
            prepare_items(Tid, Tab, Key, Items, Prep);
        Types ->
            Majority = needs_majority(Tab, Prep),
            Snmp = val({Tab, snmp}),
            Recs = do_prepare_items(Tid, Tab, Key, Types, Snmp, Items,
                                    Prep#prep.records),
            check_prep(Prep#prep{records = Recs,
                                 prev_tab = Tab,
                                 majority = Majority,
                                 prev_types = Types,
                                 prev_snmp = Snmp},
                       Types)
    end.
%% do_prepare_items(Tid, Tab, Key, Types, Snmp, Items, Recs) -> Recs
%%
%% Adds the SNMP bookkeeping entry (if the table has SNMP, may exit)
%% and then the items themselves to the commit records of every
%% replica node in Types.
do_prepare_items(Tid, Tab, Key, Types, Snmp, Items, Recs) ->
    WithSnmp = prepare_snmp(Tid, Tab, Key, Types, Snmp, Items, Recs),
    prepare_nodes(Tid, Types, Items, WithSnmp, normal).

%% needs_majority(Tab, Prep) -> [{Tab, AllNodes}]
%%
%% Returns the majority list of Prep, extended with {Tab, AllNodes}
%% when Tab has the majority property and is not listed yet. A failing
%% lookup of the property counts as "no majority".
needs_majority(Tab, #prep{majority = Majority}) ->
    case lists:keymember(Tab, 1, Majority) of
        true ->
            Majority;
        false ->
            case ?catch_val({Tab, majority}) of
                true -> [{Tab, val({Tab, all_nodes})} | Majority];
                false -> Majority;
                {'EXIT', _} -> Majority
            end
    end.

%% have_majority(Majority, Nodes) -> ok | {error, Tab}
%%
%% Checks every {Tab, AllNodes} entry against the participating Nodes;
%% returns {error, Tab} for the first table lacking a majority.
have_majority([], _Nodes) ->
    ok;
have_majority([{Tab, AllNodes} | Rest], Nodes) ->
    case mnesia_lib:have_majority(Tab, AllNodes, Nodes) of
        false -> {error, Tab};
        true -> have_majority(Rest, Nodes)
    end.

%% prepare_snmp(Tab, Key, Items) -> [SnmpEntry]
%%
%% [] when Tab has no SNMP structure, [{clear_table, Tab}] for the
%% wildcard key '_', and otherwise [{Op, Tab, Key, SnmpOid}] where Op is
%% taken from the first item.
prepare_snmp(Tab, Key, Items) ->
    case val({Tab, snmp}) of
        [] ->
            [];
        _Ustruct when Key == '_' ->
            [{clear_table, Tab}];
        Ustruct ->
            {_Oid, _Val, Op} = hd(Items),
            %% The snmp oid is still computed (though unused) so that
            %% errors are caught here, and for backwards compatibility
            %% with old nodes.
            SnmpOid = mnesia_snmp_hook:key_to_oid(Tab, Key, Ustruct), % May exit
            [{Op, Tab, Key, SnmpOid}]
    end.

%% prepare_snmp(Tid, Tab, Key, Types, Ustruct, Items, Recs) -> Recs
%%
%% Adds the SNMP entry for this update to every replica's commit
%% record; a no-op when the table has no SNMP structure ([]).
prepare_snmp(_Tid, _Tab, _Key, _Types, [], _Items, Recs) ->
    Recs;
prepare_snmp(Tid, Tab, '_', Types, _Ustruct, _Items, Recs) ->
    prepare_nodes(Tid, Types, [{clear_table, Tab}], Recs, snmp);
prepare_snmp(Tid, Tab, Key, Types, Ustruct, Items, Recs) ->
    {_Oid, _Val, Op} = hd(Items),
    SnmpOid = mnesia_snmp_hook:key_to_oid(Tab, Key, Ustruct), % May exit
    prepare_nodes(Tid, Types, [{Op, Tab, Key, SnmpOid}], Recs, snmp).
%% check_prep(Prep, Types) -> #prep{}
%%
%% Decides whether the symmetric protocols are still allowed.
%% Symmetric commits require no majority tables and the same
%% replica layout (Types) for every table touched; anything else
%% forces asym_trans. The first table seen records its layout in
%% Prep#prep.types.
check_prep(#prep{majority = [], types = Types} = Prep, Types) ->
    Prep;
check_prep(#prep{majority = [], types = undefined} = Prep, Types) ->
    Prep#prep{types = Types};
check_prep(#prep{types = undefined} = Prep, Types) ->
    Prep#prep{protocol = asym_trans, types = Types};
check_prep(Prep, _Types) ->
    Prep#prep{protocol = asym_trans}.

%% prepare_nodes(Tid, Types, Items, CommitRecs, Kind) -> CommitRecs
%%
%% For each {Node, Storage} in Types, adds Items to that node's commit
%% record. The updated records come first in the result, followed by
%% the untouched remainder.
prepare_nodes(_Tid, [], _Items, CommitRecords, _Kind) ->
    CommitRecords;
prepare_nodes(Tid, [{Node, Storage} | MoreTypes], Items, CommitRecs, Kind) ->
    {NodeRec, Others} = pick_node(Tid, Node, CommitRecs, []),
    Updated = prepare_node(Node, Storage, Items, NodeRec, Kind),
    [Updated | prepare_nodes(Tid, MoreTypes, Items, Others, Kind)].

%% pick_node(Tid, Node, CommitRecs, Seen) -> {Rec, OtherRecs}
%%
%% Extracts the commit record for Node. Dirty activities get a fresh
%% presume_commit record when none exists. A transaction without a
%% record for Node (i.e. no lock was taken there) aborts with
%% {bad_commit, {missing_lock, Node}}.
pick_node(_Tid, Node, [Rec | Rest], Seen) when Rec#commit.node == Node ->
    {Rec, Seen ++ Rest};
pick_node(Tid, Node, [Rec | Rest], Seen) ->
    pick_node(Tid, Node, Rest, [Rec | Seen]);
pick_node({dirty, _}, Node, [], Seen) ->
    {#commit{decision = presume_commit, node = Node}, Seen};
pick_node(_Tid, Node, [], _Seen) ->
    mnesia:abort({bad_commit, {missing_lock, Node}}).
%% prepare_node(Node, Storage, Items, Rec, Kind) -> #commit{}
%%
%% Adds Items to Rec, the commit record for Node. Kind decides where
%% they go:
%%   snmp   - prepended to the {snmp, List} entry of Rec#commit.ext
%%   schema - stored as schema_ops, only if Rec has none yet
%%   other  - prepended to the list matching Storage (ram_copies,
%%            disc_copies, disc_only_copies); for {ext, Alias, Mod}
%%            storage, prepended to the {ext_copies, List} entry of ext
%%            as {{ext, Alias, Mod}, Item} pairs.
%% Because items are prepended, the lists are built in reverse order;
%% arrange/3 restores the order with reverse/1.
prepare_node(Node, Storage, [Item | Items], #commit{ext=Ext0}=Rec, Kind) when Kind == snmp ->
    Rec2 = case lists:keytake(snmp, 1, Ext0) of
               false ->
                   Rec#commit{ext = [{snmp,[Item]}|Ext0]};
               {_, {snmp,Snmp},Ext} ->
                   Rec#commit{ext = [{snmp,[Item|Snmp]}|Ext]}
           end,
    prepare_node(Node, Storage, Items, Rec2, Kind);
prepare_node(Node, Storage, [Item | Items], Rec, Kind) when Kind /= schema ->
    Rec2 =
        case Storage of
            ram_copies ->
                Rec#commit{ram_copies = [Item | Rec#commit.ram_copies]};
            disc_copies ->
                Rec#commit{disc_copies = [Item | Rec#commit.disc_copies]};
            disc_only_copies ->
                Rec#commit{disc_only_copies =
                               [Item | Rec#commit.disc_only_copies]};
            {ext, Alias, Mod} ->
                %% All external-backend items share one ext_copies list,
                %% each tagged with its backend so do_update_ext/3 can
                %% dispatch it.
                Ext0 = Rec#commit.ext,
                case lists:keytake(ext_copies, 1, Ext0) of
                    false ->
                        Rec#commit{ext = [{ext_copies, [{{ext,Alias,Mod}, Item}]}|Ext0]};
                    {_,{_,EC},Ext} ->
                        Rec#commit{ext = [{ext_copies, [{{ext,Alias,Mod}, Item}|EC]}|Ext]}
                end
        end,
    prepare_node(Node, Storage, Items, Rec2, Kind);
%% Schema ops are stored as a whole list, and only when the record has
%% none yet. NOTE(review): non-empty schema Items for a record that
%% already has schema_ops match no clause (function_clause) --
%% presumably this never happens; confirm.
prepare_node(_Node, _Storage, Items, Rec, Kind)
  when Kind == schema, Rec#commit.schema_ops == []  ->
    Rec#commit{schema_ops = Items};
prepare_node(_Node, _Storage, [], Rec, _Kind) ->
    Rec.

%% multi_commit(Protocol, Majority, Tid, CommitRecords, Store)
%%   -> do_commit | {do_abort, Reason}
%%
%% Runs the commit protocol selected by arrange/3. Local work is always
%% performed in the user's process. Majority is the list of
%% {Tab, AllNodes} pairs collected by needs_majority/2. The read_only,
%% sym_trans and sync_sym_trans clauses only match an empty Majority;
%% check_prep/2 switches to asym_trans whenever it is non-empty.
multi_commit(read_only, _Maj = [], Tid, CR, _Store) ->
    %% This featherweight commit protocol is used when no
    %% updates have been performed in the transaction.
    %% Participants are only told simple_commit; no votes are collected.

    {DiscNs, RamNs} = commit_nodes(CR, [], []),
    Msg = {Tid, simple_commit},
    rpc:abcast(DiscNs -- [node()], ?MODULE, Msg),
    rpc:abcast(RamNs -- [node()], ?MODULE, Msg),
    mnesia_recover:note_decision(Tid, committed),
    mnesia_locker:release_tid(Tid),
    ?MODULE ! {delete_transaction, Tid},
    do_commit;

multi_commit(sym_trans, _Maj = [], Tid, CR, Store) ->
    %% This lightweight commit protocol is used when all
    %% the involved tables are replicated symmetrically.
    %% Their storage types must match on each node.
    %%
    %% 1  Ask the other involved nodes if they want to commit
    %%    All involved nodes vote yes if they are up
    %% 2a Somebody has voted no
    %%    Tell all yes voters to do_abort
    %% 2b Everybody has voted yes
    %%    Tell everybody to do_commit. I.e. that they should
    %%    prepare the commit, log the commit record and
    %%    perform the updates.
    %%
    %%    The outcome is kept 3 minutes in the transient decision table.
    %%
    %% Recovery:
    %%    If somebody dies before the coordinator has
    %%    broadcast do_commit, the transaction is aborted.
    %%
    %%    If a participant dies, the table load algorithm
    %%    ensures that the contents of the involved tables
    %%    are picked from another node.
    %%
    %%    If the coordinator dies, each participant checks
    %%    the outcome with all the others. If all are uncertain
    %%    about the outcome, the transaction is aborted. If
    %%    somebody knows the outcome the others will follow.

    {DiscNs, RamNs} = commit_nodes(CR, [], []),
    %% Register the transaction as pending (checkpoint bookkeeping)
    %% and remember the entry in the store.
    Pending = mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
    ?ets_insert(Store, Pending),

    {WaitFor, Local} = ask_commit(sym_trans, Tid, CR, DiscNs, RamNs),
    %% The match asserts that no further participant pids are returned
    %% for this protocol.
    {Outcome, []} = rec_all(WaitFor, Tid, do_commit, []),
    ?eval_debug_fun({?MODULE, multi_commit_sym},
                    [{tid, Tid}, {outcome, Outcome}]),
    rpc:abcast(DiscNs -- [node()], ?MODULE, {Tid, Outcome}),
    rpc:abcast(RamNs -- [node()], ?MODULE, {Tid, Outcome}),
    case Outcome of
        do_commit ->
            mnesia_recover:note_decision(Tid, committed),
            %% Log and apply the local part of the commit.
            do_dirty(Tid, Local),
            mnesia_locker:release_tid(Tid),
            ?MODULE ! {delete_transaction, Tid};
        {do_abort, _Reason} ->
            mnesia_recover:note_decision(Tid, aborted)
    end,
    ?eval_debug_fun({?MODULE, multi_commit_sym, post},
                    [{tid, Tid}, {outcome, Outcome}]),
    Outcome;

multi_commit(sync_sym_trans, _Maj = [], Tid, CR, Store) ->
    %%   This protocol is the same as sym_trans except that it
    %%   uses synchronized calls to disk_log and synchronized commits
    %%   when several nodes are involved.

    {DiscNs, RamNs} = commit_nodes(CR, [], []),
    Pending = mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
    ?ets_insert(Store, Pending),

    {WaitFor, Local} = ask_commit(sync_sym_trans, Tid, CR, DiscNs, RamNs),
    {Outcome, []} = rec_all(WaitFor, Tid, do_commit, []),
    ?eval_debug_fun({?MODULE, multi_commit_sym_sync},
                    [{tid, Tid}, {outcome, Outcome}]),
    %% Mark every node we will wait for a commit acknowledgement from
    %% (the markers are recorded before the outcome is broadcast,
    %% whatever the outcome).
    [?ets_insert(Store, {waiting_for_commit_ack, Node}) || Node <- WaitFor],
    rpc:abcast(DiscNs -- [node()], ?MODULE, {Tid, Outcome}),
    rpc:abcast(RamNs -- [node()], ?MODULE, {Tid, Outcome}),
    case Outcome of
        do_commit ->
            mnesia_recover:note_decision(Tid, committed),
            %% Synchronous log write before applying locally.
            mnesia_log:slog(Local),
            do_commit(Tid, Local),
            %% Just wait for completion; the result is ignored.
            rec_all(WaitFor, Tid, ignore, []),
            mnesia_locker:release_tid(Tid),
            ?MODULE ! {delete_transaction, Tid};
        {do_abort, _Reason} ->
            mnesia_recover:note_decision(Tid, aborted)
    end,
    ?eval_debug_fun({?MODULE, multi_commit_sym, post},
                    [{tid, Tid}, {outcome, Outcome}]),
    Outcome;

multi_commit(Protocol, Majority, Tid, CR, Store)
  when Protocol =:= asym_trans; Protocol =:= sync_asym_trans ->
    %% This more expensive commit protocol is used when
    %% table definitions are changed (schema transactions).
    %% It is also used when the involved tables are
    %% replicated asymmetrically. If the storage type differs
    %% on at least one node this protocol is used.
    %%
    %% 1 Ask the other involved nodes if they want to commit.
    %%   All involved nodes prepare the commit, log a presume_abort
    %%   commit record and vote yes or no depending on the
    %%   outcome of the prepare. The preparation is also performed
    %%   by the coordinator.
    %%
    %% 2a Somebody has died or voted no
    %%    Tell all yes voters to do_abort
    %% 2b Everybody has voted yes
    %%    Put an unclear marker in the log.
    %%    Tell the others to pre_commit. I.e. that they should
    %%    put an unclear marker in the log and reply
    %%    acc_pre_commit when they are done.
    %%
    %% 3a Somebody died
    %%    Tell the remaining participants to do_abort
    %% 3b Everybody has replied acc_pre_commit
    %%    Tell everybody to commit. I.e. that they should
    %%    put a committed marker in the log, perform the updates
    %%    and reply done_commit when they are done. The coordinator
    %%    must wait with putting his committed marker into the log
    %%    until the committed message has been sent to all the others.
    %%    Then he performs local commit before collecting replies.
    %%
    %% 4  Everybody has either died or replied done_commit
    %%    Return to the caller.
    %%
    %% Recovery:
    %%    If the coordinator dies, the participants (and
    %%    the coordinator when he starts again) must do
    %%    the following:
    %%
    %%    If we have no unclear marker in the log we may
    %%    safely abort, since we know that nobody may have
    %%    decided to commit yet.
    %%
    %%    If we have a committed marker in the log we may
    %%    safely commit since we know that everybody else
    %%    also will come to this conclusion.
    %%
    %%    If we have an unclear marker but no committed marker
    %%    in the log we are uncertain about the real outcome
    %%    of the transaction and must ask the others before
    %%    we can decide what to do. If someone knows the
    %%    outcome we will do the same. If nobody knows, we
    %%    will wait for the remaining involved nodes to come
    %%    up. When all involved nodes are up and uncertain,
    %%    we decide to commit (first put a committed marker
    %%    in the log, then do the updates).

    D = #decision{tid = Tid, outcome = presume_abort},
    {D2, CR2} = commit_decision(D, CR, [], []),
    DiscNs = D2#decision.disc_nodes,
    RamNs = D2#decision.ram_nodes,
    %% Abort up front if a majority table lacks a majority of its
    %% replicas among the participating nodes.
    case have_majority(Majority, DiscNs ++ RamNs) of
        ok -> ok;
        {error, Tab} -> mnesia:abort({no_majority, Tab})
    end,
    Pending = mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
    ?ets_insert(Store, Pending),
    {WaitFor, Local} = ask_commit(Protocol, Tid, CR2, DiscNs, RamNs),
    %% The coordinator prepares its own part too; a failure is caught
    %% here and turned into an abort below.
    SchemaPrep = ?CATCH(mnesia_schema:prepare_commit(Tid, Local, {coord, WaitFor})),
    {Votes, Pids} = rec_all(WaitFor, Tid, do_commit, []),

    ?eval_debug_fun({?MODULE, multi_commit_asym_got_votes},
                    [{tid, Tid}, {votes, Votes}]),
    case Votes of
        do_commit ->
            case SchemaPrep of
                {_Modified, C = #commit{}, DumperMode} ->
                    mnesia_log:log(C), % C is not a binary
                    ?eval_debug_fun({?MODULE, multi_commit_asym_log_commit_rec},
                                    [{tid, Tid}]),

                    D3 = C#commit.decision,
                    D4 = D3#decision{outcome = unclear},
                    mnesia_recover:log_decision(D4),
                    ?eval_debug_fun({?MODULE, multi_commit_asym_log_commit_dec},
                                    [{tid, Tid}]),
                    tell_participants(Pids, {Tid, pre_commit}),
                    %% Now we are uncertain and we do not know
                    %% if all participants have logged that
                    %% they are uncertain or not
                    rec_acc_pre_commit(Pids, Tid, Store, {C,Local},
                                       do_commit, DumperMode, [], []);
                {'EXIT', Reason} ->
                    %% The others have logged the commit
                    %% record but they are not uncertain
                    mnesia_recover:note_decision(Tid, aborted),
                    ?eval_debug_fun({?MODULE, multi_commit_asym_prepare_exit},
                                    [{tid, Tid}]),
                    tell_participants(Pids, {Tid, {do_abort, Reason}}),
                    do_abort(Tid, Local),
                    {do_abort, Reason}
            end;

        {do_abort, Reason} ->
            %% The others have logged the commit
            %% record but they are not uncertain
            mnesia_recover:note_decision(Tid, aborted),
            ?eval_debug_fun({?MODULE, multi_commit_asym_do_abort}, [{tid, Tid}]),
            tell_participants(Pids, {Tid, {do_abort, Reason}}),
            do_abort(Tid, Local),
            {do_abort, Reason}
    end.

%% rec_acc_pre_commit(Pids, Tid, Store, {Commit, OrigC}, Res, DumperMode,
%%                    GoodPids, AckPids) -> do_commit | {do_abort, Reason}
%%
%% Collects the answers to pre_commit from each participant pid, in
%% order. GoodPids accumulates participants that accepted. AckPids
%% accumulates those that will later send a {schema_commit, ...}
%% acknowledgement (acc_pre_commit with `true`, or the legacy 3-tuple
%% form). A do_abort reply, or mnesia_down for a participant's node,
%% turns the result into {do_abort, {bad_commit, Node}}. Collection
%% still continues with the remaining pids.
%% Once all pids are handled, the final outcome is logged and sent to
%% GoodPids, and the commit is applied or undone locally.
rec_acc_pre_commit([Pid | Tail], Tid, Store, Commit, Res, DumperMode,
                   GoodPids, AckPids) ->
    receive
        {?MODULE, _, {acc_pre_commit, Tid, Pid, true}} ->
            rec_acc_pre_commit(Tail, Tid, Store, Commit, Res, DumperMode,
                               [Pid | GoodPids], [Pid | AckPids]);

        {?MODULE, _, {acc_pre_commit, Tid, Pid, false}} ->
            rec_acc_pre_commit(Tail, Tid, Store, Commit, Res, DumperMode,
                               [Pid | GoodPids], AckPids);

        {?MODULE, _, {acc_pre_commit, Tid, Pid}} ->
            %% Kept for backwards compatibility. Remove after Mnesia 4.x
            rec_acc_pre_commit(Tail, Tid, Store, Commit, Res, DumperMode,
                               [Pid | GoodPids], [Pid | AckPids]);
        {?MODULE, _, {do_abort, Tid, Pid, _Reason}} ->
            AbortRes = {do_abort, {bad_commit, node(Pid)}},
            rec_acc_pre_commit(Tail, Tid, Store, Commit, AbortRes, DumperMode,
                               GoodPids, AckPids);
        {mnesia_down, Node} when Node == node(Pid) ->
            AbortRes = {do_abort, {bad_commit, Node}},
            ?SAFE(Pid ! {Tid, AbortRes}),  %% Tell him that he has died
            rec_acc_pre_commit(Tail, Tid, Store, Commit, AbortRes, DumperMode,
                               GoodPids, AckPids)
    end;
rec_acc_pre_commit([], Tid, Store, {Commit,OrigC}, Res, DumperMode, GoodPids, AckPids) ->
    D = Commit#commit.decision,
    case Res of
        do_commit ->
            %% Now everybody knows that the others
            %% have voted yes. We also know that
            %% everybody is uncertain.
            prepare_sync_schema_commit(Store, AckPids),
            tell_participants(GoodPids, {Tid, committed}),
            D2 = D#decision{outcome = committed},
            mnesia_recover:log_decision(D2),
            ?eval_debug_fun({?MODULE, rec_acc_pre_commit_log_commit},
                            [{tid, Tid}]),

            %% Now we have safely logged committed
            %% and we can recover without asking others
            do_commit(Tid, Commit, DumperMode),
            ?eval_debug_fun({?MODULE, rec_acc_pre_commit_done_commit},
                            [{tid, Tid}]),
            sync_schema_commit(Tid, Store, AckPids),
            mnesia_locker:release_tid(Tid),
            ?MODULE ! {delete_transaction, Tid};

        {do_abort, Reason} ->
            tell_participants(GoodPids, {Tid, {do_abort, Reason}}),
            D2 = D#decision{outcome = aborted},
            mnesia_recover:log_decision(D2),
            ?eval_debug_fun({?MODULE, rec_acc_pre_commit_log_abort},
                            [{tid, Tid}]),
            %% Undo the local prepare using the original commit (OrigC).
            do_abort(Tid, OrigC),
            ?eval_debug_fun({?MODULE, rec_acc_pre_commit_done_abort},
                            [{tid, Tid}])
    end,
    Res.

%% Note all nodes in case of mnesia_down mgt
%% sync_schema_commit is (ab)used for sync_asym_trans as well.
%% Inserts a {waiting_for_commit_ack, Node} marker into Store for the
%% node of each pid that will send a schema_commit acknowledgement.
prepare_sync_schema_commit(_Store, []) ->
    ok;
prepare_sync_schema_commit(Store, [Pid | Pids]) ->
    ?ets_insert(Store, {waiting_for_commit_ack, node(Pid)}),
    prepare_sync_schema_commit(Store, Pids).

%% Waits, pid by pid, for {schema_commit, Tid, Pid} or a mnesia_down for
%% the pid's node, removing the corresponding waiting_for_commit_ack
%% marker from Store. There is no timeout.
sync_schema_commit(_Tid, _Store, []) ->
    ok;
sync_schema_commit(Tid, Store, [Pid | Tail]) ->
    receive
        {?MODULE, _, {schema_commit, Tid, Pid}} ->
            ?ets_match_delete(Store, {waiting_for_commit_ack, node(Pid)}),
            sync_schema_commit(Tid, Store, Tail);

        {mnesia_down, Node} when Node == node(Pid) ->
            ?ets_match_delete(Store, {waiting_for_commit_ack, Node}),
            sync_schema_commit(Tid, Store, Tail)
    end.

%% Sends Msg to every pid in the list.
tell_participants([Pid | Pids], Msg) ->
    Pid ! Msg,
    tell_participants(Pids, Msg);
tell_participants([], _Msg) ->
    ok.

%% The participant process never returns normally; it ends with
%% exit(normal).
-spec commit_participant(_, _, _, _, _, _) -> no_return().
%% commit_participant(Protocol, Coord, Tid, BinOrCommit, DiscNs, RamNs)
%%
%% Body of a participant process in the asym_trans / sync_asym_trans
%% protocols. Accepts the commit record either as a binary (decoded
%% here, the binary is kept for logging) or as a #commit{} record.
%% Trap exit because we can get a shutdown from application manager.
commit_participant(Protocol, Coord, Tid, Bin, DiscNs, RamNs) when is_binary(Bin) ->
    process_flag(trap_exit, true),
    Commit = binary_to_term(Bin),
    commit_participant(Protocol, Coord, Tid, Bin, Commit, DiscNs, RamNs);
commit_participant(Protocol, Coord, Tid, C = #commit{}, DiscNs, RamNs) ->
    process_flag(trap_exit, true),
    commit_participant(Protocol, Coord, Tid, C, C, DiscNs, RamNs).

%% Participant side of the asymmetric commit:
%%  1. prepare the commit; on failure vote no and undo the prepare;
%%  2. on disc nodes, log the commit record, then vote yes;
%%  3. on pre_commit, log an unclear decision and reply acc_pre_commit
%%     (ExpectAck tells the coordinator whether a schema_commit ack
%%     will follow);
%%  4. on committed, log the decision and apply the commit; on an abort
%%     or an 'EXIT', log aborted (after pre_commit) and undo the prepare.
%% Finally the locks are released and the process exits normally.
%% Only the prepare call itself is protected by try; the `of` body is not.
commit_participant(Protocol, Coord, Tid, Bin, C0, DiscNs, _RamNs) ->
    ?eval_debug_fun({?MODULE, commit_participant, pre}, [{tid, Tid}]),
    try mnesia_schema:prepare_commit(Tid, C0, {part, Coord}) of
        {Modified, C = #commit{}, DumperMode} ->
            %% If we cannot find any local unclear decision
            %% we should presume abort at startup recovery
            case lists:member(node(), DiscNs) of
                false ->
                    ignore;
                true ->
                    %% Log the original binary unless the prepare step
                    %% modified the commit record.
                    case Modified of
                        false -> mnesia_log:log(Bin);
                        true  -> mnesia_log:log(C)
                    end
            end,
            ?eval_debug_fun({?MODULE, commit_participant, vote_yes},
                            [{tid, Tid}]),
            reply(Coord, {vote_yes, Tid, self()}),

            receive
                {Tid, pre_commit} ->
                    D = C#commit.decision,
                    mnesia_recover:log_decision(D#decision{outcome = unclear}),
                    ?eval_debug_fun({?MODULE, commit_participant, pre_commit},
                                    [{tid, Tid}]),
                    %% The coordinator waits for a later schema_commit ack
                    %% from participants answering `true` here.
                    ExpectAck = C#commit.schema_ops /= []
                        orelse Protocol =:= sync_asym_trans,
                    reply(Coord, {acc_pre_commit, Tid, self(), ExpectAck}),

                    %% Now we are vulnerable to failures, since
                    %% we cannot decide without asking others
                    receive
                        {Tid, committed} ->
                            mnesia_recover:log_decision(D#decision{outcome = committed}),
                            ?eval_debug_fun({?MODULE, commit_participant, log_commit},
                                            [{tid, Tid}]),
                            do_commit(Tid, C, DumperMode),
                            case ExpectAck of
                                false -> ignore;
                                true  -> reply(Coord, {schema_commit, Tid, self()})
                            end,
                            ?eval_debug_fun({?MODULE, commit_participant, do_commit},
                                            [{tid, Tid}]);

                        {Tid, {do_abort, _Reason}} ->
                            mnesia_recover:log_decision(D#decision{outcome = aborted}),
                            ?eval_debug_fun({?MODULE, commit_participant, log_abort},
                                            [{tid, Tid}]),
                            mnesia_schema:undo_prepare_commit(Tid, C0),
                            ?eval_debug_fun({?MODULE, commit_participant, undo_prepare},
                                            [{tid, Tid}]);

                        {'EXIT', _MnesiaTM, Reason} ->
                            reply(Coord, {do_abort, Tid, self(), {bad_commit,Reason}}),
                            mnesia_recover:log_decision(D#decision{outcome = aborted}),
                            mnesia_schema:undo_prepare_commit(Tid, C0);

                        Msg ->
                            %% NOTE(review): only logged; no decision is
                            %% recorded on this path.
                            verbose("** ERROR ** commit_participant ~p, got unexpected msg: ~tp~n",
                                    [Tid, Msg])
                    end;
                {Tid, {do_abort, Reason}} ->
                    reply(Coord, {do_abort, Tid, self(), Reason}),
                    mnesia_schema:undo_prepare_commit(Tid, C0),
                    ?eval_debug_fun({?MODULE, commit_participant, pre_commit_undo_prepare},
                                    [{tid, Tid}]);

                {'EXIT', _, Reason} ->
                    reply(Coord, {do_abort, Tid, self(), {bad_commit,Reason}}),
                    mnesia_schema:undo_prepare_commit(Tid, C0),
                    ?eval_debug_fun({?MODULE, commit_participant, pre_commit_undo_prepare}, [{tid, Tid}]);

                Msg ->
                    reply(Coord, {do_abort, Tid, self(), {bad_commit,internal}}),
                    verbose("** ERROR ** commit_participant ~p, got unexpected msg: ~tp~n",
                            [Tid, Msg])
            end
    catch _:Reason ->
            ?eval_debug_fun({?MODULE, commit_participant, vote_no},
                            [{tid, Tid}]),
            reply(Coord, {vote_no, Tid, Reason}),
            mnesia_schema:undo_prepare_commit(Tid, C0)
    end,
    mnesia_locker:release_tid(Tid),
    ?MODULE ! {delete_transaction, Tid},
    unlink(whereis(?MODULE)),
    exit(normal).

%% do_abort(Tid, BinOrCommit) -> #commit{}
%%
%% Undoes the schema prepare for the commit record (decoding it first
%% when given as a binary) and returns the record.
do_abort(Tid, Bin) when is_binary(Bin) ->
    %% Possible optimization:
    %% We could pass around a flag telling whether
    %% the binary contains schema ops or not. Only
    %% when it does is it meaningful to unpack the
    %% binary and call
    %% mnesia_schema:undo_prepare_commit/2.
    do_abort(Tid, binary_to_term(Bin));
do_abort(Tid, Commit) ->
    mnesia_schema:undo_prepare_commit(Tid, Commit),
    Commit.

%% Logs the commit record and applies it locally. Only commit records
%% without schema ops match (function_clause otherwise).
do_dirty(Tid, Commit) when Commit#commit.schema_ops == [] ->
    mnesia_log:log(Commit),
    do_commit(Tid, Commit).

%% do_commit(Tid, CommitRecord)
%% Applies a commit record (or its binary encoding) with DumperMode
%% `optional`.
do_commit(Tid, Bin) when is_binary(Bin) ->
    do_commit(Tid, binary_to_term(Bin));
do_commit(Tid, C) ->
    do_commit(Tid, C, optional).

%% do_commit(Tid, CommitRecord, DumperMode) -> Result
%%
%% Applies a commit record locally, in this order: schema ops (through
%% mnesia_dumper:update/3), snmp entries, ram_copies, disc_copies,
%% disc_only_copies, then external-backend ops. Subscribers are then
%% told about the activity. Result is `ok` unless an update op returned
%% something else (see do_update/4).
do_commit(Tid, Bin, DumperMode) when is_binary(Bin) ->
    do_commit(Tid, binary_to_term(Bin), DumperMode);
do_commit(Tid, C, DumperMode) ->
    mnesia_dumper:update(Tid, C#commit.schema_ops, DumperMode),
    R  = do_snmp(Tid, proplists:get_value(snmp, C#commit.ext, [])),
    R2 = do_update(Tid, ram_copies, C#commit.ram_copies, R),
    R3 = do_update(Tid, disc_copies, C#commit.disc_copies, R2),
    R4 = do_update(Tid, disc_only_copies, C#commit.disc_only_copies, R3),
    R5 = do_update_ext(Tid, C#commit.ext, R4),
    mnesia_subscr:report_activity(Tid),
    R5.

%% Applies the {{ext, Alias, Mod}, Op} pairs stored under ext_copies,
%% one op at a time, each with its own storage.
%% This could/should be optimized
do_update_ext(_Tid, [], OldRes) -> OldRes;
do_update_ext(Tid, Ext, OldRes) ->
    case lists:keyfind(ext_copies, 1, Ext) of
        false -> OldRes;
        {_, Ops} ->
            Do = fun({{ext, _,_} = Storage, Op}, R) ->
                         do_update(Tid, Storage, [Op], R)
                 end,
            lists:foldl(Do, OldRes, Ops)
    end.

%% Update the items
%% Applies each op to Storage. An op returning `ok` keeps the running
%% result; any other return (e.g. the new value of an update_counter)
%% replaces it. A failing op is logged with verbose/2 and skipped.
do_update(Tid, Storage, [Op | Ops], OldRes) ->
    try do_update_op(Tid, Storage, Op) of
        ok ->     do_update(Tid, Storage, Ops, OldRes);
        NewRes -> do_update(Tid, Storage, Ops, NewRes)
    catch _:Reason:ST ->
            %% This may only happen when we recently have
            %% deleted our local replica, changed storage_type
            %% or transformed table
            %% BUGBUG: Updates may be lost if storage_type is changed.
            %%         Determine actual storage type and try again.
            %% BUGBUG: Updates may be lost if table is transformed.

            verbose("do_update in ~w failed: ~tp -> {'EXIT', ~tp}~n",
                    [Tid, Op, {Reason, ST}]),
            do_update(Tid, Storage, Ops, OldRes)
    end;
do_update(_Tid, _Storage, [], Res) ->
    Res.

%% do_update_op(Tid, Storage, Op) -> Result
%%
%% Applies one operation to local storage and runs the table's
%% commit_work hooks (checkpoints, subscribers, index). For write,
%% delete, delete_object and clear_table the hooks run before the
%% storage is modified; for update_counter they run afterwards.
do_update_op(Tid, Storage, {{Tab, K}, Obj, write}) ->
    commit_write(?catch_val({Tab, commit_work}), Tid, Storage,
                 Tab, K, Obj, undefined),
    mnesia_lib:db_put(Storage, Tab, Obj);

do_update_op(Tid, Storage, {{Tab, K}, Val, delete}) ->
    commit_delete(?catch_val({Tab, commit_work}), Tid, Storage, Tab, K, Val, undefined),
    mnesia_lib:db_erase(Storage, Tab, K);

%% update_counter: returns the new counter value. If the update raises
%% an error, or yields a non-integer or negative value, the object is
%% overwritten with {RecName, K, Incr} when Incr > 0 and with
%% {RecName, K, 0} otherwise, and no old object is reported.
do_update_op(Tid, Storage, {{Tab, K}, {RecName, Incr}, update_counter}) ->
    {NewObj, OldObjs} =
        try
            NewVal = mnesia_lib:db_update_counter(Storage, Tab, K, Incr),
            true = is_integer(NewVal) andalso (NewVal >= 0),
            {{RecName, K, NewVal}, [{RecName, K, NewVal - Incr}]}
        catch error:_ when Incr > 0 ->
                New = {RecName, K, Incr},
                mnesia_lib:db_put(Storage, Tab, New),
                {New, []};
              error:_ ->
                Zero = {RecName, K, 0},
                mnesia_lib:db_put(Storage, Tab, Zero),
                {Zero, []}
        end,
    commit_update(?catch_val({Tab, commit_work}), Tid, Storage, Tab,
                  K, NewObj, OldObjs),
    element(3, NewObj);

do_update_op(Tid, Storage, {{Tab, Key}, Obj, delete_object}) ->
    commit_del_object(?catch_val({Tab, commit_work}),
                      Tid, Storage, Tab, Key, Obj),
    mnesia_lib:db_match_erase(Storage, Tab, Obj);

do_update_op(Tid, Storage, {{Tab, Key}, Obj, clear_table}) ->
    commit_clear(?catch_val({Tab, commit_work}), Tid, Storage, Tab, Key, Obj),
    mnesia_lib:db_match_erase(Storage, Tab, Obj).
%% commit_write(CommitWork, Tid, Storage, Tab, K, Obj, Old) -> ok
%%
%% Runs the table's commit hooks for a write of Obj under key K. Each
%% CommitWork entry is dispatched on its tag:
%%   {checkpoints, CpList} - retain the object for active checkpoints
%%   subscribers           - report a write event
%%   index                 - add Obj to the secondary index
%% Old is handed unchanged to every hook; the checkpoint retain result
%% is not used.
commit_write([], _Tid, _Storage, _Tab, _K, _Obj, _Old) ->
    ok;
commit_write([Work | MoreWork], Tid, Storage, Tab, K, Obj, Old) ->
    case Work of
        {checkpoints, CpList} ->
            mnesia_checkpoint:tm_retain(Tid, Tab, K, write, CpList);
        _ when element(1, Work) == subscribers ->
            mnesia_subscr:report_table_event(Work, Tab, Tid, Obj, write, Old);
        _ when element(1, Work) == index ->
            mnesia_index:add_index(Work, Storage, Tab, K, Obj, Old)
    end,
    commit_write(MoreWork, Tid, Storage, Tab, K, Obj, Old).

%% commit_update(CommitWork, Tid, Storage, Tab, K, Obj, Old) -> ok
%%
%% Like commit_write/7, but a checkpoints entry replaces Old with the
%% value returned by the checkpoint retain; later hooks see that value.
commit_update([], _Tid, _Storage, _Tab, _K, _Obj, _Old) ->
    ok;
commit_update([Work | MoreWork], Tid, Storage, Tab, K, Obj, Old) ->
    NextOld =
        case Work of
            {checkpoints, CpList} ->
                mnesia_checkpoint:tm_retain(Tid, Tab, K, write, CpList);
            _ when element(1, Work) == subscribers ->
                mnesia_subscr:report_table_event(Work, Tab, Tid, Obj, write, Old),
                Old;
            _ when element(1, Work) == index ->
                mnesia_index:add_index(Work, Storage, Tab, K, Obj, Old),
                Old
        end,
    commit_update(MoreWork, Tid, Storage, Tab, K, Obj, NextOld).

%% commit_delete(Hooks, Tid, Storage, Tab, Key, Obj, OldObj) -> ok
%%
%% Hook walk for a delete of Key. The value returned by the checkpoint
%% hook's mnesia_checkpoint:tm_retain/5 call (op delete) becomes OldObj for
%% the hooks after it. Subscribers receive a delete event, and index hooks
%% are invoked through mnesia_index:delete_index/4.
commit_delete([], _Tid, _Storage, _Tab, _Key, _Obj, _OldObj) ->
    ok;
commit_delete([{checkpoints, Checkpoints} | Hooks], Tid, Storage, Tab, Key, Obj, _OldObj) ->
    Retained = mnesia_checkpoint:tm_retain(Tid, Tab, Key, delete, Checkpoints),
    commit_delete(Hooks, Tid, Storage, Tab, Key, Obj, Retained);
commit_delete([Hook | Hooks], Tid, Storage, Tab, Key, Obj, OldObj)
  when element(1, Hook) =:= subscribers ->
    mnesia_subscr:report_table_event(Hook, Tab, Tid, Obj, delete, OldObj),
    commit_delete(Hooks, Tid, Storage, Tab, Key, Obj, OldObj);
commit_delete([Hook | Hooks], Tid, Storage, Tab, Key, Obj, OldObj)
  when element(1, Hook) =:= index ->
    mnesia_index:delete_index(Hook, Storage, Tab, Key),
    commit_delete(Hooks, Tid, Storage, Tab, Key, Obj, OldObj).

%% commit_del_object(Hooks, Tid, Storage, Tab, Key, Obj) -> ok
%%
%% Hook walk for delete_object of Obj under Key. No previous object is
%% threaded through, so subscribers use the 5-argument report_table_event.
%% Index hooks get mnesia_index:del_object_index/5.
commit_del_object([], _Tid, _Storage, _Tab, _Key, _Obj) ->
    ok;
commit_del_object([{checkpoints, Checkpoints} | Hooks], Tid, Storage, Tab, Key, Obj) ->
    mnesia_checkpoint:tm_retain(Tid, Tab, Key, delete_object, Checkpoints),
    commit_del_object(Hooks, Tid, Storage, Tab, Key, Obj);
commit_del_object([Hook | Hooks], Tid, Storage, Tab, Key, Obj)
  when element(1, Hook) =:= subscribers ->
    mnesia_subscr:report_table_event(Hook, Tab, Tid, Obj, delete_object),
    commit_del_object(Hooks, Tid, Storage, Tab, Key, Obj);
commit_del_object([Hook | Hooks], Tid, Storage, Tab, Key, Obj)
  when element(1, Hook) =:= index ->
    mnesia_index:del_object_index(Hook, Storage, Tab, Key, Obj),
    commit_del_object(Hooks, Tid, Storage, Tab, Key, Obj).

%% commit_clear(Hooks, Tid, Storage, Tab, Key, Obj) -> ok
%%
%% Hook walk for clear_table. Subscribers receive a clear_table event with
%% an undefined old value. Index hooks get mnesia_index:clear_index/4,
%% which (unlike the other index calls) is not passed Storage.
commit_clear([], _Tid, _Storage, _Tab, _Key, _Obj) ->
    ok;
commit_clear([{checkpoints, Checkpoints} | Hooks], Tid, Storage, Tab, Key, Obj) ->
    mnesia_checkpoint:tm_retain(Tid, Tab, Key, clear_table, Checkpoints),
    commit_clear(Hooks, Tid, Storage, Tab, Key, Obj);
commit_clear([Hook | Hooks], Tid, Storage, Tab, Key, Obj)
  when element(1, Hook) =:= subscribers ->
    mnesia_subscr:report_table_event(Hook, Tab, Tid, Obj, clear_table, undefined),
    commit_clear(Hooks, Tid, Storage, Tab, Key, Obj);
commit_clear([Hook | Hooks], Tid, Storage, Tab, Key, Obj)
  when element(1, Hook) =:= index ->
    mnesia_index:clear_index(Hook, Tab, Key, Obj),
    commit_clear(Hooks, Tid, Storage, Tab, Key, Obj).

%% do_snmp(Tid, SnmpOps) -> ok
%%
%% Feed each entry of SnmpOps, in order, to mnesia_snmp_hook:update/1.
%% An exception from one entry is logged through verbose/2, and the
%% remaining entries are still processed.
do_snmp(Tid, SnmpOps) ->
    lists:foreach(
      fun(SnmpOp) ->
              try
                  mnesia_snmp_hook:update(SnmpOp)
              catch _:Reason:ST ->
                      %% Expected only right after the local replica was
                      %% deleted or the snmp table was detached.
                      verbose("do_snmp in ~w failed: ~tp -> {'EXIT', ~tp}~n",
                              [Tid, SnmpOp, {Reason, ST}])
              end
      end,
      SnmpOps).

%% commit_nodes(Commits, AccD, AccR) -> {DiscNodes, RamNodes}
%%
%% Split the nodes of Commits into disc and ram nodes, prepending them to
%% AccD and AccR in list order. Only commits accepted by
%% is_ram_only_commit/1 count as ram; everything else (including any commit
%% with schema_ops) counts as disc.
commit_nodes(Commits, AccD, AccR) ->
    lists:foldl(
      fun(C, {Disc, Ram}) ->
              case is_ram_only_commit(C) of
                  true -> {Disc, [C#commit.node | Ram]};
                  false -> {[C#commit.node | Disc], Ram}
              end
      end,
      {AccD, AccR}, Commits).

%% True when a commit record touches ram storage only. That means no
%% disc_copies, no disc_only_copies, no schema_ops, and no ext_copies entry
%% in its ext list.
is_ram_only_commit(#commit{disc_copies = [], disc_only_copies = [],
                           schema_ops = [], ext = Ext}) ->
    lists:keyfind(ext_copies, 1, Ext) =:= false;
is_ram_only_commit(_) ->
    false.

%% commit_decision(D, Commits, AccD, AccR) -> {Decision, Commits1}
%%
%% Classify every commit's node as disc or ram. Commits without schema ops
%% use is_ram_only_commit/1; commits with schema ops use ram_only_ops/2.
%% The node lists are stored into the #decision record D, and that final
%% decision is stamped into every commit record on the way back up.
commit_decision(D, [C | Tail], AccD, AccR) ->
    N = C#commit.node,
    RamOnly =
        case C of
            #commit{schema_ops = []} -> is_ram_only_commit(C);
            #commit{schema_ops = Ops} -> ram_only_ops(N, Ops)
        end,
    {Decision, Rest} =
        case RamOnly of
            true -> commit_decision(D, Tail, AccD, [N | AccR]);
            false -> commit_decision(D, Tail, [N | AccD], AccR)
        end,
    {Decision, [C#commit{decision = Decision} | Rest]};
commit_decision(D, [], AccD, AccR) ->
    {D#decision{disc_nodes = AccD, ram_nodes = AccR}, []}.

%% ram_only_ops(Node, SchemaOps) -> boolean()
%%
%% Decides whether Node is listed as a ram node in a decision for a commit
%% carrying SchemaOps. Node counts as ram unless it holds a disc copy of
%% the schema. If the first op is a change_table_copy_type for Node on the
%% schema table itself, disc is always used.
ram_only_ops(N, [{op, change_table_copy_type, N, _FromS, _ToS, Cs} | _Ops]) ->
    (not lists:member({name, schema}, Cs)) andalso
        (not lists:member(N, val({schema, disc_copies})));
ram_only_ops(N, _Ops) ->
    not lists:member(N, val({schema, disc_copies})).

%% sync_send_dirty(Tid, Commits, Tab, WaitFor) -> {WaitFor1, Res}
%%
%% Send a sync_dirty request for every remote commit record and prepend
%% each remote node to WaitFor. A local commit record is applied with
%% do_dirty/2 only after all records behind it have been dispatched, and
%% its result becomes Res. Without a local record, Res is a
%% node_not_running abort naming WaitFor.
sync_send_dirty(Tid, [Head | Tail], Tab, WaitFor) ->
    case Head#commit.node of
        Local when Local == node() ->
            {Pending, _} = sync_send_dirty(Tid, Tail, Tab, WaitFor),
            {Pending, do_dirty(Tid, Head)};
        Remote ->
            {?MODULE, Remote} ! {self(), {sync_dirty, Tid, Head, Tab}},
            sync_send_dirty(Tid, Tail, Tab, [Remote | WaitFor])
    end;
sync_send_dirty(_Tid, [], _Tab, WaitFor) ->
    {WaitFor, {'EXIT', {aborted, {node_not_running, WaitFor}}}}.

%% async_send_dirty(Tid, Commits, Tab, ReadNode) -> {WaitFor, Res}
%%
%% ReadNode == nowhere yields a no_exists abort without sending anything.
%% Otherwise the commits are dispatched by async_send_dirty/6, starting
%% with no nodes waited for and result ok.
async_send_dirty(_Tid, _Nodes, Tab, nowhere) ->
    {[], {'EXIT', {aborted, {no_exists, Tab}}}};
async_send_dirty(Tid, Nodes, Tab, ReadNode) ->
    async_send_dirty(Tid, Nodes, Tab, ReadNode, [], ok).

%% Dispatch each commit record:
%%   * for ReadNode, which is this node -> apply with do_dirty/2 and use
%%     its result
%%   * for ReadNode on a remote node    -> send sync_dirty, wait for
%%     ReadNode, and use a provisional node_not_running abort as result
%%   * for any other node               -> send async_dirty; no reply awaited
async_send_dirty(Tid, [Head | Tail], Tab, ReadNode, WaitFor, Res) ->
    case Head#commit.node of
        ReadNode when ReadNode == node() ->
            LocalRes = do_dirty(Tid, Head),
            async_send_dirty(Tid, Tail, Tab, ReadNode, WaitFor, LocalRes);
        ReadNode ->
            {?MODULE, ReadNode} ! {self(), {sync_dirty, Tid, Head, Tab}},
            Provisional = {'EXIT', {aborted, {node_not_running, ReadNode}}},
            async_send_dirty(Tid, Tail, Tab, ReadNode, [ReadNode | WaitFor], Provisional);
        Other ->
            {?MODULE, Other} ! {self(), {async_dirty, Tid, Head, Tab}},
            async_send_dirty(Tid, Tail, Tab, ReadNode, WaitFor, Res)
    end;
async_send_dirty(_Tid, [], _Tab, _ReadNode, WaitFor, Res) ->
    {WaitFor, Res}.

%% rec_dirty(Nodes, Res) -> Res1
%%
%% Wait for each node's reply to a dirty request, one node at a time in
%% list order, and fold it into Res via get_dirty_reply/2. Nodes must not
%% contain the local node; that case is a function_clause error.
rec_dirty(Nodes, Res) ->
    lists:foldl(fun(Node, Acc) when Node /= node() ->
                        get_dirty_reply(Node, Acc)
                end,
                Res, Nodes).

%% get_dirty_reply(Node, Res) -> Res1
%%
%% Receive Node's answer to a dirty request and combine it with Res:
%%   * {'EXIT', Reason}    -> {'EXIT', {aborted, {badarg, Reason}}}
%%   * {dirty_res, ok}     -> see dirty_ok_result/1
%%   * {dirty_res, Reply}  -> Reply
%%   * {mnesia_down, Node} -> see dirty_node_down/2
%% If nothing arrives within 1000 ms, keep waiting while Node is still in
%% {current, db_nodes}; otherwise give up and return Res.
get_dirty_reply(Node, Res) ->
    receive
        {?MODULE, Node, {'EXIT', Reason}} ->
            {'EXIT', {aborted, {badarg, Reason}}};
        {?MODULE, Node, {dirty_res, ok}} ->
            dirty_ok_result(Res);
        {?MODULE, Node, {dirty_res, Reply}} ->
            Reply;
        {mnesia_down, Node} ->
            dirty_node_down(Node, Res)
    after 1000 ->
            StillRunning = lists:member(Node, val({current, db_nodes})),
            if
                StillRunning -> get_dirty_reply(Node, Res);
                true -> Res
            end
    end.

%% A successful remote reply replaces a provisional node_not_running
%% abort. Any other earlier result (for example an error) takes priority
%% and is kept.
dirty_ok_result({'EXIT', {aborted, {node_not_running, _}}}) -> ok;
dirty_ok_result(Res) -> Res.

%% Node went down before replying. If this dirty call runs inside a
%% transaction (the activity state holds a #tid{}), abort with
%% node_not_running so the transaction is restarted rather than left
%% hanging. Otherwise the mnesia_down is ignored and Res is kept, since the
%% replicas are made consistent again when Node restarts.
dirty_node_down(Node, Res) ->
    case get(mnesia_activity_state) of
        {_, Tid, _Ts} when element(1, Tid) == tid ->
            mnesia:abort({node_not_running, Node});
        _ ->
            Res
    end.

%% ask_commit(Protocol, Tid, Commits, DiscNs, RamNs) -> {WaitFor, Local}
%%
%% Send an ask_commit request to the mnesia_tm of every remote node in
%% Commits, converting the protocol per node with convert_old/2.
%% Returns the remote nodes asked (most recent first) and the local
%% #commit record, or no_local if Commits contains none for this node.
%% NOTE(review): the commit records appear to be sent as-is, i.e. not as
%% binaries; confirm against callers.
ask_commit(Protocol, Tid, CR, DiscNs, RamNs) ->
    ask_commit(Protocol, Tid, CR, DiscNs, RamNs, [], no_local).

ask_commit(Protocol, Tid, [Head | Tail], DiscNs, RamNs, WaitFor, Local) ->
    case Head#commit.node of
        Node when Node == node() ->
            ask_commit(Protocol, Tid, Tail, DiscNs, RamNs, WaitFor, Head);
        Node ->
            Request = {ask_commit, convert_old(Protocol, Node), Tid, Head, DiscNs, RamNs},
            {?MODULE, Node} ! {self(), Request},
            ask_commit(Protocol, Tid, Tail, DiscNs, RamNs, [Node | WaitFor], Local)
    end;
ask_commit(_Protocol, _Tid, [], _DiscNs, _RamNs, WaitFor, Local) ->
    {WaitFor, Local}.

%% convert_old(Protocol, Node) -> Protocol1
%%
%% Downgrade sync_asym_trans to asym_trans for a node whose
%% {protocol, Node} value is {{8,3}, _}. Presumably that is an older
%% protocol version without sync_asym_trans; confirm. Every other protocol
%% is returned unchanged.
convert_old(sync_asym_trans, Node) ->
    case ?catch_val({protocol, Node}) of
        {{8,3}, _} -> asym_trans;
        _ -> sync_asym_trans
    end;
convert_old(Protocol, _) ->
    Protocol.

%% new_cr_format(Commit) -> Commit1
%%
%% Normalise the ext field of a #commit record to the tagged-list form.
%% An empty ext, or one that already starts with a {Tag, Value} pair, is
%% returned untouched. Anything else is wrapped as [{snmp, Ext}].
new_cr_format(#commit{ext=[]}=Cr) -> Cr;
new_cr_format(#commit{ext=[{_,_}|_]}=Cr) -> Cr;
new_cr_format(#commit{ext=Snmp}=Cr) ->
    Cr#commit{ext=[{snmp,Snmp}]}.

%% rec_all(Nodes, Tid, Res, Pids) -> {Res1, Pids1}
%%
%% Collect one answer for Tid from each node in Nodes, in list order, with
%% no timeout:
%%   * vote_yes with a Pid adds that Pid to Pids
%%   * vote_no sets Res to {do_abort, Reason}
%%   * committed / aborted / plain vote_yes leave Res unchanged
%%   * mnesia_down tells that node's mnesia_tm to abort (it may have been
%%     restarted) and sets Res to {do_abort, {bad_commit, Node}}
rec_all([Node | Tail], Tid, Res, Pids) ->
    receive
        {?MODULE, Node, {vote_yes, Tid}} ->
            rec_all(Tail, Tid, Res, Pids);
        {?MODULE, Node, {vote_yes, Tid, Pid}} ->
            rec_all(Tail, Tid, Res, [Pid | Pids]);
        {?MODULE, Node, {vote_no, Tid, Reason}} ->
            rec_all(Tail, Tid, {do_abort, Reason}, Pids);
        {?MODULE, Node, {committed, Tid}} ->
            rec_all(Tail, Tid, Res, Pids);
        {?MODULE, Node, {aborted, Tid}} ->
            rec_all(Tail, Tid, Res, Pids);

        {mnesia_down, Node} ->
            %% Make sure that mnesia_tm knows it has died
            %% it may have been restarted
            Abort = {do_abort, {bad_commit, Node}},
            ?SAFE({?MODULE, Node} ! {Tid, Abort}),
            rec_all(Tail, Tid, Abort, Pids)
    end;
rec_all([], _Tid, Res, Pids) ->
    {Res, Pids}.

%% get_transactions() -> [{Counter, Pid, participant | coordinator}]
%%
%% One entry per coordinator transaction known to the local mnesia_tm. The
%% status is participant when the same tid also appears among the local
%% participant transactions.
get_transactions() ->
    {info, Participant, Coordinator} = req(info),
    lists:map(fun({Tid, _Tabs}) ->
                      Status = tr_status(Tid, Participant),
                      {Tid#tid.counter, Tid#tid.pid, Status}
              end, Coordinator).

%% tr_status(Tid, Participants) -> participant | coordinator
%%
%% Participants is the list of #participant{} records from req(info).
%% The tid is stored in the #participant.tid field. Element 1 of a record
%% is its tag (the atom participant), so searching position 1 could never
%% match and every transaction was reported as coordinator.
tr_status(Tid, Participant) ->
    case lists:keymember(Tid, #participant.tid, Participant) of
        true -> participant;
        false -> coordinator
    end.

%% get_info(Timeout) -> {info, Participants, Coordinators} | {timeout, Timeout}
%%
%% Ask the registered mnesia_tm for its transaction tables, waiting at
%% most Timeout ms. If the process is not registered, {timeout, Timeout}
%% is returned immediately.
get_info(Timeout) ->
    case whereis(?MODULE) of
        undefined ->
            {timeout, Timeout};
        Pid ->
            Pid ! {self(), info},
            receive
                {?MODULE, _, {info, Part, Coord}} ->
                    {info, Part, Coord}
            after Timeout ->
                    {timeout, Timeout}
            end
    end.

%% display_info(Stream, Info) -> ok
%%
%% Print the result of get_info/1 to Stream: either a timeout notice, or
%% every participant transaction (with its objects) followed by every
%% coordinator transaction id.
display_info(Stream, {timeout, T}) ->
    io:format(Stream, "---> No info about coordinator and participant transactions, "
              "timeout ~p <--- ~n", [T]);

display_info(Stream, {info, Part, Coord}) ->
    io:format(Stream, "---> Participant transactions <--- ~n", []),
    lists:foreach(fun(Participant) -> pr_participant(Stream, Participant) end, Part),
    io:format(Stream, "---> Coordinator transactions <---~n", []),
    lists:foreach(fun({CoordTid, _Tabs}) -> pr_tid(Stream, CoordTid) end, Coord).

%% Print a participant's tid followed by its commit objects. The commit
%% is decoded with binary_to_term/1 first when it is stored as a binary.
pr_participant(Stream, P) ->
    Commit =
        case P#participant.commit of
            Bin when is_binary(Bin) -> binary_to_term(Bin);
            Term -> Term
        end,
    pr_tid(Stream, P#participant.tid),
    io:format(Stream, "with participant objects ~tp~n", [Commit]).


%% Print the counter and owning pid of a #tid{}.
pr_tid(Stream, Tid) ->
    Counter = Tid#tid.counter,
    Owner = Tid#tid.pid,
    io:format(Stream, "Tid: ~p (owned by ~p) ~n", [Counter, Owner]).

%% info(Serial) -> no
%%
%% Print everything the local mnesia_tm knows about the transaction whose
%% tid counter equals Serial, first as a participant and then as a
%% coordinator.
info(Serial) ->
    io:format( "Info about transaction with serial == ~p~n", [Serial]),
    {info, Participants, Coordinators} = req(info),
    search_pr_participant(Serial, Participants),
    search_pr_coordinator(Serial, Coordinators).


%% search_pr_coordinator(Serial, Coordinators) -> no
%%
%% Print owner information for every coordinator entry whose tid counter
%% matches Serial exactly (=:=). Always returns no.
search_pr_coordinator(S, Coordinators) ->
    lists:foreach(
      fun({Tid, _Ts}) ->
              case Tid#tid.counter of
                  S ->
                      io:format( "Tid is coordinator, owner == \n", []),
                      display_pid_info(Tid#tid.pid);
                  _ ->
                      ok
              end
      end,
      Coordinators),
    no.

%% search_pr_participant(Serial, Participants) -> false
%%
%% For every participant whose tid counter == Serial, print its owner
%% process info and the objects it wants to commit. The commit is decoded
%% from a binary when necessary. Always returns false.
search_pr_participant(S, Participants) ->
    lists:foreach(
      fun(P) ->
              Tid = P#participant.tid,
              Commit0 = P#participant.commit,
              case Tid#tid.counter == S of
                  true ->
                      io:format( "Tid is participant to commit, owner == \n", []),
                      display_pid_info(Tid#tid.pid),
                      io:format( "Tid wants to write objects \n",[]),
                      Commit =
                          case is_binary(Commit0) of
                              true -> binary_to_term(Commit0);
                              false -> Commit0
                          end,
                      io:format("~tp~n", [Commit]);
                  false ->
                      ok
              end
      end,
      Participants),
    false.

%% Print one formatted line about Pid: its initial call, current function
%% (arity instead of argument list), reductions and message queue length.
%% If rpc:pinfo/1 returns undefined, print "Dead process" instead.
display_pid_info(Pid) ->
    case rpc:pinfo(Pid) of
        undefined ->
            io:format( "Dead process \n");
        Info ->
            Current =
                case fetch(current_function, Info) of
                    {Mod, Fun, Args} when is_list(Args) -> {Mod, Fun, length(Args)};
                    CurrentMFA -> CurrentMFA
                end,
            InitialCall = fetch(initial_call, Info),
            pformat(io_lib:format("~p", [Pid]),
                    io_lib:format("~tp", [InitialCall]),
                    io_lib:format("~tp", [Current]),
                    fetch(reductions, Info),
                    fetch(message_queue_len, Info))
    end.

%% Print the five pid-info columns as one fixed-width line.
pformat(A1, A2, A3, A4, A5) ->
    io:format( "~-12s ~-21ts ~-21ts ~9w ~4w~n", [A1,A2,A3,A4,A5]).

%% Value stored under Key in a {Key, Value} info list, or 0 if absent.
fetch(Key, Info) ->
    case lists:keyfind(Key, 1, Info) of
        {_, Val} -> Val;
        _ -> 0
    end.


%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%% reconfigure stuff comes here ......
2246%%%%%%%%%%%%%%%%%%%%% 2247 2248reconfigure_coordinators(N, [{Tid, [Store | _]} | Coordinators]) -> 2249 case mnesia_recover:outcome(Tid, unknown) of 2250 committed -> 2251 WaitingNodes = ?ets_lookup(Store, waiting_for_commit_ack), 2252 case lists:keymember(N, 2, WaitingNodes) of 2253 false -> 2254 ignore; % avoid spurious mnesia_down messages 2255 true -> 2256 send_mnesia_down(Tid, Store, N) 2257 end; 2258 _ -> 2259 %% Tell the coordinator about the mnesia_down 2260 send_mnesia_down(Tid, Store, N) 2261 end, 2262 reconfigure_coordinators(N, Coordinators); 2263reconfigure_coordinators(_N, []) -> 2264 ok. 2265 2266send_mnesia_down(Tid, Store, Node) -> 2267 Msg = {mnesia_down, Node}, 2268 send_to_pids([Tid#tid.pid | get_elements(friends,Store)], Msg). 2269 2270send_to_pids([Pid | Pids], Msg) when is_pid(Pid) -> 2271 Pid ! Msg, 2272 send_to_pids(Pids, Msg); 2273send_to_pids([_ | Pids], Msg) -> 2274 send_to_pids(Pids, Msg); 2275send_to_pids([], _Msg) -> 2276 ok. 2277 2278reconfigure_participants(N, [P | Tail]) -> 2279 case lists:member(N, P#participant.disc_nodes) or 2280 lists:member(N, P#participant.ram_nodes) of 2281 false -> 2282 %% Ignore, since we are not a participant 2283 %% in the transaction. 2284 reconfigure_participants(N, Tail); 2285 2286 true -> 2287 %% We are on a participant node, lets 2288 %% check if the dead one was a 2289 %% participant or a coordinator. 2290 Tid = P#participant.tid, 2291 if 2292 node(Tid#tid.pid) /= N -> 2293 %% Another participant node died. Ignore. 
2294 reconfigure_participants(N, Tail); 2295 2296 true -> 2297 %% The coordinator node has died and 2298 %% we must determine the outcome of the 2299 %% transaction and tell mnesia_tm on all 2300 %% nodes (including the local node) about it 2301 verbose("Coordinator ~p in transaction ~p died~n", 2302 [Tid#tid.pid, Tid]), 2303 2304 Nodes = P#participant.disc_nodes ++ 2305 P#participant.ram_nodes, 2306 AliveNodes = Nodes -- [N], 2307 Protocol = P#participant.protocol, 2308 tell_outcome(Tid, Protocol, N, AliveNodes, AliveNodes), 2309 reconfigure_participants(N, Tail) 2310 end 2311 end; 2312reconfigure_participants(_, []) -> 2313 []. 2314 2315%% We need to determine the outcome of the transaction and 2316%% tell mnesia_tm on all involved nodes (including the local node) 2317%% about the outcome. 2318tell_outcome(Tid, Protocol, Node, CheckNodes, TellNodes) -> 2319 Outcome = mnesia_recover:what_happened(Tid, proto(Protocol), CheckNodes), 2320 case Outcome of 2321 aborted -> 2322 rpc:abcast(TellNodes, ?MODULE, {Tid,{do_abort, {mnesia_down, Node}}}); 2323 committed -> 2324 rpc:abcast(TellNodes, ?MODULE, {Tid, do_commit}) 2325 end, 2326 Outcome. 2327 2328proto(sync_asym_trans) -> asym_trans; 2329proto(Proto) -> Proto. 2330 2331do_stop(#state{coordinators = Coordinators}) -> 2332 Msg = {mnesia_down, node()}, 2333 lists:foreach(fun({Tid, _}) -> Tid#tid.pid ! Msg end, gb_trees:to_list(Coordinators)), 2334 mnesia_checkpoint:stop(), 2335 mnesia_log:stop(), 2336 exit(shutdown). 2337 2338fixtable(Tab, Lock, Me) -> 2339 case req({fixtable, [Tab,Lock,Me]}) of 2340 error -> 2341 exit({no_exists, Tab}); 2342 Else -> 2343 Else 2344 end. 2345 2346%%%%%%%%%%%%%%%%%%%%%%%%%%% 2347%% System upgrade 2348 2349system_continue(_Parent, _Debug, State) -> 2350 doit_loop(State). 2351 2352-spec system_terminate(_, _, _, _) -> no_return(). 2353system_terminate(_Reason, _Parent, _Debug, State) -> 2354 do_stop(State). 

%% system_code_change(State, Module, OldVsn, Extra) -> {ok, State1}
%%
%% sys callback that converts the coordinator/participant representation:
%%   * downgrade with gb_tree state -> coordinators become a {Tid, Value}
%%     list and participants a list of #participant{} records
%%   * any other change with list state -> both are rebuilt as gb_trees
%%     keyed by Tid
%% State already in the target representation is returned unchanged.
system_code_change(State = #state{coordinators = Cs0, participants = Ps0},
                   _Module, _OldVsn, downgrade) when is_tuple(Cs0) ->
    {ok, State#state{coordinators = gb_trees:to_list(Cs0),
                     participants = gb_trees:values(Ps0)}};
system_code_change(State = #state{coordinators = Cs0, participants = Ps0},
                   _Module, _OldVsn, Extra)
  when Extra =/= downgrade, is_list(Cs0) ->
    Coordinators = gb_trees:from_orddict(lists:sort(Cs0)),
    KeyedParticipants = lists:sort([{P#participant.tid, P} || P <- Ps0]),
    {ok, State#state{coordinators = Coordinators,
                     participants = gb_trees:from_orddict(KeyedParticipants)}};
system_code_change(State = #state{}, _Module, _OldVsn, _Extra) ->
    {ok, State}.