scheduleMasterUpdate masterState@MasterState{..} event = do
debug "Update by Master."
result <- newEmptyMVar
- let callback = void $ forkIO (putMVar result =<< takeMVar =<< scheduleUpdate localState event)
+ let callback = do
+ hd <- scheduleUpdate localState event
+ void $ forkIO (putMVar result =<< takeMVar hd)
let encoded = runPutLazy (safePut event)
queueUpdate masterState ((methodTag event, encoded), Left callback)
return result
replicateUpdate :: SlaveState st -> SlaveRepItem -> Bool -> IO ()
replicateUpdate SlaveState{..} (rev, reqId, event) syncing = do
debug $ "Got an Update to replicate " ++ show rev
- modifyMVar_ slaveRevision $ \nr -> case rev - 1 of
- nr -> do
+ modifyMVar_ slaveRevision $ \nr -> if rev - 1 == nr
+ then do
-- commit / run it locally
case reqId of
Nothing ->
-- send reply: we're done
unless syncing $ sendToMaster slaveZmqSocket $ RepDone rev
return rev
- _ -> do
+ else do
sendToMaster slaveZmqSocket RepError
error $ "Replication failed at revision " ++ show rev ++ " -> " ++ show nr
return nr
let encoded = runPutLazy (safePut event)
sendToMaster slaveZmqSocket $ ReqUpdate reqId (methodTag event, encoded)
debug "after send"
- let callback = void $ forkIO $ putMVar result =<< takeMVar =<< scheduleUpdate slaveLocalState event
+ let callback = do
+ hd <- scheduleUpdate slaveLocalState event
+ void $ forkIO $ putMVar result =<< takeMVar hd
return $ M.insert reqId callback srs
return result