{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternGuards #-}
{-# LANGUAGE RecordWildCards #-}
module Network.HTTP2.Server.Worker (
worker
, WorkerConf(..)
, fromContext
) where
import Control.Concurrent.STM
import Control.Exception (SomeException(..), AsyncException(..))
import qualified Control.Exception as E
import Data.IORef
import qualified Network.HTTP.Types as H
import qualified System.TimeManager as T
import Imports hiding (insert)
import Network.HPACK
import Network.HPACK.Token
import Network.HTTP2.Arch
import Network.HTTP2.Frame
import Network.HTTP2.Server.Types
-- | The operations a worker needs from its environment, abstracted over
--   the stream representation @a@. 'fromContext' builds the concrete
--   instance for 'Stream'.
data WorkerConf a = WorkerConf {
    -- | Obtain the next 'Input' (a stream and its request) to serve.
    readInputQ :: IO (Input a)
    -- | Hand an 'Output' over for sending.
  , writeOutputQ :: Output a -> IO ()
    -- | Invoked by 'worker' with the stream being served when the
    --   handler was aborted by an exception other than 'ThreadKilled'.
  , workerCleanup :: a -> IO ()
    -- | Whether server push may currently be used.
  , isPushable :: IO Bool
    -- | Register a stream under the given stream identifier.
  , insertStream :: StreamId -> a -> IO ()
    -- | Create a stream for a push promise. The first argument is the
    --   parent stream; the result is
    --   @(parent stream id, new stream id, new stream)@.
  , makePushStream :: a -> PushPromise -> IO (StreamId, StreamId, a)
  }
-- | Build the 'WorkerConf' for real HTTP/2 'Stream's from a connection
--   'Context'.
fromContext :: Context -> WorkerConf Stream
fromContext ctx@Context{..} = WorkerConf {
    -- Blocks until an input is available on the role's input queue.
    readInputQ = atomically $ readTQueue $ inputQ roleInfo
  , writeOutputQ = enqueueOutput outputQ
    -- Mark the stream as closed with 'Killed' and enqueue an
    -- RST_STREAM frame carrying 'InternalError' on the control queue.
  , workerCleanup = \strm -> do
        closed ctx strm Killed
        let frame = resetFrame InternalError (streamNumber strm)
        enqueueControl controlQ $ CFrame frame
    -- Push is allowed when the current settings have 'enablePush' set.
  , isPushable = enablePush <$> readIORef http2settings
  , insertStream = insert streamTable
    -- The promise itself is not inspected here: only a fresh stream id
    -- and the current initial window size are needed.
  , makePushStream = \pstrm _ -> do
        ws <- initialWindowSize <$> readIORef http2settings
        sid <- getMyNewStreamId ctx
        newstrm <- newPushStream sid ws
        let pid = streamNumber pstrm
        return (pid, sid, newstrm)
  }
-- | Enqueue one pushed response per 'PushPromise' and report how the
--   parent response has to be emitted.
--
--   Returns 'OObj' when nothing was pushed: the promise list is empty,
--   push is not allowed ('isPushable' is 'False'), or the request lacks
--   the scheme or the host\/authority value needed to build the promised
--   request. Otherwise returns 'OWait' with an action that blocks until
--   the completion counter has reached the number of pushed streams.
pushStream :: WorkerConf a
           -> a -- ^ parent stream
           -> ValueTable -- ^ header table of the request
           -> [PushPromise]
           -> IO OutputType
pushStream _ _ _ [] = return OObj
pushStream WorkerConf{..} pstrm reqvt pps0 = do
    pushable <- isPushable
    case (pushable, mscheme, mauth) of
      (True, Just scheme, Just auth) -> do
          tvar <- newTVarIO 0
          lim <- push scheme auth tvar pps0 0
          return $ if lim == 0 then OObj else OWait (waiter lim tvar)
      -- Push is optional: without permission, or without the headers
      -- required for the promised request, simply do not push instead
      -- of failing later when a missing header value is forced.
      _ -> return OObj
  where
    mscheme = getHeaderValue tokenScheme reqvt
    -- The Host value is preferred; the authority value is the fallback.
    mauth = getHeaderValue tokenHost reqvt
        <|> getHeaderValue tokenAuthority reqvt
    -- Completion callback attached to every pushed 'Output'.
    increment tvar = atomically $ modifyTVar' tvar (+1)
    -- Blocks (STM retry) until the counter is at least @lim@.
    waiter lim tvar = atomically $ do
        n <- readTVar tvar
        check (n >= lim)
    -- Enqueue the promises one by one; the result is how many were
    -- enqueued.
    push _ _ _ [] n = return (n :: Int)
    push scheme auth tvar (pp:pps) n = do
        (pid, sid, newstrm) <- makePushStream pstrm pp
        insertStream sid newstrm
        let path = promiseRequestPath pp
            -- The promised request is always a GET for the promised path.
            promiseRequest = [(tokenMethod, H.methodGet)
                             ,(tokenScheme, scheme)
                             ,(tokenAuthority, auth)
                             ,(tokenPath, path)]
            ot = OPush promiseRequest pid
            Response rsp = promiseResponse pp
            out = Output newstrm rsp ot Nothing $ increment tvar
        writeOutputQ out
        push scheme auth tvar pps (n + 1)
-- | Turn the 'Response' (and push promises) produced by a server
--   application into 'Output's for the given stream and enqueue them
--   through 'writeOutputQ'.
--
--   For every body kind except streaming, the 'ThreadContinue' flag is
--   set to 'True' so that the calling worker loop keeps serving. For a
--   streaming body the flag is set to 'False': this thread runs the
--   streaming action to completion and the worker loop then stops.
response :: WorkerConf a -> Manager -> T.Handle -> ThreadContinue -> a -> Request -> Response -> [PushPromise] -> IO ()
response wc@WorkerConf{..} mgr th tconf strm (Request req) (Response rsp) pps = case outObjBody rsp of
  -- No body: push promises are not processed in this case.
  OutBodyNone -> do
      setThreadContinue tconf True
      writeOutputQ $ Output strm rsp OObj Nothing (return ())
  OutBodyBuilder _ -> respondAtOnce
  OutBodyFile _ -> respondAtOnce
  OutBodyStreaming strmbdy -> do
      otyp <- pushStream wc strm reqvt pps
      -- This thread stays busy for the whole streaming body.
      -- NOTE(review): 'spawnAction' presumably asks the manager for a
      -- replacement worker -- confirm against the manager module.
      spawnAction mgr
      -- Make the worker loop stop once this response is finished.
      setThreadContinue tconf False
      -- The streaming action drives itself, so its chunks are passed to
      -- the output side through a bounded queue (capacity 10).
      tbq <- newTBQueueIO 10
      writeOutputQ $ Output strm rsp otyp (Just tbq) (return ())
      let -- The timeout handle is paused while blocked on a full queue.
          push b = do
            T.pause th
            atomically $ writeTBQueue tbq (StreamingBuilder b)
            T.resume th
          flush = atomically $ writeTBQueue tbq StreamingFlush
      strmbdy push flush
      atomically $ writeTBQueue tbq StreamingFinished
      deleteMyId mgr
  where
    (_,reqvt) = inpObjHeaders req
    -- Shared by the builder and file bodies: handle the push promises
    -- first, then enqueue the response itself.
    respondAtOnce = do
        otyp <- pushStream wc strm reqvt pps
        setThreadContinue tconf True
        writeOutputQ $ Output strm rsp otyp Nothing (return ())
-- | Worker loop for server applications: repeatedly take an input from
--   'readInputQ', run the server on it and let 'response' enqueue the
--   result.
--
--   The loop ends when the worker receives 'ThreadKilled' or when
--   'response' has cleared the 'ThreadContinue' flag.
worker :: WorkerConf a -> Manager -> Server -> Action
worker wc@WorkerConf{..} mgr server = do
    sinfo <- newStreamInfo
    tcont <- newThreadContinue
    -- 'timeoutKillThread' supplies the timeout handle @th@ used by 'go'.
    timeoutKillThread mgr $ go sinfo tcont
  where
    go sinfo tcont th = do
        setThreadContinue tcont True
        ex <- E.try $ do
            -- The timeout is paused while waiting for the next input.
            T.pause th
            Input strm req <- readInputQ
            let req' = pauseRequestBody req th
            -- Remember the stream so that 'cleanup' can find it.
            setStreamInfo sinfo strm
            T.resume th
            T.tickle th
            let aux = Aux th
            server (Request req') aux $ response wc mgr th tcont strm (Request req')
        cont1 <- case ex of
            Right () -> return True
            Left e@(SomeException _)
              -- Thread killed: stop without cleaning up the stream.
              | Just ThreadKilled    <- E.fromException e -> return False
              -- Timed out: clean up the stream and keep serving.
              | Just T.TimeoutThread <- E.fromException e -> do
                  cleanup sinfo
                  return True
              -- Any other exception: clean up the stream and keep serving.
              | otherwise -> do
                  cleanup sinfo
                  return True
        cont2 <- getThreadContinue tcont
        clearStreamInfo sinfo
        when (cont1 && cont2) $ go sinfo tcont th
    -- Wrap the request body reader so that the timeout is paused while
    -- a body chunk is being read.
    pauseRequestBody req th = req { inpObjBody = readBody' }
      where
        readBody = inpObjBody req
        readBody' = do
            T.pause th
            bs <- readBody
            T.resume th
            return bs
    -- Run 'workerCleanup' on the stream currently recorded, if any.
    cleanup sinfo = do
        minp <- getStreamInfo sinfo
        case minp of
            Nothing   -> return ()
            Just strm -> workerCleanup strm
-- | A mutable flag telling the worker loop whether it should go on
--   serving after the current request.
newtype ThreadContinue = ThreadContinue (IORef Bool)

-- | Create a flag that is initially 'True'.
{-# INLINE newThreadContinue #-}
newThreadContinue :: IO ThreadContinue
newThreadContinue = ThreadContinue <$> newIORef True

-- | Overwrite the flag.
{-# INLINE setThreadContinue #-}
setThreadContinue :: ThreadContinue -> Bool -> IO ()
setThreadContinue (ThreadContinue ref) x = writeIORef ref x

-- | Read the current value of the flag.
{-# INLINE getThreadContinue #-}
getThreadContinue :: ThreadContinue -> IO Bool
getThreadContinue (ThreadContinue ref) = readIORef ref
-- | A mutable slot holding the stream a worker is currently serving,
--   or 'Nothing' when it is not serving any.
newtype StreamInfo a = StreamInfo (IORef (Maybe a))

-- | Create an empty slot.
{-# INLINE newStreamInfo #-}
newStreamInfo :: IO (StreamInfo a)
newStreamInfo = StreamInfo <$> newIORef Nothing

-- | Empty the slot.
{-# INLINE clearStreamInfo #-}
clearStreamInfo :: StreamInfo a -> IO ()
clearStreamInfo (StreamInfo ref) = writeIORef ref Nothing

-- | Store a value in the slot, replacing any previous one.
{-# INLINE setStreamInfo #-}
setStreamInfo :: StreamInfo a -> a -> IO ()
setStreamInfo (StreamInfo ref) inp = writeIORef ref $ Just inp

-- | Read the slot.
{-# INLINE getStreamInfo #-}
getStreamInfo :: StreamInfo a -> IO (Maybe a)
getStreamInfo (StreamInfo ref) = readIORef ref