fix for strange zmq behaviour: use polling
authorMax Voit <max.voit+gtdv@with-eyes.net>
Sat, 27 Jun 2015 17:04:56 +0000 (19:04 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Sat, 27 Jun 2015 17:04:56 +0000 (19:04 +0200)
src/Data/Acid/Centered/Master.hs
src/Data/Acid/Centered/Slave.hs

index e39f2b5..b29eff9 100644 (file)
@@ -52,6 +52,7 @@ import System.ZMQ4 (Context, Socket, Router(..), Receiver, Flag(..),
                     setReceiveHighWM, setSendHighWM, restrict,
                     context, term, socket, close, 
                     bind, unbind,
+                    poll, Poll(..), Event(..),
                     waitRead,
                     sendMulti, receiveMulti)
 
@@ -92,27 +93,28 @@ type ReplicationItem = (Tagged CSL.ByteString, Either Callback (RequestID, NodeI
 masterRequestHandler :: (Typeable st) => MasterState st -> IO ()
 masterRequestHandler masterState@MasterState{..} = forever $ do
         -- take one frame
-        debug "Waiting for new frame."
-        waitRead =<< readMVar zmqSocket
-        (ident, msg) <- withMVar zmqSocket receiveFrame
-        debug "Got frame."
-        -- handle according frame contents
-        case msg of
-            -- New Slave joined.
-            NewSlave r -> do
-                pastUpdates <- getPastUpdates localState r
-                connectNode masterState ident pastUpdates
-            -- Slave is done replicating.
-            RepDone r -> return () -- updateNodeStatus masterState ident r
-            -- Slave sends an Udate.
-            ReqUpdate rid event ->
-                queueUpdate masterState (event, Right (rid, ident))
-            -- Slave quits.
-            SlaveQuit -> removeFromNodeStatus nodeStatus ident
-            -- no other messages possible
-            _ -> error $ "Unknown message received: " ++ show msg
-        -- loop around
-        debug "Loop iteration."
+        -- waitRead =<< readMVar zmqSocket
+        -- FIXME: we needn't poll if not for strange zmq behaviour
+        re <- withMVar zmqSocket $ \sock -> poll 100 [Sock sock [In] Nothing]
+        unless (null $ head re) $ do
+            (ident, msg) <- withMVar zmqSocket receiveFrame
+            -- handle according frame contents
+            case msg of
+                -- New Slave joined.
+                NewSlave r -> do
+                    pastUpdates <- getPastUpdates localState r
+                    connectNode masterState ident pastUpdates
+                -- Slave is done replicating.
+                RepDone r -> return () -- updateNodeStatus masterState ident r
+                -- Slave sends an Udate.
+                ReqUpdate rid event ->
+                    queueUpdate masterState (event, Right (rid, ident))
+                -- Slave quits.
+                SlaveQuit -> removeFromNodeStatus nodeStatus ident
+                -- no other messages possible
+                _ -> error $ "Unknown message received: " ++ show msg
+            -- loop around
+            debug "Loop iteration."
 
 -- | Fetch past Updates from FileLog for replication.
 getPastUpdates :: (Typeable st) => AcidState st -> Int -> IO [(Int, Tagged CSL.ByteString)]
index fa57b29..ccde48a 100644 (file)
@@ -53,6 +53,7 @@ import Data.Acid.Centered.Common
 import System.ZMQ4 (Context, Socket, Dealer(..), Receiver, Flag(..),
                     setReceiveHighWM, setSendHighWM, restrict,
                     waitRead,
+                    poll, Poll(..), Event(..),
                     context, term, socket, close, 
                     connect, disconnect,
                     send, receive)
@@ -67,7 +68,6 @@ import Control.Monad (forever, void,
                      )
 import Control.Monad.STM (atomically)
 import Control.Concurrent.STM.TVar (readTVar)
-import Control.Concurrent.Event (Event)
 import qualified Control.Concurrent.Event as Event
 import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
 
@@ -84,7 +84,7 @@ import qualified Data.ByteString.Lazy.Char8 as CSL
 data SlaveState st 
     = SlaveState { slaveLocalState :: AcidState st
                  , slaveRepChan :: Chan SlaveRepItem
-                 , slaveSyncDone :: Event
+                 , slaveSyncDone :: Event.Event
                  , slaveRevision :: MVar NodeRevision
                  , slaveRequests :: MVar SlaveRequests
                  , slaveLastRequestID :: MVar RequestID
@@ -143,23 +143,26 @@ enslaveState address port initialState = do
 -- | Replication handler of the Slave. 
 slaveRequestHandler :: SlaveState st -> IO ()
 slaveRequestHandler slaveState@SlaveState{..} = forever $ do
-        waitRead =<< readMVar slaveZmqSocket
-        msg <- withMVar slaveZmqSocket receive
-        case decode msg of
-            Left str -> error $ "Data.Serialize.decode failed on MasterMessage: " ++ show msg
-            Right mmsg -> do
-                 debug $ "Received " ++ show mmsg
-                 case mmsg of
-                    -- We are sent an Update to replicate.
-                    DoRep r i d -> queueUpdate slaveState (r, i, d)
-                    -- We are sent an Update to replicate for synchronization.
-                    DoSyncRep r d -> replicateSyncUpdate slaveState r d 
-                    -- Master done sending all synchronization Updates.
-                    SyncDone -> debug "Sync Done." >> Event.set slaveSyncDone
-                    -- We are requested to Quit.
-                    MasterQuit -> undefined -- todo: how get a State that wasn't closed closed?
-                    -- no other messages possible
-                    _ -> error $ "Unknown message received: " ++ show mmsg
+        --waitRead =<< readMVar slaveZmqSocket
+        -- FIXME: we needn't poll if not for strange zmq behaviour
+        re <- withMVar slaveZmqSocket $ \sock -> poll 100 [Sock sock [In] Nothing]
+        unless (null $ head re) $ do
+            msg <- withMVar slaveZmqSocket receive
+            case decode msg of
+                Left str -> error $ "Data.Serialize.decode failed on MasterMessage: " ++ show msg
+                Right mmsg -> do
+                     debug $ "Received " ++ show mmsg
+                     case mmsg of
+                        -- We are sent an Update to replicate.
+                        DoRep r i d -> queueUpdate slaveState (r, i, d)
+                        -- We are sent an Update to replicate for synchronization.
+                        DoSyncRep r d -> replicateSyncUpdate slaveState r d 
+                        -- Master done sending all synchronization Updates.
+                        SyncDone -> debug "Sync Done." >> Event.set slaveSyncDone
+                        -- We are requested to Quit.
+                        MasterQuit -> undefined -- todo: how get a State that wasn't closed closed?
+                        -- no other messages possible
+                        _ -> error $ "Unknown message received: " ++ show mmsg
 
 -- | Queue Updates into Chan for replication.
 queueUpdate :: SlaveState st -> SlaveRepItem -> IO ()
@@ -193,10 +196,7 @@ replicateUpdate SlaveState{..} (rev, reqId, event) syncing = do
                         void $ scheduleColdUpdate slaveLocalState event 
                     Just rid -> modifyMVar slaveRequests $ \srs -> do
                         debug $ "This is the Update for Request " ++ show rid
-                        let icb = fromMaybe (error $ "Callback not found: " ++ show rid) (M.lookup rid srs) 
-                        debug "before cb"
-                        callback <- icb
-                        debug "after cb"
+                        callback <- fromMaybe (error $ "Callback not found: " ++ show rid) (M.lookup rid srs)
                         -- todo: we remember it, clean it up later
                         let nsrs = M.delete rid srs
                         return (nsrs, callback) 
@@ -224,7 +224,6 @@ scheduleSlaveUpdate slaveState@SlaveState{..} event = do
         modifyMVar_ slaveRequests $ \srs -> do
             let encoded = runPutLazy (safePut event)
             sendToMaster slaveZmqSocket $ ReqUpdate reqId (methodTag event, encoded)
-            debug "after send"
             let callback = do
                     hd <- scheduleUpdate slaveLocalState event 
                     void $ forkIO $ putMVar result =<< takeMVar hd