1%%
2%% %CopyrightBegin%
3%%
4%% Copyright Ericsson AB 2001-2016. All Rights Reserved.
5%%
6%% Licensed under the Apache License, Version 2.0 (the "License");
7%% you may not use this file except in compliance with the License.
8%% You may obtain a copy of the License at
9%%
10%%     http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing, software
13%% distributed under the License is distributed on an "AS IS" BASIS,
14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15%% See the License for the specific language governing permissions and
16%% limitations under the License.
17%%
18%% %CopyrightEnd%
19%%
20-module(dets_v9).
21
22%% Dets files, implementation part. This module handles version 9.
23%% To be called from dets.erl only.
24
25-export([mark_dirty/1, read_file_header/2,
26         check_file_header/2, do_perform_save/1, initiate_file/11,
27         prep_table_copy/9, init_freelist/1, fsck_input/4,
28         bulk_input/3, output_objs/3, bchunk_init/2,
29         try_bchunk_header/2, compact_init/3, read_bchunks/2,
30         write_cache/1, may_grow/3, find_object/2, slot_objs/2,
31         scan_objs/8, db_hash/2, no_slots/1, table_parameters/1]).
32
33-export([file_info/1, v_segments/1]).
34
35-export([cache_segps/3]).
36
37-dialyzer(no_improper_lists).
38
39-compile({inline, [{max_objsize,1},{maxobjsize,1}]}).
40-compile({inline, [{write_segment_file,6}]}).
41-compile({inline, [{sz2pos,1},{adjsz,1}]}).
42-compile({inline, [{skip_bytes,6},{make_object,4}]}).
43-compile({inline, [{segp_cache,2},{get_segp,1},{get_arrpart,1}]}).
44-compile({inline, [{h,2}]}).
45
46-include("dets.hrl").
47
48%%  The layout of the file is :
49%%
%%   bytes   description
51%%  ---------------------- File header
52%%    4      FreelistsPointer
53%%    4      Cookie
54%%    4      ClosedProperly (pos=8)
55%%    4      Type (pos=12)
56%%    4      Version (pos=16)
57%%    4      M
58%%    4      Next
59%%    4      KeyPos
60%%    4      NoObjects
61%%    4      NoKeys
62%%    4      MinNoSlots
63%%    4      MaxNoSlots
64%%    4      HashMethod
65%%    4      N
66%%  ---
67%%    256    Version 9(a): Reserved for future versions. Initially zeros.
68%%           Version 9(b) has instead:
69%%    112    28 counters for the buddy system sizes 2^4 to 2^31.
70%%    144    Reserved for future versions. Initially zeros.
71%%           Version 9(c) has instead:
72%%    112    28 counters for the buddy system sizes (as for 9(b)).
73%%    16     MD5-sum for the 44 plus 112 bytes before the MD5-sum.
74%%           (FreelistsPointer, Cookie and ClosedProperly are not digested.)
75%%    128    Reserved for future versions. Initially zeros.
76%%           Version 9(d), introduced in R15A, has instead:
77%%    112    28 counters for the buddy system sizes (as for 9(b)).
78%%    16     MD5-sum for the 44 plus 112 bytes before the MD5-sum.
79%%           (FreelistsPointer, Cookie and ClosedProperly are not digested.)
80%%    4      Base of the buddy system.
81%%           0 (zero) if the base is equal to ?BASE. Compatible with R14B.
82%%           File size at the end of the file is RealFileSize - Base.
83%%           The reason for modifying file size is that when a file created
84%%           by R15 is read by R14 a repair takes place immediately, which
85%%           is acceptable when downgrading.
86%%    124    Reserved for future versions. Initially zeros.
87%%  ---
88%%  ------------------ end of file header
89%%    4*256  SegmentArray Pointers.
90%%  ------------------ This is BASE.
91%%    4*512  SegmentArray Part 1
92%%    ...    More SegmentArray Parts
93%%    8*256  First segment
94%%    ???    Objects (free and alive)
95%%    4*512  Further SegmentArray Part.
96%%    ???    Objects (free and alive)
97%%    8*256  Further segment.
98%%    ???    Objects (free and alive)
99%%    ... more objects, segment array parts, and segments ...
100%%  -----------------------------
101%%    ???    Free lists
102%%  -----------------------------
%%    4      File size, in bytes. See 9(d) above.
104
105%%  Before we can find an object we must find the slot where the
106%%  object resides. Each slot is a (possibly empty) list (or chain) of
107%%  objects that hash to the same slot. If the value stored in the
108%%  slot is zero, the slot chain is empty. If the slot value is
109%%  non-zero, the value points to a position in the file where the
110%%  collection of objects resides. Each collection has the following
111%%  layout:
112%%
%%   bytes  description
114%%  --------------------
115%%    4     Size of the area allocated for the collection (8+Sz)
116%%    4     Status  (FREE or ACTIVE). These two are the Object Header.
117%%    Sz    A binary containing the objects per key, sorted on key.
118%%
119%%  When repairing or converting a file, the status field is used.
120%%
121%%  The binary containing the objects per key of a table of type 'set'
122%%  has the following layout:
123%%
%%   bytes  description
125%%  --------------------
126%%    4     Size of the object of the first key (4+OSz1)
127%%    OSz1  The object of the first key
128%%    ...
129%%    4     Size of the object of the ith key (4+OSzi)
130%%    OSzi  The object of the ith key
131%%
132%%  The binary containing the objects per key of a table of type 'bag'
133%%  or 'duplicate_bag' has the following layout:
134%%
%%   bytes    description
136%%  ----------------------
137%%    4       Size of the objects of the first key (4 + OSz1_1+...+OSz1_j+...)
138%%    4       Size of the first object of the first key (4+OSz1_1)
139%%    OSz1_1  The first object of the first key
140%%    ...
141%%    4       Size of the jth object of the first key (4+OSz1_j)
142%%    OSz1_j  The jth object of the first key
143%%    ...
144%%    4       Size of the objects of the ith key (4 + OSzi_1+...+OSzi_k+...)
145%%    4       Size of the first object of the ith key (4+OSzi_1)
146%%    OSzi_1  The first object of the ith key
147%%    ...
148%%    4       Size of the kth object of the ith key (4+OSzi_k)
149%%    OSzi_k  The kth object of the ith key
150%%    ...
151%%
152%%  The objects of a key are placed in time order, that is, the older
153%%  objects come first. If a new object is inserted, it is inserted
154%%  last.
155%%
156%%
157%%
158%%|---------------|
159%%|      head     |
160%%|       	  |
161%%|               |
162%%|_______________|
163%%|               |--|
164%%|___part ptr 1__|  |
165%%|               |  | segarr part 1
166%%|___part ptr 2__|  V______________|
167%%|               |  |   p1         |
168%%|               |  |______________|--|
169%%|     ....      |  |   p2         |  |
170%%     (256)         |______________|  |
171%%                   |              |  |
172%%                   |     ....     |  | segment 1
173%%                   |    (512)     |  V __slot 0 ____|
174%%                                     |   size       |
175%%                                     |   pointer    |--|
176%%                                     |___slot 1 ____|  |
177%%                                     |              |  |
178%%                                     |   ....       |  |  objects in slot 0
179%%                                         (256)         V  segment 1
180%%                                                       |___________|
181%%                                                       |  size     |
182%%                                                       |___________|
183%%                                                       |  status   |
184%%                                                       |___________|
185%%                                                       |           |
186%%                                                       |   object  |
187%%                                                       |   collec. |
188%%                                                       |___________|
189
190%%%
191%%% File header
192%%%
193
194-define(RESERVED, 124).        % Reserved for future use.
195
196-define(COLL_CNTRS, (28*4)).     % Counters for the buddy system.
197-define(MD5SZ, 16).
198-define(FL_BASE, 4).
199
200-define(HEADSZ, 56+?COLL_CNTRS  % The size of the file header, in bytes,
201            +?MD5SZ+?FL_BASE).  % not including the reserved part.
202-define(HEADEND, (?HEADSZ+?RESERVED)).
203                               % End of header and reserved area.
204-define(SEGSZ, 512).           % Size of a segment, in words. SZOBJP*SEGSZP.
205-define(SEGSZP, 256).          % Size of a segment, in number of pointers.
206-define(SEGSZP_LOG2, 8).
207-define(SEGOBJSZ, (4 * ?SZOBJP)).
208-define(SEGPARTSZ, 512).       % Size of segment array part, in words.
209-define(SEGPARTSZ_LOG2, 9).
210-define(SEGARRSZ, 256).        % Maximal number of segment array parts..
211-define(SEGARRADDR(PartN), (?HEADEND + (4 * (PartN)))).
212-define(SEGPARTADDR(P,SegN), ((P) + (4 * ?REM2(SegN, ?SEGPARTSZ)))).
213-define(BASE, ?SEGARRADDR(?SEGARRSZ)).
214-define(MAXSLOTS, (?SEGARRSZ * ?SEGPARTSZ * ?SEGSZP)).
215
216-define(SLOT2SEG(S), ((S) bsr ?SEGSZP_LOG2)).
217-define(SEG2SEGARRPART(S), ((S) bsr ?SEGPARTSZ_LOG2)).
218
219-define(PHASH, 0).
220-define(PHASH2, 1).
221
222%% BIG is used for hashing. BIG must be greater than the maximum
223%% number of slots, currently 32 M (MAXSLOTS).
224-define(BIG, 16#3ffffff). % 64 M
225
226%% Hard coded positions into the file header:
227-define(FREELIST_POS, 0).
228-define(CLOSED_PROPERLY_POS, 8).
229-define(D_POS, 20).
230
231%%% This module handles Dets file format version 9, introduced in
232%%% Erlang/OTP R8.
233%%%
234%%% Version 9(a) tables have 256 reserved bytes in the file header,
235%%% all initialized to zero.
236%%% Version 9(b) tables use the first 112 of these bytes for storing
237%%% number of objects for each size of the buddy system. An empty 9(b)
238%%% table cannot be distinguished from an empty 9(a) table.
239%%% 9(c) has an MD5-sum for the file header.
240
241-define(FILE_FORMAT_VERSION, 9).
242
243-define(NOT_PROPERLY_CLOSED,0).
244-define(CLOSED_PROPERLY,1).
245
246%% Size of object pointer, in words. SEGSZ = SZOBJP * SEGSZP.
247-define(SZOBJP, 2).
248
249-define(OHDSZ, 8).          % The size of the object header, in bytes.
250-define(STATUS_POS, 4).     % Position of the status field.
251
252%% The size of each object is a multiple of 16.
253%% BUMP is used when repairing files.
254-define(BUMP, 16).
255
256%%% '$hash' is the value of HASH_PARMS in Erlang/OTP R8, '$hash2' is
257%%% the value in Erlang/OTP R9.
258%%%
259%%% The fields of the ?HASH_PARMS records are the same, but having
260%%% different tags makes bchunk_init on Erlang/OTP R8 nodes reject
261%%% data from Erlang/OTP R9 nodes, and vice versa. This is overkill,
262%%% and due to an oversight. What should have been done in Erlang/OTP
263%%% R8 was to check the hash method, not only the type of the table
264%%% and the key position. Erlang/OTP R8 nodes cannot handle the phash2
265%%% method.
266-define(HASH_PARMS, '$hash2').
267
268-define(BCHUNK_FORMAT_VERSION, 1).
269
270-record(?HASH_PARMS, {
271	   file_format_version,
272	   bchunk_format_version,
273	   file, type, keypos, hash_method,
274	   n,m,next,
275	   min,max,
276	   no_objects,no_keys,
277	   no_colls :: no_colls()
278	  }).
279
280-define(ACTUAL_SEG_SIZE, (?SEGSZ*4)).
281
282-define(MAXBUD, 32).
283
284%%-define(DEBUGF(X,Y), io:format(X, Y)).
285-define(DEBUGF(X,Y), void).
286
%% -> ok | throw({NewHead,Error})
%% Overwrites the ClosedProperly word of the file header with
%% ?NOT_PROPERLY_CLOSED, syncs the file, positions the file at
%% Head#head.freelists_p and finally calls dets_utils:truncate/2 with
%% 'cur' (presumably cutting the file off at that position).
mark_dirty(Head) ->
    NotClosed = <<?NOT_PROPERLY_CLOSED:32>>,
    {_H, ok} = dets_utils:pwrite(Head, [{?CLOSED_PROPERLY_POS, NotClosed}]),
    ok = dets_utils:sync(Head),
    FreeListsPos = Head#head.freelists_p,
    {ok, _Pos} = dets_utils:position(Head, FreeListsPos),
    dets_utils:truncate(Head, cur).
294
%% -> {ok, head()} | throw(Error) | throw(badarg)
%% Initiates a file from the table parameters (a ?HASH_PARMS record)
%% of another table. Parameters of the wrong file or bchunk format
%% version, with non-integer counters, with fewer objects than keys,
%% or violating the hash invariants are rejected with throw(badarg).
prep_table_copy(Fd, Tab, Fname, Type, Kp, Ram, CacheSz, Auto,
                #?HASH_PARMS{file_format_version = ?FILE_FORMAT_VERSION,
                             bchunk_format_version = ?BCHUNK_FORMAT_VERSION,
                             hash_method = HashMethodCode,
                             n = N, m = M, next = Next,
                             min = Min, max = Max,
                             no_objects = NoObjects, no_keys = NoKeys})
        when is_integer(N), is_integer(M), is_integer(Next),
             is_integer(Min), is_integer(Max),
             is_integer(NoObjects), is_integer(NoKeys),
             NoObjects >= NoKeys ->
    HashMethod = code_to_hash_method(HashMethodCode),
    case hash_invars(N, M, Next, Min, Max) of
        true ->
            init_file(Fd, Tab, Fname, Type, Kp, Min, Max, Ram,
                      CacheSz, Auto, false, M, N, Next, HashMethod,
                      NoObjects, NoKeys);
        false ->
            throw(badarg)
    end;
prep_table_copy(_Fd, _Tab, _Fname, _Type, _Kp, _Ram, _CacheSz, _Auto,
                _Parms) ->
    %% Anything that is not an acceptable ?HASH_PARMS record.
    throw(badarg).
321
%% -> {ok, head()} | throw(Error)
%% The File header and the SegmentArray Pointers are written here.
%% SegmentArray Parts are also written, but the segments are not
%% initialized on file unless DoInitSegments is 'true'. (When
%% initializing a file by calling init_table, some time is saved by
%% not writing the segments twice.)
%%
%% The requested slot numbers are capped (max at ?MAXSLOTS, min at the
%% capped max) and then passed through slots2/1. A fresh table starts
%% with M = Next = MinSlots, N = 0, the phash2 hash method and no
%% objects or keys.
initiate_file(Fd, Tab, Fname, Type, Kp, MinSlots0, MaxSlots0,
              Ram, CacheSz, Auto, DoInitSegments) ->
    MaxCapped = erlang:min(MaxSlots0, ?MAXSLOTS),
    MinCapped = erlang:min(MinSlots0, MaxCapped),
    MinSlots = slots2(MinCapped),
    MaxSlots = slots2(MaxCapped),
    init_file(Fd, Tab, Fname, Type, Kp, MinSlots, MaxSlots, Ram, CacheSz,
              Auto, DoInitSegments, MinSlots, 0, MinSlots, phash2, 0, 0).
338
%% -> {ok, head()} | throw(Error)
%% Builds the #head{} record for a new file and writes the initial
%% file contents with one call to dets_utils:pwrite/3: the file
%% header followed by zeroed SegmentArray Pointers, the segment array
%% parts, the pointers to the segments and, only if DoInitSegments is
%% 'true', the (zeroed) segments themselves.
%% Note that the statements below depend on their order: the process
%% dictionary is cleaned before parts and segments are allocated
%% (alloc_part/3 and alloc_seg/4 cache pointers while allocating), and
%% the base of the Buddy system is moved only after all permanent
%% allocations have been made.
init_file(Fd, Tab, Fname, Type, Kp, MinSlots, MaxSlots, Ram, CacheSz,
	  Auto, DoInitSegments, M, N, Next, HashMethod, NoObjects, NoKeys) ->
    Ftab = dets_utils:init_alloc(?BASE),

    Head0 = #head{
      m  = M,
      m2 = M * 2,
      next = Next,
      fptr = Fd,
      no_objects = NoObjects,
      no_keys = NoKeys,
      maxobjsize = 0,
      n = N,
      type = Type,
      update_mode = dirty,
      freelists = Ftab,
      no_collections = orddict:new(),
      auto_save = Auto,
      hash_bif = HashMethod,
      has_md5 = true,
      keypos = Kp,
      min_no_slots = MinSlots,
      max_no_slots = MaxSlots,

      ram_file = Ram,
      filename = Fname,
      name = Tab,
      cache = dets_utils:new_cache(CacheSz),
      bump = ?BUMP,
      base = ?BASE % to be overwritten
     },

    FreeListsPointer = 0,
    NoColls = <<0:?COLL_CNTRS/unit:8>>, %% Buddy system counters.
    FileHeader = file_header(Head0, FreeListsPointer,
                             ?NOT_PROPERLY_CLOSED, NoColls),
    %% Written at position 0. The binary tail makes this an improper
    %% list (cf. the no_improper_lists dialyzer attribute of this module).
    W0 = {0, [FileHeader |
              <<0:(4*?SEGARRSZ)/unit:8>>]},  %% SegmentArray Pointers

    %% Remove cached pointers to segment array parts and segments:
    %% erase() empties the process dictionary; every entry that is not
    %% an {integer(), integer()} pair is put back.
    lists:foreach(fun({I1,I2}) when is_integer(I1), is_integer(I2) -> ok;
		     ({K,V}) -> put(K, V)
		  end, erase()),

    %% Initialize array parts.
    %% All parts before segments, for the sake of repair and initialization.
    Zero = seg_zero(),
    {Head1, Ws1} = init_parts(Head0, 0, no_parts(Next), Zero, []),
    NoSegs = no_segs(Next),

    %% WsI holds the writes that zero the segments, WsP the writes of
    %% the pointers to them.
    {Head2, WsI, WsP} = init_segments(Head1, 0, NoSegs, Zero, [], []),
    Ws2 = if
	      DoInitSegments -> WsP ++ WsI;
	      true -> WsP
	 end,
    dets_utils:pwrite(Fd, Fname, [W0 | lists:append(Ws1) ++ Ws2]),
    true = hash_invars(Head2),
    %% The allocations that have been made so far (parts, segments)
    %% are permanent; the table will never shrink. Therefore the base
    %% of the Buddy system can be set to the first free object.
    %% This is used in allocate_all(), see below.
    %% Only the position returned by the trial allocation is used; the
    %% head it returns is discarded and a fresh allocator is created.
    {_, Where, _} = dets_utils:alloc(Head2, ?BUMP),
    NewFtab = dets_utils:init_alloc(Where),
    Head = Head2#head{freelists = NewFtab, base = Where},
    {ok, Head}.
404
%% Returns a power of two not less than 256.
%% Crashes with function_clause if NoSlots is less than 256.
slots2(NoSlots) when NoSlots >= 256 ->
    Log2 = dets_utils:log2(NoSlots),
    ?POW(Log2).
408
%% -> {Head, Ws}
%% Allocates the segment array parts PartNo..NoParts-1. Ws accumulates
%% one list of writes per part, as returned by alloc_part/3, with the
%% most recently allocated part first.
init_parts(Head, PartNo, NoParts, _Zero, Ws) when PartNo >= NoParts ->
    {Head, Ws};
init_parts(Head0, PartNo, NoParts, Zero, Ws) ->
    {Head, PartWrites, _Part} = alloc_part(Head0, Zero, ?SEGARRADDR(PartNo)),
    init_parts(Head, PartNo + 1, NoParts, Zero, [PartWrites | Ws]).
415
%% -> {Head, SegInitList, OtherList};
%% SegInitList holds one write per segment (initializing the segment),
%% OtherList holds the writes of segment pointers (and of array parts
%% allocated on the way), as returned by allocate_segment/3.
init_segments(Head, SegNo, NoSegs, _SegZero, WsP, WsI) when SegNo >= NoSegs ->
    {Head, WsI, WsP};
init_segments(Head0, SegNo, NoSegs, SegZero, WsP, WsI) ->
    {Head, SegInit, PtrWrites} = allocate_segment(Head0, SegZero, SegNo),
    init_segments(Head, SegNo + 1, NoSegs, SegZero,
                  PtrWrites ++ WsP, [SegInit | WsI]).
423
%% -> {NewHead, SegInit, [SegPtr | PartStuff]}
%% Allocates segment number SegNo. If the segment array part that is
%% to hold the pointer to the segment is not found in the cache, the
%% part is allocated first, and the writes initializing the part are
%% returned together with the segment pointer.
allocate_segment(Head, SegZero, SegNo) ->
    PartPos = ?SEGARRADDR(SegNo div ?SEGPARTSZ),
    case get_arrpart(PartPos) of
        Part when Part =/= undefined ->
            alloc_seg(Head, SegZero, SegNo, Part);
        undefined ->
            %% may throw error:
            {Head1, [InitArrPart, ArrPartPointer], NewPart} =
                alloc_part(Head, SegZero, PartPos),
            {NewHead, InitSegment, [SegPointer]} =
                alloc_seg(Head1, SegZero, SegNo, NewPart),
            {NewHead, InitSegment, [InitArrPart, SegPointer, ArrPartPointer]}
    end.
438
%% -> {NewHead, [InitArrPart, ArrPartPointer], Part}
%% Allocates a segment array part and caches its position under
%% PartPos. InitArrPart writes PartZero at the new part (a part has
%% the same size as a segment); ArrPartPointer writes the position of
%% the part, as a 32-bit word, at PartPos.
alloc_part(Head, PartZero, PartPos) ->
    PartSize = adjsz(4 * ?SEGPARTSZ),
    %% may throw error:
    {NewHead, Part, _} = dets_utils:alloc(Head, PartSize),
    arrpart_cache(PartPos, Part),
    Writes = [{Part, PartZero}, {PartPos, <<Part:32>>}],
    {NewHead, Writes, Part}.
446
%% -> {NewHead, InitSegment, [SegPointer]}
%% Allocates a segment, caches its position under the position of its
%% pointer in the array part Part, and hands the segment over to
%% dets_utils:disk_map_segment/2. InitSegment writes SegZero at the
%% segment; SegPointer writes the position of the segment, as a 32-bit
%% word, into the array part.
alloc_seg(Head, SegZero, SegNo, Part) ->
    %% may throw error:
    {NewHead, Segment, _} = dets_utils:alloc(Head, adjsz(4 * ?SEGSZ)),
    PtrPos = ?SEGPARTADDR(Part, SegNo),
    segp_cache(PtrPos, Segment),
    dets_utils:disk_map_segment(Segment, SegZero),
    {NewHead, {Segment, SegZero}, [{PtrPos, <<Segment:32>>}]}.
456
%% Read free lists (using a Buddy System) from file, starting at the
%% position stored in the freelists_p field of the head.
init_freelist(Head) ->
    free_lists_from_file(Head, Head#head.freelists_p).
461
%% -> {ok, Fd, fileheader()} | throw(Error)
%% Reads the first ?HEADSZ bytes of the file and the four byte trailer
%% at the end of the file, and decodes them into a #fileheader{}.
%% Nothing is validated here, see check_file_header/2.
read_file_header(Fd, FileName) ->
    {ok, HeaderBin} = dets_utils:pread_close(Fd, FileName, 0, ?HEADSZ),
    <<FreeList:32,   Cookie:32,  ClosedProperly:32, TypeCode:32,
      Version:32,    M:32,       Next:32,           KeyPos:32,
      NoObjects:32,  NoKeys:32,  MinNoSlots:32,     MaxNoSlots:32,
      HashMethod:32, N:32, CountersBin:?COLL_CNTRS/binary,
      ReadMD5:?MD5SZ/binary, FlBase:32>> = HeaderBin,
    %% FreelistsPointer, Cookie and ClosedProperly (the first 12
    %% bytes) are not digested, nor are the MD5-sum and the base.
    <<_:12/binary, Digested:(?HEADSZ-?MD5SZ-?FL_BASE-12)/binary,
      _/binary>> = HeaderBin,
    {ok, EOF} = dets_utils:position_close(Fd, FileName, eof),
    {ok, <<FileSize:32>>} = dets_utils:pread_close(Fd, FileName, EOF-4, 4),
    %% One 32-bit counter per buddy size 2^4..2^(?MAXBUD-1); only the
    %% non-zero counters are kept, in order of increasing size.
    Counters = [C || <<C:32>> <= CountersBin],
    NonZero = [{LSz, C} ||
                  {LSz, C} <- lists:zip(lists:seq(4, ?MAXBUD-1), Counters),
                  C =/= 0],
    NoColls = case NonZero of
                  [] when NoObjects > 0 -> % Version 9(a)
                      undefined;
                  _ ->
                      NonZero
              end,
    %% A stored base of zero means that the base is ?BASE.
    Base = if
               FlBase =:= 0 -> ?BASE;
               true -> FlBase
           end,
    FH = #fileheader{freelist = FreeList,
                     fl_base = Base,
                     cookie = Cookie,
                     closed_properly = ClosedProperly,
                     type = dets_utils:code_to_type(TypeCode),
                     version = Version,
                     m = M,
                     next = Next,
                     keypos = KeyPos,
                     no_objects = NoObjects,
                     no_keys = NoKeys,
                     min_no_slots = MinNoSlots,
                     max_no_slots = MaxNoSlots,
                     no_colls = NoColls,
                     hash_method = HashMethod,
                     read_md5 = ReadMD5,
                     has_md5 = <<0:?MD5SZ/unit:8>> =/= ReadMD5,
                     md5 = erlang:md5(Digested),
                     %% The stored file size excludes the stored base.
                     trailer = FileSize + FlBase,
                     eof = EOF,
                     n = N},
    {ok, Fd, FH}.
513
%% -> {ok, head()} | {error, Reason} (Reason lacking file name)
%% Validates a file header read by read_file_header/2 and, if the
%% header is acceptable and the file was properly closed, turns it
%% into a #head{}. The checks are made in a fixed order; the first
%% failing check decides the error reason.
check_file_header(FH, Fd) ->
    HashBif = code_to_hash_method(FH#fileheader.hash_method),
    #fileheader{cookie = Cookie, type = Type, version = Version,
                has_md5 = HasMD5, read_md5 = ReadMD5, md5 = MD5,
                trailer = Trailer, eof = EOF,
                closed_properly = ClosedProperly,
                no_colls = NoColls, m = M, next = Next, n = N,
                no_objects = NoObjects, no_keys = NoKeys,
                freelist = FreeList, keypos = KeyPos,
                min_no_slots = MinNoSlots, max_no_slots = MaxNoSlots,
                fl_base = Base} = FH,
    Verdict =
        if
            Cookie =/= ?MAGIC ->
                {error, not_a_dets_file};
            Type =:= badtype ->
                {error, invalid_type_code};
            Version =/= ?FILE_FORMAT_VERSION ->
                {error, bad_version};
            HasMD5, ReadMD5 =/= MD5 ->
                {error, not_a_dets_file}; % harsh but fair
            Trailer =/= EOF ->
                {error, not_closed};
            HashBif =:= undefined ->
                {error, bad_hash_bif};
            ClosedProperly =:= ?CLOSED_PROPERLY ->
                ok;
            ClosedProperly =:= ?NOT_PROPERLY_CLOSED ->
                {error, not_closed};
            true ->
                {error, not_a_dets_file}
        end,
    case Verdict of
        {error, _Reason} = Error ->
            Error;
        ok ->
            MaxObjSize = max_objsize(NoColls),
            {ok, #head{m = M,
                       m2 = M * 2,
                       next = Next,
                       fptr = Fd,
                       no_objects = NoObjects,
                       no_keys = NoKeys,
                       maxobjsize = MaxObjSize,
                       n = N,
                       type = Type,
                       update_mode = saved,
                       auto_save = infinity,  % not saved on file
                       fixed = false,         % not saved on file
                       freelists_p = FreeList,
                       hash_bif = HashBif,
                       has_md5 = HasMD5,
                       keypos = KeyPos,
                       min_no_slots = MinNoSlots,
                       max_no_slots = MaxNoSlots,
                       no_collections = NoColls,
                       bump = ?BUMP,
                       base = Base}}
    end.
568
%% Inlined.
%% Returns 'undefined' if the collection counters are unknown,
%% otherwise the size (first element) of the last counter in the list
%% that has a non-zero count, or 0 if there is no such counter.
max_objsize(undefined) ->
    undefined;
max_objsize(NoColls) ->
    max_objsize(NoColls, 0).

%% As max_objsize/1, but with Max0 as the value returned when every
%% counter in the list is zero.
max_objsize(NoColls, Max0) ->
    lists:foldl(fun({_, 0}, Max) -> Max;
                   ({LSz, _}, _Max) -> LSz
                end, Max0, NoColls).
581
%% -> ok
%% Reads the pointers to the segment array parts needed for M slots
%% (four bytes per part, starting at ?SEGARRADDR(0)) and caches the
%% parts and the segment pointers found in them.
cache_segps(Fd, FileName, M) ->
    PtrBytes = 4 * no_parts(M),
    {ok, PartPtrs} =
        dets_utils:pread_close(Fd, FileName, ?SEGARRADDR(0), PtrBytes),
    cache_arrparts(PartPtrs, ?HEADEND, Fd, FileName).
587
%% -> ok
%% For each 32-bit array part pointer in the binary: caches the
%% position of the part under the position of the pointer, reads the
%% part from file and caches the segment pointers it contains.
cache_arrparts(<<>>, _PtrPos, _Fd, _FileName) ->
    ok;
cache_arrparts(<<PartAddr:32, Rest/binary>>, PtrPos, Fd, FileName) ->
    arrpart_cache(PtrPos, PartAddr),
    {ok, PartBin} =
        dets_utils:pread_close(Fd, FileName, PartAddr, ?SEGPARTSZ*4),
    cache_segps1(Fd, PartBin, PartAddr),
    cache_arrparts(Rest, PtrPos + 4, Fd, FileName).
597
%% -> ok
%% Walks the 32-bit segment pointers of an array part; P is the file
%% position of the pointer at the front of the binary. Stops at the
%% first zero pointer or at the end of the part.
cache_segps1(_Fd, <<>>, _P) ->
    ok;
cache_segps1(_Fd, <<0:32, _/binary>>, _P) ->
    ok;
cache_segps1(Fd, <<Segment:32, Rest/binary>>, P) ->
    dets_utils:disk_map_segment_p(Fd, Segment),
    segp_cache(P, Segment),
    cache_segps1(Fd, Rest, P + 4).
606
%% Number of segment array parts needed for NoSlots slots; each part
%% points to ?SEGPARTSZ segments of ?SEGSZP slots each.
no_parts(NoSlots) ->
    SlotsPerPart = ?SEGSZP * ?SEGPARTSZ,
    1 + ((NoSlots - 1) div SlotsPerPart).
609
%% Number of segments needed for NoSlots slots; each segment holds
%% ?SEGSZP slots.
no_segs(NoSlots) ->
    LastSlot = NoSlots - 1,
    1 + (LastSlot div ?SEGSZP).
612
613%%%
614%%% Repair, conversion and initialization of a dets file.
615%%%
616
617%%% bulk_input/3. Initialization, the general case (any stream of objects).
618%%% output_objs/3. Initialization (general case) and repair.
619%%% bchunk_init/2. Initialization using bchunk.
620
%% Returns an input fun that wraps InitFun (see bulk_input/4). The
%% sequence numbers given to the objects start at 0; the counters
%% argument is not used.
bulk_input(Head, InitFun, _Cntrs) ->
    Ref = make_ref(),
    bulk_input(Head, InitFun, Ref, 0).
623
%% Returns a fun accepting 'close' or 'read'.
%% - close: InitFun(close) is called; its result, and any exception,
%%   is ignored.
%% - read: InitFun(read) is called and its result is tagged with Ref
%%   inside the 'catch'. A normal return thus shows up as {Ref, _},
%%   whereas an exception raised by InitFun shows up as something else
%%   and is re-thrown as {thrown, Error}.
%% Seq is the sequence number to be given to the next object.
bulk_input(Head, InitFun, Ref, Seq) ->
    fun(close) ->
	    _ = (catch InitFun(close));
       (read) ->
	    case catch {Ref, InitFun(read)} of
		{Ref, end_of_input} ->
		    end_of_input;
		{Ref, {L0, NewInitFun}} when is_list(L0),
                                             is_function(NewInitFun) ->
		    Kp = Head#head.keypos,
		    %% bulk_objects/5 crashes on objects that cannot be
		    %% handled (for instance when element(Kp, T) fails);
		    %% the input is then closed and an error is returned.
		    case catch bulk_objects(L0, Head, Kp, Seq, []) of
			{'EXIT', _Error} ->
			    _ = (catch NewInitFun(close)),
			    {error, invalid_objects_list};
			{L, NSeq} ->
			    {L, bulk_input(Head, NewInitFun, Ref, NSeq)}
		    end;
		{Ref, Value} ->
		    %% InitFun returned normally, but not a valid value.
		    {error, {init_fun, Value}};
		Error ->
		    throw({thrown, Error})
	    end
    end.
647
%% -> {Objects, NextSeq}
%% Turns each term into an object (see make_object/4) built from the
%% key found at position Kp, the sequence number of the term, and the
%% external format of the term. The objects are returned in reverse
%% order. Crashes if some term lacks element Kp.
bulk_objects([], _Head, Kp, Seq, Acc) when is_integer(Kp), is_integer(Seq) ->
    {Acc, Seq};
bulk_objects([Term | Terms], Head, Kp, Seq, Acc) ->
    Bin = term_to_binary(Term),
    Key = element(Kp, Term),
    NextSeq = Seq + 1,
    Obj = make_object(Head, Key, Seq, Bin),
    bulk_objects(Terms, Head, Kp, NextSeq, [Obj | Acc]).
654
%% ?FSCK_SEGMENT is the key of the {?FSCK_SEGMENT,0,[],0} entry that
%% output_objs/3 inserts into the counters table on 'close'.
%% NOTE(review): ?FSCK_SEGMENT2 is not used in the part of the file
%% reviewed here; its meaning should be confirmed against its users.
-define(FSCK_SEGMENT, 1).
-define(FSCK_SEGMENT2, 10000).

%% A small "vector" abstraction implemented on top of tuples:
%% ?VEMPTY is the empty vector, ?VSET(I, V, E) sets element I of V to
%% E, ?VGET(I, V) fetches element I of V, and ?VEXT(S, V, T) extends V
%% to size S by appending copies of T.
-define(VEMPTY, {}).
-define(VSET(I, V, E), setelement(I, V, E)).
-define(VGET(I, V), element(I, V)).
-define(VEXT(S, V, T),
     list_to_tuple(tuple_to_list(V) ++ lists:duplicate(S-tuple_size(V), T))).

%% Number of bytes that will be handled before the cache is written to
%% file. Used when compacting or writing chunks.
-define(CACHE_SIZE, (60*?CHUNK_SIZE)).

%% {LogSize,NoObjects} in Cntrs is replaced by
%% {LogSize,Position,{FileName,FileDescriptor},NoCollections}.
%% There is also an object {no, NoObjects, NoKeys}.
%% ?COUNTERS is the key of that object; ?OBJ_COUNTER and ?KEY_COUNTER
%% are the positions of NoObjects and NoKeys in it.
-define(COUNTERS, no).
-define(OBJ_COUNTER, 2).
-define(KEY_COUNTER, 3).
674
%% Returns the initial output fun used when initializing or repairing
%% a file. The fun accepts 'close', an empty list (in which case
%% nothing happens and an equivalent fun is returned), or a non-empty
%% list of binaries as accepted by bin2term/2.
output_objs(Head, SlotNums, Cntrs) ->
    fun(close) ->
            %% No objects at all have been received; this is the only
            %% way the accumulator can be empty.
            %% Make sure that the segments are initialized in case
            %% init_table has been called.
            true = ets:insert(Cntrs, {?FSCK_SEGMENT,0,[],0}),
            true = ets:insert(Cntrs, {?COUNTERS, 0, 0}),
            Finish = output_objs2(foo, [], Head, ?VEMPTY, Cntrs,
                                  SlotNums, bar),
            Finish(close);
       ([]) ->
            output_objs(Head, SlotNums, Cntrs);
       (Bins) ->
            %% Information about number of objects per size is not
            %% relevant for version 9. It is the number of collections
            %% that matters.
            true = ets:delete_all_objects(Cntrs),
            true = ets:insert(Cntrs, {?COUNTERS, 0, 0}),
            Terms = bin2term(Bins, Head#head.keypos),
            %% The cache is a tuple indexed by the (log) size. An element
            %% is [BinaryObject].
            {NE, NAcc, NCache} =
                output_slots(Terms, Head, ?VEMPTY, Cntrs, 0, 0),
            output_objs2(NE, NAcc, Head, NCache, Cntrs, SlotNums, 1)
    end.
701
%% Output fun used once the first chunk of objects has been handled.
%% E and Acc are the pending element and accumulator returned by
%% output_slots, Cache the tuple of cached objects, SizeT the ets
%% table of counters. The last argument counts down the chunks that
%% remain to be handled before the cache is written to file: when it
%% reaches 0 the cache is written and the counter is restarted.
output_objs2(E, Acc, Head, Cache, SizeT, SlotNums, 0) ->
    NCache = write_all_sizes(Cache, SizeT, Head, more),
    %% Number of handled file_sorter chunks before writing:
    Max = erlang:max(1, erlang:min(tuple_size(NCache), 10)),
    output_objs2(E, Acc, Head, NCache, SizeT, SlotNums, Max);
output_objs2(E, Acc, Head, Cache, SizeT, SlotNums, ChunkI) ->
    fun(close) ->
            %% Output the objects of the last slot, if any, and write
            %% what is left in the cache.
            {_, [], Cache1} =
                 if
		     Acc =:= [] -> {foo, [], Cache};
		     true -> output_slot(Acc, Head, Cache, [], SizeT, 0, 0)
		 end,
	    _NCache = write_all_sizes(Cache1, SizeT, Head, no_more),
	    SegSz = ?ACTUAL_SEG_SIZE,
	    %% Only the position returned by the allocation is used;
	    %% the head returned is discarded.
	    {_, SegEnd, _} = dets_utils:alloc(Head, adjsz(SegSz)),
	    [{?COUNTERS,NoObjects,NoKeys}] = ets:lookup(SizeT, ?COUNTERS),
	    Head1 = Head#head{no_objects = NoObjects, no_keys = NoKeys},
	    true = ets:delete(SizeT, ?COUNTERS),
	    {NewHead, NL, _MaxSz, _End} = allocate_all_objects(Head1, SizeT),
	    %% It is not known until all objects have been collected
	    %% how many object collections there are per size. Now
	    %% that is known and the absolute positions of the object
	    %% collections can be calculated.
            segment_file(SizeT, NewHead, NL, SegEnd),
	    {MinSlots, EstNoSlots, MaxSlots} = SlotNums,
	    if
		EstNoSlots =:= bulk_init ->
		    {ok, 0, NewHead};
		true ->
		    EstNoSegs = no_segs(EstNoSlots),
		    MinNoSegs = no_segs(MinSlots),
		    MaxNoSegs = no_segs(MaxSlots),
		    NoSegs = no_segs(NoKeys),
		    Diff = abs(NoSegs - EstNoSegs),
		    %% Ask for another try if the estimated number of
		    %% segments is off by more than 5 and the number of
		    %% segments needed is within the allowed limits.
		    if
			Diff > 5, NoSegs =< MaxNoSegs, NoSegs >= MinNoSegs  ->
			    {try_again, NoKeys};
			true ->
			    {ok, 0, NewHead}
		    end
	    end;
       (L) ->
	    Es = bin2term(L, Head#head.keypos),
	    {NE, NAcc, NCache} =
		output_slots(E, Es, Acc, Head, Cache, SizeT, 0, 0),
	    output_objs2(NE, NAcc, Head, NCache, SizeT, SlotNums, ChunkI-1)
    end.
749
750%%% Compaction.
751
%% -> the result of file_sorter:sort/3, or the caught exception if the
%%    sorting fails (old-style 'catch' is applied to the sorting only).
%% Compacts the table of ReadHead into the file of WriteHead.
%% TableParameters is a ?HASH_PARMS record; its no_colls field gives
%% the number of object collections per size.
%%
%% A temporary ets table is used for bookkeeping. It is deleted in an
%% 'after' section, which means that it is deleted also when the
%% preparations (for instance prepare_file_init/5) raise an exception;
%% such exceptions are still propagated to the caller.
compact_init(ReadHead, WriteHead, TableParameters) ->
    SizeT = ets:new(dets_compact, []),
    try
        #head{no_keys = NoKeys, no_objects = NoObjects} = ReadHead,

        NoObjsPerSize = TableParameters#?HASH_PARMS.no_colls,
        {NewWriteHead, Bases, SegAddr, SegEnd} =
            prepare_file_init(NoObjects, NoKeys, NoObjsPerSize, SizeT,
                              WriteHead),

        Input = compact_input(ReadHead, NewWriteHead, SizeT,
                              tuple_size(Bases)),
        Output = fast_output(NewWriteHead, SizeT, Bases, SegAddr, SegEnd),
        %% Temporary files are put in the directory of the new file.
        TmpDir = filename:dirname(NewWriteHead#head.filename),
        (catch file_sorter:sort(Input, Output,
                                [{format, binary},{tmpdir, TmpDir},
                                 {header, 1}])) % compact_objs/9: 13 bytes
    after
        ets:delete(SizeT)
    end.
768
%% Returns the input fun used when compacting. The cache is a tuple
%% with NoSizes elements, each initially [0]: the head of an element
%% is an integer that compact_objs advances by the size of every
%% object it adds to the tail.
compact_input(Head, WHead, SizeT, NoSizes) ->
    Allocated = dets_utils:all_allocated_as_list(Head),
    EmptyEntry = [0 | []],
    Cache = ?VEXT(NoSizes, ?VEMPTY, EmptyEntry),
    compact_input(Head, WHead, SizeT, Cache, Allocated).
773
%% Returns an input fun: 'read' continues reading from the areas
%% listed in L, 'close' does nothing.
compact_input(Head, WHead, SizeT, Cache, L) ->
    fun(read) ->
            compact_read(Head, WHead, SizeT, Cache, L, 0, [], 0);
       (close) ->
            ok
    end.
780
%% -> end_of_input | {SegBs, InputFun} | not_ok
%% L is a list of [From | To] areas that remain to be read, Min the
%% minimum number of bytes that must be read next, SegBs the binaries
%% collected so far, and ASz the number of bytes handled since the
%% cache was last written. 'not_ok' means that compaction is given up
%% (a proper repair is to be tried instead).
%% The order of the clauses matters: the clause that writes the cache
%% must be tried before the clause that reads more data.
compact_read(_Head, WHead, SizeT, Cache, [], _Min, [], _ASz) ->
    %% Nothing more to read and nothing collected: write what is left
    %% in the cache and finish.
    _ = fast_write_all_sizes(Cache, SizeT, WHead),
    end_of_input;
compact_read(Head, WHead, SizeT, Cache, L, Min, SegBs, ASz)
            when ASz + Min >= ?CACHE_SIZE, ASz > 0 ->
    %% Enough bytes have been handled: write the cache and return the
    %% collected binaries together with a fun that continues reading.
    NCache = fast_write_all_sizes(Cache, SizeT, WHead),
    {SegBs, compact_input(Head, WHead, SizeT, NCache, L)};
compact_read(Head, WHead, SizeT, Cache, [[From | To] | L], Min, SegBs, ASz) ->
    %% Read at least three chunks, more if Min requires it.
    Max = erlang:max(?CHUNK_SIZE*3, Min),
    case check_pread_arg(Max, Head) of
        true ->
            case dets_utils:pread_n(Head#head.fptr, From, Max) of
                eof ->
                    %% Should never happen since compaction will not
                    %% be tried unless the file trailer is valid.
                    not_ok; % try a proper repair
                Bin1 when byte_size(Bin1) < Min ->
                    %% The last object may not be padded.
                    %% Zeros are appended to make up Min bytes.
                    Pad = Min - byte_size(Bin1),
                    NewBin = <<Bin1/binary, 0:Pad/unit:8>>,
                        compact_objs(Head, WHead, SizeT, NewBin, L,
                                     From, To, SegBs, Cache, ASz);
                NewBin ->
                    compact_objs(Head, WHead, SizeT, NewBin, L,
                                 From, To, SegBs, Cache, ASz)
            end;
        false ->
            not_ok % try a proper repair
    end.
810
%% Walks the binary Bin, which was read from file position From, and
%% moves every active object collection found in the range From..To
%% into Cache, adding one segment entry per collection to SegBs.
%% When Bin does not hold enough bytes, compact_read/8 is called to
%% read more.
compact_objs(Head, WHead, SizeT, Bin, L, From, From, SegBs, Cache, ASz) ->
    %% From equals To: the current range is exhausted.
    case L of
        [] ->
            {SegBs, compact_input(Head, WHead, SizeT, Cache, L)};
        [[NextFrom | NextTo] | Rest] ->
            Gap = NextFrom - From,
            case Bin of
                <<_:Gap/binary, Tail/binary>> ->
                    %% The next range starts inside the bytes already read.
                    compact_objs(Head, WHead, SizeT, Tail, Rest,
                                 NextFrom, NextTo, SegBs, Cache, ASz);
                _ when byte_size(Bin) < Gap ->
                    compact_read(Head, WHead, SizeT, Cache, L, 0, SegBs, ASz)
            end
    end;
compact_objs(Head, WHead, SizeT, <<Size:32, St:32, _:32, KO/binary>> = Bin,
             L, From, To, SegBs, Cache, ASz) when St =:= ?ACTIVE ->
    LSize = sz2pos(Size),
    Size2 = ?POW(LSize-1),
    case Bin of
        <<SlotObjs:Size2/binary, Tail/binary>> ->
            %% The whole collection is available. The key of its
            %% first object gives the slot.
            ObjBin = case Head#head.type of
                         set ->
                             KO;
                         _ ->
                             <<_KeySz:32, FirstObj/binary>> = KO,
                             FirstObj
                     end,
            Key = element(Head#head.keypos, binary_to_term(ObjBin)),
            Slot = db_hash(Key, Head),
            [Addr | Written] = ?VGET(LSize, Cache),
            NCache = ?VSET(LSize, Cache, [Addr + Size2, SlotObjs | Written]),
            NSegBs = [<<Slot:32,Size:32,Addr:32,LSize:8>> | SegBs],
            compact_objs(Head, WHead, SizeT, Tail, L, From + Size2,
                         To, NSegBs, NCache, ASz + Size2);
        _ ->
            %% Too few bytes: read at least Size2 bytes from From.
            compact_read(Head, WHead, SizeT, Cache, [[From|To] | L],
                         Size2, SegBs, ASz)
    end;
compact_objs(Head, WHead, SizeT, <<_:96, _/binary>> = Bin,
             L, From, To, SegBs, Cache, ASz) ->
    %% Not an active object: skip ?ACTUAL_SEG_SIZE bytes, reading
    %% more first if that many bytes are not at hand.
    case Bin of
        <<_:?ACTUAL_SEG_SIZE/binary, Tail/binary>> ->
            compact_objs(Head, WHead, SizeT, Tail, L,
                         From + ?ACTUAL_SEG_SIZE, To, SegBs, Cache, ASz);
        _ ->
            compact_read(Head, WHead, SizeT, Cache, [[From|To] | L],
                         ?ACTUAL_SEG_SIZE, SegBs, ASz)
    end;
compact_objs(Head, WHead, SizeT, _Bin, L, From, To, SegBs, Cache, ASz) ->
    %% Less than a full 12-byte header left.
    compact_read(Head, WHead, SizeT, Cache, [[From|To] | L], 0, SegBs, ASz).
866
867%%% End compaction.
868
869%%% Bchunk.
870
%% Reads object collections from the file, starting with no minimum
%% read size, an empty result and a zero byte count.
read_bchunks(Head, L) ->
    read_bchunks(Head, L, 0, [], 0).

%% The second argument is {From, To, L} where From..To is the range
%% being read and L a binary of further <<From:32, To:32>> pairs.
%% Min is the least number of bytes the next read must deliver, Bs
%% the chunks collected so far (in reverse) and ASz their total size.
%%
%% Returns {Chunks, Continuation}, {finished, Chunks} or {error, Reason}.
read_bchunks(_Head, Cont, Min, Bs, ASz) when Bs =/= [],
                                             ASz + Min >= 4*?CHUNK_SIZE ->
    {lists:reverse(Bs), Cont};
read_bchunks(Head, {From, To, L}, Min, Bs, ASz) ->
    ReadSz = erlang:max(?CHUNK_SIZE*2, Min),
    case check_pread_arg(ReadSz, Head) of
        false ->
            {error, dets_utils:bad_object(bad_object, {read_bchunks, ReadSz})};
        true ->
            case dets_utils:pread_n(Head#head.fptr, From, ReadSz) of
                eof ->
                    %% Should never happen.
                    {error, premature_eof};
                Data when byte_size(Data) >= Min ->
                    bchunks(Head, L, Data, Bs, ASz, From, To);
                Short when L =:= <<>>, To - From =:= Min ->
                    %% Here byte_size(Short) < Min.
                    %% The last object may not be padded.
                    Missing = Min - byte_size(Short),
                    Padded = <<Short/binary, 0:Missing/unit:8>>,
                    bchunks(Head, L, Padded, Bs, ASz, From, To);
                _ ->
                    {error, premature_eof}
            end
    end.
899
%% Splits the binary Bin, read from file position From, into object
%% collections. Each active or free collection found in From..To is
%% added to Bs as {LSize, Slot, Binary}. read_bchunks/5 is called
%% when more bytes are needed.
bchunks(_Head, <<>>, _Bin, Bs, _ASz, From, From) ->
    %% Last range exhausted.
    {finished, lists:reverse(Bs)};
bchunks(Head, L, Bin, Bs, ASz, From, From) ->
    %% Current range exhausted; go on with the next one.
    <<NextFrom:32, NextTo:32, Rest/binary>> = L,
    Gap = NextFrom - From,
    case Bin of
        <<_:Gap/binary, Tail/binary>> ->
            bchunks(Head, Rest, Tail, Bs, ASz, NextFrom, NextTo);
        _ when byte_size(Bin) < Gap ->
            read_bchunks(Head, {NextFrom, NextTo, Rest}, 0, Bs, ASz)
    end;
bchunks(Head, L, <<Size:32, St:32, _:32, KO/binary>> = Bin, Bs, ASz,
        From, To) when St =:= ?ACTIVE; St =:= ?FREE ->
    LSize = sz2pos(Size),
    Size2 = ?POW(LSize-1),
    case Bin of
        <<Collection:Size2/binary, Tail/binary>> ->
            %% LSize and Slot are used in make_slots/4. The reason to
            %% calculate Slot here is to reduce the CPU load in
            %% make_slots/4.
            ObjBin = case Head#head.type of
                         set ->
                             KO;
                         _ ->
                             <<_KeySz:32, FirstObj/binary>> = KO,
                             FirstObj
                     end,
            Key = element(Head#head.keypos, binary_to_term(ObjBin)),
            Slot = db_hash(Key, Head),
            bchunks(Head, L, Tail, [{LSize,Slot,Collection} | Bs],
                    ASz + Size2, From + Size2, To);
        _ ->
            read_bchunks(Head, {From, To, L}, Size2, Bs, ASz)
    end;
bchunks(Head, L, <<_:96, _/binary>> = Bin, Bs, ASz, From, To) ->
    %% Neither active nor free: skip ?ACTUAL_SEG_SIZE bytes, reading
    %% more first if that many bytes are not at hand.
    case Bin of
        <<_:?ACTUAL_SEG_SIZE/binary, Tail/binary>> ->
            bchunks(Head, L, Tail, Bs, ASz, From + ?ACTUAL_SEG_SIZE, To);
        _ ->
            read_bchunks(Head, {From, To, L}, ?ACTUAL_SEG_SIZE, Bs, ASz)
    end;
bchunks(Head, L, _Bin, Bs, ASz, From, To) ->
    %% Less than a full 12-byte header left.
    read_bchunks(Head, {From, To, L}, 0, Bs, ASz).
947
948%%% End bchunk.
949
%% -> {ok, NewHead} | throw(Error) | Error
%%
%% Initializes the table from bchunk data delivered by InitFun. The
%% first non-empty list returned by InitFun(read) must start with the
%% binary table parameters; the rest of the data is turned into slots
%% and written by way of file_sorter:sort/3.
%%
%% The temporary ETS table SizeT is deleted in an `after' clause, so
%% it is released also when preparing the file raises an exception.
bchunk_init(Head, InitFun) ->
    Ref = make_ref(),
    %% The non-empty list of data begins with the table parameters.
    case catch {Ref, InitFun(read)} of
        {Ref, end_of_input} ->
            {error, {init_fun, end_of_input}};
        {Ref, {[], NInitFun}} when is_function(NInitFun) ->
            bchunk_init(Head, NInitFun);
        {Ref, {[ParmsBin | L], NInitFun}}
                        when is_list(L), is_function(NInitFun) ->
            #head{fptr = Fd, type = Type, keypos = Kp,
                  auto_save = Auto, cache = Cache,
                  filename = Fname, ram_file = Ram,
                  name = Tab} = Head,
            case try_bchunk_header(ParmsBin, Head) of
                {ok, Parms} ->
                    #?HASH_PARMS{no_objects = NoObjects,
                                 no_keys = NoKeys,
                                 no_colls = NoObjsPerSize} = Parms,
                    CacheSz = dets_utils:cache_size(Cache),
                    {ok, Head1} =
                        prep_table_copy(Fd, Tab, Fname, Type,
                                        Kp, Ram, CacheSz,
                                        Auto, Parms),
                    SizeT = ets:new(dets_init, []),
                    try
                        {NewHead, Bases, SegAddr, SegEnd} =
                            prepare_file_init(NoObjects, NoKeys,
                                              NoObjsPerSize, SizeT, Head1),
                        ECache = ?VEXT(tuple_size(Bases), ?VEMPTY, [0 | []]),
                        Input =
                            fun(close) ->
                                    _ = (catch NInitFun(close));
                               (read) ->
                                    do_make_slots(L, ECache, SizeT, NewHead,
                                                  Ref, 0, NInitFun)
                            end,
                        Output = fast_output(NewHead, SizeT, Bases,
                                             SegAddr, SegEnd),
                        TmpDir = filename:dirname(Head#head.filename),
                        (catch file_sorter:sort(Input, Output,
                                                [{format, binary},
                                                 {tmpdir, TmpDir},
                                                 {header, 1}]))
                    after
                        ets:delete(SizeT)
                    end;
                not_ok ->
                    {error, {init_fun, ParmsBin}}
            end;
        {Ref, Value} ->
            {error, {init_fun, Value}};
        Error ->
            {thrown, Error}
    end.
1003
%% Decodes the table parameters ParmsBin and checks that they fit the
%% table described by Head: same type, key position, hash method and
%% bchunk format version.
%%
%% Returns {ok, Parms} or not_ok (also when ParmsBin cannot be decoded).
%%
%% NOTE(review): binary_to_term/1 is used without the `safe' option;
%% confirm that ParmsBin always comes from a trusted source.
try_bchunk_header(ParmsBin, Head) ->
    #head{type = Type, keypos = Kp, hash_bif = HashBif} = Head,
    HashMethod = hash_method_to_code(HashBif),
    try binary_to_term(ParmsBin) of
        Parms when is_record(Parms, ?HASH_PARMS),
                   Parms#?HASH_PARMS.type =:= Type,
                   Parms#?HASH_PARMS.keypos =:= Kp,
                   Parms#?HASH_PARMS.hash_method =:= HashMethod,
                   Parms#?HASH_PARMS.bchunk_format_version =:=
                         ?BCHUNK_FORMAT_VERSION ->
            {ok, Parms};
        _Mismatch ->
            not_ok
    catch
        error:_ ->
            not_ok
    end.
1018
%% Returns the input fun wrapping the user supplied InitFun. `close'
%% is passed on to InitFun (exceptions are caught); `read' fetches
%% the next list of objects and turns it into slots.
bchunk_input(InitFun, SizeT, Head, Ref, Cache, ASz) ->
    fun(close) ->
            catch InitFun(close);
       (read) ->
            bchunk_input_read(InitFun, SizeT, Head, Ref, Cache, ASz)
    end.

%% Handles one `read' request of the fun returned by bchunk_input/6.
%% Ref tags the value returned by InitFun so that it can be told
%% apart from a value thrown by InitFun.
bchunk_input_read(InitFun, SizeT, Head, Ref, Cache, ASz) ->
    case catch {Ref, InitFun(read)} of
        {Ref, end_of_input} ->
            %% Flush what is left in the cache.
            _ = fast_write_all_sizes(Cache, SizeT, Head),
            end_of_input;
        {Ref, {Objs, NextInitFun}} when is_list(Objs),
                                        is_function(NextInitFun) ->
            do_make_slots(Objs, Cache, SizeT, Head, Ref, ASz, NextInitFun);
        {Ref, Other} ->
            {error, {init_fun, Other}};
        Thrown ->
            throw({thrown, Thrown})
    end.
1036
%% Adds the objects of L to Cache by calling make_slots/4. If that
%% fails InitFun is closed and {error, invalid_objects_list} is
%% returned. Otherwise {SegBs, NextInputFun} is returned; the cache
%% is written to file first if it holds more than ?CACHE_SIZE bytes.
do_make_slots(L, Cache, SizeT, Head, Ref, ASz, InitFun) ->
    case catch make_slots(L, Cache, [], ASz) of
        {'EXIT', _} ->
            _ = (catch InitFun(close)),
            {error, invalid_objects_list};
        {Cache1, SegBs, NASz} ->
            {NextCache, NextASz} =
                if
                    NASz > ?CACHE_SIZE ->
                        {fast_write_all_sizes(Cache1, SizeT, Head), 0};
                    true ->
                        {Cache1, NASz}
                end,
            {SegBs, bchunk_input(InitFun, SizeT, Head, Ref,
                                 NextCache, NextASz)}
    end.
1050
%% Puts every {LSize, Slot, Binary} element into the cache entry of
%% its size and adds a segment entry for it to SegBs. A collection
%% marked as free is stored as active. The size of the binary must be
%% exactly ?POW(LSize-1). Any other status, or a wrong size, makes
%% the function fail.
%%
%% Returns {NewCache, NewSegBs, NewASz}.
make_slots([], Cache, SegBs, ASz) ->
    {Cache, SegBs, ASz};
make_slots([{LSize, Slot, <<Size:32, St:32, Sz:32, KO/binary>> = Stored} | Rest],
           Cache, SegBs, ASz) ->
    Active = if
                 St =:= ?ACTIVE ->
                     Stored;
                 St =:= ?FREE ->
                     <<Size:32, ?ACTIVE:32, Sz:32, KO/binary>>
             end,
    StoredSz = byte_size(Stored),
    true = (StoredSz =:= ?POW(LSize-1)),
    [Addr | Written] = ?VGET(LSize, Cache),
    make_slots(Rest,
               ?VSET(LSize, Cache, [Addr + StoredSz, Active | Written]),
               [<<Slot:32,Size:32,Addr:32,LSize:8>> | SegBs],
               ASz + StoredSz).
1068
%% Returns the first output fun: on `close' before any data has
%% arrived the result is checked by fast_output_end/2; on the first
%% list of segment entries the file is positioned at SegAddr, the
%% entries are written and fast_output2/6 takes over.
%%
%% If positioning fails, the error is reported through
%% dets_utils:file_error/2 (file name first, then the error); the
%% thrown value is caught and returned.
fast_output(Head, SizeT, Bases, SegAddr, SegEnd) ->
    fun(close) ->
            fast_output_end(Head, SizeT);
       (L) ->
            case file:position(Head#head.fptr, SegAddr) of
                {ok, SegAddr} ->
                    NewSegAddr = write_segment_file(L, Bases, Head, [],
                                                    SegAddr, SegAddr),
                    fast_output2(Head, SizeT, Bases, NewSegAddr,
                                 SegAddr, SegEnd);
                Error ->
                    catch dets_utils:file_error(Head#head.filename, Error)
            end
    end.
1083
%% Returns the output fun used after the first list of segment
%% entries. SegAddr is the file position reached so far and SS the
%% start of the segments. On `close' the remaining bytes up to
%% SegEnd are filled with zeros before the result is checked.
fast_output2(Head, SizeT, Bases, SegAddr, SS, SegEnd) ->
    fun(close) ->
            Padding = dets_utils:make_zeros(SegEnd - SegAddr),
            dets_utils:write(Head, Padding),
            fast_output_end(Head, SizeT);
       (L) ->
            Reached = write_segment_file(L, Bases, Head, [], SegAddr, SS),
            fast_output2(Head, SizeT, Bases, Reached, SS, SegEnd)
    end.
1093
%% Checks that, for every row of SizeT, the third element (the number
%% of collections written) equals the fourth (the number expected).
%% Returns {ok, Head} if so, otherwise {error, invalid_objects_list}.
fast_output_end(Head, SizeT) ->
    AllWritten =
        ets:foldl(fun({_Sz, _Pos, Written, Expected}, OkSoFar) ->
                          OkSoFar andalso (Written =:= Expected)
                  end, true, SizeT),
    if
        AllWritten ->
            {ok, Head};
        true ->
            {error, invalid_objects_list}
    end.
1100
%% Inlined.
%%
%% Turns the sorted segment entries into slot pointers. The pointers
%% (and the zeros between them) are collected in Ws, which is written
%% when the list is exhausted. SegAddr is the file position that
%% corresponds to the end of Ws and SS the start of the segments.
%% Returns the new SegAddr.
write_segment_file([], _Bases, Head, Ws, SegAddr, _SS) ->
    dets_utils:write(Head, Ws),
    SegAddr;
write_segment_file([<<Slot:32,BSize:32,AddrToBe:32,LSize:8>> | Entries],
                   Bases, Head, Ws, SegAddr, SS) ->
    %% Should call slot_position/1, but since all segments are
    %% allocated in a sequence, the position of a slot can be
    %% calculated faster.
    SlotPos = SS + ?SZOBJP*4 * Slot, % Same as slot_position(Slot).
    write_segment_file(Entries, Bases, Head, Ws, SegAddr, SS, SlotPos,
                       BSize, AddrToBe, LSize).
1113
%% Appends the pointer <<BSize:32, Addr:32>> for the slot at file
%% position Pos to Ws, preceded by zeros for the unused slots between
%% SegAddr and Pos. Addr is AddrToBe relocated by the base address of
%% the size LSize. Short runs of zeros (less than 100 bytes) are
%% built directly; longer ones come from dets_utils:make_zeros/1.
write_segment_file(Bins, Bases, Head, Ws, SegAddr, SS, Pos, BSize,
                   AddrToBe, LSize) ->
    Addr = AddrToBe + element(LSize, Bases),
    Pointer = <<BSize:32,Addr:32>>,
    Gap = Pos - SegAddr,
    NWs = if
              Gap =:= 0 ->
                  [Ws | Pointer];
              Gap < 100 ->
                  [Ws | <<0:Gap/unit:8, Pointer/binary>>];
              true ->
                  [Ws, dets_utils:make_zeros(Gap) | Pointer]
          end,
    write_segment_file(Bins, Bases, Head, NWs, SegAddr + Gap + ?SZOBJP*4, SS).
1133
%% Writes the object collections held in Cache to file and returns
%% the emptied cache. The entries are handed to fast_write_sizes/6
%% biggest size first, together with the number of sizes.
fast_write_all_sizes(Cache, SizeT, Head) ->
    NoSizes = tuple_size(Cache),
    BiggestFirst = lists:reverse(tuple_to_list(Cache)),
    fast_write_sizes(BiggestFirst, NoSizes, SizeT, Head, [], []).
1137
%% Goes through the cache entries, biggest size first. Sz is the
%% size (tuple index) of the first entry of the list. An entry is
%% [Addr | Collections] with the collections in reverse order of
%% arrival. For every non-empty entry the object counter of SizeT is
%% updated and a {Position, Data} pair is added to PwriteList; when
%% all entries are done everything is written with one call to
%% dets_utils:pwrite/3. Returns the emptied cache.
%%
%% Throws {error, invalid_objects_list} if SizeT has no row for a
%% size that has collections.
fast_write_sizes([[_Addr] = Untouched | CL], Sz, SizeT, Head, NCL, PwriteList) ->
    %% No collections of this size.
    fast_write_sizes(CL, Sz-1, SizeT, Head, [Untouched | NCL], PwriteList);
fast_write_sizes([[Addr | Colls] | CL], Sz, SizeT, Head, NCL, PwriteList) ->
    case ets:lookup(SizeT, Sz) of
        [{Sz, Base, _ObjCounter, _NoCollections}] ->
            %% Update ObjCounter:
            Count = length(Colls),
            _ = ets:update_counter(SizeT, Sz, {3, Count}),
            %% Addr is the address after the last collection held.
            FilePos = Base + Addr - Count*?POW(Sz-1),
            fast_write_sizes(CL, Sz-1, SizeT, Head, [[Addr] | NCL],
                             [{FilePos, lists:reverse(Colls)} | PwriteList]);
        [] ->
            throw({error, invalid_objects_list})
    end;
fast_write_sizes([], _Sz, _SizeT, Head, NCL, PwriteList) ->
    #head{filename = FileName, fptr = Fd} = Head,
    ok = dets_utils:pwrite(Fd, FileName, PwriteList),
    list_to_tuple(NCL).
1156
%% Prepares the file for initialization with a known number of
%% object collections per size (NoObjsPerSize, a list of
%% {LogSz, NoColls}). Space for all collections is allocated, SizeT
%% is filled with one row {LSize, Address, 0, NoColls} per size, and
%% the file is extended by write_bytes/3 if it is big enough.
%%
%% Returns {NewHead, Bases, SegAddr, SegEnd} where Bases is a tuple
%% with the base address of each size.
prepare_file_init(NoObjects, NoKeys, NoObjsPerSize, SizeT, Head) ->
    {_, SegEnd, _} = dets_utils:alloc(Head, adjsz(?ACTUAL_SEG_SIZE)),
    Head1 = Head#head{no_objects = NoObjects, no_keys = NoKeys},
    Insert = fun(Row) -> true = ets:insert(SizeT, Row) end,
    Insert({?FSCK_SEGMENT,0,[],0}),
    lists:foreach(fun({LogSz, NoColls}) ->
                          Insert({LogSz+1,0,0,NoColls})
                  end, NoObjsPerSize),
    {NewHead, [{?FSCK_SEGMENT,SegAddr,[],0} | NL], MaxSz, EndOfFile} =
        allocate_all_objects(Head1, SizeT),
    %% Replace the rows with the ones that hold the addresses.
    true = ets:delete_all_objects(SizeT),
    lists:foreach(Insert, NL),
    {Bases, Est} =
        lists:foldl(fun({LSz, Pos, _D, N}, {BasesAcc, EstAcc}) ->
                            {setelement(LSz, BasesAcc, Pos),
                             EstAcc + ?POW(LSz-1)*N}
                    end, {erlang:make_tuple(MaxSz, 0), 0}, NL),
    ok = write_bytes(NewHead, EndOfFile, Est),
    {NewHead, Bases, SegAddr, SegEnd}.
1174
%% Extends the file up to EndOfFile with filler bytes. This ensures
%% that the file blocks are allocated more or less contiguously,
%% which reduces the seek times to a minimum when the file is later
%% read serially from beginning to end (as is done when calling
%% select and the like). A well-formed file will be created also if
%% nothing is written (as is the case for small files, where the
%% estimated size Est is less than ?CACHE_SIZE, for efficiency).
write_bytes(Head, EndOfFile, Est) ->
    if
        Est < ?CACHE_SIZE ->
            ok;
        true ->
            {ok, Start} = file:position(Head#head.fptr, eof),
            %% 64 kB filler: the byte values 0..255 repeated 256 times.
            Filler = binary:copy(list_to_binary(lists:seq(0, 255)), 64 * 4),
            write_loop(Head, EndOfFile - Start, Filler)
    end.
1190
%% Writes BytesToWrite bytes to the file at its current position by
%% repeating Bin; the last write uses a prefix of Bin.
%%
%% A failing write is reported through dets_utils:file_error/2, which
%% takes the file name first and the error second.
write_loop(Head, BytesToWrite, Bin) when BytesToWrite >= byte_size(Bin) ->
    case file:write(Head#head.fptr, Bin) of
        ok -> write_loop(Head, BytesToWrite - byte_size(Bin), Bin);
        Error -> dets_utils:file_error(Head#head.filename, Error)
    end;
write_loop(_Head, 0, _Bin) ->
    ok;
write_loop(Head, BytesToWrite, Bin) ->
    %% Less than a full Bin remains: write just the missing bytes.
    <<SmallBin:BytesToWrite/binary,_/binary>> = Bin,
    write_loop(Head, BytesToWrite, SmallBin).
1201
%% Allocates space for all rows of SizeT. By allocating bigger
%% objects before smaller ones, holes in the buddy system memory map
%% are avoided.
%%
%% Returns {NewHead, Rows, MaxSz, EndOfFile} where Rows are the rows
%% with addresses filled in and MaxSz is the biggest key of SizeT.
allocate_all_objects(Head, SizeT) ->
    BiggestFirst = lists:reverse(lists:keysort(1, ets:tab2list(SizeT))),
    [Biggest | _] = BiggestFirst,
    MaxSz = element(1, Biggest),
    {Head1, Rows} = allocate_all(Head, BiggestFirst, []),
    %% Find the position that will be the end of the file by allocating
    %% a minimal object.
    {_Head, EndOfFile, _} = dets_utils:alloc(Head1, ?BUMP),
    MaxObjSize = max_objsize(Head1#head.no_collections),
    {Head1#head{maxobjsize = MaxObjSize}, Rows, MaxSz, EndOfFile}.
1213
%% One (temporary) file for each buddy size, write all objects of that
%% size to the file.
%%
%% Before R15 a "hole" was needed before the first bucket if the size
%% of the biggest bucket was greater than the size of a segment. The
%% hole proved to be a problem with almost full tables with huge
%% buckets. Since R15 the hole is no longer needed due to the fact
%% that the base of the Buddy system is flexible.
%%
%% The rows are processed in the given order; the segment row must
%% be the last one. Returns {NewHead, RowsWithAddresses}.
allocate_all(Head, [{?FSCK_SEGMENT,_,Data,_}], Acc) ->
    %% And one file for the segments...
    %% Note that space for the array parts and the segments has
    %% already been allocated, but the segments have not been
    %% initialized on disk.
    %% All parts first, ensured by init_segments/6.
    SegAddr = ?BASE + no_parts(Head#head.next) * 4 * ?SEGPARTSZ,
    {Head, [{?FSCK_SEGMENT,SegAddr,Data,0} | Acc]};
allocate_all(Head, [{LSize,_,Data,NoCollections} | Rows], Acc) ->
    Size = ?POW(LSize-1),
    %% The first allocation only gives the address; the space for
    %% all collections of this size is then taken by alloc_many/4.
    {_Head, Addr, _} = dets_utils:alloc(Head, adjsz(Size)),
    Head1 = dets_utils:alloc_many(Head, Size, NoCollections, Addr),
    Counts = orddict:update_counter(LSize-1, NoCollections,
                                    Head1#head.no_collections),
    allocate_all(Head1#head{no_collections = Counts}, Rows,
                 [{LSize,Addr,Data,NoCollections} | Acc]).
1240
%% Converts a list of <<Slot:32, Seq:32, BinTerm/binary>> binaries
%% into a list of {Slot, Key, Seq, Term, BinTerm} tuples, in the same
%% order. Key is element Kp of the decoded term.
bin2term(Bin, Kp) ->
    bin2term1(Bin, Kp, []).

%% L holds already converted elements in reverse order.
bin2term1(BTs, Kp, L) ->
    Convert = fun(<<Slot:32, Seq:32, BinTerm/binary>>) ->
                      Term = binary_to_term(BinTerm),
                      {Slot, element(Kp, Term), Seq, Term, BinTerm}
              end,
    lists:reverse(L, lists:map(Convert, BTs)).
1250
%% Empties Cache. If SizeT holds one row only and More is no_more,
%% the collections are stored in SizeT by all_sizes/3; otherwise they
%% are written to temporary files by write_sizes/4. Returns the
%% emptied cache.
write_all_sizes({}=Cache, _SizeT, _Head, _More) ->
    Cache;
write_all_sizes(Cache, SizeT, Head, More) ->
    BiggestFirst = lists:reverse(tuple_to_list(Cache)),
    NoSizes = length(BiggestFirst),
    %% One row means COUNTERS only...
    KeepInRam = (ets:info(SizeT, size) =:= 1) andalso (More =:= no_more),
    Emptied = if
                  KeepInRam ->
                      all_sizes(BiggestFirst, NoSizes, SizeT);
                  true ->
                      write_sizes(BiggestFirst, NoSizes, SizeT, Head)
              end,
    list_to_tuple(Emptied).
1263
%% Stores the collections of every non-empty cache entry in SizeT as
%% {Size, 0, Collections, NoCollections}, the collections in order
%% of arrival. Sz is the size of the first entry of the list; the
%% following entries have sizes Sz-1, Sz-2, ... Returns a list of
%% empty entries, one per entry given.
all_sizes(CL, Sz, SizeT) ->
    Store = fun([], Size) ->
                    {[], Size - 1};
               (Reversed, Size) ->
                    Colls = lists:reverse(Reversed),
                    true = ets:insert(SizeT, {Size, 0, Colls, length(Colls)}),
                    {[], Size - 1}
            end,
    {Emptied, _} = lists:mapfoldl(Store, Sz, CL),
    Emptied.
1273
%% Appends the collections of every non-empty cache entry to the
%% temporary file of its size, creating the file if SizeT has no row
%% for the size, and adds the number of collections to the fourth
%% element of the row. Sz is the size of the first entry of the
%% list. Returns a list of empty entries, one per entry given.
write_sizes([], _Sz, _SizeT, _Head) ->
    [];
write_sizes([[] | CL], Sz, SizeT, Head) ->
    [[] | write_sizes(CL, Sz-1, SizeT, Head)];
write_sizes([Colls | CL], Sz, SizeT, Head) ->
    {FileName, Fd} =
        case ets:lookup(SizeT, Sz) of
            [{_, _, {Name, Dev}, _}] ->
                {Name, Dev};
            [] ->
                temp_file(Head, SizeT, Sz)
        end,
    _ = ets:update_counter(SizeT, Sz, {4, length(Colls)}),
    case file:write(Fd, lists:reverse(Colls)) of
        ok ->
            [[] | write_sizes(CL, Sz-1, SizeT, Head)];
        Error ->
            dets_utils:file_error(FileName, Error)
    end.
1294
%% Groups the elements of the list by slot (the first element of
%% each tuple) and puts every complete group into Cache. When the
%% list is exhausted the object and key counters of SizeT are
%% updated. Returns {LastElement, LastGroup, NewCache}; the last
%% group is not output since more elements of the same slot may
%% follow. For an empty list {not_a_tuple, [], Cache} is returned.
output_slots([], _Head, Cache, SizeT, NoKeys, NoObjs) ->
    _ = ets:update_counter(SizeT, ?COUNTERS, [{?OBJ_COUNTER,NoObjs},
                                              {?KEY_COUNTER,NoKeys}]),
    {not_a_tuple, [], Cache};
output_slots([First | Rest], Head, Cache, SizeT, NoKeys, NoObjs) ->
    output_slots(First, Rest, [First], Head, Cache, SizeT, NoKeys, NoObjs).
1301
%% E is the element most recently added to the group Acc. Elements
%% with the same slot as E are added to the group; an element with
%% another slot makes output_slot/7 output the group.
output_slots(E, [], Acc, _Head, Cache, SizeT, NoKeys, NoObjs) ->
    %% End of list: the group is returned, not output.
    _ = ets:update_counter(SizeT, ?COUNTERS, [{?OBJ_COUNTER,NoObjs},
                                              {?KEY_COUNTER,NoKeys}]),
    {E, Acc, Cache};
output_slots(E, [Next | Rest], Acc, Head, Cache, SizeT, NoKeys, NoObjs)
                       when element(1, E) =:= element(1, Next) ->
    output_slots(Next, Rest, [Next | Acc], Head, Cache, SizeT, NoKeys, NoObjs);
output_slots(_E, L, Acc, Head, Cache, SizeT, NoKeys, NoObjs) ->
    output_slot(Acc, Head, Cache, L, SizeT, NoKeys, NoObjs).
1311
%% Outputs one slot: the elements Es (all with the same slot) are
%% turned into one object collection, which is added to the cache
%% entry of its size, and a pointer entry is added to the
%% ?FSCK_SEGMENT entry of the cache. Then output_slots/6 goes on
%% with the rest of the list, L.
output_slot(Es, Head, Cache, L, SizeT, NoKeys, NoObjs) ->
    Slot = element(1, hd(Es)),
    %% Plain lists:sort/1 will do.
    {Bins, Size, No, KNo} = prep_slot(lists:sort(Es), Head),

    %% First the object collection: header, objects and padding up
    %% to the buddy size.
    BSize = Size + ?OHDSZ,
    LSize = sz2pos(BSize),
    PadBytes = ?POW(LSize-1) - BSize,
    Collection = [<<BSize:32, ?ACTIVE:32>>, Bins | <<0:PadBytes/unit:8>>],
    Cache1 =
        if
            LSize > tuple_size(Cache) ->
                %% No entry for this size yet: extend the cache.
                Extended = ?VEXT(LSize, Cache, []),
                ?VSET(LSize, Extended, [Collection]);
            true ->
                Present = ?VGET(LSize, Cache),
                ?VSET(LSize, Cache, [Collection | Present])
        end,

    %% Then the pointer to the object collection.
    %% Cannot yet determine the absolute pointers; segment_file/4 does that.
    Pointer = <<Slot:32,BSize:32,LSize:8>>,
    Pointers = ?VGET(?FSCK_SEGMENT, Cache1),
    NCache = ?VSET(?FSCK_SEGMENT, Cache1, [Pointer | Pointers]),
    output_slots(L, Head, NCache, SizeT, NoKeys + KNo, NoObjs + No).
1341
%% Turns the sorted elements of one slot into the binary objects of
%% an object collection. Returns {Bins, Size, NoObjects, NoKeys}.
%% Tables of type set take the fast path prep_set_slot/7.
prep_slot([{_Slot, Key, _Seq, _T, BT} | Rest], #head{type = set}) ->
    prep_set_slot(Rest, Key, BT, 0, 0, 0, []);
prep_slot(L, Head) when Head#head.type =/= set ->
    prep_slot(L, Head, []).

%% Collects one {Key, {Seq, {insert, Term}}} pair per element in W
%% and lets eval_slot/9 build the objects of the slot.
prep_slot([], Head, W) ->
    Families = dets_utils:family(W),
    {[], Bins, Size, No, KNo, _} =
        eval_slot(Families, [], Head#head.type, [], [], 0, 0, 0, false),
    {Bins, Size, No, KNo};
prep_slot([{_Slot, Key, Seq, Term, _BT} | Rest], Head, W) ->
    prep_slot(Rest, Head, [{Key, {Seq, {insert, Term}}} | W]).
1354
%% Optimization, prep_slot/3 would work for set tables as well.
%%
%% Key and BT are the key and binary term of the current candidate.
%% When the next element has the same key it replaces the candidate,
%% so the last element of each key wins. Every object is output as
%% <<Size:32>> followed by the binary term, where Size includes the
%% four size bytes. Returns {Bins, TotalSize, NoKeys, NoObjs}.
prep_set_slot([], _Key, BT, Sz, NoKeys, NoObjs, Ws) ->
    BSize = byte_size(BT) + 4,
    {[Ws, <<BSize:32>> | BT], Sz + BSize, NoKeys + 1, NoObjs + 1};
prep_set_slot([{_, NextKey, _Seq, _Term, NextBT} | Rest], Key, BT, Sz,
              NoKeys, NoObjs, Ws) ->
    if
        NextKey =:= Key ->
            prep_set_slot(Rest, Key, NextBT, Sz, NoKeys, NoObjs, Ws);
        true ->
            BSize = byte_size(BT) + 4,
            prep_set_slot(Rest, NextKey, NextBT, Sz + BSize,
                          NoKeys + 1, NoObjs + 1, [Ws, <<BSize:32>> | BT])
    end.
1365
%% Creates the contents of the segments from the slot pointers. The
%% first element of FileData is the ?FSCK_SEGMENT row; its data is
%% either {FileName, Fd} of a temporary file with pointer entries or
%% the entries themselves. While the pointers are calculated SizeT
%% holds the rows of FileData; afterwards the ?FSCK_SEGMENT row is
%% replaced by a ?FSCK_SEGMENT2 row with the new data.
segment_file(SizeT, Head, FileData, SegEnd) ->
    Reload = fun(Rows) ->
                     true = ets:delete_all_objects(SizeT),
                     lists:foreach(fun(Row) ->
                                           true = ets:insert(SizeT, Row)
                                   end, Rows)
             end,
    Reload(FileData),
    [{?FSCK_SEGMENT,SegAddr,Data,0} | OtherRows] = FileData,
    NewData =
        case Data of
            {InFile, In0} ->
                %% The temporary output file gets the number 2.
                {OutFile, Out} = temp_file(Head, SizeT, 2),
                _ = file:close(In0),
                {ok, In} = dets_utils:open(InFile, [raw,binary,read]),
                {ok, 0} = dets_utils:position(In, InFile, bof),
                seg_file(SegAddr, SegAddr, In, InFile, Out, OutFile, SizeT,
                         SegEnd),
                _ = file:close(In),
                _ = file:delete(InFile),
                {OutFile, Out};
            Objects ->
                {LastAddr, B} = seg_file(Objects, SegAddr, SegAddr, SizeT, []),
                dets_utils:disk_map_segment(SegAddr, B),
                [B | dets_utils:make_zeros(SegEnd - LastAddr)]
        end,
    %% Restore the positions.
    %% To get the segments copied first by dets:fsck_copy/4, use a big
    %% number here, FSCK_SEGMENT2.
    Reload([{?FSCK_SEGMENT2,SegAddr,NewData,0} | OtherRows]),
    ok.
1396
%% Reads pointer entries from the file In, 4500 bytes (500 entries
%% of nine bytes) at a time, and writes the resulting slot pointers
%% to Out. Addr is the segment address reached so far. At end of
%% file the rest of the segments, up to SegEnd, is filled with zeros.
seg_file(Addr, SS, In, InFile, Out, OutFile, SizeT, SegEnd) ->
    case dets_utils:read_n(In, 4500) of
        eof ->
            Padding = dets_utils:make_zeros(SegEnd - Addr),
            dets_utils:fwrite(Out, OutFile, Padding);
        Entries ->
            {NextAddr, Pointers} = seg_file(Entries, Addr, SS, SizeT, []),
            dets_utils:disk_map_segment(Addr, Pointers),
            ok = dets_utils:fwrite(Out, OutFile, Pointers),
            seg_file(NextAddr, SS, In, InFile, Out, OutFile, SizeT, SegEnd)
    end.
1408
%% Converts pointer entries, given either as one binary or as a list
%% of nine-byte binaries, into slot pointers by calling
%% seg_file_item/8 for each entry. Returns {NewAddr, Pointers}.
seg_file([], Addr, _SS, _SizeT, Acc) ->
    {Addr, lists:reverse(Acc)};
seg_file(<<>>, Addr, _SS, _SizeT, Acc) ->
    {Addr, lists:reverse(Acc)};
seg_file([<<Slot:32,BSize:32,LSize:8>> | Rest], Addr, SS, SizeT, Acc) ->
    seg_file_item(Rest, Addr, SS, SizeT, Acc, Slot, BSize, LSize);
seg_file(<<Slot:32,BSize:32,LSize:8,Rest/binary>>, Addr, SS, SizeT, Acc) ->
    seg_file_item(Rest, Addr, SS, SizeT, Acc, Slot, BSize, LSize).
1417
%% Creates the pointer of one slot. The address of the object
%% collection is taken from the counter of the size LSize in SizeT,
%% which is advanced by the size of the collection. The pointer is
%% preceded by zeros for the unused slots between Addr and the slot.
seg_file_item(T, Addr, SS, SizeT, L, Slot, BSize, LSize) ->
    %% Should call slot_position/1, but since all segments are
    %% allocated in a sequence, the position of a slot can be
    %% calculated faster.
    SlotPos = SS + ?SZOBJP*4 * Slot, % SlotPos = slot_position(Slot)
    Gap = SlotPos - Addr,
    CollSize = ?POW(LSize-1),
    %% update_counter/3 returns the new value; the collection
    %% starts where the counter was before.
    CollP = ets:update_counter(SizeT, LSize, CollSize) - CollSize,
    Pointer = <<BSize:32, CollP:32>>,
    Item = if
               Gap > 100 ->
                   [dets_utils:make_zeros(Gap) | Pointer];
               Gap =:= 0 ->
                   Pointer;
               true ->
                   <<0:Gap/unit:8, Pointer/binary>>
           end,
    seg_file(T, Addr + Gap + ?SZOBJP*4, SS, SizeT, [Item | L]).
1437
%% Opens a temporary file for writing. The name is the name of the
%% table file followed by a dot and N. A row {N, 0, {Name, Fd}, 0}
%% is stored in SizeT. Returns {Name, Fd}.
temp_file(Head, SizeT, N) ->
    Path = lists:concat([Head#head.filename, '.', N]),
    {ok, Dev} = dets_utils:open(Path, [raw, binary, write]),
    %% The file table is consulted when cleaning up.
    true = ets:insert(SizeT, {N, 0, {Path, Dev}, 0}),
    {Path, Dev}.
1444
%% Does not close Fd.
%%
%% Returns the input fun used when repairing a file. Objects bigger
%% than MaxSz are not believed to be objects: MaxSz is taken from the
%% collection counts of the file header if it has an MD5 sum and the
%% counts are a list, otherwise from the size of the file. It is at
%% least ?CHUNK_SIZE.
fsck_input(Head, Fd, Cntrs, FileHeader) ->
    #fileheader{has_md5 = HasMd5, no_colls = NoColls} = FileHeader,
    Bound = if
                HasMd5 =:= true, is_list(NoColls) ->
                    ?POW(max_objsize(NoColls));
                true ->
                    %% The file is not compressed, so the bucket size
                    %% cannot exceed the filesize, for all buckets.
                    case file:position(Fd, eof) of
                        {ok, FileSize} ->
                            FileSize;
                        _ ->
                            1 bsl 32
                    end
            end,
    MaxSz = erlang:max(Bound, ?CHUNK_SIZE),
    %% Reading starts at ?BASE.
    FirstState = fsck_read(?BASE, Fd, [], 0),
    fsck_input(Head, FirstState, Fd, MaxSz, Cntrs).
1463
%% The input fun proper. State is the result of the latest read:
%% {cont, Objs, Bin, Pos, Seq} when more of the file remains,
%% {done, Objs, Seq} when the last objects have been found, or done.
%% On `read' the objects of State are returned together with a fun
%% for the next state; the file is read one step ahead.
fsck_input(Head, State, Fd, MaxSz, Cntrs) ->
    fun(read) ->
            case State of
                {cont, Objs, Bin, Pos, Seq} ->
                    Output = count_input(Objs),
                    Found = fsck_objs(Bin, Head#head.keypos, Head, [], Seq),
                    NextState = fsck_read(Found, Pos, Fd, MaxSz, Head),
                    {Output, fsck_input(Head, NextState, Fd, MaxSz, Cntrs)};
                {done, Objs, _Seq} ->
                    Output = count_input(Objs),
                    {Output, fsck_input(Head, done, Fd, MaxSz, Cntrs)};
                done ->
                    end_of_input
            end;
       (close) ->
            ok
    end.
1481
%% Returns the objects in the order they were found in the file (L
%% is collected in reverse). Despite the name nothing is counted
%% here.
count_input(L) ->
    lists:reverse(L, []).
1485
%% Positions the file F at Pos and reads the next chunk. If the
%% file cannot be positioned, reading is over: {done, L, Seq}.
fsck_read(Pos, F, L, Seq) ->
    case file:position(F, Pos) of
        {ok, _NewPos} ->
            read_more_bytes([], 0, Pos, F, L, Seq);
        _Failed ->
            {done, L, Seq}
    end.
1493
%% Continues reading according to the result of fsck_objs/5:
%% {new, Skip, L, Seq}      - skip Skip bytes from Pos and read a
%%                            fresh chunk;
%% {more, Bin, Sz, L, Seq}  - at least Sz bytes are needed. If Sz is
%%                            greater than MaxSz the bytes cannot be
%%                            an object, and ?BUMP bytes are skipped
%%                            instead.
fsck_read({new, Skip, L, Seq}, Pos, F, _MaxSz, _Head) ->
    fsck_read(Pos + Skip, F, L, Seq);
fsck_read({more, Bin, Sz, L, Seq}, Pos, F, MaxSz, Head) ->
    if
        Sz > MaxSz ->
            Found = skip_bytes(Bin, ?BUMP, Head#head.keypos, Head, L, Seq),
            fsck_read(Found, Pos, F, MaxSz, Head);
        true ->
            read_more_bytes(Bin, Sz, Pos, F, L, Seq)
    end.
1502
%% Reads at least ?CHUNK_SIZE bytes (Min bytes, if that is more)
%% from the current position of F and appends them to the bytes B
%% left from the previous read. Pos is the file position before the
%% read. Returns {cont, L, Bin, NewPos, Seq}, or {done, L, Seq} at
%% end of file.
read_more_bytes(B, Min, Pos, F, L, Seq) ->
    ReadSz = erlang:max(Min, ?CHUNK_SIZE),
    case dets_utils:read_n(F, ReadSz) of
        eof ->
            {done, L, Seq};
        Data ->
            {cont, L, list_to_binary([B, Data]), Pos + byte_size(Data), Seq}
    end.
1515
%% Scans Bin for object collections and adds the objects found to L.
%% Seq is the sequence number of the next object.
%%
%% Returns {more, Bin1, Sz, L1, Seq1} when at least Sz more bytes are
%% needed to go on with Bin1, or (by way of skip_bytes/6)
%% {new, Skip, L1, Seq1} when Skip bytes of the file are to be
%% skipped before reading anew.
%%
%% Bytes that do not start with an active header are skipped ?BUMP
%% bytes at a time. The same goes for a header that claims to be
%% active but has a size less than ?OHDSZ: such a size cannot be that
%% of an object collection, and must not make the repair fail.
fsck_objs(Bin = <<Sz:32, Status:32, Tail/binary>>, Kp, Head, L, Seq) ->
    if
        Status =:= ?ACTIVE, Sz >= ?OHDSZ ->
            Sz1 = Sz-?OHDSZ,
            case Tail of
                <<BinTerm:Sz1/binary, Tail2/binary>> ->
                    case catch bin2keybins(BinTerm, Head) of
                        {'EXIT', _Reason} ->
                            %% The whole collection of objects is skipped.
                            skip_bytes(Bin, ?BUMP, Kp, Head, L, Seq);
                        BOs ->
                            {NL, NSeq} = make_objects(BOs, Seq, Kp, Head, L),
                            %% Skip the padding up to the buddy size.
                            Skip = ?POW(sz2pos(Sz)-1) - Sz,
                            skip_bytes(Tail2, Skip, Kp, Head, NL, NSeq)
                    end;
                _ when byte_size(Tail) < Sz1 ->
                    {more, Bin, Sz, L, Seq}
            end;
        true ->
            skip_bytes(Bin, ?BUMP, Kp, Head, L, Seq)
    end;
fsck_objs(Bin, _Kp, _Head, L, Seq) ->
    %% Less than a full header left.
    {more, Bin, 0, L, Seq}.
1539
%% Adds one object binary, created by make_object/4, per {Key, BinTerm}
%% pair to L. The objects are numbered from Seq and on. Returns
%% {NewL, NextSeq}.
make_objects(Os, Seq, _Kp, Head, L) ->
    lists:foldl(fun({Key, BT}, {Acc, N}) ->
                        {[make_object(Head, Key, N, BT) | Acc], N + 1}
                end, {L, Seq}, Os).
1545
%% Inlined.
%%
%% Returns the binary term BT preceded by the slot of Key and the
%% sequence number, 32 bits each.
make_object(Head, Key, Seq, BT) ->
    <<(db_hash(Key, Head)):32, Seq:32, BT/binary>>.
1550
%% Inlined.
%%
%% Skips Skip bytes of Bin and goes on scanning with fsck_objs/5.
%% If Bin is too short, {new, Remaining, L, Seq} is returned, where
%% Remaining is the number of bytes yet to be skipped.
skip_bytes(Bin, Skip, Kp, Head, L, Seq) ->
    Available = byte_size(Bin),
    case Bin of
        <<_:Skip/binary, Rest/binary>> ->
            fsck_objs(Rest, Kp, Head, L, Seq);
        _ when Available < Skip ->
            {new, Skip - Available, L, Seq}
    end.
1559
1560%%%
1561%%% End of repair, conversion and initialization of a dets file.
1562%%%
1563
%% -> {NewHead, ok} | throw({Head, Error})
%%
%% Saves the table: the free lists are written at the end of the
%% file, followed by the size of the file (32 bits), and then the
%% file header is written at position 0, marked ?CLOSED_PROPERLY.
%% In debug mode the free lists are checked by reading them back
%% before the header is written.
do_perform_save(H) ->
    {ok, FreeListsPointer} = dets_utils:position(H, eof),
    H1 = H#head{freelists_p = FreeListsPointer},
    {FLW, FLSize} = free_lists_to_file(H1),
    FileSize = FreeListsPointer + FLSize + 4,
    %% With a base other than ?BASE, the size is relative to the base.
    AdjustedFileSize = if
                           H#head.base =:= ?BASE -> FileSize;
                           true -> FileSize - H#head.base
                       end,
    ok = dets_utils:write(H1, [FLW | <<AdjustedFileSize:32>>]),
    FileHeader = file_header(H1, FreeListsPointer, ?CLOSED_PROPERLY),
    case dets_utils:debug_mode() of
        false ->
            dets_utils:pwrite(H1, [{0, FileHeader}]);
        true ->
            TmpHead0 = init_freelist(H1#head{fixed = false}),
            TmpHead = TmpHead0#head{base = H1#head.base},
            Same = (catch dets_utils:all_allocated_as_list(TmpHead)
                          =:= dets_utils:all_allocated_as_list(H1)),
            case Same of
                true ->
                    dets_utils:pwrite(H1, [{0, FileHeader}]);
                _ ->
                    Reason = {failed_to_save_free_lists,
                              FreeListsPointer,
                              TmpHead#head.freelists,
                              H1#head.freelists},
                    throw(dets_utils:corrupt_reason(H1, Reason))
            end
    end.
1596
%% Returns the file header as an iolist. The number of collections
%% per size is written as one 32-bit word for each size from 4 up
%% to ?MAXBUD-1; sizes without collections get zero.
file_header(Head, FreeListsPointer, ClosedProperly) ->
    Counted = case Head#head.no_collections of
                  undefined -> [];
                  Known -> Known
              end,
    AllSizes = [{LogSz, 0} || LogSz <- lists:seq(4, ?MAXBUD-1)],
    Merged = orddict:merge(fun(_LogSz, A, B) -> A + B end, Counted, AllSizes),
    Words = [<<N:32>> || {_LogSz, N} <- Merged],
    file_header(Head, FreeListsPointer, ClosedProperly, Words).
1607
%% Returns the file header as iodata. NoColls is iodata holding the
%% collection counters. The MD5 sum covers the table parameters and
%% the collection counters, but not the first three words (free lists
%% pointer, cookie, closed properly).
file_header(Head, FreeListsPointer, ClosedProperly, NoColls) ->
    Magic = ?MAGIC,
    TypeCode = dets_utils:type_to_code(Head#head.type),
    FormatVersion = ?FILE_FORMAT_VERSION,
    HashMethod = hash_method_to_code(Head#head.hash_bif),
    Prefix = <<FreeListsPointer:32, Magic:32, ClosedProperly:32>>,
    Params = <<TypeCode:32,
               FormatVersion:32,
               (Head#head.m):32,
               (Head#head.next):32,
               (Head#head.keypos):32,
               (Head#head.no_objects):32,
               (Head#head.no_keys):32,
               (Head#head.min_no_slots):32,
               (Head#head.max_no_slots):32,
               HashMethod:32,
               (Head#head.n):32>>,
    Digested = [Params | NoColls],
    Checksum = case Head#head.has_md5 of
                   false -> <<0:?MD5SZ/unit:8>>;
                   true -> erlang:md5(Digested)
               end,
    %% The default base is stored as zero.
    BaseWord = case Head#head.base of
                   ?BASE -> 0;
                   FlBase -> FlBase
               end,
    [Prefix, Digested, Checksum, <<BaseWord:32>> | <<0:?RESERVED/unit:8>>].
1635
1636%% Going through some trouble to avoid creating one single binary for
1637%% the free lists. If the free lists are huge, binary_to_term and
1638%% term_to_binary could otherwise stop the emulator for quite some time.
1639
1640-define(MAXFREEOBJ, 4096).
1641-define(ENDFREE, 12345).
1642
%% Writes the free lists of H to the file, one list at a time.
%% -> {IoData, Size}, see free_list_to_file/6.
free_lists_to_file(H) ->
    Ftab = dets_utils:get_freelists(H),
    NoLists = tuple_size(Ftab),
    free_list_to_file(Ftab, H, 1, NoLists, [], 0).
1646
%% Converts element Pos..Sz of the tuple Ftab to free list objects.
%% Ws is iodata not yet written to the file, WsSz the accumulated
%% size of everything produced so far.
%% -> {IoData, Size} where IoData is still to be written by the caller.
%%
%% When all elements have been handled, an end marker object (status
%% ?FREE, "position" ?ENDFREE) is appended.
free_list_to_file(_Ftab, _H, Pos, Sz, Ws, WsSz) when Pos > Sz ->
    {[Ws | <<(4+?OHDSZ):32, ?FREE:32, ?ENDFREE:32>>], WsSz+4+?OHDSZ};
free_list_to_file(Ftab, H, Pos, Sz, Ws, WsSz) ->
    %% Max is the number of 4-byte items that fit in an object of
    %% ?MAXFREEOBJ bytes besides the object header and the Pos word.
    Max = (?MAXFREEOBJ - 4 - ?OHDSZ) div 4,
    %% F is handed to dets_utils:tree_to_bin/5. N is the length of L;
    %% presumably L holds 4-byte binaries, since Size counts 4 bytes
    %% per item -- confirm against dets_utils. W is pending iodata and
    %% S the size accumulated so far.
    F = fun(N, L, W, S) when N =:= 0 -> {N, L, W, S};
	   (N, L, W, S) ->
		{L1, N1, More} =
		    if
			N > Max ->
			    {lists:sublist(L, Max), Max,
			     {N-Max, lists:nthtail(Max, L)}};
			true ->
			    {L, N, no_more}
		    end,
                Size = N1*4 + 4 + ?OHDSZ,
		Header = <<Size:32, ?FREE:32, Pos:32>>,
		NW = [W, Header | L1],
		case More of
		    no_more ->
			{0, [], NW, S+Size};
		    {NN, NL} ->
			%% A full object: write it now, and return the
			%% remaining items with nothing pending.
			ok = dets_utils:write(H, NW),
			{NN, NL, [], S+Size}
		end
	end,
    {NWs,NWsSz} = dets_utils:tree_to_bin(element(Pos, Ftab), F, Max, Ws, WsSz),
    free_list_to_file(Ftab, H, Pos+1, Sz, NWs, NWsSz).
1674
%% Reads the free lists stored at position Pos of the file and returns
%% the head with the free lists installed and the base set to ?BASE.
%% -> NewHead | throw({error, {bad_freelists, FileName}})
free_lists_from_file(H, Pos) ->
    {ok, Pos} = dets_utils:position(H#head.fptr, H#head.filename, Pos),
    FL = dets_utils:empty_free_lists(),
    %% bin_to_tree/6 starts with an empty buffer ([]) and reads from
    %% the file as needed. Any exit or error while parsing is reported
    %% as bad free lists.
    case catch bin_to_tree([], H, start, FL, -1, []) of
	{'EXIT', _} ->
	    throw({error, {bad_freelists, H#head.filename}});
        Ftab ->
            H#head{freelists = Ftab, base = ?BASE}
    end.
1684
%% Parses free list objects from Bin, reading more data from the file
%% when Bin does not hold a complete object.
%%
%% LastPos is the Ftab index of the list being collected ('start'
%% before the first object), L the items collected for that index (in
%% reverse order), and A0 the last item seen (-1 when a new list is
%% started), which bin_to_tree1/4 uses to check that items are
%% strictly increasing.
%% -> Ftab (crashes on badly formed data; caught by the caller)
bin_to_tree(Bin, H, LastPos, Ftab, A0, L) ->
    case Bin of
        %% End marker and nothing collected: done.
        <<_Size:32,?FREE:32,?ENDFREE:32,_/binary>> when L =:= [] ->
            Ftab;
        %% End marker: store the last collected list.
        <<_Size:32,?FREE:32,?ENDFREE:32,_/binary>> ->
            setelement(LastPos, Ftab, dets_utils:list_to_tree(L));
        %% A complete object belonging to index Pos.
        <<Size:32,?FREE:32,Pos:32,T/binary>>
                      when byte_size(T) >= Size-4-?OHDSZ ->
	    {NFtab, L1, A1} =
		if
		    Pos =/= LastPos, LastPos =/= start ->
			%% A new index begins; store the previous list.
			Tree = dets_utils:list_to_tree(L),
			{setelement(LastPos, Ftab, Tree), [], -1};
		    true ->
			{Ftab, L, A0}
		    end,
	    {NL, B2, A2} = bin_to_tree1(T, Size-?OHDSZ-4, A1, L1),
	    bin_to_tree(B2, H, Pos, NFtab, A2, NL);
        %% Not enough data (or the initial []): read some more.
        _ ->
            Bin2 = dets_utils:read_n(H#head.fptr, ?MAXFREEOBJ),
            bin_to_tree(list_to_binary([Bin | Bin2]), H, LastPos, Ftab, A0, L)
    end.
1707
%% Moves Left bytes of 32-bit words from the binary to the accumulator
%% (most recent word first). Every word has to be greater than the
%% previous one (Last), otherwise no clause matches and the call fails.
%% Four words are taken at a time when possible.
%% -> {Acc, RestOfBinary, LastWord}
bin_to_tree1(Rest, 0, Last, Acc) ->
    {Acc, Rest, Last};
bin_to_tree1(<<W1:32,W2:32,W3:32,W4:32,Rest/binary>>, Left, Last, Acc)
         when Left >= 16, Last < W1, W1 < W2, W2 < W3, W3 < W4 ->
    bin_to_tree1(Rest, Left-16, W4, [W4, W3, W2, W1 | Acc]);
bin_to_tree1(<<W:32,Rest/binary>>, Left, Last, Acc)
         when Left >= 4, Last < W ->
    bin_to_tree1(Rest, Left - 4, W, [W | Acc]).
1715
%% Returns the objects of slot Slot, or '$end_of_table' if the slot
%% is beyond the slots in use.
%% -> [term()] | '$end_of_table' | throw({Head, Error})
slot_objs(H, Slot) ->
    if
        Slot >= H#head.next ->
            '$end_of_table';
        true ->
            {ok, _Pointer, Objects} = slot_objects(H, Slot),
            Objects
    end.
1722
%% Inlined. Hashes Term with the hash BIF given as second argument.
h(Term, phash) -> erlang:phash(Term, ?BIG) - 1;
h(Term, phash2) -> erlang:phash2(Term). % -> [0..2^27-1]
1726
%% Returns the slot of Key. The hash value is first reduced modulo m;
%% if the resulting slot is below n, the hash value is reduced modulo
%% m2 instead.
db_hash(Key, Head) when Head#head.hash_bif =:= phash2 ->
    Hash = erlang:phash2(Key),
    case ?REM2(Hash, Head#head.m) of
        Slot when Slot < Head#head.n ->
            ?REM2(Hash, Head#head.m2); % Hash rem (2 * m)
        Slot ->
            Slot
    end;
db_hash(Key, Head) ->
    Hash = h(Key, Head#head.hash_bif),
    case Hash rem Head#head.m of
        Slot when Slot < Head#head.n ->
            Hash rem (Head#head.m2); % Hash rem (2 * m)
        Slot ->
            Slot
    end.
1745
%% Maps the name of the hash BIF to the code stored in the file header.
hash_method_to_code(Method) when Method =:= phash -> ?PHASH;
hash_method_to_code(Method) when Method =:= phash2 -> ?PHASH2.
1748
%% Maps a hash method code to the name of the hash BIF; 'undefined'
%% is returned for unknown codes.
code_to_hash_method(Code) when Code =:= ?PHASH2 -> phash2;
code_to_hash_method(Code) when Code =:= ?PHASH -> phash;
code_to_hash_method(_Code) -> undefined.
1752
%% -> {MinNoSlots, NoSlotsInUse, MaxNoSlots}
no_slots(Head) ->
    Min = Head#head.min_no_slots,
    Current = Head#head.next,
    Max = Head#head.max_no_slots,
    {Min, Current, Max}.
1755
%% Returns the hash parameters of the table as a ?HASH_PARMS record,
%% or 'undefined' if the head has no collection counters. Size classes
%% without collections are left out of no_colls.
table_parameters(Head) ->
    case Head#head.no_collections of
        undefined ->
            undefined; % Version 9(a)
        CL ->
            NoColls = lists:filter(fun({_, 0}) -> false;
                                      (_) -> true
                                   end, CL),
            #?HASH_PARMS{
               file_format_version = ?FILE_FORMAT_VERSION,
               bchunk_format_version = ?BCHUNK_FORMAT_VERSION,
               file = filename:basename(Head#head.filename),
               type = Head#head.type,
               keypos = Head#head.keypos,
               hash_method = hash_method_to_code(Head#head.hash_bif),
               n = Head#head.n,
               m = Head#head.m,
               next = Head#head.next,
               min = Head#head.min_no_slots,
               max = Head#head.max_no_slots,
               no_objects = Head#head.no_objects,
               no_keys = Head#head.no_keys,
               no_colls = NoColls}
    end.
1778
1779%% Allow quite a lot when reading object collections.
1780-define(MAXCOLL, (10 * ?CHUNK_SIZE)).
1781
%% Re-hashes one segment, the first slot of which is SlotStart.
%%
%% On average, half of the keys of an old slot i are moved to the new
%% slot i+m. The new slots reside in a newly allocated segment.
%%
%% -> {NewHead, ok} | throw({Head, Error})
re_hash(Head, SlotStart) ->
    OldSegPos = slot_position(SlotStart),
    NewSegPos = slot_position(SlotStart + Head#head.m),
    {ok, [OldSegBin]} = dets_utils:pread([{OldSegPos, 4 * ?SEGSZ}], Head),
    split_bins(OldSegBin, Head, OldSegPos, NewSegPos, [], [], 0).
1795
%% Walks through the slots of the old segment (the binary in the first
%% argument), collecting in L one entry per slot and in ToRead the
%% {Pointer, Size} pairs of the non-empty slots. Pos1 and Pos2 are the
%% file positions of the current slot in the old and the new segment.
%% SoFar is the total size of the collections in ToRead; when it
%% exceeds ?MAXCOLL what has been collected is re-hashed and written
%% before going on.
%% -> {NewHead, ok} | throw({Head, Error})
split_bins(<<>>, Head, _Pos1, _Pos2, _ToRead, _L, 0) ->
    %% Only empty slots since the last write; nothing to do.
    {Head, ok};
split_bins(<<>>, Head, Pos1, Pos2, ToRead, L, _SoFar) ->
    re_hash_write(Head, ToRead, L, Pos1, Pos2);
split_bins(FB, Head, Pos1, Pos2, ToRead, L, SoFar) ->
    <<Sz1:32, P1:32, FT/binary>> = FB,
    %% B1 is the slot as stored in the file (size and pointer).
    <<B1:?OHDSZ/binary, _/binary>> = FB,
    NSoFar = SoFar + Sz1,
    NPos1 = Pos1 + ?SZOBJP*4,
    NPos2 = Pos2 + ?SZOBJP*4,
    if
	NSoFar > ?MAXCOLL, ToRead =/= [] ->
	    %% Flush first; the current slot (FB) is handled again
	    %% afterwards, with empty accumulators.
	    {NewHead, ok} = re_hash_write(Head, ToRead, L, Pos1, Pos2),
	    split_bins(FB, NewHead, Pos1, Pos2, [], [], 0);
	Sz1 =:= 0 ->
	    E = {skip,B1},
	    split_bins(FT, Head, NPos1, NPos2, ToRead, [E | L], NSoFar);
        true ->
	    E = {Sz1,P1,B1,Pos1,Pos2},
	    NewToRead = [{P1,Sz1} | ToRead],
	    split_bins(FT, Head, NPos1, NPos2, NewToRead, [E | L], NSoFar)
    end.
1818
%% Reads the collections in ToRead, re-hashes the slots in L and
%% writes the resulting slots of the old and the new segment together
%% with any moved collections. Pos1 and Pos2 are the positions just
%% after the last slot in L, in the old and the new segment.
%% -> {NewHead, ok} | throw({Head, Error})
re_hash_write(Head, ToRead, L, Pos1, Pos2) ->
    check_pread2_arg(ToRead, Head),
    {ok, Bins} = dets_utils:pread(ToRead, Head),
    EmptySlot = <<0:32, 0:32>>,
    {Head1, BinFS, BinTS, WsB} =
        re_hash_slots(Bins, L, Head, EmptySlot, [], [], []),
    %% Step back to the position of the first slot in L.
    Offset = ?SZOBJP*4*length(L),
    ToWrite = [{Pos1 - Offset, BinFS}, {Pos2 - Offset, BinTS} | WsB],
    dets_utils:pwrite(Head1, ToWrite).
1828
%% Re-hashes the slots in the second argument, one at a time. The
%% first argument holds the collections read for the non-skipped
%% slots, in the same order. Z is the binary of an empty slot.
%% BinFS and BinTS accumulate the new contents of the slots in the
%% old ("from") and the new ("to") segment; WsB accumulates writes of
%% collections that had to be rewritten.
%% -> {NewHead, BinFS, BinTS, WsB} | throw({Head, Error})
re_hash_slots(Bins, [{skip,B1} | L], Head, Z, BinFS, BinTS, WsB) ->
    %% Empty slot: keep it as is, and an empty slot in the new segment.
    re_hash_slots(Bins, L, Head, Z, [B1 | BinFS], [Z | BinTS], WsB);
re_hash_slots([FB | Bins], [E | L], Head, Z, BinFS, BinTS, WsB) ->
    {Sz1,P1,B1,Pos1,Pos2} = E,
    KeyObjs = case catch per_key(Head, FB) of
		  {'EXIT', _Error} ->
                      Bad = dets_utils:bad_object(re_hash_slots, {FB, E}),
		      throw(dets_utils:corrupt_reason(Head, Bad));
		  Else ->
		      Else
	      end,
    case re_hash_split(KeyObjs, Head, [], 0, [], 0) of
	{_KL, _KSz, [], 0} ->
	    %% No key is moved. The match asserts that the sizes add up.
            Sz1 = _KSz + ?OHDSZ,
	    re_hash_slots(Bins, L, Head, Z, [B1 | BinFS], [Z | BinTS], WsB);
	{[], 0, _ML, _MSz} -> %% Optimization.
	    %% All keys are moved: let the new slot refer to the
	    %% collection, which does not need to be rewritten.
            Sz1 = _MSz + ?OHDSZ,
	    re_hash_slots(Bins, L, Head, Z, [Z | BinFS], [B1 | BinTS], WsB);
	{KL, KSz, ML, MSz} when KL =/= [], KSz > 0, ML =/= [], MSz > 0 ->
	    %% The collection is split in two. The last two arguments
	    %% of updated/9 (foo, bar) are dummies.
	    {Head1, FS1, Ws1} =
		updated(Head, P1, Sz1, KSz, Pos1, KL, true, foo, bar),
	    {NewHead, [{Pos2,Bin2}], Ws2} =
		updated(Head1, 0, 0, MSz, Pos2, ML, true, foo, bar),
	    NewBinFS = case FS1 of
			   [{Pos1,Bin1}] -> [Bin1 | BinFS];
			   [] -> [B1 | BinFS] % cannot happen
		       end,
	    NewBinTS = [Bin2 | BinTS],
	    NewWsB = Ws2 ++ Ws1 ++ WsB,
	    re_hash_slots(Bins, L, NewHead, Z, NewBinFS, NewBinTS, NewWsB)
    end;
re_hash_slots([], [], Head, _Z, BinFS, BinTS, WsB) ->
    {Head, BinFS, BinTS, lists:reverse(WsB)}.
1862
%% Splits the per-key binaries of a collection into those that stay
%% in the old slot (KL, total size KSz) and those that are moved to
%% the new slot (ML, total size MSz). The order of the keys is kept.
%% -> {KL, KSz, ML, MSz}
re_hash_split(KeyObjs, Head, KL, KSz, ML, MSz) ->
    Split =
        fun(E, {Keep, KeepSz, Move, MoveSz}) ->
                {Key,Sz,Bin,_Item,_Objs} = E,
                %% h(key) rem (m * 2)
                case h(Key, Head#head.hash_bif) rem Head#head.m2 of
                    New when New >= Head#head.m ->
                        {Keep, KeepSz, [Bin | Move], MoveSz + Sz};
                    _ ->
                        {[Bin | Keep], KeepSz + Sz, Move, MoveSz}
                end
        end,
    {KL1, KSz1, ML1, MSz1} =
        lists:foldl(Split, {KL, KSz, ML, MSz}, KeyObjs),
    {lists:reverse(KL1), KSz1, lists:reverse(ML1), MSz1}.
1874
%% Empties the cache and evaluates the cached operations. The table
%% may grow before the operations are evaluated.
%% -> {NewHead, [LookedUpObject], pwrite_list()} | throw({NewHead, Error})
write_cache(Head) ->
    Cache = Head#head.cache,
    case dets_utils:is_empty_cache(Cache) of
        false ->
            {NewCache, MaxInserts, PerKey} = dets_utils:reset_cache(Cache),
            %% An upper limit on the number of new keys.
            MaxNoInsertedKeys = erlang:min(MaxInserts, length(PerKey)),
            Head1 = Head#head{cache = NewCache},
            case may_grow(Head1, MaxNoInsertedKeys, once) of
                {Head2, ok} ->
                    eval_work_list(Head2, PerKey);
                HeadError ->
                    throw(HeadError)
            end;
        true ->
            {Head, [], []}
    end.
1892
%% Grows the table (adds and re-hashes segments) if the number of keys
%% plus N, the number of keys about to be inserted, exceeds the number
%% of slots. Nothing is done if the table is fixed, opened for reading
%% only, or already has the maximal number of slots. At most two
%% segments worth of slots are requested per call.
%%
%% How is 'once' or 'many_times', see may_grow1/3.
%%
%% -> {NewHead, ok} | {NewHead, Error}
may_grow(Head, 0, once) ->
    %% Do not re-hash if there is a chance that the file is not dirty.
    {Head, ok};
may_grow(Head, _N, _How) when Head#head.fixed =/= false ->
    {Head, ok};
may_grow(#head{access = read}=Head, _N, _How) ->
    {Head, ok};
may_grow(Head, _N, _How) when Head#head.next >= Head#head.max_no_slots ->
    {Head, ok};
may_grow(Head, N, How) ->
    Extra = erlang:min(2*?SEGSZP, Head#head.no_keys + N - Head#head.next),
    case catch may_grow1(Head, Extra, How) of
        {error, _Reason} = Error -> % alloc may throw error
            dets_utils:corrupt(Head, Error);
        %% The guard has to test NewHead, not Head (which is always a
        %% head here): 'catch' turns a runtime error into the 2-tuple
        %% {'EXIT', Reason}, which must not be mistaken for
        %% {NewHead, Reply} and handed to the caller as a head.
        {NewHead, Reply} when is_record(NewHead, head) ->
            {NewHead, Reply}
    end.
1911
%% With 'many_times' and more than one segment to add, just one
%% segment is added now, and a may_grow request is sent to the process
%% itself in order to add the rest later. Otherwise everything is
%% added at once.
may_grow1(Head, Extra, How) ->
    if
        How =:= many_times, Extra > ?SEGSZP ->
            Reply = grow(Head, 1, undefined),
            self() ! ?DETS_CALL(self(), may_grow),
            Reply;
        true ->
            grow(Head, Extra, undefined)
    end.
1918
%% Adds segments, one at a time, until at least Extra slots have been
%% added or the maximal number of slots has been reached. For each new
%% segment the segment starting with slot n is re-hashed.
%% SegZero is the binary used for initializing a segment; 'undefined'
%% means that it has not been created yet.
%% -> {Head, ok} | throw({Head, Error})
grow(Head, Extra, _SegZero) when Extra =< 0 ->
    {Head, ok};
grow(Head, Extra, undefined) ->
    grow(Head, Extra, seg_zero());
grow(Head, _Extra, _SegZero) when Head#head.next >= Head#head.max_no_slots ->
    {Head, ok};
grow(Head, Extra, SegZero) ->
    #head{n = N, next = Next, m = M} = Head,
    SegNum = Next div ?SEGSZP,
    {Head0, W, Ws1} = allocate_segment(Head, SegZero, SegNum),
    %% re_hash/2 will overwrite the segment, but initialize it anyway...
    {Head1, ok} = dets_utils:pwrite(Head0, [W | Ws1]),
    %% If re_hash fails, segp_cache has been called, but it does not matter.
    {Head2, ok} = re_hash(Head1, N),
    NewHead =
	if
	    N + ?SEGSZP =:= M ->
		%% All the m first slots have been split: start over
		%% from slot 0 with m doubled.
		Head2#head{n = 0, next = Next + ?SEGSZP, m = 2 * M, m2 = 4 * M};
	    true ->
		Head2#head{n = N + ?SEGSZP, next = Next + ?SEGSZP}
	end,
    %% Assertion; crashes if the hash parameters are inconsistent.
    true = hash_invars(NewHead),
    grow(NewHead, Extra - ?SEGSZP, SegZero).
1943
%% Checks the invariants of the hash parameters of the head.
%% -> boolean()
hash_invars(H) ->
    N = H#head.n,
    M = H#head.m,
    Next = H#head.next,
    Min = H#head.min_no_slots,
    Max = H#head.max_no_slots,
    hash_invars(N, M, Next, Min, Max).
1947
1948-define(M8(X), (((X) band (?SEGSZP - 1)) =:= 0)).
%% -> boolean(). True if all parameters are multiples of ?SEGSZP (the
%% ?M8 tests) and the order relations between them hold. Every
%% condition is evaluated, just as with the strict 'and' operator.
hash_invars(N, M, Next, Min, Max) ->
    Invariants =
        [?M8(N), ?M8(M), ?M8(Next), ?M8(Min), ?M8(Max),
         0 =< N, N =< M, N =< 2*Next, M =< Next,
         Next =< 2*M, 0 =< Min, Min =< Next, Next =< Max,
         Min =< M],
    lists:all(fun(Holds) -> Holds end, Invariants).
1954
%% A binary of 4*?SEGSZ zero bytes, used for initializing segments.
seg_zero() ->
    <<0:(8*(4*?SEGSZ))>>.
1957
%% Looks for Object in the slot that its key hashes to.
%% -> {ok, Pointer} | false
find_object(Head, Object) ->
    Slot = db_hash(element(Head#head.keypos, Object), Head),
    find_object(Head, Object, Slot).
1962
%% -> {ok, Pointer} | false
%% 'false' is returned also when the slot is out of range or cannot
%% be read.
find_object(H, _Obj, Slot) when Slot >= H#head.next ->
    false;
find_object(H, Obj, Slot) ->
    case catch slot_objects(H, Slot) of
        {ok, Pointer, Objects} ->
            lists:member(Obj, Objects) andalso {ok, Pointer};
        _ ->
            false
    end.
1974
%% Reads the collection that slot Slot refers to and converts it to
%% a list of objects. An empty slot yields {ok, 0, []}.
%% -> {ok, BucketP, Objects} | throw({Head, Error})
slot_objects(Head, Slot) ->
    SlotPos = slot_position(Slot),
    MaxSize = maxobjsize(Head),
    case dets_utils:ipread(Head, SlotPos, MaxSize) of
	%% The size stored in the slot (BucketSz) has to be equal to
	%% the size stored in the collection itself.
	{ok, {BucketSz, Pointer, <<BucketSz:32, _St:32, KeysObjs/binary>>}} ->
	    case catch bin2objs(KeysObjs, Head#head.type, []) of
		{'EXIT', _Error} ->
                    Bad = dets_utils:bad_object(slot_objects,
                                                {SlotPos, KeysObjs}),
		    throw(dets_utils:corrupt_reason(Head, Bad));
		Objs when is_list(Objs) ->
		    {ok, Pointer, lists:reverse(Objs)}
	    end;
        [] ->
	    {ok, 0, []};
	BadRead -> % eof or a badly formed binary
            Bad = dets_utils:bad_object(slot_objects, {SlotPos, BadRead}),
	    throw(dets_utils:corrupt_reason(Head, Bad))
    end.
1995
1996%%%
1997%%% Cache routines depending on the dets file format.
1998%%%
1999
%% Evaluates the work lists of the cache. The second argument is a
%% list of {Key, [{Seq, Command}]}.
%%
%% The first clause handles the case of one single lookup: the
%% collection of the key's slot is read and searched, and nothing
%% is written. The second clause handles everything else: the work
%% lists are grouped per slot, the slots are read, and the collections
%% are evaluated by read_buckets/11.
%% -> {Head, [LookedUpObject], pwrite_list()} | throw({Head, Error})
eval_work_list(Head, [{Key,[{_Seq,{lookup,Pid}}]}]) ->
    SlotPos = slot_position(db_hash(Key, Head)),
    MaxSize = maxobjsize(Head),
    Objs = case dets_utils:ipread(Head, SlotPos, MaxSize) of
	       {ok, {_BucketSz, _Pointer, Bin}} ->
                   case catch per_key(Head, Bin) of
                       {'EXIT', _Error} ->
                           Bad = dets_utils:bad_object(eval_work_list,
                                                       {SlotPos, Bin}),
                           throw(dets_utils:corrupt_reason(Head, Bad));
                       KeyObjs when is_list(KeyObjs) ->
                           case dets_utils:mkeysearch(Key, 1, KeyObjs) of
                               false ->
                                   [];
                               {value, {Key,_KS,_KB,O,Os}} ->
                                   case catch binobjs2terms(Os) of
                                       {'EXIT', _Error} ->
                                           Bad = dets_utils:bad_object
                                                   (eval_work_list,
                                                    {SlotPos, Bin, KeyObjs}),
                                           throw(dets_utils:corrupt_reason
                                                      (Head, Bad));
                                       Terms when is_list(Terms) ->
                                           get_objects([O | Terms])
                                   end
                           end
                   end;
	       [] ->
		   %% Empty slot.
		   [];
	       BadRead -> % eof or a badly formed binary
                   Bad = dets_utils:bad_object(eval_work_list,
                                               {SlotPos, BadRead}),
		   throw(dets_utils:corrupt_reason(Head, Bad))
	   end,
    {Head, [{Pid,Objs}], []};
eval_work_list(Head, PerKey) ->
    SWLs = tag_with_slot(PerKey, Head, []),
    P1 = dets_utils:family(SWLs),
    {PerSlot, SlotPositions} = remove_slot_tag(P1, [], []),
    {ok, Bins} = dets_utils:pread(SlotPositions, Head),
    read_buckets(PerSlot, SlotPositions, Bins, Head, [], [], [], [], 0, 0, 0).
2042
%% Pairs every work list with the slot of its key. The tagged work
%% lists are put in front of L, in reverse order.
tag_with_slot(WLs, Head, L) ->
    lists:foldl(fun({K,_} = WL, Acc) ->
                        [{db_hash(K, Head), WL} | Acc]
                end, L, WLs).
2047
%% Splits a list of {Slot, WorkLists} into the work lists and the
%% corresponding {SlotPosition, ?SEGOBJSZ} read requests. Both lists
%% are built in reverse order, on top of Ls and SPs.
%% -> {WorkListsPerSlot, SlotPositions}
remove_slot_tag(SSWLs, Ls, SPs) ->
    lists:foldl(fun({S, SWLs}, {LsAcc, SPsAcc}) ->
                        SP = {slot_position(S), ?SEGOBJSZ},
                        {[SWLs | LsAcc], [SP | SPsAcc]}
                end, {Ls, SPs}, SSWLs).
2052
%% Evaluates the work lists of each slot. The three first arguments
%% are parallel lists: the work lists of a slot, {SlotPosition, Size}
%% of the slot, and the contents of the slot (size and pointer of the
%% collection). PWLs and ToRead accumulate the slots whose collections
%% have to be read, SoFar is the total size of those collections.
%% LU accumulates results of lookups, Ws pending writes, and NoObjs and
%% NoKeys the changes of the number of objects and keys.
%% -> {NewHead, LU, pwrite_list()} | throw({Head, Error})
%%
%% Empty slot: there is nothing to read, evaluate the work lists now.
read_buckets([WLs | SPs], [{P1,_8} | Ss], [<<_Zero:32,P2:32>> | Bs], Head,
	      PWLs, ToRead, LU, Ws, NoObjs, NoKeys, SoFar) when P2 =:= 0 ->
    {NewHead, NLU, NWs, No, KNo} =
	eval_bucket_keys(WLs, P1, 0, 0, [], Head, Ws, LU),
    NewNoObjs = No + NoObjs,
    NewNoKeys = KNo + NoKeys,
    read_buckets(SPs, Ss, Bs, NewHead, PWLs, ToRead, NLU, NWs,
		 NewNoObjs, NewNoKeys, SoFar);
%% Non-empty slot: schedule the collection for reading, as long as
%% the total size stays below ?MAXCOLL (at least one collection is
%% always accepted).
read_buckets([WorkLists| SPs], [{P1,_8} | Ss], [<<Size:32,P2:32>> | Bs], Head,
	     PWLs, ToRead, LU, Ws, NoObjs, NoKeys, SoFar)
                                 when SoFar + Size < ?MAXCOLL; ToRead =:= [] ->
    NewToRead = [{P2, Size} | ToRead],
    NewPWLs = [{P2,P1,WorkLists} | PWLs],
    NewSoFar = SoFar + Size,
    read_buckets(SPs, Ss, Bs, Head, NewPWLs, NewToRead, LU, Ws,
		 NoObjs, NoKeys, NewSoFar);
%% Enough has been scheduled (or there are no more slots): read and
%% evaluate the scheduled collections, and write the result.
read_buckets(SPs, Ss, Bs, Head, PWLs0, ToRead0, LU, Ws, NoObjs, NoKeys, SoFar)
                                                             when SoFar > 0 ->
    %% It pays off to sort the positions. The seek times are reduced,
    %% at least if the file blocks are reasonably contiguous, as is
    %% often the case.
    PWLs = lists:keysort(1, PWLs0),
    ToRead = lists:keysort(1, ToRead0),
    check_pread2_arg(ToRead, Head),
    {ok, Bins} = dets_utils:pread(ToRead, Head),
    case catch eval_buckets(Bins, PWLs, Head, LU, Ws, 0, 0) of
	{ok, NewHead, NLU, [], 0, 0} ->
	    %% Nothing to write.
            read_buckets(SPs, Ss, Bs, NewHead, [], [], NLU, [],
			 NoObjs, NoKeys, 0);
	{ok, Head1, NLU, NWs, No, KNo} ->
	    NewNoObjs = NoObjs + No,
	    NewNoKeys = NoKeys + KNo,
	    %% It does not seem to reduce seek times to sort positions
	    %% when writing (maybe because it takes several calls to
	    %% write_cache/1 to fill the file system's buffer cache).
            {NewHead, ok} = dets_utils:pwrite(Head1, lists:reverse(NWs)),
            read_buckets(SPs, Ss, Bs, NewHead, [], [], NLU, [],
			 NewNoObjs, NewNoKeys, 0);
	Error  ->
            Bad = dets_utils:bad_object(read_buckets, {Bins, Error}),
	    throw(dets_utils:corrupt_reason(Head, Bad))
    end;
%% All slots done. Writes not yet performed are returned to the caller.
read_buckets([], [], [], Head, [], [], LU, Ws, NoObjs, NoKeys, 0) ->
    {NewHead, NWs} = update_no_keys(Head, Ws, NoObjs, NoKeys),
    {NewHead, LU, lists:reverse(NWs)}.
2098
%% Evaluates the work lists of the slots in the second argument, a
%% list of {PositionOfCollection, SlotPosition, WorkLists}. The first
%% argument holds the collections read from the file, in the same
%% order. The changes of the number of objects and keys are summed.
%% Called within a 'catch' by read_buckets/11.
%% -> {ok, NewHead, LU, Ws, NoObjs, NoKeys}
eval_buckets([Bin | Bins], [SP | SPs], Head, LU, Ws, NoObjs, NoKeys) ->
    {Pos, P1, WLs} = SP,
    KeyObjs = per_key(Head, Bin),
    {NewHead, NLU, NWs, No, KNo} =
	eval_bucket_keys(WLs, P1, Pos, byte_size(Bin), KeyObjs, Head,Ws,LU),
    eval_buckets(Bins, SPs, NewHead, NLU, NWs, NoObjs + No, NoKeys + KNo);
eval_buckets([], [], Head, LU, Ws, NoObjs, NoKeys) ->
    {ok, Head, LU, Ws, NoObjs, NoKeys}.
2107
%% Evaluates the work lists of one slot against the keys and objects
%% of its collection (KeyObjs), and computes the writes needed for
%% updating the collection and the slot. The new writes are put in
%% front of Ws.
%% -> {NewHead, LU, Ws, DeltaNoObjects, DeltaNoKeys}
eval_bucket_keys(WLs, SlotPos, Pos, OldSize, KeyObjs, Head, Ws, LU) ->
    {NLU, Bins, BSize, DeltaObjs, DeltaKeys, Changed} =
         eval_slot(WLs, KeyObjs, Head#head.type, LU, [], 0, 0, 0, false),
    {NewHead, SlotWs, CollWs} =
        updated(Head, Pos, OldSize, BSize, SlotPos, Bins, Changed,
                DeltaObjs, DeltaKeys),
    {NewHead, NLU, CollWs ++ SlotWs ++ Ws, DeltaObjs, DeltaKeys}.
2114
%% Computes the writes needed when the collection of a slot has been
%% evaluated.
%%   Pos       - position of the old collection (0 if the slot was empty)
%%   OldSize   - size of the old collection, including the header
%%   BSize     - size of the new collection, excluding the header
%%   SlotPos   - position of the slot
%%   Bins      - the new collection (iodata), excluding the header
%%   Ch        - false if the collection has not been changed
%%   DeltaNoOs, DeltaNoKs - changes of the number of objects and keys;
%%               looked at only if the table is fixed
%% -> {NewHead, SlotWrites, CollectionWrites} | throw
%% (presumably dets_utils:alloc/2 can throw, see may_grow/3)
updated(Head, Pos, OldSize, BSize, SlotPos, Bins, Ch, DeltaNoOs, DeltaNoKs) ->
    BinsSize = BSize + ?OHDSZ,
    if
	%% The slot was empty, and still is.
	Pos =:= 0, BSize =:= 0 ->
	    {Head, [], []};
	%% The slot was empty: allocate space for a new collection.
	Pos =:= 0, BSize > 0 ->
	    {Head1, NewPos, FPos} = dets_utils:alloc(Head, adjsz(BinsSize)),
            NewHead = one_bucket_added(Head1, FPos-1),
	    W1 = {NewPos, [<<BinsSize:32, ?ACTIVE:32>> | Bins]},
	    W2 = {SlotPos, <<BinsSize:32, NewPos:32>>},
	    {NewHead, [W2], [W1]};
	%% The collection is now empty: free it, and clear the slot.
	Pos =/= 0, BSize =:= 0 ->
	    {Head1, FPos} = dets_utils:free(Head, Pos, adjsz(OldSize)),
            NewHead = one_bucket_removed(Head1, FPos-1),
	    W1 = {Pos+?STATUS_POS, <<?FREE:32>>},
	    W2 = {SlotPos, <<0:32, 0:32>>},
	    {NewHead, [W2], [W1]};
	%% Nothing has been changed.
	Pos =/= 0, BSize > 0, Ch =:= false ->
	    {Head, [], []};
	Pos =/= 0, BSize > 0 ->
	    %% Doubtful. The scan function has to be careful since
	    %% partly scanned objects may be overwritten.
	    Overwrite0 = if
			     OldSize =:= BinsSize -> same;
			     true -> sz2pos(OldSize) =:= sz2pos(BinsSize)
			 end,
            Overwrite = if
			    Head#head.fixed =/= false ->
				%% Make sure that if the table is
				%% fixed, nothing is overwritten,
				%% unless the number of objects and
				%% the number of keys remain the same.
				%% This is used by bchunk, which
				%% assumes that it traverses exactly
				%% the same number of objects and keys
				%% (and collections) as were present
				%% when chunking started (the table
				%% must have been fixed).
				(Overwrite0 =/= false) and
				(DeltaNoOs =:= 0) and (DeltaNoKs =:= 0);
			    true ->
				Overwrite0
			end,
	    if
		Overwrite =:= same ->
		    %% Same size: the header and the slot stay as is.
		    W1 = {Pos+?OHDSZ, Bins},
		    {Head, [], [W1]};
		Overwrite ->
		    %% Same size class: overwrite in place.
		    W1 = {Pos, [<<BinsSize:32, ?ACTIVE:32>> | Bins]},
		    %% Pos is already there, but return {SlotPos, <8 bytes>}.
		    W2 = {SlotPos, <<BinsSize:32, Pos:32>>},
		    {Head, [W2], [W1]};
		true ->
		    %% Free the old collection and allocate a new one.
		    {Head1, FPosF} = dets_utils:free(Head, Pos, adjsz(OldSize)),
		    {Head2, NewPos, FPosA} =
			dets_utils:alloc(Head1, adjsz(BinsSize)),
                    Head3 = one_bucket_added(Head2, FPosA-1),
                    NewHead = one_bucket_removed(Head3, FPosF-1),
		    W0 = {NewPos, [<<BinsSize:32, ?ACTIVE:32>> | Bins]},
		    W2 = {SlotPos, <<BinsSize:32, NewPos:32>>},
		    W1 = if
			      Pos =/= NewPos ->
                                  %% W0 first.
				  [W0, {Pos+?STATUS_POS, <<?FREE:32>>}];
			      true ->
				  [W0]
			  end,
		    {NewHead, [W2], W1}
	    end
    end.
2185
%% Counts one more collection of size class Log2, and raises
%% maxobjsize if Log2 is greater. Nothing is done if the head has no
%% collection counters.
one_bucket_added(H, _Log2) when H#head.no_collections =:= undefined ->
    H;
one_bucket_added(H, Log2) ->
    Counters = orddict:update_counter(Log2, 1, H#head.no_collections),
    Counted = H#head{no_collections = Counters},
    if
        H#head.maxobjsize >= Log2 -> Counted;
        true -> Counted#head{maxobjsize = Log2}
    end.
2194
%% Counts one collection less of size class Log2. If the class is
%% the greatest one in use, maxobjsize is recalculated from the
%% counters. Nothing is done if the head has no collection counters.
one_bucket_removed(H, _FPos) when H#head.no_collections =:= undefined ->
    H;
one_bucket_removed(H, Log2) when H#head.maxobjsize > Log2;
                                 H#head.maxobjsize =:= Log2 ->
    Counters = orddict:update_counter(Log2, -1, H#head.no_collections),
    if
        H#head.maxobjsize > Log2 ->
            H#head{no_collections = Counters};
        true ->
            H#head{no_collections = Counters,
                   maxobjsize = max_objsize(Counters)}
    end.
2204
%% Merges the work lists of a slot with the keys and objects of the
%% slot's collection. Both lists are traversed in parallel, guided by
%% dets_utils:cmp/2 (presumably both are sorted on the key -- the
%% callers build them with dets_utils:family/1 and per_key/2).
%% Ws accumulates the binaries of the new collection, BSz its size,
%% No and KNo the changes of the number of objects and keys, and Ch
%% is set to true when something has been changed.
%% -> {LU, Ws, BSz, No, KNo, Ch}
eval_slot([{Key,Commands} | WLs] = WLs0, [{K,KS,KB,O,Os} | KOs1]=KOs,
          Type, LU, Ws, No, KNo,BSz, Ch) ->
    case dets_utils:cmp(K, Key) of
        0 ->
            %% The key is present in the collection.
            Old = [O | binobjs2terms(Os)],
            {NLU, NWs, Sz, No1, KNo1, NCh} =
		eval_key(Key, Commands, Old, Type, KB, KS, LU, Ws, Ch),
            eval_slot(WLs, KOs1, Type, NLU, NWs, No1 + No,
		      KNo1 + KNo, Sz + BSz, NCh);
        -1 ->
            %% No work for the key K; keep its objects as is.
            eval_slot(WLs0, KOs1, Type, LU, [Ws | KB], No,
		      KNo, KS + BSz, Ch);
        1 ->
            %% The key of the work list is not present in the collection.
            {NLU, NWs, Sz, No1, KNo1, NCh} =
		eval_key(Key, Commands, [], Type, [], 0, LU, Ws, Ch),
            eval_slot(WLs, KOs, Type, NLU, NWs, No1 + No,
		      KNo1 + KNo, Sz + BSz, NCh)
    end;
%% No more keys in the collection; the rest of the work lists are
%% evaluated without old objects.
eval_slot([{Key,Commands} | WLs], [], Type, LU, Ws, No, KNo,BSz, Ch) ->
    {NLU, NWs, Sz, No1, KNo1, NCh} =
        eval_key(Key, Commands, [], Type, [], 0, LU, Ws, Ch),
    eval_slot(WLs, [], Type, NLU, NWs, No1 + No, KNo1 + KNo, Sz + BSz, NCh);
%% No more work lists; the rest of the keys are kept as is.
eval_slot([], [{_Key,Size,KeyBin,_,_} | KOs], Type, LU, Ws, No, KNo,BSz, Ch) ->
    eval_slot([], KOs, Type, LU, [Ws | KeyBin], No, KNo, Size + BSz, Ch);
eval_slot([], [], _Type, LU, Ws, No, KNo, BSz, Ch) ->
    {LU, Ws, BSz, No, KNo, Ch}.
2231
%% Evaluates the commands of one key. The third argument is the key's
%% current objects ([] if the key is not present), KeyBin and KeySz
%% the binary holding those objects and its size.
%% -> {LU, Ws, Size, DeltaNoObjects, DeltaNoKeys, Changed}
%% where Size is the size of the key's objects after the evaluation,
%% and DeltaNoKeys is 1 if the key was added, -1 if it was removed,
%% and 0 otherwise.
%%
%% The two first clauses handle a single lookup, which changes nothing.
eval_key(_K, [{_Seq,{lookup,Pid}}], [], _Type, _KeyBin, _KeySz, LU, Ws, Ch) ->
    NLU = [{Pid, []} | LU],
    {NLU, Ws, 0, 0, 0, Ch};
eval_key(_K, [{_Seq,{lookup,Pid}}], Old0, _Type, KeyBin, KeySz, LU, Ws, Ch) ->
    Old = lists:keysort(2, Old0), % sort on sequence number
    Objs = get_objects(Old),
    NLU = [{Pid, Objs} | LU],
    {NLU, [Ws | KeyBin], KeySz, 0, 0, Ch};
eval_key(K, Comms, Orig, Type, KeyBin, KeySz, LU, Ws, Ch) ->
    Old = dets_utils:msort(Orig),
    case eval_key1(Comms, [], Old, Type, K, LU, Ws, 0, Orig) of
	%% {ok, _}: the objects of the key have not been changed.
	{ok, NLU} when Old =:= [] ->
	    {NLU, Ws, 0, 0, 0, Ch};
	{ok, NLU} ->
	    {NLU, [Ws | KeyBin], KeySz, 0, 0, Ch};
	%% A new key.
	{NLU, NWs, NSz, No} when Old =:= [], NSz > 0 ->
	    {NLU, NWs, NSz, No, 1, true};
	%% All objects of the key have been deleted.
	{NLU, NWs, NSz, No} when Old =/= [], NSz =:= 0 ->
	    {NLU, NWs, NSz, No, -1, true};
	{NLU, NWs, NSz, No} ->
	    {NLU, NWs, NSz, No, 0, true}
    end.
2254
%% First find 'delete_key' and 'lookup' commands, and handle the 'set' type.
%%
%% The commands (first argument) are traversed in order. For the set
%% type inserts and deletes are applied at once to Old, the current
%% objects of the key. For the other types the commands are collected
%% in Cs, and applied by eval_comms/4 when a lookup is found or when
%% there are no more commands. No is the change of the number of
%% objects, and Orig the original objects of the key.
%% -> {ok, LU} | {LU, Ws, Size, No}, see eval_end/6.
eval_key1([{_Seq,{insert,Term}} | L], Cs, [{Term,_,_}] = Old, Type=set, K,
	  LU, Ws, No, Orig) ->
    %% The object is already there.
    eval_key1(L, Cs, Old, Type, K, LU, Ws, No, Orig);
eval_key1([{Seq,{insert,Term}} | L], Cs, Old, Type=set, K, LU, Ws, No, Orig)
                                         ->
    %% Replaces the old object, if any.
    NNo = No + 1 - length(Old),
    eval_key1(L, Cs, [{Term,Seq,insert}], Type, K, LU, Ws, NNo, Orig);
eval_key1([{_Seq,{lookup,Pid}} | L], Cs, Old, Type, Key, LU, Ws, No, Orig) ->
    %% Apply the commands found so far before looking up.
    {ok, New0, NewNo} = eval_comms(Cs, Old, Type, No),
    New = lists:keysort(2, New0), % sort on sequence number
    Objs = get_objects(New),
    NLU = [{Pid, Objs} | LU],
    if
	L =:= [] ->
	    eval_end(New, NLU, Type, Ws, NewNo, Orig);
	true ->
	    NewOld = dets_utils:msort(New),
	    eval_key1(L, [], NewOld, Type, Key, NLU, Ws, NewNo, Orig)
    end;
eval_key1([{_Seq,delete_key} | L], _Cs, Old, Type, K, LU, Ws, No, Orig) ->
    %% Commands collected so far have no effect.
    NewNo = No - length(Old),
    eval_key1(L, [], [], Type, K, LU, Ws, NewNo, Orig);
eval_key1([{_Seq,{delete_object,Term}} | L], Cs, [{Term,_,_}], Type=set, K,
	  LU, Ws, No, Orig) ->
    eval_key1(L, Cs, [], Type, K, LU, Ws, No-1, Orig);
eval_key1([{_Seq,{delete_object,_T}}| L], Cs, Old1, Type=set, K, LU,
	      Ws, No, Orig) ->
    %% Some other object than the one stored; nothing to delete.
    eval_key1(L, Cs, Old1, Type, K, LU, Ws, No, Orig);
eval_key1([{Seq,{Comm,Term}} | L], Cs, Old, Type, K, LU, Ws, No, Orig)
                                         when Type =/= set ->
    eval_key1(L, [{Term,Seq,Comm} | Cs], Old, Type, K, LU, Ws, No, Orig);
eval_key1([], Cs, Old, Type=set, _Key, LU, Ws, No, Orig) ->
    %% Assertion: no commands are collected for the set type.
    [] = Cs,
    eval_end(Old, LU, Type, Ws, No, Orig);
eval_key1([], Cs, Old, Type, _Key, LU, Ws, No, Orig) ->
    {ok, New, NewNo} = eval_comms(Cs, Old, Type, No),
    eval_end(New, LU, Type, Ws, NewNo, Orig).
2293
%% Applies the collected commands Cs to the old objects of a key.
%% For the set type there are never any commands to apply.
%% -> {ok, NewObjects, No}
eval_comms([], L, _Type=set, No) ->
    {ok, L, No};
eval_comms(Cs, Old, Type, No) ->
    Commands = dets_utils:msort(Cs),
    Eval = case Type of
               bag -> fun eval_bag/4;
               duplicate_bag -> fun eval_dupbag/4
           end,
    Eval(Commands, Old, [], No).
2302
%% Ends the evaluation of the commands of a key. New0 is the key's
%% objects after the evaluation, Orig the objects before.
%% -> {ok, LU} if the objects have not changed, otherwise
%%    {LU, Ws, Size, NewNo} where the binaries of the objects have
%%    been added to Ws and Size is their total size. For types other
%%    than set, the binaries are preceded by a 32-bit size field.
eval_end(New0, LU, Type, Ws, NewNo, Orig) ->
    New = lists:keysort(2, New0), % sort on sequence number
    NoChange = if
                   length(New) =/= length(Orig) -> false;
                   true -> same_terms(Orig, New)
               end,
    case New of
        _ when NoChange ->
            %% The key's objects have not changed.
            {ok, LU};
        [] ->
            {LU, Ws, 0, NewNo};
        _ ->
            {ObjBins, Sz} = make_bins(New, [], 0),
            case Type of
                set ->
                    {LU, [Ws | ObjBins], Sz, NewNo};
                _ ->
                    NSz = Sz + 4,
                    {LU, [Ws, <<NSz:32>> | ObjBins], NSz, NewNo}
            end
    end.
2326
%% -> boolean(). True if the two lists have the same length and the
%% first elements of the tuples are pairwise identical.
same_terms([], []) ->
    true;
same_terms([Old | Olds], [New | News])
  when element(1, Old) =:= element(1, New) ->
    same_terms(Olds, News);
same_terms(_Olds, _News) ->
    false.
2333
%% Adds the binaries of the objects to the iodata W, and their sizes
%% to Sz. An object that already has a binary is added as is; an
%% inserted term is converted with term_to_binary/1 and preceded by
%% a 32-bit size field (the size includes the field itself).
%% -> {IoData, Size}
make_bins([], W, Sz) ->
    {W, Sz};
make_bins([{_Term,_Seq,Bin} | Objs], W, Sz) when is_binary(Bin) ->
    make_bins(Objs, [W | Bin], Sz + byte_size(Bin));
make_bins([{Term,_Seq,insert} | Objs], W, Sz) ->
    Encoded = term_to_binary(Term),
    Len = byte_size(Encoded) + 4,
    make_bins(Objs, [W, [<<Len:32>> | Encoded]], Sz + Len).
2342
%% Returns the terms of a list of {Term, Seq, BinaryOrCommand}.
get_objects(L) ->
    lists:map(fun({T,_S,_BT}) -> T end, L).
2347
%% Applies the commands of the first argument (a list of
%% {Term, Seq, insert | delete_object}) to the old objects of a key
%% of a bag table. Both lists are traversed in parallel, guided by
%% dets_utils:cmp/2 (the commands have been sorted by eval_comms/4).
%% New accumulates the resulting objects, No the change of the number
%% of objects. Consecutive commands with the same term are handled by
%% bag_object/6.
%% -> {ok, New, No}
eval_bag([{Term1,_S1,Op}=N | L]=L0, [{Term2,_,_}=O | Old]=Old0, New, No) ->
    case {Op, dets_utils:cmp(Term1, Term2)} of
        {delete_object, -1} ->
            %% The object to delete is not present.
            eval_bag(L, Old0, New, No);
        {insert, -1} ->
            bag_object(L, Old0, New, No, [N], Term1);
        {delete_object, 0} ->
            bag_object(L, Old, New, No-1, [], Term1);
        {insert, 0} ->
            %% The old object is replaced; bag_object/6 adds one to
            %% No if the object is still there after the remaining
            %% commands for the term.
            bag_object(L, Old, New, No-1, [N], Term1);
        {_, 1} ->
            %% No command for the old object; keep it.
            eval_bag(L0, Old, [O | New], No)
    end;
eval_bag([{_Term1,_Seq1,delete_object} | L], []=Old, New, No) ->
    eval_bag(L, Old, New, No);
eval_bag([{Term,_Seq1,insert} = N | L], []=Old, New, No) ->
    bag_object(L, Old, New, No, [N], Term);
eval_bag([]=L, [O | Old], New, No) ->
    eval_bag(L, Old, [O | New], No);
eval_bag([], [], New, No) ->
    {ok, New, No}.
2369
%% Handles the remaining commands for the term Term. The fifth
%% argument is [] if the object is (so far) deleted, and [Object] if
%% it is (so far) present. When there are no more commands for the
%% term, the object, if present, is added to New and counted.
bag_object([{T,_,insert} = Cmd | Cmds], Old, New, No, _Last, Term)
  when T =:= Term ->
    bag_object(Cmds, Old, New, No, [Cmd], Term);
bag_object([{T,_,delete_object} | Cmds], Old, New, No, _Last, Term)
  when T =:= Term ->
    bag_object(Cmds, Old, New, No, [], Term);
bag_object(Cmds, Old, New, No, [], _Term) ->
    eval_bag(Cmds, Old, New, No);
bag_object(Cmds, Old, New, No, [Obj], _Term) ->
    eval_bag(Cmds, Old, [Obj | New], No+1).
2378
%% Counterpart of eval_bag/4 for tables that keep duplicates. Merges
%% pending operations, {Term, Seq, insert | delete_object}, with the
%% stored objects, {Term, _, _}, in dets_utils:cmp/2 order. Every
%% insert adds an object; a delete_object removes all copies collected
%% so far of that term (see dup_object/6).
%% Returns {ok, Accumulator, Counter}; the accumulator is built by
%% prepending.
eval_dupbag([], [], Acc, Count) ->
    {ok, Acc, Count};
eval_dupbag([] = Ops, [Stored | StoredT], Acc, Count) ->
    eval_dupbag(Ops, StoredT, [Stored | Acc], Count);
eval_dupbag([{_Obj, _Seq, delete_object} | OpsT], [] = NoStored, Acc, Count) ->
    eval_dupbag(OpsT, NoStored, Acc, Count);
eval_dupbag([{Obj, _Seq, insert} = Ins | OpsT], [] = NoStored, Acc, Count) ->
    dup_object(OpsT, NoStored, Acc, Count + 1, Obj, [Ins]);
eval_dupbag([{Obj, _Seq, Op} = Pending | OpsT] = Ops,
            [{StoredObj, _, _} = Stored | StoredT] = AllStored, Acc, Count) ->
    case {Op, dets_utils:cmp(Obj, StoredObj)} of
        {_, 1} ->
            %% The stored object sorts first; keep it.
            eval_dupbag(Ops, StoredT, [Stored | Acc], Count);
        {_, 0} ->
            %% Same term: first gather all stored copies of it, the
            %% operation itself is handled afterwards.
            old_dup_object(Ops, StoredT, Acc, Count, Obj, [Stored]);
        {insert, -1} ->
            dup_object(OpsT, AllStored, Acc, Count + 1, Obj, [Pending]);
        {delete_object, -1} ->
            eval_dupbag(OpsT, AllStored, Acc, Count)
    end.
2398
%% Moves every leading stored object whose term equals Current onto
%% the queue of copies, then hands over to dup_object/6 to apply the
%% pending operations on that term.
old_dup_object(Ops, [{Obj, _, _} = Copy | StoredT], Acc, Count, Current, Queue)
  when Obj =:= Current ->
    old_dup_object(Ops, StoredT, Acc, Count, Current, [Copy | Queue]);
old_dup_object(Ops, Stored, Acc, Count, Current, Queue) ->
    dup_object(Ops, Stored, Acc, Count, Current, Queue).
2403
%% Applies consecutive operations on the term Current to the queue of
%% copies collected so far. An insert adds a copy and increments the
%% counter; a delete_object drops every queued copy and decrements the
%% counter accordingly. When the run ends the queue is put in front of
%% the accumulator and eval_dupbag/4 continues.
dup_object([{Obj, _, insert} = Ins | Rest], Stored, Acc, Count, Current, Queue)
  when Obj =:= Current ->
    dup_object(Rest, Stored, Acc, Count + 1, Current, [Ins | Queue]);
dup_object([{Obj, _, delete_object} | Rest], Stored, Acc, Count, Current, Queue)
  when Obj =:= Current ->
    %% All objects are deleted.
    dup_object(Rest, Stored, Acc, Count - length(Queue), Current, []);
dup_object(Ops, Stored, Acc, Count, _Current, Queue) ->
    eval_dupbag(Ops, Stored, Queue ++ Acc, Count).
2412
%% Adds DeltaObjects and DeltaKeys to the counters of the head and
%% returns {NewHead, NewWs}. Nothing changes when both deltas are 0.
%% The file header is also queued for writing at position 0, marked as
%% not properly closed, if the number of segments that dets:fsck/6
%% uses for its estimate has changed, i.e. when the key count crosses
%% a multiple of ?SEGSZP without exceeding max_no_slots.
update_no_keys(Head, Ws, 0, 0) ->
    {Head, Ws};
update_no_keys(Head, Ws, DeltaObjects, DeltaKeys) ->
    KeysBefore = Head#head.no_keys,
    KeysAfter = KeysBefore + DeltaKeys,
    Updated = Head#head{no_objects = Head#head.no_objects + DeltaObjects,
                        no_keys = KeysAfter},
    Writes =
        if
            KeysAfter > Updated#head.max_no_slots;
            KeysBefore div ?SEGSZP =:= KeysAfter div ?SEGSZP ->
                Ws;
            true ->
                [{0, file_header(Updated, 0, ?NOT_PROPERLY_CLOSED)} | Ws]
        end,
    {Updated, Writes}.
2431
%% Computes the position of slot S from the cached segment array parts
%% and segment pointers: the segment number selects an array part, the
%% part gives the address of the segment pointer, and the slot's offset
%% within the segment (?SEGOBJSZ bytes per slot) is added to the
%% cached segment pointer.
slot_position(S) ->
    Segment = ?SLOT2SEG(S), % S div ?SEGSZP
    ArrPart = get_arrpart(?SEGARRADDR(?SEG2SEGARRPART(Segment))),
    SegStart = get_segp(?SEGPARTADDR(ArrPart, Segment)),
    SegStart + (?SEGOBJSZ * ?REM2(S, ?SEGSZP)).
2438
%% Validates a read request consisting of a single {Position, Size}
%% pair whose size exceeds ?MAXCOLL. If check_pread_arg/2 rejects the
%% size, a corrupt-file reason built by dets_utils is thrown.
%% Every other request is accepted. Returns ok.
check_pread2_arg([{_Pos, Wanted}], Head) when Wanted > ?MAXCOLL ->
    case check_pread_arg(Wanted, Head) of
        false ->
            throw(dets_utils:corrupt_reason(
                    Head, dets_utils:bad_object(check_pread2_arg, Wanted)));
        true ->
            ok
    end;
check_pread2_arg(_ToRead, _Head) ->
    ok.
2449
%% Returns true if Sz is an acceptable size to read: sizes up to
%% ?MAXCOLL are always accepted, larger ones only if they do not
%% exceed maxobjsize(Head).
check_pread_arg(Sz, _Head) when Sz =< ?MAXCOLL ->
    true;
check_pread_arg(Sz, Head) ->
    Sz =< maxobjsize(Head).
2454
%% Inlined. Stores Segment under the key Pos in the process
%% dictionary and returns the value previously stored there.
segp_cache(Pos, Segment) ->
    erlang:put(Pos, Segment).
2458
%% Inlined. Looks up the value stored under Pos in the process
%% dictionary; returns undefined if there is none.
get_segp(Pos) ->
    erlang:get(Pos).
2462
%% Stores ArrPart under the key Pos in the process dictionary and
%% returns the value previously stored there.
arrpart_cache(Pos, ArrPart) ->
    erlang:put(Pos, ArrPart).
2465
%% Inlined. Looks up the value stored under Pos in the process
%% dictionary; returns undefined if there is none.
get_arrpart(Pos) ->
    erlang:get(Pos).
2469
%% Inlined. Maps the size N to a position: one more than
%% dets_utils:log2(N).
sz2pos(N) ->
    Log = dets_utils:log2(N),
    Log + 1.
2472
%% Inlined. Returns N decremented by one; the adjustment compensates
%% for the bug in dets_utils:sz2pos/1.
adjsz(N) ->
    N - 1.
2476
%% Inlined. Returns the largest object size accepted for the table:
%% ?POW applied to the maxobjsize field of the head, or to 32 when
%% that field is undefined.
maxobjsize(Head) ->
    Exponent = case Head#head.maxobjsize of
                   undefined -> 32;
                   Stored -> Stored
               end,
    ?POW(Exponent).
2482
%% Entry point for scanning a chunk of the file for objects; starts
%% scan_skip/8 with nothing to skip. Any crash while scanning is
%% reported as bad_object. A request for more data whose announced
%% size exceeds ?MAXCOLL is passed on only if check_pread_arg/2
%% accepts the size, otherwise bad_object is returned. All other
%% results of the scan are returned unchanged.
scan_objs(Head, Bin, From, To, L, Ts, R, Type) ->
    Outcome = (catch scan_skip(Bin, From, To, L, Ts, R, Type, 0)),
    case Outcome of
        {'EXIT', _Why} ->
            bad_object;
        {more, _, _, _, _, _, Wanted} when Wanted > ?MAXCOLL ->
            case check_pread_arg(Wanted, Head) of
                false -> bad_object;
                true -> Outcome
            end;
        _ ->
            Outcome
    end.
2495
%% Looks for the next slot in Bin. From is the position that
%% corresponds to the start of Bin, and the next header is expected
%% Skip bytes into Bin (at position From + Skip). Scanning is limited
%% to positions below To; L is a binary of further <<From:32,To:32>>
%% pairs to continue with when To is reached exactly.
%% Returns {more, From1, To, L, Ts, R, Size} when more data is needed
%% (Size is the size announced by an incomplete slot, or 0 when no
%% header could be inspected), or the result of bin2bins/9.
%% A negative Skip with too little data matches no clause and crashes;
%% scan_objs/8 turns that into bad_object.
scan_skip(Bin, From, To, L, Ts, R, Type, Skip) ->
    From1 = From + Skip,
    case Bin of
        _ when From1 >= To ->
            if
                From1 > To; L =:= <<>> ->
                    %% Past the end of the region, or no region left.
                    {more, From1, To, L, Ts, R, 0};
		true ->
		    %% Exactly at the end of the region: continue with
		    %% the next one, still relative to the same Bin.
		    <<From2:32, To1:32, L1/binary>> = L,
		    Skip1 = From2 - From,
		    scan_skip(Bin, From, To1, L1, Ts, R, Type, Skip1)
	    end;
	<<_:Skip/binary, _Size:32, St:32, _Sz:32, KO/binary>>
	                 when St =/= ?ACTIVE, St =/= ?FREE ->
	    %% Neither ?ACTIVE nor ?FREE is a multiple of ?BUMP and
	    %% thus cannot be found in segments or segment array
	    %% parts.
	    %% Twelve bytes have been consumed already, so the rest of
	    %% ?ACTUAL_SEG_SIZE is skipped.
	    scan_skip(KO, From1+12, To, L, Ts, R, Type, ?ACTUAL_SEG_SIZE-12);
	<<_:Skip/binary, Size:32, _St:32, Sz:32, KO/binary>>
	                 when Size-12 =< byte_size(KO) ->
	    %% St = ?FREE means that the object was deleted after
	    %% scanning started
	    %% The whole slot (Size bytes including the 12 bytes just
	    %% matched) is available in the binary.
	    bin2bins(KO, From1+12, To, L, Ts, R, Type, Size, Sz);
	<<_:Skip/binary, Size:32, _St:32, _Sz:32, _KO/binary>> ->
	    %% The header is there but the slot is not complete.
	    {more, From1, To, L, Ts, R, Size};
	_ when Skip >= 0 ->
	    %% Not even a complete header after skipping.
	    {more, From1, To, L, Ts, R, 0}
    end.
2524
%% Appends objects in reversed order. All objects of the slot are
%% extracted. Note that binary_to_term/1 ignores garbage at the end.
%%
%% Called from scan_skip/8 with the binary following the first twelve
%% bytes of a slot, the total slot size Size and the third 32-bit word
%% of the slot. What is pushed onto Ts are binaries that start at an
%% object; they are not trimmed to the object's length.
%%
%% For type set the third word is the size of the first object
%% (ObjSz0, counting its own four bytes). If what remains of Size
%% after that object equals ?OHDSZ the slot holds a single object;
%% otherwise bins_set/12 walks the rest.
%% For the other types the third word is passed on as Sz and the next
%% word, ObjSz, is the size of the first object; bins_bag/13 does the
%% walking. NOTE(review): Sz looks like the size of the first key's
%% group of objects -- confirm against the file layout description.
bin2bins(Bin, From, To, L, Ts, R, Type=set, Size, ObjSz0) ->
    ObjsSz1 = Size - ObjSz0,
    if
	ObjsSz1 =:= ?OHDSZ ->
	    slot_end(Bin, From, To, L, [Bin | Ts], R, Type, Size, 1);
	true ->
	    %% Step over the first object's data to the size word of
	    %% the second object.
	    ObjSz = ObjSz0-4,
	    <<_:ObjSz/binary, NObjSz:32, T/binary>> = Bin,
	    bins_set(T, From, To, L, [Bin | Ts], R, Type, Size, 2,
		     NObjSz, ObjsSz1-NObjSz, Bin)
    end;
bin2bins(<<ObjSz:32, Bin/binary>> = KO, From, To, L, Ts, R, Type, Size, Sz) ->
    bins_bag(Bin, From, To, L, Ts, R, Type, Size, 1,
	     Sz-ObjSz-4, ObjSz-4, Size-Sz, KO).
2541
%% Walks the objects of a set slot after the first one. The first
%% argument starts at the current object's data, CurSz is that
%% object's size (including its four size bytes) and Left is the
%% number of slot bytes not yet accounted for. Each object binary is
%% pushed onto Ts. When Left has shrunk to ?OHDSZ the current object
%% is the last one and slot_end/9 takes over with the object count.
bins_set(Obj, From, To, L, Ts, R, Type, Size, NoObjs, _CurSz, ?OHDSZ, Slot) ->
    slot_end(Slot, From, To, L, [Obj | Ts], R, Type, Size, NoObjs);
bins_set(Obj, From, To, L, Ts, R, Type, Size, NoObjs, CurSz, Left, Slot) ->
    DataSz = CurSz - 4,
    <<_:DataSz/binary, NextSz:32, Next/binary>> = Obj,
    bins_set(Next, From, To, L, [Obj | Ts], R, Type, Size, NoObjs + 1,
             NextSz, Left - NextSz, Slot).
2549
%% Walks the objects of a slot for the types other than set. Bin
%% starts at the data of the current object, ObjSz is the number of
%% data bytes of that object, Sz is what remains of the current group
%% after the current object, and ObjsSz is what remains of the slot
%% after the current group. Every object binary is pushed onto Ts and
%% counted in NoObjs. The order of the clauses matters.
bins_bag(Bin, From, To, L, Ts, R, Type, Size, NoObjs, Sz, ObjSz, ObjsSz, KO)
       when Sz > 0 ->
    %% More objects follow in the current group: step over the data
    %% of this object and read the size of the next one.
    <<_:ObjSz/binary, NObjSz:32, T/binary>> = Bin,
    bins_bag(T, From, To, L, [Bin | Ts], R, Type, Size, NoObjs + 1,
	     Sz-NObjSz, NObjSz-4, ObjsSz, KO);
bins_bag(Bin, From, To, L, Ts, R, Type, Size, NoObjs, _Z, _ObjSz, ?OHDSZ, KO) ->
    %% The group is exhausted and only ?OHDSZ bytes remain unaccounted
    %% for: this was the last object of the slot.
    slot_end(KO, From, To, L, [Bin | Ts], R, Type, Size, NoObjs);
bins_bag(Bin, From, To, L, Ts, R, Type, Size, NoObjs, _Z, ObjSz, ObjsSz, KO) ->
    %% The group is exhausted but the slot is not: the next group
    %% starts with its size followed by the size of its first object.
    <<_:ObjSz/binary, Sz:32, NObjSz:32, T/binary>> = Bin,
    bins_bag(T, From, To, L, [Bin | Ts], R, Type, Size, NoObjs + 1,
	     Sz-NObjSz-4, NObjSz-4, ObjsSz-Sz, KO).
2561
%% Called when all objects of a slot have been collected. Computes how
%% far to skip to get past the slot (the slot size rounded by
%% ?POW(log2(Size)), less the twelve bytes already consumed) and then
%% either continues scanning or stops.
%% With R >= 0 the slot size is added to R and scanning continues.
%% With R < 0 the number of objects found is added to R; once the sum
%% has reached -1 or more, {stop, RestOfBinary, Position, To, L, Ts}
%% is returned, otherwise scanning continues with the new R.
slot_end(KO, From, To, L, Ts, R, Type, Size, NoObjs) ->
    Skip = ?POW(dets_utils:log2(Size)) - 12, % expensive...
    case R of
        _ when R >= 0 ->
            scan_skip(KO, From, To, L, Ts, R + Size, Type, Skip);
        _ ->
            %% Should check this at the end of every key.
            Remaining = R + NoObjs,
            if
                Remaining >= -1 ->
                    NextFrom = From + Skip,
                    %% What is left of the binary after the slot, or
                    %% nothing if the slot extends beyond the binary.
                    Tail = case KO of
                               <<_:Skip/binary, After/binary>> -> After;
                               _ -> <<>>
                           end,
                    {stop, Tail, NextFrom, To, L, Ts};
                true ->
                    scan_skip(KO, From, To, L, Ts, Remaining, Type, Skip)
            end
    end.
2581
2582%%%%%%%%%%%%%%%%%  DEBUG functions %%%%%%%%%%%%%%%%
2583
%% Summarizes a file header record as a property list.
%% Returns {error, not_closed} if the closed_properly field is 0,
%% {error, not_a_dets_file} if the cookie is not ?MAGIC,
%% {error, bad_version} if the version is not ?FILE_FORMAT_VERSION,
%% and otherwise {ok, Properties}. The checks are made in that order.
file_info(FH) ->
    #fileheader{closed_properly = CP, cookie = Cookie,
                version = Version} = FH,
    if
        CP =:= 0 ->
            {error, not_closed};
        Cookie =/= ?MAGIC ->
            {error, not_a_dets_file};
        Version =/= ?FILE_FORMAT_VERSION ->
            {error, bad_version};
        true ->
            #fileheader{keypos = Kp, m = M, n = N, next = Next,
                        no_objects = NoObjects, no_keys = NoKeys,
                        type = Type} = FH,
            Props = [{closed_properly, CP},
                     {keypos, Kp},
                     {m, M},
                     {n, N},
                     {next, Next},
                     {no_objects, NoObjects},
                     {no_keys, NoKeys},
                     {type, Type},
                     {version, Version}],
            {ok, Props}
    end.
2601
%% Debug aid: prints the contents of all segments of the table,
%% starting with the first segment array part and segment number 0.
v_segments(H) when is_record(H, head) ->
    v_parts(H, 0, 0).
2604
%% Reads the pointer to segment array part number PartNo from the
%% file. A zero pointer, or having reached ?SEGARRSZ parts, ends the
%% traversal with 'done'; otherwise the part is read and its segments
%% are printed by v_segments/4.
v_parts(_H, ?SEGARRSZ, _SegNo) ->
    done;
v_parts(H, PartNo, SegNo) ->
    Fd = H#head.fptr,
    case dets_utils:read_4(Fd, ?SEGARRADDR(PartNo)) of
        0 ->
            done;
        PartPos ->
            PartBin = dets_utils:pread_n(Fd, PartPos, ?SEGPARTSZ*4),
            v_segments(H, PartBin, PartNo + 1, SegNo)
    end.
2617
%% Goes through the 32-bit segment pointers of one segment array part.
%% Each non-zero pointer is printed together with its segment number,
%% followed by the contents of the segment. A zero pointer ends the
%% traversal with 'done'; when the part is exhausted the next part is
%% fetched by v_parts/3.
v_segments(H, <<>>, PartNo, SegNo) ->
    v_parts(H, PartNo, SegNo);
v_segments(H, <<SegPos:32, Rest/binary>>, PartNo, SegNo) ->
    case SegPos of
        0 ->
            done;
        _ ->
            io:format("<~w>SEGMENT ~w~n", [SegPos, SegNo]),
            v_segment(H, SegNo, SegPos, 0),
            v_segments(H, Rest, PartNo, SegNo + 1)
    end.
2626
%% Prints the non-empty buckets of one segment, slot by slot, until
%% ?SEGSZP slots have been visited. A bucket that cannot be read is
%% reported and the traversal goes on with the next slot.
v_segment(_H, _, _SegPos, ?SEGSZP) ->
    done;
v_segment(H, SegNo, SegPos, SegSlot) ->
    Slot = (SegNo * ?SEGSZP) + SegSlot,
    BucketP = (SegSlot * ?SZOBJP * 4) + SegPos,
    Outcome = (catch read_bucket(H, BucketP, H#head.type)),
    case Outcome of
        [] ->  %% don't print empty buckets
            true;
        {'EXIT', Reason} ->
            dets_utils:vformat("** dets: Corrupt or truncated dets file~n",
                               []),
            io:format("~nERROR ~tp~n", [Reason]);
        {Size, CollP, Objects} ->
            io:format("   <~w>~w: <~w:~p>~w~n",
                      [BucketP, Slot, CollP, Size, Objects])
    end,
    v_segment(H, SegNo, SegPos, SegSlot + 1).
2644
%% -> [] | {Size, Pointer, [object()]}
%% Reads the bucket pointed to by the slot at Position, allowing
%% objects of up to maxobjsize(Head) bytes. The size stored in the
%% slot must agree with the size found in the data read, or no case
%% clause matches and the call crashes (the caller catches that).
%% The objects are returned in the order they are stored.
read_bucket(Head, Position, Type) ->
    Limit = maxobjsize(Head),
    case dets_utils:ipread(Head, Position, Limit) of
        [] ->
            [];
        {ok, {Size, Pointer, <<Size:32, _Status:32, KeysObjs/binary>>}} ->
            Reversed = bin2objs(KeysObjs, Type, []),
            {Size, Pointer, lists:reverse(Reversed)}
    end.
2655
2656-define(SEQSTART, -(1 bsl 26)).
2657
%% -> [{Key,SizeOfWholeKey,WholeKeyBin,FirstObject,OtherObjects}] |throw(EXIT)
%% FirstObject = {Term, Seq, Binary}
%% Seq < 0 (and ascending).
%% Splits the contents of an active slot into one entry per key. The
%% binary must start with its own size followed by ?ACTIVE, and the
%% size must equal the length of the binary.
per_key(Head, <<SlotSize:32, ?ACTIVE:32, Payload/binary>> = Slot) ->
    true = (byte_size(Slot) =:= SlotSize),
    case Head#head.type of
        set ->
            per_set_key(Payload, Head#head.keypos, []);
        _ ->
            per_bag_key(Payload, Head#head.keypos, [])
    end.
2669
%% Splits the objects of a set slot. Every object starts with a 32-bit
%% size that covers the size field itself. The result has one
%% {Key, Size, ObjectBin, {Term, ?SEQSTART, ObjectBin}, []} entry per
%% object, in the order the objects are stored.
per_set_key(<<>>, KeyPos, Acc) when is_integer(KeyPos) ->
    lists:reverse(Acc);
per_set_key(<<ObjSize:32, AfterSize/binary>> = Objs, KeyPos, Acc) ->
    <<ObjBin:ObjSize/binary, Rest/binary>> = Objs,
    %% binary_to_term/1 ignores what follows the encoded term.
    Term = binary_to_term(AfterSize),
    Entry = {element(KeyPos, Term), ObjSize, ObjBin,
             {Term, ?SEQSTART, ObjBin}, []},
    per_set_key(Rest, KeyPos, [Entry | Acc]).
2678
%% Splits the keys of a slot for the types other than set. Every key
%% starts with a 32-bit size of the whole key (size field included),
%% followed by the objects, each with its own 32-bit size. The result
%% has one {Key, Size, WholeKeyBin, FirstObject, OtherObjectsBin}
%% entry per key, in the order the keys are stored. FirstObject is
%% {Term, ?SEQSTART, ObjectBin}; the other objects are left undecoded.
per_bag_key(<<>>, KeyPos, Acc) when is_integer(KeyPos) ->
    lists:reverse(Acc);
per_bag_key(<<KeySize:32, FirstSize:32, AfterSizes/binary>> = Keys,
            KeyPos, Acc) ->
    <<KeyBin:KeySize/binary, Rest/binary>> = Keys,
    FirstDataSize = FirstSize - 4,
    OthersSize = KeySize - FirstSize - 4,
    <<_:FirstDataSize/binary, Others:OthersSize/binary, _/binary>> =
        AfterSizes,
    <<_KeySize:32, FirstBin:FirstSize/binary, _/binary>> = Keys,
    %% binary_to_term/1 ignores what follows the encoded term.
    Term = binary_to_term(AfterSizes),
    Key = element(KeyPos, Term),
    First = {Term, ?SEQSTART, FirstBin},
    per_bag_key(Rest, KeyPos, [{Key, KeySize, KeyBin, First, Others} | Acc]).
2691
2692
%% Decodes a binary of size-prefixed objects into a list of
%% {Term, Seq, ObjectBin} with Seq counting up from ?SEQSTART+1.
%% An empty binary gives [], and an empty list is returned as is.
binobjs2terms(<<>>) ->
    [];
binobjs2terms([]) ->
    [];
binobjs2terms(<<FirstSize:32, AfterSize/binary>> = Objs) ->
    Remaining = byte_size(Objs) - FirstSize,
    binobjs2terms(Objs, AfterSize, FirstSize, Remaining, ?SEQSTART+1, []).
2699
%% Worker for binobjs2terms/1. The first argument starts at the size
%% field of the current object and the second at its encoded term.
%% The third is the current object's size, the fourth the number of
%% bytes following the current object, the fifth the sequence number
%% to assign. When nothing follows, the first argument as a whole is
%% used as the binary of the last object.
binobjs2terms(Last, LastTerm, _ObjSz, 0, Seq, Acc) ->
    lists:reverse(Acc, [{binary_to_term(LastTerm), Seq, Last}]);
binobjs2terms(Objs, TermBin, ObjSz, Left, Seq, Acc) ->
    <<ObjBin:ObjSz/binary, Next/binary>> = Objs,
    <<NextSz:32, NextTerm/binary>> = Next,
    %% binary_to_term/1 ignores what follows the encoded term.
    Decoded = {binary_to_term(TermBin), Seq, ObjBin},
    binobjs2terms(Next, NextTerm, NextSz, Left - NextSz, Seq + 1,
                  [Decoded | Acc]).
2707
2708
%% Appends objects in reversed order.
%% Decodes all objects of a slot and pushes them onto Ts. For type set
%% the binary starts with the size of the first object; for the other
%% types the binary is a sequence of keys, handled by bin2objs2/2.
bin2objs(KeysObjs, Type, Ts) ->
    case Type of
        set ->
            <<FirstSize:32, AfterSize/binary>> = KeysObjs,
            bin2objs(AfterSize, FirstSize - 4,
                     byte_size(KeysObjs) - FirstSize, Ts);
        _ ->
            bin2objs2(KeysObjs, Ts)
    end.
2715
%% Starts decoding the next key: reads the size of the key and the
%% size of its first object, then lets bin2objs/4 decode the objects
%% of the key. An empty binary ends the decoding and returns the
%% accumulated objects.
bin2objs2(<<>>, Ts) ->
    Ts;
bin2objs2(<<KeySize:32, FirstSize:32, Rest/binary>>, Ts) ->
    bin2objs(Rest, FirstSize - 4, KeySize - FirstSize - 4, Ts).
2720
%% Decodes the objects of one key. Bin starts at the encoded term of
%% the current object, ObjSz is the number of bytes of that term and
%% the third argument is the number of bytes of the key that follow
%% the current object. When it is 0 the key is finished and
%% bin2objs2/2 continues with whatever follows.
bin2objs(Bin, ObjSz, 0, Ts) ->
    <<_:ObjSz/binary, Rest/binary>> = Bin,
    %% binary_to_term/1 ignores what follows the encoded term.
    bin2objs2(Rest, [binary_to_term(Bin) | Ts]);
bin2objs(Bin, ObjSz, Left, Ts) ->
    <<_:ObjSz/binary, NextSz:32, Rest/binary>> = Bin,
    bin2objs(Rest, NextSz - 4, Left - NextSz, [binary_to_term(Bin) | Ts]).
2727
2728
%% Converts the objects of a slot into a list of {Key, ObjectBin},
%% where Key is the element at the key position of the table. For
%% type set the binary starts with the size of the first object; for
%% the other types it is a sequence of keys, handled by bin2keybins2/3.
bin2keybins(KeysObjs, Head) ->
    case Head#head.type of
        set ->
            <<FirstSize:32, AfterSize/binary>> = KeysObjs,
            bin2keybins(AfterSize, Head#head.keypos, FirstSize - 4,
                        byte_size(KeysObjs) - FirstSize, []);
        _ ->
            bin2keybins2(KeysObjs, Head#head.keypos, [])
    end.
2734
%% Starts on the next key: reads the size of the key and the size of
%% its first object, then lets bin2keybins/5 handle the objects of the
%% key. An empty binary ends the conversion; the accumulated pairs
%% are returned in the order they were found.
bin2keybins2(<<>>, Kp, Acc) when is_integer(Kp) ->
    lists:reverse(Acc);
bin2keybins2(<<KeySize:32, FirstSize:32, Rest/binary>>, Kp, Acc) ->
    bin2keybins(Rest, Kp, FirstSize - 4, KeySize - FirstSize - 4, Acc).
2739
%% Handles the objects of one key. Bin starts at the encoded term of
%% the current object, ObjSz is the number of bytes of that term and
%% the fourth argument is the number of bytes of the key that follow
%% the current object. Each object contributes a {Key, TermBin} pair.
%% When nothing follows, bin2keybins2/3 continues with the next key.
bin2keybins(Bin, Kp, ObjSz, 0, Acc) ->
    <<TermBin:ObjSz/binary, Rest/binary>> = Bin,
    Decoded = binary_to_term(TermBin),
    bin2keybins2(Rest, Kp, [{element(Kp, Decoded), TermBin} | Acc]);
bin2keybins(Bin, Kp, ObjSz, Left, Acc) ->
    <<TermBin:ObjSz/binary, NextSz:32, Rest/binary>> = Bin,
    Decoded = binary_to_term(TermBin),
    bin2keybins(Rest, Kp, NextSz - 4, Left - NextSz,
                [{element(Kp, Decoded), TermBin} | Acc]).
2748