commit just for debugging state
authorMax Voit <max.voit+gtdv@with-eyes.net>
Sat, 27 Jun 2015 13:47:20 +0000 (15:47 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Sat, 27 Jun 2015 13:47:20 +0000 (15:47 +0200)
acid-state-dist.cabal
src/Data/Acid/Centered/Master.hs
src/Data/Acid/Centered/Slave.hs

index d313588..521e83b 100644 (file)
@@ -71,6 +71,7 @@ library
                        containers,
                        transformers,
                        stm,
+                       semigroups,
                        mtl
   
   -- Directories containing source files.
index 312e1c8..6431ff1 100644 (file)
@@ -52,17 +52,20 @@ import System.ZMQ4 (Context, Socket, Router(..), Receiver, Flag(..),
                     setReceiveHighWM, setSendHighWM, restrict,
                     context, term, socket, close, 
                     bind, unbind,
-                    send, receive)
+                    waitRead,
+                    sendMulti, receiveMulti)
 
 import qualified Data.Map as M
 import Data.Map (Map)
 import qualified Data.ByteString.Lazy.Char8 as CSL
 import qualified Data.ByteString.Char8 as CS
 import Data.ByteString.Char8 (ByteString)
+import qualified Data.List.NonEmpty as NEL
 
 -- auto imports following - need to be cleaned up
 import Control.Concurrent.MVar(MVar, newMVar, newEmptyMVar,
                                takeMVar, putMVar,
+                               readMVar,
                                modifyMVar, modifyMVar_, withMVar)
 
 --------------------------------------------------------------------------------
@@ -74,7 +77,7 @@ data MasterState st
                   , masterReplicationChan :: Chan ReplicationItem
                   , zmqContext :: Context
                   , zmqAddr :: String
-                  , zmqSocket :: Socket Router
+                  , zmqSocket :: MVar (Socket Router)
                   } deriving (Typeable)
 
 type NodeIdentity = ByteString
@@ -87,29 +90,29 @@ type ReplicationItem = (Tagged CSL.ByteString, Either Callback (RequestID, NodeI
 --      o answering as needed (old updates),
 --      o bookkeeping on node states. 
 masterRequestHandler :: (Typeable st) => MasterState st -> IO ()
-masterRequestHandler masterState@MasterState{..} = do
-        let loop = do
-                -- take one frame
-                (ident, msg) <- receiveFrame zmqSocket
-                -- handle according frame contents
-                case msg of
-                    -- New Slave joined.
-                    NewSlave r -> do
-                        pastUpdates <- getPastUpdates localState r
-                        connectNode masterState ident pastUpdates
-                    -- Slave is done replicating.
-                    RepDone r -> updateNodeStatus masterState ident r
-                    -- Slave sends an Udate.
-                    ReqUpdate rid event ->
-                        queueUpdate masterState (event, Right (rid, ident))
-                    -- Slave quits.
-                    SlaveQuit -> removeFromNodeStatus nodeStatus ident
-                    -- no other messages possible
-                    _ -> error $ "Unknown message received: " ++ show msg
-                -- loop around
-                debug "loop iteration"
-                loop
-        loop
+masterRequestHandler masterState@MasterState{..} = forever $ do
+        -- take one frame
+        debug "Waiting for new frame."
+        waitRead =<< readMVar zmqSocket
+        (ident, msg) <- withMVar zmqSocket receiveFrame
+        debug "Got frame."
+        -- handle according frame contents
+        case msg of
+            -- New Slave joined.
+            NewSlave r -> do
+                pastUpdates <- getPastUpdates localState r
+                connectNode masterState ident pastUpdates
+            -- Slave is done replicating.
+            RepDone r -> return () -- updateNodeStatus masterState ident r
+            -- Slave sends an Udate.
+            ReqUpdate rid event ->
+                queueUpdate masterState (event, Right (rid, ident))
+            -- Slave quits.
+            SlaveQuit -> removeFromNodeStatus nodeStatus ident
+            -- no other messages possible
+            _ -> error $ "Unknown message received: " ++ show msg
+        -- loop around
+        debug "Loop iteration."
 
 -- | Fetch past Updates from FileLog for replication.
 getPastUpdates :: (Typeable st) => AcidState st -> Int -> IO [(Int, Tagged CSL.ByteString)]
@@ -146,25 +149,25 @@ connectNode MasterState{..} i pastUpdates =
             return $ M.insert i mr ns 
 
 -- | Send a message to a Slave
-sendToSlave :: Socket Router -> MasterMessage -> NodeIdentity -> IO ()
-sendToSlave sock msg ident = do
-    send sock [SendMore] ident
-    send sock [] $ encode msg
+sendToSlave :: MVar (Socket Router) -> MasterMessage -> NodeIdentity -> IO ()
+sendToSlave msock msg ident = withMVar msock $ \sock -> sendMulti sock $ NEL.fromList [ident, encode msg]
 
 -- | Send one (encoded) Update to a Slave.
-sendSyncUpdate :: Socket Router -> Revision -> Tagged CSL.ByteString -> NodeIdentity -> IO ()
+sendSyncUpdate :: MVar (Socket Router) -> Revision -> Tagged CSL.ByteString -> NodeIdentity -> IO ()
 sendSyncUpdate sock revision update = sendToSlave sock (DoSyncRep revision update) 
     
 -- | Send one (encoded) Update to a Slave.
-sendUpdate :: Socket Router -> Revision -> Maybe RequestID -> Tagged CSL.ByteString -> NodeIdentity -> IO ()
+sendUpdate :: MVar (Socket Router) -> Revision -> Maybe RequestID -> Tagged CSL.ByteString -> NodeIdentity -> IO ()
 sendUpdate sock revision reqId update = sendToSlave sock (DoRep revision reqId update) 
     
 -- | Receive one Frame. A Frame consists of three messages: 
 --      sender ID, empty message, and actual content 
 receiveFrame :: (Receiver t) => Socket t -> IO (NodeIdentity, SlaveMessage)
 receiveFrame sock = do
-    ident <- receive sock
-    msg   <- receive sock
+    list <- receiveMulti sock
+    when (length list /= 2) $ error "Received invalid frame."
+    let ident = head list
+    let msg = list !! 1
     case decode msg of
         -- todo: pass on exceptions
         Left str -> error $ "Data.Serialize.decode failed on SlaveMessage: " ++ show msg
@@ -194,13 +197,14 @@ openMasterState port initialState = do
         setReceiveHighWM (restrict (100*1000)) sock
         setSendHighWM (restrict (100*1000)) sock
         bind sock addr
+        msock <- newMVar sock
         let masterState = MasterState { localState = lst
                                       , nodeStatus = ns
                                       , masterRevision = rev
                                       , masterReplicationChan = repChan
                                       , zmqContext = ctx
                                       , zmqAddr = addr
-                                      , zmqSocket = sock
+                                      , zmqSocket = msock
                                       }
         forkIO $ masterRequestHandler masterState
         forkIO $ masterReplicationHandler masterState
@@ -213,8 +217,9 @@ closeMasterState MasterState{..} = do
         -- wait all nodes done
         -- todo^ - not necessary for now
         -- cleanup zmq
-        unbind zmqSocket zmqAddr 
-        close zmqSocket
+        withMVar zmqSocket $ \sock -> do
+            unbind sock zmqAddr 
+            close sock
         term zmqContext
         -- cleanup local state
         closeAcidState localState
index 61edc70..35c18bc 100644 (file)
@@ -32,7 +32,7 @@ module Data.Acid.Centered.Slave
     (
       enslaveState
     , SlaveState(..)
-    )  where
+    )  where
 
 import Data.Typeable
 import Data.SafeCopy
@@ -89,10 +89,11 @@ data SlaveState st
                  , slaveZmqContext :: Context
                  , slaveZmqAddr :: String
                  , slaveZmqSocket :: Socket Dealer
+                 , slaveZmqSocketLock :: MVar ()
                  } deriving (Typeable)
 
 -- | Memory of own Requests sent to Master.
-type SlaveRequests = Map RequestID (IO (), ThreadId)
+type SlaveRequests = Map RequestID (IO ())
 
 -- | One Update + Metainformation to replicate.
 type SlaveRepItem = (Revision, Maybe RequestID, Tagged CSL.ByteString)
@@ -114,6 +115,7 @@ enslaveState address port initialState = do
         lastReqId <- newMVar 0
         repChan <- newChan
         syncDone <- Event.new
+        sockLock <- newMVar ()
         -- remote
         let addr = "tcp://" ++ address ++ ":" ++ show port
         ctx <- context
@@ -131,6 +133,7 @@ enslaveState address port initialState = do
                                     , slaveZmqContext = ctx
                                     , slaveZmqAddr = addr
                                     , slaveZmqSocket = sock
+                                    , slaveZmqSocketLock = sockLock
                                     }
         forkIO $ slaveRequestHandler slaveState 
         forkIO $ slaveReplicationHandler slaveState 
@@ -142,7 +145,9 @@ slaveRequestHandler slaveState@SlaveState{..} = forever $ do
         msg <- receive slaveZmqSocket
         case decode msg of
             Left str -> error $ "Data.Serialize.decode failed on MasterMessage: " ++ show msg
-            Right mmsg -> case mmsg of
+            Right mmsg -> do
+                 debug $ "Received " ++ show mmsg
+                 case mmsg of
                     -- We are sent an Update to replicate.
                     DoRep r i d -> queueUpdate slaveState (r, i, d)
                     -- We are sent an Update to replicate for synchronization.
@@ -186,14 +191,13 @@ replicateUpdate SlaveState{..} (rev, reqId, event) syncing = do
                         void $ scheduleColdUpdate slaveLocalState event 
                     Just rid -> modifyMVar slaveRequests $ \srs -> do
                         debug $ "This is the Update for Request " ++ show rid
-                        let (icb, tId) = fromMaybe (error $ "Callback not found: " ++ show rid) (M.lookup rid srs) 
+                        let icb = fromMaybe (error $ "Callback not found: " ++ show rid) (M.lookup rid srs) 
                         callback <- icb
-                        killThread tId
                         -- todo: we remember it, clean it up later
-                        let nsrs = M.adjust (\(c, t) -> (return (),tId)) rid srs
+                        let nsrs = M.adjust (\c -> return ()) rid srs
                         return (nsrs, callback) 
                 -- send reply: we're done
-                unless syncing $ sendToMaster slaveZmqSocket $ RepDone rev
+                unless syncing $ withMVar slaveZmqSocketLock $ \_ -> sendToMaster slaveZmqSocket $ RepDone rev
                 return rev
             _  -> do 
                 sendToMaster slaveZmqSocket RepError
@@ -215,20 +219,11 @@ scheduleSlaveUpdate slaveState@SlaveState{..} event = do
         reqId <- modifyMVar slaveLastRequestID $ \x -> return (x+1,x+1)
         modifyMVar_ slaveRequests $ \srs -> do
             let encoded = runPutLazy (safePut event)
-            sendToMaster slaveZmqSocket $ ReqUpdate reqId (methodTag event, encoded)
-            timeoutID <- forkIO $ timeoutRequest slaveState reqId
+            withMVar slaveZmqSocketLock $ \_ -> sendToMaster slaveZmqSocket $ ReqUpdate reqId (methodTag event, encoded)
             let callback = void $ forkIO $ putMVar result =<< takeMVar =<< scheduleUpdate slaveLocalState event 
-            return $ M.insert reqId (callback, timeoutID) srs
+            return $ M.insert reqId callback srs
         return result
 
--- | Checks that requests actually are answered.
-timeoutRequest :: SlaveState st -> RequestID -> IO ()
-timeoutRequest SlaveState{..} reqId = do
-        threadDelay $ 5*1000*1000
-        stillThere <- withMVar slaveRequests (return . M.member reqId)
-        when stillThere $ error "Update-Request was not handled Master."
-
-
 -- | Send a message to Master.
 sendToMaster :: Socket Dealer -> SlaveMessage -> IO ()
 sendToMaster sock smsg = send sock [] $ encode smsg