% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_emsort).

% This is an implementation of an external N-way merge sort. Its primary
% purpose is to be used during database compaction as an optimization for
% managing the docid btree.
%
% Trunk currently writes the docid btree as it is compacting the database, but
% this is quite inefficient because the tree is written out of order in the
% general case, as writes are ordered by update_seq.
%
% The general design of this module is a very standard merge sort with one
% caveat due to append-only files. This is described in more detail in the
% sorting phase.
%
% The basic algorithm is in two halves. The first half stores KV pairs to
% disk, and is followed by the actual sorting phase that streams KVs back
% to the client using a fold-like function. After some basic definitions we'll
% describe both phases.
%
% Key/Value pairs (aka KV pairs, or KVs) are simply two-tuples with a key as
% the first element and an arbitrary value as the second. The key of the pair
% is what is used to determine the sort order, based on native Erlang term
% comparison.
%
% Internally, KVs are stored in lists with a max size defined by
% #ems.chain_chunk. These lists are then chained together on disk using disk
% offsets as a poor man's linked list.
The basic format of a list looks like
% {KVs, DiskOffset}, where DiskOffset is either the atom nil, which means
% "end of the list", or an integer file position offset that is the location
% of another {KVs, DiskOffset} term. The head of each list is referred to
% with a single DiskOffset. The set of terms that extends from this initial
% DiskOffset to the last {KVs, nil} term is referred to in the code as a
% chain. Two important facts are that one call to couch_emsort:add/2 creates
% a single chain, and that a chain is always sorted on disk (though it is
% possible for it to be sorted in descending order, which will be discussed
% later).
%
% The second major internal structure is the backbone. This is a list of
% chains that has a structure quite similar to chains, but contains different
% data types and has no guarantee on ordering. The backbone is merely the
% list of all head DiskOffsets. It has the same {DiskOffsets, DiskOffset}
% structure that we use for chains, except that DiskOffsets is a list of
% integers that refer to the heads of chains. The maximum size of DiskOffsets
% is defined by #ems.bb_chunk. It is important to note that the backbone has
% no defined ordering. The other thing of note is that the RAM bounds are
% loosely defined as:
%
%     #ems.bb_chunk * #ems.chain_chunk * avg_size(KV).
%
% Build Phase
% -----------
%
% As mentioned, each call to couch_emsort:add/2 creates a chain from the
% list of KVs that are passed in. This list is first sorted and then the
% chain is created by foldr-ing (note: r) across the list to build the
% chain on disk. It is important to note that the final chain is then
% sorted in ascending order on disk.
%
%
% Sort Phase
% ----------
%
% The sort phase is where the merge sort kicks in. This is generally your
% average merge sort with a caveat for append-only storage. First the
% general outline.
%
% The general outline for this sort is that it iteratively merges chains
% in the backbone until fewer than #ems.bb_chunk chains exist. At this
% point it switches to the last merge sort phase, where it just streams
% the sorted KVs back to the client using a fold function.
%
% The general chain merging is a pretty standard merge sort. You load up
% the initial KVs from each chain, pick the next one in sort order, and
% when you run out of KVs you're left with a single DiskOffset for the
% head of a single chain that represents the merge. These new DiskOffsets
% are used to build the new backbone.
%
% The one caveat here is that we're using append-only storage. This is
% important because once we make a pass we've effectively reversed the
% sort order of each chain. I.e., the first merge results in chains that
% are ordered in descending order. Since one pass reverses the list, the
% trick is that each phase does two passes. The first pass picks the
% smallest KV to write next and the second pass picks the largest. In
% this manner, each time we do a backbone merge we end up with chains
% that are always sorted in ascending order.
%
% The one downfall is that, in the interest of simplicity, the sorting is
% restricted to Erlang's native term sorting. A possible extension would
% be to allow two comparison functions to be used, but this module is
% currently only used for docid sorting, which is hardcoded to be raw
% Erlang ordering.
%
% Diagram
% -------
%
% If it helps, this is a general diagram of the internal structures. A
% couple points to note since this is ASCII art. The BB pointers across
% the top are lists of chains going down. Each BBN item is one of the
% {DiskOffsets, DiskOffset} structures discussed earlier. Going down, the
% CMN nodes each actually represent #ems.bb_chunk chains in parallel
% going off the backbone.
It is important, and not represented in this
% diagram, that within these groups the chains don't have to be the same
% length. That's just a limitation of my ASCII artistic abilities.
%
% The BBN* node is marked with a * to denote that it is the only state
% that we store when writing headers to disk, as it has pointers that
% lead us to all data in the tree.
%
%     BB1 <- BB2 <- BB3 <- BBN*
%      |      |      |      |
%      v      v      v      v
%     CA1    CB1    CC1    CD1
%      |             |      |
%      v             v      v
%     CA2           CC2    CD2
%      |                    |
%      v                    v
%     CA3                  CD3
%

-export([open/1, open/2, get_fd/1, get_state/1]).
-export([add/2, merge/1, merge/2, sort/1, iter/1, next/1]).
-export([num_kvs/1, num_merges/1]).

-record(ems, {
    fd,
    root,
    bb_chunk = 10,
    chain_chunk = 100,
    num_kvs = 0,
    num_bb = 0
}).


-define(REPORT_INTERVAL, 1000).


open(Fd) ->
    {ok, #ems{fd=Fd}}.


open(Fd, Options) ->
    {ok, set_options(#ems{fd=Fd}, Options)}.


set_options(Ems, []) ->
    Ems;
set_options(Ems, [{root, Root} | Rest]) ->
    set_options(Ems#ems{root=Root}, Rest);
set_options(Ems, [{chain_chunk, Count} | Rest]) when is_integer(Count) ->
    set_options(Ems#ems{chain_chunk=Count}, Rest);
set_options(Ems, [{back_bone_chunk, Count} | Rest]) when is_integer(Count) ->
    set_options(Ems#ems{bb_chunk=Count}, Rest);
set_options(Ems, [{num_kvs, NumKVs} | Rest]) when is_integer(NumKVs) ->
    set_options(Ems#ems{num_kvs=NumKVs}, Rest);
set_options(Ems, [{num_bb, NumBB} | Rest]) when is_integer(NumBB) ->
    set_options(Ems#ems{num_bb=NumBB}, Rest).


get_fd(#ems{fd=Fd}) ->
    Fd.


get_state(#ems{} = Ems) ->
    #ems{
        root = Root,
        num_kvs = NumKVs,
        num_bb = NumBB
    } = Ems,
    [
        {root, Root},
        {num_kvs, NumKVs},
        {num_bb, NumBB}
    ].
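
% As a sketch of typical use of the public API above (the Fd would come from
% couch_file; the keys and values here are purely illustrative):
%
%     {ok, Ems0} = couch_emsort:open(Fd),
%     {ok, Ems1} = couch_emsort:add(Ems0, [{<<"b">>, 2}, {<<"a">>, 1}]),
%     {ok, Ems2} = couch_emsort:add(Ems1, [{<<"c">>, 3}]),
%     {ok, Iter0} = couch_emsort:sort(Ems2),
%     {ok, {<<"a">>, 1}, Iter1} = couch_emsort:next(Iter0),
%
% Iteration continues with next/1, which yields KVs in ascending key order
% until it returns the atom finished.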


add(Ems, []) ->
    {ok, Ems};
add(Ems, KVs) ->
    Pos = write_kvs(Ems, KVs),
    NewEms = add_bb_pos(Ems, Pos),
    {ok, NewEms#ems{
        num_kvs = Ems#ems.num_kvs + length(KVs),
        num_bb = Ems#ems.num_bb + 1
    }}.


sort(#ems{}=Ems) ->
    {ok, Ems1} = merge(Ems),
    iter(Ems1).


merge(Ems) ->
    merge(Ems, fun(_) -> ok end).


merge(#ems{root=undefined}=Ems, _Reporter) ->
    {ok, Ems};
merge(#ems{}=Ems, Reporter) ->
    {ok, decimate(Ems, Reporter)}.


iter(#ems{root=undefined}=Ems) ->
    {ok, {Ems, []}};
iter(#ems{root={BB, nil}}=Ems) ->
    Chains = init_chains(Ems, small, BB),
    {ok, {Ems, Chains}};
iter(#ems{root={_, _}}) ->
    {error, not_merged}.


next({_Ems, []}) ->
    finished;
next({Ems, Chains}) ->
    {KV, RestChains} = choose_kv(small, Ems, Chains),
    {ok, KV, {Ems, RestChains}}.


num_kvs(#ems{num_kvs=NumKVs}) ->
    NumKVs.

num_merges(#ems{bb_chunk=BBChunk, num_bb=NumBB}) ->
    num_merges(BBChunk, NumBB).


add_bb_pos(#ems{root=undefined}=Ems, Pos) ->
    Ems#ems{root={[Pos], nil}};
add_bb_pos(#ems{root={BB, Prev}}=Ems, Pos) ->
    {NewBB, NewPrev} = append_item(Ems, {BB, Prev}, Pos, Ems#ems.bb_chunk),
    Ems#ems{root={NewBB, NewPrev}}.


write_kvs(Ems, KVs) ->
    % Write the list of KVs to disk in sorted order in chunks of
    % #ems.chain_chunk (100 by default). Also make sure that the order
    % is such that they can be streamed back in ascending order.
    {LastKVs, LastPos} =
        lists:foldr(fun(KV, Acc) ->
            append_item(Ems, Acc, KV, Ems#ems.chain_chunk)
        end, {[], nil}, lists:sort(KVs)),
    {ok, Final, _} = couch_file:append_term(Ems#ems.fd, {LastKVs, LastPos}),
    Final.


decimate(#ems{root={_BB, nil}}=Ems, _Reporter) ->
    % We have fewer than bb_chunk backbone pointers, so we're
    % good to start streaming KVs back to the client.
    Ems;
decimate(#ems{root={BB, NextBB}}=Ems, Reporter) ->
    % To make sure we have a bounded amount of data in RAM
    % at any given point, we first need to decimate the data
    % by performing the first couple iterations of a merge
    % sort, writing the intermediate results back to disk.

    % The first pass gives us a sort with pointers linked from
    % largest to smallest.
    {RevBB, RevNextBB} = merge_back_bone(Ems, small, BB, NextBB, Reporter),

    % We have to run a second pass so that links point
    % back from smallest to largest.
    {FwdBB, FwdNextBB} = merge_back_bone(Ems, big, RevBB, RevNextBB, Reporter),

    % Continue decimating until we have an acceptable bound on
    % the number of keys to use.
    decimate(Ems#ems{root={FwdBB, FwdNextBB}}, Reporter).


merge_back_bone(Ems, Choose, BB, NextBB, Reporter) ->
    BBPos = merge_chains(Ems, Choose, BB, Reporter),
    Reporter(length(BB)),
    merge_rest_back_bone(Ems, Choose, NextBB, {[BBPos], nil}, Reporter).


merge_rest_back_bone(_Ems, _Choose, nil, Acc, _Reporter) ->
    Acc;
merge_rest_back_bone(Ems, Choose, BBPos, Acc, Reporter) ->
    {ok, {BB, NextBB}} = couch_file:pread_term(Ems#ems.fd, BBPos),
    NewPos = merge_chains(Ems, Choose, BB, Reporter),
    {NewBB, NewPrev} = append_item(Ems, Acc, NewPos, Ems#ems.bb_chunk),
    merge_rest_back_bone(Ems, Choose, NextBB, {NewBB, NewPrev}, Reporter).


merge_chains(Ems, Choose, BB, Reporter) ->
    Chains = init_chains(Ems, Choose, BB),
    merge_chains(Ems, Choose, Chains, {[], nil}, Reporter, 0).


merge_chains(Ems, _Choose, [], ChainAcc, _Reporter, _Count) ->
    {ok, CPos, _} = couch_file:append_term(Ems#ems.fd, ChainAcc),
    CPos;
merge_chains(#ems{chain_chunk=CC}=Ems, Choose, Chains, Acc, Reporter, Count0) ->
    {KV, RestChains} = choose_kv(Choose, Ems, Chains),
    {NewKVs, NewPrev} = append_item(Ems, Acc, KV, CC),
    Count1 = case (Count0 + 1) rem ?REPORT_INTERVAL of
        0 ->
            Reporter(Count0),
            0;
        _ ->
            Count0 + 1
    end,
    merge_chains(Ems, Choose, RestChains, {NewKVs, NewPrev}, Reporter, Count1).


init_chains(Ems, Choose, BB) ->
    Chains = lists:map(fun(CPos) ->
        {ok, {KVs, NextKVs}} = couch_file:pread_term(Ems#ems.fd, CPos),
        {KVs, NextKVs}
    end, BB),
    order_chains(Choose, Chains).


order_chains(small, Chains) -> lists:sort(Chains);
order_chains(big, Chains) -> lists:reverse(lists:sort(Chains)).


choose_kv(_Choose, _Ems, [{[KV], nil} | Rest]) ->
    {KV, Rest};
choose_kv(Choose, Ems, [{[KV], Pos} | RestChains]) ->
    {ok, Chain} = couch_file:pread_term(Ems#ems.fd, Pos),
    case Choose of
        small -> {KV, ins_small_chain(RestChains, Chain, [])};
        big -> {KV, ins_big_chain(RestChains, Chain, [])}
    end;
choose_kv(Choose, _Ems, [{[KV | RestKVs], Prev} | RestChains]) ->
    case Choose of
        small -> {KV, ins_small_chain(RestChains, {RestKVs, Prev}, [])};
        big -> {KV, ins_big_chain(RestChains, {RestKVs, Prev}, [])}
    end.


ins_small_chain([{[{K1,_}|_],_}=C1|Rest], {[{K2,_}|_],_}=C2, Acc) when K1<K2 ->
    ins_small_chain(Rest, C2, [C1 | Acc]);
ins_small_chain(Rest, Chain, Acc) ->
    lists:reverse(Acc, [Chain | Rest]).


ins_big_chain([{[{K1,_}|_],_}=C1|Rest], {[{K2,_}|_],_}=C2, Acc) when K1>K2 ->
    ins_big_chain(Rest, C2, [C1 | Acc]);
ins_big_chain(Rest, Chain, Acc) ->
    lists:reverse(Acc, [Chain | Rest]).
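

% A worked sketch of how append_item/4 below chunks a list (the offset 1024
% is made up; couch_file:append_term/2 returns the real position):
%
%     append_item(Ems, {[b, a], nil}, c, 3) -> {[c, b, a], nil}
%     append_item(Ems, {[c, b, a], nil}, d, 3)
%         %% {[c, b, a], nil} is flushed to disk at some offset, say 1024,
%         %% and the new accumulator becomes:
%         -> {[d], 1024}
%
% Note that items are consed on newest-first, which is why write_kvs/2
% foldr's over the sorted KVs so that chains stream back in ascending order.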


append_item(Ems, {List, Prev}, Pos, Size) when length(List) >= Size ->
    {ok, PrevList, _} = couch_file:append_term(Ems#ems.fd, {List, Prev}),
    {[Pos], PrevList};
append_item(_Ems, {List, Prev}, Pos, _Size) ->
    {[Pos | List], Prev}.


num_merges(BBChunk, NumBB) when NumBB =< BBChunk ->
    0;
num_merges(BBChunk, NumBB) when NumBB > BBChunk ->
    RevNumBB = ceil(NumBB / BBChunk),
    FwdNumBB = ceil(RevNumBB / BBChunk),
    2 + num_merges(BBChunk, FwdNumBB).
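
% As a quick sanity check on the recursion above (numbers illustrative):
% with the default bb_chunk of 10 and 1000 chains, the small and big passes
% reduce the backbone 1000 -> 100 -> 10, so:
%
%     num_merges(10, 1000) ->
%         RevNumBB = ceil(1000 / 10) = 100,
%         FwdNumBB = ceil(100 / 10)  = 10,
%         2 + num_merges(10, 10)     = 2 + 0 = 2.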