diff --git a/mafia b/mafia index ae8673b..fc829d1 100755 --- a/mafia +++ b/mafia @@ -1,123 +1,63 @@ #!/bin/sh -eu : ${MAFIA_HOME:=$HOME/.mafia} - -fetch_latest () { - if [ -z ${MAFIA_TEST_MODE+x} ]; then - TZ=$(date +"%T") - curl --silent "https://raw.githubusercontent.com/ambiata/mafia/master/script/mafia?$TZ" - else - cat ../script/mafia - fi -} +: ${MAFIA_VERSIONS:=$MAFIA_HOME/versions} latest_version () { - git ls-remote https://github.com/ambiata/mafia | grep refs/heads/master | cut -f 1 + git ls-remote https://github.com/ambiata/mafia | grep refs/heads/master | cut -f 1 } -local_version () { - awk '/^# Version: / { print $3; exit 0; }' $0 +build_version() { + MAFIA_VERSION="$1" + MAFIA_TEMP=$(mktemp -d 2>/dev/null || mktemp -d -t 'exec_mafia') + MAFIA_FILE=mafia-$MAFIA_VERSION + MAFIA_PATH=$MAFIA_VERSIONS/$MAFIA_FILE + mkdir -p $MAFIA_VERSIONS + echo "Building $MAFIA_FILE in $MAFIA_TEMP" + git clone https://github.com/ambiata/mafia $MAFIA_TEMP + git --git-dir="$MAFIA_TEMP/.git" --work-tree="$MAFIA_TEMP" reset --hard $MAFIA_VERSION || { + echo "mafia version ($MAFIA_VERSION) could not be found." >&2 + exit 1 + } + (cd "$MAFIA_TEMP" && ./bin/bootstrap) || { + got=$? + echo "mafia version ($MAFIA_VERSION) could not be built." >&2 + exit "$got" + } + chmod +x "$MAFIA_TEMP/.cabal-sandbox/bin/mafia" + # Ensure executable is on same file-system so final mv is atomic. + mv -f "$MAFIA_TEMP/.cabal-sandbox/bin/mafia" "$MAFIA_PATH.$$" + mv "$MAFIA_PATH.$$" "$MAFIA_PATH" || { + rm -f "$MAFIA_PATH.$$" + echo "INFO: mafia version ($MAFIA_VERSION) already exists not overiding," >&2 + echo "INFO: this is expected if parallel builds of the same version of" >&2 + echo "INFO: mafia occur, we are playing by first in, wins." >&2 + exit 0 + } } -run_upgrade () { - MAFIA_TEMP=$(mktemp 2>/dev/null || mktemp -t 'upgrade_mafia') - - clean_up () { - rm -f "$MAFIA_TEMP" - } - - trap clean_up EXIT - - MAFIA_CUR="$0" - - if [ -L "$MAFIA_CUR" ]; then - echo 'Refusing to overwrite a symlink; run `upgrade` from the canonical path.' >&2 - exit 1 - fi - - echo "Checking for a new version of mafia ..." - fetch_latest > $MAFIA_TEMP - - LATEST_VERSION=$(latest_version) - echo "# Version: $LATEST_VERSION" >> $MAFIA_TEMP - - if ! cmp $MAFIA_CUR $MAFIA_TEMP >/dev/null 2>&1; then - mv $MAFIA_TEMP $MAFIA_CUR - chmod +x $MAFIA_CUR - echo "New version found and upgraded. You can now commit it to your git repo." - else - echo "You have latest mafia." - fi +enable_version() { + if [ $# -eq 0 ]; then + MAFIA_VERSION="$(latest_version)" + echo "INFO: No explicit mafia version requested installing latest ($MAFIA_VERSION)." >&2 + else + MAFIA_VERSION="$1" + fi + [ -x "$MAFIA_HOME/versions/mafia-$MAFIA_VERSION" ] || build_version "$MAFIA_VERSION" + ln -sf "$MAFIA_HOME/versions/mafia-$MAFIA_VERSION" "$MAFIA_HOME/versions/mafia" } exec_mafia () { - MAFIA_VERSION=$(local_version) - - if [ "x$MAFIA_VERSION" = "x" ]; then - # If we can't find the mafia version, then we need to upgrade the script. - run_upgrade - else - MAFIA_BIN=$MAFIA_HOME/bin - MAFIA_FILE=mafia-$MAFIA_VERSION - MAFIA_PATH=$MAFIA_BIN/$MAFIA_FILE - - [ -f "$MAFIA_PATH" ] || { - # Create a temporary directory which will be deleted when the script - # terminates. Unfortunately `mktemp` doesn't behave the same on - # Linux and OS/X so we need to try two different approaches. - MAFIA_TEMP=$(mktemp -d 2>/dev/null || mktemp -d -t 'exec_mafia') - - # Create a temporary file in MAFIA_BIN so we can do an atomic copy/move dance. - mkdir -p $MAFIA_BIN - - clean_up () { - rm -rf "$MAFIA_TEMP" - } - - trap clean_up EXIT - - echo "Building $MAFIA_FILE in $MAFIA_TEMP" - - ( cd "$MAFIA_TEMP" - - git clone https://github.com/ambiata/mafia - cd mafia - - git reset --hard $MAFIA_VERSION - - bin/bootstrap ) || exit $? - - MAFIA_PATH_TEMP=$(mktemp --tmpdir=$MAFIA_BIN $MAFIA_FILE-XXXXXX 2>/dev/null || TMPDIR=$MAFIA_BIN mktemp -t $MAFIA_FILE) - - clean_up_temp () { - clean_up - rm -f "$MAFIA_PATH_TEMP" - } - trap clean_up_temp EXIT - - cp "$MAFIA_TEMP/mafia/.cabal-sandbox/bin/mafia" "$MAFIA_PATH_TEMP" - chmod 755 "$MAFIA_PATH_TEMP" - mv "$MAFIA_PATH_TEMP" "$MAFIA_PATH" - - clean_up_temp - } - - exec $MAFIA_PATH "$@" - fi + [ -x "$MAFIA_HOME/versions/mafia" ] || enable_version + "$MAFIA_HOME/versions/mafia" "$@" } # # The actual start of the script..... # -if [ $# -gt 0 ]; then - MODE="$1" -else - MODE="" -fi - -case "$MODE" in -upgrade) shift; run_upgrade "$@" ;; +case "${1:-}" in +upgrade) shift; enable_version "$@" ;; *) exec_mafia "$@" esac -# Version: 7c6993f5ad2ac2a605cbc46cd5a108358b5e8c06 +# Version: b4c88e80a2f5539a56e966116743e55413a67bd2 diff --git a/master.toml b/master.toml index 38bdd8b..fe20f87 100644 --- a/master.toml +++ b/master.toml @@ -1,7 +1,7 @@ [master] version = 1 - runner = "s3://ambiata-dispensary-v2/dist/master/master-haskell/linux/x86_64/20161027043126-a807489/master-haskell-20161027043126-a807489" - sha1 = "51b26872b0be6c2f248f9bed3721dce09451c3f8" + runner = "s3://ambiata-dispensary-v2/dist/master/master-haskell/linux/x86_64/20170327040840-618a87f/master-haskell-20170327040840-618a87f" + sha1 = "9dab3a57a16a1707fa7920a9d86f1cbca1e3495c" [build.dist] GHC_VERSION="7.10.2" diff --git a/src/Regiment/IO.hs b/src/Regiment/IO.hs index 1f3ad26..7c3d9c0 100644 --- a/src/Regiment/IO.hs +++ b/src/Regiment/IO.hs @@ -11,8 +11,6 @@ import Control.Monad.IO.Class (liftIO) import Control.Monad.Trans.Resource (MonadResource (..)) import qualified Control.Monad.Trans.Resource as R -import Data.String (String) - import P import Regiment.Data @@ -28,29 +26,28 @@ import System.IO.Temp (withSystemTempDirectory) import X.Control.Monad.Trans.Either (EitherT, newEitherT, mapEitherT, runEitherT, firstEitherT) data RegimentIOError = - RegimentIOReadKeysFailed - | RegimentIOReadPastEOF - | RegimentIONullWrite - | RegimentIOBytestringParseFailed String - | RegimentIOUnpackFailed - | RegimentIOMinOfEmptyVector - | RegimentIOParseError RegimentParseError + RegimentIOParseError RegimentParseError | RegimentIOMergeError (RegimentMergeError RegimentMergeIOError) deriving (Eq, Show) renderRegimentIOError :: RegimentIOError -> Text -renderRegimentIOError _ = - "TODO" +renderRegimentIOError err = + case err of + RegimentIOParseError e -> + renderRegimentParseError e + RegimentIOMergeError e -> + renderRegimentMergeError renderRegimentMergeIOError e -regiment :: InputFile - -> Maybe OutputFile - -> [SortColumn] - -> FormatKind - -> Newline - -> NumColumns - -> Separator - -> MemoryLimit - -> EitherT RegimentIOError IO () +regiment :: + InputFile + -> Maybe OutputFile + -> [SortColumn] + -> FormatKind + -> Newline + -> NumColumns + -> Separator + -> MemoryLimit + -> EitherT RegimentIOError IO () regiment inn out sc f n nc sep m = do let fmt = @@ -66,9 +63,10 @@ regiment inn out sc f n nc sep m = do toTempFiles inn (TempDirectory tmp) fmt sc m firstT RegimentParseMergeError $ merge (TempDirectory tmp) out -merge :: TempDirectory - -> Maybe OutputFile - -> EitherT (RegimentMergeError RegimentMergeIOError) IO () +merge :: + TempDirectory + -> Maybe OutputFile + -> EitherT (RegimentMergeError RegimentMergeIOError) IO () merge (TempDirectory tmp) out = mapEitherT R.runResourceT $ do fs <- liftIO $ fmap (filter (flip notElem [".", ".."])) $ getDirectoryContents tmp handles <- mapM (open ReadMode) $ fmap (tmp ) fs diff --git a/src/Regiment/Parse.hs b/src/Regiment/Parse.hs index a60efaa..2d2f991 100644 --- a/src/Regiment/Parse.hs +++ b/src/Regiment/Parse.hs @@ -4,6 +4,7 @@ {-# LANGUAGE ScopedTypeVariables #-} module Regiment.Parse ( RegimentParseError (..) + , renderRegimentParseError , toTempFiles , selectSortKeys , writeCursor @@ -34,17 +35,29 @@ import System.IO (IO, IOMode (..)) import qualified System.IO as IO import System.FilePath (()) -import X.Control.Monad.Trans.Either (EitherT, left, newEitherT, runEitherT) +import X.Control.Monad.Trans.Either (EitherT, left, runEitherT, newEitherT, hoistEither) import qualified X.Data.Vector as Boxed import qualified X.Data.Vector.Grow as Grow data RegimentParseError = RegimentParseKeyNotFound | RegimentParseIONullWrite - | RegimentParseVectorToKPFailed (Boxed.Vector BS.ByteString) + | RegimentParseVectorToKPFailed Text | RegimentParseMergeError (RegimentMergeError RegimentMergeIOError) deriving (Eq, Show) +renderRegimentParseError :: RegimentParseError -> Text +renderRegimentParseError err = + case err of + RegimentParseKeyNotFound -> + "Regiment Parse Error: sort keys not found in vector of parsed fields" + RegimentParseIONullWrite -> + "Regiment Parse Error: write chunk called with a null vector" + RegimentParseVectorToKPFailed t -> + "Regiment Parse Error: Vector could not be converted to KeyedPayload. Reason: " <> t + RegimentParseMergeError e -> + renderRegimentMergeError renderRegimentMergeIOError e + toTempFiles :: InputFile -> TempDirectory @@ -118,10 +131,11 @@ selectSortKeys bytes parsed sortColumns = -- returns a vector consisting of keys and payload False -> Right $ (Boxed.fromList ks) Boxed.++ (Boxed.singleton bytes) -flushVector :: Grow.Grow Boxed.MVector (PrimState IO) (Boxed.Vector BS.ByteString) - -> Int - -> TempDirectory - -> EitherT RegimentParseError IO () +flushVector :: + Grow.Grow Boxed.MVector (PrimState IO) (Boxed.Vector BS.ByteString) + -> Int + -> TempDirectory + -> EitherT RegimentParseError IO () flushVector acc counter (TempDirectory tmp) = do mv <- Grow.unsafeElems acc Tim.sort mv @@ -132,22 +146,19 @@ flushVector acc counter (TempDirectory tmp) = do -- done using 'v' Grow.clear acc -writeChunk :: IO.Handle - -> Boxed.Vector (Boxed.Vector BS.ByteString) - -> EitherT RegimentParseError IO () +writeChunk :: + IO.Handle + -> Boxed.Vector (Boxed.Vector BS.ByteString) + -> EitherT RegimentParseError IO () writeChunk h vs = case Boxed.uncons vs of Nothing -> left RegimentParseIONullWrite Just (bs, tl) -> do - let - maybeKp = vecToKP bs - case maybeKp of - Nothing -> left $ RegimentParseVectorToKPFailed bs - Just kp -> do - liftIO $ writeCursor h kp - if Boxed.null tl - then return () - else writeChunk h tl + kp <- hoistEither $ vecToKP bs + liftIO $ writeCursor h kp + if Boxed.null tl + then return () + else writeChunk h tl writeCursor :: IO.Handle -> KeyedPayload -> IO () writeCursor h kp = do @@ -155,21 +166,23 @@ writeCursor h kp = do liftIO $ Builder.hPutBuilder h (bKeyedPayload kp) -vecToKP :: Boxed.Vector BS.ByteString -> Maybe KeyedPayload +vecToKP :: Boxed.Vector BS.ByteString -> Either RegimentParseError KeyedPayload vecToKP vbs = -- expected format of Vector - -- [k_1,k_2,...,k_n,payload] where k_i are sortkeys - -- expect at least one sortkey + -- [k_1,k_2,...,k_n,payload] where k_i are sort keys + -- expect n >= 1, thus length vbs >= 2 let l = Boxed.length vbs in - if l > 1 - then + case l of + 0 -> + Left . RegimentParseVectorToKPFailed $ "Empty vector" + 1 -> + Left . RegimentParseVectorToKPFailed $ "Singleton vector: " <> (T.pack . show $ Boxed.head vbs) + _ -> let p = Boxed.last vbs sks = Key <$> Boxed.take (l-1) vbs in - Just $ KeyedPayload sks p - else - Nothing + Right $ KeyedPayload sks p diff --git a/src/Regiment/Serial.hs b/src/Regiment/Serial.hs index 6832410..7faa695 100644 --- a/src/Regiment/Serial.hs +++ b/src/Regiment/Serial.hs @@ -45,6 +45,3 @@ getKeyedPayload = do KeyedPayload <$> getKeys (bcount - 1) <*> getSizedPrefixedBytes - - - diff --git a/src/Regiment/Vanguard/Base.hs b/src/Regiment/Vanguard/Base.hs index cdf27e4..4a4c00d 100644 --- a/src/Regiment/Vanguard/Base.hs +++ b/src/Regiment/Vanguard/Base.hs @@ -2,6 +2,7 @@ {-# LANGUAGE OverloadedStrings #-} module Regiment.Vanguard.Base ( RegimentMergeError (..) + , renderRegimentMergeError , readCursor , formVanguard , runVanguard @@ -23,38 +24,47 @@ data RegimentMergeError e = | RegimentMergeVanguardEmptyError deriving (Eq, Show) -readCursor :: Monad m - => (a -> EitherT x m (Maybe KeyedPayload)) - -> a - -> EitherT (RegimentMergeError x) m (Cursor a) +renderRegimentMergeError :: (e -> Text) -> (RegimentMergeError e) -> Text +renderRegimentMergeError render err = + case err of + RegimentMergeCursorError e -> + "Regiment Merge Error: failed to read cursor " <> (render e) + RegimentMergeVanguardEmptyError -> + "Regiment Merge Error: Cannot run an empty Vanguard." + +readCursor :: + Monad m + => (a -> EitherT x m (Maybe KeyedPayload)) + -> a + -> EitherT (RegimentMergeError x) m (Cursor a) readCursor reader a' = do bimapEitherT RegimentMergeCursorError (maybe EOF (NonEmpty a')) (reader a') - -formVanguard :: Monad m - => (a -> EitherT x m (Maybe KeyedPayload)) - -> [a] - -> EitherT (RegimentMergeError x) m (Vanguard a) +formVanguard :: + Monad m + => (a -> EitherT x m (Maybe KeyedPayload)) + -> [a] + -> EitherT (RegimentMergeError x) m (Vanguard a) formVanguard reader l = do v <- mapM (readCursor reader) l return . Vanguard $ DH.fromList v - -updateVanguard :: Monad m - => a - -> Vanguard a - -> (a -> EitherT x m (Maybe KeyedPayload)) - -> EitherT (RegimentMergeError x) m (Vanguard a) +updateVanguard :: + Monad m + => a + -> Vanguard a + -> (a -> EitherT x m (Maybe KeyedPayload)) + -> EitherT (RegimentMergeError x) m (Vanguard a) updateVanguard h v reader = do nextCursor <- readCursor reader h return . Vanguard . DH.insert nextCursor $ unVanguard v - -runVanguard :: Monad m - => Vanguard a - -> (a -> EitherT x m (Maybe KeyedPayload)) - -> (BS.ByteString -> m ()) - -> EitherT (RegimentMergeError x) m () +runVanguard :: + Monad m + => Vanguard a + -> (a -> EitherT x m (Maybe KeyedPayload)) + -> (BS.ByteString -> m ()) + -> EitherT (RegimentMergeError x) m () runVanguard (Vanguard v) reader writer = do when (DH.null v) $ left $ RegimentMergeVanguardEmptyError diff --git a/src/Regiment/Vanguard/IO.hs b/src/Regiment/Vanguard/IO.hs index 557a8cf..fd7e7ff 100644 --- a/src/Regiment/Vanguard/IO.hs +++ b/src/Regiment/Vanguard/IO.hs @@ -2,6 +2,7 @@ {-# LANGUAGE OverloadedStrings #-} module Regiment.Vanguard.IO ( RegimentMergeIOError (..) + , renderRegimentMergeIOError , readCursorIO , formVanguardIO , readKeyedPayloadIO @@ -15,6 +16,7 @@ import qualified Data.ByteString as BS import Data.ByteString.Internal (ByteString(..)) import qualified Data.ByteString.Lazy as Lazy import Data.String (String) +import qualified Data.Text as T import Foreign.Storable (Storable(..)) import Foreign.ForeignPtr (withForeignPtr) @@ -35,9 +37,18 @@ data RegimentMergeIOError = | RegimentMergeIOByteStringConversionFailed String deriving (Eq, Show) -readKeyedPayloadIO :: MonadIO m - => IO.Handle - -> EitherT RegimentMergeIOError m (Maybe KeyedPayload) +renderRegimentMergeIOError :: RegimentMergeIOError -> Text +renderRegimentMergeIOError err = + case err of + RegimentMergeIOReadKeysFailed -> + "Regiment Merge IO Error: Failed to read keys from temp file." + RegimentMergeIOByteStringConversionFailed s -> + "Regiment Merge IO Error: Failed to extract KeyedPayload from ByteString. Error: " <> T.pack s + +readKeyedPayloadIO :: + MonadIO m + => IO.Handle + -> EitherT RegimentMergeIOError m (Maybe KeyedPayload) readKeyedPayloadIO h = do isEOF <- liftIO $ IO.hIsEOF h if isEOF @@ -53,22 +64,23 @@ readKeyedPayloadIO h = do Left e -> left e Right kp -> return $ Just kp -readCursorIO :: MonadIO m - => IO.Handle - -> EitherT (RegimentMergeError RegimentMergeIOError) m (Cursor Handle) +readCursorIO :: + MonadIO m + => IO.Handle + -> EitherT (RegimentMergeError RegimentMergeIOError) m (Cursor Handle) readCursorIO h = readCursor readKeyedPayloadIO h -runVanguardIO :: Vanguard Handle - -> Handle - -> EitherT (RegimentMergeError RegimentMergeIOError) IO () +runVanguardIO :: + Vanguard Handle + -> Handle + -> EitherT (RegimentMergeError RegimentMergeIOError) IO () runVanguardIO v out = runVanguard v readKeyedPayloadIO (BS.hPut out) -formVanguardIO :: [Handle] - -> EitherT - (RegimentMergeError RegimentMergeIOError) - IO (Vanguard Handle) +formVanguardIO :: + [Handle] + -> EitherT (RegimentMergeError RegimentMergeIOError) IO (Vanguard Handle) formVanguardIO handles = do formVanguard readKeyedPayloadIO handles @@ -87,4 +99,3 @@ bsToKP bs = Left $ RegimentMergeIOByteStringConversionFailed e Right (_, _, x) -> Right x - diff --git a/src/Regiment/Vanguard/List.hs b/src/Regiment/Vanguard/List.hs index f30efe3..7c8cdc6 100644 --- a/src/Regiment/Vanguard/List.hs +++ b/src/Regiment/Vanguard/List.hs @@ -21,8 +21,7 @@ import Regiment.Vanguard.Base import X.Control.Monad.Trans.Either (EitherT, runEitherT) -readCursorList :: ST.STRef s [KeyedPayload] - -> EitherT x (ST s) (Maybe KeyedPayload) +readCursorList :: ST.STRef s [KeyedPayload] -> EitherT x (ST s) (Maybe KeyedPayload) readCursorList lkp = do kps <- lift $ ST.readSTRef lkp case kps of @@ -35,11 +34,10 @@ writePayloadToList :: ST.STRef s [BS.ByteString] -> BS.ByteString -> ST s () writePayloadToList lst p = ST.modifySTRef lst (\lp -> lp <> [p]) -formVanguardList :: (Monad (ST s)) - => [ST.STRef s [KeyedPayload]] - -> EitherT - (RegimentMergeError x) - (ST s) (Vanguard (ST.STRef s [KeyedPayload])) +formVanguardList :: + Monad (ST s) + => [ST.STRef s [KeyedPayload]] + -> EitherT (RegimentMergeError x) (ST s) (Vanguard (ST.STRef s [KeyedPayload])) formVanguardList strkps = formVanguard readCursorList strkps diff --git a/test/Test/Regiment/Arbitrary.hs b/test/Test/Regiment/Arbitrary.hs index 8e0c6a9..9b2cd50 100644 --- a/test/Test/Regiment/Arbitrary.hs +++ b/test/Test/Regiment/Arbitrary.hs @@ -128,4 +128,9 @@ genNonNullSeparator = genRestrictedFormat :: Separator -> Jack Format genRestrictedFormat sep = - arbitrary `suchThat` (\fmt -> (formatColumnCount fmt) > 0 && (formatKind fmt) == Delimited && (formatSeparator fmt) == sep && (formatNewline fmt) == LF) + arbitrary `suchThat` (\fmt -> + (formatColumnCount fmt) > 0 + && (formatKind fmt) == Delimited + && (formatSeparator fmt) == sep + && (formatNewline fmt) == LF + )