zmq: polling solves blocking
authorMax Voit <max.voit+gtdv@with-eyes.net>
Wed, 10 Jun 2015 15:54:42 +0000 (17:54 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Wed, 10 Jun 2015 15:54:42 +0000 (17:54 +0200)
zmq-concept/master.hs
zmq-concept/slave.hs

index b1d8977..fc22b8d 100644 (file)
@@ -17,11 +17,11 @@ main :: IO ()
 main = do
     -- current revision and updates
     curRev <- newIORef 1
+    nodesRev <- newIORef Map.empty
     forkIO $ forever $ do
         threadDelay $ 500 * 1000
         modifyIORef' curRev (+1)
     -- worker for distributing updates
-    nodesRev <- newIORef Map.empty
     runZMQ $ do
         sock <- socket Router
         bind sock addr
@@ -30,21 +30,25 @@ main = do
         --      o receive node responses and update revision list 
         forever $ do
             -- update revision list
-            ident <- receive sock
-            _ <- receive sock
-            msg <- receive sock
-            cr <- liftIO $ readIORef curRev
-            case CS.head msg of
-                'S' -> addNode nodesRev ident 
-                'D' -> incRevNode nodesRev ident (msgToRev msg)
-                _ -> error $ "unknown message: " ++ show msg
-            liftIO $ CS.putStrLn $ CS.append (formatID ident) msg
-            liftIO $ hFlush stdout
+            re <- poll 100 [Sock sock [In] Nothing]
+            unless (null $ head re) $ do
+                ident <- receive sock
+                _ <- receive sock
+                msg <- receive sock
+                case CS.head msg of
+                    'S' -> addNode nodesRev ident 
+                    'D' -> incRevNode nodesRev ident (msgToRev msg)
+                    _ -> error $ "unknown message: " ++ show msg
+                liftIO $ CS.putStrLn $ CS.append (formatID ident) msg
+                liftIO $ hFlush stdout
             -- distribute update
+            cr <- liftIO $ readIORef curRev
             nrs <- liftIO $ readIORef nodesRev 
             forM_ (Map.keys nrs) $ \i -> do
                 let rev = Map.findWithDefault 0 i nrs
                 when (rev < cr) $ sendUpdate sock i (rev + 1)
+            --liftIO $ print $ "current " ++ show nrs 
+            --liftIO $ print $ "currentr " ++ show cr
         return ()
     where formatID i = CS.cons '[' $ CS.append i "] "
 
@@ -52,7 +56,7 @@ addNode :: (MonadIO m) => IORef (Map.Map CS.ByteString Int) -> CS.ByteString ->
 addNode ns i = liftIO $ modifyIORef ns (Map.insert i 0)
 
 incRevNode :: (MonadIO m) => IORef (Map.Map CS.ByteString Int) -> CS.ByteString -> Int -> m ()
-incRevNode ns i r = liftIO $ modifyIORef ns (Map.adjust (+1) i)
+incRevNode ns i r = liftIO $ modifyIORef ns (Map.adjust (const r) i)
 
 sendUpdate :: Sender t => Socket z t -> CS.ByteString -> Int -> ZMQ z ()
 sendUpdate sock ident num = do
index 8ed1a93..fd45e65 100644 (file)
@@ -24,11 +24,14 @@ main = do
             case CS.head msg of
                 'U' -> do
                     let nr = msgToRev msg
-                    if nr == mcr + 1 then do
+                    if (nr == mcr + 1) || (nr == mcr) then do
                         send sock [] $ CS.cons 'D' $ CS.tail msg
-                        liftIO $ modifyIORef myRev (+1)
-                        liftIO . CS.putStrLn $ CS.append "D" $ CS.tail msg
-                        liftIO $ hFlush stdout
+                        if nr == mcr then
+                            liftIO $ print "W: ignoring increment which is none"
+                        else do
+                            liftIO $ modifyIORef myRev (+1)
+                            liftIO . CS.putStrLn $ CS.append "D" $ CS.tail msg
+                            liftIO $ hFlush stdout
                     else
                         error $ "E: invalid revision increment " ++ show mcr ++ " -> " ++ show nr
                         -- when coordinator keeps track anyway, this