1%%
2%% %CopyrightBegin%
3%%
4%% Copyright Ericsson AB 1996-2020. All Rights Reserved.
5%%
6%% Licensed under the Apache License, Version 2.0 (the "License");
7%% you may not use this file except in compliance with the License.
8%% You may obtain a copy of the License at
9%%
10%%     http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing, software
13%% distributed under the License is distributed on an "AS IS" BASIS,
14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15%% See the License for the specific language governing permissions and
16%% limitations under the License.
17%%
18%% %CopyrightEnd%
19%%
20
21%%
22-module(mnesia_tm).
23
24-export([
25	 start/0,
26	 init/1,
27	 non_transaction/5,
28	 transaction/6,
29	 commit_participant/6,
30	 dirty/2,
31	 display_info/2,
32	 do_update_op/3,
33	 get_info/1,
34	 get_transactions/0,
35	 info/1,
36	 mnesia_down/1,
37	 prepare_checkpoint/2,
38	 prepare_checkpoint/1, % Internal
39	 prepare_snmp/3,
40	 do_snmp/2,
41	 put_activity_id/1,
42	 put_activity_id/2,
43	 block_tab/1,
44	 unblock_tab/1,
45	 fixtable/3,
46	 new_cr_format/1
47	]).
48
49%% sys callback functions
50-export([system_continue/3,
51	 system_terminate/4,
52	 system_code_change/4
53	]).
54
55-include("mnesia.hrl").
56
57-import(mnesia_lib, [set/2]).
58-import(mnesia_lib, [fatal/2, verbose/2, dbg_out/2]).
59
60-record(state, {coordinators = gb_trees:empty(), participants = gb_trees:empty(), supervisor,
61		blocked_tabs = [], dirty_queue = [], fixed_tabs = []}).
%% coordinators is a gb_trees mapping Tid -> [EtsTab, ...] (newest store first);
%% gb_trees:to_list/1 on it yields [{Tid, EtsTabList}, ...]
63
64-record(prep, {protocol = sym_trans,
65	       %% async_dirty | sync_dirty | sym_trans | sync_sym_trans | asym_trans | sync_asym_trans
66	       records = [],
67	       prev_tab = [], % initiate to a non valid table name
68	       prev_types,
69	       prev_snmp,
70	       types,
71	       majority = [],
72               sync = false
73	      }).
74
75-record(participant, {tid, pid, commit, disc_nodes = [],
76		      ram_nodes = [], protocol = sym_trans}).
77
%% Ask mnesia_monitor to start the transaction manager process.
%% The call names this module, the entry function init and the calling
%% pid as its only argument (presumably so that init/1 runs with the
%% caller as Parent -- confirm against mnesia_monitor:start_proc/4).
start() ->
    Caller = self(),
    mnesia_monitor:start_proc(?MODULE, ?MODULE, init, [Caller]).
80
%% Entry point of the mnesia_tm process.
%%
%% Registers the process under the module name, traps exits, initialises
%% schema and transaction recovery, optionally dumps the logs, and then
%% acknowledges the start to Parent before entering doit_loop/1 with
%% Parent stored as supervisor.
init(Parent) ->
    register(?MODULE, self()),
    process_flag(trap_exit, true),
    process_flag(message_queue_data, off_heap),

    %% Schema initialisation
    SkipFallback = mnesia_monitor:get_env(ignore_fallback_at_startup),
    mnesia_bup:tm_fallback_start(SkipFallback),
    mnesia_schema:init(SkipFallback),

    %% Handshake and set up transaction recovery
    mnesia_recover:init(),
    EarlyNodes = mnesia_monitor:init(),
    KnownNodes = mnesia_lib:uniq(EarlyNodes ++ mnesia_lib:all_nodes()),
    OtherNodes = KnownNodes -- [node()],
    set(original_nodes, OtherNodes),
    mnesia_recover:connect_nodes(OtherNodes),

    %% Recover transactions, may wait for decision.
    %% The log is dumped twice: once for the previous and once for the
    %% latest log.
    case mnesia_monitor:use_dir() of
        false ->
            ignore;
        true ->
            Previous = mnesia_dumper:opt_dump_log(startup),
            Latest = mnesia_dumper:opt_dump_log(startup),
            mnesia_lib:verbose("Initial dump of log during startup: ~p~n",
                               [[Previous, Latest]]),
            mnesia_log:init()
    end,

    mnesia_schema:purge_tmp_files(),
    mnesia_recover:next_garb(),
    mnesia_recover:next_check_overload(),

    ?eval_debug_fun({?MODULE, init},  [{nodes, OtherNodes}]),

    %% Subscribe mnesia_event to schema table events only when the debug
    %% level is debug or trace.
    case val(debug) of
        Level when Level =:= debug; Level =:= trace ->
            mnesia_subscr:subscribe(whereis(mnesia_event), {table, schema});
        _ ->
            ignore
    end,
    proc_lib:init_ack(Parent, {ok, self()}),
    doit_loop(#state{supervisor = Parent}).
124
%% Module-local lookup of a mnesia variable, kept local to avoid an
%% external function call on the fast path.
%% When the lookup macro reports {'EXIT', Stacktrace} the decision is
%% delegated to mnesia_lib:other_val/2; otherwise the value is returned.
val(Key) ->
    case ?catch_val_and_stack(Key) of
        {'EXIT', Trace} ->
            mnesia_lib:other_val(Key, Trace);
        Found ->
            Found
    end.
131
%% Send the answer R to a client.
%% A client given as {Pid, Ref} gets {?MODULE, Ref, R}; any other client
%% gets {?MODULE, node(), R}. Returns the message that was sent.
reply(From, R) ->
    case From of
        {Client, Tag} ->
            erlang:send(Client, {?MODULE, Tag, R});
        Client ->
            erlang:send(Client, {?MODULE, node(), R})
    end.
136
%% Answer the client via reply/2 and then continue the server loop
%% with State.
reply(Client, Answer, State) ->
    _Sent = reply(Client, Answer),
    doit_loop(State).
140
%% Synchronous request to the locally registered mnesia_tm process.
%% The request is tagged with a fresh reference and the answer is awaited
%% with rec/2. Returns {error, {node_not_running, node()}} when no process
%% is registered under the module name.
req(R) ->
    Server = whereis(?MODULE),
    if
        Server =:= undefined ->
            {error, {node_not_running, node()}};
        true ->
            Tag = make_ref(),
            Server ! {{self(), Tag}, R},
            rec(Server, Tag)
    end.
150
%% Wait for an answer from whatever is currently registered under the
%% module name (see rec/1).
rec() ->
    Server = whereis(?MODULE),
    rec(Server).
153
%% Wait for an answer from the transaction manager Pid.
%% Any {?MODULE, _, Reply} message is accepted, whatever its second
%% element. An 'EXIT' message from Pid, or Pid being undefined, yields
%% {error, {node_not_running, node()}}. There is no timeout.
rec(undefined) ->
    {error, {node_not_running, node()}};
rec(Server) when is_pid(Server) ->
    receive
        {?MODULE, _, Answer} ->
            Answer;
        {'EXIT', Server, _} ->
            {error, {node_not_running, node()}}
    end.
164
%% Wait for the answer tagged with Ref from the transaction manager Pid.
%% An 'EXIT' message from Pid is translated into
%% {error, {node_not_running, node()}}. There is no timeout.
rec(Server, Tag) ->
    receive
        {'EXIT', Server, _} ->
            {error, {node_not_running, node()}};
        {?MODULE, Tag, Answer} ->
            Answer
    end.
172
%% Link the calling process to a client that is given either as a bare
%% pid or as {Pid, Ref} with Ref being a reference.
tmlink(Client) ->
    Target =
        case Client of
            {Pid, Tag} when is_reference(Tag) -> Pid;
            Other -> Other
        end,
    link(Target).
%% Extract the pid from a client given as {Pid, Ref}; anything else is
%% returned unchanged.
tmpid(Client) ->
    case Client of
        {Pid, _Tag} when is_pid(Pid) -> Pid;
        _ -> Client
    end.
181
%% Notify the transaction manager that mnesia on Node has gone down.
%%
%% When mnesia_tm is running the notification is only posted to it and ok
%% is returned; the loop later reports back with
%% mnesia_monitor:mnesia_down/2. When mnesia_tm is not running that report
%% is made directly from here.
%% Per the original note, a synchronous hand-over is needed to avoid a race
%% with mnesia_tm's coordinator processes, which may restart and acquire
%% new locks; mnesia_monitor takes care of that synchronisation.
mnesia_down(Node) ->
    Server = whereis(?MODULE),
    if
        Server =:= undefined ->
            mnesia_monitor:mnesia_down(?MODULE, Node);
        true ->
            Server ! {mnesia_down, Node},
            ok
    end.
195
%% Run prepare_checkpoint/1 for checkpoint Cp on every node in Nodes and
%% return the rpc:multicall/4 result unchanged.
prepare_checkpoint(Nodes, Cp) ->
    CallArgs = [Cp],
    rpc:multicall(Nodes, ?MODULE, prepare_checkpoint, CallArgs).
198
%% Internal: ask the local transaction manager to prepare checkpoint Cp.
%% Returns the manager's answer, or the error tuple produced by req/1.
prepare_checkpoint(Cp) ->
    Request = {prepare_checkpoint, Cp},
    req(Request).
201
%% Ask the local transaction manager to add Tab to its blocked tables, so
%% that dirty operations on it are queued instead of executed.
block_tab(Tab) ->
    Request = {block_tab, Tab},
    req(Request).
204
%% Ask the local transaction manager to remove one blocking of Tab.
unblock_tab(Tab) ->
    Request = {unblock_tab, Tab},
    req(Request).
207
%% Main receive loop of the mnesia_tm process.
%%
%% Each iteration takes one message from the mailbox, handles it and goes
%% on with the (possibly updated) #state{}, either by calling doit_loop/1
%% directly or through reply/3 or handle_exit/3. System messages are handed
%% to sys:handle_system_msg/6. A message matching no other clause is logged
%% with verbose/2 and dropped.
%%
%% State fields used here:
%%   coordinators - gb_trees, Tid -> list of ets stores (newest first)
%%   participants - gb_trees, Tid -> #participant{}
%%   blocked_tabs - tables whose dirty operations are queued
%%   dirty_queue  - queued dirty items, newest first
%%   supervisor   - parent pid passed on to sys:handle_system_msg/6
doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=Sup}=State) ->
    receive
        %% Dirty operations are executed at once unless the table is
        %% blocked, in which case they are queued (in the new commit record
        %% format) until unblock_tab.
        {_From, {async_dirty, Tid, Commit, Tab}} ->
            case lists:member(Tab, State#state.blocked_tabs) of
                false ->
                    do_async_dirty(Tid, new_cr_format(Commit), Tab),
                    doit_loop(State);
                true ->
                    Item = {async_dirty, Tid, new_cr_format(Commit), Tab},
                    State2 = State#state{dirty_queue = [Item | State#state.dirty_queue]},
                    doit_loop(State2)
            end;

        {From, {sync_dirty, Tid, Commit, Tab}} ->
            case lists:member(Tab, State#state.blocked_tabs) of
                false ->
                    do_sync_dirty(From, Tid, new_cr_format(Commit), Tab),
                    doit_loop(State);
                true ->
                    Item = {sync_dirty, From, Tid, new_cr_format(Commit), Tab},
                    State2 = State#state{dirty_queue = [Item | State#state.dirty_queue]},
                    doit_loop(State2)
            end;

        %% Start of an outer transaction: create the ets store, link to the
        %% client, allocate a new Tid and register it as coordinator.
        {From, start_outer} -> %% Create and associate ets_tab with Tid
            try ?ets_new_table(mnesia_trans_store, [bag, public]) of
                Etab ->
                    tmlink(From),
                    C = mnesia_recover:incr_trans_tid_serial(),
                    ?ets_insert(Etab, {nodes, node()}),
                    Tid = #tid{pid = tmpid(From), counter = C},
                    A2 = gb_trees:insert(Tid,[Etab],Coordinators),
                    S2 = State#state{coordinators = A2},
                    reply(From, {new_tid, Tid, Etab}, S2)
            catch error:Reason -> %% system limit
                    Msg = "Cannot create an ets table for the "
                        "local transaction store",
                    reply(From, {error, {system_limit, Msg, Reason}}, State)
            end;

        %% Request to take part in transaction Tid. The asym protocols get
        %% a linked commit_participant process; the sym protocols vote yes
        %% at once and are later committed by this process (pid = nopid).
        %% A Tid whose pid lives on this node raises an internal error.
        {From, {ask_commit, Protocol, Tid, Commit0, DiscNs, RamNs}} ->
            ?eval_debug_fun({?MODULE, doit_ask_commit},
                            [{tid, Tid}, {prot, Protocol}]),
            mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
            Commit = new_cr_format(Commit0),
            Pid =
                if
                    node(Tid#tid.pid) =:= node() ->
                        error({internal_error, local_node});
                    Protocol =:= asym_trans orelse Protocol =:= sync_asym_trans ->
                        Args = [Protocol, tmpid(From), Tid, Commit, DiscNs, RamNs],
                        spawn_link(?MODULE, commit_participant, Args);
                    true -> %% *_sym_trans
                        reply(From, {vote_yes, Tid}),
                        nopid
                end,
            P = #participant{tid = Tid,
                             pid = Pid,
                             commit = Commit,
                             disc_nodes = DiscNs,
                             ram_nodes = RamNs,
                             protocol = Protocol},
            State2 = State#state{participants = gb_trees:insert(Tid,P,Participants)},
            doit_loop(State2);

        %% Commit decision for a transaction this node takes part in.
        {Tid, do_commit} ->
            case gb_trees:lookup(Tid, Participants) of
                none ->
                    verbose("Tried to commit a non participant transaction ~p~n",[Tid]),
                    doit_loop(State);
                {value, P} ->
                    ?eval_debug_fun({?MODULE,do_commit,pre},[{tid,Tid},{participant,P}]),
                    case P#participant.pid of
                        nopid ->
                            Commit = P#participant.commit,
                            %% The commit is logged only when this node is
                            %% one of the disc nodes: log/1 for sym_trans,
                            %% slog/1 for sync_sym_trans.
                            Member = lists:member(node(), P#participant.disc_nodes),
                            if Member == false ->
                                    ignore;
                               P#participant.protocol == sym_trans ->
                                    mnesia_log:log(Commit);
                               P#participant.protocol == sync_sym_trans ->
                                    mnesia_log:slog(Commit)
                            end,
                            mnesia_recover:note_decision(Tid, committed),
                            do_commit(Tid, Commit),
                            %% Only the sync protocol reports back to the
                            %% process owning the transaction.
                            if
                                P#participant.protocol == sync_sym_trans ->
                                    Tid#tid.pid ! {?MODULE, node(), {committed, Tid}};
                                true ->
                                    ignore
                            end,
                            mnesia_locker:release_tid(Tid),
                            transaction_terminated(Tid),
                            ?eval_debug_fun({?MODULE,do_commit,post},[{tid,Tid},{pid,nopid}]),
                            doit_loop(State#state{participants=
                                                  gb_trees:delete(Tid,Participants)});
                        Pid when is_pid(Pid) ->
                            %% A participant process exists: forward the
                            %% decision and keep the entry.
                            Pid ! {Tid, committed},
                            ?eval_debug_fun({?MODULE, do_commit, post}, [{tid, Tid}, {pid, Pid}]),
                            doit_loop(State)
                    end
            end;

        %% Commit with nothing to apply on this node: note the decision,
        %% release the locks and terminate the transaction.
        {Tid, simple_commit} ->
            mnesia_recover:note_decision(Tid, committed),
            mnesia_locker:release_tid(Tid),
            transaction_terminated(Tid),
            doit_loop(State);

        %% Abort decision. For an unknown Tid only the locks are released.
        {Tid, {do_abort, Reason}} ->
            ?eval_debug_fun({?MODULE, do_abort, pre}, [{tid, Tid}]),
            case gb_trees:lookup(Tid, Participants) of
                none ->
                    verbose("Tried to abort a non participant transaction ~p: ~tp~n",
                            [Tid, Reason]),
                    mnesia_locker:release_tid(Tid),
                    doit_loop(State);
                {value, P} ->
                    case P#participant.pid of
                        nopid ->
                            Commit = P#participant.commit,
                            mnesia_recover:note_decision(Tid, aborted),
                            do_abort(Tid, Commit),
                            if
                                P#participant.protocol == sync_sym_trans ->
                                    Tid#tid.pid ! {?MODULE, node(), {aborted, Tid}};
                                true ->
                                    ignore
                            end,
                            transaction_terminated(Tid),
                            mnesia_locker:release_tid(Tid),
                            ?eval_debug_fun({?MODULE, do_abort, post}, [{tid, Tid}, {pid, nopid}]),
                            doit_loop(State#state{participants=
                                                  gb_trees:delete(Tid,Participants)});
                        Pid when is_pid(Pid) ->
                            Pid ! {Tid, {do_abort, Reason}},
                            ?eval_debug_fun({?MODULE, do_abort, post},
                                            [{tid, Tid}, {pid, Pid}]),
                            doit_loop(State)
                    end
            end;

        {From, {add_store, Tid}} -> %% new store for nested  transaction
            try ?ets_new_table(mnesia_trans_store, [bag, public]) of
                Etab ->
                    A2 = add_coord_store(Coordinators, Tid, Etab),
                    reply(From, {new_store, Etab},
                          State#state{coordinators = A2})
            catch error:Reason -> %% system limit
                    Msg = "Cannot create an ets table for a nested "
                        "local transaction store",
                    reply(From, {error, {system_limit, Msg, Reason}}, State)
            end;

        %% A nested transaction level is finished: optionally propagate the
        %% store contents, then drop the obsolete store.
        {From, {del_store, Tid, Current, Obsolete, PropagateStore}} ->
            opt_propagate_store(Current, Obsolete, PropagateStore),
            A2 = del_coord_store(Coordinators, Tid, Current, Obsolete),
            reply(From, store_erased, State#state{coordinators = A2});

        {'EXIT', Pid, Reason} ->
            handle_exit(Pid, Reason, State);

        %% Restart of a transaction: keep only Store for Tid, clear its
        %% fixtables and reset its contents to the initial {nodes, node()}.
        {From, {restart, Tid, Store}} ->
            A2 = restore_stores(Coordinators, Tid, Store),
            clear_fixtable([Store]),
            ?ets_match_delete(Store, '_'),
            ?ets_insert(Store, {nodes, node()}),
            reply(From, {restarted, Tid}, State#state{coordinators = A2});

        {delete_transaction, Tid} ->
            %% used to clear transactions which are committed
            %% in coordinator or participant processes
            case gb_trees:is_defined(Tid, Participants) of
                false ->
                    case gb_trees:lookup(Tid, Coordinators) of
                        none ->
                            verbose("** ERROR ** Tried to delete a non transaction ~p~n",
                                    [Tid]),
                            doit_loop(State);
                        {value, Etabs} ->
                            clear_fixtable(Etabs),
                            erase_ets_tabs(Etabs),
                            transaction_terminated(Tid),
                            doit_loop(State#state{coordinators =
                                                  gb_trees:delete(Tid,Coordinators)})
                    end;
                true ->
                    transaction_terminated(Tid),
                    State2 = State#state{participants=gb_trees:delete(Tid,Participants)},
                    doit_loop(State2)
            end;

        {sync_trans_serial, Tid} ->
            %% Do the Lamport thing here
            mnesia_recover:sync_trans_tid_serial(Tid),
            doit_loop(State);

        {From, info} ->
            reply(From, {info, gb_trees:values(Participants),
                         gb_trees:to_list(Coordinators)}, State);

        %% Mnesia on node N went down: reconfigure coordinators and
        %% participants, drop the fixtables held for N, then inform the
        %% locker and the monitor.
        {mnesia_down, N} ->
            verbose("Got mnesia_down from ~p, reconfiguring...~n", [N]),
            reconfigure_coordinators(N, gb_trees:to_list(Coordinators)),

            Tids = gb_trees:keys(Participants),
            reconfigure_participants(N, gb_trees:values(Participants)),
            NewState = clear_fixtable(N, State),

            mnesia_locker:mnesia_down(N, Tids),
            mnesia_monitor:mnesia_down(?MODULE, N),
            doit_loop(NewState);

        %% A dirty client waits for Tab to be unblocked. It is answered at
        %% once when Tab is not blocked, otherwise queued.
        {From, {unblock_me, Tab}} ->
            case lists:member(Tab, State#state.blocked_tabs) of
                false ->
                    verbose("Wrong dirty Op blocked on ~p ~tp ~p",
                            [node(), Tab, From]),
                    reply(From, unblocked),
                    doit_loop(State);
                true ->
                    Item = {Tab, unblock_me, From},
                    State2 = State#state{dirty_queue = [Item | State#state.dirty_queue]},
                    doit_loop(State2)
            end;

        {From, {block_tab, Tab}} ->
            State2 = State#state{blocked_tabs = [Tab | State#state.blocked_tabs]},
            reply(From, ok, State2);

        %% Remove one occurrence of Tab from blocked_tabs. Only when no
        %% occurrence remains is the table unblocked and its queued dirty
        %% operations processed.
        {From, {unblock_tab, Tab}} ->
            BlockedTabs2 = State#state.blocked_tabs -- [Tab],
            case lists:member(Tab, BlockedTabs2) of
                false ->
                    mnesia_controller:unblock_table(Tab),
                    Queue = process_dirty_queue(Tab, State#state.dirty_queue),
                    State2 = State#state{blocked_tabs = BlockedTabs2,
                                         dirty_queue = Queue},
                    reply(From, ok, State2);
                true ->
                    State2 = State#state{blocked_tabs = BlockedTabs2},
                    reply(From, ok, State2)
            end;

        %% Prepare a checkpoint. On success the pending transactions not
        %% listed in IgnoreNew are entered into the checkpoint.
        {From, {prepare_checkpoint, Cp}} ->
            Res = mnesia_checkpoint:tm_prepare(Cp),
            case Res of
                {ok, _Name, IgnoreNew, _Node} ->
                    prepare_pending_coordinators(gb_trees:to_list(Coordinators), IgnoreNew),
                    prepare_pending_participants(gb_trees:values(Participants), IgnoreNew);
                {error, _Reason} ->
                    ignore
            end,
            reply(From, Res, State);

        %% Fix (Lock = true) or release (Lock = false) Tab on behalf of
        %% Requester. Answers error when the storage type of Tab cannot be
        %% looked up, otherwise this node's name.
        {From, {fixtable, [Tab,Lock,Requester]}} ->
            case ?catch_val({Tab, storage_type}) of
                {'EXIT', _} ->
                    reply(From, error, State);
                Storage ->
                    mnesia_lib:db_fixtable(Storage,Tab,Lock),
                    NewState = manage_fixtable(Tab,Lock,Requester,State),
                    reply(From, node(), NewState)
            end;

        {system, From, Msg} ->
            dbg_out("~p got {system, ~p, ~tp}~n", [?MODULE, From, Msg]),
            sys:handle_system_msg(Msg, From, Sup, ?MODULE, [], State);

        Msg ->
            verbose("** ERROR ** ~p got unexpected message: ~tp~n", [?MODULE, Msg]),
            doit_loop(State)
    end.
480
%% Perform a synchronous dirty operation and send the result back to the
%% requesting process as {?MODULE, node(), {dirty_res, Result}}.
%% The table argument is not used here.
do_sync_dirty(From, Tid, Commit, _Tab) ->
    ?eval_debug_fun({?MODULE, sync_dirty, pre}, [{tid, Tid}]),
    Outcome = do_dirty(Tid, Commit),
    ?eval_debug_fun({?MODULE, sync_dirty, post}, [{tid, Tid}]),
    Answer = {?MODULE, node(), {dirty_res, Outcome}},
    erlang:send(From, Answer).
486
%% Perform an asynchronous dirty operation; nobody is notified of the
%% result. The table argument is not used here.
do_async_dirty(Tid, Commit, _Tab) ->
    ?eval_debug_fun({?MODULE, async_dirty, pre}, [{tid, Tid}]),
    _Ignored = do_dirty(Tid, Commit),
    ?eval_debug_fun({?MODULE, async_dirty, post}, [{tid, Tid}]).
491
492
%% Run every queued item that belongs to Tab and return the remaining
%% items in their original order.
%% The queue holds the newest item first, so it is folded from the right
%% in order to process the items in fifo order.
process_dirty_queue(Tab, Queue) ->
    Step =
        fun(Item, Kept) ->
                case Item of
                    {async_dirty, Tid, Commit, Tab} ->
                        do_async_dirty(Tid, Commit, Tab),
                        Kept;
                    {sync_dirty, From, Tid, Commit, Tab} ->
                        do_sync_dirty(From, Tid, Commit, Tab),
                        Kept;
                    {Tab, unblock_me, From} ->
                        reply(From, unblocked),
                        Kept;
                    _ ->
                        %% Item for some other table: keep it queued
                        [Item | Kept]
                end
        end,
    lists:foldr(Step, [], Queue).
511
%% Enter pending coordinator transactions into a checkpoint.
%% For every {Tid, Stores} the first store is searched for a `pending'
%% entry; it is handed to mnesia_checkpoint:tm_enter_pending/1 unless Tid
%% is listed in IgnoreNew. An error raised by the lookup itself is ignored
%% and the next coordinator is tried.
prepare_pending_coordinators([], _IgnoreNew) ->
    ok;
prepare_pending_coordinators([{Tid, [Store | _]} | Rest], IgnoreNew) ->
    try ?ets_lookup(Store, pending) of
        [] ->
            ignore;
        [Pending] ->
            Skip = lists:member(Tid, IgnoreNew),
            if
                Skip -> ignore;
                true -> mnesia_checkpoint:tm_enter_pending(Pending)
            end
    catch error:_ ->
            ignore
    end,
    prepare_pending_coordinators(Rest, IgnoreNew).
529
%% Enter pending participant transactions into a checkpoint: every
%% participant whose Tid is not listed in IgnoreNew is handed to
%% mnesia_checkpoint:tm_enter_pending/3 with its disc and ram nodes.
prepare_pending_participants(Parts, IgnoreNew) ->
    Enter =
        fun(#participant{tid = Tid, disc_nodes = DiscNs, ram_nodes = RamNs}) ->
                case lists:member(Tid, IgnoreNew) of
                    true ->
                        ignore;
                    false ->
                        mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs)
                end
        end,
    lists:foreach(Enter, Parts).
543
%% Handle an 'EXIT' message received by the server loop.
%%   - exits from processes on other nodes are ignored
%%   - an exit from the supervisor stops the server
%%   - an exit from a local coordinator triggers recovery of its
%%     transaction
%%   - an exit from a local participant process is reported with fatal/2
%%   - any other local exit is ignored
handle_exit(Pid, _Reason, State) when node(Pid) =/= node() ->
    %% Exit from a remote process that is of no interest
    doit_loop(State);

handle_exit(Pid, _Reason, State) when Pid =:= State#state.supervisor ->
    %% The supervisor is gone, so we stop too
    do_stop(State);

handle_exit(Pid, Reason, State) ->
    Coords = gb_trees:to_list(State#state.coordinators),
    case pid_search_delete(Pid, Coords) of
        {{DeadTid, Etabs}, RemainingCoords} ->
            %% A local coordinator died: determine the outcome of its
            %% transaction, tell mnesia_tm on the other nodes about it
            %% and then recover locally.
            recover_coordinator(DeadTid, Etabs),
            doit_loop(State#state{coordinators = RemainingCoords});
        {none, _} ->
            %% Not a coordinator, maybe a participant process
            Parts = gb_trees:values(State#state.participants),
            case mnesia_lib:key_search_delete(Pid, #participant.pid, Parts) of
                {#participant{pid = PartPid, tid = PartTid}, _Others} ->
                    fatal("Participant ~p in transaction ~p died ~tp~n",
                          [PartPid, PartTid, Reason]),
                    Trimmed = gb_trees:delete(PartTid, State#state.participants),
                    doit_loop(State#state{participants = Trimmed});
                {none, _} ->
                    %% Some unrelated local process
                    doit_loop(State)
            end
    end.
578
%% Recover a transaction whose local coordinator process has died.
%%
%% The first store in Etabs is used to work out which nodes are involved
%% and to arrange the commit records. The outcome is told to the involved
%% nodes and then applied locally via recover_coordinator/6. If arranging
%% fails the outcome is still told, using the asym_trans protocol.
%% Finally the stores are deleted, the transaction is terminated and its
%% locks are released.
recover_coordinator(Tid, Etabs) ->
    verbose("Coordinator ~p in transaction ~p died.~n", [Tid#tid.pid, Tid]),

    Store = hd(Etabs),
    Involved = get_elements(nodes, Store),
    Remote = Involved -- [node()],
    try arrange(Tid, Store, async) of
        {_N, #prep{protocol = Protocol, records = CommitRecs}} ->
            %% Tell the participants about the outcome
            Outcome = tell_outcome(Tid, Protocol, node(), Involved, Remote),

            %% Recover locally
            {DiscNs, RamNs} = commit_nodes(CommitRecs, [], []),
            case lists:keyfind(node(), #commit.node, CommitRecs) of
                false ->
                    %% Killed before the store had been copied to the new
                    %% nested transaction store: nothing to do locally.
                    ok;
                Local ->
                    ?eval_debug_fun({?MODULE, recover_coordinator, pre},
                                    [{tid, Tid}, {outcome, Outcome}, {prot, Protocol}]),
                    recover_coordinator(Tid, Protocol, Outcome, Local, DiscNs, RamNs),
                    ?eval_debug_fun({?MODULE, recover_coordinator, post},
                                    [{tid, Tid}, {outcome, Outcome}, {prot, Protocol}])
            end
    catch _:Reason:Stacktrace ->
            dbg_out("Recovery of coordinator ~p failed: ~tp~n",
                    [Tid, {Reason, Stacktrace}]),
            tell_outcome(Tid, asym_trans, node(), Involved, Remote)
    end,
    erase_ets_tabs(Etabs),
    transaction_terminated(Tid),
    mnesia_locker:release_tid(Tid).
613
%% Apply the decided Outcome (committed | aborted) of Tid locally.
%%
%% For the sym protocols the decision is only noted and, on commit, the
%% local commit record is applied with do_dirty/2. For the asym protocols
%% a #decision{} holding the disc and ram nodes is logged, and the local
%% commit record is then committed or aborted.
recover_coordinator(Tid, Protocol, committed, Local, _DiscNs, _RamNs)
  when Protocol =:= sym_trans; Protocol =:= sync_sym_trans ->
    mnesia_recover:note_decision(Tid, committed),
    do_dirty(Tid, Local);
recover_coordinator(Tid, Protocol, aborted, _Local, _DiscNs, _RamNs)
  when Protocol =:= sym_trans; Protocol =:= sync_sym_trans ->
    mnesia_recover:note_decision(Tid, aborted);
recover_coordinator(Tid, Protocol, Outcome, Local, DiscNs, RamNs)
  when (Protocol =:= asym_trans orelse Protocol =:= sync_asym_trans),
       (Outcome =:= committed orelse Outcome =:= aborted) ->
    Decision = #decision{tid = Tid, outcome = Outcome,
                         disc_nodes = DiscNs, ram_nodes = RamNs},
    mnesia_recover:log_decision(Decision),
    case Outcome of
        committed -> do_commit(Tid, Local);
        aborted -> do_abort(Tid, Local)
    end.
637
%% Reduce the stores of coordinator Tid to just Store: every other store
%% registered for Tid is deleted. Returns the updated coordinator tree.
restore_stores(Coords, Tid, Store) ->
    Registered = gb_trees:get(Tid, Coords),
    erase_ets_tabs(lists:delete(Store, Registered)),
    gb_trees:update(Tid, [Store], Coords).
643
%% Put Etab in front of the stores registered for coordinator Tid.
%% Tid must already be present in Coords.
add_coord_store(Coords, Tid, Etab) ->
    Existing = gb_trees:get(Tid, Coords),
    Extended = [Etab | Existing],
    gb_trees:update(Tid, Extended, Coords).
647
%% Remove the store Obsolete from coordinator Tid and delete its ets
%% table. Current and Obsolete must be the two first stores of Tid, in
%% either order; afterwards Current is the first store.
del_coord_store(Coords, Tid, Current, Obsolete) ->
    Older =
        case gb_trees:get(Tid, Coords) of
            [Obsolete, Current | Below] -> Below;
            [Current, Obsolete | Below] -> Below
        end,
    ?ets_delete_table(Obsolete),
    gb_trees:update(Tid, [Current | Older], Coords).
657
%% Delete every ets table in the list, front to back. Returns ok.
erase_ets_tabs(Tabs) ->
    lists:foreach(fun(Tab) -> ?ets_delete_table(Tab) end, Tabs).
663
%% Release all fixtables of one transaction.
%% The {Tab, Node} pairs stored under `fixtable' in the first store are
%% released by casting fixtable(Tab, false, self()) to each Node.
clear_fixtable([Store|_]) ->
    Requester = self(),
    Release =
        fun({Tab, Node}) ->
                rpc:cast(Node, ?MODULE, fixtable, [Tab, false, Requester])
        end,
    lists:foreach(Release, get_elements(fixtable, Store)).
670
%% Release every table that has been fixed on behalf of Node and forget
%% the entry. Tables whose storage type can no longer be looked up are
%% skipped. Returns the updated state.
clear_fixtable(Node, State=#state{fixed_tabs = FT0}) ->
    case mnesia_lib:key_search_delete(Node, 1, FT0) of
        {{Node, Tabs}, Others} ->
            Unfix =
                fun(Tab) ->
                        case ?catch_val({Tab, storage_type}) of
                            {'EXIT', _} ->
                                ignore;
                            Storage ->
                                mnesia_lib:db_fixtable(Storage, Tab, false)
                        end
                end,
            lists:foreach(Unfix, Tabs),
            State#state{fixed_tabs = Others};
        {none, _} ->
            State
    end.
688
%% Book-keeping of which tables are fixed on behalf of which node.
%% fixed_tabs is a list of {Node, Tabs}, Node being the node of Requester.
%%   Lock = true  : Tab is added to the tables of that node
%%   Lock = false : one occurrence of Tab is removed; the node entry is
%%                  dropped when nothing is left. An unknown node leaves
%%                  the state untouched (safeguard).
manage_fixtable(Tab, true, Requester, State = #state{fixed_tabs = FT0}) ->
    Node = node(Requester),
    {Tabs, Others} =
        case mnesia_lib:key_search_delete(Node, 1, FT0) of
            {none, FT} -> {[], FT};
            {{Node, Known}, FT} -> {Known, FT}
        end,
    State#state{fixed_tabs = [{Node, [Tab | Tabs]} | Others]};
manage_fixtable(Tab, false, Requester, State = #state{fixed_tabs = FT0}) ->
    Node = node(Requester),
    case mnesia_lib:key_search_delete(Node, 1, FT0) of
        {{Node, Known}, Others} ->
            Fixed =
                case lists:delete(Tab, Known) of
                    [] -> Others;
                    Left -> [{Node, Left} | Others]
                end,
            State#state{fixed_tabs = Fixed};
        {none, _} ->
            State
    end.
707
%% Search an ordered list of {Tid, Stores} entries (as produced by
%% gb_trees:to_list/1) for entries whose Tid belongs to Pid.
%% Returns {Found, Rest} where Rest is a gb_tree built from the entries
%% that do not belong to Pid and Found is the last matching entry, or
%% none if there was no match.
pid_search_delete(Pid, Trs) ->
    pid_search_delete(Pid, Trs, none, []).

pid_search_delete(_Pid, [], Found, Kept) ->
    %% Kept was collected in reverse order
    {Found, gb_trees:from_orddict(lists:reverse(Kept))};
pid_search_delete(Pid, [{Tid, _Stores} = Entry | More], _Found, Kept)
  when Tid#tid.pid == Pid ->
    pid_search_delete(Pid, More, Entry, Kept);
pid_search_delete(Pid, [Entry | More], Found, Kept) ->
    pid_search_delete(Pid, More, Found, [Entry | Kept]).
720
%% Common clean-up when a transaction is finished on this node: leave the
%% checkpoint's pending set, then either unlink from a local transaction
%% owner or, for a remote owner, synchronise the transaction serial
%% (Lamport clock).
transaction_terminated(Tid)  ->
    mnesia_checkpoint:tm_exit_pending(Tid),
    Owner = Tid#tid.pid,
    case node(Owner) == node() of
        true ->
            unlink(Owner);
        false ->
            mnesia_recover:sync_trans_tid_serial(Tid)
    end.
730
%% Run Fun(Args) as a dirty activity of kind ActivityKind.
%%
%% If there is a surrounding transaction we inherit its context: the
%% function is run as a nested transaction (sync for sync_dirty, otherwise
%% async) and an abort is turned into an exit.
%% Otherwise the activity state is set to non_transaction while the
%% function runs and the previous state is restored afterwards, whatever
%% the outcome. Errors are converted to exits carrying the stack trace.
non_transaction({_, _, Outer} = OldState, Fun, Args, ActivityKind, Mod)
  when Outer =/= non_transaction ->
    Kind =
        if
            ActivityKind =:= sync_dirty -> sync;
            true -> async
        end,
    case transaction(OldState, Fun, Args, infinity, Mod, Kind) of
        {aborted, Why} -> exit(Why);
        {atomic, Value} -> Value
    end;
non_transaction(OldState, Fun, Args, ActivityKind, Mod) ->
    put(mnesia_activity_state, {Mod, {ActivityKind, self()}, non_transaction}),
    try apply(Fun, Args) of
        {'EXIT', Why} -> exit(Why);
        {aborted, Why} -> mnesia:abort(Why);
        Value -> Value
    catch
        throw:Thrown -> throw(Thrown);
        error:Why:Stack -> exit({Why, Stack});
        exit:Why -> exit(Why)
    after
        if
            OldState =:= undefined ->
                erase(mnesia_activity_state);
            true ->
                put(mnesia_activity_state, OldState)
        end
    end.
760
%% Run Fun(Args) as a transaction. OldTidTs is the activity state of the
%% caller and decides how:
%%   undefined                - a new outer transaction
%%   {_, _, non_transaction}  - an outer transaction inside a dirty
%%                              activity, whose state is restored afterwards
%%   {OldMod, Tid, Ts}        - a nested transaction
%%   anything else            - {aborted, nested_transaction}
%% The restart factor always starts at 1.
transaction(OldTidTs, Fun, Args, Retries, Mod, Type) ->
    StartFactor = 1,
    case OldTidTs of
        undefined ->
            execute_outer(Mod, Fun, Args, StartFactor, Retries, Type);
        {_, _, non_transaction} ->
            Result = execute_outer(Mod, Fun, Args, StartFactor, Retries, Type),
            put(mnesia_activity_state, OldTidTs),
            Result;
        {OldMod, Tid, Ts} ->
            execute_inner(Mod, Tid, OldMod, Ts, Fun, Args,
                          StartFactor, Retries, Type);
        _ ->
            {aborted, nested_transaction}
    end.
775
%% Start an outer transaction: get a new Tid and store from mnesia_tm,
%% install them as the activity state and run the fun.
%% A failing request is returned as {aborted, Reason}.
execute_outer(Mod, Fun, Args, Factor, Retries, Type) ->
    case req(start_outer) of
        {new_tid, Tid, Store} ->
            Activity = {Mod, Tid, #tidstore{store = Store}},
            put(mnesia_activity_state, Activity),
            execute_transaction(Fun, Args, Factor, Retries, Type);
        {error, Reason} ->
            {aborted, Reason}
    end.
786
%% Start a nested transaction inside Tid: get a new store from mnesia_tm,
%% copy the contents of the current store into it, push the current store
%% (together with the old access module) onto up_stores, raise the level
%% by one and run the fun.
%% A failing request is returned as {aborted, Reason}.
execute_inner(Mod, Tid, OldMod, Ts, Fun, Args, Factor, Retries, Type) ->
    case req({add_store, Tid}) of
        {new_store, Fresh} ->
            Parent = Ts#tidstore.store,
            copy_ets(Parent, Fresh),
            Nested = Ts#tidstore{level = 1 + Ts#tidstore.level,
                                 store = Fresh,
                                 up_stores = [{OldMod, Parent} | Ts#tidstore.up_stores]},
            put(mnesia_activity_state, {Mod, Tid, Nested}),
            execute_transaction(Fun, Args, Factor, Retries, Type);
        {error, Reason} ->
            {aborted, Reason}
    end.
801
%% Copy every object of the ets table From into the ets table To, walking
%% From key by key.
copy_ets(From, To) ->
    FirstKey = ?ets_first(From),
    do_copy_ets(FirstKey, From, To).

%% Copy the objects stored under Key and continue with the next key until
%% the end of the table is reached.
do_copy_ets(Key, From, To) when Key =/= '$end_of_table' ->
    insert_objs(?ets_lookup(From, Key), To),
    do_copy_ets(?ets_next(From, Key), From, To);
do_copy_ets('$end_of_table', _From, _To) ->
    ok.
810
%% Insert each object of the list into the ets table Tab, in list order.
%% Returns ok.
insert_objs(Objs, Tab) ->
    lists:foreach(fun(Obj) -> ?ets_insert(Tab, Obj) end, Objs).
816
%% Run the transaction fun and its commit, and map the outcome:
%%   - a committed outer transaction counts the commit, clears the
%%     activity state, flushes pending mnesia_down messages and unlinks
%%     from mnesia_tm
%%   - a committed nested transaction only counts the commit
%%   - an abort requested by the commit, an error or an exit goes through
%%     check_exit/6, which may restart the transaction
%%   - a throw from user code aborts with {aborted, {throw, Value}}
execute_transaction(Fun, Args, Factor, Retries, Type) ->
    try apply_fun(Fun, Args, Type) of
        {nested_atomic, Value} ->
            mnesia_lib:incr_counter(trans_commits),
            {atomic, Value};
        {atomic, Value} ->
            mnesia_lib:incr_counter(trans_commits),
            erase(mnesia_activity_state),
            %% Locks were already released by the commit.
            %% Flush any unprocessed mnesia_down messages we might have.
            flush_downs(),
            ?SAFE(unlink(whereis(?MODULE))),
            {atomic, Value};
        {do_abort, Why} ->
            check_exit(Fun, Args, Factor, Retries, {aborted, Why}, Type)
    catch
        throw:Thrown ->  %% User called throw
            return_abort(Fun, Args, {aborted, {throw, Thrown}});
        error:Why:Stack ->
            check_exit(Fun, Args, Factor, Retries, {Why, Stack}, Type);
        exit:Why ->
            check_exit(Fun, Args, Factor, Retries, Why, Type)
    end.
840
%% Evaluate the transaction fun and then try to commit.
%% Returns {atomic, Result} or {nested_atomic, Result} on commit, and
%% {do_abort, Reason} on abort; an {aborted, Reason} wrapper inside the
%% abort reason is stripped.
apply_fun(Fun, Args, Type) ->
    Value = apply(Fun, Args),
    Verdict = t_commit(Type),
    case Verdict of
        do_commit ->
            {atomic, Value};
        do_commit_nested ->
            {nested_atomic, Value};
        {do_abort, {aborted, Why}} ->
            {do_abort, Why};
        {do_abort, _} ->
            Verdict
    end.
853
%% Decide what to do with a failed transaction.
%% Aborts caused by a #cyclic{} lock conflict, {node_not_running, N} or
%% {bad_commit, N} are candidates for a restart; everything else is
%% returned as an abort.
check_exit(Fun, Args, Factor, Retries, Reason, Type) ->
    case Reason of
        {aborted, #cyclic{} = Why} ->
            maybe_restart(Fun, Args, Factor, Retries, Type, Why);
        {aborted, {Tag, _Node} = Why}
          when Tag =:= node_not_running; Tag =:= bad_commit ->
            maybe_restart(Fun, Args, Factor, Retries, Type, Why);
        _ ->
            return_abort(Fun, Args, Reason)
    end.
865
%% Restart the transaction if retries remain and it is a top-level
%% transaction (level 1). A nested transaction is aborted with Why
%% instead; with no retries left the abort reason is {aborted, nomore}.
maybe_restart(Fun, Args, Factor, Retries, Type, Why) ->
    {Mod, Tid, Ts} = get(mnesia_activity_state),
    case try_again(Retries) of
        no ->
            return_abort(Fun, Args, {aborted, nomore});
        yes when Ts#tidstore.level == 1 ->
            restart(Mod, Tid, Ts, Fun, Args, Factor, Retries, Type, Why);
        yes ->
            return_abort(Fun, Args, Why)
    end.
876
%% yes if another attempt is allowed: Retries is infinity or a number
%% greater than 1. Anything else gives no.
try_again(Retries) ->
    case Retries of
        infinity -> yes;
        N when is_number(N), N > 1 -> yes;
        _ -> no
    end.
880
%% Restart a top-level transaction after a random sleep.
%%
%% Only top-level transactions can be restarted. If a deadlock occurs in a
%% nested transaction, the whole thing including all nested transactions
%% needs to be restarted; the stack is popped by a consecutive series of
%% exit({aborted, #cyclic{}}) calls.
%%
%% For {bad_commit, N} and {node_not_running, N} the transaction is first
%% aborted and then run again as a new outer transaction; per the original
%% note this avoids hanging while waiting for release_tid acknowledgements.
%% For any other reason the same Tid is kept: the store is reset through
%% mnesia_tm, the locks are released on all involved nodes and the fun is
%% run again with the factor increased by one.
restart(Mod, Tid, Ts, Fun, Args, Factor0, Retries0, Type, Why) ->
    mnesia_lib:incr_counter(trans_restarts),
    Retries = decr(Retries0),
    case Why of
        {Tag, _N} when Tag =:= bad_commit; Tag =:= node_not_running ->
            return_abort(Fun, Args, Why),
            Factor = 1,
            SleepTime = mnesia_lib:random_time(Factor, Tid#tid.counter),
            dbg_out("Restarting transaction ~w: in ~wms ~w~n", [Tid, SleepTime, Why]),
            timer:sleep(SleepTime),
            execute_outer(Mod, Fun, Args, Factor, Retries, Type);
        _ ->
            SleepTime = mnesia_lib:random_time(Factor0, Tid#tid.counter),
            dbg_out("Restarting transaction ~w: in ~wms ~w~n", [Tid, SleepTime, Why]),

            if
                Factor0 == 10 ->
                    %% Our serial may be much larger than that of the
                    %% other nodes, so let them catch up.
                    DbNodes = val({current, db_nodes}),
                    verbose("Sync serial ~p~n", [Tid]),
                    rpc:abcast(DbNodes, ?MODULE, {sync_trans_serial, Tid});
                true ->
                    ignore
            end,
            intercept_friends(Tid, Ts),
            Store = Ts#tidstore.store,
            Nodes = get_elements(nodes, Store),
            ?MODULE ! {self(), {restart, Tid, Store}},
            mnesia_locker:send_release_tid(Nodes, Tid),
            timer:sleep(SleepTime),
            mnesia_locker:receive_release_tid_acc(Nodes, Tid),
            case get_restarted(Tid) of
                {error, Reason} ->
                    mnesia:abort(Reason);
                {restarted, Tid} ->
                    execute_transaction(Fun, Args, Factor0 + 1,
                                        Retries, Type)
            end
    end.
933
%% Wait, via rec/0, for the answer to a restart request for Tid.
%% Returns {restarted, Tid} or {error, Reason}; every other reply is
%% skipped.
get_restarted(Tid) ->
    case rec() of
        {restarted, Tid} = Restarted ->
            Restarted;
        {error, _} = Error ->
            Error;
        _Other ->
            %% We could get a couple of aborts too many.
            get_restarted(Tid)
    end.
943
%% Decrement a retry budget: infinity stays infinity, an integer greater
%% than 1 is reduced by one, everything else becomes 0.
decr(Retries) ->
    if
        Retries =:= infinity -> infinity;
        is_integer(Retries), Retries > 1 -> Retries - 1;
        true -> 0
    end.
947
%% Abort the current activity (read from the process dictionary key
%% mnesia_activity_state) because of Reason.
%%
%% Level 1: the tid is released asynchronously on the nodes noted in the
%% store, mnesia_tm is sent a delete_transaction message, the activity
%% state is erased, pending mnesia_down/vote messages are flushed, the
%% link to mnesia_tm is removed and {aborted, FixedReason} is returned.
%%
%% Nested level: a del_store request is made for the current store, the
%% activity state is switched to the store popped from up_stores and the
%% level is decremented. #cyclic{}, {node_not_running, _} and
%% {bad_commit, _} reasons are re-raised with exit/1; any other reason
%% is returned as {aborted, FixedReason}.
return_abort(Fun, Args, Reason)  ->
    {_Mod, Tid, Ts} = get(mnesia_activity_state),
    dbg_out("Transaction ~p calling ~tp with ~tp failed: ~n ~tp~n",
            [Tid, Fun, Args, Reason]),
    OldStore = Ts#tidstore.store,
    Nodes = get_elements(nodes, OldStore),
    intercept_friends(Tid, Ts),
    ?SAFE(mnesia_lib:incr_counter(trans_failures)),
    Level = Ts#tidstore.level,
    case Level == 1 of
        true ->
            mnesia_locker:async_release_tid(Nodes, Tid),
            ?SAFE(?MODULE ! {delete_transaction, Tid}),
            erase(mnesia_activity_state),
            flush_downs(),
            ?SAFE(unlink(whereis(?MODULE))),
            {aborted, mnesia_lib:fix_error(Reason)};
        false ->
            %% Nested transaction
            [{OldMod, NewStore} | Tail] = Ts#tidstore.up_stores,
            req({del_store, Tid, NewStore, OldStore, true}),
            ParentTs = Ts#tidstore{store = NewStore,
                                   up_stores = Tail,
                                   level = Level - 1},
            put(mnesia_activity_state, {OldMod, Tid, ParentTs}),
            %% These reasons must also unwind the enclosing levels
            Unwind = case Reason of
                         #cyclic{} -> true;
                         {node_not_running, _N} -> true;
                         {bad_commit, _N} -> true;
                         _ -> false
                     end,
            case Unwind of
                true ->
                    exit({aborted, Reason});
                false ->
                    {aborted, mnesia_lib:fix_error(Reason)}
            end
    end.
985
%% Drop every {mnesia_down, _} message and every 3-tuple tagged with
%% this module's name (votes) that is already in the mailbox of the
%% calling process. Never blocks; returns flushed.
flush_downs() ->
    receive
        {mnesia_down, _Node} ->
            flush_downs();
        {?MODULE, _From, _Vote} ->
            flush_downs()
    after 0 ->
            flushed
    end.
992
993
%% put_activity_id(MTT) is put_activity_id(MTT, undefined).
%%
%% put_activity_id(State, Fun) installs State as the activity state of
%% the calling process (process dictionary key mnesia_activity_state):
%%  - undefined erases the activity state;
%%  - {Mod, #tid{}, #tidstore{}} first flushes pending mnesia_down/vote
%%    messages and inserts a friends entry into the transaction store:
%%    {stop, Fun} when Fun is a fun, otherwise the calling pid;
%%  - any other term is stored as is.
put_activity_id(MTT) ->
    put_activity_id(MTT, undefined).

put_activity_id(undefined, _Fun) ->
    erase_activity_id();
put_activity_id({_Mod, #tid{}, #tidstore{store = Store}} = TidTs, Fun) ->
    flush_downs(),
    Friend = case is_function(Fun) of
                 true -> {stop, Fun};
                 false -> self()
             end,
    ?ets_insert(Store, {friends, Friend}),
    put(mnesia_activity_state, TidTs);
put_activity_id(SimpleState, _Fun) ->
    put(mnesia_activity_state, SimpleState).
1011
%% Flush pending mnesia_down/vote messages and remove the activity state
%% from the process dictionary. Returns the erased value (undefined when
%% none was set).
erase_activity_id() ->
    flushed = flush_downs(),
    Previous = erase(mnesia_activity_state),
    Previous.
1015
%% Return the values stored under the key Type in Store, i.e. Val for
%% every {Type, Val} object found by the lookup. Any error raised by the
%% lookup itself is mapped to [].
get_elements(Type, Store) ->
    try ?ets_lookup(Store, Type) of
        Objects -> [Val || {_Key, Val} <- Objects]
    catch
        error:_ -> []
    end.
1023
%% When the third argument is true, copy the nodes, fixtable and friends
%% entries of the Obsolete store into the Current store (in that order).
%% When it is false, do nothing. Returns ok.
opt_propagate_store(_Current, _Obsolete, false) ->
    ok;
opt_propagate_store(Current, Obsolete, true) ->
    lists:foreach(fun(Var) ->
                          propagate_store(Current, Var,
                                          get_elements(Var, Obsolete))
                  end,
                  [nodes, fixtable, friends]).
1030
%% Insert {Var, Val} into Store for every Val in the list, front to
%% back. Returns ok.
propagate_store(Store, Var, Vals) ->
    lists:foreach(fun(Val) -> ?ets_insert(Store, {Var, Val}) end, Vals).
1036
%% Tell all processes that are cooperating with the current transaction.
%% The cooperating parties are the friends entries of the transaction
%% store held by Ts.
intercept_friends(_Tid, Ts) ->
    intercept_best_friend(get_elements(friends, Ts#tidstore.store), false).
1041
%% Walk the list of friends. Every {stop, Fun} entry has its Fun called
%% under ?CATCH. Of the remaining entries (pids) only the first
%% one is sent an activity_ended message and waited for; the second
%% argument records whether that has already happened, and later pids
%% are skipped. Returns ok.
intercept_best_friend([], _Notified) ->
    ok;
intercept_best_friend([{stop, Fun} | Rest], Notified) ->
    ?CATCH(Fun()),
    intercept_best_friend(Rest, Notified);
intercept_best_friend([_Pid | Rest], true) ->
    intercept_best_friend(Rest, true);
intercept_best_friend([Pid | Rest], false) ->
    Pid ! {activity_ended, undefined, self()},
    wait_for_best_friend(Pid, 0),
    intercept_best_friend(Rest, true).
1052
%% Wait until Pid has either answered with an activity_ended message,
%% exited (an 'EXIT' message from Pid arrives) or is found not to be
%% alive. After Timeout ms without a message the liveness of Pid is
%% checked and, if it is still alive, the wait goes on in rounds of
%% 1000 ms. Returns ok.
wait_for_best_friend(Pid, Timeout) ->
    receive
        {activity_ended, _, Pid} ->
            ok;
        {'EXIT', Pid, _} ->
            ok
    after Timeout ->
            case erlang:is_process_alive(Pid) of
                false -> ok;
                true -> wait_for_best_friend(Pid, 1000)
            end
    end.
1063
%% Perform the single dirty operation Item = {{Tab, Key}, Val, Op}.
%%
%% The commit records are prepared first, under the tid {dirty, self()}.
%% Protocol then selects how they are sent:
%%   async_dirty - send commit records to the other involved nodes, but
%%                 only wait for one node to complete. Preferably the
%%                 local node if possible.
%%   sync_dirty  - send commit records to the other involved nodes and
%%                 wait for all nodes to complete.
%% Any other Protocol aborts with {bad_activity, Protocol}.
%% The result is whatever rec_dirty/2 returns.
dirty(Protocol, Item) ->
    {{Tab, Key}, _Val, _Op} = Item,
    Tid = {dirty, self()},
    Prep = prepare_items(Tid, Tab, Key, [Item], #prep{protocol = Protocol}),
    CommitRecs = Prep#prep.records,
    {WaitFor, FirstRes} =
        case Protocol of
            async_dirty ->
                async_send_dirty(Tid, CommitRecs, Tab,
                                 val({Tab, where_to_read}));
            sync_dirty ->
                sync_send_dirty(Tid, CommitRecs, Tab, []);
            _ ->
                mnesia:abort({bad_activity, Protocol})
        end,
    rec_dirty(WaitFor, FirstRes).
1087
%% This is the commit function. The first thing it does
%% is to find out which nodes have been participating
%% in this particular transaction. All of the mnesia_locker:lock*
%% functions insert the names of the nodes where they acquire locks
%% into the local shadow Store.
%% This function executes in the context of the user process.
%% Commit the current activity (read from the process dictionary key
%% mnesia_activity_state).
%%
%% Level 1: the cooperating friends are intercepted, the store is
%% arranged into commit records and multi_commit/5 is run. When
%% arrange/3 counted no updates the read_only protocol is used,
%% otherwise the protocol chosen while arranging. Returns the result of
%% multi_commit/5.
%%
%% Nested level: a del_store request is made for the store on top of
%% up_stores, the level is decremented and do_commit_nested is returned.
t_commit(Type) ->
    {_Mod, Tid, Ts} = get(mnesia_activity_state),
    Store = Ts#tidstore.store,
    Level = Ts#tidstore.level,
    case Level == 1 of
        true ->
            intercept_friends(Tid, Ts),
            %% Updates is the number of updates
            {Updates, Prep} = arrange(Tid, Store, Type),
            Protocol = case Updates of
                           0 -> read_only;
                           _ when Updates > 0 -> Prep#prep.protocol
                       end,
            multi_commit(Protocol,
                         majority_attr(Prep),
                         Tid, Prep#prep.records, Store);
        false ->
            %% nested commit
            [{OldMod, Obsolete} | Tail] = Ts#tidstore.up_stores,
            req({del_store, Tid, Store, Obsolete, false}),
            OuterTs = Ts#tidstore{store = Store,
                                  up_stores = Tail,
                                  level = Level - 1},
            put(mnesia_activity_state, {OldMod, Tid, OuterTs}),
            do_commit_nested
    end.
1123
%% Return the majority field of a #prep{} record.
majority_attr(Prep = #prep{}) ->
    Prep#prep.majority.
1126
1127
%% This function arranges for all objects we shall write in Store to be
%% in a list of {Node, CommitRecord}
%% Important function for the performance of mnesia.
%%
%% Type (async | sync) selects the initial protocol (sym_trans or
%% sync_sym_trans). Returns {N, Prep} where N is the count produced by
%% do_arrange/5 and the item lists of every commit record in Prep have
%% been put back into insertion order.

arrange(Tid, Store, Type) ->
    %% The local node is always included
    Recs = prep_recs(get_elements(nodes, Store), []),
    FirstKey = ?ets_first(Store),
    Protocol = case Type of
                   async -> sym_trans;
                   sync -> sync_sym_trans
               end,
    Prep0 = #prep{protocol = Protocol, records = Recs},
    {Count, Prep1} = do_arrange(Tid, Store, FirstKey, Prep0, 0),
    {Count, Prep1#prep{records = reverse(Prep1#prep.records)}}.
1145
%% Reverse the per-storage item lists (ram_copies, disc_copies,
%% disc_only_copies and every list under ext) of each #commit{} record
%% in the list. The order of the records themselves is kept.
reverse(Commits) ->
    lists:map(
      fun(#commit{ram_copies = Ram, disc_copies = DC,
                  disc_only_copies = DOC, ext = Ext} = Commit) ->
              Commit#commit{
                ram_copies       = lists:reverse(Ram),
                disc_copies      = lists:reverse(DC),
                disc_only_copies = lists:reverse(DOC),
                ext              = [{Type, lists:reverse(E)} || {Type, E} <- Ext]
               }
      end,
      Commits).
1159
%% Push one empty presume_commit #commit{} record per node onto Recs.
%% Nodes are processed front to back, so the record of the last node
%% ends up first in the result.
prep_recs(Nodes, Recs) ->
    lists:foldl(fun(Node, Acc) ->
                        [#commit{decision = presume_commit, node = Node} | Acc]
                end,
                Recs, Nodes).
1164
%% storage_types is a list of {Node, Storage} tuples
%% where each tuple represents an active replica
%%
%% Walk the keys of the transaction store, starting at the given key,
%% and fold what is found into Prep:
%%   {Tab, Key}      - the items of that oid are prepared, N is bumped
%%   op              - schema operations are prepared, N is bumped
%%   restore_op      - a restore is arranged via mnesia_schema, the
%%                     protocol becomes asym_trans, N is bumped
%%   sticky          - the sync flag of Prep is set
%%   any other key   - skipped (locks, nodes ... local atoms...)
%% At '$end_of_table' {N, Prep} is returned; an asym_trans protocol is
%% upgraded to sync_asym_trans when the sync flag is set.
do_arrange(Tid, Store, {Tab, Key} = Oid, Prep, N) ->
    Items = ?ets_lookup(Store, Oid), %% Store is a bag
    Prep2 = prepare_items(Tid, Tab, Key, Items, Prep),
    do_arrange(Tid, Store, ?ets_next(Store, Oid), Prep2, N + 1);
do_arrange(Tid, Store, op, Prep, N) ->
    SchemaOps = ?ets_lookup(Store, op), %% Store is a bag
    Prep2 = prepare_schema_items(Tid, SchemaOps, Prep),
    do_arrange(Tid, Store, ?ets_next(Store, op), Prep2, N + 1);
do_arrange(Tid, Store, restore_op, Prep, N) ->
    [{restore_op, R}] = ?ets_lookup(Store, restore_op),
    Arrange =
        fun({Tab, Key} = Oid, CommitRecs, _RecName, Where, Snmp) ->
                %% A bare oid means that the object shall be deleted
                do_prepare_items(Tid, Tab, Key, Where, Snmp,
                                 [{Oid, Oid, delete}], CommitRecs);
           (BupRec, CommitRecs, RecName, Where, Snmp) ->
                Tab = element(1, BupRec),
                Key = element(2, BupRec),
                %% The stored record must carry the record name
                Rec = case Tab == RecName of
                          true -> BupRec;
                          false -> setelement(1, BupRec, RecName)
                      end,
                do_prepare_items(Tid, Tab, Key, Where, Snmp,
                                 [{{Tab, Key}, Rec, write}], CommitRecs)
        end,
    Recs2 = mnesia_schema:arrange_restore(R, Arrange, Prep#prep.records),
    Prep2 = Prep#prep{protocol = asym_trans, records = Recs2},
    do_arrange(Tid, Store, ?ets_next(Store, restore_op), Prep2, N + 1);
do_arrange(_Tid, _Store, '$end_of_table',
           #prep{sync = true, protocol = asym_trans} = Prep, N) ->
    {N, Prep#prep{protocol = sync_asym_trans}};
do_arrange(_Tid, _Store, '$end_of_table', Prep, N) ->
    {N, Prep};
do_arrange(Tid, Store, sticky, Prep, N) ->
    do_arrange(Tid, Store, ?ets_next(Store, sticky), Prep#prep{sync = true}, N);
do_arrange(Tid, Store, OtherKey, Prep, N) -> %% locks, nodes ... local atoms...
    do_arrange(Tid, Store, ?ets_next(Store, OtherKey), Prep, N).
1209
%% Returns a prep record with all items in reverse order.
%% The schema Items are prepared for every node in {current, db_nodes}
%% and the protocol is forced to asym_trans.
prepare_schema_items(Tid, Items, Prep) ->
    DbNodes = val({current, db_nodes}),
    Types = lists:map(fun(Node) -> {Node, schema_ops} end, DbNodes),
    Prep#prep{protocol = asym_trans,
              records = prepare_nodes(Tid, Types, Items,
                                      Prep#prep.records, schema)}.
1215
%% Returns a prep record with all items in reverse order.
%%
%% When Tab is the table handled by the previous call, the where_to_commit
%% and snmp values cached in Prep are reused. Otherwise they are looked
%% up: an empty where_to_commit aborts with {no_exists, Tab}, a blocked
%% table is waited for (unblock_me) and then retried, and in the normal
%% case the majority list, the cache fields and - through check_prep/2 -
%% the protocol of Prep are updated as well.
prepare_items(Tid, Tab, Key, Items,
              #prep{prev_tab = PrevTab, prev_types = Types,
                    prev_snmp = Snmp, records = Recs} = Prep)
  when PrevTab == Tab ->
    Prep#prep{records = do_prepare_items(Tid, Tab, Key, Types, Snmp,
                                         Items, Recs)};

prepare_items(Tid, Tab, Key, Items, Prep) ->
    case val({Tab, where_to_commit}) of
        [] ->
            mnesia:abort({no_exists, Tab});
        {blocked, _} ->
            unblocked = req({unblock_me, Tab}),
            prepare_items(Tid, Tab, Key, Items, Prep);
        Types ->
            Majority = needs_majority(Tab, Prep),
            Snmp = val({Tab, snmp}),
            Recs = do_prepare_items(Tid, Tab, Key, Types,
                                    Snmp, Items, Prep#prep.records),
            check_prep(Prep#prep{records = Recs, prev_tab = Tab,
                                 majority = Majority,
                                 prev_types = Types, prev_snmp = Snmp},
                       Types)
    end.
1241
%% Add Items to the commit records Recs: first the snmp part (this may
%% exit), then the ordinary per-node part.
do_prepare_items(Tid, Tab, Key, Types, Snmp, Items, Recs) ->
    prepare_nodes(Tid, Types, Items,
                  prepare_snmp(Tid, Tab, Key, Types, Snmp, Items, Recs), % May exit
                  normal).
1245
1246
%% Return the majority list of the #prep{} record, extended with
%% {Tab, AllNodes} when Tab has its majority property set to true and is
%% not yet in the list. A table without a readable majority property is
%% treated like majority = false.
needs_majority(Tab, #prep{majority = Known}) ->
    case lists:keymember(Tab, 1, Known) of
        true ->
            Known;
        false ->
            case ?catch_val({Tab, majority}) of
                true ->
                    [{Tab, val({Tab, all_nodes})} | Known];
                false ->
                    Known;
                {'EXIT', _} ->
                    Known
            end
    end.
1262
%% Check, table by table, that Nodes form a majority for every
%% {Tab, AllNodes} entry, as judged by mnesia_lib:have_majority/3.
%% Returns ok, or {error, Tab} for the first table that fails.
have_majority([], _Nodes) ->
    ok;
have_majority([{Tab, AllNodes} | Rest], Nodes) ->
    case mnesia_lib:have_majority(Tab, AllNodes, Nodes) of
        false ->
            {error, Tab};
        true ->
            have_majority(Rest, Nodes)
    end.
1272
%% Build the list of snmp operations for an update of Key in Tab.
%% Returns [] when the table has no snmp structure, [{clear_table, Tab}]
%% when Key is '_', and otherwise [{Op, Tab, Key, SnmpOid}] with Op taken
%% from the first item.
prepare_snmp(Tab, Key, Items) ->
    Ustruct = val({Tab, snmp}),
    if
        Ustruct =:= [] ->
            [];
        Key /= '_' ->
            {_Oid, _Val, Op} = hd(Items),
            %% Still making snmp oid (not used) because we want to catch errors here
            %% And also it keeps backwards comp. with old nodes.
            SnmpOid = mnesia_snmp_hook:key_to_oid(Tab, Key, Ustruct), % May exit
            [{Op, Tab, Key, SnmpOid}];
        true ->
            [{clear_table, Tab}]
    end.
1286
%% Add the snmp operation for an update of Key in Tab to the commit
%% records Recs. Without an snmp structure ([]) Recs is returned as is.
%% A Key of '_' yields a clear_table operation, any other key an
%% {Op, Tab, Key, SnmpOid} operation with Op taken from the first item.
prepare_snmp(_Tid, _Tab, _Key, _Types, [], _Items, Recs) ->
    Recs;
prepare_snmp(Tid, Tab, '_', Types, _Us, _Items, Recs) ->
    prepare_nodes(Tid, Types, [{clear_table, Tab}], Recs, snmp);
prepare_snmp(Tid, Tab, Key, Types, Us, Items, Recs) ->
    {_Oid, _Val, Op} = hd(Items),
    SnmpOid = mnesia_snmp_hook:key_to_oid(Tab, Key, Us), % May exit
    prepare_nodes(Tid, Types, [{Op, Tab, Key, SnmpOid}], Recs, snmp).
1298
%% Decide whether the protocol of Prep can be kept after a table with
%% the replica layout Types has been added:
%%  - no majority tables and the same layout as before: unchanged;
%%  - first table seen (types undefined): the layout is remembered, and
%%    the protocol becomes asym_trans if a majority table is involved;
%%  - otherwise: the protocol becomes asym_trans.
check_prep(#prep{majority = [], types = Types} = Prep, Types) ->
    Prep;
check_prep(#prep{majority = [], types = undefined} = Prep, Types) ->
    Prep#prep{types = Types};
check_prep(#prep{types = undefined} = Prep, Types) ->
    Prep#prep{protocol = asym_trans, types = Types};
check_prep(Prep, _Types) ->
    Prep#prep{protocol = asym_trans}.
1310
%% Returns a list of commit records.
%% For every {Node, Storage} in turn, the commit record of Node is
%% picked out of the pending records and extended with Items. The
%% extended records come first in the result, in the order of the node
%% list, followed by the records that were left unpicked.
prepare_nodes(Tid, Types, Items, CommitRecords, Kind) ->
    {Prepared, Untouched} =
        lists:mapfoldl(
          fun({Node, Storage}, Pending) ->
                  {Rec, Pending2} = pick_node(Tid, Node, Pending, []),
                  {prepare_node(Node, Storage, Items, Rec, Kind), Pending2}
          end,
          CommitRecords, Types),
    Prepared ++ Untouched.
1318
%% Find the commit record of Node in the list. Returns {Rec, Others}
%% where Others holds the remaining records (the ones skipped on the way
%% in reverse order, followed by the unvisited ones).
%% When no record is found, a dirty tid gets a fresh presume_commit
%% record, while any other tid aborts with
%% {bad_commit, {missing_lock, Node}}.
pick_node(Tid, Node, [Rec | Rest], Skipped) ->
    case Rec#commit.node == Node of
        true ->
            {Rec, Skipped ++ Rest};
        false ->
            pick_node(Tid, Node, Rest, [Rec | Skipped])
    end;
pick_node({dirty, _}, Node, [], Skipped) ->
    {#commit{decision = presume_commit, node = Node}, Skipped};
pick_node(_Tid, Node, [], _Skipped) ->
    mnesia:abort({bad_commit, {missing_lock, Node}}).
1330
%% Add Items to the commit record Rec of one node. Kind selects where:
%%   snmp   - onto the snmp list kept in the ext field;
%%   schema - stored as schema_ops (only while schema_ops is still []);
%%   other  - onto the list matching Storage (ram_copies, disc_copies,
%%            disc_only_copies, or the ext_copies list in the ext field
%%            for {ext, Alias, Mod}).
%% Items are pushed one by one, so the lists are built in reverse order.
prepare_node(Node, Storage, [Item | Items], #commit{ext = Ext0} = Rec, snmp) ->
    NewExt = case lists:keytake(snmp, 1, Ext0) of
                 {_, {snmp, Snmp}, Ext} ->
                     [{snmp, [Item | Snmp]} | Ext];
                 false ->
                     [{snmp, [Item]} | Ext0]
             end,
    prepare_node(Node, Storage, Items, Rec#commit{ext = NewExt}, snmp);
prepare_node(Node, Storage, [Item | Items], Rec, Kind) when Kind /= schema ->
    Rec2 =
        case Storage of
            ram_copies ->
                Rec#commit{ram_copies = [Item | Rec#commit.ram_copies]};
            disc_copies ->
                Rec#commit{disc_copies = [Item | Rec#commit.disc_copies]};
            disc_only_copies ->
                Rec#commit{disc_only_copies =
                               [Item | Rec#commit.disc_only_copies]};
            {ext, _Alias, _Mod} ->
                Ext0 = Rec#commit.ext,
                case lists:keytake(ext_copies, 1, Ext0) of
                    {_, {_, EC}, Ext} ->
                        Rec#commit{ext = [{ext_copies, [{Storage, Item} | EC]} | Ext]};
                    false ->
                        Rec#commit{ext = [{ext_copies, [{Storage, Item}]} | Ext0]}
                end
        end,
    prepare_node(Node, Storage, Items, Rec2, Kind);
prepare_node(_Node, _Storage, Items, Rec, schema)
  when Rec#commit.schema_ops == [] ->
    Rec#commit{schema_ops = Items};
prepare_node(_Node, _Storage, [], Rec, _Kind) ->
    Rec.
1364
%% multi_commit(Protocol, Majority, Tid, CommitRecords, Store)
%% Local work is always performed in users process
%%
%% Runs the commit protocol selected by Protocol (read_only, sym_trans,
%% sync_sym_trans, asym_trans or sync_asym_trans) for the transaction
%% Tid. The read_only, sym_trans and sync_sym_trans clauses only match
%% an empty Majority list. Returns do_commit or {do_abort, Reason}; the
%% asym protocols abort with {no_majority, Tab} when the involved nodes
%% do not form a majority for a table in Majority.
multi_commit(read_only, _Maj = [], Tid, CR, _Store) ->
    %% This featherweight commit protocol is used when no
    %% updates have been performed in the transaction.

    {DiscNs, RamNs} = commit_nodes(CR, [], []),
    Msg = {Tid, simple_commit},
    rpc:abcast(DiscNs -- [node()], ?MODULE, Msg),
    rpc:abcast(RamNs -- [node()], ?MODULE, Msg),
    mnesia_recover:note_decision(Tid, committed),
    mnesia_locker:release_tid(Tid),
    ?MODULE ! {delete_transaction, Tid},
    do_commit;

multi_commit(sym_trans, _Maj = [], Tid, CR, Store) ->
    %% This lightweight commit protocol is used when all
    %% the involved tables are replicated symmetrically.
    %% Their storage types must match on each node.
    %%
    %% 1  Ask the other involved nodes if they want to commit
    %%    All involved nodes votes yes if they are up
    %% 2a Somebody has voted no
    %%    Tell all yes voters to do_abort
    %% 2b Everybody has voted yes
    %%    Tell everybody to do_commit. I.e. that they should
    %%    prepare the commit, log the commit record and
    %%    perform the updates.
    %%
    %%    The outcome is kept 3 minutes in the transient decision table.
    %%
    %% Recovery:
    %%    If somebody dies before the coordinator has
    %%    broadcasted do_commit, the transaction is aborted.
    %%
    %%    If a participant dies, the table load algorithm
    %%    ensures that the contents of the involved tables
    %%    are picked from another node.
    %%
    %%    If the coordinator dies, each participants checks
    %%    the outcome with all the others. If all are uncertain
    %%    about the outcome, the transaction is aborted. If
    %%    somebody knows the outcome the others will follow.

    {DiscNs, RamNs} = commit_nodes(CR, [], []),
    Pending = mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
    ?ets_insert(Store, Pending),

    {WaitFor, Local} = ask_commit(sym_trans, Tid, CR, DiscNs, RamNs),
    {Outcome, []} = rec_all(WaitFor, Tid, do_commit, []),
    ?eval_debug_fun({?MODULE, multi_commit_sym},
		    [{tid, Tid}, {outcome, Outcome}]),
    %% The outcome is broadcast to the other nodes before it is acted
    %% upon locally.
    rpc:abcast(DiscNs -- [node()], ?MODULE, {Tid, Outcome}),
    rpc:abcast(RamNs -- [node()], ?MODULE, {Tid, Outcome}),
    case Outcome of
	do_commit ->
	    mnesia_recover:note_decision(Tid, committed),
	    do_dirty(Tid, Local),
	    mnesia_locker:release_tid(Tid),
	    ?MODULE ! {delete_transaction, Tid};
	{do_abort, _Reason} ->
	    mnesia_recover:note_decision(Tid, aborted)
    end,
    ?eval_debug_fun({?MODULE, multi_commit_sym, post},
		    [{tid, Tid}, {outcome, Outcome}]),
    Outcome;

multi_commit(sync_sym_trans, _Maj = [], Tid, CR, Store) ->
    %%   This protocol is the same as sym_trans except that it
    %%   uses synchronized calls to disk_log and synchronized commits
    %%   when several nodes are involved.

    {DiscNs, RamNs} = commit_nodes(CR, [], []),
    Pending = mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
    ?ets_insert(Store, Pending),

    {WaitFor, Local} = ask_commit(sync_sym_trans, Tid, CR, DiscNs, RamNs),
    {Outcome, []} = rec_all(WaitFor, Tid, do_commit, []),
    ?eval_debug_fun({?MODULE, multi_commit_sym_sync},
		    [{tid, Tid}, {outcome, Outcome}]),
    %% Note in the store which nodes we expect a commit ack from
    [?ets_insert(Store, {waiting_for_commit_ack, Node}) || Node <- WaitFor],
    rpc:abcast(DiscNs -- [node()], ?MODULE, {Tid, Outcome}),
    rpc:abcast(RamNs -- [node()], ?MODULE, {Tid, Outcome}),
    case Outcome of
	do_commit ->
	    mnesia_recover:note_decision(Tid, committed),
	    mnesia_log:slog(Local),
	    do_commit(Tid, Local),
	    %% Just wait for completion, the result is ignored.
	    rec_all(WaitFor, Tid, ignore, []),
	    mnesia_locker:release_tid(Tid),
	    ?MODULE ! {delete_transaction, Tid};
	{do_abort, _Reason} ->
	    mnesia_recover:note_decision(Tid, aborted)
    end,
    ?eval_debug_fun({?MODULE, multi_commit_sym, post},
		    [{tid, Tid}, {outcome, Outcome}]),
    Outcome;

multi_commit(Protocol, Majority, Tid, CR, Store)
  when Protocol =:= asym_trans; Protocol =:= sync_asym_trans ->
    %% This more expensive commit protocol is used when
    %% table definitions are changed (schema transactions).
    %% It is also used when the involved tables are
    %% replicated asymmetrically. If the storage type differs
    %% on at least one node this protocol is used.
    %%
    %% 1 Ask the other involved nodes if they want to commit.
    %%   All involved nodes prepares the commit, logs a presume_abort
    %%   commit record and votes yes or no depending of the
    %%   outcome of the prepare. The preparation is also performed
    %%   by the coordinator.
    %%
    %% 2a Somebody has died or voted no
    %%    Tell all yes voters to do_abort
    %% 2b Everybody has voted yes
    %%    Put a unclear marker in the log.
    %%    Tell the others to pre_commit. I.e. that they should
    %%    put a unclear marker in the log and reply
    %%    acc_pre_commit when they are done.
    %%
    %% 3a Somebody died
    %%    Tell the remaining participants to do_abort
    %% 3b Everybody has replied acc_pre_commit
    %%    Tell everybody to committed. I.e that they should
    %%    put a committed marker in the log, perform the updates
    %%    and reply done_commit when they are done. The coordinator
    %%    must wait with putting his committed marker into the log
    %%    until the committed has been sent to all the others.
    %%    Then he performs local commit before collecting replies.
    %%
    %% 4  Everybody has either died or replied done_commit
    %%    Return to the caller.
    %%
    %% Recovery:
    %%    If the coordinator dies, the participants (and
    %%    the coordinator when he starts again) must do
    %%    the following:
    %%
    %%    If we have no unclear marker in the log we may
    %%    safely abort, since we know that nobody may have
    %%    decided to commit yet.
    %%
    %%    If we have a committed marker in the log we may
    %%    safely commit since we know that everybody else
    %%    also will come to this conclusion.
    %%
    %%    If we have a unclear marker but no committed
    %%    in the log we are uncertain about the real outcome
    %%    of the transaction and must ask the others before
    %%    we can decide what to do. If someone knows the
    %%    outcome we will do the same. If nobody knows, we
    %%    will wait for the remaining involved nodes to come
    %%    up. When all involved nodes are up and uncertain,
    %%    we decide to commit (first put a committed marker
    %%    in the log, then do the updates).

    D = #decision{tid = Tid, outcome = presume_abort},
    {D2, CR2} = commit_decision(D, CR, [], []),
    DiscNs = D2#decision.disc_nodes,
    RamNs = D2#decision.ram_nodes,
    %% Give up early if the involved nodes are not a majority
    case have_majority(Majority, DiscNs ++ RamNs) of
	ok  -> ok;
	{error, Tab} -> mnesia:abort({no_majority, Tab})
    end,
    Pending = mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
    ?ets_insert(Store, Pending),
    {WaitFor, Local} = ask_commit(Protocol, Tid, CR2, DiscNs, RamNs),
    %% The local prepare runs under ?CATCH; its result is only examined
    %% once the votes are in.
    SchemaPrep = ?CATCH(mnesia_schema:prepare_commit(Tid, Local, {coord, WaitFor})),
    {Votes, Pids} = rec_all(WaitFor, Tid, do_commit, []),

    ?eval_debug_fun({?MODULE, multi_commit_asym_got_votes},
		    [{tid, Tid}, {votes, Votes}]),
    case Votes of
	do_commit ->
	    case SchemaPrep of
		{_Modified, C = #commit{}, DumperMode} ->
		    mnesia_log:log(C), % C is not a binary
		    ?eval_debug_fun({?MODULE, multi_commit_asym_log_commit_rec},
				    [{tid, Tid}]),

		    D3 = C#commit.decision,
		    D4 = D3#decision{outcome = unclear},
		    mnesia_recover:log_decision(D4),
		    ?eval_debug_fun({?MODULE, multi_commit_asym_log_commit_dec},
				    [{tid, Tid}]),
		    tell_participants(Pids, {Tid, pre_commit}),
		    %% Now we are uncertain and we do not know
		    %% if all participants have logged that
		    %% they are uncertain or not
		    rec_acc_pre_commit(Pids, Tid, Store, {C,Local},
				       do_commit, DumperMode, [], []);
		{'EXIT', Reason} ->
		    %% The others have logged the commit
		    %% record but they are not uncertain
		    mnesia_recover:note_decision(Tid, aborted),
		    ?eval_debug_fun({?MODULE, multi_commit_asym_prepare_exit},
				    [{tid, Tid}]),
		    tell_participants(Pids, {Tid, {do_abort, Reason}}),
		    do_abort(Tid, Local),
		    {do_abort, Reason}
	    end;

	{do_abort, Reason} ->
	    %% The others have logged the commit
	    %% record but they are not uncertain
	    mnesia_recover:note_decision(Tid, aborted),
	    ?eval_debug_fun({?MODULE, multi_commit_asym_do_abort}, [{tid, Tid}]),
	    tell_participants(Pids, {Tid, {do_abort, Reason}}),
	    do_abort(Tid, Local),
	    {do_abort, Reason}
    end.
1577
%% Returns do_commit or {do_abort, Reason}
%%
%% Collects, for each participant pid in list order, its answer to the
%% pre_commit message and then finishes the transaction.
%%   Res      - outcome so far; starts as given by the caller and is
%%              switched to {do_abort, {bad_commit, Node}} when a
%%              participant aborts or its node goes down
%%   GoodPids - participants that answered acc_pre_commit; they are
%%              told the final outcome
%%   AckPids  - the subset of GoodPids whose answer carried the flag
%%              true (or used the old 3-tuple format); they are waited
%%              for in sync_schema_commit/3
rec_acc_pre_commit([Pid | Tail], Tid, Store, Commit, Res, DumperMode,
		   GoodPids, AckPids) ->
    receive
	{?MODULE, _, {acc_pre_commit, Tid, Pid, true}} ->
	    rec_acc_pre_commit(Tail, Tid, Store, Commit, Res, DumperMode,
			       [Pid | GoodPids], [Pid | AckPids]);

	{?MODULE, _, {acc_pre_commit, Tid, Pid, false}} ->
	    rec_acc_pre_commit(Tail, Tid, Store, Commit, Res, DumperMode,
			       [Pid | GoodPids], AckPids);

	{?MODULE, _, {acc_pre_commit, Tid, Pid}} ->
	    %% Kept for backwards compatibility. Remove after Mnesia 4.x
	    rec_acc_pre_commit(Tail, Tid, Store, Commit, Res, DumperMode,
			       [Pid | GoodPids], [Pid | AckPids]);
	{?MODULE, _, {do_abort, Tid, Pid, _Reason}} ->
	    %% The participant's own reason is dropped; the node is
	    %% reported as bad_commit instead
	    AbortRes = {do_abort, {bad_commit, node(Pid)}},
	    rec_acc_pre_commit(Tail, Tid, Store, Commit, AbortRes, DumperMode,
			       GoodPids, AckPids);
	{mnesia_down, Node} when Node == node(Pid) ->
	    AbortRes = {do_abort, {bad_commit, Node}},
	    ?SAFE(Pid ! {Tid, AbortRes}),  %% Tell him that he has died
	    rec_acc_pre_commit(Tail, Tid, Store, Commit, AbortRes, DumperMode,
			       GoodPids, AckPids)
    end;
%% All participants have been heard from (or are down): act on Res.
rec_acc_pre_commit([], Tid, Store, {Commit,OrigC}, Res, DumperMode, GoodPids, AckPids) ->
    D = Commit#commit.decision,
    case Res of
	do_commit ->
	    %% Now everybody knows that the others
	    %% has voted yes. We also know that
	    %% everybody are uncertain.
	    prepare_sync_schema_commit(Store, AckPids),
	    tell_participants(GoodPids, {Tid, committed}),
	    D2 = D#decision{outcome = committed},
	    mnesia_recover:log_decision(D2),
            ?eval_debug_fun({?MODULE, rec_acc_pre_commit_log_commit},
			    [{tid, Tid}]),

	    %% Now we have safely logged committed
	    %% and we can recover without asking others
	    do_commit(Tid, Commit, DumperMode),
            ?eval_debug_fun({?MODULE, rec_acc_pre_commit_done_commit},
			    [{tid, Tid}]),
	    sync_schema_commit(Tid, Store, AckPids),
	    mnesia_locker:release_tid(Tid),
	    ?MODULE ! {delete_transaction, Tid};

	{do_abort, Reason} ->
	    tell_participants(GoodPids, {Tid, {do_abort, Reason}}),
	    D2 = D#decision{outcome = aborted},
	    mnesia_recover:log_decision(D2),
            ?eval_debug_fun({?MODULE, rec_acc_pre_commit_log_abort},
			    [{tid, Tid}]),
	    %% The abort is performed on the original (unprepared)
	    %% commit record
	    do_abort(Tid, OrigC),
	    ?eval_debug_fun({?MODULE, rec_acc_pre_commit_done_abort},
			    [{tid, Tid}])
    end,
    Res.
1638
%% Note all nodes in case of mnesia_down mgt
%% sync_schema_commit is (ab)used for sync_asym_trans as well.
%%
%% For every pid a {waiting_for_commit_ack, Node} entry, with Node being
%% the node of the pid, is inserted into Store. Returns ok.
prepare_sync_schema_commit(Store, Pids) ->
    lists:foreach(fun(Pid) ->
                          ?ets_insert(Store, {waiting_for_commit_ack, node(Pid)})
                  end,
                  Pids).
1646
%% Wait, pid by pid, until each participant has either acknowledged the
%% commit with a schema_commit message or its node has been reported
%% down. In both cases the waiting_for_commit_ack entry of the node is
%% removed from Store. Returns ok.
sync_schema_commit(_Tid, _Store, []) ->
    ok;
sync_schema_commit(Tid, Store, [Pid | Tail]) ->
    PidNode = node(Pid),
    receive
        {mnesia_down, PidNode} ->
            ok;
        {?MODULE, _, {schema_commit, Tid, Pid}} ->
            ok
    end,
    ?ets_match_delete(Store, {waiting_for_commit_ack, PidNode}),
    sync_schema_commit(Tid, Store, Tail).
1659
%% Send Msg to every pid in the list, front to back. Returns ok.
tell_participants(Pids, Msg) ->
    lists:foreach(fun(Pid) -> Pid ! Msg end, Pids).
1665
-spec commit_participant(_, _, _, _, _, _) -> no_return().
%% Entry point of a commit participant process. The commit record may
%% arrive either as a binary (external term format) or as a #commit{}
%% record. The process never returns to its caller.
%% Trap exit because we can get a shutdown from application manager
commit_participant(Protocol, Coord, Tid, Bin, DiscNs, RamNs) when is_binary(Bin) ->
    process_flag(trap_exit, true),
    commit_participant(Protocol, Coord, Tid, Bin, binary_to_term(Bin),
                       DiscNs, RamNs);
commit_participant(Protocol, Coord, Tid, #commit{} = Commit, DiscNs, RamNs) ->
    process_flag(trap_exit, true),
    commit_participant(Protocol, Coord, Tid, Commit, Commit, DiscNs, RamNs).
1675
%% Participant side of the asym_trans / sync_asym_trans protocol.
%%
%%   Bin - the commit record as received (binary or #commit{}); this is
%%         what gets logged when the prepare did not modify it
%%   C0  - the same commit record as a #commit{} term
%%
%% The commit is prepared and, on success, vote_yes is sent to Coord;
%% any exception from the prepare is answered with vote_no. After a yes
%% vote the process waits for pre_commit (answered with acc_pre_commit)
%% and then for the final committed / do_abort message. An 'EXIT'
%% message received while waiting is answered with do_abort. In every
%% case the tid is released, mnesia_tm is sent a delete_transaction
%% message and the process exits with reason normal.
commit_participant(Protocol, Coord, Tid, Bin, C0, DiscNs, _RamNs) ->
    ?eval_debug_fun({?MODULE, commit_participant, pre}, [{tid, Tid}]),
    try mnesia_schema:prepare_commit(Tid, C0, {part, Coord}) of
	{Modified, C = #commit{}, DumperMode} ->
	    %% If we cannot find any local unclear decision
	    %% we should presume abort at startup recovery
	    case lists:member(node(), DiscNs) of
		false ->
		    ignore;
		true ->
		    case Modified of
			false -> mnesia_log:log(Bin);
			true  -> mnesia_log:log(C)
		    end
	    end,
	    ?eval_debug_fun({?MODULE, commit_participant, vote_yes},
			    [{tid, Tid}]),
	    reply(Coord, {vote_yes, Tid, self()}),

	    receive
		{Tid, pre_commit} ->
		    D = C#commit.decision,
		    mnesia_recover:log_decision(D#decision{outcome = unclear}),
		    ?eval_debug_fun({?MODULE, commit_participant, pre_commit},
				    [{tid, Tid}]),
		    %% The coordinator is told whether it shall wait for
		    %% a schema_commit ack from us after the commit
		    ExpectAck = C#commit.schema_ops /= []
                        orelse Protocol =:= sync_asym_trans,
		    reply(Coord, {acc_pre_commit, Tid, self(), ExpectAck}),

		    %% Now we are vulnerable for failures, since
		    %% we cannot decide without asking others
		    receive
			{Tid, committed} ->
			    mnesia_recover:log_decision(D#decision{outcome = committed}),
			    ?eval_debug_fun({?MODULE, commit_participant, log_commit},
					    [{tid, Tid}]),
			    do_commit(Tid, C, DumperMode),
			    case ExpectAck of
				false -> ignore;
				true -> reply(Coord, {schema_commit, Tid, self()})
			    end,
			    ?eval_debug_fun({?MODULE, commit_participant, do_commit},
					    [{tid, Tid}]);

			{Tid, {do_abort, _Reason}} ->
			    mnesia_recover:log_decision(D#decision{outcome = aborted}),
			    ?eval_debug_fun({?MODULE, commit_participant, log_abort},
					    [{tid, Tid}]),
			    mnesia_schema:undo_prepare_commit(Tid, C0),
			    ?eval_debug_fun({?MODULE, commit_participant, undo_prepare},
					    [{tid, Tid}]);

			{'EXIT', _MnesiaTM, Reason} ->
			    reply(Coord, {do_abort, Tid, self(), {bad_commit,Reason}}),
			    mnesia_recover:log_decision(D#decision{outcome = aborted}),
			    mnesia_schema:undo_prepare_commit(Tid, C0);

			Msg ->
			    %% Only reported; nothing is logged or undone
			    %% for an unexpected message at this stage
			    verbose("** ERROR ** commit_participant ~p, got unexpected msg: ~tp~n",
				    [Tid, Msg])
		    end;
		{Tid, {do_abort, Reason}} ->
		    reply(Coord, {do_abort, Tid, self(), Reason}),
		    mnesia_schema:undo_prepare_commit(Tid, C0),
		    ?eval_debug_fun({?MODULE, commit_participant, pre_commit_undo_prepare},
				    [{tid, Tid}]);

		{'EXIT', _, Reason} ->
		    reply(Coord, {do_abort, Tid, self(), {bad_commit,Reason}}),
		    mnesia_schema:undo_prepare_commit(Tid, C0),
		    ?eval_debug_fun({?MODULE, commit_participant, pre_commit_undo_prepare}, [{tid, Tid}]);

		Msg ->
		    reply(Coord, {do_abort, Tid, self(), {bad_commit,internal}}),
		    verbose("** ERROR ** commit_participant ~p, got unexpected msg: ~tp~n",
			    [Tid, Msg])
	    end
    catch _:Reason ->
	    %% The prepare failed: vote no and undo whatever it did
	    ?eval_debug_fun({?MODULE, commit_participant, vote_no},
			    [{tid, Tid}]),
	    reply(Coord, {vote_no, Tid, Reason}),
	    mnesia_schema:undo_prepare_commit(Tid, C0)
    end,
    %% Common cleanup for every outcome
    mnesia_locker:release_tid(Tid),
    ?MODULE ! {delete_transaction, Tid},
    unlink(whereis(?MODULE)),
    exit(normal).
1763
%% Undo the prepare step for a commit record and return the record.
%% A packed (binary) commit record is unpacked with binary_to_term/1
%% before mnesia_schema:undo_prepare_commit/2 is called on it.
%%
%% Possible optimization: pass around a flag that tells whether the
%% binary contains schema ops. Unpacking the binary and calling
%% mnesia_schema:undo_prepare_commit/2 is only meaningful when it does.
do_abort(Tid, CommitOrBin) ->
    case is_binary(CommitOrBin) of
        true ->
            do_abort(Tid, binary_to_term(CommitOrBin));
        false ->
            mnesia_schema:undo_prepare_commit(Tid, CommitOrBin),
            CommitOrBin
    end.
1776
%% Log a commit record that carries no schema operations and then apply
%% it through do_commit/2. Only records with schema_ops == [] are accepted.
do_dirty(Tid, #commit{schema_ops = []} = Commit) ->
    mnesia_log:log(Commit),
    do_commit(Tid, Commit).
1780
%% do_commit(Tid, CommitRecord)
%% Apply a commit record using the dumper mode 'optional'.
%% A binary argument is unpacked with binary_to_term/1 first.
do_commit(Tid, C) when not is_binary(C) ->
    do_commit(Tid, C, optional);
do_commit(Tid, Packed) ->
    do_commit(Tid, binary_to_term(Packed)).
1786
%% do_commit(Tid, CommitRecord, DumperMode)
%% Apply a commit record: schema ops first, then the snmp updates, then
%% the ram_copies, disc_copies and disc_only_copies updates (in that
%% order) and finally the external-copies updates. The result is threaded
%% from step to step and the last one is returned, after the activity
%% has been reported to mnesia_subscr.
do_commit(Tid, Packed, DumperMode) when is_binary(Packed) ->
    do_commit(Tid, binary_to_term(Packed), DumperMode);
do_commit(Tid, C, DumperMode) ->
    mnesia_dumper:update(Tid, C#commit.schema_ops, DumperMode),
    SnmpRes = do_snmp(Tid, proplists:get_value(snmp, C#commit.ext, [])),
    PerStorage = [{ram_copies, C#commit.ram_copies},
                  {disc_copies, C#commit.disc_copies},
                  {disc_only_copies, C#commit.disc_only_copies}],
    StorageRes =
        lists:foldl(fun({Storage, Ops}, Acc) ->
                            do_update(Tid, Storage, Ops, Acc)
                    end, SnmpRes, PerStorage),
    FinalRes = do_update_ext(Tid, C#commit.ext, StorageRes),
    mnesia_subscr:report_activity(Tid),
    FinalRes.
1798
%% Apply the ext_copies operations found in the ext part of a commit
%% record, one operation at a time via do_update/4. OldRes is returned
%% unchanged when Ext is empty or has no ext_copies entry.
%% This could/should be optimized
do_update_ext(Tid, Ext, OldRes) ->
    case Ext =/= [] andalso lists:keyfind(ext_copies, 1, Ext) of
        false ->
            OldRes;
        {_, ExtOps} ->
            ApplyOne = fun({{ext, _, _} = Storage, Op}, Acc) ->
                               do_update(Tid, Storage, [Op], Acc)
                       end,
            lists:foldl(ApplyOne, OldRes, ExtOps)
    end.
1810
%% Update the items
%% Applies each operation with do_update_op/3. An 'ok' result keeps the
%% accumulated result, any other return value replaces it. A failing
%% operation is logged with verbose/2 and skipped.
do_update(_Tid, _Storage, [], Res) ->
    Res;
do_update(Tid, Storage, [Op | Ops], OldRes) ->
    NextRes =
        try do_update_op(Tid, Storage, Op) of
            ok -> OldRes;
            NewRes -> NewRes
        catch _:Reason:ST ->
                %% This may only happen when we recently have
                %% deleted our local replica, changed storage_type
                %% or transformed table
                %% BUGBUG: Updates may be lost if storage_type is changed.
                %%         Determine actual storage type and try again.
                %% BUGBUG: Updates may be lost if table is transformed.
                verbose("do_update in ~w failed: ~tp -> {'EXIT', ~tp}~n",
                        [Tid, Op, {Reason, ST}]),
                OldRes
        end,
    do_update(Tid, Storage, Ops, NextRes).
1829
%% Perform a single commit operation on Tab in the given Storage.
%% Each clause first runs the table's commit work (looked up under
%% {Tab, commit_work}) and then performs the storage operation, except
%% update_counter which updates the storage first and reports afterwards.
do_update_op(Tid, Storage, {{Tab, K}, Obj, write}) ->
    CommitWork = ?catch_val({Tab, commit_work}),
    commit_write(CommitWork, Tid, Storage, Tab, K, Obj, undefined),
    mnesia_lib:db_put(Storage, Tab, Obj);

do_update_op(Tid, Storage, {{Tab, K}, Val, delete}) ->
    CommitWork = ?catch_val({Tab, commit_work}),
    commit_delete(CommitWork, Tid, Storage, Tab, K, Val, undefined),
    mnesia_lib:db_erase(Storage, Tab, K);

do_update_op(Tid, Storage, {{Tab, K}, {RecName, Incr}, update_counter}) ->
    {NewObj, OldObjs} =
        try
            Counter = mnesia_lib:db_update_counter(Storage, Tab, K, Incr),
            true = is_integer(Counter) andalso (Counter >= 0),
            {{RecName, K, Counter}, [{RecName, K, Counter - Incr}]}
        catch
            error:_ ->
                %% The counter could not be updated: store a fresh
                %% object holding Incr when it is positive, else 0.
                Fresh = case Incr > 0 of
                            true -> {RecName, K, Incr};
                            false -> {RecName, K, 0}
                        end,
                mnesia_lib:db_put(Storage, Tab, Fresh),
                {Fresh, []}
        end,
    CommitWork = ?catch_val({Tab, commit_work}),
    commit_update(CommitWork, Tid, Storage, Tab, K, NewObj, OldObjs),
    element(3, NewObj);

do_update_op(Tid, Storage, {{Tab, Key}, Obj, delete_object}) ->
    CommitWork = ?catch_val({Tab, commit_work}),
    commit_del_object(CommitWork, Tid, Storage, Tab, Key, Obj),
    mnesia_lib:db_match_erase(Storage, Tab, Obj);

do_update_op(Tid, Storage, {{Tab, Key}, Obj, clear_table}) ->
    CommitWork = ?catch_val({Tab, commit_work}),
    commit_clear(CommitWork, Tid, Storage, Tab, Key, Obj),
    mnesia_lib:db_match_erase(Storage, Tab, Obj).
1866
%% Run the commit work items for a write: checkpoint retention,
%% subscriber table events and index maintenance. Returns ok.
commit_write([], _, _, _, _, _, _) ->
    ok;
commit_write([Item | Rest], Tid, Storage, Tab, K, Obj, Old) ->
    case Item of
        {checkpoints, CpList} ->
            mnesia_checkpoint:tm_retain(Tid, Tab, K, write, CpList);
        _ when element(1, Item) == subscribers ->
            mnesia_subscr:report_table_event(Item, Tab, Tid, Obj, write, Old);
        _ when element(1, Item) == index ->
            mnesia_index:add_index(Item, Storage, Tab, K, Obj, Old)
    end,
    commit_write(Rest, Tid, Storage, Tab, K, Obj, Old).
1879
%% Run the commit work items for an update_counter operation.
%% A checkpoints item replaces the "old objects" value with the return
%% value of mnesia_checkpoint:tm_retain/5; later subscriber and index
%% items see that replaced value. Returns ok.
commit_update([], _, _, _, _, _, _) ->
    ok;
commit_update([Item | Rest], Tid, Storage, Tab, K, Obj, Old0) ->
    Old =
        case Item of
            {checkpoints, CpList} ->
                mnesia_checkpoint:tm_retain(Tid, Tab, K, write, CpList);
            _ when element(1, Item) == subscribers ->
                mnesia_subscr:report_table_event(Item, Tab, Tid, Obj, write, Old0),
                Old0;
            _ when element(1, Item) == index ->
                mnesia_index:add_index(Item, Storage, Tab, K, Obj, Old0),
                Old0
        end,
    commit_update(Rest, Tid, Storage, Tab, K, Obj, Old).
1892
%% Run the commit work items for a delete.
%% A checkpoints item replaces the "old objects" value with the return
%% value of mnesia_checkpoint:tm_retain/5; later subscriber items see
%% that replaced value. Returns ok.
commit_delete([], _, _, _, _, _, _) ->
    ok;
commit_delete([Item | Rest], Tid, Storage, Tab, K, Obj, Old0) ->
    Old =
        case Item of
            {checkpoints, CpList} ->
                mnesia_checkpoint:tm_retain(Tid, Tab, K, delete, CpList);
            _ when element(1, Item) == subscribers ->
                mnesia_subscr:report_table_event(Item, Tab, Tid, Obj, delete, Old0),
                Old0;
            _ when element(1, Item) == index ->
                mnesia_index:delete_index(Item, Storage, Tab, K),
                Old0
        end,
    commit_delete(Rest, Tid, Storage, Tab, K, Obj, Old).
1905
%% Run the commit work items for a delete_object: checkpoint retention,
%% subscriber table events and index maintenance. Returns ok.
commit_del_object([], _, _, _, _, _) ->
    ok;
commit_del_object([Item | Rest], Tid, Storage, Tab, K, Obj) ->
    case Item of
        {checkpoints, CpList} ->
            mnesia_checkpoint:tm_retain(Tid, Tab, K, delete_object, CpList);
        _ when element(1, Item) == subscribers ->
            mnesia_subscr:report_table_event(Item, Tab, Tid, Obj, delete_object);
        _ when element(1, Item) == index ->
            mnesia_index:del_object_index(Item, Storage, Tab, K, Obj)
    end,
    commit_del_object(Rest, Tid, Storage, Tab, K, Obj).
1916
%% Run the commit work items for a clear_table: checkpoint retention,
%% subscriber table events and index clearing. Returns ok.
commit_clear([], _, _, _, _, _) ->
    ok;
commit_clear([Item | Rest], Tid, Storage, Tab, K, Obj) ->
    case Item of
        {checkpoints, CpList} ->
            mnesia_checkpoint:tm_retain(Tid, Tab, K, clear_table, CpList);
        _ when element(1, Item) == subscribers ->
            mnesia_subscr:report_table_event(Item, Tab, Tid, Obj, clear_table, undefined);
        _ when element(1, Item) == index ->
            mnesia_index:clear_index(Item, Tab, K, Obj)
    end,
    commit_clear(Rest, Tid, Storage, Tab, K, Obj).
1929
%% Pass every snmp operation in the list to mnesia_snmp_hook:update/1.
%% A failing update is logged with verbose/2 and the remaining
%% operations are still processed. Returns ok.
do_snmp(Tid, [SnmpOp | Rest]) ->
    try
        mnesia_snmp_hook:update(SnmpOp)
    catch _:Reason:ST ->
            %% This should only happen when we recently have
            %% deleted our local replica or recently detached
            %% the snmp table
            verbose("do_snmp in ~w failed: ~tp -> {'EXIT', ~tp}~n",
                    [Tid, SnmpOp, {Reason, ST}])
    end,
    do_snmp(Tid, Rest);
do_snmp(_, []) ->
    ok.
1941
%% Split the nodes of a list of commit records into {DiscNodes, RamNodes}.
%% A node ends up among the ram nodes only when its commit record has no
%% disc_copies, disc_only_copies or schema_ops and its ext part has no
%% ext_copies entry; every other node is counted as a disc node.
commit_nodes([], DiscNs, RamNs) ->
    {DiscNs, RamNs};
commit_nodes([C | Rest], DiscNs, RamNs) ->
    RamOnly =
        case C of
            #commit{disc_copies=[], disc_only_copies=[], schema_ops=[], ext=Ext} ->
                lists:keyfind(ext_copies, 1, Ext) =:= false;
            _ ->
                false
        end,
    Node = C#commit.node,
    case RamOnly of
        true  -> commit_nodes(Rest, DiscNs, [Node | RamNs]);
        false -> commit_nodes(Rest, [Node | DiscNs], RamNs)
    end.
1954
%% Classify the node of every commit record as disc node or ram node,
%% store both lists in the decision D, and return {Decision, Commits}
%% where each commit record has its decision field set to that final
%% decision.
commit_decision(D, [], AccD, AccR) ->
    {D#decision{disc_nodes = AccD, ram_nodes = AccR}, []};
commit_decision(D, [C | Tail], AccD, AccR) ->
    N = C#commit.node,
    IsDiscNode =
        case C of
            #commit{disc_copies=[], disc_only_copies=[], schema_ops=[], ext=Ext} ->
                lists:keyfind(ext_copies, 1, Ext) =/= false;
            #commit{schema_ops=[]} ->
                true;
            #commit{schema_ops=Ops} ->
                not ram_only_ops(N, Ops)
        end,
    {D2, Tail2} =
        case IsDiscNode of
            true  -> commit_decision(D, Tail, [N | AccD], AccR);
            false -> commit_decision(D, Tail, AccD, [N | AccR])
        end,
    {D2, [C#commit{decision = D2} | Tail2]}.
1975
%% Tell whether the schema ops for node N only involve ram, i.e. N is
%% not among the nodes listed under {schema, disc_copies}. A leading
%% change_table_copy_type op for N on the schema table itself is never
%% ram only.
ram_only_ops(N, [{op, change_table_copy_type, N, _FromS, _ToS, Cs} | _Ops]) ->
    %% We always use disk if change type of the schema
    not (lists:member({name, schema}, Cs)
         orelse lists:member(N, val({schema, disc_copies})));
ram_only_ops(N, _Ops) ->
    SchemaDiscNodes = val({schema, disc_copies}),
    not lists:member(N, SchemaDiscNodes).
1987
%% Returns {WaitFor, Res}
%% Sends a sync_dirty request to mnesia_tm on every remote node in the
%% commit record list and collects those nodes in WaitFor. The record
%% for the local node is applied with do_dirty/2 only after the rest of
%% the list has been handled, and its result becomes Res. Without a
%% local record Res is a node_not_running abort.
sync_send_dirty(_Tid, [], _Tab, WaitFor) ->
    {WaitFor, {'EXIT', {aborted, {node_not_running, WaitFor}}}};
sync_send_dirty(Tid, [Head | Tail], Tab, WaitFor) ->
    case Head#commit.node of
        Local when Local == node() ->
            {WF, _} = sync_send_dirty(Tid, Tail, Tab, WaitFor),
            {WF, do_dirty(Tid, Head)};
        Remote ->
            {?MODULE, Remote} ! {self(), {sync_dirty, Tid, Head, Tab}},
            sync_send_dirty(Tid, Tail, Tab, [Remote | WaitFor])
    end.
2002
%% Returns {WaitFor, Res}
%% With ReadNode == nowhere nothing is sent and a no_exists abort is
%% returned; otherwise the work is done by async_send_dirty/6 starting
%% with an empty wait list and the result ok.
async_send_dirty(Tid, Nodes, Tab, ReadNode) ->
    case ReadNode of
        nowhere ->
            {[], {'EXIT', {aborted, {no_exists, Tab}}}};
        _ ->
            async_send_dirty(Tid, Nodes, Tab, ReadNode, [], ok)
    end.
2008
%% Walk the commit records. The record for ReadNode is either applied
%% locally with do_dirty/2 (when ReadNode is this node) or sent as a
%% sync_dirty request whose reply must be waited for; every other node
%% gets an async_dirty request. Returns {WaitFor, Res}.
async_send_dirty(_Tid, [], _Tab, _ReadNode, WaitFor, Res) ->
    {WaitFor, Res};
async_send_dirty(Tid, [Head | Tail], Tab, ReadNode, WaitFor, Res) ->
    Node = Head#commit.node,
    case {ReadNode == Node, Node == node()} of
        {true, true} ->
            LocalRes = do_dirty(Tid, Head),
            async_send_dirty(Tid, Tail, Tab, ReadNode, WaitFor, LocalRes);
        {true, false} ->
            {?MODULE, Node} ! {self(), {sync_dirty, Tid, Head, Tab}},
            Pending = {'EXIT', {aborted, {node_not_running, Node}}},
            async_send_dirty(Tid, Tail, Tab, ReadNode, [Node | WaitFor], Pending);
        {false, _} ->
            {?MODULE, Node} ! {self(), {async_dirty, Tid, Head, Tab}},
            async_send_dirty(Tid, Tail, Tab, ReadNode, WaitFor, Res)
    end.
2025
%% Collect the dirty replies from the given remote nodes, threading the
%% result through get_dirty_reply/2. The local node is not accepted in
%% the list.
rec_dirty([], Res) ->
    Res;
rec_dirty([Node | Rest], Res) when Node /= node() ->
    rec_dirty(Rest, get_dirty_reply(Node, Res)).
2031
%% Wait for the dirty reply from Node and combine it with the result
%% collected so far. While no matching message arrives the wait is
%% renewed every 1000 ms for as long as Node is listed under
%% {current, db_nodes}; once it is not, Res is returned as is.
get_dirty_reply(Node, Res) ->
    receive
        {?MODULE, Node, {'EXIT', Reason}} ->
            {'EXIT', {aborted, {badarg, Reason}}};
        {?MODULE, Node, {dirty_res, Reply}} ->
            case {Reply, Res} of
                {ok, {'EXIT', {aborted, {node_not_running, _Node}}}} ->
                    ok;
                {ok, _} ->
                    %% Prioritize bad results, but node_not_running
                    Res;
                {_, _} ->
                    Reply
            end;
        {mnesia_down, Node} ->
            case get(mnesia_activity_state) of
                {_, Tid, _Ts} when element(1,Tid) == tid ->
                    %% Hmm dirty called inside a transaction, to avoid
                    %% hanging transaction we need to restart the transaction
                    mnesia:abort({node_not_running, Node});
                _ ->
                    %% It's ok to ignore mnesia_down's since we will make
                    %% the replicas consistent again when Node is started
                    Res
            end
    after 1000 ->
            StillRunning = lists:member(Node, val({current, db_nodes})),
            case StillRunning of
                true  -> get_dirty_reply(Node, Res);
                false -> Res
            end
    end.
2065
%% Assume that CommitRecord is no binary
%% Entry point for ask_commit/7: starts with an empty wait list and
%% 'no_local' as the local commit record. Returns {WaitFor, Local}.
ask_commit(Protocol, Tid, CommitRecords, DiscNs, RamNs) ->
    NoWaitFor = [],
    ask_commit(Protocol, Tid, CommitRecords, DiscNs, RamNs, NoWaitFor, no_local).
2070
%% Send an ask_commit request to mnesia_tm on every remote node that has
%% a commit record, and remember the record for the local node instead
%% of sending it. Returns {WaitFor, Local} where WaitFor holds the
%% remote nodes and Local is the local commit record (or the initial
%% value when there is none).
ask_commit(_Protocol, _Tid, [], _DiscNs, _RamNs, WaitFor, Local) ->
    {WaitFor, Local};
ask_commit(Protocol, Tid, [Head | Tail], DiscNs, RamNs, WaitFor, Local) ->
    case Head#commit.node of
        Node when Node == node() ->
            ask_commit(Protocol, Tid, Tail, DiscNs, RamNs, WaitFor, Head);
        Node ->
            Request = {ask_commit, convert_old(Protocol, Node), Tid, Head, DiscNs, RamNs},
            {?MODULE, Node} ! {self(), Request},
            ask_commit(Protocol, Tid, Tail, DiscNs, RamNs, [Node | WaitFor], Local)
    end.
2083
%% Pick the protocol name to send to Node: sync_asym_trans is replaced
%% by asym_trans when the value stored under {protocol, Node} has the
%% version {8,3}; every other protocol is passed through untouched.
convert_old(sync_asym_trans = Protocol, Node) ->
    case ?catch_val({protocol, Node}) of
        {{8,3}, _} -> asym_trans;
        _Other -> Protocol
    end;
convert_old(Protocol, _Node) ->
    Protocol.
2091
%% Make sure the ext field of a commit record is a list of 2-tuples.
%% An empty ext or one that already starts with a 2-tuple is left alone;
%% any other value is wrapped as [{snmp, Value}].
new_cr_format(#commit{ext = Ext} = Cr) ->
    case Ext of
        [] -> Cr;
        [{_, _} | _] -> Cr;
        Snmp -> Cr#commit{ext = [{snmp, Snmp}]}
    end.
2096
%% Wait for one answer from each node in the list, in list order, and
%% return {Res, Pids}. A vote_yes that carries a pid adds that pid to
%% Pids, a vote_no turns Res into {do_abort, Reason}, and a mnesia_down
%% for the awaited node turns Res into a bad_commit abort which is also
%% sent to mnesia_tm on that node.
rec_all([], _Tid, Res, Pids) ->
    {Res, Pids};
rec_all([Node | Tail], Tid, Res, Pids) ->
    {NextRes, NextPids} =
        receive
            {?MODULE, Node, {vote_yes, Tid}} ->
                {Res, Pids};
            {?MODULE, Node, {vote_yes, Tid, Pid}} ->
                {Res, [Pid | Pids]};
            {?MODULE, Node, {vote_no, Tid, Reason}} ->
                {{do_abort, Reason}, Pids};
            {?MODULE, Node, {committed, Tid}} ->
                {Res, Pids};
            {?MODULE, Node, {aborted, Tid}} ->
                {Res, Pids};

            {mnesia_down, Node} ->
                %% Make sure that mnesia_tm knows it has died
                %% it may have been restarted
                Abort = {do_abort, {bad_commit, Node}},
                ?SAFE({?MODULE, Node} ! {Tid, Abort}),
                {Abort, Pids}
        end,
    rec_all(Tail, Tid, NextRes, NextPids).
2119
%% Ask mnesia_tm for its transaction info and return one
%% {Counter, OwnerPid, Status} tuple per coordinator entry, where
%% Status comes from tr_status/2.
get_transactions() ->
    {info, Participant, Coordinator} = req(info),
    Describe =
        fun({Tid, _Tabs}) ->
                Status = tr_status(Tid, Participant),
                {Tid#tid.counter, Tid#tid.pid, Status}
        end,
    lists:map(Describe, Coordinator).
2126
%% Classify a transaction id: 'participant' when Tid belongs to one of
%% the entries in the participant list, otherwise 'coordinator'.
%%
%% The participant list consists of #participant{} records (that is how
%% pr_participant/2 and search_pr_participant/2 read the same list), so
%% the Tid has to be looked up in the tid field of the record. Looking
%% at position 1 only compares against the record tag and can never
%% match a Tid.
tr_status(Tid, Participant) ->
    case lists:keymember(Tid, #participant.tid, Participant) of
        true -> participant;
        false -> coordinator
    end.
2132
%% Ask the locally registered mnesia_tm process for its participant and
%% coordinator info. Returns {info, Part, Coord}, or {timeout, Timeout}
%% when the process is not registered or does not answer within
%% Timeout.
get_info(Timeout) ->
    case whereis(?MODULE) of
        undefined ->
            {timeout, Timeout};
        TmPid ->
            TmPid ! {self(), info},
            receive
                {?MODULE, _, {info, _Part, _Coord} = Info} ->
                    Info
            after Timeout ->
                    {timeout, Timeout}
            end
    end.
2146
%% Print the result of get_info/1 on Stream: either every participant
%% and coordinator transaction, or a note that the info timed out.
display_info(Stream, {info, Part, Coord}) ->
    PrintParticipant = fun(P) -> pr_participant(Stream, P) end,
    PrintCoordinator = fun({Tid, _Tabs}) -> pr_tid(Stream, Tid) end,
    io:format(Stream, "---> Participant transactions <--- ~n", []),
    lists:foreach(PrintParticipant, Part),
    io:format(Stream, "---> Coordinator transactions <---~n", []),
    lists:foreach(PrintCoordinator, Coord);

display_info(Stream, {timeout, T}) ->
    io:format(Stream, "---> No info about coordinator and participant transactions, "
              "timeout ~p <--- ~n", [T]).
2156
%% Print the tid of a participant followed by its commit record; a
%% packed (binary) commit record is unpacked before anything is printed.
pr_participant(Stream, P) ->
    Commit =
        case P#participant.commit of
            Packed when is_binary(Packed) -> binary_to_term(Packed);
            Unpacked -> Unpacked
        end,
    pr_tid(Stream, P#participant.tid),
    io:format(Stream, "with participant objects ~tp~n", [Commit]).
2166
2167
%% Print the counter and the owning pid of a transaction id on Stream.
pr_tid(Stream, Tid) ->
    Counter = Tid#tid.counter,
    Owner = Tid#tid.pid,
    io:format(Stream, "Tid: ~p (owned by ~p) ~n", [Counter, Owner]).
2171
%% Print what mnesia_tm knows about the transaction whose counter is
%% Serial: first the matching participant entries, then the matching
%% coordinator entries.
info(Serial) ->
    io:format( "Info about transaction with serial == ~p~n", [Serial]),
    {info, Participants, Coordinators} = req(info),
    search_pr_participant(Serial, Participants),
    search_pr_coordinator(Serial, Coordinators).
2177
2178
%% Print the owner of every coordinator entry whose tid counter is
%% exactly S. Always returns 'no'.
search_pr_coordinator(_S, []) ->
    no;
search_pr_coordinator(S, [{Tid, _Ts} | Rest]) ->
    case Tid#tid.counter =:= S of
        true ->
            io:format( "Tid is coordinator, owner == \n", []),
            display_pid_info(Tid#tid.pid);
        false ->
            ok
    end,
    search_pr_coordinator(S, Rest).
2189
%% Print the owner and the commit record of every participant entry
%% whose tid counter equals S. Always returns 'false'.
search_pr_participant(_S, []) ->
    false;
search_pr_participant(S, [P | Rest]) ->
    Tid = P#participant.tid,
    Stored = P#participant.commit,
    if
        Tid#tid.counter == S ->
            io:format( "Tid is participant to commit, owner == \n", []),
            display_pid_info(Tid#tid.pid),
            io:format( "Tid wants to write objects \n",[]),
            %% The commit record may be kept packed as a binary
            Commit =
                case is_binary(Stored) of
                    true -> binary_to_term(Stored);
                    false -> Stored
                end,
            io:format("~tp~n", [Commit]);
        true ->
            ok
    end,
    search_pr_participant(S, Rest).
2212
%% Print a one-line summary of the process Pid: its pid, initial call,
%% current function, reductions and message queue length.
%%
%% rpc:pinfo/1 returns 'undefined' for a dead process and may return
%% {badrpc, Reason} when the process lives on another node that cannot
%% be reached. The latter used to fall into the info branch and crash in
%% fetch/2, so it is reported on its own line instead.
display_pid_info(Pid) ->
    case rpc:pinfo(Pid) of
        undefined ->
            io:format( "Dead process \n");
        {badrpc, Reason} ->
            io:format("No process info available: ~tp \n", [{badrpc, Reason}]);
        Info ->
            Call = fetch(initial_call, Info),
            %% Show the arity rather than the argument list
            Curr = case fetch(current_function, Info) of
                       {Mod,F,Args} when is_list(Args) ->
                           {Mod,F,length(Args)};
                       Other ->
                           Other
                   end,
            Reds  = fetch(reductions, Info),
            LM = fetch(message_queue_len, Info),
            pformat(io_lib:format("~p", [Pid]),
                    io_lib:format("~tp", [Call]),
                    io_lib:format("~tp", [Curr]), Reds, LM)
    end.
2231
%% Print one line of process info in fixed-width columns.
pformat(PidStr, CallStr, CurrStr, Reds, MsgQLen) ->
    Columns = [PidStr, CallStr, CurrStr, Reds, MsgQLen],
    io:format( "~-12s ~-21ts ~-21ts ~9w ~4w~n", Columns).
2234
%% Look up Key in a list of {Key, Value} pairs and return the value,
%% or 0 when there is no such pair.
fetch(Key, Info) ->
    case lists:keyfind(Key, 1, Info) of
        {_, Val} -> Val;
        _NotFound -> 0
    end.
2242
2243
2244%%%%%%%%%%%%%%%%%%%%
2245%%%%%%%%%%%%%%%%%%%%%  reconfigure stuff comes here ......
2246%%%%%%%%%%%%%%%%%%%%%
2247
%% Node N went down: tell the coordinators about it. For a transaction
%% whose outcome is already 'committed' the mnesia_down is only sent
%% when N is still found among the waiting_for_commit_ack entries of the
%% transaction store; for any other outcome it is always sent.
reconfigure_coordinators(_N, []) ->
    ok;
reconfigure_coordinators(N, [{Tid, [Store | _]} | Coordinators]) ->
    Notify =
        case mnesia_recover:outcome(Tid, unknown) of
            committed ->
                %% avoid spurious mnesia_down messages
                WaitingNodes = ?ets_lookup(Store, waiting_for_commit_ack),
                lists:keymember(N, 2, WaitingNodes);
            _ ->
                %% Tell the coordinator about the mnesia_down
                true
        end,
    case Notify of
        true  -> send_mnesia_down(Tid, Store, N);
        false -> ignore
    end,
    reconfigure_coordinators(N, Coordinators).
2265
%% Send {mnesia_down, Node} to the owner of Tid and to the 'friends'
%% elements kept in the transaction store.
send_mnesia_down(Tid, Store, Node) ->
    Receivers = [Tid#tid.pid | get_elements(friends,Store)],
    send_to_pids(Receivers, {mnesia_down, Node}).
2269
%% Send Msg to every pid in the list; list elements that are not pids
%% are skipped. Returns ok.
send_to_pids([], _Msg) ->
    ok;
send_to_pids([Target | Rest], Msg) ->
    case is_pid(Target) of
        true  -> Target ! Msg;
        false -> skip
    end,
    send_to_pids(Rest, Msg).
2277
%% Node N went down: for every participant record whose transaction
%% involved N and whose coordinator (the owner of the tid) ran on N,
%% determine the outcome of the transaction and tell the remaining
%% nodes about it. Always returns [].
reconfigure_participants(_, []) ->
    [];
reconfigure_participants(N, [P | Tail]) ->
    Involved = lists:member(N, P#participant.disc_nodes) or
               lists:member(N, P#participant.ram_nodes),
    case Involved of
        false ->
            %% Ignore, since we are not a participant
            %% in the transaction.
            ok;
        true ->
            %% We are on a participant node, lets
            %% check if the dead one was a
            %% participant or a coordinator.
            Tid = P#participant.tid,
            if
                node(Tid#tid.pid) /= N ->
                    %% Another participant node died. Ignore.
                    ok;
                true ->
                    %% The coordinator node has died and
                    %% we must determine the outcome of the
                    %% transaction and tell mnesia_tm on all
                    %% nodes (including the local node) about it
                    verbose("Coordinator ~p in transaction ~p died~n",
                            [Tid#tid.pid, Tid]),
                    AllNodes = P#participant.disc_nodes ++
                               P#participant.ram_nodes,
                    AliveNodes = AllNodes -- [N],
                    Protocol = P#participant.protocol,
                    tell_outcome(Tid, Protocol, N, AliveNodes, AliveNodes)
            end
    end,
    reconfigure_participants(N, Tail).
2314
%% We need to determine the outcome of the transaction and
%% tell mnesia_tm on all involved nodes (including the local node)
%% about the outcome.
%% The outcome is asked for with mnesia_recover:what_happened/3 and
%% broadcast to mnesia_tm on TellNodes as either a do_abort or a
%% do_commit message. Returns the outcome.
tell_outcome(Tid, Protocol, Node, CheckNodes, TellNodes) ->
    Outcome = mnesia_recover:what_happened(Tid, proto(Protocol), CheckNodes),
    Msg =
        case Outcome of
            aborted   -> {Tid,{do_abort, {mnesia_down, Node}}};
            committed -> {Tid, do_commit}
        end,
    rpc:abcast(TellNodes, ?MODULE, Msg),
    Outcome.
2327
%% Map sync_asym_trans to asym_trans; every other protocol name is
%% returned unchanged.
proto(Proto) ->
    case Proto of
        sync_asym_trans -> asym_trans;
        _ -> Proto
    end.
2330
%% Stop mnesia_tm: send {mnesia_down, node()} to the owner of every
%% coordinator transaction, stop mnesia_checkpoint and mnesia_log, and
%% exit with reason 'shutdown'.
do_stop(#state{coordinators = Coordinators}) ->
    Down = {mnesia_down, node()},
    Notify = fun(Tid) -> Tid#tid.pid ! Down end,
    lists:foreach(Notify, gb_trees:keys(Coordinators)),
    mnesia_checkpoint:stop(),
    mnesia_log:stop(),
    exit(shutdown).
2337
%% Send a fixtable request to mnesia_tm and return its reply. The reply
%% 'error' is turned into exit({no_exists, Tab}).
fixtable(Tab, Lock, Me) ->
    Reply = req({fixtable, [Tab,Lock,Me]}),
    if
        Reply =:= error -> exit({no_exists, Tab});
        true -> Reply
    end.
2345
2346%%%%%%%%%%%%%%%%%%%%%%%%%%%
2347%% System upgrade
2348
%% sys callback: resume the main loop with the given state.
system_continue(_Parent, _DebugOpts, LoopState) ->
    doit_loop(LoopState).
2351
%% sys callback: terminate by running do_stop/1, which never returns.
-spec system_terminate(_, _, _, _) -> no_return().
system_terminate(_Reason, _Parent, _DebugOpts, LoopState) ->
    do_stop(LoopState).
2355
%% sys callback: convert the coordinators/participants fields of the
%% state between the list representation and the gb_trees
%% representation. With Extra == downgrade a gb_tree (a tuple) is turned
%% back into lists; otherwise lists are turned into gb_trees, with the
%% participants keyed on their tid. A state that already has the target
%% representation is returned untouched.
system_code_change(State=#state{coordinators=Cs0,participants=Ps0},_Module,_OldVsn,downgrade) ->
    if
        is_tuple(Cs0) ->
            CoordList = gb_trees:to_list(Cs0),
            PartList = gb_trees:values(Ps0),
            {ok, State#state{coordinators=CoordList,participants=PartList}};
        true ->
            {ok, State}
    end;

system_code_change(State=#state{coordinators=Cs0,participants=Ps0},_Module,_OldVsn,_Extra) ->
    if
        is_list(Cs0) ->
            CoordTree = gb_trees:from_orddict(lists:sort(Cs0)),
            Keyed = [{P#participant.tid,P}|| P <- Ps0],
            PartTree = gb_trees:from_orddict(lists:sort(Keyed)),
            {ok, State#state{coordinators=CoordTree,participants=PartTree}};
        true ->
            {ok, State}
    end.
2376