enhan: cold updates, adapt to changed scheduleLocalColdUpdate'
authorMax Voit <max.voit+gtdv@with-eyes.net>
Mon, 17 Aug 2015 11:57:17 +0000 (13:57 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Mon, 17 Aug 2015 11:57:17 +0000 (13:57 +0200)
src/Data/Acid/Centered/Master.hs
src/Data/Acid/Centered/Slave.hs

index d70c6bf..8067290 100644 (file)
@@ -38,7 +38,7 @@ import Data.Acid.Centered.Common
 
 import Control.Concurrent (forkIO, ThreadId, myThreadId)
 import Control.Concurrent.Chan (Chan, newChan, writeChan, readChan, dupChan)
-import Control.Monad (when, unless, void, forM_, liftM2, liftM)
+import Control.Monad (when, unless, void, forM_, liftM2)
 import Control.Monad.STM (atomically)
 import Control.Concurrent.STM.TVar (readTVar)
 import Control.Concurrent.MVar(MVar, newMVar, newEmptyMVar,
@@ -379,6 +379,25 @@ scheduleMasterUpdate masterState@MasterState{..} event = do
             queueRepItem masterState (RIUpdate (methodTag event, encoded) (Left callback))
             return result
 
+-- | Cold Update on master site.
+scheduleMasterColdUpdate :: Typeable st => MasterState st -> Tagged CSL.ByteString -> IO (MVar CSL.ByteString)
+scheduleMasterColdUpdate masterState@MasterState{..} encoded = do
+        debug "Cold Update by Master."
+        unlocked <- isEmptyMVar masterStateLock
+        if not unlocked then error "State is locked!"
+        else do
+            result <- newEmptyMVar
+            let callback = if repRedundancy > 1
+                then
+                    -- the returned action fills in result when executed later
+                    scheduleLocalColdUpdate' (downcast localState) encoded result
+                else do
+                    hd <- scheduleColdUpdate localState encoded
+                    void $ forkIO (putMVar result =<< takeMVar hd)
+                    return (return ())      -- bogus finalizer
+            queueRepItem masterState (RIUpdate encoded (Left callback))
+            return result
+
 -- | Queue an RepItem (originating from the Master itself of an Slave via zmq)
 queueRepItem :: MasterState st -> ReplicationItem -> IO ()
 queueRepItem MasterState{..} = writeChan masterReplicationChan
@@ -407,7 +426,7 @@ masterReplicationHandlerL MasterState{..} = do
                         (rev, act) <- modifyMVar masterRevision $ \r -> do
                             a <- case sink of
                                 Left callback   -> callback
-                                _               -> liftM snd $ scheduleLocalColdUpdate' (downcast localState) event
+                                _               -> newEmptyMVar >>= scheduleLocalColdUpdate' (downcast localState) event
                             return (r+1,(r+1,a))
                         -- act finalizes the transaction - will be run after full replication
                         modifyMVar_ repFinalizers $ return . IM.insert rev act
@@ -490,7 +509,7 @@ createArchiveGlobally acid = do
 toAcidState :: (IsAcidic st, Typeable st) => MasterState st -> AcidState st
 toAcidState master
   = AcidState { _scheduleUpdate    = scheduleMasterUpdate master
-              , scheduleColdUpdate = scheduleColdUpdate $ localState master
+              , scheduleColdUpdate = scheduleMasterColdUpdate master
               , _query             = query $ localState master
               , queryCold          = queryCold $ localState master
               , createCheckpoint   = createMasterCheckpoint master
index d085427..bc8e4a7 100644 (file)
@@ -47,7 +47,7 @@ import Control.Concurrent.MVar (MVar, newMVar, newEmptyMVar, isEmptyMVar,
                                 withMVar, modifyMVar, modifyMVar_,
                                 takeMVar, putMVar, tryPutMVar)
 import Data.IORef (writeIORef)
-import Control.Monad (void, when, unless, liftM)
+import Control.Monad (void, when, unless)
 import Control.Monad.STM (atomically)
 import Control.Concurrent.STM.TVar (readTVar, writeTVar)
 import qualified Control.Concurrent.Event as Event
@@ -291,7 +291,6 @@ replicateSyncCp SlaveState{..} rev encoded = do
     let core = localCore lst
     modifyMVar_ slaveRevision $ \sr -> do
         when (sr > rev) $ error "Data.Acid.Centered.Slave: Revision mismatch for checkpoint: Slave is newer."
-        -- todo: check
         modifyCoreState_ core $ \_ -> do
             writeIORef (localCopy lst) st
             createCpFake lst encoded rev
@@ -329,7 +328,7 @@ replicateUpdate SlaveState{..} rev reqId event syncing = do
                 case reqId of
                     Nothing -> if slaveStateIsRed
                         then do
-                            act <- liftM snd $ scheduleLocalColdUpdate' (downcast slaveLocalState) event
+                            act <- newEmptyMVar >>= scheduleLocalColdUpdate' (downcast slaveLocalState) event 
                             modifyMVar_ slaveRepFinalizers $ return . IM.insert rev act
                         else
                             void $ scheduleColdUpdate slaveLocalState event
@@ -392,6 +391,28 @@ scheduleSlaveUpdate slaveState@SlaveState{..} event = do
             return $ IM.insert reqId (callback, timeoutID) srs
         return result
 
+-- | Cold Update on slave site. This enables for using Remote.
+scheduleSlaveColdUpdate :: Typeable st => SlaveState st -> Tagged CSL.ByteString -> IO (MVar CSL.ByteString)
+scheduleSlaveColdUpdate slaveState@SlaveState{..} encoded = do
+    unlocked <- isEmptyMVar slaveStateLock
+    if not unlocked then error "State is locked."
+    else do
+        debug "Cold Update by Slave."
+        result <- newEmptyMVar
+        -- slaveLastRequestID is only modified here - and used for locking the state
+        reqId <- modifyMVar slaveLastRequestID $ \x -> return (x+1,x+1)
+        modifyMVar_ slaveRequests $ \srs -> do
+            sendToMaster slaveZmqSocket $ ReqUpdate reqId encoded
+            timeoutID <- forkIO $ timeoutRequest slaveState reqId result
+            let callback = if slaveStateIsRed
+                    then scheduleLocalColdUpdate' (downcast slaveLocalState) encoded result
+                    else do
+                        hd <- scheduleColdUpdate slaveLocalState encoded
+                        void $ forkIO $ putMVar result =<< takeMVar hd
+                        return (return ())      -- bogus finalizer
+            return $ IM.insert reqId (callback, timeoutID) srs
+        return result
+
 -- | Ensures requests are actually answered or fail.
 --   On timeout the Slave dies, not the thread that invoked the Update.
 timeoutRequest :: SlaveState st -> RequestID -> MVar m -> IO ()
@@ -441,7 +462,7 @@ liberateState SlaveState{..} =
 slaveToAcidState :: (IsAcidic st, Typeable st)  => SlaveState st -> AcidState st
 slaveToAcidState slaveState
   = AcidState { _scheduleUpdate    = scheduleSlaveUpdate slaveState
-              , scheduleColdUpdate = undefined
+              , scheduleColdUpdate = scheduleSlaveColdUpdate slaveState
               , _query             = query $ slaveLocalState slaveState
               , queryCold          = queryCold $ slaveLocalState slaveState
               , createCheckpoint   = createCheckpoint $ slaveLocalState slaveState