quitting (for Slaves), encode-fix
authorMax Voit <max.voit+gtdv@with-eyes.net>
Wed, 17 Jun 2015 15:54:08 +0000 (17:54 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Wed, 17 Jun 2015 15:54:08 +0000 (17:54 +0200)
src/Data/Acid/Centered/Common.hs
src/Data/Acid/Centered/Master.hs
src/Data/Acid/Centered/Slave.hs

index 7cf210d..ef9a070 100644 (file)
@@ -19,6 +19,8 @@ module Data.Acid.Centered.Common
     , MasterMessage(..)
     ) where
 
+--import Data.Acid.Core (Tagged(..))
+
 import Control.Monad (liftM, liftM2)
 import Data.ByteString.Char8 (ByteString)
 import Data.Serialize (Serialize(..), put, get,
@@ -31,33 +33,36 @@ debug :: String -> IO ()
 debug = putStrLn 
 
 data MasterMessage = DoRep Int ByteString
+                   | MasterQuit
                   deriving (Show)
-                -- later:
-                -- | Quit
 
 data SlaveMessage = NewSlave Int
                   | RepDone Int
+                  | SlaveQuit
                   deriving (Show)
-               -- later:
+               -- todo, later:
                -- | Update ByteString
-               -- | Quit
 
 instance Serialize MasterMessage where
     put msg = case msg of
         DoRep r d -> putWord8 0 >> put r >> put d
+        MasterQuit -> putWord8 9
     get = do 
         tag <- getWord8
         case tag of
             0 -> liftM2 DoRep get get
+            9 -> return MasterQuit
             _ -> error $ "Data.Serialize.get failed for MasterMessage: invalid tag " ++ show tag
 
 instance Serialize SlaveMessage where
     put msg = case msg of
         NewSlave r -> putWord8 0 >> put r
         RepDone r  -> putWord8 1 >> put r
+        SlaveQuit  -> putWord8 9
     get = do
         tag <- getWord8
         case tag of
             0 -> liftM NewSlave get
             1 -> liftM RepDone get
+            9 -> return SlaveQuit
             _ -> error $ "Data.Serialize.get failed for SlaveMessage: invalid tag " ++ show tag
index 837129e..bf54317 100644 (file)
@@ -81,13 +81,17 @@ masterRepHandler MasterState{..} = do
                 (ident, msg) <- receiveFrame zmqSocket
                 -- handle according frame contents
                 case msg of
+                    -- New Slave joined.
                     NewSlave r -> do
-                        -- todo: the state should be locked at this point to avoid losses
+                        -- todo: the state should be locked at this point to avoid losses(?)
                         oldUpdates <- getPastUpdates localState
                         connectNode zmqSocket nodeStatus ident oldUpdates
+                    -- Slave is done replicating.
                     RepDone r -> updateNodeStatus nodeStatus repDone ident r cr
-                    -- Slave sends an _U_date
-                    --'U' -> undefined -- todo: not yet
+                    -- Slave sends an Udate.
+                    -- todo: not yet
+                    -- Slave quits.
+                    SlaveQuit -> removeFromNodeStatus nodeStatus ident
                     -- no other messages possible
                     _ -> error $ "Unknown message received: " ++ show msg
                 -- loop around
@@ -97,13 +101,14 @@ masterRepHandler MasterState{..} = do
         where cr = undefined :: Int
 
 
-
-
-
 -- | Fetch past Updates from FileLog for replication.
 getPastUpdates :: (Typeable st) => AcidState st -> IO [Tagged CSL.ByteString]
 getPastUpdates state = readEntriesFrom (localEvents $ downcast state) 0
 
+-- | Remove a Slave node from NodeStatus.
+removeFromNodeStatus :: MVar NodeStatus -> NodeIdentity -> IO ()
+removeFromNodeStatus nodeStatus ident =
+        modifyMVar_ nodeStatus $ return . M.delete ident
 
 -- | Update the NodeStatus after a node has replicated an Update.
 updateNodeStatus :: MVar NodeStatus -> E.Event -> NodeIdentity -> Int -> Int -> IO ()
@@ -128,13 +133,12 @@ connectNode :: Socket Router -> MVar NodeStatus -> NodeIdentity -> [Tagged CSL.B
 connectNode sock nodeStatus i oldUpdates = 
     modifyMVar_ nodeStatus $ \ns -> do
         forM_ (zip oldUpdates [0..]) $ \(u, r) -> do
-            sendUpdate sock r (encoded u) i
+            sendUpdate sock r (encode u) i
             (ident, msg) <- receiveFrame sock
             when (ident /= i) $ error "received message not from the new node"
             -- todo: also check increment validity
         return $ M.insert i rev ns 
     where  
-        encoded = undefined
         rev = length oldUpdates
 
 encodeUpdate :: (UpdateEvent e) => e -> ByteString
@@ -146,8 +150,6 @@ sendUpdate sock revision update ident = do
     send sock [SendMore] ident
     send sock [SendMore] CS.empty
     send sock [] $ encode $ DoRep revision update
-    where 
-        encoded = undefined -- encode Tag and BS of update
     
 
 -- | Receive one Frame. A Frame consists of three messages: 
@@ -216,6 +218,7 @@ scheduleMasterUpdate masterState event = do
     return res
     where revision = undefined
 
+-- | Send a new update to all Slaves.
 sendUpdateSlaves :: (SafeCopy e) => MasterState st -> Int -> e -> IO ()
 sendUpdateSlaves MasterState{..} revision event = withMVar nodeStatus $ \ns -> do
     let allSlaves = M.keys ns
index abb6a77..527ff36 100644 (file)
@@ -38,10 +38,12 @@ import Data.Typeable
 import Data.SafeCopy
 import Data.Serialize (Serialize(..), put, get,
                        decode, encode,
-                       runPutLazy, runPut
+                       runPutLazy, runPut,
+                       runGet
                       )
 
 import Data.Acid
+import Data.Acid.Core
 import Data.Acid.Abstract
 import Data.Acid.Local
 
@@ -105,33 +107,35 @@ slaveRepHandler SlaveState{..} = forever $ do
         case decode msg of
             Left str -> error $ "Data.Serialize.decode failed on MasterMessage: " ++ show msg
             Right mmsg -> case mmsg of
-                    -- We are sent an Update.
-                    DoRep r d -> replicateUpdate slaveZmqSocket msg slaveLocalState
+                    -- We are sent an Update to replicate.
+                    DoRep r d -> replicateUpdate slaveZmqSocket r d slaveLocalState
                     -- We are requested to Quit.
-                    -- Quit -> undefined -- todo: how get a State that wasn't closed closed?
+                    MasterQuit -> undefined -- todo: how get a State that wasn't closed closed?
                     -- no other messages possible
                     _ -> error $ "Unknown message received: " ++ show mmsg
 
-replicateUpdate :: Socket Req -> ByteString -> AcidState st -> IO ()
-replicateUpdate sock msg lst = do
+replicateUpdate :: Socket Req -> Int -> ByteString -> AcidState st -> IO ()
+replicateUpdate sock rev event lst = do
         debug "Got an Update to replicate."
-        CS.putStrLn msg
         -- commit it locally 
-        let tag = undefined
-        scheduleColdUpdate lst (tag, CSL.fromStrict msg)
+        scheduleColdUpdate lst $ decodeEvent event
         -- send reply: we're done
         sendToMaster sock $ RepDone revision
         where revision = undefined
-
+              decodeEvent ev = case runGet safeGet ev of
+                                Left str -> error str
+                                Right val -> val
+    
+-- | Send a message to Master.
 sendToMaster :: Socket Req -> SlaveMessage -> IO ()
 sendToMaster sock smsg = send sock [] $ encode smsg
 
 -- | Close an enslaved State.
 liberateState :: SlaveState st -> IO ()
 liberateState SlaveState{..} = do
-        debug "closing slave state"
+        debug "Closing Slave state."
         -- send master quit message
-        -- todo^
+        sendToMaster slaveZmqSocket SlaveQuit
         -- cleanup zmq
         disconnect slaveZmqSocket slaveZmqAddr 
         close slaveZmqSocket