ref: Bytestring, imports
authorMax Voit <max.voit+gtdv@with-eyes.net>
Tue, 18 Aug 2015 12:09:51 +0000 (14:09 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Tue, 18 Aug 2015 12:09:51 +0000 (14:09 +0200)
src/Data/Acid/Centered/Common.hs
src/Data/Acid/Centered/Master.hs
src/Data/Acid/Centered/Slave.hs

index 19ab9d2..5aed467 100644 (file)
@@ -28,24 +28,27 @@ module Data.Acid.Centered.Common
     , AcidException(..)
     ) where
 
+import Data.Typeable (Typeable)
+import Data.SafeCopy (safePut)
+
 import Data.Acid.Core (Tagged, withCoreState)
 import Data.Acid.Local (localCore)
 import Data.Acid.Abstract (downcast)
 import Data.Acid (AcidState, IsAcidic)
 import Data.Acid.CRC (crc16)
 
+import Control.Concurrent (threadDelay)
+
 import Control.Monad (liftM, liftM2, liftM3,
                       unless, when
                      )
-import Control.Concurrent (threadDelay)
 import Control.Exception (Exception)
-import qualified Data.ByteString.Lazy.Char8 as CSL
+
+import Data.ByteString.Lazy.Char8 (ByteString)
 import Data.Serialize (Serialize(..), put, get,
                        putWord8, getWord8,
                        runPutLazy
                       )
-import Data.Typeable (Typeable)
-import Data.SafeCopy (safePut)
 import Data.Word (Word16)
 
 #ifdef nodebug
@@ -94,11 +97,11 @@ data AcidException = GracefulExit
 instance Exception AcidException
 
 -- | Messages the Master sends to Slaves.
-data MasterMessage = DoRep Revision (Maybe RequestID) (Tagged CSL.ByteString)
-                   | DoSyncRep Revision (Tagged CSL.ByteString)
+data MasterMessage = DoRep Revision (Maybe RequestID) (Tagged ByteString)
+                   | DoSyncRep Revision (Tagged ByteString)
                    | SyncDone Crc
                    | DoCheckpoint Revision
-                   | DoSyncCheckpoint Revision CSL.ByteString
+                   | DoSyncCheckpoint Revision ByteString
                    | DoArchive Revision
                    | FullRep Revision
                    | FullRepTo Revision
@@ -110,7 +113,7 @@ data MasterMessage = DoRep Revision (Maybe RequestID) (Tagged CSL.ByteString)
 data SlaveMessage = NewSlave Int
                   | RepDone Int
                   | RepError
-                  | ReqUpdate RequestID (Tagged CSL.ByteString)
+                  | ReqUpdate RequestID (Tagged ByteString)
                   | SlaveQuit
                   deriving (Show)
 
index 8067290..75f7c30 100644 (file)
@@ -26,24 +26,25 @@ module Data.Acid.Centered.Master
 
 import Data.Typeable
 import Data.SafeCopy
+import Data.Serialize (decode, encode, runPutLazy)
 
 import Data.Acid
 import Data.Acid.Core
 import Data.Acid.Abstract
 import Data.Acid.Local
 import Data.Acid.Log
-import Data.Serialize (decode, encode, runPutLazy)
 
 import Data.Acid.Centered.Common
 
 import Control.Concurrent (forkIO, ThreadId, myThreadId)
 import Control.Concurrent.Chan (Chan, newChan, writeChan, readChan, dupChan)
-import Control.Monad (when, unless, void, forM_, liftM2)
-import Control.Monad.STM (atomically)
 import Control.Concurrent.STM.TVar (readTVar)
 import Control.Concurrent.MVar(MVar, newMVar, newEmptyMVar,
                                takeMVar, putMVar, tryPutMVar, isEmptyMVar,
                                modifyMVar, modifyMVar_, withMVar)
+
+import Control.Monad.STM (atomically)
+import Control.Monad (when, unless, void, forM_, liftM2)
 import Control.Exception (handle, throwTo, SomeException)
 
 import System.ZMQ4 (Context, Socket, Router(..), Receiver,
@@ -53,13 +54,14 @@ import System.ZMQ4 (Context, Socket, Router(..), Receiver,
                     sendMulti, receiveMulti)
 import System.FilePath ( (</>) )
 
-import qualified Data.Map as M
-import Data.Map (Map)
-import qualified Data.IntMap as IM
-import Data.IntMap (IntMap)
 import qualified Data.ByteString.Lazy.Char8 as CSL
+import           Data.ByteString.Lazy.Char8 (ByteString)
 import qualified Data.ByteString.Char8 as CS
-import Data.ByteString.Char8 (ByteString)
+
+import qualified Data.Map as M
+import           Data.Map (Map)
+import qualified Data.IntMap as IM
+import           Data.IntMap (IntMap)
 import qualified Data.List.NonEmpty as NEL
 import Safe (headDef)
 
@@ -85,14 +87,14 @@ data MasterState st
                   , zmqSocket :: MVar (Socket Router)
                   } deriving (Typeable)
 
-type NodeIdentity = ByteString
+type NodeIdentity = CS.ByteString
 type NodeStatus = Map NodeIdentity NodeRevision
 type Callback = IO (IO ())      -- an IO action that returns a finalizer
 data ReplicationItem =
       RIEnd
     | RICheckpoint
     | RIArchive
-    | RIUpdate (Tagged CSL.ByteString) (Either Callback (RequestID, NodeIdentity))
+    | RIUpdate (Tagged ByteString) (Either Callback (RequestID, NodeIdentity))
 
 -- | The request handler on master node. Does
 --      o handle receiving requests from nodes,
@@ -210,7 +212,7 @@ connectNode MasterState{..} i revision =
 
 
 -- | Fetch past Updates from FileLog for replication.
-getPastUpdates :: (Typeable st) => AcidState st -> Int -> IO [(Int, Tagged CSL.ByteString)]
+getPastUpdates :: (Typeable st) => AcidState st -> Int -> IO [(Int, Tagged ByteString)]
 getPastUpdates state startRev = liftM2 zip (return [(startRev+1)..]) (readEntriesFrom (localEvents $ downcast state) startRev)
 
 -- | Get the revision at which the last checkpoint was taken.
@@ -380,7 +382,7 @@ scheduleMasterUpdate masterState@MasterState{..} event = do
             return result
 
 -- | Cold Update on master site.
-scheduleMasterColdUpdate :: Typeable st => MasterState st -> Tagged CSL.ByteString -> IO (MVar CSL.ByteString)
+scheduleMasterColdUpdate :: Typeable st => MasterState st -> Tagged ByteString -> IO (MVar ByteString)
 scheduleMasterColdUpdate masterState@MasterState{..} encoded = do
         debug "Cold Update by Master."
         unlocked <- isEmptyMVar masterStateLock
index bc8e4a7..a81791c 100644 (file)
@@ -35,30 +35,31 @@ import Data.Acid.Log
 
 import Data.Acid.Centered.Common
 
-import System.ZMQ4 (Context, Socket, Dealer(..),
-                    setReceiveHighWM, setSendHighWM, setLinger, restrict,
-                    poll, Poll(..), Event(..),
-                    context, term, socket, close,
-                    connect, disconnect, send, receive)
-import System.FilePath ( (</>) )
-
 import Control.Concurrent (forkIO, ThreadId, myThreadId, killThread, threadDelay, forkIOWithUnmask)
 import Control.Concurrent.MVar (MVar, newMVar, newEmptyMVar, isEmptyMVar,
                                 withMVar, modifyMVar, modifyMVar_,
                                 takeMVar, putMVar, tryPutMVar)
-import Data.IORef (writeIORef)
-import Control.Monad (void, when, unless)
-import Control.Monad.STM (atomically)
+import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
 import Control.Concurrent.STM.TVar (readTVar, writeTVar)
+import Data.IORef (writeIORef)
 import qualified Control.Concurrent.Event as Event
-import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
+
+import Control.Monad.STM (atomically)
+import Control.Monad (void, when, unless)
 import Control.Exception (handle, throwTo, SomeException, ErrorCall(..))
 
-import Data.IntMap (IntMap)
-import qualified Data.IntMap as IM
+import System.ZMQ4 (Context, Socket, Dealer(..),
+                    setReceiveHighWM, setSendHighWM, setLinger, restrict,
+                    poll, Poll(..), Event(..),
+                    context, term, socket, close,
+                    connect, disconnect, send, receive)
+import System.FilePath ( (</>) )
+
+import Data.ByteString.Lazy.Char8 (ByteString)
 import Data.Maybe (fromMaybe)
 
-import qualified Data.ByteString.Lazy.Char8 as CSL
+import           Data.IntMap (IntMap)
+import qualified Data.IntMap as IM
 
 --------------------------------------------------------------------------------
 
@@ -89,7 +90,7 @@ data SlaveRepItem =
       SRIEnd
     | SRICheckpoint Revision
     | SRIArchive Revision
-    | SRIUpdate Revision (Maybe RequestID) (Tagged CSL.ByteString)
+    | SRIUpdate Revision (Maybe RequestID) (Tagged ByteString)
 
 -- | Open a local State as Slave for a Master.
 --
@@ -284,7 +285,7 @@ slaveReplicationHandler slaveState@SlaveState{..} = do
 
 -- | Replicate Sync-Checkpoints directly.
 replicateSyncCp :: (IsAcidic st, Typeable st) =>
-        SlaveState st -> Revision -> CSL.ByteString -> IO ()
+        SlaveState st -> Revision -> ByteString -> IO ()
 replicateSyncCp SlaveState{..} rev encoded = do
     st <- decodeCheckpoint encoded
     let lst = downcast slaveLocalState
@@ -312,14 +313,14 @@ replicateSyncCp SlaveState{..} rev encoded = do
                 Right val -> return val
 
 -- | Replicate Sync-Updates directly.
-replicateSyncUpdate :: Typeable st => SlaveState st -> Revision -> Tagged CSL.ByteString -> IO ()
+replicateSyncUpdate :: Typeable st => SlaveState st -> Revision -> Tagged ByteString -> IO ()
 replicateSyncUpdate slaveState rev event = replicateUpdate slaveState rev Nothing event True
 
 -- | Replicate an Update as requested by Master.
 --   Updates that were requested by this Slave are run locally and the result
 --   put into the MVar in SlaveRequests.
 --   Other Updates are just replicated without using the result.
-replicateUpdate :: Typeable st => SlaveState st -> Revision -> Maybe RequestID -> Tagged CSL.ByteString -> Bool -> IO ()
+replicateUpdate :: Typeable st => SlaveState st -> Revision -> Maybe RequestID -> Tagged ByteString -> Bool -> IO ()
 replicateUpdate SlaveState{..} rev reqId event syncing = do
         debug $ "Got an Update to replicate " ++ show rev
         modifyMVar_ slaveRevision $ \nr -> if rev - 1 == nr
@@ -392,7 +393,7 @@ scheduleSlaveUpdate slaveState@SlaveState{..} event = do
         return result
 
 -- | Cold Update on slave site. This enables for using Remote.
-scheduleSlaveColdUpdate :: Typeable st => SlaveState st -> Tagged CSL.ByteString -> IO (MVar CSL.ByteString)
+scheduleSlaveColdUpdate :: Typeable st => SlaveState st -> Tagged ByteString -> IO (MVar ByteString)
 scheduleSlaveColdUpdate slaveState@SlaveState{..} encoded = do
     unlocked <- isEmptyMVar slaveStateLock
     if not unlocked then error "State is locked."