import Control.Monad (when, forM_)
import Control.Concurrent (forkIO,threadDelay)
-import Control.Concurrent.MVar
+import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar, readMVar)
import System.Exit (exitSuccess, exitFailure)
import System.Directory (doesDirectoryExist, removeDirectoryRecursive)
import System.Random (mkStdGen, randomRs)
randRange = (100,100000)
numRands :: Int
-numRands = 50
+numRands = 100
-slave :: Int -> MVar [Int] -> MVar () -> IO ()
-slave id res done = do
- let rs = randomRs randRange $ mkStdGen id :: [Int]
- acid <- enslaveStateFrom ("state/OrderingRandom/s" ++ show id) "localhost" 3333 (NcState [])
+slave :: Int -> MVar [Int] -> MVar () -> MVar () -> IO ()
+slave ident res done alldone = do
+ let rs = randomRs randRange $ mkStdGen ident :: [Int]
+ acid <- enslaveStateFrom ("state/OrderingRandom/s" ++ show ident) "localhost" 3333 (NcState [])
forM_ (take numRands rs) $ \r -> do
threadDelay r
update acid $ NcOpState r
putMVar done ()
-- wait for others
- _ <- takeMVar done
- delaySec 1
+ _ <- readMVar alldone
+ delaySec ident
val <- query acid GetState
putMVar res val
+ print $ "slave quit ident " ++ show ident
closeAcidState acid
main :: IO ()
main = do
cleanup "state/OrderingRandom"
acid <- openMasterStateFrom "state/OrderingRandom/m" "127.0.0.1" 3333 (NcState [])
+ allDone <- newEmptyMVar
-- start slaves
s1Res <- newEmptyMVar
s1Done <- newEmptyMVar
- s1Tid <- forkIO $ slave 1 s1Res s1Done
+ s1Tid <- forkIO $ slave 1 s1Res s1Done allDone
threadDelay 1000 -- zmq-indentity could be the same if too fast
s2Res <- newEmptyMVar
s2Done <- newEmptyMVar
- s2Tid <- forkIO $ slave 2 s2Res s2Done
+ s2Tid <- forkIO $ slave 2 s2Res s2Done allDone
-- manipulate state on master
let rs = randomRs randRange $ mkStdGen 23 :: [Int]
forM_ (take numRands rs) $ \r -> do
threadDelay r
update acid $ NcOpState r
-- wait for slaves
+ print "at wait"
_ <- takeMVar s1Done
_ <- takeMVar s2Done
-- signal slaves done
- putMVar s1Done ()
- putMVar s2Done ()
+ print "at done"
+ putMVar allDone ()
-- collect results
+ print "at collect"
vs1 <- takeMVar s1Res
vs2 <- takeMVar s2Res
vm <- query acid GetState
-- check results
+ print "at results"
+ when (vs1 /= vs2) exitFailure
when (vm /= vs1) exitFailure
when (vm /= vs2) exitFailure
closeAcidState acid
import Data.Acid
import Data.Acid.Centered
-import Control.Monad (when)
-import Control.Concurrent (threadDelay)
+import Control.Monad (void, when)
+import Control.Concurrent (forkIO, threadDelay)
import System.Exit (exitSuccess, exitFailure)
import System.Directory (doesDirectoryExist, removeDirectoryRecursive)
cleanup "state/SlaveUpdates"
acid <- openMasterStateFrom "state/SlaveUpdates/m" "127.0.0.1" 3333 (IntState 0)
- slave1
+ void $ forkIO slave1
slave2
val <- query acid GetState
closeAcidState acid