19ab9d2ae63e90bbd9ef9deeaf42746616dbc4a8
[acid-state-dist.git] / src / Data / Acid / Centered / Common.hs
1 {-# LANGUAGE OverloadedStrings #-}
2 --------------------------------------------------------------------------------
3 {- |
4   Module      :  Data.Acid.Centered.Common
5   Copyright   :  ?
6
7   Maintainer  :  max.voit+hdv@with-eyes.net
8   Portability :  non-portable (uses GHC extensions)
9
10   Stuff common to Master and Slave in Centered systems.
11
12 -}
13
14 module Data.Acid.Centered.Common
15     (
16       debug
17     , whenM
18     , waitPoll
19     , waitPollN
20     , crcOfState
21     , Crc
22     , NodeRevision
23     , Revision
24     , RequestID
25     , PortNumber
26     , SlaveMessage(..)
27     , MasterMessage(..)
28     , AcidException(..)
29     ) where
30
31 import Data.Acid.Core (Tagged, withCoreState)
32 import Data.Acid.Local (localCore)
33 import Data.Acid.Abstract (downcast)
34 import Data.Acid (AcidState, IsAcidic)
35 import Data.Acid.CRC (crc16)
36
37 import Control.Monad (liftM, liftM2, liftM3,
38                       unless, when
39                      )
40 import Control.Concurrent (threadDelay)
41 import Control.Exception (Exception)
42 import qualified Data.ByteString.Lazy.Char8 as CSL
43 import Data.Serialize (Serialize(..), put, get,
44                        putWord8, getWord8,
45                        runPutLazy
46                       )
47 import Data.Typeable (Typeable)
48 import Data.SafeCopy (safePut)
49 import Data.Word (Word16)
50
51 #ifdef nodebug
52 #else
53 import System.IO (stderr, hPutStrLn)
54 import qualified Control.Concurrent.Lock as L
55 import System.IO.Unsafe (unsafePerformIO)
56 #endif
57
58 --------------------------------------------------------------------------------
59
60 -- | Number of a port for establishing a network connection.
61 type PortNumber = Int
62
63 -- | (Current) Revision of a node.
64 type NodeRevision = Int
65
66 -- | Revision an Update resembles.
67 type Revision = Int
68
69 -- | ID of an Update Request.
70 type RequestID = Int
71
72 -- | We use CRC16 for now.
73 type Crc = Word16
74
75 #ifdef nodebug
76 -- | Debugging disabled.
77 debug :: String -> IO ()
78 debug _ = return ()
79 #else
80 -- | Lock for non-interleaved debug output.
81 {-# NOINLINE debugLock #-}
82 debugLock :: L.Lock
83 debugLock = unsafePerformIO L.new
84
85 -- | Debugging without interleaving output from different threads.
86 debug :: String -> IO ()
87 debug = L.with debugLock . hPutStrLn stderr
88 #endif
89
90 -- | Internally used for killing handler threads.
91 data AcidException = GracefulExit
92       deriving (Show, Typeable)
93
94 instance Exception AcidException
95
96 -- | Messages the Master sends to Slaves.
97 data MasterMessage = DoRep Revision (Maybe RequestID) (Tagged CSL.ByteString)
98                    | DoSyncRep Revision (Tagged CSL.ByteString)
99                    | SyncDone Crc
100                    | DoCheckpoint Revision
101                    | DoSyncCheckpoint Revision CSL.ByteString
102                    | DoArchive Revision
103                    | FullRep Revision
104                    | FullRepTo Revision
105                    | MayQuit
106                    | MasterQuit
107                   deriving (Show)
108
109 -- | Messages Slaves sends to the Master.
110 data SlaveMessage = NewSlave Int
111                   | RepDone Int
112                   | RepError
113                   | ReqUpdate RequestID (Tagged CSL.ByteString)
114                   | SlaveQuit
115                   deriving (Show)
116
117 instance Serialize MasterMessage where
118     put msg = case msg of
119         DoRep r i d          -> putWord8 0 >> put r >> put i >> put d
120         DoSyncRep r d        -> putWord8 1 >> put r >> put d
121         SyncDone c           -> putWord8 2 >> put c
122         DoCheckpoint r       -> putWord8 3 >> put r
123         DoSyncCheckpoint r d -> putWord8 4 >> put r >> put d
124         DoArchive r          -> putWord8 5 >> put r
125         FullRep r            -> putWord8 6 >> put r
126         FullRepTo r          -> putWord8 7 >> put r
127         MayQuit              -> putWord8 8
128         MasterQuit           -> putWord8 9
129     get = do
130         tag <- getWord8
131         case tag of
132             0 -> liftM3 DoRep get get get
133             1 -> liftM2 DoSyncRep get get
134             2 -> liftM SyncDone get
135             3 -> liftM DoCheckpoint get
136             4 -> liftM2 DoSyncCheckpoint get get
137             5 -> liftM DoArchive get
138             6 -> liftM FullRep get
139             7 -> liftM FullRepTo get
140             8 -> return MayQuit
141             9 -> return MasterQuit
142             _ -> error $ "Data.Serialize.get failed for MasterMessage: invalid tag " ++ show tag
143
144 instance Serialize SlaveMessage where
145     put msg = case msg of
146         NewSlave r    -> putWord8 0 >> put r
147         RepDone r     -> putWord8 1 >> put r
148         RepError      -> putWord8 2
149         ReqUpdate i d -> putWord8 3 >> put i >> put d
150         SlaveQuit     -> putWord8 9
151     get = do
152         tag <- getWord8
153         case tag of
154             0 -> liftM NewSlave get
155             1 -> liftM RepDone get
156             2 -> return RepError
157             3 -> liftM2 ReqUpdate get get
158             9 -> return SlaveQuit
159             _ -> error $ "Data.Serialize.get failed for SlaveMessage: invalid tag " ++ show tag
160
161 -- | Compute the CRC of a state.
162 crcOfState :: (IsAcidic st, Typeable st) => AcidState st -> IO Crc
163 crcOfState state = do
164     let lst = downcast state
165     withCoreState (localCore lst) $ \st -> do
166         let encoded = runPutLazy (safePut st)
167         return $ crc16 encoded
168
169 -- | By polling, wait until predicate fulfilled.
170 waitPoll :: Int -> IO Bool -> IO ()
171 waitPoll t p = p >>= \e -> unless e $ threadDelay t >> waitPoll t p
172
173 -- | By polling, wait until predicate fulfilled. Poll at max. n times.
174 waitPollN :: Int -> Int -> IO Bool -> IO ()
175 waitPollN t n p
176     | n == 0    = return ()
177     | otherwise = p >>= \e -> unless e $ threadDelay t >> waitPollN t (n-1) p
178
179 -- | Monadic when
180 whenM :: Monad m => m Bool -> m () -> m ()
181 whenM b a = b >>= flip when a