enhan: change to IntMap where possible
authorMax Voit <max.voit+gtdv@with-eyes.net>
Thu, 6 Aug 2015 11:33:37 +0000 (13:33 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Thu, 6 Aug 2015 11:33:37 +0000 (13:33 +0200)
makefile
src/Data/Acid/Centered/Master.hs
src/Data/Acid/Centered/Slave.hs

index dabaca4..4d8cd6d 100644 (file)
--- a/makefile
+++ b/makefile
@@ -1,4 +1,4 @@
-test:
+all-test:
        cabal clean
        cabal configure --enable-test
        cabal build
index f7f8c3a..7de1d43 100644 (file)
@@ -57,6 +57,8 @@ import System.FilePath ( (</>) )
 
 import qualified Data.Map as M
 import Data.Map (Map)
+import qualified Data.IntMap as IM
+import Data.IntMap (IntMap)
 import qualified Data.ByteString.Lazy.Char8 as CSL
 import qualified Data.ByteString.Char8 as CS
 import Data.ByteString.Char8 (ByteString)
@@ -70,7 +72,7 @@ data MasterState st
     = MasterState { localState :: AcidState st
                   , nodeStatus :: MVar NodeStatus
                   , repRedundancy :: Int
-                  , repFinalizers :: MVar (Map Revision (IO ()))
+                  , repFinalizers :: MVar (IntMap (IO ()))
                   , masterStateLock :: MVar ()
                   , masterRevision :: MVar NodeRevision
                   , masterRevisionN :: MVar NodeRevision
@@ -160,8 +162,8 @@ updateNodeStatus MasterState{..} ident r =
             debug $ "Full replication of " ++ show r
             -- finalize local replication
             modifyMVar_ repFinalizers $ \rf -> do
-                rf M.! r
-                return $ M.delete r rf
+                rf IM.! r
+                return $ IM.delete r rf
             -- send out FullRep signal
             forM_ (M.keys ns) $ sendToSlave zmqSocket (FullRep r)
         return rns
@@ -205,8 +207,8 @@ connectNode MasterState{..} i revision =
                 debug $ "Full replication up to " ++ show maxRev
                 -- finalize local replication
                 modifyMVar_ repFinalizers $ \rf -> do
-                    forM_ (filter (<= maxRev) (M.keys rf)) $ \r -> rf M.! r
-                    return $ M.filterWithKey (\k _ -> k > maxRev) rf
+                    forM_ (filter (<= maxRev) (IM.keys rf)) $ \r -> rf IM.! r
+                    return $ IM.filterWithKey (\k _ -> k > maxRev) rf
                 -- send out FullRep signal
                 forM_ (M.keys pns) $ sendToSlave zmqSocket (FullRepTo maxRev)
 
@@ -292,7 +294,7 @@ openRedMasterStateFrom directory address port red initialState = do
         revN <- newMVar lrev
         repChan <- newChan
         repChanN <- dupChan repChan
-        repFin <- newMVar M.empty
+        repFin <- newMVar IM.empty
         ns <- newMVar M.empty
         -- remote
         let addr = "tcp://" ++ address ++ ":" ++ show port
@@ -410,7 +412,7 @@ masterReplicationHandlerL MasterState{..} = do
                                 _               -> liftM snd $ scheduleLocalColdUpdate' (downcast localState) event
                             return (r+1,(r+1,a))
                         -- act finalizes the transaction - will be run after full replication
-                        modifyMVar_ repFinalizers $ return . M.insert rev act
+                        modifyMVar_ repFinalizers $ return . IM.insert rev act
                     else
                         modifyMVar_ masterRevision $ \r -> do
                             case sink of
index cedc2ce..d356e21 100644 (file)
@@ -56,8 +56,8 @@ import qualified Control.Concurrent.Event as Event
 import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
 import Control.Exception (handle, throw, SomeException, ErrorCall(..), AsyncException(..))
 
-import Data.Map (Map)
-import qualified Data.Map as M
+import Data.IntMap (IntMap)
+import qualified Data.IntMap as IM
 import Data.Maybe (fromMaybe)
 
 import qualified Data.ByteString.Lazy.Char8 as CSL
@@ -68,7 +68,7 @@ import qualified Data.ByteString.Lazy.Char8 as CSL
 data SlaveState st
     = SlaveState { slaveLocalState :: AcidState st
                  , slaveStateIsRed :: Bool
-                 , slaveRepFinalizers :: MVar (Map Revision (IO ()))
+                 , slaveRepFinalizers :: MVar (IntMap (IO ()))
                  , slaveRepChan :: Chan SlaveRepItem
                  , slaveSyncDone :: Event.Event
                  , slaveRevision :: MVar NodeRevision
@@ -83,7 +83,7 @@ data SlaveState st
                  } deriving (Typeable)
 
 -- | Memory of own Requests sent to Master.
-type SlaveRequests = Map RequestID (IO (IO ()),ThreadId)
+type SlaveRequests = IntMap (IO (IO ()),ThreadId)
 
 -- | One Update + Metainformation to replicate.
 data SlaveRepItem =
@@ -148,14 +148,14 @@ enslaveMayRedStateFrom isRed directory address port initialState = do
         lrev <- atomically $ readTVar $ logNextEntryId levs
         rev <- newMVar lrev
         debug $ "Opening enslaved state at revision " ++ show lrev
-        srs <- newMVar M.empty
+        srs <- newMVar IM.empty
         lastReqId <- newMVar 0
         repChan <- newChan
         syncDone <- Event.new
         reqTid <- newEmptyMVar
         repTid <- newEmptyMVar
         parTid <- myThreadId
-        repFin <- newMVar M.empty
+        repFin <- newMVar IM.empty
         -- remote
         let addr = "tcp://" ++ address ++ ":" ++ show port
         ctx <- context
@@ -215,12 +215,12 @@ slaveRequestHandler slaveState@SlaveState{..} = do
                             DoArchive r -> queueRepItem slaveState (SRIArchive r)
                             -- Full replication of a revision
                             FullRep r -> modifyMVar_ slaveRepFinalizers $ \rf -> do
-                                            rf M.! r
-                                            return $ M.delete r rf
+                                            rf IM.! r
+                                            return $ IM.delete r rf
                             -- Full replication of events up to revision
                             FullRepTo r -> modifyMVar_ slaveRepFinalizers $ \rf -> do
-                                            let (ef, nrf) = M.partitionWithKey (\k _ -> k <= r) rf
-                                            sequence_ (M.elems ef)
+                                            let (ef, nrf) = IM.partitionWithKey (\k _ -> k <= r) rf
+                                            sequence_ (IM.elems ef)
                                             return nrf
                             -- We are allowed to Quit.
                             MayQuit -> writeChan slaveRepChan SRIEnd
@@ -327,19 +327,19 @@ replicateUpdate slaveState@SlaveState{..} rev reqId event syncing = do
                     Nothing -> if slaveStateIsRed
                         then do
                             act <- liftM snd $ scheduleLocalColdUpdate' (downcast slaveLocalState) event
-                            modifyMVar_ slaveRepFinalizers $ return . M.insert rev act
+                            modifyMVar_ slaveRepFinalizers $ return . IM.insert rev act
                         else
                             void $ scheduleColdUpdate slaveLocalState event
                     Just rid -> do
                         act <- modifyMVar slaveRequests $ \srs -> do
                             debug $ "This is the Update for Request " ++ show rid
-                            let (icallback, timeoutId) = fromMaybe (error $ "Callback not found: " ++ show rid) (M.lookup rid srs)
+                            let (icallback, timeoutId) = fromMaybe (error $ "Callback not found: " ++ show rid) (IM.lookup rid srs)
                             callback <- icallback
                             killThread timeoutId
-                            let nsrs = M.delete rid srs
+                            let nsrs = IM.delete rid srs
                             return (nsrs, callback)
                         when slaveStateIsRed $
-                            modifyMVar_ slaveRepFinalizers $ return . M.insert rev act
+                            modifyMVar_ slaveRepFinalizers $ return . IM.insert rev act
                 -- send reply: we're done
                 unless syncing $ sendToMaster slaveZmqSocket $ RepDone rev
                 return rev
@@ -386,7 +386,7 @@ scheduleSlaveUpdate slaveState@SlaveState{..} event = do
                         hd <- scheduleUpdate slaveLocalState event
                         void $ forkIO $ putMVar result =<< takeMVar hd
                         return (return ())      -- bogus finalizer
-            return $ M.insert reqId (callback, timeoutID) srs
+            return $ IM.insert reqId (callback, timeoutID) srs
         return result
 
 -- | Ensures requests are actually answered or fail.
@@ -394,7 +394,7 @@ scheduleSlaveUpdate slaveState@SlaveState{..} event = do
 timeoutRequest :: SlaveState st -> RequestID -> IO ()
 timeoutRequest SlaveState{..} reqId = do
     threadDelay $ 5*1000*1000
-    stillThere <- withMVar slaveRequests (return . M.member reqId)
+    stillThere <- withMVar slaveRequests (return . IM.member reqId)
     when stillThere $ throwTo slaveParentThreadId $ ErrorCall "Update-Request timed out."
 
 -- | Send a message to Master.
@@ -410,7 +410,7 @@ liberateState SlaveState{..} = do
         _ <- takeMVar slaveLastRequestID
         -- check / wait unprocessed requests
         debug "Waiting for Requests to finish."
-        waitPoll 100 (withMVar slaveRequests (return . M.null))
+        waitPoll 100 (withMVar slaveRequests (return . IM.null))
         -- send master quit message
         sendToMaster slaveZmqSocket SlaveQuit
         -- wait replication chan, only if sync done