debug try: fetch result in other thread, higher HWM
authorMax Voit <max.voit+gtdv@with-eyes.net>
Thu, 25 Jun 2015 13:28:43 +0000 (15:28 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Thu, 25 Jun 2015 13:28:43 +0000 (15:28 +0200)
src/Data/Acid/Centered/Master.hs
src/Data/Acid/Centered/Slave.hs

index 446da1a..312e1c8 100644 (file)
@@ -49,6 +49,7 @@ import Control.Monad.STM (atomically)
 import Control.Concurrent.STM.TVar (readTVar)
 
 import System.ZMQ4 (Context, Socket, Router(..), Receiver, Flag(..),
+                    setReceiveHighWM, setSendHighWM, restrict,
                     context, term, socket, close, 
                     bind, unbind,
                     send, receive)
@@ -184,12 +185,14 @@ openMasterState port initialState = do
         let levs = localEvents $ downcast lst
         lrev <- atomically $ readTVar $ logNextEntryId levs
         rev <- newMVar lrev
-        -- remote
-        ctx <- context
-        sock <- socket ctx Router
         repChan <- newChan
         ns <- newMVar M.empty
+        -- remote
         let addr = "tcp://127.0.0.1:" ++ show port
+        ctx <- context
+        sock <- socket ctx Router
+        setReceiveHighWM (restrict (100*1000)) sock
+        setSendHighWM (restrict (100*1000)) sock
         bind sock addr
         let masterState = MasterState { localState = lst
                                       , nodeStatus = ns
@@ -222,7 +225,7 @@ scheduleMasterUpdate :: UpdateEvent event => MasterState (EventState event) -> e
 scheduleMasterUpdate masterState@MasterState{..} event = do
         debug "Update by Master."
         result <- newEmptyMVar 
-        let callback = putMVar result =<< takeMVar =<< scheduleUpdate localState event 
+        let callback = void $ forkIO (putMVar result =<< takeMVar =<< scheduleUpdate localState event)
         let encoded = runPutLazy (safePut event) 
         queueUpdate masterState ((methodTag event, encoded), Left callback)
         return result
index dd76b99..61edc70 100644 (file)
@@ -51,6 +51,7 @@ import Data.Acid.Log
 import Data.Acid.Centered.Common
 
 import System.ZMQ4 (Context, Socket, Dealer(..), Receiver, Flag(..),
+                    setReceiveHighWM, setSendHighWM, restrict,
                     context, term, socket, close, 
                     connect, disconnect,
                     send, receive)
@@ -108,15 +109,17 @@ enslaveState address port initialState = do
         let levs = localEvents $ downcast lst
         lrev <- atomically $ readTVar $ logNextEntryId levs
         rev <- newMVar lrev
-        -- remote
         debug $ "Opening enslaved state at revision " ++ show lrev
-        ctx <- context
-        sock <- socket ctx Dealer
         srs <- newMVar M.empty
         lastReqId <- newMVar 0
         repChan <- newChan
         syncDone <- Event.new
+        -- remote
         let addr = "tcp://" ++ address ++ ":" ++ show port
+        ctx <- context
+        sock <- socket ctx Dealer
+        setReceiveHighWM (restrict (100*1000)) sock
+        setSendHighWM (restrict (100*1000)) sock
         connect sock addr
         sendToMaster sock $ NewSlave lrev
         let slaveState = SlaveState { slaveLocalState = lst
@@ -214,7 +217,7 @@ scheduleSlaveUpdate slaveState@SlaveState{..} event = do
             let encoded = runPutLazy (safePut event)
             sendToMaster slaveZmqSocket $ ReqUpdate reqId (methodTag event, encoded)
             timeoutID <- forkIO $ timeoutRequest slaveState reqId
-            let callback = putMVar result =<< takeMVar =<< scheduleUpdate slaveLocalState event 
+            let callback = void $ forkIO $ putMVar result =<< takeMVar =<< scheduleUpdate slaveLocalState event 
             return $ M.insert reqId (callback, timeoutID) srs
         return result