1 {-# LANGUAGE DeriveDataTypeable, RecordWildCards, FlexibleContexts #-}
2 --------------------------------------------------------------------------------
4 Module : Data.Acid.Centered.Master
7 Maintainer : max.voit+hdv@with-eyes.net
8 Portability : non-portable (uses GHC extensions)
10 The Master part of the Centered replication backend for acid state.
13 {- big chunks still todo:
17 module Data.Acid.Centered.Master
22 , openRedMasterStateFrom
23 , createArchiveGlobally
29 import Data.Serialize (decode, encode, runPutLazy)
33 import Data.Acid.Abstract
34 import Data.Acid.Local
37 import Data.Acid.Centered.Common
39 import Control.Concurrent (forkIO, ThreadId, myThreadId)
40 import Control.Concurrent.Chan (Chan, newChan, writeChan, readChan, dupChan)
41 import Control.Concurrent.STM.TVar (readTVar)
42 import Control.Concurrent.MVar(MVar, newMVar, newEmptyMVar,
43 takeMVar, putMVar, tryPutMVar, isEmptyMVar,
44 modifyMVar, modifyMVar_, withMVar)
46 import Control.Monad.STM (atomically)
47 import Control.Monad (when, unless, void, forM_, liftM2)
48 import Control.Exception (handle, throwTo, SomeException)
50 import System.ZMQ4 (Context, Socket, Router(..), Receiver,
51 setReceiveHighWM, setSendHighWM, restrict,
52 context, term, socket, close, bind, unbind,
53 poll, Poll(..), Event(..),
54 sendMulti, receiveMulti)
55 import System.FilePath ( (</>) )
57 import qualified Data.ByteString.Lazy.Char8 as CSL
58 import Data.ByteString.Lazy.Char8 (ByteString)
59 import qualified Data.ByteString.Char8 as CS
61 import qualified Data.Map as M
63 import qualified Data.IntMap as IM
64 import Data.IntMap (IntMap)
65 import qualified Data.List.NonEmpty as NEL
68 --------------------------------------------------------------------------------
70 -- | Master state structure, for internal use.
72 = MasterState { localState :: AcidState st
73 , nodeStatus :: MVar NodeStatus
74 , repRedundancy :: Int
75 , repFinalizers :: MVar (IntMap (IO ()))
76 , masterStateLock :: MVar ()
77 , masterRevision :: MVar NodeRevision
78 , masterRevisionN :: MVar NodeRevision
79 , masterReplicationChan :: Chan ReplicationItem
80 , masterReplicationChanN :: Chan ReplicationItem
81 , masterReqThreadId :: MVar ThreadId
82 , masterRepLThreadId :: MVar ThreadId
83 , masterRepNThreadId :: MVar ThreadId
84 , masterParentThreadId :: ThreadId
85 , zmqContext :: Context
87 , zmqSocket :: MVar (Socket Router)
-- | Identity of a Slave node as a strict 'CS.ByteString'; it is the first
--   part of every message sent to or received from that node.
type NodeIdentity = CS.ByteString
-- | Bookkeeping of the connected Slaves: maps each node identity to the
--   revision recorded for that node.
type NodeStatus = Map NodeIdentity NodeRevision
-- | Action supplied with an Update issued by the Master itself. Running it
--   yields a second action (the finalizer) that is executed later.
type Callback = IO (IO ())      -- an IO action that returns a finalizer
93 data ReplicationItem =
97 | RIUpdate (Tagged ByteString) (Either Callback (RequestID, NodeIdentity))
99 -- | The request handler on master node. Does
100 -- o handle receiving requests from nodes,
101 -- o answering as needed (old updates),
102 -- o bookkeeping on node states.
103 masterRequestHandler :: (IsAcidic st, Typeable st) => MasterState st -> IO ()
104 masterRequestHandler masterState@MasterState{..} = do
106 putMVar masterReqThreadId mtid
107 let loop = handle (\e -> throwTo masterParentThreadId (e :: SomeException)) $
108 handle killHandler $ do
110 -- waitRead =<< readMVar zmqSocket
111 -- FIXME: we needn't poll if not for strange zmq behaviour
112 re <- withMVar zmqSocket $ \sock -> poll 100 [Sock sock [In] Nothing]
113 unless (null $ head re) $ do
114 (ident, msg) <- withMVar zmqSocket receiveFrame
115 -- handle according frame contents
118 NewSlave r -> connectNode masterState ident r
119 -- Slave is done replicating.
120 RepDone r -> whenM (identityIsValid ident) $
121 updateNodeStatus masterState ident r
122 -- Slave sends an Update.
123 ReqUpdate rid event -> whenM (identityIsValid ident) $
124 queueRepItem masterState (RIUpdate event (Right (rid, ident)))
127 sendToSlave zmqSocket MayQuit ident
128 removeFromNodeStatus nodeStatus ident
130 sendToSlave zmqSocket MayQuit ident
131 removeFromNodeStatus nodeStatus ident
132 -- no other messages possible
136 killHandler :: AcidException -> IO ()
137 killHandler GracefulExit = return ()
138 identityIsValid i = do
139 isMember <- withMVar nodeStatus $ return . (i `M.member`)
140 if isMember then return True
142 debug $ "Request by unknown node [" ++ CS.unpack i ++ "]"
143 sendToSlave zmqSocket MayQuit i
-- | Drop the entry of the given Slave from the shared 'NodeStatus' map.
--   An identity that is not present leaves the map unchanged.
removeFromNodeStatus :: MVar NodeStatus -> NodeIdentity -> IO ()
removeFromNodeStatus nodeStatus ident =
    modifyMVar_ nodeStatus dropNode
  where
    -- the map is replaced while the MVar is held by 'modifyMVar_'
    dropNode known = return (M.delete ident known)
151 -- | Update the NodeStatus after a node has replicated an Update.
152 updateNodeStatus :: MasterState st -> NodeIdentity -> Int -> IO ()
153 updateNodeStatus MasterState{..} ident r =
154 modifyMVar_ nodeStatus $ \ns -> do
155 when (ns M.! ident /= (r - 1)) $
156 error $ "Invalid increment of node status " ++ show (ns M.! ident) ++ " -> " ++ show r
157 let rns = M.adjust (+1) ident ns
158 -- only for redundant operation:
159 when ((repRedundancy > 1) && (M.size (M.filter (>=r) rns) >= (repRedundancy - 1))) $ do
160 debug $ "Full replication of " ++ show r
161 -- finalize local replication
162 modifyMVar_ repFinalizers $ \rf -> do
164 return $ IM.delete r rf
165 -- send out FullRep signal
166 forM_ (M.keys ns) $ sendToSlave zmqSocket (FullRep r)
169 -- | Connect a new Slave by getting it up-to-date,
170 -- i.e. send all past events as Updates. This is fire&forget.
171 connectNode :: (IsAcidic st, Typeable st) => MasterState st -> NodeIdentity -> Revision -> IO ()
172 connectNode MasterState{..} i revision =
173 -- locking masterRevision prohibits additional events written on disk
174 withMVar masterRevision $ \mr ->
175 modifyMVar_ nodeStatus $ \ns -> do
176 -- crc generated from localCore thus corresponds to disk
177 crc <- crcOfState localState
178 -- if there has been one/more checkpoint in between:
179 lastCp <- getLastCheckpointRev localState
180 let lastCpRev = cpRevision lastCp
181 debug $ "Found checkpoint at revision " ++ show lastCpRev
182 if lastCpRev > revision then do
183 -- send last checkpoint and newer events
184 sendSyncCheckpoint zmqSocket lastCp i
185 pastUpdates <- getPastUpdates localState lastCpRev
186 forM_ pastUpdates $ \(r, u) -> sendSyncUpdate zmqSocket r u i
189 pastUpdates <- getPastUpdates localState revision
190 forM_ pastUpdates $ \(r, u) -> sendSyncUpdate zmqSocket r u i
191 sendToSlave zmqSocket (SyncDone crc) i
192 let nns = M.insert i mr ns
193 -- only for redundant operation:
194 when (repRedundancy > 1) $ checkRepStatus mr nns
197 cpRevision (Checkpoint r _) = r
198 sendSyncCheckpoint sock (Checkpoint cr encoded) =
199 sendToSlave sock (DoSyncCheckpoint cr encoded)
200 sendSyncUpdate sock r encoded =
201 sendToSlave sock (DoSyncRep r encoded)
202 -- FIXME: do this better (less than maxRev is possible in corner cases)
203 checkRepStatus maxRev pns =
204 when (M.size (M.filter (>= maxRev) pns) >= (repRedundancy-2)) $ do
205 debug $ "Full replication up to " ++ show maxRev
206 -- finalize local replication
207 modifyMVar_ repFinalizers $ \rf -> do
208 forM_ (filter (<= maxRev) (IM.keys rf)) $ \r -> rf IM.! r
209 return $ IM.filterWithKey (\k _ -> k > maxRev) rf
210 -- send out FullRep signal
211 forM_ (M.keys pns) $ sendToSlave zmqSocket (FullRepTo maxRev)
-- | Read past Updates from the local event log for replication.
--
--   The entries obtained via 'readEntriesFrom' for @startRev@ are paired
--   with consecutive revision numbers, the first entry being labelled
--   @startRev + 1@.
getPastUpdates :: (Typeable st) => AcidState st -> Int -> IO [(Int, Tagged ByteString)]
getPastUpdates state startRev =
    fmap (zip [startRev + 1 ..]) (readEntriesFrom eventLog startRev)
  where
    -- event log of the underlying local state
    eventLog = localEvents (downcast state)
-- | Fetch the most recent checkpoint of the local state.
--
--   Note that the whole 'Checkpoint' is returned, not only its revision.
--   The checkpoint log is read starting one entry before its next entry id,
--   and the first entry read is the result. If nothing is read, an empty
--   checkpoint at revision 0 is returned instead.
getLastCheckpointRev :: (Typeable st) => AcidState st -> IO Checkpoint
getLastCheckpointRev state = do
    upcomingId <- atomically (readTVar (logNextEntryId checkpointLog))
    found <- readEntriesFrom checkpointLog (upcomingId - 1)
    return (headDef fallback found)
  where
    checkpointLog = localCheckpoints (downcast state)
    fallback      = Checkpoint 0 CSL.empty
-- | Send a message to one Slave.
--
--   A two-part message is sent: the node identity first, followed by the
--   serialized 'MasterMessage'. The socket 'MVar' is held while sending.
sendToSlave :: MVar (Socket Router) -> MasterMessage -> NodeIdentity -> IO ()
sendToSlave msock msg ident =
    withMVar msock $ \sock -> sendMulti sock frame
  where
    frame = ident NEL.:| [encode msg]
231 -- | Receive one Frame. A Frame consists of two message parts:
232 -- sender ID and actual content.
233 receiveFrame :: (Receiver t) => Socket t -> IO (NodeIdentity, SlaveMessage)
234 receiveFrame sock = do
235 list <- receiveMulti sock
236 when (length list /= 2) $ error "Received invalid frame."
237 let ident = head list
240 Left str -> error $ "Data.Serialize.decode failed on SlaveMessage: " ++ show str
242 debug $ "Received from [" ++ CS.unpack ident ++ "]: "
243 ++ take 20 (show smsg)
246 -- | Open the Master state.
248 -- The directory for the local state files is the default one ("state/[typeOf state]/").
249 openMasterState :: (IsAcidic st, Typeable st) =>
250 String -- ^ address to bind (useful to listen on specific interfaces only)
251 -> PortNumber -- ^ port to bind to
252 -> st -- ^ initial state
openMasterState address port initialState =
    openMasterStateFrom stateDir address port initialState
  where
    -- default location, derived from the type of the initial state
    stateDir = "state" </> show (typeOf initialState)
257 -- | Open the master state from a specific location.
258 openMasterStateFrom :: (IsAcidic st, Typeable st) =>
259 FilePath -- ^ location of the local state files
260 -> String -- ^ address to bind (useful to listen on specific interfaces only)
261 -> PortNumber -- ^ port to bind to
262 -> st -- ^ initial state
openMasterStateFrom directory address port initialState =
    -- same as the redundant variant, with the redundancy parameter set to 0
    openRedMasterStateFrom directory address port 0 initialState
267 -- | Open the master state with /n/-redundant replication.
269 -- The directory for the local state files is the default one ("state/[typeOf
271 openRedMasterState :: (IsAcidic st, Typeable st) =>
272 String -- ^ address to bind (useful to listen on specific interfaces only)
273 -> PortNumber -- ^ port to bind to
274 -> Int -- ^ guarantee n-redundant replication
275 -> st -- ^ initial state
openRedMasterState address port red initialState =
    openRedMasterStateFrom stateDir address port red initialState
  where
    -- default location, derived from the type of the initial state
    stateDir = "state" </> show (typeOf initialState)
280 -- | Open the master state from a specific location with redundant replication.
281 openRedMasterStateFrom :: (IsAcidic st, Typeable st) =>
282 FilePath -- ^ location of the local state files
283 -> String -- ^ address to bind (useful to listen on specific interfaces only)
284 -> PortNumber -- ^ port to bind to
285 -> Int -- ^ guarantee /n/-redundant replication
286 -> st -- ^ initial state
288 openRedMasterStateFrom directory address port red initialState = do
289 debug "opening master state"
291 lst <- openLocalStateFrom directory initialState
292 let levs = localEvents $ downcast lst
293 lrev <- atomically $ readTVar $ logNextEntryId levs
297 repChanN <- dupChan repChan
298 repFin <- newMVar IM.empty
299 ns <- newMVar M.empty
301 let addr = "tcp://" ++ address ++ ":" ++ show port
303 sock <- socket ctx Router
304 setReceiveHighWM (restrict (100*1000 :: Int)) sock
305 setSendHighWM (restrict (100*1000 :: Int)) sock
307 msock <- newMVar sock
308 repTidL <- newEmptyMVar
309 repTidN <- newEmptyMVar
310 reqTid <- newEmptyMVar
313 let masterState = MasterState { localState = lst
315 , repRedundancy = red
316 , repFinalizers = repFin
317 , masterStateLock = lock
318 , masterRevision = rev
319 , masterRevisionN = revN
320 , masterReplicationChan = repChan
321 , masterReplicationChanN = repChanN
322 , masterRepLThreadId = repTidL
323 , masterRepNThreadId = repTidN
324 , masterReqThreadId = reqTid
325 , masterParentThreadId = parTid
330 void $ forkIO $ masterRequestHandler masterState
331 void $ forkIO $ masterReplicationHandlerL masterState
332 void $ forkIO $ masterReplicationHandlerN masterState
333 return $ toAcidState masterState
335 -- | Close the master state.
336 closeMasterState :: MasterState st -> IO ()
337 closeMasterState MasterState{..} =
339 whenM (tryPutMVar masterStateLock ()) $ do
340 debug "Closing master state."
342 debug "Nodes quitting."
343 withMVar nodeStatus $ mapM_ (sendToSlave zmqSocket MasterQuit) . M.keys
344 -- wait all nodes done
345 waitPollN 100 1000 (withMVar nodeStatus (return . M.null))
346 -- wait replication chan
347 debug "Waiting for repChans to empty."
348 writeChan masterReplicationChan RIEnd
350 putMVar masterRepLThreadId mtid
351 putMVar masterRepNThreadId mtid
353 debug "Killing request handler."
354 withMVar masterReqThreadId $ flip throwTo GracefulExit
356 debug "Closing down zmq."
357 withMVar zmqSocket $ \sock -> do
361 -- cleanup local state
362 closeAcidState localState
364 -- | Update on master site.
365 scheduleMasterUpdate :: (UpdateEvent event, Typeable (EventState event)) => MasterState (EventState event) -> event -> IO (MVar (EventResult event))
366 scheduleMasterUpdate masterState@MasterState{..} event = do
367 debug "Update by Master."
368 unlocked <- isEmptyMVar masterStateLock
369 if not unlocked then error "State is locked!"
371 result <- newEmptyMVar
372 let callback = if repRedundancy > 1
374 -- the returned action fills in result when executed later
375 scheduleLocalUpdate' (downcast localState) event result
377 hd <- scheduleUpdate localState event
378 void $ forkIO (putMVar result =<< takeMVar hd)
379 return (return ()) -- bogus finalizer
380 let encoded = runPutLazy (safePut event)
381 queueRepItem masterState (RIUpdate (methodTag event, encoded) (Left callback))
384 -- | Cold Update on master site.
385 scheduleMasterColdUpdate :: Typeable st => MasterState st -> Tagged ByteString -> IO (MVar ByteString)
386 scheduleMasterColdUpdate masterState@MasterState{..} encoded = do
387 debug "Cold Update by Master."
388 unlocked <- isEmptyMVar masterStateLock
389 if not unlocked then error "State is locked!"
391 result <- newEmptyMVar
392 let callback = if repRedundancy > 1
394 -- the returned action fills in result when executed later
395 scheduleLocalColdUpdate' (downcast localState) encoded result
397 hd <- scheduleColdUpdate localState encoded
398 void $ forkIO (putMVar result =<< takeMVar hd)
399 return (return ()) -- bogus finalizer
400 queueRepItem masterState (RIUpdate encoded (Left callback))
-- | Queue a 'ReplicationItem' (originating from the Master itself or from a
--   Slave via zmq) by writing it to the Master's replication channel.
queueRepItem :: MasterState st -> ReplicationItem -> IO ()
queueRepItem master item = writeChan (masterReplicationChan master) item
407 -- | The local replication handler. Takes care to run Updates locally.
408 masterReplicationHandlerL :: (Typeable st) => MasterState st -> IO ()
409 masterReplicationHandlerL MasterState{..} = do
411 putMVar masterRepLThreadId mtid
412 let loop = handle (\e -> throwTo masterParentThreadId (e :: SomeException)) $ do
413 debug "Replicating next item locally."
414 repItem <- readChan masterReplicationChan
418 debug "Archive on master."
419 createArchive localState
422 debug "Checkpoint on master."
423 createCheckpoint localState
425 RIUpdate event sink -> do
428 (rev, act) <- modifyMVar masterRevision $ \r -> do
430 Left callback -> callback
431 _ -> newEmptyMVar >>= scheduleLocalColdUpdate' (downcast localState) event
433 -- act finalizes the transaction - will be run after full replication
434 modifyMVar_ repFinalizers $ return . IM.insert rev act
436 modifyMVar_ masterRevision $ \r -> do
438 Left callback -> void callback
439 _ -> void $ scheduleColdUpdate localState event
443 -- signal that we're done
444 void $ takeMVar masterRepLThreadId
446 -- | The network replication handler. Takes care to run Updates on Slaves.
447 masterReplicationHandlerN :: MasterState st -> IO ()
448 masterReplicationHandlerN MasterState{..} = do
450 putMVar masterRepNThreadId mtid
451 let loop = handle (\e -> throwTo masterParentThreadId (e :: SomeException)) $ do
452 debug "Replicating next item in network."
453 repItem <- readChan masterReplicationChanN
457 withMVar nodeStatus $ \ns -> do
458 debug "Sending archive request to Slaves."
459 withMVar masterRevisionN $ \mr ->
460 forM_ (M.keys ns) $ sendArchive zmqSocket mr
463 withMVar nodeStatus $ \ns -> do
464 debug "Sending Checkpoint Request to Slaves."
465 withMVar masterRevisionN $ \mr ->
466 forM_ (M.keys ns) $ sendCheckpoint zmqSocket mr
468 RIUpdate event sink -> do
469 withMVar nodeStatus $ \ns -> do
470 debug $ "Sending Update to Slaves, there are " ++ show (M.size ns)
471 modifyMVar_ masterRevisionN $ \mrOld -> do
474 Left _ -> forM_ (M.keys $ M.filter (<mr) ns) $ sendUpdate zmqSocket mr Nothing event
475 Right (reqID, reqNodeIdent) -> do
476 let noReqSlaves = filter (/= reqNodeIdent) $ M.keys $ M.filter (<mr) ns
477 sendUpdate zmqSocket mr (Just reqID) event reqNodeIdent
478 forM_ noReqSlaves $ sendUpdate zmqSocket mr Nothing event
482 -- signal that we're done
483 void $ takeMVar masterRepNThreadId
485 sendUpdate sock revision reqId encoded =
486 sendToSlave sock (DoRep revision reqId encoded)
487 sendCheckpoint sock revision = sendToSlave sock (DoCheckpoint revision)
488 sendArchive sock revision = sendToSlave sock (DoArchive revision)
490 -- | Create a checkpoint (on all nodes, per request).
491 -- This is useful for faster resume of both the Master (at startup) and
492 -- Slaves (at startup and reconnect).
493 createMasterCheckpoint :: MasterState st -> IO ()
494 createMasterCheckpoint masterState@MasterState{..} = do
496 unlocked <- isEmptyMVar masterStateLock
497 unless unlocked $ error "State is locked."
498 queueRepItem masterState RICheckpoint
-- | Request an archive on all nodes.
--
--   In most cases the node-local 'createArchive' is the appropriate choice.
--   Be aware that nodes which are not connected when the request is issued
--   will not create the archive later (i.e. not on reconnect).
--
--   The request is only queued as a replication item here.
createArchiveGlobally :: (IsAcidic st, Typeable st) => AcidState st -> IO ()
createArchiveGlobally acid = do
    debug "Archive globally."
    -- recover the Master state hidden inside the abstract AcidState
    queueRepItem (downcast acid) RIArchive
511 toAcidState :: (IsAcidic st, Typeable st) => MasterState st -> AcidState st
513 = AcidState { _scheduleUpdate = scheduleMasterUpdate master
514 , scheduleColdUpdate = scheduleMasterColdUpdate master
515 , _query = query $ localState master
516 , queryCold = queryCold $ localState master
517 , createCheckpoint = createMasterCheckpoint master
518 , createArchive = createArchive $ localState master
519 , closeAcidState = closeMasterState master
520 , acidSubState = mkAnyState master