beginning of node-status - for discussion
authorMax Voit <max.voit+gtdv@with-eyes.net>
Sat, 13 Jun 2015 07:58:07 +0000 (09:58 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Sat, 13 Jun 2015 07:58:07 +0000 (09:58 +0200)
acid-state-dist.cabal
src/Data/Acid/MasterCentered.hs

index 39b6e4d..89ad24a 100644 (file)
@@ -61,6 +61,7 @@ library
                        safecopy,
                        acid-state > 0.12 && < 0.13,
                        zeromq4-haskell,
+                       containers,
                        transformers,
                        mtl
   
index 36f8e58..8095d3c 100644 (file)
@@ -37,18 +37,25 @@ import Data.Acid.Local
 import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
 import Control.Concurrent (forkIO)
 
-import System.ZMQ4 (Context, Socket, Router(..), context, term, socket, close, bind, unbind, send, receive)
+import System.ZMQ4 (Context, Socket, Router(..), context, term, socket, close, 
+                    bind, unbind, send, receive, sendMulti, receiveMulti)
 import Data.IORef (IORef, newIORef)
+import qualified Data.Map as M
+import Data.Map (Map)
 
 -- auto imports following - need to be cleaned up
 import Control.Monad.IO.Class(liftIO)
+import Control.Concurrent.MVar(MVar)
 
 type PortNumber = Int
 
 data RepStatus = Done | Replicating | Cleanup
 
+type NodeStatus = Map String Int
+
 data MasterState st 
     = MasterState { localState :: AcidState st
+                  , nodeStatus :: IORef NodeStatus
                   , repStatus :: IORef RepStatus
                   , zmqContext :: Context
                   , zmqAddr :: String
@@ -71,7 +78,6 @@ masterRepHandler sock repStatus = do
                 msg <- receive sock
                 debug $ "from [" ++ show ident ++ "]: " ++ show msg
                 -- now handle received stuff
-                -- handle send events
                 
                 -- loop around
                 liftIO $ debug "loop iteration"
@@ -84,7 +90,7 @@ masterRepHandler sock repStatus = do
         - may need to send out rep requests
     2) sending messages proactively, due to an update
  not use zmq-monadic but hand out the socket to threads doing 1) and 2).
-    there may then be "write" collisions. use locking?
+    there may then be "write" collisions, but not with sendMulti
 -}
 
 
@@ -100,12 +106,14 @@ openMasterState port initialState = do
         ctx <- context
         sock <- socket ctx Router
         rs <- newIORef Done
+        ns <- newIORef M.empty
         let addr = "tcp://127.0.0.1:" ++ show port
         bind sock addr
         forkIO $ masterRepHandler sock rs 
         -- local
         lst <- openLocalState initialState
         return $ toAcidState MasterState { localState = lst
+                                         , nodeStatus = ns
                                          , repStatus = rs
                                          , zmqContext = ctx
                                          , zmqAddr = addr
@@ -124,6 +132,22 @@ closeMasterState MasterState{..} = do
         -- cleanup local state
         closeAcidState localState
 
+-- | Update on master site.
+scheduleMasterUpdate :: UpdateEvent event => MasterState (EventState event) -> event -> IO (MVar (EventResult event))
+scheduleMasterUpdate masterState event = do
+    -- sent Update to Slaves
+    sendUpdateSlaves
+    -- do local Update
+    scheduleUpdate $ localState masterState
+    -- wait for Slaves finish replication
+
+sendUpdateSlaves master = do
+    debug "sending update to slaves"
+    let salves = allNodes $ nodeStatus master 
+    sendMulti
+    
+allNodes ns = M.keys ns
+
 enslaveState = undefined
 
 toAcidState :: IsAcidic st => MasterState st -> AcidState st