75f7c30202b4c3dd17ba4031a6f0c8301a15a9d8
[acid-state-dist.git] / src / Data / Acid / Centered / Master.hs
1 {-# LANGUAGE DeriveDataTypeable, RecordWildCards, FlexibleContexts #-}
2 --------------------------------------------------------------------------------
3 {- |
4   Module      :  Data.Acid.Centered.Master
5   Copyright   :  ?
6
7   Maintainer  :  max.voit+hdv@with-eyes.net
8   Portability :  non-portable (uses GHC extensions)
9
10   The Master part of the Centered replication backend for acid state.
11
12 -}
13 {- big chunks still todo:
14     o authentification
15     o encryption
16 -}
17 module Data.Acid.Centered.Master
18     (
19       openMasterState
20     , openMasterStateFrom
21     , openRedMasterState
22     , openRedMasterStateFrom
23     , createArchiveGlobally
24     , MasterState(..)
25     ) where
26
27 import Data.Typeable
28 import Data.SafeCopy
29 import Data.Serialize (decode, encode, runPutLazy)
30
31 import Data.Acid
32 import Data.Acid.Core
33 import Data.Acid.Abstract
34 import Data.Acid.Local
35 import Data.Acid.Log
36
37 import Data.Acid.Centered.Common
38
39 import Control.Concurrent (forkIO, ThreadId, myThreadId)
40 import Control.Concurrent.Chan (Chan, newChan, writeChan, readChan, dupChan)
41 import Control.Concurrent.STM.TVar (readTVar)
42 import Control.Concurrent.MVar(MVar, newMVar, newEmptyMVar,
43                                takeMVar, putMVar, tryPutMVar, isEmptyMVar,
44                                modifyMVar, modifyMVar_, withMVar)
45
46 import Control.Monad.STM (atomically)
47 import Control.Monad (when, unless, void, forM_, liftM2)
48 import Control.Exception (handle, throwTo, SomeException)
49
50 import System.ZMQ4 (Context, Socket, Router(..), Receiver,
51                     setReceiveHighWM, setSendHighWM, restrict,
52                     context, term, socket, close, bind, unbind,
53                     poll, Poll(..), Event(..),
54                     sendMulti, receiveMulti)
55 import System.FilePath ( (</>) )
56
57 import qualified Data.ByteString.Lazy.Char8 as CSL
58 import           Data.ByteString.Lazy.Char8 (ByteString)
59 import qualified Data.ByteString.Char8 as CS
60
61 import qualified Data.Map as M
62 import           Data.Map (Map)
63 import qualified Data.IntMap as IM
64 import           Data.IntMap (IntMap)
65 import qualified Data.List.NonEmpty as NEL
66 import Safe (headDef)
67
68 --------------------------------------------------------------------------------
69
70 -- | Master state structure, for internal use.
71 data MasterState st
72     = MasterState { localState :: AcidState st
73                   , nodeStatus :: MVar NodeStatus
74                   , repRedundancy :: Int
75                   , repFinalizers :: MVar (IntMap (IO ()))
76                   , masterStateLock :: MVar ()
77                   , masterRevision :: MVar NodeRevision
78                   , masterRevisionN :: MVar NodeRevision
79                   , masterReplicationChan :: Chan ReplicationItem
80                   , masterReplicationChanN :: Chan ReplicationItem
81                   , masterReqThreadId :: MVar ThreadId
82                   , masterRepLThreadId :: MVar ThreadId
83                   , masterRepNThreadId :: MVar ThreadId
84                   , masterParentThreadId :: ThreadId
85                   , zmqContext :: Context
86                   , zmqAddr :: String
87                   , zmqSocket :: MVar (Socket Router)
88                   } deriving (Typeable)
89
90 type NodeIdentity = CS.ByteString
91 type NodeStatus = Map NodeIdentity NodeRevision
92 type Callback = IO (IO ())      -- an IO action that returns a finalizer
93 data ReplicationItem =
94       RIEnd
95     | RICheckpoint
96     | RIArchive
97     | RIUpdate (Tagged ByteString) (Either Callback (RequestID, NodeIdentity))
98
99 -- | The request handler on master node. Does
100 --      o handle receiving requests from nodes,
101 --      o answering as needed (old updates),
102 --      o bookkeeping on node states.
103 masterRequestHandler :: (IsAcidic st, Typeable st) => MasterState st -> IO ()
104 masterRequestHandler masterState@MasterState{..} = do
105     mtid <- myThreadId
106     putMVar masterReqThreadId mtid
107     let loop = handle (\e -> throwTo masterParentThreadId (e :: SomeException)) $
108           handle killHandler $ do
109             -- take one frame
110             -- waitRead =<< readMVar zmqSocket
111             -- FIXME: we needn't poll if not for strange zmq behaviour
112             re <- withMVar zmqSocket $ \sock -> poll 100 [Sock sock [In] Nothing]
113             unless (null $ head re) $ do
114                 (ident, msg) <- withMVar zmqSocket receiveFrame
115                 -- handle according frame contents
116                 case msg of
117                     -- New Slave joined.
118                     NewSlave r -> connectNode masterState ident r
119                     -- Slave is done replicating.
120                     RepDone r -> whenM (identityIsValid ident) $
121                         updateNodeStatus masterState ident r
122                     -- Slave sends an Udate.
123                     ReqUpdate rid event -> whenM (identityIsValid ident) $
124                         queueRepItem masterState (RIUpdate event (Right (rid, ident)))
125                     -- Slave quits.
126                     SlaveQuit -> do
127                         sendToSlave zmqSocket MayQuit ident
128                         removeFromNodeStatus nodeStatus ident
129                     RepError -> do
130                         sendToSlave zmqSocket MayQuit ident
131                         removeFromNodeStatus nodeStatus ident
132                     -- no other messages possible
133             loop
134     loop
135     where
136         killHandler :: AcidException -> IO ()
137         killHandler GracefulExit = return ()
138         identityIsValid i = do
139             isMember <- withMVar nodeStatus $ return . (i `M.member`)
140             if isMember then return True
141             else do
142                 debug $ "Request by unknown node [" ++ CS.unpack i ++ "]"
143                 sendToSlave zmqSocket MayQuit i
144                 return False
145
146 -- | Remove a Slave node from NodeStatus.
147 removeFromNodeStatus :: MVar NodeStatus -> NodeIdentity -> IO ()
148 removeFromNodeStatus nodeStatus ident =
149         modifyMVar_ nodeStatus $ return . M.delete ident
150
151 -- | Update the NodeStatus after a node has replicated an Update.
152 updateNodeStatus :: MasterState st -> NodeIdentity -> Int -> IO ()
153 updateNodeStatus MasterState{..} ident r =
154     modifyMVar_ nodeStatus $ \ns -> do
155         when (ns M.! ident /= (r - 1)) $
156             error $ "Invalid increment of node status " ++ show (ns M.! ident) ++ " -> " ++ show r
157         let rns = M.adjust (+1) ident ns
158         -- only for redundant operation:
159         when ((repRedundancy > 1) && (M.size (M.filter (>=r) rns) >= (repRedundancy - 1))) $ do
160             debug $ "Full replication of " ++ show r
161             -- finalize local replication
162             modifyMVar_ repFinalizers $ \rf -> do
163                 rf IM.! r
164                 return $ IM.delete r rf
165             -- send out FullRep signal
166             forM_ (M.keys ns) $ sendToSlave zmqSocket (FullRep r)
167         return rns
168
169 -- | Connect a new Slave by getting it up-to-date,
170 --   i.e. send all past events as Updates. This is fire&forget.
171 connectNode :: (IsAcidic st, Typeable st) => MasterState st -> NodeIdentity -> Revision -> IO ()
172 connectNode MasterState{..} i revision =
173     -- locking masterRevision prohibits additional events written on disk
174     withMVar masterRevision $ \mr ->
175         modifyMVar_ nodeStatus $ \ns -> do
176             -- crc generated from localCore thus corresponds to disk
177             crc <- crcOfState localState
178             -- if there has been one/more checkpoint in between:
179             lastCp <- getLastCheckpointRev localState
180             let lastCpRev = cpRevision lastCp
181             debug $ "Found checkpoint at revision " ++ show lastCpRev
182             if lastCpRev > revision then do
183                 -- send last checkpoint and newer events
184                 sendSyncCheckpoint zmqSocket lastCp i
185                 pastUpdates <- getPastUpdates localState lastCpRev
186                 forM_ pastUpdates $ \(r, u) -> sendSyncUpdate zmqSocket r u i
187             else do
188                 -- just the events
189                 pastUpdates <- getPastUpdates localState revision
190                 forM_ pastUpdates $ \(r, u) -> sendSyncUpdate zmqSocket r u i
191             sendToSlave zmqSocket (SyncDone crc) i
192             let nns = M.insert i mr ns
193             -- only for redundant operation:
194             when (repRedundancy > 1) $ checkRepStatus mr nns
195             return nns
196     where
197         cpRevision (Checkpoint r _) = r
198         sendSyncCheckpoint sock (Checkpoint cr encoded) =
199             sendToSlave sock (DoSyncCheckpoint cr encoded)
200         sendSyncUpdate sock r encoded =
201             sendToSlave sock (DoSyncRep r encoded)
202         -- FIXME: do this better (less than maxRev is possible in corner cases)
203         checkRepStatus maxRev pns =
204             when (M.size (M.filter (>= maxRev) pns) >= (repRedundancy-2)) $ do
205                 debug $ "Full replication up to " ++ show maxRev
206                 -- finalize local replication
207                 modifyMVar_ repFinalizers $ \rf -> do
208                     forM_ (filter (<= maxRev) (IM.keys rf)) $ \r -> rf IM.! r
209                     return $ IM.filterWithKey (\k _ -> k > maxRev) rf
210                 -- send out FullRep signal
211                 forM_ (M.keys pns) $ sendToSlave zmqSocket (FullRepTo maxRev)
212
213
214 -- | Fetch past Updates from FileLog for replication.
215 getPastUpdates :: (Typeable st) => AcidState st -> Int -> IO [(Int, Tagged ByteString)]
216 getPastUpdates state startRev = liftM2 zip (return [(startRev+1)..]) (readEntriesFrom (localEvents $ downcast state) startRev)
217
218 -- | Get the revision at which the last checkpoint was taken.
219 getLastCheckpointRev :: (Typeable st) => AcidState st -> IO Checkpoint
220 getLastCheckpointRev state = do
221     let lst = downcast state
222     let cplog = localCheckpoints lst
223     nextId <- atomically $ readTVar $ logNextEntryId cplog
224     cps <- readEntriesFrom cplog (nextId - 1)
225     return $ headDef (Checkpoint 0 CSL.empty) cps
226
227 -- | Send a message to a Slave
228 sendToSlave :: MVar (Socket Router) -> MasterMessage -> NodeIdentity -> IO ()
229 sendToSlave msock msg ident = withMVar msock $ \sock -> sendMulti sock $ NEL.fromList [ident, encode msg]
230
231 -- | Receive one Frame. A Frame consists of three messages:
232 --      sender ID, empty message, and actual content
233 receiveFrame :: (Receiver t) => Socket t -> IO (NodeIdentity, SlaveMessage)
234 receiveFrame sock = do
235     list <- receiveMulti sock
236     when (length list /= 2) $ error "Received invalid frame."
237     let ident = head list
238     let msg = list !! 1
239     case decode msg of
240         Left str -> error $ "Data.Serialize.decode failed on SlaveMessage: " ++ show str
241         Right smsg -> do
242             debug $ "Received from [" ++ CS.unpack ident ++ "]: "
243                         ++ take 20 (show smsg)
244             return (ident, smsg)
245
246 -- | Open the Master state.
247 --
248 -- The directory for the local state files is the default one ("state/[typeOf state]/").
249 openMasterState :: (IsAcidic st, Typeable st) =>
250                String       -- ^ address to bind (useful to listen on specific interfaces only)
251             -> PortNumber   -- ^ port to bind to
252             -> st           -- ^ initial state
253             -> IO (AcidState st)
254 openMasterState address port initialState =
255     openMasterStateFrom ("state" </> show (typeOf initialState)) address port initialState
256
257 -- | Open the master state from a specific location.
258 openMasterStateFrom :: (IsAcidic st, Typeable st) =>
259                FilePath     -- ^ location of the local state files
260             -> String       -- ^ address to bind (useful to listen on specific interfaces only)
261             -> PortNumber   -- ^ port to bind to
262             -> st           -- ^ initial state
263             -> IO (AcidState st)
264 openMasterStateFrom directory address port =
265     openRedMasterStateFrom directory address port 0
266
267 -- | Open the master state with /n/-redundant replication.
268 --
269 -- The directory for the local state files is the default one ("state/[typeOf
270 -- state]/").
271 openRedMasterState :: (IsAcidic st, Typeable st) =>
272                String       -- ^ address to bind (useful to listen on specific interfaces only)
273             -> PortNumber   -- ^ port to bind to
274             -> Int          -- ^ guarantee n-redundant replication
275             -> st           -- ^ initial state
276             -> IO (AcidState st)
277 openRedMasterState address port red initialState =
278     openRedMasterStateFrom ("state" </> show (typeOf initialState)) address port red initialState
279
280 -- | Open the master state from a specific location with redundant replication.
281 openRedMasterStateFrom :: (IsAcidic st, Typeable st) =>
282                FilePath     -- ^ location of the local state files
283             -> String       -- ^ address to bind (useful to listen on specific interfaces only)
284             -> PortNumber   -- ^ port to bind to
285             -> Int          -- ^ guarantee /n/-redundant replication
286             -> st           -- ^ initial state
287             -> IO (AcidState st)
288 openRedMasterStateFrom directory address port red initialState = do
289         debug "opening master state"
290         -- local
291         lst <- openLocalStateFrom directory initialState
292         let levs = localEvents $ downcast lst
293         lrev <- atomically $ readTVar $ logNextEntryId levs
294         rev <- newMVar lrev
295         revN <- newMVar lrev
296         repChan <- newChan
297         repChanN <- dupChan repChan
298         repFin <- newMVar IM.empty
299         ns <- newMVar M.empty
300         -- remote
301         let addr = "tcp://" ++ address ++ ":" ++ show port
302         ctx <- context
303         sock <- socket ctx Router
304         setReceiveHighWM (restrict (100*1000 :: Int)) sock
305         setSendHighWM (restrict (100*1000 :: Int)) sock
306         bind sock addr
307         msock <- newMVar sock
308         repTidL <- newEmptyMVar
309         repTidN <- newEmptyMVar
310         reqTid <- newEmptyMVar
311         parTid <- myThreadId
312         lock <- newEmptyMVar
313         let masterState = MasterState { localState = lst
314                                       , nodeStatus = ns
315                                       , repRedundancy = red
316                                       , repFinalizers = repFin
317                                       , masterStateLock = lock
318                                       , masterRevision = rev
319                                       , masterRevisionN = revN
320                                       , masterReplicationChan = repChan
321                                       , masterReplicationChanN = repChanN
322                                       , masterRepLThreadId = repTidL
323                                       , masterRepNThreadId = repTidN
324                                       , masterReqThreadId = reqTid
325                                       , masterParentThreadId = parTid
326                                       , zmqContext = ctx
327                                       , zmqAddr = addr
328                                       , zmqSocket = msock
329                                       }
330         void $ forkIO $ masterRequestHandler masterState
331         void $ forkIO $ masterReplicationHandlerL masterState
332         void $ forkIO $ masterReplicationHandlerN masterState
333         return $ toAcidState masterState
334
335 -- | Close the master state.
336 closeMasterState :: MasterState st -> IO ()
337 closeMasterState MasterState{..} =
338     -- disallow requests
339     whenM (tryPutMVar masterStateLock ()) $ do
340         debug "Closing master state."
341         -- send nodes quit
342         debug "Nodes quitting."
343         withMVar nodeStatus $ mapM_ (sendToSlave zmqSocket MasterQuit) . M.keys
344         -- wait all nodes done
345         waitPollN 100 1000 (withMVar nodeStatus (return . M.null))
346         -- wait replication chan
347         debug "Waiting for repChans to empty."
348         writeChan masterReplicationChan RIEnd
349         mtid <- myThreadId
350         putMVar masterRepLThreadId mtid
351         putMVar masterRepNThreadId mtid
352         -- kill handler
353         debug "Killing request handler."
354         withMVar masterReqThreadId $ flip throwTo GracefulExit
355         -- cleanup zmq
356         debug "Closing down zmq."
357         withMVar zmqSocket $ \sock -> do
358             unbind sock zmqAddr
359             close sock
360         term zmqContext
361         -- cleanup local state
362         closeAcidState localState
363
364 -- | Update on master site.
365 scheduleMasterUpdate :: (UpdateEvent event, Typeable (EventState event)) => MasterState (EventState event) -> event -> IO (MVar (EventResult event))
366 scheduleMasterUpdate masterState@MasterState{..} event = do
367         debug "Update by Master."
368         unlocked <- isEmptyMVar masterStateLock
369         if not unlocked then error "State is locked!"
370         else do
371             result <- newEmptyMVar
372             let callback = if repRedundancy > 1
373                 then
374                     -- the returned action fills in result when executed later
375                     scheduleLocalUpdate' (downcast localState) event result
376                 else do
377                     hd <- scheduleUpdate localState event
378                     void $ forkIO (putMVar result =<< takeMVar hd)
379                     return (return ())      -- bogus finalizer
380             let encoded = runPutLazy (safePut event)
381             queueRepItem masterState (RIUpdate (methodTag event, encoded) (Left callback))
382             return result
383
384 -- | Cold Update on master site.
385 scheduleMasterColdUpdate :: Typeable st => MasterState st -> Tagged ByteString -> IO (MVar ByteString)
386 scheduleMasterColdUpdate masterState@MasterState{..} encoded = do
387         debug "Cold Update by Master."
388         unlocked <- isEmptyMVar masterStateLock
389         if not unlocked then error "State is locked!"
390         else do
391             result <- newEmptyMVar
392             let callback = if repRedundancy > 1
393                 then
394                     -- the returned action fills in result when executed later
395                     scheduleLocalColdUpdate' (downcast localState) encoded result
396                 else do
397                     hd <- scheduleColdUpdate localState encoded
398                     void $ forkIO (putMVar result =<< takeMVar hd)
399                     return (return ())      -- bogus finalizer
400             queueRepItem masterState (RIUpdate encoded (Left callback))
401             return result
402
403 -- | Queue an RepItem (originating from the Master itself of an Slave via zmq)
404 queueRepItem :: MasterState st -> ReplicationItem -> IO ()
405 queueRepItem MasterState{..} = writeChan masterReplicationChan
406
407 -- | The local replication handler. Takes care to run Updates locally.
408 masterReplicationHandlerL :: (Typeable st) => MasterState st -> IO ()
409 masterReplicationHandlerL MasterState{..} = do
410     mtid <- myThreadId
411     putMVar masterRepLThreadId mtid
412     let loop = handle (\e -> throwTo masterParentThreadId (e :: SomeException)) $ do
413             debug "Replicating next item locally."
414             repItem <- readChan masterReplicationChan
415             case repItem of
416                 RIEnd -> return ()
417                 RIArchive -> do
418                     debug "Archive on master."
419                     createArchive localState
420                     loop
421                 RICheckpoint -> do
422                     debug "Checkpoint on master."
423                     createCheckpoint localState
424                     loop
425                 RIUpdate event sink -> do
426                     if repRedundancy > 1
427                     then do
428                         (rev, act) <- modifyMVar masterRevision $ \r -> do
429                             a <- case sink of
430                                 Left callback   -> callback
431                                 _               -> newEmptyMVar >>= scheduleLocalColdUpdate' (downcast localState) event
432                             return (r+1,(r+1,a))
433                         -- act finalizes the transaction - will be run after full replication
434                         modifyMVar_ repFinalizers $ return . IM.insert rev act
435                     else
436                         modifyMVar_ masterRevision $ \r -> do
437                             case sink of
438                                 Left callback   -> void callback
439                                 _               -> void $ scheduleColdUpdate localState event
440                             return (r+1)
441                     loop
442     loop
443     -- signal that we're done
444     void $ takeMVar masterRepLThreadId
445
446 -- | The network replication handler. Takes care to run Updates on Slaves.
447 masterReplicationHandlerN :: MasterState st -> IO ()
448 masterReplicationHandlerN MasterState{..} = do
449     mtid <- myThreadId
450     putMVar masterRepNThreadId mtid
451     let loop = handle (\e -> throwTo masterParentThreadId (e :: SomeException)) $ do
452             debug "Replicating next item in network."
453             repItem <- readChan masterReplicationChanN
454             case repItem of
455                 RIEnd -> return ()
456                 RIArchive -> do
457                     withMVar nodeStatus $ \ns -> do
458                         debug "Sending archive request to Slaves."
459                         withMVar masterRevisionN $ \mr ->
460                             forM_ (M.keys ns) $ sendArchive zmqSocket mr
461                     loop
462                 RICheckpoint -> do
463                     withMVar nodeStatus $ \ns -> do
464                         debug "Sending Checkpoint Request to Slaves."
465                         withMVar masterRevisionN $ \mr ->
466                             forM_ (M.keys ns) $ sendCheckpoint zmqSocket mr
467                     loop
468                 RIUpdate event sink -> do
469                     withMVar nodeStatus $ \ns -> do
470                         debug $ "Sending Update to Slaves, there are " ++ show (M.size ns)
471                         modifyMVar_ masterRevisionN $ \mrOld -> do
472                             let mr = mrOld + 1
473                             case sink of
474                                 Left _ -> forM_ (M.keys $ M.filter (<mr) ns) $ sendUpdate zmqSocket mr Nothing event
475                                 Right (reqID, reqNodeIdent) -> do
476                                     let noReqSlaves = filter (/= reqNodeIdent) $ M.keys $ M.filter (<mr) ns
477                                     sendUpdate zmqSocket mr (Just reqID) event reqNodeIdent
478                                     forM_ noReqSlaves $ sendUpdate zmqSocket mr Nothing event
479                             return mr
480                     loop
481     loop
482     -- signal that we're done
483     void $ takeMVar masterRepNThreadId
484     where
485         sendUpdate sock revision reqId encoded =
486             sendToSlave sock (DoRep revision reqId encoded)
487         sendCheckpoint sock revision = sendToSlave sock (DoCheckpoint revision)
488         sendArchive sock revision = sendToSlave sock (DoArchive revision)
489
490 -- | Create a checkpoint (on all nodes, per request).
491 --   This is useful for faster resume of both the Master (at startup) and
492 --   Slaves (at startup and reconnect).
493 createMasterCheckpoint :: MasterState st -> IO ()
494 createMasterCheckpoint masterState@MasterState{..} = do
495     debug "Checkpoint."
496     unlocked <- isEmptyMVar masterStateLock
497     unless unlocked $ error "State is locked."
498     queueRepItem masterState RICheckpoint
499
500 -- | Create an archive on all nodes.
501 --   Usually createArchive (local to each node) is appropriate.
502 --   Also take care: Nodes that are not connected at the time, will not create
503 --   an archive (on reconnect).
504 createArchiveGlobally :: (IsAcidic st, Typeable st) => AcidState st -> IO ()
505 createArchiveGlobally acid = do
506     debug "Archive globally."
507     let masterState = downcast acid
508     queueRepItem masterState RIArchive
509
510
511 toAcidState :: (IsAcidic st, Typeable st) => MasterState st -> AcidState st
512 toAcidState master
513   = AcidState { _scheduleUpdate    = scheduleMasterUpdate master
514               , scheduleColdUpdate = scheduleMasterColdUpdate master
515               , _query             = query $ localState master
516               , queryCold          = queryCold $ localState master
517               , createCheckpoint   = createMasterCheckpoint master
518               , createArchive      = createArchive $ localState master
519               , closeAcidState     = closeMasterState master
520               , acidSubState       = mkAnyState master
521               }
522