{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
module Servant.API.WebSocketConduit where
import Control.Concurrent (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.Async (race_)
import Control.Monad (forever, void, (>=>))
import Control.Monad.Catch (handle)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Monad.Trans.Resource (MonadUnliftIO, ResourceT, runResourceT)
import Data.Aeson (FromJSON, ToJSON, decode, encode)
import Data.ByteString.Lazy (fromStrict)
import Data.Conduit (ConduitT, runConduitRes, yieldM, (.|))
import Data.Proxy (Proxy (..))
import Data.Text (Text)
import Data.Void (Void)
import Network.Wai.Handler.WebSockets (websocketsOr)
import Network.WebSockets (Connection, ConnectionException, acceptRequest,
defaultConnectionOptions, forkPingThread, receiveData,
receiveDataMessage, sendClose, sendTextData)
import Servant.Server (HasServer (..), ServerError (..), ServerT)
import Servant.Server.Internal.Router (leafRouter)
import Servant.Server.Internal.RouteResult (RouteResult (..))
import Servant.Server.Internal.Delayed (runDelayed)
import qualified Data.Conduit.List as CL
-- | Type-level API combinator for a bidirectional WebSocket endpoint.
--
-- The type has no constructors; it is only used as an index for the
-- 'HasServer' instance in this module. For that instance, @i@ is the type of
-- messages read from the client (it must be 'FromJSON') and @o@ is the type of
-- messages written to the client (it must be 'ToJSON'). The handler for the
-- endpoint is a @'ConduitT' i o ('ResourceT' 'IO') ()@.
data WebSocketConduit i o
-- | Serves a 'WebSocketConduit' endpoint.
--
-- The handler is a conduit from @i@ to @o@. Every message received on the
-- socket is JSON-decoded and fed into the conduit (messages that fail to
-- decode are dropped); every value the conduit yields is JSON-encoded and
-- sent to the client as a text message.
instance (FromJSON i, ToJSON o) => HasServer (WebSocketConduit i o) ctx where

  -- The handler type does not depend on the server monad @m@: the conduit
  -- always runs in 'ResourceT' 'IO'.
  type ServerT (WebSocketConduit i o) m = ConduitT i o (ResourceT IO) ()

#if MIN_VERSION_servant_server(0,12,0)
  -- Nothing to hoist, because 'ServerT' above ignores the monad.
  hoistServerWithContext _ _ _ svr = svr
#endif

  route Proxy _ app = leafRouter $ \env request respond -> runResourceT $
    runDelayed app env request >>= liftIO . go request respond
    where
      -- Turn the routing result into a response. A successfully routed
      -- handler is served through 'websocketsOr'; its fallback application
      -- answers with 'upgradeRequired'. Routing failures are passed on as-is.
      go request respond (Route cond) =
        websocketsOr
          defaultConnectionOptions
          (runWSApp cond)
          (\_ _ -> respond $ Fail upgradeRequired)
          request
          (respond . Route)
      go _ respond (Fail e) = respond $ Fail e
      go _ respond (FailFatal e) = respond $ FailFatal e

      -- Accept the pending connection and run the handler conduit on it.
      -- A 'ConnectionException' ends the application quietly.
      runWSApp cond = acceptRequest >=> \c ->
        handle (\(_ :: ConnectionException) -> return ()) $ do
          -- Hand-over slot between the receiving loop and the conduit source.
          inbox <- newEmptyMVar
          -- The receiving loop never returns on its own, so 'race_' finishes
          -- when the conduit side finishes (or when either side throws).
          race_ (forever $ receiveData c >>= putMVar inbox) $
            runConduitWebSocket c $
              forever (yieldM . liftIO $ takeMVar inbox)
                .| CL.mapMaybe (decode . fromStrict)
                .| cond
                .| CL.mapM_ (liftIO . sendTextData c . encode)
-- | Type-level API combinator for a write-only WebSocket endpoint.
--
-- The type has no constructors; it is only used as an index for the
-- 'HasServer' instance in this module. For that instance, @o@ is the type of
-- messages written to the client (it must be 'ToJSON'), and the handler for
-- the endpoint is a @'ConduitT' () o ('ResourceT' 'IO') ()@.
data WebSocketSource o
-- | Serves a 'WebSocketSource' endpoint.
--
-- The handler is a source conduit producing @o@ values. Every value it yields
-- is JSON-encoded and sent to the client as a text message. Messages sent by
-- the client are received and discarded.
instance ToJSON o => HasServer (WebSocketSource o) ctx where

  -- The handler type does not depend on the server monad @m@: the conduit
  -- always runs in 'ResourceT' 'IO'.
  type ServerT (WebSocketSource o) m = ConduitT () o (ResourceT IO) ()

#if MIN_VERSION_servant_server(0,12,0)
  -- Nothing to hoist, because 'ServerT' above ignores the monad.
  hoistServerWithContext _ _ _ svr = svr
#endif

  route Proxy _ app = leafRouter $ \env request respond -> runResourceT $
    runDelayed app env request >>= liftIO . go request respond
    where
      -- Turn the routing result into a response. A successfully routed
      -- handler is served through 'websocketsOr'; its fallback application
      -- answers with 'upgradeRequired'. Routing failures are passed on as-is.
      go request respond (Route cond) =
        websocketsOr
          defaultConnectionOptions
          (runWSApp cond)
          (\_ _ -> respond $ Fail upgradeRequired)
          request
          (respond . Route)
      go _ respond (Fail e) = respond $ Fail e
      go _ respond (FailFatal e) = respond $ FailFatal e

      -- Accept the pending connection and stream the source to the client.
      -- A 'ConnectionException' ends the application quietly.
      runWSApp cond = acceptRequest >=> \c ->
        handle (\(_ :: ConnectionException) -> return ()) $
          -- Incoming messages are read as 'Text' and thrown away. That loop
          -- never returns on its own, so 'race_' finishes when the sending
          -- side finishes (or when either side throws).
          race_ (forever . void $ (receiveData c :: IO Text)) $
            runConduitWebSocket c $
              cond .| CL.mapM_ (liftIO . sendTextData c . encode)
-- | Run a closed conduit pipeline against an accepted WebSocket connection
-- and then close the connection.
--
-- Steps, in order:
--
-- 1. Start a ping thread on the connection via 'forkPingThread' with an
--    interval argument of @10@ (seconds, according to the websockets
--    documentation of 'forkPingThread').
-- 2. Run the pipeline to completion with 'runConduitRes'.
-- 3. Send a close message with the payload @"Out of data"@ and keep
--    receiving (and dropping) data messages forever.
--
-- The final receive loop has no exit condition of its own, so this function
-- only terminates through an exception raised by 'receiveDataMessage' (or by
-- an earlier step). Both callers in this module wrap it in a handler for
-- 'ConnectionException'.
runConduitWebSocket :: (MonadBaseControl IO m, MonadUnliftIO m) => Connection -> ConduitT () Void (ResourceT m) () -> m ()
runConduitWebSocket c a = do
  liftIO $ forkPingThread c 10
  -- The pipeline already returns (), so no 'void' is needed here.
  runConduitRes a
  liftIO $ do
    sendClose c ("Out of data" :: Text)
    -- Drain whatever the peer still sends until receiving throws.
    forever $ receiveDataMessage c
-- | The error used by the 'route' implementations in this module as the
-- response of the fallback application passed to 'websocketsOr':
-- HTTP status 426 with reason phrase @"Upgrade Required"@, an empty body and
-- no extra headers.
upgradeRequired :: ServerError
upgradeRequired = ServerError
  { errHTTPCode = 426
  , errReasonPhrase = "Upgrade Required"
  , errBody = mempty
  , errHeaders = mempty
  }