main = do
-- current revision and updates
curRev <- newIORef 1
+ nodesRev <- newIORef Map.empty
forkIO $ forever $ do
threadDelay $ 500 * 1000
modifyIORef' curRev (+1)
-- worker for distributing updates
- nodesRev <- newIORef Map.empty
runZMQ $ do
sock <- socket Router
bind sock addr
-- o receive node responses and update revision list
forever $ do
-- update revision list
- ident <- receive sock
- _ <- receive sock
- msg <- receive sock
- cr <- liftIO $ readIORef curRev
- case CS.head msg of
- 'S' -> addNode nodesRev ident
- 'D' -> incRevNode nodesRev ident (msgToRev msg)
- _ -> error $ "unknown message: " ++ show msg
- liftIO $ CS.putStrLn $ CS.append (formatID ident) msg
- liftIO $ hFlush stdout
+ re <- poll 100 [Sock sock [In] Nothing]
+ unless (null $ head re) $ do
+ ident <- receive sock
+ _ <- receive sock
+ msg <- receive sock
+ case CS.head msg of
+ 'S' -> addNode nodesRev ident
+ 'D' -> incRevNode nodesRev ident (msgToRev msg)
+ _ -> error $ "unknown message: " ++ show msg
+ liftIO $ CS.putStrLn $ CS.append (formatID ident) msg
+ liftIO $ hFlush stdout
-- distribute update
+ cr <- liftIO $ readIORef curRev
nrs <- liftIO $ readIORef nodesRev
forM_ (Map.keys nrs) $ \i -> do
let rev = Map.findWithDefault 0 i nrs
when (rev < cr) $ sendUpdate sock i (rev + 1)
+ --liftIO $ print $ "current " ++ show nrs
+ --liftIO $ print $ "currentr " ++ show cr
return ()
where formatID i = CS.cons '[' $ CS.append i "] "
addNode ns i = liftIO $ modifyIORef ns (Map.insert i 0)
incRevNode :: (MonadIO m) => IORef (Map.Map CS.ByteString Int) -> CS.ByteString -> Int -> m ()
-incRevNode ns i r = liftIO $ modifyIORef ns (Map.adjust (+1) i)
+incRevNode ns i r = liftIO $ modifyIORef ns (Map.adjust (const r) i)
sendUpdate :: Sender t => Socket z t -> CS.ByteString -> Int -> ZMQ z ()
sendUpdate sock ident num = do
case CS.head msg of
'U' -> do
let nr = msgToRev msg
- if nr == mcr + 1 then do
+ if (nr == mcr + 1) || (nr == mcr) then do
send sock [] $ CS.cons 'D' $ CS.tail msg
- liftIO $ modifyIORef myRev (+1)
- liftIO . CS.putStrLn $ CS.append "D" $ CS.tail msg
- liftIO $ hFlush stdout
+ if nr == mcr then
+ liftIO $ print "W: ignoring increment which is none"
+ else do
+ liftIO $ modifyIORef myRev (+1)
+ liftIO . CS.putStrLn $ CS.append "D" $ CS.tail msg
+ liftIO $ hFlush stdout
else
error $ "E: invalid revision increment " ++ show mcr ++ " -> " ++ show nr
-- when coordinator keeps track anyway, this