Async Slave joins
authorMax Voit <max.voit+gtdv@with-eyes.net>
Wed, 24 Jun 2015 10:21:38 +0000 (12:21 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Wed, 24 Jun 2015 10:21:38 +0000 (12:21 +0200)
src/Data/Acid/Centered/Common.hs
src/Data/Acid/Centered/Master.hs
src/Data/Acid/Centered/Slave.hs

index a195719..31f2b06 100644 (file)
@@ -50,6 +50,7 @@ debug = putStrLn
 
 data MasterMessage = DoRep Revision (Maybe RequestID) (Tagged CSL.ByteString)
                    | DoSyncRep Revision (Tagged CSL.ByteString)
+                   | SyncDone
                    | MasterQuit
                   deriving (Show)
 
@@ -64,12 +65,14 @@ instance Serialize MasterMessage where
     put msg = case msg of
         DoRep r i d   -> putWord8 0 >> put r >> put i >> put d
         DoSyncRep r d -> putWord8 1 >> put r >> put d
+        SyncDone      -> putWord8 2
         MasterQuit    -> putWord8 9
     get = do 
         tag <- getWord8
         case tag of
             0 -> liftM3 DoRep get get get
             1 -> liftM2 DoSyncRep get get
+            2 -> return SyncDone
             9 -> return MasterQuit
             _ -> error $ "Data.Serialize.get failed for MasterMessage: invalid tag " ++ show tag
 
index edd0e02..cf92e82 100644 (file)
@@ -142,18 +142,23 @@ connectNode :: MasterState st -> NodeIdentity -> [(Int, Tagged CSL.ByteString)]
 connectNode MasterState{..} i pastUpdates = 
     withMVar masterRevision $ \mr -> 
         modifyMVar_ nodeStatus $ \ns -> do
-            forM_ pastUpdates $ \(r, u) -> do
-                sendUpdate zmqSocket r Nothing u i
-                (ident, msg) <- receiveFrame zmqSocket
-                when (ident /= i) $ error "received message not from the new node"
-                -- todo: also check increment validity
+            forM_ pastUpdates $ \(r, u) -> sendSyncUpdate zmqSocket r u i
+            sendToSlave zmqSocket SyncDone i
             return $ M.insert i mr ns 
 
+-- | Send a message to a Slave
+sendToSlave :: Socket Router -> MasterMessage -> NodeIdentity -> IO ()
+sendToSlave sock msg ident = do
+    send sock [SendMore] ident
+    send sock [] $ encode msg
+
+-- | Send one (encoded) Update to a Slave.
+sendSyncUpdate :: Socket Router -> Revision -> Tagged CSL.ByteString -> NodeIdentity -> IO ()
+sendSyncUpdate sock revision update = sendToSlave sock (DoSyncRep revision update) 
+    
 -- | Send one (encoded) Update to a Slave.
 sendUpdate :: Socket Router -> Revision -> Maybe RequestID -> Tagged CSL.ByteString -> NodeIdentity -> IO ()
-sendUpdate sock revision reqId update ident = do
-    send sock [SendMore] ident
-    send sock [] $ encode $ DoRep revision reqId update
+sendUpdate sock revision reqId update = sendToSlave sock (DoRep revision reqId update) 
     
 -- | Receive one Frame. A Frame consists of three messages: 
 --      sender ID, empty message, and actual content 
index 013b28f..0982642 100644 (file)
@@ -58,11 +58,14 @@ import Control.Concurrent (forkIO)
 import Control.Concurrent.MVar (MVar, newMVar, newEmptyMVar, 
                                 modifyMVar, modifyMVar_,
                                 takeMVar, putMVar)
-import Control.Monad (forever, void)
+import Control.Monad (forever, void,
+                      when, unless
+                     )
 import Control.Monad.STM (atomically)
 import Control.Concurrent.STM.TVar (readTVar)
 import Control.Concurrent.Event (Event)
 import qualified Control.Concurrent.Event as Event
+import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
 
 import Data.Map (Map)
 import qualified Data.Map as M
@@ -76,6 +79,8 @@ import qualified Data.ByteString.Lazy.Char8 as CSL
 
 data SlaveState st 
     = SlaveState { slaveLocalState :: AcidState st
+                 , slaveRepChan :: Chan SlaveRepItem
+                 , slaveSyncDone :: Event
                  , slaveRevision :: MVar NodeRevision
                  , slaveRequests :: MVar SlaveRequests
                  , slaveZmqContext :: Context
@@ -84,6 +89,7 @@ data SlaveState st
                  } deriving (Typeable)
 
 type SlaveRequests = Map RequestID (IO ())
+type SlaveRepItem = (Revision, Maybe RequestID, Tagged CSL.ByteString)
 
 -- | Open a local State as Slave for a Master.
 enslaveState :: (IsAcidic st, Typeable st) =>
@@ -102,42 +108,66 @@ enslaveState address port initialState = do
         ctx <- context
         sock <- socket ctx Dealer
         srs <- newMVar M.empty
+        repChan <- newChan
+        syncDone <- Event.new
         let addr = "tcp://" ++ address ++ ":" ++ show port
         connect sock addr
         sendToMaster sock $ NewSlave lrev
         let slaveState = SlaveState { slaveLocalState = lst
+                                    , slaveRepChan = repChan
+                                    , slaveSyncDone = syncDone
                                     , slaveRevision = rev
                                     , slaveRequests = srs
                                     , slaveZmqContext = ctx
                                     , slaveZmqAddr = addr
                                     , slaveZmqSocket = sock
                                     }
-        forkIO $ slaveRepHandler slaveState 
+        forkIO $ slaveRequestHandler slaveState 
+        forkIO $ slaveReplicationHandler slaveState 
         return $ slaveToAcidState slaveState 
 
 -- | Replication handler of the Slave. Forked and running in background all the
 --   time.
-slaveRepHandler :: SlaveState st -> IO ()
-slaveRepHandler slaveState@SlaveState{..} = forever $ do
+slaveRequestHandler :: SlaveState st -> IO ()
+slaveRequestHandler slaveState@SlaveState{..} = forever $ do
         msg <- receive slaveZmqSocket
         case decode msg of
             Left str -> error $ "Data.Serialize.decode failed on MasterMessage: " ++ show msg
             Right mmsg -> case mmsg of
                     -- We are sent an Update to replicate.
-                    DoRep r i d -> replicateUpdate slaveState r i d 
+                    DoRep r i d -> queueUpdate slaveState (r, i, d)
                     -- We are sent an Update to replicate for synchronization.
                     DoSyncRep r d -> replicateSyncUpdate slaveState r d 
+                    -- Master done sending all Sync Updates 
+                    SyncDone -> debug "Sync Done." >> Event.set slaveSyncDone
                     -- We are requested to Quit.
                     MasterQuit -> undefined -- todo: how get a State that wasn't closed closed?
                     -- no other messages possible
                     _ -> error $ "Unknown message received: " ++ show mmsg
 
+-- | Queue Updates into Chan for replication.
+queueUpdate :: SlaveState st -> SlaveRepItem -> IO ()
+queueUpdate SlaveState{..} repItem@(rev, _, _) = do
+        debug $ "Queuing Update with revision " ++ show rev
+        writeChan slaveRepChan repItem
+
+-- | Replicates content of Chan.
+slaveReplicationHandler slaveState@SlaveState{..} = do
+        noTimeout <- Event.waitTimeout slaveSyncDone $ 10*1000*1000
+        unless noTimeout $ error "Slave took too long to sync, ran into timeout."
+        forever $ do
+            repItem <- readChan slaveRepChan
+            replicateUpdate slaveState repItem False
+
+-- | Replicate Sync-Updates directly.
+replicateSyncUpdate slaveState rev event = replicateUpdate slaveState (rev, Nothing, event) True
+
 -- | Replicate an Update as requested by Master.
 --   Updates that were requested by this Slave we run locally and put the result
 --   into the MVar in SlaveRequests.
 --   Other Updates are just replicated without using the result.
-replicateUpdate :: SlaveState st -> Revision -> Maybe RequestID -> Tagged CSL.ByteString -> IO ()
-replicateUpdate SlaveState{..} rev reqId event = do
+replicateUpdate :: SlaveState st -> SlaveRepItem -> Bool -> IO ()
+replicateUpdate SlaveState{..} (rev, reqId, event) syncing = do
         debug $ "Got an Update to replicate " ++ show rev
         modifyMVar_ slaveRevision $ \nr -> case rev - 1 of
             nr -> do
@@ -151,7 +181,7 @@ replicateUpdate SlaveState{..} rev reqId event = do
                         let nsrs = M.adjust (\c -> return ()) rid srs
                         return (nsrs, callback) 
                 -- send reply: we're done
-                sendToMaster slaveZmqSocket $ RepDone rev
+                unless syncing $ sendToMaster slaveZmqSocket $ RepDone rev
                 return rev
             _  -> do 
                 sendToMaster slaveZmqSocket RepError
@@ -161,8 +191,6 @@ replicateUpdate SlaveState{..} rev reqId event = do
                                 Left str -> error str
                                 Right val -> val
 
-replicateSyncUpdate = undefined
-
 -- | Slave Updates
 --
 -- | Update on slave site.