1%% ``Licensed under the Apache License, Version 2.0 (the "License");
2%% you may not use this file except in compliance with the License.
3%% You may obtain a copy of the License at
4%%
5%%     http://www.apache.org/licenses/LICENSE-2.0
6%%
7%% Unless required by applicable law or agreed to in writing, software
8%% distributed under the License is distributed on an "AS IS" BASIS,
9%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10%% See the License for the specific language governing permissions and
11%% limitations under the License.
12%%
13%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
14%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
15%% AB. All Rights Reserved.''
16%%
17%%     $Id: mnesia_recover.erl,v 1.1 2008/12/17 09:53:39 mikpe Exp $
18-module(mnesia_recover).
19
20-behaviour(gen_server).
21
22-export([
23	 allow_garb/0,
24	 call/1,
25	 connect_nodes/1,
26	 disconnect/1,
27	 dump_decision_tab/0,
28	 get_master_node_info/0,
29	 get_master_node_tables/0,
30	 get_master_nodes/1,
31	 get_mnesia_downs/0,
32	 has_mnesia_down/1,
33	 incr_trans_tid_serial/0,
34	 init/0,
35	 log_decision/1,
36	 log_master_nodes/3,
37	 log_mnesia_down/1,
38	 log_mnesia_up/1,
39	 mnesia_down/1,
40	 note_decision/2,
41	 note_log_decision/2,
42	 outcome/2,
43	 start/0,
44	 start_garb/0,
45	 still_pending/1,
46	 sync_trans_tid_serial/1,
47	 wait_for_decision/2,
48	 what_happened/3
49	]).
50
51%% gen_server callbacks
52-export([init/1,
53	 handle_call/3,
54	 handle_cast/2,
55	 handle_info/2,
56	 terminate/2,
57	 code_change/3
58	]).
59
60
61-include("mnesia.hrl").
62-import(mnesia_lib, [set/2, verbose/2, error/2, fatal/2]).
63
%% Internal state of the mnesia_recover gen_server.
-record(state, {supervisor,        % pid given to init/1; an 'EXIT' from it stops the server
		unclear_pid,       % 'From' of the pending wait_for_decision call
		unclear_decision,  % decision record that the pending call is waiting on
		unclear_waitfor,   % remote disc nodes ++ alive ram nodes still awaited
		tm_queue_len = 0,  % mnesia_tm queue length kept by check_overload (0 unless above threshold)
		initiated = false, % set to true once the 'init' call has been handled
		early_msgs = []    % calls/casts buffered before init, newest first
	       }).
72
73%%-define(DBG(F, A), mnesia:report_event(list_to_atom(lists:flatten(io_lib:format(F, A))))).
74%%-define(DBG(F, A), io:format("DBG: " ++ F, A)).
75
%% Row of a transient decision ets table: the transaction id and its
%% noted outcome (see note_decision/2).
-record(transient_decision, {tid, outcome}).
77
%% Starts the recovery server as a locally registered gen_server that
%% is linked to the caller. The caller's pid is handed on to init/1.
start() ->
    Name = {local, ?MODULE},
    Args = [self()],
    Opts = [{timeout, infinity}],
    gen_server:start_link(Name, ?MODULE, Args, Opts).
83
%% Sends the synchronous 'init' request to the recovery server and
%% returns whatever call/1 returns.
init() ->
    Request = init,
    call(Request).
86
%% Arms two repeating timers aimed at the registered mnesia_recover
%% process: 'garb_decisions' every 2 minutes and 'check_overload' every
%% 10 seconds. Fails with badmatch if a timer cannot be created.
start_garb() ->
    Server = whereis(mnesia_recover),
    GarbInterval = timer:minutes(2),
    OverloadInterval = timer:seconds(10),
    {ok, _} = timer:send_interval(GarbInterval, Server, garb_decisions),
    {ok, _} = timer:send_interval(OverloadInterval, Server, check_overload).
91
%% Asynchronously tells the recovery server that it may rotate its
%% transient decision tables (the server runs do_allow_garb/0).
allow_garb() ->
    Request = allow_garb,
    cast(Request).
94
95
%% The transaction log has either been switched (latest -> previous) or
%% there is nothing to be dumped. This means that the previous
%% transaction log may only contain commit records which refer to
%% transactions noted in the last two of the 'Prev' tables. All other
%% tables may now be garbed by 'garb_decisions' (after 2 minutes).
%% At most 10 tables are kept; older ones are deleted right here.
do_allow_garb() ->
    %% NOTE: the order of the steps below is important, keep it.
    Latest = val(latest_transient_decision),
    Previous = val(previous_transient_decisions),
    Fresh = create_transient_decision(),
    {Kept, Expired} = sublist([Latest | Previous], 10, []),
    lists:foreach(fun(Tab) -> ?ets_delete_table(Tab) end, Expired),
    set(previous_transient_decisions, Kept),
    set(latest_transient_decision, Fresh).
111
%% Splits List after at most N elements.
%% Returns {Front, Rest} where Front is the elements accumulated in Acc
%% (reversed back into order) and Rest is the untouched remainder.
sublist(List, N, Acc) ->
    case List of
        [Head | Rest] when N > 0 ->
            sublist(Rest, N - 1, [Head | Acc]);
        _ ->
            {lists:reverse(Acc), List}
    end.
116
%% Keeps only the first two of the previous transient decision tables
%% and deletes every table after them. Does nothing (returns 'ignore')
%% when fewer than two previous tables exist.
do_garb_decisions() ->
    Tabs = val(previous_transient_decisions),
    case Tabs of
        [Newest, NextNewest | Expired] ->
            set(previous_transient_decisions, [Newest, NextNewest]),
            lists:map(fun(Tab) -> ?ets_delete_table(Tab) end, Expired);
        _FewerThanTwo ->
            ignore
    end.
125
%% Tries to bring new nodes into the set used for transaction recovery.
%% Returns [] for an empty node list, otherwise
%% {GoodNodes, AlreadyConnected} where GoodNodes are the nodes a
%% protocol was negotiated with in this call.
connect_nodes([]) ->
    [];
connect_nodes(Ns) ->
    AlreadyConnected = val(recover_nodes),
    %% presumably strips the local node from Ns - confirm in mnesia_lib
    {_, Remote} = mnesia_lib:search_delete(node(), Ns),
    Candidates = Remote -- AlreadyConnected,
    GoodNodes = mnesia_monitor:negotiate_protocol(Candidates),
    case GoodNodes of
        [] ->
            %% No good nodes to connect to
            ignore;
        _ ->
            %% A protocol has been agreed upon with some new nodes, so
            %% they may be used when transactions are recovered
            mnesia_lib:add_list(recover_nodes, GoodNodes),
            cast({announce_all, GoodNodes}),
            case get_master_nodes(schema) of
                [] ->
                    mnesia_monitor:detect_inconcistency(
                      GoodNodes, starting_partitioned_network);
                _ ->
                    %% If master_nodes is set, ignore old inconsistencies
                    ignore
            end
    end,
    {GoodNodes, AlreadyConnected}.
152
%% Lets mnesia_monitor disconnect Node and then removes Node from the
%% recover_nodes list.
disconnect(Node) ->
    mnesia_monitor:disconnect(Node),
    Key = recover_nodes,
    mnesia_lib:del(Key, Node).
156
%% Reads the value of Var through ?catch_val; when that lookup exits,
%% the exit reason is passed on to mnesia_lib:other_val/2.
val(Var) ->
    Lookup = ?catch_val(Var),
    case Lookup of
        {'EXIT', Why} -> mnesia_lib:other_val(Var, Why);
        _ -> Lookup
    end.
162
%% Synchronous request to the locally registered recovery server.
%%
%% Returns the server's reply, or {error, {node_not_running, node()}}
%% when the server is not registered or when an exit signal from it is
%% found in the mailbox after the call.
%%
%% The caller is linked to the server for the duration of the call
%% (the call itself waits forever), so a caller that traps exits gets
%% an 'EXIT' message if the server dies.
call(Msg) ->
    case whereis(?MODULE) of
        undefined ->
            {error, {node_not_running, node()}};
        Pid ->
            link(Pid),
            Res = gen_server:call(Pid, Msg, infinity),
            unlink(Pid),

            %% We get an exit signal if the server dies. The result of
            %% this receive is the result of the call; previously the
            %% error tuple was computed here and then thrown away.
            receive
                {'EXIT', Pid, _Reason} ->
                    {error, {node_not_running, node()}}
            after 0 ->
                Res
            end
    end.
182
%% Runs call(Msg) in this module on every node in Nodes via
%% rpc:multicall/4 and returns its result unchanged.
multicall(Nodes, Msg) ->
    Args = [Msg],
    rpc:multicall(Nodes, ?MODULE, call, Args).
185
%% Asynchronous request to the locally registered recovery server.
%% Returns 'ignore' without sending anything when it is not registered.
cast(Msg) ->
    Server = whereis(?MODULE),
    if
        Server =:= undefined -> ignore;
        true -> gen_server:cast(Server, Msg)
    end.
191
%% Asynchronous broadcast of Msg to the process registered under this
%% module's name on each node in Nodes.
abcast(Nodes, Msg) ->
    Name = ?MODULE,
    gen_server:abcast(Nodes, Name, Msg).
194
%% Records Outcome for Tid in the latest transient decision table.
note_decision(Tid, Outcome) ->
    Row = #transient_decision{tid = Tid, outcome = Outcome},
    ?ets_insert(val(latest_transient_decision), Row).
198
%% Removes the entry keyed on Node from the mnesia_decision table.
%% The date and time arguments are not used.
note_up(Node, _Date, _Time) ->
    Tab = mnesia_decision,
    ?ets_delete(Tab, Node).
201
%% Stores a {mnesia_down, Node, Date, Time} entry in the
%% mnesia_decision table.
note_down(Node, Date, Time) ->
    Row = {mnesia_down, Node, Date, Time},
    ?ets_insert(mnesia_decision, Row).
204
%% Updates the master node entry of Tab in the mnesia_decision table.
%% An empty node list removes the entry; a non-empty list stores
%% {master_nodes, Tab, Nodes}. Any other second argument is rejected
%% with function_clause.
%% Uses is_list/1 rather than the obsolete list/1 guard test.
note_master_nodes(Tab, []) ->
    ?ets_delete(mnesia_decision, Tab);
note_master_nodes(Tab, Nodes) when is_list(Nodes) ->
    Master = {master_nodes, Tab, Nodes},
    ?ets_insert(mnesia_decision, Master).
210
%% Notes the decision D in RAM.
%% A decision without disc nodes is stored as a transient decision
%% (with its outcome passed through filter_outcome/1) and removed from
%% the mnesia_decision table; any other decision record is inserted
%% into mnesia_decision as it is.
note_outcome(#decision{disc_nodes = []} = D) ->
    Tid = D#decision.tid,
    note_decision(Tid, filter_outcome(D#decision.outcome)),
    ?ets_delete(mnesia_decision, Tid);
note_outcome(#decision{} = D) ->
    ?ets_insert(mnesia_decision, D).
218
%% Logs the decision D.
%% A decision whose outcome is not 'unclear' is first merged with what
%% is already known about the transaction and then logged with the
%% tell flag set; anything else is logged as given, without telling.
log_decision(D) ->
    case D of
        #decision{outcome = Outcome} when Outcome /= unclear ->
            Known = decision(D#decision.tid),
            Merged = merge_decisions(node(), Known, D),
            do_log_decision(Merged, true);
        _ ->
            do_log_decision(D, false)
    end.
225
%% Notes the decision D in RAM and, when a mnesia directory is in use,
%% appends it to latest_log.
%% For the outcomes 'aborted' and 'committed' the local node is removed
%% from disc_nodes before the decision is noted and logged.
%% If DoTell is true and the outcome is not 'unclear', the remaining
%% disc nodes and the ram nodes are sent an im_certain message (only
%% when a directory is in use).
do_log_decision(D, DoTell) ->
    RamNodes = D#decision.ram_nodes,
    RemoteDisc = D#decision.disc_nodes -- [node()],
    Outcome = D#decision.outcome,
    Settled = lists:member(Outcome, [aborted, committed]),
    Noted =
        if
            Settled -> D#decision{disc_nodes = RemoteDisc};
            true -> D
        end,
    note_outcome(Noted),
    ShouldTell = (DoTell == true) andalso (Outcome /= unclear),
    case mnesia_monitor:use_dir() of
        false ->
            ignore;
        true ->
            mnesia_log:append(latest_log, Noted),
            case ShouldTell of
                true ->
                    tell_im_certain(RemoteDisc, Noted),
                    tell_im_certain(RamNodes, Noted);
                false ->
                    ignore
            end
    end.
250
%% Broadcasts {im_certain, node(), D} to the recovery servers on Nodes.
%% Returns 'ignore' without sending when Nodes is empty.
tell_im_certain(Nodes, D) ->
    case Nodes of
        [] ->
            ignore;
        _ ->
            abcast(Nodes, {im_certain, node(), D})
    end.
257
%% Asks the recovery server to log that mnesia is up on Node.
log_mnesia_up(Node) ->
    Request = {log_mnesia_up, Node},
    call(Request).
260
%% Asks the recovery server to log that mnesia is down on Node.
log_mnesia_down(Node) ->
    Request = {log_mnesia_down, Node},
    call(Request).
263
%% Returns the nodes that have a mnesia_down entry in the
%% mnesia_decision table.
get_mnesia_downs() ->
    Pattern = {mnesia_down, '_', '_', '_'},
    [Down || {mnesia_down, Down, _, _}
                 <- ?ets_match_object(mnesia_decision, Pattern)].
269
%% Tells whether a mnesia_down entry is stored for Node:
%% true if the mnesia_decision table holds one, false if the table has
%% nothing under that key.
has_mnesia_down(Node) ->
    Rows = ?ets_lookup(mnesia_decision, Node),
    case Rows of
        [] ->
            false;
        [{mnesia_down, Node, _Date, _Time}] ->
            true
    end.
278
%% Reacts to mnesia going down on Node: removes Node from
%% recover_nodes and informs the recovery server asynchronously.
%% Returns 'ignore' when recover_nodes cannot be read yet, i.e. the
%% server has not been started.
mnesia_down(Node) ->
    Started =
        case ?catch_val(recover_nodes) of
            {'EXIT', _} -> false;
            _ -> true
        end,
    if
        Started ->
            mnesia_lib:del(recover_nodes, Node),
            cast({mnesia_down, Node});
        true ->
            %% Not started yet
            ignore
    end.
288
%% Logs the master node settings in Args, a list of {Tab, Nodes}.
%%
%% When mnesia is running (IsRunning == yes) the work is delegated
%% straight to log_master_nodes2/4. When it is not running and no
%% directory is used there is nothing to do. Otherwise the latest
%% transaction log is opened first (a header is written if the file did
%% not exist before) and closed again by log_master_nodes2/4.
%% Returns ok or {error, Reason}.
log_master_nodes(Args, UseDir, IsRunning) ->
    case {IsRunning, UseDir} of
        {yes, _} ->
            log_master_nodes2(Args, UseDir, IsRunning, ok);
        {_, false} ->
            ok;
        _ ->
            Name = latest_log,
            Fname = mnesia_log:latest_log_file(),
            Exists = mnesia_lib:exists(Fname),
            Repair = mnesia:system_info(auto_repair),
            OpenArgs = [{file, Fname}, {name, Name}, {repair, Repair}],
            case disk_log:open(OpenArgs) of
                {error, Reason} ->
                    {error, Reason};
                {ok, Name} ->
                    log_master_nodes2(Args, UseDir, IsRunning, ok);
                {repaired, Name, {recovered, _R}, {badbytes, _B}}
                  when Exists == true ->
                    log_master_nodes2(Args, UseDir, IsRunning, ok);
                {repaired, Name, {recovered, _R}, {badbytes, _B}}
                  when Exists == false ->
                    %% Brand new log file, it needs its header
                    mnesia_log:write_trans_log_header(),
                    log_master_nodes2(Args, UseDir, IsRunning, ok)
            end
    end.
315
%% Logs each {Tab, Nodes} pair in turn and returns the accumulated
%% result: WorstRes as passed in, or the most recent {error, Reason}.
%% When mnesia is running each pair goes through the recovery server
%% and mnesia_controller is told about the update; otherwise the pair
%% is logged directly and latest_log is closed once the list is done.
log_master_nodes2([], _UseDir, yes, WorstRes) ->
    WorstRes;
log_master_nodes2([], _UseDir, _NotRunning, WorstRes) ->
    disk_log:close(latest_log),
    WorstRes;
log_master_nodes2([{Tab, Nodes} | Rest], UseDir, IsRunning, WorstRes) ->
    Outcome =
        if
            IsRunning =:= yes ->
                Reply = call({log_master_nodes, Tab, Nodes, UseDir, IsRunning}),
                mnesia_controller:master_nodes_updated(Tab, Nodes),
                Reply;
            true ->
                do_log_master_nodes(Tab, Nodes, UseDir, IsRunning)
        end,
    NewWorst =
        case Outcome of
            ok -> WorstRes;
            {error, _Reason} -> Outcome
        end,
    log_master_nodes2(Rest, UseDir, IsRunning, NewWorst).
340
%% Returns all {master_nodes, Tab, Nodes} entries of the
%% mnesia_decision table, or [] when the lookup fails.
get_master_node_info() ->
    Pattern = {master_nodes, '_', '_'},
    Result = (catch mnesia_lib:db_match_object(ram_copies,
                                               mnesia_decision,
                                               Pattern)),
    case Result of
        {'EXIT', _} -> [];
        _ -> Result
    end.
350
%% Returns the names of the tables that have a master node entry.
get_master_node_tables() ->
    [Tab || {master_nodes, Tab, _} <- get_master_node_info()].
354
%% Returns the master nodes noted for Tab, or [] when the lookup in
%% the mnesia_decision table fails (e.g. no entry for Tab).
get_master_nodes(Tab) ->
    Lookup = (catch ?ets_lookup_element(mnesia_decision, Tab, 3)),
    case Lookup of
        {'EXIT', _} -> [];
        _ -> Lookup
    end.
360
%% Determine what has happened to the transaction Tid.
%% The default outcome is 'aborted' for asym_trans and 'unclear' for
%% every other protocol (sym_trans and sync_sym_trans).
%% If the local node is among Nodes it is asked first and the other
%% nodes are consulted only when the local answer is unclear;
%% otherwise all of Nodes are asked.
what_happened(Tid, Protocol, Nodes) ->
    Default =
        if
            Protocol =:= asym_trans -> aborted;
            true -> unclear
        end,
    Self = node(),
    case lists:member(Self, Nodes) of
        false ->
            what_happened_remotely(Tid, Default, Nodes);
        true ->
            {ok, Local} = call({what_happened, Default, Tid}),
            Remote = Nodes -- [Self],
            case filter_outcome(Local) of
                unclear -> what_happened_remotely(Tid, Default, Remote);
                Decided -> Decided
            end
    end.
381
%% Asks the recovery servers on Nodes about Tid and condenses their
%% replies into one outcome. Nodes that could not be reached (the
%% second element of the multicall result) are not looked at.
what_happened_remotely(Tid, Default, Nodes) ->
    Request = {what_happened, Default, Tid},
    {Replies, _BadNodes} = multicall(Nodes, Request),
    check_what_happened(Replies, 0, 0).
385
%% Tallies the replies from the other nodes and decides the outcome.
%% Error and badrpc replies count as 'unclear', i.e. as no vote.
%% Result: 'aborted' if anybody aborted or if nobody knows anything,
%% 'committed' if there are commits and no aborts.
check_what_happened([Reply | Replies], Aborts, Commits) ->
    Vote =
        case Reply of
            {ok, R} -> filter_outcome(R);
            {error, _} -> unclear;
            {badrpc, _} -> unclear
        end,
    case Vote of
        committed -> check_what_happened(Replies, Aborts, Commits + 1);
        aborted -> check_what_happened(Replies, Aborts + 1, Commits);
        unclear -> check_what_happened(Replies, Aborts, Commits)
    end;
check_what_happened([], Aborts, Commits) ->
    if
        Aborts > 0 -> aborted;                  % Somebody has aborted
        Aborts == 0, Commits > 0 -> committed;  % All have committed
        Aborts == 0, Commits == 0 -> aborted    % None of the active nodes knows
    end.
408
%% Determine what has happened to the transaction
%% and possibly wait forever for the decision.
%%
%% presume_commit (sym_trans) is answered at once as committed.
%% A decision record with outcome presume_abort (asym_trans) is looked
%% up locally; while the outcome is unclear the lookup is retried every
%% 200 ms, except at startup where the recovery server is asked to wait
%% for the decision instead.
wait_for_decision(presume_commit, _InitBy) ->
    {{presume_commit, self()}, committed};

wait_for_decision(#decision{outcome = presume_abort, tid = Tid} = D, InitBy) ->
    Outcome = filter_outcome(outcome(Tid, presume_abort)),
    case {Outcome, InitBy} of
        {unclear, startup} ->
            {ok, Res} = call({wait_for_decision, D}),
            {Tid, Res};
        {unclear, _} ->
            %% Wait a while for active transactions
            %% to end and try again
            timer:sleep(200),
            wait_for_decision(D, InitBy);
        {_, _} ->
            {Tid, Outcome}
    end.
433
%% Returns, in order, the transaction ids whose locally known outcome
%% is still unclear.
still_pending(Tids) ->
    [Tid || Tid <- Tids,
            filter_outcome(outcome(Tid, unclear)) =:= unclear].
441
%% Opens the decision tab, notes all decisions found in it and closes
%% it again.
load_decision_tab() ->
    load_decision_tab(mnesia_log:open_decision_tab(), load_decision_tab),
    mnesia_log:close_decision_tab().
446
%% Reads the decision tab chunk by chunk, starting at the continuation
%% Cont, and notes every decision until eof is reached. Returns ok.
load_decision_tab(eof, _InitBy) ->
    ok;
load_decision_tab(Cont, InitBy) ->
    Chunk = mnesia_log:chunk_decision_tab(Cont),
    case Chunk of
        eof ->
            ok;
        {NextCont, Decisions} ->
            note_log_decisions(Decisions, InitBy),
            load_decision_tab(NextCont, InitBy)
    end.
457
%% Dumps DECISION.LOG and PDECISION.LOG and removes them.
%% From now on all decisions are logged in the transaction log file.
%% Returns 'ignore' when neither of the old files exists.
convert_old() ->
    HasPrevious = mnesia_lib:exists(mnesia_log:previous_decision_log_file()),
    HasLatest = mnesia_lib:exists(mnesia_log:decision_log_file()),
    case HasPrevious or HasLatest of
        false ->
            ignore;
        true ->
            mnesia_log:open_decision_log(),
            %% Two dumps in a row, exactly as before
            dump_decision_log(startup),
            dump_decision_log(startup),
            mnesia_log:close_decision_log(),
            ok = file:delete(mnesia_log:decision_log_file())
    end.
475
%% Prepares a dump of the decision log and performs it.
%% Assumed to be run in the transaction log dumper process.
dump_decision_log(InitBy) ->
    perform_dump_decision_log(mnesia_log:prepare_decision_log_dump(), InitBy).
480
%% Carries out a prepared decision log dump.
%% At startup every chunk of the log is read and its decisions noted
%% before the dump is confirmed; in all other cases the dump is
%% confirmed right away.
perform_dump_decision_log(eof, _InitBy) ->
    confirm_decision_log_dump();
perform_dump_decision_log(Cont, startup) ->
    case mnesia_log:chunk_decision_log(Cont) of
        eof ->
            confirm_decision_log_dump();
        {NextCont, Decisions} ->
            note_log_decisions(Decisions, startup),
            perform_dump_decision_log(NextCont, startup)
    end;
perform_dump_decision_log(_Cont, _InitBy) ->
    confirm_decision_log_dump().
493
%% Saves the decision table to disc and then lets mnesia_log confirm
%% the decision log dump.
confirm_decision_log_dump() ->
    _ = dump_decision_tab(),
    mnesia_log:confirm_decision_log_dump().
497
%% Saves every entry of the mnesia_decision table, wrapped as
%% {decision_list, Entries}, through mnesia_log:save_decision_tab/1.
dump_decision_tab() ->
    Everything = mnesia_lib:db_match_object(ram_copies, mnesia_decision, '_'),
    mnesia_log:save_decision_tab({decision_list, Everything}).
502
%% Notes every item of a list of decision log items, in order.
%% Returns ok.
note_log_decisions(Items, InitBy) ->
    lists:foreach(fun(Item) -> note_log_decision(Item, InitBy) end, Items).
508
%% Notes one item read from a decision log or decision tab.
%%
%% Handled items:
%%   #decision{}               - merged with what is already known and
%%                               noted; a pre_commit outcome is treated
%%                               as unclear
%%   {trans_tid, serial, N}    - ignored at startup, otherwise used to
%%                               sync the local tid serial
%%   {mnesia_up | mnesia_down, Node, Date, Time}
%%   {master_nodes, Tab, Nodes}
%%   #log_header{}             - version check of decision_log and
%%                               decision_tab headers
%%   {decision_list, Items}    - every item is noted in turn
%% Anything else makes the process exit with a "Bad decision log item"
%% reason.
%% Uses is_record/2 rather than the obsolete record/2 guard test.
note_log_decision(NewD, InitBy) when NewD#decision.outcome == pre_commit ->
    note_log_decision(NewD#decision{outcome = unclear}, InitBy);

note_log_decision(NewD, _InitBy) when is_record(NewD, decision) ->
    Tid = NewD#decision.tid,
    sync_trans_tid_serial(Tid),
    OldD = decision(Tid),
    MergedD = merge_decisions(node(), OldD, NewD),
    note_outcome(MergedD);

note_log_decision({trans_tid, serial, _Serial}, startup) ->
    ignore;

note_log_decision({trans_tid, serial, Serial}, _InitBy) ->
    sync_trans_tid_serial(Serial);

note_log_decision({mnesia_up, Node, Date, Time}, _InitBy) ->
    note_up(Node, Date, Time);

note_log_decision({mnesia_down, Node, Date, Time}, _InitBy) ->
    note_down(Node, Date, Time);

note_log_decision({master_nodes, Tab, Nodes}, _InitBy) ->
    note_master_nodes(Tab, Nodes);

note_log_decision(H, _InitBy) when H#log_header.log_kind == decision_log ->
    V = mnesia_log:decision_log_version(),
    if
        H#log_header.log_version == V ->
            ok;
        H#log_header.log_version == "2.0" ->
            verbose("Accepting an old version format of decision log: ~p~n",
                    [V]),
            ok;
        true ->
            fatal("Bad version of decision log: ~p~n", [H])
    end;

note_log_decision(H, _InitBy) when H#log_header.log_kind == decision_tab ->
    V = mnesia_log:decision_tab_version(),
    if
        V == H#log_header.log_version ->
            ok;
        true ->
            fatal("Bad version of decision tab: ~p~n", [H])
    end;
note_log_decision({decision_list, ItemList}, InitBy) ->
    note_log_decisions(ItemList, InitBy);
note_log_decision(BadItem, InitBy) ->
    exit({"Bad decision log item", BadItem, InitBy}).
559
%% Reads the current transaction id serial from the mnesia_decision
%% table (third element of the entry keyed on 'serial').
trans_tid_serial() ->
    Tab = mnesia_decision,
    ?ets_lookup_element(Tab, serial, 3).
562
%% Stores Val as the transaction id serial in the mnesia_decision
%% table.
set_trans_tid_serial(Val) ->
    Row = {trans_tid, serial, Val},
    ?ets_insert(mnesia_decision, Row).
565
%% Bumps the transaction id serial in the mnesia_decision table by one
%% through ?ets_update_counter.
incr_trans_tid_serial() ->
    Step = 1,
    ?ets_update_counter(mnesia_decision, serial, Step).
568
%% Makes sure the local transaction id serial is ahead of a counter
%% seen elsewhere.
%% The argument is either an integer counter or a #tid{} record whose
%% counter field is used. If that counter is greater than the local
%% serial, the local serial is set to the counter plus one; otherwise
%% nothing is changed and 'ignore' is returned.
%% Uses is_integer/1 rather than the obsolete integer/1 guard test.
sync_trans_tid_serial(ThatCounter) when is_integer(ThatCounter) ->
    ThisCounter = trans_tid_serial(),
    if
        ThatCounter > ThisCounter ->
            set_trans_tid_serial(ThatCounter + 1);
        true ->
            ignore
    end;
sync_trans_tid_serial(Tid) ->
    sync_trans_tid_serial(Tid#tid.counter).
579
580%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
581%%% Callback functions from gen_server
582
%%----------------------------------------------------------------------
%% Func: init/1
%% Purpose: gen_server start-up. Traps exits, creates the first
%%          transient decision table and resets the lists of previous
%%          tables and recover nodes. Parent is remembered as the
%%          supervisor in the server state.
%% Returns: {ok, State}
%%----------------------------------------------------------------------
init([Parent]) ->
    process_flag(trap_exit, true),
    mnesia_lib:verbose("~p starting: ~p~n", [?MODULE, self()]),
    FirstTab = create_transient_decision(),
    set(latest_transient_decision, FirstTab),
    set(previous_transient_decisions, []),
    set(recover_nodes, []),
    {ok, #state{supervisor = Parent}}.
597
%% Creates a new public ets set for transient decisions, keyed on the
%% second element of its entries.
create_transient_decision() ->
    Opts = [{keypos, 2}, set, public],
    ?ets_new_table(mnesia_transient_decision, Opts).
600
%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State}          |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%
%% Until the 'init' request has been handled, every other request is
%% buffered in the state and replayed by handle_early_msgs/2.
%%----------------------------------------------------------------------

%% 'init': create the mnesia_decision table and reset the tid serial.
%% With a mnesia directory the saved decision tab (if any) and the old
%% decision logs are loaded as well and a log dump is requested.
%% The reply to the caller is sent by handle_early_msgs/2.
handle_call(init, From, State) when State#state.initiated == false ->
    Args = [{keypos, 2}, set, public, named_table],
    case mnesia_monitor:use_dir() of
        true ->
            ?ets_new_table(mnesia_decision, Args),
            set_trans_tid_serial(0),
            TabFile = mnesia_log:decision_tab_file(),
            case mnesia_lib:exists(TabFile) of
                true ->
                    load_decision_tab();
                false ->
                    ignore
            end,
            convert_old(),
            mnesia_dumper:opt_dump_log(scan_decisions);
        false ->
            ?ets_new_table(mnesia_decision, Args),
            set_trans_tid_serial(0)
    end,
    handle_early_msgs(State, From);

handle_call(Msg, From, State) when State#state.initiated == false ->
    %% Buffer early messages
    Msgs = State#state.early_msgs,
    {noreply, State#state{early_msgs = [{call, Msg, From} | Msgs]}};

%% Report the locally known outcome of Tid (Default if nothing is known).
handle_call({what_happened, Default, Tid}, _From, State) ->
    sync_trans_tid_serial(Tid),
    Outcome = outcome(Tid, Default),
    {reply, {ok, Outcome}, State};

%% Wait for other nodes to tell the outcome of the unclear decision D.
%% The reply is deferred: the caller is stored in the state and no
%% reply is sent from this clause unless there is nobody to wait for.
handle_call({wait_for_decision, D}, From, State) ->
    Recov = val(recover_nodes),
    AliveRam = (mnesia_lib:intersect(D#decision.ram_nodes, Recov) -- [node()]),
    RemoteDisc = D#decision.disc_nodes -- [node()],
    if
        AliveRam == [], RemoteDisc == [] ->
            %% No more else to wait for and we may safely abort
            {reply, {ok, aborted}, State};
        true ->
            verbose("Transaction ~p is unclear. "
                    "Wait for disc nodes: ~w ram: ~w~n",
                    [D#decision.tid, RemoteDisc, AliveRam]),
            AliveDisc = mnesia_lib:intersect(RemoteDisc, Recov),
            Msg = {what_decision, node(), D},
            abcast(AliveRam, Msg),
            abcast(AliveDisc, Msg),
            %% Unless the wait is unlimited, arm a timer that sends
            %% force_decision to this process after MaxWait
            case val(max_wait_for_decision) of
                infinity ->
                    ignore;
                MaxWait ->
                    ForceMsg =  {force_decision, D#decision.tid},
                    {ok, _} = timer:send_after(MaxWait, ForceMsg)
            end,
            State2 = State#state{unclear_pid = From,
                                 unclear_decision = D,
                                 unclear_waitfor = (RemoteDisc ++ AliveRam)},
            {noreply, State2}
    end;

handle_call({log_mnesia_up, Node}, _From, State) ->
    do_log_mnesia_up(Node),
    {reply, ok, State};

handle_call({log_mnesia_down, Node}, _From, State) ->
    do_log_mnesia_down(Node),
    {reply, ok, State};

%% NOTE(review): the result of do_log_master_nodes/4 is dropped and the
%% caller always gets ok - confirm that this is intended.
handle_call({log_master_nodes, Tab, Nodes, UseDir, IsRunning}, _From, State) ->
    do_log_master_nodes(Tab, Nodes, UseDir, IsRunning),
    {reply, ok, State};

%% Unexpected requests are reported and never answered.
handle_call(Msg, _From, State) ->
    error("~p got unexpected call: ~p~n", [?MODULE, Msg]),
    {noreply, State}.
685
%% Logs a mnesia_up event for Node, stamped with the current date and
%% time. With a mnesia directory in use the event is appended to
%% latest_log and the log is synced; the event is always noted in RAM.
do_log_mnesia_up(Node) ->
    Date = date(),
    Time = time(),
    Event = {mnesia_up, Node, Date, Time},
    case mnesia_monitor:use_dir() of
        false ->
            ignore;
        true ->
            mnesia_log:append(latest_log, Event),
            disk_log:sync(latest_log)
    end,
    note_up(Node, Date, Time).
696
%% Logs a mnesia_down event for Node, stamped with the current date and
%% time. With a mnesia directory in use the event is appended to
%% latest_log and the log is synced; the event is always noted in RAM.
do_log_mnesia_down(Node) ->
    Date = date(),
    Time = time(),
    Event = {mnesia_down, Node, Date, Time},
    case mnesia_monitor:use_dir() of
        false ->
            ignore;
        true ->
            mnesia_log:append(latest_log, Event),
            disk_log:sync(latest_log)
    end,
    note_down(Node, Date, Time).
707
%% Logs the master nodes of Tab.
%% With UseDir == true the entry is appended to latest_log and the log
%% is synced; the result of the append is the return value (ok when no
%% directory is used). When mnesia is running (IsRunning == yes) the
%% setting is noted in RAM as well.
do_log_master_nodes(Tab, Nodes, UseDir, IsRunning) ->
    LogResult =
        case UseDir of
            false ->
                ok;
            true ->
                Appended = mnesia_log:append(latest_log,
                                             {master_nodes, Tab, Nodes}),
                disk_log:sync(latest_log),
                Appended
        end,
    if
        IsRunning =:= yes -> note_master_nodes(Tab, Nodes);
        true -> ignore
    end,
    LogResult.
726
%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%
%% Casts that arrive before the 'init' call has been handled are
%% buffered in the state and replayed later. Unexpected casts are
%% reported and dropped.
%% Uses is_record/2 rather than the obsolete record/2 guard test.
%%----------------------------------------------------------------------

handle_cast(Msg, State) when State#state.initiated == false ->
    %% Buffer early messages
    Msgs = State#state.early_msgs,
    {noreply, State#state{early_msgs = [{cast, Msg} | Msgs]}};

%% Another node is certain about a decision: merge it with what is
%% known here and log the result without telling anybody else.
handle_cast({im_certain, Node, NewD}, State) ->
    OldD = decision(NewD#decision.tid),
    MergedD = merge_decisions(Node, OldD, NewD),
    do_log_decision(MergedD, false),
    {noreply, State};

handle_cast(allow_garb, State) ->
    do_allow_garb(),
    {noreply, State};

%% A batch of decisions announced by Node, which thereby also becomes
%% one of the recover nodes.
handle_cast({decisions, Node, Decisions}, State) ->
    mnesia_lib:add(recover_nodes, Node),
    State2 = add_remote_decisions(Node, Decisions, State),
    {noreply, State2};

%% Node asks what is known about a transaction: answer with our own
%% decision if there is one, otherwise echo the decision it sent.
handle_cast({what_decision, Node, OtherD}, State) ->
    Tid = OtherD#decision.tid,
    sync_trans_tid_serial(Tid),
    Decision =
        case decision(Tid) of
            no_decision -> OtherD;
            MyD when is_record(MyD, decision) -> MyD
        end,
    announce([Node], [Decision], [], true),
    {noreply, State};

%% Only of interest while waiting for an unclear decision in which the
%% node that went down is one of the ram nodes.
handle_cast({mnesia_down, Node}, State) ->
    case State#state.unclear_decision of
        undefined ->
            {noreply, State};
        D ->
            case lists:member(Node, D#decision.ram_nodes) of
                false ->
                    {noreply, State};
                true ->
                    State2 = add_remote_decision(Node, D, State),
                    {noreply, State2}
            end
    end;

handle_cast({announce_all, Nodes}, State) ->
    announce_all(Nodes, tabs()),
    {noreply, State};

handle_cast(Msg, State) ->
    error("~p got unexpected cast: ~p~n", [?MODULE, Msg]),
    {noreply, State}.
786
%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%
%% Info messages are not buffered before init; they are handled as they
%% arrive. Unexpected messages are reported and dropped.
%%----------------------------------------------------------------------

%% Periodic check whether mnesia_tm is overloaded. An overload event is
%% reported when the message queue of mnesia_tm is above the threshold
%% at two consecutive checks.
handle_info(check_overload, S) ->
    case whereis(mnesia_tm) of
        Pid when is_pid(Pid) ->
            Threshold = 100,
            Prev = S#state.tm_queue_len,
            case process_info(Pid, message_queue_len) of
                {message_queue_len, Len}
                  when Len > Threshold, Prev > Threshold ->
                    What = {mnesia_tm, message_queue_len, [Prev, Len]},
                    mnesia_lib:report_system_event({mnesia_overload, What}),
                    {noreply, S#state{tm_queue_len = 0}};

                {message_queue_len, Len} when Len > Threshold ->
                    %% First time above the threshold, remember it
                    {noreply, S#state{tm_queue_len = Len}};

                {message_queue_len, _Len} ->
                    {noreply, S#state{tm_queue_len = 0}};

                undefined ->
                    %% mnesia_tm died after whereis/1 found it; treat
                    %% this like the not-registered case instead of
                    %% crashing on a badmatch
                    {noreply, S}
            end;
        undefined ->
            {noreply, S}
    end;

handle_info(garb_decisions, State) ->
    do_garb_decisions(),
    {noreply, State};

handle_info({force_decision, Tid}, State) ->
    %% Enforce a transaction recovery decision,
    %% if we still are waiting for the outcome

    case State#state.unclear_decision of
        U when U#decision.tid == Tid ->
            verbose("Decided to abort transaction ~p since "
                    "max_wait_for_decision has been exceeded~n",
                    [Tid]),
            D = U#decision{outcome = aborted},
            State2 = add_remote_decision(node(), D, State),
            {noreply, State2};
        _ ->
            {noreply, State}
    end;

%% The process that started us has exited: shut down.
handle_info({'EXIT', Pid, R}, State) when Pid == State#state.supervisor ->
    mnesia_lib:dbg_out("~p was ~p~n",[?MODULE, R]),
    {stop, shutdown, State};

handle_info(Msg, State) ->
    error("~p got unexpected info: ~p~n", [?MODULE, Msg]),
    {noreply, State}.
852
%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server; the actual work is handed over to
%%          mnesia_monitor:terminate_proc/3
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------

terminate(Reason, State) ->
    Who = ?MODULE,
    mnesia_monitor:terminate_proc(Who, Reason, State).
861
%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Upgrade process when its code is to be changed.
%%          The state is kept exactly as it is.
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    NewState = State,
    {ok, NewState}.
869
870%%%----------------------------------------------------------------------
871%%% Internal functions
872%%%----------------------------------------------------------------------
873
%% Marks the server as initiated, replays the messages that were
%% buffered before init and only then answers the 'init' caller with
%% ok. Returns the result of the replay.
handle_early_msgs(State, From) ->
    Buffered = State#state.early_msgs,
    Ready = State#state{early_msgs = [], initiated = true},
    Result = do_handle_early_msgs(Buffered, Ready),
    gen_server:reply(From, ok),
    Result.
880
%% Replays buffered messages in arrival order. The buffer is kept
%% newest first, so the tail of the list is handled before its head.
%% A stop result from any message ends the replay and is returned.
do_handle_early_msgs([], State) ->
    {noreply, State};
do_handle_early_msgs([Newest | Older], State) ->
    case do_handle_early_msgs(Older, State) of
        {noreply, State2} ->
            handle_early_msg(Newest, State2);
        {stop, _Reason, _Reply, _State2} = Stop ->
            Stop;
        {stop, _Reason, _State2} = Stop ->
            Stop
    end.
894
%% Dispatches one buffered message to the matching gen_server callback.
%% A reply produced for a buffered call is sent explicitly to the
%% original caller and the result is turned into {noreply, State}.
handle_early_msg({info, Msg}, State) ->
    handle_info(Msg, State);
handle_early_msg({cast, Msg}, State) ->
    handle_cast(Msg, State);
handle_early_msg({call, Msg, From}, State) ->
    Result = handle_call(Msg, From, State),
    case Result of
        {reply, Reply, NewState} ->
            gen_server:reply(From, Reply),
            {noreply, NewState};
        _ ->
            Result
    end.
907
%% Returns the decision tables to search, ordered by hit probability:
%% the latest transient table, mnesia_decision, then the previous
%% transient tables. The latest table is read before the previous ones
%% so that no transaction is missed even if the tables are switched.
tabs() ->
    Latest = val(latest_transient_decision),
    Previous = val(previous_transient_decisions),
    [Latest | [mnesia_decision | Previous]].
912
%% Looks up what is known locally about the transaction Tid.
%% Returns a #decision{} record or no_decision.
decision(Tid) ->
    Tabs = tabs(),
    decision(Tid, Tabs).
915
%% Searches the given tables, in order, for a decision about Tid.
%% A transient decision is converted into a #decision{} record without
%% any disc or ram nodes. A table that is empty for Tid, or that cannot
%% be read, is skipped. Returns no_decision when no table has an entry.
%% Uses is_record/2 rather than the obsolete record/2 guard test.
decision(Tid, [Tab | Tabs]) ->
    case catch ?ets_lookup(Tab, Tid) of
        [D] when is_record(D, decision) ->
            D;
        [C] when is_record(C, transient_decision) ->
            #decision{tid = C#transient_decision.tid,
                      outcome = C#transient_decision.outcome,
                      disc_nodes = [],
                      ram_nodes = []
                     };
        [] ->
            decision(Tid, Tabs);
        {'EXIT', _} ->
            %% Recently switched transient decision table
            decision(Tid, Tabs)
    end;
decision(_Tid, []) ->
    no_decision.
934
%% Returns the locally noted outcome of Tid, or Default when none of
%% the decision tables has an entry for it.
outcome(Tid, Default) ->
    Tabs = tabs(),
    outcome(Tid, Default, Tabs).
937
%% Searches the given tables, in order, for the outcome of Tid (third
%% element of its entry). A table where the lookup fails is skipped.
%% Returns Default once the tables are exhausted.
outcome(_Tid, Default, []) ->
    Default;
outcome(Tid, Default, [Tab | Remaining]) ->
    Found = (catch ?ets_lookup_element(Tab, Tid, 3)),
    case Found of
        {'EXIT', _} -> outcome(Tid, Default, Remaining);
        _ -> Found
    end.
947
%% Reduces a stored outcome to one of committed | aborted | unclear:
%% presume_abort counts as aborted and pre_commit as unclear.
%% Any other value is an error (case_clause).
filter_outcome(Val) ->
    case Val of
        committed     -> committed;
        aborted       -> aborted;
        presume_abort -> aborted;
        unclear       -> unclear;
        pre_commit    -> unclear
    end.
956
%% Turns a presume_abort decision into an aborted one; every other
%% argument is returned untouched.
filter_aborted(#decision{outcome = presume_abort} = D) ->
    D#decision{outcome = aborted};
filter_aborted(Other) ->
    Other.
961
%% Merge old decision D with new (probably remote) decision NewD0
%% that originates from Node.
%%
%% D is either no_decision or a #decision{} record. A presume_abort
%% outcome is treated as aborted on both sides. When nothing was known
%% before, the new decision is taken over (without its disc nodes if
%% it came from another node). Otherwise the local node and Node are
%% removed from the old disc nodes and the outcomes are combined:
%% aborted overrides anything and committed overrides unclear.
%% Uses is_record/2 rather than the obsolete record/2 guard test.
merge_decisions(Node, D, NewD0) ->
    NewD = filter_aborted(NewD0),
    if
        D == no_decision, node() /= Node ->
            %% We did not know anything about this txn
            NewD#decision{disc_nodes = []};
        D == no_decision ->
            NewD;
        is_record(D, decision) ->
            DiscNs = D#decision.disc_nodes -- ([node(), Node]),
            OldD = filter_aborted(D#decision{disc_nodes = DiscNs}),
            if
                OldD#decision.outcome == unclear,
                NewD#decision.outcome == unclear ->
                    D;

                OldD#decision.outcome == NewD#decision.outcome ->
                    %% We have come to the same decision
                    OldD;

                OldD#decision.outcome == committed,
                NewD#decision.outcome == aborted ->
                    %% Interesting! We have already committed,
                    %% but someone else has aborted. Now we
                    %% have a nice little inconsistency. The
                    %% other guy (or some one else) has
                    %% enforced a recovery decision when
                    %% max_wait_for_decision was exceeded.
                    %% We will pretend that we have obeyed
                    %% the forced recovery decision, but we
                    %% will also generate an event in case the
                    %% application wants to do something clever.
                    Msg = {inconsistent_database, bad_decision, Node},
                    mnesia_lib:report_system_event(Msg),
                    OldD#decision{outcome = aborted};

                OldD#decision.outcome == aborted ->
                    %% aborted overrides anything
                    OldD#decision{outcome = aborted};

                NewD#decision.outcome == aborted ->
                    %% aborted overrides anything
                    OldD#decision{outcome = aborted};

                OldD#decision.outcome == committed,
                NewD#decision.outcome == unclear ->
                    %% committed overrides unclear
                    OldD#decision{outcome = committed};

                OldD#decision.outcome == unclear,
                NewD#decision.outcome == committed ->
                    %% committed overrides unclear
                    OldD#decision{outcome = committed}
            end
    end.
1020
%% Process, in order, every entry of a list received from Node and
%% return the resulting server state.
%%   #decision{}              - merged via add_remote_decision/3
%%   #transient_decision{}    - expanded to a #decision{} with empty
%%                              node lists, then merged the same way
%%   {mnesia_down, _, _, _}   - skipped
%%   {trans_tid, serial, S}   - S is passed to sync_trans_tid_serial/1
%%                              and Node may be asked about the
%%                              decision we are unsure of
add_remote_decisions(_Node, [], State) ->
    State;

add_remote_decisions(Node, [#decision{} = D | Rest], State) ->
    add_remote_decisions(Node, Rest, add_remote_decision(Node, D, State));

add_remote_decisions(Node,
                     [#transient_decision{tid = Tid, outcome = Outcome} | Rest],
                     State) ->
    D = #decision{tid = Tid,
                  outcome = Outcome,
                  disc_nodes = [],
                  ram_nodes = []},
    add_remote_decisions(Node, Rest, add_remote_decision(Node, D, State));

add_remote_decisions(Node, [{mnesia_down, _, _, _} | Rest], State) ->
    add_remote_decisions(Node, Rest, State);

add_remote_decisions(Node, [{trans_tid, serial, Serial} | Rest], State) ->
    sync_trans_tid_serial(Serial),
    Unclear = State#state.unclear_decision,
    %% Only ask when there is a decision we are unsure of and Node is
    %% not listed among its ram_nodes.
    ShallAsk = (Unclear =/= undefined) andalso
               (not lists:member(Node, Unclear#decision.ram_nodes)),
    case ShallAsk of
        true ->
            abcast([Node], {what_decision, node(), Unclear});
        false ->
            ignore
    end,
    add_remote_decisions(Node, Rest, State).
1054
%% Merge the decision NewD reported by Node with what decision/1 knows
%% about the same transaction, log the merged decision and return the
%% new server state. When the transaction is the one recorded in
%% unclear_decision, the waiting caller (unclear_pid) may be answered
%% and the unclear_* fields reset.
add_remote_decision(Node, NewD, State) ->
    Tid = NewD#decision.tid,
    OldD = decision(Tid),
    D = merge_decisions(Node, OldD, NewD),
    do_log_decision(D, false),
    Outcome = D#decision.outcome,

    %% Tell Node what we are certain of only when we had a decision of
    %% our own, the merged outcome is settled, and the reported
    %% decision lists this node among its disc_nodes or ram_nodes.
    ShallTell = (OldD /= no_decision) andalso
                (Outcome /= unclear) andalso
                (lists:member(node(), NewD#decision.disc_nodes) or
                 lists:member(node(), NewD#decision.ram_nodes)),
    case ShallTell of
        true ->
            tell_im_certain([Node], D);
        false ->
            ignore
    end,

    case State#state.unclear_decision of
        Unclear when Unclear#decision.tid == Tid ->
            WaitFor = State#state.unclear_waitfor -- [Node],
            Settled = State#state{unclear_pid = undefined,
                                  unclear_decision = undefined,
                                  unclear_waitfor = undefined},
            case {Outcome, WaitFor} of
                {unclear, []} ->
                    %% Everybody is uncertain, so let us abort
                    NewOutcome = aborted,
                    CertainD = D#decision{outcome = NewOutcome,
                                          disc_nodes = [],
                                          ram_nodes = []},
                    tell_im_certain(D#decision.disc_nodes, CertainD),
                    tell_im_certain(D#decision.ram_nodes, CertainD),
                    do_log_decision(CertainD, false),
                    verbose("Decided to abort transaction ~p "
                            "since everybody are uncertain ~p~n",
                            [Tid, CertainD]),
                    gen_server:reply(State#state.unclear_pid,
                                     {ok, NewOutcome}),
                    Settled;
                {unclear, _} ->
                    %% Node did not know either; keep waiting for the rest
                    State#state{unclear_waitfor = WaitFor};
                {_, _} ->
                    verbose("~p told us that transaction ~p was ~p~n",
                            [Node, Tid, Outcome]),
                    gen_server:reply(State#state.unclear_pid, {ok, Outcome}),
                    Settled
            end;
        _ ->
            State
    end.
1108
%% Announce the complete contents of each listed table to ToNodes,
%% one table at a time. Nothing is done when there are no nodes to
%% announce to. Always returns ok.
announce_all([], _Tabs) ->
    ok;
announce_all(_ToNodes, []) ->
    ok;
announce_all(ToNodes, [Tab | Rest]) ->
    case catch mnesia_lib:db_match_object(ram_copies, Tab, '_') of
        {'EXIT', _} ->
            %% Oops, we are in the middle of a 'garb_decisions';
            %% nothing to announce from this table
            skip;
        Entries ->
            announce(ToNodes, Entries, [], false)
    end,
    announce_all(ToNodes, Rest).
1122
%% Run every entry through arrange/4, threading the accumulator Acc
%% from one entry to the next, and hand the final accumulator to
%% send_decisions/1.
announce(ToNodes, Entries, Acc, ForceSend) ->
    Arranged = lists:foldl(fun(Entry, AccIn) ->
                                   arrange(ToNodes, Entry, AccIn, ForceSend)
                           end,
                           Acc,
                           Entries),
    send_decisions(Arranged).
1129
%% For each {Node, Decisions} pair, abcast a
%% {decisions, node(), Decisions} message to that node alone.
%% Returns ok.
send_decisions(Batches) ->
    lists:foreach(fun({Node, Decisions}) ->
                          abcast([Node], {decisions, node(), Decisions})
                  end,
                  Batches).
1135
%% Add one table entry to the per-node accumulator for each node in
%% the first argument that should receive it, and return the updated
%% accumulator.
%%
%%   ToNodes   - nodes that are candidates for receiving the entry
%%   Entry     - a #decision{}, a #transient_decision{}, or one of the
%%               tuples {mnesia_down, _, _, _}, {master_nodes, _, _},
%%               {trans_tid, serial, _}
%%   Acc       - list of {Node, Entries} pairs as built by
%%               add_decision/3
%%   ForceSend - boolean; when true a #decision{} is added for every
%%               node, regardless of its node lists
%%
%% Any other kind of entry combined with a non-empty node list is not
%% accepted and makes the call fail.
arrange([], _Entry, Acc, _ForceSend) ->
    Acc;

arrange([To | ToNodes],
        #decision{disc_nodes = DiscNodes, ram_nodes = RamNodes} = D,
        Acc, ForceSend) ->
    %% Short-circuit: the node lists are only searched when the
    %% decision is not forced out anyway.
    NeedsAdd = ForceSend
        orelse lists:member(To, DiscNodes)
        orelse lists:member(To, RamNodes),
    Acc2 = case NeedsAdd of
               true -> add_decision(To, D, Acc);
               false -> Acc
           end,
    arrange(ToNodes, D, Acc2, ForceSend);

arrange([To | ToNodes], #transient_decision{} = C, Acc, ForceSend) ->
    %% Transient decisions go to every node
    arrange(ToNodes, C, add_decision(To, C, Acc), ForceSend);

arrange([_To | _ToNodes], {mnesia_down, _Node, _Date, _Time}, Acc, _ForceSend) ->
    %% The others have their own info about this
    Acc;

arrange([_To | _ToNodes], {master_nodes, _Tab, _Nodes}, Acc, _ForceSend) ->
    %% The others have their own info about this
    Acc;

arrange([To | ToNodes], {trans_tid, serial, _Serial} = Entry, Acc, ForceSend) ->
    %% Do the lamport thing plus release the others
    %% from uncertainty.
    arrange(ToNodes, Entry, add_decision(To, Entry, Acc), ForceSend).
1168
%% Put Decision at the front of the list stored for Node in a list of
%% {Node, Decisions} pairs. When Node has no pair yet, a new
%% {Node, [Decision]} pair is appended at the end. The order of the
%% existing pairs is kept.
add_decision(Node, Decision, Acc) ->
    IsOther = fun({N, _}) when N =:= Node -> false;
                 (_) -> true
              end,
    case lists:splitwith(IsOther, Acc) of
        {Before, [{_, Decisions} | After]} ->
            Before ++ [{Node, [Decision | Decisions]} | After];
        {Before, []} ->
            Before ++ [{Node, [Decision]}]
    end.
1175