Skip to content

Commit

Permalink
Merge pull request #18 from ambiata/topic/heap
Browse files Browse the repository at this point in the history
Replace MVector with Heap in Vanguard.
  • Loading branch information
novemberkilo authored Mar 28, 2017
2 parents 944baa3 + e5ad481 commit 936f27d
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 64 deletions.
1 change: 1 addition & 0 deletions ambiata-regiment.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ library
, bytestring == 0.10.*
, directory == 1.2.*
, filepath == 1.3.*
, heaps == 0.3.*
, primitive == 0.6.*
, resourcet == 1.1.*
, temporary == 1.2.0.4
Expand Down
12 changes: 6 additions & 6 deletions src/Regiment/Data.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ module Regiment.Data (

import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import qualified Data.Heap as DH
import Data.Vector (Vector)
import qualified Data.Vector as Boxed
import qualified Data.Vector.Mutable as MBoxed
import Data.Word (Word8)

import P
Expand Down Expand Up @@ -69,8 +69,8 @@ newtype Key =

data KeyedPayload =
KeyedPayload {
keys :: Vector Key
, payload :: ByteString
keys :: !(Vector Key)
, payload :: !ByteString
} deriving (Eq, Show)

instance Ord KeyedPayload where
Expand Down Expand Up @@ -116,10 +116,10 @@ data Cursor a =
| EOF
deriving (Show)

data Vanguard s a =
newtype Vanguard a =
Vanguard {
vanguard :: MBoxed.MVector s (Cursor a)
}
unVanguard :: DH.Heap (Cursor a)
} deriving (Eq, Show)

instance Eq (Cursor a) where
(==) x y = compare x y == EQ
Expand Down
68 changes: 24 additions & 44 deletions src/Regiment/Vanguard/Base.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@ module Regiment.Vanguard.Base (
, readCursor
, formVanguard
, runVanguard
, updateMinCursor
) where

import Control.Monad.Primitive (PrimMonad, PrimState)
import Control.Monad.Trans.Class (lift)

import qualified Data.ByteString as BS
import qualified Data.Vector as Boxed
import qualified Data.Vector.Mutable as MBoxed
import qualified Data.Heap as DH

import P

Expand All @@ -33,54 +30,37 @@ readCursor :: Monad m
readCursor reader a' = do
bimapEitherT RegimentMergeCursorError (maybe EOF (NonEmpty a')) (reader a')

formVanguard :: PrimMonad m

formVanguard :: Monad m
=> (a -> EitherT x m (Maybe KeyedPayload))
-> [a]
-> EitherT (RegimentMergeError x) m (Vanguard (PrimState m) a)
-> EitherT (RegimentMergeError x) m (Vanguard a)
formVanguard reader l = do
v <- Boxed.mapM (readCursor reader) (Boxed.fromList l)
v' <- Boxed.thaw v
return $ Vanguard v'
v <- mapM (readCursor reader) l
return . Vanguard $ DH.fromList v


updateMinCursor :: PrimMonad m
=> (a -> EitherT x m (Maybe KeyedPayload))
-> Vanguard (PrimState m) a
-> EitherT (RegimentMergeError x) m (Cursor a, Vanguard (PrimState m) a)
updateMinCursor reader v =
let
vcs = vanguard v
len = MBoxed.length vcs
in
case len of
0 -> left $ RegimentMergeVanguardEmptyError
_ -> do
when (len > 1) $
-- linear bubble up of min
-- TODO: use a heap instead
for_ [1 .. ((MBoxed.length vcs) - 1)] $ \i -> do
m <- MBoxed.read vcs 0
n <- MBoxed.read vcs i
when (n < m)
(MBoxed.unsafeSwap vcs 0 i)
-- elt at index 0 should now be min
minCursor <- MBoxed.read vcs 0
case minCursor of
EOF -> return (EOF, Vanguard vcs)
NonEmpty h _ -> do
nl <- readCursor reader h
MBoxed.write vcs 0 nl
return $ (minCursor, Vanguard vcs)
updateVanguard :: Monad m
=> a
-> Vanguard a
-> (a -> EitherT x m (Maybe KeyedPayload))
-> EitherT (RegimentMergeError x) m (Vanguard a)
updateVanguard h v reader = do
nextCursor <- readCursor reader h
return . Vanguard . DH.insert nextCursor $ unVanguard v

runVanguard :: PrimMonad m
=> Vanguard (PrimState m) a

runVanguard :: Monad m
=> Vanguard a
-> (a -> EitherT x m (Maybe KeyedPayload))
-> (BS.ByteString -> m ())
-> EitherT (RegimentMergeError x) m ()
runVanguard v reader writer = do
(minCursor, v') <- updateMinCursor reader v
case minCursor of
runVanguard (Vanguard v) reader writer = do
when (DH.null v) $
left $ RegimentMergeVanguardEmptyError
case DH.minimum v of
EOF -> return ()
NonEmpty _ kp -> do
NonEmpty h kp -> do
lift . writer $ payload kp
v' <- updateVanguard h (Vanguard $ DH.deleteMin v) reader
runVanguard v' reader writer

13 changes: 2 additions & 11 deletions src/Regiment/Vanguard/IO.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ module Regiment.Vanguard.IO (
, formVanguardIO
, readKeyedPayloadIO
, runVanguardIO
, updateMinCursorIO
) where

import Control.Monad.IO.Class (liftIO, MonadIO)
import Control.Monad.Primitive (PrimState)

import qualified Data.Binary.Get as Get
import qualified Data.ByteString as BS
Expand Down Expand Up @@ -61,7 +59,7 @@ readCursorIO :: MonadIO m
readCursorIO h =
readCursor readKeyedPayloadIO h

runVanguardIO :: Vanguard (PrimState IO) Handle
runVanguardIO :: Vanguard Handle
-> Handle
-> EitherT (RegimentMergeError RegimentMergeIOError) IO ()
runVanguardIO v out =
Expand All @@ -70,17 +68,10 @@ runVanguardIO v out =
formVanguardIO :: [Handle]
-> EitherT
(RegimentMergeError RegimentMergeIOError)
IO (Vanguard (PrimState IO) Handle)
IO (Vanguard Handle)
formVanguardIO handles = do
formVanguard readKeyedPayloadIO handles

updateMinCursorIO :: Vanguard (PrimState IO) Handle
-> EitherT
(RegimentMergeError RegimentMergeIOError)
IO (Cursor Handle, Vanguard (PrimState IO) Handle)
updateMinCursorIO v =
updateMinCursor readKeyedPayloadIO v

peekInt32 :: ByteString -> IO (Maybe Int32)
peekInt32 (PS fp off len) =
if len /= 4 then
Expand Down
5 changes: 2 additions & 3 deletions src/Regiment/Vanguard/List.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ module Regiment.Vanguard.List (
, runVanguardList
) where

import Control.Monad.Primitive (PrimState, PrimMonad)
import Control.Monad.ST (runST, ST)
import Control.Monad.Trans.Class (lift)

Expand Down Expand Up @@ -36,11 +35,11 @@ writePayloadToList :: ST.STRef s [BS.ByteString] -> BS.ByteString -> ST s ()
writePayloadToList lst p =
ST.modifySTRef lst (\lp -> lp <> [p])

formVanguardList :: (PrimMonad (ST s))
formVanguardList :: (Monad (ST s))
=> [ST.STRef s [KeyedPayload]]
-> EitherT
(RegimentMergeError x)
(ST s) (Vanguard (PrimState (ST s)) (ST.STRef s [KeyedPayload]))
(ST s) (Vanguard (ST.STRef s [KeyedPayload]))
formVanguardList strkps =
formVanguard readCursorList strkps

Expand Down

0 comments on commit 936f27d

Please sign in to comment.