d70c6bf04a5b1391f39e20e856727f0ff2ca78f2
[acid-state-dist.git] / src / Data / Acid / Centered / Master.hs
1 {-# LANGUAGE DeriveDataTypeable, RecordWildCards, FlexibleContexts #-}
2 --------------------------------------------------------------------------------
3 {- |
4   Module      :  Data.Acid.Centered.Master
5   Copyright   :  ?
6
7   Maintainer  :  max.voit+hdv@with-eyes.net
8   Portability :  non-portable (uses GHC extensions)
9
10   The Master part of the Centered replication backend for acid state.
11
12 -}
13 {- big chunks still todo:
14     o authentification
15     o encryption
16 -}
17 module Data.Acid.Centered.Master
18     (
19       openMasterState
20     , openMasterStateFrom
21     , openRedMasterState
22     , openRedMasterStateFrom
23     , createArchiveGlobally
24     , MasterState(..)
25     ) where
26
27 import Data.Typeable
28 import Data.SafeCopy
29
30 import Data.Acid
31 import Data.Acid.Core
32 import Data.Acid.Abstract
33 import Data.Acid.Local
34 import Data.Acid.Log
35 import Data.Serialize (decode, encode, runPutLazy)
36
37 import Data.Acid.Centered.Common
38
39 import Control.Concurrent (forkIO, ThreadId, myThreadId)
40 import Control.Concurrent.Chan (Chan, newChan, writeChan, readChan, dupChan)
41 import Control.Monad (when, unless, void, forM_, liftM2, liftM)
42 import Control.Monad.STM (atomically)
43 import Control.Concurrent.STM.TVar (readTVar)
44 import Control.Concurrent.MVar(MVar, newMVar, newEmptyMVar,
45                                takeMVar, putMVar, tryPutMVar, isEmptyMVar,
46                                modifyMVar, modifyMVar_, withMVar)
47 import Control.Exception (handle, throwTo, SomeException)
48
49 import System.ZMQ4 (Context, Socket, Router(..), Receiver,
50                     setReceiveHighWM, setSendHighWM, restrict,
51                     context, term, socket, close, bind, unbind,
52                     poll, Poll(..), Event(..),
53                     sendMulti, receiveMulti)
54 import System.FilePath ( (</>) )
55
56 import qualified Data.Map as M
57 import Data.Map (Map)
58 import qualified Data.IntMap as IM
59 import Data.IntMap (IntMap)
60 import qualified Data.ByteString.Lazy.Char8 as CSL
61 import qualified Data.ByteString.Char8 as CS
62 import Data.ByteString.Char8 (ByteString)
63 import qualified Data.List.NonEmpty as NEL
64 import Safe (headDef)
65
66 --------------------------------------------------------------------------------
67
68 -- | Master state structure, for internal use.
69 data MasterState st
70     = MasterState { localState :: AcidState st
71                   , nodeStatus :: MVar NodeStatus
72                   , repRedundancy :: Int
73                   , repFinalizers :: MVar (IntMap (IO ()))
74                   , masterStateLock :: MVar ()
75                   , masterRevision :: MVar NodeRevision
76                   , masterRevisionN :: MVar NodeRevision
77                   , masterReplicationChan :: Chan ReplicationItem
78                   , masterReplicationChanN :: Chan ReplicationItem
79                   , masterReqThreadId :: MVar ThreadId
80                   , masterRepLThreadId :: MVar ThreadId
81                   , masterRepNThreadId :: MVar ThreadId
82                   , masterParentThreadId :: ThreadId
83                   , zmqContext :: Context
84                   , zmqAddr :: String
85                   , zmqSocket :: MVar (Socket Router)
86                   } deriving (Typeable)
87
88 type NodeIdentity = ByteString
89 type NodeStatus = Map NodeIdentity NodeRevision
90 type Callback = IO (IO ())      -- an IO action that returns a finalizer
91 data ReplicationItem =
92       RIEnd
93     | RICheckpoint
94     | RIArchive
95     | RIUpdate (Tagged CSL.ByteString) (Either Callback (RequestID, NodeIdentity))
96
97 -- | The request handler on master node. Does
98 --      o handle receiving requests from nodes,
99 --      o answering as needed (old updates),
100 --      o bookkeeping on node states.
101 masterRequestHandler :: (IsAcidic st, Typeable st) => MasterState st -> IO ()
102 masterRequestHandler masterState@MasterState{..} = do
103     mtid <- myThreadId
104     putMVar masterReqThreadId mtid
105     let loop = handle (\e -> throwTo masterParentThreadId (e :: SomeException)) $
106           handle killHandler $ do
107             -- take one frame
108             -- waitRead =<< readMVar zmqSocket
109             -- FIXME: we needn't poll if not for strange zmq behaviour
110             re <- withMVar zmqSocket $ \sock -> poll 100 [Sock sock [In] Nothing]
111             unless (null $ head re) $ do
112                 (ident, msg) <- withMVar zmqSocket receiveFrame
113                 -- handle according frame contents
114                 case msg of
115                     -- New Slave joined.
116                     NewSlave r -> connectNode masterState ident r
117                     -- Slave is done replicating.
118                     RepDone r -> whenM (identityIsValid ident) $
119                         updateNodeStatus masterState ident r
120                     -- Slave sends an Udate.
121                     ReqUpdate rid event -> whenM (identityIsValid ident) $
122                         queueRepItem masterState (RIUpdate event (Right (rid, ident)))
123                     -- Slave quits.
124                     SlaveQuit -> do
125                         sendToSlave zmqSocket MayQuit ident
126                         removeFromNodeStatus nodeStatus ident
127                     RepError -> do
128                         sendToSlave zmqSocket MayQuit ident
129                         removeFromNodeStatus nodeStatus ident
130                     -- no other messages possible
131             loop
132     loop
133     where
134         killHandler :: AcidException -> IO ()
135         killHandler GracefulExit = return ()
136         identityIsValid i = do
137             isMember <- withMVar nodeStatus $ return . (i `M.member`)
138             if isMember then return True
139             else do
140                 debug $ "Request by unknown node [" ++ CS.unpack i ++ "]"
141                 sendToSlave zmqSocket MayQuit i
142                 return False
143
144 -- | Remove a Slave node from NodeStatus.
145 removeFromNodeStatus :: MVar NodeStatus -> NodeIdentity -> IO ()
146 removeFromNodeStatus nodeStatus ident =
147         modifyMVar_ nodeStatus $ return . M.delete ident
148
149 -- | Update the NodeStatus after a node has replicated an Update.
150 updateNodeStatus :: MasterState st -> NodeIdentity -> Int -> IO ()
151 updateNodeStatus MasterState{..} ident r =
152     modifyMVar_ nodeStatus $ \ns -> do
153         when (ns M.! ident /= (r - 1)) $
154             error $ "Invalid increment of node status " ++ show (ns M.! ident) ++ " -> " ++ show r
155         let rns = M.adjust (+1) ident ns
156         -- only for redundant operation:
157         when ((repRedundancy > 1) && (M.size (M.filter (>=r) rns) >= (repRedundancy - 1))) $ do
158             debug $ "Full replication of " ++ show r
159             -- finalize local replication
160             modifyMVar_ repFinalizers $ \rf -> do
161                 rf IM.! r
162                 return $ IM.delete r rf
163             -- send out FullRep signal
164             forM_ (M.keys ns) $ sendToSlave zmqSocket (FullRep r)
165         return rns
166
167 -- | Connect a new Slave by getting it up-to-date,
168 --   i.e. send all past events as Updates. This is fire&forget.
169 connectNode :: (IsAcidic st, Typeable st) => MasterState st -> NodeIdentity -> Revision -> IO ()
170 connectNode MasterState{..} i revision =
171     -- locking masterRevision prohibits additional events written on disk
172     withMVar masterRevision $ \mr ->
173         modifyMVar_ nodeStatus $ \ns -> do
174             -- crc generated from localCore thus corresponds to disk
175             crc <- crcOfState localState
176             -- if there has been one/more checkpoint in between:
177             lastCp <- getLastCheckpointRev localState
178             let lastCpRev = cpRevision lastCp
179             debug $ "Found checkpoint at revision " ++ show lastCpRev
180             if lastCpRev > revision then do
181                 -- send last checkpoint and newer events
182                 sendSyncCheckpoint zmqSocket lastCp i
183                 pastUpdates <- getPastUpdates localState lastCpRev
184                 forM_ pastUpdates $ \(r, u) -> sendSyncUpdate zmqSocket r u i
185             else do
186                 -- just the events
187                 pastUpdates <- getPastUpdates localState revision
188                 forM_ pastUpdates $ \(r, u) -> sendSyncUpdate zmqSocket r u i
189             sendToSlave zmqSocket (SyncDone crc) i
190             let nns = M.insert i mr ns
191             -- only for redundant operation:
192             when (repRedundancy > 1) $ checkRepStatus mr nns
193             return nns
194     where
195         cpRevision (Checkpoint r _) = r
196         sendSyncCheckpoint sock (Checkpoint cr encoded) =
197             sendToSlave sock (DoSyncCheckpoint cr encoded)
198         sendSyncUpdate sock r encoded =
199             sendToSlave sock (DoSyncRep r encoded)
200         -- FIXME: do this better (less than maxRev is possible in corner cases)
201         checkRepStatus maxRev pns =
202             when (M.size (M.filter (>= maxRev) pns) >= (repRedundancy-2)) $ do
203                 debug $ "Full replication up to " ++ show maxRev
204                 -- finalize local replication
205                 modifyMVar_ repFinalizers $ \rf -> do
206                     forM_ (filter (<= maxRev) (IM.keys rf)) $ \r -> rf IM.! r
207                     return $ IM.filterWithKey (\k _ -> k > maxRev) rf
208                 -- send out FullRep signal
209                 forM_ (M.keys pns) $ sendToSlave zmqSocket (FullRepTo maxRev)
210
211
212 -- | Fetch past Updates from FileLog for replication.
213 getPastUpdates :: (Typeable st) => AcidState st -> Int -> IO [(Int, Tagged CSL.ByteString)]
214 getPastUpdates state startRev = liftM2 zip (return [(startRev+1)..]) (readEntriesFrom (localEvents $ downcast state) startRev)
215
216 -- | Get the revision at which the last checkpoint was taken.
217 getLastCheckpointRev :: (Typeable st) => AcidState st -> IO Checkpoint
218 getLastCheckpointRev state = do
219     let lst = downcast state
220     let cplog = localCheckpoints lst
221     nextId <- atomically $ readTVar $ logNextEntryId cplog
222     cps <- readEntriesFrom cplog (nextId - 1)
223     return $ headDef (Checkpoint 0 CSL.empty) cps
224
225 -- | Send a message to a Slave
226 sendToSlave :: MVar (Socket Router) -> MasterMessage -> NodeIdentity -> IO ()
227 sendToSlave msock msg ident = withMVar msock $ \sock -> sendMulti sock $ NEL.fromList [ident, encode msg]
228
229 -- | Receive one Frame. A Frame consists of three messages:
230 --      sender ID, empty message, and actual content
231 receiveFrame :: (Receiver t) => Socket t -> IO (NodeIdentity, SlaveMessage)
232 receiveFrame sock = do
233     list <- receiveMulti sock
234     when (length list /= 2) $ error "Received invalid frame."
235     let ident = head list
236     let msg = list !! 1
237     case decode msg of
238         Left str -> error $ "Data.Serialize.decode failed on SlaveMessage: " ++ show str
239         Right smsg -> do
240             debug $ "Received from [" ++ CS.unpack ident ++ "]: "
241                         ++ take 20 (show smsg)
242             return (ident, smsg)
243
244 -- | Open the Master state.
245 --
246 -- The directory for the local state files is the default one ("state/[typeOf state]/").
247 openMasterState :: (IsAcidic st, Typeable st) =>
248                String       -- ^ address to bind (useful to listen on specific interfaces only)
249             -> PortNumber   -- ^ port to bind to
250             -> st           -- ^ initial state
251             -> IO (AcidState st)
252 openMasterState address port initialState =
253     openMasterStateFrom ("state" </> show (typeOf initialState)) address port initialState
254
255 -- | Open the master state from a specific location.
256 openMasterStateFrom :: (IsAcidic st, Typeable st) =>
257                FilePath     -- ^ location of the local state files
258             -> String       -- ^ address to bind (useful to listen on specific interfaces only)
259             -> PortNumber   -- ^ port to bind to
260             -> st           -- ^ initial state
261             -> IO (AcidState st)
262 openMasterStateFrom directory address port =
263     openRedMasterStateFrom directory address port 0
264
265 -- | Open the master state with /n/-redundant replication.
266 --
267 -- The directory for the local state files is the default one ("state/[typeOf
268 -- state]/").
269 openRedMasterState :: (IsAcidic st, Typeable st) =>
270                String       -- ^ address to bind (useful to listen on specific interfaces only)
271             -> PortNumber   -- ^ port to bind to
272             -> Int          -- ^ guarantee n-redundant replication
273             -> st           -- ^ initial state
274             -> IO (AcidState st)
275 openRedMasterState address port red initialState =
276     openRedMasterStateFrom ("state" </> show (typeOf initialState)) address port red initialState
277
278 -- | Open the master state from a specific location with redundant replication.
279 openRedMasterStateFrom :: (IsAcidic st, Typeable st) =>
280                FilePath     -- ^ location of the local state files
281             -> String       -- ^ address to bind (useful to listen on specific interfaces only)
282             -> PortNumber   -- ^ port to bind to
283             -> Int          -- ^ guarantee /n/-redundant replication
284             -> st           -- ^ initial state
285             -> IO (AcidState st)
286 openRedMasterStateFrom directory address port red initialState = do
287         debug "opening master state"
288         -- local
289         lst <- openLocalStateFrom directory initialState
290         let levs = localEvents $ downcast lst
291         lrev <- atomically $ readTVar $ logNextEntryId levs
292         rev <- newMVar lrev
293         revN <- newMVar lrev
294         repChan <- newChan
295         repChanN <- dupChan repChan
296         repFin <- newMVar IM.empty
297         ns <- newMVar M.empty
298         -- remote
299         let addr = "tcp://" ++ address ++ ":" ++ show port
300         ctx <- context
301         sock <- socket ctx Router
302         setReceiveHighWM (restrict (100*1000 :: Int)) sock
303         setSendHighWM (restrict (100*1000 :: Int)) sock
304         bind sock addr
305         msock <- newMVar sock
306         repTidL <- newEmptyMVar
307         repTidN <- newEmptyMVar
308         reqTid <- newEmptyMVar
309         parTid <- myThreadId
310         lock <- newEmptyMVar
311         let masterState = MasterState { localState = lst
312                                       , nodeStatus = ns
313                                       , repRedundancy = red
314                                       , repFinalizers = repFin
315                                       , masterStateLock = lock
316                                       , masterRevision = rev
317                                       , masterRevisionN = revN
318                                       , masterReplicationChan = repChan
319                                       , masterReplicationChanN = repChanN
320                                       , masterRepLThreadId = repTidL
321                                       , masterRepNThreadId = repTidN
322                                       , masterReqThreadId = reqTid
323                                       , masterParentThreadId = parTid
324                                       , zmqContext = ctx
325                                       , zmqAddr = addr
326                                       , zmqSocket = msock
327                                       }
328         void $ forkIO $ masterRequestHandler masterState
329         void $ forkIO $ masterReplicationHandlerL masterState
330         void $ forkIO $ masterReplicationHandlerN masterState
331         return $ toAcidState masterState
332
333 -- | Close the master state.
334 closeMasterState :: MasterState st -> IO ()
335 closeMasterState MasterState{..} =
336     -- disallow requests
337     whenM (tryPutMVar masterStateLock ()) $ do
338         debug "Closing master state."
339         -- send nodes quit
340         debug "Nodes quitting."
341         withMVar nodeStatus $ mapM_ (sendToSlave zmqSocket MasterQuit) . M.keys
342         -- wait all nodes done
343         waitPollN 100 1000 (withMVar nodeStatus (return . M.null))
344         -- wait replication chan
345         debug "Waiting for repChans to empty."
346         writeChan masterReplicationChan RIEnd
347         mtid <- myThreadId
348         putMVar masterRepLThreadId mtid
349         putMVar masterRepNThreadId mtid
350         -- kill handler
351         debug "Killing request handler."
352         withMVar masterReqThreadId $ flip throwTo GracefulExit
353         -- cleanup zmq
354         debug "Closing down zmq."
355         withMVar zmqSocket $ \sock -> do
356             unbind sock zmqAddr
357             close sock
358         term zmqContext
359         -- cleanup local state
360         closeAcidState localState
361
362 -- | Update on master site.
363 scheduleMasterUpdate :: (UpdateEvent event, Typeable (EventState event)) => MasterState (EventState event) -> event -> IO (MVar (EventResult event))
364 scheduleMasterUpdate masterState@MasterState{..} event = do
365         debug "Update by Master."
366         unlocked <- isEmptyMVar masterStateLock
367         if not unlocked then error "State is locked!"
368         else do
369             result <- newEmptyMVar
370             let callback = if repRedundancy > 1
371                 then
372                     -- the returned action fills in result when executed later
373                     scheduleLocalUpdate' (downcast localState) event result
374                 else do
375                     hd <- scheduleUpdate localState event
376                     void $ forkIO (putMVar result =<< takeMVar hd)
377                     return (return ())      -- bogus finalizer
378             let encoded = runPutLazy (safePut event)
379             queueRepItem masterState (RIUpdate (methodTag event, encoded) (Left callback))
380             return result
381
382 -- | Queue an RepItem (originating from the Master itself of an Slave via zmq)
383 queueRepItem :: MasterState st -> ReplicationItem -> IO ()
384 queueRepItem MasterState{..} = writeChan masterReplicationChan
385
386 -- | The local replication handler. Takes care to run Updates locally.
387 masterReplicationHandlerL :: (Typeable st) => MasterState st -> IO ()
388 masterReplicationHandlerL MasterState{..} = do
389     mtid <- myThreadId
390     putMVar masterRepLThreadId mtid
391     let loop = handle (\e -> throwTo masterParentThreadId (e :: SomeException)) $ do
392             debug "Replicating next item locally."
393             repItem <- readChan masterReplicationChan
394             case repItem of
395                 RIEnd -> return ()
396                 RIArchive -> do
397                     debug "Archive on master."
398                     createArchive localState
399                     loop
400                 RICheckpoint -> do
401                     debug "Checkpoint on master."
402                     createCheckpoint localState
403                     loop
404                 RIUpdate event sink -> do
405                     if repRedundancy > 1
406                     then do
407                         (rev, act) <- modifyMVar masterRevision $ \r -> do
408                             a <- case sink of
409                                 Left callback   -> callback
410                                 _               -> liftM snd $ scheduleLocalColdUpdate' (downcast localState) event
411                             return (r+1,(r+1,a))
412                         -- act finalizes the transaction - will be run after full replication
413                         modifyMVar_ repFinalizers $ return . IM.insert rev act
414                     else
415                         modifyMVar_ masterRevision $ \r -> do
416                             case sink of
417                                 Left callback   -> void callback
418                                 _               -> void $ scheduleColdUpdate localState event
419                             return (r+1)
420                     loop
421     loop
422     -- signal that we're done
423     void $ takeMVar masterRepLThreadId
424
425 -- | The network replication handler. Takes care to run Updates on Slaves.
426 masterReplicationHandlerN :: MasterState st -> IO ()
427 masterReplicationHandlerN MasterState{..} = do
428     mtid <- myThreadId
429     putMVar masterRepNThreadId mtid
430     let loop = handle (\e -> throwTo masterParentThreadId (e :: SomeException)) $ do
431             debug "Replicating next item in network."
432             repItem <- readChan masterReplicationChanN
433             case repItem of
434                 RIEnd -> return ()
435                 RIArchive -> do
436                     withMVar nodeStatus $ \ns -> do
437                         debug "Sending archive request to Slaves."
438                         withMVar masterRevisionN $ \mr ->
439                             forM_ (M.keys ns) $ sendArchive zmqSocket mr
440                     loop
441                 RICheckpoint -> do
442                     withMVar nodeStatus $ \ns -> do
443                         debug "Sending Checkpoint Request to Slaves."
444                         withMVar masterRevisionN $ \mr ->
445                             forM_ (M.keys ns) $ sendCheckpoint zmqSocket mr
446                     loop
447                 RIUpdate event sink -> do
448                     withMVar nodeStatus $ \ns -> do
449                         debug $ "Sending Update to Slaves, there are " ++ show (M.size ns)
450                         modifyMVar_ masterRevisionN $ \mrOld -> do
451                             let mr = mrOld + 1
452                             case sink of
453                                 Left _ -> forM_ (M.keys $ M.filter (<mr) ns) $ sendUpdate zmqSocket mr Nothing event
454                                 Right (reqID, reqNodeIdent) -> do
455                                     let noReqSlaves = filter (/= reqNodeIdent) $ M.keys $ M.filter (<mr) ns
456                                     sendUpdate zmqSocket mr (Just reqID) event reqNodeIdent
457                                     forM_ noReqSlaves $ sendUpdate zmqSocket mr Nothing event
458                             return mr
459                     loop
460     loop
461     -- signal that we're done
462     void $ takeMVar masterRepNThreadId
463     where
464         sendUpdate sock revision reqId encoded =
465             sendToSlave sock (DoRep revision reqId encoded)
466         sendCheckpoint sock revision = sendToSlave sock (DoCheckpoint revision)
467         sendArchive sock revision = sendToSlave sock (DoArchive revision)
468
469 -- | Create a checkpoint (on all nodes, per request).
470 --   This is useful for faster resume of both the Master (at startup) and
471 --   Slaves (at startup and reconnect).
472 createMasterCheckpoint :: MasterState st -> IO ()
473 createMasterCheckpoint masterState@MasterState{..} = do
474     debug "Checkpoint."
475     unlocked <- isEmptyMVar masterStateLock
476     unless unlocked $ error "State is locked."
477     queueRepItem masterState RICheckpoint
478
479 -- | Create an archive on all nodes.
480 --   Usually createArchive (local to each node) is appropriate.
481 --   Also take care: Nodes that are not connected at the time, will not create
482 --   an archive (on reconnect).
483 createArchiveGlobally :: (IsAcidic st, Typeable st) => AcidState st -> IO ()
484 createArchiveGlobally acid = do
485     debug "Archive globally."
486     let masterState = downcast acid
487     queueRepItem masterState RIArchive
488
489
490 toAcidState :: (IsAcidic st, Typeable st) => MasterState st -> AcidState st
491 toAcidState master
492   = AcidState { _scheduleUpdate    = scheduleMasterUpdate master
493               , scheduleColdUpdate = scheduleColdUpdate $ localState master
494               , _query             = query $ localState master
495               , queryCold          = queryCold $ localState master
496               , createCheckpoint   = createMasterCheckpoint master
497               , createArchive      = createArchive $ localState master
498               , closeAcidState     = closeMasterState master
499               , acidSubState       = mkAnyState master
500               }
501