Skip to content

HaskellのConcurrentについて調べてまとめる (MVar編)

Posted on:2015年7月22日

どうもこんにちは.

前回(Haskell の Concurrent について調べてまとめる (IORef 編) - プログラミングのメモ帳 ➚)の続きです.

今回はスレッド間協調のためにMVarを使う方法について調べたので, まとめたいと思います.

MVar

Haskell にかかわらず, 最近の並行処理はメッセージパッシングでやれみたいなのが流行ってますね (Scala の Akka や golang の chan など).

MVarは Haskell における, 容量 1 のメッセージボックスのようなものです. MVarを使うことで, スレッド間でメッセージのやり取りを協調的に行うことができます.

複数のスレッドが1つのMVarに対して, メッセージを入れたり取り出したりすることでスレッド間協調を行います.

基本となる API はこのような感じ

newEmptyMVar :: IO (MVar a)
newMVar :: a -> IO (MVar a)
takeMVar :: MVar a -> IO a
putMVar :: MVar a -> a -> IO ()
readMVar :: MVar a -> IO a

型を見ればなんとなく使い方もわかる気がしますね.

MVarを作るにはnewEmptyMVarnewMVarを使用します. newEmptyMVarは空のメッセージボックスを作り, newMVarは第一引数を初期値としてもつメッセージボックスを作ります.

MVarにメッセージを格納するには, putMVarを使います. putMVar mvar msg で, msgmvarに格納します.

この際, もしMVarにすでにメッセージが格納されている場合, MVarは容量 1 のボックスなので, putMVarがブロックされます. 他のスレッドがMVarからメッセージを取り出して空にするまで待ってから, メッセージを格納します.

一方, MVarからメッセージを読み取るには, takeMVarreadMVarを使用します.

takeMVarはメッセージを読み取り, そのMVarを空にします. readMVarはメッセージを読み取りますが, MVarの中のメッセージはそのまま残します.

ここで, putの時と同様に, takeMVarreadMVarMVarにメッセージが格納されていなかった場合, 他のスレッドがMVarにメッセージを格納するまでブロックします.

というわけで簡単なサンプルコード

module Main where

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar

main :: IO ()
main = do
    mvar <- newEmptyMVar
    forkIO $ do
        msg <- takeMVar mvar
        putStrLn $ "recv: " ++ msg
        threadDelay $ 1 * 10 ^ 6
        putMVar mvar "B"
    putStrLn "sleep 1"
    threadDelay $ 1 * 10 ^ 6
    putStrLn "wake up"
    putMVar mvar "A"
    takeMVar mvar >>= print

実行結果

sleep 1
wake up
recv: A
"B"

確かにメッセージが格納されるまで takeMVarがブロックしていることがわかります

共有変数としての MVar

さて, MVarにはもうひとつの使い方があります. 共有変数としてのMVarです.

MVarの特徴として, 誰かがtakeしてからputするまでの間は, 他のスレッドはだれもMVarの中身に触れないという点が挙げられます.

main = do
    mvar <- newMVar 0
    forkIO $ do
        val <- takeMVar mvar
        -- 他のスレッドはMVarの中身に触れない
        putMVar mvar $ val + 1
    ...

この特徴はまさにロックの特徴といえます. ロックを取得し解放するまでは, 他のスレッドは同じロックで保護された区間にははいれません.

というわけでMVarは型レベルでロックがついた共有変数とみなすことができますね!(このへんは Rust の Mutex に似た空気を感じます. どちらも型レベルでロックとそれが保護する中身がつながっています)

型レベルでロックがくっついているので, 中身にアクセスするには必ずロックをとる(takeMVar)必要があり, ロックの取得忘れがありません.

さらに, Haskell は基本的に破壊的操作があまり登場しない言語であることもこのMVarロックにプラスに働きます.

例えば, 連想配列をスレッド間で共有することを考えます. また, ここでは連想配列の実装として, hashtable ではなくData.Mapを使用するとします(Data.Mapは immutable な構造になっていて, lookup は O(log n)ですが, immutable なので Haskell 上で扱いやすいというメリットがあります).

Data.Mapは immutable なので, 一度MVarから取得してしまえばそれ以降変更される可能性もないため, ロックを保持し続ける必要がありません. そこで, 単なる読み込みの場合は, takeMVarしてすぐにputMVarで戻すだとか, readMVarで読み込むだけにすることで, ロックの粒度を小さくできます.

MVarの中身を書き換えたい場合は, 単純にロックを取得し, 書き換え後の値をputMVarします.

module Main where

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar
import qualified Data.Map.Strict as M

main :: IO ()
main = do
    mvar <- newMVar M.empty
    forkIO $ do
        table <- takeMVar mvar
        putMVar mvar table
        -- tableを使用する操作
    forkIO $ do
         table <- readMVar mvar
         -- tableを使用する操作
    forkIO $ do
        table <- takeMVar mvar
        -- tableを変更する操作
        let newTable = ...
        putMVar mvar newTable

このようにMVarと immutable なデータ構造を組み合わせることで, 粒度の小さいロックを実現することができます.

一方, MVarと mutable なデータ構造(IORefなど)を組み合わせる場合は, たとえ読み込みしかしない場合であっても操作が終わるまではロックを保持しておく必要があることに注意しなければなりません (IORefには前回紹介したようにatomicModifyIORefがあるのでなかなかこういう状況は起こりませんね)

また, Rust の Mutex と違い, MVarによるロックの模倣(?)はロックの解放を自動的には行いません. したがって例外が送出された場合にロックを開放し忘れるケースがあるので, 注意が必要です.

一旦まとめ

というわけで今回はMVarについて紹介しました. MVarでロックを実現する方に関しては, 散々言われているロックの問題点をそのまま持ってきてしまうのであまり使えないかもしれませんね…

MVarは容量 1 のメッセージボックスでしたが, Haskell にはChanというものもあります. こちらは golang の chan にかなり近いもので, 容量の制限がないキューのように働かせることができます. Chanのよみとり専用のスレッドを1つ立てておき, 他の複数のスレッドがタスクをChanに書き込んでいくといったユースケースが考えられますね. こっちのほうが便利そうな気がしてきました.

ロックはいろいろ厄介で, デッドロックとか解放忘れとかの問題がついて回ります. それを解決する1つの方法としてSTMがあるようなので, 次はそれについて調べてみようと思います.