{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE CPP #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
module Data.Conduit.Process
(
sourceCmdWithConsumer
, sourceProcessWithConsumer
, sourceCmdWithStreams
, sourceProcessWithStreams
, withCheckedProcessCleanup
, FlushInput(..)
, BuilderInput(..)
, module Data.Streaming.Process
) where
import Data.Streaming.Process
import Data.Streaming.Process.Internal
import System.Exit (ExitCode (..))
import Control.Monad.IO.Unlift (MonadIO, liftIO, MonadUnliftIO, withRunInIO, withUnliftIO, unliftIO)
import System.IO (hClose, BufferMode (NoBuffering), hSetBuffering)
import Data.Conduit
import Data.Functor (($>))
import Data.Conduit.Binary (sourceHandle, sinkHandle, sinkHandleBuilder, sinkHandleFlush)
import Data.ByteString (ByteString)
import Data.ByteString.Builder (Builder)
import Control.Concurrent.Async (runConcurrently, Concurrently(..))
import Control.Exception (onException, throwIO, finally, bracket)
#if (__GLASGOW_HASKELL__ < 710)
import Control.Applicative ((<$>), (<*>))
#endif
instance (r ~ (), MonadIO m, i ~ ByteString) => InputSource (ConduitM i o m r) where
isStdStream :: (Maybe Handle -> IO (ConduitM i o m r), Maybe StdStream)
isStdStream = (\(Just Handle
h) -> Handle -> BufferMode -> IO ()
hSetBuffering Handle
h BufferMode
NoBuffering IO ()
-> ConduitT ByteString o m () -> IO (ConduitT ByteString o m ())
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> Handle -> ConduitT ByteString o m ()
forall (m :: * -> *) o.
MonadIO m =>
Handle -> ConduitT ByteString o m ()
sinkHandle Handle
h, StdStream -> Maybe StdStream
forall a. a -> Maybe a
Just StdStream
CreatePipe)
instance (r ~ (), r' ~ (), MonadIO m, MonadIO n, i ~ ByteString) => InputSource (ConduitM i o m r, n r') where
isStdStream :: (Maybe Handle -> IO (ConduitM i o m r, n r'), Maybe StdStream)
isStdStream = (\(Just Handle
h) -> Handle -> BufferMode -> IO ()
hSetBuffering Handle
h BufferMode
NoBuffering IO ()
-> (ConduitT ByteString o m (), n ())
-> IO (ConduitT ByteString o m (), n ())
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> (Handle -> ConduitT ByteString o m ()
forall (m :: * -> *) o.
MonadIO m =>
Handle -> ConduitT ByteString o m ()
sinkHandle Handle
h, IO () -> n ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> n ()) -> IO () -> n ()
forall a b. (a -> b) -> a -> b
$ Handle -> IO ()
hClose Handle
h), StdStream -> Maybe StdStream
forall a. a -> Maybe a
Just StdStream
CreatePipe)
newtype BuilderInput o m r = BuilderInput (ConduitM Builder o m r)
newtype FlushInput o m r = FlushInput (ConduitM (Flush ByteString) o m r)
instance (MonadIO m, r ~ ()) => InputSource (BuilderInput o m r) where
isStdStream :: (Maybe Handle -> IO (BuilderInput o m r), Maybe StdStream)
isStdStream = (\(Just Handle
h) -> BuilderInput o m () -> IO (BuilderInput o m ())
forall (m :: * -> *) a. Monad m => a -> m a
return (BuilderInput o m () -> IO (BuilderInput o m ()))
-> BuilderInput o m () -> IO (BuilderInput o m ())
forall a b. (a -> b) -> a -> b
$ ConduitM Builder o m () -> BuilderInput o m ()
forall o (m :: * -> *) r.
ConduitM Builder o m r -> BuilderInput o m r
BuilderInput (ConduitM Builder o m () -> BuilderInput o m ())
-> ConduitM Builder o m () -> BuilderInput o m ()
forall a b. (a -> b) -> a -> b
$ Handle -> ConduitM Builder o m ()
forall (m :: * -> *) o.
MonadIO m =>
Handle -> ConduitM Builder o m ()
sinkHandleBuilder Handle
h, StdStream -> Maybe StdStream
forall a. a -> Maybe a
Just StdStream
CreatePipe)
instance (MonadIO m, MonadIO n, r ~ (), r' ~ ()) => InputSource (BuilderInput o m r, n r') where
isStdStream :: (Maybe Handle -> IO (BuilderInput o m r, n r'), Maybe StdStream)
isStdStream = (\(Just Handle
h) -> (BuilderInput o m (), n ()) -> IO (BuilderInput o m (), n ())
forall (m :: * -> *) a. Monad m => a -> m a
return (ConduitM Builder o m () -> BuilderInput o m ()
forall o (m :: * -> *) r.
ConduitM Builder o m r -> BuilderInput o m r
BuilderInput (ConduitM Builder o m () -> BuilderInput o m ())
-> ConduitM Builder o m () -> BuilderInput o m ()
forall a b. (a -> b) -> a -> b
$ Handle -> ConduitM Builder o m ()
forall (m :: * -> *) o.
MonadIO m =>
Handle -> ConduitM Builder o m ()
sinkHandleBuilder Handle
h, IO () -> n ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> n ()) -> IO () -> n ()
forall a b. (a -> b) -> a -> b
$ Handle -> IO ()
hClose Handle
h), StdStream -> Maybe StdStream
forall a. a -> Maybe a
Just StdStream
CreatePipe)
instance (MonadIO m, r ~ ()) => InputSource (FlushInput o m r) where
isStdStream :: (Maybe Handle -> IO (FlushInput o m r), Maybe StdStream)
isStdStream = (\(Just Handle
h) -> FlushInput o m () -> IO (FlushInput o m ())
forall (m :: * -> *) a. Monad m => a -> m a
return (FlushInput o m () -> IO (FlushInput o m ()))
-> FlushInput o m () -> IO (FlushInput o m ())
forall a b. (a -> b) -> a -> b
$ ConduitM (Flush ByteString) o m () -> FlushInput o m ()
forall o (m :: * -> *) r.
ConduitM (Flush ByteString) o m r -> FlushInput o m r
FlushInput (ConduitM (Flush ByteString) o m () -> FlushInput o m ())
-> ConduitM (Flush ByteString) o m () -> FlushInput o m ()
forall a b. (a -> b) -> a -> b
$ Handle -> ConduitM (Flush ByteString) o m ()
forall (m :: * -> *) o.
MonadIO m =>
Handle -> ConduitM (Flush ByteString) o m ()
sinkHandleFlush Handle
h, StdStream -> Maybe StdStream
forall a. a -> Maybe a
Just StdStream
CreatePipe)
instance (MonadIO m, MonadIO n, r ~ (), r' ~ ()) => InputSource (FlushInput o m r, n r') where
isStdStream :: (Maybe Handle -> IO (FlushInput o m r, n r'), Maybe StdStream)
isStdStream = (\(Just Handle
h) -> (FlushInput o m (), n ()) -> IO (FlushInput o m (), n ())
forall (m :: * -> *) a. Monad m => a -> m a
return (ConduitM (Flush ByteString) o m () -> FlushInput o m ()
forall o (m :: * -> *) r.
ConduitM (Flush ByteString) o m r -> FlushInput o m r
FlushInput (ConduitM (Flush ByteString) o m () -> FlushInput o m ())
-> ConduitM (Flush ByteString) o m () -> FlushInput o m ()
forall a b. (a -> b) -> a -> b
$ Handle -> ConduitM (Flush ByteString) o m ()
forall (m :: * -> *) o.
MonadIO m =>
Handle -> ConduitM (Flush ByteString) o m ()
sinkHandleFlush Handle
h, IO () -> n ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> n ()) -> IO () -> n ()
forall a b. (a -> b) -> a -> b
$ Handle -> IO ()
hClose Handle
h), StdStream -> Maybe StdStream
forall a. a -> Maybe a
Just StdStream
CreatePipe)
instance (r ~ (), MonadIO m, o ~ ByteString) => OutputSink (ConduitM i o m r) where
osStdStream :: (Maybe Handle -> IO (ConduitM i o m r), Maybe StdStream)
osStdStream = (\(Just Handle
h) -> Handle -> BufferMode -> IO ()
hSetBuffering Handle
h BufferMode
NoBuffering IO ()
-> ConduitT i ByteString m () -> IO (ConduitT i ByteString m ())
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> Handle -> ConduitT i ByteString m ()
forall (m :: * -> *) i.
MonadIO m =>
Handle -> ConduitT i ByteString m ()
sourceHandle Handle
h, StdStream -> Maybe StdStream
forall a. a -> Maybe a
Just StdStream
CreatePipe)
instance (r ~ (), r' ~ (), MonadIO m, MonadIO n, o ~ ByteString) => OutputSink (ConduitM i o m r, n r') where
osStdStream :: (Maybe Handle -> IO (ConduitM i o m r, n r'), Maybe StdStream)
osStdStream = (\(Just Handle
h) -> Handle -> BufferMode -> IO ()
hSetBuffering Handle
h BufferMode
NoBuffering IO ()
-> (ConduitT i ByteString m (), n ())
-> IO (ConduitT i ByteString m (), n ())
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> (Handle -> ConduitT i ByteString m ()
forall (m :: * -> *) i.
MonadIO m =>
Handle -> ConduitT i ByteString m ()
sourceHandle Handle
h, IO () -> n ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> n ()) -> IO () -> n ()
forall a b. (a -> b) -> a -> b
$ Handle -> IO ()
hClose Handle
h), StdStream -> Maybe StdStream
forall a. a -> Maybe a
Just StdStream
CreatePipe)
sourceProcessWithConsumer :: MonadIO m
=> CreateProcess
-> ConduitT ByteString Void m a
-> m (ExitCode, a)
sourceProcessWithConsumer :: CreateProcess -> ConduitT ByteString Void m a -> m (ExitCode, a)
sourceProcessWithConsumer CreateProcess
cp ConduitT ByteString Void m a
consumer = do
(ClosedStream
ClosedStream, (ConduitT () ByteString m ()
source, m ()
close), ClosedStream
ClosedStream, StreamingProcessHandle
cph) <- CreateProcess
-> m (ClosedStream, (ConduitT () ByteString m (), m ()),
ClosedStream, StreamingProcessHandle)
forall (m :: * -> *) stdin stdout stderr.
(MonadIO m, InputSource stdin, OutputSink stdout,
OutputSink stderr) =>
CreateProcess -> m (stdin, stdout, stderr, StreamingProcessHandle)
streamingProcess CreateProcess
cp
a
res <- ConduitT () Void m a -> m a
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit (ConduitT () Void m a -> m a) -> ConduitT () Void m a -> m a
forall a b. (a -> b) -> a -> b
$ ConduitT () ByteString m ()
source ConduitT () ByteString m ()
-> ConduitT ByteString Void m a -> ConduitT () Void m a
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ConduitT ByteString Void m a
consumer
m ()
close
ExitCode
ec <- StreamingProcessHandle -> m ExitCode
forall (m :: * -> *).
MonadIO m =>
StreamingProcessHandle -> m ExitCode
waitForStreamingProcess StreamingProcessHandle
cph
(ExitCode, a) -> m (ExitCode, a)
forall (m :: * -> *) a. Monad m => a -> m a
return (ExitCode
ec, a
res)
sourceCmdWithConsumer :: MonadIO m
=> String
-> ConduitT ByteString Void m a
-> m (ExitCode, a)
sourceCmdWithConsumer :: String -> ConduitT ByteString Void m a -> m (ExitCode, a)
sourceCmdWithConsumer String
cmd = CreateProcess -> ConduitT ByteString Void m a -> m (ExitCode, a)
forall (m :: * -> *) a.
MonadIO m =>
CreateProcess -> ConduitT ByteString Void m a -> m (ExitCode, a)
sourceProcessWithConsumer (String -> CreateProcess
shell String
cmd)
sourceProcessWithStreams
:: MonadUnliftIO m
=> CreateProcess
-> ConduitT () ByteString m ()
-> ConduitT ByteString Void m a
-> ConduitT ByteString Void m b
-> m (ExitCode, a, b)
sourceProcessWithStreams :: CreateProcess
-> ConduitT () ByteString m ()
-> ConduitT ByteString Void m a
-> ConduitT ByteString Void m b
-> m (ExitCode, a, b)
sourceProcessWithStreams CreateProcess
cp ConduitT () ByteString m ()
producerStdin ConduitT ByteString Void m a
consumerStdout ConduitT ByteString Void m b
consumerStderr =
(UnliftIO m -> IO (ExitCode, a, b)) -> m (ExitCode, a, b)
forall (m :: * -> *) a.
MonadUnliftIO m =>
(UnliftIO m -> IO a) -> m a
withUnliftIO ((UnliftIO m -> IO (ExitCode, a, b)) -> m (ExitCode, a, b))
-> (UnliftIO m -> IO (ExitCode, a, b)) -> m (ExitCode, a, b)
forall a b. (a -> b) -> a -> b
$ \UnliftIO m
u -> do
( (ConduitT ByteString Void m ()
sinkStdin, IO ()
closeStdin)
, (ConduitT () ByteString m ()
sourceStdout, IO ()
closeStdout)
, (ConduitT () ByteString m ()
sourceStderr, IO ()
closeStderr)
, StreamingProcessHandle
sph) <- CreateProcess
-> IO
((ConduitT ByteString Void m (), IO ()),
(ConduitT () ByteString m (), IO ()),
(ConduitT () ByteString m (), IO ()), StreamingProcessHandle)
forall (m :: * -> *) stdin stdout stderr.
(MonadIO m, InputSource stdin, OutputSink stdout,
OutputSink stderr) =>
CreateProcess -> m (stdin, stdout, stderr, StreamingProcessHandle)
streamingProcess CreateProcess
cp
(()
_, a
resStdout, b
resStderr) <-
Concurrently ((), a, b) -> IO ((), a, b)
forall a. Concurrently a -> IO a
runConcurrently (
(,,)
(() -> a -> b -> ((), a, b))
-> Concurrently () -> Concurrently (a -> b -> ((), a, b))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO () -> Concurrently ()
forall a. IO a -> Concurrently a
Concurrently ((UnliftIO m -> forall a. m a -> IO a
forall (m :: * -> *). UnliftIO m -> forall a. m a -> IO a
unliftIO UnliftIO m
u (m () -> IO ()) -> m () -> IO ()
forall a b. (a -> b) -> a -> b
$ ConduitT () Void m () -> m ()
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit (ConduitT () Void m () -> m ()) -> ConduitT () Void m () -> m ()
forall a b. (a -> b) -> a -> b
$ ConduitT () ByteString m ()
producerStdin ConduitT () ByteString m ()
-> ConduitT ByteString Void m () -> ConduitT () Void m ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ConduitT ByteString Void m ()
sinkStdin) IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
`finally` IO ()
closeStdin)
Concurrently (a -> b -> ((), a, b))
-> Concurrently a -> Concurrently (b -> ((), a, b))
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> IO a -> Concurrently a
forall a. IO a -> Concurrently a
Concurrently (UnliftIO m -> forall a. m a -> IO a
forall (m :: * -> *). UnliftIO m -> forall a. m a -> IO a
unliftIO UnliftIO m
u (m a -> IO a) -> m a -> IO a
forall a b. (a -> b) -> a -> b
$ ConduitT () Void m a -> m a
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit (ConduitT () Void m a -> m a) -> ConduitT () Void m a -> m a
forall a b. (a -> b) -> a -> b
$ ConduitT () ByteString m ()
sourceStdout ConduitT () ByteString m ()
-> ConduitT ByteString Void m a -> ConduitT () Void m a
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ConduitT ByteString Void m a
consumerStdout)
Concurrently (b -> ((), a, b))
-> Concurrently b -> Concurrently ((), a, b)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> IO b -> Concurrently b
forall a. IO a -> Concurrently a
Concurrently (UnliftIO m -> forall a. m a -> IO a
forall (m :: * -> *). UnliftIO m -> forall a. m a -> IO a
unliftIO UnliftIO m
u (m b -> IO b) -> m b -> IO b
forall a b. (a -> b) -> a -> b
$ ConduitT () Void m b -> m b
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit (ConduitT () Void m b -> m b) -> ConduitT () Void m b -> m b
forall a b. (a -> b) -> a -> b
$ ConduitT () ByteString m ()
sourceStderr ConduitT () ByteString m ()
-> ConduitT ByteString Void m b -> ConduitT () Void m b
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ConduitT ByteString Void m b
consumerStderr))
IO ((), a, b) -> IO () -> IO ((), a, b)
forall a b. IO a -> IO b -> IO a
`finally` (IO ()
closeStdout IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
closeStderr)
IO ((), a, b) -> IO () -> IO ((), a, b)
forall a b. IO a -> IO b -> IO a
`onException` StreamingProcessHandle -> IO ()
forall (m :: * -> *). MonadIO m => StreamingProcessHandle -> m ()
terminateStreamingProcess StreamingProcessHandle
sph
ExitCode
ec <- StreamingProcessHandle -> IO ExitCode
forall (m :: * -> *).
MonadIO m =>
StreamingProcessHandle -> m ExitCode
waitForStreamingProcess StreamingProcessHandle
sph
(ExitCode, a, b) -> IO (ExitCode, a, b)
forall (m :: * -> *) a. Monad m => a -> m a
return (ExitCode
ec, a
resStdout, b
resStderr)
sourceCmdWithStreams
:: MonadUnliftIO m
=> String
-> ConduitT () ByteString m ()
-> ConduitT ByteString Void m a
-> ConduitT ByteString Void m b
-> m (ExitCode, a, b)
sourceCmdWithStreams :: String
-> ConduitT () ByteString m ()
-> ConduitT ByteString Void m a
-> ConduitT ByteString Void m b
-> m (ExitCode, a, b)
sourceCmdWithStreams String
cmd = CreateProcess
-> ConduitT () ByteString m ()
-> ConduitT ByteString Void m a
-> ConduitT ByteString Void m b
-> m (ExitCode, a, b)
forall (m :: * -> *) a b.
MonadUnliftIO m =>
CreateProcess
-> ConduitT () ByteString m ()
-> ConduitT ByteString Void m a
-> ConduitT ByteString Void m b
-> m (ExitCode, a, b)
sourceProcessWithStreams (String -> CreateProcess
shell String
cmd)
withCheckedProcessCleanup
:: ( InputSource stdin
, OutputSink stderr
, OutputSink stdout
, MonadUnliftIO m
)
=> CreateProcess
-> (stdin -> stdout -> stderr -> m b)
-> m b
withCheckedProcessCleanup :: CreateProcess -> (stdin -> stdout -> stderr -> m b) -> m b
withCheckedProcessCleanup CreateProcess
cp stdin -> stdout -> stderr -> m b
f = ((forall a. m a -> IO a) -> IO b) -> m b
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO (((forall a. m a -> IO a) -> IO b) -> m b)
-> ((forall a. m a -> IO a) -> IO b) -> m b
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> IO (stdin, stdout, stderr, StreamingProcessHandle)
-> ((stdin, stdout, stderr, StreamingProcessHandle) -> IO ())
-> ((stdin, stdout, stderr, StreamingProcessHandle) -> IO b)
-> IO b
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket
(CreateProcess -> IO (stdin, stdout, stderr, StreamingProcessHandle)
forall (m :: * -> *) stdin stdout stderr.
(MonadIO m, InputSource stdin, OutputSink stdout,
OutputSink stderr) =>
CreateProcess -> m (stdin, stdout, stderr, StreamingProcessHandle)
streamingProcess CreateProcess
cp)
(\(stdin
_, stdout
_, stderr
_, StreamingProcessHandle
sph) -> StreamingProcessHandle -> IO ()
forall (m :: * -> *). MonadIO m => StreamingProcessHandle -> m ()
closeStreamingProcessHandle StreamingProcessHandle
sph)
(((stdin, stdout, stderr, StreamingProcessHandle) -> IO b) -> IO b)
-> ((stdin, stdout, stderr, StreamingProcessHandle) -> IO b)
-> IO b
forall a b. (a -> b) -> a -> b
$ \(stdin
x, stdout
y, stderr
z, StreamingProcessHandle
sph) -> do
b
res <- m b -> IO b
forall a. m a -> IO a
run (stdin -> stdout -> stderr -> m b
f stdin
x stdout
y stderr
z) IO b -> IO () -> IO b
forall a b. IO a -> IO b -> IO a
`onException` StreamingProcessHandle -> IO ()
forall (m :: * -> *). MonadIO m => StreamingProcessHandle -> m ()
terminateStreamingProcess StreamingProcessHandle
sph
ExitCode
ec <- StreamingProcessHandle -> IO ExitCode
forall (m :: * -> *).
MonadIO m =>
StreamingProcessHandle -> m ExitCode
waitForStreamingProcess StreamingProcessHandle
sph
if ExitCode
ec ExitCode -> ExitCode -> Bool
forall a. Eq a => a -> a -> Bool
== ExitCode
ExitSuccess
then b -> IO b
forall (m :: * -> *) a. Monad m => a -> m a
return b
res
else ProcessExitedUnsuccessfully -> IO b
forall e a. Exception e => e -> IO a
throwIO (ProcessExitedUnsuccessfully -> IO b)
-> ProcessExitedUnsuccessfully -> IO b
forall a b. (a -> b) -> a -> b
$ CreateProcess -> ExitCode -> ProcessExitedUnsuccessfully
ProcessExitedUnsuccessfully CreateProcess
cp ExitCode
ec
terminateStreamingProcess :: MonadIO m => StreamingProcessHandle -> m ()
terminateStreamingProcess :: StreamingProcessHandle -> m ()
terminateStreamingProcess = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ())
-> (StreamingProcessHandle -> IO ())
-> StreamingProcessHandle
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProcessHandle -> IO ()
terminateProcess (ProcessHandle -> IO ())
-> (StreamingProcessHandle -> ProcessHandle)
-> StreamingProcessHandle
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamingProcessHandle -> ProcessHandle
streamingProcessHandleRaw