fixed message / event encode - example working (with debug output ugly)
authorMax Voit <max.voit+gtdv@with-eyes.net>
Thu, 18 Jun 2015 15:37:32 +0000 (17:37 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Thu, 18 Jun 2015 15:37:32 +0000 (17:37 +0200)
src/Data/Acid/Centered/Common.hs
src/Data/Acid/Centered/Master.hs
src/Data/Acid/Centered/Slave.hs

index ef9f264..228be58 100644 (file)
@@ -20,10 +20,11 @@ module Data.Acid.Centered.Common
     , MasterMessage(..)
     ) where
 
---import Data.Acid.Core (Tagged(..))
+import Data.Acid.Core (Tagged(..))
 
 import Control.Monad (liftM, liftM2)
 import Data.ByteString.Char8 (ByteString)
+import qualified Data.ByteString.Lazy.Char8 as CSL
 import Data.Serialize (Serialize(..), put, get,
                        putWord8, getWord8,
                       )
@@ -35,7 +36,7 @@ type NodeRevision = Int
 debug :: String -> IO ()
 debug = putStrLn 
 
-data MasterMessage = DoRep Int ByteString
+data MasterMessage = DoRep Int (Tagged CSL.ByteString)
                    | MasterQuit
                   deriving (Show)
 
index c35105a..3ba972e 100644 (file)
@@ -138,7 +138,7 @@ connectNode :: Socket Router -> MVar NodeStatus -> NodeIdentity -> [Tagged CSL.B
 connectNode sock nodeStatus i oldUpdates = 
     modifyMVar_ nodeStatus $ \ns -> do
         forM_ (zip oldUpdates [0..]) $ \(u, r) -> do
-            sendUpdate sock r (encode u) i
+            sendUpdate sock r u i
             (ident, msg) <- receiveFrame sock
             when (ident /= i) $ error "received message not from the new node"
             -- todo: also check increment validity
@@ -149,7 +149,7 @@ encodeUpdate :: (UpdateEvent e) => e -> ByteString
 encodeUpdate event = runPut (safePut event)
 
 -- | Send one (encoded) Update to a Slave.
-sendUpdate :: Socket Router -> Int -> ByteString -> NodeIdentity -> IO ()
+sendUpdate :: Socket Router -> Int -> Tagged CSL.ByteString -> NodeIdentity -> IO ()
 sendUpdate sock revision update ident = do
     send sock [SendMore] ident
     send sock [SendMore] CS.empty
@@ -228,12 +228,12 @@ scheduleMasterUpdate masterState event = do
         return res
 
 -- | Send a new update to all Slaves.
-sendUpdateSlaves :: (SafeCopy e) => MasterState st -> Int -> e -> IO ()
+sendUpdateSlaves :: (UpdateEvent e) => MasterState st -> Int -> e -> IO ()
 sendUpdateSlaves MasterState{..} revision event = withMVar nodeStatus $ \ns -> do
     let allSlaves = M.keys ns
-    let encoded = runPut (safePut event)
+    let encoded = runPutLazy (safePut event)
     forM_ allSlaves $ \i ->
-        sendUpdate zmqSocket revision encoded i
+        sendUpdate zmqSocket revision (methodTag event, encoded) i
 
 
 toAcidState :: IsAcidic st => MasterState st -> AcidState st
index 902d3da..0f2bc5e 100644 (file)
@@ -81,17 +81,19 @@ enslaveState :: (IsAcidic st, Typeable st) =>
          -> st              -- ^ initial state
          -> IO (AcidState st)
 enslaveState address port initialState = do
-        debug "opening enslaved state"
+        debug "Opening enslaved state."
         -- local
         lst <- openLocalState initialState
         let levs = localEvents $ downcast lst
-        lrev <- atomically $ readTVar $ logNextEntryId levs
+        nlrev <- atomically $ readTVar $ logNextEntryId levs
+        let lrev = nlrev -1
         rev <- newMVar lrev
         -- remote
         ctx <- context
         sock <- socket ctx Req
         let addr = "tcp://" ++ address ++ ":" ++ show port
         connect sock addr
+        sendToMaster sock $ NewSlave lrev
         let slaveState = SlaveState { slaveLocalState = lst
                                     , slaveRevision = rev
                                     , slaveZmqContext = ctx
@@ -116,13 +118,13 @@ slaveRepHandler SlaveState{..} = forever $ do
                     -- no other messages possible
                     _ -> error $ "Unknown message received: " ++ show mmsg
 
-replicateUpdate :: Socket Req -> Int -> ByteString -> AcidState st -> MVar NodeRevision -> IO ()
+replicateUpdate :: Socket Req -> Int -> Tagged CSL.ByteString -> AcidState st -> MVar NodeRevision -> IO ()
 replicateUpdate sock rev event lst nrev = do
         debug $ "Got an Update to replicate " ++ show rev
         modifyMVar_ nrev $ \nr -> case rev - 1 of
             nr -> do
                 -- commit it locally 
-                scheduleColdUpdate lst $ decodeEvent event
+                scheduleColdUpdate lst event
                 -- send reply: we're done
                 sendToMaster sock $ RepDone rev
                 return rev