1%%
2%% %CopyrightBegin%
3%%
4%% Copyright Ericsson AB 1996-2018. All Rights Reserved.
5%%
6%% Licensed under the Apache License, Version 2.0 (the "License");
7%% you may not use this file except in compliance with the License.
8%% You may obtain a copy of the License at
9%%
10%%     http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing, software
13%% distributed under the License is distributed on an "AS IS" BASIS,
14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15%% See the License for the specific language governing permissions and
16%% limitations under the License.
17%%
18%% %CopyrightEnd%
19%%
20-module(dets).
21
22%% Disk based linear hashing lookup dictionary.
23
24%% Public.
25-export([all/0,
26	 bchunk/2,
27         close/1,
28         delete/2,
29         delete_all_objects/1,
30         delete_object/2,
31         first/1,
32         foldl/3,
33         foldr/3,
34         from_ets/2,
35         info/1,
36         info/2,
37         init_table/2,
38	 init_table/3,
39         insert/2,
40         insert_new/2,
41         is_compatible_bchunk_format/2,
42	 is_dets_file/1,
43         lookup/2,
44         match/1,
45         match/2,
46         match/3,
47         match_delete/2,
48         match_object/1,
49         match_object/2,
50         match_object/3,
51         member/2,
52         next/2,
53         open_file/1,
54         open_file/2,
55         pid2name/1,
56         repair_continuation/2,
57         safe_fixtable/2,
58         select/1,
59         select/2,
60         select/3,
61         select_delete/2,
62         slot/2,
63         sync/1,
64         table/1,
65         table/2,
66         to_ets/2,
67         traverse/2,
68         update_counter/3]).
69
70%% Server export.
71-export([start/0, stop/0]).
72
73%% Internal exports.
74-export([istart_link/1, init/2, internal_open/3, add_user/3,
75         internal_close/1, remove_user/2,
76	 system_continue/3, system_terminate/4, system_code_change/4]).
77
78%% Debug.
79-export([file_info/1,
80	 fsck/1,
81         fsck/2,
82	 get_head_field/2,
83	 view/1,
84	 where/2,
85	 verbose/0,
86	 verbose/1
87	]).
88
89%% Not documented, or not ready for publication.
90-export([lookup_keys/2]).
91
92-export_type([bindings_cont/0, cont/0, object_cont/0, select_cont/0,
93              tab_name/0]).
94
95-compile({inline, [{einval,2},{badarg,2},{undefined,1},
96                   {badarg_exit,2},{lookup_reply,2}]}).
97
98-include_lib("kernel/include/file.hrl").
99
100-include("dets.hrl").
101
102%%% This is the implementation of the mnesia file storage. Each (non
103%%% ram-copy) table is maintained in a corresponding .DAT file. The
104%%% dat file is organized as a segmented linear hashlist. The head of
105%%% the file with the split indicator, size etc is held in ram by the
106%%% server at all times.
107%%%
108
109%%  The method of hashing is the so called linear hashing algorithm
110%%  with segments.
111%%
112%%  Linear hashing:
113%%
114%%         - n indicates next bucket to split (initially zero);
115%%         - m is the size of the hash table
116%%         - initially next = m and n = 0
117%%
118%%         - to insert:
119%%                - hash = key mod m
120%%                - if hash < n then hash = key mod 2m
121%%                - when the number of objects exceeds the initial size
122%%                  of the hash table, each insertion of an object
123%%                  causes bucket n to be split:
124%%                      - add a new bucket to the end of the table
125%%                      - redistribute the contents of bucket n
126%%                        using hash = key mod 2m
127%%                      - increment n
128%%                      - if n = m then m = 2m, n = 0
129%%         - to search:
130%%                hash = key mod m
131%%                if hash < n then hash = key mod 2m
132%%                do linear scan of the bucket
133%%
134
135%%% If a file error occurs on a working dets file, update_mode is set
136%%% to the error tuple. When in 'error' mode, the free lists are not
137%%% written, and a repair is forced next time the file is opened.
138
%% Continuation state for the chunked traversal functions in this
%% module (bchunk/2, match/1,3, match_object/1,3 and select/1,3).
%% The 'what' field records which kind of traversal a continuation
%% belongs to, and those functions check it before continuing.
-record(dets_cont, {
          what :: 'undefined' | 'bchunk' | 'bindings' | 'object' | 'select',
          no_objs :: 'default' | pos_integer(), % requested number of objects
          bin :: 'eof' | binary(), % small chunk not consumed,
                                  % or 'eof' at end-of-file
          alloc :: binary() % the part of the file not yet scanned
                 | {From :: non_neg_integer(),
                    %% non_neg_integer() is the integer type; without
                    %% the parentheses it would denote the atom.
                    To :: non_neg_integer(),
                    binary()},
          tab :: tab_name(),
          proc :: 'undefined' | pid(), % the pid of the Dets process
          match_program :: 'true'
                         | 'undefined'
                         | {'match_spec', ets:comp_match_spec()}
         }).
154
%% Arguments for opening a table. open_file/2 obtains a value of this
%% record from defaults/2 and hands it to dets_server:open_file/2.
-record(open_args, {
          file          :: list(),
          type          :: type(),
          keypos        :: keypos(),
          repair        :: 'force' | boolean(),
          min_no_slots  :: no_slots(),
          max_no_slots  :: no_slots(),
          ram_file      :: boolean(),
          delayed_write :: cache_parms(),
          auto_save     :: auto_save(),
          access        :: access(),
          debug         :: boolean()
         }).
168
%% Each macro wraps a match pattern in a one-clause match specification
%% with an empty guard list. They differ only in the body: the whole
%% object ('$_'), the bindings ('$$'), or the constant true.
-define(PATTERN_TO_OBJECT_MATCH_SPEC(Pat), [{Pat, [], ['$_']}]).
-define(PATTERN_TO_BINDINGS_MATCH_SPEC(Pat), [{Pat, [], ['$$']}]).
-define(PATTERN_TO_TRUE_MATCH_SPEC(Pat), [{Pat, [], [true]}]).
172
%% Debug and profiling hooks. The active definitions expand to a
%% constant; the commented-out variant above each one is the
%% alternative that actually prints or runs its argument.

%%-define(DEBUGM(X, Y), io:format(X, Y)).
-define(DEBUGM(X, Y), true).

%%-define(DEBUGF(X,Y), io:format(X, Y)).
-define(DEBUGF(X, Y), void).

%%-define(PROFILE(C), C).
-define(PROFILE(C), void).
181
%% The four continuation types are opaque and all represented by the
%% #dets_cont{} record.
-opaque bindings_cont() :: #dets_cont{}.
-opaque cont() :: #dets_cont{}.
-type match_spec() :: ets:match_spec().
-type object() :: tuple().
-opaque object_cont() :: #dets_cont{}.
-type pattern() :: atom() | tuple().
-opaque select_cont() :: #dets_cont{}.
189
190%%% Some further debug code was added in R12B-1 (stdlib-1.15.1):
191%%% - there is a new open_file() option 'debug';
192%%% - there is a new OS environment variable 'DETS_DEBUG';
193%%% - verbose(true) implies that info messages are written onto
194%%%   the error log whenever an unsafe traversal is started.
195%%% The 'debug' mode (set by the open_file() option 'debug' or
196%%% by os:putenv("DETS_DEBUG", "true")) implies that the results of
197%%% calling pwrite() and pread() are tested to some extent. It also
198%%% means a considerable overhead when it comes to RAM usage. The
199%%% operation of Dets is also slowed down a bit. Note that in debug
200%%% mode terms will be output on the error logger.
201
202%%%----------------------------------------------------------------------
203%%% API
204%%%----------------------------------------------------------------------
205
%% Internal export. Sends an add_user request for Tab and Args to Pid
%% through req/2 and returns the reply.
add_user(Pid, Tab, Args) ->
    Request = {add_user, Tab, Args},
    req(Pid, Request).
208
-spec all() -> [tab_name()].

%% Returns what dets_server:all/0 reports.
all() ->
    Names = dets_server:all(),
    Names.
213
-spec bchunk(Name, Continuation) ->
    {Continuation2, Data} | '$end_of_table' | {'error', Reason} when
      Name :: tab_name(),
      Continuation :: 'start' | cont(),
      Continuation2 :: cont(),
      Data :: binary() | tuple(),
      Reason :: term().

%% The second argument is either the atom start, which issues a
%% bchunk_init request, or a bchunk continuation belonging to the same
%% table, which issues a bchunk request. Anything else is a badarg
%% error. The reply is passed through badarg/2 with the original
%% arguments.
bchunk(Tab, Arg) ->
    Args = [Tab, Arg],
    case Arg of
        start ->
            badarg(treq(Tab, {bchunk_init, Tab}), Args);
        #dets_cont{what = bchunk, tab = Tab} ->
            badarg(treq(Tab, {bchunk, Arg}), Args);
        _ ->
            erlang:error(badarg, Args)
    end.
228
-spec close(Name) -> 'ok' | {'error', Reason} when
      Name :: tab_name(),
      Reason :: term().

%% Asks dets_server to close Tab. A badarg reply (which should not
%% happen) is reported as {error, not_owner} for backwards
%% compatibility; any other reply is returned unchanged.
close(Tab) ->
    Reply = dets_server:close(Tab),
    if
        Reply =:= badarg -> {error, not_owner};
        true -> Reply
    end.
240
-spec delete(Name, Key) -> 'ok' | {'error', Reason} when
      Name :: tab_name(),
      Key :: term(),
      Reason :: term().

%% Issues a delete_key request for the single key Key. The reply is
%% passed through badarg/2 with the original arguments.
delete(Tab, Key) ->
    Reply = treq(Tab, {delete_key, [Key]}),
    badarg(Reply, [Tab, Key]).
248
-spec delete_all_objects(Name) -> 'ok' | {'error', Reason} when
      Name :: tab_name(),
      Reason :: term().

%% Issues a delete_all_objects request. A badarg reply becomes a
%% badarg error; a fixed reply falls back to match_delete/2 with the
%% match-all pattern '_'; any other reply is returned unchanged.
delete_all_objects(Tab) ->
    Reply = treq(Tab, delete_all_objects),
    if
        Reply =:= badarg ->
            erlang:error(badarg, [Tab]);
        Reply =:= fixed ->
            match_delete(Tab, '_');
        true ->
            Reply
    end.
262
-spec delete_object(Name, Object) -> 'ok' | {'error', Reason} when
      Name :: tab_name(),
      Object :: object(),
      Reason :: term().

%% Issues a delete_object request for the single object O. The reply
%% is passed through badarg/2 with the original arguments.
delete_object(Tab, O) ->
    Reply = treq(Tab, {delete_object, [O]}),
    badarg(Reply, [Tab, O]).
270
%% Kept for backwards compatibility: the second argument is ignored
%% and the call is forwarded to fsck/1.
fsck(Fname, _Ignored) ->
    fsck(Fname).
274
%% Given a filename, fsck it. Debug.
%%
%% Reads the file header of Fname and lets dets_v9 check it. If the
%% header is accepted, or the only complaint is that the file was not
%% closed, the open descriptor is passed on to fsck/6. Any other
%% result of the header check is returned as is.
%%
%% The whole body runs under 'catch', so a failed match on the result
%% of read_file_header/3, or any other exception, comes back as a
%% value instead of being raised.
fsck(Fname) ->
    catch begin
      {ok, Fd, FH} = read_file_header(Fname, read, false),
      ?DEBUGF("FileHeader: ~p~n", [FH]),
      case dets_v9:check_file_header(FH, Fd) of
          {error, not_closed} ->
              fsck(Fd, make_ref(), Fname, FH, default, default);
          {ok, _Head} ->
              fsck(Fd, make_ref(), Fname, FH, default, default);
          Error ->
              %% Fd is not handed to fsck/6 on this path, so close it
              %% here (as is_dets_file/1 does) rather than leaking
              %% the descriptor.
              _ = file:close(Fd),
              Error
      end
    end.
289
-spec first(Name) -> Key | '$end_of_table' when
      Name :: tab_name(),
      Key :: term().

%% Issues a first request. The reply is passed through badarg_exit/2
%% with the original argument.
first(Tab) ->
    Reply = treq(Tab, first),
    badarg_exit(Reply, [Tab]).
296
-spec foldr(Function, Acc0, Name) -> Acc | {'error', Reason} when
      Name :: tab_name(),
      Function :: fun((Object :: object(), AccIn) -> AccOut),
      Acc0 :: term(),
      Acc :: term(),
      AccIn :: term(),
      AccOut :: term(),
      Reason :: term().

%% Identical to foldl/3: the call is forwarded unchanged.
foldr(Function, Acc0, Name) ->
    foldl(Function, Acc0, Name).
308
-spec foldl(Function, Acc0, Name) -> Acc | {'error', Reason} when
      Name :: tab_name(),
      Function :: fun((Object :: object(), AccIn) -> AccOut),
      Acc0 :: term(),
      Acc :: term(),
      AccIn :: term(),
      AccOut :: term(),
      Reason :: term().

%% Runs do_traverse/4 with a fresh reference. The result is passed
%% through badarg/2 with the original arguments.
foldl(Fun, Acc, Tab) ->
    Result = do_traverse(Fun, Acc, Tab, make_ref()),
    badarg(Result, [Fun, Acc, Tab]).
321
-spec from_ets(Name, EtsTab) -> 'ok' | {'error', Reason} when
      Name :: tab_name(),
      EtsTab :: ets:tab(),
      Reason :: term().

%% Initializes the Dets table DTab from the ETS table ETab. The ETS
%% table is fixed while an initialize request is served; the input
%% function reads ETab in chunks of 100 objects. A {thrown, Term}
%% reply is re-thrown in the caller after the table has been unfixed.
from_ets(DTab, ETab) ->
    ets:safe_fixtable(ETab, true),
    FirstChunk = ets:select(ETab, ?PATTERN_TO_OBJECT_MATCH_SPEC('_'), 100),
    Request = {initialize, from_ets_fun(FirstChunk, ETab), term, default},
    Reply = treq(DTab, Request),
    ets:safe_fixtable(ETab, false),
    case Reply of
        {thrown, Thrown} ->
            throw(Thrown);
        _ ->
            badarg(Reply, [DTab, ETab])
    end.
338
%% Builds the input fun used by from_ets/2. LC is the latest result of
%% ets:select/1,3: either '$end_of_table' or {Objects, Continuation}.
%% The fun answers close with ok. It answers read with end_of_input at
%% the end of the table, otherwise with the current objects and a fun
%% for the next chunk.
from_ets_fun(LC, ETab) ->
    fun(close) ->
            ok;
       (read) ->
            case LC of
                '$end_of_table' ->
                    end_of_input;
                _ ->
                    {Objects, Cont} = LC,
                    {Objects, from_ets_fun(ets:select(Cont), ETab)}
            end
    end.
348
-spec info(Name) -> InfoList | 'undefined' when
      Name :: tab_name(),
      InfoList :: [InfoTuple],
      InfoTuple :: {'file_size', non_neg_integer()}
                 | {'filename', file:name()}
                 | {'keypos', keypos()}
                 | {'size', non_neg_integer()}
                 | {'type', type()}.

%% Returns undefined if looking up the pid for Tab fails. Otherwise
%% sends an info request to that pid and passes the reply through
%% undefined/1.
info(Tab) ->
    case catch dets_server:get_pid(Tab) of
        {'EXIT', _} ->
            undefined;
        Pid ->
            Reply = req(Pid, info),
            undefined(Reply)
    end.
365
-spec info(Name, Item) -> Value | 'undefined' when
      Name :: tab_name(),
      Item :: 'access' | 'auto_save' | 'bchunk_format'
            | 'hash' | 'file_size' | 'filename' | 'keypos' | 'memory'
            | 'no_keys' | 'no_objects' | 'no_slots' | 'owner' | 'ram_file'
            | 'safe_fixed' | 'safe_fixed_monotonic_time' | 'size' | 'type',
      Value :: term().

%% owner: the pid registered for Tab, or undefined if the lookup does
%%        not yield a pid.
%% users: (undocumented) the users reported by dets_server, or
%%        undefined if there are none.
%% Other items are sent as an {info, Tag} request to the table's
%% process, with the reply passed through undefined/1.
info(Tab, owner) ->
    Owner = (catch dets_server:get_pid(Tab)),
    if
        is_pid(Owner) -> Owner;
        true -> undefined
    end;
info(Tab, users) ->
    Users = dets_server:users(Tab),
    if
        Users =:= [] -> undefined;
        true -> Users
    end;
info(Tab, Tag) ->
    case catch dets_server:get_pid(Tab) of
        {'EXIT', _} ->
            undefined;
        Pid ->
            Reply = req(Pid, {info, Tag}),
            undefined(Reply)
    end.
395
-spec init_table(Name, InitFun) -> ok | {'error', Reason} when
      Name :: tab_name(),
      InitFun :: fun((Arg) -> Res),
      Arg :: read | close,
      Res :: end_of_input | {[object()], InitFun} | {Data, InitFun} | term(),
      Reason :: term(),
      Data :: binary() | tuple().

%% Same as init_table/3 with an empty option list.
init_table(Tab, InitFun) ->
    NoOptions = [],
    init_table(Tab, InitFun, NoOptions).
406
-spec init_table(Name, InitFun, Options) -> ok | {'error', Reason} when
      Name :: tab_name(),
      InitFun :: fun((Arg) -> Res),
      Arg :: read | close,
      Res :: end_of_input | {[object()], InitFun} | {Data, InitFun} | term(),
      Options :: Option | [Option],
      Option :: {min_no_slots,no_slots()} | {format,term | bchunk},
      Reason :: term(),
      Data :: binary() | tuple().

%% InitFun must be a fun and Options must be accepted by options/2,
%% otherwise a badarg error is raised. The format and min_no_slots
%% options are then sent along in an initialize request. A
%% {thrown, Term} reply is re-thrown in the caller.
init_table(Tab, InitFun, Options) when is_function(InitFun) ->
    Args = [Tab, InitFun, Options],
    case options(Options, [format, min_no_slots]) of
        {badarg, _} ->
            erlang:error(badarg, Args);
        [Format, MinNoSlots] ->
            Reply = treq(Tab, {initialize, InitFun, Format, MinNoSlots}),
            case Reply of
                {thrown, Thrown} -> throw(Thrown);
                _ -> badarg(Reply, Args)
            end
    end;
init_table(Tab, InitFun, Options) ->
    erlang:error(badarg, [Tab, InitFun, Options]).
429
-spec insert(Name, Objects) -> 'ok' | {'error', Reason} when
      Name :: tab_name(),
      Objects :: object() | [object()],
      Reason :: term().

%% Issues an insert request. A non-list argument is wrapped in a
%% one-element list for the request; badarg/2 gets the argument as the
%% caller passed it.
insert(Tab, ObjOrObjs) ->
    Objs = if
               is_list(ObjOrObjs) -> ObjOrObjs;
               true -> [ObjOrObjs]
           end,
    badarg(treq(Tab, {insert, Objs}), [Tab, ObjOrObjs]).
439
-spec insert_new(Name, Objects) -> boolean() | {'error', Reason} when
      Name :: tab_name(),
      Objects :: object() | [object()],
      Reason :: term().

%% Issues an insert_new request. A non-list argument is wrapped in a
%% one-element list for the request; badarg/2 gets the argument as the
%% caller passed it.
insert_new(Tab, ObjOrObjs) ->
    Objs = if
               is_list(ObjOrObjs) -> ObjOrObjs;
               true -> [ObjOrObjs]
           end,
    badarg(treq(Tab, {insert_new, Objs}), [Tab, ObjOrObjs]).
449
%% Internal export. Sends a close request to Pid through req/2.
internal_close(Pid) ->
    Request = close,
    req(Pid, Request).
452
%% Internal export. Sends an internal_open request carrying Ref and
%% Args to Pid through req/2.
internal_open(Pid, Ref, Args) ->
    Request = {internal_open, Ref, Args},
    req(Pid, Request).
455
-spec is_compatible_bchunk_format(Name, BchunkFormat) -> boolean() when
      Name :: tab_name(),
      BchunkFormat :: binary().

%% Issues an is_compatible_bchunk_format request for Term. The reply
%% is passed through badarg/2 with the original arguments.
is_compatible_bchunk_format(Tab, Term) ->
    Reply = treq(Tab, {is_compatible_bchunk_format, Term}),
    badarg(Reply, [Tab, Term]).
462
-spec is_dets_file(Filename) -> boolean() | {'error', Reason} when
      Filename :: file:name(),
      Reason :: term().

%% Reads the file header of FileName. If that succeeds, the file is
%% closed again and the answer is whether the header cookie equals
%% ?MAGIC. The tooshort and not_a_dets_file errors mean false; any
%% other outcome of the (caught) header read is returned as is.
is_dets_file(FileName) ->
    case catch read_file_header(FileName, read, false) of
        {ok, Fd, FH} ->
            _ = file:close(Fd),
            FH#fileheader.cookie =:= ?MAGIC;
        {error, {Tag, _}} when Tag =:= tooshort; Tag =:= not_a_dets_file ->
            false;
        Other ->
            Other
    end.
479
-spec lookup(Name, Key) -> Objects | {'error', Reason} when
      Name :: tab_name(),
      Key :: term(),
      Objects :: [object()],
      Reason :: term().

%% Issues a lookup_keys request for the single key Key. The reply is
%% passed through badarg/2 with the original arguments.
lookup(Tab, Key) ->
    Reply = treq(Tab, {lookup_keys, [Key]}),
    badarg(Reply, [Tab, Key]).
488
%% Not public. Looks up several keys with one request. The keys are
%% sorted and de-duplicated first; an empty key list, or a term that
%% lists:usort/1 cannot handle, is a badarg error.
lookup_keys(Tab, Keys) ->
    case catch lists:usort(Keys) of
        [_ | _] = UKeys ->
            badarg(treq(Tab, {lookup_keys, UKeys}), [Tab, Keys]);
        _ ->
            erlang:error(badarg, [Tab, Keys])
    end.
497
-spec match(Name, Pattern) -> [Match] | {'error', Reason} when
      Name :: tab_name(),
      Pattern :: pattern(),
      Match :: [term()],
      Reason :: term().

%% Runs safe_match/3 in bindings mode. The result is passed through
%% badarg/2 with the original arguments.
match(Tab, Pat) ->
    Result = safe_match(Tab, Pat, bindings),
    badarg(Result, [Tab, Pat]).
506
-spec match(Name, Pattern, N) ->
          {[Match], Continuation} | '$end_of_table' | {'error', Reason} when
      Name :: tab_name(),
      Pattern :: pattern(),
      N :: 'default' | non_neg_integer(),
      Continuation :: bindings_cont(),
      Match :: [term()],
      Reason :: term().

%% Starts a chunked match in bindings mode with init_chunk_match/5,
%% using the no_safe flag. The result is passed through badarg/2 with
%% the original arguments.
match(Tab, Pat, N) ->
    Result = init_chunk_match(Tab, Pat, bindings, N, no_safe),
    badarg(Result, [Tab, Pat, N]).
518
-spec match(Continuation) ->
          {[Match], Continuation2} | '$end_of_table' | {'error', Reason} when
      Continuation :: bindings_cont(),
      Continuation2 :: bindings_cont(),
      Match :: [term()],
      Reason :: term().

%% Continues a chunked match. Only a continuation created in bindings
%% mode is accepted; any other term is a badarg error.
match(#dets_cont{what = bindings} = State) ->
    Result = chunk_match(State, no_safe),
    badarg(Result, [State]);
match(Term) ->
    erlang:error(badarg, [Term]).
530
-spec match_delete(Name, Pattern) -> 'ok' | {'error', Reason} when
      Name :: tab_name(),
      Pattern :: pattern(),
      Reason :: term().

%% Runs match_delete/3 in delete mode. The result is passed through
%% badarg/2 with the original arguments.
match_delete(Tab, Pat) ->
    Result = match_delete(Tab, Pat, delete),
    badarg(Result, [Tab, Pat]).
538
%% Shared by match_delete/2 (What = delete) and select_delete/2
%% (What = select). Compiles the pattern or match specification, looks
%% up the table's process and lets do_match_delete/4 drive the
%% deletion. Returns badarg if compilation or the process lookup
%% fails.
match_delete(Tab, Pat, What) ->
    case compile_match_spec(What, Pat) of
        badarg ->
            badarg;
        {Spec, MP} ->
            case catch dets_server:get_pid(Tab) of
                {'EXIT', _} ->
                    badarg;
                Proc ->
                    FirstReply = req(Proc, {match_delete_init, MP, Spec}),
                    do_match_delete(Proc, FirstReply, What, 0)
            end
    end.
552
%% Keeps sending match_delete requests to Proc for as long as the
%% reply is {cont, State, N1}, adding N1 to the running count N each
%% time. On {done, N1} the result is the total count in select mode
%% and ok otherwise. Any other reply is returned as is.
do_match_delete(Proc, Reply, What, N) ->
    case Reply of
        {done, N1} when What =:= select ->
            N + N1;
        {done, _} ->
            ok;
        {cont, State, N1} ->
            do_match_delete(Proc, req(Proc, {match_delete, State}),
                            What, N + N1);
        Error ->
            Error
    end.
561
-spec match_object(Name, Pattern) -> Objects | {'error', Reason} when
      Name :: tab_name(),
      Pattern :: pattern(),
      Objects :: [object()],
      Reason :: term().

%% Runs safe_match/3 in object mode. The result is passed through
%% badarg/2 with the original arguments.
match_object(Tab, Pat) ->
    Result = safe_match(Tab, Pat, object),
    badarg(Result, [Tab, Pat]).
570
-spec match_object(Name, Pattern, N) ->
           {Objects, Continuation} | '$end_of_table' | {'error', Reason} when
      Name :: tab_name(),
      Pattern :: pattern(),
      N :: 'default' | non_neg_integer(),
      Continuation :: object_cont(),
      Objects :: [object()],
      Reason :: term().

%% Starts a chunked match in object mode with init_chunk_match/5,
%% using the no_safe flag. The result is passed through badarg/2 with
%% the original arguments.
match_object(Tab, Pat, N) ->
    Result = init_chunk_match(Tab, Pat, object, N, no_safe),
    badarg(Result, [Tab, Pat, N]).
582
-spec match_object(Continuation) ->
           {Objects, Continuation2} | '$end_of_table' | {'error', Reason} when
      Continuation :: object_cont(),
      Continuation2 :: object_cont(),
      Objects :: [object()],
      Reason :: term().

%% Continues a chunked match. Only a continuation created in object
%% mode is accepted; any other term is a badarg error.
match_object(#dets_cont{what = object} = State) ->
    Result = chunk_match(State, no_safe),
    badarg(Result, [State]);
match_object(Term) ->
    erlang:error(badarg, [Term]).
594
-spec member(Name, Key) -> boolean() | {'error', Reason} when
      Name :: tab_name(),
      Key :: term(),
      Reason :: term().

%% Issues a member request for Key. The reply is passed through
%% badarg/2 with the original arguments.
member(Tab, Key) ->
    Reply = treq(Tab, {member, Key}),
    badarg(Reply, [Tab, Key]).
602
-spec next(Name, Key1) -> Key2 | '$end_of_table' when
      Name :: tab_name(),
      Key1 :: term(),
      Key2 :: term().

%% Issues a next request for Key. The reply is passed through
%% badarg_exit/2 with the original arguments.
next(Tab, Key) ->
    Reply = treq(Tab, {next, Key}),
    badarg_exit(Reply, [Tab, Key]).
610
-spec open_file(Filename) -> {'ok', Reference} | {'error', Reason} when
      Filename :: file:name(),
      Reference :: reference(),
      Reason :: term().

%% Opens a file that is assumed to exist already, using the
%% parameters stored in the file itself, and returns a reference to
%% the opened table.
%%
%% The file name is converted with to_list/1; if the result is not a
%% list, a badarg error is raised with the original argument. A
%% badarg reply from dets_server (which should not happen) is
%% reported as a dets_process_died error.
open_file(File0) ->
    File = to_list(File0),
    if
        is_list(File) ->
            case dets_server:open_file(File) of
                badarg ->
                    erlang:error(dets_process_died, [File]);
                Reply ->
                    einval(Reply, [File])
            end;
        true ->
            erlang:error(badarg, [File0])
    end.
632
-spec open_file(Name, Args) -> {'ok', Name} | {'error', Reason} when
      Name :: tab_name(),
      Args :: [OpenArg],
      OpenArg  :: {'access', access()}
                | {'auto_save', auto_save()}
                | {'estimated_no_objects', non_neg_integer()}
                | {'file', file:name()}
                | {'max_no_slots', no_slots()}
                | {'min_no_slots', no_slots()}
                | {'keypos', keypos()}
                | {'ram_file', boolean()}
                | {'repair', boolean() | 'force'}
                | {'type', type()},
      Reason :: term().

%% A non-list second argument is treated as a single option. The
%% options are turned into an #open_args{} record by defaults/2; if
%% that does not yield such a record, a badarg error is raised. A
%% badarg reply from dets_server (which should not happen) is
%% reported as a dets_process_died error.
open_file(Tab, Args) when is_list(Args) ->
    case catch defaults(Tab, Args) of
        #open_args{} = OpenArgs ->
            case dets_server:open_file(Tab, OpenArgs) of
                badarg ->
                    erlang:error(dets_process_died, [Tab, Args]);
                Reply ->
                    einval(Reply, [Tab, Args])
            end;
        _ ->
            erlang:error(badarg, [Tab, Args])
    end;
open_file(Tab, SingleArg) ->
    open_file(Tab, [SingleArg]).
662
-spec pid2name(Pid) -> {'ok', Name} | 'undefined' when
      Pid :: pid(),
      Name :: tab_name().

%% Returns what dets_server:pid2name/1 reports for Pid.
pid2name(Pid) ->
    Result = dets_server:pid2name(Pid),
    Result.
669
%% Internal export. Sends a {close, From} request to Pid through
%% req/2.
remove_user(Pid, From) ->
    Request = {close, From},
    req(Pid, Request).
672
-spec repair_continuation(Continuation, MatchSpec) -> Continuation2 when
      Continuation :: select_cont(),
      Continuation2 :: select_cont(),
      MatchSpec :: match_spec().

%% If the continuation holds a {match_spec, Compiled} program that
%% ets:is_compiled_ms/1 no longer accepts, it is replaced by a fresh
%% compilation of MS. Every other continuation is returned unchanged.
%% A first argument that is not a continuation is a badarg error.
repair_continuation(#dets_cont{match_program = Program} = Cont, MS) ->
    case Program of
        {match_spec, Compiled} ->
            case ets:is_compiled_ms(Compiled) of
                true ->
                    Cont;
                false ->
                    Recompiled = ets:match_spec_compile(MS),
                    Cont#dets_cont{match_program = {match_spec, Recompiled}}
            end;
        _ ->
            Cont
    end;
repair_continuation(T, MS) ->
    erlang:error(badarg, [T, MS]).
690
-spec safe_fixtable(Name, Fix) -> 'ok' when
      Name :: tab_name(),
      Fix :: boolean().

%% Issues a safe_fixtable request. The second argument must be a
%% boolean; anything else is a badarg error.
safe_fixtable(Tab, Bool) when is_boolean(Bool) ->
    Reply = treq(Tab, {safe_fixtable, Bool}),
    badarg(Reply, [Tab, Bool]);
safe_fixtable(Tab, Term) ->
    erlang:error(badarg, [Tab, Term]).
699
-spec select(Name, MatchSpec) -> Selection | {'error', Reason} when
      Name :: tab_name(),
      MatchSpec :: match_spec(),
      Selection :: [term()],
      Reason :: term().

%% Runs safe_match/3 in select mode. The result is passed through
%% badarg/2 with the original arguments.
select(Tab, Pat) ->
    Result = safe_match(Tab, Pat, select),
    badarg(Result, [Tab, Pat]).
708
-spec select(Name, MatchSpec, N) ->
          {Selection, Continuation} | '$end_of_table' | {'error', Reason} when
      Name :: tab_name(),
      MatchSpec :: match_spec(),
      N :: 'default' | non_neg_integer(),
      Continuation :: select_cont(),
      Selection :: [term()],
      Reason :: term().

%% Starts a chunked select with init_chunk_match/5, using the no_safe
%% flag. The result is passed through badarg/2 with the original
%% arguments.
select(Tab, Pat, N) ->
    Result = init_chunk_match(Tab, Pat, select, N, no_safe),
    badarg(Result, [Tab, Pat, N]).
720
-spec select(Continuation) ->
          {Selection, Continuation2} | '$end_of_table' | {'error', Reason} when
      Continuation :: select_cont(),
      Continuation2 :: select_cont(),
      Selection :: [term()],
      Reason :: term().

%% Continues a chunked select. Only a continuation created in select
%% mode is accepted; any other term is a badarg error.
select(#dets_cont{what = select} = State) ->
    Result = chunk_match(State, no_safe),
    badarg(Result, [State]);
select(Term) ->
    erlang:error(badarg, [Term]).
732
-spec select_delete(Name, MatchSpec) -> N | {'error', Reason} when
      Name :: tab_name(),
      MatchSpec :: match_spec(),
      N :: non_neg_integer(),
      Reason :: term().

%% Runs match_delete/3 in select mode. The result is passed through
%% badarg/2 with the original arguments.
select_delete(Tab, Pat) ->
    Result = match_delete(Tab, Pat, select),
    badarg(Result, [Tab, Pat]).
741
-spec slot(Name, I) -> '$end_of_table' | Objects | {'error', Reason} when
      Name :: tab_name(),
      I :: non_neg_integer(),
      Objects :: [object()],
      Reason :: term().

%% Issues a slot request. The slot number must be a non-negative
%% integer; anything else is a badarg error.
slot(Tab, Slot) ->
    Args = [Tab, Slot],
    if
        is_integer(Slot), Slot >= 0 ->
            badarg(treq(Tab, {slot, Slot}), Args);
        true ->
            erlang:error(badarg, Args)
    end.
752
%% Server export. Delegates to dets_server:start/0.
start() ->
    Result = dets_server:start(),
    Result.
755
%% Server export. Delegates to dets_server:stop/0.
stop() ->
    Result = dets_server:stop(),
    Result.
758
%% Internal export. Spawns a linked process that runs dets:init/2
%% with the calling process and Server as arguments, and returns
%% {ok, Pid} for it.
istart_link(Server) ->
    Parent = self(),
    Pid = proc_lib:spawn_link(dets, init, [Parent, Server]),
    {ok, Pid}.
761
-spec sync(Name) -> 'ok' | {'error', Reason} when
      Name :: tab_name(),
      Reason :: term().

%% Issues a sync request. The reply is passed through badarg/2 with
%% the original argument.
sync(Tab) ->
    Reply = treq(Tab, sync),
    badarg(Reply, [Tab]).
768
-spec table(Name) -> QueryHandle when
      Name :: tab_name(),
      QueryHandle :: qlc:query_handle().

%% Same as table/2 with an empty option list.
table(Tab) ->
    NoOptions = [],
    table(Tab, NoOptions).
775
-spec table(Name, Options) -> QueryHandle when
      Name :: tab_name(),
      Options :: Option | [Option],
      Option :: {'n_objects', Limit}
              | {'traverse', TraverseMethod},
      Limit :: 'default' | pos_integer(),
      TraverseMethod :: 'first_next' | 'select' | {'select', match_spec()},
      QueryHandle :: qlc:query_handle().

%% Builds a QLC query handle for Tab. Options not accepted by
%% options/2 are a badarg error. The handle is assembled from:
%%  - a traverse fun chosen by the traverse option (first/next
%%    stepping, select with a match specification supplied by QLC,
%%    or select with a fixed match specification);
%%  - pre and post funs that fix and unfix the table;
%%  - an info fun backed by table_info/2;
%%  - a key lookup fun, left undefined when a fixed match
%%    specification is used;
%%  - a format fun describing the query for qlc:info/1,2.
table(Tab, Opts) ->
    case options(Opts, [traverse, n_objects]) of
        {badarg, _} ->
            erlang:error(badarg, [Tab, Opts]);
        [Traverse, NObjs] ->
            PreFun = fun(_) -> safe_fixtable(Tab, true) end,
            PostFun = fun() -> safe_fixtable(Tab, false) end,
            InfoFun = fun(Tag) -> table_info(Tab, Tag) end,
            TraverseFun =
                case Traverse of
                    first_next ->
                        fun() -> qlc_next(Tab, first(Tab)) end;
                    select ->
                        fun(Spec) ->
                                qlc_select(select(Tab, Spec, NObjs))
                        end;
                    {select, FixedSpec} ->
                        fun() ->
                                qlc_select(select(Tab, FixedSpec, NObjs))
                        end
                end,
            %% lookup_keys/2 is not public, but convenient here.
            LookupFun =
                case Traverse of
                    {select, _} ->
                        undefined;
                    _ ->
                        fun(_KeyPos, [Key]) -> lookup(Tab, Key);
                           (_KeyPos, Keys) -> lookup_keys(Tab, Keys)
                        end
                end,
            FormatFun =
                fun({all, _NElements, _ElementFun}) ->
                        %% Leave the option list out when it is empty.
                        CallArgs = case Opts of
                                       [] -> [Tab];
                                       _ -> [Tab, Opts]
                                   end,
                        {?MODULE, table, CallArgs};
                   ({match_spec, MS}) ->
                        SelectOpt = {traverse, {select, MS}},
                        {?MODULE, table, [Tab, [SelectOpt | listify(Opts)]]};
                   ({lookup, _KeyPos, [Value], _NElements, ElementFun}) ->
                        io_lib:format("~w:lookup(~w, ~w)",
                                      [?MODULE, Tab, ElementFun(Value)]);
                   ({lookup, _KeyPos, Values, _NElements, ElementFun}) ->
                        Shown = [ElementFun(Val) || Val <- Values],
                        io_lib:format("lists:flatmap(fun(V) -> "
                                      "~w:lookup(~w, V) end, ~w)",
                                      [?MODULE, Tab, Shown])
                end,
            qlc:table(TraverseFun,
                      [{pre_fun, PreFun}, {post_fun, PostFun},
                       {info_fun, InfoFun}, {format_fun, FormatFun},
                       {key_equality, '=:='},
                       {lookup_fun, LookupFun}])
    end.
832
%% Traversal step for the first_next method of table/2. Returns the
%% objects stored under Key followed by a fun that continues with the
%% next key, or [] at the end of the table. If the lookup does not
%% return a list, the process exits with that value, as first/1 and
%% next/2 do.
qlc_next(Tab, Key) ->
    case Key of
        '$end_of_table' ->
            [];
        _ ->
            case lookup(Tab, Key) of
                Objects when is_list(Objects) ->
                    Objects ++ fun() -> qlc_next(Tab, next(Tab, Key)) end;
                Error ->
                    exit(Error)
            end
    end.
843
%% Traversal step for the select methods of table/2. Turns a select
%% result into the objects followed by a fun that fetches the next
%% chunk, or [] at the end of the table. Any other value is returned
%% unchanged.
qlc_select(Result) ->
    case Result of
        '$end_of_table' ->
            [];
        {Objects, Cont} when is_list(Objects) ->
            Objects ++ fun() -> qlc_select(select(Cont)) end;
        Error ->
            Error
    end.
850
%% Info fun of table/2: maps the tags num_of_objects, keypos and
%% is_unique_objects onto info/2 and answers undefined for any other
%% tag.
table_info(Tab, Tag) ->
    case Tag of
        num_of_objects ->
            info(Tab, size);
        keypos ->
            info(Tab, keypos);
        is_unique_objects ->
            info(Tab, type) =/= duplicate_bag;
        _ ->
            undefined
    end.
859
860%% End of table/2.
861
-spec to_ets(Name, EtsTab) -> EtsTab | {'error', Reason} when
      Name :: tab_name(),
      EtsTab :: ets:tab(),
      Reason :: term().

%% Inserts every object of DTab into ETab by folding over DTab with
%% the ETS table as accumulator. If ets:info/2 does not know ETab, a
%% badarg error is raised.
to_ets(DTab, ETab) ->
    Protection = ets:info(ETab, protection),
    if
        Protection =:= undefined ->
            erlang:error(badarg, [DTab, ETab]);
        true ->
            Insert = fun(Object, Table) ->
                             true = ets:insert(Table, Object),
                             Table
                     end,
            foldl(Insert, ETab, DTab)
    end.
875
-spec traverse(Name, Fun) -> Return | {'error', Reason} when
      Name :: tab_name(),
      Fun :: fun((Object) -> FunReturn),
      Object :: object(),
      FunReturn :: 'continue'
                 | {'continue', Val}
                 | {'done', Value}
                 | OtherValue,
      Return :: [term()] | OtherValue,
      Val :: term(),
      Value :: term(),
      OtherValue :: term(),
      Reason :: term().

%% Applies Fun to the objects of Tab, accumulating a list:
%%   continue        - keep the accumulator as it is;
%%   {continue, Val} - put Val first in the accumulator;
%%   {done, Value}   - stop, returning Value put first in the
%%                     accumulator;
%%   Other           - stop, returning Other.
%% Stopping is done by throwing a tuple tagged with a fresh
%% reference, which do_traverse/4 catches.
traverse(Tab, Fun) ->
    Ref = make_ref(),
    Visit =
        fun(Object, Acc) ->
                case Fun(Object) of
                    continue ->
                        Acc;
                    {continue, Val} ->
                        [Val | Acc];
                    {done, Value} ->
                        throw({Ref, [Value | Acc]});
                    Other ->
                        throw({Ref, Other})
                end
        end,
    Result = do_traverse(Visit, [], Tab, Ref),
    badarg(Result, [Tab, Fun]).
906
-spec update_counter(Name, Key, Increment) -> Result when
      Name :: tab_name(),
      Key :: term(),
      Increment :: {Pos, Incr} | Incr,
      Pos :: integer(),
      Incr :: integer(),
      Result :: integer().

%% Issues an update_counter request for Key and C. The reply is
%% passed through badarg/2 with the original arguments.
update_counter(Tab, Key, C) ->
    Reply = treq(Tab, {update_counter, Key, C}),
    badarg(Reply, [Tab, Key, C]).
917
%% Debug. Same as verbose(true).
verbose() ->
    Enable = true,
    verbose(Enable).
920
%% Debug. Passes the verbosity setting What to dets_server, then
%% sends a set_verbose request to every table dets_server lists.
%% Returns that list of tables.
verbose(What) ->
    ok = dets_server:verbose(What),
    Tabs = dets_server:all(),
    _ = [treq(Tab, {set_verbose, What}) || Tab <- Tabs],
    Tabs.
927
%% Debug. Where in the (open) table is Object located?
%% Returns the address of the first matching object; with format 9
%% that is the address of the object collection.
%% -> {ok, Address} | false
where(Tab, Object) ->
    Reply = treq(Tab, {where, Object}),
    badarg(Reply, [Tab, Object]).
934
%% Common part of foldl/3 and traverse/2. Returns badarg if no
%% process can be looked up for Tab. Otherwise runs do_trav/3; a
%% throw of {Ref, Result}, with Ref being the caller's reference, ends
%% the traversal early with Result.
do_traverse(Fun, Acc, Tab, Ref) ->
    case catch dets_server:get_pid(Tab) of
        {'EXIT', _} ->
            badarg;
        Proc ->
            try
                do_trav(Proc, Acc, Fun)
            catch
                throw:{Ref, Result} ->
                    Result
            end
    end.
946
%% Starts a traversal of all objects: sends a match request with the
%% match-all object specification, using the safe flag. A
%% {cont, State} reply continues in do_trav/4; any other reply is
%% returned as is.
do_trav(Proc, Acc, Fun) ->
    {Spec, MP} = compile_match_spec(object, '_'),
    %% MP not used
    Request = {match, MP, Spec, default, safe},
    case req(Proc, Request) of
        {cont, State} ->
            do_trav(State, Proc, Acc, Fun);
        Error ->
            Error
    end.
956
%% Fetches the next batch of binaries from Proc. Returns the
%% accumulator at the end of the table, hands a batch over to
%% do_trav_bins/5 in reversed order, and returns any other reply as
%% is.
do_trav(State, Proc, Acc, Fun) ->
    Request = {match_init, State, safe},
    case req(Proc, Request) of
        '$end_of_table' ->
            Acc;
        {cont, {Bins, NewState}} ->
            Reversed = lists:reverse(Bins),
            do_trav_bins(NewState, Proc, Acc, Fun, Reversed);
        Error ->
            Error
    end.
966
%% Applies Fun to the objects of one batch and then asks do_trav/4
%% for the next batch. Binaries are unpacked one at a time, using the
%% client's heap. A binary that cannot be decoded is reported to Proc
%% in a corrupt request, whose reply becomes the result.
do_trav_bins(State, Proc, Acc, Fun, []) ->
    do_trav(State, Proc, Acc, Fun);
do_trav_bins(State, Proc, Acc, Fun, [Bin | Rest]) ->
    %% Only the decoding runs under 'catch'; Fun is called outside of
    %% it so that its exceptions are not intercepted here.
    case catch binary_to_term(Bin) of
        {'EXIT', _} ->
            BadObject = dets_utils:bad_object(do_trav_bins, Bin),
            req(Proc, {corrupt, BadObject});
        Term ->
            do_trav_bins(State, Proc, Fun(Term, Acc), Fun, Rest)
    end.
978
%% Collects all results of a safe chunked match of Pat on Tab.
%% The first chunk is fetched with the default chunk size; the remaining
%% chunks are gathered by do_safe_match/2.
safe_match(Tab, Pat, What) ->
    FirstChunk = init_chunk_match(Tab, Pat, What, default, safe),
    do_safe_match(FirstChunk, []).
981
%% Drives a chunked match to completion. Each new chunk is put in front
%% of what has been collected so far.
%% -> the collected list | {error, Error} | badarg
do_safe_match('$end_of_table', Collected) ->
    Collected;
do_safe_match(badarg, _Collected) ->
    badarg;
do_safe_match({error, _Reason} = Error, _Collected) ->
    Error;
do_safe_match({Chunk, Cont}, Collected) ->
    do_safe_match(chunk_match(Cont, safe), Chunk ++ Collected).
990
%% Starts a chunked match on Tab.
%% What = object | bindings | select
%% N is the wanted chunk size: a non-negative integer or 'default';
%% anything else yields badarg. badarg is also returned when the match
%% specification cannot be compiled or the pid lookup for Tab fails.
%% A {done, L} reply gives L together with a continuation marked eof;
%% a {cont, State} reply is continued by chunk_match/2. Other replies
%% from the table process are returned unchanged.
init_chunk_match(Tab, Pat, What, N, Safe) when is_integer(N), N >= 0;
                                               N =:= default ->
    case compile_match_spec(What, Pat) of
        badarg ->
            badarg;
        {Spec, MP} ->
            case (catch dets_server:get_pid(Tab)) of
                {'EXIT', _} ->
                    badarg;
                Proc ->
                    Request = {match, MP, Spec, N, Safe},
                    case req(Proc, Request) of
                        {cont, State} ->
                            Next = State#dets_cont{what = What,
                                                   tab = Tab,
                                                   proc = Proc},
                            chunk_match(Next, Safe);
                        {done, L} ->
                            Finished = #dets_cont{tab = Tab, proc = Proc,
                                                  what = What, bin = eof,
                                                  no_objs = default,
                                                  alloc = <<>>},
                            {L, Finished};
                        Other ->
                            Other
                    end
            end
    end;
init_chunk_match(_Tab, _Pat, _What, _N, _Safe) ->
    badarg.
1020
%% Fetches the next non-empty chunk of results for the continuation State.
%% The table process returns binaries; they are decoded and filtered here,
%% in the calling process, by do_foldl_bins/2.
%% -> {Terms, NewState} | '$end_of_table' | badarg | reply of the process
%%
%% When decoding or filtering fails there are two possible causes:
%%  - the match program of the continuation is not usable, which yields
%%    badarg;
%%  - the match program is fine, so the object read from the file is bad,
%%    which is reported to the table process as a corrupt object.
%% The match program is either 'true' (every object matches, nothing is
%% compiled) or {match_spec, CompiledMS}. ets:is_compiled_ms/1 must be
%% applied to the compiled match specification itself; applied to the
%% wrapper it is always false, which made the corrupt branch unreachable.
chunk_match(#dets_cont{proc = Proc}=State, Safe) ->
    case req(Proc, {match_init, State, Safe}) of
        '$end_of_table'=Reply ->
            Reply;
        {cont, {Bins, NewState}} ->
            MP = NewState#dets_cont.match_program,
            case catch do_foldl_bins(Bins, MP) of
                {'EXIT', _} ->
                    Usable = case MP of
                                 true ->
                                     true;
                                 {match_spec, CompiledMS} ->
                                     ets:is_compiled_ms(CompiledMS);
                                 _ ->
                                     %% Unknown shape: keep the old test.
                                     ets:is_compiled_ms(MP)
                             end,
                    case Usable of
                        true ->
                            Bad = dets_utils:bad_object(chunk_match, Bins),
                            req(Proc, {corrupt, Bad});
                        false ->
                            badarg
                    end;
                [] ->
                    %% Nothing matched in this chunk; try the next one.
                    chunk_match(NewState, Safe);
                Terms ->
                    {Terms, NewState}
            end;
        Error ->
            Error
    end.
1044
%% Decodes the binaries of a chunk. With the match program 'true' every
%% decoded term is kept (foldl_bins/2); with {match_spec, MP} each term
%% is run through the compiled match specification MP (foldl_bins/3).
%% Any other match program fails with function_clause.
do_foldl_bins(Bins, true) ->
    foldl_bins(Bins, []);
do_foldl_bins(Bins, {match_spec, MP}) ->
    foldl_bins(Bins, MP, []).
1049
%% Decodes every binary in Bins and pushes the terms onto Terms, so the
%% result holds the decoded terms in reverse order in front of Terms.
%% (The result is deliberately left unreversed to preserve time order.)
foldl_bins(Bins, Terms) ->
    lists:foldl(fun(Bin, Acc) -> [binary_to_term(Bin) | Acc] end,
                Terms, Bins).
1055
%% Decodes every binary in Bins and runs the term through the compiled
%% match specification MP. Results of matching terms are pushed onto
%% Terms; terms that do not match are dropped.
%% (The result is deliberately left unreversed to preserve time order.)
foldl_bins(Bins, MP, Terms) ->
    Step = fun(Bin, Acc) ->
                   case ets:match_spec_run([binary_to_term(Bin)], MP) of
                       [] -> Acc;
                       [Result] -> [Result | Acc]
                   end
           end,
    lists:foldl(Step, Terms, Bins).
1067
%% Turns a pattern or a match specification into the pair sent to the
%% table process.
%% -> {Spec, true | {match_spec, CompiledMS}} | badarg
%% 'true' is used for the match specification that matches every object,
%% so nothing needs to be compiled. badarg is returned when
%% ets:match_spec_compile/1 fails.
compile_match_spec(select, ?PATTERN_TO_OBJECT_MATCH_SPEC('_') = Spec) ->
    {Spec, true};
compile_match_spec(select, Spec) ->
    try {Spec, {match_spec, ets:match_spec_compile(Spec)}}
    catch error:_ -> badarg
    end;
%% The remaining kinds wrap the pattern in a match specification and are
%% then handled as select.
compile_match_spec(object, Pat) ->
    compile_match_spec(select, ?PATTERN_TO_OBJECT_MATCH_SPEC(Pat));
compile_match_spec(bindings, Pat) ->
    compile_match_spec(select, ?PATTERN_TO_BINDINGS_MATCH_SPEC(Pat));
compile_match_spec(delete, Pat) ->
    compile_match_spec(select, ?PATTERN_TO_TRUE_MATCH_SPEC(Pat)).
1081
%% Builds the #open_args{} record for open_file/2: starts from the
%% default values (the file name defaults to the table name) and applies
%% each element of Args with repl/2. The resulting file name must be a
%% list and min/max number of slots must be compatible.
defaults(Tab, Args) ->
    Initial = #open_args{file = to_list(Tab),
                         access = read_write,
                         auto_save = timer:minutes(?DEFAULT_AUTOSAVE),
                         debug = false,
                         delayed_write = ?DEFAULT_CACHE,
                         keypos = 1,
                         max_no_slots = default,
                         min_no_slots = default,
                         ram_file = false,
                         repair = true,
                         type = set},
    Parsed = lists:foldl(fun repl/2, Initial, Args),
    true = is_list(Parsed#open_args.file),
    is_comp_min_max(Parsed).
1099
%% Converts an atom to its string; any other term is returned as is.
to_list(T) ->
    case is_atom(T) of
        true -> atom_to_list(T);
        false -> T
    end.
1102
%% Applies one {Option, Value} pair from the open_file/2 argument list
%% to the #open_args{} record. An unknown option, or a value that fails
%% the guard or the membership test, exits with badarg. Slot numbers and
%% the version are checked by is_max_no_slots/1, is_min_no_slots/1 and
%% is_version/1, which fail on values they do not accept.
repl({access, Mode}, Opts) ->
    mem(Mode, [read, read_write]),
    Opts#open_args{access = Mode};
repl({auto_save, Time}, Opts) when is_integer(Time), Time >= 0;
                                   Time =:= infinity ->
    Opts#open_args{auto_save = Time};
repl({cache_size, Size}, Opts) when is_integer(Size), Size >= 0;
                                    Size =:= infinity ->
    %% Recognized, but ignored.
    Opts;
repl({delayed_write, default}, Opts) ->
    Opts#open_args{delayed_write = ?DEFAULT_CACHE};
repl({delayed_write, {Delay, Size} = DelayedWrite}, Opts)
  when is_integer(Delay), Delay >= 0, is_integer(Size), Size >= 0 ->
    Opts#open_args{delayed_write = DelayedWrite};
repl({estimated_no_objects, NoObjects}, Opts) ->
    %% Treated as min_no_slots.
    repl({min_no_slots, NoObjects}, Opts);
repl({file, File}, Opts) ->
    Opts#open_args{file = to_list(File)};
repl({keypos, Pos}, Opts) when is_integer(Pos), Pos > 0 ->
    Opts#open_args{keypos = Pos};
repl({max_no_slots, Slots}, Opts) ->
    Checked = is_max_no_slots(Slots),
    Opts#open_args{max_no_slots = Checked};
repl({min_no_slots, Slots}, Opts) ->
    Checked = is_min_no_slots(Slots),
    Opts#open_args{min_no_slots = Checked};
repl({ram_file, Flag}, Opts) ->
    mem(Flag, [true, false]),
    Opts#open_args{ram_file = Flag};
repl({repair, Repair}, Opts) ->
    mem(Repair, [true, false, force]),
    Opts#open_args{repair = Repair};
repl({type, Type}, Opts) ->
    mem(Type, [set, bag, duplicate_bag]),
    Opts#open_args{type = Type};
repl({version, Version}, Opts) ->
    %% Backwards compatibility.
    is_version(Version),
    Opts;
repl({debug, Flag}, Opts) ->
    %% Not documented.
    mem(Flag, [true, false]),
    Opts#open_args{debug = Flag};
repl({_, _}, _) ->
    exit(badarg).
1151
%% Checks a min_no_slots value. 'default' is passed through, an integer
%% at or above ?DEFAULT_MIN_NO_SLOTS is kept, and a smaller non-negative
%% integer is raised to ?DEFAULT_MIN_NO_SLOTS. Any other value fails with
%% function_clause.
is_min_no_slots(default) -> default;
is_min_no_slots(I) when is_integer(I), I >= ?DEFAULT_MIN_NO_SLOTS -> I;
is_min_no_slots(I) when is_integer(I), I >= 0 -> ?DEFAULT_MIN_NO_SLOTS.
1155
%% Checks a max_no_slots value: 'default' or an integer in the range
%% 1..(1 bsl 31)-1 is returned unchanged; anything else fails with
%% function_clause.
is_max_no_slots(default) ->
    default;
is_max_no_slots(Slots) when is_integer(Slots),
                            Slots >= 1,
                            Slots =< 16#7FFFFFFF ->
    Slots.
1158
%% Verifies that min_no_slots does not exceed max_no_slots when both are
%% given as numbers (a badmatch is raised otherwise). If either of them
%% is 'default' there is nothing to check.
%% -> Defs
is_comp_min_max(Defs) ->
    #open_args{min_no_slots = Min, max_no_slots = Max} = Defs,
    case Min =:= default orelse Max =:= default of
        true ->
            Defs;
        false ->
            true = Min =< Max,
            Defs
    end.
1166
%% Accepts the version values 'default' and 9; anything else fails with
%% function_clause.
is_version(Version) when Version =:= default; Version =:= 9 ->
    true.
1169
%% Returns true if X is a member of the list L; otherwise exits with
%% badarg.
mem(X, L) ->
    lists:member(X, L) orelse exit(badarg).
1175
%% Extracts the values of Keys from Options. A single option that is not
%% a list is treated as a one-element option list.
options(Options, Keys) ->
    OptionList = case is_list(Options) of
                     true -> Options;
                     false -> [Options]
                 end,
    options(OptionList, Keys, []).
1180
%% Picks the value of each key in Keys out of the option list, in the
%% order of Keys.
%% -> [Value] | {badarg, Key} | {badarg, RemainingOptions}
%% For each key the first option with that key is validated; when the key
%% is absent default_option/1 supplies the value. A handled key is
%% removed from the option list, so any option left when the keys are
%% exhausted is unknown and reported as {badarg, Options}.
options(Options, [Key | Keys], L) when is_list(Options) ->
    V = case lists:keysearch(Key, 1, Options) of
	    {value, {format, Format}} when Format =:= term;
                                           Format =:= bchunk ->
		{ok, Format};
	    {value, {min_no_slots, I}} ->
		%% is_min_no_slots/1 fails on values it does not accept.
		case catch is_min_no_slots(I) of
		    {'EXIT', _} -> badarg;
		    MinNoSlots -> {ok, MinNoSlots}
		end;
            {value, {n_objects, default}} ->
                {ok, default_option(Key)};
            {value, {n_objects, NObjs}} when is_integer(NObjs),
                                             NObjs >= 1 ->
                {ok, NObjs};
            {value, {traverse, select}} ->
                {ok, select};
            {value, {traverse, {select, MS}}} ->
                {ok, {select, MS}};
            {value, {traverse, first_next}} ->
                {ok, first_next};
	    {value, {Key, _}} ->
		%% The key is present, but its value was not accepted above.
		badarg;
	    false ->
		Default = default_option(Key),
		{ok, Default}
	end,
    case V of
	badarg ->
	    {badarg, Key};
	{ok, Value} ->
	    NewOptions = lists:keydelete(Key, 1, Options),
	    options(NewOptions, Keys, [Value | L])
    end;
options([], [], L) ->
    lists:reverse(L);
options(Options, _, _L) ->
    {badarg,Options}.
1219
%% The value used by options/3 when an option is not given.
default_option(Key) when Key =:= min_no_slots; Key =:= n_objects ->
    default;
default_option(traverse) ->
    select;
default_option(format) ->
    term.
1224
%% Wraps a term that is not a list in a one-element list; lists are
%% returned unchanged.
listify(T) when not is_list(T) ->
    [T];
listify(L) ->
    L.
1229
%% Sends request R to the process of table Tab and returns the reply.
%% -> badarg if the lookup in dets_server does not give a pid
treq(Tab, R) ->
    MaybePid = (catch dets_server:get_pid(Tab)),
    if
        is_pid(MaybePid) -> req(MaybePid, R);
        true -> badarg
    end.
1237
%% Sends the request R to the table process Proc and waits for the reply.
%% Proc is monitored while waiting, so the call does not hang if the
%% process is gone.
%% -> Reply | badarg (when Proc goes down before replying)
req(Proc, R) ->
    Ref = erlang:monitor(process, Proc),
    Proc ! ?DETS_CALL(self(), R),
    receive
	{'DOWN', Ref, process, Proc, _Info} ->
            badarg;
	{Proc, Reply} ->
	    %% Drop the monitor and any 'DOWN' message already delivered.
	    erlang:demonitor(Ref, [flush]),
	    Reply
    end.
1248
%% Inlined.
%% Turns a file error with reason einval or badarg into a badarg error
%% with A as the argument list; every other reply is returned unchanged.
einval({error, {file_error, _, Reason}}, A) when Reason =:= einval;
                                                 Reason =:= badarg ->
    erlang:error(badarg, A);
einval(Reply, _A) ->
    Reply.
1256
%% Inlined.
%% Raises a badarg error, with A as the argument list, when the reply is
%% the atom badarg; every other reply is returned unchanged.
badarg(Reply, A) ->
    case Reply of
        badarg -> erlang:error(badarg, A);
        _ -> Reply
    end.
1262
%% Inlined.
%% Maps the reply badarg to undefined; every other reply is returned
%% unchanged.
undefined(Reply) ->
    case Reply of
        badarg -> undefined;
        _ -> Reply
    end.
1268
%% Inlined.
%% badarg raises a badarg error with A as the argument list, {ok, Reply}
%% is unwrapped to Reply, and any other reply becomes the exit reason.
badarg_exit(Result, A) ->
    case Result of
        badarg -> erlang:error(badarg, A);
        {ok, Reply} -> Reply;
        _ -> exit(Result)
    end.
1276
1277%%%-----------------------------------------------------------------
1278%%% Server functions
1279%%%-----------------------------------------------------------------
1280
%% Start of a table process. Exits are trapped, then the process waits
%% for the internal_open request and handles nothing else until it has
%% arrived. After a successful open the server loop is entered with the
%% counter set to 0.
%% An exit with reason normal is passed on as is; any other exception is
%% reported through bug_found/5 and the process exits with that reason.
init(Parent, Server) ->
    process_flag(trap_exit, true),
    %% The Dets server pretends the file is open before
    %% internal_open() has been called, which means that unless the
    %% internal_open message is applied first, other processes can
    %% find the pid by calling dets_server:get_pid() and do things
    %% before Head has been initialized properly.
    receive
        ?DETS_CALL(From, {internal_open, Ref, Args}=Op) ->
            try do_internal_open(Parent, Server, From, Ref, Args) of
                Head ->
                    open_file_loop(Head, 0)
            catch
                exit:normal ->
                    exit(normal);
                _:Bad:Stacktrace ->
                    bug_found(no_name, Op, Bad, Stacktrace, From),
                    exit(Bad) % give up
            end
    end.
1301
%% Server loop, first stage. Head is the table state and N is the counter
%% passed along to apply_op/4.
%% When the update mode is an error tuple the prioritized receive is
%% skipped. Otherwise the message queue is first checked, without
%% blocking, for prioritized requests, exit signals and system messages;
%% if none is found open_file_loop2/2 handles whatever comes next.
open_file_loop(Head, N) when element(1, Head#head.update_mode) =:= error ->
    open_file_loop2(Head, N);
open_file_loop(Head, N) ->
    receive
        %% When the table is fixed it can be assumed that at least one
        %% traversal is in progress. To speed the traversal up three
        %% things have been done:
        %% - prioritize match_init, bchunk, next, and match_delete_init;
        %% - do not peek the message queue for updates;
        %% - wait 1 ms after each update.
        %% next is normally followed by lookup, but since lookup is also
        %% used when not traversing the table, it is not prioritized.
        ?DETS_CALL(From, {match_init, _State, _Safe} = Op) ->
            do_apply_op(Op, From, Head, N);
        ?DETS_CALL(From, {bchunk, _State} = Op) ->
            do_apply_op(Op, From, Head, N);
        ?DETS_CALL(From, {next, _Key} = Op) ->
            do_apply_op(Op, From, Head, N);
        ?DETS_CALL(From, {match_delete_init, _MP, _Spec} = Op) ->
            do_apply_op(Op, From, Head, N);
        {'EXIT', Pid, Reason} when Pid =:= Head#head.parent ->
            %% Parent orders shutdown.
            _NewHead = do_stop(Head),
            exit(Reason);
        {'EXIT', Pid, Reason} when Pid =:= Head#head.server ->
            %% The server is gone.
            _NewHead = do_stop(Head),
            exit(Reason);
        {'EXIT', Pid, _Reason} ->
            %% A process fixing the table exits.
            H2 = remove_fix(Head, Pid, close),
            open_file_loop(H2, N);
        {system, From, Req} ->
            sys:handle_system_msg(Req, From, Head#head.parent,
                                  ?MODULE, [], Head)
    after 0 ->
            %% Nothing prioritized is waiting.
            open_file_loop2(Head, N)
    end.
1340
%% Server loop, second stage: blocks until the next message arrives and
%% handles it regardless of kind. Requests go to do_apply_op/4, exit
%% signals and system messages are handled as in open_file_loop/2, and
%% any other message is logged and dropped.
open_file_loop2(Head, N) ->
    receive
        ?DETS_CALL(From, Op) ->
            do_apply_op(Op, From, Head, N);
        {'EXIT', Pid, Reason} when Pid =:= Head#head.parent ->
            %% Parent orders shutdown.
            _NewHead = do_stop(Head),
            exit(Reason);
        {'EXIT', Pid, Reason} when Pid =:= Head#head.server ->
            %% The server is gone.
            _NewHead = do_stop(Head),
            exit(Reason);
        {'EXIT', Pid, _Reason} ->
            %% A process fixing the table exits.
            H2 = remove_fix(Head, Pid, close),
            open_file_loop(H2, N);
        {system, From, Req} ->
            sys:handle_system_msg(Req, From, Head#head.parent,
                                  ?MODULE, [], Head);
        Message ->
            error_logger:format("** dets: unexpected message"
                                "(ignored): ~tw~n", [Message]),
            open_file_loop(Head, N)
    end.
1365
%% Evaluates one request with apply_op/4 and re-enters the server loop
%% according to the result:
%%   ok                      - state and counter unchanged;
%%   {N2, H2}                - new counter and new state;
%%   H2                      - new state, counter unchanged;
%%   {{more,From1,Op1,N1},H} - another request has already been taken
%%                             from the queue and is evaluated at once.
%% An exit with reason normal terminates the process. Any other exception
%% is reported through bug_found/5, after which the loop continues with
%% the state the request started from.
do_apply_op(Op, From, Head, N) ->
    try apply_op(Op, From, Head, N) of
        ok ->
            open_file_loop(Head, N);
        {N2, H2} when is_record(H2, head), is_integer(N2) ->
            open_file_loop(H2, N2);
        H2 when is_record(H2, head) ->
            open_file_loop(H2, N);
        {{more,From1,Op1,N1}, NewHead} ->
            do_apply_op(Op1, From1, NewHead, N1)
    catch
        exit:normal ->
            exit(normal);
        _:Bad:Stacktrace ->
            bug_found(Head#head.name, Op, Bad, Stacktrace, From),
            open_file_loop(Head, N)
    end.
1383
%% Evaluates the request Op sent by From against the table state Head.
%% N is the counter kept by the server loop: updates return it
%% incremented, and sync, auto_save and delete_all_objects reset it to 0.
%% -> ok | NewHead | {NewN, NewHead} | {{more,From1,Op1,N1}, NewHead}
%%    (interpreted by do_apply_op/4)
%%
%% The order of the clauses matters:
%%  - the requests before the "error" clause are served whatever the
%%    status of the table is;
%%  - if the update mode is an error tuple, every other request is
%%    answered with status(Head);
%%  - updates have their own clauses only when the update mode is dirty;
%%    in any other mode they fall through to the WriteOp clauses at the
%%    end, which switch the table to dirty and retry, or refuse the
%%    request when the table is opened for reading only.
apply_op(Op, From, Head, N) ->
    case Op of
	{add_user, Tab, OpenArgs}->
            #open_args{file = Fname, type = Type, keypos = Keypos,
                       ram_file = Ram, access = Access} = OpenArgs,
	    %% min_no_slots and max_no_slots are not tested
	    Res = if
		      Tab =:= Head#head.name,
		      Head#head.keypos =:= Keypos,
		      Head#head.type =:= Type,
		      Head#head.ram_file =:= Ram,
		      Head#head.access =:= Access,
		      Fname =:= Head#head.filename ->
			  ok;
		      true ->
			  err({error, incompatible_arguments})
		  end,
	    From ! {self(), Res},
	    ok;
	auto_save ->
	    %% Sent by the timer started in start_auto_save_timer/1.
	    case Head#head.update_mode of
		saved ->
		    Head;
		{error, _Reason} ->
		    Head;
		_Dirty when N =:= 0 -> % dirty or new_dirty
		    %% The updates seem to have declined
		    dets_utils:vformat("** dets: Auto save of ~tp\n",
                                       [Head#head.name]),
		    {NewHead, _Res} = perform_save(Head, true),
		    erlang:garbage_collect(),
		    {0, NewHead};
		dirty ->
		    %% Reset counter and try later
		    start_auto_save_timer(Head),
		    {0, Head}
	    end;
	close  ->
	    %% The reply is sent before the process terminates.
	    From ! {self(), fclose(Head)},
	    _NewHead = unlink_fixing_procs(Head),
	    ?PROFILE(ep:done()),
	    exit(normal);
	{close, Pid} ->
	    %% Used from dets_server when Pid has closed the table,
	    %% but the table is still opened by some process.
	    NewHead = remove_fix(Head, Pid, close),
	    From ! {self(), status(NewHead)},
	    NewHead;
	{corrupt, Reason} ->
	    {H2, Error} = dets_utils:corrupt_reason(Head, Reason),
	    From ! {self(), Error},
	    H2;
	{delayed_write, WrTime} ->
	    delayed_write(Head, WrTime);
	info ->
	    {H2, Res} = finfo(Head),
	    From ! {self(), Res},
	    H2;
	{info, Tag} ->
	    {H2, Res} = finfo(Head, Tag),
	    From ! {self(), Res},
	    H2;
        {is_compatible_bchunk_format, Term} ->
            Res = test_bchunk_format(Head, Term),
            From ! {self(), Res},
            ok;
	{internal_open, Ref, Args} ->
            do_internal_open(Head#head.parent, Head#head.server, From,
                             Ref, Args);
	may_grow when Head#head.update_mode =/= saved ->
	    if
		Head#head.update_mode =:= dirty ->
		    %% Won't grow more if the table is full.
		    {H2, _Res} =
			dets_v9:may_grow(Head, 0, many_times),
		    {N + 1, H2};
		true ->
		    ok
	    end;
	{set_verbose, What} ->
	    set_verbose(What),
	    From ! {self(), ok},
	    ok;
	{where, Object} ->
	    {H2, Res} = where_is_object(Head, Object),
	    From ! {self(), Res},
	    H2;
	_Message when element(1, Head#head.update_mode) =:= error ->
	    From ! {self(), status(Head)},
	    ok;
	%% The following messages assume that the status of the table is OK.
	{bchunk_init, Tab} ->
	    {H2, Res} = do_bchunk_init(Head, Tab),
	    From ! {self(), Res},
	    H2;
	{bchunk, State} ->
	    {H2, Res} = do_bchunk(Head, State),
	    From ! {self(), Res},
	    H2;
	delete_all_objects ->
	    {H2, Res} = fdelete_all_objects(Head),
	    From ! {self(), Res},
	    erlang:garbage_collect(),
	    {0, H2};
	{delete_key, _Keys} when Head#head.update_mode =:= dirty ->
            stream_op(Op, From, [], Head, N);
	{delete_object, Objs} when Head#head.update_mode =:= dirty ->
	    case check_objects(Objs, Head#head.keypos) of
		true ->
		    stream_op(Op, From, [], Head, N);
		false ->
		    From ! {self(), badarg},
		    ok
	    end;
	first ->
	    {H2, Res} = ffirst(Head),
	    From ! {self(), Res},
	    H2;
        {initialize, InitFun, Format, MinNoSlots} ->
            {H2, Res} = finit(Head, InitFun, Format, MinNoSlots),
            From ! {self(), Res},
	    erlang:garbage_collect(),
            H2;
	{insert, Objs} when Head#head.update_mode =:= dirty ->
	    case check_objects(Objs, Head#head.keypos) of
		true ->
		    stream_op(Op, From, [], Head, N);
		false ->
		    From ! {self(), badarg},
		    ok
	    end;
	{insert_new, Objs} when Head#head.update_mode =:= dirty ->
            {H2, Res} = finsert_new(Head, Objs),
            From ! {self(), Res},
            {N + 1, H2};
	{lookup_keys, _Keys} ->
	    stream_op(Op, From, [], Head, N);
	{match_init, State, Safe} ->
	    {H1, Res} = fmatch_init(Head, State),
	    %% Unless there is more to fetch, a safe traversal ends here and
	    %% do_safe_fixtable/3 is called with false for the client.
            H2 = case Res of
                     {cont,_} -> H1;
                     _ when Safe =:= no_safe-> H1;
                     _ when Safe =:= safe -> do_safe_fixtable(H1, From, false)
                 end,
	    From ! {self(), Res},
	    H2;
	{match, MP, Spec, NObjs, Safe} ->
	    {H2, Res} = fmatch(Head, MP, Spec, NObjs, Safe, From),
	    From ! {self(), Res},
	    H2;
	{member, _Key} = Op ->
	    stream_op(Op, From, [], Head, N);
	{next, Key} ->
	    {H2, Res} = fnext(Head, Key),
	    From ! {self(), Res},
	    H2;
	{match_delete, State} when Head#head.update_mode =:= dirty ->
	    {H1, Res} = fmatch_delete(Head, State),
            H2 = case Res of
                     {cont,_S,_N} -> H1;
                     _ -> do_safe_fixtable(H1, From, false)
                 end,
	    From ! {self(), Res},
	    {N + 1, H2};
	{match_delete_init, MP, Spec} when Head#head.update_mode =:= dirty ->
	    {H2, Res} = fmatch_delete_init(Head, MP, Spec, From),
	    From ! {self(), Res},
	    {N + 1, H2};
	{safe_fixtable, Bool} ->
	    NewHead = do_safe_fixtable(Head, From, Bool),
	    From ! {self(), ok},
	    NewHead;
	{slot, Slot} ->
	    {H2, Res} = fslot(Head, Slot),
	    From ! {self(), Res},
	    H2;
	sync ->
	    {NewHead, Res} = perform_save(Head, true),
	    From ! {self(), Res},
	    erlang:garbage_collect(),
	    {0, NewHead};
	{update_counter, Key, Incr} when Head#head.update_mode =:= dirty ->
	    {NewHead, Res} = do_update_counter(Head, Key, Incr),
	    From ! {self(), Res},
	    {N + 1, NewHead};
	%% Only requests not handled above get this far; the guarded update
	%% clauses did not match because the update mode is not dirty.
	WriteOp when Head#head.update_mode =:= new_dirty ->
	    H2 = Head#head{update_mode = dirty},
	    apply_op(WriteOp, From, H2, 0);
	WriteOp when Head#head.access =:= read_write,
		     Head#head.update_mode =:= saved ->
	    %% First update since the table was saved: mark the file dirty,
	    %% start the auto save timer, and retry as dirty.
	    case catch dets_v9:mark_dirty(Head) of
		ok ->
		    start_auto_save_timer(Head),
		    H2 = Head#head{update_mode = dirty},
		    apply_op(WriteOp, From, H2, 0);
		{NewHead, Error} when is_record(NewHead, head) ->
		    From ! {self(), Error},
		    NewHead
	    end;
	WriteOp when is_tuple(WriteOp), Head#head.access =:= read ->
	    Reason = {access_mode, Head#head.filename},
	    From ! {self(), err({error, Reason})},
	    ok
    end.
1588
%% Reports an unexpected exception raised while evaluating Op on table
%% Name. The log entry is detailed only in debug mode. Unless the request
%% was sent by the table process itself, the client gets a dets_bug error
%% as reply.
%% -> ok
bug_found(Name, Op, Bad, Stacktrace, From) ->
    {Format, Args} =
        case dets_utils:debug_mode() of
            true ->
                %% If stream_op/5 found more requests, this is not
                %% the last operation.
                {"** dets: Bug was found when accessing table ~tw,~n"
                 "** dets: operation was ~tp and reply was ~tw.~n"
                 "** dets: Stacktrace: ~tw~n",
                 [Name, Op, Bad, Stacktrace]};
            false ->
                {"** dets: Bug was found when accessing table ~tw~n",
                 [Name]}
        end,
    error_logger:format(Format, Args),
    case From =:= self() of
        true -> % auto_save | may_grow | {delayed_write, _}
            ok;
        false ->
            From ! {self(), {error, {dets_bug, Name, Op, Bad}}},
            ok
    end.
1611
%% Opens the file and answers the client. On success the client gets ok
%% and the new table state is returned; otherwise the error is sent to
%% the client and the process exits with reason normal.
do_internal_open(Parent, Server, From, Ref, Args) ->
    ?PROFILE(ep:do()),
    Opened = do_open_file(Args, Parent, Server, Ref),
    Reply = case Opened of
                {ok, _} -> ok;
                _ -> Opened
            end,
    From ! {self(), Reply},
    case Opened of
        {ok, Head} -> Head;
        _ -> exit(normal)
    end.
1622
%% Arranges for an auto_save request to be sent to this process after
%% the auto save interval of the table. Nothing is done when the
%% interval is infinity.
%% -> ok
start_auto_save_timer(#head{auto_save = infinity}) ->
    ok;
start_auto_save_timer(Head) ->
    erlang:send_after(Head#head.auto_save, self(),
                      ?DETS_CALL(self(), auto_save)),
    ok.
1629
%% Peek the message queue and try to evaluate several
%% lookup requests in parallel. Evaluate delete_object, delete and
%% insert as well.
%% This is the entry point: it starts with an empty list of collected
%% operations and takes the fixed status and the update mode from Head.
stream_op(Op, Pid, Pids, Head, N) ->
    #head{update_mode = Mode, fixed = Fixed} = Head,
    stream_op(Head, Pids, [], N, Pid, Op, Fixed, Mode).
1636
%% Looks for one more request to add to the collected operations C.
%% When the table is not fixed the message queue is checked without
%% blocking; if no request is waiting, the collected operations are
%% evaluated. When the table is fixed the queue is not checked at all.
stream_loop(Head, Pids, C, N, false = Fxd, M) ->
    receive
	?DETS_CALL(From, Message) ->
	    stream_op(Head, Pids, C, N, From, Message, Fxd, M)
    after 0 ->
	    stream_end(Head, Pids, C, N, no_more)
    end;
stream_loop(Head, Pids, C, N, _Fxd, _M) ->
    stream_end(Head, Pids, C, N, no_more).
1646
%% Adds one request to the collected operations C (newest first) and
%% goes on looking for more.
%%  - lookup_keys and member are collected as lookups; for member the
%%    client pid is wrapped in a list, which lookup_reply/2 uses to tell
%%    the two apart.
%%  - insert, delete_key and delete_object are collected only when the
%%    update mode is dirty; the client is added to Pids.
%%  - any other request ends the stream and is handed over to
%%    stream_end/5 as the next request to evaluate.
stream_op(Head, Pids, C, N, Pid, {lookup_keys,Keys}, Fxd, M) ->
    stream_loop(Head, Pids, [{{lookup,Pid},Keys} | C], N, Fxd, M);
stream_op(Head, Pids, C, N, Pid, {Tag, _Arg} = Op, Fxd, dirty = M)
  when Tag =:= insert; Tag =:= delete_key; Tag =:= delete_object ->
    stream_loop(Head, [Pid | Pids], [Op | C], N, Fxd, M);
stream_op(Head, Pids, C, N, Pid, {member, Key}, Fxd, M) ->
    stream_loop(Head, Pids, [{{lookup,[Pid]},[Key]} | C], N, Fxd, M);
stream_op(Head, Pids, C, N, Pid, Op, _Fxd, _M) ->
    stream_end(Head, Pids, C, N, {Pid,Op}).
1664
%% Evaluates the collected operations C. They were collected newest
%% first, so the list is reversed before it is handed to update_cache/2.
%% Pids0 holds the clients waiting for a reply to an update; Next is
%% no_more or the request that ended the stream.
%% Depending on what update_cache/2 returns, lookup replies are sent,
%% pending writes are performed by stream_end1/6, or an error is sent to
%% all clients involved. Anything else caught is thrown again.
stream_end(Head, Pids0, C, N, Next) ->
    case catch update_cache(Head, lists:reverse(C)) of
	{Head1, [], PwriteList} ->
	    stream_end1(Pids0, Next, N, C, Head1, PwriteList);
	{Head1, Found, PwriteList} ->
	    %% Possibly an optimization: reply to lookup requests
	    %% first, then write stuff. This makes it possible for
	    %% clients to continue while the disk is accessed.
	    %% (Replies to lookup requests are sent earlier than
	    %%  replies to delete and insert requests even if the
	    %%  latter requests were made before the lookup requests,
	    %%  which can be confusing.)
	    _ = lookup_replies(Found),
	    stream_end1(Pids0, Next, N, C, Head1, PwriteList);
	Head1 when is_record(Head1, head) ->
	    stream_end2(Pids0, Pids0, Next, N, C, Head1, ok);
	{Head1, Error} when is_record(Head1, head) ->
	    %% Dig out the processes that did lookup or member.
	    Fun = fun({{lookup,[Pid]},_Keys}, L) -> [Pid | L];
		     ({{lookup,Pid},_Keys}, L) -> [Pid | L];
		     (_, L) -> L
		  end,
	    LPs0 = lists:foldl(Fun, [], C),
	    LPs = lists:usort(lists:flatten(LPs0)),
	    %% The error goes to the updating clients as well as to the
	    %% clients that did a lookup.
	    stream_end2(Pids0 ++ LPs, Pids0, Next, N, C, Head1, Error);
        DetsError ->
            throw(DetsError)
    end.
1693
%% Performs the pending writes, if any, and then replies to the updating
%% clients. With nothing to write the reply is ok; otherwise it is the
%% result of dets_utils:pwrite/2.
stream_end1(Pids, Next, N, C, Head, PwriteList) ->
    case PwriteList of
        [] ->
            stream_end2(Pids, Pids, Next, N, C, Head, ok);
        _ ->
            {Written, PR} = (catch dets_utils:pwrite(Head, PwriteList)),
            stream_end2(Pids, Pids, Next, N, C, Written, PR)
    end.
1699
%% Sends Reply to every pid in the first argument, adding one to the
%% counter N for each reply. When all replies are sent:
%%  - with no_more, penalty/3 is applied and {N, Head} is returned;
%%  - with {From, Op}, the request that ended the stream is returned for
%%    immediate evaluation.
stream_end2([], Ps, no_more, N, C, Head, _Reply) ->
    penalty(Head, Ps, C),
    {N, Head};
stream_end2([], _Ps, {From, Op}, N, _C, Head, _Reply) ->
    {{more,From,Op,N},Head};
stream_end2([Client | Clients], Ps, Next, N, C, Head, Reply) ->
    Client ! {self(), Reply},
    stream_end2(Clients, Ps, Next, N+1, C, Head, Reply).
1708
%% Sleeps for 1 ms after a batch of operations on a fixed table.
%% There is no sleep when the table is not fixed, when the batch was a
%% single lookup, or when the only updating client is also the only
%% process in the table's fixed list.
penalty(#head{fixed = false}, _Ps, _C) ->
    ok;
penalty(_H, _Ps, [{{lookup,_Pids},_Keys}]) ->
    ok;
penalty(#head{fixed = {_,[{Pid,_}]}}, [Pid], _C) ->
    ok;
penalty(_H, _Ps, _C) ->
    timer:sleep(1).
1717
%% Sends the replies to the clients that did a lookup. A single result is
%% sent directly; several results are first grouped per client with
%% dets_utils:family/1.
lookup_replies(Found) ->
    case Found of
        [{Client, Objs}] ->
            lookup_reply(Client, Objs);
        _ ->
            [{First, ObjLists} | Rest] = dets_utils:family(Found),
            lookup_replies(First, lists:append(ObjLists), Rest)
    end.
1723
%% Replies to client P with the objects O and then handles the remaining
%% {Client, ObjectLists} pairs; the object lists of each client are
%% appended into one list before the reply is sent.
lookup_replies(Client, Objs, [{Next, NextObjLists} | Rest]) ->
    _ = lookup_reply(Client, Objs),
    lookup_replies(Next, lists:append(NextObjLists), Rest);
lookup_replies(Client, Objs, []) ->
    lookup_reply(Client, Objs).
1729
%% Inlined.
%% Sends the reply to a lookup. If the client is given as a one-element
%% list of pid, the request was {member, Key} and the reply is a boolean
%% telling whether any object was found; otherwise the objects are sent.
lookup_reply(P, O) ->
    case P of
        [Pid] -> Pid ! {self(), O =/= []};
        _ -> P ! {self(), O}
    end.
1735
1736%%-----------------------------------------------------------------
1737%% Callback functions for system messages handling.
1738%%-----------------------------------------------------------------
%% Resumes the server loop after a system message has been handled.
%% The counter is started from 0.
system_continue(_Parent, _, Head) ->
    open_file_loop(Head, 0).
1741
%% Termination requested while handling a system message: stops the
%% table with do_stop/1 and exits with Reason.
system_terminate(Reason, _Parent, _, Head) ->
    _NewHead = do_stop(Head),
    exit(Reason).
1745
1746%%-----------------------------------------------------------------
1747%% Code for upgrade.
1748%%-----------------------------------------------------------------
%% The state needs no conversion on code change.
system_code_change(State, _Module, _OldVsn, _Extra) ->
    {ok, State}.
1751
1752
1753%%%----------------------------------------------------------------------
1754%%% Internal functions
1755%%%----------------------------------------------------------------------
1756
%% Opens FileName and reads the file format version.
%% For a RAM file the whole file is read into a binary first, and the
%% binary is opened instead of the file name.
%% Version 9 files are handed over to dets_v9:read_file_header/2. For
%% any other version the file is closed and an error is thrown: files of
%% version 8 or lower are no longer supported, and higher versions are
%% not Dets files.
%% -> {ok, Fd, fileheader()} | throw(Error)
read_file_header(FileName, Access, RamFile) ->
    Source = case RamFile of
                 true ->
                     case file:read_file(FileName) of
                         {ok, B} -> B;
                         Err -> dets_utils:file_error(FileName, Err)
                     end;
                 _ ->
                     FileName
             end,
    {ok, Fd} = dets_utils:open(Source, open_args(Access, RamFile)),
    {ok, <<Version:32>>} =
        dets_utils:pread_close(Fd, FileName, ?FILE_FORMAT_VERSION_POS, 4),
    case Version of
        9 ->
            dets_v9:read_file_header(Fd, FileName);
        _ ->
            _ = file:close(Fd),
            Tag = if
                      Version =< 8 -> format_8_no_longer_supported;
                      true -> not_a_dets_file
                  end,
            throw({error, {Tag, FileName}})
    end.
1781
%% Saves the table (without forcing a sync) and, unless it is a RAM file,
%% stops the disk map and closes the file.
%% -> the result of the save, or the error from file:close/1 if closing
%%    the file failed
fclose(Head) ->
    {Saved, SaveRes} = perform_save(Head, false),
    case Saved#head.ram_file of
        true ->
            SaveRes;
        false ->
            dets_utils:stop_disk_map(),
            case file:close(Saved#head.fptr) of
                ok -> SaveRes;
                CloseError -> CloseError
            end
    end.
1795
%% Saves a table whose update mode is dirty or new_dirty: the cache is
%% written, dets_v9:do_perform_save/1 is run, and the data is made sure
%% to be written by ensure_written/2, after which the update mode
%% is set to saved. In any other update mode nothing is saved and the
%% current status of the table is returned.
%% -> {NewHead, Res}
perform_save(Head, DoSync) when Head#head.update_mode =:= dirty;
				Head#head.update_mode =:= new_dirty ->
    %% The result of the block, or whatever is caught, must be a pair
    %% whose first element is a head record; anything else fails with
    %% case_clause.
    %% NOTE(review): presumably the helpers throw {Head, Error} on
    %% failure - confirm in dets_utils.
    case catch begin
                   {Head1, []} = write_cache(Head),
                   {Head2, ok} = dets_v9:do_perform_save(Head1),
                   ok = ensure_written(Head2, DoSync),
                   {Head2#head{update_mode = saved}, ok}
               end of
        {NewHead, _} = Reply when is_record(NewHead, head) ->
            Reply
    end;
perform_save(Head, _DoSync) ->
    {Head, status(Head)}.
1810
%% Makes sure the contents of the table have been written.
%% RAM file: the whole contents (from position 0 to eof) are read and
%% written to the file named in Head; dets_utils:write_file/2 is used
%% when DoSync is true, otherwise file:write_file/2, whose failure is
%% reported through dets_utils:corrupt_file/2.
%% Disk file: the file is synced when DoSync is true; otherwise nothing
%% is done.
ensure_written(Head, DoSync) when Head#head.ram_file ->
    {ok, EOF} = dets_utils:position(Head, eof),
    {ok, Bin} = dets_utils:pread(Head, 0, EOF, 0),
    if
	DoSync ->
	    dets_utils:write_file(Head, Bin);
	not DoSync ->
            case file:write_file(Head#head.filename, Bin) of
                ok ->
                    ok;
                Error ->
                    dets_utils:corrupt_file(Head, Error)
            end
    end;
ensure_written(Head, true) when not Head#head.ram_file ->
    dets_utils:sync(Head);
ensure_written(Head, false) when not Head#head.ram_file ->
    ok.
1829
%% Starts a bchunk traversal. The cache is written first. The first
%% "chunk" is the table parameters as one binary; the continuation holds
%% the allocated areas of the file and is marked eof at once if there
%% are none.
%% -> {NewHead, {cont(), [binary()]}} | {NewHead, Error}
do_bchunk_init(Head, Tab) ->
    case catch write_cache(Head) of
        {Flushed, []} ->
            case dets_v9:table_parameters(Flushed) of
                undefined ->
                    {Flushed, {error, old_version}};
                Parms ->
                    Alloc = dets_utils:all_allocated(Flushed),
                    Bin = case Alloc of
                              <<>> -> eof;
                              _ -> <<>>
                          end,
                    Cont = #dets_cont{what = bchunk, tab = Tab,
                                      proc = self(), no_objs = default,
                                      bin = Bin, alloc = Alloc},
                    {Flushed, {Cont, [term_to_binary(Parms)]}}
            end;
        {NewHead, _} = HeadError when is_record(NewHead, head) ->
            HeadError
    end.
1851
%% -> {NewHead, {cont(), [binary()]}} | {NewHead, Error}
%% Reads the next batch of binaries for a bchunk continuation.
%% A continuation created by another process yields badarg, and a
%% continuation already marked eof yields '$end_of_table'.
do_bchunk(Head, #dets_cont{proc = Owner}) when Owner =/= self() ->
    {Head, badarg};
do_bchunk(Head, #dets_cont{bin = eof}) ->
    {Head, '$end_of_table'};
do_bchunk(Head, Cont) ->
    case dets_v9:read_bchunks(Head, Cont#dets_cont.alloc) of
        {error, Reason} ->
            dets_utils:corrupt_reason(Head, Reason);
        {finished, Bins} ->
            %% Last batch: the next call returns '$end_of_table'.
            {Head, {Cont#dets_cont{bin = eof}, Bins}};
        {Bins, RestAlloc} ->
            {Head, {Cont#dets_cont{alloc = RestAlloc}, Bins}}
    end.
1866
%% -> {NewHead, Result}
%% Empties the table unless it is fixed, in which case the atom
%% 'fixed' is returned and nothing is touched. On success the
%% auto save timer is started for the new head.
fdelete_all_objects(Head) when Head#head.fixed =:= false ->
    case catch do_delete_all_objects(Head) of
        {error, Reason} ->
            dets_utils:corrupt_reason(Head, Reason);
        {ok, EmptyHead} ->
            start_auto_save_timer(EmptyHead),
            {EmptyHead, ok}
    end;
fdelete_all_objects(Head) ->
    {Head, fixed}.
1878
%% Truncates the file at its beginning and initiates it again with
%% the settings taken from the current head (name, type, key
%% position, slot limits, RAM flag, cache size and auto save).
%% Returns whatever dets_v9:initiate_file/11 returns.
do_delete_all_objects(Head) ->
    #head{fptr = Fd, filename = FileName, name = Tab,
          type = Type, keypos = KeyPos,
          min_no_slots = MinSlots, max_no_slots = MaxSlots,
          ram_file = Ram, auto_save = AutoSave, cache = Cache} = Head,
    CacheSize = dets_utils:cache_size(Cache),
    ok = dets_utils:truncate(Fd, FileName, bof),
    dets_v9:initiate_file(Fd, Tab, FileName, Type, KeyPos,
                          MinSlots, MaxSlots,
                          Ram, CacheSize, AutoSave, true).
1887
%% Finds the first key of the table.
%% The result of ffirst1/1 is tagged with a fresh reference so that a
%% normal return can be told apart from a {Head, Reason} value caught
%% from a callee; the latter is returned as {Head, {error, Reason}}.
ffirst(H) ->
    Tag = make_ref(),
    case catch {Tag, ffirst1(H)} of
        {Tag, {NewHead, Key}} ->
            {NewHead, {ok, Key}};
        {NewHead, Reason} when is_record(NewHead, head) ->
            {NewHead, {error, Reason}}
    end.
1896
%% Calls check_safe_fixtable/1, flushes the cache and starts the
%% search for the first key at slot 0.
ffirst1(H) ->
    check_safe_fixtable(H),
    {Flushed, []} = write_cache(H),
    ffirst(Flushed, 0).
1901
%% Walks the slots upwards from Slot and returns the key of the first
%% object found, or '$end_of_table' when the slots are exhausted.
ffirst(H, Slot) ->
    case dets_v9:slot_objs(H, Slot) of
        [] ->
            ffirst(H, Slot + 1);
        [Obj | _] ->
            {H, element(H#head.keypos, Obj)};
        '$end_of_table' = End ->
            {H, End}
    end.
1908
%% -> {NewHead, Reply}, Reply = ok | badarg | Error.
%% Inserts Objects by way of the cache. A {Head, Term} value coming
%% out of update_cache/3 with anything but an empty list is returned
%% unchanged.
finsert(Head, Objects) ->
    case catch update_cache(Head, Objects, insert) of
        {Head1, []} ->
            {Head1, ok};
        {Head1, _} = HeadError when is_record(Head1, head) ->
            HeadError
    end.
1917
%% -> {NewHead, Reply}, Reply = ok | badarg | Error.
%% Inserts Objects only if none of their keys is present already.
%% The keys are extracted first (badarg if that fails) and looked up;
%% if every lookup comes back empty the objects are inserted and true
%% is returned, otherwise false is returned and nothing is inserted.
finsert_new(Head, Objects) ->
    KeyPos = Head#head.keypos,
    case catch [element(KeyPos, Obj) || Obj <- Objects] of
        Keys when is_list(Keys) ->
            case catch update_cache(Head, Keys, {lookup, nopid}) of
                {Head1, Found} when is_list(Found) ->
                    NoneExists =
                        lists:all(fun({_Pid, Objs}) -> Objs =:= [] end,
                                  Found),
                    case NoneExists of
                        false ->
                            {Head1, false};
                        true ->
                            case catch update_cache(Head1, Objects, insert) of
                                {NewHead, []} ->
                                    {NewHead, true};
                                {NewHead, _} = HeadError
                                  when is_record(NewHead, head) ->
                                    HeadError
                            end
                    end;
                {NewHead, _} = HeadError when is_record(NewHead, head) ->
                    HeadError
            end;
        _ ->
            {Head, badarg}
    end.
1942
%% Fixes (true) or releases (false) the table on behalf of Pid.
%% Fixing an unfixed table links to Pid, records the monotonic time
%% together with the time offset and stores the free lists as a pair
%% {Ftab, Ftab}. Fixing an already fixed table either bumps the
%% counter of Pid or, for a new Pid, links to it and adds a counter
%% set to 1. Releasing is delegated to remove_fix/3.
do_safe_fixtable(Head, Pid, true) ->
    case Head#head.fixed of
        {TimeStamp, Counters} ->
            case lists:keyfind(Pid, 1, Counters) of
                {Pid, Count} -> % when Count >= 1
                    Bumped = lists:keyreplace(Pid, 1, Counters,
                                              {Pid, Count + 1}),
                    Head#head{fixed = {TimeStamp, Bumped}};
                false ->
                    link(Pid),
                    Head#head{fixed = {TimeStamp, [{Pid, 1} | Counters]}}
            end;
        false ->
            link(Pid),
            MonTime = erlang:monotonic_time(),
            TimeOffset = erlang:time_offset(),
            Ftab = dets_utils:get_freelists(Head),
            Head#head{fixed = {{MonTime, TimeOffset}, [{Pid, 1}]},
                      freelists = {Ftab, Ftab}}
    end;
do_safe_fixtable(Head, Pid, false) ->
    remove_fix(Head, Pid, false).
1966
%% Releases one fixation held by Pid, or all of them when
%% How =:= close (that is, when Pid closes the table).
%% When the last counter of Pid goes away Pid is unlinked. If no
%% fixing process remains the table becomes unfixed: growth is
%% checked, the process is garbage collected and the free lists are
%% replaced by dets_utils:get_freelists/1.
remove_fix(Head, Pid, How) ->
    case Head#head.fixed of
        false ->
            Head;
        {TimeStamp, Counters} ->
            case lists:keyfind(Pid, 1, Counters) of
                {Pid, Count} when Count =:= 1; How =:= close ->
                    unlink(Pid),
                    case lists:keydelete(Pid, 1, Counters) of
                        [] ->
                            check_growth(Head),
                            erlang:garbage_collect(),
                            Ftab = dets_utils:get_freelists(Head),
                            Head#head{fixed = false, freelists = Ftab};
                        Remaining ->
                            Head#head{fixed = {TimeStamp, Remaining}}
                    end;
                {Pid, Count} ->
                    Lowered = lists:keyreplace(Pid, 1, Counters,
                                               {Pid, Count - 1}),
                    Head#head{fixed = {TimeStamp, Lowered}};
                false ->
                    Head
            end
    end.
1993
%% Unlinks all processes that have fixed the table and closes the
%% table. Note that the head returned by unlink_fixing_procs/1 is
%% not used; it is the head passed in that is handed to fclose/1.
do_stop(Head) ->
    _ = unlink_fixing_procs(Head),
    fclose(Head).
1997
%% Unlinks every process that holds a fixation and returns a head
%% that is no longer fixed, with the free lists replaced by
%% dets_utils:get_freelists/1. An unfixed head is returned as is.
unlink_fixing_procs(Head) ->
    case Head#head.fixed of
        {_TimeStamp, Counters} ->
            lists:foreach(fun({FixPid, _Count}) -> unlink(FixPid) end,
                          Counters),
            Ftab = dets_utils:get_freelists(Head),
            Head#head{fixed = false, freelists = Ftab};
        false ->
            Head
    end.
2007
%% Asks the table process (self()) to consider growing the table.
%% Nothing is done for tables opened for reading only. Otherwise,
%% when no_things/1 exceeds the 'next' field of the head, a may_grow
%% request is scheduled to arrive after 200 ms. Always returns ok.
check_growth(#head{access = read}) ->
    ok;
check_growth(Head) ->
    case no_things(Head) > Head#head.next of
        true ->
            %% Catch up.
            _ = erlang:send_after(200, self(),
                                  ?DETS_CALL(self(), may_grow)),
            ok;
        false ->
            ok
    end.
2020
%% Flushes the cache and returns {Head, InfoList} where InfoList
%% holds type, keypos, size, file_size and filename. The list is
%% built inside a catch, so whatever a failing file_size/2 produces
%% is returned in place of the list.
finfo(H) ->
    case catch write_cache(H) of
        {Flushed, []} ->
            Items = (catch [{type, Flushed#head.type},
                            {keypos, Flushed#head.keypos},
                            {size, Flushed#head.no_objects},
                            {file_size,
                             file_size(Flushed#head.fptr,
                                       Flushed#head.filename)},
                            {filename, Flushed#head.filename}]),
            {Flushed, Items};
        {Flushed, _} = HeadError when is_record(Flushed, head) ->
            HeadError
    end.
2034
%% -> {Head, Value} | {Head, Error}
%% Returns the value of a single info item. Items answered straight
%% from the head leave the head untouched. The items bchunk_format,
%% file_size (and memory, which is an alias for it), no_keys and size
%% (and no_objects, which is an alias for it) flush the cache first
%% and return the head produced by write_cache/1; a {Head, Term}
%% value other than {Head, []} coming out of write_cache/1 is
%% returned unchanged. Unknown items yield undefined.
finfo(H, access) -> {H, H#head.access};
finfo(H, auto_save) -> {H, H#head.auto_save};
finfo(H, bchunk_format) ->
    case catch write_cache(H) of
        {H2, []} ->
            case dets_v9:table_parameters(H2) of
                undefined = Undef ->
                    {H2, Undef};
                Parms ->
                    {H2, term_to_binary(Parms)}
            end;
        {H2, _} = HeadError when is_record(H2, head) ->
            HeadError
    end;
finfo(H, delayed_write) -> % undocumented
    {H, dets_utils:cache_size(H#head.cache)};
finfo(H, filename) -> {H, H#head.filename};
finfo(H, file_size) ->
    case catch write_cache(H) of
        {H2, []} ->
            %% Read the descriptor and file name from the head that
            %% write_cache/1 returned (as finfo/1 does), not from the
            %% head that was passed in.
            {H2, (catch file_size(H2#head.fptr, H2#head.filename))};
        {H2, _} = HeadError when is_record(H2, head) ->
            HeadError
    end;
finfo(H, fixed) ->
    %% true if the table is currently fixed
    {H, not (H#head.fixed =:= false)};
finfo(H, hash) -> {H, H#head.hash_bif};
finfo(H, keypos) -> {H, H#head.keypos};
finfo(H, memory) -> finfo(H, file_size);
finfo(H, no_objects) -> finfo(H, size);
finfo(H, no_keys) ->
    case catch write_cache(H) of
        {H2, []} ->
            {H2, H2#head.no_keys};
        {H2, _} = HeadError when is_record(H2, head) ->
            HeadError
    end;
finfo(H, no_slots) -> {H, dets_v9:no_slots(H)};
finfo(H, pid) -> {H, self()};
finfo(H, ram_file) -> {H, H#head.ram_file};
finfo(H, safe_fixed) ->
    %% The stored monotonic time and time offset are converted with
    %% make_timestamp/2.
    {H,
     case H#head.fixed of
         false ->
             false;
         {{FixMonTime, TimeOffset}, RefList} ->
             {make_timestamp(FixMonTime, TimeOffset), RefList}
     end};
finfo(H, safe_fixed_monotonic_time) ->
    {H,
     case H#head.fixed of
         false ->
             false;
         {{FixMonTime, _TimeOffset}, RefList} ->
             {FixMonTime, RefList}
     end};
finfo(H, size) ->
    case catch write_cache(H) of
        {H2, []} ->
            {H2, H2#head.no_objects};
        {H2, _} = HeadError when is_record(H2, head) ->
            HeadError
    end;
finfo(H, type) -> {H, H#head.type};
finfo(H, version) -> {H, 9};
finfo(H, _) -> {H, undefined}.
2102
%% Returns the position reported by dets_utils:position/3 for eof,
%% which is used as the size of the file. Anything but {ok, Pos}
%% from that call results in a badmatch.
file_size(Fd, FileName) ->
    {ok, Size} = dets_utils:position(Fd, FileName, eof),
    Size.
2106
%% Returns false for the term 'undefined' and whenever
%% dets_v9:try_bchunk_header/2 answers not_ok; true otherwise.
test_bchunk_format(_Head, undefined) ->
    false;
test_bchunk_format(Head, Term) ->
    case dets_v9:try_bchunk_header(Term, Head) of
        not_ok -> false;
        _ -> true
    end.
2111
%% Opens a table and completes the head with Parent and Server.
%% First clause: open an existing file by name (open_file/1).
%% Second clause: open, and possibly create, according to OpenArgs
%% (open_file/2).
%% A 'tooshort' error is reported as not_a_dets_file, other errors go
%% through err/1, an 'EXIT' is returned as it is, and any other reply
%% is logged and returned as a dets_bug error.
do_open_file([Fname, Verbose], Parent, Server, Ref) ->
    case catch fopen2(Fname, Ref) of
        {ok, Head} ->
            maybe_put(verbose, Verbose),
            {ok, Head#head{parent = Parent, server = Server}};
        {'EXIT', _Reason} = Exit ->
            Exit;
        {error, {tooshort, _}} ->
            err({error, {not_a_dets_file, Fname}});
        {error, _Reason} = Error ->
            err(Error);
        Unexpected ->
            error_logger:format
              ("** dets: Bug was found in open_file/1, reply was ~tw.~n",
               [Unexpected]),
            {error, {dets_bug, Fname, Unexpected}}
    end;
do_open_file([Tab, OpenArgs, Verb], Parent, Server, _Ref) ->
    case catch fopen3(Tab, OpenArgs) of
        {ok, Head} ->
            maybe_put(verbose, Verb),
            {ok, Head#head{parent = Parent, server = Server}};
        {'EXIT', _Reason} = Exit ->
            Exit;
        {error, {tooshort, _}} ->
            err({error, {not_a_dets_file, OpenArgs#open_args.file}});
        {error, _Reason} = Error ->
            err(Error);
        Unexpected ->
            error_logger:format
              ("** dets: Bug was found in open_file/2, arguments were~n"
               "** dets: ~tw and reply was ~tw.~n",
               [OpenArgs, Unexpected]),
            {error, {dets_bug, Tab, {open_file, OpenArgs}, Unexpected}}
    end.
2147
%% Stores Value under Key in the process dictionary unless Value is
%% 'undefined', in which case nothing is stored and 'ignore' is
%% returned. Otherwise the result of put/2 is returned.
maybe_put(Key, Value) ->
    case Value of
        undefined -> ignore;
        _ -> put(Key, Value)
    end.
2152
%% -> {Head, Result}, Result = ok | Error | {thrown, Error} | badarg
%% Initiates the table using InitFun. A table opened for reading or a
%% fixed table is refused; in both cases InitFun(close) is called
%% (any failure of that call is ignored) before the error is
%% returned. After a successful initiation growth is checked and the
%% auto save timer is started.
finit(Head, InitFun, _Format, _NoSlots) when Head#head.access =:= read ->
    _ = (catch InitFun(close)),
    {Head, {error, {access_mode, Head#head.filename}}};
finit(Head, InitFun, _Format, _NoSlots) when Head#head.fixed =/= false ->
    _ = (catch InitFun(close)),
    {Head, {error, {fixed_table, Head#head.name}}};
finit(Head, InitFun, Format, NoSlots) ->
    case catch do_finit(Head, InitFun, Format, NoSlots) of
        badarg ->
            {Head, badarg};
        {ok, InitiatedHead} ->
            check_growth(InitiatedHead),
            start_auto_save_timer(InitiatedHead),
            {InitiatedHead, ok};
        Error ->
            dets_utils:corrupt(Head, Error)
    end.
2171
%% -> {ok, NewHead} | throw(badarg) | throw(Error)
%% Format =:= term: badarg is thrown if an integer NoSlots exceeds the
%% maximal number of slots. The file is truncated and initiated anew
%% unless the table is new_dirty and the minimal number of slots is
%% unchanged. The objects delivered by Init are then sorted
%% (do_sort/5) and copied into place (fsck_copy/4).
%% Format =:= bchunk: the file is truncated and dets_v9:bchunk_init/2
%% does the rest; the resulting head is marked dirty.
do_finit(Head, Init, Format, NoSlots) ->
    #head{fptr = Fd, filename = FileName, name = Tab,
          type = Type, keypos = KeyPos,
          min_no_slots = OldMinSlots, max_no_slots = MaxSlots,
          ram_file = Ram, cache = Cache, auto_save = AutoSave,
          update_mode = UpdateMode} = Head,
    CacheSize = dets_utils:cache_size(Cache),
    {Method, InitHead} =
        case Format of
            term when is_integer(NoSlots), NoSlots > MaxSlots ->
                throw(badarg);
            term ->
                MinSlots = choose_no_slots(NoSlots, OldMinSlots),
                if
                    UpdateMode =:= new_dirty, MinSlots =:= OldMinSlots ->
                        %% Newly created file with the requested
                        %% number of slots: use it as it is.
                        {general_init, Head};
                    true ->
                        ok = dets_utils:truncate(Fd, FileName, bof),
                        {ok, FreshHead} =
                            dets_v9:initiate_file(Fd, Tab, FileName, Type,
                                                  KeyPos, MinSlots, MaxSlots,
                                                  Ram, CacheSize, AutoSave,
                                                  false),
                        {general_init, FreshHead}
                end;
            bchunk ->
                ok = dets_utils:truncate(Fd, FileName, bof),
                {bchunk_init, Head}
        end,
    case Method of
        general_init ->
            Cntrs = ets:new(dets_init, []),
            Input = dets_v9:bulk_input(InitHead, Init, Cntrs),
            SlotNumbers = {InitHead#head.min_no_slots, bulk_init, MaxSlots},
            {SortReply, SizeData} =
                do_sort(InitHead, SlotNumbers, Input, Cntrs, FileName),
            case SortReply of
                {ok, NoDups, SortedHead} ->
                    fsck_copy(SizeData, SortedHead, true, NoDups);
                Failure ->
                    close_files(true, SizeData, InitHead),
                    Failure
            end;
        bchunk_init ->
            case dets_v9:bchunk_init(InitHead, Init) of
                {ok, NewHead} ->
                    {ok, NewHead#head{update_mode = dirty}};
                Error ->
                    Error
            end
    end.
2223
%% -> {NewHead, [LookedUpObject]} | {NewHead, Error}
%% Looks up Keys by way of the cache. A single {Pid, Objects} reply
%% gives its objects directly; several replies are concatenated in
%% the order they were returned.
flookup_keys(Head, Keys) ->
    case catch update_cache(Head, Keys, {lookup, nopid}) of
        {Head1, [{_NoPid, Objs}]} ->
            {Head1, Objs};
        {Head1, Replies} when is_list(Replies) ->
            {Head1, lists:flatmap(fun({_Pid, Objs}) -> Objs end, Replies)};
        {Head1, _} = HeadError when is_record(Head1, head) ->
            HeadError
    end.
2234
%% -> {NewHead, Result}
%% Reads the next batch of objects for a match continuation.
%% A continuation marked eof gives '$end_of_table'; a scan error is
%% reported through dets_utils:corrupt_reason/2.
fmatch_init(Head, #dets_cont{bin = eof}) ->
    {Head, '$end_of_table'};
fmatch_init(Head, Cont) ->
    case scan(Head, Cont) of
        {scan_error, Reason} ->
            dets_utils:corrupt_reason(Head, Reason);
        {Terms, NextCont} ->
            {Head, {cont, {Terms, NextCont}}}
    end.
2245
%% -> {NewHead, Result}
%% Starts a match/select operation.
%% If find_all_keys/3 finds no usable constant keys the whole table
%% has to be traversed: the cache is flushed, the table is fixed on
%% behalf of From when Safe =:= safe, and a continuation for the scan
%% is returned. Otherwise the keys are looked up at once, the match
%% specification is run on the objects found and {done, Objects} is
%% returned.
fmatch(Head, MP, Spec, N, Safe, From) ->
    KeyPos = Head#head.keypos,
    case find_all_keys(Spec, KeyPos, []) of
        [] ->
            %% Complete match
            case catch write_cache(Head) of
                {Flushed, []} ->
                    ScanHead =
                        case Safe of
                            safe -> do_safe_fixtable(Flushed, From, true);
                            no_safe -> Flushed
                        end,
                    Cont = init_scan(ScanHead, N),
                    {ScanHead, {cont, Cont#dets_cont{match_program = MP}}};
                {ErrHead, _} = HeadError when is_record(ErrHead, head) ->
                    HeadError
            end;
        KeyList ->
            {LookedUpHead, Reply} = flookup_keys(Head, lists:usort(KeyList)),
            case Reply of
                Objs when is_list(Objs) ->
                    {match_spec, MS} = MP,
                    {LookedUpHead, {done, ets:match_spec_run(Objs, MS)}};
                Error ->
                    {LookedUpHead, Error}
            end
    end.
2276
%% Collects the keys mentioned in the heads of a match specification.
%% Returns [] as soon as some key contains a variable, or when the
%% specification contains something that is not a three-tuple with a
%% tuple as head. Heads too small to hold a key are skipped. The keys
%% are returned in reverse order, prepended to Ks.
find_all_keys([], _KeyPos, Ks) ->
    Ks;
find_all_keys([{Pattern, _, _} | Rest], KeyPos, Ks)
  when is_tuple(Pattern), tuple_size(Pattern) >= KeyPos ->
    Key = element(KeyPos, Pattern),
    case contains_variable(Key) of
        false ->
            find_all_keys(Rest, KeyPos, [Key | Ks]);
        true ->
            []
    end;
find_all_keys([{Pattern, _, _} | Rest], KeyPos, Ks) when is_tuple(Pattern) ->
    find_all_keys(Rest, KeyPos, Ks);
find_all_keys(_, _, _) ->
    [].
2294
%% Returns true if the term contains a match variable: the atom '_'
%% or an atom consisting of $ followed by something that
%% list_to_integer/1 accepts (such as '$1'). Tuples and lists are
%% searched recursively; all other terms contain no variables.
contains_variable('_') ->
    true;
contains_variable(Atom) when is_atom(Atom) ->
    case atom_to_list(Atom) of
        [$$ | Digits] ->
            %% list_to_integer/1 either returns an integer or fails.
            is_integer(catch list_to_integer(Digits));
        _ ->
            false
    end;
contains_variable(Tuple) when is_tuple(Tuple) ->
    contains_variable(tuple_to_list(Tuple));
contains_variable([]) ->
    false;
contains_variable([First | Rest]) ->
    contains_variable(First) orelse contains_variable(Rest);
contains_variable(_) ->
    false.
2322
%% -> {NewHead, Res}
%% Starts a match_delete/select_delete operation. With constant keys
%% in the specification only those keys are examined; otherwise the
%% whole table is handled by do_fmatch_delete_var_keys/4. The work is
%% done inside a catch and any {Head, Term} value it produces is
%% returned.
fmatch_delete_init(Head, MP, Spec, From) ->
    KeyPos = Head#head.keypos,
    Outcome =
        (catch case find_all_keys(Spec, KeyPos, []) of
                   [] ->
                       do_fmatch_delete_var_keys(Head, MP, Spec, From);
                   KeyList ->
                       do_fmatch_constant_keys(Head, lists:usort(KeyList),
                                               MP)
               end),
    case Outcome of
        {NewHead, _} when is_record(NewHead, head) ->
            Outcome
    end.
2337
%% Note: if deleted objects share a bucket with objects that are
%% kept, the bucket is moved. Should the new address of the bucket be
%% greater than the original one, the kept objects will be read once
%% more later on.
%% -> {NewHead, Res}
%% Scans the next batch of objects, keeps those for which the match
%% specification returns true and deletes them. An empty batch ends
%% the operation with {done, 0}. Objects that cannot be decoded or
%% matched are reported as a bad object.
fmatch_delete(Head, C) ->
    case scan(Head, C) of
        {scan_error, Reason} ->
            dets_utils:corrupt_reason(Head, Reason);
        {[], _} ->
            {Head, {done, 0}};
        {BinTerms, NextCont} ->
            {match_spec, MP} = C#dets_cont.match_program,
            case catch filter_binary_terms(BinTerms, MP, []) of
                {'EXIT', _} ->
                    BadObject = dets_utils:bad_object(fmatch_delete,
                                                      BinTerms),
                    dets_utils:corrupt_reason(Head, BadObject);
                Matching ->
                    do_fmatch_delete(Head, Matching, NextCont)
            end
    end.
2359
%% Match-delete when the keys of the specification are not constant.
%% First clause: the specification matches everything and the table
%% is not fixed, so the file is simply emptied. The cache is flushed
%% beforehand only to get the number of objects right.
%% Second clause: the table is fixed on behalf of From, the cache is
%% flushed and a continuation for scanning the table is returned
%% together with the count 0.
do_fmatch_delete_var_keys(Head, _MP, ?PATTERN_TO_TRUE_MATCH_SPEC('_'), _From)
            when Head#head.fixed =:= false ->
    {Flushed, []} = write_cache(Head),
    Count = Flushed#head.no_objects,
    case fdelete_all_objects(Flushed) of
        {NewHead, ok} ->
            {NewHead, {done, Count}};
        Other ->
            Other
    end;
do_fmatch_delete_var_keys(Head, MP, _Spec, From) ->
    FixedHead = do_safe_fixtable(Head, From, true),
    {NewHead, []} = write_cache(FixedHead),
    Cont = init_scan(NewHead, default),
    {NewHead, {cont, Cont#dets_cont{match_program = MP}, 0}}.
2377
%% Match-delete when all keys of the specification are constant: the
%% keys are looked up, the objects for which the match specification
%% returns true are selected and deleted in one go. A lookup reply
%% that is not a list is returned unchanged.
do_fmatch_constant_keys(Head, Keys, {match_spec, MP}) ->
    case flookup_keys(Head, Keys) of
        {Head1, Objs} when is_list(Objs) ->
            ToDelete = filter_terms(Objs, MP, []),
            do_fmatch_delete(Head1, ToDelete, fixed);
        Other ->
            Other
    end.
2386
%% Decodes every binary with binary_to_term/1 and prepends to L the
%% terms for which the compiled match specification MP returns
%% exactly [true]. The selected terms therefore end up in reverse
%% order in front of L.
filter_binary_terms(Bins, MP, L) ->
    lists:foldl(fun(Bin, Acc) ->
                        Term = binary_to_term(Bin),
                        case ets:match_spec_run([Term], MP) of
                            [true] -> [Term | Acc];
                            _ -> Acc
                        end
                end, L, Bins).
2397
%% Prepends to L the terms for which the compiled match specification
%% MP returns exactly [true]. The selected terms end up in reverse
%% order in front of L.
filter_terms(Terms, MP, L) ->
    lists:foldl(fun(Term, Acc) ->
                        case ets:match_spec_run([Term], MP) of
                            [true] -> [Term | Acc];
                            _ -> Acc
                        end
                end, L, Terms).
2407
%% Deletes Terms and reports how many there were. What is either the
%% atom 'fixed' (everything has been handled, the reply is
%% {done, N}) or a continuation (the reply is {cont, What, N}).
%% A failed deletion is returned unchanged.
do_fmatch_delete(Head, Terms, What) ->
    Count = length(Terms),
    case do_delete(Head, Terms, delete_object) of
        {Head1, ok} when What =:= fixed ->
            {Head1, {done, Count}};
        {Head1, ok} ->
            {Head1, {cont, What, Count}};
        Other ->
            Other
    end.
2418
%% Performs the deletion What on Things by way of the cache.
%% -> {NewHead, ok} | {NewHead, Error}
do_delete(Head, Things, What) ->
    case catch update_cache(Head, Things, What) of
        {Head1, []} ->
            {Head1, ok};
        {Head1, _} = HeadError when is_record(Head1, head) ->
            HeadError
    end.
2426
%% Finds the key following Key. The search starts in the slot Key
%% hashes to. The result of fnext/3 is tagged with a fresh reference
%% to tell a normal return from a {Head, Term} value caught from a
%% callee; the latter is returned unchanged.
fnext(Head, Key) ->
    Slot = dets_v9:db_hash(Key, Head),
    Tag = make_ref(),
    case catch {Tag, fnext(Head, Key, Slot)} of
        {Tag, {Head1, Next}} ->
            {Head1, {ok, Next}};
        {Head1, _} = HeadError when is_record(Head1, head) ->
            HeadError
    end.
2436
%% Flushes the cache and searches slot Slot for the key after Key.
%% Returns '$end_of_table' if the slot does not exist.
fnext(H, Key, Slot) ->
    {Flushed, []} = write_cache(H),
    case dets_v9:slot_objs(Flushed, Slot) of
        '$end_of_table' = End ->
            {Flushed, End};
        Objs ->
            fnext_search(Flushed, Key, Slot, Objs)
    end.
2443
%% Returns the key of the first object found after the objects with
%% key K in the slot contents L. If there is none the search goes on
%% in the following slot.
fnext_search(H, K, Slot, L) ->
    KeyPos = H#head.keypos,
    case beyond_key(K, KeyPos, L) of
        [] ->
            fnext_slot(H, K, Slot + 1);
        Following ->
            {H, element(H#head.keypos, hd(Following))}
    end.
2450
%% The next key has to be looked for in the following slots: empty
%% slots are skipped and the key of the first object of the first
%% non-empty slot is returned, or '$end_of_table' when there are no
%% more slots.
fnext_slot(H, K, Slot) ->
    case dets_v9:slot_objs(H, Slot) of
        [] ->
            fnext_slot(H, K, Slot + 1);
        '$end_of_table' = End ->
            {H, End};
        Objs ->
            {H, element(H#head.keypos, hd(Objs))}
    end.
2458
%% Skips objects until one is found whose key compares equal to K
%% (dets_utils:cmp/2 returns 0) and then lets beyond_key2/3 skip the
%% rest of the objects with that key. Returns [] if K is not found.
beyond_key(_Key, _KeyPos, []) ->
    [];
beyond_key(Key, KeyPos, [Obj | Rest]) ->
    case dets_utils:cmp(element(KeyPos, Obj), Key) of
        0 -> beyond_key2(Key, KeyPos, Rest);
        _ -> beyond_key(Key, KeyPos, Rest)
    end.
2465
%% Skips the objects at the front whose key compares equal to K and
%% returns the remaining list, starting with the first object that
%% has another key ([] if there is none).
beyond_key2(_Key, _KeyPos, []) ->
    [];
beyond_key2(Key, KeyPos, [Obj | Rest] = Objs) ->
    case dets_utils:cmp(element(KeyPos, Obj), Key) of
        0 -> beyond_key2(Key, KeyPos, Rest);
        _ -> Objs
    end.
2472
%% Opens an already existing file; no options are given, so the file
%% is opened read_write, not as a RAM file, with the default cache.
%% -> {ok, head()} | throw(Error)
%% A file that was not properly closed, or whose free lists cannot be
%% initiated, is repaired (a message is printed to 'user') and then
%% opened again.
fopen2(Fname, Tab) ->
    case file:read_file_info(Fname) of
        {ok, _} ->
            {ok, Fd, FH} = read_file_header(Fname, read_write, false),
            Action =
                case dets_v9:check_file_header(FH, Fd) of
                    {ok, CheckedHead} ->
                        NamedHead = CheckedHead#head{filename = Fname},
                        try {ok, dets_v9:init_freelist(NamedHead)}
                        catch
                            throw:_ ->
                                {repair, " has bad free lists, repairing ..."}
                        end;
                    {error, not_closed} ->
                        {repair, " not properly closed, repairing ..."};
                    Other ->
                        Other
                end,
            case Action of
                {ok, Head} ->
                    open_final(Head, Fname, read_write, false,
                               ?DEFAULT_CACHE, Tab, false);
                {repair, Mess} ->
                    io:format(user, "dets: file ~tp~s~n", [Fname, Mess]),
                    case fsck(Fd, Tab, Fname, FH, default, default) of
                        ok ->
                            fopen2(Fname, Tab);
                        FsckError ->
                            throw(FsckError)
                    end;
                {error, Reason} ->
                    throw({error, {Reason, Fname}})
            end;
        InfoError ->
            dets_utils:file_error(Fname, InfoError)
    end.
2513
%% Opens a file, creating and initiating it if it does not exist.
%% -> {ok, head()} | throw(Error)
%% A file that cannot be examined is reported as a file error when
%% read access was asked for; with any other access a new file is
%% initiated.
fopen3(Tab, OpenArgs) ->
    Fname = OpenArgs#open_args.file,
    case file:read_file_info(Fname) of
        {ok, _FileInfo} ->
            fopen_existing_file(Tab, OpenArgs);
        InfoError when OpenArgs#open_args.access =:= read ->
            dets_utils:file_error(Fname, InfoError);
        _InfoError ->
            fopen_init_file(Tab, OpenArgs)
    end.
2526
%% Opens an existing file according to OpenArgs.
%% The file header is read and checked, and depending on the header
%% check, the 'repair' option and the access mode the file is
%% compacted, repaired or opened as it is. Errors are thrown as
%% {error, Reason} tuples.
fopen_existing_file(Tab, OpenArgs) ->
    #open_args{file = Fname, type = Type, keypos = Kp, repair = Rep,
               min_no_slots = MinSlots, max_no_slots = MaxSlots,
               ram_file = Ram, delayed_write = CacheSz, auto_save =
               Auto, access = Acc, debug = Debug} =
        OpenArgs,
    {ok, Fd, FH} = read_file_header(Fname, Acc, Ram),
    %% MinF/MaxF: true when the requested slot limit is 'default' or
    %% equal to the value stored in the file header.
    MinF = (MinSlots =:= default) or (MinSlots =:= FH#fileheader.min_no_slots),
    MaxF = (MaxSlots =:= default) or (MaxSlots =:= FH#fileheader.max_no_slots),
    %% First decision, based on the header check, 'repair' and access.
    %% The order of the clauses matters since the guards overlap.
    Wh = case dets_v9:check_file_header(FH, Fd) of
	     {ok, Head} when Rep =:= force, Acc =:= read_write,
                             FH#fileheader.no_colls =/= undefined,
                             MinF, MaxF ->
	         {compact, Head};
             {ok, _Head} when Rep =:= force, Acc =:= read ->
                 throw({error, {access_mode, Fname}});
	     {ok, _Head} when Rep =:= force ->
		 M = ", repair forced.",
		 {repair, M};
	     {ok, Head} ->
		 {final, Head};
	     {error, not_closed} when Rep =:= force, Acc =:= read_write ->
		 M = ", repair forced.",
		 {repair, M};
	     {error, not_closed} when Rep =:= true, Acc =:= read_write ->
		 M = " not properly closed, repairing ...",
		 {repair, M};
	     {error, not_closed} when Rep =:= false ->
		 throw({error, {needs_repair, Fname}});
	     {error, Reason} ->
		 throw({error, {Reason, Fname}})
	 end,
    %% Second decision: when the file is to be opened or compacted the
    %% free lists are initiated; if that throws, the file is repaired
    %% instead.
    Do = case Wh of
             {Tag, Hd} when Tag =:= final; Tag =:= compact ->
                 Hd1 = Hd#head{filename = Fname},
                 try {Tag, dets_v9:init_freelist(Hd1)}
                 catch
                     throw:_ ->
                         {repair, " has bad free lists, repairing ..."}
                 end;
             Else ->
                 Else
         end,
    case Do of
	%% The type and key position given must agree with the header,
	%% whatever was decided above.
	_ when FH#fileheader.type =/= Type ->
	    throw({error, {type_mismatch, Fname}});
	_ when FH#fileheader.keypos =/= Kp ->
	    throw({error, {keypos_mismatch, Fname}});
	{compact, SourceHead} ->
	    io:format(user, "dets: file ~tp is now compacted ...~n", [Fname]),
	    %% The source is opened for reading with the default cache.
	    {ok, NewSourceHead} = open_final(SourceHead, Fname, read, false,
					     ?DEFAULT_CACHE, Tab, Debug),
	    case catch compact(NewSourceHead) of
		ok ->
		    erlang:garbage_collect(),
		    %% Open the compacted file; repair = false so that
		    %% it is not compacted once more.
		    fopen3(Tab, OpenArgs#open_args{repair = false});
		_Err ->
		    %% Compaction failed: close the file, read the
		    %% header again to get a new descriptor, and repair.
                    _ = file:close(Fd),
                    dets_utils:stop_disk_map(),
		    io:format(user, "dets: compaction of file ~tp failed, "
			      "now repairing ...~n", [Fname]),
                    {ok, Fd2, _FH} = read_file_header(Fname, Acc, Ram),
                    do_repair(Fd2, Tab, Fname, FH, MinSlots, MaxSlots,
			      OpenArgs)
	    end;
	{repair, Mess} ->
	    io:format(user, "dets: file ~tp~s~n", [Fname, Mess]),
            do_repair(Fd, Tab, Fname, FH, MinSlots, MaxSlots,
		      OpenArgs);
	{final, H} ->
	    H1 = H#head{auto_save = Auto},
	    open_final(H1, Fname, Acc, Ram, CacheSz, Tab, Debug)
    end.
2600
%% Repairs the file with fsck/6 and, if that went well, opens it
%% again with repair = false. Any other result of fsck/6 is thrown.
do_repair(Fd, Tab, Fname, FH, MinSlots, MaxSlots, OpenArgs) ->
    case fsck(Fd, Tab, Fname, FH, MinSlots, MaxSlots) of
        ok ->
            erlang:garbage_collect(),
            NoRepairArgs = OpenArgs#open_args{repair = false},
            fopen3(Tab, NoRepairArgs);
        FsckError ->
            throw(FsckError)
    end.
2609
%% -> {ok, head()} | throw(Error)
%% Last step of opening a file: the access mode, RAM flag, file name,
%% table name and a new cache of size CacheSz are stored in the head,
%% the disk map is initiated when debugging, dets_v9:cache_segps/3 is
%% called and growth is checked.
open_final(Head, Fname, Acc, Ram, CacheSz, Tab, Debug) ->
    Cache = dets_utils:new_cache(CacheSz),
    OpenedHead = Head#head{access = Acc,
                           ram_file = Ram,
                           filename = Fname,
                           name = Tab,
                           cache = Cache},
    init_disk_map(Tab, Debug),
    dets_v9:cache_segps(OpenedHead#head.fptr, Fname, OpenedHead#head.next),
    check_growth(OpenedHead),
    {ok, OpenedHead}.
2621
%% -> {ok, head()} | throw(Error)
%% Creates and initiates a new file. Slot limits given as 'default'
%% are replaced by the default limits. If the initiation fails the
%% file is closed and, unless it is a RAM file, deleted before the
%% error is thrown. On success the auto save timer is started and
%% the head is marked new_dirty.
fopen_init_file(Tab, OpenArgs) ->
    #open_args{file = Fname, type = Type, keypos = KeyPos,
               min_no_slots = MinSlotsArg, max_no_slots = MaxSlotsArg,
               ram_file = Ram, delayed_write = CacheSize,
               auto_save = AutoSave, debug = Debug} = OpenArgs,
    MinSlots = choose_no_slots(MinSlotsArg, ?DEFAULT_MIN_NO_SLOTS),
    MaxSlots = choose_no_slots(MaxSlotsArg, ?DEFAULT_MAX_NO_SLOTS),
    FileSpec = case Ram of
                   true -> [];
                   _ -> Fname
               end,
    {ok, Fd} = dets_utils:open(FileSpec, open_args(read_write, Ram)),
    %% No need to truncate an empty file.
    init_disk_map(Tab, Debug),
    case catch dets_v9:initiate_file(Fd, Tab, Fname, Type, KeyPos,
                                     MinSlots, MaxSlots,
                                     Ram, CacheSize, AutoSave, true) of
        {ok, Head} ->
            start_auto_save_timer(Head),
            %% init_table does not need to truncate and write header
            {ok, Head#head{update_mode = new_dirty}};
        {error, Reason} ->
            _ = file:close(Fd),
            case Ram of
                true -> ok;
                _ -> _ = file:delete(Fname)
            end,
            throw({error, Reason})
    end.
2652
%% Debug.
%% Initiates the disk map for table Name if Debug is true or, failing
%% that, if dets_utils:debug_mode/0 returns true.
init_disk_map(Name, Debug) ->
    Enabled = Debug orelse dets_utils:debug_mode(),
    case Enabled of
        false ->
            ok;
        true ->
            dets_utils:init_disk_map(Name)
    end.
2661
%% Builds the option list used when opening the file: 'write' is
%% included for read_write access, 'ram' or 'raw' is chosen from
%% RamFile, and 'binary' and 'read' are always included.
open_args(Access, RamFile) ->
    AccessOpts = case Access of
                     read -> [];
                     read_write -> [write]
                 end,
    StorageOpts = case RamFile of
                      true -> [ram];
                      false -> [raw]
                  end,
    lists:append([AccessOpts, StorageOpts, [binary, read]]).
2672
%% -> ok | throw(Error)
%% Compacts the table of SourceHead by copying it to a temporary
%% file, which then replaces the original file. On failure the
%% temporary file is deleted and the error is thrown.
compact(SourceHead) ->
    #head{name = Tab, filename = Fname, fptr = SourceFd,
          type = Type, keypos = KeyPos,
          ram_file = Ram, auto_save = AutoSave} = SourceHead,
    Tmp = tempfile(Fname),
    TblParms = dets_v9:table_parameters(SourceHead),
    {ok, TmpFd} = dets_utils:open(Tmp, open_args(read_write, false)),
    %% It is normally not possible to have two open tables in the same
    %% process since the process dictionary is used for caching
    %% segment pointers, but here it works anyway--when reading a file
    %% serially the pointers do not need to be used.
    PrepResult = (catch dets_v9:prep_table_copy(TmpFd, Tab, Tmp, Type,
                                                KeyPos, Ram, ?DEFAULT_CACHE,
                                                AutoSave, TblParms)),
    TargetHead = case PrepResult of
                     {ok, Prepared} ->
                         Prepared;
                     PrepError ->
                         _ = file:close(TmpFd),
                         _ = file:delete(Tmp),
                         throw(PrepError)
                 end,

    case dets_v9:compact_init(SourceHead, TargetHead, TblParms) of
        {ok, NewHead} ->
            Result = case fclose(NewHead) of
                         ok ->
                             ok = file:close(SourceFd),
                             %% Save (rename) Fname first?
                             dets_utils:rename(Tmp, Fname);
                         CloseError ->
                             CloseError
                     end,
            case Result of
                ok ->
                    ok;
                _ ->
                    _ = file:delete(Tmp),
                    throw(Result)
            end;
        InitError ->
            _ = file:close(TmpFd),
            _ = file:delete(Tmp),
            throw(InitError)
    end.
2716
%% -> ok | Error
%% Closes Fd.
%% Repairs the file. MinSlotsArg and MaxSlotsArg are the option
%% values; 'default' means that the value of the file header is used.
fsck(Fd, Tab, Fname, FH, MinSlotsArg, MaxSlotsArg) ->
    #fileheader{min_no_slots = MinSlotsFile,
                max_no_slots = MaxSlotsFile} = FH,
    EstFromFile = file_no_things(FH),
    MinSlots = choose_no_slots(MinSlotsArg, MinSlotsFile),
    MaxSlots = choose_no_slots(MaxSlotsArg, MaxSlotsFile),
    %% The estimate is kept within [MinSlots, MaxSlots].
    EstNoSlots = erlang:min(MaxSlots, erlang:max(MinSlots, EstFromFile)),
    %% A first attempt is made with the estimated number of slots. If
    %% the number of objects (keys) turns out to be significantly
    %% different, one more attempt is made with the number suggested
    %% by the first attempt.
    case fsck_try(Fd, Tab, FH, Fname, {MinSlots, EstNoSlots, MaxSlots}) of
        {try_again, BetterNoSlots} ->
            SecondTry = fsck_try(Fd, Tab, FH, Fname,
                                 {MinSlots, BetterNoSlots, MaxSlots}),
            case SecondTry of
                {try_again, _} ->
                    _ = file:close(Fd),
                    {error, {cannot_repair, Fname}};
                _ ->
                    SecondTry
            end;
        FirstTry ->
            FirstTry
    end.
2745
%% Returns the requested number of slots, or Fallback when the
%% request is the atom 'default'.
choose_no_slots(Requested, Fallback) ->
    case Requested of
        default -> Fallback;
        _ -> Requested
    end.
2748
%% Makes one repair attempt: a fresh table is created in a temporary
%% file, filled by fsck_try_est/5 and, on success, renamed to Fname.
%% -> ok | {try_again, integer()} | Error
%% Closes Fd unless {try_again, _} is returned.
%% Initiating a table using a fun and repairing (or converting) a
%% file are completely different things, but nevertheless the same
%% method is used in both cases...
fsck_try(Fd, Tab, FH, Fname, SlotNumbers) ->
    Tmp = tempfile(Fname),
    #fileheader{type = Type, keypos = KeyPos} = FH,
    {_MinSlots, EstNoSlots, MaxSlots} = SlotNumbers,
    OpenArgs = #open_args{file = Tmp, type = Type, keypos = KeyPos,
                          repair = false, min_no_slots = EstNoSlots,
                          max_no_slots = MaxSlots,
                          ram_file = false, delayed_write = ?DEFAULT_CACHE,
                          auto_save = infinity, access = read_write,
                          debug = false},
    case catch fopen3(Tab, OpenArgs) of
        {ok, Head} ->
            case fsck_try_est(Head, Fd, Fname, SlotNumbers, FH) of
                {ok, NewHead} ->
                    Result = case fclose(NewHead) of
                                 ok ->
                                     %% Save (rename) Fname first?
                                     dets_utils:rename(Tmp, Fname);
                                 CloseError ->
                                     CloseError
                             end,
                    case Result of
                        ok ->
                            ok;
                        _ ->
                            %% Closing or renaming failed; drop the
                            %% temporary file.
                            _ = file:delete(Tmp),
                            Result
                    end;
                TryAgainOrError ->
                    _ = file:delete(Tmp),
                    TryAgainOrError
            end;
        OpenError ->
            %% The temporary table could not be opened, so nothing else
            %% is going to close the source file.
            _ = file:close(Fd),
            OpenError
    end.
2789
%% Returns the name of the temporary file used when rewriting Fname,
%% that is, Fname with ".TMP" appended. An existing file with that name
%% is deleted; when the deletion reports success, assure_no_file/1 is
%% used to wait until the file can no longer be seen.
tempfile(Fname) ->
    Tmp = lists:concat([Fname, ".TMP"]),
    case file:delete(Tmp) of
        ok ->
            assure_no_file(Tmp);
        {error, _Reason} -> % typically enoent
            ok
    end,
    Tmp.
2799
%% Returns ok as soon as file:read_file_info/1 fails for File. As long
%% as the file is still visible, sleeps 100 ms and checks again.
assure_no_file(File) ->
    case file:read_file_info(File) of
        {error, _} ->
            ok;
        {ok, _FileInfo} ->
            %% Wait for some other process to close the file:
            timer:sleep(100),
            assure_no_file(File)
    end.
2809
%% Reads the objects of the file open on Fd, sorts them with do_sort/5
%% and, when sorting succeeds, copies the result to the table Head with
%% fsck_copy/4.
%% -> {ok, NewHead} | {try_again, integer()} | Error
%% Fd is left open only when {try_again, _} is returned.
fsck_try_est(Head, Fd, Fname, SlotNumbers, FH) ->
    Cntrs = ets:new(dets_repair, []),
    Input = dets_v9:fsck_input(Head, Fd, Cntrs, FH),
    {Reply, SizeData} = do_sort(Head, SlotNumbers, Input, Cntrs, Fname),
    %% This is a repair, not a bulk initialization.
    Bulk = false,
    case Reply of
        {try_again, _} ->
            %% The caller retries using the same Fd, so keep it open.
            close_files(Bulk, SizeData, Head),
            Reply;
        {ok, NoDups, SortedHead} ->
            _ = file:close(Fd),
            fsck_copy(SizeData, SortedHead, Bulk, NoDups);
        Failure ->
            _ = file:close(Fd),
            close_files(Bulk, SizeData, Head),
            Failure
    end.
2829
%% Runs file_sorter:sort/3 on Input and returns {Reply, SizeData}.
%% Reply is whatever the (caught) sort evaluated to. SizeData is the
%% contents of the Cntrs table ordered by descending first element;
%% the table is deleted before returning.
do_sort(Head, SlotNumbers, Input, Cntrs, Fname) ->
    %% output_objs/4 replaces {LogSize,NoObjects} in Cntrs by
    %% {LogSize,Position,Data,NoObjects | NoCollections}.
    %% Data = {FileName,FileDescriptor} | [object()]
    %% For small tables Data can be a list of objects which is more
    %% efficient since no temporary files are created.
    Output = dets_v9:output_objs(Head, SlotNumbers, Cntrs),
    %% Temporary sort files go next to the table file.
    SortOptions = [{format, binary}, {tmpdir, filename:dirname(Fname)}],
    Reply = (catch file_sorter:sort(Input, Output, SortOptions)),
    Counters = ets:tab2list(Cntrs),
    ets:delete(Cntrs),
    {Reply, lists:reverse(lists:keysort(1, Counters))}.
2843
%% Writes the sorted data to the file of Head.
%% When the first entry carries its data as a list (and not as a
%% temporary file), every entry is written directly at its position
%% with a single dets_utils:pwrite/3 call; NoDups must then be 0.
%% Otherwise the temporary files are copied one by one by fsck_copy1/4,
%% and anything it throws is turned into the return value.
fsck_copy([{_LogSz, Pos, Bins, _NoObjects} | Rest], Head, _Bulk, NoDups)
  when is_list(Bins) ->
    true = NoDups =:= 0,
    ToPositionAndData = fun({_, P, B, _}) -> {P, B} end,
    PWs = [{Pos, Bins} | lists:map(ToPositionAndData, Rest)],
    #head{fptr = Fd, filename = FileName} = Head,
    dets_utils:pwrite(Fd, FileName, PWs),
    {ok, Head#head{update_mode = dirty}};
fsck_copy(SizeData, Head, Bulk, NoDups) ->
    catch fsck_copy1(SizeData, Head, Bulk, NoDups).
2853
%% Copies the temporary files listed in the size data, one per call,
%% into the file of Head (Head#head.fptr) at the position recorded for
%% each of them. Every temporary file is closed before it is copied by
%% name, and deleted after the copy whether or not the copy succeeded.
%% -> {ok, NewHead} | {error, Reason}
%% On failure the remaining temporary files are cleaned up with
%% close_files/3 first.
%% NOTE(review): the results of the first two case expressions are
%% discarded, so dets_utils:file_error/2 presumably throws; fsck_copy/4
%% calls this function inside a catch -- TODO confirm.
fsck_copy1([SzData | L], Head, Bulk, NoDups) ->
    Out = Head#head.fptr,
    {LogSz, Pos, {FileName, Fd}, NoObjects} = SzData,
    %% Size of one object in this file; ?POW is defined elsewhere
    %% (presumably a power of two -- TODO confirm).
    Size = if NoObjects =:= 0 -> 0; true -> ?POW(LogSz-1) end,
    ExpectedSize = Size * NoObjects,
    case close_tmp(Fd) of
        ok -> ok;
        Err ->
	    close_files(Bulk, L, Head),
	    dets_utils:file_error(FileName, Err)
    end,
    %% Move to where this chunk belongs in the output file.
    case file:position(Out, Pos) of
        {ok, Pos} -> ok;
        Err2 ->
	    close_files(Bulk, L, Head),
	    dets_utils:file_error(Head#head.filename, Err2)
        end,
    CR = file:copy({FileName, [raw,binary]}, Out),
    _ = file:delete(FileName),
    case CR of
	%% The number of bytes copied is checked against the expected
	%% size, except for entries with NoObjects =:= 0.
	{ok, Copied} when Copied =:= ExpectedSize;
			  NoObjects =:= 0 -> % the segments
	    fsck_copy1(L, Head, Bulk, NoDups);
	{ok, _Copied} -> % should never happen
	    close_files(Bulk, L, Head),
	    Reason = if Bulk -> initialization_failed;
			true -> repair_failed end,
            {error, {Reason, Head#head.filename}};
	FError ->
	    close_files(Bulk, L, Head),
	    dets_utils:file_error(FileName, FError)
    end;
%% Everything has been copied, but duplicates were reported: fail.
fsck_copy1([], Head, _Bulk, NoDups) when NoDups =/= 0 ->
    {error, {initialization_failed, Head#head.filename}};
%% Everything has been copied; the table is marked dirty.
fsck_copy1([], Head, _Bulk, _NoDups) ->
    {ok, Head#head{update_mode = dirty}}.
2890
%% Cleans up after a failed or abandoned sort/copy.
%% With 'false' as first argument the file of Head is closed as well.
%% Every size data entry that refers to a temporary file has the file
%% closed and deleted; entries of any other shape are skipped.
close_files(false, SizeData, Head) ->
    _ = file:close(Head#head.fptr),
    close_files(true, SizeData, Head);
close_files(true, SizeData, _Head) ->
    _ = [begin
             _ = close_tmp(Fd),
             file:delete(FileName)
         end || {_Size, _Pos, {FileName, Fd}, _No} <- SizeData],
    ok.
2902
%% Closes the descriptor of a temporary file; returns what
%% file:close/1 returns.
close_tmp(TmpFd) ->
    file:close(TmpFd).
2905
%% Writes the cache and fetches the objects of slot Slot.
%% -> {NewHead, Objects} | {NewHead, Error}
%% Something thrown while doing so is accepted as the reply provided it
%% is a pair whose first element is a head record; anything else
%% crashes with case_clause.
fslot(H, Slot) ->
    Reply = (catch begin
                       {NH, []} = write_cache(H),
                       {NH, dets_v9:slot_objs(NH, Slot)}
                   end),
    case Reply of
        {NewHead, _Objects} when is_record(NewHead, head) ->
            Reply
    end.
2915
%% Adds Incr to a counter field of the object stored under Key and
%% inserts the updated object.
%% -> {NewHead, NewCounterValue} | {NewHead, badarg} | {NewHead, Error}
%% badarg is returned when the table type is not 'set', when the lookup
%% does not return exactly one object, and when the field cannot be
%% updated (position out of range or non-numeric operands).
do_update_counter(Head, _Key, _Incr) when Head#head.type =/= set ->
    {Head, badarg};
do_update_counter(Head, Key, Incr) ->
    case flookup_keys(Head, [Key]) of
        {H1, []} ->
            {H1, badarg};
        {H1, [Object]} ->
            try try_update_tuple(Object, H1#head.keypos, Incr) of
                {NewValue, NewObject} ->
                    case finsert(H1, [NewObject]) of
                        {H2, ok} ->
                            {H2, NewValue};
                        InsertReply ->
                            InsertReply
                    end
            catch
                error:_ ->
                    {H1, badarg}
            end;
        HeadError ->
            HeadError
    end.
2938
%% Resolves which element to update and delegates to
%% try_update_tuple2/3. The increment is either {Pos, Incr}, naming the
%% position explicitly, or a plain Incr, in which case the element
%% right after the key position Kp is updated.
try_update_tuple(O, Kp, IncrSpec) ->
    {Pos, Incr} = case IncrSpec of
                      {P, I} -> {P, I};
                      _ -> {Kp + 1, IncrSpec}
                  end,
    try_update_tuple2(O, Pos, Incr).
2943
%% Adds Incr to element Pos of the tuple O.
%% -> {NewValue, UpdatedTuple}
%% Fails with badarg if Pos is not a valid position in O, and with
%% badarith if the element and Incr cannot be added.
try_update_tuple2(O, Pos, Incr) ->
    Current = element(Pos, O),
    Updated = Current + Incr,
    {Updated, setelement(Pos, O, Updated)}.
2947
%% Turns verbose mode on (for 'true') or off (for anything else) by
%% storing 'yes' under the process dictionary key 'verbose' or erasing
%% that key. Returns the previous value of the key.
set_verbose(Flag) ->
    case Flag of
        true -> put(verbose, yes);
        _ -> erase(verbose)
    end.
2952
%% Looks for Object in the file using dets_v9:find_object/2, after the
%% cache has been written.
%% -> {NewHead, Result} | {Head, badarg} | {NewHead, Error}
%% badarg is returned when Object is not a tuple with at least keypos
%% elements.
where_is_object(Head, Object) ->
    case check_objects([Object], Head#head.keypos) of
        false ->
            {Head, badarg};
        true ->
            case catch write_cache(Head) of
                {NewHead, []} ->
                    {NewHead, dets_v9:find_object(NewHead, Object)};
                {NewHead, _} = HeadError when is_record(NewHead, head) ->
                    HeadError
            end
    end.
2966
%% Returns true if the first argument is a proper list in which every
%% element is a tuple with at least Kp elements, and false otherwise.
check_objects([], _Kp) ->
    true;
check_objects([T | Ts], Kp) when tuple_size(T) >= Kp ->
    check_objects(Ts, Kp);
check_objects(_Other, _Kp) ->
    false.
2971
%% Returns the value of the no_keys field of a head record.
no_things(H) ->
    H#head.no_keys.
2974
%% Returns the value of the no_keys field of a file header record.
file_no_things(FileHeader) ->
    FileHeader#fileheader.no_keys.
2977
%%% The write cache is a list of {Key, [Item]} where Item is one of
2979%%% {Seq, delete_key}, {Seq, {lookup,Pid}}, {Seq, {delete_object,object()}},
2980%%% or {Seq, {insert,object()}}. Seq is a number that increases
2981%%% monotonically for each item put in the cache. The purpose is to
2982%%% make sure that items are sorted correctly. Sequences of delete and
2983%%% insert operations are inserted in the cache without doing any file
2984%%% operations. When the cache is considered full, a lookup operation
2985%%% is requested, or after some delay, the contents of the cache are
2986%%% written to the file, and the cache emptied.
2987%%%
2988%%% Data is not allowed to linger more than 'delay' milliseconds in
2989%%% the write cache. A delayed_write message is received when some
2990%%% datum has become too old. If 'wrtime' is equal to 'undefined',
2991%%% then the cache is empty and no such delayed_write message has been
2992%%% scheduled. Otherwise there is a delayed_write message scheduled,
2993%%% and the value of 'wrtime' is the time when the cache was last
2994%%% written, or when it was first updated after the cache was last
2995%%% written.
2996
%% Puts one kind of operation (What) on KeysOrObjects into the write
%% cache and writes to the file whatever update_cache/2 asks for.
%% -> {NewHead, LookedUpObjects}
update_cache(Head, KeysOrObjects, What) ->
    Operations = [{What, KeysOrObjects}],
    {CachedHead, LookedUp, PwriteList} = update_cache(Head, Operations),
    {NewHead, ok} = dets_utils:pwrite(CachedHead, PwriteList),
    {NewHead, LookedUp}.
3001
%% Adds the operations in ToAdd, a list of {What, KeysOrObjects}, to
%% the write cache of Head. The cache is written to the file when some
%% operation requires a lookup or when the cache has reached its size
%% limit (tsize); otherwise a delayed write is scheduled if none is
%% pending.
%% -> {NewHead, [object()], pwrite_list()} | throw({Head, Error})
update_cache(Head, ToAdd) ->
    Cache = Head#head.cache,
    #cache{cache = C, csize = Size0, inserts = Ins} = Cache,
    %% The cache size grows by the external size of what is added.
    NewSize = Size0 + erlang:external_size(ToAdd),
    %% The size is used as a sequence number here; it increases monotonically.
    {NewC, NewIns, Lookup, Found} =
	cache_binary(Head, ToAdd, C, Size0, Ins, false, []),
    NewCache = Cache#cache{cache = NewC, csize = NewSize, inserts = NewIns},
    Head1 = Head#head{cache = NewCache},
    %% The order of the clauses below matters: the first guard that
    %% holds decides what is done.
    if
	Lookup; NewSize >= Cache#cache.tsize ->
	    %% The cache is considered full, or some lookup.
	    {NewHead, LU, PwriteList} = dets_v9:write_cache(Head1),
	    {NewHead, Found ++ LU, PwriteList};
	NewC =:= [] ->
	    %% Nothing is cached, so no write needs to be scheduled.
	    {Head1, Found, []};
	Cache#cache.wrtime =:= undefined ->
	    %% Empty cache. Schedule a delayed write.
	    Now = time_now(), Me = self(),
	    Call = ?DETS_CALL(Me, {delayed_write, Now}),
	    erlang:send_after(Cache#cache.delay, Me, Call),
	    {Head1#head{cache = NewCache#cache{wrtime = Now}}, Found, []};
	Size0 =:= 0 ->
	    %% Empty cache that has been written after the
	    %% currently scheduled delayed write.
	    {Head1#head{cache = NewCache#cache{wrtime = time_now()}}, Found, []};
	true ->
	    %% Cache is not empty, delayed write has been scheduled.
	    {Head1, Found, []}
    end.
3033
%% Walks the list of {Operation, KeysOrObjects} pairs and pushes one
%% cache entry per key or object onto C, numbering the entries from
%% Seq and upwards.
%% -> {NewC, NewInserts, LookupNeeded, Found}
%% Operations other than delete_object, insert and delete_key are
%% lookups, {lookup, Pid}. A lookup that can be answered from a
%% non-empty cache is not cached; its answer is added to Found as
%% {Pid, Objects}. Any other lookup is cached and LookupNeeded is set.
cache_binary(_Head, [], C, _Seq, Ins, Lu, F) ->
    {C, Ins, Lu, F};
cache_binary(Head, [{delete_object = Q, Os} | L], C, Seq, Ins, Lu, F) ->
    cache_obj_op(Head, L, C, Seq, Ins, Lu, F, Os, Head#head.keypos, Q);
cache_binary(Head, [{insert = Q, Os} | L], C, Seq, Ins, Lu, F) ->
    %% Keep count of the number of inserted objects.
    cache_obj_op(Head, L, C, Seq, Ins + length(Os), Lu, F, Os,
                 Head#head.keypos, Q);
cache_binary(Head, [{delete_key = Q, Ks} | L], C, Seq, Ins, Lu, F) ->
    cache_key_op(Head, L, C, Seq, Ins, Lu, F, Ks, Q);
cache_binary(Head, [{Q, Ks} | L], [] = C, Seq, Ins, _Lu, F) -> % lookup
    %% Nothing in the cache to look in.
    cache_key_op(Head, L, C, Seq, Ins, true, F, Ks, Q);
cache_binary(Head, [{Q, Ks} | L], C, Seq, Ins, Lu, F) -> % lookup
    case dets_utils:cache_lookup(Head#head.type, Ks, C, []) of
        false ->
            cache_key_op(Head, L, C, Seq, Ins, true, F, Ks, Q);
        Found ->
            {lookup, Pid} = Q,
            cache_binary(Head, L, C, Seq, Ins, Lu, [{Pid, Found} | F])
    end.
3053
%% Pushes one entry {Key, {Seq, Q}} onto the cache C for every key in
%% Ks, incrementing the sequence number for each, and then continues
%% with the remaining operations L in cache_binary/7.
cache_key_op(Head, L, C, Seq, Ins, Lu, F, Ks, Q) ->
    Push = fun(K, {Acc, S}) -> {[{K, {S, Q}} | Acc], S + 1} end,
    {NewC, NextSeq} = lists:foldl(Push, {C, Seq}, Ks),
    cache_binary(Head, L, NewC, NextSeq, Ins, Lu, F).
3059
%% Pushes one entry {Key, {Seq, {Q, Object}}} onto the cache C for
%% every object in Os, where Key is element Kp of the object. The
%% sequence number is incremented for each entry. Continues with the
%% remaining operations L in cache_binary/7.
cache_obj_op(Head, L, C, Seq, Ins, Lu, F, Os, Kp, Q) ->
    Push = fun(O, {Acc, S}) ->
                   {[{element(Kp, O), {S, {Q, O}}} | Acc], S + 1}
           end,
    {NewC, NextSeq} = lists:foldl(Push, {C, Seq}, Os),
    cache_binary(Head, L, NewC, NextSeq, Ins, Lu, F).
3065
%% Called after some delay, when a scheduled delayed_write message
%% carrying the time stamp WrTime arrives.
%% -> NewHead
delayed_write(Head, WrTime) ->
    Cache = Head#head.cache,
    case Cache#cache.wrtime of
        WrTime ->
            %% The cache was not emptied during the last delay.
            case catch write_cache(Head) of
                {Head2, []} ->
                    Written = (Head2#head.cache)#cache{wrtime = undefined},
                    Head2#head{cache = Written};
                {NewHead, _Error} -> % Head.update_mode has been updated
                    NewHead
            end;
        _ when Cache#cache.csize =:= 0 ->
            %% The cache was emptied during the delay and nothing has
            %% been written since then: no further delayed write needed.
            Head#head{cache = Cache#cache{wrtime = undefined}};
        LastWrTime ->
            %% The cache was emptied during the delay, but has been
            %% updated since then: schedule a new delayed write.
            %% The time stamps are divided by 1000 to get the timeout.
            Timeout = round((LastWrTime - WrTime)/1000),
            Me = self(),
            Call = ?DETS_CALL(Me, {delayed_write, LastWrTime}),
            erlang:send_after(Timeout, Me, Call),
            Head
    end.
3097
%% Lets dets_v9:write_cache/1 process the cache and then writes the
%% resulting pwrite list to the file.
%% -> {NewHead, [LookedUpObject]} | throw({NewHead, Error})
write_cache(Head) ->
    {FlushedHead, LookedUp, PwriteList} = dets_v9:write_cache(Head),
    {NewHead, ok} = dets_utils:pwrite(FlushedHead, PwriteList),
    {NewHead, LookedUp}.
3103
%% Returns ok when the update mode of Head is one of saved, dirty and
%% new_dirty. Any other update mode is returned as is.
status(Head) ->
    Mode = Head#head.update_mode,
    if
        Mode =:= saved; Mode =:= dirty; Mode =:= new_dirty -> ok;
        true -> Mode
    end.
3111
3112%%% Scan the file from start to end by reading chunks.
3113
%% Creates the continuation used for scanning the file from its base.
%% NoObjs is stored in the continuation as it is.
%% -> dets_cont()
%% When nothing is allocated the continuation is already at eof.
init_scan(Head, NoObjs) ->
    check_safe_fixtable(Head),
    FreeLists = dets_utils:get_freelists(Head),
    Base = Head#head.base,
    {Bin, Alloc} =
        case dets_utils:find_next_allocated(FreeLists, Base, Base) of
            {From, To} -> {<<>>, {From, To, <<>>}};
            none -> {eof, <<>>}
        end,
    #dets_cont{no_objs = NoObjs, bin = Bin, alloc = Alloc}.
3125
%% Logs a message via error_logger when a table that is not fixed is
%% about to be traversed. Nothing is logged unless verbose mode is on
%% for the process or dets_utils:debug_mode() says so.
check_safe_fixtable(Head) ->
    NotFixed = Head#head.fixed =:= false,
    ShouldReport = NotFixed andalso
        ((get(verbose) =:= yes) orelse dets_utils:debug_mode()),
    case ShouldReport of
        true ->
            error_logger:format
              ("** dets: traversal of ~tp needs safe_fixtable~n",
               [Head#head.name]);
        false ->
            ok
    end.
3136
%% Reads the next chunk of objects as described by the continuation.
%% -> {[RTerm], dets_cont()} | {scan_error, Reason}
%% RTerm = {Pos, Next, Size, Status, Term}
%% A continuation whose alloc field is <<>> has nothing more to give.
scan(_Head, #dets_cont{alloc = <<>>}=C) ->
    {[], C};
scan(Head, C) -> % when is_record(C, dets_cont)
    #dets_cont{no_objs = No, alloc = {From, To, L}, bin = Bin} = C,
    %% The initial counter handed to scan/8: 0 for 'default', otherwise
    %% a negative number derived from the requested number of objects.
    R = case No of
            default -> 0;
            N when is_integer(N) -> -N-1
        end,
    scan(Bin, Head, From, To, L, [], R, {C, Head#head.type}).
3151
%% Lets dets_v9:scan_objs/8 pick objects from Bin and acts on its
%% answer: reads more data, returns the objects found together with an
%% updated continuation, or reports a bad object.
scan(Bin, H, From, To, L, Ts, R, {C0, Type} = C) ->
    case dets_v9:scan_objs(H, Bin, From, To, L, Ts, R, Type) of
        bad_object ->
            {scan_error, dets_utils:bad_object(scan, {From, To, Bin})};
        {more, NFrom, NTo, NL, NTs, NR, Sz} ->
            scan_read(H, NFrom, NTo, Sz, NL, NTs, NR, C);
        {stop, <<>>, NFrom, NTo, <<>>, NTs} ->
            %% Both the binary and the list of allocated areas are
            %% used up; find out if anything more is allocated.
            FreeLists = dets_utils:get_freelists(H),
            Base = H#head.base,
            Cont = case dets_utils:find_next_allocated(FreeLists,
                                                       NFrom, Base) of
                       none ->
                           C0#dets_cont{bin = eof, alloc = <<>>};
                       _ ->
                           C0#dets_cont{bin = <<>>,
                                        alloc = {NFrom, NTo, <<>>}}
                   end,
            {NTs, Cont};
        {stop, B, NFrom, NTo, NL, NTs} ->
            {NTs, C0#dets_cont{bin = B, alloc = {NFrom, NTo, NL}}}
    end.
3169
%% Reads more data from the file and continues scanning in scan/8,
%% unless the counter R has reached ?CHUNK_SIZE, in which case the
%% objects collected so far are returned with a continuation.
scan_read(_H, From, To, _Min, L0, Ts,
          R, {C, _Type}) when R >= ?CHUNK_SIZE ->
    %% We may have read (much) more than CHUNK_SIZE, if there are holes.
    {Ts, C#dets_cont{bin = <<>>, alloc = {From, To, L0}}};
scan_read(H, From, _To, Min, _L, Ts, R, C) ->
    %% Read at least Min bytes, and never less than ?CHUNK_SIZE.
    Max = case Min < ?CHUNK_SIZE of
              true -> ?CHUNK_SIZE;
              false -> Min
          end,
    FreeLists = dets_utils:get_freelists(H),
    case dets_utils:find_allocated(FreeLists, From, Max, H#head.base) of
        <<From1:32,To1:32,L1/binary>> ->
            case dets_utils:pread_n(H#head.fptr, From1, Max) of
                eof ->
                    {scan_error, premature_eof};
                NewBin ->
                    scan(NewBin, H, From1, To1, L1, Ts, R, C)
            end;
        <<>> ->
            %% Nothing more is allocated: the scan is finished.
            {Cont, _} = C,
            {Ts, Cont#dets_cont{bin = eof, alloc = <<>>}}
    end.
3193
%% Returns Error unchanged. In verbose mode (see set_verbose/1) the
%% error is also reported via error_logger.
err(Error) ->
    case get(verbose) of
        undefined ->
            Error;
        yes ->
            error_logger:format("** dets: failed with ~tw~n", [Error]),
            Error
    end.
3202
-compile({inline, [time_now/0]}).
%% The current Erlang monotonic time in microseconds.
time_now() ->
    erlang:monotonic_time(microsecond).
3206
%% Converts the sum of MonTime and TimeOffset, both in native time
%% unit, to a {MegaSecs, Secs, MicroSecs} tuple.
make_timestamp(MonTime, TimeOffset) ->
    Micros = erlang:convert_time_unit(MonTime + TimeOffset,
                                      native,
                                      microsecond),
    TotalSecs = Micros div 1000000,
    {TotalSecs div 1000000, TotalSecs rem 1000000, Micros rem 1000000}.
3215
3216%%%%%%%%%%%%%%%%%  DEBUG functions %%%%%%%%%%%%%%%%
3217
%% Debug function: reads the header of FileName and returns what
%% dets_v9:file_info/1 makes of it. If the header cannot be read, the
%% (caught) failure is returned instead.
file_info(FileName) ->
    case catch read_file_header(FileName, read, false) of
        {ok, Fd, FH} ->
            %% Only the header is needed; the file can be closed.
            _ = file:close(Fd),
            dets_v9:file_info(FH);
        Failure ->
            Failure
    end.
3226
%% Debug function: reads one field of the file header by calling
%% dets_utils:read_4/2 with the open file and Field.
get_head_field(File, Field) ->
    dets_utils:read_4(File, Field).
3229
%% Dumps the free list and the segments of a DAT file to the tty.
%% Internal debug function: the file is read as it is, without any
%% check that it was closed properly.
%% -> ok | Error
%% The header is checked once. A result other than {ok, Head} from
%% dets_v9:check_file_header/2 is returned to the caller (earlier such
%% a result made the try expression fail with try_clause). The file is
%% closed no matter how the dump ends.
view(FileName) ->
    case catch read_file_header(FileName, read, false) of
        {ok, Fd, FH} ->
            try dets_v9:check_file_header(FH, Fd) of
                {ok, H0} ->
                    H = dets_v9:init_freelist(H0),
                    v_free_list(H),
                    dets_v9:v_segments(H),
                    ok;
                HeaderError ->
                    HeaderError
            after
                _ = file:close(Fd)
            end;
        ReadError ->
            ReadError
    end.
3253
%% Debug function: prints what dets_utils:all_free/1 returns for Head
%% to standard output, framed by a header and a footer line.
v_free_list(Head) ->
    io:format("FREE LIST ...... \n"),
    FreeList = dets_utils:all_free(Head),
    io:format("~p~n", [FreeList]),
    io:format("END OF FREE LIST \n").
3258