changed message format, use Serialize
authorMax Voit <max.voit+gtdv@with-eyes.net>
Wed, 17 Jun 2015 13:13:19 +0000 (15:13 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Wed, 17 Jun 2015 13:13:19 +0000 (15:13 +0200)
src/Data/Acid/Centered/Common.hs
src/Data/Acid/Centered/Master.hs
src/Data/Acid/Centered/Slave.hs

index 5eee607..7cf210d 100644 (file)
@@ -1,4 +1,4 @@
-{-# LANGUAGE DeriveDataTypeable, RecordWildCards, OverloadedStrings #-}
+{-# LANGUAGE OverloadedStrings #-}
 --------------------------------------------------------------------------------
 {- |
   Module      :  Data.Acid.Centered.Common
 
 module Data.Acid.Centered.Common
     (
-      PortNumber(..)
-    , debug
+      debug
+    , PortNumber(..)
+    , SlaveMessage(..)
+    , MasterMessage(..)
     ) where
 
+import Control.Monad (liftM, liftM2)
+import Data.ByteString.Char8 (ByteString)
+import Data.Serialize (Serialize(..), put, get,
+                       putWord8, getWord8,
+                      )
+
 type PortNumber = Int
 
 debug :: String -> IO ()
 debug = putStrLn 
+
+data MasterMessage = DoRep Int ByteString
+                  deriving (Show)
+                -- later:
+                -- | Quit
+
+data SlaveMessage = NewSlave Int
+                  | RepDone Int
+                  deriving (Show)
+               -- later:
+               -- | Update ByteString
+               -- | Quit
+
+instance Serialize MasterMessage where
+    put msg = case msg of
+        DoRep r d -> putWord8 0 >> put r >> put d
+    get = do 
+        tag <- getWord8
+        case tag of
+            0 -> liftM2 DoRep get get
+            _ -> error $ "Data.Serialize.get failed for MasterMessage: invalid tag " ++ show tag
+
+instance Serialize SlaveMessage where
+    put msg = case msg of
+        NewSlave r -> putWord8 0 >> put r
+        RepDone r  -> putWord8 1 >> put r
+    get = do
+        tag <- getWord8
+        case tag of
+            0 -> liftM NewSlave get
+            1 -> liftM RepDone get
+            _ -> error $ "Data.Serialize.get failed for SlaveMessage: invalid tag " ++ show tag
index 27fe91d..837129e 100644 (file)
@@ -1,4 +1,4 @@
-{-# LANGUAGE DeriveDataTypeable, RecordWildCards, OverloadedStrings #-}
+{-# LANGUAGE DeriveDataTypeable, RecordWildCards #-}
 --------------------------------------------------------------------------------
 {- |
   Module      :  Data.Acid.Centered.Master
@@ -33,12 +33,15 @@ import Data.Acid.Abstract
 import Data.Acid.Advanced
 import Data.Acid.Local
 import Data.Acid.Log
-import Data.Serialize (runPutLazy, runPut)
+import Data.Serialize (Serialize(..), put, get,
+                       decode, encode,
+                       runPutLazy, runPut
+                      )
 
 import Data.Acid.Centered.Common
 
 import Control.Concurrent (forkIO)
-import Control.Monad (forever, when, forM_)
+import Control.Monad (forever, when, forM_, liftM, liftM2)
 import qualified Control.Concurrent.Event as E
 
 import System.ZMQ4 (Context, Socket, Router(..), Receiver, Flag(..),
@@ -77,35 +80,38 @@ masterRepHandler MasterState{..} = do
                 -- take one frame
                 (ident, msg) <- receiveFrame zmqSocket
                 -- handle according frame contents
-                case CS.head msg of
-                    -- a _N_ew slave node
-                    'N' -> do
+                case msg of
+                    NewSlave r -> do
                         -- todo: the state should be locked at this point to avoid losses
                         oldUpdates <- getPastUpdates localState
                         connectNode zmqSocket nodeStatus ident oldUpdates
-                    -- Update was _D_one 
-                    'D' -> updateNodeStatus nodeStatus repDone ident msg cr
+                    RepDone r -> updateNodeStatus nodeStatus repDone ident r cr
                     -- Slave sends an _U_date
-                    'U' -> undefined -- todo: not yet
+                    --'U' -> undefined -- todo: not yet
                     -- no other messages possible
-                    _ -> error $ "Unknown message received: " ++ CS.unpack msg
+                    _ -> error $ "Unknown message received: " ++ show msg
                 -- loop around
                 debug "loop iteration"
                 loop
         loop
         where cr = undefined :: Int
 
+
+
+
+
 -- | Fetch past Updates from FileLog for replication.
 getPastUpdates :: (Typeable st) => AcidState st -> IO [Tagged CSL.ByteString]
 getPastUpdates state = readEntriesFrom (localEvents $ downcast state) 0
 
 
 -- | Update the NodeStatus after a node has replicated an Update.
-updateNodeStatus :: MVar NodeStatus -> E.Event -> NodeIdentity -> ByteString -> Int -> IO ()
-updateNodeStatus nodeStatus rDone ident msg cr = 
+updateNodeStatus :: MVar NodeStatus -> E.Event -> NodeIdentity -> Int -> Int -> IO ()
+updateNodeStatus nodeStatus rDone ident r cr = 
     modifyMVar_ nodeStatus $ \ns -> do
         -- todo: there should be a fancy way to do this
         --when (M.findWithDefault 0 ident ns /= (cr - 1)) $ error "Invalid increment of node status."
+        -- todo: checks sensible?
         let rs = M.adjust (+1) ident ns
         when (allNodesDone rs) $ do
             E.set rDone
@@ -121,8 +127,8 @@ updateNodeStatus nodeStatus rDone ident msg cr =
 connectNode :: Socket Router -> MVar NodeStatus -> NodeIdentity -> [Tagged CSL.ByteString] -> IO ()
 connectNode sock nodeStatus i oldUpdates = 
     modifyMVar_ nodeStatus $ \ns -> do
-        forM_ oldUpdates $ \u -> do
-            sendUpdate sock (encoded u) i
+        forM_ (zip oldUpdates [0..]) $ \(u, r) -> do
+            sendUpdate sock (encoded u) i
             (ident, msg) <- receiveFrame sock
             when (ident /= i) $ error "received message not from the new node"
             -- todo: also check increment validity
@@ -135,25 +141,27 @@ encodeUpdate :: (UpdateEvent e) => e -> ByteString
 encodeUpdate event = runPut (safePut event)
 
 -- | Send one (encoded) Update to a Slave.
-sendUpdate :: Socket Router -> ByteString -> NodeIdentity -> IO ()
-sendUpdate sock update ident = do
+sendUpdate :: Socket Router -> Int -> ByteString -> NodeIdentity -> IO ()
+sendUpdate sock revision update ident = do
     send sock [SendMore] ident
-    send sock [SendMore] ""
-    send sock [] $ 'U' `CS.cons` encoded
+    send sock [SendMore] CS.empty
+    send sock [] $ encode $ DoRep revision update
     where 
         encoded = undefined -- encode Tag and BS of update
     
 
 -- | Receive one Frame. A Frame consists of three messages: 
 --      sender ID, empty message, and actual content 
-receiveFrame :: (Receiver t) => Socket t -> IO (NodeIdentity, ByteString)
+receiveFrame :: (Receiver t) => Socket t -> IO (NodeIdentity, SlaveMessage)
 receiveFrame sock = do
     ident <- receive sock
     _     <- receive sock
     msg   <- receive sock
     debug $ "received from [" ++ show ident ++ "]: " ++ show msg
-    return (ident, msg)
-    
+    case decode msg of
+        -- todo: pass on exceptions
+        Left str -> error $ "Data.Serialize.decode failed on SlaveMessage: " ++ show msg
+        Right smsg -> return (ident, smsg)
 
 -- | Open the master state.
 openMasterState :: (IsAcidic st, Typeable st) =>
@@ -202,16 +210,17 @@ scheduleMasterUpdate masterState event = do
     res <- scheduleUpdate (localState masterState) event
     -- sent Update to Slaves
     E.clear $ repDone masterState
-    sendUpdateSlaves masterState event
+    sendUpdateSlaves masterState revision event
     -- wait for Slaves finish replication
     E.wait $ repDone masterState
     return res
+    where revision = undefined
 
-sendUpdateSlaves :: (SafeCopy e) => MasterState st -> e -> IO ()
-sendUpdateSlaves MasterState{..} event = withMVar nodeStatus $ \ns -> do
+sendUpdateSlaves :: (SafeCopy e) => MasterState st -> Int -> e -> IO ()
+sendUpdateSlaves MasterState{..} revision event = withMVar nodeStatus $ \ns -> do
     let allSlaves = M.keys ns
     let encoded = runPut (safePut event)
-    forM_ allSlaves $ \i -> sendUpdate zmqSocket encoded i
+    forM_ allSlaves $ \i -> sendUpdate zmqSocket revision encoded i
 
 
 toAcidState :: IsAcidic st => MasterState st -> AcidState st
index fe5a44f..abb6a77 100644 (file)
@@ -1,4 +1,4 @@
-{-# LANGUAGE DeriveDataTypeable, RecordWildCards, OverloadedStrings #-}
+{-# LANGUAGE DeriveDataTypeable, RecordWildCards #-}
 --------------------------------------------------------------------------------
 {- |
   Module      :  Data.Acid.CenteredSlave.hs
@@ -36,6 +36,10 @@ module Data.Acid.Centered.Slave
 
 import Data.Typeable
 import Data.SafeCopy
+import Data.Serialize (Serialize(..), put, get,
+                       decode, encode,
+                       runPutLazy, runPut
+                      )
 
 import Data.Acid
 import Data.Acid.Abstract
@@ -98,13 +102,15 @@ enslaveState address port initialState = do
 slaveRepHandler :: SlaveState st -> IO ()
 slaveRepHandler SlaveState{..} = forever $ do
         msg <- receive slaveZmqSocket
-        case CS.head msg of
-            -- We are sent an Update.
-            'U' -> replicateUpdate slaveZmqSocket msg slaveLocalState
-            -- We are requested to Quit.
-            'Q' -> undefined -- todo: how get a State that wasn't closed closed?
-            -- no other messages possible
-            _ -> error $ "Unknown message received: " ++ CS.unpack msg
+        case decode msg of
+            Left str -> error $ "Data.Serialize.decode failed on MasterMessage: " ++ show msg
+            Right mmsg -> case mmsg of
+                    -- We are sent an Update.
+                    DoRep r d -> replicateUpdate slaveZmqSocket msg slaveLocalState
+                    -- We are requested to Quit.
+                    -- Quit -> undefined -- todo: how get a State that wasn't closed closed?
+                    -- no other messages possible
+                    _ -> error $ "Unknown message received: " ++ show mmsg
 
 replicateUpdate :: Socket Req -> ByteString -> AcidState st -> IO ()
 replicateUpdate sock msg lst = do
@@ -114,8 +120,11 @@ replicateUpdate sock msg lst = do
         let tag = undefined
         scheduleColdUpdate lst (tag, CSL.fromStrict msg)
         -- send reply: we're done
-        send sock [] "D"
+        sendToMaster sock $ RepDone revision
+        where revision = undefined
 
+sendToMaster :: Socket Req -> SlaveMessage -> IO ()
+sendToMaster sock smsg = send sock [] $ encode smsg
 
 -- | Close an enslaved State.
 liberateState :: SlaveState st -> IO ()