1月 252012
 

Hadoopで始める並列データ解析/後編 | Inhale n’ Exhale1月13日(金)にPalo Altoで行われたJTPAのギークサロンに参加してきた。今回は参加者がラップトップ持ち込みでコーディングしていくハッカソン形式で、会場入りする前までにHadoopが使える環境を自前で用意しておく必要があった。

前編では、Hadoopを使って計算処理をするための準備として、EC2上にHadoopクラスタを構築する方法を紹介した。後編では実際にギークサロンで手を動かしてやったことを中心に紹介していこう。

WordCount.java – 最初のサンプルコード

Hadoop Tutorial / Module4: MapReduceで紹介されているWordCount.javaを使って、EC2上のHadoopクラスタで処理をさせてみる。いわゆる"Hello, wolrd!"的なサンプル。

まずはEC2上のUbuntuにSSHでログインして、Hadoopのクラスタを立ち上げ、Hadoopクラスタのマスターノードにログインする。

local$ ec2hadoop
ubuntu$ hadoop-ec2 launch-cluster hadoop-cluster 2
ubuntu$ hadoop-ec2 login hadoop-cluster
cluster#

ホームディレクトリに作業用ディレクトリを作成して、そこにWordCount.javaのソースコードをコピーする。

cluster# mkdir ~/cordcount
cluster# cp $HADOOP_HOME/src/examples/org/apache/hadoop/examples/WordCount.java ~/wordcount

WordCount.javaは複数のテキストファイルを入力として、ファイル中の単語をすべてカウントして、各単語の出現回数集計するアプリ。簡略化したソースコードを以下に掲載しておこう。

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount extends Configured implements Tool {
  /**
   * Counts the words in each line.
   * For each line of input, break the line into words and emit them.
   */
  public static class MapClass extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    
    public void map(LongWritable key, Text value, 
                    OutputCollector<Text, IntWritable> output, 
                    Reporter reporter) throws IOException {
      String line = value.toString();
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        output.collect(word, one);
      }
    }
  }
  
  // A reducer class that just emits the sum of the input values.
  public static class Reduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {
    
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output, 
                       Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }

  // The main driver for word count map/reduce program.
  // Invoke this method to submit the map/reduce job.
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(getConf(), WordCount.class);
    conf.setJobName("wordcount");
 
    // the keys are words (strings)
    conf.setOutputKeyClass(Text.class);
    // the values are counts (ints)
    conf.setOutputValueClass(IntWritable.class);
    
    conf.setMapperClass(MapClass.class);        
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);
    
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        
    JobClient.runJob(conf);
  }
}

自分はJavaに馴染みないのだが、短いプログラムなので理解するは難しくない。MapClassクラスのmap()関数で、入力ファイルの各行をStringTokenizerで単語に区切り、<単語, 1>というペアを出力する(Map処理)。次にReduceクラスのreduce()関数で、MapClassの出力したペアの値に対して総計を算出する(Reduce処理)。main()関数では、JobConfをセットアップして、コマンドライン引数で渡された入出力先ディレクトリを指定している。

HadoopのMap/Reduceチュートリアルのウォークスルーの節に、詳しい説明が日本語で書かれているので参考にするといいだろう。

WordCount.javaをコンパイルして、クラスコードをjarファイルにパッケージングする。

cluster# cd ~/wordcount
cluster# mkdir classes
cluster# javac -classpath $HADOOP_HOME/hadoop-0.19.0-core.jar -d classes WordCount.java
cluster# jar -cf wordcount.jar -C classes .
cluster# tree
.
|-- WordCount.java
|-- classes
|   `-- org
|       `-- apache
|           `-- hadoop
|               `-- examples
|                   |-- WordCount$MapClass.class
|                   |-- WordCount$Reduce.class
|                   `-- WordCount.class
`-- wordcount.jar
5 directories, 5 files

分散ファイルシステム(HDFS)の構築

Hadoopの入出力ファイルは、ローカルのファイルシステムとは異なる、Hadoop専用の分散ファイルシステム(HDFS: Hadoop Distributed File System)を介して配置しなければならない。HadoopクラスタではデフォルトでHDFSがマウントされているので、ファイルシステム自体のセットアップは不要だ。HDFS内のファイルやディレクトリを操作するにはhadoop dfsコマンドを使う(参考:Hadoop HDFSコマンド実行メモ)。

まず、WordCountの入力ディレクトリをHDFS内に作成する。

cluster# hadoop dfs -mkdir input
cluster# hadoop dfs -ls
Found 1 items
drwxr-xr-x   - root supergroup          0 2012-01-24 18:33 /user/root/input

Hadoopのマスターノードにrootとしてログインしているので、/user/rootがHDFS内でのホームディレクトリになっている。試しにローカルファイルシステムに適当なファイルを作成して、それをHDFS内のinputディレクトリにコピーしてみよう。

cluster# echo Hello > hello.txt
cluster# hadoop dfs -put hello.txt input/
cluster# hadoop dfs -lsr
drwxr-xr-x   - root supergroup          0 2012-01-24 18:38 /user/root/input
-rw-r--r--   3 root supergroup          6 2012-01-24 18:38 /user/root/input/hello.txt
cluster# hadoop dfs -cat input/hello.txt
Hello
cluster# hadoop dfs -rm input/hello.txt
cluster# rm -f hello.txt

では実際にWordCountのアプリケーションで処理させる入力ファイルを用意しよう。それなりのボリュームの入力があった方がいいので、RFCドキュメントをローカルのファイルシステム上に巡回ダウンロードするスクリプトを作ってみた。

cluster# cd ~/wordcount
cluster# vi wgetrfc
#!/bin/bash

INDEX=1
COUNTER=0
BASEURL=http://www.ietf.org/rfc
OUTDIR=$PWD/input

rm -rf $OUTDIR
mkdir -p $OUTDIR

while test $COUNTER -lt $1; do
  FILENAME=rfc$INDEX
  if wget -q -O $OUTDIR/$FILENAME $BASEURL/$FILENAME; then
    COUNTER=`expr $COUNTER + 1`
    echo &amp;amp;quot;$FILENAME OK&amp;amp;quot;
  else
    rm -rf $OUTDIR/$FILENAME
  fi
  INDEX=`expr $INDEX + 1`
done

とりあえず100個のRFCドキュメントをダウンロードしておこう。

cluster# chmod +x wgetrfc
cluster# ./wgetrc 100
rfc1 OK
rfc2 OK
...
rfc103 OK
cluster# cat input/* | wc
  32907  157746 1162188

3つくらい欠番があるようだが、1MBほどの入力ファイルができたのでHDFSにコピーする。

cluster# hadoop dfs -put input/* input
cluster# hadoop dfs -ls input
Found 100 items
-rw-r--r--   3 root supergroup      21088 2012-01-24 19:22 /user/root/input/rfc1.txt
-rw-r--r--   3 root supergroup       4510 2012-01-24 19:22 /user/root/input/rfc10.txt
...
-rw-r--r--   3 root supergroup      24529 2012-01-24 19:22 /user/root/input/rfc98.txt
-rw-r--r--   3 root supergroup       1010 2012-01-24 19:22 /user/root/input/rfc99.txt

WordCountの実行

これでWordCountを実行する準備が整った。以下のコマンドでHadoopクラスタに処理をさせてみよう。WordCountの第1引数は入力ファイル群が置かれているHDFS上のディレクトリ(/user/root/input)、第2引数は同じくHDFS上の出力先のディレクトリ(/user/root/output)を指定する。出力先ディレクトリは自分で先に作ってしまうと”Output directory already exists”というFileAlreadyExistsExceptionエラーが発生してしまう。Hadoopが勝手に生成してくれるのでお膳立ては不要だ。

cluster# cd ~/wordcount
cluster# hadoop jar wordcount.jar org.apache.hadoop.examples.WordCount input output
12/01/24 19:36:54 INFO mapred.FileInputFormat: Total input paths to process : 100
12/01/24 19:36:54 INFO mapred.JobClient: Running job: job_201201241700_0001
12/01/24 19:36:55 INFO mapred.JobClient:  map 0% reduce 0%
12/01/24 19:37:03 INFO mapred.JobClient:  map 1% reduce 0%
...
12/01/24 19:40:51 INFO mapred.JobClient:     Map output records=157747
12/01/24 19:40:51 INFO mapred.JobClient:     Reduce input records=56339

Map/Reduceの処理結果はoutput/part-00000.deflateというファイルに出力されている。

cluster# hadoop dfs -ls output
Found 2 items
drwxr-xr-x   - root supergroup          0 2012-01-24 19:36 /user/root/output/_logs
-rw-r--r--   3 root supergroup      67154 2012-01-24 19:40 /user/root/output/part-00000.deflate

hadoop dfs -catコマンドで出力ファイルの内容を標準出力にダンプしてみるとわかるが、中身はバイナリデータになっている。拡張子の.deflateから推測できるようにDeflateアルゴリズムで圧縮されているからだ。Hadoopではクラスタ間でデータを転送する際のネットワーク帯域を節約するために、デフォルトでデータを圧縮するようになっているらしい。

Deflateの圧縮を解凍するために、unzipコマンドを使ってみたりしたがうまくいかなかったので(誰か知っていたら教えてください)、Hadoopの設定ファイルを変更して出力データを圧縮しないようにする。$HADOOP_HOME/conf/hadoop-site.xmlをエディタで開いて、mapred.output.compressの値をtrueからfalseに変更する。

cluster# vi $HADOOP_HOME/conf/hadoop-site.xml
&lt;property&gt;
  &lt;name&gt;mapred.output.compress&lt;/name&gt;
  &lt;value&gt;false&lt;/value&gt;
&lt;/property&gt;

出力ディレクトリを削除して、WordCountを再実行しよう。

cluster# cd ~/wordcount
cluster# hadoop dfs -rmr output
cluster# hadoop jar wordcount.jar org.apache.hadoop.examples.WordCount input output
cluster# hadoop dfs -ls output
Found 2 items
drwxr-xr-x   - root supergroup          0 2012-01-24 20:01 /user/root/output/_logs
-rw-r--r--   3 root supergroup     186137 2012-01-24 20:04 /user/root/output/part-00000

今度は.deflateという拡張子は付いていないので、圧縮されていないことがわかる。実行結果を標準出力に表示させてみよう。

cluster# hadoop dfs -cat output/part-00000
!   13
!=, 1
"   22
"#33,   1
"--"    1
...
||=======>| 1
||=>-   1
}   6
~   4
�sec    1

記号ばかりだがテキストとして読める状態にはなっている。HDFS上のファイルをローカルのファイルシステムに取り出す場合は

cluster# hadoop dfs -get output/part-00000 .

を実行すればよい。エディタでファイルの中身を見てみるといいだろう。

WordCountは単語の出現回数を数えるだけのつまらないプログラムだが、Map/ReduceのアーキテクチャやHDFSの操作方法を学ぶには十分なサンプルだ。

$HADOOP_HOME/src/examples/org/apache/hadoop/examples/には他のサンプルもあるので、どのようにMap/Reduceの処理をさせるか参考にするといいだろう。サンプルソースを読むだけなら、わざわざHadoopクラスタのマスターノードにログインしなくても、Hadoopのソースをダウンロードすればローカルで学習できる。

以下、ギークサロンをホストしてくださった山中氏よりコメント。

私は以前から画像処理など各種科学技術計算用のPCクラスタを開発してきました。オリジナルで分散PCクラスタを構築しようとすると、非常に沢山の機能が必要になります。例えば実行ファイルや入力データを各ワークステーションに配置する機能や中間実行結果を取り出して他のクラスタに配信する機能、リモートでプロセスの実行を管理したり発生したエラーをハンドリングする機能も必要です。これらを全て提供し、かつJavaのクラスファイルから柔軟に計算フレームワークを構築できるのがHadoopなのです。

Amazon S3ストレージをHDFSとして使う

ここまではマスターノードのローカルにマウントされたデフォルトのHDFSを使ってきたが、今度はAmazon S3にHDFSをマウントするようにしてみる。S3をHDFSのストレージとして利用することで、多数のHadoopクラスタから入出力先を相互に設定できるようになる。例えば、1つのHadoopクラスタが出力したデータをS3上に保存しておき、別のHadoopクラスタがそれを入力データとして利用するといったことが可能になる。Hadoopはクラウド環境と非常に相性がいい。

ローカルにマウントされたHDFSは、EC2上でHadoopクラスタを稼働させている間でなければ、入出力データを取り出すことができないという欠点がある。EC2 API Toolsのhadoop-ec2 launch-clusterコマンドによって起動されるHadoopクラスタはルートデバイスにインスタンスストアを使っている(EBSではない)ため、hadoop-ec2 terminate-clusterコマンドでHadoopクラスタを終了すると同時に、HDFSに保存されていたデータも消失する。EC2上でHadoopクラスタを構築すると、たとえ計算処理をしていなくても、インスタンスを立ち上げているだけで課金されてしまい、油断すると簡単にクラウド破産しかねない。Hadoopの出力データをS3上に保存できれば、Hadoopクラスタが不要になったら即インスタンスを停止、または終了させることができるので、そういった面でもS3をストレージとして使うメリットは大きいだろう。

それではまず、Amazon S3のセットアップをしておこう。

AWSの管理コンソールにログインして[Amazon S3]のタブを開き、左上にある[Create Bucket]ボタンをクリックする。

ダイアログが表示されたら、[Bucket Name]にS3バケット名を入力する。

なおバケット名には、Amazon S3を使用しているすべてのユーザー間で一意なものを選ばなければならない。ここで設定している「h2plus-hadoop-hdfs」と同じバケット名はもう使えないので、オリジナルのバケット名を付けるようにしよう。ダイアログで[Create]ボタンをクリックするとS3バケットが作成される。

次にHadoopのマスターノード上で$HADOOP_HOME/conf/hadoop-site.xmlを編集する。

&lt;property&gt;
  &lt;name&gt;fs.default.name&lt;/name&gt;
  &lt;value&gt;s3://h2plus-hadoop-hdfs&lt;/value&gt;
&lt;/property&gt;

&lt;property&gt;
  &lt;name&gt;fs.s3.awsAccessKeyId&lt;/name&gt;
  &lt;value&gt;Your Access key ID&lt;/value&gt;
&lt;/property&gt;

&lt;property&gt;
  &lt;name&gt;fs.s3.awsSecretAccessKey&lt;/name&gt;
  &lt;value&gt;Your Secret Access Key&lt;/value&gt;
&lt;/property&gt;

fs.default.nameを先ほど作成したS3バケットへのURLに変更して、後続の行にfs.s3.awsAccessKeyIdfs.s3.awsSecretAccessKeyというプロパティを追加する。fs.s3.awsAccessKeyIdfs.s3.awsSecretAccessKeyの値は、前編hadoop-ec2-env.shに設定したものと同じで、AWSのSecurity CredentialsページのAccess Credentialsに表示されていたものだ。

これでHDFSがS3上にマウントされるようになった。

まず、WordCount用の入力データをHDFSに転送する。AWSの管理コンソールにはS3バケットにファイルをアップロードする機能がついているが、HDFSには対応していないためhadoop dfsコマンドを使う必要がある。

cluster# cd ~/wordcount
cluster# hadoop dfs -mkdir input
cluster# hadoop dfs -put input/* input

AWSの管理コンソールからS3バケットの中身を見てみると、なにやらファイルがたくさん作られているのがわかる。

では同じようにHadoopクラスタにWordCountを処理させてみよう。

cluster# hadoop jar wordcount.jar org.apache.hadoop.examples.WordCount input output
12/01/24 20:34:08 INFO mapred.FileInputFormat: Total input paths to process : 100
12/01/24 20:34:10 INFO mapred.JobClient: Running job: job_201201241846_0001
12/01/24 20:34:11 INFO mapred.JobClient:  map 0% reduce 0%
...
12/01/24 20:37:27 INFO mapred.JobClient:     Combine input records=157747
12/01/24 20:37:27 INFO mapred.JobClient:     Map output records=157747
12/01/24 20:37:27 INFO mapred.JobClient:     Reduce input records=56339
cluster# hadoop dfs -ls output
Found 2 items
drwxrwxrwx   -          0 1969-12-31 19:00 /user/root/output/_logs
-rwxrwxrwx   1     186137 1969-12-31 19:00 /user/root/output/part-00000

設定ファイル(hadoop-site.xml)を変更するだけで、簡単にS3のクラウドストレージ上に分散ファイルシステムを構築することができた。

Hadoopクラスタのマスターノードからログアウトして、クラスタ用のEC2インスタンスをすべて停止させた後でも、ローカル環境にHadoopをインストールしておけば、後からクラスタの出力を取得することができる。

cluster# exit
ubuntu$ hadoop-ec2 terminate-cluster  hadoop-cluster
ubuntu$ exit
local$ hadoop dfs -ls output
Found 2 items
drwxrwxrwx   -          0 1969-12-31 19:00 /user/root/output/_logs
-rwxrwxrwx   1     186137 1969-12-31 19:00 /user/root/output/part-00000
local$ hadoop dfs -get output/part-00000

感想

ギークサロンで手を動かしてHadoopに触ったのはここまでだった。だいたい18時くらいから人が集まりだして、22時近くまで「あ~そうか!」とか「動かねぇorz」とかこぼしながら、参加者全員がラップトップに向かって黙々とHadoopと戯れていた。

今回、山中氏が準備してくださった事前資料は、「Hadoopって聞いたことあるけど触ったことない」という参加者を想定して作られていたこともあり、ウィザード形式で着々とHadoopの環境を整備することができ、ギークサロンではHadoopの分散並列処理を目の前で体感することができた。

実はギークサロンで予定されていた内容はもうワンステップ先があり、1GBほどのfacebookのユーザーデータを解析するという、より実践的なハッカソンとなるはずで、このブログエントリのソースにもなっている山中氏の資料でも続きがあるのだが、ひとまずはHadoopの入門ということでここで区切りたいと思う(余裕があったら別エントリ立てるかも)。

ブログへの転載を快諾してくださった山中仁氏に、この場を借りて御礼申し上げます。

  3 コメント

  1. Hadoopの情報を探していたらここにたどり着いた(笑)
    大変参考になりました。
    https://github.com/jaoki/hadoop_wordcount
    にmaven でラップしたのをおいておくので参考までに。

    • junさん>
      灯台下暗しってヤツですねw
      GitHubのリンクもありがとうございます!

 返信する

以下のHTML タグと属性が利用できます: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>

(required)

(required)

このサイトはスパムを低減するために Akismet を使っています。コメントデータの処理方法の詳細はこちらをご覧ください