ぱろっと・すたじお

技術メモなどをまったりと / my site : http://parrot-studio.com/

ScalaのアクターモデルでMapReduce処理を書いてみる

今回の元ネタはこちら(´・ω・)っ

オブジェクト指向プログラマが次に読む本 ?Scalaで学ぶ関数脳入門

オブジェクト指向プログラマが次に読む本 ?Scalaで学ぶ関数脳入門

これまで、Scalaの本として・・・

・・・の二冊を読んできました


前者は洋書の翻訳の上ちょっと難しめで、
後者は和書なのですが「一応Scalaが書ける」ってレベルで、
関数型の処理にはあまり踏み込んでません(´・ω・`)


しかし、今回の本はタイトル通り「Scalaで関数型を学ぶ」という本で、
超良書である「ふつうのHaskellプログラミング」ほどではないにせよ、
なかなかよろしい本です


一方で、細かい文法には踏み込んでおらず、ファイルの入出力すら書いてませんΣ(・ω・ノ)ノ
必要な文法の説明はあるので、理解するのは十分なのですが、
そのあたりが「ふつうのHaskell」に似ています


そんなこの本の中で、一番私にHITしたのが、
「ScalaのアクターモデルでMapReduce処理をやる」というところで、
個人的にはまさに目から鱗だったのです( ゚Д゚)y \_ ポロッ


そもそも、Scalaを「なんとなく理解」はしていても、
実際にコードを書く気になれなかったのは、
「具体的に何を書いていいのかわからなかった」というのがあります


何かのparserでも書けばいいんだろうな・・・と思いつつ、
それならRubyで書けば早いしな・・・と思ってしまうわけですが、
アクターモデルでMapReduce」というのをRubyで書くのは面倒です


これでスイッチが入った私は、以前Rubyで書いた、
MapReduce「風」の処理でレコメンドというコードを
Scalaのアクターモデルで書き直してみることにしました...φ(・ω・`)


MapReduce風の手順でレコメンドエンジンを作る - ぱろっと・すたじお


前回は半日もかからずIndexを呼び出す側まで書けたのですが、
今回はほぼ初めてScalaに触ったのもあり、
Indexを作るところまでで1日以上かかりました


Javaを知っているので楽・・・と思いきや、
今回のコードでJavaが絡むのは「ファイル出力」だけで、
あまりJavaは関係なかったりΣ(・ω・ノ)ノ

具体的なコード


https://gist.github.com/812122


そんなに長くはないですが、ここに貼り付けられるほど短くもないので、
初めてGistを使いました


大雑把な構造としては・・・

    • 同期処理を順番に進めていくExecutor
    • ログのparseをするMapActor
    • ユーザごとの情報ベクトルをまとめるReduceActor
    • それぞれのActorを複数管理し、Executorの窓口になるMasterActor


・・・こんな感じです
各Actorがメッセージを送り合うことで、
非同期でかつ完全に分離された状態で処理を進められるわけです


引っかかった点をいくつか(´・ω・)っ

非同期処理の終わりを判断する


Executorが全てのログを送り終わったとしても、
MapActorが全ての処理を終えているかはわかりません


そこで、「これで終わりだよ」というメッセージを、
MasterActor経由で全てのMapActorに送り、
「終わった」というメッセージを送り返してもらうようにしました


Actorの「メールボックス」は順番に処理されるので、
「これで終わり」というメッセージを処理するときには、
届いたログを処理し終わってるはず・・・というわけです

caseに渡す正規表現


Scalaのmatch-caseにもRuby同様正規表現が使えるのですが、
parser.findFirstIn(line)で期待した結果が返ってくるのに、
caseに渡すとmatchしない、という問題にぶつかりました


これをTwitterに流したところ、@さんから、
「caseの場合は全体がmatchしないとダメ」と教えていただきまして、
前後に「.*」をつけることで解決しましたヽ(`・ω・´)ノ

関数型としての再帰処理


せっかく関数型の言語で書くので、
手続き型のループ処理を書きたくなかったので、
コレクションの処理部分に結構時間がかかりました


たぶん、Rubyで書けば30秒で書ける程度の話なのですが、
ここのパラダイムを乗り越えないと、
Scalaを使う意味がないかな・・・と思いまして


各クラスのdivideメソッドがその苦労の跡なのですが、
一応再帰のみでロジックが書けました(`・ω・´)


ただ、MapReduceのShuffleにあたるコレクションの処理に問題がありまして、
テスト用の数万行程度のログならこれで通るのですが、
本番データである400MB(意味のあるデータは40MBくらい)だとフローします


結局これが解決できなかったわけですが、
MapReduceとしては処理単位が大きすぎるのが原因と思われるので、
全体を二つのMapReduce系に分けるくらいでいいのかもしれません


単純にロジックが(゚д゚)マズーな気もしますが・・・

mutableなコレクション


Scalaは関数型ではあるのですが、Java由来の手続き型的な仕組みも持っています
なので、ListBufferのような、mutableのコレクションもあります


これを完全に排除できれば「純関数型」と言えるのでしょうが、
それを排除するために大量のコードを書くよりも、
素直に格納しちゃった方が早いのかな・・・と(´・ω・`)


そのあたりの柔軟性がScalaの利点だと思いますが、
できるだけ限定するようにしたいところですね
(今回はActorからの返信を格納するところだけ使用)


あと、さっきのメモリ問題のように、
オブジェクトを使い捨てにすると無駄ですし、
heapを使い切ってまずい場合もあります


JavaでStringの加算をするならStringBuffer/StringBuilderを使うってのと同じで、
状況次第ではListBufferのようなものも使っていっていいのではと思います
Javaの似たクラスより安全で高機能ですし)


なお、こういったmutableなコレクションはデフォルトでは読み込まれず、
importで明示する必要があるので、利用している箇所の特定はしやすいはずです




というわけで、今回はScalaでレコメンドのIndex生成処理を書き直してみたわけですが、
処理速度でいうとScalaの方が相当速いですΣ(゚Д゚;≡;゚д゚)


スタートアップの時間があるとはいえ、
コンパイル後に実行するなら気にならないレベルです
(その代わり、メモリの問題が発生しましたが・・・)


一方、アクターモデルの本家はErlangなわけで、
Erlangだとどうなるのか・・・というのも気になります


以前Erlangの本を読んだときには正直さっぱりでしたが、
今ならある程度理解できるような気がするので、
「ふつうのHaskell」と一緒に読み返してみようかなと


あとは、何か具体的にサービス的なものを・・・(´-ω-)