1%% 2%% %CopyrightBegin% 3%% 4%% Copyright Ericsson AB 1996-2018. All Rights Reserved. 5%% 6%% Licensed under the Apache License, Version 2.0 (the "License"); 7%% you may not use this file except in compliance with the License. 8%% You may obtain a copy of the License at 9%% 10%% http://www.apache.org/licenses/LICENSE-2.0 11%% 12%% Unless required by applicable law or agreed to in writing, software 13%% distributed under the License is distributed on an "AS IS" BASIS, 14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15%% See the License for the specific language governing permissions and 16%% limitations under the License. 17%% 18%% %CopyrightEnd% 19%% 20-module(dets). 21 22%% Disk based linear hashing lookup dictionary. 23 24%% Public. 25-export([all/0, 26 bchunk/2, 27 close/1, 28 delete/2, 29 delete_all_objects/1, 30 delete_object/2, 31 first/1, 32 foldl/3, 33 foldr/3, 34 from_ets/2, 35 info/1, 36 info/2, 37 init_table/2, 38 init_table/3, 39 insert/2, 40 insert_new/2, 41 is_compatible_bchunk_format/2, 42 is_dets_file/1, 43 lookup/2, 44 match/1, 45 match/2, 46 match/3, 47 match_delete/2, 48 match_object/1, 49 match_object/2, 50 match_object/3, 51 member/2, 52 next/2, 53 open_file/1, 54 open_file/2, 55 pid2name/1, 56 repair_continuation/2, 57 safe_fixtable/2, 58 select/1, 59 select/2, 60 select/3, 61 select_delete/2, 62 slot/2, 63 sync/1, 64 table/1, 65 table/2, 66 to_ets/2, 67 traverse/2, 68 update_counter/3]). 69 70%% Server export. 71-export([start/0, stop/0]). 72 73%% Internal exports. 74-export([istart_link/1, init/2, internal_open/3, add_user/3, 75 internal_close/1, remove_user/2, 76 system_continue/3, system_terminate/4, system_code_change/4]). 77 78%% Debug. 79-export([file_info/1, 80 fsck/1, 81 fsck/2, 82 get_head_field/2, 83 view/1, 84 where/2, 85 verbose/0, 86 verbose/1 87 ]). 88 89%% Not documented, or not ready for publication. 90-export([lookup_keys/2]). 91 92-export_type([bindings_cont/0, cont/0, object_cont/0, select_cont/0, 93 tab_name/0]). 94 95-compile({inline, [{einval,2},{badarg,2},{undefined,1}, 96 {badarg_exit,2},{lookup_reply,2}]}). 97 98-include_lib("kernel/include/file.hrl"). 99 100-include("dets.hrl"). 101 102%%% This is the implementation of the mnesia file storage. Each (non 103%%% ram-copy) table is maintained in a corresponding .DAT file. The 104%%% dat file is organized as a segmented linear hashlist. The head of 105%%% the file with the split indicator, size etc is held in ram by the 106%%% server at all times. 107%%% 108 109%% The method of hashing is the so called linear hashing algorithm 110%% with segments. 111%% 112%% Linear hashing: 113%% 114%% - n indicates next bucket to split (initially zero); 115%% - m is the size of the hash table 116%% - initially next = m and n = 0 117%% 118%% - to insert: 119%% - hash = key mod m 120%% - if hash < n then hash = key mod 2m 121%% - when the number of objects exceeds the initial size 122%% of the hash table, each insertion of an object 123%% causes bucket n to be split: 124%% - add a new bucket to the end of the table 125%% - redistribute the contents of bucket n 126%% using hash = key mod 2m 127%% - increment n 128%% - if n = m then m = 2m, n = 0 129%% - to search: 130%% hash = key mod m 131%% if hash < n then hash = key mod 2m 132%% do linear scan of the bucket 133%% 134 135%%% If a file error occurs on a working dets file, update_mode is set 136%%% to the error tuple. When in 'error' mode, the free lists are not 137%%% written, and a repair is forced next time the file is opened. 138 139-record(dets_cont, { 140 what :: 'undefined' | 'bchunk' | 'bindings' | 'object' | 'select', 141 no_objs :: 'default' | pos_integer(), % requested number of objects 142 bin :: 'eof' | binary(), % small chunk not consumed, 143 % or 'eof' at end-of-file 144 alloc :: binary() % the part of the file not yet scanned 145 | {From :: non_neg_integer(), 146 To :: non_neg_integer, 147 binary()}, 148 tab :: tab_name(), 149 proc :: 'undefined' | pid(), % the pid of the Dets process 150 match_program :: 'true' 151 | 'undefined' 152 | {'match_spec', ets:comp_match_spec()} 153 }). 154 155-record(open_args, { 156 file :: list(), 157 type :: type(), 158 keypos :: keypos(), 159 repair :: 'force' | boolean(), 160 min_no_slots :: no_slots(), 161 max_no_slots :: no_slots(), 162 ram_file :: boolean(), 163 delayed_write :: cache_parms(), 164 auto_save :: auto_save(), 165 access :: access(), 166 debug :: boolean() 167 }). 168 169-define(PATTERN_TO_OBJECT_MATCH_SPEC(Pat), [{Pat,[],['$_']}]). 170-define(PATTERN_TO_BINDINGS_MATCH_SPEC(Pat), [{Pat,[],['$$']}]). 171-define(PATTERN_TO_TRUE_MATCH_SPEC(Pat), [{Pat,[],[true]}]). 172 173%%-define(DEBUGM(X, Y), io:format(X, Y)). 174-define(DEBUGM(X, Y), true). 175 176%%-define(DEBUGF(X,Y), io:format(X, Y)). 177-define(DEBUGF(X,Y), void). 178 179%%-define(PROFILE(C), C). 180-define(PROFILE(C), void). 181 182-opaque bindings_cont() :: #dets_cont{}. 183-opaque cont() :: #dets_cont{}. 184-type match_spec() :: ets:match_spec(). 185-type object() :: tuple(). 186-opaque object_cont() :: #dets_cont{}. 187-type pattern() :: atom() | tuple(). 188-opaque select_cont() :: #dets_cont{}. 189 190%%% Some further debug code was added in R12B-1 (stdlib-1.15.1): 191%%% - there is a new open_file() option 'debug'; 192%%% - there is a new OS environment variable 'DETS_DEBUG'; 193%%% - verbose(true) implies that info messages are written onto 194%%% the error log whenever an unsafe traversal is started. 195%%% The 'debug' mode (set by the open_file() option 'debug' or 196%%% by os:putenv("DETS_DEBUG", "true")) implies that the results of 197%%% calling pwrite() and pread() are tested to some extent. It also 198%%% means a considerable overhead when it comes to RAM usage. The 199%%% operation of Dets is also slowed down a bit. Note that in debug 200%%% mode terms will be output on the error logger. 201 202%%%---------------------------------------------------------------------- 203%%% API 204%%%---------------------------------------------------------------------- 205 206add_user(Pid, Tab, Args) -> 207 req(Pid, {add_user, Tab, Args}). 208 209-spec all() -> [tab_name()]. 210 211all() -> 212 dets_server:all(). 213 214-spec bchunk(Name, Continuation) -> 215 {Continuation2, Data} | '$end_of_table' | {'error', Reason} when 216 Name :: tab_name(), 217 Continuation :: 'start' | cont(), 218 Continuation2 :: cont(), 219 Data :: binary() | tuple(), 220 Reason :: term(). 221 222bchunk(Tab, start) -> 223 badarg(treq(Tab, {bchunk_init, Tab}), [Tab, start]); 224bchunk(Tab, #dets_cont{what = bchunk, tab = Tab} = State) -> 225 badarg(treq(Tab, {bchunk, State}), [Tab, State]); 226bchunk(Tab, Term) -> 227 erlang:error(badarg, [Tab, Term]). 228 229-spec close(Name) -> 'ok' | {'error', Reason} when 230 Name :: tab_name(), 231 Reason :: term(). 232 233close(Tab) -> 234 case dets_server:close(Tab) of 235 badarg -> % Should not happen. 236 {error, not_owner}; % Backwards compatibility... 237 Reply -> 238 Reply 239 end. 240 241-spec delete(Name, Key) -> 'ok' | {'error', Reason} when 242 Name :: tab_name(), 243 Key :: term(), 244 Reason :: term(). 245 246delete(Tab, Key) -> 247 badarg(treq(Tab, {delete_key, [Key]}), [Tab, Key]). 248 249-spec delete_all_objects(Name) -> 'ok' | {'error', Reason} when 250 Name :: tab_name(), 251 Reason :: term(). 252 253delete_all_objects(Tab) -> 254 case treq(Tab, delete_all_objects) of 255 badarg -> 256 erlang:error(badarg, [Tab]); 257 fixed -> 258 match_delete(Tab, '_'); 259 Reply -> 260 Reply 261 end. 262 263-spec delete_object(Name, Object) -> 'ok' | {'error', Reason} when 264 Name :: tab_name(), 265 Object :: object(), 266 Reason :: term(). 267 268delete_object(Tab, O) -> 269 badarg(treq(Tab, {delete_object, [O]}), [Tab, O]). 270 271%% Backwards compatibility. 272fsck(Fname, _Version) -> 273 fsck(Fname). 274 275%% Given a filename, fsck it. Debug. 276fsck(Fname) -> 277 catch begin 278 {ok, Fd, FH} = read_file_header(Fname, read, false), 279 ?DEBUGF("FileHeader: ~p~n", [FH]), 280 case dets_v9:check_file_header(FH, Fd) of 281 {error, not_closed} -> 282 fsck(Fd, make_ref(), Fname, FH, default, default); 283 {ok, _Head} -> 284 fsck(Fd, make_ref(), Fname, FH, default, default); 285 Error -> 286 Error 287 end 288 end. 289 290-spec first(Name) -> Key | '$end_of_table' when 291 Name :: tab_name(), 292 Key :: term(). 293 294first(Tab) -> 295 badarg_exit(treq(Tab, first), [Tab]). 296 297-spec foldr(Function, Acc0, Name) -> Acc | {'error', Reason} when 298 Name :: tab_name(), 299 Function :: fun((Object :: object(), AccIn) -> AccOut), 300 Acc0 :: term(), 301 Acc :: term(), 302 AccIn :: term(), 303 AccOut :: term(), 304 Reason :: term(). 305 306foldr(Fun, Acc, Tab) -> 307 foldl(Fun, Acc, Tab). 308 309-spec foldl(Function, Acc0, Name) -> Acc | {'error', Reason} when 310 Name :: tab_name(), 311 Function :: fun((Object :: object(), AccIn) -> AccOut), 312 Acc0 :: term(), 313 Acc :: term(), 314 AccIn :: term(), 315 AccOut :: term(), 316 Reason :: term(). 317 318foldl(Fun, Acc, Tab) -> 319 Ref = make_ref(), 320 badarg(do_traverse(Fun, Acc, Tab, Ref), [Fun, Acc, Tab]). 321 322-spec from_ets(Name, EtsTab) -> 'ok' | {'error', Reason} when 323 Name :: tab_name(), 324 EtsTab :: ets:tab(), 325 Reason :: term(). 326 327from_ets(DTab, ETab) -> 328 ets:safe_fixtable(ETab, true), 329 Spec = ?PATTERN_TO_OBJECT_MATCH_SPEC('_'), 330 LC = ets:select(ETab, Spec, 100), 331 InitFun = from_ets_fun(LC, ETab), 332 Reply = treq(DTab, {initialize, InitFun, term, default}), 333 ets:safe_fixtable(ETab, false), 334 case Reply of 335 {thrown, Thrown} -> throw(Thrown); 336 Else -> badarg(Else, [DTab, ETab]) 337 end. 338 339from_ets_fun(LC, ETab) -> 340 fun(close) -> 341 ok; 342 (read) when LC =:= '$end_of_table' -> 343 end_of_input; 344 (read) -> 345 {L, C} = LC, 346 {L, from_ets_fun(ets:select(C), ETab)} 347 end. 348 349-spec info(Name) -> InfoList | 'undefined' when 350 Name :: tab_name(), 351 InfoList :: [InfoTuple], 352 InfoTuple :: {'file_size', non_neg_integer()} 353 | {'filename', file:name()} 354 | {'keypos', keypos()} 355 | {'size', non_neg_integer()} 356 | {'type', type()}. 357 358info(Tab) -> 359 case catch dets_server:get_pid(Tab) of 360 {'EXIT', _Reason} -> 361 undefined; 362 Pid -> 363 undefined(req(Pid, info)) 364 end. 365 366-spec info(Name, Item) -> Value | 'undefined' when 367 Name :: tab_name(), 368 Item :: 'access' | 'auto_save' | 'bchunk_format' 369 | 'hash' | 'file_size' | 'filename' | 'keypos' | 'memory' 370 | 'no_keys' | 'no_objects' | 'no_slots' | 'owner' | 'ram_file' 371 | 'safe_fixed' | 'safe_fixed_monotonic_time' | 'size' | 'type', 372 Value :: term(). 373 374info(Tab, owner) -> 375 case catch dets_server:get_pid(Tab) of 376 Pid when is_pid(Pid) -> 377 Pid; 378 _ -> 379 undefined 380 end; 381info(Tab, users) -> % undocumented 382 case dets_server:users(Tab) of 383 [] -> 384 undefined; 385 Users -> 386 Users 387 end; 388info(Tab, Tag) -> 389 case catch dets_server:get_pid(Tab) of 390 {'EXIT', _Reason} -> 391 undefined; 392 Pid -> 393 undefined(req(Pid, {info, Tag})) 394 end. 395 396-spec init_table(Name, InitFun) -> ok | {'error', Reason} when 397 Name :: tab_name(), 398 InitFun :: fun((Arg) -> Res), 399 Arg :: read | close, 400 Res :: end_of_input | {[object()], InitFun} | {Data, InitFun} | term(), 401 Reason :: term(), 402 Data :: binary() | tuple(). 403 404init_table(Tab, InitFun) -> 405 init_table(Tab, InitFun, []). 406 407-spec init_table(Name, InitFun, Options) -> ok | {'error', Reason} when 408 Name :: tab_name(), 409 InitFun :: fun((Arg) -> Res), 410 Arg :: read | close, 411 Res :: end_of_input | {[object()], InitFun} | {Data, InitFun} | term(), 412 Options :: Option | [Option], 413 Option :: {min_no_slots,no_slots()} | {format,term | bchunk}, 414 Reason :: term(), 415 Data :: binary() | tuple(). 416 417init_table(Tab, InitFun, Options) when is_function(InitFun) -> 418 case options(Options, [format, min_no_slots]) of 419 {badarg,_} -> 420 erlang:error(badarg, [Tab, InitFun, Options]); 421 [Format, MinNoSlots] -> 422 case treq(Tab, {initialize, InitFun, Format, MinNoSlots}) of 423 {thrown, Thrown} -> throw(Thrown); 424 Else -> badarg(Else, [Tab, InitFun, Options]) 425 end 426 end; 427init_table(Tab, InitFun, Options) -> 428 erlang:error(badarg, [Tab, InitFun, Options]). 429 430-spec insert(Name, Objects) -> 'ok' | {'error', Reason} when 431 Name :: tab_name(), 432 Objects :: object() | [object()], 433 Reason :: term(). 434 435insert(Tab, Objs) when is_list(Objs) -> 436 badarg(treq(Tab, {insert, Objs}), [Tab, Objs]); 437insert(Tab, Obj) -> 438 badarg(treq(Tab, {insert, [Obj]}), [Tab, Obj]). 439 440-spec insert_new(Name, Objects) -> boolean() | {'error', Reason} when 441 Name :: tab_name(), 442 Objects :: object() | [object()], 443 Reason :: term(). 444 445insert_new(Tab, Objs) when is_list(Objs) -> 446 badarg(treq(Tab, {insert_new, Objs}), [Tab, Objs]); 447insert_new(Tab, Obj) -> 448 badarg(treq(Tab, {insert_new, [Obj]}), [Tab, Obj]). 449 450internal_close(Pid) -> 451 req(Pid, close). 452 453internal_open(Pid, Ref, Args) -> 454 req(Pid, {internal_open, Ref, Args}). 455 456-spec is_compatible_bchunk_format(Name, BchunkFormat) -> boolean() when 457 Name :: tab_name(), 458 BchunkFormat :: binary(). 459 460is_compatible_bchunk_format(Tab, Term) -> 461 badarg(treq(Tab, {is_compatible_bchunk_format, Term}), [Tab, Term]). 462 463-spec is_dets_file(Filename) -> boolean() | {'error', Reason} when 464 Filename :: file:name(), 465 Reason :: term(). 466 467is_dets_file(FileName) -> 468 case catch read_file_header(FileName, read, false) of 469 {ok, Fd, FH} -> 470 _ = file:close(Fd), 471 FH#fileheader.cookie =:= ?MAGIC; 472 {error, {tooshort, _}} -> 473 false; 474 {error, {not_a_dets_file, _}} -> 475 false; 476 Other -> 477 Other 478 end. 479 480-spec lookup(Name, Key) -> Objects | {'error', Reason} when 481 Name :: tab_name(), 482 Key :: term(), 483 Objects :: [object()], 484 Reason :: term(). 485 486lookup(Tab, Key) -> 487 badarg(treq(Tab, {lookup_keys, [Key]}), [Tab, Key]). 488 489%% Not public. 490lookup_keys(Tab, Keys) -> 491 case catch lists:usort(Keys) of 492 UKeys when is_list(UKeys), UKeys =/= [] -> 493 badarg(treq(Tab, {lookup_keys, UKeys}), [Tab, Keys]); 494 _Else -> 495 erlang:error(badarg, [Tab, Keys]) 496 end. 497 498-spec match(Name, Pattern) -> [Match] | {'error', Reason} when 499 Name :: tab_name(), 500 Pattern :: pattern(), 501 Match :: [term()], 502 Reason :: term(). 503 504match(Tab, Pat) -> 505 badarg(safe_match(Tab, Pat, bindings), [Tab, Pat]). 506 507-spec match(Name, Pattern, N) -> 508 {[Match], Continuation} | '$end_of_table' | {'error', Reason} when 509 Name :: tab_name(), 510 Pattern :: pattern(), 511 N :: 'default' | non_neg_integer(), 512 Continuation :: bindings_cont(), 513 Match :: [term()], 514 Reason :: term(). 515 516match(Tab, Pat, N) -> 517 badarg(init_chunk_match(Tab, Pat, bindings, N, no_safe), [Tab, Pat, N]). 518 519-spec match(Continuation) -> 520 {[Match], Continuation2} | '$end_of_table' | {'error', Reason} when 521 Continuation :: bindings_cont(), 522 Continuation2 :: bindings_cont(), 523 Match :: [term()], 524 Reason :: term(). 525 526match(State) when State#dets_cont.what =:= bindings -> 527 badarg(chunk_match(State, no_safe), [State]); 528match(Term) -> 529 erlang:error(badarg, [Term]). 530 531-spec match_delete(Name, Pattern) -> 'ok' | {'error', Reason} when 532 Name :: tab_name(), 533 Pattern :: pattern(), 534 Reason :: term(). 535 536match_delete(Tab, Pat) -> 537 badarg(match_delete(Tab, Pat, delete), [Tab, Pat]). 538 539match_delete(Tab, Pat, What) -> 540 case compile_match_spec(What, Pat) of 541 {Spec, MP} -> 542 case catch dets_server:get_pid(Tab) of 543 {'EXIT', _Reason} -> 544 badarg; 545 Proc -> 546 R = req(Proc, {match_delete_init, MP, Spec}), 547 do_match_delete(Proc, R, What, 0) 548 end; 549 badarg -> 550 badarg 551 end. 552 553do_match_delete(_Proc, {done, N1}, select, N) -> 554 N + N1; 555do_match_delete(_Proc, {done, _N1}, _What, _N) -> 556 ok; 557do_match_delete(Proc, {cont, State, N1}, What, N) -> 558 do_match_delete(Proc, req(Proc, {match_delete, State}), What, N+N1); 559do_match_delete(_Proc, Error, _What, _N) -> 560 Error. 561 562-spec match_object(Name, Pattern) -> Objects | {'error', Reason} when 563 Name :: tab_name(), 564 Pattern :: pattern(), 565 Objects :: [object()], 566 Reason :: term(). 567 568match_object(Tab, Pat) -> 569 badarg(safe_match(Tab, Pat, object), [Tab, Pat]). 570 571-spec match_object(Name, Pattern, N) -> 572 {Objects, Continuation} | '$end_of_table' | {'error', Reason} when 573 Name :: tab_name(), 574 Pattern :: pattern(), 575 N :: 'default' | non_neg_integer(), 576 Continuation :: object_cont(), 577 Objects :: [object()], 578 Reason :: term(). 579 580match_object(Tab, Pat, N) -> 581 badarg(init_chunk_match(Tab, Pat, object, N, no_safe), [Tab, Pat, N]). 582 583-spec match_object(Continuation) -> 584 {Objects, Continuation2} | '$end_of_table' | {'error', Reason} when 585 Continuation :: object_cont(), 586 Continuation2 :: object_cont(), 587 Objects :: [object()], 588 Reason :: term(). 589 590match_object(State) when State#dets_cont.what =:= object -> 591 badarg(chunk_match(State, no_safe), [State]); 592match_object(Term) -> 593 erlang:error(badarg, [Term]). 594 595-spec member(Name, Key) -> boolean() | {'error', Reason} when 596 Name :: tab_name(), 597 Key :: term(), 598 Reason :: term(). 599 600member(Tab, Key) -> 601 badarg(treq(Tab, {member, Key}), [Tab, Key]). 602 603-spec next(Name, Key1) -> Key2 | '$end_of_table' when 604 Name :: tab_name(), 605 Key1 :: term(), 606 Key2 :: term(). 607 608next(Tab, Key) -> 609 badarg_exit(treq(Tab, {next, Key}), [Tab, Key]). 610 611-spec open_file(Filename) -> {'ok', Reference} | {'error', Reason} when 612 Filename :: file:name(), 613 Reference :: reference(), 614 Reason :: term(). 615 616%% Assuming that a file already exists, open it with the 617%% parameters as already specified in the file itself. 618%% Return a ref leading to the file. 619open_file(File0) -> 620 File = to_list(File0), 621 case is_list(File) of 622 true -> 623 case dets_server:open_file(File) of 624 badarg -> % Should not happen. 625 erlang:error(dets_process_died, [File]); 626 Reply -> 627 einval(Reply, [File]) 628 end; 629 false -> 630 erlang:error(badarg, [File0]) 631 end. 632 633-spec open_file(Name, Args) -> {'ok', Name} | {'error', Reason} when 634 Name :: tab_name(), 635 Args :: [OpenArg], 636 OpenArg :: {'access', access()} 637 | {'auto_save', auto_save()} 638 | {'estimated_no_objects', non_neg_integer()} 639 | {'file', file:name()} 640 | {'max_no_slots', no_slots()} 641 | {'min_no_slots', no_slots()} 642 | {'keypos', keypos()} 643 | {'ram_file', boolean()} 644 | {'repair', boolean() | 'force'} 645 | {'type', type()}, 646 Reason :: term(). 647 648open_file(Tab, Args) when is_list(Args) -> 649 case catch defaults(Tab, Args) of 650 OpenArgs when is_record(OpenArgs, open_args) -> 651 case dets_server:open_file(Tab, OpenArgs) of 652 badarg -> % Should not happen. 653 erlang:error(dets_process_died, [Tab, Args]); 654 Reply -> 655 einval(Reply, [Tab, Args]) 656 end; 657 _ -> 658 erlang:error(badarg, [Tab, Args]) 659 end; 660open_file(Tab, Arg) -> 661 open_file(Tab, [Arg]). 662 663-spec pid2name(Pid) -> {'ok', Name} | 'undefined' when 664 Pid :: pid(), 665 Name :: tab_name(). 666 667pid2name(Pid) -> 668 dets_server:pid2name(Pid). 669 670remove_user(Pid, From) -> 671 req(Pid, {close, From}). 672 673-spec repair_continuation(Continuation, MatchSpec) -> Continuation2 when 674 Continuation :: select_cont(), 675 Continuation2 :: select_cont(), 676 MatchSpec :: match_spec(). 677 678repair_continuation(#dets_cont{match_program = {match_spec, B}}=Cont, MS) -> 679 case ets:is_compiled_ms(B) of 680 true -> 681 Cont; 682 false -> 683 Cont#dets_cont{match_program = {match_spec, 684 ets:match_spec_compile(MS)}} 685 end; 686repair_continuation(#dets_cont{}=Cont, _MS) -> 687 Cont; 688repair_continuation(T, MS) -> 689 erlang:error(badarg, [T, MS]). 690 691-spec safe_fixtable(Name, Fix) -> 'ok' when 692 Name :: tab_name(), 693 Fix :: boolean(). 694 695safe_fixtable(Tab, Bool) when Bool; not Bool -> 696 badarg(treq(Tab, {safe_fixtable, Bool}), [Tab, Bool]); 697safe_fixtable(Tab, Term) -> 698 erlang:error(badarg, [Tab, Term]). 699 700-spec select(Name, MatchSpec) -> Selection | {'error', Reason} when 701 Name :: tab_name(), 702 MatchSpec :: match_spec(), 703 Selection :: [term()], 704 Reason :: term(). 705 706select(Tab, Pat) -> 707 badarg(safe_match(Tab, Pat, select), [Tab, Pat]). 708 709-spec select(Name, MatchSpec, N) -> 710 {Selection, Continuation} | '$end_of_table' | {'error', Reason} when 711 Name :: tab_name(), 712 MatchSpec :: match_spec(), 713 N :: 'default' | non_neg_integer(), 714 Continuation :: select_cont(), 715 Selection :: [term()], 716 Reason :: term(). 717 718select(Tab, Pat, N) -> 719 badarg(init_chunk_match(Tab, Pat, select, N, no_safe), [Tab, Pat, N]). 720 721-spec select(Continuation) -> 722 {Selection, Continuation2} | '$end_of_table' | {'error', Reason} when 723 Continuation :: select_cont(), 724 Continuation2 :: select_cont(), 725 Selection :: [term()], 726 Reason :: term(). 727 728select(State) when State#dets_cont.what =:= select -> 729 badarg(chunk_match(State, no_safe), [State]); 730select(Term) -> 731 erlang:error(badarg, [Term]). 732 733-spec select_delete(Name, MatchSpec) -> N | {'error', Reason} when 734 Name :: tab_name(), 735 MatchSpec :: match_spec(), 736 N :: non_neg_integer(), 737 Reason :: term(). 738 739select_delete(Tab, Pat) -> 740 badarg(match_delete(Tab, Pat, select), [Tab, Pat]). 741 742-spec slot(Name, I) -> '$end_of_table' | Objects | {'error', Reason} when 743 Name :: tab_name(), 744 I :: non_neg_integer(), 745 Objects :: [object()], 746 Reason :: term(). 747 748slot(Tab, Slot) when is_integer(Slot), Slot >= 0 -> 749 badarg(treq(Tab, {slot, Slot}), [Tab, Slot]); 750slot(Tab, Term) -> 751 erlang:error(badarg, [Tab, Term]). 752 753start() -> 754 dets_server:start(). 755 756stop() -> 757 dets_server:stop(). 758 759istart_link(Server) -> 760 {ok, proc_lib:spawn_link(dets, init, [self(), Server])}. 761 762-spec sync(Name) -> 'ok' | {'error', Reason} when 763 Name :: tab_name(), 764 Reason :: term(). 765 766sync(Tab) -> 767 badarg(treq(Tab, sync), [Tab]). 768 769-spec table(Name) -> QueryHandle when 770 Name :: tab_name(), 771 QueryHandle :: qlc:query_handle(). 772 773table(Tab) -> 774 table(Tab, []). 775 776-spec table(Name, Options) -> QueryHandle when 777 Name :: tab_name(), 778 Options :: Option | [Option], 779 Option :: {'n_objects', Limit} 780 | {'traverse', TraverseMethod}, 781 Limit :: 'default' | pos_integer(), 782 TraverseMethod :: 'first_next' | 'select' | {'select', match_spec()}, 783 QueryHandle :: qlc:query_handle(). 784 785table(Tab, Opts) -> 786 case options(Opts, [traverse, n_objects]) of 787 {badarg,_} -> 788 erlang:error(badarg, [Tab, Opts]); 789 [Traverse, NObjs] -> 790 TF = case Traverse of 791 first_next -> 792 fun() -> qlc_next(Tab, first(Tab)) end; 793 select -> 794 fun(MS) -> qlc_select(select(Tab, MS, NObjs)) end; 795 {select, MS} -> 796 fun() -> qlc_select(select(Tab, MS, NObjs)) end 797 end, 798 PreFun = fun(_) -> safe_fixtable(Tab, true) end, 799 PostFun = fun() -> safe_fixtable(Tab, false) end, 800 InfoFun = fun(Tag) -> table_info(Tab, Tag) end, 801 %% lookup_keys is not public, but convenient 802 LookupFun = 803 case Traverse of 804 {select, _MS} -> 805 undefined; 806 _ -> 807 fun(_KeyPos, [K]) -> lookup(Tab, K); 808 (_KeyPos, Ks) -> lookup_keys(Tab, Ks) 809 end 810 end, 811 FormatFun = 812 fun({all, _NElements, _ElementFun}) -> 813 As = [Tab | [Opts || _ <- [[]], Opts =/= []]], 814 {?MODULE, table, As}; 815 ({match_spec, MS}) -> 816 {?MODULE, table, [Tab, [{traverse, {select, MS}} | 817 listify(Opts)]]}; 818 ({lookup, _KeyPos, [Value], _NElements, ElementFun}) -> 819 io_lib:format("~w:lookup(~w, ~w)", 820 [?MODULE, Tab, ElementFun(Value)]); 821 ({lookup, _KeyPos, Values, _NElements, ElementFun}) -> 822 Vals = [ElementFun(V) || V <- Values], 823 io_lib:format("lists:flatmap(fun(V) -> " 824 "~w:lookup(~w, V) end, ~w)", 825 [?MODULE, Tab, Vals]) 826 end, 827 qlc:table(TF, [{pre_fun, PreFun}, {post_fun, PostFun}, 828 {info_fun, InfoFun}, {format_fun, FormatFun}, 829 {key_equality, '=:='}, 830 {lookup_fun, LookupFun}]) 831 end. 832 833qlc_next(_Tab, '$end_of_table') -> 834 []; 835qlc_next(Tab, Key) -> 836 case lookup(Tab, Key) of 837 Objects when is_list(Objects) -> 838 Objects ++ fun() -> qlc_next(Tab, next(Tab, Key)) end; 839 Error -> 840 %% Do what first and next do. 841 exit(Error) 842 end. 843 844qlc_select('$end_of_table') -> 845 []; 846qlc_select({Objects, Cont}) when is_list(Objects) -> 847 Objects ++ fun() -> qlc_select(select(Cont)) end; 848qlc_select(Error) -> 849 Error. 850 851table_info(Tab, num_of_objects) -> 852 info(Tab, size); 853table_info(Tab, keypos) -> 854 info(Tab, keypos); 855table_info(Tab, is_unique_objects) -> 856 info(Tab, type) =/= duplicate_bag; 857table_info(_Tab, _) -> 858 undefined. 859 860%% End of table/2. 861 862-spec to_ets(Name, EtsTab) -> EtsTab | {'error', Reason} when 863 Name :: tab_name(), 864 EtsTab :: ets:tab(), 865 Reason :: term(). 866 867to_ets(DTab, ETab) -> 868 case ets:info(ETab, protection) of 869 undefined -> 870 erlang:error(badarg, [DTab, ETab]); 871 _ -> 872 Fun = fun(X, T) -> true = ets:insert(T, X), T end, 873 foldl(Fun, ETab, DTab) 874 end. 875 876-spec traverse(Name, Fun) -> Return | {'error', Reason} when 877 Name :: tab_name(), 878 Fun :: fun((Object) -> FunReturn), 879 Object :: object(), 880 FunReturn :: 'continue' 881 | {'continue', Val} 882 | {'done', Value} 883 | OtherValue, 884 Return :: [term()] | OtherValue, 885 Val :: term(), 886 Value :: term(), 887 OtherValue :: term(), 888 Reason :: term(). 889 890traverse(Tab, Fun) -> 891 Ref = make_ref(), 892 TFun = 893 fun(O, Acc) -> 894 case Fun(O) of 895 continue -> 896 Acc; 897 {continue, Val} -> 898 [Val | Acc]; 899 {done, Value} -> 900 throw({Ref, [Value | Acc]}); 901 Other -> 902 throw({Ref, Other}) 903 end 904 end, 905 badarg(do_traverse(TFun, [], Tab, Ref), [Tab, Fun]). 906 907-spec update_counter(Name, Key, Increment) -> Result when 908 Name :: tab_name(), 909 Key :: term(), 910 Increment :: {Pos, Incr} | Incr, 911 Pos :: integer(), 912 Incr :: integer(), 913 Result :: integer(). 914 915update_counter(Tab, Key, C) -> 916 badarg(treq(Tab, {update_counter, Key, C}), [Tab, Key, C]). 917 918verbose() -> 919 verbose(true). 920 921verbose(What) -> 922 ok = dets_server:verbose(What), 923 All = dets_server:all(), 924 Fun = fun(Tab) -> treq(Tab, {set_verbose, What}) end, 925 lists:foreach(Fun, All), 926 All. 927 928%% Where in the (open) table is Object located? 929%% The address of the first matching object is returned. 930%% Format 9 returns the address of the object collection. 931%% -> {ok, Address} | false 932where(Tab, Object) -> 933 badarg(treq(Tab, {where, Object}), [Tab, Object]). 934 935do_traverse(Fun, Acc, Tab, Ref) -> 936 case catch dets_server:get_pid(Tab) of 937 {'EXIT', _Reason} -> 938 badarg; 939 Proc -> 940 try 941 do_trav(Proc, Acc, Fun) 942 catch {Ref, Result} -> 943 Result 944 end 945 end. 946 947do_trav(Proc, Acc, Fun) -> 948 {Spec, MP} = compile_match_spec(object, '_'), 949 %% MP not used 950 case req(Proc, {match, MP, Spec, default, safe}) of 951 {cont, State} -> 952 do_trav(State, Proc, Acc, Fun); 953 Error -> 954 Error 955 end. 956 957do_trav(State, Proc, Acc, Fun) -> 958 case req(Proc, {match_init, State, safe}) of 959 '$end_of_table'-> 960 Acc; 961 {cont, {Bins, NewState}} -> 962 do_trav_bins(NewState, Proc, Acc, Fun, lists:reverse(Bins)); 963 Error -> 964 Error 965 end. 966 967do_trav_bins(State, Proc, Acc, Fun, []) -> 968 do_trav(State, Proc, Acc, Fun); 969do_trav_bins(State, Proc, Acc, Fun, [Bin | Bins]) -> 970 %% Unpack one binary at a time, using the client's heap. 971 case catch binary_to_term(Bin) of 972 {'EXIT', _} -> 973 req(Proc, {corrupt, dets_utils:bad_object(do_trav_bins, Bin)}); 974 Term -> 975 NewAcc = Fun(Term, Acc), 976 do_trav_bins(State, Proc, NewAcc, Fun, Bins) 977 end. 978 979safe_match(Tab, Pat, What) -> 980 do_safe_match(init_chunk_match(Tab, Pat, What, default, safe), []). 981 982do_safe_match({error, Error}, _L) -> 983 {error, Error}; 984do_safe_match({L, C}, LL) -> 985 do_safe_match(chunk_match(C, safe), L++LL); 986do_safe_match('$end_of_table', L) -> 987 L; 988do_safe_match(badarg, _L) -> 989 badarg. 990 991%% What = object | bindings | select 992init_chunk_match(Tab, Pat, What, N, Safe) when is_integer(N), N >= 0; 993 N =:= default -> 994 case compile_match_spec(What, Pat) of 995 {Spec, MP} -> 996 case catch dets_server:get_pid(Tab) of 997 {'EXIT', _Reason} -> 998 badarg; 999 Proc -> 1000 case req(Proc, {match, MP, Spec, N, Safe}) of 1001 {done, L} -> 1002 {L, #dets_cont{tab = Tab, proc = Proc, 1003 what = What, bin = eof, 1004 no_objs = default, 1005 alloc = <<>>}}; 1006 {cont, State} -> 1007 chunk_match(State#dets_cont{what = What, 1008 tab = Tab, 1009 proc = Proc}, 1010 Safe); 1011 Error -> 1012 Error 1013 end 1014 end; 1015 badarg -> 1016 badarg 1017 end; 1018init_chunk_match(_Tab, _Pat, _What, _N, _Safe) -> 1019 badarg. 1020 1021chunk_match(#dets_cont{proc = Proc}=State, Safe) -> 1022 case req(Proc, {match_init, State, Safe}) of 1023 '$end_of_table'=Reply -> 1024 Reply; 1025 {cont, {Bins, NewState}} -> 1026 MP = NewState#dets_cont.match_program, 1027 case catch do_foldl_bins(Bins, MP) of 1028 {'EXIT', _} -> 1029 case ets:is_compiled_ms(MP) of 1030 true -> 1031 Bad = dets_utils:bad_object(chunk_match, Bins), 1032 req(Proc, {corrupt, Bad}); 1033 false -> 1034 badarg 1035 end; 1036 [] -> 1037 chunk_match(NewState, Safe); 1038 Terms -> 1039 {Terms, NewState} 1040 end; 1041 Error -> 1042 Error 1043 end. 1044 1045do_foldl_bins(Bins, true) -> 1046 foldl_bins(Bins, []); 1047do_foldl_bins(Bins, {match_spec, MP}) -> 1048 foldl_bins(Bins, MP, []). 1049 1050foldl_bins([], Terms) -> 1051 %% Preserve time order. 1052 Terms; 1053foldl_bins([Bin | Bins], Terms) -> 1054 foldl_bins(Bins, [binary_to_term(Bin) | Terms]). 1055 1056foldl_bins([], _MP, Terms) -> 1057 %% Preserve time order. 1058 Terms; 1059foldl_bins([Bin | Bins], MP, Terms) -> 1060 Term = binary_to_term(Bin), 1061 case ets:match_spec_run([Term], MP) of 1062 [] -> 1063 foldl_bins(Bins, MP, Terms); 1064 [Result] -> 1065 foldl_bins(Bins, MP, [Result | Terms]) 1066 end. 1067 1068%% -> {Spec, binary()} | badarg 1069compile_match_spec(select, ?PATTERN_TO_OBJECT_MATCH_SPEC('_') = Spec) -> 1070 {Spec, true}; 1071compile_match_spec(select, Spec) -> 1072 try {Spec, {match_spec, ets:match_spec_compile(Spec)}} 1073 catch error:_ -> badarg 1074 end; 1075compile_match_spec(object, Pat) -> 1076 compile_match_spec(select, ?PATTERN_TO_OBJECT_MATCH_SPEC(Pat)); 1077compile_match_spec(bindings, Pat) -> 1078 compile_match_spec(select, ?PATTERN_TO_BINDINGS_MATCH_SPEC(Pat)); 1079compile_match_spec(delete, Pat) -> 1080 compile_match_spec(select, ?PATTERN_TO_TRUE_MATCH_SPEC(Pat)). 1081 1082%% Process the args list as provided to open_file/2. 1083defaults(Tab, Args) -> 1084 Defaults0 = #open_args{file = to_list(Tab), 1085 type = set, 1086 keypos = 1, 1087 repair = true, 1088 min_no_slots = default, 1089 max_no_slots = default, 1090 ram_file = false, 1091 delayed_write = ?DEFAULT_CACHE, 1092 auto_save = timer:minutes(?DEFAULT_AUTOSAVE), 1093 access = read_write, 1094 debug = false}, 1095 Fun = fun repl/2, 1096 Defaults = lists:foldl(Fun, Defaults0, Args), 1097 true = is_list(Defaults#open_args.file), 1098 is_comp_min_max(Defaults). 1099 1100to_list(T) when is_atom(T) -> atom_to_list(T); 1101to_list(T) -> T. 1102 1103repl({access, A}, Defs) -> 1104 mem(A, [read, read_write]), 1105 Defs#open_args{access = A}; 1106repl({auto_save, Int}, Defs) when is_integer(Int), Int >= 0 -> 1107 Defs#open_args{auto_save = Int}; 1108repl({auto_save, infinity}, Defs) -> 1109 Defs#open_args{auto_save =infinity}; 1110repl({cache_size, Int}, Defs) when is_integer(Int), Int >= 0 -> 1111 %% Recognized, but ignored. 1112 Defs; 1113repl({cache_size, infinity}, Defs) -> 1114 Defs; 1115repl({delayed_write, default}, Defs) -> 1116 Defs#open_args{delayed_write = ?DEFAULT_CACHE}; 1117repl({delayed_write, {Delay,Size} = C}, Defs) 1118 when is_integer(Delay), Delay >= 0, is_integer(Size), Size >= 0 -> 1119 Defs#open_args{delayed_write = C}; 1120repl({estimated_no_objects, I}, Defs) -> 1121 repl({min_no_slots, I}, Defs); 1122repl({file, File}, Defs) -> 1123 Defs#open_args{file = to_list(File)}; 1124repl({keypos, P}, Defs) when is_integer(P), P > 0 -> 1125 Defs#open_args{keypos =P}; 1126repl({max_no_slots, I}, Defs) -> 1127 MaxSlots = is_max_no_slots(I), 1128 Defs#open_args{max_no_slots = MaxSlots}; 1129repl({min_no_slots, I}, Defs) -> 1130 MinSlots = is_min_no_slots(I), 1131 Defs#open_args{min_no_slots = MinSlots}; 1132repl({ram_file, Bool}, Defs) -> 1133 mem(Bool, [true, false]), 1134 Defs#open_args{ram_file = Bool}; 1135repl({repair, T}, Defs) -> 1136 mem(T, [true, false, force]), 1137 Defs#open_args{repair = T}; 1138repl({type, T}, Defs) -> 1139 mem(T, [set, bag, duplicate_bag]), 1140 Defs#open_args{type =T}; 1141repl({version, Version}, Defs) -> 1142 %% Backwards compatibility. 1143 is_version(Version), 1144 Defs; 1145repl({debug, Bool}, Defs) -> 1146 %% Not documented. 1147 mem(Bool, [true, false]), 1148 Defs#open_args{debug = Bool}; 1149repl({_, _}, _) -> 1150 exit(badarg). 1151 1152is_min_no_slots(default) -> default; 1153is_min_no_slots(I) when is_integer(I), I >= ?DEFAULT_MIN_NO_SLOTS -> I; 1154is_min_no_slots(I) when is_integer(I), I >= 0 -> ?DEFAULT_MIN_NO_SLOTS. 1155 1156is_max_no_slots(default) -> default; 1157is_max_no_slots(I) when is_integer(I), I > 0, I < 1 bsl 31 -> I. 1158 1159is_comp_min_max(Defs) -> 1160 #open_args{max_no_slots = Max, min_no_slots = Min} = Defs, 1161 if 1162 Min =:= default -> Defs; 1163 Max =:= default -> Defs; 1164 true -> true = Min =< Max, Defs 1165 end. 1166 1167is_version(default) -> true; 1168is_version(9) -> true. 1169 1170mem(X, L) -> 1171 case lists:member(X, L) of 1172 true -> true; 1173 false -> exit(badarg) 1174 end. 1175 1176options(Options, Keys) when is_list(Options) -> 1177 options(Options, Keys, []); 1178options(Option, Keys) -> 1179 options([Option], Keys, []). 1180 1181options(Options, [Key | Keys], L) when is_list(Options) -> 1182 V = case lists:keysearch(Key, 1, Options) of 1183 {value, {format, Format}} when Format =:= term; 1184 Format =:= bchunk -> 1185 {ok, Format}; 1186 {value, {min_no_slots, I}} -> 1187 case catch is_min_no_slots(I) of 1188 {'EXIT', _} -> badarg; 1189 MinNoSlots -> {ok, MinNoSlots} 1190 end; 1191 {value, {n_objects, default}} -> 1192 {ok, default_option(Key)}; 1193 {value, {n_objects, NObjs}} when is_integer(NObjs), 1194 NObjs >= 1 -> 1195 {ok, NObjs}; 1196 {value, {traverse, select}} -> 1197 {ok, select}; 1198 {value, {traverse, {select, MS}}} -> 1199 {ok, {select, MS}}; 1200 {value, {traverse, first_next}} -> 1201 {ok, first_next}; 1202 {value, {Key, _}} -> 1203 badarg; 1204 false -> 1205 Default = default_option(Key), 1206 {ok, Default} 1207 end, 1208 case V of 1209 badarg -> 1210 {badarg, Key}; 1211 {ok, Value} -> 1212 NewOptions = lists:keydelete(Key, 1, Options), 1213 options(NewOptions, Keys, [Value | L]) 1214 end; 1215options([], [], L) -> 1216 lists:reverse(L); 1217options(Options, _, _L) -> 1218 {badarg,Options}. 1219 1220default_option(format) -> term; 1221default_option(min_no_slots) -> default; 1222default_option(traverse) -> select; 1223default_option(n_objects) -> default. 1224 1225listify(L) when is_list(L) -> 1226 L; 1227listify(T) -> 1228 [T]. 1229 1230treq(Tab, R) -> 1231 case catch dets_server:get_pid(Tab) of 1232 Pid when is_pid(Pid) -> 1233 req(Pid, R); 1234 _ -> 1235 badarg 1236 end. 1237 1238req(Proc, R) -> 1239 Ref = erlang:monitor(process, Proc), 1240 Proc ! ?DETS_CALL(self(), R), 1241 receive 1242 {'DOWN', Ref, process, Proc, _Info} -> 1243 badarg; 1244 {Proc, Reply} -> 1245 erlang:demonitor(Ref, [flush]), 1246 Reply 1247 end. 1248 1249%% Inlined. 1250einval({error, {file_error, _, einval}}, A) -> 1251 erlang:error(badarg, A); 1252einval({error, {file_error, _, badarg}}, A) -> 1253 erlang:error(badarg, A); 1254einval(Reply, _A) -> 1255 Reply. 1256 1257%% Inlined. 1258badarg(badarg, A) -> 1259 erlang:error(badarg, A); 1260badarg(Reply, _A) -> 1261 Reply. 1262 1263%% Inlined. 1264undefined(badarg) -> 1265 undefined; 1266undefined(Reply) -> 1267 Reply. 1268 1269%% Inlined. 1270badarg_exit(badarg, A) -> 1271 erlang:error(badarg, A); 1272badarg_exit({ok, Reply}, _A) -> 1273 Reply; 1274badarg_exit(Reply, _A) -> 1275 exit(Reply). 1276 1277%%%----------------------------------------------------------------- 1278%%% Server functions 1279%%%----------------------------------------------------------------- 1280 1281init(Parent, Server) -> 1282 process_flag(trap_exit, true), 1283 %% The Dets server pretends the file is open before 1284 %% internal_open() has been called, which means that unless the 1285 %% internal_open message is applied first, other processes can 1286 %% find the pid by calling dets_server:get_pid() and do things 1287 %% before Head has been initialized properly. 1288 receive 1289 ?DETS_CALL(From, {internal_open, Ref, Args}=Op) -> 1290 try do_internal_open(Parent, Server, From, Ref, Args) of 1291 Head -> 1292 open_file_loop(Head, 0) 1293 catch 1294 exit:normal -> 1295 exit(normal); 1296 _:Bad:Stacktrace -> 1297 bug_found(no_name, Op, Bad, Stacktrace, From), 1298 exit(Bad) % give up 1299 end 1300 end. 1301 1302open_file_loop(Head, N) when element(1, Head#head.update_mode) =:= error -> 1303 open_file_loop2(Head, N); 1304open_file_loop(Head, N) -> 1305 receive 1306 %% When the table is fixed it can be assumed that at least one 1307 %% traversal is in progress. To speed the traversal up three 1308 %% things have been done: 1309 %% - prioritize match_init, bchunk, next, and match_delete_init; 1310 %% - do not peek the message queue for updates; 1311 %% - wait 1 ms after each update. 1312 %% next is normally followed by lookup, but since lookup is also 1313 %% used when not traversing the table, it is not prioritized. 1314 ?DETS_CALL(From, {match_init, _State, _Safe} = Op) -> 1315 do_apply_op(Op, From, Head, N); 1316 ?DETS_CALL(From, {bchunk, _State} = Op) -> 1317 do_apply_op(Op, From, Head, N); 1318 ?DETS_CALL(From, {next, _Key} = Op) -> 1319 do_apply_op(Op, From, Head, N); 1320 ?DETS_CALL(From, {match_delete_init, _MP, _Spec} = Op) -> 1321 do_apply_op(Op, From, Head, N); 1322 {'EXIT', Pid, Reason} when Pid =:= Head#head.parent -> 1323 %% Parent orders shutdown. 1324 _NewHead = do_stop(Head), 1325 exit(Reason); 1326 {'EXIT', Pid, Reason} when Pid =:= Head#head.server -> 1327 %% The server is gone. 1328 _NewHead = do_stop(Head), 1329 exit(Reason); 1330 {'EXIT', Pid, _Reason} -> 1331 %% A process fixing the table exits. 1332 H2 = remove_fix(Head, Pid, close), 1333 open_file_loop(H2, N); 1334 {system, From, Req} -> 1335 sys:handle_system_msg(Req, From, Head#head.parent, 1336 ?MODULE, [], Head) 1337 after 0 -> 1338 open_file_loop2(Head, N) 1339 end. 1340 1341open_file_loop2(Head, N) -> 1342 receive 1343 ?DETS_CALL(From, Op) -> 1344 do_apply_op(Op, From, Head, N); 1345 {'EXIT', Pid, Reason} when Pid =:= Head#head.parent -> 1346 %% Parent orders shutdown. 1347 _NewHead = do_stop(Head), 1348 exit(Reason); 1349 {'EXIT', Pid, Reason} when Pid =:= Head#head.server -> 1350 %% The server is gone. 1351 _NewHead = do_stop(Head), 1352 exit(Reason); 1353 {'EXIT', Pid, _Reason} -> 1354 %% A process fixing the table exits. 1355 H2 = remove_fix(Head, Pid, close), 1356 open_file_loop(H2, N); 1357 {system, From, Req} -> 1358 sys:handle_system_msg(Req, From, Head#head.parent, 1359 ?MODULE, [], Head); 1360 Message -> 1361 error_logger:format("** dets: unexpected message" 1362 "(ignored): ~tw~n", [Message]), 1363 open_file_loop(Head, N) 1364 end. 1365 1366do_apply_op(Op, From, Head, N) -> 1367 try apply_op(Op, From, Head, N) of 1368 ok -> 1369 open_file_loop(Head, N); 1370 {N2, H2} when is_record(H2, head), is_integer(N2) -> 1371 open_file_loop(H2, N2); 1372 H2 when is_record(H2, head) -> 1373 open_file_loop(H2, N); 1374 {{more,From1,Op1,N1}, NewHead} -> 1375 do_apply_op(Op1, From1, NewHead, N1) 1376 catch 1377 exit:normal -> 1378 exit(normal); 1379 _:Bad:Stacktrace -> 1380 bug_found(Head#head.name, Op, Bad, Stacktrace, From), 1381 open_file_loop(Head, N) 1382 end. 1383 1384apply_op(Op, From, Head, N) -> 1385 case Op of 1386 {add_user, Tab, OpenArgs}-> 1387 #open_args{file = Fname, type = Type, keypos = Keypos, 1388 ram_file = Ram, access = Access} = OpenArgs, 1389 %% min_no_slots and max_no_slots are not tested 1390 Res = if 1391 Tab =:= Head#head.name, 1392 Head#head.keypos =:= Keypos, 1393 Head#head.type =:= Type, 1394 Head#head.ram_file =:= Ram, 1395 Head#head.access =:= Access, 1396 Fname =:= Head#head.filename -> 1397 ok; 1398 true -> 1399 err({error, incompatible_arguments}) 1400 end, 1401 From ! {self(), Res}, 1402 ok; 1403 auto_save -> 1404 case Head#head.update_mode of 1405 saved -> 1406 Head; 1407 {error, _Reason} -> 1408 Head; 1409 _Dirty when N =:= 0 -> % dirty or new_dirty 1410 %% The updates seems to have declined 1411 dets_utils:vformat("** dets: Auto save of ~tp\n", 1412 [Head#head.name]), 1413 {NewHead, _Res} = perform_save(Head, true), 1414 erlang:garbage_collect(), 1415 {0, NewHead}; 1416 dirty -> 1417 %% Reset counter and try later 1418 start_auto_save_timer(Head), 1419 {0, Head} 1420 end; 1421 close -> 1422 From ! {self(), fclose(Head)}, 1423 _NewHead = unlink_fixing_procs(Head), 1424 ?PROFILE(ep:done()), 1425 exit(normal); 1426 {close, Pid} -> 1427 %% Used from dets_server when Pid has closed the table, 1428 %% but the table is still opened by some process. 1429 NewHead = remove_fix(Head, Pid, close), 1430 From ! {self(), status(NewHead)}, 1431 NewHead; 1432 {corrupt, Reason} -> 1433 {H2, Error} = dets_utils:corrupt_reason(Head, Reason), 1434 From ! {self(), Error}, 1435 H2; 1436 {delayed_write, WrTime} -> 1437 delayed_write(Head, WrTime); 1438 info -> 1439 {H2, Res} = finfo(Head), 1440 From ! {self(), Res}, 1441 H2; 1442 {info, Tag} -> 1443 {H2, Res} = finfo(Head, Tag), 1444 From ! {self(), Res}, 1445 H2; 1446 {is_compatible_bchunk_format, Term} -> 1447 Res = test_bchunk_format(Head, Term), 1448 From ! {self(), Res}, 1449 ok; 1450 {internal_open, Ref, Args} -> 1451 do_internal_open(Head#head.parent, Head#head.server, From, 1452 Ref, Args); 1453 may_grow when Head#head.update_mode =/= saved -> 1454 if 1455 Head#head.update_mode =:= dirty -> 1456 %% Won't grow more if the table is full. 1457 {H2, _Res} = 1458 dets_v9:may_grow(Head, 0, many_times), 1459 {N + 1, H2}; 1460 true -> 1461 ok 1462 end; 1463 {set_verbose, What} -> 1464 set_verbose(What), 1465 From ! {self(), ok}, 1466 ok; 1467 {where, Object} -> 1468 {H2, Res} = where_is_object(Head, Object), 1469 From ! {self(), Res}, 1470 H2; 1471 _Message when element(1, Head#head.update_mode) =:= error -> 1472 From ! {self(), status(Head)}, 1473 ok; 1474 %% The following messages assume that the status of the table is OK. 1475 {bchunk_init, Tab} -> 1476 {H2, Res} = do_bchunk_init(Head, Tab), 1477 From ! {self(), Res}, 1478 H2; 1479 {bchunk, State} -> 1480 {H2, Res} = do_bchunk(Head, State), 1481 From ! {self(), Res}, 1482 H2; 1483 delete_all_objects -> 1484 {H2, Res} = fdelete_all_objects(Head), 1485 From ! {self(), Res}, 1486 erlang:garbage_collect(), 1487 {0, H2}; 1488 {delete_key, _Keys} when Head#head.update_mode =:= dirty -> 1489 stream_op(Op, From, [], Head, N); 1490 {delete_object, Objs} when Head#head.update_mode =:= dirty -> 1491 case check_objects(Objs, Head#head.keypos) of 1492 true -> 1493 stream_op(Op, From, [], Head, N); 1494 false -> 1495 From ! {self(), badarg}, 1496 ok 1497 end; 1498 first -> 1499 {H2, Res} = ffirst(Head), 1500 From ! {self(), Res}, 1501 H2; 1502 {initialize, InitFun, Format, MinNoSlots} -> 1503 {H2, Res} = finit(Head, InitFun, Format, MinNoSlots), 1504 From ! {self(), Res}, 1505 erlang:garbage_collect(), 1506 H2; 1507 {insert, Objs} when Head#head.update_mode =:= dirty -> 1508 case check_objects(Objs, Head#head.keypos) of 1509 true -> 1510 stream_op(Op, From, [], Head, N); 1511 false -> 1512 From ! {self(), badarg}, 1513 ok 1514 end; 1515 {insert_new, Objs} when Head#head.update_mode =:= dirty -> 1516 {H2, Res} = finsert_new(Head, Objs), 1517 From ! {self(), Res}, 1518 {N + 1, H2}; 1519 {lookup_keys, _Keys} -> 1520 stream_op(Op, From, [], Head, N); 1521 {match_init, State, Safe} -> 1522 {H1, Res} = fmatch_init(Head, State), 1523 H2 = case Res of 1524 {cont,_} -> H1; 1525 _ when Safe =:= no_safe-> H1; 1526 _ when Safe =:= safe -> do_safe_fixtable(H1, From, false) 1527 end, 1528 From ! {self(), Res}, 1529 H2; 1530 {match, MP, Spec, NObjs, Safe} -> 1531 {H2, Res} = fmatch(Head, MP, Spec, NObjs, Safe, From), 1532 From ! {self(), Res}, 1533 H2; 1534 {member, _Key} = Op -> 1535 stream_op(Op, From, [], Head, N); 1536 {next, Key} -> 1537 {H2, Res} = fnext(Head, Key), 1538 From ! {self(), Res}, 1539 H2; 1540 {match_delete, State} when Head#head.update_mode =:= dirty -> 1541 {H1, Res} = fmatch_delete(Head, State), 1542 H2 = case Res of 1543 {cont,_S,_N} -> H1; 1544 _ -> do_safe_fixtable(H1, From, false) 1545 end, 1546 From ! {self(), Res}, 1547 {N + 1, H2}; 1548 {match_delete_init, MP, Spec} when Head#head.update_mode =:= dirty -> 1549 {H2, Res} = fmatch_delete_init(Head, MP, Spec, From), 1550 From ! {self(), Res}, 1551 {N + 1, H2}; 1552 {safe_fixtable, Bool} -> 1553 NewHead = do_safe_fixtable(Head, From, Bool), 1554 From ! {self(), ok}, 1555 NewHead; 1556 {slot, Slot} -> 1557 {H2, Res} = fslot(Head, Slot), 1558 From ! {self(), Res}, 1559 H2; 1560 sync -> 1561 {NewHead, Res} = perform_save(Head, true), 1562 From ! {self(), Res}, 1563 erlang:garbage_collect(), 1564 {0, NewHead}; 1565 {update_counter, Key, Incr} when Head#head.update_mode =:= dirty -> 1566 {NewHead, Res} = do_update_counter(Head, Key, Incr), 1567 From ! {self(), Res}, 1568 {N + 1, NewHead}; 1569 WriteOp when Head#head.update_mode =:= new_dirty -> 1570 H2 = Head#head{update_mode = dirty}, 1571 apply_op(WriteOp, From, H2, 0); 1572 WriteOp when Head#head.access =:= read_write, 1573 Head#head.update_mode =:= saved -> 1574 case catch dets_v9:mark_dirty(Head) of 1575 ok -> 1576 start_auto_save_timer(Head), 1577 H2 = Head#head{update_mode = dirty}, 1578 apply_op(WriteOp, From, H2, 0); 1579 {NewHead, Error} when is_record(NewHead, head) -> 1580 From ! {self(), Error}, 1581 NewHead 1582 end; 1583 WriteOp when is_tuple(WriteOp), Head#head.access =:= read -> 1584 Reason = {access_mode, Head#head.filename}, 1585 From ! {self(), err({error, Reason})}, 1586 ok 1587 end. 1588 1589bug_found(Name, Op, Bad, Stacktrace, From) -> 1590 case dets_utils:debug_mode() of 1591 true -> 1592 %% If stream_op/5 found more requests, this is not 1593 %% the last operation. 1594 error_logger:format 1595 ("** dets: Bug was found when accessing table ~tw,~n" 1596 "** dets: operation was ~tp and reply was ~tw.~n" 1597 "** dets: Stacktrace: ~tw~n", 1598 [Name, Op, Bad, Stacktrace]); 1599 false -> 1600 error_logger:format 1601 ("** dets: Bug was found when accessing table ~tw~n", 1602 [Name]) 1603 end, 1604 if 1605 From =/= self() -> 1606 From ! {self(), {error, {dets_bug, Name, Op, Bad}}}, 1607 ok; 1608 true -> % auto_save | may_grow | {delayed_write, _} 1609 ok 1610 end. 1611 1612do_internal_open(Parent, Server, From, Ref, Args) -> 1613 ?PROFILE(ep:do()), 1614 case do_open_file(Args, Parent, Server, Ref) of 1615 {ok, Head} -> 1616 From ! {self(), ok}, 1617 Head; 1618 Error -> 1619 From ! {self(), Error}, 1620 exit(normal) 1621 end. 1622 1623start_auto_save_timer(Head) when Head#head.auto_save =:= infinity -> 1624 ok; 1625start_auto_save_timer(Head) -> 1626 Millis = Head#head.auto_save, 1627 _Ref = erlang:send_after(Millis, self(), ?DETS_CALL(self(), auto_save)), 1628 ok. 1629 1630%% Peek the message queue and try to evaluate several 1631%% lookup requests in parallel. Evalute delete_object, delete and 1632%% insert as well. 1633stream_op(Op, Pid, Pids, Head, N) -> 1634 #head{fixed = Fxd, update_mode = M} = Head, 1635 stream_op(Head, Pids, [], N, Pid, Op, Fxd, M). 1636 1637stream_loop(Head, Pids, C, N, false = Fxd, M) -> 1638 receive 1639 ?DETS_CALL(From, Message) -> 1640 stream_op(Head, Pids, C, N, From, Message, Fxd, M) 1641 after 0 -> 1642 stream_end(Head, Pids, C, N, no_more) 1643 end; 1644stream_loop(Head, Pids, C, N, _Fxd, _M) -> 1645 stream_end(Head, Pids, C, N, no_more). 1646 1647stream_op(Head, Pids, C, N, Pid, {lookup_keys,Keys}, Fxd, M) -> 1648 NC = [{{lookup,Pid},Keys} | C], 1649 stream_loop(Head, Pids, NC, N, Fxd, M); 1650stream_op(Head, Pids, C, N, Pid, {insert, _Objects} = Op, Fxd, dirty = M) -> 1651 NC = [Op | C], 1652 stream_loop(Head, [Pid | Pids], NC, N, Fxd, M); 1653stream_op(Head, Pids, C, N, Pid, {delete_key, _Keys} = Op, Fxd, dirty = M) -> 1654 NC = [Op | C], 1655 stream_loop(Head, [Pid | Pids], NC, N, Fxd, M); 1656stream_op(Head, Pids, C, N, Pid, {delete_object, _Os} = Op, Fxd, dirty = M) -> 1657 NC = [Op | C], 1658 stream_loop(Head, [Pid | Pids], NC, N, Fxd, M); 1659stream_op(Head, Pids, C, N, Pid, {member, Key}, Fxd, M) -> 1660 NC = [{{lookup,[Pid]},[Key]} | C], 1661 stream_loop(Head, Pids, NC, N, Fxd, M); 1662stream_op(Head, Pids, C, N, Pid, Op, _Fxd, _M) -> 1663 stream_end(Head, Pids, C, N, {Pid,Op}). 1664 1665stream_end(Head, Pids0, C, N, Next) -> 1666 case catch update_cache(Head, lists:reverse(C)) of 1667 {Head1, [], PwriteList} -> 1668 stream_end1(Pids0, Next, N, C, Head1, PwriteList); 1669 {Head1, Found, PwriteList} -> 1670 %% Possibly an optimization: reply to lookup requests 1671 %% first, then write stuff. This makes it possible for 1672 %% clients to continue while the disk is accessed. 1673 %% (Replies to lookup requests are sent earlier than 1674 %% replies to delete and insert requests even if the 1675 %% latter requests were made before the lookup requests, 1676 %% which can be confusing.) 1677 _ = lookup_replies(Found), 1678 stream_end1(Pids0, Next, N, C, Head1, PwriteList); 1679 Head1 when is_record(Head1, head) -> 1680 stream_end2(Pids0, Pids0, Next, N, C, Head1, ok); 1681 {Head1, Error} when is_record(Head1, head) -> 1682 %% Dig out the processes that did lookup or member. 1683 Fun = fun({{lookup,[Pid]},_Keys}, L) -> [Pid | L]; 1684 ({{lookup,Pid},_Keys}, L) -> [Pid | L]; 1685 (_, L) -> L 1686 end, 1687 LPs0 = lists:foldl(Fun, [], C), 1688 LPs = lists:usort(lists:flatten(LPs0)), 1689 stream_end2(Pids0 ++ LPs, Pids0, Next, N, C, Head1, Error); 1690 DetsError -> 1691 throw(DetsError) 1692 end. 1693 1694stream_end1(Pids, Next, N, C, Head, []) -> 1695 stream_end2(Pids, Pids, Next, N, C, Head, ok); 1696stream_end1(Pids, Next, N, C, Head, PwriteList) -> 1697 {Head1, PR} = (catch dets_utils:pwrite(Head, PwriteList)), 1698 stream_end2(Pids, Pids, Next, N, C, Head1, PR). 1699 1700stream_end2([Pid | Pids], Ps, Next, N, C, Head, Reply) -> 1701 Pid ! {self(), Reply}, 1702 stream_end2(Pids, Ps, Next, N+1, C, Head, Reply); 1703stream_end2([], Ps, no_more, N, C, Head, _Reply) -> 1704 penalty(Head, Ps, C), 1705 {N, Head}; 1706stream_end2([], _Ps, {From, Op}, N, _C, Head, _Reply) -> 1707 {{more,From,Op,N},Head}. 1708 1709penalty(H, _Ps, _C) when H#head.fixed =:= false -> 1710 ok; 1711penalty(_H, _Ps, [{{lookup,_Pids},_Keys}]) -> 1712 ok; 1713penalty(#head{fixed = {_,[{Pid,_}]}}, [Pid], _C) -> 1714 ok; 1715penalty(_H, _Ps, _C) -> 1716 timer:sleep(1). 1717 1718lookup_replies([{P,O}]) -> 1719 lookup_reply(P, O); 1720lookup_replies(Q) -> 1721 [{P,O} | L] = dets_utils:family(Q), 1722 lookup_replies(P, lists:append(O), L). 1723 1724lookup_replies(P, O, []) -> 1725 lookup_reply(P, O); 1726lookup_replies(P, O, [{P2,O2} | L]) -> 1727 _ = lookup_reply(P, O), 1728 lookup_replies(P2, lists:append(O2), L). 1729 1730%% If a list of Pid then op was {member, Key}. Inlined. 1731lookup_reply([P], O) -> 1732 P ! {self(), O =/= []}; 1733lookup_reply(P, O) -> 1734 P ! {self(), O}. 1735 1736%%----------------------------------------------------------------- 1737%% Callback functions for system messages handling. 1738%%----------------------------------------------------------------- 1739system_continue(_Parent, _, Head) -> 1740 open_file_loop(Head, 0). 1741 1742system_terminate(Reason, _Parent, _, Head) -> 1743 _NewHead = do_stop(Head), 1744 exit(Reason). 1745 1746%%----------------------------------------------------------------- 1747%% Code for upgrade. 1748%%----------------------------------------------------------------- 1749system_code_change(State, _Module, _OldVsn, _Extra) -> 1750 {ok, State}. 1751 1752 1753%%%---------------------------------------------------------------------- 1754%%% Internal functions 1755%%%---------------------------------------------------------------------- 1756 1757%% -> {ok, Fd, fileheader()} | throw(Error) 1758read_file_header(FileName, Access, RamFile) -> 1759 BF = if 1760 RamFile -> 1761 case file:read_file(FileName) of 1762 {ok, B} -> B; 1763 Err -> dets_utils:file_error(FileName, Err) 1764 end; 1765 true -> 1766 FileName 1767 end, 1768 {ok, Fd} = dets_utils:open(BF, open_args(Access, RamFile)), 1769 {ok, <<Version:32>>} = 1770 dets_utils:pread_close(Fd, FileName, ?FILE_FORMAT_VERSION_POS, 4), 1771 if 1772 Version =< 8 -> 1773 _ = file:close(Fd), 1774 throw({error, {format_8_no_longer_supported, FileName}}); 1775 Version =:= 9 -> 1776 dets_v9:read_file_header(Fd, FileName); 1777 true -> 1778 _ = file:close(Fd), 1779 throw({error, {not_a_dets_file, FileName}}) 1780 end. 1781 1782fclose(Head) -> 1783 {Head1, Res} = perform_save(Head, false), 1784 case Head1#head.ram_file of 1785 true -> 1786 Res; 1787 false -> 1788 dets_utils:stop_disk_map(), 1789 Res2 = file:close(Head1#head.fptr), 1790 if 1791 Res2 =:= ok -> Res; 1792 true -> Res2 1793 end 1794 end. 1795 1796%% -> {NewHead, Res} 1797perform_save(Head, DoSync) when Head#head.update_mode =:= dirty; 1798 Head#head.update_mode =:= new_dirty -> 1799 case catch begin 1800 {Head1, []} = write_cache(Head), 1801 {Head2, ok} = dets_v9:do_perform_save(Head1), 1802 ok = ensure_written(Head2, DoSync), 1803 {Head2#head{update_mode = saved}, ok} 1804 end of 1805 {NewHead, _} = Reply when is_record(NewHead, head) -> 1806 Reply 1807 end; 1808perform_save(Head, _DoSync) -> 1809 {Head, status(Head)}. 1810 1811ensure_written(Head, DoSync) when Head#head.ram_file -> 1812 {ok, EOF} = dets_utils:position(Head, eof), 1813 {ok, Bin} = dets_utils:pread(Head, 0, EOF, 0), 1814 if 1815 DoSync -> 1816 dets_utils:write_file(Head, Bin); 1817 not DoSync -> 1818 case file:write_file(Head#head.filename, Bin) of 1819 ok -> 1820 ok; 1821 Error -> 1822 dets_utils:corrupt_file(Head, Error) 1823 end 1824 end; 1825ensure_written(Head, true) when not Head#head.ram_file -> 1826 dets_utils:sync(Head); 1827ensure_written(Head, false) when not Head#head.ram_file -> 1828 ok. 1829 1830%% -> {NewHead, {cont(), [binary()]}} | {NewHead, Error} 1831do_bchunk_init(Head, Tab) -> 1832 case catch write_cache(Head) of 1833 {H2, []} -> 1834 case dets_v9:table_parameters(H2) of 1835 undefined -> 1836 {H2, {error, old_version}}; 1837 Parms -> 1838 L = dets_utils:all_allocated(H2), 1839 Bin = if 1840 L =:= <<>> -> eof; 1841 true -> <<>> 1842 end, 1843 BinParms = term_to_binary(Parms), 1844 {H2, {#dets_cont{no_objs = default, bin = Bin, alloc = L, 1845 tab = Tab, proc = self(),what = bchunk}, 1846 [BinParms]}} 1847 end; 1848 {NewHead, _} = HeadError when is_record(NewHead, head) -> 1849 HeadError 1850 end. 1851 1852%% -> {NewHead, {cont(), [binary()]}} | {NewHead, Error} 1853do_bchunk(Head, #dets_cont{proc = Proc}) when Proc =/= self() -> 1854 {Head, badarg}; 1855do_bchunk(Head, #dets_cont{bin = eof}) -> 1856 {Head, '$end_of_table'}; 1857do_bchunk(Head, State) -> 1858 case dets_v9:read_bchunks(Head, State#dets_cont.alloc) of 1859 {error, Reason} -> 1860 dets_utils:corrupt_reason(Head, Reason); 1861 {finished, Bins} -> 1862 {Head, {State#dets_cont{bin = eof}, Bins}}; 1863 {Bins, NewL} -> 1864 {Head, {State#dets_cont{alloc = NewL}, Bins}} 1865 end. 1866 1867%% -> {NewHead, Result} 1868fdelete_all_objects(Head) when Head#head.fixed =:= false -> 1869 case catch do_delete_all_objects(Head) of 1870 {ok, NewHead} -> 1871 start_auto_save_timer(NewHead), 1872 {NewHead, ok}; 1873 {error, Reason} -> 1874 dets_utils:corrupt_reason(Head, Reason) 1875 end; 1876fdelete_all_objects(Head) -> 1877 {Head, fixed}. 1878 1879do_delete_all_objects(Head) -> 1880 #head{fptr = Fd, name = Tab, filename = Fname, type = Type, keypos = Kp, 1881 ram_file = Ram, auto_save = Auto, min_no_slots = MinSlots, 1882 max_no_slots = MaxSlots, cache = Cache} = Head, 1883 CacheSz = dets_utils:cache_size(Cache), 1884 ok = dets_utils:truncate(Fd, Fname, bof), 1885 dets_v9:initiate_file(Fd, Tab, Fname, Type, Kp, MinSlots, MaxSlots, 1886 Ram, CacheSz, Auto, true). 1887 1888ffirst(H) -> 1889 Ref = make_ref(), 1890 case catch {Ref, ffirst1(H)} of 1891 {Ref, {NH, R}} -> 1892 {NH, {ok, R}}; 1893 {NH, R} when is_record(NH, head) -> 1894 {NH, {error, R}} 1895 end. 1896 1897ffirst1(H) -> 1898 check_safe_fixtable(H), 1899 {NH, []} = write_cache(H), 1900 ffirst(NH, 0). 1901 1902ffirst(H, Slot) -> 1903 case dets_v9:slot_objs(H, Slot) of 1904 '$end_of_table' -> {H, '$end_of_table'}; 1905 [] -> ffirst(H, Slot+1); 1906 [X|_] -> {H, element(H#head.keypos, X)} 1907 end. 1908 1909%% -> {NewHead, Reply}, Reply = ok | badarg | Error. 1910finsert(Head, Objects) -> 1911 case catch update_cache(Head, Objects, insert) of 1912 {NewHead, []} -> 1913 {NewHead, ok}; 1914 {NewHead, _} = HeadError when is_record(NewHead, head) -> 1915 HeadError 1916 end. 1917 1918%% -> {NewHead, Reply}, Reply = ok | badarg | Error. 1919finsert_new(Head, Objects) -> 1920 KeyPos = Head#head.keypos, 1921 case catch lists:map(fun(Obj) -> element(KeyPos, Obj) end, Objects) of 1922 Keys when is_list(Keys) -> 1923 case catch update_cache(Head, Keys, {lookup, nopid}) of 1924 {Head1, PidObjs} when is_list(PidObjs) -> 1925 case lists:all(fun({_P,OL}) -> OL =:= [] end, PidObjs) of 1926 true -> 1927 case catch update_cache(Head1, Objects, insert) of 1928 {NewHead, []} -> 1929 {NewHead, true}; 1930 {NewHead, Error} when is_record(NewHead, head) -> 1931 {NewHead, Error} 1932 end; 1933 false=Reply -> 1934 {Head1, Reply} 1935 end; 1936 {NewHead, _} = HeadError when is_record(NewHead, head) -> 1937 HeadError 1938 end; 1939 _ -> 1940 {Head, badarg} 1941 end. 1942 1943do_safe_fixtable(Head, Pid, true) -> 1944 case Head#head.fixed of 1945 false -> 1946 link(Pid), 1947 MonTime = erlang:monotonic_time(), 1948 TimeOffset = erlang:time_offset(), 1949 Fixed = {{MonTime, TimeOffset}, [{Pid, 1}]}, 1950 Ftab = dets_utils:get_freelists(Head), 1951 Head#head{fixed = Fixed, freelists = {Ftab, Ftab}}; 1952 {TimeStamp, Counters} -> 1953 case lists:keysearch(Pid, 1, Counters) of 1954 {value, {Pid, Counter}} -> % when Counter > 1 1955 NewCounters = lists:keyreplace(Pid, 1, Counters, 1956 {Pid, Counter+1}), 1957 Head#head{fixed = {TimeStamp, NewCounters}}; 1958 false -> 1959 link(Pid), 1960 Fixed = {TimeStamp, [{Pid, 1} | Counters]}, 1961 Head#head{fixed = Fixed} 1962 end 1963 end; 1964do_safe_fixtable(Head, Pid, false) -> 1965 remove_fix(Head, Pid, false). 1966 1967remove_fix(Head, Pid, How) -> 1968 case Head#head.fixed of 1969 false -> 1970 Head; 1971 {TimeStamp, Counters} -> 1972 case lists:keysearch(Pid, 1, Counters) of 1973 %% How =:= close when Pid closes the table. 1974 {value, {Pid, Counter}} when Counter =:= 1; How =:= close -> 1975 unlink(Pid), 1976 case lists:keydelete(Pid, 1, Counters) of 1977 [] -> 1978 check_growth(Head), 1979 erlang:garbage_collect(), 1980 Head#head{fixed = false, 1981 freelists = dets_utils:get_freelists(Head)}; 1982 NewCounters -> 1983 Head#head{fixed = {TimeStamp, NewCounters}} 1984 end; 1985 {value, {Pid, Counter}} -> 1986 NewCounters = lists:keyreplace(Pid, 1, Counters, 1987 {Pid, Counter-1}), 1988 Head#head{fixed = {TimeStamp, NewCounters}}; 1989 false -> 1990 Head 1991 end 1992 end. 1993 1994do_stop(Head) -> 1995 _NewHead = unlink_fixing_procs(Head), 1996 fclose(Head). 1997 1998unlink_fixing_procs(Head) -> 1999 case Head#head.fixed of 2000 false -> 2001 Head; 2002 {_, Counters} -> 2003 lists:foreach(fun({Pid, _Counter}) -> unlink(Pid) end, Counters), 2004 Head#head{fixed = false, 2005 freelists = dets_utils:get_freelists(Head)} 2006 end. 2007 2008check_growth(#head{access = read}) -> 2009 ok; 2010check_growth(Head) -> 2011 NoThings = no_things(Head), 2012 if 2013 NoThings > Head#head.next -> 2014 _Ref = erlang:send_after 2015 (200, self(), ?DETS_CALL(self(), may_grow)), % Catch up. 2016 ok; 2017 true -> 2018 ok 2019 end. 2020 2021finfo(H) -> 2022 case catch write_cache(H) of 2023 {H2, []} -> 2024 Info = (catch [{type, H2#head.type}, 2025 {keypos, H2#head.keypos}, 2026 {size, H2#head.no_objects}, 2027 {file_size, 2028 file_size(H2#head.fptr, H2#head.filename)}, 2029 {filename, H2#head.filename}]), 2030 {H2, Info}; 2031 {H2, _} = HeadError when is_record(H2, head) -> 2032 HeadError 2033 end. 2034 2035finfo(H, access) -> {H, H#head.access}; 2036finfo(H, auto_save) -> {H, H#head.auto_save}; 2037finfo(H, bchunk_format) -> 2038 case catch write_cache(H) of 2039 {H2, []} -> 2040 case dets_v9:table_parameters(H2) of 2041 undefined = Undef -> 2042 {H2, Undef}; 2043 Parms -> 2044 {H2, term_to_binary(Parms)} 2045 end; 2046 {H2, _} = HeadError when is_record(H2, head) -> 2047 HeadError 2048 end; 2049finfo(H, delayed_write) -> % undocumented 2050 {H, dets_utils:cache_size(H#head.cache)}; 2051finfo(H, filename) -> {H, H#head.filename}; 2052finfo(H, file_size) -> 2053 case catch write_cache(H) of 2054 {H2, []} -> 2055 {H2, catch file_size(H#head.fptr, H#head.filename)}; 2056 {H2, _} = HeadError when is_record(H2, head) -> 2057 HeadError 2058 end; 2059finfo(H, fixed) -> 2060 %% true if fixtable/2 has been called 2061 {H, not (H#head.fixed =:= false)}; 2062finfo(H, hash) -> {H, H#head.hash_bif}; 2063finfo(H, keypos) -> {H, H#head.keypos}; 2064finfo(H, memory) -> finfo(H, file_size); 2065finfo(H, no_objects) -> finfo(H, size); 2066finfo(H, no_keys) -> 2067 case catch write_cache(H) of 2068 {H2, []} -> 2069 {H2, H2#head.no_keys}; 2070 {H2, _} = HeadError when is_record(H2, head) -> 2071 HeadError 2072 end; 2073finfo(H, no_slots) -> {H, dets_v9:no_slots(H)}; 2074finfo(H, pid) -> {H, self()}; 2075finfo(H, ram_file) -> {H, H#head.ram_file}; 2076finfo(H, safe_fixed) -> 2077 {H, 2078 case H#head.fixed of 2079 false -> 2080 false; 2081 {{FixMonTime, TimeOffset}, RefList} -> 2082 {make_timestamp(FixMonTime, TimeOffset), RefList} 2083 end}; 2084finfo(H, safe_fixed_monotonic_time) -> 2085 {H, 2086 case H#head.fixed of 2087 false -> 2088 false; 2089 {{FixMonTime, _TimeOffset}, RefList} -> 2090 {FixMonTime, RefList} 2091 end}; 2092finfo(H, size) -> 2093 case catch write_cache(H) of 2094 {H2, []} -> 2095 {H2, H2#head.no_objects}; 2096 {H2, _} = HeadError when is_record(H2, head) -> 2097 HeadError 2098 end; 2099finfo(H, type) -> {H, H#head.type}; 2100finfo(H, version) -> {H, 9}; 2101finfo(H, _) -> {H, undefined}. 2102 2103file_size(Fd, FileName) -> 2104 {ok, Pos} = dets_utils:position(Fd, FileName, eof), 2105 Pos. 2106 2107test_bchunk_format(_Head, undefined) -> 2108 false; 2109test_bchunk_format(Head, Term) -> 2110 dets_v9:try_bchunk_header(Term, Head) =/= not_ok. 2111 2112do_open_file([Fname, Verbose], Parent, Server, Ref) -> 2113 case catch fopen2(Fname, Ref) of 2114 {error, {tooshort, _}} -> 2115 err({error, {not_a_dets_file, Fname}}); 2116 {error, _Reason} = Error -> 2117 err(Error); 2118 {ok, Head} -> 2119 maybe_put(verbose, Verbose), 2120 {ok, Head#head{parent = Parent, server = Server}}; 2121 {'EXIT', _Reason} = Error -> 2122 Error; 2123 Bad -> 2124 error_logger:format 2125 ("** dets: Bug was found in open_file/1, reply was ~tw.~n", 2126 [Bad]), 2127 {error, {dets_bug, Fname, Bad}} 2128 end; 2129do_open_file([Tab, OpenArgs, Verb], Parent, Server, _Ref) -> 2130 case catch fopen3(Tab, OpenArgs) of 2131 {error, {tooshort, _}} -> 2132 err({error, {not_a_dets_file, OpenArgs#open_args.file}}); 2133 {error, _Reason} = Error -> 2134 err(Error); 2135 {ok, Head} -> 2136 maybe_put(verbose, Verb), 2137 {ok, Head#head{parent = Parent, server = Server}}; 2138 {'EXIT', _Reason} = Error -> 2139 Error; 2140 Bad -> 2141 error_logger:format 2142 ("** dets: Bug was found in open_file/2, arguments were~n" 2143 "** dets: ~tw and reply was ~tw.~n", 2144 [OpenArgs, Bad]), 2145 {error, {dets_bug, Tab, {open_file, OpenArgs}, Bad}} 2146 end. 2147 2148maybe_put(_, undefined) -> 2149 ignore; 2150maybe_put(K, V) -> 2151 put(K, V). 2152 2153%% -> {Head, Result}, Result = ok | Error | {thrown, Error} | badarg 2154finit(Head, InitFun, _Format, _NoSlots) when Head#head.access =:= read -> 2155 _ = (catch InitFun(close)), 2156 {Head, {error, {access_mode, Head#head.filename}}}; 2157finit(Head, InitFun, _Format, _NoSlots) when Head#head.fixed =/= false -> 2158 _ = (catch InitFun(close)), 2159 {Head, {error, {fixed_table, Head#head.name}}}; 2160finit(Head, InitFun, Format, NoSlots) -> 2161 case catch do_finit(Head, InitFun, Format, NoSlots) of 2162 {ok, NewHead} -> 2163 check_growth(NewHead), 2164 start_auto_save_timer(NewHead), 2165 {NewHead, ok}; 2166 badarg -> 2167 {Head, badarg}; 2168 Error -> 2169 dets_utils:corrupt(Head, Error) 2170 end. 2171 2172%% -> {ok, NewHead} | throw(badarg) | throw(Error) 2173do_finit(Head, Init, Format, NoSlots) -> 2174 #head{fptr = Fd, type = Type, keypos = Kp, auto_save = Auto, 2175 cache = Cache, filename = Fname, ram_file = Ram, 2176 min_no_slots = MinSlots0, max_no_slots = MaxSlots, 2177 name = Tab, update_mode = UpdateMode} = Head, 2178 CacheSz = dets_utils:cache_size(Cache), 2179 {How, Head1} = 2180 case Format of 2181 term when is_integer(NoSlots), NoSlots > MaxSlots -> 2182 throw(badarg); 2183 term -> 2184 MinSlots = choose_no_slots(NoSlots, MinSlots0), 2185 if 2186 UpdateMode =:= new_dirty, MinSlots =:= MinSlots0 -> 2187 {general_init, Head}; 2188 true -> 2189 ok = dets_utils:truncate(Fd, Fname, bof), 2190 {ok, H} = 2191 dets_v9:initiate_file(Fd, Tab, Fname, Type, Kp, 2192 MinSlots, MaxSlots, Ram, 2193 CacheSz, Auto, false), 2194 {general_init, H} 2195 end; 2196 bchunk -> 2197 ok = dets_utils:truncate(Fd, Fname, bof), 2198 {bchunk_init, Head} 2199 end, 2200 case How of 2201 bchunk_init -> 2202 case dets_v9:bchunk_init(Head1, Init) of 2203 {ok, NewHead} -> 2204 {ok, NewHead#head{update_mode = dirty}}; 2205 Error -> 2206 Error 2207 end; 2208 general_init -> 2209 Cntrs = ets:new(dets_init, []), 2210 Input = dets_v9:bulk_input(Head1, Init, Cntrs), 2211 SlotNumbers = {Head1#head.min_no_slots, bulk_init, MaxSlots}, 2212 {Reply, SizeData} = 2213 do_sort(Head1, SlotNumbers, Input, Cntrs, Fname), 2214 Bulk = true, 2215 case Reply of 2216 {ok, NoDups, H1} -> 2217 fsck_copy(SizeData, H1, Bulk, NoDups); 2218 Else -> 2219 close_files(Bulk, SizeData, Head1), 2220 Else 2221 end 2222 end. 2223 2224%% -> {NewHead, [LookedUpObject]} | {NewHead, Error} 2225flookup_keys(Head, Keys) -> 2226 case catch update_cache(Head, Keys, {lookup, nopid}) of 2227 {NewHead, [{_NoPid,Objs}]} -> 2228 {NewHead, Objs}; 2229 {NewHead, L} when is_list(L) -> 2230 {NewHead, lists:flatmap(fun({_Pid,OL}) -> OL end, L)}; 2231 {NewHead, _} = HeadError when is_record(NewHead, head) -> 2232 HeadError 2233 end. 2234 2235%% -> {NewHead, Result} 2236fmatch_init(Head, #dets_cont{bin = eof}) -> 2237 {Head, '$end_of_table'}; 2238fmatch_init(Head, C) -> 2239 case scan(Head, C) of 2240 {scan_error, Reason} -> 2241 dets_utils:corrupt_reason(Head, Reason); 2242 {Ts, NC} -> 2243 {Head, {cont, {Ts, NC}}} 2244 end. 2245 2246%% -> {NewHead, Result} 2247fmatch(Head, MP, Spec, N, Safe, From) -> 2248 KeyPos = Head#head.keypos, 2249 case find_all_keys(Spec, KeyPos, []) of 2250 [] -> 2251 %% Complete match 2252 case catch write_cache(Head) of 2253 {Head1, []} -> 2254 NewHead = 2255 case Safe of 2256 safe -> do_safe_fixtable(Head1, From, true); 2257 no_safe -> Head1 2258 end, 2259 C0 = init_scan(NewHead, N), 2260 {NewHead, {cont, C0#dets_cont{match_program = MP}}}; 2261 {NewHead, _} = HeadError when is_record(NewHead, head) -> 2262 HeadError 2263 end; 2264 List -> 2265 Keys = lists:usort(List), 2266 {NewHead, Reply} = flookup_keys(Head, Keys), 2267 case Reply of 2268 Objs when is_list(Objs) -> 2269 {match_spec, MS} = MP, 2270 MatchingObjs = ets:match_spec_run(Objs, MS), 2271 {NewHead, {done, MatchingObjs}}; 2272 Error -> 2273 {NewHead, Error} 2274 end 2275 end. 2276 2277find_all_keys([], _, Ks) -> 2278 Ks; 2279find_all_keys([{H,_,_} | T], KeyPos, Ks) when is_tuple(H) -> 2280 case tuple_size(H) of 2281 Enough when Enough >= KeyPos -> 2282 Key = element(KeyPos, H), 2283 case contains_variable(Key) of 2284 true -> 2285 []; 2286 false -> 2287 find_all_keys(T, KeyPos, [Key | Ks]) 2288 end; 2289 _ -> 2290 find_all_keys(T, KeyPos, Ks) 2291 end; 2292find_all_keys(_, _, _) -> 2293 []. 2294 2295contains_variable('_') -> 2296 true; 2297contains_variable(A) when is_atom(A) -> 2298 case atom_to_list(A) of 2299 [$$ | T] -> 2300 case (catch list_to_integer(T)) of 2301 {'EXIT', _} -> 2302 false; 2303 _ -> 2304 true 2305 end; 2306 _ -> 2307 false 2308 end; 2309contains_variable(T) when is_tuple(T) -> 2310 contains_variable(tuple_to_list(T)); 2311contains_variable([]) -> 2312 false; 2313contains_variable([H|T]) -> 2314 case contains_variable(H) of 2315 true -> 2316 true; 2317 false -> 2318 contains_variable(T) 2319 end; 2320contains_variable(_) -> 2321 false. 2322 2323%% -> {NewHead, Res} 2324fmatch_delete_init(Head, MP, Spec, From) -> 2325 KeyPos = Head#head.keypos, 2326 case catch 2327 case find_all_keys(Spec, KeyPos, []) of 2328 [] -> 2329 do_fmatch_delete_var_keys(Head, MP, Spec, From); 2330 List -> 2331 Keys = lists:usort(List), 2332 do_fmatch_constant_keys(Head, Keys, MP) 2333 end of 2334 {NewHead, _} = Reply when is_record(NewHead, head) -> 2335 Reply 2336 end. 2337 2338%% A note: If deleted objects reside in a bucket with other objects 2339%% that are not deleted, the bucket is moved. If the address of the 2340%% moved bucket is greater than original bucket address the kept 2341%% objects will be read once again later on. 2342%% -> {NewHead, Res} 2343fmatch_delete(Head, C) -> 2344 case scan(Head, C) of 2345 {scan_error, Reason} -> 2346 dets_utils:corrupt_reason(Head, Reason); 2347 {[], _} -> 2348 {Head, {done, 0}}; 2349 {RTs, NC} -> 2350 {match_spec, MP} = C#dets_cont.match_program, 2351 case catch filter_binary_terms(RTs, MP, []) of 2352 {'EXIT', _} -> 2353 Bad = dets_utils:bad_object(fmatch_delete, RTs), 2354 dets_utils:corrupt_reason(Head, Bad); 2355 Terms -> 2356 do_fmatch_delete(Head, Terms, NC) 2357 end 2358 end. 2359 2360do_fmatch_delete_var_keys(Head, _MP, ?PATTERN_TO_TRUE_MATCH_SPEC('_'), _From) 2361 when Head#head.fixed =:= false -> 2362 %% Handle the case where the file is emptied efficiently. 2363 %% Empty the cache just to get the number of objects right. 2364 {Head1, []} = write_cache(Head), 2365 N = Head1#head.no_objects, 2366 case fdelete_all_objects(Head1) of 2367 {NewHead, ok} -> 2368 {NewHead, {done, N}}; 2369 Reply -> 2370 Reply 2371 end; 2372do_fmatch_delete_var_keys(Head, MP, _Spec, From) -> 2373 Head1 = do_safe_fixtable(Head, From, true), 2374 {NewHead, []} = write_cache(Head1), 2375 C0 = init_scan(NewHead, default), 2376 {NewHead, {cont, C0#dets_cont{match_program = MP}, 0}}. 2377 2378do_fmatch_constant_keys(Head, Keys, {match_spec, MP}) -> 2379 case flookup_keys(Head, Keys) of 2380 {NewHead, ReadTerms} when is_list(ReadTerms) -> 2381 Terms = filter_terms(ReadTerms, MP, []), 2382 do_fmatch_delete(NewHead, Terms, fixed); 2383 Reply -> 2384 Reply 2385 end. 2386 2387filter_binary_terms([Bin | Bins], MP, L) -> 2388 Term = binary_to_term(Bin), 2389 case ets:match_spec_run([Term], MP) of 2390 [true] -> 2391 filter_binary_terms(Bins, MP, [Term | L]); 2392 _ -> 2393 filter_binary_terms(Bins, MP, L) 2394 end; 2395filter_binary_terms([], _MP, L) -> 2396 L. 2397 2398filter_terms([Term | Terms], MP, L) -> 2399 case ets:match_spec_run([Term], MP) of 2400 [true] -> 2401 filter_terms(Terms, MP, [Term | L]); 2402 _ -> 2403 filter_terms(Terms, MP, L) 2404 end; 2405filter_terms([], _MP, L) -> 2406 L. 2407 2408do_fmatch_delete(Head, Terms, What) -> 2409 N = length(Terms), 2410 case do_delete(Head, Terms, delete_object) of 2411 {NewHead, ok} when What =:= fixed -> 2412 {NewHead, {done, N}}; 2413 {NewHead, ok} -> 2414 {NewHead, {cont, What, N}}; 2415 Reply -> 2416 Reply 2417 end. 2418 2419do_delete(Head, Things, What) -> 2420 case catch update_cache(Head, Things, What) of 2421 {NewHead, []} -> 2422 {NewHead, ok}; 2423 {NewHead, _} = HeadError when is_record(NewHead, head) -> 2424 HeadError 2425 end. 2426 2427fnext(Head, Key) -> 2428 Slot = dets_v9:db_hash(Key, Head), 2429 Ref = make_ref(), 2430 case catch {Ref, fnext(Head, Key, Slot)} of 2431 {Ref, {H, R}} -> 2432 {H, {ok, R}}; 2433 {NewHead, _} = HeadError when is_record(NewHead, head) -> 2434 HeadError 2435 end. 2436 2437fnext(H, Key, Slot) -> 2438 {NH, []} = write_cache(H), 2439 case dets_v9:slot_objs(NH, Slot) of 2440 '$end_of_table' -> {NH, '$end_of_table'}; 2441 L -> fnext_search(NH, Key, Slot, L) 2442 end. 2443 2444fnext_search(H, K, Slot, L) -> 2445 Kp = H#head.keypos, 2446 case beyond_key(K, Kp, L) of 2447 [] -> fnext_slot(H, K, Slot+1); 2448 L2 -> {H, element(H#head.keypos, hd(L2))} 2449 end. 2450 2451%% We've got to continue to search for the next key in the next slot 2452fnext_slot(H, K, Slot) -> 2453 case dets_v9:slot_objs(H, Slot) of 2454 '$end_of_table' -> {H, '$end_of_table'}; 2455 [] -> fnext_slot(H, K, Slot+1); 2456 L -> {H, element(H#head.keypos, hd(L))} 2457 end. 2458 2459beyond_key(_K, _Kp, []) -> []; 2460beyond_key(K, Kp, [H|T]) -> 2461 case dets_utils:cmp(element(Kp, H), K) of 2462 0 -> beyond_key2(K, Kp, T); 2463 _ -> beyond_key(K, Kp, T) 2464 end. 2465 2466beyond_key2(_K, _Kp, []) -> []; 2467beyond_key2(K, Kp, [H|T]=L) -> 2468 case dets_utils:cmp(element(Kp, H), K) of 2469 0 -> beyond_key2(K, Kp, T); 2470 _ -> L 2471 end. 2472 2473%% Open an already existing file, no arguments 2474%% -> {ok, head()} | throw(Error) 2475fopen2(Fname, Tab) -> 2476 case file:read_file_info(Fname) of 2477 {ok, _} -> 2478 Acc = read_write, 2479 Ram = false, 2480 {ok, Fd, FH} = read_file_header(Fname, Acc, Ram), 2481 Do = case dets_v9:check_file_header(FH, Fd) of 2482 {ok, Head1} -> 2483 Head2 = Head1#head{filename = Fname}, 2484 try {ok, dets_v9:init_freelist(Head2)} 2485 catch 2486 throw:_ -> 2487 {repair, " has bad free lists, repairing ..."} 2488 end; 2489 {error, not_closed} -> 2490 M = " not properly closed, repairing ...", 2491 {repair, M}; 2492 Else -> 2493 Else 2494 end, 2495 case Do of 2496 {repair, Mess} -> 2497 io:format(user, "dets: file ~tp~s~n", [Fname, Mess]), 2498 case fsck(Fd, Tab, Fname, FH, default, default) of 2499 ok -> 2500 fopen2(Fname, Tab); 2501 Error -> 2502 throw(Error) 2503 end; 2504 {ok, Head} -> 2505 open_final(Head, Fname, Acc, Ram, ?DEFAULT_CACHE, 2506 Tab, false); 2507 {error, Reason} -> 2508 throw({error, {Reason, Fname}}) 2509 end; 2510 Error -> 2511 dets_utils:file_error(Fname, Error) 2512 end. 2513 2514%% Open and possibly create and initialize a file 2515%% -> {ok, head()} | throw(Error) 2516fopen3(Tab, OpenArgs) -> 2517 FileName = OpenArgs#open_args.file, 2518 case file:read_file_info(FileName) of 2519 {ok, _} -> 2520 fopen_existing_file(Tab, OpenArgs); 2521 Error when OpenArgs#open_args.access =:= read -> 2522 dets_utils:file_error(FileName, Error); 2523 _Error -> 2524 fopen_init_file(Tab, OpenArgs) 2525 end. 2526 2527fopen_existing_file(Tab, OpenArgs) -> 2528 #open_args{file = Fname, type = Type, keypos = Kp, repair = Rep, 2529 min_no_slots = MinSlots, max_no_slots = MaxSlots, 2530 ram_file = Ram, delayed_write = CacheSz, auto_save = 2531 Auto, access = Acc, debug = Debug} = 2532 OpenArgs, 2533 {ok, Fd, FH} = read_file_header(Fname, Acc, Ram), 2534 MinF = (MinSlots =:= default) or (MinSlots =:= FH#fileheader.min_no_slots), 2535 MaxF = (MaxSlots =:= default) or (MaxSlots =:= FH#fileheader.max_no_slots), 2536 Wh = case dets_v9:check_file_header(FH, Fd) of 2537 {ok, Head} when Rep =:= force, Acc =:= read_write, 2538 FH#fileheader.no_colls =/= undefined, 2539 MinF, MaxF -> 2540 {compact, Head}; 2541 {ok, _Head} when Rep =:= force, Acc =:= read -> 2542 throw({error, {access_mode, Fname}}); 2543 {ok, _Head} when Rep =:= force -> 2544 M = ", repair forced.", 2545 {repair, M}; 2546 {ok, Head} -> 2547 {final, Head}; 2548 {error, not_closed} when Rep =:= force, Acc =:= read_write -> 2549 M = ", repair forced.", 2550 {repair, M}; 2551 {error, not_closed} when Rep =:= true, Acc =:= read_write -> 2552 M = " not properly closed, repairing ...", 2553 {repair, M}; 2554 {error, not_closed} when Rep =:= false -> 2555 throw({error, {needs_repair, Fname}}); 2556 {error, Reason} -> 2557 throw({error, {Reason, Fname}}) 2558 end, 2559 Do = case Wh of 2560 {Tag, Hd} when Tag =:= final; Tag =:= compact -> 2561 Hd1 = Hd#head{filename = Fname}, 2562 try {Tag, dets_v9:init_freelist(Hd1)} 2563 catch 2564 throw:_ -> 2565 {repair, " has bad free lists, repairing ..."} 2566 end; 2567 Else -> 2568 Else 2569 end, 2570 case Do of 2571 _ when FH#fileheader.type =/= Type -> 2572 throw({error, {type_mismatch, Fname}}); 2573 _ when FH#fileheader.keypos =/= Kp -> 2574 throw({error, {keypos_mismatch, Fname}}); 2575 {compact, SourceHead} -> 2576 io:format(user, "dets: file ~tp is now compacted ...~n", [Fname]), 2577 {ok, NewSourceHead} = open_final(SourceHead, Fname, read, false, 2578 ?DEFAULT_CACHE, Tab, Debug), 2579 case catch compact(NewSourceHead) of 2580 ok -> 2581 erlang:garbage_collect(), 2582 fopen3(Tab, OpenArgs#open_args{repair = false}); 2583 _Err -> 2584 _ = file:close(Fd), 2585 dets_utils:stop_disk_map(), 2586 io:format(user, "dets: compaction of file ~tp failed, " 2587 "now repairing ...~n", [Fname]), 2588 {ok, Fd2, _FH} = read_file_header(Fname, Acc, Ram), 2589 do_repair(Fd2, Tab, Fname, FH, MinSlots, MaxSlots, 2590 OpenArgs) 2591 end; 2592 {repair, Mess} -> 2593 io:format(user, "dets: file ~tp~s~n", [Fname, Mess]), 2594 do_repair(Fd, Tab, Fname, FH, MinSlots, MaxSlots, 2595 OpenArgs); 2596 {final, H} -> 2597 H1 = H#head{auto_save = Auto}, 2598 open_final(H1, Fname, Acc, Ram, CacheSz, Tab, Debug) 2599 end. 2600 2601do_repair(Fd, Tab, Fname, FH, MinSlots, MaxSlots, OpenArgs) -> 2602 case fsck(Fd, Tab, Fname, FH, MinSlots, MaxSlots) of 2603 ok -> 2604 erlang:garbage_collect(), 2605 fopen3(Tab, OpenArgs#open_args{repair = false}); 2606 Error -> 2607 throw(Error) 2608 end. 2609 2610%% -> {ok, head()} | throw(Error) 2611open_final(Head, Fname, Acc, Ram, CacheSz, Tab, Debug) -> 2612 Head1 = Head#head{access = Acc, 2613 ram_file = Ram, 2614 filename = Fname, 2615 name = Tab, 2616 cache = dets_utils:new_cache(CacheSz)}, 2617 init_disk_map(Tab, Debug), 2618 dets_v9:cache_segps(Head1#head.fptr, Fname, Head1#head.next), 2619 check_growth(Head1), 2620 {ok, Head1}. 2621 2622%% -> {ok, head()} | throw(Error) 2623fopen_init_file(Tab, OpenArgs) -> 2624 #open_args{file = Fname, type = Type, keypos = Kp, 2625 min_no_slots = MinSlotsArg, max_no_slots = MaxSlotsArg, 2626 ram_file = Ram, delayed_write = CacheSz, auto_save = Auto, 2627 debug = Debug} = OpenArgs, 2628 MinSlots = choose_no_slots(MinSlotsArg, ?DEFAULT_MIN_NO_SLOTS), 2629 MaxSlots = choose_no_slots(MaxSlotsArg, ?DEFAULT_MAX_NO_SLOTS), 2630 FileSpec = if 2631 Ram -> []; 2632 true -> Fname 2633 end, 2634 {ok, Fd} = dets_utils:open(FileSpec, open_args(read_write, Ram)), 2635 %% No need to truncate an empty file. 2636 init_disk_map(Tab, Debug), 2637 case catch dets_v9:initiate_file(Fd, Tab, Fname, Type, Kp, 2638 MinSlots, MaxSlots, 2639 Ram, CacheSz, Auto, true) of 2640 {error, Reason} when Ram -> 2641 _ = file:close(Fd), 2642 throw({error, Reason}); 2643 {error, Reason} -> 2644 _ = file:close(Fd), 2645 _ = file:delete(Fname), 2646 throw({error, Reason}); 2647 {ok, Head} -> 2648 start_auto_save_timer(Head), 2649 %% init_table does not need to truncate and write header 2650 {ok, Head#head{update_mode = new_dirty}} 2651 end. 2652 2653%% Debug. 2654init_disk_map(Name, Debug) -> 2655 case Debug orelse dets_utils:debug_mode() of 2656 true -> 2657 dets_utils:init_disk_map(Name); 2658 false -> 2659 ok 2660 end. 2661 2662open_args(Access, RamFile) -> 2663 A1 = case Access of 2664 read -> []; 2665 read_write -> [write] 2666 end, 2667 A2 = case RamFile of 2668 true -> [ram]; 2669 false -> [raw] 2670 end, 2671 A1 ++ A2 ++ [binary, read]. 2672 2673%% -> ok | throw(Error) 2674compact(SourceHead) -> 2675 #head{name = Tab, filename = Fname, fptr = SFd, type = Type, keypos = Kp, 2676 ram_file = Ram, auto_save = Auto} = SourceHead, 2677 Tmp = tempfile(Fname), 2678 TblParms = dets_v9:table_parameters(SourceHead), 2679 {ok, Fd} = dets_utils:open(Tmp, open_args(read_write, false)), 2680 CacheSz = ?DEFAULT_CACHE, 2681 %% It is normally not possible to have two open tables in the same 2682 %% process since the process dictionary is used for caching 2683 %% segment pointers, but here is works anyway--when reading a file 2684 %% serially the pointers do not need to be used. 2685 Head = case catch dets_v9:prep_table_copy(Fd, Tab, Tmp, Type, Kp, Ram, 2686 CacheSz, Auto, TblParms) of 2687 {ok, H} -> 2688 H; 2689 Error -> 2690 _ = file:close(Fd), 2691 _ = file:delete(Tmp), 2692 throw(Error) 2693 end, 2694 2695 case dets_v9:compact_init(SourceHead, Head, TblParms) of 2696 {ok, NewHead} -> 2697 R = case fclose(NewHead) of 2698 ok -> 2699 ok = file:close(SFd), 2700 %% Save (rename) Fname first? 2701 dets_utils:rename(Tmp, Fname); 2702 E -> 2703 E 2704 end, 2705 if 2706 R =:= ok -> ok; 2707 true -> 2708 _ = file:delete(Tmp), 2709 throw(R) 2710 end; 2711 Err -> 2712 _ = file:close(Fd), 2713 _ = file:delete(Tmp), 2714 throw(Err) 2715 end. 2716 2717%% -> ok | Error 2718%% Closes Fd. 2719fsck(Fd, Tab, Fname, FH, MinSlotsArg, MaxSlotsArg) -> 2720 %% MinSlots and MaxSlots are the option values. 2721 #fileheader{min_no_slots = MinSlotsFile, 2722 max_no_slots = MaxSlotsFile} = FH, 2723 EstNoSlots0 = file_no_things(FH), 2724 MinSlots = choose_no_slots(MinSlotsArg, MinSlotsFile), 2725 MaxSlots = choose_no_slots(MaxSlotsArg, MaxSlotsFile), 2726 EstNoSlots = erlang:min(MaxSlots, erlang:max(MinSlots, EstNoSlots0)), 2727 SlotNumbers = {MinSlots, EstNoSlots, MaxSlots}, 2728 %% When repairing: We first try and sort on slots using MinSlots. 2729 %% If the number of objects (keys) turns out to be significantly 2730 %% different from NoSlots, we try again with the correct number of 2731 %% objects (keys). 2732 case fsck_try(Fd, Tab, FH, Fname, SlotNumbers) of 2733 {try_again, BetterNoSlots} -> 2734 BetterSlotNumbers = {MinSlots, BetterNoSlots, MaxSlots}, 2735 case fsck_try(Fd, Tab, FH, Fname, BetterSlotNumbers) of 2736 {try_again, _} -> 2737 _ = file:close(Fd), 2738 {error, {cannot_repair, Fname}}; 2739 Else -> 2740 Else 2741 end; 2742 Else -> 2743 Else 2744 end. 2745 2746choose_no_slots(default, NoSlots) -> NoSlots; 2747choose_no_slots(NoSlots, _) -> NoSlots. 2748 2749%% -> ok | {try_again, integer()} | Error 2750%% Closes Fd unless {try_again, _} is returned. 2751%% Initiating a table using a fun and repairing (or converting) a 2752%% file are completely different things, but nevertheless the same 2753%% method is used in both cases... 2754fsck_try(Fd, Tab, FH, Fname, SlotNumbers) -> 2755 Tmp = tempfile(Fname), 2756 #fileheader{type = Type, keypos = KeyPos} = FH, 2757 {_MinSlots, EstNoSlots, MaxSlots} = SlotNumbers, 2758 OpenArgs = #open_args{file = Tmp, type = Type, keypos = KeyPos, 2759 repair = false, min_no_slots = EstNoSlots, 2760 max_no_slots = MaxSlots, 2761 ram_file = false, delayed_write = ?DEFAULT_CACHE, 2762 auto_save = infinity, access = read_write, 2763 debug = false}, 2764 case catch fopen3(Tab, OpenArgs) of 2765 {ok, Head} -> 2766 case fsck_try_est(Head, Fd, Fname, SlotNumbers, FH) of 2767 {ok, NewHead} -> 2768 R = case fclose(NewHead) of 2769 ok -> 2770 %% Save (rename) Fname first? 2771 dets_utils:rename(Tmp, Fname); 2772 Error -> 2773 Error 2774 end, 2775 if 2776 R =:= ok -> ok; 2777 true -> 2778 _ = file:delete(Tmp), 2779 R 2780 end; 2781 TryAgainOrError -> 2782 _ = file:delete(Tmp), 2783 TryAgainOrError 2784 end; 2785 Error -> 2786 _ = file:close(Fd), 2787 Error 2788 end. 2789 2790tempfile(Fname) -> 2791 Tmp = lists:concat([Fname, ".TMP"]), 2792 case file:delete(Tmp) of 2793 {error, _Reason} -> % typically enoent 2794 ok; 2795 ok -> 2796 assure_no_file(Tmp) 2797 end, 2798 Tmp. 2799 2800assure_no_file(File) -> 2801 case file:read_file_info(File) of 2802 {ok, _FileInfo} -> 2803 %% Wait for some other process to close the file: 2804 timer:sleep(100), 2805 assure_no_file(File); 2806 {error, _} -> 2807 ok 2808 end. 2809 2810%% -> {ok, NewHead} | {try_again, integer()} | Error 2811fsck_try_est(Head, Fd, Fname, SlotNumbers, FH) -> 2812 %% Mod is the module to use for reading input when repairing. 2813 Cntrs = ets:new(dets_repair, []), 2814 Input = dets_v9:fsck_input(Head, Fd, Cntrs, FH), 2815 {Reply, SizeData} = do_sort(Head, SlotNumbers, Input, Cntrs, Fname), 2816 Bulk = false, 2817 case Reply of 2818 {ok, NoDups, H1} -> 2819 _ = file:close(Fd), 2820 fsck_copy(SizeData, H1, Bulk, NoDups); 2821 {try_again, _} = Return -> 2822 close_files(Bulk, SizeData, Head), 2823 Return; 2824 Else -> 2825 _ = file:close(Fd), 2826 close_files(Bulk, SizeData, Head), 2827 Else 2828 end. 2829 2830do_sort(Head, SlotNumbers, Input, Cntrs, Fname) -> 2831 %% output_objs/4 replaces {LogSize,NoObjects} in Cntrs by 2832 %% {LogSize,Position,Data,NoObjects | NoCollections}. 2833 %% Data = {FileName,FileDescriptor} | [object()] 2834 %% For small tables Data can be a list of objects which is more 2835 %% efficient since no temporary files are created. 2836 Output = dets_v9:output_objs(Head, SlotNumbers, Cntrs), 2837 TmpDir = filename:dirname(Fname), 2838 Reply = (catch file_sorter:sort(Input, Output, 2839 [{format, binary},{tmpdir, TmpDir}])), 2840 L = ets:tab2list(Cntrs), 2841 ets:delete(Cntrs), 2842 {Reply, lists:reverse(lists:keysort(1, L))}. 2843 2844fsck_copy([{_LogSz, Pos, Bins, _NoObjects} | SizeData], Head, _Bulk, NoDups) 2845 when is_list(Bins) -> 2846 true = NoDups =:= 0, 2847 PWs = [{Pos,Bins} | lists:map(fun({_, P, B, _}) -> {P, B} end, SizeData)], 2848 #head{fptr = Fd, filename = FileName} = Head, 2849 dets_utils:pwrite(Fd, FileName, PWs), 2850 {ok, Head#head{update_mode = dirty}}; 2851fsck_copy(SizeData, Head, Bulk, NoDups) -> 2852 catch fsck_copy1(SizeData, Head, Bulk, NoDups). 2853 2854fsck_copy1([SzData | L], Head, Bulk, NoDups) -> 2855 Out = Head#head.fptr, 2856 {LogSz, Pos, {FileName, Fd}, NoObjects} = SzData, 2857 Size = if NoObjects =:= 0 -> 0; true -> ?POW(LogSz-1) end, 2858 ExpectedSize = Size * NoObjects, 2859 case close_tmp(Fd) of 2860 ok -> ok; 2861 Err -> 2862 close_files(Bulk, L, Head), 2863 dets_utils:file_error(FileName, Err) 2864 end, 2865 case file:position(Out, Pos) of 2866 {ok, Pos} -> ok; 2867 Err2 -> 2868 close_files(Bulk, L, Head), 2869 dets_utils:file_error(Head#head.filename, Err2) 2870 end, 2871 CR = file:copy({FileName, [raw,binary]}, Out), 2872 _ = file:delete(FileName), 2873 case CR of 2874 {ok, Copied} when Copied =:= ExpectedSize; 2875 NoObjects =:= 0 -> % the segments 2876 fsck_copy1(L, Head, Bulk, NoDups); 2877 {ok, _Copied} -> % should never happen 2878 close_files(Bulk, L, Head), 2879 Reason = if Bulk -> initialization_failed; 2880 true -> repair_failed end, 2881 {error, {Reason, Head#head.filename}}; 2882 FError -> 2883 close_files(Bulk, L, Head), 2884 dets_utils:file_error(FileName, FError) 2885 end; 2886fsck_copy1([], Head, _Bulk, NoDups) when NoDups =/= 0 -> 2887 {error, {initialization_failed, Head#head.filename}}; 2888fsck_copy1([], Head, _Bulk, _NoDups) -> 2889 {ok, Head#head{update_mode = dirty}}. 2890 2891close_files(false, SizeData, Head) -> 2892 _ = file:close(Head#head.fptr), 2893 close_files(true, SizeData, Head); 2894close_files(true, SizeData, _Head) -> 2895 Fun = fun({_Size, _Pos, {FileName, Fd}, _No}) -> 2896 _ = close_tmp(Fd), 2897 file:delete(FileName); 2898 (_) -> 2899 ok 2900 end, 2901 lists:foreach(Fun, SizeData). 2902 2903close_tmp(Fd) -> 2904 file:close(Fd). 2905 2906fslot(H, Slot) -> 2907 case catch begin 2908 {NH, []} = write_cache(H), 2909 Objs = dets_v9:slot_objs(NH, Slot), 2910 {NH, Objs} 2911 end of 2912 {NewHead, _Objects} = Reply when is_record(NewHead, head) -> 2913 Reply 2914 end. 2915 2916do_update_counter(Head, _Key, _Incr) when Head#head.type =/= set -> 2917 {Head, badarg}; 2918do_update_counter(Head, Key, Incr) -> 2919 case flookup_keys(Head, [Key]) of 2920 {H1, [O]} -> 2921 Kp = H1#head.keypos, 2922 case catch try_update_tuple(O, Kp, Incr) of 2923 {'EXIT', _} -> 2924 {H1, badarg}; 2925 {New, Term} -> 2926 case finsert(H1, [Term]) of 2927 {H2, ok} -> 2928 {H2, New}; 2929 Reply -> 2930 Reply 2931 end 2932 end; 2933 {H1, []} -> 2934 {H1, badarg}; 2935 HeadError -> 2936 HeadError 2937 end. 2938 2939try_update_tuple(O, _Kp, {Pos, Incr}) -> 2940 try_update_tuple2(O, Pos, Incr); 2941try_update_tuple(O, Kp, Incr) -> 2942 try_update_tuple2(O, Kp+1, Incr). 2943 2944try_update_tuple2(O, Pos, Incr) -> 2945 New = element(Pos, O) + Incr, 2946 {New, setelement(Pos, O, New)}. 2947 2948set_verbose(true) -> 2949 put(verbose, yes); 2950set_verbose(_) -> 2951 erase(verbose). 2952 2953where_is_object(Head, Object) -> 2954 Keypos = Head#head.keypos, 2955 case check_objects([Object], Keypos) of 2956 true -> 2957 case catch write_cache(Head) of 2958 {NewHead, []} -> 2959 {NewHead, dets_v9:find_object(NewHead, Object)}; 2960 {NewHead, _} = HeadError when is_record(NewHead, head) -> 2961 HeadError 2962 end; 2963 false -> 2964 {Head, badarg} 2965 end. 2966 2967check_objects([T | Ts], Kp) when tuple_size(T) >= Kp -> 2968 check_objects(Ts, Kp); 2969check_objects(L, _Kp) -> 2970 L =:= []. 2971 2972no_things(Head) -> 2973 Head#head.no_keys. 2974 2975file_no_things(FH) -> 2976 FH#fileheader.no_keys. 2977 2978%%% The write cache is list of {Key, [Item]} where Item is one of 2979%%% {Seq, delete_key}, {Seq, {lookup,Pid}}, {Seq, {delete_object,object()}}, 2980%%% or {Seq, {insert,object()}}. Seq is a number that increases 2981%%% monotonically for each item put in the cache. The purpose is to 2982%%% make sure that items are sorted correctly. Sequences of delete and 2983%%% insert operations are inserted in the cache without doing any file 2984%%% operations. When the cache is considered full, a lookup operation 2985%%% is requested, or after some delay, the contents of the cache are 2986%%% written to the file, and the cache emptied. 2987%%% 2988%%% Data is not allowed to linger more than 'delay' milliseconds in 2989%%% the write cache. A delayed_write message is received when some 2990%%% datum has become too old. If 'wrtime' is equal to 'undefined', 2991%%% then the cache is empty and no such delayed_write message has been 2992%%% scheduled. Otherwise there is a delayed_write message scheduled, 2993%%% and the value of 'wrtime' is the time when the cache was last 2994%%% written, or when it was first updated after the cache was last 2995%%% written. 2996 2997update_cache(Head, KeysOrObjects, What) -> 2998 {Head1, LU, PwriteList} = update_cache(Head, [{What,KeysOrObjects}]), 2999 {NewHead, ok} = dets_utils:pwrite(Head1, PwriteList), 3000 {NewHead, LU}. 3001 3002%% -> {NewHead, [object()], pwrite_list()} | throw({Head, Error}) 3003update_cache(Head, ToAdd) -> 3004 Cache = Head#head.cache, 3005 #cache{cache = C, csize = Size0, inserts = Ins} = Cache, 3006 NewSize = Size0 + erlang:external_size(ToAdd), 3007 %% The size is used as a sequence number here; it increases monotonically. 3008 {NewC, NewIns, Lookup, Found} = 3009 cache_binary(Head, ToAdd, C, Size0, Ins, false, []), 3010 NewCache = Cache#cache{cache = NewC, csize = NewSize, inserts = NewIns}, 3011 Head1 = Head#head{cache = NewCache}, 3012 if 3013 Lookup; NewSize >= Cache#cache.tsize -> 3014 %% The cache is considered full, or some lookup. 3015 {NewHead, LU, PwriteList} = dets_v9:write_cache(Head1), 3016 {NewHead, Found ++ LU, PwriteList}; 3017 NewC =:= [] -> 3018 {Head1, Found, []}; 3019 Cache#cache.wrtime =:= undefined -> 3020 %% Empty cache. Schedule a delayed write. 3021 Now = time_now(), Me = self(), 3022 Call = ?DETS_CALL(Me, {delayed_write, Now}), 3023 erlang:send_after(Cache#cache.delay, Me, Call), 3024 {Head1#head{cache = NewCache#cache{wrtime = Now}}, Found, []}; 3025 Size0 =:= 0 -> 3026 %% Empty cache that has been written after the 3027 %% currently scheduled delayed write. 3028 {Head1#head{cache = NewCache#cache{wrtime = time_now()}}, Found, []}; 3029 true -> 3030 %% Cache is not empty, delayed write has been scheduled. 3031 {Head1, Found, []} 3032 end. 3033 3034cache_binary(Head, [{Q,Os} | L], C, Seq, Ins, Lu,F) when Q =:= delete_object -> 3035 cache_obj_op(Head, L, C, Seq, Ins, Lu, F, Os, Head#head.keypos, Q); 3036cache_binary(Head, [{Q,Os} | L], C, Seq, Ins, Lu, F) when Q =:= insert -> 3037 NewIns = Ins + length(Os), 3038 cache_obj_op(Head, L, C, Seq, NewIns, Lu, F, Os, Head#head.keypos, Q); 3039cache_binary(Head, [{Q,Ks} | L], C, Seq, Ins, Lu, F) when Q =:= delete_key -> 3040 cache_key_op(Head, L, C, Seq, Ins, Lu, F, Ks, Q); 3041cache_binary(Head, [{Q,Ks} | L], C, Seq, Ins, _Lu, F) when C =:= [] -> % lookup 3042 cache_key_op(Head, L, C, Seq, Ins, true, F, Ks, Q); 3043cache_binary(Head, [{Q,Ks} | L], C, Seq, Ins, Lu, F) -> % lookup 3044 case dets_utils:cache_lookup(Head#head.type, Ks, C, []) of 3045 false -> 3046 cache_key_op(Head, L, C, Seq, Ins, true, F, Ks, Q); 3047 Found -> 3048 {lookup,Pid} = Q, 3049 cache_binary(Head, L, C, Seq, Ins, Lu, [{Pid,Found} | F]) 3050 end; 3051cache_binary(_Head, [], C, _Seq, Ins, Lu, F) -> 3052 {C, Ins, Lu, F}. 3053 3054cache_key_op(Head, L, C, Seq, Ins, Lu, F, [K | Ks], Q) -> 3055 E = {K, {Seq, Q}}, 3056 cache_key_op(Head, L, [E | C], Seq+1, Ins, Lu, F, Ks, Q); 3057cache_key_op(Head, L, C, Seq, Ins, Lu, F, [], _Q) -> 3058 cache_binary(Head, L, C, Seq, Ins, Lu, F). 3059 3060cache_obj_op(Head, L, C, Seq, Ins, Lu, F, [O | Os], Kp, Q) -> 3061 E = {element(Kp, O), {Seq, {Q, O}}}, 3062 cache_obj_op(Head, L, [E | C], Seq+1, Ins, Lu, F, Os, Kp, Q); 3063cache_obj_op(Head, L, C, Seq, Ins, Lu, F, [], _Kp, _Q) -> 3064 cache_binary(Head, L, C, Seq, Ins, Lu, F). 3065 3066%% Called after some delay. 3067%% -> NewHead 3068delayed_write(Head, WrTime) -> 3069 Cache = Head#head.cache, 3070 LastWrTime = Cache#cache.wrtime, 3071 if 3072 LastWrTime =:= WrTime -> 3073 %% The cache was not emptied during the last delay. 3074 case catch write_cache(Head) of 3075 {Head2, []} -> 3076 NewCache = (Head2#head.cache)#cache{wrtime = undefined}, 3077 Head2#head{cache = NewCache}; 3078 {NewHead, _Error} -> % Head.update_mode has been updated 3079 NewHead 3080 end; 3081 true -> 3082 %% The cache was emptied during the delay. 3083 %% Has anything been written since then? 3084 if 3085 Cache#cache.csize =:= 0 -> 3086 %% No, further delayed write not needed. 3087 NewCache = Cache#cache{wrtime = undefined}, 3088 Head#head{cache = NewCache}; 3089 true -> 3090 %% Yes, schedule a new delayed write. 3091 When = round((LastWrTime - WrTime)/1000), Me = self(), 3092 Call = ?DETS_CALL(Me, {delayed_write, LastWrTime}), 3093 erlang:send_after(When, Me, Call), 3094 Head 3095 end 3096 end. 3097 3098%% -> {NewHead, [LookedUpObject]} | throw({NewHead, Error}) 3099write_cache(Head) -> 3100 {Head1, LU, PwriteList} = dets_v9:write_cache(Head), 3101 {NewHead, ok} = dets_utils:pwrite(Head1, PwriteList), 3102 {NewHead, LU}. 3103 3104status(Head) -> 3105 case Head#head.update_mode of 3106 saved -> ok; 3107 dirty -> ok; 3108 new_dirty -> ok; 3109 Error -> Error 3110 end. 3111 3112%%% Scan the file from start to end by reading chunks. 3113 3114%% -> dets_cont() 3115init_scan(Head, NoObjs) -> 3116 check_safe_fixtable(Head), 3117 FreeLists = dets_utils:get_freelists(Head), 3118 Base = Head#head.base, 3119 case dets_utils:find_next_allocated(FreeLists, Base, Base) of 3120 {From, To} -> 3121 #dets_cont{no_objs = NoObjs, bin = <<>>, alloc = {From,To,<<>>}}; 3122 none -> 3123 #dets_cont{no_objs = NoObjs, bin = eof, alloc = <<>>} 3124 end. 3125 3126check_safe_fixtable(Head) -> 3127 case (Head#head.fixed =:= false) andalso 3128 ((get(verbose) =:= yes) orelse dets_utils:debug_mode()) of 3129 true -> 3130 error_logger:format 3131 ("** dets: traversal of ~tp needs safe_fixtable~n", 3132 [Head#head.name]); 3133 false -> 3134 ok 3135 end. 3136 3137%% -> {[RTerm], dets_cont()} | {scan_error, Reason} 3138%% RTerm = {Pos, Next, Size, Status, Term} 3139scan(_Head, #dets_cont{alloc = <<>>}=C) -> 3140 {[], C}; 3141scan(Head, C) -> % when is_record(C, dets_cont) 3142 #dets_cont{no_objs = No, alloc = L0, bin = Bin} = C, 3143 {From, To, L} = L0, 3144 R = case No of 3145 default -> 3146 0; 3147 _ when is_integer(No) -> 3148 -No-1 3149 end, 3150 scan(Bin, Head, From, To, L, [], R, {C, Head#head.type}). 3151 3152scan(Bin, H, From, To, L, Ts, R, {C0, Type} = C) -> 3153 case dets_v9:scan_objs(H, Bin, From, To, L, Ts, R, Type) of 3154 {more, NFrom, NTo, NL, NTs, NR, Sz} -> 3155 scan_read(H, NFrom, NTo, Sz, NL, NTs, NR, C); 3156 {stop, <<>>=B, NFrom, NTo, <<>>=NL, NTs} -> 3157 Ftab = dets_utils:get_freelists(H), 3158 case dets_utils:find_next_allocated(Ftab, NFrom, H#head.base) of 3159 none -> 3160 {NTs, C0#dets_cont{bin = eof, alloc = B}}; 3161 _ -> 3162 {NTs, C0#dets_cont{bin = B, alloc = {NFrom, NTo, NL}}} 3163 end; 3164 {stop, B, NFrom, NTo, NL, NTs} -> 3165 {NTs, C0#dets_cont{bin = B, alloc = {NFrom, NTo, NL}}}; 3166 bad_object -> 3167 {scan_error, dets_utils:bad_object(scan, {From, To, Bin})} 3168 end. 3169 3170scan_read(_H, From, To, _Min, L0, Ts, 3171 R, {C, _Type}) when R >= ?CHUNK_SIZE -> 3172 %% We may have read (much) more than CHUNK_SIZE, if there are holes. 3173 L = {From, To, L0}, 3174 {Ts, C#dets_cont{bin = <<>>, alloc = L}}; 3175scan_read(H, From, _To, Min, _L, Ts, R, C) -> 3176 Max = if 3177 Min < ?CHUNK_SIZE -> ?CHUNK_SIZE; 3178 true -> Min 3179 end, 3180 FreeLists = dets_utils:get_freelists(H), 3181 case dets_utils:find_allocated(FreeLists, From, Max, H#head.base) of 3182 <<>>=Bin0 -> 3183 {Cont, _} = C, 3184 {Ts, Cont#dets_cont{bin = eof, alloc = Bin0}}; 3185 <<From1:32,To1:32,L1/binary>> -> 3186 case dets_utils:pread_n(H#head.fptr, From1, Max) of 3187 eof -> 3188 {scan_error, premature_eof}; 3189 NewBin -> 3190 scan(NewBin, H, From1, To1, L1, Ts, R, C) 3191 end 3192 end. 3193 3194err(Error) -> 3195 case get(verbose) of 3196 yes -> 3197 error_logger:format("** dets: failed with ~tw~n", [Error]), 3198 Error; 3199 undefined -> 3200 Error 3201 end. 3202 3203-compile({inline, [time_now/0]}). 3204time_now() -> 3205 erlang:monotonic_time(1000000). 3206 3207make_timestamp(MonTime, TimeOffset) -> 3208 ErlangSystemTime = erlang:convert_time_unit(MonTime+TimeOffset, 3209 native, 3210 microsecond), 3211 MegaSecs = ErlangSystemTime div 1000000000000, 3212 Secs = ErlangSystemTime div 1000000 - MegaSecs*1000000, 3213 MicroSecs = ErlangSystemTime rem 1000000, 3214 {MegaSecs, Secs, MicroSecs}. 3215 3216%%%%%%%%%%%%%%%%% DEBUG functions %%%%%%%%%%%%%%%% 3217 3218file_info(FileName) -> 3219 case catch read_file_header(FileName, read, false) of 3220 {ok, Fd, FH} -> 3221 _ = file:close(Fd), 3222 dets_v9:file_info(FH); 3223 Other -> 3224 Other 3225 end. 3226 3227get_head_field(Fd, Field) -> 3228 dets_utils:read_4(Fd, Field). 3229 3230%% Dump the contents of a DAT file to the tty 3231%% internal debug function which ignores the closed properly thingie 3232%% and just tries anyway 3233 3234view(FileName) -> 3235 case catch read_file_header(FileName, read, false) of 3236 {ok, Fd, FH} -> 3237 try dets_v9:check_file_header(FH, Fd) of 3238 {ok, H0} -> 3239 case dets_v9:check_file_header(FH, Fd) of 3240 {ok, H0} -> 3241 H = dets_v9:init_freelist(H0), 3242 v_free_list(H), 3243 dets_v9:v_segments(H), 3244 ok; 3245 X -> 3246 X 3247 end 3248 after _ = file:close(Fd) 3249 end; 3250 X -> 3251 X 3252 end. 3253 3254v_free_list(Head) -> 3255 io:format("FREE LIST ...... \n",[]), 3256 io:format("~p~n", [dets_utils:all_free(Head)]), 3257 io:format("END OF FREE LIST \n",[]). 3258