fix other revision numbering issue
authorMax Voit <max.voit+gtdv@with-eyes.net>
Thu, 18 Jun 2015 18:04:04 +0000 (20:04 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Thu, 18 Jun 2015 18:04:04 +0000 (20:04 +0200)
src/Data/Acid/Centered/Master.hs

index 66f9e62..82a0b6c 100644 (file)
@@ -90,7 +90,7 @@ masterRepHandler masterState@MasterState{..} = do
                     NewSlave r -> do
                         -- todo: the state should be locked at this point to avoid losses(?)
                         pastUpdates <- getPastUpdates localState r
-                        connectNode zmqSocket nodeStatus ident pastUpdates
+                        connectNode masterState ident pastUpdates
                     -- Slave is done replicating.
                     RepDone r -> updateNodeStatus masterState ident r
                     -- Slave sends an Udate.
@@ -107,7 +107,7 @@ masterRepHandler masterState@MasterState{..} = do
 
 -- | Fetch past Updates from FileLog for replication.
 getPastUpdates :: (Typeable st) => AcidState st -> Int -> IO [(Int, Tagged CSL.ByteString)]
-getPastUpdates state startRev = liftM2 zip (return [startRev..]) (readEntriesFrom (localEvents $ downcast state) startRev)
+getPastUpdates state startRev = liftM2 zip (return [(startRev+1)..]) (readEntriesFrom (localEvents $ downcast state) startRev)
 
 -- | Remove a Slave node from NodeStatus.
 removeFromNodeStatus :: MVar NodeStatus -> NodeIdentity -> IO ()
@@ -134,18 +134,16 @@ updateNodeStatus MasterState{..} ident r =
 --   i.e. send all past events as Updates.
 --   This temporarily blocks all other communication.
 -- todo: updates received by slaves are problematic here!
-connectNode :: Socket Router -> MVar NodeStatus -> NodeIdentity -> [(Int, Tagged CSL.ByteString)] -> IO ()
-connectNode sock nodeStatus i pastUpdates = 
-    modifyMVar_ nodeStatus $ \ns -> do
-        forM_ pastUpdates $ \(r, u) -> do
-            sendUpdate sock r u i
-            (ident, msg) <- receiveFrame sock
-            when (ident /= i) $ error "received message not from the new node"
-            -- todo: also check increment validity
-        return $ M.insert i rev ns 
-    where rev = case length pastUpdates of
-                    0 -> 0
-                    _ -> fst $ last pastUpdates
+connectNode :: MasterState st -> NodeIdentity -> [(Int, Tagged CSL.ByteString)] -> IO ()
+connectNode MasterState{..} i pastUpdates = 
+    withMVar masterRevision $ \mr -> 
+        modifyMVar_ nodeStatus $ \ns -> do
+            forM_ pastUpdates $ \(r, u) -> do
+                sendUpdate zmqSocket r u i
+                (ident, msg) <- receiveFrame zmqSocket
+                when (ident /= i) $ error "received message not from the new node"
+                -- todo: also check increment validity
+            return $ M.insert i mr ns 
 
 -- | Send one (encoded) Update to a Slave.
 sendUpdate :: Socket Router -> Int -> Tagged CSL.ByteString -> NodeIdentity -> IO ()
@@ -217,26 +215,28 @@ closeMasterState MasterState{..} = do
 -- todo: this implementation is only valid for Slaves not sending Updates.
 scheduleMasterUpdate :: UpdateEvent event => MasterState (EventState event) -> event -> IO (MVar (EventResult event))
 scheduleMasterUpdate masterState event = do
+        withMVar (masterRevision masterState) $ debug . (++) "Master Update from rev " . show 
         -- do local Update
         res <- scheduleUpdate (localState masterState) event
-        modifyMVar_ (masterRevision masterState) $ \mr -> do
-            -- sent Update to Slaves
-            E.clear $ repDone masterState
-            sendUpdateSlaves masterState (mr + 1) event
-            return (mr + 1)
+        modifyMVar_ (masterRevision masterState) (return . (+1))
+        -- sent Update to Slaves
+        E.clear $ repDone masterState
+        sendUpdateSlaves masterState event
         -- wait for Slaves finish replication
         E.wait $ repDone masterState
         return res
 
 -- | Send a new update to all Slaves.
-sendUpdateSlaves :: (UpdateEvent e) => MasterState st -> Int -> e -> IO ()
-sendUpdateSlaves MasterState{..} revision event = withMVar nodeStatus $ \ns -> do
+sendUpdateSlaves :: (UpdateEvent e) => MasterState st -> e -> IO ()
+sendUpdateSlaves MasterState{..} event = withMVar nodeStatus $ \ns -> do
     let allSlaves = M.keys ns
     let numSlaves = length allSlaves
     debug $ "Sending Update to Slaves, there are " ++ show numSlaves
     let encoded = runPutLazy (safePut event)
-    forM_ allSlaves $ \i ->
-        sendUpdate zmqSocket revision (methodTag event, encoded) i
+    withMVar masterRevision $ \mr -> do
+        debug $ "Sending Updates to rev " ++ show mr
+        forM_ allSlaves $ \i ->
+            sendUpdate zmqSocket mr (methodTag event, encoded) i
     -- if there are no Slaves, replication is already done
     when (numSlaves == 0) $ E.set repDone