1%%
2%% %CopyrightBegin%
3%%
4%% Copyright Ericsson AB 1996-2016. All Rights Reserved.
5%%
6%% Licensed under the Apache License, Version 2.0 (the "License");
7%% you may not use this file except in compliance with the License.
8%% You may obtain a copy of the License at
9%%
10%%     http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing, software
13%% distributed under the License is distributed on an "AS IS" BASIS,
14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15%% See the License for the specific language governing permissions and
16%% limitations under the License.
17%%
18%% %CopyrightEnd%
19%%
20
21%%
22%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
23%%
24%% MODULE
25%%
26%%   mnesia_tpcb - TPC-B benchmarking of Mnesia
27%%
28%% DESCRIPTION
29%%
30%%   The metrics used in the TPC-B benchmark are throughput as measured
31%%   in transactions per second (TPS). The benchmark uses a single,
32%%   simple update-intensive transaction to load the database system.
33%%   The single transaction type provides a simple, repeatable
34%%   unit of work, and is designed to exercise the basic components of
35%%   a database system.
36%%
37%%   The definition of the TPC-B states lots of detailed rules and
%%   conditions that must be fulfilled, e.g. how the ACID (atomicity,
39%%   consistency, isolation and durability) properties are verified,
40%%   how the random numbers must be distributed, minimum sizes of
41%%   the different types of records, minimum duration of the benchmark,
42%%   formulas to calculate prices (dollars per tps), disclosure issues
43%%   etc. Please, see http://www.tpc.org/ about the nitty gritty details.
44%%
45%%   The TPC-B benchmark is stated in terms of a hypothetical bank. The
46%%   bank has one or more branches. Each branch has multiple tellers. The
47%%   bank has many customers, each with an account. The database represents
48%%   the cash position of each entity (branch, teller and account) and a
49%%   history of recent transactions run by the bank. The transaction
50%%   represents the work done when a customer makes a deposit or a
51%%   withdrawal against his account. The transaction is performed by a
52%%   teller at some branch.
53%%
54%%   Each process that performs TPC-B transactions is called a driver.
%%   Drivers generate teller_id, account_id and delta amount of
56%%   money randomly. An account, a teller and a branch are read, their
57%%   balances are adjusted and a history record is created. The driver
58%%   measures the time for 3 reads, 3 writes and 1 create.
59%%
60%% GETTING STARTED
61%%
62%%   Generate tables and run with default configuration:
63%%
64%%     mnesia_tpcb:start().
65%%
66%%  A little bit more advanced;
67%%
%%     spawn(mnesia_tpcb, start, [[{n_drivers_per_node, 8}, {stop_after, infinity}]]),
69%%     mnesia_tpcb:stop().
70%%
71%%  Really advanced;
72%%
%%    mnesia_tpcb:init([{n_branches, 8}, {replica_type, disc_only_copies}]),
%%    mnesia_tpcb:run([{n_drivers_per_node, 8}]),
%%    mnesia_tpcb:run([{n_drivers_per_node, 64}]).
76%%
77%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
78
79-module(mnesia_tpcb).
80-author('hakan@erix.ericsson.se').
81
82-export([
83	 config/2,
84	 count_balance/0,
85	 driver_init/2,
86	 init/1,
87	 reporter_init/2,
88	 run/1,
89	 start/0,
90	 start/1,
91	 start/2,
92	 stop/0,
93	 real_trans/5,
94	 verify_tabs/0,
95	 reply_gen_branch/3,
96	 frag_add_delta/7,
97
98	 conflict_test/1,
99	 dist_test/1,
100	 replica_test/1,
101	 sticky_replica_test/1,
102	 remote_test/1,
103	 remote_frag2_test/1,
104
105	 conflict_benchmark/1
106	]).
107
108-include_lib("common_test/include/ct_event.hrl").
109
110-define(SECOND, 1000000).
111
112%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
113%%% Account record, total size must be at least 100 bytes
114
115-define(ACCOUNT_FILLER,
116	{123456789012345678901234567890123456789012345678901234567890,
117	 123456789012345678901234567890123456789012345678901234567890,
118	 123456789012345678901234567890123456789012345678901234}).
119
%% One row of the account table. The filler field carries no
%% information; it defaults to ?ACCOUNT_FILLER and only pads the record.
-record(account,
       {
	id           = 0, % Unique account id
	branch_id    = 0, % Branch where the account is held
	balance      = 0, % Account balance
	filler       = ?ACCOUNT_FILLER  % Gap filler to ensure size >= 100 bytes
       }).
127
128%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
129%%% Branch record, total size must be at least 100 bytes
130
131-define(BRANCH_FILLER,
132	{123456789012345678901234567890123456789012345678901234567890,
133	 123456789012345678901234567890123456789012345678901234567890,
134	 123456789012345678901234567890123456789012345678901234567890}).
135
%% One row of the branch table. The filler field carries no
%% information; it defaults to ?BRANCH_FILLER and only pads the record.
-record(branch,
       {
	id           = 0, % Unique branch id
	balance      = 0, % Total balance of whole branch
	filler       = ?BRANCH_FILLER  % Gap filler to ensure size >= 100 bytes
       }).
142
143%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
144%%% Teller record, total size must be at least 100 bytes
145
146-define(TELLER_FILLER,
147	{123456789012345678901234567890123456789012345678901234567890,
148	 123456789012345678901234567890123456789012345678901234567890,
149	 1234567890123456789012345678901234567890123456789012345678}).
150
%% One row of the teller table. The filler field carries no
%% information; it defaults to ?TELLER_FILLER and only pads the record.
-record(teller,
       {
	id           = 0, % Unique teller id
	branch_id    = 0, % Branch where the teller is located
	balance      = 0, % Teller balance
	filler       = ?TELLER_FILLER % Gap filler to ensure size >= 100 bytes
       }).
158
159%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
160%%% History record, total size must be at least 50 bytes
161
162-define(HISTORY_FILLER, 1234567890).
163
%% One row of the history table. The time_stamp default is an
%% expression, so it is evaluated wherever a #history{} is constructed
%% without an explicit time_stamp.
-record(history,
	{
	 history_id  = {0, 0}, % {DriverId, DriverLocalHistoryid}
	 time_stamp  = erlang:system_time(),  % Time point during active transaction
	 branch_id   = 0,      % Branch associated with teller
	 teller_id   = 0,      % Teller involved in transaction
	 account_id  = 0,      % Account updated by transaction
	 amount      = 0,      % Amount (delta) specified by transaction
	 filler      = ?HISTORY_FILLER % Gap filler to ensure size >= 50 bytes
       }).
174
175%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Table generation settings. init/1 fills this record from its
%% argument list; any field not given there keeps the default below.
-record(tab_config,
	{
	  db_nodes = [node()],
	  n_replicas = 1, % Ignored for non-fragmented tables
	  replica_nodes = [node()],
	  replica_type = ram_copies,
	  use_running_mnesia = false,
	  n_fragments = 0,
	  n_branches = 1,
	  n_tellers_per_branch = 10, % Must be 10
	  n_accounts_per_branch = 100000, % Must be 100000
	  branch_filler = ?BRANCH_FILLER,
	  account_filler = ?ACCOUNT_FILLER,
	  teller_filler = ?TELLER_FILLER
	 }).
191
192%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
193
%% Benchmark run settings. run/1 fills this record from its argument
%% list; any field not given there keeps the default below.
%% stop_after and report_interval are used as receive timeouts
%% (milliseconds) by run/1 and reporter_loop/1.
-record(run_config,
	{
	  driver_nodes = [node()],
	  n_drivers_per_node = 1,
	  use_running_mnesia = false,
	  seed,
	  stop_after = timer:minutes(15), % Minimum 15 min
	  report_interval = timer:minutes(1),
	  send_bench_report = false,
	  use_sticky_locks = false,
	  spawn_near_branch = false,
	  activity_type = transaction,
	  reuse_history_id = false
	 }).
208
209%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
210
%% Timing statistics, as merged by add_time/2:
%%   n_trans  - summed number of transactions
%%   min_n    - smallest non-zero n_trans seen in a merged report
%%   max_n    - largest n_trans seen in a merged report
%%   acc_time - summed transaction time (reported as micros)
%%   max_time - longest single transaction time (reported as micros)
-record(time,
	{
	  n_trans = 0,
	  min_n = 0,
	  max_n = 0,
	  acc_time = 0,
	  max_time = 0
	 }).
219
220%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
221
%% State of one driver process. start_drivers/2 builds one such record
%% per driver (id, node, configs, history id and the branches that are
%% local to the driver's node) and hands it to driver_init/2.
-record(driver_state,
	{
	  driver_id,
	  driver_node,
	  seed,
	  n_local_branches,
	  local_branches,
	  tab_config,
	  run_config,
	  history_id,
	  time = #time{},
	  acc_time = #time{},
	  reuse_history_id
	 }).
236
237%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
238
%% State of the reporter process:
%%   driver_pids - pids of the spawned drivers
%%   starter_pid - process that called run/1
%%   n_iters     - number of report rounds performed so far
%%   prev_tps    - tps value shown in the previous report
%%   curr        - statistics for the current report interval
%%   acc         - statistics accumulated over the whole run
%%   init_micros - time captured when the reporter started
%%   prev_micros - time captured at the previous report
-record(reporter_state,
	{
	  driver_pids,
	  starter_pid,
	  n_iters = 0,
	  prev_tps = 0,
	  curr = #time{},
	  acc = #time{},
	  init_micros,
	  prev_micros,
	  run_config
	 }).
251
252%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
253%% One driver on each node, table not replicated
254
%% config(TestName, ReplicaType) -> [{Key, Value}]
%%
%% Builds the argument list for one of the predefined tests. The list
%% mixes tab_config and run_config keys and is meant to be passed to
%% start/1. The node sets are taken from node() and nodes() at the time
%% of the call. Every test stops after one minute and reports every
%% ten seconds.

config(frag_test, ReplicaType) ->
    %% One branch and one fragment per node, drivers on every node.
    AllNodes = [node() | nodes()],
    NodeCount = length(AllNodes),
    [
     {n_branches, NodeCount},
     {n_fragments, NodeCount},
     {replica_nodes, AllNodes},
     {db_nodes, AllNodes},
     {driver_nodes, AllNodes},
     {n_accounts_per_branch, 100},
     {replica_type, ReplicaType},
     {stop_after, timer:minutes(1)},
     {report_interval, timer:seconds(10)},
     {reuse_history_id, true}
    ];

config(frag2_test, ReplicaType) ->
    %% As frag_test, but every fragment is kept in two replicas.
    AllNodes = [node() | nodes()],
    NodeCount = length(AllNodes),
    [
     {n_branches, NodeCount},
     {n_fragments, NodeCount},
     {n_replicas, 2},
     {replica_nodes, AllNodes},
     {db_nodes, AllNodes},
     {driver_nodes, AllNodes},
     {n_accounts_per_branch, 100},
     {replica_type, ReplicaType},
     {stop_after, timer:minutes(1)},
     {report_interval, timer:seconds(10)},
     {reuse_history_id, true}
    ];

config(replica_test, ReplicaType) ->
    %% Drivers on this node only, tables replicated to all nodes.
    ThisNode = node(),
    AllNodes = [ThisNode | nodes()],
    [
     {db_nodes, AllNodes},
     {driver_nodes, [ThisNode]},
     {replica_nodes, AllNodes},
     {n_accounts_per_branch, 100},
     {replica_type, ReplicaType},
     {stop_after, timer:minutes(1)},
     {report_interval, timer:seconds(10)},
     {reuse_history_id, true}
    ];

config(sticky_replica_test, ReplicaType) ->
    %% As replica_test, but with sticky locks enabled.
    ThisNode = node(),
    AllNodes = [ThisNode | nodes()],
    [
     {db_nodes, AllNodes},
     {driver_nodes, [ThisNode]},
     {replica_nodes, AllNodes},
     {n_accounts_per_branch, 100},
     {replica_type, ReplicaType},
     {use_sticky_locks, true},
     {stop_after, timer:minutes(1)},
     {report_interval, timer:seconds(10)},
     {reuse_history_id, true}
    ];

config(dist_test, ReplicaType) ->
    %% Ten drivers per node, tables replicated to all nodes, lots of
    %% branches.
    AllNodes = [node() | nodes()],
    [
     {db_nodes, AllNodes},
     {driver_nodes, AllNodes},
     {replica_nodes, AllNodes},
     {n_drivers_per_node, 10},
     {n_branches, 10 * length(AllNodes) * 100},
     {n_accounts_per_branch, 10},
     {replica_type, ReplicaType},
     {stop_after, timer:minutes(1)},
     {report_interval, timer:seconds(10)},
     {reuse_history_id, true}
    ];

config(conflict_test, ReplicaType) ->
    %% Ten drivers per node, tables replicated to all nodes, a single
    %% branch that all drivers compete for.
    AllNodes = [node() | nodes()],
    [
     {db_nodes, AllNodes},
     {driver_nodes, AllNodes},
     {replica_nodes, AllNodes},
     {n_drivers_per_node, 10},
     {n_branches, 1},
     {n_accounts_per_branch, 10},
     {replica_type, ReplicaType},
     {stop_after, timer:minutes(1)},
     {report_interval, timer:seconds(10)},
     {reuse_history_id, true}
    ];

config(remote_test, ReplicaType) ->
    %% Drivers on this node only, tables replicated to all other nodes.
    OtherNodes = nodes(),
    ThisNode = node(),
    [
     {db_nodes, [ThisNode | OtherNodes]},
     {driver_nodes, [ThisNode]},
     {replica_nodes, OtherNodes},
     {n_accounts_per_branch, 100},
     {replica_type, ReplicaType},
     {stop_after, timer:minutes(1)},
     {report_interval, timer:seconds(10)},
     {reuse_history_id, true}
    ];

config(remote_frag2_test, ReplicaType) ->
    %% Drivers on this node only, fragments kept in two replicas on the
    %% other nodes.
    OtherNodes = nodes(),
    ThisNode = node(),
    OtherCount = length(OtherNodes),
    [
     {n_branches, OtherCount},
     {n_fragments, OtherCount},
     {n_replicas, 2},
     {replica_nodes, OtherNodes},
     {db_nodes, [ThisNode | OtherNodes]},
     {driver_nodes, [ThisNode]},
     {n_accounts_per_branch, 100},
     {replica_type, ReplicaType},
     {stop_after, timer:minutes(1)},
     {report_interval, timer:seconds(10)},
     {reuse_history_id, true}
    ];

config(conflict_benchmark, ReplicaType) ->
    %% As conflict_test, but the reports are also sent as benchmark
    %% data events.
    AllNodes = [node() | nodes()],
    [{db_nodes, AllNodes},
     {driver_nodes, AllNodes},
     {replica_nodes, AllNodes},
     {n_drivers_per_node, 10},
     {n_branches, 1},
     {n_accounts_per_branch, 10},
     {replica_type, ReplicaType},
     {stop_after, timer:minutes(1)},
     {report_interval, timer:seconds(10)},
     {send_bench_report, true},
     {reuse_history_id, true}
    ].
428
429
430%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
431
%% start(What, ReplicaType) -> pid()
%% Looks up the predefined configuration for the test What and runs
%% start/1 with it in a new process linked to the caller.
start(What, ReplicaType) ->
    Args = config(What, ReplicaType),
    spawn_link(?MODULE, start, [Args]).
434
%% Each of the following functions starts the predefined test of the
%% same name through start/2, i.e. in a new process linked to the
%% caller, and returns the pid of that process.
replica_test(ReplicaType) ->
    start(replica_test, ReplicaType).

sticky_replica_test(ReplicaType) ->
    start(sticky_replica_test, ReplicaType).

dist_test(ReplicaType) ->
    start(dist_test, ReplicaType).

conflict_test(ReplicaType) ->
    start(conflict_test, ReplicaType).

remote_test(ReplicaType) ->
    start(remote_test, ReplicaType).

remote_frag2_test(ReplicaType) ->
    start(remote_frag2_test, ReplicaType).

%% Unlike the functions above, this one calls start/1 directly, so the
%% benchmark runs in the calling process and the result of the run is
%% returned to the caller.
conflict_benchmark(ReplicaType) ->
    start(config(conflict_benchmark, ReplicaType)).
455
456%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
457%% Args is a list of {Key, Val} tuples where Key is a field name
458%% in either the record tab_config or run_config. Unknown keys are ignored.
459
%% start/0 runs the benchmark with the default configuration.
start() ->
    start([]).
%% start/1 generates the tables with init/1 and then runs the benchmark
%% with run/1, passing the same Args to both. Returns the result of
%% run/1.
start(Args) ->
    init(Args),
    run(Args).
465
%% list2rec(List, Fields, DefaultTuple) -> Record
%%
%% Builds a record tuple from a {Key, Val} list. Fields is the list of
%% field names (as given by record_info(fields, Name)) and DefaultTuple
%% is a record holding the default values. For every field the first
%% matching {Field, Val} in List wins; fields that are not mentioned
%% keep their default. Keys in List that are not field names are
%% ignored.
list2rec(List, Fields, DefaultTuple) ->
    [Name | Defaults] = tuple_to_list(DefaultTuple),
    list_to_tuple([Name | list2rec(List, Fields, Defaults, [])]).

%% list2rec(List, Fields, Defaults, Acc) -> Values
%%
%% Walks Fields and Defaults in lock step and returns the field values
%% in field order. Acc collects the values in reverse order so that each
%% step is a constant-time cons instead of an append; it is reversed
%% once at the end.
list2rec(_List, [], [], Acc) ->
    lists:reverse(Acc);
list2rec(List, [F | Fields], [D | Defaults], Acc) ->
    %% keytake finds and removes the first entry for F in one pass.
    case lists:keytake(F, 1, List) of
        {value, {F, NewVal}, Rest} ->
            list2rec(Rest, Fields, Defaults, [NewVal | Acc]);
        false ->
            list2rec(List, Fields, Defaults, [D | Acc])
    end.
482
%% stop() -> Result | {error, not_running} | {error, brutal_kill}
%% Stops the benchmark if a process is registered as mnesia_tpcb and
%% returns the outcome reported by sync_stop/1.
stop() ->
    Reporter = whereis(mnesia_tpcb),
    if
        Reporter =:= undefined ->
            {error, not_running};
        true ->
            sync_stop(Reporter)
    end.
490
%% sync_stop(Pid) -> Result | {error, brutal_kill}
%% Sends a stop request to Pid and waits for its {stopped, Result}
%% answer. If no answer arrives within one minute, Pid is killed.
sync_stop(Pid) ->
    Patience = timer:minutes(1),
    Pid ! {self(), stop},
    receive
        {Pid, {stopped, Result}} ->
            Result
    after Patience ->
        exit(Pid, kill),
        {error, brutal_kill}
    end.
499
500%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
501%%% Initialization
502
503%% Args is a list of {Key, Val} tuples where Key is a field name
504%% in the record tab_config, unknown keys are ignored.
505
%% init(Args) -> ignore
%%
%% Builds the table configuration from Args, prints it, stops a running
%% benchmark, (re)creates the Mnesia schema unless use_running_mnesia is
%% true, and finally generates and verifies the tables (gen_tabs/1).
%% Exits with start_failed, create_schema_failed or delete_schema_failed
%% if Mnesia cannot be set up.
init(Args) ->
    TabConfig0 = list2rec(Args, record_info(fields, tab_config), #tab_config{}),
    %% For non-fragmented tables the replica count is simply the number
    %% of replica nodes.
    TabConfig =
	if
	    TabConfig0#tab_config.n_fragments =:= 0 ->
		TabConfig0#tab_config{n_replicas = length(TabConfig0#tab_config.replica_nodes)};
	    true ->
		TabConfig0
	end,
    %% Pair every field name with its value (element 1 is the record
    %% tag, so the values start at position 2) for printing.
    Tags = record_info(fields, tab_config),
    Fun = fun(F, Pos) -> {{F, element(Pos, TabConfig)}, Pos + 1} end,
    {List, _} = lists:mapfoldl(Fun, 2, Tags),
    io:format("TPC-B: Table config: ~p ~n", [List]),

    DbNodes = TabConfig#tab_config.db_nodes,
    %% Stop a benchmark that may still be running; the result is ignored.
    stop(),
    if
	TabConfig#tab_config.use_running_mnesia =:= true ->
	    ignore;
	true ->
	    %% Start from scratch: kill Mnesia on all db nodes, then
	    %% delete and recreate the schema before starting it again.
	    rpc:multicall(DbNodes, mnesia, lkill, []),
	    case mnesia:delete_schema(DbNodes) of
		ok ->
		    case mnesia:create_schema(DbNodes) of
			ok ->
			    {Replies, BadNodes} =
				rpc:multicall(DbNodes, mnesia, start, []),
			    %% Succeed only if every node answered ok and
			    %% no node failed to answer.
			    case [Res || Res <- Replies, Res =/= ok] of
				[] when BadNodes =:= [] ->
				    ok;
				BadRes ->
				    io:format("TPC-B: <ERROR> "
					      "Failed to start ~p: ~p~n",
					      [BadNodes, BadRes]),
				    exit({start_failed, BadRes, BadNodes})
			    end;
			{error, Reason} ->
			    io:format("TPC-B: <ERROR> "
				      "Failed to create schema on disc: ~p~n",
				      [Reason]),
			    exit({create_schema_failed, Reason})
		    end;
		{error, Reason} ->
		    io:format("TPC-B: <ERROR> "
			      "Failed to delete schema on disc: ~p~n",
			      [Reason]),
		    exit({delete_schema_failed, Reason})
	    end
    end,
    gen_tabs(TabConfig).
556
%% gen_tabs(TC) -> ignore
%%
%% Creates the branch, account, teller and history tables, prints how
%% many records of each kind will be generated and their external size,
%% fills the tables and verifies them. Exits with
%% {inconsistent_tables, Reason} if the verification fails.
gen_tabs(TC) ->
    %% {Table, Attributes, ForeignKey}; the order of creation matters
    %% since the other tables refer to branch as foreign table.
    Tables = [{branch, record_info(fields, branch), undefined},
              {account, record_info(fields, account),
               {branch, #account.branch_id}},
              {teller, record_info(fields, teller),
               {branch, #teller.branch_id}},
              {history, record_info(fields, history),
               {branch, #history.branch_id}}],
    lists:foreach(fun({Tab, Attrs, ForeignKey}) ->
                          create_tab(TC, Tab, Attrs, ForeignKey)
                  end,
                  Tables),

    Branches = TC#tab_config.n_branches,
    Tellers = TC#tab_config.n_tellers_per_branch,
    Accounts = TC#tab_config.n_accounts_per_branch,
    BytesOf = fun(Rec) -> byte_size(term_to_binary(Rec)) end,
    io:format("TPC-B: Generating ~p branches a ~p bytes~n",
              [Branches, BytesOf(default_branch(TC))]),
    io:format("TPC-B: Generating ~p * ~p tellers a ~p bytes~n",
              [Branches, Tellers, BytesOf(default_teller(TC))]),
    io:format("TPC-B: Generating ~p * ~p accounts a ~p bytes~n",
              [Branches, Accounts, BytesOf(default_account(TC))]),
    io:format("TPC-B: Generating 0 history records a ~p bytes~n",
              [BytesOf(default_history(TC))]),
    gen_branches(TC),

    case verify_tabs() of
        {error, Why} ->
            io:format("TPC-B: <ERROR> Inconsistent tables: ~w~n",
                      [Why]),
            exit({inconsistent_tables, Why});
        ok ->
            ignore
    end.
588
%% create_tab(TC, Name, Attrs, ForeignKey) -> ok
%%
%% Creates table Name according to the table configuration. With zero
%% fragments a plain table is created on all replica nodes and
%% ForeignKey is not used; otherwise a fragmented table is created with
%% the replica nodes as node pool.
create_tab(#tab_config{n_fragments = 0} = TC, Name, Attrs, _ForeignKey) ->
    Copies = {TC#tab_config.replica_type, TC#tab_config.replica_nodes},
    create_tab(Name, [Copies, {attributes, Attrs}]);
create_tab(TC, Name, Attrs, ForeignKey) ->
    CopiesKey = n_copies(TC#tab_config.replica_type),
    FragProps = [{n_fragments, TC#tab_config.n_fragments},
                 {node_pool, TC#tab_config.replica_nodes},
                 {CopiesKey, TC#tab_config.n_replicas},
                 {foreign_key, ForeignKey}],
    create_tab(Name, [{frag_properties, FragProps},
                      {attributes, Attrs}]).
606
%% create_tab(Name, Def) -> ok
%%
%% Deletes table Name (the outcome of the deletion is not checked) and
%% creates it again with the definition Def. Exits with
%% {create_table_failed, Reason} if the creation is aborted.
create_tab(Name, Def) ->
    mnesia:delete_table(Name),
    Outcome = mnesia:create_table(Name, Def),
    case Outcome of
        {aborted, Why} ->
            io:format("TPC-B: <ERROR> failed to create table ~w ~w: ~p~n",
                      [Name, Def, Why]),
            exit({create_table_failed, Why});
        {atomic, ok} ->
            ok
    end.
617
%% n_copies(ReplicaType) -> FragPropertyKey
%%
%% Maps a replica type to the name of the fragmentation property that
%% holds the number of replicas of that type. Any other argument is a
%% programming error and crashes the caller.
n_copies(ram_copies)       -> n_ram_copies;
n_copies(disc_copies)      -> n_disc_copies;
n_copies(disc_only_copies) -> n_disc_only_copies.
624
%% gen_branches(TC) -> ok
%% Starts one generator process per branch (ids 0 .. n_branches - 1)
%% and waits until all of them have reported that they are done.
gen_branches(TC) ->
    FirstId = 0,
    LastId = FirstId + TC#tab_config.n_branches - 1,
    wait_for_gen(gen_branches(TC, FirstId, LastId, [])).
630
%% wait_for_gen(Pids) -> ok
%% Waits for one {branch_generated, Pid} message at a time, removing the
%% sender from the list of outstanding generators, until none is left.
%% Any other message terminates the caller with {tpcb_failed, Message}.
wait_for_gen(Outstanding) ->
    case Outstanding of
        [] ->
            ok;
        _ ->
            receive
                {branch_generated, Done} ->
                    wait_for_gen(lists:delete(Done, Outstanding));
                Unexpected ->
                    exit({tpcb_failed, Unexpected})
            end
    end.
639
%% gen_branches(TC, BranchId, Last, UsedNs) -> [pid()]
%% Spawns a linked generator process for every branch id from BranchId
%% up to Last. Each generator is placed on the node picked by
%% get_branch_nodes/2; UsedNs holds the nodes picked so far.
gen_branches(_TC, BranchId, Last, _UsedNs) when BranchId > Last ->
    [];
gen_branches(TC, BranchId, Last, UsedNs) ->
    Picked = get_branch_nodes(BranchId, UsedNs),
    Generator = spawn_link(hd(Picked), ?MODULE, reply_gen_branch,
                           [self(), TC, BranchId]),
    [Generator | gen_branches(TC, BranchId + 1, Last, Picked)].
648
%% reply_gen_branch(ReplyTo, TC, BranchId) -> true
%% Entry point of a generator process: generates all records of one
%% branch, tells ReplyTo that the branch is done and then removes the
%% link to it.
reply_gen_branch(ReplyTo, TC, BranchId) ->
    gen_branch(TC, BranchId),
    Done = {branch_generated, self()},
    ReplyTo ! Done,
    unlink(ReplyTo).
653
%% get_branch_nodes(BranchId, UsedNs) -> [Node | UsedNs]
%% Picks, among the nodes the branch record is written to, the node that
%% occurs the fewest times in UsedNs and returns UsedNs with that node
%% added at the head.
get_branch_nodes(BranchId, UsedNs) ->
    Candidates = table_info({branch, BranchId}, where_to_write),
    Ranked = lists:sort([{n_duplicates(Node, UsedNs, 0), Node}
                         || Node <- Candidates]),
    [{_Uses, Best} | _] = Ranked,
    [Best | UsedNs].
660
%% n_duplicates(N, List, Count) -> Count + number of occurrences
%% Adds to Count the number of elements of List that are exactly equal
%% to N.
n_duplicates(N, List, Count) ->
    lists:foldl(fun(Elem, Seen) when Elem =:= N -> Seen + 1;
                   (_Elem, Seen) -> Seen
                end,
                Count,
                List).
667
%% gen_branch(TC, BranchId) -> ok
%% Writes all records that belong to one branch: first its accounts,
%% then its tellers (both as async_dirty activities) and last the branch
%% record itself (sync_dirty). Account and teller ids are numbered
%% consecutively per branch, starting at BranchId * RecordsPerBranch.
gen_branch(TC, BranchId) ->
    AccountsPerBranch = TC#tab_config.n_accounts_per_branch,
    FirstAccount = BranchId * AccountsPerBranch,
    LastAccount = FirstAccount + AccountsPerBranch - 1,
    AccountArgs = [FirstAccount, LastAccount, BranchId, default_account(TC)],
    ok = mnesia:activity(async_dirty, fun gen_accounts/4, AccountArgs,
                         mnesia_frag),

    TellersPerBranch = TC#tab_config.n_tellers_per_branch,
    FirstTeller = BranchId * TellersPerBranch,
    LastTeller = FirstTeller + TellersPerBranch - 1,
    TellerArgs = [FirstTeller, LastTeller, BranchId, default_teller(TC)],
    ok = mnesia:activity(async_dirty, fun gen_tellers/4, TellerArgs,
                         mnesia_frag),

    Template = default_branch(TC),
    WriteBranch = fun() ->
                          mnesia:write(branch, Template#branch{id = BranchId},
                                       write)
                  end,
    ok = mnesia:activity(sync_dirty, WriteBranch, [], mnesia_frag).
684
%% gen_tellers(Id, Last, BranchId, T) -> ok
%% Writes one copy of the teller record T for every id from Id up to
%% Last, each tagged with BranchId.
gen_tellers(Id, Last, _BranchId, _T) when Id > Last ->
    ok;
gen_tellers(Id, Last, BranchId, T) ->
    Teller = T#teller{id = Id, branch_id = BranchId},
    mnesia:write(teller, Teller, write),
    gen_tellers(Id + 1, Last, BranchId, T).
690
%% gen_accounts(Id, Last, BranchId, A) -> ok
%% Writes one copy of the account record A for every id from Id up to
%% Last, each tagged with BranchId.
gen_accounts(Id, Last, _BranchId, _A) when Id > Last ->
    ok;
gen_accounts(Id, Last, BranchId, A) ->
    Account = A#account{id = Id, branch_id = BranchId},
    mnesia:write(account, Account, write),
    gen_accounts(Id + 1, Last, BranchId, A).
696
%% Template records used when generating the tables. Branch, teller and
%% account take their filler from the table configuration; the history
%% record uses the record defaults only.
default_branch(TC) ->  #branch{filler = TC#tab_config.branch_filler}.
default_teller(TC) ->  #teller{filler = TC#tab_config.teller_filler}.
default_account(TC) -> #account{filler = TC#tab_config.account_filler}.
default_history(_TC) -> #history{}.
701
702%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
703%%% Run the benchmark
704
705%% Args is a list of {Key, Val} tuples where Key is a field name
706%% in the record run_config, unknown keys are ignored.
%% run(Args) -> Result | {tpcb_got, Message}
%%
%% Builds the run configuration from Args, prints it and starts the
%% reporter process. Then waits until the reporter tells that it has
%% been stopped, until any other message arrives, or until stop_after
%% has elapsed, in which case the reporter is stopped from here.
run(Args) ->
    Fields = record_info(fields, run_config),
    RunConfig = list2rec(Args, Fields, #run_config{}),
    %% Drop the record tag and pair each field name with its value.
    [_Tag | Values] = tuple_to_list(RunConfig),
    io:format("TPC-B: Run config: ~p ~n", [lists:zip(Fields, Values)]),

    Reporter = spawn_link(?MODULE, reporter_init, [self(), RunConfig]),
    Deadline = RunConfig#run_config.stop_after,
    receive
        {Reporter, {stopped, Result}} ->
            Result; % Stopped by other process
        Other ->
            {tpcb_got, Other}
    after Deadline ->
        sync_stop(Reporter)
    end.
723
%% reporter_init(Starter, RC)
%%
%% Entry point of the reporter process. Registers itself as mnesia_tpcb,
%% makes sure Mnesia is running (unless use_running_mnesia is true),
%% derives the table configuration from the existing tables, starts the
%% drivers and runs reporter_loop/1. When the loop ends, Starter (and the
%% process that requested the stop, if it is another one) is sent
%% {self(), {stopped, Result}} and the process exits with shutdown.
reporter_init(Starter, RC) ->
    register(mnesia_tpcb, self()),
    %% Exits of linked processes arrive as messages; reporter_loop/1 and
    %% calc_reports/2 match on them.
    process_flag(trap_exit, true),
    DbNodes = mnesia:system_info(db_nodes),
    if
	RC#run_config.use_running_mnesia =:= true ->
	    ignore;
	true ->
	    {Replies, BadNodes} =
		rpc:multicall(DbNodes, mnesia, start, []),
	    %% Succeed only if every node answered ok and no node failed
	    %% to answer.
	    case [Res || Res <- Replies, Res =/= ok] of
		[] when BadNodes =:= [] ->
		    ok;
		BadRes ->
		    io:format("TPC-B: <ERROR> "
			      "Failed to start ~w: ~p~n",
			      [BadNodes, BadRes]),
		    exit({start_failed, BadRes, BadNodes})
	    end,
	    %% NOTE(review): the result of this verification is discarded.
	    verify_tabs()
    end,

    %% Derive the per-branch record counts from the table sizes.
    %% NOTE(review): an empty branch table makes the divisions below
    %% fail with badarith.
    N  = table_info(branch, size),
    NT = table_info(teller, size) div N,
    NA = table_info(account, size) div N,

    %% NOTE(review): table_storage/1 returns a replica count in the third
    %% position, not a list of nodes, despite the variable name.
    {Type, NF, RepNodes} = table_storage(branch),
    TC = #tab_config{n_fragments = NF,
		     n_branches = N,
		     n_tellers_per_branch = NT,
		     n_accounts_per_branch = NA,
		     db_nodes = DbNodes,
		     replica_nodes = RepNodes,
		     replica_type = Type
		    },
    Drivers = start_drivers(RC, TC),
    Now = erlang:monotonic_time(),
    State = #reporter_state{driver_pids = Drivers,
			    run_config = RC,
			    starter_pid = Starter,
			    init_micros = Now,
			    prev_micros = Now
			   },
    %% reporter_loop/1 either returns {ok, Stopper, State2} after a stop
    %% request or exits; the catch turns an exit into {'EXIT', Reason}.
    case catch reporter_loop(State) of
	{'EXIT', Reason} ->
	    io:format("TPC-B: Abnormal termination: ~p~n", [Reason]),
	    if
		RC#run_config.use_running_mnesia =:= true ->
		    ignore;
		true ->
		    rpc:multicall(DbNodes, mnesia, lkill, [])
	    end,
	    %% Unlink first so that the exit below is not propagated to
	    %% the starter through the link.
	    unlink(Starter),
	    Starter ! {self(), {stopped, {error, Reason}}}, % To be sure
	    exit(shutdown);
	{ok, Stopper, State2} ->
	    %% Normal stop: the result is the accumulated statistics,
	    %% provided that the tables are still consistent.
	    Time = State2#reporter_state.acc,
	    Res =
		case verify_tabs() of
		    ok ->
			{ok, Time};
		    {error, Reason} ->
			io:format("TPC-B: <ERROR> Inconsistent tables, ~p~n",
				  [{error, Reason}]),
			{error, Reason}
		end,
	    if
		RC#run_config.use_running_mnesia =:= true ->
		    ignore;
		true ->
		    rpc:multicall(DbNodes, mnesia, stop, [])
	    end,
	    unlink(Starter),
	    Starter ! {self(), {stopped, Res}},
	    %% Also answer the process that asked for the stop, unless it
	    %% is the starter, which has already been told.
	    if
		Stopper =/= Starter ->
		    Stopper ! {self(), {stopped, Res}};
		true ->
		    ignore
	    end,
	    exit(shutdown)
    end.
806
%% table_info(Tab, Item) -> Value
%% Reads a table property through the mnesia_frag access module in a
%% sync_dirty activity.
table_info(Tab, Item) ->
    mnesia:activity(sync_dirty,
                    fun() -> mnesia:table_info(Tab, Item) end,
                    mnesia_frag).
810
%% table_storage(Tab) -> {Storage, NFragments, NReplicas}
%%
%% Tells how Tab is stored:
%%   Storage    - ram_copies | disc_copies | disc_only_copies, the first
%%                storage type (in that order) that has any replica
%%   NFragments - 0 for a table without fragmentation properties,
%%                otherwise the value of its n_fragments property
%%   NReplicas  - number of replicas of the reported storage type (a
%%                count, not a list of nodes)
%% Crashes if the table has no replica of any storage type.
table_storage(Tab) ->
    case mnesia:table_info(Tab, frag_properties) of
        [] ->
            NR  = length(mnesia:table_info(Tab, ram_copies)),
            ND  = length(mnesia:table_info(Tab, disc_copies)),
            NDO = length(mnesia:table_info(Tab, disc_only_copies)),
            table_storage(0, NR, ND, NDO);
        Props ->
            %% keyfind returns the whole property tuple; take the
            %% fragment count out of it.
            {n_fragments, NF} = lists:keyfind(n_fragments, 1, Props),
            NR  = table_info(Tab, n_ram_copies),
            ND  = table_info(Tab, n_disc_copies),
            NDO = table_info(Tab, n_disc_only_copies),
            table_storage(NF, NR, ND, NDO)
    end.

%% Picks the storage type to report from the replica counts.
table_storage(NF, NR, ND, NDO) ->
    if
        NR  =/= 0 -> {ram_copies, NF, NR};
        ND  =/= 0 -> {disc_copies, NF, ND};
        NDO =/= 0 -> {disc_only_copies, NF, NDO}
    end.
835
%% reporter_loop(State) -> {ok, Stopper, State2}
%%
%% Main loop of the reporter. Every report_interval it collects a report
%% from the drivers and starts a new interval. A stop request ends the
%% loop with a final round of reports. The loop exits if the starter
%% dies or if no driver is left.
reporter_loop(State) ->
    Interval = (State#reporter_state.run_config)#run_config.report_interval,
    Starter = State#reporter_state.starter_pid,
    receive
        {Stopper, stop} ->
            {ok, Stopper, call_drivers(State, stop)};
        {'EXIT', Starter, Reason} ->
            %% call_drivers(State, stop),
            exit({starter_died, Starter, Reason})
    after Interval ->
        Round = State#reporter_state.n_iters + 1,
        Reported = call_drivers(State#reporter_state{n_iters = Round},
                                report),
        case Reported#reporter_state.driver_pids of
            [] ->
                exit(drivers_died);
            _ ->
                reporter_loop(Reported#reporter_state{curr = #time{}})
        end
    end.
855
%% call_drivers(State, Msg) -> State2
%%
%% Sends Msg (report or stop) to every driver, collects their answers
%% and prints the report for the interval. When Msg is stop, a second
%% report covering the whole run is printed as well. The returned state
%% is the one produced by the interval report.
call_drivers(State, Msg) ->
    Drivers = State#reporter_state.driver_pids,
    lists:foreach(fun(Driver) -> Driver ! {self(), Msg} end, Drivers),
    Collected = calc_reports(Drivers, State),
    Reported = show_report(Collected),
    if
        Msg =:= stop ->
            %% Summary of the whole run: accumulated statistics measured
            %% from the start of the reporter.
            Total = Reported#reporter_state{
                      n_iters = 0,
                      curr = Reported#reporter_state.acc,
                      prev_micros = Reported#reporter_state.init_micros},
            show_report(Total);
        true ->
            ignore
    end,
    Reported.
871
%% calc_reports(Drivers, State) -> State2
%%
%% Waits for the timing report of each driver in turn and merges it into
%% both the accumulated and the current statistics. Exits if the starter
%% or the driver that is being waited for has died.
calc_reports([], State) ->
    State;
calc_reports([Driver | Rest], State) ->
    Starter = State#reporter_state.starter_pid,
    receive
        {'EXIT', Starter, Reason} ->
            exit({starter_died, Starter, Reason});
        {'EXIT', Driver, Reason} ->
            exit({driver_died, Driver, Reason});
        {Driver, #time{} = Report} ->
            %% io:format("~w: ~w~n", [Driver, Report]),
            Merged = State#reporter_state{
                       acc = add_time(State#reporter_state.acc, Report),
                       curr = add_time(State#reporter_state.curr, Report)},
            calc_reports(Rest, Merged)
    end.
887
%% add_time(Acc, New) -> Acc2
%% Merges the statistics New into Acc: transaction counts and times are
%% summed, max_n and max_time keep the largest value seen, and min_n
%% keeps the smallest transaction count, where a zero is dropped from
%% the comparison.
add_time(Acc, New) ->
    NewN = New#time.n_trans,
    Smallest = lists:min([NewN, Acc#time.min_n] -- [0]),
    Acc#time{n_trans = NewN + Acc#time.n_trans,
             min_n = Smallest,
             max_n = max(NewN, Acc#time.max_n),
             acc_time = New#time.acc_time + Acc#time.acc_time,
             max_time = max(New#time.max_time, Acc#time.max_time)}.
894
895-define(AVOID_DIV_ZERO(_What_), try (_What_) catch _:_ -> 0 end).
896
%% show_report(State) -> State2
%%
%% Prints one report line based on the statistics in the curr field:
%%   Tps       - transactions per second derived from the average
%%               transaction time and the number of drivers
%%   BruttoTps - transactions per second derived from the elapsed time
%%               since the previous report
%% When n_iters is 0 the summary format is used instead of the
%% per-iteration format. If send_bench_report is set, Tps is also sent
%% as a benchmark_data event. Returns the state with prev_tps and
%% prev_micros updated.
show_report(State) ->
    %% Must come from the same clock as the values stored in init_micros
    %% and prev_micros by reporter_init/2, i.e. monotonic time in native
    %% units.
    Now    = erlang:monotonic_time(),
    Iters  = State#reporter_state.n_iters,
    Cfg    = State#reporter_state.run_config,
    Time   = State#reporter_state.curr,
    Max    = Time#time.max_time,
    N      = Time#time.n_trans,
    Avg    = ?AVOID_DIV_ZERO(Time#time.acc_time div N),
    AliveN = length(State#reporter_state.driver_pids),
    Tps    = ?AVOID_DIV_ZERO((?SECOND * AliveN) div Avg),
    PrevTps= State#reporter_state.prev_tps,
    {DiffSign, DiffTps} = signed_diff(Iters, Tps, PrevTps),
    Unfairness = ?AVOID_DIV_ZERO(Time#time.max_n / Time#time.min_n),
    %% Elapsed time per transaction in microseconds. The time unit is
    %% given as parts per second, so ?SECOND (1000000) means
    %% microseconds.
    BruttoAvg =
        ?AVOID_DIV_ZERO(
           erlang:convert_time_unit(Now - State#reporter_state.prev_micros,
                                    native, ?SECOND) div N),
    BruttoTps = ?AVOID_DIV_ZERO(?SECOND div BruttoAvg),
    case Iters > 0 of
        true ->
            io:format("TPC-B: ~p iter ~s~p diff ~p (~p) tps ~p avg micros ~p max micros ~p unfairness~n",
                      [Iters, DiffSign, DiffTps, Tps, BruttoTps, Avg, Max, Unfairness]);
        false ->
            io:format("TPC-B: ~p (~p) transactions per second, "
                      "duration of longest transaction was ~p milliseconds~n",
                      [Tps, BruttoTps, Max div 1000])
    end,
    case Cfg#run_config.send_bench_report of
        true ->
            ct_event:notify(
              #event{name = benchmark_data,
                     data = [{suite,"mnesia_tpcb"},
                             {value,Tps}]});
        _ ->
            ok
    end,

    State#reporter_state{prev_tps = Tps, prev_micros = Now}.
934
%% signed_diff(Iters, Curr, Prev) -> {SignString, Diff}
%% Difference between the current and the previous value, formatted by
%% sign/1. There is nothing to compare with before the second
%% iteration, so the difference is then reported as 0.
signed_diff(Iters, Curr, Prev) ->
    Delta = if
                Iters > 1 -> Curr - Prev;
                true -> 0
            end,
    sign(Delta).
940
%% sign(N) -> {Prefix, N}
%% Pairs N with the prefix to print in front of it: "+" for positive
%% values and "" otherwise (negative numbers print their own sign).
sign(N) ->
    Prefix = if
                 N > 0 -> "+";
                 true -> ""
             end,
    {Prefix, N}.
943
%% start_drivers(RC, TC) -> [pid()]
%%
%% Creates one driver specification per driver id and driver node,
%% distributes the branches among the drivers (alloc_local_branches/3),
%% prints the branches that no driver has a local copy of, and spawns
%% one linked driver process per specification.
start_drivers(RC, TC) ->
    Template = #driver_state{tab_config       = TC,
                             run_config       = RC,
                             n_local_branches = 0,
                             local_branches   = [],
                             history_id       = table_info(history, size),
                             reuse_history_id = RC#run_config.reuse_history_id},
    FirstBranch = 0,
    LastBranch = FirstBranch + TC#tab_config.n_branches - 1,
    AllBranches = lists:seq(FirstBranch, LastBranch),
    DriverIds = lists:seq(1, RC#run_config.n_drivers_per_node),
    Unsorted = [Template#driver_state{driver_id = Id, driver_node = Node}
                || Node <- RC#run_config.driver_nodes,
                   Id <- DriverIds],
    {Specs, Orphans} =
        alloc_local_branches(AllBranches, lists:sort(Unsorted), []),
    %% Print the orphan branches themselves when they are few, otherwise
    %% only how many they are.
    OrphanInfo = case length(Orphans) of
                     Count when Count =< 10 -> Orphans;
                     Count -> Count
                 end,
    io:format("TPC-B: Orphan branches: ~p~n", [OrphanInfo]),
    [spawn_link(Spec#driver_state.driver_node, ?MODULE, driver_init,
                [Spec, AllBranches])
     || Spec <- Specs].
974
%% Distribute branch ids over the driver specs.
%%
%% A branch may only be given to a driver whose node is among the nodes
%% reported by table_info({branch, BranchId}, where_to_write). Among
%% those drivers the one with the fewest branches so far is chosen.
%% A branch without any eligible driver is added to OrphanBranches.
%%
%% Returns {Specs, OrphanBranches}.
alloc_local_branches([], Specs, OrphanBranches) ->
    {Specs, OrphanBranches};
alloc_local_branches([BranchId | Rest], Specs, OrphanBranches) ->
    WriteNodes = table_info({branch, BranchId}, where_to_write),
    IsLocal = fun(Spec) ->
                      lists:member(Spec#driver_state.driver_node, WriteNodes)
              end,
    Candidates = lists:filter(IsLocal, Specs),
    %% keysort is stable, so ties go to the spec that comes first in Specs
    case lists:keysort(#driver_state.n_local_branches, Candidates) of
        [Chosen | _] ->
            Count = Chosen#driver_state.n_local_branches + 1,
            Owned = [BranchId | Chosen#driver_state.local_branches],
            Updated = Chosen#driver_state{n_local_branches = Count,
                                          local_branches   = Owned},
            Others = lists:delete(Chosen, Specs),
            alloc_local_branches(Rest, [Updated | Others], OrphanBranches);
        [] ->
            alloc_local_branches(Rest, Specs, [BranchId | OrphanBranches])
    end.
993
%% Entry point of a driver process (spawned by start_drivers/2).
%%
%% Seeds the random number generator, either with the seed given in the
%% run configuration or with a fresh exsplus seed, and stores the
%% resulting state in the driver state. A driver that was not assigned
%% any local branch operates on all branches instead. Announces itself
%% and then enters driver_loop/1.
driver_init(DS, AllBranches) ->
    RC = DS#driver_state.run_config,
    Seed = case RC#run_config.seed of
               undefined -> rand:seed(exsplus);
               ExpSeed   -> rand:seed(ExpSeed)
           end,
    Seeded = DS#driver_state{seed = Seed},
    Ready = case DS#driver_state.n_local_branches of
                0 ->
                    Seeded#driver_state{n_local_branches = length(AllBranches),
                                        local_branches   = AllBranches};
                _ ->
                    Seeded
            end,
    io:format("TPC-B: Driver ~p started as ~p on node ~p with ~p local branches~n",
              [Ready#driver_state.driver_id,
               self(),
               node(),
               Ready#driver_state.n_local_branches]),
    driver_loop(Ready).
1012
%% Main loop of a driver process.
%%
%% Runs transactions back to back (calc_trans/1) and polls the mailbox
%% between them without blocking:
%%   {From, report} - reply with the timing collected since the last
%%                    report, fold it into the accumulated timing,
%%                    reset the current timing and carry on
%%   {From, stop}   - print the accumulated timing, reply with the
%%                    current timing, unlink from From and exit with
%%                    reason stopped
driver_loop(DS) ->
    receive
        {From, report} ->
            Current = DS#driver_state.time,
            From ! {self(), Current},
            Total = add_time(Current, DS#driver_state.acc_time),
            %% Start a new measurement period
            Fresh = DS#driver_state{time = #time{}, acc_time = Total},
            driver_loop(calc_trans(Fresh));
        {From, stop} ->
            Current = DS#driver_state.time,
            Total = add_time(Current, DS#driver_state.acc_time),
            io:format("TPC-B: Driver ~p (~p) on node ~p stopped: ~w~n",
                      [DS#driver_state.driver_id, self(), node(self()), Total]),
            From ! {self(), Current},
            unlink(From),
            exit(stopped)
    after 0 ->
            %% Nothing in the mailbox; run the next transaction
            driver_loop(calc_trans(DS))
    end.
1032
%% Run one timed transaction and update the driver's statistics:
%% the transaction counter, the accumulated time and the longest
%% transaction time seen. Unless history ids are reused, the history
%% id is advanced by one as well.
%%
%% Returns the updated #driver_state{}.
calc_trans(DS) ->
    {Micros, Timed} = time_trans(DS),
    Old = Timed#driver_state.time,
    New = Old#time{n_trans  = Old#time.n_trans + 1,
                   acc_time = Old#time.acc_time + Micros,
                   max_time = lists:max([Micros, Old#time.max_time])},
    Next = Timed#driver_state{time = New},
    case DS#driver_state.reuse_history_id of
        true ->
            Next;
        false ->
            Next#driver_state{history_id = DS#driver_state.history_id + 1}
    end.
1047
1048%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1049
%% Draw a random number, derive the transaction arguments from it
%% (random_to_args/2) and time one execution of the TPC-B transaction
%% with timer:tc/3.
%%
%% Returns {Micros, DriverState} where DriverState carries the advanced
%% random seed. The transaction is expected to return an integer (the
%% new account balance); for any other result the driver exits with
%% {crash, Result, Args, Random, DS}.
time_trans(DS) ->
    {Random, NewSeed} = rand:uniform_s(DS#driver_state.seed),

    TabConfig = DS#driver_state.tab_config,
    RunConfig = DS#driver_state.run_config,
    {BranchId, Args} = random_to_args(Random, DS),
    {Fun, Mod} = trans_type(TabConfig, RunConfig),
    TransArgs = [RunConfig, BranchId, Fun, Args, Mod],

    case timer:tc(?MODULE, real_trans, TransArgs) of
        {Micros, AccountBal} when is_integer(AccountBal) ->
            {Micros, DS#driver_state{seed = NewSeed}};
        {_Micros, Other} ->
            exit({crash, Other, Args, Random, DS})
    end.
1067
%% Derive the arguments of one TPC-B transaction from a single random
%% number.
%%
%% Random is expected to be a float with 0.0 =< Random < 1.0 (the
%% caller obtains it from rand:uniform_s/1). DS is the #driver_state{}
%% of the calling driver.
%%
%% Returns {BranchId, Args} where Args is
%%   [DriverId, BranchId, TellerId, AccountBranchId, AccountId,
%%    HistoryId, Delta]
%% i.e. the argument list of the transaction funs selected by
%% trans_type/2.
random_to_args(Random, DS) ->
    DriverId = DS#driver_state.driver_id,
    TC = DS#driver_state.tab_config,
    HistoryId = DS#driver_state.history_id,
    %% For Random < 1.0: -999999 =< Delta =< +999998
    Delta = trunc(Random * 1999998) - 999999,

    Branches = DS#driver_state.local_branches,
    NB = DS#driver_state.n_local_branches,
    NT = TC#tab_config.n_tellers_per_branch,
    NA = TC#tab_config.n_accounts_per_branch,
    %% Tmp enumerates the (branch position, teller position) pairs:
    %% 0 =< Tmp < NB * NT
    Tmp = trunc(Random * NB * NT),
    BranchPos = (Tmp div NT) + 1,
    BranchId =
        case TC#tab_config.n_fragments of
            0 -> BranchPos - 1;
            _ -> lists:nth(BranchPos, Branches)
        end,
    %% Position of the teller within the chosen branch, 0 =< Pos < NT.
    %% This has to be the remainder: the quotient (Tmp div NT) is the
    %% branch position, which made the teller a function of the branch
    %% and could push TellerId outside the tellers of BranchId when
    %% NB > NT.
    RelativeTellerId = Tmp rem NT,
    TellerId = (BranchId * NT) + RelativeTellerId,
    {AccountBranchId, AccountId} =
        if
            Random >= 0.85, NB > 1 ->
                %% Pick from a remote account
                TmpAccountId = trunc(Random * (NB - 1) * NA),
                TmpAccountBranchId = TmpAccountId div NA,
                if
                    TmpAccountBranchId =:= BranchId ->
                        %% Landed on the teller's own branch; move to
                        %% the same account position in the next branch
                        {TmpAccountBranchId + 1, TmpAccountId + NA};
                    true ->
                        {TmpAccountBranchId, TmpAccountId}
                end;
            true ->
                %% Pick from a local account
                RelativeAccountId = trunc(Random * NA),
                TmpAccountId = (BranchId * NA) + RelativeAccountId,
                {BranchId, TmpAccountId}
        end,

    {BranchId, [DriverId, BranchId, TellerId, AccountBranchId, AccountId, HistoryId, Delta]}.
1107
%% Execute the transaction fun Fun with Args as an mnesia activity of
%% the configured activity type, using Mod as access module.
%%
%% With spawn_near_branch set, the activity is run through rpc:call/4
%% on the node reported by table_info({branch, BranchId}, where_to_read);
%% a failed rpc makes the caller exit with the badrpc reason. Otherwise
%% the activity runs in the calling process.
%%
%% Returns whatever mnesia:activity/4 returns.
real_trans(RC, BranchId, Fun, Args, Mod) ->
    ActivityType = RC#run_config.activity_type,
    case RC#run_config.spawn_near_branch of
        true ->
            ReadNode = table_info({branch, BranchId}, where_to_read),
            Reply = rpc:call(ReadNode, mnesia, activity,
                             [ActivityType, Fun, Args, Mod]),
            case Reply of
                {badrpc, Reason} -> exit(Reason);
                Result -> Result
            end;
        false ->
            mnesia:activity(ActivityType, Fun, Args, Mod)
    end.
1120
%% Select the transaction fun and the mnesia access module to run it
%% with, depending on table fragmentation and sticky lock usage:
%%
%%   n_fragments  use_sticky_locks  result
%%   0            false             {fun add_delta/7, mnesia}
%%   0            true              {fun sticky_add_delta/7, mnesia}
%%   > 0          false             {fun frag_add_delta/7, mnesia_frag}
%%
%% No other combination is handled (e.g. fragmented tables together
%% with sticky locks); such a configuration fails with if_clause.
trans_type(TC, RC) ->
    if
        TC#tab_config.n_fragments > 0,
        RC#run_config.use_sticky_locks =:= false ->
            {fun frag_add_delta/7, mnesia_frag};
        RC#run_config.use_sticky_locks =:= true,
        TC#tab_config.n_fragments =:= 0 ->
            {fun sticky_add_delta/7, mnesia};
        RC#run_config.use_sticky_locks =:= false,
        TC#tab_config.n_fragments =:= 0 ->
            {fun add_delta/7, mnesia}
    end.
1133
1134%%
1135%% Runs the TPC-B defined transaction and returns NewAccountBalance
1136%%
1137
%% TPC-B transaction for non-fragmented tables using plain write locks.
%%
%% Adds Delta to the balance of the branch, the teller and the account
%% (in that order) and writes a history record keyed on
%% {DriverId, HistoryId}. Each record is read with a write lock, so the
%% lock is held already before the update. The account's branch id is
%% not needed here.
%%
%% Returns the new account balance. Fails with badmatch if a read does
%% not return exactly one record.
add_delta(DriverId, BranchId, TellerId, _AccountBranchId, AccountId, HistoryId, Delta) ->
    %% Branch
    [Branch] = mnesia:read(branch, BranchId, write),
    BranchBal = Branch#branch.balance + Delta,
    ok = mnesia:write(branch, Branch#branch{balance = BranchBal}, write),

    %% Teller
    [Teller] = mnesia:read(teller, TellerId, write),
    TellerBal = Teller#teller.balance + Delta,
    ok = mnesia:write(teller, Teller#teller{balance = TellerBal}, write),

    %% Account
    [Account] = mnesia:read(account, AccountId, write),
    AccountBal = Account#account.balance + Delta,
    ok = mnesia:write(account, Account#account{balance = AccountBal}, write),

    %% History log entry
    Entry = #history{history_id = {DriverId, HistoryId},
                     account_id = AccountId,
                     teller_id  = TellerId,
                     branch_id  = BranchId,
                     amount     = Delta},
    ok = mnesia:write(history, Entry, write),

    AccountBal.
1167
%% TPC-B transaction for non-fragmented tables using sticky locks.
%%
%% Same updates as add_delta/7, but every record is read with an
%% ordinary read lock and written with a sticky_write lock. (Taking the
%% sticky_write lock already at read time would presumably suit this
%% transaction better.) The account's branch id is not needed here.
%%
%% Returns the new account balance. Fails with badmatch if a read does
%% not return exactly one record.
sticky_add_delta(DriverId, BranchId, TellerId, _AccountBranchId, AccountId, HistoryId, Delta) ->
    %% Branch
    [Branch] = mnesia:read(branch, BranchId, read),
    BranchBal = Branch#branch.balance + Delta,
    ok = mnesia:write(branch, Branch#branch{balance = BranchBal}, sticky_write),

    %% Teller
    [Teller] = mnesia:read(teller, TellerId, read),
    TellerBal = Teller#teller.balance + Delta,
    ok = mnesia:write(teller, Teller#teller{balance = TellerBal}, sticky_write),

    %% Account
    [Account] = mnesia:read(account, AccountId, read),
    AccountBal = Account#account.balance + Delta,
    ok = mnesia:write(account, Account#account{balance = AccountBal}, sticky_write),

    %% History log entry
    Entry = #history{history_id = {DriverId, HistoryId},
                     account_id = AccountId,
                     teller_id  = TellerId,
                     branch_id  = BranchId,
                     amount     = Delta},
    ok = mnesia:write(history, Entry, sticky_write),

    AccountBal.
1199
%% TPC-B transaction for fragmented tables; trans_type/2 pairs it with
%% the mnesia_frag access module.
%%
%% Same updates as add_delta/7 with write locks taken at read time.
%% The teller is read via {teller, BranchId} and the account via
%% {account, AccountBranchId} -- presumably the second element selects
%% the fragment; confirm against the mnesia_frag documentation.
%% Records are written back with mnesia:write/1.
%%
%% Returns the new account balance. Fails with badmatch if a read does
%% not return exactly one record.
frag_add_delta(DriverId, BranchId, TellerId, AccountBranchId, AccountId, HistoryId, Delta) ->
    %% Branch
    [Branch] = mnesia:read(branch, BranchId, write),
    BranchBal = Branch#branch.balance + Delta,
    ok = mnesia:write(Branch#branch{balance = BranchBal}),

    %% Teller
    [Teller] = mnesia:read({teller, BranchId}, TellerId, write),
    TellerBal = Teller#teller.balance + Delta,
    ok = mnesia:write(Teller#teller{balance = TellerBal}),

    %% Account (may belong to another branch than the teller)
    [Account] = mnesia:read({account, AccountBranchId}, AccountId, write),
    AccountBal = Account#account.balance + Delta,
    ok = mnesia:write(Account#account{balance = AccountBal}),

    %% History log entry
    Entry = #history{history_id = {DriverId, HistoryId},
                     account_id = AccountId,
                     teller_id  = TellerId,
                     branch_id  = BranchId,
                     amount     = Delta},
    ok = mnesia:write(Entry),

    AccountBal.
1231
1232%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1233%% Verify table consistency
1234
%% Check that the balances of the branch, teller and account tables
%% agree on all running db nodes.
%%
%% Waits for the tables on every running db node, then, inside one
%% transaction holding write locks on all four TPC-B tables, lets each
%% node sum up its balances (count_balance/0) and compares the sums
%% (check_balance/2).
%%
%% Returns the transaction result on success or {error, Reason}
%% if the transaction was aborted or if the local node is not a
%% running db node.
verify_tabs() ->
    Running = mnesia:system_info(running_db_nodes),
    case lists:member(node(), Running) of
        false ->
            {error, "Must be initiated from a running db_node"};
        true ->
            Tabs = [branch, teller, account, history],
            io:format("TPC-B: Verifying tables: ~w~n", [Tabs]),
            rpc:multicall(Running, mnesia, wait_for_tables, [Tabs, infinity]),

            Check =
                fun() ->
                        %% Lock order: branch, teller, account, history
                        lists:foreach(fun mnesia:write_lock_table/1, Tabs),
                        {Summaries, BadNodes} =
                            rpc:multicall(Running, ?MODULE, count_balance, []),
                        check_balance(Summaries, BadNodes)
                end,
            case mnesia:transaction(Check) of
                {atomic, Outcome} -> Outcome;
                {aborted, Reason} -> {error, Reason}
            end
    end.
1259
%% count_balance/0 returns a list of #summary{} records, one for each
%% of the branch, teller and account tables, as counted on this node.
%% Assumes that no updates are performed while counting.
1262
%% Result of summing up one table on one node:
%%   table   - name of the table
%%   node    - node() on which the table was counted
%%   balance - sum of the balance field of all counted records
%%   size    - number of counted records
-record(summary, {table, node, balance, size}).
1264
%% Sum up the balances of the branch, teller and account tables on the
%% local node. Returns one #summary{} per table, in that order.
count_balance() ->
    [count_balance(Tab, BalPos)
     || {Tab, BalPos} <- [{branch,  #branch.balance},
                          {teller,  #teller.balance},
                          {account, #account.balance}]].
1269
%% Sum up element BalPos of every record in all fragments of Tab, as
%% listed by table_info(Tab, frag_names). Returns a #summary{}.
count_balance(Tab, BalPos) ->
    count_balance(Tab, table_info(Tab, frag_names), 0, 0, BalPos).
1273
%% Add the balances and record counts of the given fragments to the
%% running totals Bal and Size, and wrap the final totals in a
%% #summary{} for Tab on the local node.
count_balance(Tab, Frags, Bal, Size, BalPos) ->
    AddFrag = fun(Frag, {BalAcc, SizeAcc}) ->
                      First = mnesia:dirty_first(Frag),
                      count_frag_balance(Frag, First, BalAcc, SizeAcc, BalPos)
              end,
    {TotalBal, TotalSize} = lists:foldl(AddFrag, {Bal, Size}, Frags),
    #summary{table = Tab, node = node(), balance = TotalBal, size = TotalSize}.
1280
%% Walk one table fragment with dirty reads, starting at Key, adding
%% element BalPos of every record to Bal and counting the records in
%% Size. Returns {Bal, Size} once '$end_of_table' is reached.
count_frag_balance(Frag, Key, Bal, Size, BalPos) ->
    case Key of
        '$end_of_table' ->
            {Bal, Size};
        _ ->
            [Rec] = mnesia:dirty_read({Frag, Key}),
            NewBal = Bal + element(BalPos, Rec),
            NextKey = mnesia:dirty_next(Frag, Key),
            count_frag_balance(Frag, NextKey, NewBal, Size + 1, BalPos)
    end.
1288
%% Evaluate the outcome of the count_balance multicall.
%%
%% Summaries is the (possibly nested) list of #summary{} records
%% collected from the nodes, BadNodes the nodes that did not answer.
%% Returns ok when all summaries carry the same balance. Otherwise the
%% enclosing transaction is aborted with one of
%%   {"No balance"}                  - no results at all
%%   {"Bad balance", One, Deviating} - some balance differs from the first
%%   {"Bad nodes", BadNodes}         - at least one node failed
check_balance([], []) ->
    mnesia:abort({"No balance"});
check_balance(Summaries, []) ->
    [Reference | Others] = lists:flatten(Summaries),
    Expected = Reference#summary.balance,
    %% The guard makes a non-summary element count as "not deviating"
    %% instead of raising an error
    Deviates = fun(S) when S#summary.balance =/= Expected -> true;
                  (_) -> false
               end,
    case lists:filter(Deviates, Others) of
        [] ->
            ok;
        Deviating ->
            mnesia:abort({"Bad balance", Reference, Deviating})
    end;
check_balance(_, BadNodes) ->
    mnesia:abort({"Bad nodes", BadNodes}).
1303