a81791c5ac2a8ff148096c26624c414938ed7fbf
[acid-state-dist.git] / src / Data / Acid / Centered / Slave.hs
1 {-# LANGUAGE DeriveDataTypeable, RecordWildCards, FlexibleContexts #-}
2 --------------------------------------------------------------------------------
3 {- |
4   Module      :  Data.Acid.CenteredSlave.hs
5   Copyright   :  ?
6
7   Maintainer  :  max.voit+hdv@with-eyes.net
8   Portability :  non-portable (uses GHC extensions)
9
10   The Slave part of a the Centered replication backend for acid state.
11
12 -}
13
14 --------------------------------------------------------------------------------
15 -- SLAVE part
16
17 module Data.Acid.Centered.Slave
18     (
19       enslaveState
20     , enslaveStateFrom
21     , enslaveRedState
22     , enslaveRedStateFrom
23     , SlaveState(..)
24     )  where
25
26 import Data.Typeable
27 import Data.SafeCopy
28 import Data.Serialize (decode, encode, runPutLazy, runGetLazy)
29
30 import Data.Acid
31 import Data.Acid.Core
32 import Data.Acid.Abstract
33 import Data.Acid.Local
34 import Data.Acid.Log
35
36 import Data.Acid.Centered.Common
37
38 import Control.Concurrent (forkIO, ThreadId, myThreadId, killThread, threadDelay, forkIOWithUnmask)
39 import Control.Concurrent.MVar (MVar, newMVar, newEmptyMVar, isEmptyMVar,
40                                 withMVar, modifyMVar, modifyMVar_,
41                                 takeMVar, putMVar, tryPutMVar)
42 import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
43 import Control.Concurrent.STM.TVar (readTVar, writeTVar)
44 import Data.IORef (writeIORef)
45 import qualified Control.Concurrent.Event as Event
46
47 import Control.Monad.STM (atomically)
48 import Control.Monad (void, when, unless)
49 import Control.Exception (handle, throwTo, SomeException, ErrorCall(..))
50
51 import System.ZMQ4 (Context, Socket, Dealer(..),
52                     setReceiveHighWM, setSendHighWM, setLinger, restrict,
53                     poll, Poll(..), Event(..),
54                     context, term, socket, close,
55                     connect, disconnect, send, receive)
56 import System.FilePath ( (</>) )
57
58 import Data.ByteString.Lazy.Char8 (ByteString)
59 import Data.Maybe (fromMaybe)
60
61 import           Data.IntMap (IntMap)
62 import qualified Data.IntMap as IM
63
64 --------------------------------------------------------------------------------
65
66 -- | Slave state structure, for internal use.
67 data SlaveState st
68     = SlaveState { slaveLocalState :: AcidState st
69                  , slaveStateIsRed :: Bool
70                  , slaveStateLock :: MVar ()
71                  , slaveRepFinalizers :: MVar (IntMap (IO ()))
72                  , slaveRepChan :: Chan SlaveRepItem
73                  , slaveSyncDone :: Event.Event
74                  , slaveRevision :: MVar NodeRevision
75                  , slaveRequests :: MVar SlaveRequests
76                  , slaveLastRequestID :: MVar RequestID
77                  , slaveRepThreadId :: MVar ThreadId
78                  , slaveReqThreadId :: MVar ThreadId
79                  , slaveParentThreadId :: ThreadId
80                  , slaveZmqContext :: Context
81                  , slaveZmqAddr :: String
82                  , slaveZmqSocket :: MVar (Socket Dealer)
83                  } deriving (Typeable)
84
85 -- | Memory of own Requests sent to Master.
86 type SlaveRequests = IntMap (IO (IO ()),ThreadId)
87
88 -- | One Update + Metainformation to replicate.
89 data SlaveRepItem =
90       SRIEnd
91     | SRICheckpoint Revision
92     | SRIArchive Revision
93     | SRIUpdate Revision (Maybe RequestID) (Tagged ByteString)
94
95 -- | Open a local State as Slave for a Master.
96 --
97 -- The directory for the local state files is the default one ("state/[typeOf
98 -- state]").
99 enslaveState :: (IsAcidic st, Typeable st) =>
100             String          -- ^ hostname of the Master
101          -> PortNumber      -- ^ port to connect to
102          -> st              -- ^ initial state
103          -> IO (AcidState st)
104 enslaveState address port initialState =
105     enslaveStateFrom ("state" </> show (typeOf initialState)) address port initialState
106
107 -- | Open a local State as Slave for a Master.
108 --
109 -- The directory for the local state files is the default one ("state/[typeOf
110 -- state]").
111 enslaveRedState :: (IsAcidic st, Typeable st) =>
112             String          -- ^ hostname of the Master
113          -> PortNumber      -- ^ port to connect to
114          -> st              -- ^ initial state
115          -> IO (AcidState st)
116 enslaveRedState address port initialState =
117     enslaveRedStateFrom ("state" </> show (typeOf initialState)) address port initialState
118
119 -- | Open a local State as Slave for a Master. The directory of the local state
120 -- files can be specified.
121 enslaveStateFrom :: (IsAcidic st, Typeable st) =>
122             FilePath        -- ^ location of the local state files.
123          -> String          -- ^ hostname of the Master
124          -> PortNumber      -- ^ port to connect to
125          -> st              -- ^ initial state
126          -> IO (AcidState st)
127 enslaveStateFrom = enslaveMayRedStateFrom False
128
129 -- | Open a local State as Slave for a _redundant_ Master. The directory of the local state
130 -- files can be specified.
131 enslaveRedStateFrom :: (IsAcidic st, Typeable st) =>
132             FilePath        -- ^ location of the local state files.
133          -> String          -- ^ hostname of the Master
134          -> PortNumber      -- ^ port to connect to
135          -> st              -- ^ initial state
136          -> IO (AcidState st)
137 enslaveRedStateFrom = enslaveMayRedStateFrom True
138
139 -- | Open a local State as Slave for a Master, redundant or not.
140 --   The directory of the local state files can be specified.
141 enslaveMayRedStateFrom :: (IsAcidic st, Typeable st) =>
142             Bool            -- ^ is redundant
143          -> FilePath        -- ^ location of the local state files.
144          -> String          -- ^ hostname of the Master
145          -> PortNumber      -- ^ port to connect to
146          -> st              -- ^ initial state
147          -> IO (AcidState st)
148 enslaveMayRedStateFrom isRed directory address port initialState = do
149         -- local
150         lst <- openLocalStateFrom directory initialState
151         let levs = localEvents $ downcast lst
152         lrev <- atomically $ readTVar $ logNextEntryId levs
153         rev <- newMVar lrev
154         debug $ "Opening enslaved state at revision " ++ show lrev
155         srs <- newMVar IM.empty
156         lastReqId <- newMVar 0
157         repChan <- newChan
158         syncDone <- Event.new
159         reqTid <- newEmptyMVar
160         repTid <- newEmptyMVar
161         parTid <- myThreadId
162         repFin <- newMVar IM.empty
163         lock <- newEmptyMVar
164         -- remote
165         let addr = "tcp://" ++ address ++ ":" ++ show port
166         ctx <- context
167         sock <- socket ctx Dealer
168         setReceiveHighWM (restrict (100*1000 :: Int)) sock
169         setSendHighWM (restrict (100*1000 :: Int)) sock
170         connect sock addr
171         msock <- newMVar sock
172         sendToMaster msock $ NewSlave lrev
173         let slaveState = SlaveState { slaveLocalState = lst
174                                     , slaveStateIsRed = isRed
175                                     , slaveStateLock = lock
176                                     , slaveRepFinalizers = repFin
177                                     , slaveRepChan = repChan
178                                     , slaveSyncDone = syncDone
179                                     , slaveRevision = rev
180                                     , slaveRequests = srs
181                                     , slaveLastRequestID = lastReqId
182                                     , slaveReqThreadId = reqTid
183                                     , slaveRepThreadId = repTid
184                                     , slaveParentThreadId = parTid
185                                     , slaveZmqContext = ctx
186                                     , slaveZmqAddr = addr
187                                     , slaveZmqSocket = msock
188                                     }
189         void $ forkIOWithUnmask $ slaveRequestHandler slaveState
190         void $ forkIO $ slaveReplicationHandler slaveState
191         return $ slaveToAcidState slaveState
192
193 -- | Replication handler of the Slave.
194 slaveRequestHandler :: (IsAcidic st, Typeable st) => SlaveState st -> (IO () -> IO ()) -> IO ()
195 slaveRequestHandler slaveState@SlaveState{..} unmask = do
196     mtid <- myThreadId
197     putMVar slaveReqThreadId mtid
198     let loop = handle (\e -> throwTo slaveParentThreadId (e :: SomeException)) $
199             unmask $ handle killHandler $ do
200             --waitRead =<< readMVar slaveZmqSocket
201             -- FIXME: we needn't poll if not for strange zmq behaviour
202             re <- withMVar slaveZmqSocket $ \sock -> poll 100 [Sock sock [In] Nothing]
203             unless (null $ head re) $ do
204                 msg <- withMVar slaveZmqSocket receive
205                 case decode msg of
206                     Left str -> error $ "Data.Serialize.decode failed on MasterMessage: " ++ show str
207                     Right mmsg -> do
208                          debug $ "Received: " ++ show mmsg
209                          case mmsg of
210                             -- We are sent an Update to replicate.
211                             DoRep r i d -> queueRepItem slaveState (SRIUpdate r i d)
212                             -- We are sent a Checkpoint for synchronization.
213                             DoSyncCheckpoint r d -> replicateSyncCp slaveState r d
214                             -- We are sent an Update to replicate for synchronization.
215                             DoSyncRep r d -> replicateSyncUpdate slaveState r d
216                             -- Master done sending all synchronization Updates.
217                             SyncDone c -> onSyncDone slaveState c
218                             -- We are sent a Checkpoint request.
219                             DoCheckpoint r -> queueRepItem slaveState (SRICheckpoint r)
220                             -- We are sent an Archive request.
221                             DoArchive r -> queueRepItem slaveState (SRIArchive r)
222                             -- Full replication of a revision
223                             FullRep r -> modifyMVar_ slaveRepFinalizers $ \rf -> do
224                                             rf IM.! r
225                                             return $ IM.delete r rf
226                             -- Full replication of events up to revision
227                             FullRepTo r -> modifyMVar_ slaveRepFinalizers $ \rf -> do
228                                             let (ef, nrf) = IM.partitionWithKey (\k _ -> k <= r) rf
229                                             sequence_ (IM.elems ef)
230                                             return nrf
231                             -- We are allowed to Quit.
232                             MayQuit -> writeChan slaveRepChan SRIEnd
233                             -- We are requested to Quit - shall be handled by
234                             -- 'bracket' usage by user.
235                             MasterQuit -> throwTo slaveParentThreadId $
236                                 ErrorCall "Data.Acid.Centered.Slave: Master quit."
237                             -- no other messages possible, enforced by type checker
238             loop
239     loop
240     where
241         killHandler :: AcidException -> IO ()
242         killHandler GracefulExit = return ()
243
244 -- | After sync check CRC
245 onSyncDone :: (IsAcidic st, Typeable st) => SlaveState st -> Crc -> IO ()
246 onSyncDone SlaveState{..} crc = do
247     localCrc <- crcOfState slaveLocalState
248     if crc /= localCrc then
249         error "Data.Acid.Centered.Slave: CRC mismatch after sync."
250     else do
251         debug "Sync Done, CRC fine."
252         Event.set slaveSyncDone
253
254 -- | Queue Updates into Chan for replication.
255 -- We use the Chan so Sync-Updates and normal ones can be interleaved.
256 queueRepItem :: SlaveState st -> SlaveRepItem -> IO ()
257 queueRepItem SlaveState{..} repItem = do
258         debug "Queuing RepItem."
259         writeChan slaveRepChan repItem
260
261 -- | Replicates content of Chan.
262 slaveReplicationHandler :: Typeable st => SlaveState st -> IO ()
263 slaveReplicationHandler slaveState@SlaveState{..} = do
264         mtid <- myThreadId
265         putMVar slaveRepThreadId mtid
266         -- todo: timeout is magic variable, make customizable?
267         noTimeout <- Event.waitTimeout slaveSyncDone $ 10*1000*1000
268         unless noTimeout $ throwTo slaveParentThreadId $ ErrorCall "Data.Acid.Centered.Slave: Took too long to sync. Timeout."
269         let loop = handle (\e -> throwTo slaveParentThreadId (e :: SomeException)) $ do
270                 mayRepItem <- readChan slaveRepChan
271                 case mayRepItem of
272                     SRIEnd -> return ()
273                     SRICheckpoint r -> do
274                         repCheckpoint slaveState r
275                         loop
276                     SRIArchive r -> do
277                         repArchive slaveState r
278                         loop
279                     SRIUpdate r i d -> do
280                         replicateUpdate slaveState r i d False
281                         loop
282         loop
283         -- signal that we're done
284         void $ takeMVar slaveRepThreadId
285
286 -- | Replicate Sync-Checkpoints directly.
287 replicateSyncCp :: (IsAcidic st, Typeable st) =>
288         SlaveState st -> Revision -> ByteString -> IO ()
289 replicateSyncCp SlaveState{..} rev encoded = do
290     st <- decodeCheckpoint encoded
291     let lst = downcast slaveLocalState
292     let core = localCore lst
293     modifyMVar_ slaveRevision $ \sr -> do
294         when (sr > rev) $ error "Data.Acid.Centered.Slave: Revision mismatch for checkpoint: Slave is newer."
295         modifyCoreState_ core $ \_ -> do
296             writeIORef (localCopy lst) st
297             createCpFake lst encoded rev
298             adjustEventLogId lst rev
299             return st
300         return rev
301     where
302         adjustEventLogId l r = do
303             atomically $ writeTVar (logNextEntryId (localEvents l)) r
304             void $ cutFileLog (localEvents l)
305         createCpFake l e r = do
306             mvar <- newEmptyMVar
307             pushAction (localEvents l) $
308                 pushEntry (localCheckpoints l) (Checkpoint r e) (putMVar mvar ())
309             takeMVar mvar
310         decodeCheckpoint e =
311             case runGetLazy safeGet e of
312                 Left msg  -> error $ "Data.Acid.Centered.Slave: Checkpoint could not be decoded: " ++ msg
313                 Right val -> return val
314
315 -- | Replicate Sync-Updates directly.
316 replicateSyncUpdate :: Typeable st => SlaveState st -> Revision -> Tagged ByteString -> IO ()
317 replicateSyncUpdate slaveState rev event = replicateUpdate slaveState rev Nothing event True
318
319 -- | Replicate an Update as requested by Master.
320 --   Updates that were requested by this Slave are run locally and the result
321 --   put into the MVar in SlaveRequests.
322 --   Other Updates are just replicated without using the result.
323 replicateUpdate :: Typeable st => SlaveState st -> Revision -> Maybe RequestID -> Tagged ByteString -> Bool -> IO ()
324 replicateUpdate SlaveState{..} rev reqId event syncing = do
325         debug $ "Got an Update to replicate " ++ show rev
326         modifyMVar_ slaveRevision $ \nr -> if rev - 1 == nr
327             then do
328                 -- commit / run it locally
329                 case reqId of
330                     Nothing -> if slaveStateIsRed
331                         then do
332                             act <- newEmptyMVar >>= scheduleLocalColdUpdate' (downcast slaveLocalState) event 
333                             modifyMVar_ slaveRepFinalizers $ return . IM.insert rev act
334                         else
335                             void $ scheduleColdUpdate slaveLocalState event
336                     Just rid -> do
337                         act <- modifyMVar slaveRequests $ \srs -> do
338                             debug $ "This is the Update for Request " ++ show rid
339                             let (icallback, timeoutId) = fromMaybe (error $ "Data.Acid.Centered.Slave: Callback not found: " ++ show rid) (IM.lookup rid srs)
340                             callback <- icallback
341                             killThread timeoutId
342                             let nsrs = IM.delete rid srs
343                             return (nsrs, callback)
344                         when slaveStateIsRed $
345                             modifyMVar_ slaveRepFinalizers $ return . IM.insert rev act
346                 -- send reply: we're done
347                 unless syncing $ sendToMaster slaveZmqSocket $ RepDone rev
348                 return rev
349             else do
350                 sendToMaster slaveZmqSocket RepError
351                 void $ error $ "Data.Acid.Centered.Slave: Replication failed at revision " ++ show nr ++ " -> " ++ show rev
352                 return nr
353
354 repCheckpoint :: SlaveState st -> Revision -> IO ()
355 repCheckpoint SlaveState{..} rev = do
356     debug $ "Got Checkpoint request at revision: " ++ show rev
357     withMVar slaveRevision $ \_ ->
358         -- create checkpoint
359         createCheckpoint slaveLocalState
360
361 repArchive :: SlaveState st -> Revision -> IO ()
362 repArchive SlaveState{..} rev = do
363     debug $ "Got Archive request at revision: " ++ show rev
364     withMVar slaveRevision $ \_ ->
365         createArchive slaveLocalState
366
367
368 -- | Update on slave site.
369 --      The steps are:
370 --      - Request Update from Master
371 --      - Master issues Update with same RequestID
372 --      - repHandler replicates and puts result in MVar
373 scheduleSlaveUpdate :: (UpdateEvent e, Typeable (EventState e)) => SlaveState (EventState e) -> e -> IO (MVar (EventResult e))
374 scheduleSlaveUpdate slaveState@SlaveState{..} event = do
375     unlocked <- isEmptyMVar slaveStateLock
376     if not unlocked then error "State is locked."
377     else do
378         debug "Update by Slave."
379         result <- newEmptyMVar
380         -- slaveLastRequestID is only modified here - and used for locking the state
381         reqId <- modifyMVar slaveLastRequestID $ \x -> return (x+1,x+1)
382         modifyMVar_ slaveRequests $ \srs -> do
383             let encoded = runPutLazy (safePut event)
384             sendToMaster slaveZmqSocket $ ReqUpdate reqId (methodTag event, encoded)
385             timeoutID <- forkIO $ timeoutRequest slaveState reqId result
386             let callback = if slaveStateIsRed
387                     then scheduleLocalUpdate' (downcast slaveLocalState) event result
388                     else do
389                         hd <- scheduleUpdate slaveLocalState event
390                         void $ forkIO $ putMVar result =<< takeMVar hd
391                         return (return ())      -- bogus finalizer
392             return $ IM.insert reqId (callback, timeoutID) srs
393         return result
394
395 -- | Cold Update on slave site. This enables for using Remote.
396 scheduleSlaveColdUpdate :: Typeable st => SlaveState st -> Tagged ByteString -> IO (MVar ByteString)
397 scheduleSlaveColdUpdate slaveState@SlaveState{..} encoded = do
398     unlocked <- isEmptyMVar slaveStateLock
399     if not unlocked then error "State is locked."
400     else do
401         debug "Cold Update by Slave."
402         result <- newEmptyMVar
403         -- slaveLastRequestID is only modified here - and used for locking the state
404         reqId <- modifyMVar slaveLastRequestID $ \x -> return (x+1,x+1)
405         modifyMVar_ slaveRequests $ \srs -> do
406             sendToMaster slaveZmqSocket $ ReqUpdate reqId encoded
407             timeoutID <- forkIO $ timeoutRequest slaveState reqId result
408             let callback = if slaveStateIsRed
409                     then scheduleLocalColdUpdate' (downcast slaveLocalState) encoded result
410                     else do
411                         hd <- scheduleColdUpdate slaveLocalState encoded
412                         void $ forkIO $ putMVar result =<< takeMVar hd
413                         return (return ())      -- bogus finalizer
414             return $ IM.insert reqId (callback, timeoutID) srs
415         return result
416
417 -- | Ensures requests are actually answered or fail.
418 --   On timeout the Slave dies, not the thread that invoked the Update.
419 timeoutRequest :: SlaveState st -> RequestID -> MVar m -> IO ()
420 timeoutRequest SlaveState{..} reqId mvar = do
421     threadDelay $ 5*1000*1000
422     stillThere <- withMVar slaveRequests (return . IM.member reqId)
423     when stillThere $ do
424         putMVar mvar $ error "Data.Acid.Centered.Slave: Update-Request timed out."
425         throwTo slaveParentThreadId $ ErrorCall "Data.Acid.Centered.Slave: Update-Request timed out."
426
427 -- | Send a message to Master.
428 sendToMaster :: MVar (Socket Dealer) -> SlaveMessage -> IO ()
429 sendToMaster msock smsg = withMVar msock $ \sock -> send sock [] (encode smsg)
430
431 -- | Close an enslaved State.
432 liberateState :: SlaveState st -> IO ()
433 liberateState SlaveState{..} =
434     -- lock state against updates: disallow requests
435     whenM (tryPutMVar slaveStateLock ()) $ do
436         debug "Closing Slave state..."
437         -- check / wait unprocessed requests
438         debug "Waiting for Requests to finish."
439         waitPoll 100 (withMVar slaveRequests (return . IM.null))
440         -- send master quit message
441         sendToMaster slaveZmqSocket SlaveQuit
442         -- wait replication chan, only if sync done
443         syncDone <- Event.isSet slaveSyncDone
444         when syncDone $ do
445             debug "Waiting for repChan to empty."
446             mtid <- myThreadId
447             putMVar slaveRepThreadId mtid
448         -- kill handler threads
449         debug "Killing request handler."
450         withMVar slaveReqThreadId $ flip throwTo GracefulExit
451         -- cleanup zmq
452         debug "Closing down zmq."
453         withMVar slaveZmqSocket $ \s -> do
454             -- avoid the socket hanging around
455             setLinger (restrict (1000 :: Int)) s
456             disconnect s slaveZmqAddr
457             close s
458         term slaveZmqContext
459         -- cleanup local state
460         debug "Closing local state."
461         closeAcidState slaveLocalState
462
463 slaveToAcidState :: (IsAcidic st, Typeable st)  => SlaveState st -> AcidState st
464 slaveToAcidState slaveState
465   = AcidState { _scheduleUpdate    = scheduleSlaveUpdate slaveState
466               , scheduleColdUpdate = scheduleSlaveColdUpdate slaveState
467               , _query             = query $ slaveLocalState slaveState
468               , queryCold          = queryCold $ slaveLocalState slaveState
469               , createCheckpoint   = createCheckpoint $ slaveLocalState slaveState
470               , createArchive      = createArchive $ slaveLocalState slaveState
471               , closeAcidState     = liberateState slaveState
472               , acidSubState       = mkAnyState slaveState
473               }