replication on Master using Chan (no interleaving), not waiting for replication anymore
authorMax Voit <max.voit+gtdv@with-eyes.net>
Tue, 23 Jun 2015 11:21:27 +0000 (13:21 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Tue, 23 Jun 2015 11:21:27 +0000 (13:21 +0200)
src/Data/Acid/Centered/Master.hs

index a9f4819..edd0e02 100644 (file)
@@ -41,10 +41,12 @@ import Data.Serialize (Serialize(..), put, get,
 import Data.Acid.Centered.Common
 
 import Control.Concurrent (forkIO)
-import Control.Monad (forever, when, unless, forM_, liftM, liftM2)
+import Control.Concurrent.Chan (Chan, newChan, writeChan, readChan)
+import Control.Monad (when, unless, void,
+                      forever, forM_, 
+                      liftM, liftM2)
 import Control.Monad.STM (atomically)
 import Control.Concurrent.STM.TVar (readTVar)
-import qualified Control.Concurrent.Event as E
 
 import System.ZMQ4 (Context, Socket, Router(..), Receiver, Flag(..),
                     context, term, socket, close, 
@@ -58,7 +60,9 @@ import qualified Data.ByteString.Char8 as CS
 import Data.ByteString.Char8 (ByteString)
 
 -- auto imports following - need to be cleaned up
-import Control.Concurrent.MVar(MVar, modifyMVar, modifyMVar_, withMVar, newMVar)
+import Control.Concurrent.MVar(MVar, newMVar, newEmptyMVar,
+                               takeMVar, putMVar,
+                               modifyMVar, modifyMVar_, withMVar)
 
 --------------------------------------------------------------------------------
 
@@ -66,7 +70,7 @@ data MasterState st
     = MasterState { localState :: AcidState st
                   , nodeStatus :: MVar NodeStatus
                   , masterRevision :: MVar NodeRevision
-                  , repDone :: E.Event
+                  , masterReplicationChan :: Chan ReplicationItem
                   , zmqContext :: Context
                   , zmqAddr :: String
                   , zmqSocket :: Socket Router
@@ -74,13 +78,15 @@ data MasterState st
 
 type NodeIdentity = ByteString
 type NodeStatus = Map NodeIdentity NodeRevision
+type Callback = IO ()
+type ReplicationItem = (Tagged CSL.ByteString, Either Callback (RequestID, NodeIdentity))
         
--- | The replication handler on master node. Does
+-- | The request handler on master node. Does
 --      o handle receiving requests from nodes,
 --      o answering as needed (old updates),
 --      o bookkeeping on node states. 
-masterRepHandler :: (Typeable st) => MasterState st -> IO ()
-masterRepHandler masterState@MasterState{..} = do
+masterRequestHandler :: (Typeable st) => MasterState st -> IO ()
+masterRequestHandler masterState@MasterState{..} = do
         let loop = do
                 -- take one frame
                 (ident, msg) <- receiveFrame zmqSocket
@@ -95,7 +101,7 @@ masterRepHandler masterState@MasterState{..} = do
                     RepDone r -> updateNodeStatus masterState ident r
                     -- Slave sends an Udate.
                     ReqUpdate rid event ->
-                        sendUpdateSlaves masterState (rid, ident) event
+                        queueUpdate masterState (event, Right (rid, ident))
                     -- Slave quits.
                     SlaveQuit -> removeFromNodeStatus nodeStatus ident
                     -- no other messages possible
@@ -105,7 +111,6 @@ masterRepHandler masterState@MasterState{..} = do
                 loop
         loop
 
-
 -- | Fetch past Updates from FileLog for replication.
 getPastUpdates :: (Typeable st) => AcidState st -> Int -> IO [(Int, Tagged CSL.ByteString)]
 getPastUpdates state startRev = liftM2 zip (return [(startRev+1)..]) (readEntriesFrom (localEvents $ downcast state) startRev)
@@ -125,9 +130,7 @@ updateNodeStatus MasterState{..} ident r =
                 error $ "Invalid increment of node status " ++ show ns ++ " -> " ++ show mr
             -- todo: checks sensible?
             let rs = M.adjust (+1) ident ns
-            when (allNodesDone mr rs) $ do
-                E.set repDone
-                debug $ "All nodes done replicating " ++ show mr
+            when (allNodesDone mr rs) $ debug $ "All nodes done replicating " ++ show mr
             return rs
             where allNodesDone mrev = M.fold (\v t -> (v == mrev) && t) True
 
@@ -152,7 +155,6 @@ sendUpdate sock revision reqId update ident = do
     send sock [SendMore] ident
     send sock [] $ encode $ DoRep revision reqId update
     
-
 -- | Receive one Frame. A Frame consists of three messages: 
 --      sender ID, empty message, and actual content 
 receiveFrame :: (Receiver t) => Socket t -> IO (NodeIdentity, SlaveMessage)
@@ -182,19 +184,20 @@ openMasterState port initialState = do
         -- remote
         ctx <- context
         sock <- socket ctx Router
-        rd <- E.newSet
+        repChan <- newChan
         ns <- newMVar M.empty
         let addr = "tcp://127.0.0.1:" ++ show port
         bind sock addr
         let masterState = MasterState { localState = lst
                                       , nodeStatus = ns
                                       , masterRevision = rev
-                                      , repDone = rd
+                                      , masterReplicationChan = repChan
                                       , zmqContext = ctx
                                       , zmqAddr = addr
                                       , zmqSocket = sock
                                       }
-        forkIO $ masterRepHandler masterState
+        forkIO $ masterRequestHandler masterState
+        forkIO $ masterReplicationHandler masterState
         return $ toAcidState masterState
 
 -- | Close the master state.
@@ -213,62 +216,48 @@ closeMasterState MasterState{..} = do
 -- | Update on master site.
 -- todo: this implementation is only valid for Slaves not sending Updates.
 scheduleMasterUpdate :: UpdateEvent event => MasterState (EventState event) -> event -> IO (MVar (EventResult event))
-scheduleMasterUpdate masterState event = do
-        withMVar (masterRevision masterState) $ debug . (++) "Master Update from rev " . show 
-        -- do local Update
-        res <- scheduleUpdate (localState masterState) event
-        modifyMVar_ (masterRevision masterState) (return . (+1))
-        -- sent Update to Slaves
-        E.clear $ repDone masterState
-        sendUpdateSlavesOld masterState event
-        -- wait for Slaves finish replication
-        noTimeout <- E.waitTimeout (repDone masterState) (500*1000)
-        unless noTimeout $ do
-            debug "Timeout occurred."
-            E.set (repDone masterState)
-            removeLaggingNodes masterState
-        return res
-
+scheduleMasterUpdate masterState@MasterState{..} event = do
+        debug "Update by Master."
+        result <- newEmptyMVar 
+        let callback = putMVar result =<< takeMVar =<< scheduleUpdate localState event 
+        let encoded = runPutLazy (safePut event) 
+        queueUpdate masterState ((methodTag event, encoded), Left callback)
+        return result
+             
 -- | Remove nodes that were not responsive
 removeLaggingNodes :: MasterState st -> IO ()
 removeLaggingNodes MasterState{..} = 
+    -- todo: send the node a quit notice
     withMVar masterRevision $ \mr -> modifyMVar_ nodeStatus $ return . M.filter (== mr) 
 
+-- | Queue an Update (originating from the Master itself of an Slave via zmq)
+queueUpdate :: MasterState st -> ReplicationItem -> IO ()
+queueUpdate MasterState{..} = writeChan masterReplicationChan
 
--- | Send a new update to all Slaves.
-                        -- todo: these two as one type _
-sendUpdateSlaves :: MasterState st -> (RequestID, NodeIdentity) -> Tagged CSL.ByteString -> IO ()
-sendUpdateSlaves MasterState{..} (reqID, reqIdent) event = withMVar nodeStatus $ \ns -> do
-    let noReqSlaves = filter (/= reqIdent) $ M.keys ns 
-    let numSlaves = (+1) $ length noReqSlaves 
-    debug $ "Sending Update to Slaves, there are " ++ show numSlaves
-    modifyMVar_ masterRevision $ \mr -> do
-        let mrn = mr + 1
-        debug $ "Replicating Update myself to " ++ show mrn
-        scheduleColdUpdate localState event
-        debug $ "Sending Updates to rev " ++ show mrn
-        -- todo: check it's not sent (or better: requested) twice
-        sendUpdate zmqSocket mr (Just reqID) event reqIdent
-        forM_ noReqSlaves $ \i ->
-            sendUpdate zmqSocket mrn Nothing event i
-        return mrn
-    -- if there are no Slaves, replication is already done
-    when (numSlaves == 0) $ E.set repDone
-
--- | Send a new update to all Slaves.
-sendUpdateSlavesOld :: (UpdateEvent e) => MasterState st -> e -> IO ()
-sendUpdateSlavesOld MasterState{..} event = withMVar nodeStatus $ \ns -> do
-    let allSlaves = M.keys ns
-    let numSlaves = length allSlaves
-    debug $ "Sending Update to Slaves, there are " ++ show numSlaves
-    let encoded = runPutLazy (safePut event)
-    withMVar masterRevision $ \mr -> do
-        debug $ "Sending Updates to rev " ++ show mr
-        forM_ allSlaves $ \i ->
-            sendUpdate zmqSocket mr Nothing (methodTag event, encoded) i
-    -- if there are no Slaves, replication is already done
-    when (numSlaves == 0) $ E.set repDone
-
+-- | The replication handler. Takes care to run Updates locally in the same
+--   order as sending them out to the network.
+masterReplicationHandler :: MasterState st -> IO ()
+masterReplicationHandler MasterState{..} = forever $ do
+        debug "Replicating next item."
+        (event, sink) <- readChan masterReplicationChan
+        -- todo: temporary only one Chan, no improvement to without chan!
+        -- local part
+        withMVar masterRevision $ \mr -> debug $ "Replicating Update myself to " ++ show (mr + 1)
+        case sink of 
+            Left callback   -> callback 
+            _               -> void $ scheduleColdUpdate localState event
+        -- remote part
+        withMVar nodeStatus $ \ns -> do
+            debug $ "Sending Update to Slaves, there are " ++ show (M.size ns)
+            modifyMVar_ masterRevision $ \mrOld -> do
+                let mr = mrOld + 1 
+                case sink of
+                    Left _ -> forM_ (M.keys ns) $ sendUpdate zmqSocket mr Nothing event 
+                    Right (reqID, reqNodeIdent) -> do
+                        let noReqSlaves = filter (/= reqNodeIdent) $ M.keys ns 
+                        sendUpdate zmqSocket mr (Just reqID) event reqNodeIdent
+                        forM_ noReqSlaves $ sendUpdate zmqSocket mr Nothing event 
+                return mr
 
 toAcidState :: IsAcidic st => MasterState st -> AcidState st
 toAcidState master