crc on join - frame
authorMax Voit <max.voit+gtdv@with-eyes.net>
Thu, 2 Jul 2015 19:31:34 +0000 (21:31 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Thu, 2 Jul 2015 19:31:34 +0000 (21:31 +0200)
src/Data/Acid/Centered/Common.hs
src/Data/Acid/Centered/Master.hs
src/Data/Acid/Centered/Slave.hs

index 92cfb80..733171f 100644 (file)
@@ -14,6 +14,7 @@
 module Data.Acid.Centered.Common
     (
       debug
+    , crcOfState
     , NodeRevision
     , Revision
     , RequestID
@@ -22,14 +23,22 @@ module Data.Acid.Centered.Common
     , MasterMessage(..)
     ) where
 
-import Data.Acid.Core (Tagged(..))
+import Data.Acid.Core (Tagged(..), withCoreState)
+import Data.Acid.Local (localCore)
+import Data.Acid.Abstract (downcast)
+import Data.Acid (AcidState, IsAcidic)
+import Data.Acid.CRC (crc16)
 
 import Control.Monad (liftM, liftM2, liftM3)
 import Data.ByteString.Char8 (ByteString)
 import qualified Data.ByteString.Lazy.Char8 as CSL
 import Data.Serialize (Serialize(..), put, get,
                        putWord8, getWord8,
+                       runPutLazy
                       )
+import Data.Typeable (Typeable)
+import Data.SafeCopy (safePut)
+import Data.Word (Word16)
 import System.IO (stderr, hPutStrLn)
 import qualified Control.Concurrent.Lock as L
 import System.IO.Unsafe (unsafePerformIO)
@@ -48,6 +57,9 @@ type Revision = Int
 -- | ID of an Update Request.
 type RequestID = Int
 
+-- | We use CRC16 for now.
+type Crc = Word16
+
 
 -- | Debugging without interleaving output from different threads
 {-# NOINLINE debugLock #-}
@@ -65,7 +77,7 @@ data MasterMessage = DoRep Revision (Maybe RequestID) (Tagged CSL.ByteString)
                    | MasterQuit
                   deriving (Show)
 
-data SlaveMessage = NewSlave Int
+data SlaveMessage = NewSlave Int Crc
                   | RepDone Int
                   | RepError
                   | ReqUpdate RequestID (Tagged CSL.ByteString)
@@ -89,7 +101,7 @@ instance Serialize MasterMessage where
 
 instance Serialize SlaveMessage where
     put msg = case msg of
-        NewSlave r    -> putWord8 0 >> put r
+        NewSlave r c  -> putWord8 0 >> put r >> put c
         RepDone r     -> putWord8 1 >> put r
         RepError      -> putWord8 2
         ReqUpdate i d -> putWord8 3 >> put i >> put d
@@ -97,9 +109,17 @@ instance Serialize SlaveMessage where
     get = do
         tag <- getWord8
         case tag of
-            0 -> liftM NewSlave get
+            0 -> liftM2 NewSlave get get
             1 -> liftM RepDone get
             2 -> return RepError
             3 -> liftM2 ReqUpdate get get
             9 -> return SlaveQuit
             _ -> error $ "Data.Serialize.get failed for SlaveMessage: invalid tag " ++ show tag
+
+-- | Compute the CRC of a state.
+crcOfState :: (IsAcidic st, Typeable st) => AcidState st -> IO Crc
+crcOfState state = do
+    let lst = downcast state
+    withCoreState (localCore lst) $ \st -> do
+        let encoded = runPutLazy (safePut st)
+        return $ crc16 encoded
index b29eff9..004226e 100644 (file)
@@ -101,9 +101,17 @@ masterRequestHandler masterState@MasterState{..} = forever $ do
             -- handle according frame contents
             case msg of
                 -- New Slave joined.
-                NewSlave r -> do
+                NewSlave r -> do
                     pastUpdates <- getPastUpdates localState r
                     connectNode masterState ident pastUpdates
+                {- fixme take the correct state revision
+                NewSlave r c -> do
+                    if crcOfState localState == c then 
+                        pastUpdates <- getPastUpdates localState r
+                        connectNode masterState ident pastUpdates
+                    else
+                        error "checksum mismatch."
+                -}
                 -- Slave is done replicating.
                 RepDone r -> return () -- updateNodeStatus masterState ident r
                 -- Slave sends an Udate.
index ccde48a..4554203 100644 (file)
@@ -117,6 +117,7 @@ enslaveState address port initialState = do
         repChan <- newChan
         syncDone <- Event.new
         sockLock <- newMVar ()
+        lcrc <- crcOfState lst
         -- remote
         let addr = "tcp://" ++ address ++ ":" ++ show port
         ctx <- context
@@ -125,7 +126,7 @@ enslaveState address port initialState = do
         setSendHighWM (restrict (100*1000)) sock
         connect sock addr
         msock <- newMVar sock
-        sendToMaster msock $ NewSlave lrev
+        sendToMaster msock $ NewSlave lrev lcrc
         let slaveState = SlaveState { slaveLocalState = lst
                                     , slaveRepChan = repChan
                                     , slaveSyncDone = syncDone