1 {-# LANGUAGE DeriveDataTypeable, RecordWildCards, FlexibleContexts #-}
2 --------------------------------------------------------------------------------
4 Module : Data.Acid.Centered.Slave
7 Maintainer : max.voit+hdv@with-eyes.net
8 Portability : non-portable (uses GHC extensions)
10 The Slave part of the Centered replication backend for acid state.
14 --------------------------------------------------------------------------------
17 module Data.Acid.Centered.Slave
28 import Data.Serialize (decode, encode, runPutLazy, runGetLazy)
32 import Data.Acid.Abstract
33 import Data.Acid.Local
36 import Data.Acid.Centered.Common
38 import Control.Concurrent (forkIO, ThreadId, myThreadId, killThread, threadDelay, forkIOWithUnmask)
39 import Control.Concurrent.MVar (MVar, newMVar, newEmptyMVar, isEmptyMVar,
40 withMVar, modifyMVar, modifyMVar_,
41 takeMVar, putMVar, tryPutMVar)
42 import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
43 import Control.Concurrent.STM.TVar (readTVar, writeTVar)
44 import Data.IORef (writeIORef)
45 import qualified Control.Concurrent.Event as Event
47 import Control.Monad.STM (atomically)
48 import Control.Monad (void, when, unless)
49 import Control.Exception (handle, throwTo, SomeException, ErrorCall(..))
51 import System.ZMQ4 (Context, Socket, Dealer(..),
52 setReceiveHighWM, setSendHighWM, setLinger, restrict,
53 poll, Poll(..), Event(..),
54 context, term, socket, close,
55 connect, disconnect, send, receive)
56 import System.FilePath ( (</>) )
58 import Data.ByteString.Lazy.Char8 (ByteString)
60 import Data.IntMap (IntMap)
61 import qualified Data.IntMap as IM
63 --------------------------------------------------------------------------------
65 -- | Slave state structure, for internal use.
67 = SlaveState { slaveLocalState :: AcidState st
68 , slaveStateIsRed :: Bool
69 , slaveStateLock :: MVar ()
70 , slaveRepFinalizers :: MVar (IntMap (IO ()))
71 , slaveRepChan :: Chan SlaveRepItem
72 , slaveSyncDone :: Event.Event
73 , slaveRevision :: MVar NodeRevision
74 , slaveRequests :: MVar SlaveRequests
75 , slaveLastRequestID :: MVar RequestID
76 , slaveRepThreadId :: MVar ThreadId
77 , slaveReqThreadId :: MVar ThreadId
78 , slaveParentThreadId :: ThreadId
79 , slaveZmqContext :: Context
80 , slaveZmqAddr :: String
81 , slaveZmqSocket :: MVar (Socket Dealer)
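84 -- | Requests this Slave has sent to the Master and that have not been
-- replicated yet, keyed by 'RequestID'. Each entry pairs the action that runs
-- the Update locally (and yields a finalizer) with the 'ThreadId' of the thread
-- watching the request for a timeout.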
85 type SlaveRequests = IntMap (IO (IO ()),ThreadId)
87 -- | One Update + Metainformation to replicate.
90 | SRICheckpoint Revision
92 | SRIUpdate Revision (Maybe RequestID) (Tagged ByteString)
-- | Open a local State as Slave for a Master.
--
-- The local state files are kept in the default directory, which is
-- @\"state\" '</>' show (typeOf initialState)@. Use 'enslaveStateFrom' to pick
-- a different directory.
enslaveState :: (IsAcidic st, Typeable st) =>
               String               -- ^ hostname of the Master
            -> PortNumber           -- ^ port to connect to
            -> st                   -- ^ initial state
            -> IO (AcidState st)    -- ^ handle to the enslaved state
enslaveState address port initialState =
    enslaveStateFrom ("state" </> show (typeOf initialState)) address port initialState
-- | Open a local State as Slave for a _redundant_ Master.
--
-- The local state files are kept in the default directory, which is
-- @\"state\" '</>' show (typeOf initialState)@. Use 'enslaveRedStateFrom' to
-- pick a different directory.
enslaveRedState :: (IsAcidic st, Typeable st) =>
               String               -- ^ hostname of the Master
            -> PortNumber           -- ^ port to connect to
            -> st                   -- ^ initial state
            -> IO (AcidState st)    -- ^ handle to the enslaved state
enslaveRedState address port initialState =
    enslaveRedStateFrom ("state" </> show (typeOf initialState)) address port initialState
-- | Open a local State as Slave for a Master. The directory of the local state
-- files can be specified.
--
-- This is 'enslaveMayRedStateFrom' with the redundancy flag set to 'False'.
enslaveStateFrom :: (IsAcidic st, Typeable st) =>
                  FilePath             -- ^ location of the local state files.
               -> String               -- ^ hostname of the Master
               -> PortNumber           -- ^ port to connect to
               -> st                   -- ^ initial state
               -> IO (AcidState st)    -- ^ handle to the enslaved state
enslaveStateFrom = enslaveMayRedStateFrom False
-- | Open a local State as Slave for a _redundant_ Master. The directory of the
-- local state files can be specified.
--
-- This is 'enslaveMayRedStateFrom' with the redundancy flag set to 'True'.
enslaveRedStateFrom :: (IsAcidic st, Typeable st) =>
                  FilePath             -- ^ location of the local state files.
               -> String               -- ^ hostname of the Master
               -> PortNumber           -- ^ port to connect to
               -> st                   -- ^ initial state
               -> IO (AcidState st)    -- ^ handle to the enslaved state
enslaveRedStateFrom = enslaveMayRedStateFrom True
138 -- | Open a local State as Slave for a Master, redundant or not.
139 -- The directory of the local state files can be specified.
140 enslaveMayRedStateFrom :: (IsAcidic st, Typeable st) =>
141 Bool -- ^ is redundant
142 -> FilePath -- ^ location of the local state files.
143 -> String -- ^ hostname of the Master
144 -> PortNumber -- ^ port to connect to
145 -> st -- ^ initial state
147 enslaveMayRedStateFrom isRed directory address port initialState = do
149 lst <- openLocalStateFrom directory initialState
150 lrev <- getLocalRevision lst
152 debug $ "Opening enslaved state at revision " ++ show lrev
153 srs <- newMVar IM.empty
154 lastReqId <- newMVar 0
156 syncDone <- Event.new
157 reqTid <- newEmptyMVar
158 repTid <- newEmptyMVar
160 repFin <- newMVar IM.empty
161 sLock <- newEmptyMVar
163 let addr = "tcp://" ++ address ++ ":" ++ show port
165 sock <- socket ctx Dealer
166 setReceiveHighWM (restrict (100*1000 :: Int)) sock
167 setSendHighWM (restrict (100*1000 :: Int)) sock
169 msock <- newMVar sock
170 sendToMaster msock $ NewSlave lrev
172 let slaveState = SlaveState { slaveLocalState = lst
173 , slaveStateIsRed = isRed
174 , slaveStateLock = sLock
175 , slaveRepFinalizers = repFin
176 , slaveRepChan = repChan
177 , slaveSyncDone = syncDone
178 , slaveRevision = rev
179 , slaveRequests = srs
180 , slaveLastRequestID = lastReqId
181 , slaveReqThreadId = reqTid
182 , slaveRepThreadId = repTid
183 , slaveParentThreadId = parTid
184 , slaveZmqContext = ctx
185 , slaveZmqAddr = addr
186 , slaveZmqSocket = msock
188 void $ forkIOWithUnmask $ slaveRequestHandler slaveState
189 void $ forkIO $ slaveReplicationHandler slaveState
190 return $ slaveToAcidState slaveState
193 atomically . readTVar . logNextEntryId . localEvents . downcast
195 -- | Request handler of the Slave: receives messages from the Master and dispatches them.
196 slaveRequestHandler :: (IsAcidic st, Typeable st) => SlaveState st -> (IO () -> IO ()) -> IO ()
197 slaveRequestHandler slaveState@SlaveState{..} unmask = do
199 putMVar slaveReqThreadId mtid
200 let loop = handle (\e -> throwTo slaveParentThreadId (e :: SomeException)) $
201 unmask $ handle killHandler $ do
202 --waitRead =<< readMVar slaveZmqSocket
203 -- FIXME: we needn't poll if not for strange zmq behaviour
204 re <- withMVar slaveZmqSocket $ \sock -> poll 100 [Sock sock [In] Nothing]
205 unless (null $ head re) $ do
206 msg <- withMVar slaveZmqSocket receive
208 Left str -> error $ "Data.Serialize.decode failed on MasterMessage: " ++ show str
209 Right mmsg -> handleMessage mmsg
213 killHandler :: AcidException -> IO ()
214 killHandler GracefulExit = return ()
216 debug $ "Received: " ++ show m
218 -- We are sent an Update to replicate.
219 DoRep r i d -> queueRepItem slaveState (SRIUpdate r i d)
220 -- We are sent a Checkpoint for synchronization.
221 DoSyncCheckpoint r d -> replicateSyncCp slaveState r d
222 -- We are sent an Update to replicate for synchronization.
223 DoSyncRep r d -> replicateSyncUpdate slaveState r d
224 -- Master done sending all synchronization Updates.
225 SyncDone c -> onSyncDone slaveState c
226 -- We are sent a Checkpoint request.
227 DoCheckpoint r -> queueRepItem slaveState (SRICheckpoint r)
228 -- We are sent an Archive request.
229 DoArchive r -> queueRepItem slaveState (SRIArchive r)
230 -- Full replication of a revision
231 FullRep r -> modifyMVar_ slaveRepFinalizers $ \rf -> do
233 return $ IM.delete r rf
234 -- Full replication of events up to revision
235 FullRepTo r -> modifyMVar_ slaveRepFinalizers $ \rf -> do
236 let (ef, nrf) = IM.partitionWithKey (\k _ -> k <= r) rf
237 sequence_ (IM.elems ef)
239 -- We are allowed to Quit.
240 MayQuit -> writeChan slaveRepChan SRIEnd
241 -- We are requested to Quit - shall be handled by
242 -- 'bracket' usage by user.
243 MasterQuit -> throwTo slaveParentThreadId $
244 ErrorCall "Data.Acid.Centered.Slave: Master quit."
245 -- no other messages possible, enforced by type checker
-- | Finish synchronization once the Master has sent 'SyncDone'.
--
-- The CRC received from the Master is compared with the CRC of the local
-- state. A mismatch aborts with 'error'. On a match 'slaveSyncDone' is set,
-- which is the event 'slaveReplicationHandler' waits for before it starts
-- working off the replication channel.
onSyncDone :: (IsAcidic st, Typeable st) => SlaveState st -> Crc -> IO ()
onSyncDone SlaveState{..} crc = do
    localCrc <- crcOfState slaveLocalState
    if crc /= localCrc then
        error "Data.Acid.Centered.Slave: CRC mismatch after sync."
    else do
        debug "Sync Done, CRC fine."
        Event.set slaveSyncDone
-- | Append a replication item to the Slave's replication channel.
-- Items queued here are picked up later by 'slaveReplicationHandler', so
-- regular items that arrive while synchronization is still running are
-- buffered instead of being applied right away.
queueRepItem :: SlaveState st -> SlaveRepItem -> IO ()
queueRepItem st item =
    debug "Queuing RepItem." >> writeChan (slaveRepChan st) item
264 -- | Replicates content of Chan.
265 slaveReplicationHandler :: Typeable st => SlaveState st -> IO ()
266 slaveReplicationHandler slaveState@SlaveState{..} = do
268 putMVar slaveRepThreadId mtid
270 -- todo: timeout is magic variable, make customizable
271 noTimeout <- Event.waitTimeout slaveSyncDone $ 10*1000*1000
272 unless noTimeout $ throwTo slaveParentThreadId $
273 ErrorCall "Data.Acid.Centered.Slave: Took too long to sync. Timeout."
275 let loop = handle (\e -> throwTo slaveParentThreadId (e :: SomeException)) $ do
276 mayRepItem <- readChan slaveRepChan
279 SRICheckpoint r -> repCheckpoint slaveState r >> loop
280 SRIArchive r -> repArchive slaveState r >> loop
281 SRIUpdate r i d -> replicateUpdate slaveState r i d False >> loop
284 -- signal that we're done
285 void $ takeMVar slaveRepThreadId
287 -- | Replicate Sync-Checkpoints directly.
288 replicateSyncCp :: (IsAcidic st, Typeable st) =>
289 SlaveState st -> Revision -> ByteString -> IO ()
290 replicateSyncCp SlaveState{..} rev encoded = do
291 st <- decodeCheckpoint encoded
292 let lst = downcast slaveLocalState
293 let core = localCore lst
294 modifyMVar_ slaveRevision $ \sr -> do
295 when (sr > rev) $ error "Data.Acid.Centered.Slave: Revision mismatch for checkpoint: Slave is newer."
296 modifyCoreState_ core $ \_ -> do
297 writeIORef (localCopy lst) st
298 createCpFake lst encoded rev
299 adjustEventLogId lst rev
303 adjustEventLogId l r = do
304 atomically $ writeTVar (logNextEntryId (localEvents l)) r
305 void $ cutFileLog (localEvents l)
306 createCpFake l e r = do
308 pushAction (localEvents l) $
309 pushEntry (localCheckpoints l) (Checkpoint r e) (putMVar mvar ())
312 case runGetLazy safeGet e of
313 Left msg -> error $ "Data.Acid.Centered.Slave: Checkpoint could not be decoded: " ++ msg
314 Right val -> return val
-- | Replicate an Update received during synchronization.
-- Delegates to 'replicateUpdate' with no 'RequestID' and with the syncing
-- flag set.
replicateSyncUpdate :: Typeable st => SlaveState st -> Revision -> Tagged ByteString -> IO ()
replicateSyncUpdate st revision tagged =
    replicateUpdate st revision noRequest tagged duringSync
  where
    noRequest  = Nothing
    duringSync = True
320 -- | Replicate an Update as requested by Master.
321 -- Updates that were requested by this Slave are run locally and the result
322 -- put into the MVar in SlaveRequests.
323 -- Other Updates are just replicated without using the result.
324 replicateUpdate :: Typeable st => SlaveState st -> Revision -> Maybe RequestID -> Tagged ByteString -> Bool -> IO ()
325 replicateUpdate SlaveState{..} rev reqId event syncing = do
326 debug $ "Got an Update to replicate " ++ show rev
327 modifyMVar_ slaveRevision $ \nr -> if rev - 1 == nr
329 -- commit / run it locally
331 Nothing -> replicateForeign
332 Just rid -> replicateOwn rid
333 -- send reply: we're done
334 unless syncing $ sendToMaster slaveZmqSocket $ RepDone rev
337 sendToMaster slaveZmqSocket RepError
339 "Data.Acid.Centered.Slave: Replication failed at revision "
340 ++ show nr ++ " -> " ++ show rev
344 if slaveStateIsRed then do
345 act <- newEmptyMVar >>= scheduleLocalColdUpdate' (downcast slaveLocalState) event
346 modifyMVar_ slaveRepFinalizers $ return . IM.insert rev act
348 void $ scheduleColdUpdate slaveLocalState event
349 replicateOwn rid = do
350 act <- modifyMVar slaveRequests $ \srs -> do
351 debug $ "This is the Update for Request " ++ show rid
352 let (icallback, timeoutId) = srs IM.! rid
353 callback <- icallback
355 let nsrs = IM.delete rid srs
356 return (nsrs, callback)
357 when slaveStateIsRed $
358 modifyMVar_ slaveRepFinalizers $ return . IM.insert rev act
-- | Create a checkpoint of the local state on request of the Master.
-- The revision is only used for the debug message. 'slaveRevision' is held
-- while the checkpoint is created, which keeps 'replicateUpdate' (it modifies
-- the same MVar) from running concurrently.
repCheckpoint :: SlaveState st -> Revision -> IO ()
repCheckpoint st revision = do
    debug ("Got Checkpoint request at revision: " ++ show revision)
    withMVar (slaveRevision st) (const checkpointLocal)
  where
    checkpointLocal = createCheckpoint (slaveLocalState st)
-- | Archive the local state on request of the Master.
-- The revision is only used for the debug message. 'slaveRevision' is held
-- while the archive is created, which keeps 'replicateUpdate' (it modifies
-- the same MVar) from running concurrently.
repArchive :: SlaveState st -> Revision -> IO ()
repArchive st revision = do
    debug ("Got Archive request at revision: " ++ show revision)
    withMVar (slaveRevision st) (const archiveLocal)
  where
    archiveLocal = createArchive (slaveLocalState st)
-- | Update on slave site.
--
-- The steps are:
--
--      - Request the Update from the Master.
--      - The Master issues the Update with the same 'RequestID'.
--      - The replication handler runs the stored callback, which puts the
--        result into the returned MVar.
--
-- Fails with 'error' if the state has already been locked, i.e.
-- 'slaveStateLock' is no longer empty.
scheduleSlaveUpdate :: (UpdateEvent e, Typeable (EventState e)) => SlaveState (EventState e) -> e -> IO (MVar (EventResult e))
scheduleSlaveUpdate slaveState@SlaveState{..} event = do
    unlocked <- isEmptyMVar slaveStateLock
    if not unlocked then error "State is locked."
    else do
        debug "Update by Slave."
        result <- newEmptyMVar
        reqId <- getNextRequestId slaveState
        -- The request is sent and registered while 'slaveRequests' is held, so
        -- the replication side (which takes the same MVar) cannot look the
        -- request up before it has been inserted.
        modifyMVar_ slaveRequests $ \srs -> do
            let encoded = runPutLazy (safePut event)
            sendToMaster slaveZmqSocket $ ReqUpdate reqId (methodTag event, encoded)
            timeoutID <- forkIO $ timeoutRequest slaveState reqId result
            let callback = if slaveStateIsRed
                    then scheduleLocalUpdate' (downcast slaveLocalState) event result
                    else do
                        hd <- scheduleUpdate slaveLocalState event
                        void $ forkIO $ putMVar result =<< takeMVar hd
                        return (return ())      -- bogus finalizer
            return $ IM.insert reqId (callback, timeoutID) srs
        return result
-- | Cold Update on slave site. This enables for using Remote.
--
-- Follows the same steps as 'scheduleSlaveUpdate', but takes the event in its
-- already serialized form and delivers the serialized result in the returned
-- MVar.
--
-- Fails with 'error' if the state has already been locked, i.e.
-- 'slaveStateLock' is no longer empty.
scheduleSlaveColdUpdate :: Typeable st => SlaveState st -> Tagged ByteString -> IO (MVar ByteString)
scheduleSlaveColdUpdate slaveState@SlaveState{..} encoded = do
    unlocked <- isEmptyMVar slaveStateLock
    if not unlocked then error "State is locked."
    else do
        debug "Cold Update by Slave."
        result <- newEmptyMVar
        reqId <- getNextRequestId slaveState
        -- The request is sent and registered while 'slaveRequests' is held, so
        -- the replication side (which takes the same MVar) cannot look the
        -- request up before it has been inserted.
        modifyMVar_ slaveRequests $ \srs -> do
            sendToMaster slaveZmqSocket $ ReqUpdate reqId encoded
            timeoutID <- forkIO $ timeoutRequest slaveState reqId result
            let callback = if slaveStateIsRed
                    then scheduleLocalColdUpdate' (downcast slaveLocalState) encoded result
                    else do
                        hd <- scheduleColdUpdate slaveLocalState encoded
                        void $ forkIO $ putMVar result =<< takeMVar hd
                        return (return ())      -- bogus finalizer
            return $ IM.insert reqId (callback, timeoutID) srs
        return result
-- | Hand out the next 'RequestID': the stored last ID is incremented and the
-- incremented value is both stored and returned.
getNextRequestId :: SlaveState st -> IO RequestID
getNextRequestId st = modifyMVar (slaveLastRequestID st) bump
  where
    bump lastId =
        let nextId = lastId + 1
        in  return (nextId, nextId)
-- | Ensures requests are actually answered or fail.
--
-- Sleeps for five seconds and then checks whether the request is still
-- registered in 'slaveRequests'. If it is, the result MVar is filled with an
-- 'error' value and an 'ErrorCall' is thrown to the parent thread.
-- On timeout the Slave dies, not the thread that invoked the Update.
-- A request that has been removed from 'slaveRequests' in the meantime is left
-- alone.
timeoutRequest :: SlaveState st -> RequestID -> MVar m -> IO ()
timeoutRequest SlaveState{..} reqId mvar = do
    threadDelay $ 5*1000*1000
    stillThere <- withMVar slaveRequests (return . IM.member reqId)
    -- Only a request that is still pending counts as timed out.
    when stillThere $ do
        putMVar mvar $ error "Data.Acid.Centered.Slave: Update-Request timed out."
        throwTo slaveParentThreadId $ ErrorCall "Data.Acid.Centered.Slave: Update-Request timed out."
-- | Encode a 'SlaveMessage' and send it over the Dealer socket.
-- The socket is taken from its MVar for the duration of the send, so
-- concurrent senders are serialized.
sendToMaster :: MVar (Socket Dealer) -> SlaveMessage -> IO ()
sendToMaster msock smsg = withMVar msock transmit
  where
    payload       = encode smsg
    transmit sock = send sock [] payload
440 -- | Close an enslaved State.
441 liberateState :: SlaveState st -> IO ()
442 liberateState SlaveState{..} =
443 -- lock state against updates: disallow requests
444 whenM (tryPutMVar slaveStateLock ()) $ do
445 debug "Closing Slave state..."
446 -- check / wait unprocessed requests
447 debug "Waiting for Requests to finish."
448 waitPoll 100 (withMVar slaveRequests (return . IM.null))
449 -- send master quit message
450 sendToMaster slaveZmqSocket SlaveQuit
451 -- wait replication chan, only if sync done
452 syncDone <- Event.isSet slaveSyncDone
454 debug "Waiting for repChan to empty."
456 putMVar slaveRepThreadId mtid
457 -- kill handler threads
458 debug "Killing request handler."
459 withMVar slaveReqThreadId $ flip throwTo GracefulExit
461 debug "Closing down zmq."
462 withMVar slaveZmqSocket $ \s -> do
463 -- avoid the socket hanging around
464 setLinger (restrict (1000 :: Int)) s
465 disconnect s slaveZmqAddr
468 -- cleanup local state
469 debug "Closing local state."
470 closeAcidState slaveLocalState
472 slaveToAcidState :: (IsAcidic st, Typeable st) => SlaveState st -> AcidState st
473 slaveToAcidState slaveState
474 = AcidState { _scheduleUpdate = scheduleSlaveUpdate slaveState
475 , scheduleColdUpdate = scheduleSlaveColdUpdate slaveState
476 , _query = query $ slaveLocalState slaveState
477 , queryCold = queryCold $ slaveLocalState slaveState
478 , createCheckpoint = createCheckpoint $ slaveLocalState slaveState
479 , createArchive = createArchive $ slaveLocalState slaveState
480 , closeAcidState = liberateState slaveState
481 , acidSubState = mkAnyState slaveState