2011年3月6日日曜日

Hadoop による分散データ処理: 第 2 回 拡張編

Hadoop 分散コンピューティング・アーキテクチャーの真の威力は、その分散能力にあります。つまり、多数のノードで並列に処理を行えるように作業を分散できるため、Hadoop は大規模なインフラストラクチャーにまで拡大できると同時に、膨大な量のデータを処理できるということです。この記事ではまず分散型 Hadoop アーキテクチャーの内部構造を説明した上で、分散構成とその使用方法の詳細を掘り下げていきます。

分散型 Hadoop アーキテクチャー

この連載の第 1 回では、すべての Hadoop デーモンが同じ 1 つのホスト上で動作していました。この擬似分散構成では、Hadoop の並列性を利用することはありませんでしたが、最小限のセットアップで容易に Hadoop の機能をテストすることができました。今回は、複数のマシンからなるクラスターを使用して、Hadoop の並列性を詳しく調べます。

第 1 回の Hadoop 構成では、すべての Hadoop デーモンが単一のノードで動作するように定義されていたので、まずは、並行処理を行う本来の分散された Hadoop を見てみましょう。分散型 Hadoop では、1 つのマスター・ノードと複数のスレーブ・ノードで構成されます (図 1 を参照)。


図 1. Hadoop のマスター・ノードとスレーブ・ノードの内部構造
 

図 1 に示されているように、マスター・ノードは namenode、secondary namenode、および jobtracker デーモン (いわゆる、マスター・デーモン) で構成されます。さらにこのマスター・ノードは、記事のデモで (Hadoop ユーティリティーおよびブラウザーを使用して) クラスターを管理するときに使用するノードでもあります。スレーブ・ノードは、tasktracker と datanode (スレーブ・デーモン) で構成されます。この構成の特徴は、マスター・ノードにはHadoop クラスターの管理と調整を行うデーモンが含まれる一方、スレーブ・ノードには HDFS (Hadoop File System) と MapReduce 機能 (データ処理関数) のためのストレージ関数を実装するデーモンが含まれるという点です。

この記事のデモでは、1 つの LAN 上に 1 つのマスター・ノードと 2 つのスレーブ・ノードを作成します。この構成は、図 2 のとおりです。ここからは早速、マルチノード・クラスターで分散処理する場合の Hadoop システムとその構成の詳細について詳しく見ていきます。


図 2. Hadoop クラスター構成
 

デプロイメントを単純にするために、仮想化を利用します。仮想化は、いくつかの利点をもたらします。この構成の場合、パフォーマンスには有利に働かないかもしれませんが、仮想化を使用することによって、Hadoop システムを作成した後に、作成したシステムを他のノードに複製することができます。そのため、Hadoop クラスターは以下の図に示すような形になり、単一ホスト上のハイパーバイザーのコンテキストでマスター・ノードとスレーブ・ノードが仮想マシン (VM) として動作します (図 3 を参照)。


図 3. 仮想環境での Hadoop クラスター構成
 

Hadoop のアップグレード

第 1 回では、単一のノードで動作する、特殊な Hadoop ディストリビューションをインストールしました (擬似構成)。この記事では、この擬似構成を分散構成にアップグレードします。今回の記事からこの連載を読み始めているとしたら、この先を読む前に、第 1 回をひと通り読んで Hadoop 擬似構成をインストールしてください。

擬似構成では単一のノード用にすべてが事前に構成されているため、前回は一切の構成を行っていません。今回は、この構成を更新する必要があります。まずは、update-alternatives コマンドを使用して、現在の構成を確認してください (リスト 1 を参照)。このコマンドによって、現在の構成では conf.pseudo (最高の優先度) を使用していることがわかります。


リスト 1. 現在の Hadoop 構成の確認
$ update-alternatives --display hadoop-0.20-conf hadoop-0.20-conf - status is auto.  link currently points to /etc/hadoop-0.20/conf.pseudo /etc/hadoop-0.20/conf.empty - priority 10 /etc/hadoop-0.20/conf.pseudo - priority 30 Current `best' version is /etc/hadoop-0.20/conf.pseudo. $  

次に、既存の構成 (この例では、リスト 1 に示されている conf.empty) をコピーして、新しい構成を作成します。

$ sudo cp -r /etc/hadoop-0.20/conf.empty /etc/hadoop-0.20/conf.dist $  

そして最後に、新しく作成した構成を有効にして確認します。


リスト 2. Hadoop 構成を有効にして確認する
$ sudo update-alternatives --install /etc/hadoop-0.20/conf hadoop-0.20-conf \   /etc/hadoop-0.20/conf.dist 40 $ update-alternatives --display hadoop-0.20-conf hadoop-0.20-conf - status is auto.  link currently points to /etc/hadoop-0.20/conf.dist /etc/hadoop-0.20/conf.empty - priority 10 /etc/hadoop-0.20/conf.pseudo - priority 30 /etc/hadoop-0.20/conf.dist - priority 40 Current `best' version is /etc/hadoop-0.20/conf.dist. $  

これで、conf.dist という名前の新規構成が用意できました。新しい分散構成には、この構成を使用します。仮想環境で実行中の現段階で、このノードを、データ・ノードとして機能する 2 つの追加ノードに複製しておいてください。

分散処理を行うための Hadoop の構成

次のステップは、すべてのノードに互いを認識させることです。そのための作業は、それぞれ masters、slaves と名付けた /etc/hadoop-0.20/conf.dist ファイルで行います。この例で使用する 3 つのノードには、以下のように (/etc/hostsから) 静的な IP アドレスが割り当てられています。


リスト 3. この構成用の Hadoop ノード (/etc/hosts)
master 192.168.108.133 slave1 192.168.108.134 slave2 192.168.108.135 

マスター・ノードで /etc/hadoop-0.20/conf.dist/masters を更新して、以下のように記述することでマスター・ノードを識別します。

master 

続いて /etc/hadoop-0.20/conf.dist/slaves に次の 2 行を含めることで、スレーブ・ノードを識別します。

slave1 slave2 

今度は、各ノードから Secure Shell (ssh) を使用して他のノードに接続し、パスフレーズなしの ssh が機能していることを確認します。これらのファイルのそれぞれ (masters、slaves) が、連載第 1 回で使用した Hadoop の start ユーティリティーと stop ユーティリティーによって使用されます。

次は、/etc/hadoop-0.20/conf.dist サブディレクトリーでの Hadoop 固有の構成に移ります。これから行う変更は、Hadoop のドキュメントで定義されているように、すべてのノード (マスターおよび両方のスレーブ) で行う必要があります。まず初めに、core-site.xml ファイル (リスト 4) で HDFS マスターを特定し、それによって namenode のホストとポートを定義します (マスター・ノードの IP アドレスを使用していることに注意)。この core-site.xml は、Hadoop のコア・プロパティーを定義するファイルです。


リスト 4. core-site.xml での HDFS マスターの定義
<configuration>    <property>     <name>fs.default.name<name>     <value>hdfs://master:54310<value>     <description>The name and URI of the default FS.</description>   <property>  <configuration> 

次は、MapReduce の jobtracker を特定します。この jobtracker は、専用のノードに独立させることもできますが、この構成ではマスター・ノードに配置します (リスト 5 を参照)。mapred-site.xml は、MapReduce のプロパティーが含まれるファイルです。


リスト 5. mapred-site.xml での MapReduce jobtracker の定義
<configuration>    <property>     <name>mapred.job.tracker<name>     <value>master:54311<value>     <description>Map Reduce jobtracker<description>   <property>  <configuration> 

最後に、デフォルトの複製係数を定義します (リスト 6)。これは作成するレプリカの数を定義する値で、一般には 3 以下に設定されます。この例では値を 2 に定義します (datanode の数)。この値を定義する場所は、HDFS のプロパティーが含まれる hdfs-site.xml ファイルです。


リスト 6. hdfs-site.xml でのデフォルトのデータ複製の定義
<configuration>    <property>     <name>dfs.replication<name>     <value>2<value>     <description>Default block replication<description>   <property>  <configuration> 

以上のリスト 4リスト 5、およびリスト 6 に示した構成項目は、分散構成に必須の要素です。これらの必須要素の他、Hadoop には、環境全体でのカスタマイズを可能にする構成オプションが多数用意されています。使用できるオプションについての詳細は、「参考文献」セクションを参照してください。

構成が完了したら、次のステップとして namenode (HDFS マスター・ノード) をフォーマットします。この操作には、hadoop-0.20 ユーティリティーを使用して、namenode と操作 (-format) を指定してください。


リスト 7. namenode のフォーマット
user@master:~# sudo su - root@master:~# hadoop-0.20 namenode -format 10/05/11 18:39:58 INFO namenode.NameNode: STARTUP_MSG:  /************************************************************ STARTUP_MSG: Starting NameNode STARTUP_MSG:   host = master/127.0.1.1 STARTUP_MSG:   args = [-format] STARTUP_MSG:   version = 0.20.2+228 STARTUP_MSG:   build =  -r cfc3233ece0769b11af9add328261295aaf4d1ad;  ************************************************************/ 10/05/11 18:39:59 INFO namenode.FSNamesystem: fsOwner=root,root 10/05/11 18:39:59 INFO namenode.FSNamesystem: supergroup=supergroup 10/05/11 18:39:59 INFO namenode.FSNamesystem: isPermissionEnabled=true 10/05/11 18:39:59 INFO common.Storage: Image file of size 94 saved in 0 seconds. 10/05/11 18:39:59 INFO common.Storage:    Storage directory /tmp/hadoop-root/dfs/name has been successfully formatted. 10/05/11 18:39:59 INFO namenode.NameNode: SHUTDOWN_MSG:  /************************************************************ SHUTDOWN_MSG: Shutting down NameNode at master/127.0.1.1 ************************************************************/ root@master:~#  

namenode のフォーマットが終わったら、今度は Hadoop デーモンを起動します。デーモンの起動方法は、第 1 回の擬似分散構成で行った通りの方法ですが、今回のプロセスでは同じことを分散構成に対して行います。以下のコードで起動しているのは、(jps コマンドによって示されているように) namenode と secondary namenode であることに注意してください 。


リスト 8. namenode の起動
root@master:~# /usr/lib/hadoop-0.20/bin/start-dfs.sh starting namenode, logging to    /usr/lib/hadoop-0.20/bin/../logs/hadoop-root-namenode-mtj-desktop.out 192.168.108.135: starting datanode, logging to    /usr/lib/hadoop-0.20/bin/../logs/hadoop-root-datanode-mtj-desktop.out 192.168.108.134: starting datanode, logging to    /usr/lib/hadoop-0.20/bin/../logs/hadoop-root-datanode-mtj-desktop.out 192.168.108.133: starting secondarynamenode,    logging to /usr/lib/hadoop-0.20/logs/hadoop-root-secondarynamenode-mtj-desktop.out root@master:~# jps 7367 NameNode 7618 Jps 7522 SecondaryNameNode root@master:~#  

この時点で jps を使って、いずれか一方のスレーブ・ノード (データ・ノード) を調べると、各スレーブ・ノードに 1 つの datanode デーモンがあることがわかります。


リスト 9. スレーブ・ノードでの datanode の検査
root@slave1:~# jps 10562 Jps 10451 DataNode root@slave1:~#  

次のステップでは、MapReduce デーモン (jobtracker および tasktracker) を起動します。その方法は、リスト 10 に示すとおりです。注意する点として、このスクリプトは jobtracker を (リスト 5 の構成で定義されているように) マスター・ノードで起動し、tasktracker を各スレーブ・ノードで起動します。jobtracker が実行中になったことを確認するには、マスター・ノードで jps コマンドを実行してください。


リスト 10. MapReduce デーモンの起動
root@master:~# /usr/lib/hadoop-0.20/bin/start-mapred.sh starting jobtracker, logging to    /usr/lib/hadoop-0.20/logs/hadoop-root-jobtracker-mtj-desktop.out 192.168.108.134: starting tasktracker, logging to    /usr/lib/hadoop-0.20/bin/../logs/hadoop-root-tasktracker-mtj-desktop.out 192.168.108.135: starting tasktracker, logging to    /usr/lib/hadoop-0.20/bin/../logs/hadoop-root-tasktracker-mtj-desktop.out root@master:~# jps 7367 NameNode 7842 JobTracker 7938 Jps 7522 SecondaryNameNode root@master:~#  

最後のステップとして、jps を使ってスレーブ・ノードを確認します。tasktracker デーモンによって、datanode デーモンがスレーブのデータ・ノードのそれぞれに結合されたことがわかるはずです。


リスト 11. スレーブ・ノードでの datanode の検査
root@slave1:~# jps 7785 DataNode 8114 Jps 7991 TaskTracker root@slave1:~#  

図 4 に、起動スクリプト、ノード、そして起動されたデーモンの相互関係を示します。この図を見るとわかるように、start-dfs スクリプトは namenode と datanode を起動する一方、start-mapred スクリプトは jobtracker と tasktrackerを起動します。


図 4. 起動スクリプトとノードごとのデーモンとの関係
 

HDFS のテスト

Hadoop がクラスター全体で稼働中になったところで、いくつかのテストを実行して、Hadoop が正常に動作することを確認できます (リスト 12 を参照)。まずは、hadoop-0.20 ユーティリティーを使ってファイルシステム・コマンド (fs) を実行し、df (disk free) 操作を要求してください。Linux® での場合と同じく、このコマンドは単に、特定のデバイスに対して使用中のスペースと使用可能なスペースを識別します。したがって、新たにフォーマットされたファイルシステムの場合、使用中のスペースはありません。次に、HDFS のルートで ls 操作を実行します。サブディレクトリーを作成してから、その内容を表示し、その後サブディレクトリーを削除します。また、hadoop-0.20 ユーティリティーの fsck (file system check) コマンドを使って、HDFS に対して fsck を実行することができます。このすべての操作によって、他のさまざまな情報 (2 つの datanode が検出されたなど) と併せ、ファイルシステムが正常であることを確認することができます。


リスト 12. HDFS の確認
root@master:~# hadoop-0.20 fs -df File system		Size	Used	Avail		Use% /		16078839808	73728	3490967552	0% root@master:~# hadoop-0.20 fs -ls / Found 1 items drwxr-xr-x   - root supergroup          0 2010-05-12 12:16 /tmp root@master:~# hadoop-0.20 fs -mkdir test root@master:~# hadoop-0.20 fs -ls test root@master:~# hadoop-0.20 fs -rmr test Deleted hdfs://192.168.108.133:54310/user/root/test root@master:~# hadoop-0.20 fsck / .Status: HEALTHY  Total size:	4 B  Total dirs:	6  Total files:	1  Total blocks (validated):	1 (avg. block size 4 B)  Minimally replicated blocks:	1 (100.0 %)  Over-replicated blocks:	0 (0.0 %)  Under-replicated blocks:	0 (0.0 %)  Mis-replicated blocks:		0 (0.0 %)  Default replication factor:	2  Average block replication:	2.0  Corrupt blocks:		0  Missing replicas:		0 (0.0 %)  Number of data-nodes:		2  Number of racks:		1  The filesystem under path '/' is HEALTHY root@master:~#  

MapReduce ジョブの実行

今度は MapReduce ジョブを実行して、構成全体が正常に機能していることを検証します (リスト 13 を参照)。このプロセスでは最初のステップとして、何らかのデータを入力しなければなりません。そこで、まずは入力データを保持するディレクトリー (input ディレクトリー) を作成するところから始めます。それには、mkdir コマンドを指定した hadoop-0.20 ユーティリティーを使用してください。次に、hadoop-0.20  put コマンドを使って、2 つのファイルを HDFS 内に挿入します。そして Hadoop ユーティリティーの ls コマンドを使って、input ディレクトリーの内容を確認します。


リスト 13. 入力データの生成
root@master:~# hadoop-0.20 fs -mkdir input root@master:~# hadoop-0.20 fs -put \   /usr/src/linux-source-2.6.27/Doc*/memory-barriers.txt input root@master:~# hadoop-0.20 fs -put \   /usr/src/linux-source-2.6.27/Doc*/rt-mutex-design.txt input root@master:~# hadoop-0.20 fs -ls input Found 2 items -rw-r--r--  2 root supergroup  78031 2010-05-12 14:16 /user/root/input/memory-barriers.txt -rw-r--r--  2 root supergroup  33567 2010-05-12 14:16 /user/root/input/rt-mutex-design.txt root@master:~#  

ここで、wordcount という MapReduce ジョブを実行してください。擬似分散モデルでの場合と同じく、このジョブを実行する際には、input サブディレクトリー (入力ファイルが含まれるディレクトリー) と output ディレクトリー (このディレクトリーはまだ存在していませんが、namenode によって作成されて、結果のデータが入力されます) を指定します。


リスト 14. クラスターでの MapReduce の wordcount ジョブの実行
root@master:~# hadoop-0.20 jar \   /usr/lib/hadoop-0.20/hadoop-0.20.2+228-examples.jar wordcount input output 10/05/12 19:04:37 INFO input.FileInputFormat: Total input paths to process : 2 10/05/12 19:04:38 INFO mapred.JobClient: Running job: job_201005121900_0001 10/05/12 19:04:39 INFO mapred.JobClient:  map 0% reduce 0% 10/05/12 19:04:59 INFO mapred.JobClient:  map 50% reduce 0% 10/05/12 19:05:08 INFO mapred.JobClient:  map 100% reduce 16% 10/05/12 19:05:17 INFO mapred.JobClient:  map 100% reduce 100% 10/05/12 19:05:19 INFO mapred.JobClient: Job complete: job_201005121900_0001 10/05/12 19:05:19 INFO mapred.JobClient: Counters: 17 10/05/12 19:05:19 INFO mapred.JobClient:   Job Counters  10/05/12 19:05:19 INFO mapred.JobClient:     Launched reduce tasks=1 10/05/12 19:05:19 INFO mapred.JobClient:     Launched map tasks=2 10/05/12 19:05:19 INFO mapred.JobClient:     Data-local map tasks=2 10/05/12 19:05:19 INFO mapred.JobClient:   FileSystemCounters 10/05/12 19:05:19 INFO mapred.JobClient:     FILE_BYTES_READ=47556 10/05/12 19:05:19 INFO mapred.JobClient:     HDFS_BYTES_READ=111598 10/05/12 19:05:19 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=95182 10/05/12 19:05:19 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=30949 10/05/12 19:05:19 INFO mapred.JobClient:   Map-Reduce Framework 10/05/12 19:05:19 INFO mapred.JobClient:     Reduce input groups=2974 10/05/12 19:05:19 INFO mapred.JobClient:     Combine output records=3381 10/05/12 19:05:19 INFO mapred.JobClient:     Map input records=2937 10/05/12 19:05:19 INFO mapred.JobClient:     Reduce shuffle bytes=47562 10/05/12 19:05:19 INFO mapred.JobClient:     Reduce output records=2974 10/05/12 19:05:19 INFO mapred.JobClient:     Spilled Records=6762 10/05/12 19:05:19 INFO mapred.JobClient:     Map output bytes=168718 10/05/12 19:05:19 INFO mapred.JobClient:     Combine input records=17457 10/05/12 19:05:19 INFO mapred.JobClient:     Map output records=17457 10/05/12 19:05:19 INFO mapred.JobClient:     Reduce input records=3381 root@master:~# 

最後のステップは、出力データを調べることです。実行したのは wordcount MapReduce ジョブなので、1 つのファイルだけが作成されます (Map 処理による複数のファイルが、Reduce 処理によって結合されたファイル)。このファイルには、入力ファイルで検出された単語と、全入力ファイルでその単語が出現した回数を表すタプルのリストが含まれます。


リスト 15. MapReduce ジョブの出力の検査
root@master:~# hadoop-0.20 fs -ls output Found 2 items drwxr-xr-x   - root supergroup          0 2010-05-12 19:04 /user/root/output/_logs -rw-r--r--   2 root supergroup      30949 2010-05-12 19:05 /user/root/output/part-r-00000 root@master:~# hadoop-0.20 fs -cat output/part-r-00000 | head -13 !=	1 "Atomic	2 "Cache	2 "Control	1 "Examples	1 "Has	7 "Inter-CPU	1 "LOAD	1 "LOCK"	1 "Locking	1 "Locks	1 "MMIO	1 "Pending	5 root@master:~#  

Web 管理インターフェース

hadoop-0.20 ユーティリティーは極めて広範に使用できて機能も充実していますが、このユーティリティーの代わりに GUI を使ったほうが便利な場合もあります。ファイルシステムを検査する場合、namenode には http://master:50070 で、jobtracker には http://master:50030 でアクセスすることができます。namenode から HDFS を調べることで (図 5 を参照)、このファイルシステム内にある input ディレクトリー (入力データが含まれるディレクトリー。リスト 13 を参照) を検査することができます。


図 5. namenode を介した HDFS の検査
 

jobtracker からは、実行中のジョブや完了したジョブを調べることができます。図 6 に示されているのは、最後に行ったジョブ (リスト 14 で行ったジョブ) の検査結果です。ここには、JAR (Java archive) リクエストに対して送信された各種のデータが出力として示されているだけでなく、タスクの数とステータスも明示されています。この表示から、2 つの Map タスク (入力ファイルごとに 1 つ) が実行され、(この 2 つの Map タスクの入力を結合するために) 1 つの Reduce タスクが実行されたことがわかります。


図 6. 完了したジョブのステータスの確認
 

さらに、namenode を介して datanode のステータスを調べることもできます。namenode のメイン・ページには、アクティブなノードと非アクティブなノードの数がリンクとして示されるので、このリンクから、さらに詳しくノードを調べることができます。図 7 に示すページには、アクティブな datanode と、それぞれに関する統計が示されています。


図 7. アクティブな datanode のステータスの確認
 

namenode および jobtracker の Web インターフェースからは、他にも多数のビューを表示することができますが、説明を簡潔にするため、ここでは以上のサンプル・ビューだけを記載しました。namenode および jobtracker の Web ページ内ではさまざまなリンクから、Hadoop の構成および操作に関する詳細情報 (実行時のログを含む) にアクセスすることができます。

さらに詳しく調べてください

この記事では、Cloudera による擬似分散構成を、完全な分散構成に変換する方法を説明しました。驚くほど数少ないステップと MapReduce アプリケーションとまったく同じインターフェースによって、Hadoop は他に類のない便利な分散処理ツールとなっています。また、興味深い点として、Hadoop のスケーラビリティーについても調べてみてください。新しい datanode を (対応する XML ファイルとスレーブ・ファイルをマスターで更新して) 追加することにより、さらに高度な並列処理に対応するよう簡単に Hadoop をスケーリングすることができます。Hadoop をテーマにしたこの連載の最終回となる第 3 回では、Hadoop 対応の MapReduce アプリケーションを開発する方法を説明します。


0 件のコメント:

コメントを投稿