module Network.Wai.Handler.Warp.HTTP2.Worker (
Respond
, response
, worker
) where
#if __GLASGOW_HASKELL__ < 709
import Control.Applicative
#endif
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception (Exception, SomeException(..), AsyncException(..))
import qualified Control.Exception as E
import Control.Monad (void, when)
import Data.Typeable
import qualified Network.HTTP.Types as H
import Network.HTTP2
import Network.HTTP2.Priority
import Network.Wai
import Network.Wai.Handler.Warp.HTTP2.EncodeFrame
import Network.Wai.Handler.Warp.HTTP2.Manager
import Network.Wai.Handler.Warp.HTTP2.Types
import Network.Wai.Handler.Warp.IORef
import Network.Wai.HTTP2
( Chunk(..)
, HTTP2Application
, PushPromise
, Responder(runResponder)
, RespondFunc
)
import qualified Network.Wai.Handler.Warp.Settings as S
import qualified Network.Wai.Handler.Warp.Timeout as T
type Respond = IO () -> Stream -> RespondFunc ()
response :: Context -> Manager -> ThreadContinue -> Respond
response ctx mgr tconf tickle strm s h strmbdy = do
myThreadId >>= replaceWithAction mgr
setThreadContinue tconf False
runStream ctx OResponse tickle strm s h strmbdy
runStream :: Context
-> (Stream -> H.Status -> H.ResponseHeaders -> Aux -> Output)
-> Respond
runStream Context{outputQ} mkOutput tickle strm s h strmbdy = do
sq <- newTBQueueIO 10
tvar <- newTVarIO SyncNone
let out = mkOutput strm s h (Persist sq tvar)
void $ forkIO $ waiter tvar sq strm outputQ
atomically $ writeTVar tvar $ SyncNext out
let write chunk = do
atomically $ writeTBQueue sq $ case chunk of
BuilderChunk b -> SBuilder b
FileChunk path part -> SFile path part
tickle
flush = atomically $ writeTBQueue sq SFlush
trailers <- strmbdy write flush
atomically $ writeTBQueue sq $ SFinish trailers
cleanupStream :: Context -> S.Settings -> Stream -> Maybe Request -> Maybe SomeException -> IO ()
cleanupStream Context{outputQ} set strm req me = do
closed strm Killed
let sid = streamNumber strm
frame = resetFrame InternalError sid
enqueueControl outputQ sid $ OFrame frame
case me of
Nothing -> return ()
Just e -> S.settingsOnException set req e
pushResponder :: Context -> S.Settings -> Stream -> PushPromise -> Responder -> IO Bool
pushResponder ctx set strm promise responder = do
let Context{ http2settings
, pushConcurrency
} = ctx
cnt <- readIORef pushConcurrency
settings <- readIORef http2settings
let enabled = enablePush settings
fits = maybe True (cnt <) $ maxConcurrentStreams settings
canPush = fits && enabled
if canPush then
actuallyPushResponder ctx set strm promise responder
else
return False
actuallyPushResponder :: Context -> S.Settings -> Stream -> PushPromise -> Responder -> IO Bool
actuallyPushResponder ctx set strm promise responder = do
let Context{ http2settings
, nextPushStreamId
, pushConcurrency
, streamTable
} = ctx
newSid <- atomicModifyIORef nextPushStreamId $ \sid -> (sid+2, sid)
ws <- initialWindowSize <$> readIORef http2settings
newStrm <- newStream pushConcurrency newSid ws
writeIORef (streamPrecedence newStrm) $
toPrecedence $ defaultPriority { streamDependency = streamNumber strm }
opened newStrm
insert streamTable newSid newStrm
mvar <- newEmptyMVar
let mkOutput = OPush strm promise mvar
tickle = return ()
respond = runStream ctx mkOutput
_ <- forkIO $ runResponder responder (respond tickle newStrm) `E.catch`
(cleanupStream ctx set strm Nothing . Just)
takeMVar mvar
data Break = Break deriving (Show, Typeable)
instance Exception Break
worker :: Context
-> S.Settings
-> T.Manager
-> HTTP2Application
-> (ThreadContinue -> Respond)
-> IO ()
worker ctx@Context{inputQ} set tm app respond = do
tid <- myThreadId
sinfo <- newStreamInfo
tcont <- newThreadContinue
let setup = T.register tm $ E.throwTo tid Break
E.bracket setup T.cancel $ go sinfo tcont
where
go sinfo tcont th = do
setThreadContinue tcont True
ex <- E.try $ do
T.pause th
Input strm req <- atomically $ readTQueue inputQ
setStreamInfo sinfo strm req
T.resume th
T.tickle th
let responder = app req $ pushResponder ctx set strm
runResponder responder $ respond tcont (T.tickle th) strm
cont1 <- case ex of
Right () -> return True
Left e@(SomeException _)
| Just Break <- E.fromException e -> do
cleanup sinfo Nothing
return True
| Just ThreadKilled <- E.fromException e -> do
cleanup sinfo Nothing
return False
| otherwise -> do
cleanup sinfo (Just e)
return True
cont2 <- getThreadContinue tcont
when (cont1 && cont2) $ go sinfo tcont th
cleanup sinfo me = do
m <- getStreamInfo sinfo
case m of
Nothing -> return ()
Just (strm,req) -> do
cleanupStream ctx set strm (Just req) me
clearStreamInfo sinfo
waiter :: TVar Sync -> TBQueue Sequence -> Stream -> PriorityTree Output -> IO ()
waiter tvar sq strm outQ = do
mx <- atomically $ do
mout <- readTVar tvar
case mout of
SyncNone -> retry
SyncNext out -> do
writeTVar tvar SyncNone
return $ Just out
SyncFinish -> return Nothing
case mx of
Nothing -> return ()
Just out -> do
atomically $ do
isEmpty <- isEmptyTBQueue sq
when isEmpty retry
enqueueWhenWindowIsOpen outQ out
waiter tvar sq strm outQ
newtype ThreadContinue = ThreadContinue (IORef Bool)
newThreadContinue :: IO ThreadContinue
newThreadContinue = ThreadContinue <$> newIORef True
setThreadContinue :: ThreadContinue -> Bool -> IO ()
setThreadContinue (ThreadContinue ref) x = writeIORef ref x
getThreadContinue :: ThreadContinue -> IO Bool
getThreadContinue (ThreadContinue ref) = readIORef ref
newtype StreamInfo = StreamInfo (IORef (Maybe (Stream,Request)))
newStreamInfo :: IO StreamInfo
newStreamInfo = StreamInfo <$> newIORef Nothing
clearStreamInfo :: StreamInfo -> IO ()
clearStreamInfo (StreamInfo ref) = writeIORef ref Nothing
setStreamInfo :: StreamInfo -> Stream -> Request -> IO ()
setStreamInfo (StreamInfo ref) strm req = writeIORef ref $ Just (strm,req)
getStreamInfo :: StreamInfo -> IO (Maybe (Stream, Request))
getStreamInfo (StreamInfo ref) = readIORef ref