022dee9d0763b48ff1ed5b370e0b5b043eb5d53a
[acid-state-dist.git] / src / Data / Acid / Centered / Master.hs
1 {-# LANGUAGE DeriveDataTypeable, RecordWildCards, FlexibleContexts #-}
2 --------------------------------------------------------------------------------
3 {- |
4   Module      :  Data.Acid.Centered.Master
5   Copyright   :  ?
6
7   Maintainer  :  max.voit+hdv@with-eyes.net
8   Portability :  non-portable (uses GHC extensions)
9
10   The Master part of the Centered replication backend for acid state.
11
12 -}
13 {- big chunks still todo:
14     o authentification
15     o encryption
16 -}
17 module Data.Acid.Centered.Master
18     (
19       openMasterState
20     , openMasterStateFrom
21     , openRedMasterState
22     , openRedMasterStateFrom
23     , createArchiveGlobally
24     , MasterState(..)
25     ) where
26
27 import Data.Typeable
28 import Data.SafeCopy
29
30 import Data.Acid
31 import Data.Acid.Core
32 import Data.Acid.Abstract
33 import Data.Acid.Local
34 import Data.Acid.Log
35 import Data.Serialize (decode, encode, runPutLazy)
36
37 import Data.Acid.Centered.Common
38
39 import Control.Concurrent (forkIO, ThreadId, myThreadId)
40 import Control.Concurrent.Chan (Chan, newChan, writeChan, readChan, dupChan)
41 import Control.Monad (when, unless, void, forM_, liftM2, liftM)
42 import Control.Monad.STM (atomically)
43 import Control.Concurrent.STM.TVar (readTVar)
44 import Control.Concurrent.MVar(MVar, newMVar, newEmptyMVar,
45                                takeMVar, putMVar, isEmptyMVar,
46                                modifyMVar, modifyMVar_, withMVar)
47 import Control.Exception (handle, throwTo, SomeException)
48
49 import System.ZMQ4 (Context, Socket, Router(..), Receiver,
50                     setReceiveHighWM, setSendHighWM, restrict,
51                     context, term, socket, close, bind, unbind,
52                     poll, Poll(..), Event(..),
53                     sendMulti, receiveMulti)
54 import System.FilePath ( (</>) )
55
56 import qualified Data.Map as M
57 import Data.Map (Map)
58 import qualified Data.IntMap as IM
59 import Data.IntMap (IntMap)
60 import qualified Data.ByteString.Lazy.Char8 as CSL
61 import qualified Data.ByteString.Char8 as CS
62 import Data.ByteString.Char8 (ByteString)
63 import qualified Data.List.NonEmpty as NEL
64 import Safe (headDef)
65
66 --------------------------------------------------------------------------------
67
68 -- | Master state structure, for internal use.
69 data MasterState st
70     = MasterState { localState :: AcidState st
71                   , nodeStatus :: MVar NodeStatus
72                   , repRedundancy :: Int
73                   , repFinalizers :: MVar (IntMap (IO ()))
74                   , masterStateLock :: MVar ()
75                   , masterRevision :: MVar NodeRevision
76                   , masterRevisionN :: MVar NodeRevision
77                   , masterReplicationChan :: Chan ReplicationItem
78                   , masterReplicationChanN :: Chan ReplicationItem
79                   , masterReqThreadId :: MVar ThreadId
80                   , masterRepLThreadId :: MVar ThreadId
81                   , masterRepNThreadId :: MVar ThreadId
82                   , masterParentThreadId :: ThreadId
83                   , zmqContext :: Context
84                   , zmqAddr :: String
85                   , zmqSocket :: MVar (Socket Router)
86                   } deriving (Typeable)
87
88 type NodeIdentity = ByteString
89 type NodeStatus = Map NodeIdentity NodeRevision
90 type Callback = IO (IO ())      -- an IO action that returns a finalizer
91 data ReplicationItem =
92       RIEnd
93     | RICheckpoint
94     | RIArchive
95     | RIUpdate (Tagged CSL.ByteString) (Either Callback (RequestID, NodeIdentity))
96
97 -- | The request handler on master node. Does
98 --      o handle receiving requests from nodes,
99 --      o answering as needed (old updates),
100 --      o bookkeeping on node states.
101 masterRequestHandler :: (IsAcidic st, Typeable st) => MasterState st -> IO ()
102 masterRequestHandler masterState@MasterState{..} = do
103     mtid <- myThreadId
104     putMVar masterReqThreadId mtid
105     let loop = handle (\e -> throwTo masterParentThreadId (e :: SomeException)) $
106           handle killHandler $ do
107             -- take one frame
108             -- waitRead =<< readMVar zmqSocket
109             -- FIXME: we needn't poll if not for strange zmq behaviour
110             re <- withMVar zmqSocket $ \sock -> poll 100 [Sock sock [In] Nothing]
111             unless (null $ head re) $ do
112                 (ident, msg) <- withMVar zmqSocket receiveFrame
113                 -- handle according frame contents
114                 case msg of
115                     -- New Slave joined.
116                     NewSlave r -> connectNode masterState ident r
117                     -- Slave is done replicating.
118                     RepDone r -> whenM (identityIsValid ident) $
119                         updateNodeStatus masterState ident r
120                     -- Slave sends an Udate.
121                     ReqUpdate rid event -> whenM (identityIsValid ident) $
122                         queueRepItem masterState (RIUpdate event (Right (rid, ident)))
123                     -- Slave quits.
124                     SlaveQuit -> do
125                         sendToSlave zmqSocket MayQuit ident
126                         removeFromNodeStatus nodeStatus ident
127                     RepError -> do
128                         sendToSlave zmqSocket MayQuit ident
129                         removeFromNodeStatus nodeStatus ident
130                     -- no other messages possible
131             loop
132     loop
133     where
134         killHandler :: AcidException -> IO ()
135         killHandler GracefulExit = return ()
136         identityIsValid i = do
137             isMember <- withMVar nodeStatus $ return . (i `M.member`)
138             if isMember then return True
139             else do
140                 debug $ "Request by unknown node [" ++ CS.unpack i ++ "]"
141                 sendToSlave zmqSocket MayQuit i
142                 return False
143
144 -- | Remove a Slave node from NodeStatus.
145 removeFromNodeStatus :: MVar NodeStatus -> NodeIdentity -> IO ()
146 removeFromNodeStatus nodeStatus ident =
147         modifyMVar_ nodeStatus $ return . M.delete ident
148
149 -- | Update the NodeStatus after a node has replicated an Update.
150 updateNodeStatus :: MasterState st -> NodeIdentity -> Int -> IO ()
151 updateNodeStatus MasterState{..} ident r =
152     modifyMVar_ nodeStatus $ \ns -> do
153         when (ns M.! ident /= (r - 1)) $
154             error $ "Invalid increment of node status " ++ show (ns M.! ident) ++ " -> " ++ show r
155         let rns = M.adjust (+1) ident ns
156         -- only for redundant operation:
157         when ((repRedundancy > 1) && (M.size (M.filter (>=r) rns) >= (repRedundancy - 1))) $ do
158             debug $ "Full replication of " ++ show r
159             -- finalize local replication
160             modifyMVar_ repFinalizers $ \rf -> do
161                 rf IM.! r
162                 return $ IM.delete r rf
163             -- send out FullRep signal
164             forM_ (M.keys ns) $ sendToSlave zmqSocket (FullRep r)
165         return rns
166
167 -- | Connect a new Slave by getting it up-to-date,
168 --   i.e. send all past events as Updates. This is fire&forget.
169 connectNode :: (IsAcidic st, Typeable st) => MasterState st -> NodeIdentity -> Revision -> IO ()
170 connectNode MasterState{..} i revision =
171     -- locking masterRevision prohibits additional events written on disk
172     withMVar masterRevision $ \mr ->
173         modifyMVar_ nodeStatus $ \ns -> do
174             -- crc generated from localCore thus corresponds to disk
175             crc <- crcOfState localState
176             -- if there has been one/more checkpoint in between:
177             lastCp <- getLastCheckpointRev localState
178             let lastCpRev = cpRevision lastCp
179             debug $ "Found checkpoint at revision " ++ show lastCpRev
180             if lastCpRev > revision then do
181                 -- send last checkpoint and newer events
182                 sendSyncCheckpoint zmqSocket lastCp i
183                 pastUpdates <- getPastUpdates localState lastCpRev
184                 forM_ pastUpdates $ \(r, u) -> sendSyncUpdate zmqSocket r u i
185             else do
186                 -- just the events
187                 pastUpdates <- getPastUpdates localState revision
188                 forM_ pastUpdates $ \(r, u) -> sendSyncUpdate zmqSocket r u i
189             sendToSlave zmqSocket (SyncDone crc) i
190             let nns = M.insert i mr ns
191             -- only for redundant operation:
192             when (repRedundancy > 1) $ checkRepStatus mr nns
193             return nns
194     where
195         cpRevision (Checkpoint r _) = r
196         sendSyncCheckpoint sock (Checkpoint cr encoded) =
197             sendToSlave sock (DoSyncCheckpoint cr encoded)
198         sendSyncUpdate sock r encoded =
199             sendToSlave sock (DoSyncRep r encoded)
200         -- FIXME: do this better (less than maxRev is possible in corner cases)
201         checkRepStatus maxRev pns =
202             when (M.size (M.filter (>= maxRev) pns) >= (repRedundancy-2)) $ do
203                 debug $ "Full replication up to " ++ show maxRev
204                 -- finalize local replication
205                 modifyMVar_ repFinalizers $ \rf -> do
206                     forM_ (filter (<= maxRev) (IM.keys rf)) $ \r -> rf IM.! r
207                     return $ IM.filterWithKey (\k _ -> k > maxRev) rf
208                 -- send out FullRep signal
209                 forM_ (M.keys pns) $ sendToSlave zmqSocket (FullRepTo maxRev)
210
211
212 -- | Fetch past Updates from FileLog for replication.
213 getPastUpdates :: (Typeable st) => AcidState st -> Int -> IO [(Int, Tagged CSL.ByteString)]
214 getPastUpdates state startRev = liftM2 zip (return [(startRev+1)..]) (readEntriesFrom (localEvents $ downcast state) startRev)
215
216 -- | Get the revision at which the last checkpoint was taken.
217 getLastCheckpointRev :: (Typeable st) => AcidState st -> IO Checkpoint
218 getLastCheckpointRev state = do
219     let lst = downcast state
220     let cplog = localCheckpoints lst
221     nextId <- atomically $ readTVar $ logNextEntryId cplog
222     cps <- readEntriesFrom cplog (nextId - 1)
223     return $ headDef (Checkpoint 0 CSL.empty) cps
224
225 -- | Send a message to a Slave
226 sendToSlave :: MVar (Socket Router) -> MasterMessage -> NodeIdentity -> IO ()
227 sendToSlave msock msg ident = withMVar msock $ \sock -> sendMulti sock $ NEL.fromList [ident, encode msg]
228
229 -- | Receive one Frame. A Frame consists of three messages:
230 --      sender ID, empty message, and actual content
231 receiveFrame :: (Receiver t) => Socket t -> IO (NodeIdentity, SlaveMessage)
232 receiveFrame sock = do
233     list <- receiveMulti sock
234     when (length list /= 2) $ error "Received invalid frame."
235     let ident = head list
236     let msg = list !! 1
237     case decode msg of
238         Left str -> error $ "Data.Serialize.decode failed on SlaveMessage: " ++ show str
239         Right smsg -> do
240             debug $ "Received from [" ++ CS.unpack ident ++ "]: "
241                         ++ take 20 (show smsg)
242             return (ident, smsg)
243
244 -- | Open the Master state.
245 --
246 -- The directory for the local state files is the default one ("state/[typeOf state]/").
247 openMasterState :: (IsAcidic st, Typeable st) =>
248                String       -- ^ address to bind (useful to listen on specific interfaces only)
249             -> PortNumber   -- ^ port to bind to
250             -> st           -- ^ initial state
251             -> IO (AcidState st)
252 openMasterState address port initialState =
253     openMasterStateFrom ("state" </> show (typeOf initialState)) address port initialState
254
255 -- | Open the master state from a specific location.
256 openMasterStateFrom :: (IsAcidic st, Typeable st) =>
257                FilePath     -- ^ location of the local state files
258             -> String       -- ^ address to bind (useful to listen on specific interfaces only)
259             -> PortNumber   -- ^ port to bind to
260             -> st           -- ^ initial state
261             -> IO (AcidState st)
262 openMasterStateFrom directory address port =
263     openRedMasterStateFrom directory address port 0
264
265 -- | Open the master state with /n/-redundant replication.
266 --
267 -- The directory for the local state files is the default one ("state/[typeOf
268 -- state]/").
269 openRedMasterState :: (IsAcidic st, Typeable st) =>
270                String       -- ^ address to bind (useful to listen on specific interfaces only)
271             -> PortNumber   -- ^ port to bind to
272             -> Int          -- ^ guarantee n-redundant replication
273             -> st           -- ^ initial state
274             -> IO (AcidState st)
275 openRedMasterState address port red initialState =
276     openRedMasterStateFrom ("state" </> show (typeOf initialState)) address port red initialState
277
278 -- | Open the master state from a specific location with redundant replication.
279 openRedMasterStateFrom :: (IsAcidic st, Typeable st) =>
280                FilePath     -- ^ location of the local state files
281             -> String       -- ^ address to bind (useful to listen on specific interfaces only)
282             -> PortNumber   -- ^ port to bind to
283             -> Int          -- ^ guarantee /n/-redundant replication
284             -> st           -- ^ initial state
285             -> IO (AcidState st)
286 openRedMasterStateFrom directory address port red initialState = do
287         debug "opening master state"
288         -- local
289         lst <- openLocalStateFrom directory initialState
290         let levs = localEvents $ downcast lst
291         lrev <- atomically $ readTVar $ logNextEntryId levs
292         rev <- newMVar lrev
293         revN <- newMVar lrev
294         repChan <- newChan
295         repChanN <- dupChan repChan
296         repFin <- newMVar IM.empty
297         ns <- newMVar M.empty
298         -- remote
299         let addr = "tcp://" ++ address ++ ":" ++ show port
300         ctx <- context
301         sock <- socket ctx Router
302         setReceiveHighWM (restrict (100*1000 :: Int)) sock
303         setSendHighWM (restrict (100*1000 :: Int)) sock
304         bind sock addr
305         msock <- newMVar sock
306         repTidL <- newEmptyMVar
307         repTidN <- newEmptyMVar
308         reqTid <- newEmptyMVar
309         parTid <- myThreadId
310         lock <- newEmptyMVar
311         let masterState = MasterState { localState = lst
312                                       , nodeStatus = ns
313                                       , repRedundancy = red
314                                       , repFinalizers = repFin
315                                       , masterStateLock = lock
316                                       , masterRevision = rev
317                                       , masterRevisionN = revN
318                                       , masterReplicationChan = repChan
319                                       , masterReplicationChanN = repChanN
320                                       , masterRepLThreadId = repTidL
321                                       , masterRepNThreadId = repTidN
322                                       , masterReqThreadId = reqTid
323                                       , masterParentThreadId = parTid
324                                       , zmqContext = ctx
325                                       , zmqAddr = addr
326                                       , zmqSocket = msock
327                                       }
328         void $ forkIO $ masterRequestHandler masterState
329         void $ forkIO $ masterReplicationHandlerL masterState
330         void $ forkIO $ masterReplicationHandlerN masterState
331         return $ toAcidState masterState
332
333 -- | Close the master state.
334 closeMasterState :: MasterState st -> IO ()
335 closeMasterState MasterState{..} = do
336         debug "Closing master state."
337         -- disallow requests
338         putMVar masterStateLock ()
339         -- send nodes quit
340         debug "Nodes to quitting."
341         withMVar nodeStatus $ mapM_ (sendToSlave zmqSocket MasterQuit) . M.keys
342         -- wait all nodes done
343         waitPoll 100 (withMVar nodeStatus (return . M.null))
344         -- todo: this could use a timeout, there may be zombies
345         -- wait replication chan
346         debug "Waiting for repChans to empty."
347         writeChan masterReplicationChan RIEnd
348         mtid <- myThreadId
349         putMVar masterRepLThreadId mtid
350         putMVar masterRepNThreadId mtid
351         -- kill handler
352         debug "Killing request handler."
353         withMVar masterReqThreadId $ flip throwTo GracefulExit
354         -- cleanup zmq
355         debug "Closing down zmq."
356         withMVar zmqSocket $ \sock -> do
357             unbind sock zmqAddr
358             close sock
359         term zmqContext
360         -- cleanup local state
361         closeAcidState localState
362
363 -- | Update on master site.
364 scheduleMasterUpdate :: (UpdateEvent event, Typeable (EventState event)) => MasterState (EventState event) -> event -> IO (MVar (EventResult event))
365 scheduleMasterUpdate masterState@MasterState{..} event = do
366         debug "Update by Master."
367         unlocked <- isEmptyMVar masterStateLock
368         if not unlocked then error "State is locked!"
369         else do
370             result <- newEmptyMVar
371             let callback = if repRedundancy > 1
372                 then
373                     -- the returned action fills in result when executed later
374                     scheduleLocalUpdate' (downcast localState) event result
375                 else do
376                     hd <- scheduleUpdate localState event
377                     void $ forkIO (putMVar result =<< takeMVar hd)
378                     return (return ())      -- bogus finalizer
379             let encoded = runPutLazy (safePut event)
380             queueRepItem masterState (RIUpdate (methodTag event, encoded) (Left callback))
381             return result
382
383 -- | Queue an RepItem (originating from the Master itself of an Slave via zmq)
384 queueRepItem :: MasterState st -> ReplicationItem -> IO ()
385 queueRepItem MasterState{..} = writeChan masterReplicationChan
386
387 -- | The local replication handler. Takes care to run Updates locally.
388 masterReplicationHandlerL :: (Typeable st) => MasterState st -> IO ()
389 masterReplicationHandlerL MasterState{..} = do
390     mtid <- myThreadId
391     putMVar masterRepLThreadId mtid
392     let loop = handle (\e -> throwTo masterParentThreadId (e :: SomeException)) $ do
393             debug "Replicating next item locally."
394             repItem <- readChan masterReplicationChan
395             case repItem of
396                 RIEnd -> return ()
397                 RIArchive -> do
398                     debug "Archive on master."
399                     createArchive localState
400                     loop
401                 RICheckpoint -> do
402                     debug "Checkpoint on master."
403                     createCheckpoint localState
404                     loop
405                 RIUpdate event sink -> do
406                     if repRedundancy > 1
407                     then do
408                         (rev, act) <- modifyMVar masterRevision $ \r -> do
409                             a <- case sink of
410                                 Left callback   -> callback
411                                 _               -> liftM snd $ scheduleLocalColdUpdate' (downcast localState) event
412                             return (r+1,(r+1,a))
413                         -- act finalizes the transaction - will be run after full replication
414                         modifyMVar_ repFinalizers $ return . IM.insert rev act
415                     else
416                         modifyMVar_ masterRevision $ \r -> do
417                             case sink of
418                                 Left callback   -> void callback
419                                 _               -> void $ scheduleColdUpdate localState event
420                             return (r+1)
421                     loop
422     loop
423     -- signal that we're done
424     void $ takeMVar masterRepLThreadId
425
426 -- | The network replication handler. Takes care to run Updates on Slaves.
427 masterReplicationHandlerN :: MasterState st -> IO ()
428 masterReplicationHandlerN MasterState{..} = do
429     mtid <- myThreadId
430     putMVar masterRepNThreadId mtid
431     let loop = handle (\e -> throwTo masterParentThreadId (e :: SomeException)) $ do
432             debug "Replicating next item in network."
433             repItem <- readChan masterReplicationChanN
434             case repItem of
435                 RIEnd -> return ()
436                 RIArchive -> do
437                     withMVar nodeStatus $ \ns -> do
438                         debug "Sending archive request to Slaves."
439                         withMVar masterRevisionN $ \mr ->
440                             forM_ (M.keys ns) $ sendArchive zmqSocket mr
441                     loop
442                 RICheckpoint -> do
443                     withMVar nodeStatus $ \ns -> do
444                         debug "Sending Checkpoint Request to Slaves."
445                         withMVar masterRevisionN $ \mr ->
446                             forM_ (M.keys ns) $ sendCheckpoint zmqSocket mr
447                     loop
448                 RIUpdate event sink -> do
449                     withMVar nodeStatus $ \ns -> do
450                         debug $ "Sending Update to Slaves, there are " ++ show (M.size ns)
451                         modifyMVar_ masterRevisionN $ \mrOld -> do
452                             let mr = mrOld + 1
453                             case sink of
454                                 Left _ -> forM_ (M.keys $ M.filter (<mr) ns) $ sendUpdate zmqSocket mr Nothing event
455                                 Right (reqID, reqNodeIdent) -> do
456                                     let noReqSlaves = filter (/= reqNodeIdent) $ M.keys $ M.filter (<mr) ns
457                                     sendUpdate zmqSocket mr (Just reqID) event reqNodeIdent
458                                     forM_ noReqSlaves $ sendUpdate zmqSocket mr Nothing event
459                             return mr
460                     loop
461     loop
462     -- signal that we're done
463     void $ takeMVar masterRepNThreadId
464     where
465         sendUpdate sock revision reqId encoded =
466             sendToSlave sock (DoRep revision reqId encoded)
467         sendCheckpoint sock revision = sendToSlave sock (DoCheckpoint revision)
468         sendArchive sock revision = sendToSlave sock (DoArchive revision)
469
470 -- | Create a checkpoint (on all nodes, per request).
471 --   This is useful for faster resume of both the Master (at startup) and
472 --   Slaves (at startup and reconnect).
473 createMasterCheckpoint :: MasterState st -> IO ()
474 createMasterCheckpoint masterState@MasterState{..} = do
475     debug "Checkpoint."
476     unlocked <- isEmptyMVar masterStateLock
477     unless unlocked $ error "State is locked."
478     queueRepItem masterState RICheckpoint
479
480 -- | Create an archive on all nodes.
481 --   Usually createArchive (local to each node) is appropriate.
482 --   Also take care: Nodes that are not connected at the time, will not create
483 --   an archive (on reconnect).
484 createArchiveGlobally :: (IsAcidic st, Typeable st) => AcidState st -> IO ()
485 createArchiveGlobally acid = do
486     debug "Archive globally."
487     let masterState = downcast acid
488     queueRepItem masterState RIArchive
489
490
491 toAcidState :: (IsAcidic st, Typeable st) => MasterState st -> AcidState st
492 toAcidState master
493   = AcidState { _scheduleUpdate    = scheduleMasterUpdate master
494               , scheduleColdUpdate = scheduleColdUpdate $ localState master
495               , _query             = query $ localState master
496               , queryCold          = queryCold $ localState master
497               , createCheckpoint   = createMasterCheckpoint master
498               , createArchive      = createArchive $ localState master
499               , closeAcidState     = closeMasterState master
500               , acidSubState       = mkAnyState master
501               }
502