slave quit fixed, fine
authorMax Voit <max.voit+gtdv@with-eyes.net>
Wed, 15 Jul 2015 16:50:14 +0000 (18:50 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Wed, 15 Jul 2015 16:50:14 +0000 (18:50 +0200)
acid-state-dist.cabal
src/Data/Acid/Centered/Common.hs
src/Data/Acid/Centered/Master.hs
src/Data/Acid/Centered/Slave.hs

index 96e5d32..521e83b 100644 (file)
@@ -72,8 +72,7 @@ library
                        transformers,
                        stm,
                        semigroups,
-                       mtl,
-                       synchronous-channels
+                       mtl
   
   -- Directories containing source files.
   hs-source-dirs:      src
index d35580c..5d2ebb9 100644 (file)
@@ -15,7 +15,6 @@ module Data.Acid.Centered.Common
     (
       debug
     , waitPoll
-    , addMyThreadId
     , crcOfState
     , Crc
     , NodeRevision
@@ -80,6 +79,7 @@ debug = L.with debugLock . hPutStrLn stderr
 data MasterMessage = DoRep Revision (Maybe RequestID) (Tagged CSL.ByteString)
                    | DoSyncRep Revision (Tagged CSL.ByteString)
                    | SyncDone Crc
+                   | MayQuit
                    | MasterQuit
                   deriving (Show)
 
@@ -95,6 +95,7 @@ instance Serialize MasterMessage where
         DoRep r i d   -> putWord8 0 >> put r >> put i >> put d
         DoSyncRep r d -> putWord8 1 >> put r >> put d
         SyncDone c    -> putWord8 2 >> put c
+        MayQuit       -> putWord8 8
         MasterQuit    -> putWord8 9
     get = do 
         tag <- getWord8
@@ -102,6 +103,7 @@ instance Serialize MasterMessage where
             0 -> liftM3 DoRep get get get
             1 -> liftM2 DoSyncRep get get
             2 -> liftM SyncDone get
+            8 -> return MayQuit
             9 -> return MasterQuit
             _ -> error $ "Data.Serialize.get failed for MasterMessage: invalid tag " ++ show tag
 
@@ -130,12 +132,6 @@ crcOfState state = do
         let encoded = runPutLazy (safePut st)
         return $ crc16 encoded
 
--- | Add own threadId to a list of such.
-addMyThreadId :: [ThreadId] -> IO [ThreadId]
-addMyThreadId ts = do
-        nt <- myThreadId
-        return (nt:ts)
-
 -- | By polling, wait until predicate true.
 waitPoll :: Int -> IO Bool -> IO ()
 waitPoll t p = p >>= \e -> unless e $ threadDelay t >> waitPoll t p
index ce22e84..4f5839d 100644 (file)
@@ -110,7 +110,9 @@ masterRequestHandler masterState@MasterState{..} = forever $ do
                 ReqUpdate rid event ->
                     queueUpdate masterState (event, Right (rid, ident))
                 -- Slave quits.
-                SlaveQuit -> removeFromNodeStatus nodeStatus ident
+                SlaveQuit -> do
+                    removeFromNodeStatus nodeStatus ident
+                    sendToSlave zmqSocket MayQuit ident
                 -- no other messages possible
                 _ -> error $ "Unknown message received: " ++ show msg
             -- loop around
index 0a5c242..1d69180 100644 (file)
@@ -70,7 +70,7 @@ import Control.Monad (forever, void,
 import Control.Monad.STM (atomically)
 import Control.Concurrent.STM.TVar (readTVar)
 import qualified Control.Concurrent.Event as Event
-import Control.Concurrent.Chan.Synchronous (Chan, newChan, isEmptyChan, readChan, asyncWriteChan)
+import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
 
 import Data.Map (Map)
 import qualified Data.Map as M
@@ -84,12 +84,13 @@ import qualified Data.ByteString.Lazy.Char8 as CSL
 
 data SlaveState st 
     = SlaveState { slaveLocalState :: AcidState st
-                 , slaveRepChan :: Chan SlaveRepItem
+                 , slaveRepChan :: Chan (Maybe SlaveRepItem)
                  , slaveSyncDone :: Event.Event
                  , slaveRevision :: MVar NodeRevision
                  , slaveRequests :: MVar SlaveRequests
                  , slaveLastRequestID :: MVar RequestID
-                 , slaveThreads :: MVar [ThreadId]
+                 , slaveRepThreadId :: MVar ThreadId
+                 , slaveReqThreadId :: MVar ThreadId
                  , slaveZmqContext :: Context
                  , slaveZmqAddr :: String
                  , slaveZmqSocket :: MVar (Socket Dealer)
@@ -119,7 +120,8 @@ enslaveState address port initialState = do
         repChan <- newChan
         syncDone <- Event.new
         sockLock <- newMVar ()
-        threads <- newMVar []
+        reqTid <- newEmptyMVar
+        repTid <- newEmptyMVar
         -- remote
         let addr = "tcp://" ++ address ++ ":" ++ show port
         ctx <- context
@@ -135,7 +137,8 @@ enslaveState address port initialState = do
                                     , slaveRevision = rev
                                     , slaveRequests = srs
                                     , slaveLastRequestID = lastReqId
-                                    , slaveThreads = threads
+                                    , slaveReqThreadId = reqTid
+                                    , slaveRepThreadId = repTid
                                     , slaveZmqContext = ctx
                                     , slaveZmqAddr = addr
                                     , slaveZmqSocket = msock
@@ -147,7 +150,8 @@ enslaveState address port initialState = do
 -- | Replication handler of the Slave. 
 slaveRequestHandler :: (IsAcidic st, Typeable st) => SlaveState st -> IO ()
 slaveRequestHandler slaveState@SlaveState{..} = do
-    modifyMVar_ slaveThreads addMyThreadId
+    mtid <- myThreadId
+    putMVar slaveReqThreadId mtid
     forever $ do
         --waitRead =<< readMVar slaveZmqSocket
         -- FIXME: we needn't poll if not for strange zmq behaviour
@@ -165,6 +169,8 @@ slaveRequestHandler slaveState@SlaveState{..} = do
                         DoSyncRep r d -> replicateSyncUpdate slaveState r d 
                         -- Master done sending all synchronization Updates.
                         SyncDone c -> onSyncDone slaveState c
+                        -- We are allowed to Quit.
+                        MayQuit -> writeChan slaveRepChan Nothing
                         -- We are requested to Quit.
                         MasterQuit -> undefined -- todo: how get a State that wasn't closed closed?
                         -- no other messages possible
@@ -186,18 +192,27 @@ onSyncDone slaveState@SlaveState{..} crc = do
 queueUpdate :: SlaveState st -> SlaveRepItem -> IO ()
 queueUpdate SlaveState{..} repItem@(rev, _, _) = do
         debug $ "Queuing Update with revision " ++ show rev
-        asyncWriteChan slaveRepChan repItem
+        --asyncWriteChan slaveRepChan repItem
+        writeChan slaveRepChan (Just repItem)
 
 -- | Replicates content of Chan.
 slaveReplicationHandler :: SlaveState st -> IO ()
 slaveReplicationHandler slaveState@SlaveState{..} = do
-        modifyMVar_ slaveThreads addMyThreadId
+        mtid <- myThreadId
+        putMVar slaveRepThreadId mtid
         -- todo: timeout is magic variable, make customizable?
         noTimeout <- Event.waitTimeout slaveSyncDone $ 10*1000*1000
         unless noTimeout $ error "Slave took too long to sync, ran into timeout."
-        forever $ do
-            repItem <- readChan slaveRepChan
-            replicateUpdate slaveState repItem False
+        let loop = do
+                mayRepItem <- readChan slaveRepChan
+                case mayRepItem of
+                    Nothing -> return ()
+                    Just repItem -> do
+                        replicateUpdate slaveState repItem False
+                        loop
+        loop
+        -- signal that we're done
+        void $ takeMVar slaveRepThreadId
 
 -- | Replicate Sync-Updates directly.
 replicateSyncUpdate slaveState rev event = replicateUpdate slaveState (rev, Nothing, event) True
@@ -270,15 +285,11 @@ liberateState SlaveState{..} = do
         sendToMaster slaveZmqSocket SlaveQuit
         -- wait replication chan
         debug "Waiting for repChan to empty."
-        waitPoll 100 (isEmptyChan slaveRepChan)
+        mtid <- myThreadId
+        putMVar slaveRepThreadId mtid
         -- kill handler threads
-        debug "Killing handlers."
-        withMVar slaveThreads $ \ts -> do
-            print ts
-            -- Wait what? It needs this order!?!
-            -- Why? Presumably the after garbage collection the Chan is
-            -- gone, as nothing's there to write to it anymore.
-            forM_ (reverse ts) killThread
+        debug "Killing request handler."
+        withMVar slaveReqThreadId killThread
         -- cleanup zmq
         debug "Closing down zmq."
         withMVar slaveZmqSocket $ \s -> do