2011年3月6日日曜日

Hadoop による分散データ処理: 第 1 回 導入編

最大級の検索エンジンで利用されている Hadoop は、データ・リダクションの中心的役割を果たしていますが、データ・リダクションのコアというよりは、データの分散処理用フレームワークと言い表したほうが的を射ています。ここで言うデータとは、ただのデータではありません。検索エンジンが必要なデータとして収集する、膨大な量のクロール・データのことです。分散フレームワークとしての Hadoop が、数々のアプリケーションにデータ並列処理による恩恵をもたらしています。

この記事の意図は Hadoop とそのアーキテクチャーを解説することではなく、単純な Hadoop セットアップの具体例を示すことです。Hadoop のアーキテクチャー、コンポーネント、操作理論についての詳細は、「参考文献」セクションを参照してください。この点を踏まえたうえで、早速 Hadoop のインストールと構成に取り掛かりましょう。

初期設定

Hadoop の起源

Apache Hadoop プロジェクトは、Google による取り組みから発想を得て開発されました。大量のデータを処理するこの手法の特許は Google が所有していますが、Google は寛大にも、Hadoop にその使用許可を与えています。詳細については「参考文献」セクションを参照してください。

この記事のデモでは、Cloudera の Hadoop ディストリビューションを使用します。Cloudera ではさまざまな Linux® ディストリビューションをサポートしているので、Hadoop を導入するには Cloudera のディストリビューションが最適です。

この記事で第一の前提とするのは、お使いのシステムに Java™ 技術 (リリース 1.6 以降) および cURL がインストールされていることです。まだインストールされていなければ、まず先に、この 2 つを追加してください (インストールについての詳細は、「参考文献」セクションを参照)。

私が使用しているのは Ubuntu (Intrepid リリース) なので、apt ユーティリティーを使って Hadoop ディストリビューションを入手します。このプロセスは極めて単純で、ソースをダウンロードしてビルドするといった細かな作業は必要なく、バイナリー・パッケージを入手すればよいのです。プロセスではまず、apt に Cloudera サイトの情報を指定します。次に、/etc/apt/sources.list.d/cloudera.list に新規ファイルを作成し、以下のテキストを追加します。

deb http://archive.cloudera.com/debian intrepid-cdh3 contrib deb-src http://archive.cloudera.com/debian intrepid-cdh3 contrib 

Jaunty や別のリリースを実行している場合には、intrepid の部分をそのリリースの名前に変更すればよいだけです (現在、Hardy、Intrepid、Jaunty、Karmic、および Lenny がサポートされています)。

次に、Cloudera から apt-key を取得して、ダウンロードされたパッケージを確認します。

$ curl -s http://archive.cloudera.com/debian/archive.key | \ sudo apt-key add - sudo apt-get update 

続いて擬似分散構成の Hadoop をインストールします (この構成では、単一のホストですべての Hadoop デーモンが動作します)。

$ sudo apt-get install hadoop-0.20-conf-pseudo $ 

この構成はわずか約 23MB であることに注目してください (この構成には、apt が組み込む他のパッケージで、存在しないかもしれないパッケージは含まれません)。このインストールは、Hadoop をいろいろと試して、その要素とインターフェースを学ぶにはぴったりです。

最後に、パスフレーズなしの SSH をセットアップします。ssh localhost を使用してパスフレーズが求められた場合には、以下のステップを実行する必要があります。このステップにはセキュリティーが関連してくるため、ここではコンピューターが Hadoop 専用であることを前提とします (リスト 1 を参照)。


リスト 1. パスフレーズなしの SSH のセットアップ
$ sudo su - # ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa # cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys 

最後に 1 つ注意する点として、ホストに十分な datanode (キャッシュ) 用のストレージがあることを確実にしておいてください。ストレージ不足は、例えばノードへの複製が不可能であることを示すエラーといった形で現れてきます。

Hadoop の起動

Hadoop を起動する準備はこれで整いました。Hadoop は事実上、Hadoop デーモンのそれぞれを開始することによって起動します。しかしその前に、hadoop コマンドを使用して HDFS (Hadoop File System) をフォーマットしてください。hadoop コマンドにはさまざまな使い方があります。そのうちのいくつかについては後で説明します。

まず、namenode に対し、HDFS ファイルシステムをフォーマットするように要求します。これはインストールの一環として行う作業ですが、クリーンなファイルシステムを生成しなければならない場合に備え、この方法を覚えておくと役立ちます。

# hadoop-0.20 namenode -format 

要求に対する確認応答の後、ファイルシステムがフォーマットされて情報が返されるので、その時点で Hadoop デーモンを開始します。この擬似分散構成で Hadoop が開始するデーモンは、namenode、secondarynamenode、datanode、jobtracker、tasktracker の 5 つです。デーモンが開始されるごとに、デーモンのログが保管される場所を識別する短いテキストが出力されます。デーモンはそれぞれ、(デーモンとして) バックグラウンドで実行するように開始されます。図 1 に、起動が完了した時点での擬似分散ノードの構成を示します。


図 1. 擬似分散構成の Hadoop
擬似分散構成の Hadoop のブロック図 

Hadoop には、起動を単純化するいくつかのヘルパー・メソッドが用意されています。これらのツールを分類すると、start (例えばstart-dfs) と stop (例えば stop-dfs) の 2 つに分けられます。以下の簡潔なスクリプトは、Hadoop ノードの起動方法を示す一例です。

# /usr/lib/hadoop-0.20/bin/start-dfs.sh # /usr/lib/hadoop-0.20/bin/start-mapred.sh # 

デーモンが実行中であることを確かめるには、jps (JVM プロセス用の ps ユーティリティー) コマンドを利用することができます。このコマンドによって、5 つのデーモンとそれぞれのプロセス ID が表示されます。

Hadoop デーモンが実行中になったところで、それぞれのデーモンが Hadoop フレームワークの中で果たす役割を紹介します。まず、namenode は Hadoop のマスター・サーバーとして、ファイルシステムの名前空間と、クラスター内に保管されたファイルへのアクセスを管理します。secondary namenode は namenode の予備用デーモンではなく、期間のチェックポイント・タスクとハウスキーピング・タスクを行うデーモンです。Hadoop クラスターには、1 つの namenode と 1 つの secondary namenode があります。

datanode はノードに関連付けられたストレージを管理します。クラスター内に複数のノードがあるケースもありますが、その場合には、データを保管するそれぞれのノードで datanode デーモンが実行されます。

datanode での作業をスケジューリングする jobtracker はクラスターごとに 1 つ、実際の作業を行う tasktracker は datanode ごとに 1 つあります。jobtracker と tasktracker はマスター/スレーブ関係で振る舞います。つまり、jobtracker が作業を datanode に分散し、tasktracker がタスクを行います。jobtracker はさらに、要求された作業の有効性を確認し、datanode に何らかの理由で障害が発生した場合には、障害が発生する前にスケジューリングしたタスクを再スケジューリングします。

この単純な構成ではすべてのノードが同じ 1 つのノードに常駐しますが (図 1 を参照)、これまでの説明から、Hadoop が作業の並列処理を実現する仕組みは明らかです。アーキテクチャーは単純ながらも、Hadoop はデータの分散、負荷分散、そして大量データの並列処理を耐障害性のある方法で容易に行う手段となります。

HDFS の検査

Hadoop (少なくとも namenode) が正常に稼働していることを確かめるには、いくつかのテストを実行することができます。例えばプロセスのすべてが使用可能であることがわかっていれば、hadoop コマンドを使ってローカル名前空間を検査することができます (リスト 2 を参照)。


リスト 2. HDFS へのアクセスの確認
# hadoop-0.20 fs -ls / Found 2 items drwxr-xr-x   - root supergroup          0 2010-04-29 16:38 /user drwxr-xr-x   - root supergroup          0 2010-04-29 16:28 /var # 

以上の結果から、namenode が実行中であり、ローカル名前空間に対応可能であることがわかります。ここではファイルシステムを検査するために、hadoop-0.20 というコマンドを使っている点に注目してください。このユーティリティーは Hadoop クラスターと対話する手段として、ファイルシステムを検査する場合にも、クラスター内でジョブを実行する場合にも使用します。注意する点として、このコマンドの構造は、hadoop-0.20 ユーティリティーを指定した後にコマンド (上記の場合は汎用ファイルシステム・シェル) と 1 つ以上のオプションを定義する形になっています (上記では、ls を使用してファイルのリストを要求しています)。hadoop-0.20 は Hadoop クラスターとの主要なインターフェースの 1 つなので、記事の至るところで、このユーティリティーを目にすることになります。リスト 3 に、他にもいくつかのファイルシステム操作を記載します。このリストから、hadoop-0.20 インターフェースを少し掘り下げて調べることができます (このリストでは、test という名前の新規サブディレクトリーを作成し、その中身を表示した後、そのサブディレクトリーを削除しています)。


リスト 3. Hadoop でのその他のファイルシステム操作
# hadoop-0.20 fs -mkdir test # hadoop-0.20 fs -ls test # hadoop-0.20 fs -rmr test Deleted hdfs://localhost/user/root/test #  

Hadoop のテスト

ここまでの手順で、Hadoop をインストールし、Hadoop ファイルシステムとの基本インターフェースをテストする作業は完了しました。今度は、実際のアプリケーションで Hadoop をテストします。この例では、小さなデータセットに対して MapReduce プロセスを調べます。MapReduce の Map とReduce は、関数型プログラミングの関数にちなんで名付けられていますが、データ・リダクションのコアとなる機能を提供します。Map とは、入力データを一連の小さなデータの塊に分割して処理するプロセスを指します (分割されたデータは、並列ワーカーで分散処理されます)。Reduce とは、分割されたデータの処理結果を 1 つの出力に組み立てることです。このフレームワークでは処理の内容を自由に定義できますが、ここでは処理の内容は定義していないことに注意してください。標準的な MapReduce の例は、文書セットにおける単語の出現回数の計算です。

以上の説明から、アプリケーションには入力データのセットがあり、データの処理結果として出力のセットが生成されることになります。そこで、最初のステップでは処理対象を配置する先となる input サブディレクトリーをファイルシステムに作成します。それには、以下のコマンドを使用します。

# hadoop-0.20 fs -mkdir input 

次に、input サブディレクトリーに処理対象を配置します。この場合には put コマンドを使用して、ファイルをローカル・ファイルシステムから HDFS に移します (リスト 4 を参照)。以下のフォーマットでは、ソース・ファイルを HDFS のサブディレクトリー (input) に移動させています。この作業が完了すると、HDFS に処理対象の 2 つのテキスト・ファイルが用意できます。


リスト 4. HDFS へのファイルの移動
# hadoop-0.20 fs -put /usr/src/linux-source-2.6.27/Doc*/memory-barriers.txt  input # hadoop-0.20 fs -put /usr/src/linux-source-2.6.27/Doc*/rt-mutex-design.txt  input # 

HDFS にファイルがあるかどうかは、ls コマンドで確認することができます (リスト 5 を参照)。


リスト 5. HDFS 内のファイルの確認
# hadoop-0.20 fs -ls input Found 2 items -rw-r--r--  1 root supergroup 78031 2010-04-29 17:35 /user/root/input/memory-barriers.txt -rw-r--r--  1 root supergroup 33567 2010-04-29 17:36 /user/root/input/rt-mutex-design.txt  # 

処理対象のファイルは HDFS に正常に配置されたので、MapReduce 関数を実行します。この関数に必要なのは 1 つのコマンドだけですが、長いコマンドになります (リスト 6 を参照)。このコマンドでは、JAR の実行を要求します。実際には、MapReduce 関数にはいくつもの機能が実装されていますが、この例では wordcount に焦点を絞ります。jobtracker デーモンが datanode に MapReduce ジョブの実行を要求すると、かなりの出力が生成されることになります (この例で処理するファイルは 2 つだけなので、比較的少ない出力となります)。map 関数と reduce 関数の進行状況が示された後、ファイルシステムおよびレコード処理の入出力に関する有益な統計が示されます。


リスト 6. 単語の出現回数 (wordcount) を計算する MapReduce ジョブの実行
# hadoop-0.20 jar /usr/lib/hadoop-0.20/hadoop-0.20.2+228-examples.jar \ wordcount input output 10/04/29 17:36:49 INFO input.FileInputFormat: Total input paths to process : 2 10/04/29 17:36:49 INFO mapred.JobClient: Running job: job_201004291628_0009 10/04/29 17:36:50 INFO mapred.JobClient:  map 0% reduce 0% 10/04/29 17:37:00 INFO mapred.JobClient:  map 100% reduce 0% 10/04/29 17:37:06 INFO mapred.JobClient:  map 100% reduce 100% 10/04/29 17:37:08 INFO mapred.JobClient: Job complete: job_201004291628_0009 10/04/29 17:37:08 INFO mapred.JobClient: Counters: 17 10/04/29 17:37:08 INFO mapred.JobClient:   Job Counters  10/04/29 17:37:08 INFO mapred.JobClient:     Launched reduce tasks=1 10/04/29 17:37:08 INFO mapred.JobClient:     Launched map tasks=2 10/04/29 17:37:08 INFO mapred.JobClient:     Data-local map tasks=2 10/04/29 17:37:08 INFO mapred.JobClient:   FileSystemCounters 10/04/29 17:37:08 INFO mapred.JobClient:     FILE_BYTES_READ=47556 10/04/29 17:37:08 INFO mapred.JobClient:     HDFS_BYTES_READ=111598 10/04/29 17:37:08 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=95182 10/04/29 17:37:08 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=30949 10/04/29 17:37:08 INFO mapred.JobClient:   Map-Reduce Framework 10/04/29 17:37:08 INFO mapred.JobClient:     Reduce input groups=2974 10/04/29 17:37:08 INFO mapred.JobClient:     Combine output records=3381 10/04/29 17:37:08 INFO mapred.JobClient:     Map input records=2937 10/04/29 17:37:08 INFO mapred.JobClient:     Reduce shuffle bytes=47562 10/04/29 17:37:08 INFO mapred.JobClient:     Reduce output records=2974 10/04/29 17:37:08 INFO mapred.JobClient:     Spilled Records=6762 10/04/29 17:37:08 INFO mapred.JobClient:     Map output bytes=168718 10/04/29 17:37:08 INFO mapred.JobClient:     Combine input records=17457 10/04/29 17:37:08 INFO mapred.JobClient:     Map output records=17457 10/04/29 17:37:08 INFO mapred.JobClient:     Reduce input records=3381  

処理が完了したら、結果を調べてください。このジョブの要点は、入力ファイルで単語が出現する回数を計算することです。ジョブの出力は、単語とその単語が入力ファイルで出現する回数を表すタプルを集めたファイルとして生成されます。このデータを出力するには、(その特定の出力ファイルを見つけてから) hadoop-0.20 ユーティリティーで cat コマンドを使用します (リスト 7 を参照)。


リスト 7. MapReduce の wordcount 操作による出力の表示
# hadoop-0.20 fs -ls /user/root/output Found 2 items drwxr-xr-x   - root supergroup          0 2010-04-29 17:36 /user/root/output/_logs -rw-r--r--   1 root supergroup      30949 2010-04-29 17:37 /user/root/output/part-r-00000 #   # hadoop-0.20 fs -cat output/part-r-00000 | head -13 != 1 "Atomic 2 "Cache 2 "Control 1 "Examples 1 "Has 7 "Inter-CPU 1 "LOAD 1 "LOCK" 1 "Locking 1 "Locks 1 "MMIO 1 "Pending 5 # 

hadoop-0.20 ユーティリティーを使用して HDFS からファイルを抽出することもできます (リスト 8 を参照)。ファイルの抽出は、getユーティリティーを併せて使うことによって簡単に行えます (ファイルを HDFS に書き出すために実行した put と同様のユーティリティーです)。get 操作の場合には、HDFS から抽出する (output サブディレクトリー内の) ファイルと、ローカル・ファイルシステム内で書き込むファイルの名前 (output.txt) を指定する必要があります。


リスト 8. HDFS からの出力の抽出
# hadoop-0.20 fs -get output/part-r-00000 output.txt # cat output.txt | head -5 != 1 "Atomic 2 "Cache 2 "Control 1 "Examples 1 #   

次の例では同じ JAR を使っていますが、その使い方は異なります (この例では、並列で実行される grep を調べます)。テストには既存の入力ファイルを使用しますが、output サブディレクトリーは削除してこのテスト用に作り直します。

# hadoop-0.20 fs -rmr output Deleted hdfs://localhost/user/root/output 

次に、MapReduce ジョブに grep を要求してください。この場合、grep が並列で実行されてから (map 処理)、grep の結果が結合されます (reduce 処理)。リスト 9 に、この使用モデルの出力を記載します (ただし、簡潔にするために出力の一部は省略してあります)。このコマンドで要求しているのは、input というサブディレクトリーから入力を取り、その結果を output というサブディレクトリーに配置する grep であることに注意してください。最後のパラメーターは、検索対象のストリング (この場合は「kernel」) です。


リスト 9. 単語の検索結果 (grep の結果) の件数をカウントする MapReduceジョブの実行
# hadoop-0.20 jar /usr/lib/hadoop/hadoop-0.20.2+228-examples.jar \ grep input output 'kernel' 10/04/30 09:22:29 INFO mapred.FileInputFormat: Total input paths to process : 2 10/04/30 09:22:30 INFO mapred.JobClient: Running job: job_201004291628_0010 10/04/30 09:22:31 INFO mapred.JobClient:  map 0% reduce 0% 10/04/30 09:22:42 INFO mapred.JobClient:  map 66% reduce 0% 10/04/30 09:22:45 INFO mapred.JobClient:  map 100% reduce 0% 10/04/30 09:22:54 INFO mapred.JobClient:  map 100% reduce 100% 10/04/30 09:22:56 INFO mapred.JobClient: Job complete: job_201004291628_0010 10/04/30 09:22:56 INFO mapred.JobClient: Counters: 18 10/04/30 09:22:56 INFO mapred.JobClient:   Job Counters  10/04/30 09:22:56 INFO mapred.JobClient:     Launched reduce tasks=1 10/04/30 09:22:56 INFO mapred.JobClient:     Launched map tasks=3 10/04/30 09:22:56 INFO mapred.JobClient:     Data-local map tasks=3 10/04/30 09:22:56 INFO mapred.JobClient:   FileSystemCounters 10/04/30 09:22:56 INFO mapred.JobClient:     FILE_BYTES_READ=57 10/04/30 09:22:56 INFO mapred.JobClient:     HDFS_BYTES_READ=113144 10/04/30 09:22:56 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=222 10/04/30 09:22:56 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=109 ... 10/04/30 09:23:14 INFO mapred.JobClient:     Map output bytes=15 10/04/30 09:23:14 INFO mapred.JobClient:     Map input bytes=23 10/04/30 09:23:14 INFO mapred.JobClient:     Combine input records=0 10/04/30 09:23:14 INFO mapred.JobClient:     Map output records=1 10/04/30 09:23:14 INFO mapred.JobClient:     Reduce input records=1 # 

このジョブが完了したら、output ディレクトリーを調べ (結果ファイルを特定するため)、ファイルシステムの cat 操作を実行してファイルの内容を確認してください (リスト 10 を参照)。


リスト 10. MapReduce ジョブの出力の検査
# hadoop-0.20 fs -ls output Found 2 items drwxr-xr-x  - root supergroup    0 2010-04-30 09:22 /user/root/output/_logs -rw-r--r--  1 root supergroup   10 2010-04-30 09:23 /user/root/output/part-00000 # hadoop-0.20 fs -cat output/part-00000 17 kernel #  

Web ベースのインターフェース

HDFS を検査する方法は以上のとおりですが、Hadoop の操作に関する情報を調べる場合には、Web インターフェースが重宝します。Hadoop クラスターの最上位にあるのは、HDFS を管理する namenode であることを思い出してください。http://localhost:50070 にアクセスすると、ファイルシステムの上位レベルについての詳細 (使用可能なスペースと使用中のスペース、および使用可能な datanode など) や実行中のジョブを調べることができます。また、http://localhost:50030 にアクセスすることで、jobtracker の詳細 (ジョブの状況) を調べることができます。いずれの場合でも localhost を参照しているのは、すべてのデーモンは同じホストで実行されているためです。

さらに詳しく調べてください

この記事では、単純な (擬似分散) Hadoop クラスター (Cloudera の Hadoop ディストリビューションを使用) をインストールして初期構成を行う方法を説明しました。このディストリビューションを選んだ理由は、Hadoop のインストールおよび初期構成が単純化されるからです。apache.org には他にも多数の Hadoop のディストリビューション (ソースを含む) が用意されています。詳細については「参考文献」セクションを参照してください。

一方、具体的なニーズに応じて Hadoop クラスターをスケーリングするだけのハードウェア・リソースがない場合を考えてみてください。Hadoop はこれほどよく使用されているだけあって、クラウド・コンピューティング・インフラストラクチャー内でも、事前にビルドされた Hadoop VM と専用サーバーを使用して簡単に実行することができます。例えば Amazon では、Amazon EC2 (Amazon Elastic Compute Cloud) 内に AMI (Amazon Machine Image) と計算リソースを用意しています。さらに最近、Microsoft も Windows® Azure Services Platform 内で Hadoop をサポートする予定であると発表しました。

この記事から、Hadoop が大規模なデータセットを処理するための分散コンピューティングを単純化することは簡単に見て取れます。そこで、次回の記事では、さらなる例を用いて Hadoop をマルチノード・クラスターで構成する方法を探っていきます。お楽しみに!


参考文献

学ぶために

0 件のコメント:

コメントを投稿