ref: Master
[acid-state-dist.git] / src / Data / Acid / Centered / Master.hs
1 {-# LANGUAGE DeriveDataTypeable, RecordWildCards, FlexibleContexts #-}
2 --------------------------------------------------------------------------------
3 {- |
4   Module      :  Data.Acid.Centered.Master
5   Copyright   :  ?
6
7   Maintainer  :  max.voit+hdv@with-eyes.net
8   Portability :  non-portable (uses GHC extensions)
9
10   The Master part of the Centered replication backend for acid state.
11
12 -}
13 {- big chunks still todo:
14     o authentification
15     o encryption
16 -}
17 module Data.Acid.Centered.Master
18     (
19       openMasterState
20     , openMasterStateFrom
21     , openRedMasterState
22     , openRedMasterStateFrom
23     , createArchiveGlobally
24     , MasterState(..)
25     ) where
26
27 import Data.Typeable
28 import Data.SafeCopy
29 import Data.Serialize (decode, encode, runPutLazy)
30
31 import Data.Acid
32 import Data.Acid.Core
33 import Data.Acid.Abstract
34 import Data.Acid.Local
35 import Data.Acid.Log
36
37 import Data.Acid.Centered.Common
38
39 import Control.Concurrent (forkIO, ThreadId, myThreadId)
40 import Control.Concurrent.Chan (Chan, newChan, writeChan, readChan, dupChan)
41 import Control.Concurrent.STM.TVar (readTVar)
42 import Control.Concurrent.MVar(MVar, newMVar, newEmptyMVar,
43                                takeMVar, putMVar, tryPutMVar, isEmptyMVar,
44                                modifyMVar, modifyMVar_, withMVar)
45
46 import Control.Monad.STM (atomically)
47 import Control.Monad (when, unless, void, forM_, liftM2)
48 import Control.Exception (handle, throwTo, SomeException)
49
50 import System.ZMQ4 (Context, Socket, Router(..), Receiver,
51                     setReceiveHighWM, setSendHighWM, restrict,
52                     context, term, socket, close, bind, unbind,
53                     poll, Poll(..), Event(..),
54                     sendMulti, receiveMulti)
55 import System.FilePath ( (</>) )
56
57 import qualified Data.ByteString.Lazy.Char8 as CSL
58 import           Data.ByteString.Lazy.Char8 (ByteString)
59 import qualified Data.ByteString.Char8 as CS
60
61 import qualified Data.Map as M
62 import           Data.Map (Map)
63 import qualified Data.IntMap as IM
64 import           Data.IntMap (IntMap)
65 import qualified Data.List.NonEmpty as NEL
66 import Safe (headDef)
67
68 --------------------------------------------------------------------------------
69
70 -- | Master state structure, for internal use.
71 data MasterState st
72     = MasterState { localState :: AcidState st
73                   , nodeStatus :: MVar NodeStatus
74                   , repRedundancy :: Int
75                   , repFinalizers :: MVar (IntMap (IO ()))
76                   , masterStateLock :: MVar ()
77                   , masterRevision :: MVar NodeRevision
78                   , masterRevisionN :: MVar NodeRevision
79                   , masterReplicationChan :: Chan ReplicationItem
80                   , masterReplicationChanN :: Chan ReplicationItem
81                   , masterReqThreadId :: MVar ThreadId
82                   , masterRepLThreadId :: MVar ThreadId
83                   , masterRepNThreadId :: MVar ThreadId
84                   , masterParentThreadId :: ThreadId
85                   , zmqContext :: Context
86                   , zmqAddr :: String
87                   , zmqSocket :: MVar (Socket Router)
88                   } deriving (Typeable)
89
90 type NodeIdentity = CS.ByteString
91 type NodeStatus = Map NodeIdentity NodeRevision
92 type Callback = IO (IO ())      -- an IO action that returns a finalizer
93 data ReplicationItem =
94       RIEnd
95     | RICheckpoint
96     | RIArchive
97     | RIUpdate (Tagged ByteString) (Either Callback (RequestID, NodeIdentity))
98
99 -- | The request handler on master node. Does
100 --      o handle receiving requests from nodes,
101 --      o answering as needed (old updates),
102 --      o bookkeeping on node states.
103 masterRequestHandler :: (IsAcidic st, Typeable st) => MasterState st -> IO ()
104 masterRequestHandler masterState@MasterState{..} = do
105     mtid <- myThreadId
106     putMVar masterReqThreadId mtid
107     let loop = handle (\e -> throwTo masterParentThreadId (e :: SomeException)) $
108           handle killHandler $ do
109             -- take one frame
110             -- waitRead =<< readMVar zmqSocket
111             -- FIXME: we needn't poll if not for strange zmq behaviour
112             re <- withMVar zmqSocket $ \sock -> poll 100 [Sock sock [In] Nothing]
113             unless (null $ head re) $ do
114                 (ident, msg) <- withMVar zmqSocket receiveFrame
115                 handleMessage ident msg
116             loop
117     loop
118     where
119         killHandler :: AcidException -> IO ()
120         killHandler GracefulExit = return ()
121         identityIsValid i = do
122             isMember <- withMVar nodeStatus $ return . (i `M.member`)
123             if isMember then return True
124             else do
125                 debug $ "Request by unknown node [" ++ CS.unpack i ++ "]"
126                 sendToSlave zmqSocket MayQuit i
127                 return False
128         handleMessage i m = case m of
129              -- New Slave joined.
130              NewSlave r -> connectNode masterState i r
131              -- Slave is done replicating.
132              RepDone r -> whenM (identityIsValid i) $
133                  updateNodeStatus masterState i r
134              -- Slave sends an Udate.
135              ReqUpdate rid event -> whenM (identityIsValid i) $
136                  queueRepItem masterState (RIUpdate event (Right (rid, i)))
137              -- Slave quits.
138              SlaveQuit -> do
139                  sendToSlave zmqSocket MayQuit i
140                  removeFromNodeStatus nodeStatus i
141              RepError -> do
142                  sendToSlave zmqSocket MayQuit i
143                  removeFromNodeStatus nodeStatus i
144              -- no other messages possible
145
146 -- | Remove a Slave node from NodeStatus.
147 removeFromNodeStatus :: MVar NodeStatus -> NodeIdentity -> IO ()
148 removeFromNodeStatus nodeStatus ident =
149         modifyMVar_ nodeStatus $ return . M.delete ident
150
151 -- | Update the NodeStatus after a node has replicated an Update.
152 updateNodeStatus :: MasterState st -> NodeIdentity -> Int -> IO ()
153 updateNodeStatus MasterState{..} ident rev =
154     modifyMVar_ nodeStatus $ \ns -> do
155         when (ns M.! ident /= (rev - 1)) $
156             error $ "Invalid increment of node status "
157                 ++ show (ns M.! ident) ++ " -> " ++ show rev
158         let rns = M.adjust (+1) ident ns
159         -- only for redundant operation:
160         when ((repRedundancy > 1) && (M.size (M.filter (>=rev) rns) >= (repRedundancy - 1))) $ do
161             debug $ "Full replication of " ++ show rev
162             -- finalize local replication
163             runAndDelFinalizer rev
164             -- send out FullRep signal
165             forM_ (M.keys ns) $ sendToSlave zmqSocket (FullRep rev)
166         return rns
167     where
168         runAndDelFinalizer r = modifyMVar_ repFinalizers $ \rf -> do
169             rf IM.! r
170             return $ IM.delete r rf
171
172 -- | Connect a new Slave by getting it up-to-date,
173 --   i.e. send all past events as Updates. This is fire&forget.
174 connectNode :: (IsAcidic st, Typeable st) => MasterState st -> NodeIdentity -> Revision -> IO ()
175 connectNode MasterState{..} i revision =
176     -- locking masterRevision prohibits additional events written on disk
177     withMVar masterRevision $ \mr ->
178         modifyMVar_ nodeStatus $ \ns -> do
179             -- crc generated from localCore thus corresponds to disk
180             crc <- crcOfState localState
181             -- if there has been a checkpoint in between:
182             lastCp <- getLastCheckpointRev localState
183             let lastCpRev = cpRevision lastCp
184             debug $ "Found checkpoint at revision " ++ show lastCpRev
185             if lastCpRev > revision then do
186                 -- send last checkpoint and newer events
187                 sendSyncCheckpoint zmqSocket lastCp i
188                 pastUpdates <- getPastUpdates localState lastCpRev
189                 forM_ pastUpdates $ \(r, u) -> sendSyncUpdate zmqSocket r u i
190             else do
191                 -- just the events
192                 pastUpdates <- getPastUpdates localState revision
193                 forM_ pastUpdates $ \(r, u) -> sendSyncUpdate zmqSocket r u i
194             -- now done, crc
195             sendToSlave zmqSocket (SyncDone crc) i
196             let nns = M.insert i mr ns
197             -- only for redundant operation:
198             when (repRedundancy > 1) $ checkRepStatus mr nns
199             return nns
200     where
201         cpRevision (Checkpoint r _) = r
202         sendSyncCheckpoint sock (Checkpoint cr encoded) =
203             sendToSlave sock (DoSyncCheckpoint cr encoded)
204         sendSyncUpdate sock r encoded =
205             sendToSlave sock (DoSyncRep r encoded)
206         -- FIXME: do this better (less than maxRev is possible in corner cases)
207         checkRepStatus maxRev pns =
208             when (M.size (M.filter (>= maxRev) pns) >= (repRedundancy-2)) $ do
209                 debug $ "Full replication up to " ++ show maxRev
210                 -- finalize local replication
211                 modifyMVar_ repFinalizers $ \rf -> do
212                     forM_ (filter (<= maxRev) (IM.keys rf)) $ \r -> rf IM.! r
213                     return $ IM.filterWithKey (\k _ -> k > maxRev) rf
214                 -- send out FullRep signal
215                 forM_ (M.keys pns) $ sendToSlave zmqSocket (FullRepTo maxRev)
216
217
218 -- | Fetch past Updates from FileLog for replication.
219 getPastUpdates :: (Typeable st) => AcidState st -> Int -> IO [(Int, Tagged ByteString)]
220 getPastUpdates state startRev =
221     liftM2 zip (return [(startRev+1)..]) (readEntriesFrom (localEvents $ downcast state) startRev)
222
223 -- | Get the revision at which the last checkpoint was taken.
224 getLastCheckpointRev :: (Typeable st) => AcidState st -> IO Checkpoint
225 getLastCheckpointRev state = do
226     let cplog = localCheckpoints $ downcast state
227     nextId <- atomically $ readTVar $ logNextEntryId cplog
228     cps <- readEntriesFrom cplog (nextId - 1)
229     return $ headDef (Checkpoint 0 CSL.empty) cps
230
231 -- | Send a message to a Slave
232 sendToSlave :: MVar (Socket Router) -> MasterMessage -> NodeIdentity -> IO ()
233 sendToSlave msock msg ident = withMVar msock $ \sock -> sendMulti sock $ NEL.fromList [ident, encode msg]
234
235 -- | Receive one Frame. A Frame consists of two messages:
236 --      sender ID and actual content
237 receiveFrame :: (Receiver t) => Socket t -> IO (NodeIdentity, SlaveMessage)
238 receiveFrame sock = do
239     list <- receiveMulti sock
240     when (length list /= 2) $ error "Received invalid frame."
241     let ident = head list
242     let msg = list !! 1
243     case decode msg of
244         Left str -> error $ "Data.Serialize.decode failed on SlaveMessage: " ++ show str
245         Right smsg -> do
246             debug $ "Received from [" ++ CS.unpack ident ++ "]: "
247                         ++ take 20 (show smsg)
248             return (ident, smsg)
249
250 -- | Open the Master state.
251 --
252 -- The directory for the local state files is the default one ("state/[typeOf state]/").
253 openMasterState :: (IsAcidic st, Typeable st) =>
254                String       -- ^ address to bind (useful to listen on specific interfaces only)
255             -> PortNumber   -- ^ port to bind to
256             -> st           -- ^ initial state
257             -> IO (AcidState st)
258 openMasterState address port initialState =
259     openMasterStateFrom ("state" </> show (typeOf initialState)) address port initialState
260
261 -- | Open the master state from a specific location.
262 openMasterStateFrom :: (IsAcidic st, Typeable st) =>
263                FilePath     -- ^ location of the local state files
264             -> String       -- ^ address to bind (useful to listen on specific interfaces only)
265             -> PortNumber   -- ^ port to bind to
266             -> st           -- ^ initial state
267             -> IO (AcidState st)
268 openMasterStateFrom directory address port =
269     openRedMasterStateFrom directory address port 0
270
271 -- | Open the master state with /n/-redundant replication.
272 --
273 -- The directory for the local state files is the default one ("state/[typeOf
274 -- state]/").
275 openRedMasterState :: (IsAcidic st, Typeable st) =>
276                String       -- ^ address to bind (useful to listen on specific interfaces only)
277             -> PortNumber   -- ^ port to bind to
278             -> Int          -- ^ guarantee n-redundant replication
279             -> st           -- ^ initial state
280             -> IO (AcidState st)
281 openRedMasterState address port red initialState =
282     openRedMasterStateFrom ("state" </> show (typeOf initialState)) address port red initialState
283
284 -- | Open the master state from a specific location with redundant replication.
285 openRedMasterStateFrom :: (IsAcidic st, Typeable st) =>
286                FilePath     -- ^ location of the local state files
287             -> String       -- ^ address to bind (useful to listen on specific interfaces only)
288             -> PortNumber   -- ^ port to bind to
289             -> Int          -- ^ guarantee /n/-redundant replication
290             -> st           -- ^ initial state
291             -> IO (AcidState st)
292 openRedMasterStateFrom directory address port red initialState = do
293         debug "opening master state"
294         -- local
295         lst <- openLocalStateFrom directory initialState
296         let levs = localEvents $ downcast lst
297         lrev <- atomically $ readTVar $ logNextEntryId levs
298         rev <- newMVar lrev
299         revN <- newMVar lrev
300         repChan <- newChan
301         repChanN <- dupChan repChan
302         repFin <- newMVar IM.empty
303         ns <- newMVar M.empty
304         repTidL <- newEmptyMVar
305         repTidN <- newEmptyMVar
306         reqTid <- newEmptyMVar
307         parTid <- myThreadId
308         sLock <- newEmptyMVar
309         -- remote
310         let addr = "tcp://" ++ address ++ ":" ++ show port
311         ctx <- context
312         sock <- socket ctx Router
313         setReceiveHighWM (restrict (100*1000 :: Int)) sock
314         setSendHighWM (restrict (100*1000 :: Int)) sock
315         bind sock addr
316         msock <- newMVar sock
317
318         let masterState = MasterState { localState = lst
319                                       , nodeStatus = ns
320                                       , repRedundancy = red
321                                       , repFinalizers = repFin
322                                       , masterStateLock = sLock
323                                       , masterRevision = rev
324                                       , masterRevisionN = revN
325                                       , masterReplicationChan = repChan
326                                       , masterReplicationChanN = repChanN
327                                       , masterRepLThreadId = repTidL
328                                       , masterRepNThreadId = repTidN
329                                       , masterReqThreadId = reqTid
330                                       , masterParentThreadId = parTid
331                                       , zmqContext = ctx
332                                       , zmqAddr = addr
333                                       , zmqSocket = msock
334                                       }
335         void $ forkIO $ masterRequestHandler masterState
336         void $ forkIO $ masterReplicationHandlerL masterState
337         void $ forkIO $ masterReplicationHandlerN masterState
338         return $ toAcidState masterState
339
340 -- | Close the master state.
341 closeMasterState :: MasterState st -> IO ()
342 closeMasterState MasterState{..} =
343     -- disallow requests
344     whenM (tryPutMVar masterStateLock ()) $ do
345         debug "Closing master state."
346         -- send nodes quit
347         debug "Nodes quitting."
348         withMVar nodeStatus $ mapM_ (sendToSlave zmqSocket MasterQuit) . M.keys
349         -- wait all nodes done
350         waitPollN 100 1000 (withMVar nodeStatus (return . M.null))
351         -- wait replication chan
352         debug "Waiting for repChans to empty."
353         writeChan masterReplicationChan RIEnd
354         mtid <- myThreadId
355         putMVar masterRepLThreadId mtid
356         putMVar masterRepNThreadId mtid
357         -- kill handler
358         debug "Killing request handler."
359         withMVar masterReqThreadId $ flip throwTo GracefulExit
360         -- cleanup zmq
361         debug "Closing down zmq."
362         withMVar zmqSocket $ \sock -> do
363             unbind sock zmqAddr
364             close sock
365         term zmqContext
366         -- cleanup local state
367         closeAcidState localState
368
369 -- | Update on master site.
370 scheduleMasterUpdate :: (UpdateEvent event, Typeable (EventState event)) => MasterState (EventState event) -> event -> IO (MVar (EventResult event))
371 scheduleMasterUpdate masterState@MasterState{..} event = do
372         debug "Update by Master."
373         unlocked <- isEmptyMVar masterStateLock
374         if not unlocked then error "State is locked!"
375         else do
376             result <- newEmptyMVar
377             let callback = if repRedundancy > 1
378                 then
379                     -- the returned action fills in result when executed later
380                     scheduleLocalUpdate' (downcast localState) event result
381                 else do
382                     hd <- scheduleUpdate localState event
383                     void $ forkIO (putMVar result =<< takeMVar hd)
384                     return (return ())      -- bogus finalizer
385             let encoded = runPutLazy (safePut event)
386             queueRepItem masterState (RIUpdate (methodTag event, encoded) (Left callback))
387             return result
388
389 -- | Cold Update on master site.
390 scheduleMasterColdUpdate :: Typeable st => MasterState st -> Tagged ByteString -> IO (MVar ByteString)
391 scheduleMasterColdUpdate masterState@MasterState{..} encoded = do
392         debug "Cold Update by Master."
393         unlocked <- isEmptyMVar masterStateLock
394         if not unlocked then error "State is locked!"
395         else do
396             result <- newEmptyMVar
397             let callback = if repRedundancy > 1
398                 then
399                     -- the returned action fills in result when executed later
400                     scheduleLocalColdUpdate' (downcast localState) encoded result
401                 else do
402                     hd <- scheduleColdUpdate localState encoded
403                     void $ forkIO (putMVar result =<< takeMVar hd)
404                     return (return ())      -- bogus finalizer
405             queueRepItem masterState (RIUpdate encoded (Left callback))
406             return result
407
408 -- | Queue an RepItem (originating from the Master itself of an Slave via zmq)
409 queueRepItem :: MasterState st -> ReplicationItem -> IO ()
410 queueRepItem MasterState{..} = writeChan masterReplicationChan
411
412 -- | The local replication handler. Takes care to run Updates locally.
413 masterReplicationHandlerL :: (Typeable st) => MasterState st -> IO ()
414 masterReplicationHandlerL MasterState{..} = do
415     mtid <- myThreadId
416     putMVar masterRepLThreadId mtid
417     let loop = handle (\e -> throwTo masterParentThreadId (e :: SomeException)) $ do
418             debug "Replicating next item locally."
419             repItem <- readChan masterReplicationChan
420             case repItem of
421                 RIEnd -> return ()
422                 RIArchive -> do
423                     debug "Archive on master."
424                     createArchive localState
425                     loop
426                 RICheckpoint -> do
427                     debug "Checkpoint on master."
428                     createCheckpoint localState
429                     loop
430                 RIUpdate event sink -> do
431                     if repRedundancy > 1 then do
432                         (rev, act) <- modifyMVar masterRevision $ \r -> do
433                             a <- case sink of
434                                 Left callback   -> callback
435                                 _               -> newEmptyMVar >>= scheduleLocalColdUpdate' (downcast localState) event
436                             return (r+1,(r+1,a))
437                         -- act finalizes the transaction - will be run after full replication
438                         modifyMVar_ repFinalizers $ return . IM.insert rev act
439                     else
440                         modifyMVar_ masterRevision $ \r -> do
441                             case sink of
442                                 Left callback   -> void callback
443                                 _               -> void $ scheduleColdUpdate localState event
444                             return (r+1)
445                     loop
446     loop
447     -- signal that we're done
448     void $ takeMVar masterRepLThreadId
449
450 -- | The network replication handler. Takes care to run Updates on Slaves.
451 masterReplicationHandlerN :: MasterState st -> IO ()
452 masterReplicationHandlerN MasterState{..} = do
453     mtid <- myThreadId
454     putMVar masterRepNThreadId mtid
455     let loop = handle (\e -> throwTo masterParentThreadId (e :: SomeException)) $ do
456             debug "Replicating next item in network."
457             repItem <- readChan masterReplicationChanN
458             case repItem of
459                 RIEnd -> return ()
460                 RIArchive -> do
461                     withMVar nodeStatus $ \ns -> do
462                         debug "Sending archive request to Slaves."
463                         withMVar masterRevisionN $ \mr ->
464                             forM_ (M.keys ns) $ sendArchive zmqSocket mr
465                     loop
466                 RICheckpoint -> do
467                     withMVar nodeStatus $ \ns -> do
468                         debug "Sending Checkpoint Request to Slaves."
469                         withMVar masterRevisionN $ \mr ->
470                             forM_ (M.keys ns) $ sendCheckpoint zmqSocket mr
471                     loop
472                 RIUpdate event sink -> do
473                     withMVar nodeStatus $ \ns -> do
474                         debug $ "Sending Update to Slaves, there are " ++ show (M.size ns)
475                         modifyMVar_ masterRevisionN $ \mrOld -> do
476                             let mr = mrOld + 1
477                             case sink of
478                                 Left _ -> forM_ (M.keys $ M.filter (<mr) ns) $ sendUpdate zmqSocket mr Nothing event
479                                 Right (reqID, reqNodeIdent) -> do
480                                     let noReqSlaves = filter (/= reqNodeIdent) $ M.keys $ M.filter (<mr) ns
481                                     sendUpdate zmqSocket mr (Just reqID) event reqNodeIdent
482                                     forM_ noReqSlaves $ sendUpdate zmqSocket mr Nothing event
483                             return mr
484                     loop
485     loop
486     -- signal that we're done
487     void $ takeMVar masterRepNThreadId
488     where
489         sendUpdate sock revision reqId encoded =
490             sendToSlave sock (DoRep revision reqId encoded)
491         sendCheckpoint sock revision = sendToSlave sock (DoCheckpoint revision)
492         sendArchive sock revision = sendToSlave sock (DoArchive revision)
493
494 -- | Create a checkpoint (on all nodes, per request).
495 --   This is useful for faster resume of both the Master (at startup) and
496 --   Slaves (at startup and reconnect).
497 createMasterCheckpoint :: MasterState st -> IO ()
498 createMasterCheckpoint masterState@MasterState{..} = do
499     debug "Checkpoint."
500     unlocked <- isEmptyMVar masterStateLock
501     unless unlocked $ error "State is locked."
502     queueRepItem masterState RICheckpoint
503
504 -- | Create an archive on all nodes.
505 --   Usually createArchive (local to each node) is appropriate.
506 --   Also take care: Nodes that are not connected at the time, will not create
507 --   an archive (on reconnect).
508 createArchiveGlobally :: (IsAcidic st, Typeable st) => AcidState st -> IO ()
509 createArchiveGlobally acid = do
510     debug "Archive globally."
511     let masterState = downcast acid
512     queueRepItem masterState RIArchive
513
514
515 toAcidState :: (IsAcidic st, Typeable st) => MasterState st -> AcidState st
516 toAcidState master
517   = AcidState { _scheduleUpdate    = scheduleMasterUpdate master
518               , scheduleColdUpdate = scheduleMasterColdUpdate master
519               , _query             = query $ localState master
520               , queryCold          = queryCold $ localState master
521               , createCheckpoint   = createMasterCheckpoint master
522               , createArchive      = createArchive $ localState master
523               , closeAcidState     = closeMasterState master
524               , acidSubState       = mkAnyState master
525               }
526