zmq: concept without polling, blocking join
authorMax Voit <max.voit+gtdv@with-eyes.net>
Sat, 13 Jun 2015 11:36:03 +0000 (13:36 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Sat, 13 Jun 2015 11:36:03 +0000 (13:36 +0200)
zmq-concept/master.hs
zmq-concept/slave.hs

index fc22b8d..aa4bcce 100644 (file)
@@ -2,12 +2,13 @@
 import Control.Monad
 import Control.Monad.IO.Class (MonadIO)
 import System.IO
-import System.ZMQ4.Monadic
+import System.ZMQ4
 import qualified Data.ByteString.Char8 as CS
 import qualified Data.Map.Strict as Map
 import Control.Concurrent (forkIO, threadDelay)
 import Data.IORef (newIORef, modifyIORef, modifyIORef', readIORef, IORef)
 import Data.Maybe (fromMaybe)
+import Control.Concurrent.MVar
 
 
 addr :: String
@@ -16,49 +17,63 @@ addr = "tcp://127.0.0.1:5000"
 main :: IO ()
 main = do
     -- current revision and updates
-    curRev <- newIORef 1
+    curRev <- newMVar 1
     nodesRev <- newIORef Map.empty
+    -- no monadic zmq, share socket
+    ctx <- context
+    sock <- socket ctx Router
+    bind sock addr
+
+    -- /random/ updates
     forkIO $ forever $ do
         threadDelay $ 500 * 1000
-        modifyIORef' curRev (+1)
+        modifyMVar_ curRev $ \cr -> do
+            let crn = cr + 1
+            -- send update to all nodes uptodate
+            ns <- readIORef nodesRev
+            let nodesUpToDate = Map.keys $ Map.filter (== cr) ns
+            forM_ nodesUpToDate $ \i -> sendUpdate sock i crn
+            return crn
+
     -- worker for distributing updates
-    runZMQ $ do
-        sock <- socket Router
-        bind sock addr
-        -- now loop and:
-        --      o update nodes not on current revision
-        --      o receive node responses and update revision list 
-        forever $ do
-            -- update revision list
-            re <- poll 100 [Sock sock [In] Nothing]
-            unless (null $ head re) $ do
-                ident <- receive sock
-                _ <- receive sock
-                msg <- receive sock
-                case CS.head msg of
-                    'S' -> addNode nodesRev ident 
-                    'D' -> incRevNode nodesRev ident (msgToRev msg)
-                    _ -> error $ "unknown message: " ++ show msg
-                liftIO $ CS.putStrLn $ CS.append (formatID ident) msg
-                liftIO $ hFlush stdout
-            -- distribute update
-            cr <- liftIO $ readIORef curRev
-            nrs <- liftIO $ readIORef nodesRev 
-            forM_ (Map.keys nrs) $ \i -> do
-                let rev = Map.findWithDefault 0 i nrs
-                when (rev < cr) $ sendUpdate sock i (rev + 1)
-            --liftIO $ print $ "current " ++ show nrs 
-            --liftIO $ print $ "currentr " ++ show cr
-        return ()
-    where formatID i = CS.cons '[' $ CS.append i "] "
+    -- now loop and:
+    --      o update nodes not on current revision
+    --      o receive node responses and update revision list 
+    forever $ do
+        -- update revision list
+        (ident, msg) <- receiveReply sock
+        case CS.head msg of
+            'S' -> withMVar curRev $ \cr -> do 
+                   forM_ [0..cr] $ \rev -> do
+                              sendUpdate sock ident rev
+                              -- now also wait for it updating
+                              (_, msgnr) <- receiveReply sock
+                              when (msgToRev msgnr /= rev) $ error "revision update for new node failed"
+                   addNode nodesRev ident cr
+            'D' -> incRevNode nodesRev ident (msgToRev msg)
+            _ -> error $ "unknown message: " ++ show msg
+        CS.putStrLn $ CS.append (formatID ident) msg
+        hFlush stdout
+    -- cleanup zmq stuff
+    unbind sock addr
+    term ctx
+
+formatID i = CS.cons '[' $ CS.append i "] "
 
-addNode :: (MonadIO m) => IORef (Map.Map CS.ByteString Int) -> CS.ByteString -> m ()
-addNode ns i = liftIO $ modifyIORef ns (Map.insert i 0)
+addNode :: IORef (Map.Map CS.ByteString Int) -> CS.ByteString -> Int -> IO ()
+addNode ns i rev = modifyIORef ns (Map.insert i rev)
 
-incRevNode :: (MonadIO m) => IORef (Map.Map CS.ByteString Int) -> CS.ByteString -> Int -> m ()
-incRevNode ns i r = liftIO $ modifyIORef ns (Map.adjust (const r) i)
+incRevNode :: IORef (Map.Map CS.ByteString Int) -> CS.ByteString -> Int -> IO ()
+incRevNode ns i r = modifyIORef ns (Map.adjust (const r) i)
 
-sendUpdate :: Sender t => Socket z t -> CS.ByteString -> Int -> ZMQ z ()
+receiveReply :: Receiver t => Socket t -> IO (CS.ByteString, CS.ByteString)
+receiveReply sock = do
+    ident <- receive sock 
+    _ <- receive sock 
+    msg <- receive sock 
+    return (ident, msg)
+
+sendUpdate :: Sender t => Socket t -> CS.ByteString -> Int -> IO ()
 sendUpdate sock ident num = do
     send sock [SendMore] ident
     send sock [SendMore] ""
@@ -66,3 +81,4 @@ sendUpdate sock ident num = do
 
 msgToRev :: CS.ByteString -> Int
 msgToRev m = fst $ fromMaybe (0,"") (CS.readInt $ CS.tail m)
+
index fd45e65..d83b38d 100644 (file)
@@ -18,7 +18,7 @@ main = do
         connect sock addr
         send sock [] "S"
         forever $ do
-            liftIO $ threadDelay 100000
+            -- liftIO $ threadDelay 100000
             msg <- receive sock 
             mcr <- liftIO $ readIORef myRev
             case CS.head msg of