data MasterMessage = DoRep Revision (Maybe RequestID) (Tagged CSL.ByteString)
| DoSyncRep Revision (Tagged CSL.ByteString)
| SyncDone Crc
+ | DoCheckpoint Revision
| MayQuit
| MasterQuit
deriving (Show)
DoRep r i d -> putWord8 0 >> put r >> put i >> put d
DoSyncRep r d -> putWord8 1 >> put r >> put d
SyncDone c -> putWord8 2 >> put c
+ DoCheckpoint r -> putWord8 3 >> put r
MayQuit -> putWord8 8
MasterQuit -> putWord8 9
get = do
0 -> liftM3 DoRep get get get
1 -> liftM2 DoSyncRep get get
2 -> liftM SyncDone get
+ 3 -> liftM DoCheckpoint get
8 -> return MayQuit
9 -> return MasterQuit
_ -> error $ "Data.Serialize.get failed for MasterMessage: invalid tag " ++ show tag
, nodeStatus :: MVar NodeStatus
, masterStateLock :: MVar ()
, masterRevision :: MVar NodeRevision
- , masterReplicationChan :: Chan (Maybe ReplicationItem)
+ , masterReplicationChan :: Chan ReplicationItem
, masterReqThreadId :: MVar ThreadId
, masterRepThreadId :: MVar ThreadId
, zmqContext :: Context
type NodeIdentity = ByteString
type NodeStatus = Map NodeIdentity NodeRevision
type Callback = IO ()
-type ReplicationItem = (Tagged CSL.ByteString, Either Callback (RequestID, NodeIdentity))
-
+data ReplicationItem =
+ RIEnd
+ | RICheckpoint
+ | RIUpdate (Tagged CSL.ByteString) (Either Callback (RequestID, NodeIdentity))
+
-- | The request handler on master node. Does
-- o handle receiving requests from nodes,
-- o answering as needed (old updates),
RepDone r -> return () -- updateNodeStatus masterState ident r
-- Slave sends an Udate.
ReqUpdate rid event ->
- queueUpdate masterState (event, Right (rid, ident))
+ queueRepItem masterState (RIUpdate event (Right (rid, ident)))
-- Slave quits.
SlaveQuit -> do
sendToSlave zmqSocket MayQuit ident
-- | Send one (encoded) Update to a Slave.
sendUpdate :: MVar (Socket Router) -> Revision -> Maybe RequestID -> Tagged CSL.ByteString -> NodeIdentity -> IO ()
sendUpdate sock revision reqId update = sendToSlave sock (DoRep revision reqId update)
-
+
+sendCheckpoint :: MVar (Socket Router) -> Revision -> NodeIdentity -> IO ()
+sendCheckpoint sock revision = sendToSlave sock (DoCheckpoint revision)
+
-- | Receive one Frame. A Frame consists of three messages:
-- sender ID, empty message, and actual content
receiveFrame :: (Receiver t) => Socket t -> IO (NodeIdentity, SlaveMessage)
-- todo: this could use a timeout, there may be zombies
-- wait replication chan
debug "Waiting for repChan to empty."
- writeChan masterReplicationChan Nothing
+ writeChan masterReplicationChan RIEnd
mtid <- myThreadId
putMVar masterRepThreadId mtid
-- kill handler
hd <- scheduleUpdate localState event
void $ forkIO (putMVar result =<< takeMVar hd)
let encoded = runPutLazy (safePut event)
- queueUpdate masterState ((methodTag event, encoded), Left callback)
+ queueRepItem masterState (RIUpdate (methodTag event, encoded) (Left callback))
return result
-- | Remove nodes that were not responsive
-- todo: send the node a quit notice
withMVar masterRevision $ \mr -> modifyMVar_ nodeStatus $ return . M.filter (== mr)
--- | Queue an Update (originating from the Master itself of an Slave via zmq)
-queueUpdate :: MasterState st -> ReplicationItem -> IO ()
-queueUpdate MasterState{..} = writeChan masterReplicationChan . Just
+-- | Queue an RepItem (originating from the Master itself of an Slave via zmq)
+queueRepItem :: MasterState st -> ReplicationItem -> IO ()
+queueRepItem MasterState{..} = writeChan masterReplicationChan
-- | The replication handler. Takes care to run Updates locally in the same
-- order as sending them out to the network.
putMVar masterRepThreadId mtid
let loop = do
debug "Replicating next item."
- mayEvSi <- readChan masterReplicationChan
- case mayEvSi of
- Nothing -> return ()
- Just (event, sink) -> do
+ repItem <- readChan masterReplicationChan
+ case repItem of
+ RIEnd -> return ()
+ RICheckpoint -> do
+ debug "Checkpoint on master."
+ createCheckpoint localState
+ withMVar nodeStatus $ \ns -> do
+ debug "Sending Checkpoint Request to Slaves."
+ -- todo: we must split up revisions into
+ -- (checkpoints,updates) and only replicate necessary
+ -- updates after checkpoints on reconnect
+ withMVar masterRevision $ \mr -> do
+ forM_ (M.keys ns) $ sendCheckpoint zmqSocket mr
+ return mr
+ loop
+ RIUpdate event sink -> do
-- todo: temporary only one Chan, no improvement to without chan!
-- local part
withMVar masterRevision $ \mr ->
-- signal that we're done
void $ takeMVar masterRepThreadId
+-- | Create a checkpoint (on all nodes).
+-- This is useful for faster resume of both the Master (at startup) and
+-- Slaves (at startup and reconnect).
+createMasterCheckpoint :: MasterState st -> IO ()
+createMasterCheckpoint masterState@MasterState{..} = do
+ debug "Checkpoint on Master."
+ -- We need to be careful to ensure that a checkpoint is created from the
+ -- same revision on all nodes. At this time the Core needs to be locked.
+ unlocked <- isEmptyMVar masterStateLock
+ unless unlocked $ error "State is locked."
+ queueRepItem masterState RICheckpoint
+
toAcidState :: IsAcidic st => MasterState st -> AcidState st
toAcidState master
= AcidState { _scheduleUpdate = scheduleMasterUpdate master
, scheduleColdUpdate = scheduleColdUpdate $ localState master
, _query = query $ localState master
, queryCold = queryCold $ localState master
- , createCheckpoint = undefined
+ , createCheckpoint = createMasterCheckpoint master
, createArchive = undefined
, closeAcidState = closeMasterState master
, acidSubState = mkAnyState master
data SlaveState st
= SlaveState { slaveLocalState :: AcidState st
- , slaveRepChan :: Chan (Maybe SlaveRepItem)
+ , slaveRepChan :: Chan SlaveRepItem
, slaveSyncDone :: Event.Event
, slaveRevision :: MVar NodeRevision
, slaveRequests :: MVar SlaveRequests
type SlaveRequests = Map RequestID (IO ())
-- | One Update + Metainformation to replicate.
-type SlaveRepItem = (Revision, Maybe RequestID, Tagged CSL.ByteString)
+data SlaveRepItem =
+ SRIEnd
+ | SRICheckpoint Revision
+ | SRIUpdate Revision (Maybe RequestID) (Tagged CSL.ByteString)
-- | Open a local State as Slave for a Master.
enslaveState :: (IsAcidic st, Typeable st) =>
debug $ "Received: " ++ show mmsg
case mmsg of
-- We are sent an Update to replicate.
- DoRep r i d -> queueUpdate slaveState (r, i, d)
+ DoRep r i d -> queueRepItem slaveState (SRIUpdate r i d)
-- We are sent an Update to replicate for synchronization.
DoSyncRep r d -> replicateSyncUpdate slaveState r d
-- Master done sending all synchronization Updates.
SyncDone c -> onSyncDone slaveState c
+ -- We are sent an Checkpoint request.
+ DoCheckpoint r -> queueRepItem slaveState (SRICheckpoint r)
-- We are allowed to Quit.
- MayQuit -> writeChan slaveRepChan Nothing
+ MayQuit -> writeChan slaveRepChan SRIEnd
-- We are requested to Quit.
MasterQuit -> void $ forkIO $ liberateState slaveState
-- no other messages possible
localCrc <- crcOfState slaveLocalState
if crc /= localCrc then do
putStrLn "Data.Acid.Centered.Slave: CRC mismatch after sync. Exiting."
- liberateState slaveState
+ void $ forkIO $ liberateState slaveState
else do
debug "Sync Done, CRC fine."
Event.set slaveSyncDone
-- | Queue Updates into Chan for replication.
-- We use the Chan so Sync-Updates and normal ones can be interleaved.
-queueUpdate :: SlaveState st -> SlaveRepItem -> IO ()
-queueUpdate SlaveState{..} repItem@(rev, _, _) = do
- debug $ "Queuing Update with revision " ++ show rev
- --asyncWriteChan slaveRepChan repItem
- writeChan slaveRepChan (Just repItem)
+queueRepItem :: SlaveState st -> SlaveRepItem -> IO ()
+queueRepItem SlaveState{..} repItem = do
+ debug "Queuing RepItem."
+ writeChan slaveRepChan repItem
-- | Replicates content of Chan.
slaveReplicationHandler :: SlaveState st -> IO ()
let loop = do
mayRepItem <- readChan slaveRepChan
case mayRepItem of
- Nothing -> return ()
- Just repItem -> do
- replicateUpdate slaveState repItem False
+ SRIEnd -> return ()
+ SRICheckpoint r -> do
+ repCheckpoint slaveState r
+ loop
+ SRIUpdate r i d -> do
+ replicateUpdate slaveState r i d False
loop
loop
-- signal that we're done
void $ takeMVar slaveRepThreadId
-- | Replicate Sync-Updates directly.
-replicateSyncUpdate slaveState rev event = replicateUpdate slaveState (rev, Nothing, event) True
+replicateSyncUpdate slaveState rev event = replicateUpdate slaveState rev Nothing event True
-- | Replicate an Update as requested by Master.
-- Updates that were requested by this Slave are run locally and the result
-- put into the MVar in SlaveRequests.
-- Other Updates are just replicated without using the result.
-replicateUpdate :: SlaveState st -> SlaveRepItem -> Bool -> IO ()
-replicateUpdate SlaveState{..} (rev, reqId, event) syncing = do
+replicateUpdate :: SlaveState st -> Revision -> Maybe RequestID -> Tagged CSL.ByteString -> Bool -> IO ()
+replicateUpdate SlaveState{..} rev reqId event syncing = do
debug $ "Got an Update to replicate " ++ show rev
modifyMVar_ slaveRevision $ \nr -> if rev - 1 == nr
then do
Left str -> error str
Right val -> val
+repCheckpoint :: SlaveState st -> Revision -> IO ()
+repCheckpoint SlaveState{..} rev = do
+ debug "Got Checkpoint request."
+ withMVar slaveRevision $ \nr ->
+ -- create checkpoint
+ createCheckpoint slaveLocalState
+
+
-- | Update on slave site.
-- The steps are:
-- - Request Update from Master
import Data.SafeCopy
import Data.Typeable
-import Control.Monad (forever)
+import Control.Monad (forever, forM_)
import System.Exit (exitSuccess)
-- state structures
putStrLn "Bye!"
closeAcidState acid
exitSuccess
+ ('c':_) -> do
+ createCheckpoint acid
+ putStrLn "Checkpoint generated."
('q':_) -> do
val <- query acid GetState
putStrLn $ "Current value: " ++ show val
('u':val) -> do
update acid (SetState (read val :: Int))
putStrLn "State updated."
- ('i':_) -> update acid IncrementState
+ ('i':_) -> update acid IncrementState >> putStrLn "State incremented."
+ ('k':_) -> do
+ forM_ [1..1000] $ const $ update acid IncrementState
+ putStrLn "State incremented 1k times."
_ -> putStrLn "Unknown command." >> putStrLn usage
usage :: String
usage = "Possible commands:\
\\n x exit\
+ \\n c checkpoint\
\\n q query the state\
\\n u v update to value v\
- \\n i increment"
+ \\n i increment\
+ \\n k increment 1000 times"