ref: Bytestring, imports
[acid-state-dist.git] / src / Data / Acid / Centered / Common.hs
1 {-# LANGUAGE OverloadedStrings #-}
2 --------------------------------------------------------------------------------
3 {- |
4   Module      :  Data.Acid.Centered.Common
5   Copyright   :  ?
6
7   Maintainer  :  max.voit+hdv@with-eyes.net
8   Portability :  non-portable (uses GHC extensions)
9
10   Stuff common to Master and Slave in Centered systems.
11
12 -}
13
14 module Data.Acid.Centered.Common
15     (
16       debug
17     , whenM
18     , waitPoll
19     , waitPollN
20     , crcOfState
21     , Crc
22     , NodeRevision
23     , Revision
24     , RequestID
25     , PortNumber
26     , SlaveMessage(..)
27     , MasterMessage(..)
28     , AcidException(..)
29     ) where
30
31 import Data.Typeable (Typeable)
32 import Data.SafeCopy (safePut)
33
34 import Data.Acid.Core (Tagged, withCoreState)
35 import Data.Acid.Local (localCore)
36 import Data.Acid.Abstract (downcast)
37 import Data.Acid (AcidState, IsAcidic)
38 import Data.Acid.CRC (crc16)
39
40 import Control.Concurrent (threadDelay)
41
42 import Control.Monad (liftM, liftM2, liftM3,
43                       unless, when
44                      )
45 import Control.Exception (Exception)
46
47 import Data.ByteString.Lazy.Char8 (ByteString)
48 import Data.Serialize (Serialize(..), put, get,
49                        putWord8, getWord8,
50                        runPutLazy
51                       )
52 import Data.Word (Word16)
53
54 #ifdef nodebug
55 #else
56 import System.IO (stderr, hPutStrLn)
57 import qualified Control.Concurrent.Lock as L
58 import System.IO.Unsafe (unsafePerformIO)
59 #endif
60
61 --------------------------------------------------------------------------------
62
63 -- | Number of a port for establishing a network connection.
64 type PortNumber = Int
65
66 -- | (Current) Revision of a node.
67 type NodeRevision = Int
68
69 -- | Revision an Update resembles.
70 type Revision = Int
71
72 -- | ID of an Update Request.
73 type RequestID = Int
74
75 -- | We use CRC16 for now.
76 type Crc = Word16
77
78 #ifdef nodebug
79 -- | Debugging disabled.
80 debug :: String -> IO ()
81 debug _ = return ()
82 #else
83 -- | Lock for non-interleaved debug output.
84 {-# NOINLINE debugLock #-}
85 debugLock :: L.Lock
86 debugLock = unsafePerformIO L.new
87
88 -- | Debugging without interleaving output from different threads.
89 debug :: String -> IO ()
90 debug = L.with debugLock . hPutStrLn stderr
91 #endif
92
93 -- | Internally used for killing handler threads.
94 data AcidException = GracefulExit
95       deriving (Show, Typeable)
96
97 instance Exception AcidException
98
99 -- | Messages the Master sends to Slaves.
100 data MasterMessage = DoRep Revision (Maybe RequestID) (Tagged ByteString)
101                    | DoSyncRep Revision (Tagged ByteString)
102                    | SyncDone Crc
103                    | DoCheckpoint Revision
104                    | DoSyncCheckpoint Revision ByteString
105                    | DoArchive Revision
106                    | FullRep Revision
107                    | FullRepTo Revision
108                    | MayQuit
109                    | MasterQuit
110                   deriving (Show)
111
112 -- | Messages Slaves sends to the Master.
113 data SlaveMessage = NewSlave Int
114                   | RepDone Int
115                   | RepError
116                   | ReqUpdate RequestID (Tagged ByteString)
117                   | SlaveQuit
118                   deriving (Show)
119
120 instance Serialize MasterMessage where
121     put msg = case msg of
122         DoRep r i d          -> putWord8 0 >> put r >> put i >> put d
123         DoSyncRep r d        -> putWord8 1 >> put r >> put d
124         SyncDone c           -> putWord8 2 >> put c
125         DoCheckpoint r       -> putWord8 3 >> put r
126         DoSyncCheckpoint r d -> putWord8 4 >> put r >> put d
127         DoArchive r          -> putWord8 5 >> put r
128         FullRep r            -> putWord8 6 >> put r
129         FullRepTo r          -> putWord8 7 >> put r
130         MayQuit              -> putWord8 8
131         MasterQuit           -> putWord8 9
132     get = do
133         tag <- getWord8
134         case tag of
135             0 -> liftM3 DoRep get get get
136             1 -> liftM2 DoSyncRep get get
137             2 -> liftM SyncDone get
138             3 -> liftM DoCheckpoint get
139             4 -> liftM2 DoSyncCheckpoint get get
140             5 -> liftM DoArchive get
141             6 -> liftM FullRep get
142             7 -> liftM FullRepTo get
143             8 -> return MayQuit
144             9 -> return MasterQuit
145             _ -> error $ "Data.Serialize.get failed for MasterMessage: invalid tag " ++ show tag
146
147 instance Serialize SlaveMessage where
148     put msg = case msg of
149         NewSlave r    -> putWord8 0 >> put r
150         RepDone r     -> putWord8 1 >> put r
151         RepError      -> putWord8 2
152         ReqUpdate i d -> putWord8 3 >> put i >> put d
153         SlaveQuit     -> putWord8 9
154     get = do
155         tag <- getWord8
156         case tag of
157             0 -> liftM NewSlave get
158             1 -> liftM RepDone get
159             2 -> return RepError
160             3 -> liftM2 ReqUpdate get get
161             9 -> return SlaveQuit
162             _ -> error $ "Data.Serialize.get failed for SlaveMessage: invalid tag " ++ show tag
163
164 -- | Compute the CRC of a state.
165 crcOfState :: (IsAcidic st, Typeable st) => AcidState st -> IO Crc
166 crcOfState state = do
167     let lst = downcast state
168     withCoreState (localCore lst) $ \st -> do
169         let encoded = runPutLazy (safePut st)
170         return $ crc16 encoded
171
172 -- | By polling, wait until predicate fulfilled.
173 waitPoll :: Int -> IO Bool -> IO ()
174 waitPoll t p = p >>= \e -> unless e $ threadDelay t >> waitPoll t p
175
176 -- | By polling, wait until predicate fulfilled. Poll at max. n times.
177 waitPollN :: Int -> Int -> IO Bool -> IO ()
178 waitPollN t n p
179     | n == 0    = return ()
180     | otherwise = p >>= \e -> unless e $ threadDelay t >> waitPollN t (n-1) p
181
182 -- | Monadic when
183 whenM :: Monad m => m Bool -> m () -> m ()
184 whenM b a = b >>= flip when a