d085427f136d5883a87adfad970d00611b35c7e2
[acid-state-dist.git] / src / Data / Acid / Centered / Slave.hs
1 {-# LANGUAGE DeriveDataTypeable, RecordWildCards, FlexibleContexts #-}
2 --------------------------------------------------------------------------------
3 {- |
4   Module      :  Data.Acid.CenteredSlave.hs
5   Copyright   :  ?
6
7   Maintainer  :  max.voit+hdv@with-eyes.net
8   Portability :  non-portable (uses GHC extensions)
9
10   The Slave part of a the Centered replication backend for acid state.
11
12 -}
13
14 --------------------------------------------------------------------------------
15 -- SLAVE part
16
17 module Data.Acid.Centered.Slave
18     (
19       enslaveState
20     , enslaveStateFrom
21     , enslaveRedState
22     , enslaveRedStateFrom
23     , SlaveState(..)
24     )  where
25
26 import Data.Typeable
27 import Data.SafeCopy
28 import Data.Serialize (decode, encode, runPutLazy, runGetLazy)
29
30 import Data.Acid
31 import Data.Acid.Core
32 import Data.Acid.Abstract
33 import Data.Acid.Local
34 import Data.Acid.Log
35
36 import Data.Acid.Centered.Common
37
38 import System.ZMQ4 (Context, Socket, Dealer(..),
39                     setReceiveHighWM, setSendHighWM, setLinger, restrict,
40                     poll, Poll(..), Event(..),
41                     context, term, socket, close,
42                     connect, disconnect, send, receive)
43 import System.FilePath ( (</>) )
44
45 import Control.Concurrent (forkIO, ThreadId, myThreadId, killThread, threadDelay, forkIOWithUnmask)
46 import Control.Concurrent.MVar (MVar, newMVar, newEmptyMVar, isEmptyMVar,
47                                 withMVar, modifyMVar, modifyMVar_,
48                                 takeMVar, putMVar, tryPutMVar)
49 import Data.IORef (writeIORef)
50 import Control.Monad (void, when, unless, liftM)
51 import Control.Monad.STM (atomically)
52 import Control.Concurrent.STM.TVar (readTVar, writeTVar)
53 import qualified Control.Concurrent.Event as Event
54 import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
55 import Control.Exception (handle, throwTo, SomeException, ErrorCall(..))
56
57 import Data.IntMap (IntMap)
58 import qualified Data.IntMap as IM
59 import Data.Maybe (fromMaybe)
60
61 import qualified Data.ByteString.Lazy.Char8 as CSL
62
63 --------------------------------------------------------------------------------
64
65 -- | Slave state structure, for internal use.
66 data SlaveState st
67     = SlaveState { slaveLocalState :: AcidState st
68                  , slaveStateIsRed :: Bool
69                  , slaveStateLock :: MVar ()
70                  , slaveRepFinalizers :: MVar (IntMap (IO ()))
71                  , slaveRepChan :: Chan SlaveRepItem
72                  , slaveSyncDone :: Event.Event
73                  , slaveRevision :: MVar NodeRevision
74                  , slaveRequests :: MVar SlaveRequests
75                  , slaveLastRequestID :: MVar RequestID
76                  , slaveRepThreadId :: MVar ThreadId
77                  , slaveReqThreadId :: MVar ThreadId
78                  , slaveParentThreadId :: ThreadId
79                  , slaveZmqContext :: Context
80                  , slaveZmqAddr :: String
81                  , slaveZmqSocket :: MVar (Socket Dealer)
82                  } deriving (Typeable)
83
84 -- | Memory of own Requests sent to Master.
85 type SlaveRequests = IntMap (IO (IO ()),ThreadId)
86
87 -- | One Update + Metainformation to replicate.
88 data SlaveRepItem =
89       SRIEnd
90     | SRICheckpoint Revision
91     | SRIArchive Revision
92     | SRIUpdate Revision (Maybe RequestID) (Tagged CSL.ByteString)
93
94 -- | Open a local State as Slave for a Master.
95 --
96 -- The directory for the local state files is the default one ("state/[typeOf
97 -- state]").
98 enslaveState :: (IsAcidic st, Typeable st) =>
99             String          -- ^ hostname of the Master
100          -> PortNumber      -- ^ port to connect to
101          -> st              -- ^ initial state
102          -> IO (AcidState st)
103 enslaveState address port initialState =
104     enslaveStateFrom ("state" </> show (typeOf initialState)) address port initialState
105
106 -- | Open a local State as Slave for a Master.
107 --
108 -- The directory for the local state files is the default one ("state/[typeOf
109 -- state]").
110 enslaveRedState :: (IsAcidic st, Typeable st) =>
111             String          -- ^ hostname of the Master
112          -> PortNumber      -- ^ port to connect to
113          -> st              -- ^ initial state
114          -> IO (AcidState st)
115 enslaveRedState address port initialState =
116     enslaveRedStateFrom ("state" </> show (typeOf initialState)) address port initialState
117
118 -- | Open a local State as Slave for a Master. The directory of the local state
119 -- files can be specified.
120 enslaveStateFrom :: (IsAcidic st, Typeable st) =>
121             FilePath        -- ^ location of the local state files.
122          -> String          -- ^ hostname of the Master
123          -> PortNumber      -- ^ port to connect to
124          -> st              -- ^ initial state
125          -> IO (AcidState st)
126 enslaveStateFrom = enslaveMayRedStateFrom False
127
128 -- | Open a local State as Slave for a _redundant_ Master. The directory of the local state
129 -- files can be specified.
130 enslaveRedStateFrom :: (IsAcidic st, Typeable st) =>
131             FilePath        -- ^ location of the local state files.
132          -> String          -- ^ hostname of the Master
133          -> PortNumber      -- ^ port to connect to
134          -> st              -- ^ initial state
135          -> IO (AcidState st)
136 enslaveRedStateFrom = enslaveMayRedStateFrom True
137
138 -- | Open a local State as Slave for a Master, redundant or not.
139 --   The directory of the local state files can be specified.
140 enslaveMayRedStateFrom :: (IsAcidic st, Typeable st) =>
141             Bool            -- ^ is redundant
142          -> FilePath        -- ^ location of the local state files.
143          -> String          -- ^ hostname of the Master
144          -> PortNumber      -- ^ port to connect to
145          -> st              -- ^ initial state
146          -> IO (AcidState st)
147 enslaveMayRedStateFrom isRed directory address port initialState = do
148         -- local
149         lst <- openLocalStateFrom directory initialState
150         let levs = localEvents $ downcast lst
151         lrev <- atomically $ readTVar $ logNextEntryId levs
152         rev <- newMVar lrev
153         debug $ "Opening enslaved state at revision " ++ show lrev
154         srs <- newMVar IM.empty
155         lastReqId <- newMVar 0
156         repChan <- newChan
157         syncDone <- Event.new
158         reqTid <- newEmptyMVar
159         repTid <- newEmptyMVar
160         parTid <- myThreadId
161         repFin <- newMVar IM.empty
162         lock <- newEmptyMVar
163         -- remote
164         let addr = "tcp://" ++ address ++ ":" ++ show port
165         ctx <- context
166         sock <- socket ctx Dealer
167         setReceiveHighWM (restrict (100*1000 :: Int)) sock
168         setSendHighWM (restrict (100*1000 :: Int)) sock
169         connect sock addr
170         msock <- newMVar sock
171         sendToMaster msock $ NewSlave lrev
172         let slaveState = SlaveState { slaveLocalState = lst
173                                     , slaveStateIsRed = isRed
174                                     , slaveStateLock = lock
175                                     , slaveRepFinalizers = repFin
176                                     , slaveRepChan = repChan
177                                     , slaveSyncDone = syncDone
178                                     , slaveRevision = rev
179                                     , slaveRequests = srs
180                                     , slaveLastRequestID = lastReqId
181                                     , slaveReqThreadId = reqTid
182                                     , slaveRepThreadId = repTid
183                                     , slaveParentThreadId = parTid
184                                     , slaveZmqContext = ctx
185                                     , slaveZmqAddr = addr
186                                     , slaveZmqSocket = msock
187                                     }
188         void $ forkIOWithUnmask $ slaveRequestHandler slaveState
189         void $ forkIO $ slaveReplicationHandler slaveState
190         return $ slaveToAcidState slaveState
191
192 -- | Replication handler of the Slave.
193 slaveRequestHandler :: (IsAcidic st, Typeable st) => SlaveState st -> (IO () -> IO ()) -> IO ()
194 slaveRequestHandler slaveState@SlaveState{..} unmask = do
195     mtid <- myThreadId
196     putMVar slaveReqThreadId mtid
197     let loop = handle (\e -> throwTo slaveParentThreadId (e :: SomeException)) $
198             unmask $ handle killHandler $ do
199             --waitRead =<< readMVar slaveZmqSocket
200             -- FIXME: we needn't poll if not for strange zmq behaviour
201             re <- withMVar slaveZmqSocket $ \sock -> poll 100 [Sock sock [In] Nothing]
202             unless (null $ head re) $ do
203                 msg <- withMVar slaveZmqSocket receive
204                 case decode msg of
205                     Left str -> error $ "Data.Serialize.decode failed on MasterMessage: " ++ show str
206                     Right mmsg -> do
207                          debug $ "Received: " ++ show mmsg
208                          case mmsg of
209                             -- We are sent an Update to replicate.
210                             DoRep r i d -> queueRepItem slaveState (SRIUpdate r i d)
211                             -- We are sent a Checkpoint for synchronization.
212                             DoSyncCheckpoint r d -> replicateSyncCp slaveState r d
213                             -- We are sent an Update to replicate for synchronization.
214                             DoSyncRep r d -> replicateSyncUpdate slaveState r d
215                             -- Master done sending all synchronization Updates.
216                             SyncDone c -> onSyncDone slaveState c
217                             -- We are sent a Checkpoint request.
218                             DoCheckpoint r -> queueRepItem slaveState (SRICheckpoint r)
219                             -- We are sent an Archive request.
220                             DoArchive r -> queueRepItem slaveState (SRIArchive r)
221                             -- Full replication of a revision
222                             FullRep r -> modifyMVar_ slaveRepFinalizers $ \rf -> do
223                                             rf IM.! r
224                                             return $ IM.delete r rf
225                             -- Full replication of events up to revision
226                             FullRepTo r -> modifyMVar_ slaveRepFinalizers $ \rf -> do
227                                             let (ef, nrf) = IM.partitionWithKey (\k _ -> k <= r) rf
228                                             sequence_ (IM.elems ef)
229                                             return nrf
230                             -- We are allowed to Quit.
231                             MayQuit -> writeChan slaveRepChan SRIEnd
232                             -- We are requested to Quit - shall be handled by
233                             -- 'bracket' usage by user.
234                             MasterQuit -> throwTo slaveParentThreadId $
235                                 ErrorCall "Data.Acid.Centered.Slave: Master quit."
236                             -- no other messages possible, enforced by type checker
237             loop
238     loop
239     where
240         killHandler :: AcidException -> IO ()
241         killHandler GracefulExit = return ()
242
243 -- | After sync check CRC
244 onSyncDone :: (IsAcidic st, Typeable st) => SlaveState st -> Crc -> IO ()
245 onSyncDone SlaveState{..} crc = do
246     localCrc <- crcOfState slaveLocalState
247     if crc /= localCrc then
248         error "Data.Acid.Centered.Slave: CRC mismatch after sync."
249     else do
250         debug "Sync Done, CRC fine."
251         Event.set slaveSyncDone
252
253 -- | Queue Updates into Chan for replication.
254 -- We use the Chan so Sync-Updates and normal ones can be interleaved.
255 queueRepItem :: SlaveState st -> SlaveRepItem -> IO ()
256 queueRepItem SlaveState{..} repItem = do
257         debug "Queuing RepItem."
258         writeChan slaveRepChan repItem
259
260 -- | Replicates content of Chan.
261 slaveReplicationHandler :: Typeable st => SlaveState st -> IO ()
262 slaveReplicationHandler slaveState@SlaveState{..} = do
263         mtid <- myThreadId
264         putMVar slaveRepThreadId mtid
265         -- todo: timeout is magic variable, make customizable?
266         noTimeout <- Event.waitTimeout slaveSyncDone $ 10*1000*1000
267         unless noTimeout $ throwTo slaveParentThreadId $ ErrorCall "Data.Acid.Centered.Slave: Took too long to sync. Timeout."
268         let loop = handle (\e -> throwTo slaveParentThreadId (e :: SomeException)) $ do
269                 mayRepItem <- readChan slaveRepChan
270                 case mayRepItem of
271                     SRIEnd -> return ()
272                     SRICheckpoint r -> do
273                         repCheckpoint slaveState r
274                         loop
275                     SRIArchive r -> do
276                         repArchive slaveState r
277                         loop
278                     SRIUpdate r i d -> do
279                         replicateUpdate slaveState r i d False
280                         loop
281         loop
282         -- signal that we're done
283         void $ takeMVar slaveRepThreadId
284
285 -- | Replicate Sync-Checkpoints directly.
286 replicateSyncCp :: (IsAcidic st, Typeable st) =>
287         SlaveState st -> Revision -> CSL.ByteString -> IO ()
288 replicateSyncCp SlaveState{..} rev encoded = do
289     st <- decodeCheckpoint encoded
290     let lst = downcast slaveLocalState
291     let core = localCore lst
292     modifyMVar_ slaveRevision $ \sr -> do
293         when (sr > rev) $ error "Data.Acid.Centered.Slave: Revision mismatch for checkpoint: Slave is newer."
294         -- todo: check
295         modifyCoreState_ core $ \_ -> do
296             writeIORef (localCopy lst) st
297             createCpFake lst encoded rev
298             adjustEventLogId lst rev
299             return st
300         return rev
301     where
302         adjustEventLogId l r = do
303             atomically $ writeTVar (logNextEntryId (localEvents l)) r
304             void $ cutFileLog (localEvents l)
305         createCpFake l e r = do
306             mvar <- newEmptyMVar
307             pushAction (localEvents l) $
308                 pushEntry (localCheckpoints l) (Checkpoint r e) (putMVar mvar ())
309             takeMVar mvar
310         decodeCheckpoint e =
311             case runGetLazy safeGet e of
312                 Left msg  -> error $ "Data.Acid.Centered.Slave: Checkpoint could not be decoded: " ++ msg
313                 Right val -> return val
314
315 -- | Replicate Sync-Updates directly.
316 replicateSyncUpdate :: Typeable st => SlaveState st -> Revision -> Tagged CSL.ByteString -> IO ()
317 replicateSyncUpdate slaveState rev event = replicateUpdate slaveState rev Nothing event True
318
319 -- | Replicate an Update as requested by Master.
320 --   Updates that were requested by this Slave are run locally and the result
321 --   put into the MVar in SlaveRequests.
322 --   Other Updates are just replicated without using the result.
323 replicateUpdate :: Typeable st => SlaveState st -> Revision -> Maybe RequestID -> Tagged CSL.ByteString -> Bool -> IO ()
324 replicateUpdate SlaveState{..} rev reqId event syncing = do
325         debug $ "Got an Update to replicate " ++ show rev
326         modifyMVar_ slaveRevision $ \nr -> if rev - 1 == nr
327             then do
328                 -- commit / run it locally
329                 case reqId of
330                     Nothing -> if slaveStateIsRed
331                         then do
332                             act <- liftM snd $ scheduleLocalColdUpdate' (downcast slaveLocalState) event
333                             modifyMVar_ slaveRepFinalizers $ return . IM.insert rev act
334                         else
335                             void $ scheduleColdUpdate slaveLocalState event
336                     Just rid -> do
337                         act <- modifyMVar slaveRequests $ \srs -> do
338                             debug $ "This is the Update for Request " ++ show rid
339                             let (icallback, timeoutId) = fromMaybe (error $ "Data.Acid.Centered.Slave: Callback not found: " ++ show rid) (IM.lookup rid srs)
340                             callback <- icallback
341                             killThread timeoutId
342                             let nsrs = IM.delete rid srs
343                             return (nsrs, callback)
344                         when slaveStateIsRed $
345                             modifyMVar_ slaveRepFinalizers $ return . IM.insert rev act
346                 -- send reply: we're done
347                 unless syncing $ sendToMaster slaveZmqSocket $ RepDone rev
348                 return rev
349             else do
350                 sendToMaster slaveZmqSocket RepError
351                 void $ error $ "Data.Acid.Centered.Slave: Replication failed at revision " ++ show nr ++ " -> " ++ show rev
352                 return nr
353
354 repCheckpoint :: SlaveState st -> Revision -> IO ()
355 repCheckpoint SlaveState{..} rev = do
356     debug $ "Got Checkpoint request at revision: " ++ show rev
357     withMVar slaveRevision $ \_ ->
358         -- create checkpoint
359         createCheckpoint slaveLocalState
360
361 repArchive :: SlaveState st -> Revision -> IO ()
362 repArchive SlaveState{..} rev = do
363     debug $ "Got Archive request at revision: " ++ show rev
364     withMVar slaveRevision $ \_ ->
365         createArchive slaveLocalState
366
367
368 -- | Update on slave site.
369 --      The steps are:
370 --      - Request Update from Master
371 --      - Master issues Update with same RequestID
372 --      - repHandler replicates and puts result in MVar
373 scheduleSlaveUpdate :: (UpdateEvent e, Typeable (EventState e)) => SlaveState (EventState e) -> e -> IO (MVar (EventResult e))
374 scheduleSlaveUpdate slaveState@SlaveState{..} event = do
375     unlocked <- isEmptyMVar slaveStateLock
376     if not unlocked then error "State is locked."
377     else do
378         debug "Update by Slave."
379         result <- newEmptyMVar
380         -- slaveLastRequestID is only modified here - and used for locking the state
381         reqId <- modifyMVar slaveLastRequestID $ \x -> return (x+1,x+1)
382         modifyMVar_ slaveRequests $ \srs -> do
383             let encoded = runPutLazy (safePut event)
384             sendToMaster slaveZmqSocket $ ReqUpdate reqId (methodTag event, encoded)
385             timeoutID <- forkIO $ timeoutRequest slaveState reqId result
386             let callback = if slaveStateIsRed
387                     then scheduleLocalUpdate' (downcast slaveLocalState) event result
388                     else do
389                         hd <- scheduleUpdate slaveLocalState event
390                         void $ forkIO $ putMVar result =<< takeMVar hd
391                         return (return ())      -- bogus finalizer
392             return $ IM.insert reqId (callback, timeoutID) srs
393         return result
394
395 -- | Ensures requests are actually answered or fail.
396 --   On timeout the Slave dies, not the thread that invoked the Update.
397 timeoutRequest :: SlaveState st -> RequestID -> MVar m -> IO ()
398 timeoutRequest SlaveState{..} reqId mvar = do
399     threadDelay $ 5*1000*1000
400     stillThere <- withMVar slaveRequests (return . IM.member reqId)
401     when stillThere $ do
402         putMVar mvar $ error "Data.Acid.Centered.Slave: Update-Request timed out."
403         throwTo slaveParentThreadId $ ErrorCall "Data.Acid.Centered.Slave: Update-Request timed out."
404
405 -- | Send a message to Master.
406 sendToMaster :: MVar (Socket Dealer) -> SlaveMessage -> IO ()
407 sendToMaster msock smsg = withMVar msock $ \sock -> send sock [] (encode smsg)
408
409 -- | Close an enslaved State.
410 liberateState :: SlaveState st -> IO ()
411 liberateState SlaveState{..} =
412     -- lock state against updates: disallow requests
413     whenM (tryPutMVar slaveStateLock ()) $ do
414         debug "Closing Slave state..."
415         -- check / wait unprocessed requests
416         debug "Waiting for Requests to finish."
417         waitPoll 100 (withMVar slaveRequests (return . IM.null))
418         -- send master quit message
419         sendToMaster slaveZmqSocket SlaveQuit
420         -- wait replication chan, only if sync done
421         syncDone <- Event.isSet slaveSyncDone
422         when syncDone $ do
423             debug "Waiting for repChan to empty."
424             mtid <- myThreadId
425             putMVar slaveRepThreadId mtid
426         -- kill handler threads
427         debug "Killing request handler."
428         withMVar slaveReqThreadId $ flip throwTo GracefulExit
429         -- cleanup zmq
430         debug "Closing down zmq."
431         withMVar slaveZmqSocket $ \s -> do
432             -- avoid the socket hanging around
433             setLinger (restrict (1000 :: Int)) s
434             disconnect s slaveZmqAddr
435             close s
436         term slaveZmqContext
437         -- cleanup local state
438         debug "Closing local state."
439         closeAcidState slaveLocalState
440
441 slaveToAcidState :: (IsAcidic st, Typeable st)  => SlaveState st -> AcidState st
442 slaveToAcidState slaveState
443   = AcidState { _scheduleUpdate    = scheduleSlaveUpdate slaveState
444               , scheduleColdUpdate = undefined
445               , _query             = query $ slaveLocalState slaveState
446               , queryCold          = queryCold $ slaveLocalState slaveState
447               , createCheckpoint   = createCheckpoint $ slaveLocalState slaveState
448               , createArchive      = createArchive $ slaveLocalState slaveState
449               , closeAcidState     = liberateState slaveState
450               , acidSubState       = mkAnyState slaveState
451               }