ref: Slave
[acid-state-dist.git] / src / Data / Acid / Centered / Slave.hs
1 {-# LANGUAGE DeriveDataTypeable, RecordWildCards, FlexibleContexts #-}
2 --------------------------------------------------------------------------------
3 {- |
4   Module      :  Data.Acid.CenteredSlave.hs
5   Copyright   :  ?
6
7   Maintainer  :  max.voit+hdv@with-eyes.net
8   Portability :  non-portable (uses GHC extensions)
9
10   The Slave part of a the Centered replication backend for acid state.
11
12 -}
13
14 --------------------------------------------------------------------------------
15 -- SLAVE part
16
17 module Data.Acid.Centered.Slave
18     (
19       enslaveState
20     , enslaveStateFrom
21     , enslaveRedState
22     , enslaveRedStateFrom
23     , SlaveState(..)
24     )  where
25
26 import Data.Typeable
27 import Data.SafeCopy
28 import Data.Serialize (decode, encode, runPutLazy, runGetLazy)
29
30 import Data.Acid
31 import Data.Acid.Core
32 import Data.Acid.Abstract
33 import Data.Acid.Local
34 import Data.Acid.Log
35
36 import Data.Acid.Centered.Common
37
38 import Control.Concurrent (forkIO, ThreadId, myThreadId, killThread, threadDelay, forkIOWithUnmask)
39 import Control.Concurrent.MVar (MVar, newMVar, newEmptyMVar, isEmptyMVar,
40                                 withMVar, modifyMVar, modifyMVar_,
41                                 takeMVar, putMVar, tryPutMVar)
42 import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
43 import Control.Concurrent.STM.TVar (readTVar, writeTVar)
44 import Data.IORef (writeIORef)
45 import qualified Control.Concurrent.Event as Event
46
47 import Control.Monad.STM (atomically)
48 import Control.Monad (void, when, unless)
49 import Control.Exception (handle, throwTo, SomeException, ErrorCall(..))
50
51 import System.ZMQ4 (Context, Socket, Dealer(..),
52                     setReceiveHighWM, setSendHighWM, setLinger, restrict,
53                     poll, Poll(..), Event(..),
54                     context, term, socket, close,
55                     connect, disconnect, send, receive)
56 import System.FilePath ( (</>) )
57
58 import Data.ByteString.Lazy.Char8 (ByteString)
59
60 import           Data.IntMap (IntMap)
61 import qualified Data.IntMap as IM
62
63 --------------------------------------------------------------------------------
64
65 -- | Slave state structure, for internal use.
66 data SlaveState st
67     = SlaveState { slaveLocalState :: AcidState st
68                  , slaveStateIsRed :: Bool
69                  , slaveStateLock :: MVar ()
70                  , slaveRepFinalizers :: MVar (IntMap (IO ()))
71                  , slaveRepChan :: Chan SlaveRepItem
72                  , slaveSyncDone :: Event.Event
73                  , slaveRevision :: MVar NodeRevision
74                  , slaveRequests :: MVar SlaveRequests
75                  , slaveLastRequestID :: MVar RequestID
76                  , slaveRepThreadId :: MVar ThreadId
77                  , slaveReqThreadId :: MVar ThreadId
78                  , slaveParentThreadId :: ThreadId
79                  , slaveZmqContext :: Context
80                  , slaveZmqAddr :: String
81                  , slaveZmqSocket :: MVar (Socket Dealer)
82                  } deriving (Typeable)
83
84 -- | Memory of own Requests sent to Master.
85 type SlaveRequests = IntMap (IO (IO ()),ThreadId)
86
87 -- | One Update + Metainformation to replicate.
88 data SlaveRepItem =
89       SRIEnd
90     | SRICheckpoint Revision
91     | SRIArchive Revision
92     | SRIUpdate Revision (Maybe RequestID) (Tagged ByteString)
93
94 -- | Open a local State as Slave for a Master.
95 --
96 -- The directory for the local state files is the default one ("state/[typeOf
97 -- state]").
98 enslaveState :: (IsAcidic st, Typeable st) =>
99             String          -- ^ hostname of the Master
100          -> PortNumber      -- ^ port to connect to
101          -> st              -- ^ initial state
102          -> IO (AcidState st)
103 enslaveState address port initialState =
104     enslaveStateFrom ("state" </> show (typeOf initialState)) address port initialState
105
106 -- | Open a local State as Slave for a Master.
107 --
108 -- The directory for the local state files is the default one ("state/[typeOf
109 -- state]").
110 enslaveRedState :: (IsAcidic st, Typeable st) =>
111             String          -- ^ hostname of the Master
112          -> PortNumber      -- ^ port to connect to
113          -> st              -- ^ initial state
114          -> IO (AcidState st)
115 enslaveRedState address port initialState =
116     enslaveRedStateFrom ("state" </> show (typeOf initialState)) address port initialState
117
118 -- | Open a local State as Slave for a Master. The directory of the local state
119 -- files can be specified.
120 enslaveStateFrom :: (IsAcidic st, Typeable st) =>
121             FilePath        -- ^ location of the local state files.
122          -> String          -- ^ hostname of the Master
123          -> PortNumber      -- ^ port to connect to
124          -> st              -- ^ initial state
125          -> IO (AcidState st)
126 enslaveStateFrom = enslaveMayRedStateFrom False
127
128 -- | Open a local State as Slave for a _redundant_ Master. The directory of the local state
129 -- files can be specified.
130 enslaveRedStateFrom :: (IsAcidic st, Typeable st) =>
131             FilePath        -- ^ location of the local state files.
132          -> String          -- ^ hostname of the Master
133          -> PortNumber      -- ^ port to connect to
134          -> st              -- ^ initial state
135          -> IO (AcidState st)
136 enslaveRedStateFrom = enslaveMayRedStateFrom True
137
138 -- | Open a local State as Slave for a Master, redundant or not.
139 --   The directory of the local state files can be specified.
140 enslaveMayRedStateFrom :: (IsAcidic st, Typeable st) =>
141             Bool            -- ^ is redundant
142          -> FilePath        -- ^ location of the local state files.
143          -> String          -- ^ hostname of the Master
144          -> PortNumber      -- ^ port to connect to
145          -> st              -- ^ initial state
146          -> IO (AcidState st)
147 enslaveMayRedStateFrom isRed directory address port initialState = do
148         -- local
149         lst <- openLocalStateFrom directory initialState
150         lrev <- getLocalRevision lst
151         rev <- newMVar lrev
152         debug $ "Opening enslaved state at revision " ++ show lrev
153         srs <- newMVar IM.empty
154         lastReqId <- newMVar 0
155         repChan <- newChan
156         syncDone <- Event.new
157         reqTid <- newEmptyMVar
158         repTid <- newEmptyMVar
159         parTid <- myThreadId
160         repFin <- newMVar IM.empty
161         sLock <- newEmptyMVar
162         -- remote
163         let addr = "tcp://" ++ address ++ ":" ++ show port
164         ctx <- context
165         sock <- socket ctx Dealer
166         setReceiveHighWM (restrict (100*1000 :: Int)) sock
167         setSendHighWM (restrict (100*1000 :: Int)) sock
168         connect sock addr
169         msock <- newMVar sock
170         sendToMaster msock $ NewSlave lrev
171
172         let slaveState = SlaveState { slaveLocalState = lst
173                                     , slaveStateIsRed = isRed
174                                     , slaveStateLock = sLock
175                                     , slaveRepFinalizers = repFin
176                                     , slaveRepChan = repChan
177                                     , slaveSyncDone = syncDone
178                                     , slaveRevision = rev
179                                     , slaveRequests = srs
180                                     , slaveLastRequestID = lastReqId
181                                     , slaveReqThreadId = reqTid
182                                     , slaveRepThreadId = repTid
183                                     , slaveParentThreadId = parTid
184                                     , slaveZmqContext = ctx
185                                     , slaveZmqAddr = addr
186                                     , slaveZmqSocket = msock
187                                     }
188         void $ forkIOWithUnmask $ slaveRequestHandler slaveState
189         void $ forkIO $ slaveReplicationHandler slaveState
190         return $ slaveToAcidState slaveState
191         where
192             getLocalRevision =
193                 atomically . readTVar . logNextEntryId . localEvents . downcast
194
195 -- | Replication handler of the Slave.
196 slaveRequestHandler :: (IsAcidic st, Typeable st) => SlaveState st -> (IO () -> IO ()) -> IO ()
197 slaveRequestHandler slaveState@SlaveState{..} unmask = do
198     mtid <- myThreadId
199     putMVar slaveReqThreadId mtid
200     let loop = handle (\e -> throwTo slaveParentThreadId (e :: SomeException)) $
201             unmask $ handle killHandler $ do
202             --waitRead =<< readMVar slaveZmqSocket
203             -- FIXME: we needn't poll if not for strange zmq behaviour
204             re <- withMVar slaveZmqSocket $ \sock -> poll 100 [Sock sock [In] Nothing]
205             unless (null $ head re) $ do
206                 msg <- withMVar slaveZmqSocket receive
207                 case decode msg of
208                     Left str -> error $ "Data.Serialize.decode failed on MasterMessage: " ++ show str
209                     Right mmsg -> handleMessage mmsg
210             loop
211     loop
212     where
213         killHandler :: AcidException -> IO ()
214         killHandler GracefulExit = return ()
215         handleMessage m = do
216             debug $ "Received: " ++ show m
217             case m of
218                -- We are sent an Update to replicate.
219                DoRep r i d -> queueRepItem slaveState (SRIUpdate r i d)
220                -- We are sent a Checkpoint for synchronization.
221                DoSyncCheckpoint r d -> replicateSyncCp slaveState r d
222                -- We are sent an Update to replicate for synchronization.
223                DoSyncRep r d -> replicateSyncUpdate slaveState r d
224                -- Master done sending all synchronization Updates.
225                SyncDone c -> onSyncDone slaveState c
226                -- We are sent a Checkpoint request.
227                DoCheckpoint r -> queueRepItem slaveState (SRICheckpoint r)
228                -- We are sent an Archive request.
229                DoArchive r -> queueRepItem slaveState (SRIArchive r)
230                -- Full replication of a revision
231                FullRep r -> modifyMVar_ slaveRepFinalizers $ \rf -> do
232                                rf IM.! r
233                                return $ IM.delete r rf
234                -- Full replication of events up to revision
235                FullRepTo r -> modifyMVar_ slaveRepFinalizers $ \rf -> do
236                                let (ef, nrf) = IM.partitionWithKey (\k _ -> k <= r) rf
237                                sequence_ (IM.elems ef)
238                                return nrf
239                -- We are allowed to Quit.
240                MayQuit -> writeChan slaveRepChan SRIEnd
241                -- We are requested to Quit - shall be handled by
242                -- 'bracket' usage by user.
243                MasterQuit -> throwTo slaveParentThreadId $
244                    ErrorCall "Data.Acid.Centered.Slave: Master quit."
245                -- no other messages possible, enforced by type checker
246
247 -- | After sync check CRC
248 onSyncDone :: (IsAcidic st, Typeable st) => SlaveState st -> Crc -> IO ()
249 onSyncDone SlaveState{..} crc = do
250     localCrc <- crcOfState slaveLocalState
251     if crc /= localCrc then
252         error "Data.Acid.Centered.Slave: CRC mismatch after sync."
253     else do
254         debug "Sync Done, CRC fine."
255         Event.set slaveSyncDone
256
257 -- | Queue Updates into Chan for replication.
258 -- We use the Chan so Sync-Updates and normal ones can be interleaved.
259 queueRepItem :: SlaveState st -> SlaveRepItem -> IO ()
260 queueRepItem SlaveState{..} repItem = do
261         debug "Queuing RepItem."
262         writeChan slaveRepChan repItem
263
264 -- | Replicates content of Chan.
265 slaveReplicationHandler :: Typeable st => SlaveState st -> IO ()
266 slaveReplicationHandler slaveState@SlaveState{..} = do
267         mtid <- myThreadId
268         putMVar slaveRepThreadId mtid
269
270         -- todo: timeout is magic variable, make customizable
271         noTimeout <- Event.waitTimeout slaveSyncDone $ 10*1000*1000
272         unless noTimeout $ throwTo slaveParentThreadId $
273             ErrorCall "Data.Acid.Centered.Slave: Took too long to sync. Timeout."
274
275         let loop = handle (\e -> throwTo slaveParentThreadId (e :: SomeException)) $ do
276                 mayRepItem <- readChan slaveRepChan
277                 case mayRepItem of
278                     SRIEnd          -> return ()
279                     SRICheckpoint r -> repCheckpoint slaveState r               >> loop
280                     SRIArchive r    -> repArchive slaveState r                  >> loop
281                     SRIUpdate r i d -> replicateUpdate slaveState r i d False   >> loop
282         loop
283
284         -- signal that we're done
285         void $ takeMVar slaveRepThreadId
286
287 -- | Replicate Sync-Checkpoints directly.
288 replicateSyncCp :: (IsAcidic st, Typeable st) =>
289         SlaveState st -> Revision -> ByteString -> IO ()
290 replicateSyncCp SlaveState{..} rev encoded = do
291     st <- decodeCheckpoint encoded
292     let lst = downcast slaveLocalState
293     let core = localCore lst
294     modifyMVar_ slaveRevision $ \sr -> do
295         when (sr > rev) $ error "Data.Acid.Centered.Slave: Revision mismatch for checkpoint: Slave is newer."
296         modifyCoreState_ core $ \_ -> do
297             writeIORef (localCopy lst) st
298             createCpFake lst encoded rev
299             adjustEventLogId lst rev
300             return st
301         return rev
302     where
303         adjustEventLogId l r = do
304             atomically $ writeTVar (logNextEntryId (localEvents l)) r
305             void $ cutFileLog (localEvents l)
306         createCpFake l e r = do
307             mvar <- newEmptyMVar
308             pushAction (localEvents l) $
309                 pushEntry (localCheckpoints l) (Checkpoint r e) (putMVar mvar ())
310             takeMVar mvar
311         decodeCheckpoint e =
312             case runGetLazy safeGet e of
313                 Left msg  -> error $ "Data.Acid.Centered.Slave: Checkpoint could not be decoded: " ++ msg
314                 Right val -> return val
315
316 -- | Replicate Sync-Updates directly.
317 replicateSyncUpdate :: Typeable st => SlaveState st -> Revision -> Tagged ByteString -> IO ()
318 replicateSyncUpdate slaveState rev event = replicateUpdate slaveState rev Nothing event True
319
320 -- | Replicate an Update as requested by Master.
321 --   Updates that were requested by this Slave are run locally and the result
322 --   put into the MVar in SlaveRequests.
323 --   Other Updates are just replicated without using the result.
324 replicateUpdate :: Typeable st => SlaveState st -> Revision -> Maybe RequestID -> Tagged ByteString -> Bool -> IO ()
325 replicateUpdate SlaveState{..} rev reqId event syncing = do
326         debug $ "Got an Update to replicate " ++ show rev
327         modifyMVar_ slaveRevision $ \nr -> if rev - 1 == nr
328             then do
329                 -- commit / run it locally
330                 case reqId of
331                     Nothing -> replicateForeign
332                     Just rid -> replicateOwn rid
333                 -- send reply: we're done
334                 unless syncing $ sendToMaster slaveZmqSocket $ RepDone rev
335                 return rev
336             else do
337                 sendToMaster slaveZmqSocket RepError
338                 void $ error $
339                     "Data.Acid.Centered.Slave: Replication failed at revision "
340                         ++ show nr ++ " -> " ++ show rev
341                 return nr
342         where
343             replicateForeign =
344                 if slaveStateIsRed then do
345                     act <- newEmptyMVar >>= scheduleLocalColdUpdate' (downcast slaveLocalState) event
346                     modifyMVar_ slaveRepFinalizers $ return . IM.insert rev act
347                 else
348                     void $ scheduleColdUpdate slaveLocalState event
349             replicateOwn rid = do
350                 act <- modifyMVar slaveRequests $ \srs -> do
351                     debug $ "This is the Update for Request " ++ show rid
352                     let (icallback, timeoutId) = srs IM.! rid
353                     callback <- icallback
354                     killThread timeoutId
355                     let nsrs = IM.delete rid srs
356                     return (nsrs, callback)
357                 when slaveStateIsRed $
358                     modifyMVar_ slaveRepFinalizers $ return . IM.insert rev act
359
360 repCheckpoint :: SlaveState st -> Revision -> IO ()
361 repCheckpoint SlaveState{..} rev = do
362     debug $ "Got Checkpoint request at revision: " ++ show rev
363     withMVar slaveRevision $ \_ ->
364         -- create checkpoint
365         createCheckpoint slaveLocalState
366
367 repArchive :: SlaveState st -> Revision -> IO ()
368 repArchive SlaveState{..} rev = do
369     debug $ "Got Archive request at revision: " ++ show rev
370     withMVar slaveRevision $ \_ ->
371         createArchive slaveLocalState
372
373
374 -- | Update on slave site.
375 --      The steps are:
376 --      - Request Update from Master
377 --      - Master issues Update with same RequestID
378 --      - repHandler replicates and puts result in MVar
379 scheduleSlaveUpdate :: (UpdateEvent e, Typeable (EventState e)) => SlaveState (EventState e) -> e -> IO (MVar (EventResult e))
380 scheduleSlaveUpdate slaveState@SlaveState{..} event = do
381     unlocked <- isEmptyMVar slaveStateLock
382     if not unlocked then error "State is locked."
383     else do
384         debug "Update by Slave."
385         result <- newEmptyMVar
386         reqId <- getNextRequestId slaveState
387         modifyMVar_ slaveRequests $ \srs -> do
388             let encoded = runPutLazy (safePut event)
389             sendToMaster slaveZmqSocket $ ReqUpdate reqId (methodTag event, encoded)
390             timeoutID <- forkIO $ timeoutRequest slaveState reqId result
391             let callback = if slaveStateIsRed
392                     then scheduleLocalUpdate' (downcast slaveLocalState) event result
393                     else do
394                         hd <- scheduleUpdate slaveLocalState event
395                         void $ forkIO $ putMVar result =<< takeMVar hd
396                         return (return ())      -- bogus finalizer
397             return $ IM.insert reqId (callback, timeoutID) srs
398         return result
399
400 -- | Cold Update on slave site. This enables for using Remote.
401 scheduleSlaveColdUpdate :: Typeable st => SlaveState st -> Tagged ByteString -> IO (MVar ByteString)
402 scheduleSlaveColdUpdate slaveState@SlaveState{..} encoded = do
403     unlocked <- isEmptyMVar slaveStateLock
404     if not unlocked then error "State is locked."
405     else do
406         debug "Cold Update by Slave."
407         result <- newEmptyMVar
408         -- slaveLastRequestID is only modified here - and used for locking the state
409         reqId <- getNextRequestId slaveState
410         modifyMVar_ slaveRequests $ \srs -> do
411             sendToMaster slaveZmqSocket $ ReqUpdate reqId encoded
412             timeoutID <- forkIO $ timeoutRequest slaveState reqId result
413             let callback = if slaveStateIsRed
414                     then scheduleLocalColdUpdate' (downcast slaveLocalState) encoded result
415                     else do
416                         hd <- scheduleColdUpdate slaveLocalState encoded
417                         void $ forkIO $ putMVar result =<< takeMVar hd
418                         return (return ())      -- bogus finalizer
419             return $ IM.insert reqId (callback, timeoutID) srs
420         return result
421
422 -- | Generate ID for another request.
423 getNextRequestId :: SlaveState st -> IO RequestID
424 getNextRequestId SlaveState{..} = modifyMVar slaveLastRequestID $ \x -> return (x+1,x+1)
425
426 -- | Ensures requests are actually answered or fail.
427 --   On timeout the Slave dies, not the thread that invoked the Update.
428 timeoutRequest :: SlaveState st -> RequestID -> MVar m -> IO ()
429 timeoutRequest SlaveState{..} reqId mvar = do
430     threadDelay $ 5*1000*1000
431     stillThere <- withMVar slaveRequests (return . IM.member reqId)
432     when stillThere $ do
433         putMVar mvar $ error "Data.Acid.Centered.Slave: Update-Request timed out."
434         throwTo slaveParentThreadId $ ErrorCall "Data.Acid.Centered.Slave: Update-Request timed out."
435
436 -- | Send a message to Master.
437 sendToMaster :: MVar (Socket Dealer) -> SlaveMessage -> IO ()
438 sendToMaster msock smsg = withMVar msock $ \sock -> send sock [] (encode smsg)
439
440 -- | Close an enslaved State.
441 liberateState :: SlaveState st -> IO ()
442 liberateState SlaveState{..} =
443     -- lock state against updates: disallow requests
444     whenM (tryPutMVar slaveStateLock ()) $ do
445         debug "Closing Slave state..."
446         -- check / wait unprocessed requests
447         debug "Waiting for Requests to finish."
448         waitPoll 100 (withMVar slaveRequests (return . IM.null))
449         -- send master quit message
450         sendToMaster slaveZmqSocket SlaveQuit
451         -- wait replication chan, only if sync done
452         syncDone <- Event.isSet slaveSyncDone
453         when syncDone $ do
454             debug "Waiting for repChan to empty."
455             mtid <- myThreadId
456             putMVar slaveRepThreadId mtid
457         -- kill handler threads
458         debug "Killing request handler."
459         withMVar slaveReqThreadId $ flip throwTo GracefulExit
460         -- cleanup zmq
461         debug "Closing down zmq."
462         withMVar slaveZmqSocket $ \s -> do
463             -- avoid the socket hanging around
464             setLinger (restrict (1000 :: Int)) s
465             disconnect s slaveZmqAddr
466             close s
467         term slaveZmqContext
468         -- cleanup local state
469         debug "Closing local state."
470         closeAcidState slaveLocalState
471
472 slaveToAcidState :: (IsAcidic st, Typeable st)  => SlaveState st -> AcidState st
473 slaveToAcidState slaveState
474   = AcidState { _scheduleUpdate    = scheduleSlaveUpdate slaveState
475               , scheduleColdUpdate = scheduleSlaveColdUpdate slaveState
476               , _query             = query $ slaveLocalState slaveState
477               , queryCold          = queryCold $ slaveLocalState slaveState
478               , createCheckpoint   = createCheckpoint $ slaveLocalState slaveState
479               , createArchive      = createArchive $ slaveLocalState slaveState
480               , closeAcidState     = liberateState slaveState
481               , acidSubState       = mkAnyState slaveState
482               }