more debug commit
authorMax Voit <max.voit+gtdv@with-eyes.net>
Sat, 27 Jun 2015 14:42:51 +0000 (16:42 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Sat, 27 Jun 2015 14:42:51 +0000 (16:42 +0200)
src/Data/Acid/Centered/Slave.hs

index 35c18bc..9770c51 100644 (file)
@@ -52,6 +52,7 @@ import Data.Acid.Centered.Common
 
 import System.ZMQ4 (Context, Socket, Dealer(..), Receiver, Flag(..),
                     setReceiveHighWM, setSendHighWM, restrict,
+                    waitRead,
                     context, term, socket, close, 
                     connect, disconnect,
                     send, receive)
@@ -59,6 +60,7 @@ import System.ZMQ4 (Context, Socket, Dealer(..), Receiver, Flag(..),
 import Control.Concurrent (forkIO, threadDelay, ThreadId, killThread)
 import Control.Concurrent.MVar (MVar, newMVar, newEmptyMVar, 
                                 withMVar, modifyMVar, modifyMVar_,
+                                readMVar,
                                 takeMVar, putMVar)
 import Control.Monad (forever, void,
                       when, unless
@@ -88,8 +90,7 @@ data SlaveState st
                  , slaveLastRequestID :: MVar RequestID
                  , slaveZmqContext :: Context
                  , slaveZmqAddr :: String
-                 , slaveZmqSocket :: Socket Dealer
-                 , slaveZmqSocketLock :: MVar ()
+                 , slaveZmqSocket :: MVar (Socket Dealer)
                  } deriving (Typeable)
 
 -- | Memory of own Requests sent to Master.
@@ -123,7 +124,8 @@ enslaveState address port initialState = do
         setReceiveHighWM (restrict (100*1000)) sock
         setSendHighWM (restrict (100*1000)) sock
         connect sock addr
-        sendToMaster sock $ NewSlave lrev
+        msock <- newMVar sock
+        sendToMaster msock $ NewSlave lrev
         let slaveState = SlaveState { slaveLocalState = lst
                                     , slaveRepChan = repChan
                                     , slaveSyncDone = syncDone
@@ -132,8 +134,7 @@ enslaveState address port initialState = do
                                     , slaveLastRequestID = lastReqId
                                     , slaveZmqContext = ctx
                                     , slaveZmqAddr = addr
-                                    , slaveZmqSocket = sock
-                                    , slaveZmqSocketLock = sockLock
+                                    , slaveZmqSocket = msock
                                     }
         forkIO $ slaveRequestHandler slaveState 
         forkIO $ slaveReplicationHandler slaveState 
@@ -142,7 +143,8 @@ enslaveState address port initialState = do
 -- | Replication handler of the Slave. 
 slaveRequestHandler :: SlaveState st -> IO ()
 slaveRequestHandler slaveState@SlaveState{..} = forever $ do
-        msg <- receive slaveZmqSocket
+        waitRead =<< readMVar slaveZmqSocket
+        msg <- withMVar slaveZmqSocket receive
         case decode msg of
             Left str -> error $ "Data.Serialize.decode failed on MasterMessage: " ++ show msg
             Right mmsg -> do
@@ -192,12 +194,14 @@ replicateUpdate SlaveState{..} (rev, reqId, event) syncing = do
                     Just rid -> modifyMVar slaveRequests $ \srs -> do
                         debug $ "This is the Update for Request " ++ show rid
                         let icb = fromMaybe (error $ "Callback not found: " ++ show rid) (M.lookup rid srs) 
+                        debug "before cb"
                         callback <- icb
+                        debug "after cb"
                         -- todo: we remember it, clean it up later
-                        let nsrs = M.adjust (\c -> return ()) rid srs
+                        let nsrs = M.delete rid srs
                         return (nsrs, callback) 
                 -- send reply: we're done
-                unless syncing $ withMVar slaveZmqSocketLock $ \_ -> sendToMaster slaveZmqSocket $ RepDone rev
+                unless syncing $ sendToMaster slaveZmqSocket $ RepDone rev
                 return rev
             _  -> do 
                 sendToMaster slaveZmqSocket RepError
@@ -219,14 +223,15 @@ scheduleSlaveUpdate slaveState@SlaveState{..} event = do
         reqId <- modifyMVar slaveLastRequestID $ \x -> return (x+1,x+1)
         modifyMVar_ slaveRequests $ \srs -> do
             let encoded = runPutLazy (safePut event)
-            withMVar slaveZmqSocketLock $ \_ -> sendToMaster slaveZmqSocket $ ReqUpdate reqId (methodTag event, encoded)
+            sendToMaster slaveZmqSocket $ ReqUpdate reqId (methodTag event, encoded)
+            debug "after send"
             let callback = void $ forkIO $ putMVar result =<< takeMVar =<< scheduleUpdate slaveLocalState event 
             return $ M.insert reqId callback srs
         return result
 
 -- | Send a message to Master.
-sendToMaster :: Socket Dealer -> SlaveMessage -> IO ()
-sendToMaster sock smsg = send sock [] $ encode smsg
+sendToMaster :: MVar (Socket Dealer) -> SlaveMessage -> IO ()
+sendToMaster msock smsg = withMVar msock $ \sock -> send sock [] (encode smsg)
 
 -- | Close an enslaved State.
 liberateState :: SlaveState st -> IO ()
@@ -235,8 +240,9 @@ liberateState SlaveState{..} = do
         -- send master quit message
         sendToMaster slaveZmqSocket SlaveQuit
         -- cleanup zmq
-        disconnect slaveZmqSocket slaveZmqAddr 
-        close slaveZmqSocket
+        withMVar slaveZmqSocket $ \s -> do
+            disconnect s slaveZmqAddr 
+            close s
         term slaveZmqContext
         -- cleanup local state
         closeAcidState slaveLocalState