2011年3月6日日曜日

Hadoop MapReduce によるビッグ・データ分析

Google が画像検索サービスを開始した 2001年の当時、Google は 2.5 億もの画像に索引付けをしていました。その後 10 年も経たないうちに、この検索最大手企業が索引付けをした画像の数は合計 100 億にも上っています。一方、YouTube には 35 時間分のコンテンツが毎分アップロードされています。さらに、Twitter は 1 日平均 5500 万件のツイートを処理していると言われています。今年の初め、Twitter の検索機能は 1 日 6 億のクエリーを記録しました。私たちがビッグ・データという言葉を口にするときは、まさにこのような膨大なデータを意味します。

かつて、このような膨大な規模のデータを扱っていたのは、大企業や大学、そして政府に限られていました。つまり、莫大な費用がかかるスーパー・コンピューターを購入して、稼働させ続けるための人件費を払えるだけの事業体です。けれども、ストレージ・コストが低下し、処理能力がコモディティー化されるようになっている今、中小企業や一部の個人も同じデータを保管、マイニングするようになり、アプリケーション革新の波が広がっています。

このようなビッグ・データ革命を実現する技術の 1 つが、大量の分散されたデータ・セットを処理するプログラミング・モデルおよび実装として Google が開発した MapReduce です。この記事では、クラウド・コンピューティングのキラー・アプリケーションとも呼ばれる、Apache によるオープンソースの MapReduce 実装、Hadoop を紹介します。

Hadoop の概要

基本的に、Apache の Hadoop フレームワークは膨大なデータ・セットを分析するためのメカニズムです。分析するデータ・セットは必ずしもデータ・ストアに収容されている必要はありません。Hadoop は開発者がアクセスしやすいように、MapReduce の大規模データ分析エンジンを抽象化します。Hadoop は無数のノードにスケールアウトして、データの保管に関連するありとあらゆるアクティビティーおよび調整に対処することができます。

Hadoop を驚くほど有用かつ強力なフレームワークにしているのは、Hadoop が提供する有り余るほどの機能と構成です。Hadoop が山のように大量のデータを分析するための効率的なメカニズムであることは、Yahoo! やその他数えきれないほどの組織が認めています。Hadoop は、単一のノードでもかなり簡単に稼働させることができます。その場合に必要になるのは、分析するデータと、Generics などの Java コードの知識だけです。Hadoop はまた、Ruby、Python、C++ とも連動します。

この連載の読者は、これまでの記事で何度か MapReduce を目にしています。「Groovy の RESTClient を使用して REST によって CouchDB の操作を行う」では、CouchDB がビューに MapReduce を利用する方法を説明しました。そして、「MongoDB: (適切なすべての) RDBMS の動作をする NoSQL データ・ストア」でも、MongoDB 文書を処理するメカニズムとして MapReduce を使用しています。

MapReduce は膨大なデータ・セットを処理するための概念フレームワークとして、多数のコンピューターを使用して分散された問題を解決するように高度に最適化されています。MapReduce フレームワークはその名前が示唆するように、map 関数と reduce 関数からなります。map 関数は、膨大なデータ入力を受け取り、それを小さな塊に分割するように設計されています (そして、分割したデータを他のプロセスに渡して処理させます)。そして reduce 関数は、map 関数が収集した個別の結果を集約し、1 つの最終的な出力としてレンダリングします。

Hadoop では、Hadoop 独自の基底クラスを継承することで、map 実装および reduce 実装を定義します。そしてこれらの実装を互いに結びつけるために、コンフィグ・ファイルではこれらの実装と入力および出力フォーマットが指定されます。Hadoop は、構造化データが含まれる膨大なファイルを処理するのに最適ですが、特に便利な点は、Hadoop は入力ファイルをそのまま構文解析するため、一度に 1 行ずつ処理できることです。従って、map 関数を定義するといっても、実際には入力テキスト行から何を取得したいかを決めるだけのことです。

至るところにデータは溢れています!

米国政府が生み出す膨大なデータの多くは、一般市民にも関心のある情報です。したがって、さまざまな政府機関では、米国経済の健全性に関するデータや、変化する社会人口に関するデータを無料で配布しています。米国地質調査所 (USGS) でも、全世界の地震データを公開しています。

小さな地震は、世界中の至るところで毎日、何度も発生しています。その大多数は地球の地殻奥深くで発生するため、人間は感知しませんが、各地の地震観測所ではこれらの地震が記録されています。USGS では、記録した 1 週間分の地震データを CSV (カンマ区切り) ファイル形式で公開しています。

平均的な週次ファイルはそれほど大きいものではなく、約 100KB といったところですが、それでも Hadoop を学習する際の素材としては役に立ちます。ただし、Hadoop はこれよりも遥かに膨大なデータ・セットを処理できることをお忘れなく。

地震の記録を追跡する

USGS の Web サイトから私が最近ダウンロードした CSV ファイルは、リスト 1 に示すように、約 920 行で構成されています。


リスト 1. USGS 地震データ・ファイルの行数
$> wc -l eqs7day-M1.txt    920 eqs7day-M1.txt 

CSV ファイルには、リスト 2 のような内容が含まれています (このリストに示されているのは、ファイルの先頭の 2 行です)。


リスト 2. CSV ファイルの先頭の 2 行
$> head -n 2 eqs7day-M1.txt  Src,Eqid,Version,Datetime,Lat,Lon,Magnitude,Depth,NST,Region ci,14896484,2,"Sunday, December 12, 2010 23:23:20 UTC",33.3040,-116.4130,1.0,11.70,22,   "Southern California" 

このような情報が 920 行にもわたって含まれているともなれば、これはまさに情報豊富なファイルと言ってよいでしょう。けれども私がこのファイルから知りたいのは、1 週間の各日ごとに報告された地震の数だけです。そしてその情報から、その 7 日間にわたって最も地震の発生回数が多かった地域を調べることを目的としています。

私が最初に考えたのは、単純な grep コマンドを使って 1 日あたりの地震の発生回数を検索するという方法です。ファイルを見てみると、データは 12月 12日から開始されています。そこで、12月 12日を示すストリングを grep -c で検索したところ、リスト 3 に記載する結果となりました。


リスト 3. 12月 12日に発生した地震の回数
$> grep -c 'December 12' eqs7day-M1.txt  98 

Hadoop をインストールしてください

まだ Hadoop をインストールしていない場合は、この時点でインストールしてください。まず、最新のバイナリーをダウンロードして解凍し、Hadoop  bin ディレクトリーをパスに設定します。これで、hadoop コマンドを直接実行できるようになります。Hadoop を使用するには、この後の例でわかるように、java コマンドを呼び出すのではなく、Hadoop  hadoopコマンドを実行する必要があります。hadoop コマンドには、Java バイナリー・ファイル (例えば、map 実装と reduce実装を表すファイル) の検索先などをオプションとして渡すことができます。記事の例では、jar ファイルを作成して、その jar ファイルで実行するジョブを Hadoop に指示します。また、アプリケーションを実行するために必要なその他のバイナリーも Hadoop のクラス・パスに追加します。

12月 12日には 98 のエントリーがあることがわかりました。つまり、98 回の地震が記録されているということです。さらに 12月 11日、10日、等々と遡り、grep で地震の回数を調べることはできますが、それでは面倒な作業になりそうです。さらに厄介なことに、この情報を引き出すには、ファイルに何月何日のデータが含まれているかを知っていなければなりません。私は地震が起きた日にちにはまったく関心がなく、その情報を利用しない可能性すらあります。私が知りたいのは、7 日間のうちの各日に発生した地震の回数だけです。この情報は、Hadoop を使えば簡単に取得することができます。

Hadoop が私の 1 番目の質問と 2番目の質問に答えるために必要な情報は、ほんのわずかしかありません。具体的には、どの入力を処理し、map 関数と reduce 関数をどのように扱うかという情報だけです。最終的にはすべてを 1 つにまとめるためのジョブを用意することになりますが、そのコードに取り掛かる前に少し時間を取って、CSV データに何も問題がないことを確認しておきます。

opencsv によるデータの構文解析

地震の CSV ファイルでは、先頭行 (ヘッダー) を除き、各行がカンマで区切られた一連のデータ値となっています。これらのデータ値のうち、主に興味の対象となるのは、地震が発生した日付、場所、そして地震のマグニチュードの 3 つです。これらのデータを取得するために、CSV ファイルの構文解析を支援する opencsv という巧妙なオープンソースのライブラリーを使用しようと思います。

私はテスト・ファーストを好む人間なので、ここでも簡単な JUnit テストを作成するところから始めます。これは、CSV ファイルから抜き取ったサンプル行から、必要な情報を取得できることを確かめるためのテストです (リスト 4 を参照)。


リスト 4. CSV 行の構文解析
public class CSVProcessingTest {   private final String LINE = "ci,14897012,2,\"Monday, December 13, 2010 " +             "14:10:32 UTC\",33.0290,-115." +             "5388,1.9,15.70,41,\"Southern California\"";   @Test  public void testReadingOneLine() throws Exception {   String[] lines = new CSVParser().parseLine(LINE);    assertEquals("should be Monday, December 13, 2010 14:10:32 UTC",     "Monday, December 13, 2010 14:10:32 UTC", lines[3]);    assertEquals("should be Southern California",     "Southern California", lines[9]);    assertEquals("should be 1.9", "1.9", lines[6]);  } } 

リスト 4 を見るとわかるように、opencsv はカンマで区切られた値の操作を大幅に簡易化します。パーサーは単純に String の配列を返すので、そこから位置の値を取得できるというわけです (Java 言語では、配列およびコレクションにアクセスする際はゼロを基準とすることを思い出してください)。

日時フォーマットを変換する

MapReduce の操作で map 関数が受け持つ役割は、処理する値を何らかのキーと一緒に取得することです。つまり、map は主として、キーと値という 2 つの要素を処理して返します。前述の要件からすると、ここでまず必要となるのは、日ごとに発生した地震の回数を調べることです。したがって、地震ファイルを分析するときには、キーとしての日付、そして値としてのカウンターという 2 つの値を出力することになります。その後、reduce 関数がカウンターを合計すれば (単に整数値 1 を加えていくだけですが)、ある日付がターゲット地震ファイル内に出現した回数がわかります。

私が関心を持っているのは、地震の発生した時刻ではなく、どの日に地震が発生したかについてなので、各ファイル内での時刻の部分は削除することにします。リスト 5 で作成しているのは、入力ファイルでの特定の日時フォーマットをより一般的な日付フォーマットに変換する方法を検証する簡単なテストです。


リスト 5. 日時フォーマットの変換
@Test public void testParsingDate() throws Exception {  String datest = "Monday, December 13, 2010 14:10:32 UTC";  SimpleDateFormat formatter = new SimpleDateFormat("EEEEE, MMMMM dd, yyyy HH:mm:ss Z");  Date dt = formatter.parse(datest);   formatter.applyPattern("dd-MM-yyyy");  String dtstr = formatter.format(dt);  assertEquals("should be 13-12-2010", "13-12-2010", dtstr); } 

リスト 5 では、CSV ファイルにおいて「Monday, December 13, 2010 14:10:32 UTC」というフォーマットで日時を表す String SimpleDateFormat という Java オブジェクトを使用して、「13-12-2010」という (米国では) 一般的な日付フォーマットに変換しました。

Hadoop の map と reduce

CSV ファイルとその日時フォーマットをどのように処理するかが決まったところで、Hadoop に map 関数と reduce 関数を実装する作業に取り掛かります。Hadoop は明示的なタイプセーフを好むため、このプロセスには Java の Generics の知識が必要になります。

Hadoop で map 実装を定義するときには、私は単に Hadoop の Mapper クラスを継承させるだけにしています。そうすれば、出力するキーと値に、Generics を使って明示的な型を指定できるからです。型の節には入力するキーと値についても明示的に記述します。ファイルを読み取る場合、キーはバイト・カウント、値はテキスト行になります。

私が定義した EarthQuakesPerDateMapper クラスが継承しているのは、Hadoop の Mapper オブジェクトです。クラスの出力のキーはText オブジェクトとして、値は IntWritable として明示的に記述されています。後者は Hadoop 固有のクラスで、基本的に整数となります。クラス節の最初の 2 つの型が LongWritable  Text になっていることにも注意してください。この 2 つはそれぞれ、バイト・カウント、テキスト行です。

このクラス定義内の型の節により、map メソッドに渡されるパラメーターの型は、このメソッドの出力と併せて context.write 節のなかに設定されます。他のどこかに指定しようとすれば、コンパイラーの問題が発生するか、Hadoop が型の不一致を伝えるエラー・メッセージを出すことになります。


リスト 6. map 実装
public class EarthQuakesPerDateMapper extends Mapper<LongWritable,    Text, Text, IntWritable> {  @Override  protected void map(LongWritable key, Text value, Context context) throws IOException,    InterruptedException {    if (key.get() > 0) {    try {      CSVParser parser = new CSVParser();      String[] lines = parser.parseLine(value.toString());       SimpleDateFormat formatter =         new SimpleDateFormat("EEEEE, MMMMM dd, yyyy HH:mm:ss Z");      Date dt = formatter.parse(lines[3]);      formatter.applyPattern("dd-MM-yyyy");       String dtstr = formatter.format(dt);      context.write(new Text(dtstr), new IntWritable(1));    } catch (ParseException e) {}   }  } } 

リスト 6 の map 実装は単純な内容です。Hadoop はこのクラスを、基本的に入力ファイルで検出した各テキスト行に対して呼び出します。CSV のヘッダーを処理しないようにするため、最初にバイト・カウント (key オブジェクト) がゼロでないかどうかをチェックします。次に行っている作業は、すでにリスト 4 とリスト 5 に記載した作業と変わりません。つまり、入力される日時を取得し、それを変換して、出力キーとして設定します。さらに、カウントとして 1 を指定します。つまり、日付ごとにカウンターをコード化することで、呼び出された reduce 実装がキー、そして値のコレクションを取得するようにしています。この例の場合、キーは日付とその値で構成されることになります (リスト 7 を参照)。


リスト 7. map 出力と reduce 入力の論理ビュー
"13-12-2010":[1,1,1,1,1,1,1,1] "14-12-2010":[1,1,1,1,1,1] "15-12-2010":[1,1,1,1,1,1,1,1,1] 

リスト 7 に記載した論理コレクションを組み立てたのは、context.write(new Text(dtstr), new IntWritable(1)) の行 (リスト 6) であることに注意してください。おそらくお気付きだと思いますが、context は、各種の情報を保持する Hadoop のデータ構造体です。この context  reduce 実装に渡されると、そこに含まれる 1 の値を取って合計します。その結果、reduce 実装は必然的にリスト 8 のようなデータ構造体を作り出します。


リスト 8. reduce 出力のビュー
"13-12-2010":8 "14-12-2010":6 "15-12-2010":9 

リスト 9 に、reduce 実装を記載します。Hadoop の Mapper と同じく、Reducer はパラメーター化されます。最初の 2 つのパラメーターは入力するキーの型 (Text) と値の型 (IntWritable) です。これに続く 2 つのパラメーターは出力するキーと値の型で、この例では入力する型と同じになっています。


リスト 9. reduce 実装
public class EarthQuakesPerDateReducer extends Reducer<Text, IntWritable, Text,    IntWritable> {  @Override  protected void reduce(Text key, Iterable<IntWritable> values, Context context)   throws IOException, InterruptedException {   int count = 0;   for (IntWritable value : values) {    count++;   }   context.write(key, new IntWritable(count));  } } 

上記のとおり、reduce 実装は単純を極めています。リスト 7 で指摘したように、入力する値はまさに値のコレクションそのもの、つまりこの例では値 1 のコレクションです。私が行わなければならない作業と言えば、これらの値を合計して、日付とカウントを表す新しいキーと値のペアを書き出すことぐらいです。この reduce コードは、基本的には続いてリスト 8 に記載した行を分割します。論理フローは以下のようになります。

"13-12-2010":[1,1,1,1,1,1,1,1] -> "13-12-2010":8 

上記のリストを概念で表すと、もちろん map -> reduce になります。

Hadoop Job の定義

map 実装と reduce 実装のコードは完成しました。残る作業は、すべてのコードを Hadoop の Job に連結するだけです。Job を定義するのは簡単です。入力および出力、map および reduce 実装 (リスト 6 とリスト 7 に記載した実装)、それに出力の型を指定するだけで定義することができます。この例での出力の型は、reduce 実装に使用した型と同じです。


リスト 10. map と reduce を結び付ける Job
public class EarthQuakesPerDayJob {   public static void main(String[] args) throws Throwable {    Job job = new Job();   job.setJarByClass(EarthQuakesPerDayJob.class);   FileInputFormat.addInputPath(job, new Path(args[0]));   FileOutputFormat.setOutputPath(job, new Path(args[1]));    job.setMapperClass(EarthQuakesPerDateMapper.class);   job.setReducerClass(EarthQuakesPerDateReducer.class);   job.setOutputKeyClass(Text.class);   job.setOutputValueClass(IntWritable.class);    System.exit(job.waitForCompletion(true) ? 0 : 1);  } } 

リスト 10 ですべてを結合している main メソッドは、2 つのパラメーターを取ります。1 つは地震の CSV ファイルが置かれているディレクトリー、もう 1 つは結果のレポートを書き込むディレクトリーです (Hadoop はこの出力先ディレクトリーを新規に作成するほうを好みます)。

この小さなフレームワークを実行するには、関連するクラスを jar ファイルにまとめなければなりません。さらに、Hadoop に opencsvバイナリーがある場所を指示する必要もあります。これで、リスト 11 に記載するコマンドラインを使って Hadoop を実行できるようになります。


リスト 11. Hadoop の実行
$> export HADOOP_CLASSPATH=lib/opencsv-2.2.jar $> hadoop jar target/quake.jar com.b50.hadoop.quake.EarthQuakesPerDayJob    ~/temp/mreduce/in/ ~/temp/mreduce/out 

上記のコードを実行して Hadoop がその作業を開始すると、画面上に一連のテキストが流れていきます。注意する点として、私がここで使用している CSV ファイルは、Hadoop が処理するように意図された膨大なデータに比べると、ほんのちっぽけなデータに過ぎません。使用している処理能力によっては、Hadoop は数秒で処理を完了します。

処理が完了した後は、事実上どのエディターでも出力ファイルの内容を確認することができます。あるいはもう 1 つのオプションと、リスト 12 で行っているように、hadoop コマンドを直接使用するという方法もあります。


リスト 12. Hadoop の出力の読み取り
$> hadoop dfs -cat part-r-00000  05-12-2010      43 06-12-2010      143 07-12-2010      112 08-12-2010      136 09-12-2010      178 10-12-2010      114 11-12-2010      114 12-12-2010      79 

私と同じタイプの読者であれば、リスト 12 で最初に気付く点は、1 日あたりの地震の回数の多さでしょう。12月 9日の 1 日だけで 178 回も地震が発生しています。その一方で願わくは、私の意図していた目的を Hadoop がきちんと果たした点 (つまり、指定した範囲のすべての日ごとに、地震の発生回数が整然と一覧形式で出力されています) にもお気づきであればよいのですが。

もう 1 つの Mapper の作成

今度は、地震がどこで発生しているのかを調べて、指定した日付範囲のなかで最も多くの地震を記録している場所がどこであるかを迅速に判断したいと思います。ご想像のとおり、この目的を果たすのも Hadoop を使えば簡単なことです。この場合のキーは日付ではなく場所なので、新しい Mapper クラスを作成します。


リスト 13. 新しい map 実装
public class EarthQuakeLocationMapper extends Mapper<LongWritable, Text, Text,   IntWritable> {  @Override  protected void map(LongWritable key, Text value, Context context) throws IOException,   InterruptedException {   if (key.get() > 0) {    String[] lines = new CSVParser().parseLine(value.toString());    context.write(new Text(lines[9]), new IntWritable(1));   }  } } 

リスト 13 では日付を取得して変換するのではなく、ただ単に、地震が発生した場所を取得しているだけです。これは、CSV 配列で最後の位置にくる要素となっています。

結果を場所とその数値からなる膨大なリストにする代わりに、7 日間で 10 回以上の地震が発生した場所だけに結果を絞り込みます。


リスト 14. 地震が多発している場所はどこか
public class EarthQuakeLocationReducer extends Reducer<Text, IntWritable, Text,   IntWritable> {  @Override  protected void reduce(Text key, Iterable<IntWritable> values, Context context)   throws IOException, InterruptedException {   int count = 0;   for (IntWritable value : values) {    count++;   }   if (count >= 10) {    context.write(key, new IntWritable(count));   }  } } 

リスト 14 のコードはリスト 9 のコードにかなりよく似ていますが、今回は合計 10 回以上地震が発生した場所に絞り込んで出力しています。次に、map  reduce を別の Job 実装で結合し、すべてを jar ファイルにまとめて通常通りに Hadoop を実行すれば、新しい質問への答えを出すことができます。

hadoop dfs コマンドを実行すると、要求した新しい値が表示されます。


リスト 15. 場所ごとの地震発生回数
$> hadoop dfs -cat part-r-00000  Andreanof Islands, Aleutian Islands, Alaska     24 Arkansas        40 Baja California, Mexico 101 Central Alaska  74 Central California      68 Greater Los Angeles area, California    16 Island of Hawaii, Hawaii        16 Kenai Peninsula, Alaska 11 Nevada  15 Northern California     114 San Francisco Bay area, California      21 Southern Alaska 97 Southern California     115 Utah    19 western Montana 11  

リスト 15 から何がわかるかと言うと、まず 1 点目は、メキシコからアラスカまでの北米西海岸は地震の多発地帯だということです。2 点目として、私はこれまで知りませんでしたが、明らかにアーカンソーは断層線の近くに位置します。そして最後の点は、北カリフォルニアまたは南カリフォルニアに住んでいるとしたら (実際、多くのソフトウェア開発者はこの辺りに住んでいます)、地面はほぼ 13 分間隔で揺れているということです。

まとめ

Hadoop によるデータの分析は簡単で、効率的です。この記事では、Hadoop がデータ分析に発揮するはずの実力をほとんどかじってもいません。Hadoop は実際には、分散方式で動作し、map  reduce を実行する各種のノードを調整するように設計されています。この記事の例では、Hadoop をたった 1 つの JVM で実行して、ほんの小さなファイルを処理しただけです。

Hadoop はそれだけでも素晴らしいツールですが、サブプロジェクトからクラウド・ベースの Hadoop サービスに至るまで、Hadoop を中心としたエコシステム全体が成長を続けています。Hadoop エコシステムは、このプロジェクトを充実したコミュニティーが支えていることの証です。このコミュニティーから生まれた数多くのツールは、ビッグ・データ分析をグローバル・ビジネス・アクティビティーとして実現できることを明らかにしています。このように、Hadoop は Google や Yahoo! といった検索エンジンの最大手に限らず、あらゆる類のソフトウェアの革新者と起業家に分散データのマイニングと分析を可能にします。


参考文献

学ぶために

  • 連載「Java 開発 2.0」: この developerWorks 連載では Java 開発の様相を大きく変える技術とツールを探っています。最近の記事では、MongoDB (2010年9月)、CouchDB (2009年11月)、Objectify AppEngine (2010年11月) などを取り上げました。

  • Hadoop による分散データ処理: 第 1 回 導入編」(M. Tim Jones 著、developerWorks、2010年5月): Hadoop に関する連載の第 1 回では、Hadoop ファイルシステム (HDFS) とよく使われるノード・タイプを含め、Hadoop フレームワークについて詳しく説明しています。単一ノードの Hadoop クラスターをインストールして構成する方法を学び、MapReduce アプリケーションの詳細を探ってください。記事の最後では、Hadoop をそのコアとなる Web インターフェースを使用して監視および管理する方法についても説明します。第 2 回第 3 回も読んでください。

  • "クラウドで MapReduce を使用してロード・バランシングを行う (Kirpal A. Venkatesh, et. al., developerWorks, July 2010): クラウド環境で Hadoop の MapReduce フレームワークを実装する方法と、仮想的なロード・バランシングを使って単一ノード・システムと複数ノード・システム両方のパフォーマンスを改善する方法を学びましょう。

  • A profile of Apache Hadoop MapReduce computing efficiency, Part 1」(Paul Burkhardt 著、Cloudera Development Center、2010年12月): MapReduce アプリケーションがコンピューティング・リソースをいかに効率的に使用するかを 2回にわたって解説しています。この前半の記事では、コンピューティングの効率性を Hadoop MapReduce アプリケーションの評価との関連で概説しています。

  • Hadoop companies everywhere」(Alex Handy 著、SD Times、2009年7月): 多数の企業が毎日大量のデータを生成していますが、そのデータからビジネス情報を引き出している企業は多くありません。これは大きなチャンスであると Handy は語っています。

0 件のコメント:

コメントを投稿