slave quit more complete
authorMax Voit <max.voit+gtdv@with-eyes.net>
Wed, 15 Jul 2015 16:15:44 +0000 (18:15 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Wed, 15 Jul 2015 16:15:44 +0000 (18:15 +0200)
acid-state-dist.cabal
src/Data/Acid/Centered/Common.hs
src/Data/Acid/Centered/Slave.hs

index 521e83b..96e5d32 100644 (file)
@@ -72,7 +72,8 @@ library
                        transformers,
                        stm,
                        semigroups,
-                       mtl
+                       mtl,
+                       synchronous-channels
   
   -- Directories containing source files.
   hs-source-dirs:      src
index 83ab908..d35580c 100644 (file)
@@ -14,8 +14,9 @@
 module Data.Acid.Centered.Common
     (
       debug
-    , crcOfState
+    , waitPoll
     , addMyThreadId
+    , crcOfState
     , Crc
     , NodeRevision
     , Revision
@@ -31,8 +32,10 @@ import Data.Acid.Abstract (downcast)
 import Data.Acid (AcidState, IsAcidic)
 import Data.Acid.CRC (crc16)
 
-import Control.Monad (liftM, liftM2, liftM3)
-import Control.Concurrent (ThreadId, myThreadId)
+import Control.Monad (liftM, liftM2, liftM3,
+                      unless
+                     )
+import Control.Concurrent (ThreadId, myThreadId, threadDelay)
 import Data.ByteString.Char8 (ByteString)
 import qualified Data.ByteString.Lazy.Char8 as CSL
 import Data.Serialize (Serialize(..), put, get,
@@ -133,3 +136,6 @@ addMyThreadId ts = do
         nt <- myThreadId
         return (nt:ts)
 
+-- | By polling, wait until predicate true.
+waitPoll :: Int -> IO Bool -> IO ()
+waitPoll t p = p >>= \e -> unless e $ threadDelay t >> waitPoll t p
index 7bcefa8..0a5c242 100644 (file)
@@ -70,7 +70,7 @@ import Control.Monad (forever, void,
 import Control.Monad.STM (atomically)
 import Control.Concurrent.STM.TVar (readTVar)
 import qualified Control.Concurrent.Event as Event
-import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
+import Control.Concurrent.Chan.Synchronous (Chan, newChan, isEmptyChan, readChan, asyncWriteChan)
 
 import Data.Map (Map)
 import qualified Data.Map as M
@@ -146,8 +146,9 @@ enslaveState address port initialState = do
 
 -- | Replication handler of the Slave. 
 slaveRequestHandler :: (IsAcidic st, Typeable st) => SlaveState st -> IO ()
-slaveRequestHandler slaveState@SlaveState{..} = forever $ do
-        modifyMVar_ slaveThreads addMyThreadId
+slaveRequestHandler slaveState@SlaveState{..} = do
+    modifyMVar_ slaveThreads addMyThreadId
+    forever $ do
         --waitRead =<< readMVar slaveZmqSocket
         -- FIXME: we needn't poll if not for strange zmq behaviour
         re <- withMVar slaveZmqSocket $ \sock -> poll 100 [Sock sock [In] Nothing]
@@ -181,10 +182,11 @@ onSyncDone slaveState@SlaveState{..} crc = do
         Event.set slaveSyncDone
 
 -- | Queue Updates into Chan for replication.
+-- We use the Chan so Sync-Updates and normal ones can be interleaved.
 queueUpdate :: SlaveState st -> SlaveRepItem -> IO ()
 queueUpdate SlaveState{..} repItem@(rev, _, _) = do
         debug $ "Queuing Update with revision " ++ show rev
-        writeChan slaveRepChan repItem
+        asyncWriteChan slaveRepChan repItem
 
 -- | Replicates content of Chan.
 slaveReplicationHandler :: SlaveState st -> IO ()
@@ -257,25 +259,34 @@ sendToMaster msock smsg = withMVar msock $ \sock -> send sock [] (encode smsg)
 -- | Close an enslaved State.
 liberateState :: SlaveState st -> IO ()
 liberateState SlaveState{..} = do
-        debug "Closing Slave state."
+        debug "Closing Slave state..."
         -- lock state against updates: disallow requests
         -- todo: rather use a special value allowing exceptions in scheduleUpdate
         _ <- takeMVar slaveLastRequestID
         -- check / wait unprocessed requests
-        -- TODO
+        debug "Waiting for Requests to finish."
+        waitPoll 100 (withMVar slaveRequests (return . M.null))
         -- send master quit message
         sendToMaster slaveZmqSocket SlaveQuit
         -- wait replication chan
-        -- TODO
+        debug "Waiting for repChan to empty."
+        waitPoll 100 (isEmptyChan slaveRepChan)
         -- kill handler threads
-        withMVar slaveThreads $ \ts ->
-            forM_ ts killThread
+        debug "Killing handlers."
+        withMVar slaveThreads $ \ts -> do
+            print ts
+            -- Wait what? It needs this order!?!
+            -- Why? Presumably the after garbage collection the Chan is
+            -- gone, as nothing's there to write to it anymore.
+            forM_ (reverse ts) killThread
         -- cleanup zmq
+        debug "Closing down zmq."
         withMVar slaveZmqSocket $ \s -> do
             disconnect s slaveZmqAddr 
             close s
         term slaveZmqContext
         -- cleanup local state
+        debug "Closing local state."
         closeAcidState slaveLocalState