zmq: hoist new nodes to current revision
authorMax Voit <max.voit+gtdv@with-eyes.net>
Sat, 30 May 2015 15:25:02 +0000 (17:25 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Sat, 30 May 2015 15:25:02 +0000 (17:25 +0200)
zmq-concept/master.hs
zmq-concept/slave.hs

index 6b5dcfc..b1d8977 100644 (file)
@@ -1,10 +1,13 @@
 {-# LANGUAGE OverloadedStrings #-}
 import Control.Monad
+import Control.Monad.IO.Class (MonadIO)
 import System.IO
 import System.ZMQ4.Monadic
 import qualified Data.ByteString.Char8 as CS
+import qualified Data.Map.Strict as Map
 import Control.Concurrent (forkIO, threadDelay)
-import Data.IORef (newIORef, modifyIORef, readIORef)
+import Data.IORef (newIORef, modifyIORef, modifyIORef', readIORef, IORef)
+import Data.Maybe (fromMaybe)
 
 
 addr :: String
@@ -12,29 +15,50 @@ addr = "tcp://127.0.0.1:5000"
 
 main :: IO ()
 main = do
-    curRev <- newIORef 0
+    -- current revision and updates
+    curRev <- newIORef 1
     forkIO $ forever $ do
         threadDelay $ 500 * 1000
-        modifyIORef curRev (+1)
+        modifyIORef' curRev (+1)
+    -- worker for distributing updates
+    nodesRev <- newIORef Map.empty
     runZMQ $ do
         sock <- socket Router
         bind sock addr
+        -- now loop and:
+        --      o update nodes not on current revision
+        --      o receive node responses and update revision list 
         forever $ do
+            -- update revision list
             ident <- receive sock
             _ <- receive sock
             msg <- receive sock
             cr <- liftIO $ readIORef curRev
             case CS.head msg of
-                'S' -> sendUpdate sock ident 0
-                'D' -> sendUpdate sock ident cr
+                'S' -> addNode nodesRev ident 
+                'D' -> incRevNode nodesRev ident (msgToRev msg)
+                _ -> error $ "unknown message: " ++ show msg
             liftIO $ CS.putStrLn $ CS.append (formatID ident) msg
             liftIO $ hFlush stdout
+            -- distribute update
+            nrs <- liftIO $ readIORef nodesRev 
+            forM_ (Map.keys nrs) $ \i -> do
+                let rev = Map.findWithDefault 0 i nrs
+                when (rev < cr) $ sendUpdate sock i (rev + 1)
         return ()
     where formatID i = CS.cons '[' $ CS.append i "] "
 
-sendUpdate sock id num = do
-    send sock [SendMore] id
+addNode :: (MonadIO m) => IORef (Map.Map CS.ByteString Int) -> CS.ByteString -> m ()
+addNode ns i = liftIO $ modifyIORef ns (Map.insert i 0)
+
+incRevNode :: (MonadIO m) => IORef (Map.Map CS.ByteString Int) -> CS.ByteString -> Int -> m ()
+incRevNode ns i r = liftIO $ modifyIORef ns (Map.adjust (+1) i)
+
+sendUpdate :: Sender t => Socket z t -> CS.ByteString -> Int -> ZMQ z ()
+sendUpdate sock ident num = do
+    send sock [SendMore] ident
     send sock [SendMore] ""
     send sock [] $ CS.cons 'U' (CS.pack (show (num :: Int)))
 
-
+msgToRev :: CS.ByteString -> Int
+msgToRev m = fst $ fromMaybe (0,"") (CS.readInt $ CS.tail m)
index 97468a9..8ed1a93 100644 (file)
@@ -4,12 +4,15 @@ import Control.Concurrent (threadDelay)
 import System.IO
 import System.ZMQ4.Monadic
 import qualified Data.ByteString.Char8 as CS
+import Data.IORef (newIORef, modifyIORef, readIORef)
+import Data.Maybe (fromMaybe)
 
 addr :: String
 addr = "tcp://127.0.0.1:5000"
 
 main :: IO ()
 main = do
+    myRev <- newIORef 0
     runZMQ $ do
         sock <- socket Req
         connect sock addr
@@ -17,9 +20,20 @@ main = do
         forever $ do
             liftIO $ threadDelay 100000
             msg <- receive sock 
+            mcr <- liftIO $ readIORef myRev
             case CS.head msg of
                 'U' -> do
-                    send sock [] $ CS.append "Done: " $ msg
-                    liftIO . CS.putStrLn $ CS.append "D" $ CS.tail msg
-                    liftIO $ hFlush stdout
+                    let nr = msgToRev msg
+                    if nr == mcr + 1 then do
+                        send sock [] $ CS.cons 'D' $ CS.tail msg
+                        liftIO $ modifyIORef myRev (+1)
+                        liftIO . CS.putStrLn $ CS.append "D" $ CS.tail msg
+                        liftIO $ hFlush stdout
+                    else
+                        error $ "E: invalid revision increment " ++ show mcr ++ " -> " ++ show nr
+                        -- when coordinator keeps track anyway, this
+                        -- shouldn't happen - but is it sensible?
                 _ -> return ()
+
+msgToRev m = fst $ fromMaybe (0,"") (CS.readInt $ CS.tail m)
+