enhan: Slave timeout handlers
authorMax Voit <max.voit+gtdv@with-eyes.net>
Thu, 23 Jul 2015 13:46:25 +0000 (15:46 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Thu, 23 Jul 2015 13:46:25 +0000 (15:46 +0200)
makefile
src/Data/Acid/Centered/Slave.hs

index e99dd07..2dda63b 100644 (file)
--- a/makefile
+++ b/makefile
@@ -2,7 +2,7 @@ all-test:
        cabal clean
        cabal configure --enable-test
        cabal build
-       cabal test
+       cabal test | grep --color -C 999 PASS
 
 clean-all-state:
        find . -name state -type d -exec rm -rf {} \;
index 131c76d..9d4ee74 100644 (file)
@@ -42,7 +42,7 @@ import System.ZMQ4 (Context, Socket, Dealer(..),
                     connect, disconnect, send, receive)
 import System.FilePath ( (</>) )
 
-import Control.Concurrent (forkIO, ThreadId, myThreadId, killThread)
+import Control.Concurrent (forkIO, ThreadId, myThreadId, killThread, threadDelay)
 import Control.Concurrent.MVar (MVar, newMVar, newEmptyMVar,
                                 withMVar, modifyMVar, modifyMVar_,
                                 takeMVar, putMVar)
@@ -76,7 +76,7 @@ data SlaveState st
                  } deriving (Typeable)
 
 -- | Memory of own Requests sent to Master.
-type SlaveRequests = Map RequestID (IO ())
+type SlaveRequests = Map RequestID (IO (),ThreadId)
 
 -- | One Update + Metainformation to replicate.
 data SlaveRepItem =
@@ -265,7 +265,9 @@ replicateUpdate SlaveState{..} rev reqId event syncing = do
                         void $ scheduleColdUpdate slaveLocalState event
                     Just rid -> modifyMVar slaveRequests $ \srs -> do
                         debug $ "This is the Update for Request " ++ show rid
-                        callback <- fromMaybe (error $ "Callback not found: " ++ show rid) (M.lookup rid srs)
+                        let (icallback, timeoutId) = fromMaybe (error $ "Callback not found: " ++ show rid) (M.lookup rid srs)
+                        callback <- icallback
+                        killThread timeoutId
                         let nsrs = M.delete rid srs
                         return (nsrs, callback)
                 -- send reply: we're done
@@ -298,7 +300,7 @@ repArchive SlaveState{..} rev = do
 --      - Master issues Update with same RequestID
 --      - repHandler replicates and puts result in MVar
 scheduleSlaveUpdate :: UpdateEvent e => SlaveState (EventState e) -> e -> IO (MVar (EventResult e))
-scheduleSlaveUpdate SlaveState{..} event = do
+scheduleSlaveUpdate slaveState@SlaveState{..} event = do
         debug "Update by Slave."
         result <- newEmptyMVar
         -- slaveLastRequestID is only modified here - and used for locking the state
@@ -306,12 +308,21 @@ scheduleSlaveUpdate SlaveState{..} event = do
         modifyMVar_ slaveRequests $ \srs -> do
             let encoded = runPutLazy (safePut event)
             sendToMaster slaveZmqSocket $ ReqUpdate reqId (methodTag event, encoded)
+            timeoutID <- forkIO $ timeoutRequest slaveState reqId
             let callback = do
                     hd <- scheduleUpdate slaveLocalState event
                     void $ forkIO $ putMVar result =<< takeMVar hd
-            return $ M.insert reqId callback srs
+            return $ M.insert reqId (callback, timeoutID) srs
         return result
 
+-- | Ensures requests are actually answered or fail.
+-- TODO: put error into result-mvar too
+timeoutRequest :: SlaveState st -> RequestID -> IO ()
+timeoutRequest SlaveState{..} reqId = do
+    threadDelay $ 5*1000*1000
+    stillThere <- withMVar slaveRequests (return . M.member reqId)
+    when stillThere $ error "Update-Request timed out."
+
 -- | Send a message to Master.
 sendToMaster :: MVar (Socket Dealer) -> SlaveMessage -> IO ()
 sendToMaster msock smsg = withMVar msock $ \sock -> send sock [] (encode smsg)