<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                # 快速入門 - [安全](https://spark.apache.org/docs/latest/quick-start.html#security) * [使用 Spark Shell 進行交互式分析](#使用-spark-shell-進行交互式分析) * [基礎](#基礎) * [Dataset 上的更多操作](#dataset-上的更多操作) * [緩存](#緩存) * [獨立的應用](#獨立的應用) * [快速跳轉](#快速跳轉) 本教程提供了如何使用 Spark 的快速入門介紹。首先通過運行 Spark 交互式的 shell(在 Python 或 Scala 中)來介紹 API,然后展示如何使用 Java,Scala 和 Python 來編寫應用程序。 為了繼續閱讀本指南,首先從 [Spark 官網](http://spark.apache.org/downloads.html) 下載 Spark 的發行包。因為我們不使用 HDFS,所以你可以下載一個任何 Hadoop 版本的軟件包。 請注意,在 Spark 2.0 之前,Spark 的主要編程接口是彈性分布式數據集(RDD)。 在 Spark 2.0 之后,RDD 被 Dataset 替換,它是像RDD 一樣的 strongly-typed(強類型),但是在引擎蓋下更加優化。 RDD 接口仍然受支持,您可以在 [RDD 編程指南](rdd-programming-guide.html) 中獲得更完整的參考。 但是,我們強烈建議您切換到使用 Dataset(數據集),其性能要更優于 RDD。 請參閱 [SQL 編程指南](sql-programming-guide.html) 獲取更多有關 Dataset 的信息。 # 安全 默認情況下,Spark中的安全性處于關閉狀態。這意味著您默認情況下容易受到攻擊。在下載和運行Spark之前,請參閱[Spark Security](https://spark.apache.org/docs/latest/security.html)。 # 使用 Spark Shell 進行交互式分析 ## 基礎 Spark shell 提供了一種來學習該 API 比較簡單的方式,以及一個強大的來分析數據交互的工具。在 Scala(運行于 Java 虛擬機之上,并能很好的調用已存在的 Java 類庫)或者 Python 中它是可用的。通過在 Spark 目錄中運行以下的命令來啟動它: ``` ./bin/spark-shell ``` Spark 的主要抽象是一個稱為 Dataset 的分布式的 item 集合。Datasets 可以從 Hadoop 的 InputFormats(例如 HDFS文件)或者通過其它的 Datasets 轉換來創建。讓我們從 Spark 源目錄中的 README 文件來創建一個新的 Dataset: ``` scala> val textFile = spark.read.textFile("README.md") textFile: org.apache.spark.sql.Dataset[String] = [value: string] ``` 您可以直接從 Dataset 中獲取 values(值),通過調用一些 actions(動作),或者 transform(轉換)Dataset 以獲得一個新的。更多細節,請參閱 _[API doc](api/scala/index.html#org.apache.spark.sql.Dataset)_。 ``` scala> textFile.count() // Number of items in this Dataset res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs scala> textFile.first() // First item in this Dataset res1: String = # Apache Spark ``` 現在讓我們 transform 這個 Dataset 以獲得一個新的 。我們調用 `filter` 以返回一個新的 Dataset,它是文件中的 items 的一個子集。 ``` scala> val linesWithSpark = textFile.filter(line => line.contains("Spark")) linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string] ``` 我們可以鏈式操作 transformation(轉換)和 action(動作): ``` scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"? res3: Long = 15 ``` ``` ./bin/pyspark ``` Spark有一個主要的抽象概念叫做 Dataset 的分布式集合類。Dataset 可以從Hadoop InputFormats(例如HDFS文件)或通過 transforming 其他數據集來創建數據集。 由于Python的動態特性,我們不需要在Python中定義強類型的 Dataset。 因此,Python中的所有數據集都是 Dataset[Row],我們稱之為"DataFrame" 來與 Pandas 和R中的數據框概念一致。讓我們從Spark源文件中的README文件中創建一個新的 DataFrame 目錄: ``` >>> textFile = spark.read.text("README.md") ``` 您可以通過調用某些action直接從DataFrame獲取值,也可以transform DataFrame以獲取新的DataFrame。 有關詳細信息,請閱讀 _[API doc](api/python/index.html#pyspark.sql.DataFrame)_. ``` >>> textFile.count() # 這個DataFrame 有多少行 126 >>> textFile.first() # DataFrame的第一行 Row(value=u'# Apache Spark') ``` 現在讓我們 transform(轉換) 這個DataFrame來獲得一個新的DataFrame. 我們調用 `filter` 方法來返回文件中的一個子集. ``` >>> linesWithSpark = textFile.filter(textFile.value.contains("Spark")) ``` 我們可以把 transform 和 acition 連在一起用: ``` >>> textFile.filter(textFile.value.contains("Spark")).count() # 統計文件中 "Spark" 字符串有多少個 15 ``` ## Dataset 上的更多操作 Dataset actions(操作)和 transformations(轉換)可以用于更復雜的計算。例如,統計出現次數最多的行 : ``` scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) res4: Long = 15 ``` 第一個 map 操作創建一個新的 Dataset,將一行數據 map 為一個整型值。在 Dataset 上調用 `reduce` 來找到最大的行計數。參數 `map` 與 `reduce` 是 Scala 函數(closures),并且可以使用 Scala/Java 庫的任何語言特性。例如,我們可以很容易地調用函數聲明,我們將定義一個 max 函數來使代碼更易于理解 : ``` scala> import java.lang.Math import java.lang.Math scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b)) res5: Int = 15 ``` 一種常見的數據流模式是被 Hadoop 所推廣的 MapReduce。Spark 可以很容易實現 MapReduce: ``` scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count() wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint] ``` 在這里,我們調用了 `flatMap` 以 transform 一個 lines 的 Dataset 為一個 words 的 Dataset,然后結合 `groupByKey` 和 `count` 來計算文件中每個單詞的 counts 作為一個 (String, Long) 的 Dataset pairs。要在 shell 中收集 word counts,我們可以調用 `collect`: ``` scala> wordCounts.collect() res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...) ``` ``` >>> from pyspark.sql.functions import * >>> textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect() [Row(max(numWords)=15)] ``` 這首先將一行映射為一個整數值并且別名為 "numWords" ,從中創建一個新的DataFrame. 在該 DataFrame 上調用 `agg` 函數是為了找到最大詞數(word count) . `select` 和 `agg` 的參數都是 _[Column](api/python/index.html#pyspark.sql.Column)_ 里的,我們也可以通過 `df.colName` 來獲得該 DataFrame 的一列. 我們也可以導入 pyspark.sql.functions 來提供了許多方便的功能來從舊的列構建一個新的列, 一個常見的數據流模式是MapReduce,由Hadoop推廣。 Spark可以輕松實現MapReduce流程: ``` >>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).as("word")).groupBy("word").count() ``` 在這里我們在 `select` 中使用 `explode` 函數來將一個 Dataset 的所有行轉換成一個詞的數據集,然后組合使用 `groupBy` 和 `count` 來計算文件中各個單詞的計數作為 DataFrame 的兩列: "word" 和 "count".要在我們的shell中統計單詞的詞頻,我們可以調用 `collect`: ``` >>> wordCounts.collect() [Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...] ``` ## 緩存 Spark 還支持 Pulling(拉取)數據集到一個群集范圍的內存緩存中。例如當查詢一個小的 “hot” 數據集或運行一個像 PageRANK 這樣的迭代算法時,在數據被重復訪問時是非常高效的。舉一個簡單的例子,讓我們標記我們的 `linesWithSpark` 數據集到緩存中: ``` scala> linesWithSpark.cache() res7: linesWithSpark.type = [value: string] scala> linesWithSpark.count() res8: Long = 15 scala> linesWithSpark.count() res9: Long = 15 ``` 使用 Spark 來探索和緩存一個 100 行的文本文件看起來比較愚蠢。有趣的是,即使在他們跨越幾十或者幾百個節點時,這些相同的函數也可以用于非常大的數據集。您也可以像 [編程指南](rdd-programming-guide.html#using-the-shell). 中描述的一樣通過連接 `bin/spark-shell` 到集群中,使用交互式的方式來做這件事情。 ``` >>> linesWithSpark.cache() >>> linesWithSpark.count() 15 >>> linesWithSpark.count() 15 ``` 使用Spark探索和緩存100行文本文件似乎很愚蠢. T有趣的是,這些相同的功能可用于非常大的數據集,即使它們跨越數十個或數百個節點交錯著, 你也可以通過`bin/pyspark` 連接到集群來進行交互,詳細描述在 [RDD programming guide](rdd-programming-guide.html#using-the-shell). # 獨立的應用 假設我們希望使用 Spark API 來創建一個獨立的應用程序。我們在 Scala(SBT),Java(Maven)和 Python 中練習一個簡單應用程序。 我們將在 Scala 中創建一個非常簡單的 Spark 應用程序 - 很簡單的,事實上,它名為 `SimpleApp.scala`: ``` /* SimpleApp.scala */ import org.apache.spark.sql.SparkSession object SimpleApp { def main(args: Array[String]) { val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system val spark = SparkSession.builder.appName("Simple Application").getOrCreate() val logData = spark.read.textFile(logFile).cache() val numAs = logData.filter(line => line.contains("a")).count() val numBs = logData.filter(line => line.contains("b")).count() println(s"Lines with a: $numAs, Lines with b: $numBs") spark.stop() } } ``` 注意,這個應用程序我們應該定義一個 `main()` 方法而不是去擴展 `scala.App`。使用 `scala.App` 的子類可能不會正常運行。 該程序僅僅統計了 Spark README 文件中每一行包含 ‘a’ 的數量和包含 ‘b’ 的數量。注意,您需要將 YOUR_SPARK_HOME 替換為您 Spark 安裝的位置。不像先前使用 spark shell 操作的示例,它們初始化了它們自己的 SparkContext,我們初始化了一個 SparkContext 作為應用程序的一部分。 我們調用 `SparkSession.builder` 以構造一個 [[SparkSession]],然后設置 application name(應用名稱),最終調用 `getOrCreate` 以獲得 [[SparkSession]] 實例。 我們的應用依賴了 Spark API,所以我們將包含一個名為 `build.sbt` 的 sbt 配置文件,它描述了 Spark 的依賴。該文件也會添加一個 Spark 依賴的 repository: ``` name := "Simple Project" version := "1.0" scalaVersion := "2.12.8" libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4" ``` 為了讓 sbt 正常的運行,我們需要根據經典的目錄結構來布局 `SimpleApp.scala` 和 `build.sbt` 文件。在成功后,我們可以創建一個包含應用程序代碼的 JAR 包,然后使用 `spark-submit` 腳本來運行我們的程序。 ``` # Your directory layout should look like this $ find . . ./build.sbt ./src ./src/main ./src/main/scala ./src/main/scala/SimpleApp.scala # Package a jar containing your application $ sbt package ... [info] Packaging {..}/{..}/target/scala-2.12/simple-project_2.12-1.0.jar # Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \ --class "SimpleApp" \ --master local[4] \ target/scala-2.12/simple-project_2.12-1.0.jar ... Lines with a: 46, Lines with b: 23 ``` 這個例子使用Maven來編譯成一個jar應用程序,其他的構建系統(如Ant、Gradle,譯者注)也可以。 我們會創建一個非常簡單的Spark應用,`SimpleApp.java`: ``` /* SimpleApp.java */ import org.apache.spark.sql.SparkSession; public class SimpleApp { public static void main(String[] args) { String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate(); Dataset<String> logData = spark.read.textFile(logFile).cache(); long numAs = logData.filter(s -> s.contains("a")).count(); long numBs = logData.filter(s -> s.contains("b")).count(); System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs); spark.stop(); } } ``` 這個程序計算Spark README文檔中包含字母’a’和字母’b’的行數。注意把YOUR_SPARK_HOME修改成你的Spark的安裝目錄。 跟之前的Spark shell不同,我們需要初始化SparkSession。 把Spark依賴添加到Maven的`pom.xml`文件里。 注意Spark的artifacts使用Scala版本進行標記。 ``` <project> <groupId>edu.berkeley</groupId> <artifactId>simple-project</artifactId> <modelVersion>4.0.0</modelVersion> <name>Simple Project</name> <packaging>jar</packaging> <version>1.0</version> <dependencies> <dependency> <!-- Spark dependency --> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>2.4.4</version> <scope>provided</scope> </dependency> </dependencies> </project> ``` 我們按照Maven經典的目錄結構組織這些文件: ``` $ find . ./pom.xml ./src ./src/main ./src/main/java ./src/main/java/SimpleApp.java ``` 現在我們用Maven打包這個應用,然后用`./bin/spark-submit`執行它。 ``` # 打包包含應用程序的JAR $ mvn package ... [INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar # 用spark-submit來運行程序 $ YOUR_SPARK_HOME/bin/spark-submit \ --class "SimpleApp" \ --master local[4] \ target/simple-project-1.0.jar ... Lines with a: 46, Lines with b: 23 ``` 現在我們來展示如何用python API 來寫一個應用 (pyspark). 如果要構建打包的PySpark應用程序或庫,則可以添加以下內容到setup.py文件中: ``` install_requires=[ 'pyspark=={site.SPARK_VERSION}' ] ``` 我們以一個簡單的例子為例,創建一個簡單的pyspark 應用 `SimpleApp.py`: ``` """SimpleApp.py""" from pyspark.sql import SparkSession logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system spark = SparkSession.builder().appName(appName).master(master).getOrCreate() logData = spark.read.text(logFile).cache() numAs = logData.filter(logData.value.contains('a')).count() numBs = logData.filter(logData.value.contains('b')).count() print("Lines with a: %i, lines with b: %i" % (numAs, numBs)) spark.stop() ``` 該程序只是統計計算在該文本中包含a字母和包含b字母的行數. 請注意你需要將 YOUR_SPARK_HOME 替換成你的spark路徑.就像scala 示例和java示例一樣,我們使用 SparkSession 來創建數據集, 對于使用自定義類護著第三方庫的應用程序,我們還可以通過 `spark-submit` 帶著 `--py-files` 來添加代碼依賴 , 我們也可以通過把代碼打成zip包來進行依賴添加 (詳細請看 `spark-submit --help` ). `SimpleApp` 是個簡單的例子我們不需要添加特別的代碼或自定義類. 我們可以通過 `bin/spark-submit` 腳本來運行應用: ``` # Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \ --master local[4] \ SimpleApp.py ... Lines with a: 46, Lines with b: 23 ``` 如果您的環境中已安裝PySpark pip(例如pip install pyspark),則可以使用常規Python解釋器運行應用程序,也可以根據需要使用前面的“ spark-submit”。 ``` # Use the Python interpreter to run your application $ python SimpleApp.py ... Lines with a: 46, Lines with b: 23 ``` # 快速跳轉 恭喜您成功的運行了您的第一個 Spark 應用程序! * 更多 API 的深入概述,從 [RDD programming guide](rdd-programming-guide.html) 和 [SQL programming guide](sql-programming-guide.html) 這里開始,或者看看 “編程指南” 菜單中的其它組件。 * 為了在集群上運行應用程序,請前往 [deployment overview](cluster-overview.html). * 最后,在 Spark 的 `examples` 目錄中包含了一些 ([Scala](https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples),[Java](https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples),[Python](https://github.com/apache/spark/tree/master/examples/src/main/python),[R](https://github.com/apache/spark/tree/master/examples/src/main/r)) 示例。您可以按照如下方式來運行它們: ``` # 針對 Scala 和 Java,使用 run-example: ./bin/run-example SparkPi # 針對 Python 示例,直接使用 spark-submit: ./bin/spark-submit examples/src/main/python/pi.py # 針對 R 示例,直接使用 spark-submit: ./bin/spark-submit examples/src/main/r/dataframe.R ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看