下面我們想說一下怎樣使用Spark API編寫一個獨立的應用程序。 這里使用Scala (SBT構建工具)和Java舉例。 (Python官方文檔中有,譯者未翻譯)
~~~
/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SimpleApp {
def main(args: Array[String]) {
val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
}
}
~~~
這個程序統計Spark README文件中包含字符`a`和`b`的行數。 注意你需要用你實際的Spark路徑替換 YOUR_SPARK_HOME。 不像上面的Spark shell的例子, 我們初始化一個SparkContext 作為程序的一部分.
我們將一個SparkConf對象傳給SparkContext的構造函數, 它包含了我們程序的信息。
我們的程序依賴Spark API,所以我們包含一個sbt配置文件:simple.sbt 指明Spark是一個依賴, 這個文件也增加了Spark依賴的倉庫(repository):
~~~
name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.1"
~~~
為了保證sbt工作正常,我們需要將SimpleApp.scala和simple.sbt放入典型的sbt項目布局的文件夾中。 如此一來我們將應用代碼可以打包成一個jar文件, 然后使用spark-submit腳本來運行此程序。
~~~
# Your directory layout should look like this
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/scala-2.10/simple-project_2.10-1.0.jar
...
Lines with a: 46, Lines with b: 23
~~~
或者使用Java
~~~
/* SimpleApp.java */
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
public class SimpleApp {
public static void main(String[] args) {
String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
SparkConf conf = new SparkConf().setAppName("Simple Application");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD logData = sc.textFile(logFile).cache();
long numAs = logData.filter(new Function() {
public Boolean call(String s) { return s.contains("a"); }
}).count();
long numBs = logData.filter(new Function() {
public Boolean call(String s) { return s.contains("b"); }
}).count();
System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
}
}
~~~
這個程序統計Spark README文件中包含字符`a`和`b`的行數。. 注意你需要用你實際的Spark路徑替換 YOUR_SPARK_HOME。 不像上面的Spark shell的例子, 我們需要一個JavaSparkContext對象. 我們也創建了RDD (JavaRDD)然后運行transformations. 最后我們傳遞給Spark一個function對象, 這個function對象是一個匿名類,繼承于 spark.api.java.function.Function. Spark開發指南描述了細節. (譯者注: 這是Java 7的語法, 通過Java 8 Lambda表達式,上面的代碼和scala一樣的簡化)
為了編譯此程序,我們需要寫一個Maven pom.xml文件, 增加Spark作為依賴. 注意Spark artifact帶有Scala的版本.
~~~
project>
groupId>edu.berkeleygroupId>
artifactId>simple-projectartifactId>
modelVersion>4.0.0modelVersion>
name>Simple Projectname>
packaging>jarpackaging>
version>1.0version>
dependencies>
dependency>
groupId>org.apache.sparkgroupId>
artifactId>spark-core_2.10artifactId>
version>1.1.1version>
dependency>
dependencies>
project>
~~~
使用Maven項目的布局:
~~~
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java
~~~
現在,我們使用Maven打包并使用./bin/spark-submit執行此程序.
~~~
# Package a jar containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23
~~~