2011年3月6日日曜日

Hadoop による分散データ処理: 第 3 回 アプリケーション開発

今回は Ruby 言語で MapReduce アプリケーションを開発します。Ruby を選んだ理由は、第 1 に、この言語は開発者が知っていなければならない優れたオブジェクト指向のスクリプト言語であること、そして第 2 に、「参考文献」セクションで紹介するように Java™ 言語と Python 言語を使用する場合のチュートリアルとしては、数多くの資料があるからです。MapReduce プログラミングについて詳しく探るこの記事では、ストリーミング・アプリケーション・プログラミング・インターフェース (API) についても紹介します。この API は、Java 以外の言語でアプリケーションを開発する手段となります。

まずは、map と reduce について (関数の観点から) 簡単に紹介したいと思います。その後、Hadoop プログラミング・モデルとそのアーキテクチャーについて詳しく見ていき、作業を切り分けて分配し、管理する要素について詳しく探っていきます。

map と reduce の起源

MapReduce プログラミング・パラダイムが生まれるきっかけとなった関数要素は何でしょうか。1958年、John McCarthy は Lisp という言語を発明しました。この言語は数値計算と記号計算の両方を実装しましたが、その実装形式は、今日使われているほとんどの言語にとって異質な再帰形式です (Wikipedia (英語版) では、Lisp の実に興味深い歴史を紹介しています。ここには参考になるチュートリアルも記載されているので、時間を割いて読むだけの価値はあります)。Lisp は IBM® 704 で最初に実現されました。IBM 704 は世界で初めて大量生産されたコンピューターであり、かつて好んで使われていたもう 1 つの言語、FORTRAN もサポートしていました。

Lisp のような関数型言語を起源としながらも、今では関数型言語以外の多くの言語で使用されている map 関数は、要素のリストに適用される関数です。これが何を意味するかを理解するには、リスト 1 を見てください。これは、Lisp から派生した SCSH (Scheme Shell) で解釈されたセッションです。最初の行が定義する square という関数は引数を 1 つ取って、その平方根を出力します。その次の行では、map 関数が使用されています。ここに示されているように、map では関数と、その関数を適用する要素のリストを指定します。その結果、二乗された要素が含まれる新しいリストが生成されます。


リスト 1. SCSH での map 関数のデモ
> (define square (lambda (x) (* x x))) > (map square '(1 3 5 7)) '(1 9 25 49) > 

リダクションも同じくリストに適用されますが、通常はリストを 1 つのスカラー値にまとめます。リスト 2 に記載する例に、リストをスカラーにまとめる別の SCSH 関数を示します。この例の場合、値のリストを (1 + (2 + (3 + (4 + (5))))) の形で加算します。これは、繰り返し処理ではなく、再帰に依存する従来の関数型プログラミングであることに注意してください。


リスト 2. SCSH でのリダクションのデモ
> (define (list-sum lis) (if (null? lis) 0 (+ (car lis) (list-sum (cdr lis))))) > (list-sum '(1 2 3 4 5)) 15 >  

興味深い点として、命令型言語では再帰が内部で繰り返し処理に変換されるため、再帰の効率性は繰り返し処理の場合と変わりません。

Hadoop のプログラミング・モデル

Google では、大量のデータ・セットを処理または生成するためのプログラミング・モデルとして、MapReduce の概念を導入しました。規範モデルでは、map 関数がキーと値のペアを処理して、キーと値のペアの中間データを生成します。このキーと値のペアの中間データを reduce 関数が処理し、値をそれに関連付けられたキーに対してマージします (図 1 を参照)。入力データは、クラスター内の複数のマシンに分配して並列処理できるような形で分割されます。生成された中間データも同じように並列処理されるため、膨大なデータを処理するには理想的な方法となっています。


図 1. MapReduce 処理の概略図
 

簡単な復習として、図 1 のアーキテクチャーを見てください。この図には、map と reduce の観点から見たワード・カウントのアーキテクチャーを示しています (この記事では MapReduce アプリケーションを開発するためです)。入力データが (Hadoop ファイルシステム (HDFS) に) 提供されると、まず初めに分割され、それから (ジョブ・トラッカーにより) map ワーカーに分配されます。図 2 の例で分割しているセンテンスは短いものですが、一般に作業の量を分割する単位は 128MB となっています。その理由は、128MB であれば作業をセットアップする時間が少なくて済み、処理する作業が多くてもセットアップのオーバーヘッドを最小限に抑えられるためです。(規範的な例での) map ワーカーは作業を個々のベクトルに分割し、それぞれにトークン化された単語と初期値 (この例では 1) を含めます。map タスクが (タスク・トラッカーによる Hadoop での定義に従って) 完了すると、作業は reduce ワーカーに渡されます。reduce ワーカーはキーを固有のセットにまとめ、検出したキーの数を表す値を設定します。


図 2. 単純な MapReduce の例
 

このプロセスは同じ 1 つのマシンで行われることもあれば、異なる複数のマシンで行われることもあります。あるいはさまざまなパーティションのデータを使用して順次行われることや、並列して行われることもありますが、いずれにしても結果は同じです。

(ワード・カウントを使用して検索索引を生成する場合の) この規範的な例は、Hadoop の 1 つの捉え方でしかありません。この後わかるように、このコンピューティング・モデルは、数々のコンピューティング関連の問題に広範に適用することができます。

Hadoop の柔軟性

図 2 に記載した単純な例での基本要素は、map プロセスと reduce プロセスの 2 つです。この 2 つのプロセスが機能する仕組みについては従来からの見解がありますが、map および reduce がこの見解に従って振る舞うことはアーキテクチャーの要件ではありません。Hadoop の真の威力は、特定のアプリケーションを解決するように振る舞う map プロセスおよび reduce プロセスを実装できるという、その柔軟性にあります。ワード・カウントの例は多数の問題で参考になったり、適用したりできますが、この汎用フレームワークには他のモデルもぴったりと適合します。必要なことは唯一、map および reduce プロセスを Hadoop に対して可視にする MapReduce アプリケーションを開発することだけです。

これまで Hadoop は、ニューラル・ネットワーク、サポート・べクター・マシン、K 平均法クラスタリングといった多様なアルゴリズムを実装する機械学習アプリケーションに使われてきました (詳細については、「参考文献」セクションを参照)。

データ・ストリーミング

Hadoop は Java ベースのフレームワークですが、Java 以外の言語で MapReduce アプリケーションを作成することもできます。これを可能にするのはストリーミングです。Hadoop 内での streaming ユーティリティーは、ある種のデータ・フロー接続機能を実装します。streaming ユーティリティーでは、ユーザー独自の map および reduce 実行可能ファイル (それぞれ標準入力 [stdin] から入力を取り、標準出力 [stdout] に出力を生成) を定義することができます。すると、streaming ユーティリティーがファイルに応じてデータの読み取り、書き込みを行い、必要に応じてアプリケーションを呼び出します (リスト 3 を参照)。


リスト 3. Hadoop streaming ユーティリティーを使用する
hadoop jar $HADOOP_HOME/hadoop-streaming.jar \ 	-input inputData 	-output outputData 	-mapper map_exec 	-reducer reduce_exec 

リスト 3 には、Hadoop 内での streaming ユーティリティーの使用方法が示されています。図 3 に、このフローがどのように定義されているかを図解します。これは単純なストリーミングの使用例であることに注意してください。ストリーミングでは数々のオプションを使用することができます。例えば、データを構文解析する方法やイメージを呼び出す方法を調整するオプション、パーティショナー (partitioner) またはコンバイナー (combiner) の置換イメージを指定するオプション、さらに構成を調整するオプションなど、さまざまなオプションが使用できるようになっています (詳細については、「参考文献」セクションを参照)。


図 3. ストリーミングの例の図解
 

Ruby を使用した例

streaming ユーティリティーの基本を理解したところで、これから単純な Ruby MapReduce アプリケーションを作成して、Hadoop フレームワーク内で map プロセスと reduce プロセスを使用する方法を見ていきます。ここで作成するサンプル・アプリケーションは規範的な MapReduce アプリケーションですが、その他のアプリケーションについても後で紹介します (それと同時に、MapReduce 形式でアプリケーションを実装する方法も説明します)。

まずはマッパーから取り掛かります。このスクリプトは stdin からテキスト入力を取り、それをトークン化してから、キーと値のペアを stdout に出力します。ほとんどのオブジェクト指向のスクリプト言語と同じく、このタスクは単純過ぎると言ってもよいほどです。リスト 4 に、このマッパーのスクリプトを記載します (コメントとホワイト・スペースを追加しているため、多少サイズが大きくなっています)。このプログラムは、stdin から行を読み取るためのイテレーター、そして行を個別のトークンに分割するための別のイテレーターを使用します。分割された各トークン (単語) は、その後にタブで区切られた関連する値 1 と一緒に stdout に出力されます。


リスト 4. Ruby の map スクリプト (map.rb)
#!/usr/bin/env ruby  # Our input comes from STDIN STDIN.each_line do |line|    # Iterate over the line, splitting the words from the line and emitting   # as the word with a count of 1.   line.split.each do |word|     puts "#{word}\t1"   end  end 

次に、reduce アプリケーションを見てください。このアプリケーションのほうが少し複雑ですが、Ruby のハッシュ (連想配列) を使用してリダクション操作を単純にしています (リスト 5 を参照)。このスクリプトも同じく (streaming ユーティリティーによって渡された) stdin からの入力データで機能し、行を単語と値に分割します。それに続き、ハッシュにその単語が含まれているかどうかをチェックし、該当する単語を検出した場合には、その要素にカウントを追加します。単語が存在しない場合、ハッシュ内にその単語の新しいエントリーを作成してから、カウントをロードします (カウントは、マッパー・プロセスでの値 1 でなければなりません)。すべての入力が処理された後は、ただ単にハッシュを繰り返し処理し、キーと値のペアを stdoutに出力すればよいのです。


リスト 5. Ruby の reduce スクリプト (reduce.rb)
#!/usr/bin/env ruby  # Create an empty word hash wordhash = {}  # Our input comes from STDIN, operating on each line STDIN.each_line do |line|    # Each line will represent a word and count   word, count = line.strip.split    # If we have the word in the hash, add the count to it, otherwise   # create a new one.   if wordhash.has_key?(word)     wordhash[word] += count.to_i   else     wordhash[word] = count.to_i   end  end  # Iterate through and emit the word counters wordhash.each {|record, count| puts "#{record}\t#{count}"} 

map スクリプトと reduce スクリプトが完成したところで、コマンドラインからこれらのスクリプトをテストします。テストの前には忘れずに chmod +x を使って、これらのファイルを実行可能ファイルに変更してください。テストは、入力ファイルを生成するところから始めます (リスト 6 を参照)


リスト 6. 入力ファイルを生成する
# echo "Hadoop is an implementation of the map reduce framework for " \ 	"distributed processing of large data sets." > input # 

この入力ファイルが生成されれば、早速マッパーのスクリプトをテストすることができます (リスト 7 を参照)。前にも説明したように、このスクリプトは入力ファイルをキーと値のペアにトークン化するにすぎません。ここで、各値は 1 (一意ではない入力) になります。


リスト 7. マッパーのスクリプトをテストする
# cat input | ruby map.rb Hadoop	1 is	1 an	1 implementation	1 of	1 the	1 map	1 reduce	1 framework	1 for	1 distributed	1 processing	1 of	1 large	1 data	1 sets.	1 # 

今のところ、すべて順調に進んでいます。そこで今度は、このアプリケーションごと元のストリーミング形式にします (Linux® のパイプを使用)。リスト 8 では、map スクリプトによって入力を渡し、(オプションのステップで) その出力をソートしてから、生成された中間データをリデューサーのスクリプトによって渡しています。


リスト 8. Linux のパイプを使用した単純な MapReduce
# cat input | ruby map.rb | sort | ruby reduce.rb large	1 of	2 framework	1 distributed	1 data	1 an	1 the	1 reduce	1 map	1 sets.	1 Hadoop	1 implementation	1 for	1 processing	1 is	1 #  

Hadoop での Ruby の使用

map スクリプトと reduce スクリプトがシェル環境で正常に動作することがわかったので、今度は Hadoop でスクリプトをテストします。ここでは、Hadoop のセットアップ・タスクは省略します (Hadoop を稼働状態にするには、この連載の第 1 回または第 2 回を参照してください)。

最初のステップでは、入力データ用の入力ディレクトリーを HDFS 内に作成した上で、スクリプトのテストを行うためのサンプル・ファイルを用意します。リスト 9 に、このステップを説明します (これらのステップについての詳細は、第 1 回または第 2 回を参照してください)。


リスト 9. MapReduce プロセスに使用する入力データを作成する
# hadoop fs -mkdir input # hadoop dfs -put /usr/src/linux-source-2.6.27/Documentation/memory-barriers.txt input # hadoop fs -ls input Found 1 items -rw-r--r--  1 root supergroup  78031 2010-06-04 17:36 /user/root/input/memory-barriers.txt #  

次に、streaming ユーティリティーを使用して入力データと出力のロケーションを指定し、カスタム・スクリプトを使って Hadoop を起動します (リスト 10 を参照)。注意する点として、この例で使用している -file オプションは、単に Hadoop に対し、Ruby スクリプトをジョブ・サブミッションの一部としてパッケージ化するように指示しているだけです。


リスト 10. カスタム Ruby MapReduce スクリプトで Hadoop ストリーミングを使用する
# hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-0.20.2+228-streaming.jar \   -file /home/mtj/ruby/map.rb -mapper /home/mtj/ruby/map.rb \   -file /home/mtj/ruby/reduce.rb -reducer /home/mtj/ruby/reduce.rb \   -input input/* -output output packageJobJar: [/home/mtj/ruby/map.rb, /home/mtj/ruby/reduce.rb, /var/lib/hadoop-0.20/... 10/06/04 17:42:38 INFO mapred.FileInputFormat: Total input paths to process : 1 10/06/04 17:42:39 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-0.20/... 10/06/04 17:42:39 INFO streaming.StreamJob: Running job: job_201006041053_0001 10/06/04 17:42:39 INFO streaming.StreamJob: To kill this job, run: 10/06/04 17:42:39 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job ... 10/06/04 17:42:39 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/... 10/06/04 17:42:40 INFO streaming.StreamJob:  map 0%  reduce 0% 10/06/04 17:43:17 INFO streaming.StreamJob:  map 100%  reduce 0% 10/06/04 17:43:26 INFO streaming.StreamJob:  map 100%  reduce 100% 10/06/04 17:43:29 INFO streaming.StreamJob: Job complete: job_201006041053_0001 10/06/04 17:43:29 INFO streaming.StreamJob: Output: output #  

最後に、hadoop ユーティリティーからファイルシステム操作 cat を使用して出力を調べます (リスト 11 を参照)。


リスト 11. Hadoop の出力を調べる
# hadoop fs -ls /user/root/output Found 2 items drwxr-xr-x  - root supergroup      0 2010-06-04 17:42 /user/root/output/_logs -rw-r--r--  1 root supergroup  23014 2010-06-04 17:43 /user/root/output/part-00000 # hadoop fs -cat /user/root/output/part-00000 | head -12 +--->|	4 immediate	2 Alpha)	1 enable	1 _mandatory_	1 Systems	1 DMA.	2 AMD64	1 {*C,*D},	2 certainly	2 back	2 this	23 #  

以上のように、30 行に満たないスクリプトで、map 要素と reduce 要素を実装し、Hadoop フレームワーク内でこれらのプロセスが実行される仕組みを説明しました。単純な例ですが、この例から、Hadoop の真の威力と、カスタム・アルゴリズムまたは専用アルゴリズムで大量のデータ・セットを処理するためのフレームワークとして、Hadoop がこれほどよく使われている理由がわかるはずです。

Hadoop のその他のアプリケーション

単純に大量のデータ・セットのワード・カウントを計算する以外にも、Hadoop はさまざまなアプリケーションに使用することができます。アプリケーションで Hadoop を使うには、Hadoop インフラストラクチャーで使用できるベクトル形式でデータを表現するだけでよいのです。規範的な例では、キーと値としてのベクトル表現を使用しましたが、値の定義方法については、例えば複数の値を集約するなどの制約は何もありません。この柔軟性は、さらに多彩なアプリケーションで Hadoop の新たな可能性を広げます。

MapReduce ワード・カウント・モデルにぴったりの興味深いアプリケーションとしては、Web サーバーのアクセス頻度を表にするというアプリケーションがあります (このアプリケーションについては、Google の独創的な論文のなかで説明されています)。このアプリケーションでは、(Web サーバーのアクセス・ログから取り込まれた) URL がキーの役割を果たします。この場合、reduce プロセスの結果として、Web サーバーのログに基づき、特定の Web サイトに対する URL ごとの合計アクセス回数が出力されます。

機械学習アプリケーションでは、Hadoop は大量の GA 個体数を処理するための遺伝的アルゴリズムのスケーリング方法として使用されてきました (潜在的なソリューション)。この場合は、map プロセスで従来の遺伝的アルゴリズムが実行され、ローカル・プールから最適な個々のソリューションを探し出します。そして map フェーズで突き止められた個々のソリューションが、reduce アプリケーションで選抜されることになります。この仕組みでは、個々のノードがそれぞれに最適なソリューションを特定し、さらに reduce フェーズで、これらのソリューションを適者生存の分布表示で選抜することができます。

他にも E メール・スパムのボットネットを特定するための興味深いアプリケーションを作成しています。このプロセスの最初のステップは、特定の組織から送られてくる E メール・メッセージを (フィンガープリントのセットを基に) 減らす目的で、E メール・メッセージを分類することです。このフィルタリングされたデータから、何らかの点で (例えば、E メール・メッセージ本文で同じリンクが参照されているなど) 関連する E メールのグラフが作成されます。これらの関連する E メールをホスト (静的または動的 IP アドレス) にまとめることで、問題となっているボットネットを特定します。

map および reduce プリミティブから世界を覗くアプリケーションの他では、Hadoop はマシンのクラスターに作業を分配する手段として役立ちます。map と reduce では、特定のタイプのアプリケーションしか使えないわけではありません。Hadoop は、ホストにデータとアルゴリズムの両方を分配して並列処理を迅速化するための手段と見なすことができます。

Hadoop アプリケーションのエコシステム

Hadoop は柔軟なフレームワークとなりますが、このフレームワークのインターフェースを他のアプリケーション用に変換できるアプケーションもあります。その興味深い一例は、独自の問い合わせ言語 (Hive QL) を持つデータ・ウェアハウス・インフラストラクチャー、Hive です。Hive は、SQL の知識を持つ開発者にとって Hadoop をより使いやすいフレームワークにするだけでなく、データ処理に従来の MapReduce インフラストラクチャーも使用することができます。

HBase も、HDFS をベースにした興味深いアプリケーションです。HBase は Google BigTable に似たハイパフォーマンス・データベース・システムで、従来のファイル処理の代わりに、HBase はデータベース・テーブルを MapReduce 処理に対応した入出力形式に変換します。

最後に、Pig についても紹介しておきます。Pig は、Hadoop で大量のデータ・セットを分析するためのプラットフォームとして、MapReduce アプリケーションにコンパイルされる高級言語を提供します。

さらに詳しく調べてください

この Hadoop をテーマにした連載の最終回では、Hadoop フレームワーク対応の MapReduce アプリケーションを Ruby で開発する方法を説明しました。この記事から、皆さんが Hadoop の真の威力を理解していただけたようであれば幸いです。Hadoop では特定のプログラミング・モデルに従わなければなりませんが、そのモデルは柔軟で、さまざまなアプケーションに適用することができます。

0 件のコメント:

コメントを投稿