% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_emsort).

% This is an implementation of an external N-way merge sort. Its primary
% purpose is to be used during database compaction as an optimization for
% managing the docid btree.
%
% Trunk currently writes the docid btree as it compacts the database, but
% this is quite inefficient because in the general case the btree is written
% out of order, as writes are ordered by update_seq.
%
% The general design of this module is a very standard merge sort with one
% caveat due to append-only files. This is described in more detail in the
% sorting phase.
%
% The basic algorithm has two halves. The first half stores KV pairs to
% disk, which is then followed by the actual sorting phase that streams KVs
% back to the client using a fold-like function. After some basic
% definitions we'll describe both phases.
%
% Key/Value pairs (aka KV pairs, or KVs) are simply two-tuples with a key
% as the first element and an arbitrary value as the second. The key of
% this pair is what is used to determine the sort order, based on native
% Erlang term comparison.
%
% Internally, KVs are stored as lists with a max size defined by
% #ems.chain_chunk. These lists are then chained together on disk using disk
% offsets as a poor man's linked list. The basic format of a list looks like
% {KVs, DiskOffset} where DiskOffset is either the atom nil, which means "end
% of the list", or an integer that is a file position offset that is the
% location of another {KVs, DiskOffset} term. The head of each list is
% referred to with a single DiskOffset. The set of terms that extend from
% this initial DiskOffset to the last {KVs, nil} term is referred to in the
% code as a chain. Two important facts are that one call to couch_emsort:add/2
% creates a single chain, and that a chain is always sorted on disk (though
% it may be sorted in descending order, as discussed later).
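%
% For example, a chain built from six KVs with a chain_chunk of 3 might be
% laid out on disk as follows (the offsets P1 and P2 are hypothetical):
%
%     P1: {[{d,4}, {e,5}, {f,6}], nil}
%     P2: {[{a,1}, {b,2}, {c,3}], P1}
%
% The head of this chain is the single DiskOffset P2, and reading from P2
% to the {KVs, nil} term streams the KVs back in ascending order.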
%
% The second major internal structure is the backbone. This is a list of
% chains with a structure quite similar to chains, but it contains different
% data types and has no guarantee on ordering. The backbone is merely the
% list of all head DiskOffsets. It uses the same {DiskOffsets, DiskOffset}
% structure that we use for chains, except that DiskOffsets is a list of
% integers that refer to the heads of chains. The maximum size of
% DiskOffsets is defined by #ems.bb_chunk. It is important to note that the
% backbone has no defined ordering. The other thing of note is that the RAM
% bounds are loosely defined as:
%
%     #ems.bb_chunk * #ems.chain_chunk * avg_size(KV).
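%
% For example, with a bb_chunk of 3, a backbone over five chains might look
% like this (B1 and the chain head offsets C1..C5 are hypothetical):
%
%     B1: {[C3, C2, C1], nil}      (flushed to disk)
%     {[C5, C4], B1}               (the current root, held in #ems.root)
%
% Only reachability of the chain heads matters here, since the backbone
% carries no ordering guarantee.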
%
% Build Phase
% -----------
%
% As mentioned, each call to couch_emsort:add/2 creates a chain from the
% list of KVs that are passed in. This list is first sorted and then the
% chain is created by foldr-ing (note: r) across the list to build the
% chain on disk. It is important to note that the final chain is then
% sorted in ascending order on disk.
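%
% As a minimal sketch of the build phase (the file name and KVs here are
% made up for illustration):
%
%     {ok, Fd} = couch_file:open("emsort.dat"),
%     {ok, Ems0} = couch_emsort:open(Fd),
%     {ok, Ems1} = couch_emsort:add(Ems0, [{<<"c">>, 3}, {<<"a">>, 1}]),
%     {ok, Ems2} = couch_emsort:add(Ems1, [{<<"b">>, 2}]),
%
% Each add/2 call writes one sorted chain to disk and records its head
% DiskOffset in the backbone.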
%
%
% Sort Phase
% ----------
%
% The sort phase is where the merge sort kicks in. This is generally your
% average merge sort with a caveat for append-only storage. First, the
% general outline.
%
% The general outline for this sort is that it iteratively merges chains
% in the backbone until fewer than #ems.bb_chunk chains exist. At this
% point it switches to the final merge sort phase, where it just streams
% the sorted KVs back to the client using a fold function.
%
% The general chain merging is a pretty standard merge sort. You load up
% the initial KVs from each chain, pick the next one in sort order, and
% when you run out of KVs you're left with a single DiskOffset for
% the head of a single chain that represents the merge. These new
% DiskOffsets are used to build the new backbone.
%
% The one caveat here is that we're using append-only storage. This is
% important because once we make a pass we've effectively reversed the
% sort order of each chain. I.e., the first merge results in chains that
% are ordered in descending order. Since one pass reverses the list,
% the trick is that each iteration does two passes. The first pass picks
% the smallest KV to write next and the second pass picks the largest.
% In this manner, each time we do a backbone merge we end up with chains
% that are always sorted in ascending order.
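%
% As a small worked illustration (chain contents are hypothetical, keys
% only):
%
%     Pass 1 (choose small): merge ascending chains [1,3] and [2,4] by
%     repeatedly picking the smallest KV. Because chains are built by
%     prepending, the merged chain reads 4,3,2,1 from its head, i.e.
%     descending.
%
%     Pass 2 (choose big): merge the now-descending chains by repeatedly
%     picking the largest KV, which leaves the result reading in
%     ascending order from its head again.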
%
% The one downfall is that in the interest of simplicity the sorting is
% restricted to Erlang's native term sorting. A possible extension would
% be to allow two comparison functions to be used, but this module is
% currently only used for docid sorting, which is hardcoded to be raw
% Erlang ordering.
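%
% A minimal sketch of draining the sorted KVs, continuing the hypothetical
% build phase example above:
%
%     {ok, Iter0} = couch_emsort:sort(Ems2),
%     {ok, {<<"a">>, 1}, Iter1} = couch_emsort:next(Iter0),
%     {ok, {<<"b">>, 2}, Iter2} = couch_emsort:next(Iter1),
%     {ok, {<<"c">>, 3}, Iter3} = couch_emsort:next(Iter2),
%     finished = couch_emsort:next(Iter3).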
%
% Diagram
% -------
%
% If it helps, this is a general diagram of the internal structures. A
% couple of points to note since this is ASCII art. The BB pointers across
% the top are lists of chains going down. Each BBN item is one of the
% {DiskOffsets, DiskOffset} structures discussed earlier. Going down,
% the CMN nodes actually represent #ems.bb_chunk chains in parallel
% going off the backbone. It is important, though not represented in this
% diagram, that within these groups the chains don't have to be the same
% length. That's just a limitation of my ASCII artistic abilities.
%
% The BBN* node is marked with a * to denote that it is the only state
% that we store when writing headers to disk, as it has pointers that
% lead us to all data in the tree.
%
%     BB1 <- BB2 <- BB3 <- BBN*
%      |      |      |      |
%      v      v      v      v
%     CA1    CB1    CC1    CD1
%      |             |      |
%      v             v      v
%     CA2           CC2    CD2
%      |                    |
%      v                    v
%     CA3                  CD3
%

-export([open/1, open/2, get_fd/1, get_state/1]).
-export([add/2, merge/1, merge/2, sort/1, iter/1, next/1]).
-export([num_kvs/1, num_merges/1]).

-record(ems, {
    fd,
    root,
    bb_chunk = 10,
    chain_chunk = 100,
    num_kvs = 0,
    num_bb = 0
}).


-define(REPORT_INTERVAL, 1000).


open(Fd) ->
    {ok, #ems{fd=Fd}}.


open(Fd, Options) ->
    {ok, set_options(#ems{fd=Fd}, Options)}.


set_options(Ems, []) ->
    Ems;
set_options(Ems, [{root, Root} | Rest]) ->
    set_options(Ems#ems{root=Root}, Rest);
set_options(Ems, [{chain_chunk, Count} | Rest]) when is_integer(Count) ->
    set_options(Ems#ems{chain_chunk=Count}, Rest);
set_options(Ems, [{back_bone_chunk, Count} | Rest]) when is_integer(Count) ->
    set_options(Ems#ems{bb_chunk=Count}, Rest);
set_options(Ems, [{num_kvs, NumKVs} | Rest]) when is_integer(NumKVs) ->
    set_options(Ems#ems{num_kvs=NumKVs}, Rest);
set_options(Ems, [{num_bb, NumBB} | Rest]) when is_integer(NumBB) ->
    set_options(Ems#ems{num_bb=NumBB}, Rest).


get_fd(#ems{fd=Fd}) ->
    Fd.


get_state(#ems{} = Ems) ->
    #ems{
        root = Root,
        num_kvs = NumKVs,
        num_bb = NumBB
    } = Ems,
    [
        {root, Root},
        {num_kvs, NumKVs},
        {num_bb, NumBB}
    ].


add(Ems, []) ->
    {ok, Ems};
add(Ems, KVs) ->
    Pos = write_kvs(Ems, KVs),
    NewEms = add_bb_pos(Ems, Pos),
    {ok, NewEms#ems{
        num_kvs = Ems#ems.num_kvs + length(KVs),
        num_bb = Ems#ems.num_bb + 1
    }}.


sort(#ems{}=Ems) ->
    {ok, Ems1} = merge(Ems),
    iter(Ems1).


merge(Ems) ->
    merge(Ems, fun(_) -> ok end).


merge(#ems{root=undefined}=Ems, _Reporter) ->
    {ok, Ems};
merge(#ems{}=Ems, Reporter) ->
    {ok, decimate(Ems, Reporter)}.


iter(#ems{root=undefined}=Ems) ->
    {ok, {Ems, []}};
iter(#ems{root={BB, nil}}=Ems) ->
    Chains = init_chains(Ems, small, BB),
    {ok, {Ems, Chains}};
iter(#ems{root={_, _}}) ->
    {error, not_merged}.


next({_Ems, []}) ->
    finished;
next({Ems, Chains}) ->
    {KV, RestChains} = choose_kv(small, Ems, Chains),
    {ok, KV, {Ems, RestChains}}.


num_kvs(#ems{num_kvs=NumKVs}) ->
    NumKVs.

num_merges(#ems{bb_chunk=BBChunk, num_bb=NumBB}) ->
    num_merges(BBChunk, NumBB).


add_bb_pos(#ems{root=undefined}=Ems, Pos) ->
    Ems#ems{root={[Pos], nil}};
add_bb_pos(#ems{root={BB, Prev}}=Ems, Pos) ->
    {NewBB, NewPrev} = append_item(Ems, {BB, Prev}, Pos, Ems#ems.bb_chunk),
    Ems#ems{root={NewBB, NewPrev}}.


write_kvs(Ems, KVs) ->
    % Write the list of KVs to disk in sorted order in chunks of
    % #ems.chain_chunk (100 by default). Also make sure that the order
    % is such that they can be streamed back in ascending order.
    {LastKVs, LastPos} =
    lists:foldr(fun(KV, Acc) ->
        append_item(Ems, Acc, KV, Ems#ems.chain_chunk)
    end, {[], nil}, lists:sort(KVs)),
    {ok, Final, _} = couch_file:append_term(Ems#ems.fd, {LastKVs, LastPos}),
    Final.


decimate(#ems{root={_BB, nil}}=Ems, _Reporter) ->
    % We have fewer than bb_chunk backbone pointers so we're
    % good to start streaming KVs back to the client.
    Ems;
decimate(#ems{root={BB, NextBB}}=Ems, Reporter) ->
    % To make sure we have a bounded amount of data in RAM
    % at any given point we first need to decimate the data
    % by performing the first couple iterations of a merge
    % sort, writing the intermediate results back to disk.

    % The first pass gives us a sort with pointers linked from
    % largest to smallest.
    {RevBB, RevNextBB} = merge_back_bone(Ems, small, BB, NextBB, Reporter),

    % We have to run a second pass so that links are pointed
    % back from smallest to largest.
    {FwdBB, FwdNextBB} = merge_back_bone(Ems, big, RevBB, RevNextBB, Reporter),

    % Continue decimating until we have an acceptable bound on
    % the number of keys to use.
    decimate(Ems#ems{root={FwdBB, FwdNextBB}}, Reporter).


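% merge_back_bone/5 merges the in-memory backbone node BB first, then walks
% the remaining on-disk backbone nodes, collapsing each group of chains into
% a single new chain and collecting the new chain heads into a new backbone.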
merge_back_bone(Ems, Choose, BB, NextBB, Reporter) ->
    BBPos = merge_chains(Ems, Choose, BB, Reporter),
    Reporter(length(BB)),
    merge_rest_back_bone(Ems, Choose, NextBB, {[BBPos], nil}, Reporter).


merge_rest_back_bone(_Ems, _Choose, nil, Acc, _Reporter) ->
    Acc;
merge_rest_back_bone(Ems, Choose, BBPos, Acc, Reporter) ->
    {ok, {BB, NextBB}} = couch_file:pread_term(Ems#ems.fd, BBPos),
    NewPos = merge_chains(Ems, Choose, BB, Reporter),
    {NewBB, NewPrev} = append_item(Ems, Acc, NewPos, Ems#ems.bb_chunk),
    merge_rest_back_bone(Ems, Choose, NextBB, {NewBB, NewPrev}, Reporter).


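% merge_chains/4 merges one group of up to bb_chunk chains into a single
% new chain, reporting progress roughly every ?REPORT_INTERVAL KVs, and
% returns the disk offset of the new chain's head.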
merge_chains(Ems, Choose, BB, Reporter) ->
    Chains = init_chains(Ems, Choose, BB),
    merge_chains(Ems, Choose, Chains, {[], nil}, Reporter, 0).


merge_chains(Ems, _Choose, [], ChainAcc, _Reporter, _Count) ->
    {ok, CPos, _} = couch_file:append_term(Ems#ems.fd, ChainAcc),
    CPos;
merge_chains(#ems{chain_chunk=CC}=Ems, Choose, Chains, Acc, Reporter, Count0) ->
    {KV, RestChains} = choose_kv(Choose, Ems, Chains),
    {NewKVs, NewPrev} = append_item(Ems, Acc, KV, CC),
    Count1 = case (Count0 + 1) rem ?REPORT_INTERVAL of
        0 ->
            Reporter(Count0),
            0;
        _ ->
            Count0 + 1
    end,
    merge_chains(Ems, Choose, RestChains, {NewKVs, NewPrev}, Reporter, Count1).


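% init_chains/3 loads the head node of every chain referenced by the
% backbone node BB and orders the chains so the next KV to pick is always
% at the head of the first chain.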
init_chains(Ems, Choose, BB) ->
    Chains = lists:map(fun(CPos) ->
        {ok, {KVs, NextKVs}} = couch_file:pread_term(Ems#ems.fd, CPos),
        {KVs, NextKVs}
    end, BB),
    order_chains(Choose, Chains).


order_chains(small, Chains) -> lists:sort(Chains);
order_chains(big, Chains) -> lists:reverse(lists:sort(Chains)).


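% choose_kv/3 pops the next KV off the first chain, reads the chain's next
% node from disk when its current KVs are exhausted, and re-inserts the
% advanced chain into the ordered list of chains.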
choose_kv(_Choose, _Ems, [{[KV], nil} | Rest]) ->
    {KV, Rest};
choose_kv(Choose, Ems, [{[KV], Pos} | RestChains]) ->
    {ok, Chain} = couch_file:pread_term(Ems#ems.fd, Pos),
    case Choose of
        small -> {KV, ins_small_chain(RestChains, Chain, [])};
        big -> {KV, ins_big_chain(RestChains, Chain, [])}
    end;
choose_kv(Choose, _Ems, [{[KV | RestKVs], Prev} | RestChains]) ->
    case Choose of
        small -> {KV, ins_small_chain(RestChains, {RestKVs, Prev}, [])};
        big -> {KV, ins_big_chain(RestChains, {RestKVs, Prev}, [])}
    end.


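% ins_small_chain/3 and ins_big_chain/3 re-insert a chain into the list of
% chains, keeping the chain whose head KV sorts next (smallest or largest,
% respectively) at the front.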
ins_small_chain([{[{K1,_}|_],_}=C1|Rest], {[{K2,_}|_],_}=C2, Acc) when K1<K2 ->
    ins_small_chain(Rest, C2, [C1 | Acc]);
ins_small_chain(Rest, Chain, Acc) ->
    lists:reverse(Acc, [Chain | Rest]).


ins_big_chain([{[{K1,_}|_],_}=C1|Rest], {[{K2,_}|_],_}=C2, Acc) when K1>K2 ->
    ins_big_chain(Rest, C2, [C1 | Acc]);
ins_big_chain(Rest, Chain, Acc) ->
    lists:reverse(Acc, [Chain | Rest]).


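% append_item/4 prepends Pos to the accumulator list; once the list holds
% Size items it is flushed to disk as {List, Prev} and a fresh list is
% started whose Prev points at the flushed term. Both chains and the
% backbone are built with this helper.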
append_item(Ems, {List, Prev}, Pos, Size) when length(List) >= Size ->
    {ok, PrevList, _} = couch_file:append_term(Ems#ems.fd, {List, Prev}),
    {[Pos], PrevList};
append_item(_Ems, {List, Prev}, Pos, _Size) ->
    {[Pos | List], Prev}.


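% Each decimation iteration runs two merge passes (small, then big), and
% each pass reduces the number of backbone nodes by roughly a factor of
% BBChunk, hence the 2 + recursion below.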
num_merges(BBChunk, NumBB) when NumBB =< BBChunk ->
    0;
num_merges(BBChunk, NumBB) when NumBB > BBChunk ->
    RevNumBB = ceil(NumBB / BBChunk),
    FwdNumBB = ceil(RevNumBB / BBChunk),
    2 + num_merges(BBChunk, FwdNumBB).