JTPAのギークサロンに参加してきた。今回は参加者がラップトップ持ち込みでコーディングしていくハッカソン形式で、会場入りする前までにHadoopが使える環境を自前で用意しておく必要があった。
1月13日(金)にPalo Altoで行われた前編では、Hadoopを使って計算処理をするための準備として、EC2上にHadoopクラスタを構築する方法を紹介した。後編では実際にギークサロンで手を動かしてやったことを中心に紹介していこう。
WordCount.java – 最初のサンプルコード
Hadoop Tutorial / Module4: MapReduceで紹介されているWordCount.java
を使って、EC2上のHadoopクラスタで処理をさせてみる。いわゆる"Hello, wolrd!"
的なサンプル。
まずはEC2上のUbuntuにSSHでログインして、Hadoopのクラスタを立ち上げ、Hadoopクラスタのマスターノードにログインする。
local$ ec2hadoop ubuntu$ hadoop-ec2 launch-cluster hadoop-cluster 2 ubuntu$ hadoop-ec2 login hadoop-cluster cluster#
ホームディレクトリに作業用ディレクトリを作成して、そこにWordCount.java
のソースコードをコピーする。
cluster# mkdir ~/cordcount cluster# cp $HADOOP_HOME/src/examples/org/apache/hadoop/examples/WordCount.java ~/wordcount
WordCount.java
は複数のテキストファイルを入力として、ファイル中の単語をすべてカウントして、各単語の出現回数集計するアプリ。簡略化したソースコードを以下に掲載しておこう。
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.hadoop.examples; import java.io.IOException; import java.util.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; public class WordCount extends Configured implements Tool { /** * Counts the words in each line. * For each line of input, break the line into words and emit them. */ public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); output.collect(word, one); } } } // A reducer class that just emits the sum of the input values. public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } // The main driver for word count map/reduce program. // Invoke this method to submit the map/reduce job. public static void main(String[] args) throws Exception { JobConf conf = new JobConf(getConf(), WordCount.class); conf.setJobName("wordcount"); // the keys are words (strings) conf.setOutputKeyClass(Text.class); // the values are counts (ints) conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }
自分はJavaに馴染みないのだが、短いプログラムなので理解するは難しくない。MapClass
クラスのmap()
関数で、入力ファイルの各行をStringTokenizer
で単語に区切り、<単語, 1>というペアを出力する(Map処理)。次にReduce
クラスのreduce()
関数で、MapClass
の出力したペアの値に対して総計を算出する(Reduce処理)。main()
関数では、JobConf
をセットアップして、コマンドライン引数で渡された入出力先ディレクトリを指定している。
HadoopのMap/Reduceチュートリアルのウォークスルーの節に、詳しい説明が日本語で書かれているので参考にするといいだろう。
WordCount.java
をコンパイルして、クラスコードをjarファイルにパッケージングする。
cluster# cd ~/wordcount cluster# mkdir classes cluster# javac -classpath $HADOOP_HOME/hadoop-0.19.0-core.jar -d classes WordCount.java cluster# jar -cf wordcount.jar -C classes . cluster# tree . |-- WordCount.java |-- classes | `-- org | `-- apache | `-- hadoop | `-- examples | |-- WordCount$MapClass.class | |-- WordCount$Reduce.class | `-- WordCount.class `-- wordcount.jar 5 directories, 5 files
分散ファイルシステム(HDFS)の構築
Hadoopの入出力ファイルは、ローカルのファイルシステムとは異なる、Hadoop専用の分散ファイルシステム(HDFS: Hadoop Distributed File System)を介して配置しなければならない。HadoopクラスタではデフォルトでHDFSがマウントされているので、ファイルシステム自体のセットアップは不要だ。HDFS内のファイルやディレクトリを操作するにはhadoop dfs
コマンドを使う(参考:Hadoop HDFSコマンド実行メモ)。
まず、WordCountの入力ディレクトリをHDFS内に作成する。
cluster# hadoop dfs -mkdir input cluster# hadoop dfs -ls Found 1 items drwxr-xr-x - root supergroup 0 2012-01-24 18:33 /user/root/input
Hadoopのマスターノードにrootとしてログインしているので、/user/root
がHDFS内でのホームディレクトリになっている。試しにローカルファイルシステムに適当なファイルを作成して、それをHDFS内のinput
ディレクトリにコピーしてみよう。
cluster# echo Hello > hello.txt cluster# hadoop dfs -put hello.txt input/ cluster# hadoop dfs -lsr drwxr-xr-x - root supergroup 0 2012-01-24 18:38 /user/root/input -rw-r--r-- 3 root supergroup 6 2012-01-24 18:38 /user/root/input/hello.txt cluster# hadoop dfs -cat input/hello.txt Hello cluster# hadoop dfs -rm input/hello.txt cluster# rm -f hello.txt
では実際にWordCountのアプリケーションで処理させる入力ファイルを用意しよう。それなりのボリュームの入力があった方がいいので、RFCドキュメントをローカルのファイルシステム上に巡回ダウンロードするスクリプトを作ってみた。
cluster# cd ~/wordcount cluster# vi wgetrfc
#!/bin/bash INDEX=1 COUNTER=0 BASEURL=http://www.ietf.org/rfc OUTDIR=$PWD/input rm -rf $OUTDIR mkdir -p $OUTDIR while test $COUNTER -lt $1; do FILENAME=rfc$INDEX if wget -q -O $OUTDIR/$FILENAME $BASEURL/$FILENAME; then COUNTER=`expr $COUNTER + 1` echo &amp;quot;$FILENAME OK&amp;quot; else rm -rf $OUTDIR/$FILENAME fi INDEX=`expr $INDEX + 1` done
とりあえず100個のRFCドキュメントをダウンロードしておこう。
cluster# chmod +x wgetrfc cluster# ./wgetrc 100 rfc1 OK rfc2 OK ... rfc103 OK cluster# cat input/* | wc 32907 157746 1162188
3つくらい欠番があるようだが、1MBほどの入力ファイルができたのでHDFSにコピーする。
cluster# hadoop dfs -put input/* input cluster# hadoop dfs -ls input Found 100 items -rw-r--r-- 3 root supergroup 21088 2012-01-24 19:22 /user/root/input/rfc1.txt -rw-r--r-- 3 root supergroup 4510 2012-01-24 19:22 /user/root/input/rfc10.txt ... -rw-r--r-- 3 root supergroup 24529 2012-01-24 19:22 /user/root/input/rfc98.txt -rw-r--r-- 3 root supergroup 1010 2012-01-24 19:22 /user/root/input/rfc99.txt
WordCountの実行
これでWordCountを実行する準備が整った。以下のコマンドでHadoopクラスタに処理をさせてみよう。WordCountの第1引数は入力ファイル群が置かれているHDFS上のディレクトリ(/user/root/input
)、第2引数は同じくHDFS上の出力先のディレクトリ(/user/root/output
)を指定する。出力先ディレクトリは自分で先に作ってしまうと”Output directory already exists”というFileAlreadyExistsException
エラーが発生してしまう。Hadoopが勝手に生成してくれるのでお膳立ては不要だ。
cluster# cd ~/wordcount cluster# hadoop jar wordcount.jar org.apache.hadoop.examples.WordCount input output 12/01/24 19:36:54 INFO mapred.FileInputFormat: Total input paths to process : 100 12/01/24 19:36:54 INFO mapred.JobClient: Running job: job_201201241700_0001 12/01/24 19:36:55 INFO mapred.JobClient: map 0% reduce 0% 12/01/24 19:37:03 INFO mapred.JobClient: map 1% reduce 0% ... 12/01/24 19:40:51 INFO mapred.JobClient: Map output records=157747 12/01/24 19:40:51 INFO mapred.JobClient: Reduce input records=56339
Map/Reduceの処理結果はoutput/part-00000.deflate
というファイルに出力されている。
cluster# hadoop dfs -ls output Found 2 items drwxr-xr-x - root supergroup 0 2012-01-24 19:36 /user/root/output/_logs -rw-r--r-- 3 root supergroup 67154 2012-01-24 19:40 /user/root/output/part-00000.deflate
hadoop dfs -cat
コマンドで出力ファイルの内容を標準出力にダンプしてみるとわかるが、中身はバイナリデータになっている。拡張子の.deflate
から推測できるようにDeflateアルゴリズムで圧縮されているからだ。Hadoopではクラスタ間でデータを転送する際のネットワーク帯域を節約するために、デフォルトでデータを圧縮するようになっているらしい。
Deflateの圧縮を解凍するために、unzipコマンドを使ってみたりしたがうまくいかなかったので(誰か知っていたら教えてください)、Hadoopの設定ファイルを変更して出力データを圧縮しないようにする。$HADOOP_HOME/conf/hadoop-site.xml
をエディタで開いて、mapred.output.compress
の値をtrue
からfalse
に変更する。
cluster# vi $HADOOP_HOME/conf/hadoop-site.xml
<property> <name>mapred.output.compress</name> <value>false</value> </property>
出力ディレクトリを削除して、WordCountを再実行しよう。
cluster# cd ~/wordcount cluster# hadoop dfs -rmr output cluster# hadoop jar wordcount.jar org.apache.hadoop.examples.WordCount input output cluster# hadoop dfs -ls output Found 2 items drwxr-xr-x - root supergroup 0 2012-01-24 20:01 /user/root/output/_logs -rw-r--r-- 3 root supergroup 186137 2012-01-24 20:04 /user/root/output/part-00000
今度は.deflate
という拡張子は付いていないので、圧縮されていないことがわかる。実行結果を標準出力に表示させてみよう。
cluster# hadoop dfs -cat output/part-00000 ! 13 !=, 1 " 22 "#33, 1 "--" 1 ... ||=======>| 1 ||=>- 1 } 6 ~ 4 �sec 1
記号ばかりだがテキストとして読める状態にはなっている。HDFS上のファイルをローカルのファイルシステムに取り出す場合は
cluster# hadoop dfs -get output/part-00000 .
を実行すればよい。エディタでファイルの中身を見てみるといいだろう。
WordCountは単語の出現回数を数えるだけのつまらないプログラムだが、Map/ReduceのアーキテクチャやHDFSの操作方法を学ぶには十分なサンプルだ。
$HADOOP_HOME/src/examples/org/apache/hadoop/examples/
には他のサンプルもあるので、どのようにMap/Reduceの処理をさせるか参考にするといいだろう。サンプルソースを読むだけなら、わざわざHadoopクラスタのマスターノードにログインしなくても、Hadoopのソースをダウンロードすればローカルで学習できる。
以下、ギークサロンをホストしてくださった山中氏よりコメント。
私は以前から画像処理など各種科学技術計算用のPCクラスタを開発してきました。オリジナルで分散PCクラスタを構築しようとすると、非常に沢山の機能が必要になります。例えば実行ファイルや入力データを各ワークステーションに配置する機能や中間実行結果を取り出して他のクラスタに配信する機能、リモートでプロセスの実行を管理したり発生したエラーをハンドリングする機能も必要です。これらを全て提供し、かつJavaのクラスファイルから柔軟に計算フレームワークを構築できるのがHadoopなのです。
Amazon S3ストレージをHDFSとして使う
ここまではマスターノードのローカルにマウントされたデフォルトのHDFSを使ってきたが、今度はAmazon S3にHDFSをマウントするようにしてみる。S3をHDFSのストレージとして利用することで、多数のHadoopクラスタから入出力先を相互に設定できるようになる。例えば、1つのHadoopクラスタが出力したデータをS3上に保存しておき、別のHadoopクラスタがそれを入力データとして利用するといったことが可能になる。Hadoopはクラウド環境と非常に相性がいい。
ローカルにマウントされたHDFSは、EC2上でHadoopクラスタを稼働させている間でなければ、入出力データを取り出すことができないという欠点がある。EC2 API Toolsのhadoop-ec2 launch-cluster
コマンドによって起動されるHadoopクラスタはルートデバイスにインスタンスストアを使っている(EBSではない)ため、hadoop-ec2 terminate-cluster
コマンドでHadoopクラスタを終了すると同時に、HDFSに保存されていたデータも消失する。EC2上でHadoopクラスタを構築すると、たとえ計算処理をしていなくても、インスタンスを立ち上げているだけで課金されてしまい、油断すると簡単にクラウド破産しかねない。Hadoopの出力データをS3上に保存できれば、Hadoopクラスタが不要になったら即インスタンスを停止、または終了させることができるので、そういった面でもS3をストレージとして使うメリットは大きいだろう。
それではまず、Amazon S3のセットアップをしておこう。
AWSの管理コンソールにログインして[Amazon S3]のタブを開き、左上にある[Create Bucket]ボタンをクリックする。
ダイアログが表示されたら、[Bucket Name]にS3バケット名を入力する。
なおバケット名には、Amazon S3を使用しているすべてのユーザー間で一意なものを選ばなければならない。ここで設定している「h2plus-hadoop-hdfs
」と同じバケット名はもう使えないので、オリジナルのバケット名を付けるようにしよう。ダイアログで[Create]ボタンをクリックするとS3バケットが作成される。
次にHadoopのマスターノード上で$HADOOP_HOME/conf/hadoop-site.xml
を編集する。
<property> <name>fs.default.name</name> <value>s3://h2plus-hadoop-hdfs</value> </property> <property> <name>fs.s3.awsAccessKeyId</name> <value>Your Access key ID</value> </property> <property> <name>fs.s3.awsSecretAccessKey</name> <value>Your Secret Access Key</value> </property>
fs.default.name
を先ほど作成したS3バケットへのURLに変更して、後続の行にfs.s3.awsAccessKeyId
とfs.s3.awsSecretAccessKey
というプロパティを追加する。fs.s3.awsAccessKeyId
とfs.s3.awsSecretAccessKey
の値は、前編でhadoop-ec2-env.sh
に設定したものと同じで、AWSのSecurity CredentialsページのAccess Credentialsに表示されていたものだ。
これでHDFSがS3上にマウントされるようになった。
まず、WordCount用の入力データをHDFSに転送する。AWSの管理コンソールにはS3バケットにファイルをアップロードする機能がついているが、HDFSには対応していないためhadoop dfs
コマンドを使う必要がある。
cluster# cd ~/wordcount cluster# hadoop dfs -mkdir input cluster# hadoop dfs -put input/* input
AWSの管理コンソールからS3バケットの中身を見てみると、なにやらファイルがたくさん作られているのがわかる。
では同じようにHadoopクラスタにWordCountを処理させてみよう。
cluster# hadoop jar wordcount.jar org.apache.hadoop.examples.WordCount input output 12/01/24 20:34:08 INFO mapred.FileInputFormat: Total input paths to process : 100 12/01/24 20:34:10 INFO mapred.JobClient: Running job: job_201201241846_0001 12/01/24 20:34:11 INFO mapred.JobClient: map 0% reduce 0% ... 12/01/24 20:37:27 INFO mapred.JobClient: Combine input records=157747 12/01/24 20:37:27 INFO mapred.JobClient: Map output records=157747 12/01/24 20:37:27 INFO mapred.JobClient: Reduce input records=56339 cluster# hadoop dfs -ls output Found 2 items drwxrwxrwx - 0 1969-12-31 19:00 /user/root/output/_logs -rwxrwxrwx 1 186137 1969-12-31 19:00 /user/root/output/part-00000
設定ファイル(hadoop-site.xml
)を変更するだけで、簡単にS3のクラウドストレージ上に分散ファイルシステムを構築することができた。
Hadoopクラスタのマスターノードからログアウトして、クラスタ用のEC2インスタンスをすべて停止させた後でも、ローカル環境にHadoopをインストールしておけば、後からクラスタの出力を取得することができる。
cluster# exit ubuntu$ hadoop-ec2 terminate-cluster hadoop-cluster ubuntu$ exit local$ hadoop dfs -ls output Found 2 items drwxrwxrwx - 0 1969-12-31 19:00 /user/root/output/_logs -rwxrwxrwx 1 186137 1969-12-31 19:00 /user/root/output/part-00000 local$ hadoop dfs -get output/part-00000
感想
ギークサロンで手を動かしてHadoopに触ったのはここまでだった。だいたい18時くらいから人が集まりだして、22時近くまで「あ~そうか!」とか「動かねぇorz」とかこぼしながら、参加者全員がラップトップに向かって黙々とHadoopと戯れていた。
今回、山中氏が準備してくださった事前資料は、「Hadoopって聞いたことあるけど触ったことない」という参加者を想定して作られていたこともあり、ウィザード形式で着々とHadoopの環境を整備することができ、ギークサロンではHadoopの分散並列処理を目の前で体感することができた。
実はギークサロンで予定されていた内容はもうワンステップ先があり、1GBほどのfacebookのユーザーデータを解析するという、より実践的なハッカソンとなるはずで、このブログエントリのソースにもなっている山中氏の資料でも続きがあるのだが、ひとまずはHadoopの入門ということでここで区切りたいと思う(余裕があったら別エントリ立てるかも)。
ブログへの転載を快諾してくださった山中仁氏に、この場を借りて御礼申し上げます。
Hadoopの情報を探していたらここにたどり着いた(笑)
大変参考になりました。
https://github.com/jaoki/hadoop_wordcount
にmaven でラップしたのをおいておくので参考までに。
junさん>
灯台下暗しってヤツですねw
GitHubのリンクもありがとうございます!