fix: locking and lock checking
[acid-state-dist.git] / src / Data / Acid / Centered / Slave.hs
1 {-# LANGUAGE DeriveDataTypeable, RecordWildCards, FlexibleContexts #-}
2 --------------------------------------------------------------------------------
3 {- |
4   Module      :  Data.Acid.CenteredSlave.hs
5   Copyright   :  ?
6
7   Maintainer  :  max.voit+hdv@with-eyes.net
8   Portability :  non-portable (uses GHC extensions)
9
10   The Slave part of a the Centered replication backend for acid state.
11
12 -}
13
14 --------------------------------------------------------------------------------
15 -- SLAVE part
16
17 module Data.Acid.Centered.Slave
18     (
19       enslaveState
20     , enslaveStateFrom
21     , enslaveRedState
22     , enslaveRedStateFrom
23     , SlaveState(..)
24     )  where
25
26 import Data.Typeable
27 import Data.SafeCopy
28 import Data.Serialize (decode, encode, runPutLazy, runGetLazy)
29
30 import Data.Acid
31 import Data.Acid.Core
32 import Data.Acid.Abstract
33 import Data.Acid.Local
34 import Data.Acid.Log
35
36 import Data.Acid.Centered.Common
37
38 import System.ZMQ4 (Context, Socket, Dealer(..),
39                     setReceiveHighWM, setSendHighWM, setLinger, restrict,
40                     poll, Poll(..), Event(..),
41                     context, term, socket, close,
42                     connect, disconnect, send, receive)
43 import System.FilePath ( (</>) )
44
45 import Control.Concurrent (forkIO, ThreadId, myThreadId, killThread, threadDelay, forkIOWithUnmask)
46 import Control.Concurrent.MVar (MVar, newMVar, newEmptyMVar, isEmptyMVar,
47                                 withMVar, modifyMVar, modifyMVar_,
48                                 takeMVar, putMVar, tryPutMVar)
49 import Data.IORef (writeIORef)
50 import Control.Monad (void, when, unless, liftM)
51 import Control.Monad.STM (atomically)
52 import Control.Concurrent.STM.TVar (readTVar, writeTVar)
53 import qualified Control.Concurrent.Event as Event
54 import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
55 import Control.Exception (handle, throwTo, SomeException, ErrorCall(..))
56
57 import Data.IntMap (IntMap)
58 import qualified Data.IntMap as IM
59 import Data.Maybe (fromMaybe)
60
61 import qualified Data.ByteString.Lazy.Char8 as CSL
62
63 --------------------------------------------------------------------------------
64
65 -- | Slave state structure, for internal use.
66 data SlaveState st
67     = SlaveState { slaveLocalState :: AcidState st
68                  , slaveStateIsRed :: Bool
69                  , slaveStateLock :: MVar ()
70                  , slaveRepFinalizers :: MVar (IntMap (IO ()))
71                  , slaveRepChan :: Chan SlaveRepItem
72                  , slaveSyncDone :: Event.Event
73                  , slaveRevision :: MVar NodeRevision
74                  , slaveRequests :: MVar SlaveRequests
75                  , slaveLastRequestID :: MVar RequestID
76                  , slaveRepThreadId :: MVar ThreadId
77                  , slaveReqThreadId :: MVar ThreadId
78                  , slaveParentThreadId :: ThreadId
79                  , slaveZmqContext :: Context
80                  , slaveZmqAddr :: String
81                  , slaveZmqSocket :: MVar (Socket Dealer)
82                  } deriving (Typeable)
83
84 -- | Memory of own Requests sent to Master.
85 type SlaveRequests = IntMap (IO (IO ()),ThreadId)
86
87 -- | One Update + Metainformation to replicate.
88 data SlaveRepItem =
89       SRIEnd
90     | SRICheckpoint Revision
91     | SRIArchive Revision
92     | SRIUpdate Revision (Maybe RequestID) (Tagged CSL.ByteString)
93
94 -- | Open a local State as Slave for a Master.
95 --
96 -- The directory for the local state files is the default one ("state/[typeOf
97 -- state]").
98 enslaveState :: (IsAcidic st, Typeable st) =>
99             String          -- ^ hostname of the Master
100          -> PortNumber      -- ^ port to connect to
101          -> st              -- ^ initial state
102          -> IO (AcidState st)
103 enslaveState address port initialState =
104     enslaveStateFrom ("state" </> show (typeOf initialState)) address port initialState
105
106 -- | Open a local State as Slave for a Master.
107 --
108 -- The directory for the local state files is the default one ("state/[typeOf
109 -- state]").
110 enslaveRedState :: (IsAcidic st, Typeable st) =>
111             String          -- ^ hostname of the Master
112          -> PortNumber      -- ^ port to connect to
113          -> st              -- ^ initial state
114          -> IO (AcidState st)
115 enslaveRedState address port initialState =
116     enslaveRedStateFrom ("state" </> show (typeOf initialState)) address port initialState
117
118 -- | Open a local State as Slave for a Master. The directory of the local state
119 -- files can be specified.
120 enslaveStateFrom :: (IsAcidic st, Typeable st) =>
121             FilePath        -- ^ location of the local state files.
122          -> String          -- ^ hostname of the Master
123          -> PortNumber      -- ^ port to connect to
124          -> st              -- ^ initial state
125          -> IO (AcidState st)
126 enslaveStateFrom = enslaveMayRedStateFrom False
127
128 -- | Open a local State as Slave for a _redundant_ Master. The directory of the local state
129 -- files can be specified.
130 enslaveRedStateFrom :: (IsAcidic st, Typeable st) =>
131             FilePath        -- ^ location of the local state files.
132          -> String          -- ^ hostname of the Master
133          -> PortNumber      -- ^ port to connect to
134          -> st              -- ^ initial state
135          -> IO (AcidState st)
136 enslaveRedStateFrom = enslaveMayRedStateFrom True
137
138 -- | Open a local State as Slave for a Master, redundant or not.
139 --   The directory of the local state files can be specified.
140 enslaveMayRedStateFrom :: (IsAcidic st, Typeable st) =>
141             Bool            -- ^ is redundant
142          -> FilePath        -- ^ location of the local state files.
143          -> String          -- ^ hostname of the Master
144          -> PortNumber      -- ^ port to connect to
145          -> st              -- ^ initial state
146          -> IO (AcidState st)
147 enslaveMayRedStateFrom isRed directory address port initialState = do
148         -- local
149         lst <- openLocalStateFrom directory initialState
150         let levs = localEvents $ downcast lst
151         lrev <- atomically $ readTVar $ logNextEntryId levs
152         rev <- newMVar lrev
153         debug $ "Opening enslaved state at revision " ++ show lrev
154         srs <- newMVar IM.empty
155         lastReqId <- newMVar 0
156         repChan <- newChan
157         syncDone <- Event.new
158         reqTid <- newEmptyMVar
159         repTid <- newEmptyMVar
160         parTid <- myThreadId
161         repFin <- newMVar IM.empty
162         lock <- newEmptyMVar
163         -- remote
164         let addr = "tcp://" ++ address ++ ":" ++ show port
165         ctx <- context
166         sock <- socket ctx Dealer
167         setReceiveHighWM (restrict (100*1000 :: Int)) sock
168         setSendHighWM (restrict (100*1000 :: Int)) sock
169         connect sock addr
170         msock <- newMVar sock
171         sendToMaster msock $ NewSlave lrev
172         let slaveState = SlaveState { slaveLocalState = lst
173                                     , slaveStateIsRed = isRed
174                                     , slaveStateLock = lock
175                                     , slaveRepFinalizers = repFin
176                                     , slaveRepChan = repChan
177                                     , slaveSyncDone = syncDone
178                                     , slaveRevision = rev
179                                     , slaveRequests = srs
180                                     , slaveLastRequestID = lastReqId
181                                     , slaveReqThreadId = reqTid
182                                     , slaveRepThreadId = repTid
183                                     , slaveParentThreadId = parTid
184                                     , slaveZmqContext = ctx
185                                     , slaveZmqAddr = addr
186                                     , slaveZmqSocket = msock
187                                     }
188         void $ forkIOWithUnmask $ slaveRequestHandler slaveState
189         void $ forkIO $ slaveReplicationHandler slaveState
190         return $ slaveToAcidState slaveState
191
192 -- | Replication handler of the Slave.
193 slaveRequestHandler :: (IsAcidic st, Typeable st) => SlaveState st -> (IO () -> IO ()) -> IO ()
194 slaveRequestHandler slaveState@SlaveState{..} unmask = do
195     mtid <- myThreadId
196     putMVar slaveReqThreadId mtid
197     let loop = handle (\e -> throwTo slaveParentThreadId (e :: SomeException)) $
198             unmask $ handle killHandler $ do
199             --waitRead =<< readMVar slaveZmqSocket
200             -- FIXME: we needn't poll if not for strange zmq behaviour
201             re <- withMVar slaveZmqSocket $ \sock -> poll 100 [Sock sock [In] Nothing]
202             unless (null $ head re) $ do
203                 msg <- withMVar slaveZmqSocket receive
204                 case decode msg of
205                     Left str -> error $ "Data.Serialize.decode failed on MasterMessage: " ++ show str
206                     Right mmsg -> do
207                          debug $ "Received: " ++ show mmsg
208                          case mmsg of
209                             -- We are sent an Update to replicate.
210                             DoRep r i d -> queueRepItem slaveState (SRIUpdate r i d)
211                             -- We are sent a Checkpoint for synchronization.
212                             DoSyncCheckpoint r d -> replicateSyncCp slaveState r d
213                             -- We are sent an Update to replicate for synchronization.
214                             DoSyncRep r d -> replicateSyncUpdate slaveState r d
215                             -- Master done sending all synchronization Updates.
216                             SyncDone c -> onSyncDone slaveState c
217                             -- We are sent a Checkpoint request.
218                             DoCheckpoint r -> queueRepItem slaveState (SRICheckpoint r)
219                             -- We are sent an Archive request.
220                             DoArchive r -> queueRepItem slaveState (SRIArchive r)
221                             -- Full replication of a revision
222                             FullRep r -> modifyMVar_ slaveRepFinalizers $ \rf -> do
223                                             rf IM.! r
224                                             return $ IM.delete r rf
225                             -- Full replication of events up to revision
226                             FullRepTo r -> modifyMVar_ slaveRepFinalizers $ \rf -> do
227                                             let (ef, nrf) = IM.partitionWithKey (\k _ -> k <= r) rf
228                                             sequence_ (IM.elems ef)
229                                             return nrf
230                             -- We are allowed to Quit.
231                             MayQuit -> writeChan slaveRepChan SRIEnd
232                             -- We are requested to Quit - shall be handled by
233                             -- 'bracket' usage by user.
234                             MasterQuit -> throwTo slaveParentThreadId $
235                                 ErrorCall "Data.Acid.Centered.Slave: Master quit."
236                             -- no other messages possible, enforced by type checker
237             loop
238     loop
239     where
240         killHandler :: AcidException -> IO ()
241         killHandler GracefulExit = return ()
242
243 -- | After sync check CRC
244 onSyncDone :: (IsAcidic st, Typeable st) => SlaveState st -> Crc -> IO ()
245 onSyncDone SlaveState{..} crc = do
246     localCrc <- crcOfState slaveLocalState
247     if crc /= localCrc then
248         error "Data.Acid.Centered.Slave: CRC mismatch after sync."
249     else do
250         debug "Sync Done, CRC fine."
251         Event.set slaveSyncDone
252
253 -- | Queue Updates into Chan for replication.
254 -- We use the Chan so Sync-Updates and normal ones can be interleaved.
255 queueRepItem :: SlaveState st -> SlaveRepItem -> IO ()
256 queueRepItem SlaveState{..} repItem = do
257         debug "Queuing RepItem."
258         writeChan slaveRepChan repItem
259
260 -- | Replicates content of Chan.
261 slaveReplicationHandler :: Typeable st => SlaveState st -> IO ()
262 slaveReplicationHandler slaveState@SlaveState{..} = do
263         mtid <- myThreadId
264         putMVar slaveRepThreadId mtid
265         -- todo: timeout is magic variable, make customizable?
266         noTimeout <- Event.waitTimeout slaveSyncDone $ 10*1000*1000
267         unless noTimeout $ throwTo slaveParentThreadId $ ErrorCall "Data.Acid.Centered.Slave: Took too long to sync. Timeout."
268         let loop = handle (\e -> throwTo slaveParentThreadId (e :: SomeException)) $ do
269                 mayRepItem <- readChan slaveRepChan
270                 case mayRepItem of
271                     SRIEnd -> return ()
272                     SRICheckpoint r -> do
273                         repCheckpoint slaveState r
274                         loop
275                     SRIArchive r -> do
276                         repArchive slaveState r
277                         loop
278                     SRIUpdate r i d -> do
279                         replicateUpdate slaveState r i d False
280                         loop
281         loop
282         -- signal that we're done
283         void $ takeMVar slaveRepThreadId
284
285 -- | Replicate Sync-Checkpoints directly.
286 replicateSyncCp :: (IsAcidic st, Typeable st) =>
287         SlaveState st -> Revision -> CSL.ByteString -> IO ()
288 replicateSyncCp SlaveState{..} rev encoded = do
289     st <- decodeCheckpoint encoded
290     let lst = downcast slaveLocalState
291     let core = localCore lst
292     modifyMVar_ slaveRevision $ \sr -> do
293         when (sr > rev) $ error "Data.Acid.Centered.Slave: Revision mismatch for checkpoint: Slave is newer."
294         -- todo: check
295         modifyCoreState_ core $ \_ -> do
296             writeIORef (localCopy lst) st
297             createCpFake lst encoded rev
298             adjustEventLogId lst rev
299             return st
300         return rev
301     where
302         adjustEventLogId l r = do
303             atomically $ writeTVar (logNextEntryId (localEvents l)) r
304             void $ cutFileLog (localEvents l)
305         createCpFake l e r = do
306             mvar <- newEmptyMVar
307             pushAction (localEvents l) $
308                 pushEntry (localCheckpoints l) (Checkpoint r e) (putMVar mvar ())
309             takeMVar mvar
310         decodeCheckpoint e =
311             case runGetLazy safeGet e of
312                 Left msg  -> error $ "Data.Acid.Centered.Slave: Checkpoint could not be decoded: " ++ msg
313                 Right val -> return val
314
315 -- | Replicate Sync-Updates directly.
316 replicateSyncUpdate :: Typeable st => SlaveState st -> Revision -> Tagged CSL.ByteString -> IO ()
317 replicateSyncUpdate slaveState rev event = replicateUpdate slaveState rev Nothing event True
318
319 -- | Replicate an Update as requested by Master.
320 --   Updates that were requested by this Slave are run locally and the result
321 --   put into the MVar in SlaveRequests.
322 --   Other Updates are just replicated without using the result.
323 replicateUpdate :: Typeable st => SlaveState st -> Revision -> Maybe RequestID -> Tagged CSL.ByteString -> Bool -> IO ()
324 replicateUpdate SlaveState{..} rev reqId event syncing = do
325         debug $ "Got an Update to replicate " ++ show rev
326         modifyMVar_ slaveRevision $ \nr -> if rev - 1 == nr
327             then do
328                 -- commit / run it locally
329                 case reqId of
330                     Nothing -> if slaveStateIsRed
331                         then do
332                             act <- liftM snd $ scheduleLocalColdUpdate' (downcast slaveLocalState) event
333                             modifyMVar_ slaveRepFinalizers $ return . IM.insert rev act
334                         else
335                             void $ scheduleColdUpdate slaveLocalState event
336                     Just rid -> do
337                         act <- modifyMVar slaveRequests $ \srs -> do
338                             debug $ "This is the Update for Request " ++ show rid
339                             let (icallback, timeoutId) = fromMaybe (error $ "Data.Acid.Centered.Slave: Callback not found: " ++ show rid) (IM.lookup rid srs)
340                             callback <- icallback
341                             killThread timeoutId
342                             let nsrs = IM.delete rid srs
343                             return (nsrs, callback)
344                         when slaveStateIsRed $
345                             modifyMVar_ slaveRepFinalizers $ return . IM.insert rev act
346                 -- send reply: we're done
347                 unless syncing $ sendToMaster slaveZmqSocket $ RepDone rev
348                 return rev
349             else do
350                 sendToMaster slaveZmqSocket RepError
351                 void $ error $ "Data.Acid.Centered.Slave: Replication failed at revision " ++ show nr ++ " -> " ++ show rev
352                 return nr
353
354 repCheckpoint :: SlaveState st -> Revision -> IO ()
355 repCheckpoint SlaveState{..} rev = do
356     debug $ "Got Checkpoint request at revision: " ++ show rev
357     -- todo: check that we're at the correct revision
358     withMVar slaveRevision $ \_ ->
359         -- create checkpoint
360         createCheckpoint slaveLocalState
361
362 repArchive :: SlaveState st -> Revision -> IO ()
363 repArchive SlaveState{..} rev = do
364     debug $ "Got Archive request at revision: " ++ show rev
365     -- todo: at right revision?
366     withMVar slaveRevision $ \_ ->
367         createArchive slaveLocalState
368
369
370 -- | Update on slave site.
371 --      The steps are:
372 --      - Request Update from Master
373 --      - Master issues Update with same RequestID
374 --      - repHandler replicates and puts result in MVar
375 scheduleSlaveUpdate :: (UpdateEvent e, Typeable (EventState e)) => SlaveState (EventState e) -> e -> IO (MVar (EventResult e))
376 scheduleSlaveUpdate slaveState@SlaveState{..} event = do
377     unlocked <- isEmptyMVar slaveStateLock
378     if not unlocked then error "State is locked."
379     else do
380         debug "Update by Slave."
381         result <- newEmptyMVar
382         -- slaveLastRequestID is only modified here - and used for locking the state
383         reqId <- modifyMVar slaveLastRequestID $ \x -> return (x+1,x+1)
384         modifyMVar_ slaveRequests $ \srs -> do
385             let encoded = runPutLazy (safePut event)
386             sendToMaster slaveZmqSocket $ ReqUpdate reqId (methodTag event, encoded)
387             timeoutID <- forkIO $ timeoutRequest slaveState reqId result
388             let callback = if slaveStateIsRed
389                     then scheduleLocalUpdate' (downcast slaveLocalState) event result
390                     else do
391                         hd <- scheduleUpdate slaveLocalState event
392                         void $ forkIO $ putMVar result =<< takeMVar hd
393                         return (return ())      -- bogus finalizer
394             return $ IM.insert reqId (callback, timeoutID) srs
395         return result
396
397 -- | Ensures requests are actually answered or fail.
398 --   On timeout the Slave dies, not the thread that invoked the Update.
399 timeoutRequest :: SlaveState st -> RequestID -> MVar m -> IO ()
400 timeoutRequest SlaveState{..} reqId mvar = do
401     threadDelay $ 5*1000*1000
402     stillThere <- withMVar slaveRequests (return . IM.member reqId)
403     when stillThere $ do
404         putMVar mvar $ error "Data.Acid.Centered.Slave: Update-Request timed out."
405         throwTo slaveParentThreadId $ ErrorCall "Data.Acid.Centered.Slave: Update-Request timed out."
406
407 -- | Send a message to Master.
408 sendToMaster :: MVar (Socket Dealer) -> SlaveMessage -> IO ()
409 sendToMaster msock smsg = withMVar msock $ \sock -> send sock [] (encode smsg)
410
411 -- | Close an enslaved State.
412 liberateState :: SlaveState st -> IO ()
413 liberateState SlaveState{..} =
414     -- lock state against updates: disallow requests
415     -- todo: rather use a special value allowing exceptions in scheduleUpdate
416     whenM (tryPutMVar slaveStateLock ()) $ do
417         debug "Closing Slave state..."
418         -- check / wait unprocessed requests
419         debug "Waiting for Requests to finish."
420         waitPoll 100 (withMVar slaveRequests (return . IM.null))
421         -- send master quit message
422         sendToMaster slaveZmqSocket SlaveQuit
423         -- wait replication chan, only if sync done
424         syncDone <- Event.isSet slaveSyncDone
425         when syncDone $ do
426             debug "Waiting for repChan to empty."
427             mtid <- myThreadId
428             putMVar slaveRepThreadId mtid
429         -- kill handler threads
430         debug "Killing request handler."
431         withMVar slaveReqThreadId $ flip throwTo GracefulExit
432         -- cleanup zmq
433         debug "Closing down zmq."
434         withMVar slaveZmqSocket $ \s -> do
435             -- avoid the socket hanging around
436             setLinger (restrict (1000 :: Int)) s
437             disconnect s slaveZmqAddr
438             close s
439         term slaveZmqContext
440         -- cleanup local state
441         debug "Closing local state."
442         closeAcidState slaveLocalState
443
444 slaveToAcidState :: (IsAcidic st, Typeable st)  => SlaveState st -> AcidState st
445 slaveToAcidState slaveState
446   = AcidState { _scheduleUpdate    = scheduleSlaveUpdate slaveState
447               , scheduleColdUpdate = undefined
448               , _query             = query $ slaveLocalState slaveState
449               , queryCold          = queryCold $ slaveLocalState slaveState
450               , createCheckpoint   = createCheckpoint $ slaveLocalState slaveState
451               , createArchive      = createArchive $ slaveLocalState slaveState
452               , closeAcidState     = liberateState slaveState
453               , acidSubState       = mkAnyState slaveState
454               }