ref: indentation
authorMax Voit <max.voit+gtdv@with-eyes.net>
Tue, 18 Aug 2015 14:06:30 +0000 (16:06 +0200)
committerMax Voit <max.voit+gtdv@with-eyes.net>
Tue, 18 Aug 2015 14:06:30 +0000 (16:06 +0200)
src/Data/Acid/Centered/Master.hs
src/Data/Acid/Centered/Slave.hs

index 6c1ef5c..1262755 100644 (file)
@@ -146,7 +146,7 @@ masterRequestHandler masterState@MasterState{..} = do
 -- | Remove a Slave node from NodeStatus.
 removeFromNodeStatus :: MVar NodeStatus -> NodeIdentity -> IO ()
 removeFromNodeStatus nodeStatus ident =
-        modifyMVar_ nodeStatus $ return . M.delete ident
+    modifyMVar_ nodeStatus $ return . M.delete ident
 
 -- | Update the NodeStatus after a node has replicated an Update.
 updateNodeStatus :: MasterState st -> NodeIdentity -> Int -> IO ()
@@ -290,52 +290,52 @@ openRedMasterStateFrom :: (IsAcidic st, Typeable st) =>
             -> st           -- ^ initial state
             -> IO (AcidState st)
 openRedMasterStateFrom directory address port red initialState = do
-        debug "opening master state"
-        -- local
-        lst <- openLocalStateFrom directory initialState
-        let levs = localEvents $ downcast lst
-        lrev <- atomically $ readTVar $ logNextEntryId levs
-        rev <- newMVar lrev
-        revN <- newMVar lrev
-        repChan <- newChan
-        repChanN <- dupChan repChan
-        repFin <- newMVar IM.empty
-        ns <- newMVar M.empty
-        repTidL <- newEmptyMVar
-        repTidN <- newEmptyMVar
-        reqTid <- newEmptyMVar
-        parTid <- myThreadId
-        sLock <- newEmptyMVar
-        -- remote
-        let addr = "tcp://" ++ address ++ ":" ++ show port
-        ctx <- context
-        sock <- socket ctx Router
-        setReceiveHighWM (restrict (100*1000 :: Int)) sock
-        setSendHighWM (restrict (100*1000 :: Int)) sock
-        bind sock addr
-        msock <- newMVar sock
-
-        let masterState = MasterState { localState = lst
-                                      , nodeStatus = ns
-                                      , repRedundancy = red
-                                      , repFinalizers = repFin
-                                      , masterStateLock = sLock
-                                      , masterRevision = rev
-                                      , masterRevisionN = revN
-                                      , masterReplicationChan = repChan
-                                      , masterReplicationChanN = repChanN
-                                      , masterRepLThreadId = repTidL
-                                      , masterRepNThreadId = repTidN
-                                      , masterReqThreadId = reqTid
-                                      , masterParentThreadId = parTid
-                                      , zmqContext = ctx
-                                      , zmqAddr = addr
-                                      , zmqSocket = msock
-                                      }
-        void $ forkIO $ masterRequestHandler masterState
-        void $ forkIO $ masterReplicationHandlerL masterState
-        void $ forkIO $ masterReplicationHandlerN masterState
-        return $ toAcidState masterState
+    debug "opening master state"
+    -- local
+    lst <- openLocalStateFrom directory initialState
+    let levs = localEvents $ downcast lst
+    lrev <- atomically $ readTVar $ logNextEntryId levs
+    rev <- newMVar lrev
+    revN <- newMVar lrev
+    repChan <- newChan
+    repChanN <- dupChan repChan
+    repFin <- newMVar IM.empty
+    ns <- newMVar M.empty
+    repTidL <- newEmptyMVar
+    repTidN <- newEmptyMVar
+    reqTid <- newEmptyMVar
+    parTid <- myThreadId
+    sLock <- newEmptyMVar
+    -- remote
+    let addr = "tcp://" ++ address ++ ":" ++ show port
+    ctx <- context
+    sock <- socket ctx Router
+    setReceiveHighWM (restrict (100*1000 :: Int)) sock
+    setSendHighWM (restrict (100*1000 :: Int)) sock
+    bind sock addr
+    msock <- newMVar sock
+
+    let masterState = MasterState { localState = lst
+                                  , nodeStatus = ns
+                                  , repRedundancy = red
+                                  , repFinalizers = repFin
+                                  , masterStateLock = sLock
+                                  , masterRevision = rev
+                                  , masterRevisionN = revN
+                                  , masterReplicationChan = repChan
+                                  , masterReplicationChanN = repChanN
+                                  , masterRepLThreadId = repTidL
+                                  , masterRepNThreadId = repTidN
+                                  , masterReqThreadId = reqTid
+                                  , masterParentThreadId = parTid
+                                  , zmqContext = ctx
+                                  , zmqAddr = addr
+                                  , zmqSocket = msock
+                                  }
+    void $ forkIO $ masterRequestHandler masterState
+    void $ forkIO $ masterReplicationHandlerL masterState
+    void $ forkIO $ masterReplicationHandlerN masterState
+    return $ toAcidState masterState
 
 -- | Close the master state.
 closeMasterState :: MasterState st -> IO ()
@@ -369,41 +369,41 @@ closeMasterState MasterState{..} =
 -- | Update on master site.
 scheduleMasterUpdate :: (UpdateEvent event, Typeable (EventState event)) => MasterState (EventState event) -> event -> IO (MVar (EventResult event))
 scheduleMasterUpdate masterState@MasterState{..} event = do
-        debug "Update by Master."
-        unlocked <- isEmptyMVar masterStateLock
-        if not unlocked then error "State is locked!"
-        else do
-            result <- newEmptyMVar
-            let callback = if repRedundancy > 1
-                then
-                    -- the returned action fills in result when executed later
-                    scheduleLocalUpdate' (downcast localState) event result
-                else do
-                    hd <- scheduleUpdate localState event
-                    void $ forkIO (putMVar result =<< takeMVar hd)
-                    return (return ())      -- bogus finalizer
-            let encoded = runPutLazy (safePut event)
-            queueRepItem masterState (RIUpdate (methodTag event, encoded) (Left callback))
-            return result
+    debug "Update by Master."
+    unlocked <- isEmptyMVar masterStateLock
+    if not unlocked then error "State is locked!"
+    else do
+        result <- newEmptyMVar
+        let callback = if repRedundancy > 1
+            then
+                -- the returned action fills in result when executed later
+                scheduleLocalUpdate' (downcast localState) event result
+            else do
+                hd <- scheduleUpdate localState event
+                void $ forkIO (putMVar result =<< takeMVar hd)
+                return (return ())      -- bogus finalizer
+        let encoded = runPutLazy (safePut event)
+        queueRepItem masterState (RIUpdate (methodTag event, encoded) (Left callback))
+        return result
 
 -- | Cold Update on master site.
 scheduleMasterColdUpdate :: Typeable st => MasterState st -> Tagged ByteString -> IO (MVar ByteString)
 scheduleMasterColdUpdate masterState@MasterState{..} encoded = do
-        debug "Cold Update by Master."
-        unlocked <- isEmptyMVar masterStateLock
-        if not unlocked then error "State is locked!"
-        else do
-            result <- newEmptyMVar
-            let callback = if repRedundancy > 1
-                then
-                    -- the returned action fills in result when executed later
-                    scheduleLocalColdUpdate' (downcast localState) encoded result
-                else do
-                    hd <- scheduleColdUpdate localState encoded
-                    void $ forkIO (putMVar result =<< takeMVar hd)
-                    return (return ())      -- bogus finalizer
-            queueRepItem masterState (RIUpdate encoded (Left callback))
-            return result
+    debug "Cold Update by Master."
+    unlocked <- isEmptyMVar masterStateLock
+    if not unlocked then error "State is locked!"
+    else do
+        result <- newEmptyMVar
+        let callback = if repRedundancy > 1
+            then
+                -- the returned action fills in result when executed later
+                scheduleLocalColdUpdate' (downcast localState) encoded result
+            else do
+                hd <- scheduleColdUpdate localState encoded
+                void $ forkIO (putMVar result =<< takeMVar hd)
+                return (return ())      -- bogus finalizer
+        queueRepItem masterState (RIUpdate encoded (Left callback))
+        return result
 
 -- | Queue an RepItem (originating from the Master itself of an Slave via zmq)
 queueRepItem :: MasterState st -> ReplicationItem -> IO ()
index 57686dc..ac8d8da 100644 (file)
@@ -145,52 +145,52 @@ enslaveMayRedStateFrom :: (IsAcidic st, Typeable st) =>
          -> st              -- ^ initial state
          -> IO (AcidState st)
 enslaveMayRedStateFrom isRed directory address port initialState = do
-        -- local
-        lst <- openLocalStateFrom directory initialState
-        lrev <- getLocalRevision lst
-        rev <- newMVar lrev
-        debug $ "Opening enslaved state at revision " ++ show lrev
-        srs <- newMVar IM.empty
-        lastReqId <- newMVar 0
-        repChan <- newChan
-        syncDone <- Event.new
-        reqTid <- newEmptyMVar
-        repTid <- newEmptyMVar
-        parTid <- myThreadId
-        repFin <- newMVar IM.empty
-        sLock <- newEmptyMVar
-        -- remote
-        let addr = "tcp://" ++ address ++ ":" ++ show port
-        ctx <- context
-        sock <- socket ctx Dealer
-        setReceiveHighWM (restrict (100*1000 :: Int)) sock
-        setSendHighWM (restrict (100*1000 :: Int)) sock
-        connect sock addr
-        msock <- newMVar sock
-        sendToMaster msock $ NewSlave lrev
-
-        let slaveState = SlaveState { slaveLocalState = lst
-                                    , slaveStateIsRed = isRed
-                                    , slaveStateLock = sLock
-                                    , slaveRepFinalizers = repFin
-                                    , slaveRepChan = repChan
-                                    , slaveSyncDone = syncDone
-                                    , slaveRevision = rev
-                                    , slaveRequests = srs
-                                    , slaveLastRequestID = lastReqId
-                                    , slaveReqThreadId = reqTid
-                                    , slaveRepThreadId = repTid
-                                    , slaveParentThreadId = parTid
-                                    , slaveZmqContext = ctx
-                                    , slaveZmqAddr = addr
-                                    , slaveZmqSocket = msock
-                                    }
-        void $ forkIOWithUnmask $ slaveRequestHandler slaveState
-        void $ forkIO $ slaveReplicationHandler slaveState
-        return $ slaveToAcidState slaveState
-        where
-            getLocalRevision =
-                atomically . readTVar . logNextEntryId . localEvents . downcast
+    -- local
+    lst <- openLocalStateFrom directory initialState
+    lrev <- getLocalRevision lst
+    rev <- newMVar lrev
+    debug $ "Opening enslaved state at revision " ++ show lrev
+    srs <- newMVar IM.empty
+    lastReqId <- newMVar 0
+    repChan <- newChan
+    syncDone <- Event.new
+    reqTid <- newEmptyMVar
+    repTid <- newEmptyMVar
+    parTid <- myThreadId
+    repFin <- newMVar IM.empty
+    sLock <- newEmptyMVar
+    -- remote
+    let addr = "tcp://" ++ address ++ ":" ++ show port
+    ctx <- context
+    sock <- socket ctx Dealer
+    setReceiveHighWM (restrict (100*1000 :: Int)) sock
+    setSendHighWM (restrict (100*1000 :: Int)) sock
+    connect sock addr
+    msock <- newMVar sock
+    sendToMaster msock $ NewSlave lrev
+
+    let slaveState = SlaveState { slaveLocalState = lst
+                                , slaveStateIsRed = isRed
+                                , slaveStateLock = sLock
+                                , slaveRepFinalizers = repFin
+                                , slaveRepChan = repChan
+                                , slaveSyncDone = syncDone
+                                , slaveRevision = rev
+                                , slaveRequests = srs
+                                , slaveLastRequestID = lastReqId
+                                , slaveReqThreadId = reqTid
+                                , slaveRepThreadId = repTid
+                                , slaveParentThreadId = parTid
+                                , slaveZmqContext = ctx
+                                , slaveZmqAddr = addr
+                                , slaveZmqSocket = msock
+                                }
+    void $ forkIOWithUnmask $ slaveRequestHandler slaveState
+    void $ forkIO $ slaveReplicationHandler slaveState
+    return $ slaveToAcidState slaveState
+    where
+        getLocalRevision =
+            atomically . readTVar . logNextEntryId . localEvents . downcast
 
 -- | Replication handler of the Slave.
 slaveRequestHandler :: (IsAcidic st, Typeable st) => SlaveState st -> (IO () -> IO ()) -> IO ()
@@ -258,31 +258,31 @@ onSyncDone SlaveState{..} crc = do
 -- We use the Chan so Sync-Updates and normal ones can be interleaved.
 queueRepItem :: SlaveState st -> SlaveRepItem -> IO ()
 queueRepItem SlaveState{..} repItem = do
-        debug "Queuing RepItem."
-        writeChan slaveRepChan repItem
+    debug "Queuing RepItem."
+    writeChan slaveRepChan repItem
 
 -- | Replicates content of Chan.
 slaveReplicationHandler :: Typeable st => SlaveState st -> IO ()
 slaveReplicationHandler slaveState@SlaveState{..} = do
-        mtid <- myThreadId
-        putMVar slaveRepThreadId mtid
-
-        -- todo: timeout is magic variable, make customizable
-        noTimeout <- Event.waitTimeout slaveSyncDone $ 10*1000*1000
-        unless noTimeout $ throwTo slaveParentThreadId $
-            ErrorCall "Data.Acid.Centered.Slave: Took too long to sync. Timeout."
-
-        let loop = handle (\e -> throwTo slaveParentThreadId (e :: SomeException)) $ do
-                mayRepItem <- readChan slaveRepChan
-                case mayRepItem of
-                    SRIEnd          -> return ()
-                    SRICheckpoint r -> repCheckpoint slaveState r               >> loop
-                    SRIArchive r    -> repArchive slaveState r                  >> loop
-                    SRIUpdate r i d -> replicateUpdate slaveState r i d False   >> loop
-        loop
-
-        -- signal that we're done
-        void $ takeMVar slaveRepThreadId
+    mtid <- myThreadId
+    putMVar slaveRepThreadId mtid
+
+    -- todo: timeout is magic variable, make customizable
+    noTimeout <- Event.waitTimeout slaveSyncDone $ 10*1000*1000
+    unless noTimeout $ throwTo slaveParentThreadId $
+        ErrorCall "Data.Acid.Centered.Slave: Took too long to sync. Timeout."
+
+    let loop = handle (\e -> throwTo slaveParentThreadId (e :: SomeException)) $ do
+            mayRepItem <- readChan slaveRepChan
+            case mayRepItem of
+                SRIEnd          -> return ()
+                SRICheckpoint r -> repCheckpoint slaveState r               >> loop
+                SRIArchive r    -> repArchive slaveState r                  >> loop
+                SRIUpdate r i d -> replicateUpdate slaveState r i d False   >> loop
+    loop
+
+    -- signal that we're done
+    void $ takeMVar slaveRepThreadId
 
 -- | Replicate Sync-Checkpoints directly.
 replicateSyncCp :: (IsAcidic st, Typeable st) =>
@@ -323,39 +323,39 @@ replicateSyncUpdate slaveState rev event = replicateUpdate slaveState rev Nothin
 --   Other Updates are just replicated without using the result.
 replicateUpdate :: Typeable st => SlaveState st -> Revision -> Maybe RequestID -> Tagged ByteString -> Bool -> IO ()
 replicateUpdate SlaveState{..} rev reqId event syncing = do
-        debug $ "Got an Update to replicate " ++ show rev
-        modifyMVar_ slaveRevision $ \nr -> if rev - 1 == nr
-            then do
-                -- commit / run it locally
-                case reqId of
-                    Nothing -> replicateForeign
-                    Just rid -> replicateOwn rid
-                -- send reply: we're done
-                unless syncing $ sendToMaster slaveZmqSocket $ RepDone rev
-                return rev
-            else do
-                sendToMaster slaveZmqSocket RepError
-                void $ error $
-                    "Data.Acid.Centered.Slave: Replication failed at revision "
-                        ++ show nr ++ " -> " ++ show rev
-                return nr
-        where
-            replicateForeign =
-                if slaveStateIsRed then do
-                    act <- newEmptyMVar >>= scheduleLocalColdUpdate' (downcast slaveLocalState) event
-                    modifyMVar_ slaveRepFinalizers $ return . IM.insert rev act
-                else
-                    void $ scheduleColdUpdate slaveLocalState event
-            replicateOwn rid = do
-                act <- modifyMVar slaveRequests $ \srs -> do
-                    debug $ "This is the Update for Request " ++ show rid
-                    let (icallback, timeoutId) = srs IM.! rid
-                    callback <- icallback
-                    killThread timeoutId
-                    let nsrs = IM.delete rid srs
-                    return (nsrs, callback)
-                when slaveStateIsRed $
-                    modifyMVar_ slaveRepFinalizers $ return . IM.insert rev act
+    debug $ "Got an Update to replicate " ++ show rev
+    modifyMVar_ slaveRevision $ \nr -> if rev - 1 == nr
+        then do
+            -- commit / run it locally
+            case reqId of
+                Nothing -> replicateForeign
+                Just rid -> replicateOwn rid
+            -- send reply: we're done
+            unless syncing $ sendToMaster slaveZmqSocket $ RepDone rev
+            return rev
+        else do
+            sendToMaster slaveZmqSocket RepError
+            void $ error $
+                "Data.Acid.Centered.Slave: Replication failed at revision "
+                    ++ show nr ++ " -> " ++ show rev
+            return nr
+    where
+        replicateForeign =
+            if slaveStateIsRed then do
+                act <- newEmptyMVar >>= scheduleLocalColdUpdate' (downcast slaveLocalState) event
+                modifyMVar_ slaveRepFinalizers $ return . IM.insert rev act
+            else
+                void $ scheduleColdUpdate slaveLocalState event
+        replicateOwn rid = do
+            act <- modifyMVar slaveRequests $ \srs -> do
+                debug $ "This is the Update for Request " ++ show rid
+                let (icallback, timeoutId) = srs IM.! rid
+                callback <- icallback
+                killThread timeoutId
+                let nsrs = IM.delete rid srs
+                return (nsrs, callback)
+            when slaveStateIsRed $
+                modifyMVar_ slaveRepFinalizers $ return . IM.insert rev act
 
 repCheckpoint :: SlaveState st -> Revision -> IO ()
 repCheckpoint SlaveState{..} rev = do