Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework the CQueue implementation #38

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/haskell-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ jobs:
constraints: troupe +werror
allow-newer: focus-1.0.3:transformers
allow-newer: stm-hamt-1.2.0.9:transformers
allow-newer: concurrency-1.11.0.2:mtl
allow-newer: concurrency-1.11.0.2:transformers
allow-newer: dejafu-2.4.0.4:transformers
EOF
$HCPKG list --simple-output --names-only | perl -ne 'for (split /\s+/) { print "constraints: $_ installed\n" unless /^(troupe)$/; }' >> cabal.project.local
cat cabal.project
Expand Down
3 changes: 3 additions & 0 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ Packages:
Allow-Newer:
focus-1.0.3:transformers,
stm-hamt-1.2.0.9:transformers,
concurrency-1.11.0.2:mtl,
concurrency-1.11.0.2:transformers,
dejafu-2.4.0.4:transformers,

Constraints:
troupe +werror,
92 changes: 92 additions & 0 deletions troupe/src/Troupe/Queue.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE LambdaCase #-}

module Troupe.Queue
( Queue,
newQueue,
enqueue,
Match (..),
dequeue,
)
where

import Control.Concurrent.Classy.STM
( MonadSTM,
TQueue,
TVar,
flushTQueue,
modifyTVar',
newTQueue,
newTVar,
orElse,
readTVar,
retry,
tryReadTQueue,
unGetTQueue,
writeTQueue,
writeTVar,
)
import qualified Control.Concurrent.STM as CCS
import Control.Monad.Conc.Class (MonadConc, STM, atomically)

data Queue stm a = Queue
{ queueTQueue :: {-# UNPACK #-} !(TQueue stm a),
queueMessages :: !(TVar stm [a])
}

newQueue :: (MonadSTM stm) => stm (Queue stm a)
newQueue =
Queue
<$> newTQueue
<*> newTVar []
{-# SPECIALIZE newQueue :: CCS.STM (Queue CCS.STM a) #-}

enqueue :: (MonadSTM stm) => Queue stm a -> a -> stm ()
enqueue queue = writeTQueue (queueTQueue queue)
{-# INLINE enqueue #-}
{-# SPECIALIZE enqueue :: Queue CCS.STM a -> a -> CCS.STM () #-}

data Match stm a b
= MatchMessage (a -> Maybe b)
| MatchSTM (stm b)
deriving (Functor)

dequeue :: (MonadConc m) => Queue (STM m) a -> [Match (STM m) a b] -> m b
dequeue queue matches = do
atomically getMessages
atomically $
handleExistingMessages >>= \case
Nothing -> handleNewMessages
Just b -> pure b
where
getMessages = do
newMessages <- flushTQueue (queueTQueue queue)
modifyTVar' (queueMessages queue) (++ newMessages)
handleExistingMessages = do
messages <- readTVar (queueMessages queue)
foldr orElse (pure Nothing) $ flip map matches $ \case
MatchMessage fn -> findMessage fn [] messages
MatchSTM stm -> fmap Just stm
findMessage fn acc = \case
[] -> retry
(x : xs) -> case fn x of
Nothing -> findMessage fn (x : acc) xs
Just a -> do
writeTVar (queueMessages queue) (reverse acc ++ xs)
pure (Just a)
handleNewMessages = foldr orElse (storeMessage >> retry) $ flip map matches $ \case
MatchMessage fn ->
tryReadTQueue (queueTQueue queue) >>= \case
Nothing -> retry
Just msg -> case fn msg of
Nothing -> do
unGetTQueue (queueTQueue queue) msg
retry
Just a -> pure a
MatchSTM stm -> stm
storeMessage = do
mmsg <- tryReadTQueue (queueTQueue queue)
case mmsg of
Nothing -> pure ()
Just msg -> modifyTVar' (queueMessages queue) (\old -> old ++ [msg])
{-# SPECIALIZE dequeue :: Queue CCS.STM a -> [Match CCS.STM a b] -> IO b #-}
125 changes: 125 additions & 0 deletions troupe/test/Troupe/Queue/Test.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
module Troupe.Queue.Test (tests) where

import Control.Concurrent.Classy (threadDelay)
import Control.Concurrent.Classy.Async (withAsync)
import Control.Concurrent.Classy.STM (check, readTVar, registerDelay, throwSTM)
import Control.Exception.Safe (Exception, try)
import Control.Monad.Conc.Class (atomically)
import Test.Tasty (TestTree, testGroup)
import Test.Tasty.DejaFu (testAuto)
import Test.Tasty.HUnit (testCase, (@?=))
import Troupe.Queue (Match (..), dequeue, enqueue, newQueue)

tests :: TestTree
tests =
testGroup
"Troupe.Queue"
[ testCase "Simple" $ do
q <- atomically newQueue
atomically $ enqueue q ()
r <- dequeue q [MatchMessage (\() -> pure True)]
r @?= True,
testCase "Less simple" $ do
q <- atomically newQueue
d <- registerDelay 1000
r <- dequeue q [MatchSTM (readTVar d >>= check)]
r @?= (),
testCase "Another one" $ do
q <- atomically newQueue
d <- registerDelay 100000
atomically $ enqueue q ()
r <- dequeue q [MatchMessage (\() -> pure True), MatchSTM (do readTVar d >>= check; pure False)]
r @?= True,
testCase "Another one" $ do
q <- atomically newQueue
d <- registerDelay 100000
atomically $ enqueue q ()
r <- dequeue q [MatchSTM (do readTVar d >>= check; pure False), MatchMessage (\() -> pure True)]
r @?= True,
testCase "Old ones" $ do
q <- atomically newQueue
atomically $ enqueue q True
atomically $ enqueue q False
dequeue q [MatchMessage (\b -> if not b then Just () else Nothing)]
dequeue q [MatchMessage (\b -> if b then Just () else Nothing)],
testCase "First one wins" $ do
q <- atomically newQueue
r <- dequeue q [MatchSTM (pure True), MatchSTM (pure False)]
r @?= True,
testCase "first one still wins" $ do
q <- atomically newQueue
atomically $ enqueue q True
r <-
dequeue
q
[ MatchMessage (\b -> if not b then Just (0 :: Int) else Nothing),
MatchMessage (\b -> if b then Just 1 else Nothing),
MatchMessage (\b -> if not b then Just 2 else Nothing),
MatchMessage (\b -> if b then Just 3 else Nothing)
]
r @?= 1,
testCase "a" $ do
q <- atomically newQueue
atomically $ enqueue q True
r <- dequeue q [MatchSTM (pure False), MatchMessage Just]
r @?= False
r' <- dequeue q [MatchMessage Just, MatchSTM (pure False)]
r' @?= True,
testAuto "What gives..." $ do
q <- atomically newQueue
atomically $ do
enqueue q (1 :: Int)
enqueue q 2
dequeue
q
[ MatchMessage (\i -> if i == 2 then Just i else Nothing),
MatchMessage (\i -> if i == 3 then Just i else Nothing),
MatchMessage (\i -> if i == 1 then Just i else Nothing)
],
testAuto "Another" $ do
q <- atomically newQueue
dequeue q [MatchMessage (\() -> pure True), MatchSTM (pure False)],
testAuto "F" $ do
q <- atomically newQueue
dequeue q [MatchSTM (pure (1 :: Int)), MatchSTM (pure 2), MatchSTM (pure 3)],
testAuto "W" $ do
q <- atomically $ do
q <- newQueue
enqueue q ()
pure q
dequeue q [MatchMessage (\() -> pure True), MatchSTM (pure False)],
testAuto "MT" $ do
q <- atomically newQueue
withAsync (threadDelay 100000 >> atomically (enqueue q (1 :: Int))) $ \_a ->
withAsync (threadDelay 1000 >> atomically (enqueue q 2)) $ \_b -> do
-- Note, we don't synchronise on `a` or `b`, so the `dequeue` returns
-- either 1 or 2
r <-
dequeue
q
[ MatchMessage (\i -> if i == 1 then Just i else Nothing),
MatchMessage (\i -> if i == 2 then Just i else Nothing)
]
pure $ r == 1 || r == 2,
testCase "throw" $ do
q <- atomically newQueue
atomically $ enqueue q (1 :: Int)

r <-
try $
dequeue
q
[ MatchMessage (\i -> if i == 0 then Just i else Nothing),
MatchSTM (throwSTM TestException),
MatchMessage Just
]
r @?= Left TestException

r' <- dequeue q [MatchMessage Just]
r' @?= 1
]

data TestException = TestException
deriving (Show, Eq)

instance Exception TestException
4 changes: 3 additions & 1 deletion troupe/test/troupe-test.hs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
module Main (main) where

import Test.Tasty (defaultMain, testGroup)
import qualified Troupe.Queue.Test as TQ
import qualified Troupe.Test as T

main :: IO ()
main =
defaultMain $
testGroup
"troupe-test"
[ T.tests
[ T.tests,
TQ.tests
]
13 changes: 11 additions & 2 deletions troupe/troupe.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ common warnings

library
import: warnings
exposed-modules: Troupe
exposed-modules:
Troupe
Troupe.Queue

other-modules:
Troupe.Exceptions
Troupe.Process
Expand All @@ -50,6 +53,7 @@ library
build-depends:
, async ^>=2.2.4
, base ^>=4.17.0.0 || ^>=4.18.0.0
, concurrency ^>=1.11.0.2
, deepseq ^>=1.4.8.0
, deferred-folds ^>=0.9.18.3
, hashable ^>=1.4.2.0
Expand Down Expand Up @@ -92,13 +96,18 @@ test-suite troupe-test
type: exitcode-stdio-1.0
hs-source-dirs: test
main-is: troupe-test.hs
other-modules: Troupe.Test
other-modules:
Troupe.Queue.Test
Troupe.Test

ghc-options: -rtsopts -threaded -with-rtsopts=-N2
build-depends:
, base ^>=4.17.0.0 || ^>=4.18.0.0
, concurrency ^>=1.11.0.2
, deepseq ^>=1.4.8.0
, safe-exceptions ^>=0.1.7.3
, tasty ^>=1.4.3
, tasty-dejafu ^>=2.1.0.0
, tasty-hunit ^>=0.10.0.3
, transformers ^>=0.5.6.2 || ^>=0.6.1.0
, troupe