1{-# LANGUAGE BangPatterns #-} 2 3-- --------------------------------------------------------------------------- 4-- | 5-- Module : Data.Vector.Algorithms.Tim 6-- Copyright : (c) 2013-2015 Dan Doel, 2015 Tim Baumann 7-- Maintainer : Dan Doel <dan.doel@gmail.com> 8-- Stability : Experimental 9-- Portability : Non-portable (bang patterns) 10-- 11-- Timsort is a complex, adaptive, bottom-up merge sort. It is designed to 12-- minimize comparisons as much as possible, even at some cost in overhead. 13-- Thus, it may not be ideal for sorting simple primitive types, for which 14-- comparison is cheap. It may, however, be significantly faster for sorting 15-- arrays of complex values (strings would be an example, though an algorithm 16-- not based on comparison would probably be superior in that particular 17-- case). 18-- 19-- For more information on the details of the algorithm, read on. 20-- 21-- The first step of the algorithm is to identify runs of elements. These can 22-- either be non-decreasing or strictly decreasing sequences of elements in 23-- the input. Strictly decreasing sequences are used rather than 24-- non-increasing so that they can be easily reversed in place without the 25-- sort becoming unstable. 26-- 27-- If the natural runs are too short, they are padded to a minimum value. The 28-- minimum is chosen based on the length of the array, and padded runs are put 29-- in order using insertion sort. The length of the minimum run size is 30-- determined as follows: 31-- 32-- * If the length of the array is less than 64, the minimum size is the 33-- length of the array, and insertion sort is used for the entirety 34-- 35-- * Otherwise, a value between 32 and 64 is chosen such that N/min is 36-- either equal to or just below a power of two. This avoids having a 37-- small chunk left over to merge into much larger chunks at the end. 38-- 39-- This is accomplished by taking the the mininum to be the lowest six bits 40-- containing the highest set bit, and adding one if any other bits are set. 41-- For instance: 42-- 43-- length: 00000000 00000000 00000000 00011011 = 25 44-- min: 00000000 00000000 00000000 00011011 = 25 45-- 46-- length: 00000000 11111100 00000000 00000000 = 63 * 2^18 47-- min: 00000000 00000000 00000000 00111111 = 63 48-- 49-- length: 00000000 11111100 00000000 00000001 = 63 * 2^18 + 1 50-- min: 00000000 00000000 00000000 01000000 = 64 51-- 52-- Once chunks can be produced, the next step is merging them. The indices of 53-- all runs are stored in a stack. When we identify a new run, we push it onto 54-- the stack. However, certain invariants are maintained of the stack entries. 55-- Namely: 56-- 57-- if stk = _ :> z :> y :> x 58-- length x + length y < length z 59-- 60-- if stk = _ :> y :> x 61-- length x < length y 62-- 63-- This ensures that the chunks stored are decreasing, and that the chunk 64-- sizes follow something like the fibonacci sequence, ensuring there at most 65-- log-many chunks at any time. If pushing a new chunk on the stack would 66-- violate either of the invariants, we first perform a merge. 67-- 68-- If length x + length y >= length z, then y is merged with the smaller of x 69-- and z (if they are tied, x is chosen, because it is more likely to be 70-- cached). If, further, length x >= length y then they are merged. These steps 71-- are repeated until the invariants are established. 72-- 73-- The last important piece of the algorithm is the merging. At first, two 74-- chunks are merged element-wise. However, while doing so, counts are kept of 75-- the number of elements taken from one chunk without any from its partner. If 76-- this count exceeds a threshold, the merge switches to searching for elements 77-- from one chunk in the other, and copying chunks at a time. If these chunks 78-- start falling below the threshold, the merge switches back to element-wise. 79-- 80-- The search used in the merge is also special. It uses a galloping strategy, 81-- where exponentially increasing indices are tested, and once two such indices 82-- are determined to bracket the desired value, binary search is used to find 83-- the exact index within that range. This is asymptotically the same as simply 84-- using binary search, but is likely to do fewer comparisons than binary search 85-- would. 86-- 87-- One aspect that is not yet implemented from the original Tim sort is the 88-- adjustment of the above threshold. When galloping saves time, the threshold 89-- is lowered, and when it doesn't, it is raised. This may be implemented in the 90-- future. 91 92module Data.Vector.Algorithms.Tim 93 ( sort 94 , sortBy 95 ) where 96 97import Prelude hiding (length, reverse) 98 99import Control.Monad.Primitive 100import Control.Monad (when) 101import Data.Bits 102 103import Data.Vector.Generic.Mutable 104 105import Data.Vector.Algorithms.Search ( gallopingSearchRightPBounds 106 , gallopingSearchLeftPBounds 107 ) 108import Data.Vector.Algorithms.Insertion (sortByBounds', Comparison) 109 110-- | Sorts an array using the default comparison. 111sort :: (PrimMonad m, MVector v e, Ord e) => v (PrimState m) e -> m () 112sort = sortBy compare 113{-# INLINABLE sort #-} 114 115-- | Sorts an array using a custom comparison. 116sortBy :: (PrimMonad m, MVector v e) 117 => Comparison e -> v (PrimState m) e -> m () 118sortBy cmp vec 119 | mr == len = iter [0] 0 (error "no merge buffer needed!") 120 | otherwise = new 256 >>= iter [] 0 121 where 122 len = length vec 123 mr = minrun len 124 iter s i tmpBuf 125 | i >= len = performRemainingMerges s tmpBuf 126 | otherwise = do (order, runLen) <- nextRun cmp vec i len 127 when (order == Descending) $ 128 reverse $ unsafeSlice i runLen vec 129 let runEnd = min len (i + max runLen mr) 130 sortByBounds' cmp vec i (i+runLen) runEnd 131 (s', tmpBuf') <- performMerges (i : s) runEnd tmpBuf 132 iter s' runEnd tmpBuf' 133 runLengthInvariantBroken a b c i = (b - a <= i - b) || (c - b <= i - c) 134 performMerges [b,a] i tmpBuf 135 | i - b >= b - a = merge cmp vec a b i tmpBuf >>= performMerges [a] i 136 performMerges (c:b:a:ss) i tmpBuf 137 | runLengthInvariantBroken a b c i = 138 if i - c <= b - a 139 then merge cmp vec b c i tmpBuf >>= performMerges (b:a:ss) i 140 else do tmpBuf' <- merge cmp vec a b c tmpBuf 141 (ass', tmpBuf'') <- performMerges (a:ss) c tmpBuf' 142 performMerges (c:ass') i tmpBuf'' 143 performMerges s _ tmpBuf = return (s, tmpBuf) 144 performRemainingMerges (b:a:ss) tmpBuf = 145 merge cmp vec a b len tmpBuf >>= performRemainingMerges (a:ss) 146 performRemainingMerges _ _ = return () 147{-# INLINE sortBy #-} 148 149-- | Computes the minimum run size for the sort. The goal is to choose a size 150-- such that there are almost if not exactly 2^n chunks of that size in the 151-- array. 152minrun :: Int -> Int 153minrun n0 = (n0 `unsafeShiftR` extra) + if (lowMask .&. n0) > 0 then 1 else 0 154 where 155 -- smear the bits down from the most significant bit 156 !n1 = n0 .|. unsafeShiftR n0 1 157 !n2 = n1 .|. unsafeShiftR n1 2 158 !n3 = n2 .|. unsafeShiftR n2 4 159 !n4 = n3 .|. unsafeShiftR n3 8 160 !n5 = n4 .|. unsafeShiftR n4 16 161 !n6 = n5 .|. unsafeShiftR n5 32 162 163 -- mask for the bits lower than the 6 highest bits 164 !lowMask = n6 `unsafeShiftR` 6 165 166 !extra = popCount lowMask 167{-# INLINE minrun #-} 168 169data Order = Ascending | Descending deriving (Eq, Show) 170 171-- | Identify the next run (that is a monotonically increasing or strictly 172-- decreasing sequence) in the slice [l,u) in vec. Returns the order and length 173-- of the run. 174nextRun :: (PrimMonad m, MVector v e) 175 => Comparison e 176 -> v (PrimState m) e 177 -> Int -- ^ l 178 -> Int -- ^ u 179 -> m (Order, Int) 180nextRun _ _ i len | i+1 >= len = return (Ascending, 1) 181nextRun cmp vec i len = do x <- unsafeRead vec i 182 y <- unsafeRead vec (i+1) 183 if x `gt` y then desc y 2 else asc y 2 184 where 185 gt a b = cmp a b == GT 186 desc _ !k | i + k >= len = return (Descending, k) 187 desc x !k = do y <- unsafeRead vec (i+k) 188 if x `gt` y then desc y (k+1) else return (Descending, k) 189 asc _ !k | i + k >= len = return (Ascending, k) 190 asc x !k = do y <- unsafeRead vec (i+k) 191 if x `gt` y then return (Ascending, k) else asc y (k+1) 192{-# INLINE nextRun #-} 193 194-- | Tests if a temporary buffer has a given size. If not, allocates a new 195-- buffer and returns it instead of the old temporary buffer. 196ensureCapacity :: (PrimMonad m, MVector v e) 197 => Int -> v (PrimState m) e -> m (v (PrimState m) e) 198ensureCapacity l tmpBuf 199 | l <= length tmpBuf = return tmpBuf 200 | otherwise = new (2*l) 201{-# INLINE ensureCapacity #-} 202 203-- | Copy the slice [i,i+len) from vec to tmpBuf. If tmpBuf is not large enough, 204-- a new buffer is allocated and used. Returns the buffer. 205cloneSlice :: (PrimMonad m, MVector v e) 206 => Int -- ^ i 207 -> Int -- ^ len 208 -> v (PrimState m) e -- ^ vec 209 -> v (PrimState m) e -- ^ tmpBuf 210 -> m (v (PrimState m) e) 211cloneSlice i len vec tmpBuf = do 212 tmpBuf' <- ensureCapacity len tmpBuf 213 unsafeCopy (unsafeSlice 0 len tmpBuf') (unsafeSlice i len vec) 214 return tmpBuf' 215{-# INLINE cloneSlice #-} 216 217-- | Number of consecutive times merge chooses the element from the same run 218-- before galloping mode is activated. 219minGallop :: Int 220minGallop = 7 221{-# INLINE minGallop #-} 222 223-- | Merge the adjacent sorted slices [l,m) and [m,u) in vec. This is done by 224-- copying the slice [l,m) to a temporary buffer. Returns the (enlarged) 225-- temporary buffer. 226mergeLo :: (PrimMonad m, MVector v e) 227 => Comparison e 228 -> v (PrimState m) e -- ^ vec 229 -> Int -- ^ l 230 -> Int -- ^ m 231 -> Int -- ^ u 232 -> v (PrimState m) e -- ^ tmpBuf 233 -> m (v (PrimState m) e) 234mergeLo cmp vec l m u tempBuf' = do 235 tmpBuf <- cloneSlice l tmpBufLen vec tempBuf' 236 vi <- unsafeRead tmpBuf 0 237 vj <- unsafeRead vec m 238 iter tmpBuf 0 m l vi vj minGallop minGallop 239 return tmpBuf 240 where 241 gt a b = cmp a b == GT 242 gte a b = cmp a b /= LT 243 tmpBufLen = m - l 244 245 finalize tmpBuf i k = do 246 let from = unsafeSlice i (tmpBufLen-i) tmpBuf 247 to = unsafeSlice k (tmpBufLen-i) vec 248 unsafeCopy to from 249 250 iter _ i _ _ _ _ _ _ | i >= tmpBufLen = return () 251 iter tmpBuf i j k _ _ _ _ | j >= u = finalize tmpBuf i k 252 iter tmpBuf i j k _ vj 0 _ = do 253 i' <- gallopingSearchLeftPBounds (`gt` vj) tmpBuf i tmpBufLen 254 let gallopLen = i' - i 255 from = unsafeSlice i gallopLen tmpBuf 256 to = unsafeSlice k gallopLen vec 257 unsafeCopy to from 258 when (i' < tmpBufLen) $ do 259 vi' <- unsafeRead tmpBuf i' 260 iter tmpBuf i' j (k+gallopLen) vi' vj minGallop minGallop 261 iter tmpBuf i j k vi _ _ 0 = do 262 j' <- gallopingSearchLeftPBounds (`gte` vi) vec j u 263 let gallopLen = j' - j 264 from = slice j gallopLen vec 265 to = slice k gallopLen vec 266 unsafeMove to from 267 if j' >= u then finalize tmpBuf i (k + gallopLen) else do 268 vj' <- unsafeRead vec j' 269 iter tmpBuf i j' (k+gallopLen) vi vj' minGallop minGallop 270 iter tmpBuf i j k vi vj ga gb 271 | vj `gte` vi = do unsafeWrite vec k vi 272 when (i + 1 < tmpBufLen) $ do 273 vi' <- unsafeRead tmpBuf (i+1) 274 iter tmpBuf (i+1) j (k+1) vi' vj (ga-1) minGallop 275 | otherwise = do unsafeWrite vec k vj 276 if j + 1 >= u then finalize tmpBuf i (k + 1) else do 277 vj' <- unsafeRead vec (j+1) 278 iter tmpBuf i (j+1) (k+1) vi vj' minGallop (gb-1) 279{-# INLINE mergeLo #-} 280 281-- | Merge the adjacent sorted slices [l,m) and [m,u) in vec. This is done by 282-- copying the slice [j,k) to a temporary buffer. Returns the (enlarged) 283-- temporary buffer. 284mergeHi :: (PrimMonad m, MVector v e) 285 => Comparison e 286 -> v (PrimState m) e -- ^ vec 287 -> Int -- ^ l 288 -> Int -- ^ m 289 -> Int -- ^ u 290 -> v (PrimState m) e -- ^ tmpBuf 291 -> m (v (PrimState m) e) 292mergeHi cmp vec l m u tmpBuf' = do 293 tmpBuf <- cloneSlice m tmpBufLen vec tmpBuf' 294 vi <- unsafeRead vec (m-1) 295 vj <- unsafeRead tmpBuf (tmpBufLen-1) 296 iter tmpBuf (m-1) (tmpBufLen-1) (u-1) vi vj minGallop minGallop 297 return tmpBuf 298 where 299 gt a b = cmp a b == GT 300 gte a b = cmp a b /= LT 301 tmpBufLen = u - m 302 303 finalize tmpBuf j = do 304 let from = unsafeSlice 0 (j+1) tmpBuf 305 to = unsafeSlice l (j+1) vec 306 unsafeCopy to from 307 308 iter _ _ j _ _ _ _ _ | j < 0 = return () 309 iter tmpBuf i j _ _ _ _ _ | i < l = finalize tmpBuf j 310 iter tmpBuf i j k _ vj 0 _ = do 311 i' <- gallopingSearchRightPBounds (`gt` vj) vec l i 312 let gallopLen = i - i' 313 from = slice (i'+1) gallopLen vec 314 to = slice (k-gallopLen+1) gallopLen vec 315 unsafeMove to from 316 if i' < l then finalize tmpBuf j else do 317 vi' <- unsafeRead vec i' 318 iter tmpBuf i' j (k-gallopLen) vi' vj minGallop minGallop 319 iter tmpBuf i j k vi _ _ 0 = do 320 j' <- gallopingSearchRightPBounds (`gte` vi) tmpBuf 0 j 321 let gallopLen = j - j' 322 from = slice (j'+1) gallopLen tmpBuf 323 to = slice (k-gallopLen+1) gallopLen vec 324 unsafeCopy to from 325 when (j' >= 0) $ do 326 vj' <- unsafeRead tmpBuf j' 327 iter tmpBuf i j' (k-gallopLen) vi vj' minGallop minGallop 328 iter tmpBuf i j k vi vj ga gb 329 | vi `gt` vj = do unsafeWrite vec k vi 330 if i - 1 < l then finalize tmpBuf j else do 331 vi' <- unsafeRead vec (i-1) 332 iter tmpBuf (i-1) j (k-1) vi' vj (ga-1) minGallop 333 | otherwise = do unsafeWrite vec k vj 334 when (j - 1 >= 0) $ do 335 vj' <- unsafeRead tmpBuf (j-1) 336 iter tmpBuf i (j-1) (k-1) vi vj' minGallop (gb-1) 337{-# INLINE mergeHi #-} 338 339-- | Merge the adjacent sorted slices A=[l,m) and B=[m,u) in vec. This begins 340-- with galloping searches to find the index of vec[m] in A and the index of 341-- vec[m-1] in B to reduce the sizes of A and B. Then it uses `mergeHi` or 342-- `mergeLo` depending on whether A or B is larger. Returns the (enlarged) 343-- temporary buffer. 344merge :: (PrimMonad m, MVector v e) 345 => Comparison e 346 -> v (PrimState m) e -- ^ vec 347 -> Int -- ^ l 348 -> Int -- ^ m 349 -> Int -- ^ u 350 -> v (PrimState m) e -- ^ tmpBuf 351 -> m (v (PrimState m) e) 352merge cmp vec l m u tmpBuf = do 353 vm <- unsafeRead vec m 354 l' <- gallopingSearchLeftPBounds (`gt` vm) vec l m 355 if l' >= m 356 then return tmpBuf 357 else do 358 vn <- unsafeRead vec (m-1) 359 u' <- gallopingSearchRightPBounds (`gte` vn) vec m u 360 if u' <= m 361 then return tmpBuf 362 else (if (m-l') <= (u'-m) then mergeLo else mergeHi) cmp vec l' m u' tmpBuf 363 where 364 gt a b = cmp a b == GT 365 gte a b = cmp a b /= LT 366{-# INLINE merge #-} 367