diff --git a/.github/workflows/haskell-ci.yml b/.github/workflows/haskell-ci.yml index 3138c25..caea6f2 100644 --- a/.github/workflows/haskell-ci.yml +++ b/.github/workflows/haskell-ci.yml @@ -168,6 +168,9 @@ jobs: constraints: troupe +werror allow-newer: focus-1.0.3:transformers allow-newer: stm-hamt-1.2.0.9:transformers + allow-newer: concurrency-1.11.0.2:mtl + allow-newer: concurrency-1.11.0.2:transformers + allow-newer: dejafu-2.4.0.4:transformers EOF $HCPKG list --simple-output --names-only | perl -ne 'for (split /\s+/) { print "constraints: $_ installed\n" unless /^(troupe)$/; }' >> cabal.project.local cat cabal.project diff --git a/cabal.project b/cabal.project index 7ccc2f6..d32ef8b 100644 --- a/cabal.project +++ b/cabal.project @@ -4,6 +4,9 @@ Packages: Allow-Newer: focus-1.0.3:transformers, stm-hamt-1.2.0.9:transformers, + concurrency-1.11.0.2:mtl, + concurrency-1.11.0.2:transformers, + dejafu-2.4.0.4:transformers, Constraints: troupe +werror, diff --git a/troupe/src/Troupe/Queue.hs b/troupe/src/Troupe/Queue.hs new file mode 100644 index 0000000..a3cd3f4 --- /dev/null +++ b/troupe/src/Troupe/Queue.hs @@ -0,0 +1,92 @@ +{-# LANGUAGE DeriveFunctor #-} +{-# LANGUAGE LambdaCase #-} + +module Troupe.Queue + ( Queue, + newQueue, + enqueue, + Match (..), + dequeue, + ) +where + +import Control.Concurrent.Classy.STM + ( MonadSTM, + TQueue, + TVar, + flushTQueue, + modifyTVar', + newTQueue, + newTVar, + orElse, + readTVar, + retry, + tryReadTQueue, + unGetTQueue, + writeTQueue, + writeTVar, + ) +import qualified Control.Concurrent.STM as CCS +import Control.Monad.Conc.Class (MonadConc, STM, atomically) + +data Queue stm a = Queue + { queueTQueue :: {-# UNPACK #-} !(TQueue stm a), + queueMessages :: !(TVar stm [a]) + } + +newQueue :: (MonadSTM stm) => stm (Queue stm a) +newQueue = + Queue + <$> newTQueue + <*> newTVar [] +{-# SPECIALIZE newQueue :: CCS.STM (Queue CCS.STM a) #-} + +enqueue :: (MonadSTM stm) => Queue stm a -> a -> stm () +enqueue queue = writeTQueue (queueTQueue queue) +{-# INLINE enqueue #-} +{-# SPECIALIZE enqueue :: Queue CCS.STM a -> a -> CCS.STM () #-} + +data Match stm a b + = MatchMessage (a -> Maybe b) + | MatchSTM (stm b) + deriving (Functor) + +dequeue :: (MonadConc m) => Queue (STM m) a -> [Match (STM m) a b] -> m b +dequeue queue matches = do + atomically getMessages + atomically $ + handleExistingMessages >>= \case + Nothing -> handleNewMessages + Just b -> pure b + where + getMessages = do + newMessages <- flushTQueue (queueTQueue queue) + modifyTVar' (queueMessages queue) (++ newMessages) + handleExistingMessages = do + messages <- readTVar (queueMessages queue) + foldr orElse (pure Nothing) $ flip map matches $ \case + MatchMessage fn -> findMessage fn [] messages + MatchSTM stm -> fmap Just stm + findMessage fn acc = \case + [] -> retry + (x : xs) -> case fn x of + Nothing -> findMessage fn (x : acc) xs + Just a -> do + writeTVar (queueMessages queue) (reverse acc ++ xs) + pure (Just a) + handleNewMessages = foldr orElse (storeMessage >> retry) $ flip map matches $ \case + MatchMessage fn -> + tryReadTQueue (queueTQueue queue) >>= \case + Nothing -> retry + Just msg -> case fn msg of + Nothing -> do + unGetTQueue (queueTQueue queue) msg + retry + Just a -> pure a + MatchSTM stm -> stm + storeMessage = do + mmsg <- tryReadTQueue (queueTQueue queue) + case mmsg of + Nothing -> pure () + Just msg -> modifyTVar' (queueMessages queue) (\old -> old ++ [msg]) +{-# SPECIALIZE dequeue :: Queue CCS.STM a -> [Match CCS.STM a b] -> IO b #-} diff --git a/troupe/test/Troupe/Queue/Test.hs b/troupe/test/Troupe/Queue/Test.hs new file mode 100644 index 0000000..4a72387 --- /dev/null +++ b/troupe/test/Troupe/Queue/Test.hs @@ -0,0 +1,125 @@ +module Troupe.Queue.Test (tests) where + +import Control.Concurrent.Classy (threadDelay) +import Control.Concurrent.Classy.Async (withAsync) +import Control.Concurrent.Classy.STM (check, readTVar, registerDelay, throwSTM) +import Control.Exception.Safe (Exception, try) +import Control.Monad.Conc.Class (atomically) +import Test.Tasty (TestTree, testGroup) +import Test.Tasty.DejaFu (testAuto) +import Test.Tasty.HUnit (testCase, (@?=)) +import Troupe.Queue (Match (..), dequeue, enqueue, newQueue) + +tests :: TestTree +tests = + testGroup + "Troupe.Queue" + [ testCase "Simple" $ do + q <- atomically newQueue + atomically $ enqueue q () + r <- dequeue q [MatchMessage (\() -> pure True)] + r @?= True, + testCase "Less simple" $ do + q <- atomically newQueue + d <- registerDelay 1000 + r <- dequeue q [MatchSTM (readTVar d >>= check)] + r @?= (), + testCase "Another one" $ do + q <- atomically newQueue + d <- registerDelay 100000 + atomically $ enqueue q () + r <- dequeue q [MatchMessage (\() -> pure True), MatchSTM (do readTVar d >>= check; pure False)] + r @?= True, + testCase "Another one" $ do + q <- atomically newQueue + d <- registerDelay 100000 + atomically $ enqueue q () + r <- dequeue q [MatchSTM (do readTVar d >>= check; pure False), MatchMessage (\() -> pure True)] + r @?= True, + testCase "Old ones" $ do + q <- atomically newQueue + atomically $ enqueue q True + atomically $ enqueue q False + dequeue q [MatchMessage (\b -> if not b then Just () else Nothing)] + dequeue q [MatchMessage (\b -> if b then Just () else Nothing)], + testCase "First one wins" $ do + q <- atomically newQueue + r <- dequeue q [MatchSTM (pure True), MatchSTM (pure False)] + r @?= True, + testCase "first one still wins" $ do + q <- atomically newQueue + atomically $ enqueue q True + r <- + dequeue + q + [ MatchMessage (\b -> if not b then Just (0 :: Int) else Nothing), + MatchMessage (\b -> if b then Just 1 else Nothing), + MatchMessage (\b -> if not b then Just 2 else Nothing), + MatchMessage (\b -> if b then Just 3 else Nothing) + ] + r @?= 1, + testCase "a" $ do + q <- atomically newQueue + atomically $ enqueue q True + r <- dequeue q [MatchSTM (pure False), MatchMessage Just] + r @?= False + r' <- dequeue q [MatchMessage Just, MatchSTM (pure False)] + r' @?= True, + testAuto "What gives..." $ do + q <- atomically newQueue + atomically $ do + enqueue q (1 :: Int) + enqueue q 2 + dequeue + q + [ MatchMessage (\i -> if i == 2 then Just i else Nothing), + MatchMessage (\i -> if i == 3 then Just i else Nothing), + MatchMessage (\i -> if i == 1 then Just i else Nothing) + ], + testAuto "Another" $ do + q <- atomically newQueue + dequeue q [MatchMessage (\() -> pure True), MatchSTM (pure False)], + testAuto "F" $ do + q <- atomically newQueue + dequeue q [MatchSTM (pure (1 :: Int)), MatchSTM (pure 2), MatchSTM (pure 3)], + testAuto "W" $ do + q <- atomically $ do + q <- newQueue + enqueue q () + pure q + dequeue q [MatchMessage (\() -> pure True), MatchSTM (pure False)], + testAuto "MT" $ do + q <- atomically newQueue + withAsync (threadDelay 100000 >> atomically (enqueue q (1 :: Int))) $ \_a -> + withAsync (threadDelay 1000 >> atomically (enqueue q 2)) $ \_b -> do + -- Note, we don't synchronise on `a` or `b`, so the `dequeue` returns + -- either 1 or 2 + r <- + dequeue + q + [ MatchMessage (\i -> if i == 1 then Just i else Nothing), + MatchMessage (\i -> if i == 2 then Just i else Nothing) + ] + pure $ r == 1 || r == 2, + testCase "throw" $ do + q <- atomically newQueue + atomically $ enqueue q (1 :: Int) + + r <- + try $ + dequeue + q + [ MatchMessage (\i -> if i == 0 then Just i else Nothing), + MatchSTM (throwSTM TestException), + MatchMessage Just + ] + r @?= Left TestException + + r' <- dequeue q [MatchMessage Just] + r' @?= 1 + ] + +data TestException = TestException + deriving (Show, Eq) + +instance Exception TestException diff --git a/troupe/test/troupe-test.hs b/troupe/test/troupe-test.hs index 1bd4d60..da12271 100644 --- a/troupe/test/troupe-test.hs +++ b/troupe/test/troupe-test.hs @@ -1,6 +1,7 @@ module Main (main) where import Test.Tasty (defaultMain, testGroup) +import qualified Troupe.Queue.Test as TQ import qualified Troupe.Test as T main :: IO () @@ -8,5 +9,6 @@ main = defaultMain $ testGroup "troupe-test" - [ T.tests + [ T.tests, + TQ.tests ] diff --git a/troupe/troupe.cabal b/troupe/troupe.cabal index 1049d1e..1fbb10f 100644 --- a/troupe/troupe.cabal +++ b/troupe/troupe.cabal @@ -41,7 +41,10 @@ common warnings library import: warnings - exposed-modules: Troupe + exposed-modules: + Troupe + Troupe.Queue + other-modules: Troupe.Exceptions Troupe.Process @@ -50,6 +53,7 @@ library build-depends: , async ^>=2.2.4 , base ^>=4.17.0.0 || ^>=4.18.0.0 + , concurrency ^>=1.11.0.2 , deepseq ^>=1.4.8.0 , deferred-folds ^>=0.9.18.3 , hashable ^>=1.4.2.0 @@ -92,13 +96,18 @@ test-suite troupe-test type: exitcode-stdio-1.0 hs-source-dirs: test main-is: troupe-test.hs - other-modules: Troupe.Test + other-modules: + Troupe.Queue.Test + Troupe.Test + ghc-options: -rtsopts -threaded -with-rtsopts=-N2 build-depends: , base ^>=4.17.0.0 || ^>=4.18.0.0 + , concurrency ^>=1.11.0.2 , deepseq ^>=1.4.8.0 , safe-exceptions ^>=0.1.7.3 , tasty ^>=1.4.3 + , tasty-dejafu ^>=2.1.0.0 , tasty-hunit ^>=0.10.0.3 , transformers ^>=0.5.6.2 || ^>=0.6.1.0 , troupe