ref: Master
authorMax Voit <max.voit+gtdv@with-eyes.net>
Tue, 18 Aug 2015 14:04:20 +0000 (16:04 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Tue, 18 Aug 2015 14:04:20 +0000 (16:04 +0200)
src/Data/Acid/Centered/Master.hs

index 75f7c30..6c1ef5c 100644 (file)
@@ -112,24 +112,7 @@ masterRequestHandler masterState@MasterState{..} = do
             re <- withMVar zmqSocket $ \sock -> poll 100 [Sock sock [In] Nothing]
             unless (null $ head re) $ do
                 (ident, msg) <- withMVar zmqSocket receiveFrame
-                -- handle according frame contents
-                case msg of
-                    -- New Slave joined.
-                    NewSlave r -> connectNode masterState ident r
-                    -- Slave is done replicating.
-                    RepDone r -> whenM (identityIsValid ident) $
-                        updateNodeStatus masterState ident r
-                    -- Slave sends an Udate.
-                    ReqUpdate rid event -> whenM (identityIsValid ident) $
-                        queueRepItem masterState (RIUpdate event (Right (rid, ident)))
-                    -- Slave quits.
-                    SlaveQuit -> do
-                        sendToSlave zmqSocket MayQuit ident
-                        removeFromNodeStatus nodeStatus ident
-                    RepError -> do
-                        sendToSlave zmqSocket MayQuit ident
-                        removeFromNodeStatus nodeStatus ident
-                    -- no other messages possible
+                handleMessage ident msg
             loop
     loop
     where
@@ -142,6 +125,23 @@ masterRequestHandler masterState@MasterState{..} = do
                 debug $ "Request by unknown node [" ++ CS.unpack i ++ "]"
                 sendToSlave zmqSocket MayQuit i
                 return False
+        handleMessage i m = case m of
+             -- New Slave joined.
+             NewSlave r -> connectNode masterState i r
+             -- Slave is done replicating.
+             RepDone r -> whenM (identityIsValid i) $
+                 updateNodeStatus masterState i r
+             -- Slave sends an Udate.
+             ReqUpdate rid event -> whenM (identityIsValid i) $
+                 queueRepItem masterState (RIUpdate event (Right (rid, i)))
+             -- Slave quits.
+             SlaveQuit -> do
+                 sendToSlave zmqSocket MayQuit i
+                 removeFromNodeStatus nodeStatus i
+             RepError -> do
+                 sendToSlave zmqSocket MayQuit i
+                 removeFromNodeStatus nodeStatus i
+             -- no other messages possible
 
 -- | Remove a Slave node from NodeStatus.
 removeFromNodeStatus :: MVar NodeStatus -> NodeIdentity -> IO ()
@@ -150,21 +150,24 @@ removeFromNodeStatus nodeStatus ident =
 
 -- | Update the NodeStatus after a node has replicated an Update.
 updateNodeStatus :: MasterState st -> NodeIdentity -> Int -> IO ()
-updateNodeStatus MasterState{..} ident r =
+updateNodeStatus MasterState{..} ident rev =
     modifyMVar_ nodeStatus $ \ns -> do
-        when (ns M.! ident /= (r - 1)) $
-            error $ "Invalid increment of node status " ++ show (ns M.! ident) ++ " -> " ++ show r
+        when (ns M.! ident /= (rev - 1)) $
+            error $ "Invalid increment of node status "
+                ++ show (ns M.! ident) ++ " -> " ++ show rev
         let rns = M.adjust (+1) ident ns
         -- only for redundant operation:
-        when ((repRedundancy > 1) && (M.size (M.filter (>=r) rns) >= (repRedundancy - 1))) $ do
-            debug $ "Full replication of " ++ show r
+        when ((repRedundancy > 1) && (M.size (M.filter (>=rev) rns) >= (repRedundancy - 1))) $ do
+            debug $ "Full replication of " ++ show rev
             -- finalize local replication
-            modifyMVar_ repFinalizers $ \rf -> do
-                rf IM.! r
-                return $ IM.delete r rf
+            runAndDelFinalizer rev
             -- send out FullRep signal
-            forM_ (M.keys ns) $ sendToSlave zmqSocket (FullRep r)
+            forM_ (M.keys ns) $ sendToSlave zmqSocket (FullRep rev)
         return rns
+    where
+        runAndDelFinalizer r = modifyMVar_ repFinalizers $ \rf -> do
+            rf IM.! r
+            return $ IM.delete r rf
 
 -- | Connect a new Slave by getting it up-to-date,
 --   i.e. send all past events as Updates. This is fire&forget.
@@ -175,7 +178,7 @@ connectNode MasterState{..} i revision =
         modifyMVar_ nodeStatus $ \ns -> do
             -- crc generated from localCore thus corresponds to disk
             crc <- crcOfState localState
-            -- if there has been one/more checkpoint in between:
+            -- if there has been a checkpoint in between:
             lastCp <- getLastCheckpointRev localState
             let lastCpRev = cpRevision lastCp
             debug $ "Found checkpoint at revision " ++ show lastCpRev
@@ -188,6 +191,7 @@ connectNode MasterState{..} i revision =
                 -- just the events
                 pastUpdates <- getPastUpdates localState revision
                 forM_ pastUpdates $ \(r, u) -> sendSyncUpdate zmqSocket r u i
+            -- now done, crc
             sendToSlave zmqSocket (SyncDone crc) i
             let nns = M.insert i mr ns
             -- only for redundant operation:
@@ -213,13 +217,13 @@ connectNode MasterState{..} i revision =
 
 -- | Fetch past Updates from FileLog for replication.
 getPastUpdates :: (Typeable st) => AcidState st -> Int -> IO [(Int, Tagged ByteString)]
-getPastUpdates state startRev = liftM2 zip (return [(startRev+1)..]) (readEntriesFrom (localEvents $ downcast state) startRev)
+getPastUpdates state startRev =
+    liftM2 zip (return [(startRev+1)..]) (readEntriesFrom (localEvents $ downcast state) startRev)
 
 -- | Get the revision at which the last checkpoint was taken.
 getLastCheckpointRev :: (Typeable st) => AcidState st -> IO Checkpoint
 getLastCheckpointRev state = do
-    let lst = downcast state
-    let cplog = localCheckpoints lst
+    let cplog = localCheckpoints $ downcast state
     nextId <- atomically $ readTVar $ logNextEntryId cplog
     cps <- readEntriesFrom cplog (nextId - 1)
     return $ headDef (Checkpoint 0 CSL.empty) cps
@@ -228,8 +232,8 @@ getLastCheckpointRev state = do
 sendToSlave :: MVar (Socket Router) -> MasterMessage -> NodeIdentity -> IO ()
 sendToSlave msock msg ident = withMVar msock $ \sock -> sendMulti sock $ NEL.fromList [ident, encode msg]
 
--- | Receive one Frame. A Frame consists of three messages:
---      sender ID, empty message, and actual content
+-- | Receive one Frame. A Frame consists of two messages:
+--      sender ID and actual content
 receiveFrame :: (Receiver t) => Socket t -> IO (NodeIdentity, SlaveMessage)
 receiveFrame sock = do
     list <- receiveMulti sock
@@ -297,6 +301,11 @@ openRedMasterStateFrom directory address port red initialState = do
         repChanN <- dupChan repChan
         repFin <- newMVar IM.empty
         ns <- newMVar M.empty
+        repTidL <- newEmptyMVar
+        repTidN <- newEmptyMVar
+        reqTid <- newEmptyMVar
+        parTid <- myThreadId
+        sLock <- newEmptyMVar
         -- remote
         let addr = "tcp://" ++ address ++ ":" ++ show port
         ctx <- context
@@ -305,16 +314,12 @@ openRedMasterStateFrom directory address port red initialState = do
         setSendHighWM (restrict (100*1000 :: Int)) sock
         bind sock addr
         msock <- newMVar sock
-        repTidL <- newEmptyMVar
-        repTidN <- newEmptyMVar
-        reqTid <- newEmptyMVar
-        parTid <- myThreadId
-        lock <- newEmptyMVar
+
         let masterState = MasterState { localState = lst
                                       , nodeStatus = ns
                                       , repRedundancy = red
                                       , repFinalizers = repFin
-                                      , masterStateLock = lock
+                                      , masterStateLock = sLock
                                       , masterRevision = rev
                                       , masterRevisionN = revN
                                       , masterReplicationChan = repChan
@@ -423,8 +428,7 @@ masterReplicationHandlerL MasterState{..} = do
                     createCheckpoint localState
                     loop
                 RIUpdate event sink -> do
-                    if repRedundancy > 1
-                    then do
+                    if repRedundancy > 1 then do
                         (rev, act) <- modifyMVar masterRevision $ \r -> do
                             a <- case sink of
                                 Left callback   -> callback