checkpoint transfer framework
authorMax Voit <max.voit+gtdv@with-eyes.net>
Sat, 18 Jul 2015 07:45:13 +0000 (09:45 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Sat, 18 Jul 2015 07:45:13 +0000 (09:45 +0200)
acid-state-dist.cabal
src/Data/Acid/Centered/Common.hs
src/Data/Acid/Centered/Master.hs
src/Data/Acid/Centered/Slave.hs

index 521e83b..712ae21 100644 (file)
@@ -72,6 +72,7 @@ library
                        transformers,
                        stm,
                        semigroups,
+                       safe,
                        mtl
   
   -- Directories containing source files.
index f958e84..f51fe1f 100644 (file)
@@ -80,6 +80,7 @@ data MasterMessage = DoRep Revision (Maybe RequestID) (Tagged CSL.ByteString)
                    | DoSyncRep Revision (Tagged CSL.ByteString)
                    | SyncDone Crc
                    | DoCheckpoint Revision
+                   | DoSyncCheckpoint Revision CSL.ByteString
                    | MayQuit
                    | MasterQuit
                   deriving (Show)
@@ -93,12 +94,13 @@ data SlaveMessage = NewSlave Int
 
 instance Serialize MasterMessage where
     put msg = case msg of
-        DoRep r i d   -> putWord8 0 >> put r >> put i >> put d
-        DoSyncRep r d -> putWord8 1 >> put r >> put d
-        SyncDone c    -> putWord8 2 >> put c
-        DoCheckpoint r -> putWord8 3 >> put r
-        MayQuit       -> putWord8 8
-        MasterQuit    -> putWord8 9
+        DoRep r i d          -> putWord8 0 >> put r >> put i >> put d
+        DoSyncRep r d        -> putWord8 1 >> put r >> put d
+        SyncDone c           -> putWord8 2 >> put c
+        DoCheckpoint r       -> putWord8 3 >> put r
+        DoSyncCheckpoint r d -> putWord8 4 >> put r >> put d
+        MayQuit              -> putWord8 8
+        MasterQuit           -> putWord8 9
     get = do 
         tag <- getWord8
         case tag of
@@ -106,6 +108,7 @@ instance Serialize MasterMessage where
             1 -> liftM2 DoSyncRep get get
             2 -> liftM SyncDone get
             3 -> liftM DoCheckpoint get
+            4 -> liftM2 DoSyncCheckpoint get get
             8 -> return MayQuit
             9 -> return MasterQuit
             _ -> error $ "Data.Serialize.get failed for MasterMessage: invalid tag " ++ show tag
index 1543fe1..8b836b4 100644 (file)
@@ -62,6 +62,7 @@ import qualified Data.ByteString.Lazy.Char8 as CSL
 import qualified Data.ByteString.Char8 as CS
 import Data.ByteString.Char8 (ByteString)
 import qualified Data.List.NonEmpty as NEL
+import Safe (headDef)
 
 -- auto imports following - need to be cleaned up
 import Control.Concurrent.MVar(MVar, newMVar, newEmptyMVar,
@@ -110,9 +111,7 @@ masterRequestHandler masterState@MasterState{..} = do
             -- handle according frame contents
             case msg of
                 -- New Slave joined.
-                NewSlave r -> do
-                    pastUpdates <- getPastUpdates localState r
-                    connectNode masterState ident pastUpdates
+                NewSlave r -> connectNode masterState ident r
                 -- Slave is done replicating.
                 RepDone r -> return () -- updateNodeStatus masterState ident r
                 -- Slave sends an Udate.
@@ -127,10 +126,6 @@ masterRequestHandler masterState@MasterState{..} = do
             -- loop around
             debug "Loop iteration."
 
--- | Fetch past Updates from FileLog for replication.
-getPastUpdates :: (Typeable st) => AcidState st -> Int -> IO [(Int, Tagged CSL.ByteString)]
-getPastUpdates state startRev = liftM2 zip (return [(startRev+1)..]) (readEntriesFrom (localEvents $ downcast state) startRev)
-
 -- | Remove a Slave node from NodeStatus.
 removeFromNodeStatus :: MVar NodeStatus -> NodeIdentity -> IO ()
 removeFromNodeStatus nodeStatus ident =
@@ -153,20 +148,51 @@ updateNodeStatus MasterState{..} ident r =
 --   i.e. send all past events as Updates. This is fire&forget.
 --   todo: check HWM
 --   todo: check sync validity
-connectNode :: (IsAcidic st, Typeable st) => MasterState st -> NodeIdentity -> [(Int, Tagged CSL.ByteString)] -> IO ()
-connectNode MasterState{..} i pastUpdates = 
+connectNode :: (IsAcidic st, Typeable st) => MasterState st -> NodeIdentity -> Revision -> IO ()
+connectNode MasterState{..} i revision = 
     withMVar masterRevision $ \mr -> 
         modifyMVar_ nodeStatus $ \ns -> do
             -- todo: do we need to lock masterState for crc?
             crc <- crcOfState localState
-            forM_ pastUpdates $ \(r, u) -> sendSyncUpdate zmqSocket r u i
+            -- if there has been one/more checkpoint in between:
+            lastCp <- getLastCheckpointRev localState
+            let lastCpRev = cpRevision lastCp
+            debug $ "Found checkpoint at revision " ++ show lastCpRev
+            if lastCpRev > revision then do
+                -- send last checkpoint and events from after then
+                sendSyncCheckpoint zmqSocket lastCp i
+                pastUpdates <- getPastUpdates localState (lastCpRev + 1)
+                forM_ pastUpdates $ \(r, u) -> sendSyncUpdate zmqSocket r u i
+            else do
+                -- just the events
+                pastUpdates <- getPastUpdates localState revision
+                forM_ pastUpdates $ \(r, u) -> sendSyncUpdate zmqSocket r u i
             sendToSlave zmqSocket (SyncDone crc) i
-            return $ M.insert i mr ns 
+            return $ M.insert i mr ns
+    where cpRevision (Checkpoint r _) = r
+
+-- | Fetch past Updates from FileLog for replication.
+getPastUpdates :: (Typeable st) => AcidState st -> Int -> IO [(Int, Tagged CSL.ByteString)]
+getPastUpdates state startRev = liftM2 zip (return [(startRev+1)..]) (readEntriesFrom (localEvents $ downcast state) startRev)
+
+-- | Get the revision at which the last checkpoint was taken.
+getLastCheckpointRev :: (Typeable st) => AcidState st -> IO Checkpoint
+getLastCheckpointRev state = do
+    let lst = downcast state
+    let cplog = localCheckpoints lst
+    nextId <- atomically $ readTVar $ logNextEntryId cplog
+    cps <- readEntriesFrom cplog (nextId - 1)
+    return $ headDef (Checkpoint 0 CSL.empty) cps
 
 -- | Send a message to a Slave
 sendToSlave :: MVar (Socket Router) -> MasterMessage -> NodeIdentity -> IO ()
 sendToSlave msock msg ident = withMVar msock $ \sock -> sendMulti sock $ NEL.fromList [ident, encode msg]
 
+-- | Send an encoded Checkpoint to a Slave.
+sendSyncCheckpoint :: MVar (Socket Router) -> Checkpoint -> NodeIdentity -> IO ()
+sendSyncCheckpoint sock (Checkpoint cr encoded) =
+    sendToSlave sock (DoSyncCheckpoint cr encoded)
+
 -- | Send one (encoded) Update to a Slave.
 sendSyncUpdate :: MVar (Socket Router) -> Revision -> Tagged CSL.ByteString -> NodeIdentity -> IO ()
 sendSyncUpdate sock revision update = sendToSlave sock (DoSyncRep revision update) 
@@ -209,7 +235,7 @@ openMasterState port initialState = do
         repChan <- newChan
         ns <- newMVar M.empty
         -- remote
-        let addr = "tcp://127.0.0.1:" ++ show port
+        let addr = "tcp://*:" ++ show port
         ctx <- context
         sock <- socket ctx Router
         setReceiveHighWM (restrict (100*1000)) sock
index 8e867e6..cdb01fe 100644 (file)
@@ -44,6 +44,7 @@ import Data.Serialize (Serialize(..), put, get,
 
 import Data.Acid
 import Data.Acid.Core
+import Data.Acid.Common
 import Data.Acid.Abstract
 import Data.Acid.Local
 import Data.Acid.Log
@@ -168,6 +169,8 @@ slaveRequestHandler slaveState@SlaveState{..} = do
                      case mmsg of
                         -- We are sent an Update to replicate.
                         DoRep r i d -> queueRepItem slaveState (SRIUpdate r i d)
+                        -- We are sent a Checkpoint for synchronization.
+                        DoSyncCheckpoint r d -> replicateSyncCp slaveState r d 
                         -- We are sent an Update to replicate for synchronization.
                         DoSyncRep r d -> replicateSyncUpdate slaveState r d 
                         -- Master done sending all synchronization Updates.
@@ -221,6 +224,19 @@ slaveReplicationHandler slaveState@SlaveState{..} = do
         -- signal that we're done
         void $ takeMVar slaveRepThreadId
 
+-- | Replicate Sync-Checkpoints directly.
+replicateSyncCp :: SlaveState st -> Revision -> CSL.ByteString -> IO ()
+replicateSyncCp slaveState rev encoded = undefined
+    --core <- mkCore (eventsToMethods acidEvents) encoded
+    -- TODO
+    -- actually it might be best, to:
+    --      close the local state
+    --      append the checkpoint to checkpointLog
+    --      open the local state anew
+    -- in between no updates are possible, however.
+    -- also we can't simply substitute the localState within slaveState
+
+
 -- | Replicate Sync-Updates directly.
 replicateSyncUpdate slaveState rev event = replicateUpdate slaveState rev Nothing event True