handling communication in a special thread - sensible?
authorMax Voit <max.voit+gtdv@with-eyes.net>
Wed, 10 Jun 2015 16:10:25 +0000 (18:10 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Wed, 10 Jun 2015 16:10:25 +0000 (18:10 +0200)
src/Data/Acid/MasterCentered.hs

index c743155..ea0f685 100644 (file)
@@ -34,34 +34,84 @@ import Data.Acid.Abstract -- not exported by acid-state, export and reinstall
 import Data.Acid.Advanced
 import Data.Acid.Local
 
-import System.ZMQ4
+import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
+import Control.Concurrent (forkIO)
+
+import System.ZMQ4 (socket, Router, bind, receive, liftIO)
 
 type PortNumber = Int
 
+data RepStatus = Done | Replicating | Cleanup
+
 data MasterState st 
     = MasterState { localState :: AcidState st
-                  , zmqContext :: Context
+                  , repStatus :: IORef RepStatus
+                  , repHandler :: Chan
                   }
 
+debug :: String -> IO ()
+debug msg = return ()
+        -- putStrLn msg
+        
+-- | The replication handler on master node
+masterRepHandler :: Chan -> IORef RepStatus -> String -> IO()
+masterRepHandler repHandler repStatus addr = runZMQ $ do
+        sock <- socket Router
+        bind sock addr
+        let loop = do
+            -- take one frame - only if there is one, else it'd block
+            inputWaiting <- poll 10 [Sock sock [In] Nothing]
+            unless (null $ head inputWaiting) $ do
+                ident <- receive sock
+                _ <- receive sock
+                msg <- receive sock
+                -- now handle received stuff
+                return ()
+            -- handle send events
+            
+            -- loop around
+            liftIO $ debug "loop iteration"
+            loop
+        loop
+{- what do we need to do in the zmq part?
+  there is two things:
+    1) receiving messages from slave nodes
+        - may change repStatus
+        - may need to send out rep requests
+    2) sending messages proactively, due to an update
+  problem: the receiving loop may block the proactive sending
+  solution: before receiving, check whether there is something
+
+  this is still ugly, as it is polling. Why can't we do something reactive?
+    not use zmq-monadic but hand out the socket to threads doing 1) and 2).
+    there may then be "write" collisions. use locking?
+-}
+
+
+
+-- | Open the master state.
 openMasterState :: (IsAcidic st, Typeable st) =>
-               PortNumber
-            -> st
+               PortNumber   -- ^ port to bind to
+            -> st           -- ^ initial state 
             -> IO (AcidState st)
 openMasterState port initialState = do
+        debug "opening master state"
         -- remote
-        ctx <- context
-            -- bind to port
-            -- fork to wait for connects and handle slaves
-            -- as we fork anyways, perhaps use runZMQ in there - but will
-            --   it terminate gracefully?
+        rs <- newIORef
+        rh <- newChan
+        let addr = "tcp://127.0.0.1:" ++ show port
+        forkIO $ masterRepHandler rh rs addr 
         -- local
         lst <- openLocalState initialState
         return $ toAcidState MasterState { localState = lst
-                                         , zmqContext = ctx
+                                         , repStatus = rs
+                                         , repHandler = rh
                                          }
 
+-- | Close the master state.
+closeMasterState :: MasterState -> IO ()
 closeMasterState MasterState{..} = do
-        destroy zmqContext
+        debug "closing master state"
         closeAcidState localState
 
 enslaveState = undefined