Skip to content

Commit

Permalink
Merge pull request #21 from ambiata/topic/splits
Browse files Browse the repository at this point in the history
Implements split and merge commands.
  • Loading branch information
novemberkilo authored Apr 4, 2017
2 parents 6beb19f + 329ddd7 commit 5c37109
Show file tree
Hide file tree
Showing 10 changed files with 363 additions and 144 deletions.
43 changes: 42 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ A command line tool for sorting standardized separated files
## Usage
--------

`sort` a standardized separated file.

```
# specify:
# column to sort on (mandatory)
Expand Down Expand Up @@ -48,4 +50,43 @@ regiment sort -k 5 -c 15 -f ',' -o "path/to/output-file" input-file
regiment sort -f ',' -k 1 -k 4 -k 5 -c 26 -m 10G --crlf --standardized -o "path/to/output-file" input-file
```

Note: `regiment` requires local storage roughly equivalent to the size of the inputs, and follows unix `TMPDIR` conventions for that storage.
`split` a standardized separated file into a set of temporary files, each of which is sorted,
and is in regiment's [binary format](doc/temp-file-format.md)

```
# specify:
# same options as for sort (except for --output)
# a directory within which to write the sorted splits (mandatory)
# NOTE: this directory must not already exist; it will be created for you
regiment split <same opts as sort> --dir "path/to/output-dir" input-file
regiment split <same opts as sort> -d "path/to/output-dir" input-file
```

Given the format of an input standardized separated file, merge a set of sorted temporary files
(in regiment's [binary format](doc/temp-file-format.md)) into an output-file (that has the same format
as the input standardized separated file).

```
# specify:
# directories containing sorted splits that require merging (typically outputs of running split)
# output file (optional) -- defaults to stdout
regiment merge-tmps dir1 dir2 ... dirn
# explicitly specify path to output file -- defaults to stdout
regiment merge-tmps --output "path/to/output-file" dir1 dir2 ... dirn
regiment merge-tmps -o "path/to/output-file" dir1 dir2 ... dirn
```

Relationship between `sort`, `split` and `merge-tmps`

```
regiment sort -k 1 -c 5 -f ',' --standardized input-file
generates the same output as
regiment split -k 1 -c 5 -f ',' --standardized -d "/foo/bar/baz" input-file
regiment merge-tmps "/foo/bar/baz"
```

Note: `regiment` requires local storage roughly equivalent to the size of the inputs,
and follows unix `TMPDIR` conventions for that storage.
1 change: 1 addition & 0 deletions ambiata-regiment.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ library
, binary >= 0.7 && < 0.9
, bytestring == 0.10.*
, directory == 1.2.*
, exceptions >= 0.6 && < 0.9
, filepath == 1.3.*
, heaps == 0.3.*
, primitive == 0.6.*
Expand Down
60 changes: 49 additions & 11 deletions main/regiment.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import DependencyInfo_ambiata_regiment

import qualified Data.Attoparsec.Text as A
import Data.Char (ord)
import Data.Text as T
import qualified Data.Text as T
import Data.Word (Word8)

import Options.Applicative
Expand All @@ -33,22 +33,44 @@ main = do
SortCommand inn out nc sc sep m f n ->
orDie renderRegimentIOError $
regiment inn out sc f n nc sep m
SplitCommand inn nc sc sep m f n tmp ->
orDie renderRegimentIOError $ do
createDirectory tmp
firstT RegimentIOParseError $
split inn tmp sc f n nc sep m
MergeCommand out dirs ->
orDie renderRegimentIOError $
mergeDirs dirs out

parser :: Parser Command
parser =
subparser $
command' "sort" "Sort input file based on sort column(s)."
(SortCommand <$> inputFileP
<*> optional outputP
<*> numColumnsP
<*> some sortColumnP
<*> separatorP
<*> memP
<*> formatP
<*> newlineP)
command' "sort" "Sort input file based on sort column(s)."
(SortCommand <$> inputFileP
<*> optional outputP
<*> numColumnsP
<*> some sortColumnP
<*> separatorP
<*> memP
<*> formatP
<*> newlineP)
<> command' "split" "Split input file into sorted chunks (intermediate state of sort)."
(SplitCommand <$> inputFileP
<*> numColumnsP
<*> some sortColumnP
<*> separatorP
<*> memP
<*> formatP
<*> newlineP
<*> tempDirectoryArgP)
<> command' "merge-tmps" "Merge sorted temp files (output of split)."
(MergeCommand <$> optional outputP
<*> some tempDirectoryP)

data Command =
SortCommand InputFile (Maybe OutputFile) NumColumns [SortColumn] Separator MemoryLimit FormatKind Newline
SortCommand InputFile (Maybe OutputFile) NumColumns [SortColumn] Separator MemoryLimit FormatKind Newline
| SplitCommand InputFile NumColumns [SortColumn] Separator MemoryLimit FormatKind Newline TempDirectory
| MergeCommand (Maybe OutputFile) [TempDirectory]
deriving (Eq, Show)

inputFileP :: Parser InputFile
Expand All @@ -65,6 +87,22 @@ outputP = OutputFile <$> (strOption $
<> metavar "FILE"
<> help "Optional path to output file -- defaults to stdout.")

-- | Positional argument naming a directory that already holds intermediate
--   sorted files. The @merge-tmps@ command accepts one or more of these.
tempDirectoryP :: Parser TempDirectory
tempDirectoryP =
  TempDirectory <$> strArgument (
       metavar "TMP_DIRECTORY"
    <> help "Path to directory containing intermediate sorted files."
    )

-- | The @--dir@ / @-d@ option of the @split@ command: the directory into
--   which the intermediate sorted files are written.
tempDirectoryArgP :: Parser TempDirectory
tempDirectoryArgP =
  TempDirectory <$> strOption (
       long "dir"
    <> short 'd'
    <> metavar "TMP_DIRECTORY"
    <> help "Path to directory to write out intermediate sorted files."
    )

sortColumnP :: Parser SortColumn
sortColumnP =
fmap (SortColumn . (\k -> k - 1)) . option auto . mconcat $ [
Expand Down
2 changes: 1 addition & 1 deletion src/Regiment/Data.hs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ countKeyedPayload kp =
-- └─────────┴─────────┴─────────┘

data Cursor a =
NonEmpty a KeyedPayload
NonEmpty !a !KeyedPayload
| EOF
deriving (Show)

Expand Down
84 changes: 71 additions & 13 deletions src/Regiment/IO.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,42 @@
{-# LANGUAGE OverloadedStrings #-}
module Regiment.IO (
RegimentIOError (..)
, mergeDirs
, regiment
, renderRegimentIOError
, split
, open
, createDirectory
) where

import Control.Monad.IO.Class (liftIO)
import Control.Exception (SomeException)
import Control.Monad.Catch (MonadCatch)
import Control.Monad.IO.Class (liftIO, MonadIO)
import Control.Monad.Trans.Resource (MonadResource (..))
import qualified Control.Monad.Trans.Resource as R

import qualified Data.Text as T

import P

import Regiment.Data
import Regiment.Parse
import Regiment.Vanguard.Base
import Regiment.Vanguard.IO

import System.Directory (getDirectoryContents)
import qualified System.Directory as SD
import System.FilePath ((</>))
import System.IO (IO, stdout, IOMode (..), FilePath, Handle, hClose, openBinaryFile)
import System.IO (IO, stdout, IOMode (..), FilePath, Handle, hClose, openBinaryFile, hFlush)
import System.IO.Temp (withSystemTempDirectory)

import X.Control.Monad.Trans.Either (EitherT, newEitherT, mapEitherT, runEitherT, firstEitherT)
import X.Control.Monad.Trans.Either (EitherT, newEitherT, mapEitherT, left)
import X.Control.Monad.Trans.Either (runEitherT, firstEitherT, tryEitherT)

-- | Top-level error type for the IO layer; rendered for the user by
--   'renderRegimentIOError'.
data RegimentIOError =
-- Wraps a 'RegimentParseError'; both 'regiment' and the @split@ command
-- lift their failures into this constructor.
RegimentIOParseError RegimentParseError
-- Wraps a failure from merging sorted temp files (see 'mergeDirs').
| RegimentIOMergeError (RegimentMergeError RegimentMergeIOError)
-- Creating the given directory threw an exception; the 'Text' is the
-- 'show'n exception (see 'createDirectory').
| RegimentIOCreateDirectoryError TempDirectory Text
-- The given directory already exists, so 'createDirectory' refused to use it.
| RegimentIOTmpDirExistsError TempDirectory
deriving (Eq, Show)

renderRegimentIOError :: RegimentIOError -> Text
Expand All @@ -37,6 +47,15 @@ renderRegimentIOError err =
renderRegimentParseError e
RegimentIOMergeError e ->
renderRegimentMergeError renderRegimentMergeIOError e
RegimentIOCreateDirectoryError (TempDirectory tmp) t ->
"Failed to create directory "
<> T.pack tmp
<> "Error: " <> t
RegimentIOTmpDirExistsError (TempDirectory tmp) ->
"Failed to create directory "
<> T.pack tmp
<> "\n"
<> "Target directory should not exist, it will be created and filled."

regiment ::
InputFile
Expand All @@ -49,6 +68,24 @@ regiment ::
-> MemoryLimit
-> EitherT RegimentIOError IO ()
regiment inn out sc f n nc sep m = do
firstEitherT RegimentIOParseError . newEitherT $
withSystemTempDirectory "regiment." $ \tmp ->
R.runResourceT . runEitherT $ do
mapEitherT liftIO $ split inn (TempDirectory tmp) sc f n nc sep m
handles <- openDir (TempDirectory tmp)
mapEitherT liftIO . firstT RegimentParseMergeError $ merge handles out

split ::
InputFile
-> TempDirectory
-> [SortColumn]
-> FormatKind
-> Newline
-> NumColumns
-> Separator
-> MemoryLimit
-> EitherT RegimentParseError IO ()
split inn tmp sc f n nc sep m = do
let
fmt =
Format {
Expand All @@ -57,26 +94,47 @@ regiment inn out sc f n nc sep m = do
, formatColumnCount = numColumns nc
, formatKind = f
}
firstEitherT RegimentIOParseError . newEitherT $
withSystemTempDirectory "regiment." $ \tmp ->
runEitherT $ do
toTempFiles inn (TempDirectory tmp) fmt sc m
firstT RegimentParseMergeError $ merge (TempDirectory tmp) out
toTempFiles inn tmp fmt sc m

-- | Merge the sorted temp files found in all of the given directories into a
--   single output. The output is handed to 'merge' as-is ('Nothing' selects
--   stdout there). Merge failures are reported as 'RegimentIOMergeError'.
mergeDirs ::
     [TempDirectory]
  -> Maybe OutputFile
  -> EitherT RegimentIOError IO ()
mergeDirs dirs out =
  firstT RegimentIOMergeError . mapEitherT R.runResourceT $ do
    -- Every directory yields a list of handles; flatten them into one list
    -- so that a single merge pass sees all of the splits.
    handles <- concat <$> mapM openDir dirs
    mapEitherT liftIO $ merge handles out

merge ::
TempDirectory
[Handle]
-> Maybe OutputFile
-> EitherT (RegimentMergeError RegimentMergeIOError) IO ()
merge (TempDirectory tmp) out = mapEitherT R.runResourceT $ do
fs <- liftIO $ fmap (filter (flip notElem [".", ".."])) $ getDirectoryContents tmp
handles <- mapM (open ReadMode) $ fmap (tmp </>) fs
merge handles out = mapEitherT R.runResourceT $ do
v <- mapEitherT liftIO $ formVanguardIO handles
out' <- case out of
Just (OutputFile o) -> open WriteMode o
Nothing -> return stdout
mapEitherT liftIO $ runVanguardIO v out'
liftIO $ hFlush out'

-- | Open, for reading, every entry of the given directory (apart from the
--   @.@ and @..@ entries) and return the resulting handles. The handles are
--   acquired through 'open', so they are registered with the enclosing
--   resource scope.
openDir :: MonadResource m => TempDirectory -> m [Handle]
openDir (TempDirectory tmp) = do
  entries <- liftIO $ SD.getDirectoryContents tmp
  let
    names = filter (flip notElem [".", ".."]) entries
    paths = fmap (tmp </>) names
  mapM (open ReadMode) paths

-- | Open a file in binary mode with the given 'IOMode', registering 'hClose'
--   as its release action so the handle is closed when the enclosing
--   resource scope ends.
open :: MonadResource m => IOMode -> FilePath -> m Handle
open m f = do
  -- 'R.allocate' returns the release key together with the handle; only the
  -- handle is of interest to callers.
  (_, h) <- R.allocate (openBinaryFile f m) hClose
  return h

-- | Create the given directory (including missing parents).
--
--   Fails with 'RegimentIOTmpDirExistsError' when the directory is already
--   present, and with 'RegimentIOCreateDirectoryError' (carrying the shown
--   exception) when creation itself throws.
--
--   NOTE(review): the existence check and the creation are two separate
--   steps, so a directory that appears in between is not reported as
--   existing.
createDirectory :: (MonadIO m, MonadCatch m) => TempDirectory -> EitherT RegimentIOError m ()
createDirectory t@(TempDirectory tmp) = do
  alreadyThere <- liftIO $ SD.doesDirectoryExist tmp
  case alreadyThere of
    True ->
      left $ RegimentIOTmpDirExistsError t
    False ->
      tryEitherT toError . liftIO $ SD.createDirectoryIfMissing True tmp
  where
    -- Any exception raised during creation is turned into a rendered error.
    toError :: SomeException -> RegimentIOError
    toError ex = RegimentIOCreateDirectoryError t $ T.pack (show ex)
18 changes: 10 additions & 8 deletions src/Regiment/Parse.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import qualified Data.Vector.Algorithms.Tim as Tim
import P

import qualified Parsley.Xsv.Parser as Parsley
import qualified Parsley.Xsv.Render as Parsley

import Regiment.Data
import Regiment.Serial
Expand Down Expand Up @@ -93,8 +94,7 @@ toTempFiles (InputFile inn) tmpDir f sc (MemoryLimit cap) = do
>> go counter (drops + 1) partNum memCounter rest)
(Parsley.Success $ \rest fields ->
let
bytesParsed = BS.take (BS.length bytes - BS.length rest) bytes
sko = selectSortKeys bytesParsed (Parsley.getFields fields) sc
sko = selectSortKeys fields f sc
in
case sko of
Left _ ->
Expand All @@ -116,20 +116,22 @@ toTempFiles (InputFile inn) tmpDir f sc (MemoryLimit cap) = do
liftIO (BS.hGetSome h innChunkSize) >>= go (0 :: Int) (0 :: Int) (0 :: Int) (0 :: Int)

selectSortKeys ::
BS.ByteString
-> (Boxed.Vector BS.ByteString)
Parsley.Fields
-> Format
-> [SortColumn]
-> Either RegimentParseError (Boxed.Vector BS.ByteString)
selectSortKeys bytes parsed sortColumns =
selectSortKeys fields fmt sortColumns =
let
parsed = Parsley.getFields fields
unparsed = Parsley.renderRow fmt parsed <> (Parsley.renderNewline $ formatNewline fmt)
maybeSortkeys = L.map (\sc -> parsed Boxed.!? (sortColumn sc)) sortColumns
ks = DM.catMaybes maybeSortkeys
keyNotFound = and $ L.map isNothing maybeSortkeys
in do
in
case keyNotFound of
True -> Left RegimentParseKeyNotFound
-- returns a vector consisting of keys and payload
False -> Right $ (Boxed.fromList ks) Boxed.++ (Boxed.singleton bytes)
False -> Right $ (Boxed.fromList ks) Boxed.++ (Boxed.singleton unparsed)

flushVector ::
Grow.Grow Boxed.MVector (PrimState IO) (Boxed.Vector BS.ByteString)
Expand Down Expand Up @@ -157,7 +159,7 @@ writeChunk h vs =
kp <- hoistEither $ vecToKP bs
liftIO $ writeCursor h kp
if Boxed.null tl
then return ()
then liftIO $ IO.hFlush h >> return ()
else writeChunk h tl

writeCursor :: IO.Handle -> KeyedPayload -> IO ()
Expand Down
Loading

0 comments on commit 5c37109

Please sign in to comment.