fix revision count issue, numbering of pastUpdates, some more debug
authorMax Voit <max.voit+gtdv@with-eyes.net>
Thu, 18 Jun 2015 16:42:09 +0000 (18:42 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Thu, 18 Jun 2015 16:42:09 +0000 (18:42 +0200)
src/Data/Acid/Centered/Master.hs
src/Data/Acid/Centered/Slave.hs

index 3ba972e..66f9e62 100644 (file)
@@ -89,7 +89,7 @@ masterRepHandler masterState@MasterState{..} = do
                     -- New Slave joined.
                     NewSlave r -> do
                         -- todo: the state should be locked at this point to avoid losses(?)
-                        pastUpdates <- getPastUpdates localState
+                        pastUpdates <- getPastUpdates localState r
                         connectNode zmqSocket nodeStatus ident pastUpdates
                     -- Slave is done replicating.
                     RepDone r -> updateNodeStatus masterState ident r
@@ -106,8 +106,8 @@ masterRepHandler masterState@MasterState{..} = do
 
 
 -- | Fetch past Updates from FileLog for replication.
-getPastUpdates :: (Typeable st) => AcidState st -> IO [Tagged CSL.ByteString]
-getPastUpdates state = readEntriesFrom (localEvents $ downcast state) 0
+getPastUpdates :: (Typeable st) => AcidState st -> Int -> IO [(Int, Tagged CSL.ByteString)]
+getPastUpdates state startRev = liftM2 zip (return [startRev..]) (readEntriesFrom (localEvents $ downcast state) startRev)
 
 -- | Remove a Slave node from NodeStatus.
 removeFromNodeStatus :: MVar NodeStatus -> NodeIdentity -> IO ()
@@ -134,19 +134,18 @@ updateNodeStatus MasterState{..} ident r =
 --   i.e. send all past events as Updates.
 --   This temporarily blocks all other communication.
 -- todo: updates received by slaves are problematic here!
-connectNode :: Socket Router -> MVar NodeStatus -> NodeIdentity -> [Tagged CSL.ByteString] -> IO ()
-connectNode sock nodeStatus i oldUpdates = 
+connectNode :: Socket Router -> MVar NodeStatus -> NodeIdentity -> [(Int, Tagged CSL.ByteString)] -> IO ()
+connectNode sock nodeStatus i pastUpdates = 
     modifyMVar_ nodeStatus $ \ns -> do
-        forM_ (zip oldUpdates [0..]) $ \(u, r) -> do
+        forM_ pastUpdates $ \(r, u) -> do
             sendUpdate sock r u i
             (ident, msg) <- receiveFrame sock
             when (ident /= i) $ error "received message not from the new node"
             -- todo: also check increment validity
         return $ M.insert i rev ns 
-    where rev = length oldUpdates
-
-encodeUpdate :: (UpdateEvent e) => e -> ByteString
-encodeUpdate event = runPut (safePut event)
+    where rev = case length pastUpdates of
+                    0 -> 0
+                    _ -> fst $ last pastUpdates
 
 -- | Send one (encoded) Update to a Slave.
 sendUpdate :: Socket Router -> Int -> Tagged CSL.ByteString -> NodeIdentity -> IO ()
@@ -163,11 +162,13 @@ receiveFrame sock = do
     ident <- receive sock
     _     <- receive sock
     msg   <- receive sock
-    debug $ "received from [" ++ show ident ++ "]: " ++ show msg
     case decode msg of
         -- todo: pass on exceptions
         Left str -> error $ "Data.Serialize.decode failed on SlaveMessage: " ++ show msg
-        Right smsg -> return (ident, smsg)
+        Right smsg -> do
+            debug $ "Received from [" ++ show ident ++ "]: "
+                        ++ take 20 (show smsg)
+            return (ident, smsg)
 
 -- | Open the master state.
 openMasterState :: (IsAcidic st, Typeable st) =>
@@ -202,7 +203,7 @@ openMasterState port initialState = do
 -- | Close the master state.
 closeMasterState :: MasterState st -> IO ()
 closeMasterState MasterState{..} = do
-        debug "closing master state"
+        debug "Closing master state."
         -- wait all nodes done
         -- todo^ - not necessary for now
         -- cleanup zmq
@@ -231,9 +232,13 @@ scheduleMasterUpdate masterState event = do
 sendUpdateSlaves :: (UpdateEvent e) => MasterState st -> Int -> e -> IO ()
 sendUpdateSlaves MasterState{..} revision event = withMVar nodeStatus $ \ns -> do
     let allSlaves = M.keys ns
+    let numSlaves = length allSlaves
+    debug $ "Sending Update to Slaves, there are " ++ show numSlaves
     let encoded = runPutLazy (safePut event)
     forM_ allSlaves $ \i ->
         sendUpdate zmqSocket revision (methodTag event, encoded) i
+    -- if there are no Slaves, replication is already done
+    when (numSlaves == 0) $ E.set repDone
 
 
 toAcidState :: IsAcidic st => MasterState st -> AcidState st
index 0f2bc5e..4977e09 100644 (file)
@@ -81,14 +81,13 @@ enslaveState :: (IsAcidic st, Typeable st) =>
          -> st              -- ^ initial state
          -> IO (AcidState st)
 enslaveState address port initialState = do
-        debug "Opening enslaved state."
         -- local
         lst <- openLocalState initialState
         let levs = localEvents $ downcast lst
-        nlrev <- atomically $ readTVar $ logNextEntryId levs
-        let lrev = nlrev -1
+        lrev <- atomically $ readTVar $ logNextEntryId levs
         rev <- newMVar lrev
         -- remote
+        debug $ "Opening enslaved state at revision " ++ show lrev
         ctx <- context
         sock <- socket ctx Req
         let addr = "tcp://" ++ address ++ ":" ++ show port