スゴいと噂のsparkを動かしてみる
やりたいこと
sparkとかいう分散処理系の何かがスゴいらしいので、とりあえず動かしてみたい
sparkって何ができるの??
一言「分散処理」
ただ、分散処理でも、Hadoopと違って、ストレージに保存せずにon memoryで分散処理を実行できる
いちいちストレージに保存する必要がない処理、例えば機械学習のイテレート処理で効果を発揮する。
やってみたこと
sparkのインストール
まずはsparkをインストールする
とは、言っても、バイナリファイルを落としてきて、移動するだけだけら、問題はない・・・はず
ここから、まずは落としてくる
で、tarファイルを展開して、任意の場所に移動しておく
とりあえずは/usr/local/share/spark/
に移動しておいた
パスが通っていなかったら、通しておいてくださいね
コマンドラインでインタラクティブに実行する
公式マニュアルを見ながらやってみる
sparkを置いてあるパスに移動して、bin/spark-shell
を実行するだけ(localマシンをmasterにして、sparkが立ち上がる)
※ --master local[スレッド数]
と指定すると、スレッド数を指定して実行できる
※ pythonの表記で実行したい場合は./bin/pyspark
を実行する。また別の機会に取り組んでみる
% cd /usr/local/share/spark % bin/spark-shell // ここらへんになんかいっぱいログが出る。長いので省略 // あと、こんなかっこいいロゴがでる Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.2.1 /_/ // コマンドの待機状態になったら、起動終了 scala>
基本集計っぽいことをやってみる
// ファイルをRDDの上に乗っけてみる。RDDってのは、HDFSみたいなもの。まぁ、メモリだ // sparkのルートディレクトリ、つまり、今回は`/usr/local/share/spark/`から見た相対パスなので要注意 scala> val textFile = sc.textFile("README.md") // 読み込んでるっぽいログ scala> textFile.count() // 行数を数えるコマンド // なんかログ res1: Long = 98 // "Spark"が含まれる行だけをフィルタリングして、別の変数に格納(実際は、変数linesWithSparkは新しいRDDの上に生成される) scala> val linesWithSpark = textFile.filter(line => line.contains("Spark")) linesWithSpark: org.apache.spark.rdd.RDD[String] = FilteredRDD[2] at filter at <console>:14 // フィルタリングとアクションはつなげて(chainして)実行することもできる // "Spark"がある行をフィルタリングして、"Spark"の出現回数を数える scala> textFile.filter(line => line.contains("Spark")).count() res3: Long = 15
map/reduceをやってみる
公式マニュアルを見ながらやってみる
さっきのコードの続き。textFile
オブジェクトは引き継ぎ
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) res4: Long = 15
やっていることは、「1行あたりの最大の単語数を探す」
map()
メソッド内でmap処理を実行している
map処理の段階で新しいRDDが生成される。
で、reduceメソッドで、RDDを呼び出し、最大の単語数を探す命令を実行している
単語の出現頻度をカウントする
val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
flatMapメソッドは、mapと似てるけど、integer以外も返せる命令。今は単語(string)を返している
mapメソッドで(word, 1)
のタプルを作り出している(初期化)
reduceByKeyメソッドは集計関数を引数に渡せるメソッド。集計関数は「input:(K, V)のペア。Kでまとめて(V, V) => V
を実行して(つまり集計)する。output:(K, V)」を渡す。今回は「input:(string, int)のペア。stringでまとめて、(a, b) => a + b
(つまり、加算)をする。output:(string, int)」という関数
最後に集計結果を見る
scala> wordCounts.collect() res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
collectメソッドは、「dataをarrayの形で返す。」っていうメソッド。普通は集計の後に実行する。
これで、単語の出現頻度の集計おしまい。あらら、簡単ですわぁ
その他、transformationメソッドの説明はここに。
sparkを呼び出すscalaコードを書く
次に、scalaのコードで処理を書いて、スタンドアローンに実行する場合。
流れを書くと
- 1 scalaでsparkを呼び出して演算するコードを書く
- 2
sbt assembly
とかsbt package
とかで.jarファイル化する - 3
spark-submit
コマンドで.jarファイルを呼び出して、処理をする
になる。
ここの記事の通りにやってみた
その他sparkを使った実例など
おもしろい実例だとtwitterの集計につかっている
まとめ
今回はローカルマシン1台でやったから価値が実感できてないけど、sparkになんかすごそうだね!