# 第二十八章:軟件事務內存 (STM)
在并發編程的傳統線程模型中,線程之間的數據共享需要通過鎖來保持一致性(consistentBalance),當數據產生變化時,還需要使用條件變量(condition variable)對各個線程進行通知。
某種程度上,Haskell 的 MVar 機制對上面提到的工具進行了改進,但是,它仍然帶有和這些工具一樣的缺陷:
- 因為忘記使用鎖而導致條件競爭(race condition)
- 因為不正確的加鎖順序而導致死鎖(deadblock)
- 因為未被捕捉的異常而造成程序崩潰(corruption)
- 因為錯誤地忽略了通知,造成線程無法正常喚醒(lost wakeup)
這些問題即使在很小的并發程序里也會經常發生,而在更加龐大的代碼庫或是高負載的情況下,這些問題會引發更加糟糕的難題。
比如說,對一個只有幾個大范圍鎖的程序進行編程并不難,只是一旦這個程序在高負載的環境下運行,鎖之間的相互競爭就會變得非常嚴重。另一方面,如果采用細粒度(fineo-grained)的鎖機制,保持軟件正常工作將會變得非常困難。除此之外,就算在負載不高的情況下, 加鎖帶來的額外的簿記工作(book-keeping)也會對性能產生影響。
## 基礎知識
軟件事務內存(Software transactional memory)提供了一些簡單但強大的工具。通過這些工具我們可以解決前面提到的大多數問題。通過 atomically 組合器(combinator), 我們可以在一個事務內執行一批操作。當這一組操作開始執行的時候,其他線程是覺察不到這些操作所產生的任何修改,直到所有操作完成。同樣的,當前線程也無法察覺其他線程的所產生的修改。這些性質表明的操作的隔離性(isolated)。
當從一個事務退出的時候,只會發生以下情況中的一種:
- 如果沒有其他線程修改了同樣的數據,當前線程產生的修改將會對所有其他線程可見。
- 否則,當前線程的所產生的改動會被丟棄,然后這組操作會被重新執行。
> atomically 這種全有或全無(all-or-nothing)的天性被稱之為原子性(atomic), atomically 也因為得名。如果你使用過支持事務的數據庫,你會覺得STM使用起來非常熟悉。
## 一些簡單的例子
在多玩家角色扮演的游戲里, 一個玩家的角色會有許多屬性,比如健康,財產以及金錢。讓我們從基于游戲人物屬性的一些簡單的函數和類型開始去了解STM的精彩內容。隨著學習的深入,我們也會不斷地改進我們的代碼。
STM的API位于 stm 包,模塊 Control.Concurrent.STM 。
~~~
-- file: ch28/GameInventory.hs
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
import Control.Concurrent.STM
import Control.Monad
data Item = Scroll
| Wand
| Banjo
deriving (Eq, Ord, Show)
newtype Gold = Gold Int
deriving (Eq, Ord, Show, Num)
newtype HitPoint = HitPoint Int
deriving (Eq, Ord, Show, Num)
type Inventory = TVar [Item]
type Health = TVar HitPoint
type Balance = TVar Gold
data Player = Player {
balance :: Balance,
health :: Health,
inventory :: Inventory
}
~~~
參數化類型 TVar 是一個可變量,可以在 atomically 塊中讀取或者修改。為了簡單起見,我們把玩家的背包(Inventory)定義為物品的列表。同時注意到,我們用到了 newtype ,這樣不會混淆財富和健康屬性。
當需要在兩個賬戶(Balance)之間轉賬,我們所要的做的就只是調整下各自的 Tvar 。
~~~
-- file: ch28/GameInventory.hs
basicTransfer qty fromBal toBal = do
fromQty <- readTVar fromBal
toQty <- readTVar toBal
writeTVar fromBal (fromQty - qty)
writeTVar toBal (toQty + qty)
~~~
讓我們寫個簡單的測試函數
~~~
-- file: ch28/GameInventory.hs
transferTest = do
alice <- newTVar (12 :: Gold)
bob <- newTVar 4
basicTransfer 3 alice bob
liftM2 (,) (readTVar alice) (readTVar bob)
~~~
如果我們在ghci里執行下這個函數,應該有如下的結果
~~~
ghci> :load GameInventory
[1 of 1] Compiling Main ( GameInventory.hs, interpreted )
Ok, modules loaded: Main.
ghci> atomically transferTest
Loading package array-0.4.0.0 ... linking ... done.
Loading package stm-2.3 ... linking ... done.
(Gold 9,Gold 7)
~~~
原子性和隔離性保證了當其他線程同時看到 bob 的賬戶和 alice 的賬戶被修改了。
即使在并發程序里,我們也努力保持代碼盡量的純函數化。這使得我們的代碼更加容易推導和測試。由于數據并沒有事務性,這也讓底層的STM做更少的事。以下的純函數實現了從我們來表示玩家背包的數列里移除一個物品。
~~~
-- file: ch28/GameInventory.hs
removeInv :: Eq a => a -> [a] -> Maybe [a]
removeInv x xs =
case takeWhile (/= x) xs of
(_:ys) -> Just ys
[] -> Nothing
~~~
這里返回值用了 Maybe 類型,它可以用來表示物品是否在玩家的背包里。
下面這個事務性的函數實現了把一個物品給另外一個玩家。這個函數有一點點復雜因為需要判斷給予者是否有這個物品。
~~~
-- file: ch28/GameInventory.hs
maybeGiveItem item fromInv toInv = do
fromList <- readTVar fromInv
case removeInv item fromList of
Nothing -> return False
Just newList -> do
writeTVar fromInv newList
destItems <- readTVar toInv
writeTVar toInv (item : destItems)
return True
~~~
## STM的安全性
既然我們提供了有原子性和隔離型的事務,那么保證我們不能有意或是無意的從 atomically 執行塊從脫離顯得格外重要。借由 STM monad,Haskell的類型系統保證了我們這種行為。
~~~
ghci> :type atomically
atomically :: STM a -> IO a
~~~
atomically 接受一個 STM monad的動作, 然后執行并讓我們可以從 IO monad里拿到這個結果。 STM monad是所有事務相關代碼執行的地方。比如這些操作 TVar 值的函數都在 STM monad里被執行。
~~~
ghci> :type newTVar
newTVar :: a -> STM (TVar a)
ghci> :type readTVar
readTVar :: TVar a -> STM a
ghci> :type writeTVar
writeTVar :: TVar a -> a -> STM ()
~~~
我們之前定義的事務性函數也有這個特性
~~~
-- file: ch28/GameInventory.hs
basicTransfer :: Gold -> Balance -> Balance -> STM ()
maybeGiveItem :: Item -> Inventory -> Inventory -> STM Bool
~~~
在 STM monad里是不允許執行I/O操作或者是修改非事務性的可變狀態,比如 MVar 的值。這就使得我們可以避免那些違背事務完整的操作。
## 重試一個事務
maybeGiveItem 這個函數看上去稍微有點怪異。只有當角色有這個物品時才會將它給另外一個角色,這看上去還算合理,然后返回一個 Bool 值使調用這個函數的代碼變得復雜。下面這個函數調用了 maybeGiveItem, 它必須根據 maybeGiveItem 的返回結果來決定如何繼續執行。
~~~
maybeSellItem :: Item -> Gold -> Player -> Player -> STM Bool
maybeSellItem item price buyer seller = do
given <- maybeGiveItem item (inventory seller) (inventory buyer)
if given
then do
basicTransfer price (balance buyer) (balance seller)
return True
else return False
~~~
我們不僅要檢查物品是否給到了另一個玩家,而且還得把是否成功這個信號傳遞給調用者。這就意味了復雜性被延續到了更外層。
下面我們來看看如何用更加優雅的方式處理事務無法成功進行的情況。 STM API 提供了一個 retry 函數,它可以立即中斷一個 無法成功進行的 atomically 執行塊。正如這個函數名本身所指明的意思,當它發生時,執行塊會被重新執行,所有在這之前的操作都不會被記錄。我們使用 retry 重新實現了 maybeGiveItem 。
~~~
-- file: ch28/GameInventory.hs
giveItem :: Item -> Inventory -> Inventory -> STM ()
giveItem item fromInv toInv = do
fromList <- readTVar fromInv
case removeInv item fromList of
Nothing -> retry
Just newList -> do
writeTVar fromInv newList
readTVar toInv >>= writeTVar toInv . (item :)
~~~
我們之前實現的 basicTransfer 有一個缺陷:沒有檢查發送者的賬戶是否有足夠的資金。我們可以使用 retry 來糾正這個問題并保持方法簽名不變。
~~~
-- file: ch28/GameInventory.hs
transfer :: Gold -> Balance -> Balance -> STM ()
transfer qty fromBal toBal = do
fromQty <- readTVar fromBal
when (qty > fromQty) $
retry
writeTVar fromBal (fromQty - qty)
readTVar toBal >>= writeTVar toBal . (qty +)
~~~
使用 retry 后,銷售物品的函數就顯得簡單很多。
~~~
sellItem :: Item -> Gold -> Player -> Player -> STM ()
sellItem item price buyer seller = do
giveItem item (inventory seller) (inventory buyer)
transfer price (balance buyer) (balance seller)
~~~
這個實現和之前的稍微有點不同。如果有必要會會阻塞以至賣家有東西可賣并且買家有足夠的余額支付,而不是在發現賣家沒這個物品可銷售時馬上返回 False 。
## retry 時到底發生了什么?
retry 不僅僅使得代碼更加簡潔:它似乎有魔力般的內部實現。當我們調用 retry 的時候,它并不是馬上重啟事務,而是會先阻塞線程,一直到那些在 retry 之前被訪問過的變量被其他線程修改。
比如,如果我們調用 transfer 而發現余額不足, retry 會自發的等待,直到賬戶余額的變動,然后會重新啟動事務。 同樣的,對于函數 giveItem , 如果賣家沒有那個物品,線程就會阻塞直到他有了那個物品。
## 選擇替代方案
有時候我們并不總是希望重啟 atomically 操作即使調用了 retry 或者由于其他線程的同步修改而導致的失敗。比如函數 sellItem 會不斷地重試,只要沒有滿足其條件:要有物品并且余額足夠。然而我們可能更希望只重試一次。
orElse 組合器允許我們在主操作失敗的情況下,執行一個”備用”操作。
~~~
ghci> :type orElse
orElse :: STM a -> STM a -> STM a
~~~
我們對 sellItem 做了一點修改:如果 sellItem 失敗, 則 orElse 執行 returnFalse 的動作從而使這個sale函數立即返回。
~~~
trySellItem :: Item -> Gold -> Player -> Player -> STM Bool
trySellItem item price buyer seller =
sellItem item price buyer seller >> return True
`orElse`
return False
~~~
## 在事務中使用高階代碼
假設我們想做稍微有挑戰的事情,從一系列的物品中,選取第一個賣家擁有的并且買家能承擔費用的物品進行購買,如果沒有這樣的物品則什么都不做。顯然我們可以很直觀的給出實現。
~~~
-- file: ch28/GameInventory.hs
crummyList :: [(Item, Gold)] -> Player -> Player
-> STM (Maybe (Item, Gold))
crummyList list buyer seller = go list
where go [] = return Nothing
go (this@(item,price) : rest) = do
sellItem item price buyer seller
return (Just this)
`orElse`
go rest
~~~
在這個實現里,我們有碰到了一個熟悉的問題:把我們的需求和如果實現混淆在一個。再深入一點觀察,則會發現兩個可重復使用的模式。
第一個就是讓事務失敗而不是重試。
~~~
-- file: ch28/GameInventory.hs
maybeSTM :: STM a -> STM (Maybe a)
maybeSTM m = (Just `liftM` m) `orElse` return Nothing
~~~
第二個,我們要對一系列的對象執行否一個操作,直到有一個成功為止。如果全部都失敗,則執行 retry 操作。由于 STM 是 MonadPlus 類型類的一個實例,所以顯得很方便。
~~~
-- file: ch28/STMPlus.hs
instance MonadPlus STM where
mzero = retry
mplus = orElse
~~~
Control.Monad 模塊定義了一個 msum 函數,而它就是我們所需要的。
~~~
-- file: ch28/STMPlus.hs
msum :: MonadPlus m => [m a] -> m a
msum = foldr mplus mzero
~~~
有了這些重要的工具,我們就可以寫出更加簡潔的實現了。
~~~
-- file: ch28/GameInventory.hs
shoppingList :: [(Item, Gold)] -> Player -> Player
-> STM (Maybe (Item, Gold))
shoppingList list buyer seller = maybeSTM . msum $ map sellOne list
where sellOne this@(item,price) = do
sellItem item price buyer seller
return this
~~~
既然 STM 是 MonadPlus 類型類的實例,我們可以改進 maybeSTM ,這樣就可以適用于任何 MonadPlus 的實例。
~~~
-- file: ch28/GameInventory.hs
maybeM :: MonadPlus m => m a -> m (Maybe a)
maybeM m = (Just `liftM` m) `mplus` return Nothing
~~~
這個函數會在很多不同情況下顯得非常有用。
## I/O 和 STM
STM monad 禁止任意的I/O操作,因為I/O操作會破壞原子性和隔離性。當然I/O的操作還是需要的,只是我們需要非常的謹慎。
大多數時候,我們會執行I/O操作是由于我們在 atomically 塊中產生的一個結果。在這些情況下,正確的做法通常是 atomically 返回一些數據,在I/O monad里的調用者則根據這些數據知道如何繼續下一步動作。我們甚至可以返回需要被操作的動作 (action), 因為他們是第一類值(First Class vaules)。
~~~
-- file: ch28/STMIO.hs
someAction :: IO a
stmTransaction :: STM (IO a)
stmTransaction = return someAction
doSomething :: IO a
doSomething = join (atomically stmTransaction)
~~~
我們偶爾也需要在 STM 里進行I/O操作。比如從一個肯定存在的文件里讀取一些非可變數據,這樣的操作并不會違背 STM 保證原子性和隔離性的原則。在這些情況,我們可以使用 unsafeIOToSTM 來執行一個 IO 操作。這個函數位于偏底層的一個模塊 GHC.Conc ,所以要謹慎使用。
~~~
ghci> :m +GHC.Conc
ghci> :type unsafeIOToSTM
unsafeIOToSTM :: IO a -> STM a
~~~
我們所執行的這個 IO 動作絕對不能打開另外一個 atomically 事務。如果一個線程嘗試嵌套的事務,系統就會拋出異常。
由于類型系統無法幫助我們確保 IO 代碼沒有執行一些敏感動作,最安全的做法就是我們盡量的限制使用 unsafeIOToSTM 。下面的例子展示了在 atomically 中執行 IO 的典型錯誤。
~~~
-- file: ch28/STMIO.hs
launchTorpedoes :: IO ()
notActuallyAtomic = do
doStuff
unsafeIOToSTM launchTorpedoes
mightRetry
~~~
如果 mightRetry 會引發事務的重啟,那么 launchTorpedoes 會被調用多次。事實上,我們無法預見它會被調用多少次, 因為重試是由運行時系統所處理的。解決方案就是在事務中不要有這種類型的non-idempotent I/O操作。
## 線程之間的通訊
正如基礎類型 TVar 那樣, stm 包也提供了兩個更有用的類型用于線程之間的通訊, TMVar 和 TChan 。 TMVar 是STM世界的 MVar , 它可以保存一個 Maybe 類型的值, 即 Just 值或者 Nothing 。 TChan 則是 STM 世界里的 Chan ,它實現了一個有類型的先進先出(FIFO)通道。
[譯者注:為何說 TMVar 是STM世界的 MVar 而不是 TVar ?是因為從實踐意義上理解的。 MVar 的特性是要么有值要么為空的一個容器,所以當線程去讀這個容器時,要么讀到值繼續執行,要么讀不到值就等待。 而 TVar 并沒有這樣的特性,所以引入了 TMVar 。 它的實現是這樣的, newtypeTMVara=TMVar(TVar(Maybea)) , 正是由于它包含了一個 Maybe 類型的值,這樣就有了”要么有值要么為空”這樣的特性,也就是 MVar 所擁有的特性。]
## 并發網絡鏈接檢查器
作為一個使用 STM 的實際例子, 我們將開發一個檢查HTML文件里不正確鏈接的程序,這里不正確的鏈接是指那些鏈接指向了一個錯誤的網頁或是無法訪問到其指向的服務器。用并發的方式解決這個問題非常得合適:如果我們嘗試和已經下線的服務器(dead server)通訊,需要有兩分鐘的超時時間。如果使用多線程,即使有一兩個線程由于和響應很慢或者下線的服務器通訊而停住(stuck),我們還是可以繼續進行一些有用的事情。
我們不能簡單直觀的給每一個URL新建一個線程,因為由于(也是我們預想的)大多數鏈接是正確的,那么這樣做就會導致CPU或是網絡連接超負荷。因此,我們只會創建固定數量的線程,這些線程會從一個隊列里拿URL做檢查。
~~~
-- file: ch28/Check.hs
{-# LANGUAGE FlexibleContexts, GeneralizedNewtypeDeriving,
PatternGuards #-}
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Exception (catch, finally)
import Control.Monad.Error
import Control.Monad.State
import Data.Char (isControl)
import Data.List (nub)
import Network.URI
import Prelude hiding (catch)
import System.Console.GetOpt
import System.Environment (getArgs)
import System.Exit (ExitCode(..), exitWith)
import System.IO (hFlush, hPutStrLn, stderr, stdout)
import Text.Printf (printf)
import qualified Data.ByteString.Lazy.Char8 as B
import qualified Data.Set as S
-- 這里需要HTTP包, 它并不是GHC自帶的.
import Network.HTTP
type URL = B.ByteString
data Task = Check URL | Done
~~~
main 函數顯示了這個程序的主體腳手架(scaffolding)。
~~~
-- file: ch28/Check.hs
main :: IO ()
main = do
(files,k) <- parseArgs
let n = length files
-- count of broken links
badCount <- newTVarIO (0 :: Int)
-- for reporting broken links
badLinks <- newTChanIO
-- for sending jobs to workers
jobs <- newTChanIO
-- the number of workers currently running
workers <- newTVarIO k
-- one thread reports bad links to stdout
forkIO $ writeBadLinks badLinks
-- start worker threads
forkTimes k workers (worker badLinks jobs badCount)
-- read links from files, and enqueue them as jobs
stats <- execJob (mapM_ checkURLs files)
(JobState S.empty 0 jobs)
-- enqueue "please finish" messages
atomically $ replicateM_ k (writeTChan jobs Done)
waitFor workers
broken <- atomically $ readTVar badCount
printf fmt broken
(linksFound stats)
(S.size (linksSeen stats))
n
where
fmt = "Found %d broken links. " ++
"Checked %d links (%d unique) in %d files.\n"
~~~
當我們處于 IO monad時,可以使用 newTVarIO 函數新建一個 TVar 值。同樣的,也有類似的函數可以新建 TMVar 和 TChan 值。
在程序用了 printf 函數打印出最后的結果。和C語言里類似函數 printf 不同的是Haskell這個版本會在運行時檢查參數的個數以及其類型。
~~~
ghci> :m +Text.Printf
ghci> printf "%d and %d\n" (3::Int)
3 and *** Exception: Printf.printf: argument list ended prematurely
ghci> printf "%s and %d\n" "foo" (3::Int)
foo and 3
~~~
在 **ghci** 里試試 printf"%d"True ,看看會得到什么結果。
支持 main 函數的是幾個短小的函數。
~~~
-- file: ch28/Check.hs
modifyTVar_ :: TVar a -> (a -> a) -> STM ()
modifyTVar_ tv f = readTVar tv >>= writeTVar tv . f
forkTimes :: Int -> TVar Int -> IO () -> IO ()
forkTimes k alive act =
replicateM_ k . forkIO $
act
`finally`
(atomically $ modifyTVar_ alive (subtract 1))
~~~
forkTimes 函數新建特定數量的相同的工作線程,每當一個線程推出時,則”活動”線程的計數器相應的減一。我們使用 finally 組合器確保無論線程是如何終止的,都會減少”活動”線程的數量。
下一步, writeBadLinks 會把每個失效或者死亡(dead)的鏈接打印到 stdout 。
~~~
-- file: ch28/Check.hs
writeBadLinks :: TChan String -> IO ()
writeBadLinks c =
forever $
atomically (readTChan c) >>= putStrLn >> hFlush stdout
~~~
上面我們使用了 forever 組合器使一個操作永遠的執行。
~~~
ghci> :m +Control.Monad
ghci> :type forever
forever :: (Monad m) => m a -> m ()
~~~
waitFor 函數使用了 check , 當它的參數是 False 時會調用 retry 。
~~~
-- file: ch28/Check.hs
waitFor :: TVar Int -> IO ()
waitFor alive = atomically $ do
count <- readTVar alive
check (count == 0)
~~~
## 檢查一個鏈接
這個原生的函數實現了如何檢查一個鏈接的狀態。 代碼和 [第二十二章 Chapter 22, Extended Example: Web Client Programming] 里的 podcatcher 相似但有一點不同。
~~~
-- file: ch28/Check.hs
getStatus :: URI -> IO (Either String Int)
getStatus = chase (5 :: Int)
where
chase 0 _ = bail "too many redirects"
chase n u = do
resp <- getHead u
case resp of
Left err -> bail (show err)
Right r ->
case rspCode r of
(3,_,_) ->
case findHeader HdrLocation r of
Nothing -> bail (show r)
Just u' ->
case parseURI u' of
Nothing -> bail "bad URL"
Just url -> chase (n-1) url
(a,b,c) -> return . Right $ a * 100 + b * 10 + c
bail = return . Left
getHead :: URI -> IO (Result Response)
getHead uri = simpleHTTP Request { rqURI = uri,
rqMethod = HEAD,
rqHeaders = [],
rqBody = "" }
~~~
為了避免無盡的重定向相應,我們只允許固定次數的重定向請求。我們通過查看HTTP標準HEAD信息來確認鏈接的有效性, 比起一個完整的GET請求,這樣做可以減少網絡流量。
這個代碼是典型的”marching off the left of the screen”風格。正如之前我們提到的,需要謹慎使用這樣的風格。下面我們用 ErrorT monad transformer 和幾個通用一點的方法進行了重新實現,它看上去簡潔了很多。
~~~
-- file: ch28/Check.hs
getStatusE = runErrorT . chase (5 :: Int)
where
chase :: Int -> URI -> ErrorT String IO Int
chase 0 _ = throwError "too many redirects"
chase n u = do
r <- embedEither show =<< liftIO (getHead u)
case rspCode r of
(3,_,_) -> do
u' <- embedMaybe (show r) $ findHeader HdrLocation r
url <- embedMaybe "bad URL" $ parseURI u'
chase (n-1) url
(a,b,c) -> return $ a*100 + b*10 + c
-- Some handy embedding functions.
embedEither :: (MonadError e m) => (s -> e) -> Either s a -> m a
embedEither f = either (throwError . f) return
embedMaybe :: (MonadError e m) => e -> Maybe a -> m a
embedMaybe err = maybe (throwError err) return
~~~
## 工作者線程
每個工作者線程(Worker Thread)從一個共享隊列里拿一個任務,這個任務要么檢查鏈接有效性,要么讓線程推出。
~~~
-- file: ch28/Check.hs
worker :: TChan String -> TChan Task -> TVar Int -> IO ()
worker badLinks jobQueue badCount = loop
where
-- Consume jobs until we are told to exit.
loop = do
job <- atomically $ readTChan jobQueue
case job of
Done -> return ()
Check x -> checkOne (B.unpack x) >> loop
-- Check a single link.
checkOne url = case parseURI url of
Just uri -> do
code <- getStatus uri `catch` (return . Left . show)
case code of
Right 200 -> return ()
Right n -> report (show n)
Left err -> report err
_ -> report "invalid URL"
where report s = atomically $ do
modifyTVar_ badCount (+1)
writeTChan badLinks (url ++ " " ++ s)
~~~
## 查找鏈接
我們構造了基于 IO monad 的 狀態 monad transformer棧用于查找鏈接。這個狀態會記錄我們已經找到過的鏈接(避免重復)、鏈接的數量以及一個隊列,我們會把需要做檢查的鏈接放到這個隊列里。
~~~
-- file: ch28/Check.hs
data JobState = JobState { linksSeen :: S.Set URL,
linksFound :: Int,
linkQueue :: TChan Task }
newtype Job a = Job { runJob :: StateT JobState IO a }
deriving (Monad, MonadState JobState, MonadIO)
execJob :: Job a -> JobState -> IO JobState
execJob = execStateT . runJob
~~~
嚴格來說,對于對立運行的小型程序,我們并不需要用到 newtype ,然后我們還是將它作為一個好的編碼實踐的例子放在這里。(畢竟也只多了幾行代碼)
main 函數實現了對每個輸入文件調用一次 checkURLs 方法,所以 checkURLs 的參數就是單個文件。
~~~
-- file: ch28/Check.hs
checkURLs :: FilePath -> Job ()
checkURLs f = do
src <- liftIO $ B.readFile f
let urls = extractLinks src
filterM seenURI urls >>= sendJobs
updateStats (length urls)
updateStats :: Int -> Job ()
updateStats a = modify $ \s ->
s { linksFound = linksFound s + a }
-- | Add a link to the set we have seen.
insertURI :: URL -> Job ()
insertURI c = modify $ \s ->
s { linksSeen = S.insert c (linksSeen s) }
-- | If we have seen a link, return False. Otherwise, record that we
-- have seen it, and return True.
seenURI :: URL -> Job Bool
seenURI url = do
seen <- (not . S.member url) `liftM` gets linksSeen
insertURI url
return seen
sendJobs :: [URL] -> Job ()
sendJobs js = do
c <- gets linkQueue
liftIO . atomically $ mapM_ (writeTChan c . Check) js
~~~
extractLinks 函數并沒有嘗試去準確的去解析一個HTMP或是文本文件,而只是匹配那些看上去像URL的字符串。我們認為這樣做就夠了。
~~~
-- file: ch28/Check.hs
extractLinks :: B.ByteString -> [URL]
extractLinks = concatMap uris . B.lines
where uris s = filter looksOkay (B.splitWith isDelim s)
isDelim c = isControl c || c `elem` " <>\"{}|\\^[]`"
looksOkay s = http `B.isPrefixOf` s
http = B.pack "http:"
~~~
## 命令行的實現
我們使用了 System.Console.GetOpt 模塊來解析命令行參數。這個模塊提供了很多解析命令行參數的很有用的方法,不過使用起來稍微有點繁瑣。
~~~
-- file: ch28/Check.hs
data Flag = Help | N Int
deriving Eq
parseArgs :: IO ([String], Int)
parseArgs = do
argv <- getArgs
case parse argv of
([], files, []) -> return (nub files, 16)
(opts, files, [])
| Help `elem` opts -> help
| [N n] <- filter (/=Help) opts -> return (nub files, n)
(_,_,errs) -> die errs
where
parse argv = getOpt Permute options argv
header = "Usage: urlcheck [-h] [-n n] [file ...]"
info = usageInfo header options
dump = hPutStrLn stderr
die errs = dump (concat errs ++ info) >> exitWith (ExitFailure 1)
help = dump info >> exitWith ExitSuccess
~~~
getOpt 函數接受三個參數
>
> - 參數順序的定義。 它定義了選項(Option)是否可以和其他參數混淆使用(就是我們上面用到的 Permute )或者是選項必須出現在參數之前。
> - 選項的定義。 每個選項有這四個部分組成: 簡稱,全稱,選項的描述(比如是否接受參數) 以及用戶說明。
> - 參數和選項數組,類似于 getArgs 的返回值。
這個函數返回一個三元組,包括用戶輸入的選項,參數以及錯誤信息(如果有的話)。
我們使用 Flag 代數類型(Algebraic Data Type)表示程序所能接收的選項。
~~~
-- file: ch28/Check.hs
options :: [OptDescr Flag]
options = [ Option ['h'] ["help"] (NoArg Help)
"Show this help message",
Option ['n'] [] (ReqArg (\s -> N (read s)) "N")
"Number of concurrent connections (default 16)" ]
~~~
options 列表保存了每個程序能接收選項的描述。每個描述必須要生成一個 Flag 值。參考上面例子中是如何使用 NoArg 和 ReqArg 。 GetOpt 模塊的 ArgDescr 類型有很多構造函數(Constructors)。
~~~
-- file: ch28/GetOpt.hs
data ArgDescr a = NoArg a
| ReqArg (String -> a) String
| OptArg (Maybe String -> a) String
~~~
- NoArg 接受一個參數用來表示這個選項。在我們這個例子中,如果用戶在調用程序時輸入 -h 或者 --help , 我們就用 Help 值表示。
- ReqArg 的第一個函數作為參數,這個函數把用戶輸入的參數轉化成相應的值;第二個參數是用來說明的。 這里我們是將字符串轉換為數值(integer),然后再給類型 Flag 的構造函數 N 。
- OptArg 和 ReqArg 很相似,但它允許選項沒有對應的參數。
## 模式守衛 (Pattern guards)
函數 parseArgs 的定義里其實潛在了一個語言擴展(Language Extension), Pattern guards。用它可以寫出更加簡要的guard expressions. 它通過語言擴展 PatternGuards 來使用。
一個Pattern Guard有三個組成部分: 一個模式(Pattern), 一個 <- 符號以及一個表達式。表達式會被解釋然后和模式相匹配。 如果成功,在模式中定義的變量會被賦值。我們可以在一個guard里同時使用pattern guards和普通的 Bool guard expressions。
~~~
-- file: ch28/PatternGuard.hs
{-# LANGUAGE PatternGuards #-}
testme x xs | Just y <- lookup x xs, y > 3 = y
| otherwise = 0
~~~
在上面的例子中,當關鍵字 x 存在于alist xs 并且大于等于3,則返回它所對應的值。下面的定義實現了同樣的功能。
~~~
-- file: ch28/PatternGuard.hs
testme_noguards x xs = case lookup x xs of
Just y | y > 3 -> y
_ -> 0
~~~
Pattern guards 使得我們可以把一系列的guards和 case 表達式組合到單個guard,從而寫出更加簡潔并容易理解的guards。
## STM的實踐意義
至此我們還并未提及STM所提供的特別優越的地方。比如它在做組合(*composes*)方面就表現的很好:當需要向一個事務中增加邏輯時,只需要用到常見的函數 (>>=) 和 (>>) 。
組合的概念在構建模塊化軟件是顯得格外重要。如果我們把倆段都沒有問題的代碼組合在一起,也應該是能很好工作的。常規的線程編程技術無法實現組合,然而由于STM提供了一些很關鍵的前提,從而使在線程編程時使用組合變得可能。
STM monad防止了我們意外的非事務性的I/O。我們不再需要關心鎖的順序,因為代碼里根本沒有鎖機制。我們可以忘記丟失喚醒,因為不再有條件變量了。如果有異常發生,我們則可以用函數 catchSTM 捕捉到,或者是往上級傳遞。 最后,我們可以用 retry 和 orElse 以更加漂亮的方式組織代碼。
采用STM機制的代碼不會死鎖,但是導致饑餓還是有可能的。一個長事務導致另外一個事務不停的 retry 。為了解決這樣的問題,需要盡量的短事務并保持數據一致性。
## 合理的放棄控制權
無論是同步管理還是內存管理,經常會遇到保留控制權的情況:一些軟件需要對延時或是內存使用記錄有很強的保證,因此就必須花很多時間和精力去管理和調試顯式的代碼。然后對于軟件的大多數實際情況,垃圾回收(Garbage Collection)和STM已經做的足夠好了。
STM并不是一顆完美的靈丹妙藥。當我們選擇垃圾回收而不是顯式的內存管理, 我們是放棄了控制權從而獲得更加安全的代碼。 同樣的,當使用STM時,我們放棄了底層的細節,從而希望代碼可讀性更好,更加容易理解。
## 使用不變量
STM并不能消除某些類型的bug。比如,我們在一個 atomically 事務中從某個賬號中取錢,然后返回到 IO monad,然后在另一個 atomically 事務中把錢存到另一個賬號,那么代碼就會產生不一致性,因為會在某個特定時刻,這部分錢不會出現的任意一個賬號里。
~~~
-- file: ch28/GameInventory.hs
bogusTransfer qty fromBal toBal = do
fromQty <- atomically $ readTVar fromBal
-- window of inconsistency
toQty <- atomically $ readTVar toBal
atomically $ writeTVar fromBal (fromQty - qty)
-- window of inconsistency
atomically $ writeTVar toBal (toQty + qty)
bogusSale :: Item -> Gold -> Player -> Player -> IO ()
bogusSale item price buyer seller = do
atomically $ giveItem item (inventory seller) (inventory buyer)
bogusTransfer price (balance buyer) (balance seller)
~~~
在同步程序中,這類問題顯然很難而且不容易重現。比如上述例子中的不一致性問題通常只存在一段很短的時間內。在開發階段通常不會出現這類問題,而往往只有在負載很高的產品環境才有可能發生。
我們可以用函數 alwaysSucceeds 定義一個不變量,它是永遠為真的一個數據屬性。
~~~
ghci> :type alwaysSucceeds
alwaysSucceeds :: STM a -> STM ()
~~~
當創建一個不變量時,它馬上會被檢查。如果要失敗,那么這個不變量會拋出異常。更有意思的是,不變量會在經后每個事務完成時自動被檢查。如果在任何一個點上失敗,事務就會推出,不變量拋出的異常也會被傳遞下去。這就意味著當不變量的條件被違反時,我們就可以馬上得到反饋。
比如,下面兩個函數給本章開始時定義的游戲世界增加玩家
~~~
-- file: ch28/GameInventory.hs
newPlayer :: Gold -> HitPoint -> [Item] -> STM Player
newPlayer balance health inventory =
Player `liftM` newTVar balance
`ap` newTVar health
`ap` newTVar inventory
populateWorld :: STM [Player]
populateWorld = sequence [ newPlayer 20 20 [Wand, Banjo],
newPlayer 10 12 [Scroll] ]
~~~
下面的函數則返回了一個不變量,通過它我們可以保證整個游戲世界資金總是平衡的:即任何時候的資金總量和游戲建立時的總量是一樣的。
~~~
-- file: ch28/GameInventory.hs
consistentBalance :: [Player] -> STM (STM ())
consistentBalance players = do
initialTotal <- totalBalance
return $ do
curTotal <- totalBalance
when (curTotal /= initialTotal) $
error "inconsistent global balance"
where totalBalance = foldM addBalance 0 players
addBalance a b = (a+) `liftM` readTVar (balance b)
~~~
下面我們寫個函數來試驗下。
~~~
-- file: ch28/GameInventory.hs
tryBogusSale = do
players@(alice:bob:_) <- atomically populateWorld
atomically $ alwaysSucceeds =<< consistentBalance players
bogusSale Wand 5 alice bob
~~~
由于在函數 bogusTransfer 中不正確地使用了 atomically 而會導致不一致性, 當我們在 **ghci** 里運行這個方法時則會檢測到這個不一致性。
~~~
ghci> tryBogusSale
*** Exception: inconsistent global balance
~~~
- Real World Haskell 中文版
- 第一章:入門
- 第二章:類型和函數
- 第三章:Defining Types, Streamlining Functions
- 第四章:函數式編程
- 第五章:編寫 JSON 庫
- 第六章:類型類
- 第七章:I/O
- 第八章:高效文件處理、正則表達式、文件名匹配
- 第九章:I/O學習 —— 構建一個用于搜索文件系統的庫
- 第十章:代碼案例學習:解析二進制數據格式
- 第十一章:測試和質量保障
- 第十三章:數據結構
- 第十八章: Monad變換器
- 第十九章: 錯誤處理
- 第二十章:使用 Haskell 進行系統編程
- 第二十一章:數據庫的使用
- 第二十二章:擴展示例 —— Web 客戶端編程
- 第二七章:Socket 和 Syslog
- 第二十八章:軟件事務內存 (STM)
- 翻譯約定