%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1996-2020. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%

%%
-module(mnesia_bup).
-export([
         %% Public interface
         iterate/4,
         read_schema/2,
         fallback_bup/0,
         fallback_exists/0,
         tm_fallback_start/1,
         create_schema/1,
         create_schema/2,
         install_fallback/1,
         install_fallback/2,
         uninstall_fallback/0,
         uninstall_fallback/1,
         traverse_backup/4,
         traverse_backup/6,
         make_initial_backup/3,
         fallback_to_schema/0,
         lookup_schema/2,
         schema2bup/1,
         refresh_cookie/2,

         %% Internal
         fallback_receiver/2,
         install_fallback_master/2,
         uninstall_fallback_master/2,
         local_uninstall_fallback/2,
         do_traverse_backup/7,
         trav_apply/5
        ]).

-include("mnesia.hrl").
-import(mnesia_lib, [verbose/2, dbg_out/2]).

%% State threaded through the backup media callbacks:
%%   mode       - set to 'replace' by fallback_receiver/2
%%   bup_module - callback module that is applied to the media
%%   bup_data   - opaque handle handed to and returned by bup_module
-record(restore, {mode, bup_module, bup_data}).

%% Options of install_fallback/2 and uninstall_fallback/1, filled in by
%% check_fallback_args/2. The fields mnesia_dir, fallback_bup and
%% fallback_tmp are set by check_fallback_dir/1.
-record(fallback_args, {opaque,
                        scope = global,
                        module = mnesia_monitor:get_env(backup_module),
                        use_default_dir = true,
                        mnesia_dir,
                        fallback_bup,
                        fallback_tmp,
                        skip_tables = [],
                        keep_tables = [],
                        default_op = keep_tables
                       }).

-type fallback_args() :: #fallback_args{}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Backup iterator

%% Reads schema section and iterates over all records in a backup.
%%
%% Fun(BunchOfRecords, Header, Schema, Ext, Acc) is applied each time a
%% bunch of records has been read from the backup. Its return value is
%% the Acc of the next call. Ext is the value returned by get_ext_types/1.
%%
%% BunchOfRecords will be [] when the iteration is done.
%%
%% Returns {ok, FinalAcc}. A term thrown during the iteration is returned
%% as is; any other exception is returned as {error, {Reason, Stacktrace}}.
iterate(Mod, Fun, Opaque, Acc) ->
    Restore = #restore{bup_module = Mod, bup_data = Opaque},
    try read_schema_section(Restore) of
        {Opened, {Header, Schema, Rest}} ->
            Ext = get_ext_types(Schema),
            try iter(Opened, Header, Schema, Ext, Fun, Acc, Rest) of
                {ok, Finished, Result} ->
                    close_read(Finished),
                    {ok, Result}
            catch
                throw:Thrown ->
                    close_read(Opened),
                    Thrown;
                _:Reason:Stacktrace ->
                    close_read(Opened),
                    {error, {Reason, Stacktrace}}
            end
    catch
        throw:{error, _} = Error ->
            Error
    end.

%% Returns the value of the mnesia_backend_types user property of the
%% schema item, or [] when the schema item or the property is missing.
get_ext_types(Schema) ->
    try lookup_schema(schema, Schema) of
        SchemaDef ->
            case lists:keyfind(user_properties, 1, SchemaDef) of
                false ->
                    [];
                {_, UserProps} ->
                    proplists:get_value(mnesia_backend_types, UserProps, [])
            end
    catch
        throw:{error, {"Cannot lookup", _}} ->
            []
    end.

%% Alternates between reading a bunch of items from the backup and
%% handing that bunch to Fun. An empty read ends the iteration with a
%% final Fun([], ...) call.
iter(Restore, Header, Schema, Ext, Fun, Acc, []) ->
    case safe_apply(Restore, read, [Restore#restore.bup_data]) of
        {Restore2, []} ->
            Final = Fun([], Header, Schema, Ext, Acc),
            {ok, Restore2, Final};
        {Restore2, Items} ->
            iter(Restore2, Header, Schema, Ext, Fun, Acc, Items)
    end;
iter(Restore, Header, Schema, Ext, Fun, Acc, Items) ->
    NextAcc = Fun(Items, Header, Schema, Ext, Acc),
    iter(Restore, Header, Schema, Ext, Fun, NextAcc, []).

%% Applies callback Op of the backup module and stores the new opaque
%% handle in the #restore{} record. A 'read' returns {Restore, Items},
%% every other operation returns Restore. Writing an empty item list is
%% a no-op. Any error reply, unexpected reply or exception aborts the
%% restore via abort_restore_fun/3.
-spec safe_apply(#restore{}, atom(), list()) -> tuple().
safe_apply(Restore, write, [_, []]) ->
    Restore;
safe_apply(Restore, Op, Args) ->
    Abort = abort_restore_fun(Restore, Op, Args),
    Mod = Restore#restore.bup_module,
    try apply(Mod, Op, Args) of
        {ok, Opaque, Items} when Op =:= read ->
            {Restore#restore{bup_data = Opaque}, Items};
        {ok, Opaque} when Op =/= read ->
            Restore#restore{bup_data = Opaque};
        {error, Reason} ->
            Abort(Reason);
        Unexpected ->
            Abort(Unexpected)
    catch
        _:Crash ->
            Abort(Crash)
    end.

%% Returns a fun that aborts the restore with the reason it is given.
-spec abort_restore_fun(_, _, _) -> fun((_) -> no_return()).
abort_restore_fun(Restore, Op, Args) ->
    fun(Reason) -> abort_restore(Restore, Op, Args, Reason) end.

%% Logs the failed callback, closes the backup media and throws
%% {error, Reason}.
abort_restore(#restore{bup_module = Mod} = Restore, Op, Args, Reason) ->
    dbg_out("Restore aborted. ~p:~p~p -> ~p~n",
            [Mod, Op, Args, Reason]),
    close_read(Restore),
    throw({error, Reason}).

%% Closes the backup media; the call is wrapped in ?SAFE.
close_read(#restore{bup_module=Mod, bup_data=Opaque}) ->
    ?SAFE(Mod:close_read(Opaque)).

%% Reads the schema definition out of the fallback file.
%% Returns {ok, fallback, SchemaDef} | {error, Reason}
fallback_to_schema() ->
    fallback_to_schema(fallback_bup()).

fallback_to_schema(Fname) ->
    case read_schema(mnesia_backup, Fname) of
        {error, _} = Error ->
            Error;
        Schema ->
            try lookup_schema(schema, Schema) of
                SchemaDef -> {ok, fallback, SchemaDef}
            catch
                throw:_ ->
                    {error, "No schema in fallback"}
            end
    end.

%% Opens Opaque, reads the schema section and closes the media again.
%% Returns Schema | {error, Reason}
read_schema(Mod, Opaque) ->
    Restore = #restore{bup_module = Mod, bup_data = Opaque},
    try read_schema_section(Restore) of
        {Opened, {_Header, Schema, _Rest}} ->
            close_read(Opened),
            Schema
    catch
        throw:{error, _} = Error ->
            Error
    end.

%% Open backup media and extract schema
%% The media is left open for further reading.
%% Returns {R, {Header, Schema, Rest}} where Rest holds the items that
%% were read after the last schema item.
read_schema_section(Restore) ->
    {Opened, {Header, RawSchema, Rest}} = do_read_schema_section(Restore),
    Version = Header#log_header.log_version,
    {Opened, {Header, convert_schema(Version, RawSchema), Rest}}.

%% Opens the media and reads the first bunch, which must start with a
%% valid header. The media is closed again if anything goes wrong after
%% the open; the exception is re-raised unchanged.
do_read_schema_section(Restore) ->
    Opened = safe_apply(Restore, open_read, [Restore#restore.bup_data]),
    try
        {Opened2, FirstBunch} =
            safe_apply(Opened, read, [Opened#restore.bup_data]),
        do_read_schema_section(Opened2, verify_header(FirstBunch), [])
    catch
        Class:Reason:Stacktrace ->
            close_read(Opened),
            erlang:raise(Class, Reason, Stacktrace)
    end.

%% Collects the leading run of schema items of the backup.
%%
%% The second argument is the result of verify_header/1. Acc holds the
%% schema items collected so far, in backup order.
%% Returns {R, {Header, SchemaItems, Rest}} where Rest is what remains of
%% the bunch in which the first non-schema item was found ([] if the
%% backup ended). Throws {error, Reason} if the header was rejected.
do_read_schema_section(R, Verified, Acc) ->
    %% The items are accumulated in reverse so that each one is added in
    %% constant time instead of being appended with ++.
    collect_schema_items(R, Verified, lists:reverse(Acc)).

%% Worker of do_read_schema_section/3; RevAcc is in reverse backup order.
collect_schema_items(R, {ok, B, C, []}, RevAcc) ->
    %% Current bunch is exhausted, fetch the next one
    case safe_apply(R, read, [R#restore.bup_data]) of
        {R2, []} ->
            {R2, {B, lists:reverse(RevAcc), []}};
        {R2, RawSchema} ->
            collect_schema_items(R2, {ok, B, C, RawSchema}, RevAcc)
    end;
collect_schema_items(R, {ok, B, C, [Head | Tail]}, RevAcc)
  when element(1, Head) =:= schema ->
    collect_schema_items(R, {ok, B, C, Tail}, [Head | RevAcc]);
collect_schema_items(R, {ok, B, _C, Rest}, RevAcc) ->
    %% First non-schema item, the schema section is complete
    {R, {B, lists:reverse(RevAcc), Rest}};
collect_schema_items(_R, {error, Reason}, _RevAcc) ->
    throw({error, Reason}).

%% Checks that the first item of a backup is a log header of the backup
%% kind with a supported version ("0.1", "1.1" or the current one).
%% Returns {ok, Header, CurrentHeader, RemainingItems} | {error, Reason}
verify_header([H | RawSchema]) when is_record(H, log_header) ->
    Current = mnesia_log:backup_log_header(),
    Supported = ["0.1", "1.1", Current#log_header.log_version],
    case {H#log_header.log_kind =:= Current#log_header.log_kind,
          lists:member(H#log_header.log_version, Supported)} of
        {true, true} ->
            {ok, H, Current, RawSchema};
        {true, false} ->
            {error, {"Bad header version. Cannot be used as backup.", H}};
        {false, _} ->
            {error, {"Bad kind of header. Cannot be used as backup.", H}}
    end;
verify_header(RawSchema) ->
    {error, {"Missing header. Cannot be used as backup.", ?CATCH(hd(RawSchema))}}.

%% Replaces the cookie in the schema item of Schema with NewCookie.
%% Returns the updated schema; throws {error, Reason} if Schema has no
%% schema item.
refresh_cookie(Schema, NewCookie) ->
    case lists:keysearch(schema, 2, Schema) of
        {value, {schema, schema, List}} ->
            Cs = mnesia_schema:list2cs(List),
            Cs2 = Cs#cstruct{cookie = NewCookie},
            Item = {schema, schema, mnesia_schema:cs2list(Cs2)},
            lists:keyreplace(schema, 2, Schema, Item);

        false ->
            Reason = "No schema found. Cannot be used as backup.",
            throw({error, {Reason, Schema}})
    end.

%% Convert schema items from an external backup
%% If backup format is the latest, no conversion is needed
%% All supported backup formats should have their converters
%% here as separate function clauses.
convert_schema("0.1", Schema) ->
    convert_0_1(Schema);
convert_schema("1.1", Schema) ->
    %% The new backup format is a pure extension of the old one
    Newest = mnesia_log:backup_log_header(),
    convert_schema(Newest#log_header.log_version, Schema);
convert_schema(Latest, Schema) ->
    Header = mnesia_log:backup_log_header(),
    case Header#log_header.log_version of
        Latest ->
            Schema;
        _Other ->
            Reason = "Bad backup header version. Cannot convert schema.",
            throw({error, {Reason, Header}})
    end.

%% Backward compatibility for 0.1
%%
%% Starts from the schema item of the backup, or from a fresh initial
%% schema for the local node when the backup has none, and folds the
%% remaining items into it.
convert_0_1(Schema) ->
    {Items, SchemaDef} =
        case lists:keysearch(schema, 2, Schema) of
            {value, {schema, schema, Found}} ->
                {lists:keydelete(schema, 2, Schema), Found};
            false ->
                {Schema,
                 mnesia_schema:get_initial_schema(disc_copies, [node()], [])}
        end,
    convert_0_1(Items, [], mnesia_schema:list2cs(SchemaDef)).

%% cookie, db_nodes and version items are merged into the schema cstruct.
%% Table definitions get 'snmp' removed from their index list. All other
%% items are passed through. The resulting schema item is placed first.
convert_0_1([{schema, cookie, Cookie} | Items], Acc, Cs) ->
    convert_0_1(Items, Acc, Cs#cstruct{cookie = Cookie});
convert_0_1([{schema, db_nodes, DbNodes} | Items], Acc, Cs) ->
    convert_0_1(Items, Acc, Cs#cstruct{disc_copies = DbNodes});
convert_0_1([{schema, version, Version} | Items], Acc, Cs) ->
    convert_0_1(Items, Acc, Cs#cstruct{version = Version});
convert_0_1([{schema, Tab, Def} | Items], Acc, Cs) ->
    Converted =
        case lists:keysearch(index, 1, Def) of
            {value, {index, PosList}} ->
                %% Remove the snmp "index"
                Positions = PosList -- [snmp],
                {schema, Tab,
                 lists:keyreplace(index, 1, Def, {index, Positions})};
            false ->
                {schema, Tab, Def}
        end,
    convert_0_1(Items, [Converted | Acc], Cs);
convert_0_1([Item | Items], Acc, Cs) ->
    convert_0_1(Items, [Item | Acc], Cs);
convert_0_1([], Acc, Cs) ->
    [schema2bup({schema, schema, Cs}) | Acc].

%% Returns Val or throw error
lookup_schema(Key, Schema) ->
    case lists:keysearch(Key, 2, Schema) of
        {value, {schema, Key, Val}} -> Val;
        false -> throw({error, {"Cannot lookup", Key}})
    end.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Backup compatibility

%% Convert internal schema items to backup dito
schema2bup({schema, Tab}) ->
    {schema, Tab};
schema2bup({schema, Tab, TableDef}) ->
    {schema, Tab, mnesia_schema:cs2list(TableDef)}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Create schema on the given nodes
%% Requires that old schemas has been deleted
%% Returns ok | {error, Reason}
create_schema(Nodes) ->
    create_schema(Nodes, []).

create_schema([], Props) ->
    create_schema([node()], Props);
create_schema(Ns, Props) when is_list(Ns), is_list(Props) ->
    case is_set(Ns) of
        true ->
            create_schema(Ns, mnesia_schema:ensure_no_schema(Ns), Props);
        false ->
            {error, {combine_error, Ns}}
    end;
create_schema(Ns, _Props) ->
    {error, {badarg, Ns}}.

%% True if List is a list without duplicates.
is_set(List) when is_list(List) ->
    ordsets:is_set(lists:sort(List));
is_set(_) ->
    false.

%% The second argument is the result of mnesia_schema:ensure_no_schema/1.
create_schema(Ns, ok, Props) ->
    %% Ensure that we access the intended Mnesia
    %% directory. This function may not be called
    %% during startup since it will cause the
    %% application_controller to get into deadlock
    case mnesia_lib:ensure_loaded(?APPLICATION) of
        ok ->
            create_disc_schema(Ns, Props);
        {error, Reason} ->
            {error, Reason}
    end;
create_schema(_Ns, {error, Reason}, _) ->
    {error, Reason};
create_schema(_Ns, Reason, _) ->
    {error, Reason}.

%% Refuses to create a schema on a node configured with a ram schema
%% location, otherwise makes sure the Mnesia directory exists before the
%% initial schema is installed.
create_disc_schema(Ns, Props) ->
    case mnesia_monitor:get_env(schema_location) of
        ram ->
            {error, {has_no_disc, node()}};
        _ ->
            case mnesia_schema:opt_create_dir(true, mnesia_lib:dir()) of
                ok ->
                    install_initial_schema(Ns, Props);
                {error, What} ->
                    {error, What}
            end
    end.

%% Writes an initial backup to a temporary file and installs it as
%% fallback. The temporary file is removed on every exit path, so that a
%% failed attempt does not leave it behind in the Mnesia directory.
install_initial_schema(Ns, Props) ->
    Mod = mnesia_backup,
    File = mnesia_lib:dir(mk_str()),
    file:delete(File),
    try make_initial_backup(Ns, File, Mod, Props) of
        {ok, _Res} ->
            do_install_fallback(File, Mod)
    catch
        throw:{error, Reason} ->
            {error, Reason}
    after
        file:delete(File)
    end.

%% Name of the temporary backup file: <node><unique integer>.TMP
%% lists:concat/1 renders atoms and integers itself. Each part must be
%% one element of the list; splicing a string into the list would make
%% every character be rendered as its decimal character code.
mk_str() ->
    lists:concat([node(), erlang:unique_integer([positive]), ".TMP"]).

make_initial_backup(Ns, Opaque, Mod) ->
    make_initial_backup(Ns, Opaque, Mod, []).

%% Writes a backup that consists of a log header and the initial schema
%% item for the nodes Ns. The majority and storage_properties entries
%% are removed from the initial schema definition.
%% Returns {ok, Opaque2}; throws {error, Reason} if a callback fails.
make_initial_backup(Ns, Opaque, Mod, Props) ->
    Orig = mnesia_schema:get_initial_schema(disc_copies, Ns, Props),
    Modded = proplists:delete(storage_properties, proplists:delete(majority, Orig)),
    Schema = [{schema, schema, Modded}],
    O2 = do_apply(Mod, open_write, [Opaque], Opaque),
    O3 = do_apply(Mod, write, [O2, [mnesia_log:backup_log_header()]], O2),
    O4 = do_apply(Mod, write, [O3, Schema], O3),
    O5 = do_apply(Mod, commit_write, [O4], O4),
    {ok, O5}.

%% Applies a callback of the backup module and returns the new opaque
%% handle. Writing an empty item list is a no-op. An error reply or an
%% exception is rethrown as {error, Reason}.
do_apply(_, write, [_, Items], Opaque) when Items =:= [] ->
    Opaque;
do_apply(Mod, What, Args, _Opaque) ->
    try apply(Mod, What, Args) of
        {ok, Opaque2} -> Opaque2;
        {error, Reason} -> throw({error, Reason})
    catch _:Reason ->
            throw({error, {'EXIT', Reason}})
    end.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Restore

%% Restore schema and possibly other tables from a backup
%% and replicate them to the necessary nodes
%% Requires that old schemas has been deleted
%% Returns ok | {error, Reason}
install_fallback(Opaque) ->
    install_fallback(Opaque, []).

install_fallback(Opaque, Args) ->
    %% Ensure that we access the intended Mnesia
    %% directory. This function may not be called
    %% during startup since it will cause the
    %% application_controller to get into deadlock
    case mnesia_lib:ensure_loaded(?APPLICATION) of
        {error, _} = Error ->
            Error;
        ok ->
            do_install_fallback(Opaque, Args)
    end.

%% Args is either a backup module name or a list of fallback options.
do_install_fallback(Opaque, Mod) when is_atom(Mod) ->
    do_install_fallback(Opaque, [{module, Mod}]);
do_install_fallback(Opaque, Args) when is_list(Args) ->
    Defaults = #fallback_args{opaque = Opaque},
    case check_fallback_args(Args, Defaults) of
        {error, _} = Error ->
            Error;
        {ok, FA} ->
            do_install_fallback(FA)
    end;
do_install_fallback(_Opaque, Args) ->
    {error, {badarg, Args}}.

%% Folds the option list into the #fallback_args{} record.
%% Returns {ok, FA} | {error, {badarg, Arg}} for the first option that
%% check_fallback_arg_type/2 fails on.
check_fallback_args([], FA) ->
    {ok, FA};
check_fallback_args([Arg | Rest], FA) ->
    try check_fallback_arg_type(Arg, FA) of
        Updated ->
            check_fallback_args(Rest, Updated)
    catch
        error:_ ->
            {error, {badarg, Arg}}
    end.

%% Stores one fallback option in FA. An unknown option, or one with a
%% bad value, raises an error exception.
check_fallback_arg_type(Arg, FA) ->
    case Arg of
        {scope, Scope} when Scope =:= global; Scope =:= local ->
            FA#fallback_args{scope = Scope};
        {module, Mod} ->
            Checked = mnesia_monitor:do_check_type(backup_module, Mod),
            FA#fallback_args{module = Checked};
        {mnesia_dir, Dir} ->
            FA#fallback_args{use_default_dir = false,
                             mnesia_dir = Dir};
        {keep_tables, Tabs} ->
            atom_list(Tabs),
            FA#fallback_args{keep_tables = Tabs};
        {skip_tables, Tabs} ->
            atom_list(Tabs),
            FA#fallback_args{skip_tables = Tabs};
        {default_op, Op} when Op =:= keep_tables; Op =:= skip_tables ->
            FA#fallback_args{default_op = Op}
    end.

%% Returns ok for a proper list of atoms, crashes otherwise.
atom_list([]) ->
    ok;
atom_list([Atom | Rest]) when is_atom(Atom) ->
    atom_list(Rest).

%% Runs the installation in a linked master process and waits for its
%% verdict.
do_install_fallback(FA) ->
    Master = spawn_link(?MODULE, install_fallback_master, [self(), FA]),
    receive
        {'EXIT', Master, Reason} -> % if appl has trapped exit
            {error, {'EXIT', Reason}};
        {Master, Reply} ->
            case Reply of
                {ok, _} ->
                    ok;
                {error, Reason} ->
                    {error, {"Cannot install fallback", Reason}}
            end
    end.

%% Master process of an installation: iterates over the backup with
%% restore_recs/5, reports the outcome to the client and terminates.
install_fallback_master(ClientPid, FA) ->
    process_flag(trap_exit, true),
    #fallback_args{opaque = Opaque, module = Mod} = FA,
    Outcome = iterate(Mod, fun restore_recs/5, Opaque, {start, FA}),
    unlink(ClientPid),
    ClientPid ! {self(), Outcome},
    exit(shutdown).

%% Iterator fun of install_fallback_master/2. The accumulator is
%% {start, FA} before the first bunch, the list of receiver pids while
%% records are distributed, and 'stop' once the fallback has been swapped
%% in on all nodes.
restore_recs(_, _, _, _, stop) ->
    throw({error, "restore_recs already stopped"});

restore_recs(Recs, Header, Schema, Ext, {start, FA}) ->
    %% First call: start one receiver per node and hand it the schema
    Converted = convert_schema(Header#log_header.log_version, Schema),
    SchemaDef = lookup_schema(schema, Converted),
    try mnesia_schema:list2cs(SchemaDef) of
        Cs ->
            Nodes = get_fallback_nodes(FA, Cs#cstruct.disc_copies),
            LockId = {{mnesia_table_lock, schema}, self()},
            global:set_lock(LockId, Nodes, infinity),
            Receivers = [spawn_link(Node, ?MODULE, fallback_receiver,
                                    [self(), FA])
                         || Node <- Nodes],
            send_fallback(Receivers, {start, Header, Converted}),
            NextAcc = restore_recs(Recs, Header, Converted, Ext, Receivers),
            global:del_lock(LockId, Nodes),
            NextAcc
    catch
        _:Reason ->
            throw({error, {"Bad schema in restore_recs", Reason}})
    end;

restore_recs([], _Header, _Schema, _Ext, Pids) ->
    %% End of backup: commit on all receivers and let them terminate
    send_fallback(Pids, swap),
    send_fallback(Pids, stop),
    stop;

restore_recs(Recs, _, _, _, Pids) ->
    send_fallback(Pids, {records, Recs}),
    Pids.

%% Nodes that take part in the fallback operation: all disc nodes of the
%% schema for scope global, only the local node for scope local.
%% Throws {error, Reason} if the local node is not among Ns.
get_fallback_nodes(FA, Ns) ->
    Local = node(),
    case lists:member(Local, Ns) of
        false ->
            throw({error, {"No disc resident schema on local node", Ns}});
        true ->
            case FA#fallback_args.scope of
                local -> [Local];
                global -> Ns
            end
    end.

%% Sends Msg to all receivers and waits for their unanimous answer.
send_fallback(Pids, Msg) when is_list(Pids), Pids =/= [] ->
    Tagged = {self(), Msg},
    lists:foreach(fun(Pid) -> Pid ! Tagged end, Pids),
    rec_answers(Pids, []).

%% Collects one answer per pid. Returns the answer when all of them
%% agree. Throws {error, Reason} if any answer is an error tuple or if
%% the answers differ.
rec_answers([], Acc) ->
    case {lists:keysearch(error, 1, Acc), mnesia_lib:uniq(Acc)} of
        {{value, {error, Val}}, _} -> throw({error, Val});
        {_, [SameAnswer]} -> SameAnswer;
        {_, Other} -> throw({error, {"Different answers", Other}})
    end;
rec_answers(Pids, Acc) ->
    receive
        {'EXIT', Pid, stopped} ->
            Pids2 = lists:delete(Pid, Pids),
            rec_answers(Pids2, [stopped|Acc]);
        {'EXIT', Pid, Reason} ->
            Pids2 = lists:delete(Pid, Pids),
            rec_answers(Pids2, [{error, {'EXIT', Pid, Reason}}|Acc]);
        {Pid, Reply} ->
            Pids2 = lists:delete(Pid, Pids),
            rec_answers(Pids2, [Reply|Acc])
    end.

fallback_exists() ->
    Fname = fallback_bup(),
    fallback_exists(Fname).

%% With a Mnesia directory in use the fallback file itself is checked,
%% otherwise the active_fallback value is consulted.
fallback_exists(Fname) ->
    case mnesia_monitor:use_dir() of
        true ->
            mnesia_lib:exists(Fname);
        false ->
            case ?catch_val(active_fallback) of
                {'EXIT', _} -> false;
                Bool -> Bool
            end
    end.

fallback_name() -> "FALLBACK.BUP".
fallback_bup() -> mnesia_lib:dir(fallback_name()).

fallback_tmp_name() -> "FALLBACK.TMP".
%% fallback_full_tmp_name() -> mnesia_lib:dir(fallback_tmp_name()).

%% Receiver process, spawned by restore_recs/5 on every node that takes
%% part in the installation. Writes what the master sends to the
%% temporary fallback file (see fallback_receiver_loop/4). An error
%% exception in the setup is reported as {already_exists, node()}.
-spec fallback_receiver(pid(), fallback_args()) -> no_return().
fallback_receiver(Master, FA) ->
    process_flag(trap_exit, true),

    Res = try
              register(mnesia_fallback, self()),
              FA2 = check_fallback_dir(FA),
              Bup = FA2#fallback_args.fallback_bup,
              false = mnesia_lib:exists(Bup),
              Mod = mnesia_backup,
              Tmp = FA2#fallback_args.fallback_tmp,
              R = #restore{mode = replace,
                           bup_module = Mod,
                           bup_data = Tmp},
              file:delete(Tmp),
              fallback_receiver_loop(Master, R, FA2, schema)
          catch
              error:_ ->
                  Reason = {already_exists, node()},
                  local_fallback_error(Master, Reason);
              throw:{error, Reason} ->
                  local_fallback_error(Master, Reason)
          end,
    exit(Res).

%% Reports {error, Reason} to the master and exits with Reason.
local_fallback_error(Master, Reason) ->
    Master ! {self(), {error, Reason}},
    unlink(Master),
    exit(Reason).


%% As check_fallback_dir/1, but a failure is reported to Master with
%% local_fallback_error/2, which exits the calling process.
check_fallback_dir(Master, FA) ->
    try check_fallback_dir(FA)
    catch throw:{error,Reason} ->
            local_fallback_error(Master, Reason)
    end.

%% Fills in the mnesia_dir, fallback_bup and fallback_tmp fields of FA.
%% Throws {error, Reason} on a node with a ram schema location or if the
%% directory argument is rejected.
check_fallback_dir(FA) ->
    case mnesia:system_info(schema_location) of
        ram ->
            Reason = {has_no_disc, node()},
            throw({error, Reason});
        _ ->
            Dir = check_fallback_dir_arg(FA),
            Bup = filename:join([Dir, fallback_name()]),
            Tmp = filename:join([Dir, fallback_tmp_name()]),
            FA#fallback_args{fallback_bup = Bup,
                             fallback_tmp = Tmp,
                             mnesia_dir = Dir}
    end.

%% An explicit mnesia_dir is only accepted together with scope local.
check_fallback_dir_arg(FA) ->
    case FA#fallback_args.use_default_dir of
        true ->
            mnesia_lib:dir();
        false when FA#fallback_args.scope =:= local ->
            Dir = FA#fallback_args.mnesia_dir,
            try mnesia_monitor:do_check_type(dir, Dir)
            catch _:_ ->
                    Reason = {badarg, {dir, Dir}, node()},
                    throw({error, Reason})
            end;
        false when FA#fallback_args.scope =:= global ->
            Reason = {combine_error, global, dir, node()},
            throw({error, Reason})
    end.

%% Message loop of the receiver. State tells which message is expected
%% next: schema -> records -> stop.
%%   {start, Header, Schema} opens the temporary file and writes the
%%                           header and the schema
%%   {records, Recs}         appends records
%%   swap                    commits the temporary file and renames it to
%%                           the fallback file
%%   stop                    ends the loop with 'stopped'
%% Any other message aborts the write and throws {error, Reason}.
fallback_receiver_loop(Master, R, FA, State) ->
    receive
        {Master, {start, Header, Schema}} when State =:= schema ->
            Dir = FA#fallback_args.mnesia_dir,
            throw_bad_res(ok, mnesia_schema:opt_create_dir(true, Dir)),
            R2 = safe_apply(R, open_write, [R#restore.bup_data]),
            R3 = safe_apply(R2, write, [R2#restore.bup_data, [Header]]),
            BupSchema = [schema2bup(S) || S <- Schema],
            R4 = safe_apply(R3, write, [R3#restore.bup_data, BupSchema]),
            Master ! {self(), ok},
            fallback_receiver_loop(Master, R4, FA, records);

        {Master, {records, Recs}} when State =:= records ->
            R2 = safe_apply(R, write, [R#restore.bup_data, Recs]),
            Master ! {self(), ok},
            fallback_receiver_loop(Master, R2, FA, records);

        {Master, swap} when State =/= schema ->
            ?eval_debug_fun({?MODULE, fallback_receiver_loop, pre_swap}, []),
            safe_apply(R, commit_write, [R#restore.bup_data]),
            Bup = FA#fallback_args.fallback_bup,
            Tmp = FA#fallback_args.fallback_tmp,
            throw_bad_res(ok, file:rename(Tmp, Bup)),
            ?SAFE(mnesia_lib:set(active_fallback, true)),
            ?eval_debug_fun({?MODULE, fallback_receiver_loop, post_swap}, []),
            Master ! {self(), ok},
            fallback_receiver_loop(Master, R, FA, stop);

        {Master, stop} when State =:= stop ->
            stopped;

        Msg ->
            safe_apply(R, abort_write, [R#restore.bup_data]),
            Tmp = FA#fallback_args.fallback_tmp,
            file:delete(Tmp),
            %% Must be a two-tuple: fallback_receiver/2 only catches
            %% throw:{error, Reason}. Any other shape would kill the
            %% receiver with nocatch instead of reporting the error.
            throw({error, {"Unexpected msg fallback_receiver_loop", Msg}})
    end.

%% Returns Actual if it equals Expected, otherwise throws {error, Reason}.
throw_bad_res(Expected, Expected) -> Expected;
throw_bad_res(_Expected, {error, Actual}) -> throw({error, Actual});
throw_bad_res(_Expected, Actual) -> throw({error, Actual}).

%% Per-table state used when starting from a fallback. open, add, close
%% and swap hold funs (undefined for the schema table); opened tells
%% whether the open fun has been applied.
-record(local_tab, {name,
                    storage_type,
                    open,
                    add,
                    close,
                    swap,
                    record_name,
                    opened}).

%% Starts from the fallback, if there is one, while holding the schema
%% lock. Exits with Reason if the start from the fallback fails.
tm_fallback_start(IgnoreFallback) ->
    mnesia_schema:lock_schema(),
    Res = do_fallback_start(fallback_exists(), IgnoreFallback),
    mnesia_schema:unlock_schema(),
    case Res of
        ok -> ok;
        {error, Reason} -> exit(Reason)
    end.

%% do_fallback_start(FallbackExists, IgnoreFallback) -> ok | {error, Reason}
%%
%% Without a fallback there is nothing to do. An ignored fallback is only
%% marked as active. Otherwise the table files are rebuilt from the
%% fallback, the new schema file is moved into place, every other local
%% table is swapped in and the fallback file is deleted.
do_fallback_start(false, _IgnoreFallback) ->
    ok;
do_fallback_start(true, true) ->
    verbose("Ignoring fallback at startup, but leaving it active...~n", []),
    mnesia_lib:set(active_fallback, true),
    ok;
do_fallback_start(true, false) ->
    verbose("Starting from fallback...~n", []),

    BupFile = fallback_bup(),
    LocalTabs = ?ets_new_table(mnesia_local_tables, [set, public, {keypos, 2}]),
    case iterate(mnesia_backup, fun restore_tables/5, BupFile, {start, LocalTabs}) of
        {error, Reason} ->
            {error, {"Cannot start from fallback", Reason}};
        {ok, _Res} ->
            ?SAFE(dets:close(schema)),
            TmpSchema = mnesia_lib:tab2tmp(schema),
            DatSchema = mnesia_lib:tab2dat(schema),
            AllLT = ?ets_match_object(LocalTabs, '_'),
            ?ets_delete_table(LocalTabs),
            case file:rename(TmpSchema, DatSchema) of
                {error, Reason} ->
                    file:delete(TmpSchema),
                    {error, {"Cannot start from fallback. Rename error.", Reason}};
                ok ->
                    %% The schema file is already in place; swap in the
                    %% files of all other local tables
                    SwapIn =
                        fun(#local_tab{name = Name, swap = Swap} = LT)
                              when Name =/= schema ->
                                Swap(Name, LT);
                           (_) ->
                                ok
                        end,
                    lists:foreach(SwapIn, AllLT),
                    file:delete(BupFile),
                    ok
            end
    end.

%% Iterator fun of do_fallback_start/2. The accumulator is one of
%%   {start, LocalTabs}              - nothing done yet
%%   {new, LocalTabs}                - the table of the next record is unknown
%%   {local, LocalTabs, LT}          - records of the local table LT are added
%%   {not_local, LocalTabs, PrevTab} - records of PrevTab are skipped
restore_tables([Rec | Recs], Header, Schema, Ext,
               {local, _LocalTabs, LT} = State)
  when element(1, Rec) =:= LT#local_tab.name ->
    Tab = element(1, Rec),
    Key = element(2, Rec),
    (LT#local_tab.add)(Tab, Key, Rec, LT),
    restore_tables(Recs, Header, Schema, Ext, State);
restore_tables([_ | _] = All, Header, Schema, Ext,
               {local, LocalTabs, _LT}) ->
    %% Record of another table
    restore_tables(All, Header, Schema, Ext, {new, LocalTabs});
restore_tables([Rec | Recs] = All, Header, Schema, Ext, {new, LocalTabs}) ->
    Tab = element(1, Rec),
    case ?ets_lookup(LocalTabs, Tab) of
        [] ->
            NotLocal = {not_local, LocalTabs, Tab},
            restore_tables(Recs, Header, Schema, Ext, NotLocal);
        [LT] when is_record(LT, local_tab) ->
            case LT#local_tab.opened of
                true ->
                    ignore;
                false ->
                    %% First record of this table, open it
                    (LT#local_tab.open)(Tab, LT),
                    ?ets_insert(LocalTabs,LT#local_tab{opened=true})
            end,
            restore_tables(All, Header, Schema, Ext, {local, LocalTabs, LT})
    end;
restore_tables([Rec | Recs] = All, Header, Schema, Ext,
               {not_local, LocalTabs, PrevTab} = State) ->
    case element(1, Rec) of
        PrevTab ->
            restore_tables(Recs, Header, Schema, Ext, State);
        _OtherTab ->
            restore_tables(All, Header, Schema, Ext, {new, LocalTabs})
    end;
restore_tables(Recs, Header, Schema, Ext, {start, LocalTabs}) ->
    %% Purge the Mnesia directory, except for the fallback itself, and
    %% prepare the files of all tables found in the schema
    MnesiaDir = mnesia_lib:dir(),
    mnesia_schema:purge_dir(filename:join([MnesiaDir, "OLD_DIR"]), []),
    mnesia_schema:purge_dir(MnesiaDir, [fallback_name()]),
    init_dat_files(Schema, Ext, LocalTabs),
    restore_tables(Recs, Header, Schema, Ext, {new, LocalTabs});
restore_tables([], _Header, _Schema, _Ext, State) ->
    State.

%% Creates all necessary dat files and inserts
%% the table definitions in the schema table
%%
%% The schema is written to a temporary dets file. One local_tab record
%% per local table, and finally one for the schema table itself, is
%% inserted in the ets table LocalTabs.
%% Throws {error, Reason} if the temporary schema file cannot be opened.
init_dat_files(Schema, Ext, LocalTabs) ->
    TmpFile = mnesia_lib:tab2tmp(schema),
    Args = [{file, TmpFile}, {keypos, 2}, {type, set}],
    case dets:open_file(schema, Args) of % Assume schema lock
        {ok, _} ->
            create_dat_files(Schema, Ext, LocalTabs),
            ok = dets:close(schema),
            LocalTab = #local_tab{name = schema,
                                  storage_type = disc_copies,
                                  open = undefined,
                                  add = undefined,
                                  close = undefined,
                                  swap = undefined,
                                  record_name = schema,
                                  opened = false},
            ?ets_insert(LocalTabs, LocalTab);
        {error, Reason} ->
            throw({error, {"Cannot open file", schema, Args, Reason}})
    end.

%% Walks through the schema items of the fallback. Every table
%% definition is stored in the schema dets table, and for every table
%% with local storage a local_tab record with open/add/close/swap funs is
%% inserted in LocalTabs. A {schema, Tab} item removes the local_tab
%% entry and the temporary file of Tab.
create_dat_files([{schema, schema, TabDef} | Tail], Ext, LocalTabs) ->
    ok = dets:insert(schema, {schema, schema, TabDef}),
    create_dat_files(Tail, Ext, LocalTabs);
create_dat_files([{schema, Tab, TabDef} | Tail], Ext, LocalTabs) ->
    TmpFile = mnesia_lib:tab2tmp(Tab),
    DatFile = mnesia_lib:tab2dat(Tab),
    DclFile = mnesia_lib:tab2dcl(Tab),
    DcdFile = mnesia_lib:tab2dcd(Tab),
    %% Removes the existing files of the table; applied by the swap funs
    Expunge = fun() ->
                      file:delete(DatFile),
                      file:delete(DclFile),
                      file:delete(DcdFile)
              end,

    mnesia_lib:dets_sync_close(Tab),
    file:delete(TmpFile),
    Cs = mnesia_schema:list2cs(TabDef, Ext),
    ok = dets:insert(schema, {schema, Tab, TabDef}),
    RecName = Cs#cstruct.record_name,
    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
    delete_ext(Storage, Tab),
    Semantics = mnesia_lib:semantics(Storage, storage),
    if
        Semantics =:= undefined ->
            %% NOTE(review): the schema dets table is opened with
            %% {keypos, 2} in init_dat_files/3, so the object inserted
            %% above has key Tab. Confirm that {schema, Tab} is the
            %% intended key of this delete.
            ok = dets:delete(schema, {schema, Tab}),
            create_dat_files(Tail, Ext, LocalTabs);
        Semantics =:= disc_only_copies ->
            Open = disc_only_open_fun(Storage, Cs),
            Add = disc_only_add_fun(Storage, Cs),
            Close = disc_only_close_fun(Storage),
            Swap = disc_only_swap_fun(Storage, Expunge, Open, Close),
            LocalTab = #local_tab{name = Tab,
                                  storage_type = Storage,
                                  open = Open,
                                  add = Add,
                                  close = Close,
                                  swap = Swap,
                                  record_name = RecName,
                                  opened = false},
            ?ets_insert(LocalTabs, LocalTab),
            create_dat_files(Tail, Ext, LocalTabs);
        Semantics =:= ram_copies; Storage =:= disc_copies ->
            %% The records are collected in a log written to TmpFile
            Open = fun(T, LT) when T =:= LT#local_tab.name ->
                           mnesia_log:open_log({?MODULE, T},
                                               mnesia_log:dcl_log_header(),
                                               TmpFile,
                                               false,
                                               false,
                                               read_write)
                   end,
            %% A {Tab, Key} two-tuple is logged as a delete of Key. Any
            %% other record is logged as a write, with its first element
            %% replaced by RecName when T differs from RecName.
            Add = fun(T, Key, Rec, LT) when T =:= LT#local_tab.name ->
                          Log = {?MODULE, T},
                          case Rec of
                              {_T, Key} ->
                                  mnesia_log:append(Log, {{T, Key}, {T, Key}, delete});
                              (Rec) when T =:= RecName ->
                                  mnesia_log:append(Log, {{T, Key}, Rec, write});
                              (Rec) ->
                                  Rec2 = setelement(1, Rec, RecName),
                                  mnesia_log:append(Log, {{T, Key}, Rec2, write})
                          end
                  end,
            Close = fun(T, LT) when T =:= LT#local_tab.name ->
                            mnesia_log:close_log({?MODULE, T})
                    end,
            %% Removes the old files of the table. Unless the table is a
            %% ram_copies table whose 'opened' field is false, a dcd log
            %% is opened and closed on DcdFile and the collected log is
            %% renamed to DclFile.
            Swap = fun(T, LT) when T =:= LT#local_tab.name ->
                           Expunge(),
                           if
                               Storage =:= ram_copies, LT#local_tab.opened =:= false ->
                                   ok;
                               true ->
                                   Log = mnesia_log:open_log(fallback_tab,
                                                             mnesia_log:dcd_log_header(),
                                                             DcdFile,
                                                             false),
                                   mnesia_log:close_log(Log),
                                   case LT#local_tab.opened of
                                       true ->
                                           Close(T,LT);
                                       false ->
                                           Open(T,LT),
                                           Close(T,LT)
                                   end,
                                   case file:rename(TmpFile, DclFile) of
                                       ok ->
                                           ok;
                                       {error, Reason} ->
                                           mnesia_lib:fatal("Cannot rename file ~tp -> ~tp: ~tp~n",
                                                            [TmpFile, DclFile, Reason])
                                   end
                           end
                   end,
            LocalTab = #local_tab{name = Tab,
                                  storage_type = Storage,
                                  open = Open,
                                  add = Add,
                                  close = Close,
                                  swap = Swap,
                                  record_name = RecName,
                                  opened = false
                                 },
            ?ets_insert(LocalTabs, LocalTab),
            create_dat_files(Tail, Ext, LocalTabs);
        true ->
            error({unknown_semantics, [{semantics, Semantics},
                                       {tabdef, TabDef},
                                       {ext, Ext}]})
    end;
create_dat_files([{schema, Tab} | Tail], Ext, LocalTabs) ->
    ?ets_delete(LocalTabs, Tab),
    %% NOTE(review): same key question as for the delete above
    ok = dets:delete(schema, {schema, Tab}),
    TmpFile = mnesia_lib:tab2tmp(Tab),
    mnesia_lib:dets_sync_close(Tab),
    file:delete(TmpFile),
    create_dat_files(Tail, Ext, LocalTabs);
create_dat_files([], _Ext, _LocalTabs) ->
    ok.

%% Closes and deletes a table of an external backend; a no-op for all
%% other storage types.
delete_ext({ext, Alias, Mod}, Tab) ->
    Mod:close_table(Alias, Tab),
    Mod:delete_table(Alias, Tab),
    ok;
delete_ext(_, _) ->
    ok.


%% Returns the 'open' fun of a table with disc_only_copies semantics.
%% For disc_only_copies the fun opens a dets table on the temporary file
%% of the table and throws {error, Reason} if that fails. For an external
%% backend it calls Mod:load_table/4 with 'restore'.
disc_only_open_fun(disc_only_copies, #cstruct{name = Tab} =Cs) ->
    TmpFile = mnesia_lib:tab2tmp(Tab),
    Args = [{file, TmpFile}, {keypos, 2},
            {type, mnesia_lib:disk_type(Tab, Cs#cstruct.type)}],
    fun(T, LT) when T =:= LT#local_tab.name ->
            case mnesia_lib:dets_sync_open(T, Args) of
                {ok, _} ->
                    ok;
                {error, Reason} ->
                    throw({error, {"Cannot open file", T, Args, Reason}})
            end
    end;
disc_only_open_fun({ext,Alias,Mod}, Cs) ->
    fun(T, LT) when T =:= LT#local_tab.name ->
            ok = Mod:load_table(Alias, T, restore, mnesia_schema:cs2list(Cs))
    end.

%% Returns the 'add' fun of a table with disc_only_copies semantics.
%% A {Tab, Key} two-tuple erases Key. Any other record is stored, with
%% its first element replaced by RecName when T differs from RecName.
disc_only_add_fun(Storage, #cstruct{name = Tab,
                                    record_name = RecName}) ->
    fun(T, Key, Rec, #local_tab{name = T}) when T =:= Tab->
            case Rec of
                {_T, Key} ->
                    ok = mnesia_lib:db_erase(Storage, T, Key);
                (Rec) when T =:= RecName ->
                    ok = mnesia_lib:db_put(Storage, T, Rec);
                (Rec) ->
                    ok = mnesia_lib:db_put(Storage, T,
                                           setelement(1, Rec, RecName))
            end
    end.

%% Returns the 'close' fun of a table with disc_only_copies semantics.
disc_only_close_fun(disc_only_copies) ->
    fun(T, LT) when T =:= LT#local_tab.name ->
            mnesia_lib:dets_sync_close(T)
    end;
disc_only_close_fun({ext, Alias, Mod}) ->
    fun(T, _LT) ->
            Mod:sync_close_table(Alias, T)
    end.


%% Returns the 'swap' fun of a table with disc_only_copies semantics.
%% For disc_only_copies the old files are expunged, the table is closed
%% (after being opened first if it never was) and the temporary file is
%% renamed to the dat file. For an external backend the table is only
%% closed.
disc_only_swap_fun(disc_only_copies, Expunge, Open, Close) ->
    fun(T, LT) when T =:= LT#local_tab.name ->
            Source = mnesia_lib:tab2tmp(T),
            Target = mnesia_lib:tab2dat(T),
            Expunge(),
            case LT#local_tab.opened of
                false ->
                    Open(T,LT),
                    Close(T,LT);
                true ->
                    Close(T,LT)
            end,
            case file:rename(Source, Target) of
                {error, Reason} ->
                    mnesia_lib:fatal("Cannot rename file ~tp -> ~tp: ~tp~n",
                                     [Source, Target, Reason]);
                ok ->
                    ok
            end
    end;
disc_only_swap_fun({ext, _Alias, _Mod}, _Expunge, _Open, Close) ->
    fun(T, #local_tab{name = T} = LT) ->
            Close(T, LT)
    end.


%% Removes an installed fallback, by default on all nodes (scope global).
%% Returns ok | {error, Reason}
uninstall_fallback() ->
    uninstall_fallback([{scope, global}]).

uninstall_fallback(Args) ->
    case check_fallback_args(Args, #fallback_args{}) of
        {error, _} = Error ->
            Error;
        {ok, FA} ->
            do_uninstall_fallback(FA)
    end.

%% Runs the uninstallation in a linked master process and waits for its
%% result.
do_uninstall_fallback(FA) ->
    %% Ensure that we access the intended Mnesia
    %% directory. This function may not be called
    %% during startup since it will cause the
    %% application_controller to get into deadlock
    case mnesia_lib:ensure_loaded(?APPLICATION) of
        {error, _} = Error ->
            Error;
        ok ->
            Master = spawn_link(?MODULE, uninstall_fallback_master,
                                [self(), FA]),
            receive
                {'EXIT', Master, Reason} -> % if appl has trapped exit
                    {error, {'EXIT', Reason}};
                {Master, Result} ->
                    Result
            end
    end.

-spec uninstall_fallback_master(pid(), fallback_args()) -> no_return().
%% Entry point of the process spawned by do_uninstall_fallback/1.
%%
%% Traps exits, resolves the fallback files with check_fallback_dir/2,
%% reads the schema from the fallback backup and asks
%% get_fallback_nodes/2 which nodes are involved. The uninstall is then
%% run on those nodes by do_uninstall/3. Errors are handed to
%% local_fallback_error/2.
uninstall_fallback_master(ClientPid, FA) ->
    process_flag(trap_exit, true),

    FA2 = check_fallback_dir(ClientPid, FA), % May exit
    case fallback_to_schema(FA2#fallback_args.fallback_bup) of
        {ok, fallback, SchemaList} ->
            Cs = mnesia_schema:list2cs(SchemaList),
            %% The original FA (not FA2) is passed on; every node calls
            %% check_fallback_dir/2 itself in local_uninstall_fallback/2.
            try get_fallback_nodes(FA, Cs#cstruct.disc_copies) of
                Nodes when is_list(Nodes) ->
                    do_uninstall(ClientPid, Nodes, FA)
            catch
                throw:{error, Reason} ->
                    local_fallback_error(ClientPid, Reason)
            end;
        {error, Reason} ->
            local_fallback_error(ClientPid, Reason)
    end.

%% Runs the uninstall on the nodes Ns under the global schema lock.
%%
%% One linked local_uninstall_fallback/2 process is spawned per node.
%% The collected result is sent to ClientPid as {self(), Res}, after
%% which this process unlinks from the client and exits with shutdown.
do_uninstall(ClientPid, Ns, FA) ->
    Self = self(),
    LockId = {{mnesia_table_lock, schema}, Self},
    global:set_lock(LockId, Ns, infinity),
    Pids = [spawn_link(N, ?MODULE, local_uninstall_fallback, [Self, FA])
            || N <- Ns],
    Res = do_uninstall(ClientPid, Pids, [], [], ok),
    global:del_lock(LockId, Ns),
    ClientPid ! {Self, Res},
    unlink(ClientPid),
    exit(shutdown).

%% Phase one: waits, pid by pid, for each remote process to report
%% `started`, an error, or to exit.
%%
%% When every process has started, they are all told to proceed and the
%% results are gathered by rec_uninstall/3. If any of them failed, the
%% started ones are shut down and
%% {error, {node_not_running, BadNodes, LastBadRes}} is returned.
do_uninstall(ClientPid, [Pid | Pids], GoodPids, BadNodes, Res) ->
    Outcome =
        receive
            %% {'EXIT', ClientPid, _} ->
            %%     client_exit;
            {'EXIT', Pid, Reason} ->
                {failed, Reason};
            {Pid, {error, Reason}} ->
                {failed, Reason};
            {Pid, started} ->
                started
        end,
    case Outcome of
        started ->
            do_uninstall(ClientPid, Pids, [Pid | GoodPids], BadNodes, Res);
        {failed, Why} ->
            BadNode = node(Pid),
            BadRes = {error, {"Uninstall fallback", BadNode, Why}},
            do_uninstall(ClientPid, Pids, GoodPids, [BadNode | BadNodes], BadRes)
    end;
do_uninstall(ClientPid, [], GoodPids, [], ok) ->
    Self = self(),
    lists:foreach(fun(P) -> P ! {Self, do_uninstall} end, GoodPids),
    rec_uninstall(ClientPid, GoodPids, ok);
do_uninstall(_ClientPid, [], GoodPids, BadNodes, BadRes) ->
    lists:foreach(fun(P) -> exit(P, shutdown) end, GoodPids),
    {error, {node_not_running, BadNodes, BadRes}}.

%% Per-node part of the uninstall, run in a process linked to Master.
%%
%% Registers itself as mnesia_fallback, resolves the local fallback
%% files and reports `started`. On {Master, do_uninstall} it deletes
%% the temporary and the backup fallback files and sends the result of
%% deleting the backup file to Master before exiting normally.
local_uninstall_fallback(Master, FA) ->
    %% Don't trap exit

    register(mnesia_fallback, self()), % May exit
    FA2 = check_fallback_dir(Master, FA), % May exit
    Master ! {self(), started},

    receive
        {Master, do_uninstall} ->
            ?eval_debug_fun({?MODULE, uninstall_fallback2, pre_delete}, []),
            ?SAFE(mnesia_lib:set(active_fallback, false)),
            TmpFile = FA2#fallback_args.fallback_tmp,
            BupFile = FA2#fallback_args.fallback_bup,
            %% Only the outcome of deleting the backup file is reported.
            file:delete(TmpFile),
            DeleteRes = file:delete(BupFile),
            unregister(mnesia_fallback),
            ?eval_debug_fun({?MODULE, uninstall_fallback2, post_delete}, []),
            Master ! {self(), DeleteRes},
            unlink(Master),
            exit(normal)
    end.

%% Phase two: collects the final answer from every remote process.
%%
%% An `ok` answer keeps the accumulated result; any other answer, or an
%% exit of the remote process, replaces it.
rec_uninstall(ClientPid, [Pid | Pids], AccRes) ->
    NewAcc =
        receive
            %% {'EXIT', ClientPid, _} ->
            %%     exit(shutdown);
            {'EXIT', Pid, R} ->
                {error, {node_not_running, {node(Pid), R}}};
            {Pid, ok} ->
                AccRes;
            {Pid, BadRes} ->
                BadRes
        end,
    rec_uninstall(ClientPid, Pids, NewAcc);
rec_uninstall(_ClientPid, [], Res) ->
    Res.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Backup traversal

%% Iterate over a backup and produce a new backup.
%% Fun(BackupItem, Acc) is applied for each BackupItem.
%%
%% Valid BackupItems are:
%%
%%   {schema, Tab}                Table to be deleted
%%   {schema, Tab, CreateList}    Table to be created, CreateList may be empty
%%   {schema, db_nodes, DbNodes}  List of nodes, defaults to [node()] OLD
%%   {schema, version, Version}   Schema version OLD
%%   {schema, cookie, Cookie}     Unique schema cookie OLD
%%   {Tab, Key}                   Oid for record to be deleted
%%   Record                       Record to be inserted.
%%
%% The Fun must return a tuple {BackupItems, NewAcc}
%% where BackupItems is a list of valid BackupItems and
%% NewAcc is a new accumulator value. Once BackupItems
%% that are not schema related have been returned, no more schema
%% items may be returned. The schema related items must always be
%% first in the backup.
%%
%% If TargetMod =:= read_only, no new backup will be created.
%%
%% Opening of the source media will be performed by
%% SourceMod:open_read(Source)
%%
%% Opening of the target media will be performed by
%% TargetMod:open_write(Target)
traverse_backup(Source, Target, Fun, Acc) ->
    BackupMod = mnesia_monitor:get_env(backup_module),
    traverse_backup(Source, BackupMod, Target, BackupMod, Fun, Acc).

%% Runs the traversal in a separate linked process and waits for it.
%%
%% Returns the result sent by the worker, or
%% {error, {"Backup traversal crashed", Reason}} if an exit signal from
%% the worker arrives as a message.
traverse_backup(Source, SourceMod, Target, TargetMod, Fun, Acc) ->
    Worker = spawn_link(?MODULE, do_traverse_backup,
                        [self(), Source, SourceMod, Target, TargetMod, Fun, Acc]),
    receive
        {'EXIT', Worker, Reason} ->
            {error, {"Backup traversal crashed", Reason}};
        {iter_done, Worker, Res} ->
            Res
    end.

%% Worker process body for traverse_backup/6.
%%
%% Unless TargetMod is read_only, the target is opened with open_write
%% before the iteration, committed with commit_write after a successful
%% iteration and aborted with abort_write after a failed one. The
%% outcome is sent to ClientPid as {iter_done, self(), Res} after
%% unlinking from it.
do_traverse_backup(ClientPid, Source, SourceMod, Target, TargetMod, Fun, Acc) ->
    process_flag(trap_exit, true),
    Iter =
        case TargetMod of
            read_only ->
                ignore;
            _ ->
                try
                    do_apply(TargetMod, open_write, [Target], Target)
                catch
                    throw:{error, Error} ->
                        %% Report the failure, then terminate this worker.
                        unlink(ClientPid),
                        ClientPid ! {iter_done, self(), {error, Error}},
                        exit(Error)
                end
        end,
    InitState = {start, Fun, Acc, TargetMod, Iter},
    Res =
        case iterate(SourceMod, fun trav_apply/5, Source, InitState) of
            {ok, {iter, _, Acc2, _, _}} when TargetMod =:= read_only ->
                {ok, Acc2};
            {ok, {iter, _, Acc2, _, Iter2}} ->
                try do_apply(TargetMod, commit_write, [Iter2], Iter2) of
                    _ ->
                        {ok, Acc2}
                catch
                    throw:{error, Reason} ->
                        {error, Reason}
                end;
            {error, Reason} ->
                %% The abort uses the handle from open_write, since the
                %% failed iteration returns no newer one.
                if
                    TargetMod =/= read_only ->
                        ?CATCH(do_apply(TargetMod, abort_write, [Iter], Iter));
                    true ->
                        ok
                end,
                {error, {"Backup traversal failed", Reason}}
        end,
    unlink(ClientPid),
    ClientPid ! {iter_done, self(), Res}.

%% Callback given to iterate/4 by do_traverse_backup/7.
%%
%% State {iter, ...}: the user Fun is folded over Recs and, unless the
%% target is read_only or nothing was produced, the resulting items are
%% written to the target.
%%
%% State {start, ...}: first call. The header is written (unless
%% read_only), then the schema items are processed as records, and only
%% then the first bunch of records.
trav_apply(Recs, _Header, _Schema, _Ext, {iter, Fun, Acc, Mod, Iter}) ->
    {Items, Acc2} = filter_foldl(Fun, Acc, Recs),
    NextIter =
        case {Mod, Items} of
            {read_only, _} ->
                Iter;
            {_, []} ->
                Iter;
            _ ->
                do_apply(Mod, write, [Iter, Items], Iter)
        end,
    {iter, Fun, Acc2, Mod, NextIter};
trav_apply(Recs, Header, Schema, Ext, {start, Fun, Acc, Mod, Iter}) ->
    HeaderIter =
        case Mod of
            read_only ->
                Iter;
            _ ->
                do_apply(Mod, write, [Iter, [Header]], Iter)
        end,
    SchemaState = trav_apply(Schema, Header, Schema, Ext,
                             {iter, Fun, Acc, Mod, HeaderIter}),
    trav_apply(Recs, Header, Schema, Ext, SchemaState).

%% Applies Fun(Item, Acc) to every item from left to right, threading
%% the accumulator, and concatenates the returned item lists in order.
%%
%% Returns {AllItems, FinalAcc}. Throws
%% {error, {"Fun must return a list", Other}} as soon as Fun returns
%% anything but {List, NewAcc}.
filter_foldl(_Fun, Acc, []) ->
    {[], Acc};
filter_foldl(Fun, Acc0, [Item | Rest]) ->
    case Fun(Item, Acc0) of
        {Kept, Acc1} when is_list(Kept) ->
            {RestKept, AccN} = filter_foldl(Fun, Acc1, Rest),
            {Kept ++ RestKept, AccN};
        Invalid ->
            throw({error, {"Fun must return a list", Invalid}})
    end.
