bc8e4a798baff25b05232b457e9ced8f6ec95075
[acid-state-dist.git] / src / Data / Acid / Centered / Slave.hs
1 {-# LANGUAGE DeriveDataTypeable, RecordWildCards, FlexibleContexts #-}
2 --------------------------------------------------------------------------------
3 {- |
4   Module      :  Data.Acid.CenteredSlave.hs
5   Copyright   :  ?
6
7   Maintainer  :  max.voit+hdv@with-eyes.net
8   Portability :  non-portable (uses GHC extensions)
9
10   The Slave part of a the Centered replication backend for acid state.
11
12 -}
13
14 --------------------------------------------------------------------------------
15 -- SLAVE part
16
17 module Data.Acid.Centered.Slave
18     (
19       enslaveState
20     , enslaveStateFrom
21     , enslaveRedState
22     , enslaveRedStateFrom
23     , SlaveState(..)
24     )  where
25
26 import Data.Typeable
27 import Data.SafeCopy
28 import Data.Serialize (decode, encode, runPutLazy, runGetLazy)
29
30 import Data.Acid
31 import Data.Acid.Core
32 import Data.Acid.Abstract
33 import Data.Acid.Local
34 import Data.Acid.Log
35
36 import Data.Acid.Centered.Common
37
38 import System.ZMQ4 (Context, Socket, Dealer(..),
39                     setReceiveHighWM, setSendHighWM, setLinger, restrict,
40                     poll, Poll(..), Event(..),
41                     context, term, socket, close,
42                     connect, disconnect, send, receive)
43 import System.FilePath ( (</>) )
44
45 import Control.Concurrent (forkIO, ThreadId, myThreadId, killThread, threadDelay, forkIOWithUnmask)
46 import Control.Concurrent.MVar (MVar, newMVar, newEmptyMVar, isEmptyMVar,
47                                 withMVar, modifyMVar, modifyMVar_,
48                                 takeMVar, putMVar, tryPutMVar)
49 import Data.IORef (writeIORef)
50 import Control.Monad (void, when, unless)
51 import Control.Monad.STM (atomically)
52 import Control.Concurrent.STM.TVar (readTVar, writeTVar)
53 import qualified Control.Concurrent.Event as Event
54 import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
55 import Control.Exception (handle, throwTo, SomeException, ErrorCall(..))
56
57 import Data.IntMap (IntMap)
58 import qualified Data.IntMap as IM
59 import Data.Maybe (fromMaybe)
60
61 import qualified Data.ByteString.Lazy.Char8 as CSL
62
63 --------------------------------------------------------------------------------
64
65 -- | Slave state structure, for internal use.
66 data SlaveState st
67     = SlaveState { slaveLocalState :: AcidState st
68                  , slaveStateIsRed :: Bool
69                  , slaveStateLock :: MVar ()
70                  , slaveRepFinalizers :: MVar (IntMap (IO ()))
71                  , slaveRepChan :: Chan SlaveRepItem
72                  , slaveSyncDone :: Event.Event
73                  , slaveRevision :: MVar NodeRevision
74                  , slaveRequests :: MVar SlaveRequests
75                  , slaveLastRequestID :: MVar RequestID
76                  , slaveRepThreadId :: MVar ThreadId
77                  , slaveReqThreadId :: MVar ThreadId
78                  , slaveParentThreadId :: ThreadId
79                  , slaveZmqContext :: Context
80                  , slaveZmqAddr :: String
81                  , slaveZmqSocket :: MVar (Socket Dealer)
82                  } deriving (Typeable)
83
84 -- | Memory of own Requests sent to Master.
85 type SlaveRequests = IntMap (IO (IO ()),ThreadId)
86
87 -- | One Update + Metainformation to replicate.
88 data SlaveRepItem =
89       SRIEnd
90     | SRICheckpoint Revision
91     | SRIArchive Revision
92     | SRIUpdate Revision (Maybe RequestID) (Tagged CSL.ByteString)
93
94 -- | Open a local State as Slave for a Master.
95 --
96 -- The directory for the local state files is the default one ("state/[typeOf
97 -- state]").
98 enslaveState :: (IsAcidic st, Typeable st) =>
99             String          -- ^ hostname of the Master
100          -> PortNumber      -- ^ port to connect to
101          -> st              -- ^ initial state
102          -> IO (AcidState st)
103 enslaveState address port initialState =
104     enslaveStateFrom ("state" </> show (typeOf initialState)) address port initialState
105
106 -- | Open a local State as Slave for a Master.
107 --
108 -- The directory for the local state files is the default one ("state/[typeOf
109 -- state]").
110 enslaveRedState :: (IsAcidic st, Typeable st) =>
111             String          -- ^ hostname of the Master
112          -> PortNumber      -- ^ port to connect to
113          -> st              -- ^ initial state
114          -> IO (AcidState st)
115 enslaveRedState address port initialState =
116     enslaveRedStateFrom ("state" </> show (typeOf initialState)) address port initialState
117
118 -- | Open a local State as Slave for a Master. The directory of the local state
119 -- files can be specified.
120 enslaveStateFrom :: (IsAcidic st, Typeable st) =>
121             FilePath        -- ^ location of the local state files.
122          -> String          -- ^ hostname of the Master
123          -> PortNumber      -- ^ port to connect to
124          -> st              -- ^ initial state
125          -> IO (AcidState st)
126 enslaveStateFrom = enslaveMayRedStateFrom False
127
128 -- | Open a local State as Slave for a _redundant_ Master. The directory of the local state
129 -- files can be specified.
130 enslaveRedStateFrom :: (IsAcidic st, Typeable st) =>
131             FilePath        -- ^ location of the local state files.
132          -> String          -- ^ hostname of the Master
133          -> PortNumber      -- ^ port to connect to
134          -> st              -- ^ initial state
135          -> IO (AcidState st)
136 enslaveRedStateFrom = enslaveMayRedStateFrom True
137
138 -- | Open a local State as Slave for a Master, redundant or not.
139 --   The directory of the local state files can be specified.
140 enslaveMayRedStateFrom :: (IsAcidic st, Typeable st) =>
141             Bool            -- ^ is redundant
142          -> FilePath        -- ^ location of the local state files.
143          -> String          -- ^ hostname of the Master
144          -> PortNumber      -- ^ port to connect to
145          -> st              -- ^ initial state
146          -> IO (AcidState st)
147 enslaveMayRedStateFrom isRed directory address port initialState = do
148         -- local
149         lst <- openLocalStateFrom directory initialState
150         let levs = localEvents $ downcast lst
151         lrev <- atomically $ readTVar $ logNextEntryId levs
152         rev <- newMVar lrev
153         debug $ "Opening enslaved state at revision " ++ show lrev
154         srs <- newMVar IM.empty
155         lastReqId <- newMVar 0
156         repChan <- newChan
157         syncDone <- Event.new
158         reqTid <- newEmptyMVar
159         repTid <- newEmptyMVar
160         parTid <- myThreadId
161         repFin <- newMVar IM.empty
162         lock <- newEmptyMVar
163         -- remote
164         let addr = "tcp://" ++ address ++ ":" ++ show port
165         ctx <- context
166         sock <- socket ctx Dealer
167         setReceiveHighWM (restrict (100*1000 :: Int)) sock
168         setSendHighWM (restrict (100*1000 :: Int)) sock
169         connect sock addr
170         msock <- newMVar sock
171         sendToMaster msock $ NewSlave lrev
172         let slaveState = SlaveState { slaveLocalState = lst
173                                     , slaveStateIsRed = isRed
174                                     , slaveStateLock = lock
175                                     , slaveRepFinalizers = repFin
176                                     , slaveRepChan = repChan
177                                     , slaveSyncDone = syncDone
178                                     , slaveRevision = rev
179                                     , slaveRequests = srs
180                                     , slaveLastRequestID = lastReqId
181                                     , slaveReqThreadId = reqTid
182                                     , slaveRepThreadId = repTid
183                                     , slaveParentThreadId = parTid
184                                     , slaveZmqContext = ctx
185                                     , slaveZmqAddr = addr
186                                     , slaveZmqSocket = msock
187                                     }
188         void $ forkIOWithUnmask $ slaveRequestHandler slaveState
189         void $ forkIO $ slaveReplicationHandler slaveState
190         return $ slaveToAcidState slaveState
191
192 -- | Replication handler of the Slave.
193 slaveRequestHandler :: (IsAcidic st, Typeable st) => SlaveState st -> (IO () -> IO ()) -> IO ()
194 slaveRequestHandler slaveState@SlaveState{..} unmask = do
195     mtid <- myThreadId
196     putMVar slaveReqThreadId mtid
197     let loop = handle (\e -> throwTo slaveParentThreadId (e :: SomeException)) $
198             unmask $ handle killHandler $ do
199             --waitRead =<< readMVar slaveZmqSocket
200             -- FIXME: we needn't poll if not for strange zmq behaviour
201             re <- withMVar slaveZmqSocket $ \sock -> poll 100 [Sock sock [In] Nothing]
202             unless (null $ head re) $ do
203                 msg <- withMVar slaveZmqSocket receive
204                 case decode msg of
205                     Left str -> error $ "Data.Serialize.decode failed on MasterMessage: " ++ show str
206                     Right mmsg -> do
207                          debug $ "Received: " ++ show mmsg
208                          case mmsg of
209                             -- We are sent an Update to replicate.
210                             DoRep r i d -> queueRepItem slaveState (SRIUpdate r i d)
211                             -- We are sent a Checkpoint for synchronization.
212                             DoSyncCheckpoint r d -> replicateSyncCp slaveState r d
213                             -- We are sent an Update to replicate for synchronization.
214                             DoSyncRep r d -> replicateSyncUpdate slaveState r d
215                             -- Master done sending all synchronization Updates.
216                             SyncDone c -> onSyncDone slaveState c
217                             -- We are sent a Checkpoint request.
218                             DoCheckpoint r -> queueRepItem slaveState (SRICheckpoint r)
219                             -- We are sent an Archive request.
220                             DoArchive r -> queueRepItem slaveState (SRIArchive r)
221                             -- Full replication of a revision
222                             FullRep r -> modifyMVar_ slaveRepFinalizers $ \rf -> do
223                                             rf IM.! r
224                                             return $ IM.delete r rf
225                             -- Full replication of events up to revision
226                             FullRepTo r -> modifyMVar_ slaveRepFinalizers $ \rf -> do
227                                             let (ef, nrf) = IM.partitionWithKey (\k _ -> k <= r) rf
228                                             sequence_ (IM.elems ef)
229                                             return nrf
230                             -- We are allowed to Quit.
231                             MayQuit -> writeChan slaveRepChan SRIEnd
232                             -- We are requested to Quit - shall be handled by
233                             -- 'bracket' usage by user.
234                             MasterQuit -> throwTo slaveParentThreadId $
235                                 ErrorCall "Data.Acid.Centered.Slave: Master quit."
236                             -- no other messages possible, enforced by type checker
237             loop
238     loop
239     where
240         killHandler :: AcidException -> IO ()
241         killHandler GracefulExit = return ()
242
243 -- | After sync check CRC
244 onSyncDone :: (IsAcidic st, Typeable st) => SlaveState st -> Crc -> IO ()
245 onSyncDone SlaveState{..} crc = do
246     localCrc <- crcOfState slaveLocalState
247     if crc /= localCrc then
248         error "Data.Acid.Centered.Slave: CRC mismatch after sync."
249     else do
250         debug "Sync Done, CRC fine."
251         Event.set slaveSyncDone
252
253 -- | Queue Updates into Chan for replication.
254 -- We use the Chan so Sync-Updates and normal ones can be interleaved.
255 queueRepItem :: SlaveState st -> SlaveRepItem -> IO ()
256 queueRepItem SlaveState{..} repItem = do
257         debug "Queuing RepItem."
258         writeChan slaveRepChan repItem
259
260 -- | Replicates content of Chan.
261 slaveReplicationHandler :: Typeable st => SlaveState st -> IO ()
262 slaveReplicationHandler slaveState@SlaveState{..} = do
263         mtid <- myThreadId
264         putMVar slaveRepThreadId mtid
265         -- todo: timeout is magic variable, make customizable?
266         noTimeout <- Event.waitTimeout slaveSyncDone $ 10*1000*1000
267         unless noTimeout $ throwTo slaveParentThreadId $ ErrorCall "Data.Acid.Centered.Slave: Took too long to sync. Timeout."
268         let loop = handle (\e -> throwTo slaveParentThreadId (e :: SomeException)) $ do
269                 mayRepItem <- readChan slaveRepChan
270                 case mayRepItem of
271                     SRIEnd -> return ()
272                     SRICheckpoint r -> do
273                         repCheckpoint slaveState r
274                         loop
275                     SRIArchive r -> do
276                         repArchive slaveState r
277                         loop
278                     SRIUpdate r i d -> do
279                         replicateUpdate slaveState r i d False
280                         loop
281         loop
282         -- signal that we're done
283         void $ takeMVar slaveRepThreadId
284
285 -- | Replicate Sync-Checkpoints directly.
286 replicateSyncCp :: (IsAcidic st, Typeable st) =>
287         SlaveState st -> Revision -> CSL.ByteString -> IO ()
288 replicateSyncCp SlaveState{..} rev encoded = do
289     st <- decodeCheckpoint encoded
290     let lst = downcast slaveLocalState
291     let core = localCore lst
292     modifyMVar_ slaveRevision $ \sr -> do
293         when (sr > rev) $ error "Data.Acid.Centered.Slave: Revision mismatch for checkpoint: Slave is newer."
294         modifyCoreState_ core $ \_ -> do
295             writeIORef (localCopy lst) st
296             createCpFake lst encoded rev
297             adjustEventLogId lst rev
298             return st
299         return rev
300     where
301         adjustEventLogId l r = do
302             atomically $ writeTVar (logNextEntryId (localEvents l)) r
303             void $ cutFileLog (localEvents l)
304         createCpFake l e r = do
305             mvar <- newEmptyMVar
306             pushAction (localEvents l) $
307                 pushEntry (localCheckpoints l) (Checkpoint r e) (putMVar mvar ())
308             takeMVar mvar
309         decodeCheckpoint e =
310             case runGetLazy safeGet e of
311                 Left msg  -> error $ "Data.Acid.Centered.Slave: Checkpoint could not be decoded: " ++ msg
312                 Right val -> return val
313
314 -- | Replicate Sync-Updates directly.
315 replicateSyncUpdate :: Typeable st => SlaveState st -> Revision -> Tagged CSL.ByteString -> IO ()
316 replicateSyncUpdate slaveState rev event = replicateUpdate slaveState rev Nothing event True
317
318 -- | Replicate an Update as requested by Master.
319 --   Updates that were requested by this Slave are run locally and the result
320 --   put into the MVar in SlaveRequests.
321 --   Other Updates are just replicated without using the result.
322 replicateUpdate :: Typeable st => SlaveState st -> Revision -> Maybe RequestID -> Tagged CSL.ByteString -> Bool -> IO ()
323 replicateUpdate SlaveState{..} rev reqId event syncing = do
324         debug $ "Got an Update to replicate " ++ show rev
325         modifyMVar_ slaveRevision $ \nr -> if rev - 1 == nr
326             then do
327                 -- commit / run it locally
328                 case reqId of
329                     Nothing -> if slaveStateIsRed
330                         then do
331                             act <- newEmptyMVar >>= scheduleLocalColdUpdate' (downcast slaveLocalState) event 
332                             modifyMVar_ slaveRepFinalizers $ return . IM.insert rev act
333                         else
334                             void $ scheduleColdUpdate slaveLocalState event
335                     Just rid -> do
336                         act <- modifyMVar slaveRequests $ \srs -> do
337                             debug $ "This is the Update for Request " ++ show rid
338                             let (icallback, timeoutId) = fromMaybe (error $ "Data.Acid.Centered.Slave: Callback not found: " ++ show rid) (IM.lookup rid srs)
339                             callback <- icallback
340                             killThread timeoutId
341                             let nsrs = IM.delete rid srs
342                             return (nsrs, callback)
343                         when slaveStateIsRed $
344                             modifyMVar_ slaveRepFinalizers $ return . IM.insert rev act
345                 -- send reply: we're done
346                 unless syncing $ sendToMaster slaveZmqSocket $ RepDone rev
347                 return rev
348             else do
349                 sendToMaster slaveZmqSocket RepError
350                 void $ error $ "Data.Acid.Centered.Slave: Replication failed at revision " ++ show nr ++ " -> " ++ show rev
351                 return nr
352
353 repCheckpoint :: SlaveState st -> Revision -> IO ()
354 repCheckpoint SlaveState{..} rev = do
355     debug $ "Got Checkpoint request at revision: " ++ show rev
356     withMVar slaveRevision $ \_ ->
357         -- create checkpoint
358         createCheckpoint slaveLocalState
359
360 repArchive :: SlaveState st -> Revision -> IO ()
361 repArchive SlaveState{..} rev = do
362     debug $ "Got Archive request at revision: " ++ show rev
363     withMVar slaveRevision $ \_ ->
364         createArchive slaveLocalState
365
366
367 -- | Update on slave site.
368 --      The steps are:
369 --      - Request Update from Master
370 --      - Master issues Update with same RequestID
371 --      - repHandler replicates and puts result in MVar
372 scheduleSlaveUpdate :: (UpdateEvent e, Typeable (EventState e)) => SlaveState (EventState e) -> e -> IO (MVar (EventResult e))
373 scheduleSlaveUpdate slaveState@SlaveState{..} event = do
374     unlocked <- isEmptyMVar slaveStateLock
375     if not unlocked then error "State is locked."
376     else do
377         debug "Update by Slave."
378         result <- newEmptyMVar
379         -- slaveLastRequestID is only modified here - and used for locking the state
380         reqId <- modifyMVar slaveLastRequestID $ \x -> return (x+1,x+1)
381         modifyMVar_ slaveRequests $ \srs -> do
382             let encoded = runPutLazy (safePut event)
383             sendToMaster slaveZmqSocket $ ReqUpdate reqId (methodTag event, encoded)
384             timeoutID <- forkIO $ timeoutRequest slaveState reqId result
385             let callback = if slaveStateIsRed
386                     then scheduleLocalUpdate' (downcast slaveLocalState) event result
387                     else do
388                         hd <- scheduleUpdate slaveLocalState event
389                         void $ forkIO $ putMVar result =<< takeMVar hd
390                         return (return ())      -- bogus finalizer
391             return $ IM.insert reqId (callback, timeoutID) srs
392         return result
393
394 -- | Cold Update on slave site. This enables for using Remote.
395 scheduleSlaveColdUpdate :: Typeable st => SlaveState st -> Tagged CSL.ByteString -> IO (MVar CSL.ByteString)
396 scheduleSlaveColdUpdate slaveState@SlaveState{..} encoded = do
397     unlocked <- isEmptyMVar slaveStateLock
398     if not unlocked then error "State is locked."
399     else do
400         debug "Cold Update by Slave."
401         result <- newEmptyMVar
402         -- slaveLastRequestID is only modified here - and used for locking the state
403         reqId <- modifyMVar slaveLastRequestID $ \x -> return (x+1,x+1)
404         modifyMVar_ slaveRequests $ \srs -> do
405             sendToMaster slaveZmqSocket $ ReqUpdate reqId encoded
406             timeoutID <- forkIO $ timeoutRequest slaveState reqId result
407             let callback = if slaveStateIsRed
408                     then scheduleLocalColdUpdate' (downcast slaveLocalState) encoded result
409                     else do
410                         hd <- scheduleColdUpdate slaveLocalState encoded
411                         void $ forkIO $ putMVar result =<< takeMVar hd
412                         return (return ())      -- bogus finalizer
413             return $ IM.insert reqId (callback, timeoutID) srs
414         return result
415
416 -- | Ensures requests are actually answered or fail.
417 --   On timeout the Slave dies, not the thread that invoked the Update.
418 timeoutRequest :: SlaveState st -> RequestID -> MVar m -> IO ()
419 timeoutRequest SlaveState{..} reqId mvar = do
420     threadDelay $ 5*1000*1000
421     stillThere <- withMVar slaveRequests (return . IM.member reqId)
422     when stillThere $ do
423         putMVar mvar $ error "Data.Acid.Centered.Slave: Update-Request timed out."
424         throwTo slaveParentThreadId $ ErrorCall "Data.Acid.Centered.Slave: Update-Request timed out."
425
426 -- | Send a message to Master.
427 sendToMaster :: MVar (Socket Dealer) -> SlaveMessage -> IO ()
428 sendToMaster msock smsg = withMVar msock $ \sock -> send sock [] (encode smsg)
429
430 -- | Close an enslaved State.
431 liberateState :: SlaveState st -> IO ()
432 liberateState SlaveState{..} =
433     -- lock state against updates: disallow requests
434     whenM (tryPutMVar slaveStateLock ()) $ do
435         debug "Closing Slave state..."
436         -- check / wait unprocessed requests
437         debug "Waiting for Requests to finish."
438         waitPoll 100 (withMVar slaveRequests (return . IM.null))
439         -- send master quit message
440         sendToMaster slaveZmqSocket SlaveQuit
441         -- wait replication chan, only if sync done
442         syncDone <- Event.isSet slaveSyncDone
443         when syncDone $ do
444             debug "Waiting for repChan to empty."
445             mtid <- myThreadId
446             putMVar slaveRepThreadId mtid
447         -- kill handler threads
448         debug "Killing request handler."
449         withMVar slaveReqThreadId $ flip throwTo GracefulExit
450         -- cleanup zmq
451         debug "Closing down zmq."
452         withMVar slaveZmqSocket $ \s -> do
453             -- avoid the socket hanging around
454             setLinger (restrict (1000 :: Int)) s
455             disconnect s slaveZmqAddr
456             close s
457         term slaveZmqContext
458         -- cleanup local state
459         debug "Closing local state."
460         closeAcidState slaveLocalState
461
462 slaveToAcidState :: (IsAcidic st, Typeable st)  => SlaveState st -> AcidState st
463 slaveToAcidState slaveState
464   = AcidState { _scheduleUpdate    = scheduleSlaveUpdate slaveState
465               , scheduleColdUpdate = scheduleSlaveColdUpdate slaveState
466               , _query             = query $ slaveLocalState slaveState
467               , queryCold          = queryCold $ slaveLocalState slaveState
468               , createCheckpoint   = createCheckpoint $ slaveLocalState slaveState
469               , createArchive      = createArchive $ slaveLocalState slaveState
470               , closeAcidState     = liberateState slaveState
471               , acidSubState       = mkAnyState slaveState
472               }