checkpoint replication
authorMax Voit <max.voit+gtdv@with-eyes.net>
Mon, 20 Jul 2015 07:42:26 +0000 (09:42 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Mon, 20 Jul 2015 07:42:26 +0000 (09:42 +0200)
src/Data/Acid/Centered/Master.hs
src/Data/Acid/Centered/Slave.hs

index 8b836b4..2c76540 100644 (file)
@@ -161,7 +161,7 @@ connectNode MasterState{..} i revision =
             if lastCpRev > revision then do
                 -- send last checkpoint and events from after then
                 sendSyncCheckpoint zmqSocket lastCp i
-                pastUpdates <- getPastUpdates localState (lastCpRev + 1)
+                pastUpdates <- getPastUpdates localState lastCpRev
                 forM_ pastUpdates $ \(r, u) -> sendSyncUpdate zmqSocket r u i
             else do
                 -- just the events
index cdb01fe..628672a 100644 (file)
@@ -39,7 +39,7 @@ import Data.SafeCopy
 import Data.Serialize (Serialize(..), put, get,
                        decode, encode,
                        runPutLazy, runPut,
-                       runGet
+                       runGet, runGetLazy
                       )
 
 import Data.Acid
@@ -225,17 +225,19 @@ slaveReplicationHandler slaveState@SlaveState{..} = do
         void $ takeMVar slaveRepThreadId
 
 -- | Replicate Sync-Checkpoints directly.
-replicateSyncCp :: SlaveState st -> Revision -> CSL.ByteString -> IO ()
-replicateSyncCp slaveState rev encoded = undefined
-    --core <- mkCore (eventsToMethods acidEvents) encoded
-    -- TODO
-    -- actually it might be best, to:
-    --      close the local state
-    --      append the checkpoint to checkpointLog
-    --      open the local state anew
-    -- in between no updates are possible, however.
-    -- also we can't simply substitute the localState within slaveState
-
+replicateSyncCp :: (IsAcidic st, Typeable st) =>
+        SlaveState st -> Revision -> CSL.ByteString -> IO ()
+replicateSyncCp slaveState@SlaveState{..} rev encoded = do
+    st <- decodeCheckpoint encoded
+    let core = localCore $ downcast slaveLocalState
+    modifyCoreState_ core (replaceState st)
+    modifyMVar_ slaveRevision $ return . const rev
+    where
+        replaceState s _ = return s
+        decodeCheckpoint e =
+            case runGetLazy safeGet e of
+                Left msg  -> error $ "Checkpoint could not be decoded: " ++ msg
+                Right val -> return val
 
 -- | Replicate Sync-Updates directly.
 replicateSyncUpdate slaveState rev event = replicateUpdate slaveState rev Nothing event True