checkpoints, but not on sync
authorMax Voit <max.voit+gtdv@with-eyes.net>
Fri, 17 Jul 2015 17:24:21 +0000 (19:24 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Fri, 17 Jul 2015 17:24:21 +0000 (19:24 +0200)
src/Data/Acid/Centered/Common.hs
src/Data/Acid/Centered/Master.hs
src/Data/Acid/Centered/Slave.hs
tests/IntMasterInteractive.hs

index 5d2ebb9..f958e84 100644 (file)
@@ -79,6 +79,7 @@ debug = L.with debugLock . hPutStrLn stderr
 data MasterMessage = DoRep Revision (Maybe RequestID) (Tagged CSL.ByteString)
                    | DoSyncRep Revision (Tagged CSL.ByteString)
                    | SyncDone Crc
+                   | DoCheckpoint Revision
                    | MayQuit
                    | MasterQuit
                   deriving (Show)
@@ -95,6 +96,7 @@ instance Serialize MasterMessage where
         DoRep r i d   -> putWord8 0 >> put r >> put i >> put d
         DoSyncRep r d -> putWord8 1 >> put r >> put d
         SyncDone c    -> putWord8 2 >> put c
+        DoCheckpoint r -> putWord8 3 >> put r
         MayQuit       -> putWord8 8
         MasterQuit    -> putWord8 9
     get = do 
@@ -103,6 +105,7 @@ instance Serialize MasterMessage where
             0 -> liftM3 DoRep get get get
             1 -> liftM2 DoSyncRep get get
             2 -> liftM SyncDone get
+            3 -> liftM DoCheckpoint get
             8 -> return MayQuit
             9 -> return MasterQuit
             _ -> error $ "Data.Serialize.get failed for MasterMessage: invalid tag " ++ show tag
index 7755c83..1543fe1 100644 (file)
@@ -76,7 +76,7 @@ data MasterState st
                   , nodeStatus :: MVar NodeStatus
                   , masterStateLock :: MVar ()
                   , masterRevision :: MVar NodeRevision
-                  , masterReplicationChan :: Chan (Maybe ReplicationItem)
+                  , masterReplicationChan :: Chan ReplicationItem
                   , masterReqThreadId :: MVar ThreadId
                   , masterRepThreadId :: MVar ThreadId
                   , zmqContext :: Context
@@ -87,8 +87,11 @@ data MasterState st
 type NodeIdentity = ByteString
 type NodeStatus = Map NodeIdentity NodeRevision
 type Callback = IO ()
-type ReplicationItem = (Tagged CSL.ByteString, Either Callback (RequestID, NodeIdentity))
-        
+data ReplicationItem =
+      RIEnd
+    | RICheckpoint
+    | RIUpdate (Tagged CSL.ByteString) (Either Callback (RequestID, NodeIdentity))
+
 -- | The request handler on master node. Does
 --      o handle receiving requests from nodes,
 --      o answering as needed (old updates),
@@ -114,7 +117,7 @@ masterRequestHandler masterState@MasterState{..} = do
                 RepDone r -> return () -- updateNodeStatus masterState ident r
                 -- Slave sends an Udate.
                 ReqUpdate rid event ->
-                    queueUpdate masterState (event, Right (rid, ident))
+                    queueRepItem masterState (RIUpdate event (Right (rid, ident)))
                 -- Slave quits.
                 SlaveQuit -> do
                     sendToSlave zmqSocket MayQuit ident
@@ -171,7 +174,10 @@ sendSyncUpdate sock revision update = sendToSlave sock (DoSyncRep revision updat
 -- | Send one (encoded) Update to a Slave.
 sendUpdate :: MVar (Socket Router) -> Revision -> Maybe RequestID -> Tagged CSL.ByteString -> NodeIdentity -> IO ()
 sendUpdate sock revision reqId update = sendToSlave sock (DoRep revision reqId update) 
-    
+
+sendCheckpoint :: MVar (Socket Router) -> Revision -> NodeIdentity -> IO ()
+sendCheckpoint sock revision = sendToSlave sock (DoCheckpoint revision)
+
 -- | Receive one Frame. A Frame consists of three messages: 
 --      sender ID, empty message, and actual content 
 receiveFrame :: (Receiver t) => Socket t -> IO (NodeIdentity, SlaveMessage)
@@ -242,7 +248,7 @@ closeMasterState MasterState{..} = do
         -- todo: this could use a timeout, there may be zombies
         -- wait replication chan
         debug "Waiting for repChan to empty."
-        writeChan masterReplicationChan Nothing
+        writeChan masterReplicationChan RIEnd
         mtid <- myThreadId
         putMVar masterRepThreadId mtid
         -- kill handler
@@ -270,7 +276,7 @@ scheduleMasterUpdate masterState@MasterState{..} event = do
                     hd <- scheduleUpdate localState event
                     void $ forkIO (putMVar result =<< takeMVar hd)
             let encoded = runPutLazy (safePut event) 
-            queueUpdate masterState ((methodTag event, encoded), Left callback)
+            queueRepItem masterState (RIUpdate (methodTag event, encoded) (Left callback))
             return result
              
 -- | Remove nodes that were not responsive
@@ -279,9 +285,9 @@ removeLaggingNodes MasterState{..} =
     -- todo: send the node a quit notice
     withMVar masterRevision $ \mr -> modifyMVar_ nodeStatus $ return . M.filter (== mr) 
 
--- | Queue an Update (originating from the Master itself of an Slave via zmq)
-queueUpdate :: MasterState st -> ReplicationItem -> IO ()
-queueUpdate MasterState{..} = writeChan masterReplicationChan . Just
+-- | Queue an RepItem (originating from the Master itself of an Slave via zmq)
+queueRepItem :: MasterState st -> ReplicationItem -> IO ()
+queueRepItem MasterState{..} = writeChan masterReplicationChan
 
 -- | The replication handler. Takes care to run Updates locally in the same
 --   order as sending them out to the network.
@@ -291,10 +297,22 @@ masterReplicationHandler MasterState{..} = do
     putMVar masterRepThreadId mtid
     let loop = do
             debug "Replicating next item."
-            mayEvSi <- readChan masterReplicationChan
-            case mayEvSi of
-                Nothing -> return ()
-                Just (event, sink) -> do
+            repItem <- readChan masterReplicationChan
+            case repItem of
+                RIEnd -> return ()
+                RICheckpoint -> do
+                    debug "Checkpoint on master."
+                    createCheckpoint localState
+                    withMVar nodeStatus $ \ns -> do
+                        debug "Sending Checkpoint Request to Slaves."
+                        -- todo: we must split up revisions into
+                        -- (checkpoints,updates) and only replicate necessary
+                        -- updates after checkpoints on reconnect
+                        withMVar masterRevision $ \mr -> do
+                            forM_ (M.keys ns) $ sendCheckpoint zmqSocket mr
+                            return mr
+                    loop
+                RIUpdate event sink -> do
                     -- todo: temporary only one Chan, no improvement to without chan!
                     -- local part
                     withMVar masterRevision $ \mr ->
@@ -319,13 +337,25 @@ masterReplicationHandler MasterState{..} = do
     -- signal that we're done
     void $ takeMVar masterRepThreadId
 
+-- | Create a checkpoint (on all nodes).
+--   This is useful for faster resume of both the Master (at startup) and
+--   Slaves (at startup and reconnect).
+createMasterCheckpoint :: MasterState st -> IO ()
+createMasterCheckpoint masterState@MasterState{..} = do
+    debug "Checkpoint on Master."
+    -- We need to be careful to ensure that a checkpoint is created from the
+    -- same revision on all nodes. At this time the Core needs to be locked.
+    unlocked <- isEmptyMVar masterStateLock
+    unless unlocked $ error "State is locked."
+    queueRepItem masterState RICheckpoint
+
 toAcidState :: IsAcidic st => MasterState st -> AcidState st
 toAcidState master 
   = AcidState { _scheduleUpdate    = scheduleMasterUpdate master 
               , scheduleColdUpdate = scheduleColdUpdate $ localState master
               , _query             = query $ localState master
               , queryCold          = queryCold $ localState master
-              , createCheckpoint   = undefined
+              , createCheckpoint   = createMasterCheckpoint master
               , createArchive      = undefined
               , closeAcidState     = closeMasterState master 
               , acidSubState       = mkAnyState master
index 24f59f0..8e867e6 100644 (file)
@@ -84,7 +84,7 @@ import qualified Data.ByteString.Lazy.Char8 as CSL
 
 data SlaveState st 
     = SlaveState { slaveLocalState :: AcidState st
-                 , slaveRepChan :: Chan (Maybe SlaveRepItem)
+                 , slaveRepChan :: Chan SlaveRepItem
                  , slaveSyncDone :: Event.Event
                  , slaveRevision :: MVar NodeRevision
                  , slaveRequests :: MVar SlaveRequests
@@ -100,7 +100,10 @@ data SlaveState st
 type SlaveRequests = Map RequestID (IO ())
 
 -- | One Update + Metainformation to replicate.
-type SlaveRepItem = (Revision, Maybe RequestID, Tagged CSL.ByteString)
+data SlaveRepItem =
+      SRIEnd
+    | SRICheckpoint Revision
+    | SRIUpdate Revision (Maybe RequestID) (Tagged CSL.ByteString)
 
 -- | Open a local State as Slave for a Master.
 enslaveState :: (IsAcidic st, Typeable st) =>
@@ -164,13 +167,15 @@ slaveRequestHandler slaveState@SlaveState{..} = do
                      debug $ "Received: " ++ show mmsg
                      case mmsg of
                         -- We are sent an Update to replicate.
-                        DoRep r i d -> queueUpdate slaveState (r, i, d)
+                        DoRep r i d -> queueRepItem slaveState (SRIUpdate r i d)
                         -- We are sent an Update to replicate for synchronization.
                         DoSyncRep r d -> replicateSyncUpdate slaveState r d 
                         -- Master done sending all synchronization Updates.
                         SyncDone c -> onSyncDone slaveState c
+                        -- We are sent an Checkpoint request.
+                        DoCheckpoint r -> queueRepItem slaveState (SRICheckpoint r) 
                         -- We are allowed to Quit.
-                        MayQuit -> writeChan slaveRepChan Nothing
+                        MayQuit -> writeChan slaveRepChan SRIEnd
                         -- We are requested to Quit.
                         MasterQuit -> void $ forkIO $ liberateState slaveState
                         -- no other messages possible
@@ -182,18 +187,17 @@ onSyncDone slaveState@SlaveState{..} crc = do
     localCrc <- crcOfState slaveLocalState
     if crc /= localCrc then do
         putStrLn "Data.Acid.Centered.Slave: CRC mismatch after sync. Exiting."
-        liberateState slaveState
+        void $ forkIO $ liberateState slaveState
     else do
         debug "Sync Done, CRC fine."
         Event.set slaveSyncDone
 
 -- | Queue Updates into Chan for replication.
 -- We use the Chan so Sync-Updates and normal ones can be interleaved.
-queueUpdate :: SlaveState st -> SlaveRepItem -> IO ()
-queueUpdate SlaveState{..} repItem@(rev, _, _) = do
-        debug $ "Queuing Update with revision " ++ show rev
-        --asyncWriteChan slaveRepChan repItem
-        writeChan slaveRepChan (Just repItem)
+queueRepItem :: SlaveState st -> SlaveRepItem -> IO ()
+queueRepItem SlaveState{..} repItem = do
+        debug "Queuing RepItem."
+        writeChan slaveRepChan repItem
 
 -- | Replicates content of Chan.
 slaveReplicationHandler :: SlaveState st -> IO ()
@@ -206,23 +210,26 @@ slaveReplicationHandler slaveState@SlaveState{..} = do
         let loop = do
                 mayRepItem <- readChan slaveRepChan
                 case mayRepItem of
-                    Nothing -> return ()
-                    Just repItem -> do
-                        replicateUpdate slaveState repItem False
+                    SRIEnd -> return ()
+                    SRICheckpoint r -> do
+                        repCheckpoint slaveState r
+                        loop
+                    SRIUpdate r i d -> do
+                        replicateUpdate slaveState r i d False
                         loop
         loop
         -- signal that we're done
         void $ takeMVar slaveRepThreadId
 
 -- | Replicate Sync-Updates directly.
-replicateSyncUpdate slaveState rev event = replicateUpdate slaveState (rev, Nothing, event) True
+replicateSyncUpdate slaveState rev event = replicateUpdate slaveState rev Nothing event True
 
 -- | Replicate an Update as requested by Master.
 --   Updates that were requested by this Slave are run locally and the result
 --   put into the MVar in SlaveRequests.
 --   Other Updates are just replicated without using the result.
-replicateUpdate :: SlaveState st -> SlaveRepItem -> Bool -> IO ()
-replicateUpdate SlaveState{..} (rev, reqId, event) syncing = do
+replicateUpdate :: SlaveState st -> Revision -> Maybe RequestID -> Tagged CSL.ByteString -> Bool -> IO ()
+replicateUpdate SlaveState{..} rev reqId event syncing = do
         debug $ "Got an Update to replicate " ++ show rev
         modifyMVar_ slaveRevision $ \nr -> if rev - 1 == nr 
             then do
@@ -247,6 +254,14 @@ replicateUpdate SlaveState{..} (rev, reqId, event) syncing = do
                                 Left str -> error str
                                 Right val -> val
 
+repCheckpoint :: SlaveState st -> Revision -> IO ()
+repCheckpoint SlaveState{..} rev = do
+    debug "Got Checkpoint request."
+    withMVar slaveRevision $ \nr ->
+        -- create checkpoint
+        createCheckpoint slaveLocalState
+
+
 -- | Update on slave site. 
 --      The steps are:  
 --      - Request Update from Master
index 0f87142..98c43a3 100644 (file)
@@ -5,7 +5,7 @@ import Data.Acid.Centered
 import Data.SafeCopy
 import Data.Typeable
 
-import Control.Monad (forever)
+import Control.Monad (forever, forM_)
 import System.Exit (exitSuccess)
 
 -- state structures
@@ -23,19 +23,27 @@ main = do
                 putStrLn "Bye!"
                 closeAcidState acid
                 exitSuccess
+            ('c':_) -> do
+                createCheckpoint acid
+                putStrLn "Checkpoint generated."
             ('q':_) -> do
                 val <- query acid GetState
                 putStrLn $ "Current value: " ++ show val
             ('u':val) -> do
                 update acid (SetState (read val :: Int))
                 putStrLn "State updated."
-            ('i':_) -> update acid IncrementState
+            ('i':_) -> update acid IncrementState >> putStrLn "State incremented."
+            ('k':_) -> do
+                forM_ [1..1000] $ const $ update acid IncrementState
+                putStrLn "State incremented 1k times."
             _ -> putStrLn "Unknown command." >> putStrLn usage
 
 
 usage :: String
 usage = "Possible commands:\
         \\n  x    exit\
+        \\n  c    checkpoint\
         \\n  q    query the state\
         \\n  u v  update to value v\
-        \\n  i    increment"
+        \\n  i    increment\
+        \\n  k    increment 1000 times"