ScalaのアクターモデルでMapReduce処理を書いてみる
今回の元ネタはこちら(´・ω・)っ
オブジェクト指向プログラマが次に読む本 ?Scalaで学ぶ関数脳入門
- 作者: 株式会社テクノロジックアート,長瀬嘉秀,町田修一
- 出版社/メーカー: 技術評論社
- 発売日: 2010/11/13
- メディア: 単行本(ソフトカバー)
- 購入: 11人 クリック: 340回
- この商品を含むブログ (31件) を見る
これまで、Scalaの本として・・・
・・・の二冊を読んできました
前者は洋書の翻訳の上ちょっと難しめで、
後者は和書なのですが「一応Scalaが書ける」ってレベルで、
関数型の処理にはあまり踏み込んでません(´・ω・`)
しかし、今回の本はタイトル通り「Scalaで関数型を学ぶ」という本で、
超良書である「ふつうのHaskellプログラミング」ほどではないにせよ、
なかなかよろしい本です
一方で、細かい文法には踏み込んでおらず、ファイルの入出力すら書いてませんΣ(・ω・ノ)ノ
必要な文法の説明はあるので、理解するのは十分なのですが、
そのあたりが「ふつうのHaskell」に似ています
そんなこの本の中で、一番私にHITしたのが、
「ScalaのアクターモデルでMapReduce処理をやる」というところで、
個人的にはまさに目から鱗だったのです( ゚Д゚)y \_ ポロッ
そもそも、Scalaを「なんとなく理解」はしていても、
実際にコードを書く気になれなかったのは、
「具体的に何を書いていいのかわからなかった」というのがあります
何かのparserでも書けばいいんだろうな・・・と思いつつ、
それならRubyで書けば早いしな・・・と思ってしまうわけですが、
「アクターモデルでMapReduce」というのをRubyで書くのは面倒です
これでスイッチが入った私は、以前Rubyで書いた、
MapReduce「風」の処理でレコメンドというコードを
Scalaのアクターモデルで書き直してみることにしました...φ(・ω・`)
MapReduce風の手順でレコメンドエンジンを作る - ぱろっと・すたじお
前回は半日もかからずIndexを呼び出す側まで書けたのですが、
今回はほぼ初めてScalaに触ったのもあり、
Indexを作るところまでで1日以上かかりました
Javaを知っているので楽・・・と思いきや、
今回のコードでJavaが絡むのは「ファイル出力」だけで、
あまりJavaは関係なかったりΣ(・ω・ノ)ノ
具体的なコード
https://gist.github.com/812122
そんなに長くはないですが、ここに貼り付けられるほど短くもないので、
初めてGistを使いました
大雑把な構造としては・・・
-
- 同期処理を順番に進めていくExecutor
- ログのparseをするMapActor
- ユーザごとの情報ベクトルをまとめるReduceActor
- それぞれのActorを複数管理し、Executorの窓口になるMasterActor
・・・こんな感じです
各Actorがメッセージを送り合うことで、
非同期でかつ完全に分離された状態で処理を進められるわけです
引っかかった点をいくつか(´・ω・)っ
非同期処理の終わりを判断する
Executorが全てのログを送り終わったとしても、
MapActorが全ての処理を終えているかはわかりません
そこで、「これで終わりだよ」というメッセージを、
MasterActor経由で全てのMapActorに送り、
「終わった」というメッセージを送り返してもらうようにしました
Actorの「メールボックス」は順番に処理されるので、
「これで終わり」というメッセージを処理するときには、
届いたログを処理し終わってるはず・・・というわけです
caseに渡す正規表現
Scalaのmatch-caseにもRuby同様正規表現が使えるのですが、
parser.findFirstIn(line)で期待した結果が返ってくるのに、
caseに渡すとmatchしない、という問題にぶつかりました
これをTwitterに流したところ、@yasushiaさんから、
「caseの場合は全体がmatchしないとダメ」と教えていただきまして、
前後に「.*」をつけることで解決しましたヽ(`・ω・´)ノ
関数型としての再帰処理
せっかく関数型の言語で書くので、
手続き型のループ処理を書きたくなかったので、
コレクションの処理部分に結構時間がかかりました
たぶん、Rubyで書けば30秒で書ける程度の話なのですが、
ここのパラダイムを乗り越えないと、
Scalaを使う意味がないかな・・・と思いまして
各クラスのdivideメソッドがその苦労の跡なのですが、
一応再帰のみでロジックが書けました(`・ω・´)
ただ、MapReduceのShuffleにあたるコレクションの処理に問題がありまして、
テスト用の数万行程度のログならこれで通るのですが、
本番データである400MB(意味のあるデータは40MBくらい)だとフローします
結局これが解決できなかったわけですが、
MapReduceとしては処理単位が大きすぎるのが原因と思われるので、
全体を二つのMapReduce系に分けるくらいでいいのかもしれません
単純にロジックが(゚д゚)マズーな気もしますが・・・
mutableなコレクション
Scalaは関数型ではあるのですが、Java由来の手続き型的な仕組みも持っています
なので、ListBufferのような、mutableのコレクションもあります
これを完全に排除できれば「純関数型」と言えるのでしょうが、
それを排除するために大量のコードを書くよりも、
素直に格納しちゃった方が早いのかな・・・と(´・ω・`)
そのあたりの柔軟性がScalaの利点だと思いますが、
できるだけ限定するようにしたいところですね
(今回はActorからの返信を格納するところだけ使用)
あと、さっきのメモリ問題のように、
オブジェクトを使い捨てにすると無駄ですし、
heapを使い切ってまずい場合もあります
JavaでStringの加算をするならStringBuffer/StringBuilderを使うってのと同じで、
状況次第ではListBufferのようなものも使っていっていいのではと思います
(Javaの似たクラスより安全で高機能ですし)
なお、こういったmutableなコレクションはデフォルトでは読み込まれず、
importで明示する必要があるので、利用している箇所の特定はしやすいはずです
というわけで、今回はScalaでレコメンドのIndex生成処理を書き直してみたわけですが、
処理速度でいうとScalaの方が相当速いですΣ(゚Д゚;≡;゚д゚)
スタートアップの時間があるとはいえ、
コンパイル後に実行するなら気にならないレベルです
(その代わり、メモリの問題が発生しましたが・・・)
一方、アクターモデルの本家はErlangなわけで、
Erlangだとどうなるのか・・・というのも気になります
以前Erlangの本を読んだときには正直さっぱりでしたが、
今ならある程度理解できるような気がするので、
「ふつうのHaskell」と一緒に読み返してみようかなと
あとは、何か具体的にサービス的なものを・・・(´-ω-)