ref: Slave
authorMax Voit <max.voit+gtdv@with-eyes.net>
Tue, 18 Aug 2015 13:30:18 +0000 (15:30 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Tue, 18 Aug 2015 13:30:18 +0000 (15:30 +0200)
src/Data/Acid/Centered/Slave.hs

index a81791c..57686dc 100644 (file)
@@ -56,7 +56,6 @@ import System.ZMQ4 (Context, Socket, Dealer(..),
 import System.FilePath ( (</>) )
 
 import Data.ByteString.Lazy.Char8 (ByteString)
-import Data.Maybe (fromMaybe)
 
 import           Data.IntMap (IntMap)
 import qualified Data.IntMap as IM
@@ -148,8 +147,7 @@ enslaveMayRedStateFrom :: (IsAcidic st, Typeable st) =>
 enslaveMayRedStateFrom isRed directory address port initialState = do
         -- local
         lst <- openLocalStateFrom directory initialState
-        let levs = localEvents $ downcast lst
-        lrev <- atomically $ readTVar $ logNextEntryId levs
+        lrev <- getLocalRevision lst
         rev <- newMVar lrev
         debug $ "Opening enslaved state at revision " ++ show lrev
         srs <- newMVar IM.empty
@@ -160,7 +158,7 @@ enslaveMayRedStateFrom isRed directory address port initialState = do
         repTid <- newEmptyMVar
         parTid <- myThreadId
         repFin <- newMVar IM.empty
-        lock <- newEmptyMVar
+        sLock <- newEmptyMVar
         -- remote
         let addr = "tcp://" ++ address ++ ":" ++ show port
         ctx <- context
@@ -170,9 +168,10 @@ enslaveMayRedStateFrom isRed directory address port initialState = do
         connect sock addr
         msock <- newMVar sock
         sendToMaster msock $ NewSlave lrev
+
         let slaveState = SlaveState { slaveLocalState = lst
                                     , slaveStateIsRed = isRed
-                                    , slaveStateLock = lock
+                                    , slaveStateLock = sLock
                                     , slaveRepFinalizers = repFin
                                     , slaveRepChan = repChan
                                     , slaveSyncDone = syncDone
@@ -189,6 +188,9 @@ enslaveMayRedStateFrom isRed directory address port initialState = do
         void $ forkIOWithUnmask $ slaveRequestHandler slaveState
         void $ forkIO $ slaveReplicationHandler slaveState
         return $ slaveToAcidState slaveState
+        where
+            getLocalRevision =
+                atomically . readTVar . logNextEntryId . localEvents . downcast
 
 -- | Replication handler of the Slave.
 slaveRequestHandler :: (IsAcidic st, Typeable st) => SlaveState st -> (IO () -> IO ()) -> IO ()
@@ -204,42 +206,43 @@ slaveRequestHandler slaveState@SlaveState{..} unmask = do
                 msg <- withMVar slaveZmqSocket receive
                 case decode msg of
                     Left str -> error $ "Data.Serialize.decode failed on MasterMessage: " ++ show str
-                    Right mmsg -> do
-                         debug $ "Received: " ++ show mmsg
-                         case mmsg of
-                            -- We are sent an Update to replicate.
-                            DoRep r i d -> queueRepItem slaveState (SRIUpdate r i d)
-                            -- We are sent a Checkpoint for synchronization.
-                            DoSyncCheckpoint r d -> replicateSyncCp slaveState r d
-                            -- We are sent an Update to replicate for synchronization.
-                            DoSyncRep r d -> replicateSyncUpdate slaveState r d
-                            -- Master done sending all synchronization Updates.
-                            SyncDone c -> onSyncDone slaveState c
-                            -- We are sent a Checkpoint request.
-                            DoCheckpoint r -> queueRepItem slaveState (SRICheckpoint r)
-                            -- We are sent an Archive request.
-                            DoArchive r -> queueRepItem slaveState (SRIArchive r)
-                            -- Full replication of a revision
-                            FullRep r -> modifyMVar_ slaveRepFinalizers $ \rf -> do
-                                            rf IM.! r
-                                            return $ IM.delete r rf
-                            -- Full replication of events up to revision
-                            FullRepTo r -> modifyMVar_ slaveRepFinalizers $ \rf -> do
-                                            let (ef, nrf) = IM.partitionWithKey (\k _ -> k <= r) rf
-                                            sequence_ (IM.elems ef)
-                                            return nrf
-                            -- We are allowed to Quit.
-                            MayQuit -> writeChan slaveRepChan SRIEnd
-                            -- We are requested to Quit - shall be handled by
-                            -- 'bracket' usage by user.
-                            MasterQuit -> throwTo slaveParentThreadId $
-                                ErrorCall "Data.Acid.Centered.Slave: Master quit."
-                            -- no other messages possible, enforced by type checker
+                    Right mmsg -> handleMessage mmsg
             loop
     loop
     where
         killHandler :: AcidException -> IO ()
         killHandler GracefulExit = return ()
+        handleMessage m = do
+            debug $ "Received: " ++ show m
+            case m of
+               -- We are sent an Update to replicate.
+               DoRep r i d -> queueRepItem slaveState (SRIUpdate r i d)
+               -- We are sent a Checkpoint for synchronization.
+               DoSyncCheckpoint r d -> replicateSyncCp slaveState r d
+               -- We are sent an Update to replicate for synchronization.
+               DoSyncRep r d -> replicateSyncUpdate slaveState r d
+               -- Master done sending all synchronization Updates.
+               SyncDone c -> onSyncDone slaveState c
+               -- We are sent a Checkpoint request.
+               DoCheckpoint r -> queueRepItem slaveState (SRICheckpoint r)
+               -- We are sent an Archive request.
+               DoArchive r -> queueRepItem slaveState (SRIArchive r)
+               -- Full replication of a revision
+               FullRep r -> modifyMVar_ slaveRepFinalizers $ \rf -> do
+                               rf IM.! r
+                               return $ IM.delete r rf
+               -- Full replication of events up to revision
+               FullRepTo r -> modifyMVar_ slaveRepFinalizers $ \rf -> do
+                               let (ef, nrf) = IM.partitionWithKey (\k _ -> k <= r) rf
+                               sequence_ (IM.elems ef)
+                               return nrf
+               -- We are allowed to Quit.
+               MayQuit -> writeChan slaveRepChan SRIEnd
+               -- We are requested to Quit - shall be handled by
+               -- 'bracket' usage by user.
+               MasterQuit -> throwTo slaveParentThreadId $
+                   ErrorCall "Data.Acid.Centered.Slave: Master quit."
+               -- no other messages possible, enforced by type checker
 
 -- | After sync check CRC
 onSyncDone :: (IsAcidic st, Typeable st) => SlaveState st -> Crc -> IO ()
@@ -263,23 +266,21 @@ slaveReplicationHandler :: Typeable st => SlaveState st -> IO ()
 slaveReplicationHandler slaveState@SlaveState{..} = do
         mtid <- myThreadId
         putMVar slaveRepThreadId mtid
-        -- todo: timeout is magic variable, make customizable?
+
+        -- todo: timeout is magic variable, make customizable
         noTimeout <- Event.waitTimeout slaveSyncDone $ 10*1000*1000
-        unless noTimeout $ throwTo slaveParentThreadId $ ErrorCall "Data.Acid.Centered.Slave: Took too long to sync. Timeout."
+        unless noTimeout $ throwTo slaveParentThreadId $
+            ErrorCall "Data.Acid.Centered.Slave: Took too long to sync. Timeout."
+
         let loop = handle (\e -> throwTo slaveParentThreadId (e :: SomeException)) $ do
                 mayRepItem <- readChan slaveRepChan
                 case mayRepItem of
-                    SRIEnd -> return ()
-                    SRICheckpoint r -> do
-                        repCheckpoint slaveState r
-                        loop
-                    SRIArchive r -> do
-                        repArchive slaveState r
-                        loop
-                    SRIUpdate r i d -> do
-                        replicateUpdate slaveState r i d False
-                        loop
+                    SRIEnd          -> return ()
+                    SRICheckpoint r -> repCheckpoint slaveState r               >> loop
+                    SRIArchive r    -> repArchive slaveState r                  >> loop
+                    SRIUpdate r i d -> replicateUpdate slaveState r i d False   >> loop
         loop
+
         -- signal that we're done
         void $ takeMVar slaveRepThreadId
 
@@ -327,29 +328,34 @@ replicateUpdate SlaveState{..} rev reqId event syncing = do
             then do
                 -- commit / run it locally
                 case reqId of
-                    Nothing -> if slaveStateIsRed
-                        then do
-                            act <- newEmptyMVar >>= scheduleLocalColdUpdate' (downcast slaveLocalState) event 
-                            modifyMVar_ slaveRepFinalizers $ return . IM.insert rev act
-                        else
-                            void $ scheduleColdUpdate slaveLocalState event
-                    Just rid -> do
-                        act <- modifyMVar slaveRequests $ \srs -> do
-                            debug $ "This is the Update for Request " ++ show rid
-                            let (icallback, timeoutId) = fromMaybe (error $ "Data.Acid.Centered.Slave: Callback not found: " ++ show rid) (IM.lookup rid srs)
-                            callback <- icallback
-                            killThread timeoutId
-                            let nsrs = IM.delete rid srs
-                            return (nsrs, callback)
-                        when slaveStateIsRed $
-                            modifyMVar_ slaveRepFinalizers $ return . IM.insert rev act
+                    Nothing -> replicateForeign
+                    Just rid -> replicateOwn rid
                 -- send reply: we're done
                 unless syncing $ sendToMaster slaveZmqSocket $ RepDone rev
                 return rev
             else do
                 sendToMaster slaveZmqSocket RepError
-                void $ error $ "Data.Acid.Centered.Slave: Replication failed at revision " ++ show nr ++ " -> " ++ show rev
+                void $ error $
+                    "Data.Acid.Centered.Slave: Replication failed at revision "
+                        ++ show nr ++ " -> " ++ show rev
                 return nr
+        where
+            replicateForeign =
+                if slaveStateIsRed then do
+                    act <- newEmptyMVar >>= scheduleLocalColdUpdate' (downcast slaveLocalState) event
+                    modifyMVar_ slaveRepFinalizers $ return . IM.insert rev act
+                else
+                    void $ scheduleColdUpdate slaveLocalState event
+            replicateOwn rid = do
+                act <- modifyMVar slaveRequests $ \srs -> do
+                    debug $ "This is the Update for Request " ++ show rid
+                    let (icallback, timeoutId) = srs IM.! rid
+                    callback <- icallback
+                    killThread timeoutId
+                    let nsrs = IM.delete rid srs
+                    return (nsrs, callback)
+                when slaveStateIsRed $
+                    modifyMVar_ slaveRepFinalizers $ return . IM.insert rev act
 
 repCheckpoint :: SlaveState st -> Revision -> IO ()
 repCheckpoint SlaveState{..} rev = do
@@ -377,8 +383,7 @@ scheduleSlaveUpdate slaveState@SlaveState{..} event = do
     else do
         debug "Update by Slave."
         result <- newEmptyMVar
-        -- slaveLastRequestID is only modified here - and used for locking the state
-        reqId <- modifyMVar slaveLastRequestID $ \x -> return (x+1,x+1)
+        reqId <- getNextRequestId slaveState
         modifyMVar_ slaveRequests $ \srs -> do
             let encoded = runPutLazy (safePut event)
             sendToMaster slaveZmqSocket $ ReqUpdate reqId (methodTag event, encoded)
@@ -401,7 +406,7 @@ scheduleSlaveColdUpdate slaveState@SlaveState{..} encoded = do
         debug "Cold Update by Slave."
         result <- newEmptyMVar
         -- slaveLastRequestID is only modified here - and used for locking the state
-        reqId <- modifyMVar slaveLastRequestID $ \x -> return (x+1,x+1)
+        reqId <- getNextRequestId slaveState
         modifyMVar_ slaveRequests $ \srs -> do
             sendToMaster slaveZmqSocket $ ReqUpdate reqId encoded
             timeoutID <- forkIO $ timeoutRequest slaveState reqId result
@@ -414,6 +419,10 @@ scheduleSlaveColdUpdate slaveState@SlaveState{..} encoded = do
             return $ IM.insert reqId (callback, timeoutID) srs
         return result
 
+-- | Generate ID for another request.
+getNextRequestId :: SlaveState st -> IO RequestID
+getNextRequestId SlaveState{..} = modifyMVar slaveLastRequestID $ \x -> return (x+1,x+1)
+
 -- | Ensures requests are actually answered or fail.
 --   On timeout the Slave dies, not the thread that invoked the Update.
 timeoutRequest :: SlaveState st -> RequestID -> MVar m -> IO ()