enhan: cleanup, comments on examples 0.1.0.0
authorMax Voit <max.voit+gtdv@with-eyes.net>
Fri, 21 Aug 2015 15:00:45 +0000 (17:00 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Fri, 21 Aug 2015 15:00:45 +0000 (17:00 +0200)
bench1.md [deleted file]
examples/HelloWorldSlave_Safe.hs
examples/HelloWorldSlave_Safe2.hs
examples/readme.md
makefile
src/Data/Acid/Centered/Common.hs
zmq-concept/hwmtest.hs [deleted file]
zmq-concept/master.hs [deleted file]
zmq-concept/master2.hs [deleted file]
zmq-concept/master3.hs [deleted file]
zmq-concept/slave.hs [deleted file]

diff --git a/bench1.md b/bench1.md
deleted file mode 100644 (file)
index 1a3b12a..0000000
--- a/bench1.md
+++ /dev/null
@@ -1,101 +0,0 @@
-The masterReplicationHandler uses a Chan to ensure Update ordering. The handler
-reading from the Chan does both, local updates and distributing via zmq.
-Splitting the Chan should increase Performance with Salves connected quite a bit.
-
-# before splitting chan
-
-Benchmark Local: RUNNING...
-benchmarking Local
-time                 105.4 ms   (97.22 ms .. 115.2 ms)
-                     0.990 R²   (0.979 R² .. 0.998 R²)
-mean                 106.4 ms   (103.2 ms .. 111.0 ms)
-std dev              5.919 ms   (3.762 ms .. 9.091 ms)
-variance introduced by outliers: 10% (moderately inflated)
-
-benchmarking Local-grouped
-time                 7.285 ms   (7.040 ms .. 7.535 ms)
-                     0.993 R²   (0.989 R² .. 0.996 R²)
-mean                 6.108 ms   (5.830 ms .. 6.378 ms)
-std dev              780.1 μs   (660.3 μs .. 906.1 μs)
-variance introduced by outliers: 72% (severely inflated)
-
-Benchmark Local: FINISH
-Benchmark MasterOnly: RUNNING...
-benchmarking MasterOnly
-time                 111.9 ms   (105.0 ms .. 117.6 ms)
-                     0.993 R²   (0.974 R² .. 0.999 R²)
-mean                 107.7 ms   (102.9 ms .. 112.9 ms)
-std dev              7.137 ms   (4.358 ms .. 10.91 ms)
-variance introduced by outliers: 12% (moderately inflated)
-
-benchmarking MasterOnly-grouped
-time                 12.44 ms   (10.68 ms .. 16.04 ms)
-                     0.763 R²   (0.637 R² .. 0.993 R²)
-mean                 10.16 ms   (9.487 ms .. 12.36 ms)
-std dev              2.992 ms   (1.021 ms .. 6.005 ms)
-variance introduced by outliers: 93% (severely inflated)
-
-Benchmark MasterOnly: FINISH
-Benchmark MasterSlave: RUNNING...
-benchmarking MasterSlave
-time                 5.099 s    (4.994 s .. 5.181 s)
-                     1.000 R²   (1.000 R² .. 1.000 R²)
-mean                 5.102 s    (5.082 s .. 5.113 s)
-std dev              17.90 ms   (0.0 s .. 19.14 ms)
-variance introduced by outliers: 19% (moderately inflated)
-
-benchmarking MasterSlave-grouped
-time                 198.1 ms   (73.47 ms .. 400.3 ms)
-                     0.704 R²   (0.237 R² .. 1.000 R²)
-mean                 229.5 ms   (188.4 ms .. 306.5 ms)
-std dev              64.98 ms   (2.340 ms .. 82.13 ms)
-variance introduced by outliers: 65% (severely inflated)
-
-# after splitting chan
-
-benchmarking Local
-time                 98.01 ms   (93.92 ms .. 104.8 ms)
-                     0.995 R²   (0.989 R² .. 0.999 R²)
-mean                 97.68 ms   (94.70 ms .. 100.3 ms)
-std dev              4.134 ms   (2.843 ms .. 5.997 ms)
-
-benchmarking Local-grouped
-time                 7.386 ms   (7.093 ms .. 7.666 ms)
-                     0.992 R²   (0.987 R² .. 0.996 R²)
-mean                 6.105 ms   (5.806 ms .. 6.394 ms)
-std dev              831.6 μs   (689.2 μs .. 1.003 ms)
-variance introduced by outliers: 74% (severely inflated)
-
-Benchmark Local: FINISH
-Benchmark MasterOnly: RUNNING...
-benchmarking MasterOnly
-time                 106.5 ms   (98.30 ms .. 112.3 ms)
-                     0.996 R²   (0.992 R² .. 0.999 R²)
-mean                 99.60 ms   (97.03 ms .. 102.5 ms)
-std dev              4.517 ms   (3.295 ms .. 6.592 ms)
-variance introduced by outliers: 10% (moderately inflated)
-
-benchmarking MasterOnly-grouped
-time                 11.02 ms   (10.78 ms .. 11.33 ms)
-                     0.996 R²   (0.993 R² .. 0.998 R²)
-mean                 9.998 ms   (9.668 ms .. 10.28 ms)
-std dev              841.6 μs   (679.3 μs .. 1.022 ms)
-variance introduced by outliers: 45% (moderately inflated)
-
-Benchmark MasterOnly: FINISH
-Benchmark MasterSlave: RUNNING...
-benchmarking MasterSlave
-time                 129.6 ms   (124.8 ms .. 132.4 ms)
-                     0.999 R²   (0.995 R² .. 1.000 R²)
-mean                 126.4 ms   (124.4 ms .. 129.1 ms)
-std dev              3.516 ms   (2.240 ms .. 5.186 ms)
-variance introduced by outliers: 11% (moderately inflated)
-
-benchmarking MasterSlave-grouped
-time                 25.02 ms   (24.02 ms .. 26.07 ms)
-                     0.989 R²   (0.978 R² .. 0.996 R²)
-mean                 21.36 ms   (18.78 ms .. 22.72 ms)
-std dev              4.140 ms   (1.663 ms .. 6.090 ms)
-variance introduced by outliers: 74% (severely inflated)
-
-
index ad6feb3..98eb6e0 100644 (file)
@@ -32,7 +32,7 @@ queryState = do HelloWorldState string <- ask
 $(makeAcidic ''HelloWorldState ['writeState, 'queryState])
 
 main :: IO ()
-main = bracket
+main = bracket  -- use bracket for safe State deallocation on exceptions.
     (enslaveState "localhost"  3333 (HelloWorldState "Hello world"))
     (\acid -> putStrLn "Finally shutting down Slave." >> closeAcidState acid)
     $ \acid -> do
index ef6bbdd..b5cbaa7 100644 (file)
@@ -35,6 +35,8 @@ $(makeAcidic ''HelloWorldState ['writeState, 'queryState])
 main :: IO ()
 main = do
     acid <- enslaveState "localhost"  3333 (HelloWorldState "Hello world")
+    -- In contrast to the HelloWorldSlave_Safe.hs we do not have async
+    -- exceptions masked here:
     handle (\(e :: SomeException) -> putStrLn ("Exceptionally shut down Slave, due to: " ++ show e) >> closeAcidState acid) $ do
         putStrLn "Possible commands: x for exit; q for query; uString for update;"
         let loop = do
index 914c35d..6a22d63 100644 (file)
@@ -8,7 +8,13 @@ The slave1/2 directories contain symlinks for quick testing on one machine.
 A simple example analogous to the HelloWorld example in acid-state.
 Run the Master directly here, Slaves in subdirectories.
 
+The "safe" Slaves demonstrate how to take care of exceptions.
+
 # Int*Interactive
 
 Contains in Int as state, allows for multiple Updates in a row. The IntState is
 used in the _tests_ as well.
+
+# Threaded
+
+Demonstrates how Exceptions are handled in threaded usages.
index 391c520..0bac889 100644 (file)
--- a/makefile
+++ b/makefile
@@ -1,3 +1,8 @@
+# This makefile is only for convenience in development,
+# use cabal for building.
+all:
+       echo "Not intended for building, only convenience functions."
+
 all-test:
        cabal clean
        cabal configure --enable-test
index 5cc09b1..f04f99e 100644 (file)
@@ -40,15 +40,13 @@ import Data.Acid.CRC (crc16)
 import Control.Concurrent (threadDelay)
 
 import Control.Monad (liftM, liftM2, liftM3,
-                      unless, when
-                     )
+                      unless, when)
 import Control.Exception (Exception)
 
 import Data.ByteString.Lazy.Char8 (ByteString)
 import Data.Serialize (Serialize(..), put, get,
                        putWord8, getWord8,
-                       runPutLazy
-                      )
+                       runPutLazy)
 import Data.Word (Word16)
 
 #ifdef nodebug
diff --git a/zmq-concept/hwmtest.hs b/zmq-concept/hwmtest.hs
deleted file mode 100644 (file)
index 6e2457c..0000000
+++ /dev/null
@@ -1,48 +0,0 @@
-{-# LANGUAGE OverloadedStrings #-}
-import System.ZMQ4
-import Control.Concurrent 
-import Control.Monad
-import qualified Data.ByteString.Char8 as CS
-
-receiver = do
-    ctx <- context
-    sock <- socket ctx Dealer
-    --setReceiveHighWM (restrict (100*1000)) sock
-    connect sock "tcp://localhost:3335"
-    send sock [] "new here"
-    forever $ do 
-        msg <- receive sock
-        CS.putStrLn $ "R " `CS.append` msg
-    -- this is important since garbage collection!
-    close sock
-    term ctx
-
-main = do
-    forkIO receiver
-
-    ctx <- context
-    sock <- socket ctx Router
-    --setReceiveHighWM (restrict (100*1000)) sock
-    bind sock "tcp://*:3335"
-
-    ident <- receive sock
-    _ <- receive sock
-
-    forever $ do 
-        c <- getLine
-        case c of
-            'v':_ -> forM_ [0..100] $ \i -> do
-               send sock [SendMore] ident
-               send sock [] $ CS.pack $ show i 
-            's':_ -> forM_ [0..1000] $ \i -> do
-               send sock [SendMore] ident
-               send sock [] $ CS.pack $ show i 
-            'b':_ -> forM_ [0..1000000] $ \i -> do
-               send sock [SendMore] ident
-               send sock [] $ CS.pack $ show i 
-            _ -> print "unknown command"
-
-    -- this is important since garbage collection!
-    close sock
-    term ctx
-
diff --git a/zmq-concept/master.hs b/zmq-concept/master.hs
deleted file mode 100644 (file)
index 1b84621..0000000
+++ /dev/null
@@ -1,84 +0,0 @@
-{-# LANGUAGE OverloadedStrings #-}
-import Control.Monad
-import Control.Monad.IO.Class (MonadIO)
-import System.IO
-import System.ZMQ4
-import qualified Data.ByteString.Char8 as CS
-import qualified Data.Map.Strict as Map
-import Control.Concurrent (forkIO, threadDelay)
-import Data.IORef (newIORef, modifyIORef, modifyIORef', readIORef, IORef)
-import Data.Maybe (fromMaybe)
-import Control.Concurrent.MVar
-
-
-addr :: String
-addr = "tcp://127.0.0.1:5000"
-
-main :: IO ()
-main = do
-    -- current revision and updates
-    curRev <- newMVar 1
-    nodesRev <- newIORef Map.empty
-    -- no monadic zmq, share socket
-    ctx <- context
-    sock <- socket ctx Router
-    bind sock addr
-
-    -- /random/ updates
-    forkIO $ forever $ do
-        threadDelay $ 500 * 1000
-        modifyMVar_ curRev $ \cr -> do
-            let crn = cr + 1
-            -- send update to all nodes uptodate
-            ns <- readIORef nodesRev
-            let nodesUpToDate = Map.keys $ Map.filter (== cr) ns
-            forM_ nodesUpToDate $ \i -> sendUpdate sock i crn
-            return crn
-
-    -- worker for distributing updates
-    -- now loop and:
-    --      o update nodes not on current revision
-    --      o receive node responses and update revision list 
-    forever $ do
-        -- update revision list
-        (ident, msg) <- receiveReply sock
-        case CS.head msg of
-            'S' -> withMVar curRev $ \cr -> do 
-                   forM_ [0..cr] $ \rev -> do
-                              sendUpdate sock ident rev
-                              -- now also wait for it updating
-                              (_, msgnr) <- receiveReply sock
-                              when (msgToRev msgnr /= rev) $ error "revision update for new node failed"
-                   addNode nodesRev ident cr
-            'D' -> incRevNode nodesRev ident (msgToRev msg)
-            _ -> error $ "unknown message: " ++ show msg
-        CS.putStrLn $ CS.append (formatID ident) msg
-        hFlush stdout
-    -- cleanup zmq stuff
-    unbind sock addr
-    term ctx
-
-formatID i = CS.cons '[' $ CS.append i "] "
-
-addNode :: IORef (Map.Map CS.ByteString Int) -> CS.ByteString -> Int -> IO ()
-addNode ns i rev = modifyIORef ns (Map.insert i rev)
-
-incRevNode :: IORef (Map.Map CS.ByteString Int) -> CS.ByteString -> Int -> IO ()
-incRevNode ns i r = modifyIORef ns (Map.adjust (const r) i)
-
-receiveReply :: Receiver t => Socket t -> IO (CS.ByteString, CS.ByteString)
-receiveReply sock = do
-    ident <- receive sock 
-    msg <- receive sock 
-    print ident
-    print msg
-    return (ident, msg)
-
-sendUpdate :: Sender t => Socket t -> CS.ByteString -> Int -> IO ()
-sendUpdate sock ident num = do
-    send sock [SendMore] ident
-    send sock [] $ CS.cons 'U' (CS.pack (show (num :: Int)))
-
-msgToRev :: CS.ByteString -> Int
-msgToRev m = fst $ fromMaybe (0,"") (CS.readInt $ CS.tail m)
-
diff --git a/zmq-concept/master2.hs b/zmq-concept/master2.hs
deleted file mode 100644 (file)
index 993bc11..0000000
+++ /dev/null
@@ -1,74 +0,0 @@
--- another communication concept, using two sockets
---  o dealer-router (s-m) for slaves requests
---      slave connects, tells master where he is
---  o request-response (s-m) for actual state updates 
---      master connects back
---
---  reason to think about this concept is that zmq sockets are not thread-safe
---  therefore splitting the former single socket in multiple could be sensible
---
---  idea behind it is: it should be so simple
---
---  S: Hey, I'm at revision 23.
---  M: Here, have some updates till 123.
---  S: I'm at 123. More?
---  M: Here, have some updates till 127.
---  S: I'm at 127. More?
---    <silence, nothing happened>
---  M: Here, have 128.
---  S: I'm at 128. More?
---  ...
---  After each "More?" the Master either sends Updates that happened before
---  or waits for a Signal that a new Update is in progress/happened.
--- 
---  Random Requests by Slaves (UpdateRequest, SlaveQuit...) have to go over
---  another channel; especially also NewSlave, which needs forkIOing the
---  communication thread.
-
-import System.ZMQ4
-
-import Control.Monad (forever)
-
-type NodeID = Int
-type NodeAddr = String
-
-
-main :: IO ()
-main = do
-    -- bind router sock
-    -- open local state
-    forever $ do
-        -- wait for requests to
-        --  o connect a new node
-        --      add node in node status
-        --      forkio rephandler for that node
-        --  o issue an update
-        --      serialize it on master
-        --      set update-occured-event - slaves will replicate
-        --      ATH: at this point we forbid majority serialization!
-        --          we can't use a chan (as in the old concept) as this again 
-        --          requires to split the socket between different threads
-        --          instead we could use a chan holding events not serialized on
-        --          master - this needs distinction by rev in the rephandler 
-        --  o quit
-        --      kill rephandler
-        --      remove from node status
-        undefined
-
-
--- there's one for each slave
---      threadids must be taken care of (handler no longer needed -> kill)
---      we need an update-occured-event
---      we also need a mechanism to ensure a slave knows it's an update he requested
---          ATH: this is not trivial - we can't get this info from the master log
-replicationHandler :: NodeID -> NodeAddr -> IO ()
-replicationHandler id addr = do
-    -- connect
-    forever $ do
-        -- await slave-request for updates
-        -- if curstate newer
-            -- send state update
-        -- else
-            -- wait update-event, then send state update
-        undefined
-    
diff --git a/zmq-concept/master3.hs b/zmq-concept/master3.hs
deleted file mode 100644 (file)
index 3c13319..0000000
+++ /dev/null
@@ -1,79 +0,0 @@
--- this is mostly master2.hs, mixed with some of the original concept
---
--- another communication concept, using two sockets
---  o dealer-router (s-m) for slaves requests
---      slave connects, tells master where he is
---  o request-response (s-m) for actual state updates 
---      master connects back
---
---  reason to think about this concept is that zmq sockets are not thread-safe
---  therefore splitting the former single socket in multiple could be sensible
---
---  idea behind it is: it should be so simple
---
---  S: Hey, I'm at revision 23.
---  M: Here, have some updates till 123.
---  S: I'm at 123. More?
---  M: Here, have some updates till 127.
---  S: I'm at 127. More?
---    <silence, nothing happened>
---  M: Here, have 128.
---  S: I'm at 128. More?
---  ...
---  After each "More?" the Master either sends Updates that happened before
---  or waits for a Signal that a new Update is in progress/happened.
--- 
---  Random Requests by Slaves (UpdateRequest, SlaveQuit...) have to go over
---  another channel; especially also NewSlave, which needs forkIOing the
---  communication thread.
-
-import System.ZMQ4
-
-import Control.Monad (forever)
-import Control.Concurrent.Chan
-import Data.ByteString (ByteString)
-
-type NodeID = Int
-type NodeAddr = String
-type ReqID = Int
-type RepItem = ((Tagged ByteString), Either (ReqID) (NodeID,ReqID))
-
-
-main :: IO ()
-main = do
-    -- bind router sock
-    -- open local state
-    forever $ do
-        -- wait for requests to
-        --  o connect a new node
-        --      add node in node status
-        --      forkio rephandler for that node
-        --  o issue an update
-        --      put it into replicationChan
-        --      ATH: careful with majority serialization: 
-        --          here or in replicationHandler?
-        --  o quit
-        --      kill rephandler
-        --      remove from node status
-        undefined
-
-localRepHandler :: Chan RepItem -> IO ()
-localRepHandler chan = forever $ do
-    -- read from chan
-    -- replicate locally
-    undefined
-
--- there's one for each slave
---      threadids must be taken care of (handler no longer needed -> kill)
-replicationHandler :: Chan RepItem -> NodeID -> NodeAddr -> IO ()
-replicationHandler chan id addr = do
-    -- connect
-    -- send all old updates
-    -- send crc
-    -- clone replicationChan 
-    forever $ do
-        -- take update from chan
-        -- send it to slave (if requested by a slave it's marked in there)
-        -- wait slave done msg
-        undefined
-    
diff --git a/zmq-concept/slave.hs b/zmq-concept/slave.hs
deleted file mode 100644 (file)
index 30c74c7..0000000
+++ /dev/null
@@ -1,42 +0,0 @@
-{-# LANGUAGE OverloadedStrings #-}
-import Control.Monad
-import Control.Concurrent (threadDelay)
-import System.IO
-import System.ZMQ4.Monadic
-import qualified Data.ByteString.Char8 as CS
-import Data.IORef (newIORef, modifyIORef, readIORef)
-import Data.Maybe (fromMaybe)
-
-addr :: String
-addr = "tcp://127.0.0.1:5000"
-
-main :: IO ()
-main = do
-    myRev <- newIORef 0
-    runZMQ $ do
-        sock <- socket Dealer
-        connect sock addr
-        send sock [] "S"
-        forever $ do
-            -- liftIO $ threadDelay 100000
-            msg <- receive sock 
-            mcr <- liftIO $ readIORef myRev
-            case CS.head msg of
-                'U' -> do
-                    let nr = msgToRev msg
-                    if (nr == mcr + 1) || (nr == mcr) then do
-                        send sock [] $ CS.cons 'D' $ CS.tail msg
-                        if nr == mcr then
-                            liftIO $ print "W: ignoring increment which is none"
-                        else do
-                            liftIO $ modifyIORef myRev (+1)
-                            liftIO . CS.putStrLn $ CS.append "D" $ CS.tail msg
-                            liftIO $ hFlush stdout
-                    else
-                        error $ "E: invalid revision increment " ++ show mcr ++ " -> " ++ show nr
-                        -- when coordinator keeps track anyway, this
-                        -- shouldn't happen - but is it sensible?
-                _ -> return ()
-
-msgToRev m = fst $ fromMaybe (0,"") (CS.readInt $ CS.tail m)
-