読者です 読者をやめる 読者になる 読者になる

スゴいと噂のsparkを動かしてみる

やりたいこと

sparkとかいう分散処理系の何かがスゴいらしいので、とりあえず動かしてみたい

sparkって何ができるの??

一言「分散処理」

ただ、分散処理でも、Hadoopと違って、ストレージに保存せずにon memoryで分散処理を実行できる

いちいちストレージに保存する必要がない処理、例えば機械学習のイテレート処理で効果を発揮する。

紹介記事

元資料


やってみたこと

sparkのインストール

まずはsparkをインストールする

とは、言っても、バイナリファイルを落としてきて、移動するだけだけら、問題はない・・・はず

ここから、まずは落としてくる

で、tarファイルを展開して、任意の場所に移動しておく

とりあえずは/usr/local/share/spark/に移動しておいた

パスが通っていなかったら、通しておいてくださいね

参考記事

コマンドラインインタラクティブに実行する

公式マニュアルを見ながらやってみる

sparkを置いてあるパスに移動して、bin/spark-shellを実行するだけ(localマシンをmasterにして、sparkが立ち上がる)

--master local[スレッド数]と指定すると、スレッド数を指定して実行できる

pythonの表記で実行したい場合は./bin/pysparkを実行する。また別の機会に取り組んでみる

% cd /usr/local/share/spark
% bin/spark-shell

// ここらへんになんかいっぱいログが出る。長いので省略
// あと、こんなかっこいいロゴがでる

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.2.1
      /_/

// コマンドの待機状態になったら、起動終了
scala> 

基本集計っぽいことをやってみる

// ファイルをRDDの上に乗っけてみる。RDDってのは、HDFSみたいなもの。まぁ、メモリだ
// sparkのルートディレクトリ、つまり、今回は`/usr/local/share/spark/`から見た相対パスなので要注意
scala> val textFile = sc.textFile("README.md")  


// 読み込んでるっぽいログ

scala> textFile.count()  // 行数を数えるコマンド
// なんかログ
res1: Long = 98

// "Spark"が含まれる行だけをフィルタリングして、別の変数に格納(実際は、変数linesWithSparkは新しいRDDの上に生成される)
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = FilteredRDD[2] at filter at <console>:14

// フィルタリングとアクションはつなげて(chainして)実行することもできる
// "Spark"がある行をフィルタリングして、"Spark"の出現回数を数える
scala> textFile.filter(line => line.contains("Spark")).count()
res3: Long = 15

map/reduceをやってみる

公式マニュアルを見ながらやってみる

さっきのコードの続き。textFileオブジェクトは引き継ぎ

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

やっていることは、「1行あたりの最大の単語数を探す」

map()メソッド内でmap処理を実行している

map処理の段階で新しいRDDが生成される。

で、reduceメソッドで、RDDを呼び出し、最大の単語数を探す命令を実行している

単語の出現頻度をカウントする

val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)

flatMapメソッドは、mapと似てるけど、integer以外も返せる命令。今は単語(string)を返している

mapメソッド(word, 1)のタプルを作り出している(初期化)

reduceByKeyメソッドは集計関数を引数に渡せるメソッド。集計関数は「input:(K, V)のペア。Kでまとめて(V, V) => Vを実行して(つまり集計)する。output:(K, V)」を渡す。今回は「input:(string, int)のペア。stringでまとめて、(a, b) => a + b(つまり、加算)をする。output:(string, int)」という関数

最後に集計結果を見る

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

collectメソッドは、「dataをarrayの形で返す。」っていうメソッド。普通は集計の後に実行する。

これで、単語の出現頻度の集計おしまい。あらら、簡単ですわぁ

その他、transformationメソッドの説明はここに。

sparkを呼び出すscalaコードを書く

次に、scalaのコードで処理を書いて、スタンドアローンに実行する場合。

流れを書くと

  • 1 scalaでsparkを呼び出して演算するコードを書く
  • 2 sbt assemblyとかsbt packageとかで.jarファイル化する
  • 3 spark-submitコマンドで.jarファイルを呼び出して、処理をする

になる。

ここの記事の通りにやってみた

その他sparkを使った実例など

おもしろい実例だとtwitterの集計につかっている

まとめ

今回はローカルマシン1台でやったから価値が実感できてないけど、sparkになんかすごそうだね!

次はpythonでの呼び出しとか、spark-sqlとか、複数台の分散処理とかやってみるか