1%% 2%% %CopyrightBegin% 3%% 4%% Copyright Ericsson AB 1996-2016. All Rights Reserved. 5%% 6%% Licensed under the Apache License, Version 2.0 (the "License"); 7%% you may not use this file except in compliance with the License. 8%% You may obtain a copy of the License at 9%% 10%% http://www.apache.org/licenses/LICENSE-2.0 11%% 12%% Unless required by applicable law or agreed to in writing, software 13%% distributed under the License is distributed on an "AS IS" BASIS, 14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15%% See the License for the specific language governing permissions and 16%% limitations under the License. 17%% 18%% %CopyrightEnd% 19%% 20 21%% 22%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 23%% 24%% MODULE 25%% 26%% mnesia_tpcb - TPC-B benchmarking of Mnesia 27%% 28%% DESCRIPTION 29%% 30%% The metrics used in the TPC-B benchmark are throughput as measured 31%% in transactions per second (TPS). The benchmark uses a single, 32%% simple update-intensive transaction to load the database system. 33%% The single transaction type provides a simple, repeatable 34%% unit of work, and is designed to exercise the basic components of 35%% a database system. 36%% 37%% The definition of the TPC-B states lots of detailed rules and 38%% conditions that must be fullfilled, e.g. how the ACID (atomicity, 39%% consistency, isolation and durability) properties are verified, 40%% how the random numbers must be distributed, minimum sizes of 41%% the different types of records, minimum duration of the benchmark, 42%% formulas to calculate prices (dollars per tps), disclosure issues 43%% etc. Please, see http://www.tpc.org/ about the nitty gritty details. 44%% 45%% The TPC-B benchmark is stated in terms of a hypothetical bank. The 46%% bank has one or more branches. Each branch has multiple tellers. The 47%% bank has many customers, each with an account. The database represents 48%% the cash position of each entity (branch, teller and account) and a 49%% history of recent transactions run by the bank. The transaction 50%% represents the work done when a customer makes a deposit or a 51%% withdrawal against his account. The transaction is performed by a 52%% teller at some branch. 53%% 54%% Each process that performs TPC-B transactions is called a driver. 55%% Drivers generates teller_id, account_id and delta amount of 56%% money randomly. An account, a teller and a branch are read, their 57%% balances are adjusted and a history record is created. The driver 58%% measures the time for 3 reads, 3 writes and 1 create. 59%% 60%% GETTING STARTED 61%% 62%% Generate tables and run with default configuration: 63%% 64%% mnesia_tpcb:start(). 65%% 66%% A little bit more advanced; 67%% 68%% spawn(mnesia_tpcb, start, [[[{n_drivers_per_node, 8}, {stop_after, infinity}]]), 69%% mnesia_tpcb:stop(). 70%% 71%% Really advanced; 72%% 73%% mnesia_tpcb:init(([{n_branches, 8}, {replica_type, disc_only_copies}]), 74%% mnesia_tpcb:run(([{n_drivers_per_node, 8}]), 75%% mnesia_tpcb:run(([{n_drivers_per_node, 64}]). 76%% 77%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 78 79-module(mnesia_tpcb). 80-author('hakan@erix.ericsson.se'). 81 82-export([ 83 config/2, 84 count_balance/0, 85 driver_init/2, 86 init/1, 87 reporter_init/2, 88 run/1, 89 start/0, 90 start/1, 91 start/2, 92 stop/0, 93 real_trans/5, 94 verify_tabs/0, 95 reply_gen_branch/3, 96 frag_add_delta/7, 97 98 conflict_test/1, 99 dist_test/1, 100 replica_test/1, 101 sticky_replica_test/1, 102 remote_test/1, 103 remote_frag2_test/1, 104 105 conflict_benchmark/1 106 ]). 107 108-include_lib("common_test/include/ct_event.hrl"). 109 110-define(SECOND, 1000000). 111 112%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 113%%% Account record, total size must be at least 100 bytes 114 115-define(ACCOUNT_FILLER, 116 {123456789012345678901234567890123456789012345678901234567890, 117 123456789012345678901234567890123456789012345678901234567890, 118 123456789012345678901234567890123456789012345678901234}). 119 120-record(account, 121 { 122 id = 0, % Unique account id 123 branch_id = 0, % Branch where the account is held 124 balance = 0, % Account balance 125 filler = ?ACCOUNT_FILLER % Gap filler to ensure size >= 100 bytes 126 }). 127 128%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 129%%% Branch record, total size must be at least 100 bytes 130 131-define(BRANCH_FILLER, 132 {123456789012345678901234567890123456789012345678901234567890, 133 123456789012345678901234567890123456789012345678901234567890, 134 123456789012345678901234567890123456789012345678901234567890}). 135 136-record(branch, 137 { 138 id = 0, % Unique branch id 139 balance = 0, % Total balance of whole branch 140 filler = ?BRANCH_FILLER % Gap filler to ensure size >= 100 bytes 141 }). 142 143%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 144%%% Teller record, total size must be at least 100 bytes 145 146-define(TELLER_FILLER, 147 {123456789012345678901234567890123456789012345678901234567890, 148 123456789012345678901234567890123456789012345678901234567890, 149 1234567890123456789012345678901234567890123456789012345678}). 150 151-record(teller, 152 { 153 id = 0, % Unique teller id 154 branch_id = 0, % Branch where the teller is located 155 balance = 0, % Teller balance 156 filler = ?TELLER_FILLER % Gap filler to ensure size >= 100 bytes 157 }). 158 159%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 160%%% History record, total size must be at least 50 bytes 161 162-define(HISTORY_FILLER, 1234567890). 163 164-record(history, 165 { 166 history_id = {0, 0}, % {DriverId, DriverLocalHistoryid} 167 time_stamp = erlang:system_time(), % Time point during active transaction 168 branch_id = 0, % Branch associated with teller 169 teller_id = 0, % Teller invlolved in transaction 170 account_id = 0, % Account updated by transaction 171 amount = 0, % Amount (delta) specified by transaction 172 filler = ?HISTORY_FILLER % Gap filler to ensure size >= 50 bytes 173 }). 174 175%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 176-record(tab_config, 177 { 178 db_nodes = [node()], 179 n_replicas = 1, % Ignored for non-fragmented tables 180 replica_nodes = [node()], 181 replica_type = ram_copies, 182 use_running_mnesia = false, 183 n_fragments = 0, 184 n_branches = 1, 185 n_tellers_per_branch = 10, % Must be 10 186 n_accounts_per_branch = 100000, % Must be 100000 187 branch_filler = ?BRANCH_FILLER, 188 account_filler = ?ACCOUNT_FILLER, 189 teller_filler = ?TELLER_FILLER 190 }). 191 192%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 193 194-record(run_config, 195 { 196 driver_nodes = [node()], 197 n_drivers_per_node = 1, 198 use_running_mnesia = false, 199 seed, 200 stop_after = timer:minutes(15), % Minimum 15 min 201 report_interval = timer:minutes(1), 202 send_bench_report = false, 203 use_sticky_locks = false, 204 spawn_near_branch = false, 205 activity_type = transaction, 206 reuse_history_id = false 207 }). 208 209%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 210 211-record(time, 212 { 213 n_trans = 0, 214 min_n = 0, 215 max_n = 0, 216 acc_time = 0, 217 max_time = 0 218 }). 219 220%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 221 222-record(driver_state, 223 { 224 driver_id, 225 driver_node, 226 seed, 227 n_local_branches, 228 local_branches, 229 tab_config, 230 run_config, 231 history_id, 232 time = #time{}, 233 acc_time = #time{}, 234 reuse_history_id 235 }). 236 237%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 238 239-record(reporter_state, 240 { 241 driver_pids, 242 starter_pid, 243 n_iters = 0, 244 prev_tps = 0, 245 curr = #time{}, 246 acc = #time{}, 247 init_micros, 248 prev_micros, 249 run_config 250 }). 251 252%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 253%% One driver on each node, table not replicated 254 255config(frag_test, ReplicaType) -> 256 Remote = nodes(), 257 Local = node(), 258 Nodes = [Local | Remote], 259 [ 260 {n_branches, length(Nodes)}, 261 {n_fragments, length(Nodes)}, 262 {replica_nodes, Nodes}, 263 {db_nodes, Nodes}, 264 {driver_nodes, Nodes}, 265 {n_accounts_per_branch, 100}, 266 {replica_type, ReplicaType}, 267 {stop_after, timer:minutes(1)}, 268 {report_interval, timer:seconds(10)}, 269 {reuse_history_id, true} 270 ]; 271 272%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 273%% One driver on each node, table replicated to two nodes. 274 275config(frag2_test, ReplicaType) -> 276 Remote = nodes(), 277 Local = node(), 278 Nodes = [Local | Remote], 279 [ 280 {n_branches, length(Nodes)}, 281 {n_fragments, length(Nodes)}, 282 {n_replicas, 2}, 283 {replica_nodes, Nodes}, 284 {db_nodes, Nodes}, 285 {driver_nodes, Nodes}, 286 {n_accounts_per_branch, 100}, 287 {replica_type, ReplicaType}, 288 {stop_after, timer:minutes(1)}, 289 {report_interval, timer:seconds(10)}, 290 {reuse_history_id, true} 291 ]; 292 293%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 294%% One driver on this node, table replicated to all nodes. 295 296config(replica_test, ReplicaType) -> 297 Remote = nodes(), 298 Local = node(), 299 Nodes = [Local | Remote], 300 [ 301 {db_nodes, Nodes}, 302 {driver_nodes, [Local]}, 303 {replica_nodes, Nodes}, 304 {n_accounts_per_branch, 100}, 305 {replica_type, ReplicaType}, 306 {stop_after, timer:minutes(1)}, 307 {report_interval, timer:seconds(10)}, 308 {reuse_history_id, true} 309 ]; 310 311%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 312%% One driver on this node, table replicated to all nodes. 313 314config(sticky_replica_test, ReplicaType) -> 315 Remote = nodes(), 316 Local = node(), 317 Nodes = [Local | Remote], 318 [ 319 {db_nodes, Nodes}, 320 {driver_nodes, [node()]}, 321 {replica_nodes, Nodes}, 322 {n_accounts_per_branch, 100}, 323 {replica_type, ReplicaType}, 324 {use_sticky_locks, true}, 325 {stop_after, timer:minutes(1)}, 326 {report_interval, timer:seconds(10)}, 327 {reuse_history_id, true} 328 ]; 329 330%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 331%% Ten drivers per node, tables replicated to all nodes, lots of branches 332 333config(dist_test, ReplicaType) -> 334 Remote = nodes(), 335 Local = node(), 336 Nodes = [Local | Remote], 337 [ 338 {db_nodes, Nodes}, 339 {driver_nodes, Nodes}, 340 {replica_nodes, Nodes}, 341 {n_drivers_per_node, 10}, 342 {n_branches, 10 * length(Nodes) * 100}, 343 {n_accounts_per_branch, 10}, 344 {replica_type, ReplicaType}, 345 {stop_after, timer:minutes(1)}, 346 {report_interval, timer:seconds(10)}, 347 {reuse_history_id, true} 348 ]; 349 350%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 351%% Ten drivers per node, tables replicated to all nodes, single branch 352 353config(conflict_test, ReplicaType) -> 354 Remote = nodes(), 355 Local = node(), 356 Nodes = [Local | Remote], 357 [ 358 {db_nodes, Nodes}, 359 {driver_nodes, Nodes}, 360 {replica_nodes, Nodes}, 361 {n_drivers_per_node, 10}, 362 {n_branches, 1}, 363 {n_accounts_per_branch, 10}, 364 {replica_type, ReplicaType}, 365 {stop_after, timer:minutes(1)}, 366 {report_interval, timer:seconds(10)}, 367 {reuse_history_id, true} 368 ]; 369 370%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 371%% One driver on this node, table replicated to all other nodes. 372 373config(remote_test, ReplicaType) -> 374 Remote = nodes(), 375 Local = node(), 376 Nodes = [Local | Remote], 377 [ 378 {db_nodes, Nodes}, 379 {driver_nodes, [Local]}, 380 {replica_nodes, Remote}, 381 {n_accounts_per_branch, 100}, 382 {replica_type, ReplicaType}, 383 {stop_after, timer:minutes(1)}, 384 {report_interval, timer:seconds(10)}, 385 {reuse_history_id, true} 386 ]; 387 388%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 389%% One driver on this node, table replicated to two other nodes. 390 391config(remote_frag2_test, ReplicaType) -> 392 Remote = nodes(), 393 Local = node(), 394 Nodes = [Local | Remote], 395 [ 396 {n_branches, length(Remote)}, 397 {n_fragments, length(Remote)}, 398 {n_replicas, 2}, 399 {replica_nodes, Remote}, 400 {db_nodes, Nodes}, 401 {driver_nodes, [Local]}, 402 {n_accounts_per_branch, 100}, 403 {replica_type, ReplicaType}, 404 {stop_after, timer:minutes(1)}, 405 {report_interval, timer:seconds(10)}, 406 {reuse_history_id, true} 407 ]; 408 409%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 410%% Ten drivers per node, tables replicated to all nodes, single branch 411 412config(conflict_benchmark, ReplicaType) -> 413 Remote = nodes(), 414 Local = node(), 415 Nodes = [Local | Remote], 416 [{db_nodes, Nodes}, 417 {driver_nodes, Nodes}, 418 {replica_nodes, Nodes}, 419 {n_drivers_per_node, 10}, 420 {n_branches, 1}, 421 {n_accounts_per_branch, 10}, 422 {replica_type, ReplicaType}, 423 {stop_after, timer:minutes(1)}, 424 {report_interval, timer:seconds(10)}, 425 {send_bench_report, true}, 426 {reuse_history_id, true} 427 ]. 428 429 430%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 431 432start(What, ReplicaType) -> 433 spawn_link(?MODULE, start, [config(What, ReplicaType)]). 434 435replica_test(ReplicaType) -> 436 start(replica_test, ReplicaType). 437 438sticky_replica_test(ReplicaType) -> 439 start(sticky_replica_test, ReplicaType). 440 441dist_test(ReplicaType) -> 442 start(dist_test, ReplicaType). 443 444conflict_test(ReplicaType) -> 445 start(conflict_test, ReplicaType). 446 447remote_test(ReplicaType) -> 448 start(remote_test, ReplicaType). 449 450remote_frag2_test(ReplicaType) -> 451 start(remote_frag2_test, ReplicaType). 452 453conflict_benchmark(ReplicaType) -> 454 start(config(conflict_benchmark, ReplicaType)). 455 456%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 457%% Args is a list of {Key, Val} tuples where Key is a field name 458%% in either the record tab_config or run_config. Unknown keys are ignored. 459 460start() -> 461 start([]). 462start(Args) -> 463 init(Args), 464 run(Args). 465 466list2rec(List, Fields, DefaultTuple) -> 467 [Name|Defaults] = tuple_to_list(DefaultTuple), 468 List2 = list2rec(List, Fields, Defaults, []), 469 list_to_tuple([Name] ++ List2). 470 471list2rec(_List, [], [], Acc) -> 472 Acc; 473list2rec(List, [F|Fields], [D|Defaults], Acc) -> 474 {Val, List2} = 475 case lists:keysearch(F, 1, List) of 476 false -> 477 {D, List}; 478 {value, {F, NewVal}} -> 479 {NewVal, lists:keydelete(F, 1, List)} 480 end, 481 list2rec(List2, Fields, Defaults, Acc ++ [Val]). 482 483stop() -> 484 case whereis(mnesia_tpcb) of 485 undefined -> 486 {error, not_running}; 487 Pid -> 488 sync_stop(Pid) 489 end. 490 491sync_stop(Pid) -> 492 Pid ! {self(), stop}, 493 receive 494 {Pid, {stopped, Res}} -> Res 495 after timer:minutes(1) -> 496 exit(Pid, kill), 497 {error, brutal_kill} 498 end. 499 500%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 501%%% Initialization 502 503%% Args is a list of {Key, Val} tuples where Key is a field name 504%% in the record tab_config, unknown keys are ignored. 505 506init(Args) -> 507 TabConfig0 = list2rec(Args, record_info(fields, tab_config), #tab_config{}), 508 TabConfig = 509 if 510 TabConfig0#tab_config.n_fragments =:= 0 -> 511 TabConfig0#tab_config{n_replicas = length(TabConfig0#tab_config.replica_nodes)}; 512 true -> 513 TabConfig0 514 end, 515 Tags = record_info(fields, tab_config), 516 Fun = fun(F, Pos) -> {{F, element(Pos, TabConfig)}, Pos + 1} end, 517 {List, _} = lists:mapfoldl(Fun, 2, Tags), 518 io:format("TPC-B: Table config: ~p ~n", [List]), 519 520 DbNodes = TabConfig#tab_config.db_nodes, 521 stop(), 522 if 523 TabConfig#tab_config.use_running_mnesia =:= true -> 524 ignore; 525 true -> 526 rpc:multicall(DbNodes, mnesia, lkill, []), 527 case mnesia:delete_schema(DbNodes) of 528 ok -> 529 case mnesia:create_schema(DbNodes) of 530 ok -> 531 {Replies, BadNodes} = 532 rpc:multicall(DbNodes, mnesia, start, []), 533 case [Res || Res <- Replies, Res =/= ok] of 534 [] when BadNodes =:= [] -> 535 ok; 536 BadRes -> 537 io:format("TPC-B: <ERROR> " 538 "Failed to start ~p: ~p~n", 539 [BadNodes, BadRes]), 540 exit({start_failed, BadRes, BadNodes}) 541 end; 542 {error, Reason} -> 543 io:format("TPC-B: <ERROR> " 544 "Failed to create schema on disc: ~p~n", 545 [Reason]), 546 exit({create_schema_failed, Reason}) 547 end; 548 {error, Reason} -> 549 io:format("TPC-B: <ERROR> " 550 "Failed to delete schema on disc: ~p~n", 551 [Reason]), 552 exit({delete_schema_failed, Reason}) 553 end 554 end, 555 gen_tabs(TabConfig). 556 557gen_tabs(TC) -> 558 create_tab(TC, branch, record_info(fields, branch), 559 undefined), 560 create_tab(TC, account, record_info(fields, account), 561 {branch, #account.branch_id}), 562 create_tab(TC, teller, record_info(fields, teller), 563 {branch, #teller.branch_id}), 564 create_tab(TC, history, record_info(fields, history), 565 {branch, #history.branch_id}), 566 567 NB = TC#tab_config.n_branches, 568 NT = TC#tab_config.n_tellers_per_branch, 569 NA = TC#tab_config.n_accounts_per_branch, 570 io:format("TPC-B: Generating ~p branches a ~p bytes~n", 571 [NB, size(term_to_binary(default_branch(TC)))]), 572 io:format("TPC-B: Generating ~p * ~p tellers a ~p bytes~n", 573 [NB, NT, size(term_to_binary(default_teller(TC)))]), 574 io:format("TPC-B: Generating ~p * ~p accounts a ~p bytes~n", 575 [NB, NA, size(term_to_binary(default_account(TC)))]), 576 io:format("TPC-B: Generating 0 history records a ~p bytes~n", 577 [size(term_to_binary(default_history(TC)))]), 578 gen_branches(TC), 579 580 case verify_tabs() of 581 ok -> 582 ignore; 583 {error, Reason} -> 584 io:format("TPC-B: <ERROR> Inconsistent tables: ~w~n", 585 [Reason]), 586 exit({inconsistent_tables, Reason}) 587 end. 588 589create_tab(TC, Name, Attrs, _ForeignKey) when TC#tab_config.n_fragments =:= 0 -> 590 Nodes = TC#tab_config.replica_nodes, 591 Type = TC#tab_config.replica_type, 592 Def = [{Type, Nodes}, {attributes, Attrs}], 593 create_tab(Name, Def); 594create_tab(TC, Name, Attrs, ForeignKey) -> 595 NReplicas = TC#tab_config.n_replicas, 596 NodePool = TC#tab_config.replica_nodes, 597 Type = TC#tab_config.replica_type, 598 NF = TC#tab_config.n_fragments, 599 Props = [{n_fragments, NF}, 600 {node_pool, NodePool}, 601 {n_copies(Type), NReplicas}, 602 {foreign_key, ForeignKey}], 603 Def = [{frag_properties, Props}, 604 {attributes, Attrs}], 605 create_tab(Name, Def). 606 607create_tab(Name, Def) -> 608 mnesia:delete_table(Name), 609 case mnesia:create_table(Name, Def) of 610 {atomic, ok} -> 611 ok; 612 {aborted, Reason} -> 613 io:format("TPC-B: <ERROR> failed to create table ~w ~w: ~p~n", 614 [Name, Def, Reason]), 615 exit({create_table_failed, Reason}) 616 end. 617 618n_copies(Type) -> 619 case Type of 620 ram_copies -> n_ram_copies; 621 disc_copies -> n_disc_copies; 622 disc_only_copies -> n_disc_only_copies 623 end. 624 625gen_branches(TC) -> 626 First = 0, 627 Last = First + TC#tab_config.n_branches - 1, 628 GenPids = gen_branches(TC, First, Last, []), 629 wait_for_gen(GenPids). 630 631wait_for_gen([]) -> 632 ok; 633wait_for_gen(Pids) -> 634 receive 635 {branch_generated, Pid} -> wait_for_gen(lists:delete(Pid, Pids)); 636 Exit -> 637 exit({tpcb_failed, Exit}) 638 end. 639 640gen_branches(TC, BranchId, Last, UsedNs) when BranchId =< Last -> 641 UsedNs2 = get_branch_nodes(BranchId, UsedNs), 642 Node = hd(UsedNs2), 643 Pid = spawn_link(Node, ?MODULE, reply_gen_branch, 644 [self(), TC, BranchId]), 645 [Pid | gen_branches(TC, BranchId + 1, Last, UsedNs2)]; 646gen_branches(_, _, _, _) -> 647 []. 648 649reply_gen_branch(ReplyTo, TC, BranchId) -> 650 gen_branch(TC, BranchId), 651 ReplyTo ! {branch_generated, self()}, 652 unlink(ReplyTo). 653 654%% Returns a new list of nodes with the best node as head 655get_branch_nodes(BranchId, UsedNs) -> 656 WriteNs = table_info({branch, BranchId}, where_to_write), 657 WeightedNs = [{n_duplicates(N, UsedNs, 0), N} || N <- WriteNs], 658 [{_, LeastUsed} | _ ] = lists:sort(WeightedNs), 659 [LeastUsed | UsedNs]. 660 661n_duplicates(_N, [], Count) -> 662 Count; 663n_duplicates(N, [N | Tail], Count) -> 664 n_duplicates(N, Tail, Count + 1); 665n_duplicates(N, [_ | Tail], Count) -> 666 n_duplicates(N, Tail, Count). 667 668gen_branch(TC, BranchId) -> 669 A = default_account(TC), 670 NA = TC#tab_config.n_accounts_per_branch, 671 FirstA = BranchId * NA, 672 ArgsA = [FirstA, FirstA + NA - 1, BranchId, A], 673 ok = mnesia:activity(async_dirty, fun gen_accounts/4, ArgsA, mnesia_frag), 674 675 T = default_teller(TC), 676 NT = TC#tab_config.n_tellers_per_branch, 677 FirstT = BranchId * NT, 678 ArgsT = [FirstT, FirstT + NT - 1, BranchId, T], 679 ok = mnesia:activity(async_dirty, fun gen_tellers/4, ArgsT, mnesia_frag), 680 681 B = default_branch(TC), 682 FunB = fun() -> mnesia:write(branch, B#branch{id = BranchId}, write) end, 683 ok = mnesia:activity(sync_dirty, FunB, [], mnesia_frag). 684 685gen_tellers(Id, Last, BranchId, T) when Id =< Last -> 686 mnesia:write(teller, T#teller{id = Id, branch_id=BranchId}, write), 687 gen_tellers(Id + 1, Last, BranchId, T); 688gen_tellers(_, _, _, _) -> 689 ok. 690 691gen_accounts(Id, Last, BranchId, A) when Id =< Last -> 692 mnesia:write(account, A#account{id = Id, branch_id=BranchId}, write), 693 gen_accounts(Id + 1, Last, BranchId, A); 694gen_accounts(_, _, _, _) -> 695 ok. 696 697default_branch(TC) -> #branch{filler = TC#tab_config.branch_filler}. 698default_teller(TC) -> #teller{filler = TC#tab_config.teller_filler}. 699default_account(TC) -> #account{filler = TC#tab_config.account_filler}. 700default_history(_TC) -> #history{}. 701 702%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 703%%% Run the benchmark 704 705%% Args is a list of {Key, Val} tuples where Key is a field name 706%% in the record run_config, unknown keys are ignored. 707run(Args) -> 708 RunConfig = list2rec(Args, record_info(fields, run_config), #run_config{}), 709 Tags = record_info(fields, run_config), 710 Fun = fun(F, Pos) -> {{F, element(Pos, RunConfig)}, Pos + 1} end, 711 {List, _} = lists:mapfoldl(Fun, 2, Tags), 712 io:format("TPC-B: Run config: ~p ~n", [List]), 713 714 Pid = spawn_link(?MODULE, reporter_init, [self(), RunConfig]), 715 receive 716 {Pid, {stopped, Res}} -> 717 Res; % Stopped by other process 718 Else -> 719 {tpcb_got, Else} 720 after RunConfig#run_config.stop_after -> 721 sync_stop(Pid) 722 end. 723 724reporter_init(Starter, RC) -> 725 register(mnesia_tpcb, self()), 726 process_flag(trap_exit, true), 727 DbNodes = mnesia:system_info(db_nodes), 728 if 729 RC#run_config.use_running_mnesia =:= true -> 730 ignore; 731 true -> 732 {Replies, BadNodes} = 733 rpc:multicall(DbNodes, mnesia, start, []), 734 case [Res || Res <- Replies, Res =/= ok] of 735 [] when BadNodes =:= [] -> 736 ok; 737 BadRes -> 738 io:format("TPC-B: <ERROR> " 739 "Failed to start ~w: ~p~n", 740 [BadNodes, BadRes]), 741 exit({start_failed, BadRes, BadNodes}) 742 end, 743 verify_tabs() 744 end, 745 746 N = table_info(branch, size), 747 NT = table_info(teller, size) div N, 748 NA = table_info(account, size) div N, 749 750 {Type, NF, RepNodes} = table_storage(branch), 751 TC = #tab_config{n_fragments = NF, 752 n_branches = N, 753 n_tellers_per_branch = NT, 754 n_accounts_per_branch = NA, 755 db_nodes = DbNodes, 756 replica_nodes = RepNodes, 757 replica_type = Type 758 }, 759 Drivers = start_drivers(RC, TC), 760 Now = erlang:monotonic_time(), 761 State = #reporter_state{driver_pids = Drivers, 762 run_config = RC, 763 starter_pid = Starter, 764 init_micros = Now, 765 prev_micros = Now 766 }, 767 case catch reporter_loop(State) of 768 {'EXIT', Reason} -> 769 io:format("TPC-B: Abnormal termination: ~p~n", [Reason]), 770 if 771 RC#run_config.use_running_mnesia =:= true -> 772 ignore; 773 true -> 774 rpc:multicall(DbNodes, mnesia, lkill, []) 775 end, 776 unlink(Starter), 777 Starter ! {self(), {stopped, {error, Reason}}}, % To be sure 778 exit(shutdown); 779 {ok, Stopper, State2} -> 780 Time = State2#reporter_state.acc, 781 Res = 782 case verify_tabs() of 783 ok -> 784 {ok, Time}; 785 {error, Reason} -> 786 io:format("TPC-B: <ERROR> Inconsistent tables, ~p~n", 787 [{error, Reason}]), 788 {error, Reason} 789 end, 790 if 791 RC#run_config.use_running_mnesia =:= true -> 792 ignore; 793 true -> 794 rpc:multicall(DbNodes, mnesia, stop, []) 795 end, 796 unlink(Starter), 797 Starter ! {self(), {stopped, Res}}, 798 if 799 Stopper =/= Starter -> 800 Stopper ! {self(), {stopped, Res}}; 801 true -> 802 ignore 803 end, 804 exit(shutdown) 805 end. 806 807table_info(Tab, Item) -> 808 Fun = fun() -> mnesia:table_info(Tab, Item) end, 809 mnesia:activity(sync_dirty, Fun, mnesia_frag). 810 811%% Returns {Storage, NFragments, ReplicaNodes} 812table_storage(Tab) -> 813 case mnesia:table_info(branch, frag_properties) of 814 [] -> 815 NFO = 0, 816 NR = length(mnesia:table_info(Tab, ram_copies)), 817 ND = length(mnesia:table_info(Tab, disc_copies)), 818 NDO = length(mnesia:table_info(Tab, disc_only_copies)), 819 if 820 NR =/= 0 -> {ram_copies, NFO, NR}; 821 ND =/= 0 -> {disc_copies, NFO, ND}; 822 NDO =/= 0 -> {disc_copies, NFO, NDO} 823 end; 824 Props -> 825 {value, NFO} = lists:keysearch(n_fragments, 1, Props), 826 NR = table_info(Tab, n_ram_copies), 827 ND = table_info(Tab, n_disc_copies), 828 NDO = table_info(Tab, n_disc_only_copies), 829 if 830 NR =/= 0 -> {ram_copies, NFO, NR}; 831 ND =/= 0 -> {disc_copies, NFO, ND}; 832 NDO =/= 0 -> {disc_copies, NFO, NDO} 833 end 834 end. 835 836reporter_loop(State) -> 837 RC = State#reporter_state.run_config, 838 receive 839 {From, stop} -> 840 {ok, From, call_drivers(State, stop)}; 841 {'EXIT', Pid, Reason} when Pid =:= State#reporter_state.starter_pid -> 842 %% call_drivers(State, stop), 843 exit({starter_died, Pid, Reason}) 844 after RC#run_config.report_interval -> 845 Iters = State#reporter_state.n_iters, 846 State2 = State#reporter_state{n_iters = Iters + 1}, 847 case call_drivers(State2, report) of 848 State3 when State3#reporter_state.driver_pids =/= [] -> 849 State4 = State3#reporter_state{curr = #time{}}, 850 reporter_loop(State4); 851 _ -> 852 exit(drivers_died) 853 end 854 end. 855 856call_drivers(State, Msg) -> 857 Drivers = State#reporter_state.driver_pids, 858 lists:foreach(fun(Pid) -> Pid ! {self(), Msg} end, Drivers), 859 State2 = show_report(calc_reports(Drivers, State)), 860 case Msg =:= stop of 861 true -> 862 Acc = State2#reporter_state.acc, 863 Init = State2#reporter_state.init_micros, 864 show_report(State2#reporter_state{n_iters = 0, 865 curr = Acc, 866 prev_micros = Init}); 867 false -> 868 ignore 869 end, 870 State2. 871 872calc_reports([], State) -> 873 State; 874calc_reports([Pid|Drivers], State) -> 875 receive 876 {'EXIT', P, Reason} when P =:= State#reporter_state.starter_pid -> 877 exit({starter_died, P, Reason}); 878 {'EXIT', Pid, Reason} -> 879 exit({driver_died, Pid, Reason}); 880 {Pid, Time} when is_record(Time, time) -> 881 %% io:format("~w: ~w~n", [Pid, Time]), 882 A = add_time(State#reporter_state.acc, Time), 883 C = add_time(State#reporter_state.curr, Time), 884 State2 = State#reporter_state{acc = A, curr = C}, 885 calc_reports(Drivers, State2) 886 end. 887 888add_time(Acc, New) -> 889 Acc#time{n_trans = New#time.n_trans + Acc#time.n_trans, 890 min_n = lists:min([New#time.n_trans, Acc#time.min_n] -- [0]), 891 max_n = lists:max([New#time.n_trans, Acc#time.max_n]), 892 acc_time = New#time.acc_time + Acc#time.acc_time, 893 max_time = lists:max([New#time.max_time, Acc#time.max_time])}. 894 895-define(AVOID_DIV_ZERO(_What_), try (_What_) catch _:_ -> 0 end). 896 897show_report(State) -> 898 Now = erlang:timestamp(), 899 Iters = State#reporter_state.n_iters, 900 Cfg = State#reporter_state.run_config, 901 Time = State#reporter_state.curr, 902 Max = Time#time.max_time, 903 N = Time#time.n_trans, 904 Avg = ?AVOID_DIV_ZERO(Time#time.acc_time div N), 905 AliveN = length(State#reporter_state.driver_pids), 906 Tps = ?AVOID_DIV_ZERO((?SECOND * AliveN) div Avg), 907 PrevTps= State#reporter_state.prev_tps, 908 {DiffSign, DiffTps} = signed_diff(Iters, Tps, PrevTps), 909 Unfairness = ?AVOID_DIV_ZERO(Time#time.max_n / Time#time.min_n), 910 BruttoAvg = ?AVOID_DIV_ZERO((Now - State#reporter_state.prev_micros) div N), 911%% io:format("n_iters=~p, n_trans=~p, n_drivers=~p, avg=~p, now=~p, prev=~p~n", 912%% [Iters, N, AliveN, BruttoAvg, Now, State#reporter_state.prev_micros]), 913 BruttoTps = ?AVOID_DIV_ZERO(?SECOND div BruttoAvg), 914 case Iters > 0 of 915 true -> 916 io:format("TPC-B: ~p iter ~s~p diff ~p (~p) tps ~p avg micros ~p max micros ~p unfairness~n", 917 [Iters, DiffSign, DiffTps, Tps, BruttoTps, Avg, Max, Unfairness]); 918 false -> 919 io:format("TPC-B: ~p (~p) transactions per second, " 920 "duration of longest transaction was ~p milliseconds~n", 921 [Tps, BruttoTps, Max div 1000]) 922 end, 923 case Cfg#run_config.send_bench_report of 924 true -> 925 ct_event:notify( 926 #event{name = benchmark_data, 927 data = [{suite,"mnesia_tpcb"}, 928 {value,Tps}]}); 929 _ -> 930 ok 931 end, 932 933 State#reporter_state{prev_tps = Tps, prev_micros = Now}. 934 935signed_diff(Iters, Curr, Prev) -> 936 case Iters > 1 of 937 true -> sign(Curr - Prev); 938 false -> sign(0) 939 end. 940 941sign(N) when N > 0 -> {"+", N}; 942sign(N) -> {"", N}. 943 944start_drivers(RC, TC) -> 945 LastHistoryId = table_info(history, size), 946 Reuse = RC#run_config.reuse_history_id, 947 DS = #driver_state{tab_config = TC, 948 run_config = RC, 949 n_local_branches = 0, 950 local_branches = [], 951 history_id = LastHistoryId, 952 reuse_history_id = Reuse}, 953 Nodes = RC#run_config.driver_nodes, 954 NB = TC#tab_config.n_branches, 955 First = 0, 956 AllBranches = lists:seq(First, First + NB - 1), 957 ND = RC#run_config.n_drivers_per_node, 958 Spawn = fun(Spec) -> 959 Node = Spec#driver_state.driver_node, 960 spawn_link(Node, ?MODULE, driver_init, [Spec, AllBranches]) 961 end, 962 Specs = [DS#driver_state{driver_id = Id, driver_node = N} 963 || N <- Nodes, 964 Id <- lists:seq(1, ND)], 965 Specs2 = lists:sort(lists:flatten(Specs)), 966 {Specs3, OrphanBranches} = alloc_local_branches(AllBranches, Specs2, []), 967 case length(OrphanBranches) of 968 N when N =< 10 -> 969 io:format("TPC-B: Orphan branches: ~p~n", [OrphanBranches]); 970 N -> 971 io:format("TPC-B: Orphan branches: ~p~n", [N]) 972 end, 973 [Spawn(Spec) || Spec <- Specs3]. 974 975alloc_local_branches([BranchId | Tail], Specs, OrphanBranches) -> 976 Nodes = table_info({branch, BranchId}, where_to_write), 977 LocalSpecs = [DS || DS <- Specs, 978 lists:member(DS#driver_state.driver_node, Nodes)], 979 case lists:keysort(#driver_state.n_local_branches, LocalSpecs) of 980 [] -> 981 alloc_local_branches(Tail, Specs, [BranchId | OrphanBranches]); 982 [DS | _] -> 983 LocalNB = DS#driver_state.n_local_branches + 1, 984 LocalBranches = [BranchId | DS#driver_state.local_branches], 985 DS2 = DS#driver_state{n_local_branches = LocalNB, 986 local_branches = LocalBranches}, 987 Specs2 = Specs -- [DS], 988 Specs3 = [DS2 | Specs2], 989 alloc_local_branches(Tail, Specs3, OrphanBranches) 990 end; 991alloc_local_branches([], Specs, OrphanBranches) -> 992 {Specs, OrphanBranches}. 993 994driver_init(DS, AllBranches) -> 995 Seed = case (DS#driver_state.run_config)#run_config.seed of 996 undefined -> rand:seed(exsplus); 997 ExpSeed -> rand:seed(ExpSeed) 998 end, 999 1000 DS2 = 1001 if 1002 DS#driver_state.n_local_branches =:= 0 -> 1003 DS#driver_state{seed = Seed, 1004 n_local_branches = length(AllBranches), 1005 local_branches = AllBranches}; 1006 true -> 1007 DS#driver_state{seed = Seed} 1008 end, 1009 io:format("TPC-B: Driver ~p started as ~p on node ~p with ~p local branches~n", 1010 [DS2#driver_state.driver_id, self(), node(), DS2#driver_state.n_local_branches]), 1011 driver_loop(DS2). 1012 1013driver_loop(DS) -> 1014 receive 1015 {From, report} -> 1016 From ! {self(), DS#driver_state.time}, 1017 Acc = add_time(DS#driver_state.time, DS#driver_state.acc_time), 1018 DS2 = DS#driver_state{time=#time{}, acc_time = Acc}, % Reset timer 1019 DS3 = calc_trans(DS2), 1020 driver_loop(DS3); 1021 {From, stop} -> 1022 Acc = add_time(DS#driver_state.time, DS#driver_state.acc_time), 1023 io:format("TPC-B: Driver ~p (~p) on node ~p stopped: ~w~n", 1024 [DS#driver_state.driver_id, self(), node(self()), Acc]), 1025 From ! {self(), DS#driver_state.time}, 1026 unlink(From), 1027 exit(stopped) 1028 after 0 -> 1029 DS2 = calc_trans(DS), 1030 driver_loop(DS2) 1031 end. 1032 1033calc_trans(DS) -> 1034 {Micros, DS2} = time_trans(DS), 1035 Time = DS2#driver_state.time, 1036 Time2 = Time#time{n_trans = Time#time.n_trans + 1, 1037 acc_time = Time#time.acc_time + Micros, 1038 max_time = lists:max([Micros, Time#time.max_time]) 1039 }, 1040 case DS#driver_state.reuse_history_id of 1041 false -> 1042 HistoryId = DS#driver_state.history_id + 1, 1043 DS2#driver_state{time=Time2, history_id = HistoryId}; 1044 true -> 1045 DS2#driver_state{time=Time2} 1046 end. 1047 1048%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 1049 1050%% Generate teller_id, account_id and delta 1051%% Time the TPC-B transaction 1052time_trans(DS) -> 1053 {Random, NewSeed} = rand:uniform_s(DS#driver_state.seed), 1054 1055 TC = DS#driver_state.tab_config, 1056 RC = DS#driver_state.run_config, 1057 {Branchid, Args} = random_to_args(Random, DS), 1058 {Fun, Mod} = trans_type(TC, RC), 1059 {Time, Res} = timer:tc(?MODULE, real_trans, [RC, Branchid, Fun, Args, Mod]), 1060 1061 case Res of 1062 AccountBal when is_integer(AccountBal) -> 1063 {Time, DS#driver_state{seed = NewSeed}}; 1064 Other -> 1065 exit({crash, Other, Args, Random, DS}) 1066 end. 1067 1068random_to_args(Random, DS) -> 1069 DriverId = DS#driver_state.driver_id, 1070 TC = DS#driver_state.tab_config, 1071 HistoryId = DS#driver_state.history_id, 1072 Delta = trunc(Random * 1999998) - 999999, % -999999 <= Delta <= +999999 1073 1074 Branches = DS#driver_state.local_branches, 1075 NB = DS#driver_state.n_local_branches, 1076 NT = TC#tab_config.n_tellers_per_branch, 1077 NA = TC#tab_config.n_accounts_per_branch, 1078 Tmp = trunc(Random * NB * NT), 1079 BranchPos = (Tmp div NT) + 1, 1080 BranchId = 1081 case TC#tab_config.n_fragments of 1082 0 -> BranchPos - 1; 1083 _ -> lists:nth(BranchPos, Branches) 1084 end, 1085 RelativeTellerId = Tmp div NT, 1086 TellerId = (BranchId * NT) + RelativeTellerId, 1087 {AccountBranchId, AccountId} = 1088 if 1089 Random >= 0.85, NB > 1 -> 1090 %% Pick from a remote account 1091 TmpAccountId= trunc(Random * (NB - 1) * NA), 1092 TmpAccountBranchId = TmpAccountId div NA, 1093 if 1094 TmpAccountBranchId =:= BranchId -> 1095 {TmpAccountBranchId + 1, TmpAccountId + NA}; 1096 true -> 1097 {TmpAccountBranchId, TmpAccountId} 1098 end; 1099 true -> 1100 %% Pick from a local account 1101 RelativeAccountId = trunc(Random * NA), 1102 TmpAccountId = (BranchId * NA) + RelativeAccountId, 1103 {BranchId, TmpAccountId} 1104 end, 1105 1106 {BranchId, [DriverId, BranchId, TellerId, AccountBranchId, AccountId, HistoryId, Delta]}. 1107 1108real_trans(RC, BranchId, Fun, Args, Mod) -> 1109 Type = RC#run_config.activity_type, 1110 case RC#run_config.spawn_near_branch of 1111 false -> 1112 mnesia:activity(Type, Fun, Args, Mod); 1113 true -> 1114 Node = table_info({branch, BranchId}, where_to_read), 1115 case rpc:call(Node, mnesia, activity, [Type, Fun, Args, Mod]) of 1116 {badrpc, Reason} -> exit(Reason); 1117 Other -> Other 1118 end 1119 end. 1120 1121trans_type(TC, RC) -> 1122 if 1123 TC#tab_config.n_fragments =:= 0, 1124 RC#run_config.use_sticky_locks =:= false -> 1125 {fun add_delta/7, mnesia}; 1126 TC#tab_config.n_fragments =:= 0, 1127 RC#run_config.use_sticky_locks =:= true -> 1128 {fun sticky_add_delta/7, mnesia}; 1129 TC#tab_config.n_fragments > 0, 1130 RC#run_config.use_sticky_locks =:= false -> 1131 {fun frag_add_delta/7, mnesia_frag} 1132 end. 1133 1134%% 1135%% Runs the TPC-B defined transaction and returns NewAccountBalance 1136%% 1137 1138add_delta(DriverId, BranchId, TellerId, _AccountBranchId, AccountId, HistoryId, Delta) -> 1139 %% Grab write lock already when the record is read 1140 1141 %% Add delta to branch balance 1142 [B] = mnesia:read(branch, BranchId, write), 1143 NewB = B#branch{balance = B#branch.balance + Delta}, 1144 ok = mnesia:write(branch, NewB, write), 1145 1146 %% Add delta to teller balance 1147 [T] = mnesia:read(teller, TellerId, write), 1148 NewT = T#teller{balance = T#teller.balance + Delta}, 1149 ok = mnesia:write(teller, NewT, write), 1150 1151 %% Add delta to account balance 1152 [A] = mnesia:read(account, AccountId, write), 1153 NewA = A#account{balance = A#account.balance + Delta}, 1154 ok = mnesia:write(account, NewA, write), 1155 1156 %% Append to history log 1157 History = #history{history_id = {DriverId, HistoryId}, 1158 account_id = AccountId, 1159 teller_id = TellerId, 1160 branch_id = BranchId, 1161 amount = Delta 1162 }, 1163 ok = mnesia:write(history, History, write), 1164 1165 %% Return account balance 1166 NewA#account.balance. 1167 1168sticky_add_delta(DriverId, BranchId, TellerId, _AccountBranchId, AccountId, HistoryId, Delta) -> 1169 %% Grab orinary read lock when the record is read 1170 %% Grab sticky write lock when the record is written 1171 %% This transaction would benefit of an early stick_write lock at read 1172 1173 %% Add delta to branch balance 1174 [B] = mnesia:read(branch, BranchId, read), 1175 NewB = B#branch{balance = B#branch.balance + Delta}, 1176 ok = mnesia:write(branch, NewB, sticky_write), 1177 1178 %% Add delta to teller balance 1179 [T] = mnesia:read(teller, TellerId, read), 1180 NewT = T#teller{balance = T#teller.balance + Delta}, 1181 ok = mnesia:write(teller, NewT, sticky_write), 1182 1183 %% Add delta to account balance 1184 [A] = mnesia:read(account, AccountId, read), 1185 NewA = A#account{balance = A#account.balance + Delta}, 1186 ok = mnesia:write(account, NewA, sticky_write), 1187 1188 %% Append to history log 1189 History = #history{history_id = {DriverId, HistoryId}, 1190 account_id = AccountId, 1191 teller_id = TellerId, 1192 branch_id = BranchId, 1193 amount = Delta 1194 }, 1195 ok = mnesia:write(history, History, sticky_write), 1196 1197 %% Return account balance 1198 NewA#account.balance. 1199 1200frag_add_delta(DriverId, BranchId, TellerId, AccountBranchId, AccountId, HistoryId, Delta) -> 1201 %% Access fragmented table 1202 %% Grab write lock already when the record is read 1203 1204 %% Add delta to branch balance 1205 [B] = mnesia:read(branch, BranchId, write), 1206 NewB = B#branch{balance = B#branch.balance + Delta}, 1207 ok = mnesia:write(NewB), 1208 1209 %% Add delta to teller balance 1210 [T] = mnesia:read({teller, BranchId}, TellerId, write), 1211 NewT = T#teller{balance = T#teller.balance + Delta}, 1212 ok = mnesia:write(NewT), 1213 1214 %% Add delta to account balance 1215 %%io:format("frag_add_delta(~p): ~p\n", [node(), {account, BranchId, AccountId}]), 1216 [A] = mnesia:read({account, AccountBranchId}, AccountId, write), 1217 NewA = A#account{balance = A#account.balance + Delta}, 1218 ok = mnesia:write(NewA), 1219 1220 %% Append to history log 1221 History = #history{history_id = {DriverId, HistoryId}, 1222 account_id = AccountId, 1223 teller_id = TellerId, 1224 branch_id = BranchId, 1225 amount = Delta 1226 }, 1227 ok = mnesia:write(History), 1228 1229 %% Return account balance 1230 NewA#account.balance. 1231 1232%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 1233%% Verify table consistency 1234 1235verify_tabs() -> 1236 Nodes = mnesia:system_info(running_db_nodes), 1237 case lists:member(node(), Nodes) of 1238 true -> 1239 Tabs = [branch, teller, account, history], 1240 io:format("TPC-B: Verifying tables: ~w~n", [Tabs]), 1241 rpc:multicall(Nodes, mnesia, wait_for_tables, [Tabs, infinity]), 1242 1243 Fun = fun() -> 1244 mnesia:write_lock_table(branch), 1245 mnesia:write_lock_table(teller), 1246 mnesia:write_lock_table(account), 1247 mnesia:write_lock_table(history), 1248 {Res, BadNodes} = 1249 rpc:multicall(Nodes, ?MODULE, count_balance, []), 1250 check_balance(Res, BadNodes) 1251 end, 1252 case mnesia:transaction(Fun) of 1253 {atomic, Res} -> Res; 1254 {aborted, Reason} -> {error, Reason} 1255 end; 1256 false -> 1257 {error, "Must be initiated from a running db_node"} 1258 end. 1259 1260%% Returns a list of {Table, Node, Balance} tuples 1261%% Assumes that no updates are performed 1262 1263-record(summary, {table, node, balance, size}). 1264 1265count_balance() -> 1266 [count_balance(branch, #branch.balance), 1267 count_balance(teller, #teller.balance), 1268 count_balance(account, #account.balance)]. 1269 1270count_balance(Tab, BalPos) -> 1271 Frags = table_info(Tab, frag_names), 1272 count_balance(Tab, Frags, 0, 0, BalPos). 1273 1274count_balance(Tab, [Frag | Frags], Bal, Size, BalPos) -> 1275 First = mnesia:dirty_first(Frag), 1276 {Bal2, Size2} = count_frag_balance(Frag, First, Bal, Size, BalPos), 1277 count_balance(Tab, Frags, Bal2, Size2, BalPos); 1278count_balance(Tab, [], Bal, Size, _BalPos) -> 1279 #summary{table = Tab, node = node(), balance = Bal, size = Size}. 1280 1281count_frag_balance(_Frag, '$end_of_table', Bal, Size, _BalPos) -> 1282 {Bal, Size}; 1283count_frag_balance(Frag, Key, Bal, Size, BalPos) -> 1284 [Record] = mnesia:dirty_read({Frag, Key}), 1285 Bal2 = Bal + element(BalPos, Record), 1286 Next = mnesia:dirty_next(Frag, Key), 1287 count_frag_balance(Frag, Next, Bal2, Size + 1, BalPos). 1288 1289check_balance([], []) -> 1290 mnesia:abort({"No balance"}); 1291check_balance(Summaries, []) -> 1292 [One | Rest] = lists:flatten(Summaries), 1293 Balance = One#summary.balance, 1294 %% Size = One#summary.size, 1295 case [S || S <- Rest, S#summary.balance =/= Balance] of 1296 [] -> 1297 ok; 1298 BadSummaries -> 1299 mnesia:abort({"Bad balance", One, BadSummaries}) 1300 end; 1301check_balance(_, BadNodes) -> 1302 mnesia:abort({"Bad nodes", BadNodes}). 1303