{-# LANGUAGE BangPatterns #-}

-- ---------------------------------------------------------------------------
-- |
-- Module      : Data.Vector.Algorithms.Tim
-- Copyright   : (c) 2013-2015 Dan Doel, 2015 Tim Baumann
-- Maintainer  : Dan Doel <dan.doel@gmail.com>
-- Stability   : Experimental
-- Portability : Non-portable (bang patterns)
--
-- Timsort is a complex, adaptive, bottom-up merge sort. It is designed to
-- minimize comparisons as much as possible, even at some cost in overhead.
-- Thus, it may not be ideal for sorting simple primitive types, for which
-- comparison is cheap. It may, however, be significantly faster for sorting
-- arrays of complex values (strings would be an example, though an algorithm
-- not based on comparison would probably be superior in that particular
-- case).
--
-- For more information on the details of the algorithm, read on.
--
-- The first step of the algorithm is to identify runs of elements. These can
-- either be non-decreasing or strictly decreasing sequences of elements in
-- the input. Strictly decreasing sequences are used rather than
-- non-increasing so that they can be easily reversed in place without the
-- sort becoming unstable.
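--
-- For example (a hypothetical input), the array [3,5,5,2,1,4] decomposes
-- into the non-decreasing run [3,5,5], the strictly decreasing run [2,1]
-- (which is reversed in place to [1,2]), and the final one-element run [4].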
--
-- If the natural runs are too short, they are padded to a minimum length.
-- The minimum is chosen based on the length of the array, and the padded
-- runs are put in order using insertion sort. The minimum run size is
-- determined as follows:
--
--   * If the length of the array is less than 64, the minimum size is the
--     length of the array, and insertion sort is used for the entirety
--
--   * Otherwise, a value between 32 and 64 is chosen such that N/min is
--     either equal to or just below a power of two. This avoids having a
--     small chunk left over to merge into much larger chunks at the end.
--
-- This is accomplished by taking the minimum to be the lowest six bits
-- containing the highest set bit, and adding one if any other bits are set.
-- For instance:
--
--     length: 00000000 00000000 00000000 00011011 = 25
--     min:    00000000 00000000 00000000 00011011 = 25
--
--     length: 00000000 11111100 00000000 00000000 = 63 * 2^18
--     min:    00000000 00000000 00000000 00111111 = 63
--
--     length: 00000000 11111100 00000000 00000001 = 63 * 2^18 + 1
--     min:    00000000 00000000 00000000 01000000 = 64
--
-- Once chunks can be produced, the next step is merging them. The indices of
-- all runs are stored in a stack. When we identify a new run, we push it onto
-- the stack. However, certain invariants are maintained on the stack entries.
-- Namely:
--
--   if stk = _ :> z :> y :> x
--     length x + length y < length z
--
--   if stk = _ :> y :> x
--     length x < length y
--
-- This ensures that the stored chunks are decreasing in size, and that the
-- chunk sizes grow at least as fast as the Fibonacci numbers, so there are
-- at most log-many chunks at any time. If pushing a new chunk onto the stack
-- would violate either of the invariants, we first perform a merge.
--
-- If length x + length y >= length z, then y is merged with the smaller of x
-- and z (if they are tied, x is chosen, because it is more likely to be
-- cached). If, further, length x >= length y, then x and y are merged. These
-- steps are repeated until the invariants are established.
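--
-- For example (a hypothetical trace, showing run lengths only with the top
-- of the stack on the right):
--
--     [30, 20, 10]   20+10 >= 30 and 10 < 30, so y=20 is merged with x=10
--     [30, 30]       30 < 30 fails the second invariant, so merge again
--     [60]           both invariants hold; the next run can be pushed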
--
-- The last important piece of the algorithm is the merging. At first, two
-- chunks are merged element-wise. However, while doing so, counts are kept of
-- the number of elements taken from one chunk without any from its partner. If
-- this count exceeds a threshold, the merge switches to searching for elements
-- from one chunk in the other, and copying whole chunks at a time. If the
-- copied chunks start falling below the threshold, the merge switches back to
-- element-wise merging.
--
-- The search used in the merge is also special. It uses a galloping strategy,
-- where exponentially increasing indices are tested, and once two such indices
-- are determined to bracket the desired value, binary search is used to find
-- the exact index within that range. This is asymptotically the same as simply
-- using binary search, but is likely to do fewer comparisons than binary search
-- would.
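--
-- A schematic sketch of the galloping idea follows. The implementation used
-- below is 'gallopingSearchLeftPBounds' and 'gallopingSearchRightPBounds'
-- from "Data.Vector.Algorithms.Search"; the standalone function here is
-- illustrative only and its name and details are assumptions, not that
-- library's API.
--
-- > -- Least index in [lo,hi) at which p . f holds (p assumed monotone,
-- > -- i.e. False then True); returns hi if it never holds.
-- > gallop :: (a -> Bool) -> (Int -> a) -> Int -> Int -> Int
-- > gallop p f lo hi = outer (lo - 1) 1
-- >  where
-- >  -- probe indices lo, lo+1, lo+3, lo+7, ...; indices <= known fail p
-- >  outer known step
-- >    | probe >= hi = bin (known + 1) hi
-- >    | p (f probe) = bin (known + 1) probe
-- >    | otherwise   = outer probe (2 * step)
-- >    where probe = lo + step - 1
-- >  -- binary search: least index in [l, u] at which p holds (or u)
-- >  bin l u
-- >    | l >= u    = l
-- >    | p (f mid) = bin l mid
-- >    | otherwise = bin (mid + 1) u
-- >    where mid = (l + u) `div` 2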
--
-- One aspect that is not yet implemented from the original Timsort is the
-- adjustment of the above threshold. When galloping saves time, the threshold
-- is lowered, and when it doesn't, it is raised. This may be implemented in
-- the future.

module Data.Vector.Algorithms.Tim
       ( sort
       , sortBy
       ) where

import Prelude hiding (length, reverse)

import Control.Monad.Primitive
import Control.Monad (when)
import Data.Bits

import Data.Vector.Generic.Mutable

import Data.Vector.Algorithms.Search ( gallopingSearchRightPBounds
                                     , gallopingSearchLeftPBounds
                                     )
import Data.Vector.Algorithms.Insertion (sortByBounds', Comparison)

-- | Sorts an array using the default comparison.
sort :: (PrimMonad m, MVector v e, Ord e) => v (PrimState m) e -> m ()
sort = sortBy compare
{-# INLINABLE sort #-}
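
-- A minimal usage sketch (illustrative, not part of this module): an
-- immutable vector can be sorted by applying 'sort' to its mutable copy via
-- 'Data.Vector.modify' from the @vector@ package.
--
-- > import qualified Data.Vector as V
-- > import qualified Data.Vector.Algorithms.Tim as Tim
-- >
-- > sorted :: V.Vector Int -> V.Vector Int
-- > sorted = V.modify Tim.sort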

-- | Sorts an array using a custom comparison.
sortBy :: (PrimMonad m, MVector v e)
       => Comparison e -> v (PrimState m) e -> m ()
sortBy cmp vec
  | mr == len = iter [0] 0 (error "no merge buffer needed!")
  | otherwise = new 256 >>= iter [] 0
 where
 len = length vec
 mr = minrun len
 iter s i tmpBuf
   | i >= len  = performRemainingMerges s tmpBuf
   | otherwise = do (order, runLen) <- nextRun cmp vec i len
                    when (order == Descending) $
                      reverse $ unsafeSlice i runLen vec
                    let runEnd = min len (i + max runLen mr)
                    sortByBounds' cmp vec i (i+runLen) runEnd
                    (s', tmpBuf') <- performMerges (i : s) runEnd tmpBuf
                    iter s' runEnd tmpBuf'
 runLengthInvariantBroken a b c i = (b - a <= i - b) || (c - b <= i - c)
 performMerges [b,a] i tmpBuf
   | i - b >= b - a = merge cmp vec a b i tmpBuf >>= performMerges [a] i
 performMerges (c:b:a:ss) i tmpBuf
   | runLengthInvariantBroken a b c i =
     if i - c <= b - a
       then merge cmp vec b c i tmpBuf >>= performMerges (b:a:ss) i
       else do tmpBuf' <- merge cmp vec a b c tmpBuf
               (ass', tmpBuf'') <- performMerges (a:ss) c tmpBuf'
               performMerges (c:ass') i tmpBuf''
 performMerges s _ tmpBuf = return (s, tmpBuf)
 performRemainingMerges (b:a:ss) tmpBuf =
   merge cmp vec a b len tmpBuf >>= performRemainingMerges (a:ss)
 performRemainingMerges _ _ = return ()
{-# INLINE sortBy #-}
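
-- A similar sketch (illustrative only) for 'sortBy' with a custom
-- comparison, here sorting in descending order:
--
-- > import qualified Data.Vector as V
-- >
-- > descending :: V.Vector Int -> V.Vector Int
-- > descending = V.modify (sortBy (flip compare))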

-- | Computes the minimum run size for the sort. The goal is to choose a size
-- such that there are almost, if not exactly, 2^n chunks of that size in the
-- array.
minrun :: Int -> Int
minrun n0 = (n0 `unsafeShiftR` extra) + if (lowMask .&. n0) > 0 then 1 else 0
 where
 -- smear the bits down from the most significant bit
 !n1 = n0 .|. unsafeShiftR n0 1
 !n2 = n1 .|. unsafeShiftR n1 2
 !n3 = n2 .|. unsafeShiftR n2 4
 !n4 = n3 .|. unsafeShiftR n3 8
 !n5 = n4 .|. unsafeShiftR n4 16
 !n6 = n5 .|. unsafeShiftR n5 32

 -- mask for the bits lower than the 6 highest bits
 !lowMask = n6 `unsafeShiftR` 6

 !extra = popCount lowMask
{-# INLINE minrun #-}
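
-- A quick sanity check of 'minrun' against the worked examples in the module
-- header (doctest-style, assuming a 64-bit 'Int'; added here for
-- illustration):
--
-- >>> minrun 25
-- 25
-- >>> minrun (63 * 2^18)
-- 63
-- >>> minrun (63 * 2^18 + 1)
-- 64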

data Order = Ascending | Descending deriving (Eq, Show)

-- | Identify the next run (that is, a non-decreasing or strictly decreasing
-- sequence) in the slice [l,u) in vec. Returns the order and length
-- of the run.
nextRun :: (PrimMonad m, MVector v e)
        => Comparison e
        -> v (PrimState m) e
        -> Int -- ^ l
        -> Int -- ^ u
        -> m (Order, Int)
nextRun _ _ i len | i+1 >= len = return (Ascending, 1)
nextRun cmp vec i len = do x <- unsafeRead vec i
                           y <- unsafeRead vec (i+1)
                           if x `gt` y then desc y 2 else asc y 2
 where
 gt a b = cmp a b == GT
 desc _ !k | i + k >= len = return (Descending, k)
 desc x !k = do y <- unsafeRead vec (i+k)
                if x `gt` y then desc y (k+1) else return (Descending, k)
 asc _ !k | i + k >= len = return (Ascending, k)
 asc x !k = do y <- unsafeRead vec (i+k)
               if x `gt` y then return (Ascending, k) else asc y (k+1)
{-# INLINE nextRun #-}

-- | Tests if a temporary buffer has at least a given length. If not, allocates
-- a new buffer (of twice the requested length) and returns it instead of the
-- old temporary buffer.
ensureCapacity :: (PrimMonad m, MVector v e)
               => Int -> v (PrimState m) e -> m (v (PrimState m) e)
ensureCapacity l tmpBuf
  | l <= length tmpBuf = return tmpBuf
  | otherwise          = new (2*l)
{-# INLINE ensureCapacity #-}

-- | Copy the slice [i,i+len) from vec to tmpBuf. If tmpBuf is not large enough,
-- a new buffer is allocated and used. Returns the buffer.
cloneSlice :: (PrimMonad m, MVector v e)
           => Int -- ^ i
           -> Int -- ^ len
           -> v (PrimState m) e -- ^ vec
           -> v (PrimState m) e -- ^ tmpBuf
           -> m (v (PrimState m) e)
cloneSlice i len vec tmpBuf = do
  tmpBuf' <- ensureCapacity len tmpBuf
  unsafeCopy (unsafeSlice 0 len tmpBuf') (unsafeSlice i len vec)
  return tmpBuf'
{-# INLINE cloneSlice #-}

-- | Number of consecutive times merge chooses the element from the same run
-- before galloping mode is activated.
minGallop :: Int
minGallop = 7
{-# INLINE minGallop #-}

-- | Merge the adjacent sorted slices [l,m) and [m,u) in vec. This is done by
-- copying the slice [l,m) to a temporary buffer. Returns the (enlarged)
-- temporary buffer.
mergeLo :: (PrimMonad m, MVector v e)
        => Comparison e
        -> v (PrimState m) e -- ^ vec
        -> Int -- ^ l
        -> Int -- ^ m
        -> Int -- ^ u
        -> v (PrimState m) e -- ^ tmpBuf
        -> m (v (PrimState m) e)
mergeLo cmp vec l m u tmpBuf' = do
  tmpBuf <- cloneSlice l tmpBufLen vec tmpBuf'
  vi <- unsafeRead tmpBuf 0
  vj <- unsafeRead vec m
  iter tmpBuf 0 m l vi vj minGallop minGallop
  return tmpBuf
 where
 gt  a b = cmp a b == GT
 gte a b = cmp a b /= LT
 tmpBufLen = m - l

 finalize tmpBuf i k = do
   let from = unsafeSlice i (tmpBufLen-i) tmpBuf
       to   = unsafeSlice k (tmpBufLen-i) vec
   unsafeCopy to from

 iter _ i _ _ _ _ _ _ | i >= tmpBufLen = return ()
 iter tmpBuf i j k _ _ _ _ | j >= u = finalize tmpBuf i k
 iter tmpBuf i j k _ vj 0 _ = do
   i' <- gallopingSearchLeftPBounds (`gt` vj) tmpBuf i tmpBufLen
   let gallopLen = i' - i
       from = unsafeSlice i gallopLen tmpBuf
       to   = unsafeSlice k gallopLen vec
   unsafeCopy to from
   when (i' < tmpBufLen) $ do
     vi' <- unsafeRead tmpBuf i'
     iter tmpBuf i' j (k+gallopLen) vi' vj minGallop minGallop
 iter tmpBuf i j k vi _ _ 0 = do
   j' <- gallopingSearchLeftPBounds (`gte` vi) vec j u
   let gallopLen = j' - j
       from = slice j gallopLen vec
       to   = slice k gallopLen vec
   unsafeMove to from
   if j' >= u then finalize tmpBuf i (k + gallopLen) else do
     vj' <- unsafeRead vec j'
     iter tmpBuf i j' (k+gallopLen) vi vj' minGallop minGallop
 iter tmpBuf i j k vi vj ga gb
   | vj `gte` vi = do unsafeWrite vec k vi
                      when (i + 1 < tmpBufLen) $ do
                        vi' <- unsafeRead tmpBuf (i+1)
                        iter tmpBuf (i+1) j (k+1) vi' vj (ga-1) minGallop
   | otherwise   = do unsafeWrite vec k vj
                      if j + 1 >= u then finalize tmpBuf i (k + 1) else do
                        vj' <- unsafeRead vec (j+1)
                        iter tmpBuf i (j+1) (k+1) vi vj' minGallop (gb-1)
{-# INLINE mergeLo #-}

-- | Merge the adjacent sorted slices [l,m) and [m,u) in vec. This is done by
-- copying the slice [m,u) to a temporary buffer. Returns the (enlarged)
-- temporary buffer.
mergeHi :: (PrimMonad m, MVector v e)
        => Comparison e
        -> v (PrimState m) e -- ^ vec
        -> Int -- ^ l
        -> Int -- ^ m
        -> Int -- ^ u
        -> v (PrimState m) e -- ^ tmpBuf
        -> m (v (PrimState m) e)
mergeHi cmp vec l m u tmpBuf' = do
  tmpBuf <- cloneSlice m tmpBufLen vec tmpBuf'
  vi <- unsafeRead vec (m-1)
  vj <- unsafeRead tmpBuf (tmpBufLen-1)
  iter tmpBuf (m-1) (tmpBufLen-1) (u-1) vi vj minGallop minGallop
  return tmpBuf
 where
 gt  a b = cmp a b == GT
 gte a b = cmp a b /= LT
 tmpBufLen = u - m

 finalize tmpBuf j = do
   let from = unsafeSlice 0 (j+1) tmpBuf
       to   = unsafeSlice l (j+1) vec
   unsafeCopy to from

 iter _ _ j _ _ _ _ _ | j < 0 = return ()
 iter tmpBuf i j _ _ _ _ _ | i < l = finalize tmpBuf j
 iter tmpBuf i j k _ vj 0 _ = do
   i' <- gallopingSearchRightPBounds (`gt` vj) vec l i
   let gallopLen = i - i'
       from = slice (i'+1) gallopLen vec
       to   = slice (k-gallopLen+1) gallopLen vec
   unsafeMove to from
   if i' < l then finalize tmpBuf j else do
     vi' <- unsafeRead vec i'
     iter tmpBuf i' j (k-gallopLen) vi' vj minGallop minGallop
 iter tmpBuf i j k vi _ _ 0 = do
   j' <- gallopingSearchRightPBounds (`gte` vi) tmpBuf 0 j
   let gallopLen = j - j'
       from = slice (j'+1) gallopLen tmpBuf
       to   = slice (k-gallopLen+1) gallopLen vec
   unsafeCopy to from
   when (j' >= 0) $ do
     vj' <- unsafeRead tmpBuf j'
     iter tmpBuf i j' (k-gallopLen) vi vj' minGallop minGallop
 iter tmpBuf i j k vi vj ga gb
   | vi `gt` vj = do unsafeWrite vec k vi
                     if i - 1 < l then finalize tmpBuf j else do
                       vi' <- unsafeRead vec (i-1)
                       iter tmpBuf (i-1) j (k-1) vi' vj (ga-1) minGallop
   | otherwise  = do unsafeWrite vec k vj
                     when (j - 1 >= 0) $ do
                       vj' <- unsafeRead tmpBuf (j-1)
                       iter tmpBuf i (j-1) (k-1) vi vj' minGallop (gb-1)
{-# INLINE mergeHi #-}

-- | Merge the adjacent sorted slices A=[l,m) and B=[m,u) in vec. This begins
-- with galloping searches to find the index of vec[m] in A and the index of
-- vec[m-1] in B to reduce the sizes of A and B. Then it uses `mergeHi` or
-- `mergeLo` depending on whether A or B is larger. Returns the (enlarged)
-- temporary buffer.
merge :: (PrimMonad m, MVector v e)
      => Comparison e
      -> v (PrimState m) e -- ^ vec
      -> Int -- ^ l
      -> Int -- ^ m
      -> Int -- ^ u
      -> v (PrimState m) e -- ^ tmpBuf
      -> m (v (PrimState m) e)
merge cmp vec l m u tmpBuf = do
  vm <- unsafeRead vec m
  l' <- gallopingSearchLeftPBounds (`gt` vm) vec l m
  if l' >= m
    then return tmpBuf
    else do
      vn <- unsafeRead vec (m-1)
      u' <- gallopingSearchRightPBounds (`gte` vn) vec m u
      if u' <= m
        then return tmpBuf
        else (if (m-l') <= (u'-m) then mergeLo else mergeHi) cmp vec l' m u' tmpBuf
 where
 gt  a b = cmp a b == GT
 gte a b = cmp a b /= LT
{-# INLINE merge #-}