<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                > 原文出處:https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice3/ > 作者:王 龍, 軟件開發工程師, IBM # 使用Spark SQL 對結構化數據進行統計分析 本文將通過兩個例子向讀者展示如何使用 Spark SQL/DataFrame API 編寫應用程序來對結構化的大數據進行統計分析,并且還會通過分析程序運行日志以及利用 Spark Web Console 向讀者介紹 Spark 應用程序運行的基本過程和原理。通過本文的閱讀,讀者將會對 Spark SQL 模塊有較為深入的認識和理解。 [TOC=2,3] ## 引言 在很多領域,如電信,金融等,每天都會產生大量的結構化數據,當數據量不斷變大,傳統的數據存儲 (DBMS) 和計算方式 (單機程序) 已經不能滿足企業對數據存儲,統計分析以及知識挖掘的需要。在過去的數年里,傳統的軟件開發和維護人員已經積累了大量的基于 DBMS 的操作數據知識和經驗,他們已經習慣了通過編寫 SQL 語句來對數據記錄進行統計分析。于是大數據工程師們開始探索如何使用類 SQL 的方式來操作和分析大數據,通過大量的努力,目前業界已經出現很多 SQL on Hadoop 的方案,如 Hive, Impala 等。Spark SQL 就是其中的一個,實際上 Spark SQL 并不是一開始就存在于 Spark 生態系統里的,它的前身是 Shark。隨著 Spark 自身的發展,Spark 團隊開始試圖放棄 Shark 這個對于 Hive 有太多依賴 (查詢優化,語法解析) 的東西,于是才有了 Spark SQL 這個全新的模塊,通過幾個版本的發展,目前 Spark SQL 已經趨于穩定,功能也逐漸豐富。本文將以 Spark1.4.1 版本為基礎,由淺入深地向讀者介紹 Spark SQL/DataFrame 的基本概念和原理,并且通過實例向讀者展示如何使用 Spark SQL/DataFrame API 開發應用程序。接下來,就讓我們開始 Spark SQL 的體驗之旅吧。 ## 關于 Spark SQL/DataFrame Spark SQL 是 Spark 生態系統里用于處理結構化大數據的模塊,該模塊里最重要的概念就是 DataFrame, 相信熟悉 R 語言的工程師對此并不陌生。Spark 的 DataFrame 是基于早期版本中的 SchemaRDD,所以很自然的使用分布式大數據處理的場景。Spark DataFrame 以 RDD 為基礎,但是帶有 Schema 信息,它類似于傳統數據庫中的二維表格。 Spark SQL 模塊目前支持將多種外部數據源的數據轉化為 DataFrame,并像操作 RDD 或者將其注冊為臨時表的方式處理和分析這些數據。當前支持的數據源有: * Json * 文本文件 * RDD * 關系數據庫 * Hive * Parquet 一旦將 DataFrame 注冊成臨時表,我們就可以使用類 SQL 的方式操作這些數據,我們將在下文的案例中詳細向讀者展示如何使用 Spark SQL/DataFrame 提供的 API 完成數據讀取,臨時表注冊,統計分析等步驟。 ## 案例介紹與編程實現 ### 案例一 **a.案例描述與分析** 本案例中,我們將使用 Spark SQL 分析包含 5 億條人口信息的結構化數據,數據存儲在文本文件上,總大小為 7.81G。文件總共包含三列,第一列是 ID,第二列是性別信息 (F -> 女,M -> 男),第三列是人口的身高信息,單位是 cm。實際上這個文件與我們在本系列文章第一篇中的案例三使用的格式是一致的,讀者可以參考相關章節,并使用提供的測試數據生成程序,生成 5 億條數據,用于本案例中。為了便于讀者理解,本案例依然把用于分析的文本文件的內容片段貼出來,具體格式如下。 圖 1\. 案例一測試數據格式預覽 ![圖 1\. 案例一測試數據格式預覽](https://box.kancloud.cn/2015-09-23_56023f0d11e54.png) 生成該測試文件后,讀者需要把文件上傳到 HDFS 上,您可以選擇使用 HDFS shell 命令或者 HDSF 的 eclipse 插件。上傳到 HDFS 后,我們可以通過訪問 HDFS web console(http://namenode:50070),查看文件具體信息。 圖 2\. 案例一測試數據文件基本信息 ![圖 2\. 案例一測試數據文件基本信息](https://box.kancloud.cn/2015-09-23_56023f0f473a4.png) 本例中,我們的統計任務如下: * 用 SQL 語句的方式統計男性中身高超過 180cm 的人數。 * 用 SQL 語句的方式統計女性中身高超過 170cm 的人數。 * 對人群按照性別分組并統計男女人數。 * 用類 RDD 轉換的方式對 DataFrame 操作來統計并打印身高大于 210cm 的前 50 名男性。 * 對所有人按身高進行排序并打印前 50 名的信息。 * 統計男性的平均身高。 * 統計女性身高的最大值。 讀者可以看到,上述統計任務中有些是相似的,但是我們要用不同的方式實現它,以向讀者展示不同的語法。 **b.編碼實現** 清單 1\. 案例一示例程序源代碼 ~~~ import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.sql.types._ import org.apache.spark.sql.Row import org.apache.spark.rdd.RDD object PeopleDataStatistics2 { private val schemaString = "id,gender,height" def main(args: Array[String]) { if (args.length < 1) { println("Usage:PeopleDataStatistics2 filePath") System.exit(1) } val conf = new SparkConf().setAppName("Spark Exercise:People Data Statistics 2") val sc = new SparkContext(conf) val peopleDataRDD = sc.textFile(args(0)) val sqlCtx = new SQLContext(sc) // this is used to implicitly convert an RDD to a DataFrame. import sqlCtx.implicits._ val schemaArray = schemaString.split(",") val schema = StructType(schemaArray.map(fieldName = > StructField(fieldName, StringType, true))) val rowRDD: RDD[Row] = peopleDataRDD.map(_.split(" ")).map( eachRow => Row(eachRow(0), eachRow(1), eachRow(2))) val peopleDF = sqlCtx.createDataFrame(rowRDD, schema) peopleDF.registerTempTable("people") //get the male people whose height is more than 180 val higherMale180 = sqlCtx.sql("select id,gender, height from people where height > 180 and gender='M'") println("Men whose height are more than 180: " + higherMale180.count()) println("<Display #1>") //get the female people whose height is more than 170 val higherFemale170 = sqlCtx.sql("select id,gender, height from people where height > 170 and gender='F'") println("Women whose height are more than 170: " + higherFemale170.count()) println("<Display #2>") //Grouped the people by gender and count the number peopleDF.groupBy(peopleDF("gender")).count().show() println("People Count Grouped By Gender") println("<Display #3>") // peopleDF.filter(peopleDF("gender").equalTo("M")).filter( peopleDF("height") > 210).show(50) println("Men whose height is more than 210") println("<Display #4>") // peopleDF.sort($"height".desc).take(50).foreach { row => println(row(0) + "," + row(1) + "," + row(2)) } println("Sorted the people by height in descend order,Show top 50 people") println("<Display #5>") // peopleDF.filter(peopleDF("gender").equalTo("M")).agg(Map("height" -> "avg")).show() println("The Average height for Men") println("<Display #6>") // peopleDF.filter(peopleDF("gender").equalTo("F")).agg("height" -> "max").show() println("The Max height for Women:") println("<Display #7>") //...... println("All the statistics actions are finished on structured People data.") } } ~~~ **c.提交并運行** 編碼完成后,把項目打成 jar 包,在這里,我們將源碼打成名為 spark_exercise-1.0.jar, 筆者使用 Maven 來管理項目,也建議讀者可以嘗試下 Maven 管理您的 Scala 項目。 清單 2\. 案例一示例程序執行命令 ~~~ ./spark-submit --class com.ibm.spark.exercise.sql.PeopleDataStatistics2 \ --master spark://<spark_master_node_ip>:7077 \ --driver-memory 8g \ --executor-memory 2g \ --total-executor-cores 12 \ /home/fams/spark_exercise-1.0.jar \ hdfs://<hdfs_namenode_ip>:9000/ user/fams/inputfiles/sample_people_info2.txt ~~~ **d.監控執行過程** 在提交后,我們可以在 Spark web console(http://:8080) 中監控程序執行過程。下面我們將分別向讀者展示如何監控程序產生的 Jobs,Stages,以及 D 可視化的查看 DAG 信息。 圖 3\. 案例一程序監控圖 1 ![圖 3\. 案例一程序監控圖 1](https://box.kancloud.cn/2015-09-23_56023f13ee9a6.png) 圖 4\. 案例一程序監控圖 2 ![圖 4\. 案例一程序監控圖 2](https://box.kancloud.cn/2015-09-23_56023f15da997.png) 圖 5\. 案例一程序監控圖 3 ![圖 5\. 案例一程序監控圖 3](https://box.kancloud.cn/2015-09-23_56023f16e67b2.png) 圖 6\. 案例一程序監控圖 4 ![圖 6\. 案例一程序監控圖 4](https://box.kancloud.cn/2015-09-23_56023f17f1396.png) 其實在 Spark web console 中還可以查看很多信息,如運行環境信息,Executor 進程信息,讀者可以在界面上一一查看,在這里不再贅述。 **e.運行結果** 圖 7\. 案例一程序運行結果 (部分) ![圖 7\. 案例一程序運行結果 (部分)](https://box.kancloud.cn/2015-09-23_56023f18e7547.png) ### 案例二 **a.案例描述與分析** 在案例一中,我們將存儲于 HDFS 中的文件轉換成 DataFrame 并進行了一系列的統計,細心的讀者會發現,都是一個關聯于一個 DataFrame 的簡單查詢和統計,那么在案例二中,我們將向讀者展示如何連接多個 DataFrame 做更復雜的統計分析。 在本案例中,我們將統計分析 1 千萬用戶和 1 億條交易數據。對于用戶數據,它是一個包含 6 個列 (ID, 性別, 年齡, 注冊日期, 角色 (從事行業), 所在區域) 的文本文件,具有以下格式。 圖 8\. 案例二測試用戶數據格式預覽 ![圖 8\. 案例二測試用戶數據格式預覽](https://box.kancloud.cn/2015-09-23_56023f1af381a.png) 我們使用以下 Scala 程序來生成本案例所需的測試數據。 清單 3\. 案例二用戶測試數據生成類源代碼 ~~~ import java.io.{File, FileWriter} import scala.util.Random object UserDataGenerator { private val FILE_PATH = "C:\\LOCAL_DISK_D\\sample_user_data.txt" private val ROLE_ID_ARRAY = Array[String]("ROLE001","ROLE002","ROLE003","ROLE004","ROLE005") private val REGION_ID_ARRAY = Array[String]("REG001","REG002","REG003","REG004","REG005") private val MAX_USER_AGE = 60 //how many records to be generated private val MAX_RECORDS = 10000000 def main(args:Array[String]): Unit = { generateDataFile(FILE_PATH , MAX_RECORDS) } private def generateDataFile(filePath : String, recordNum: Int): Unit = { var writer:FileWriter = null try { writer = new FileWriter(filePath,true) val rand = new Random() for (i <- 1 to recordNum) { //generate the gender of the user var gender = getRandomGender // var age = rand.nextInt(MAX_USER_AGE) if (age < 10) { age = age + 10 } //generate the registering date for the user var year = rand.nextInt(16) + 2000 var month = rand.nextInt(12)+1 //to avoid checking if it is a valid day for specific month //we always generate a day which is no more than 28 var day = rand.nextInt(28) + 1 var registerDate = year + "-" + month + "-" + day //generate the role of the user var roleIndex:Int = rand.nextInt(ROLE_ID_ARRAY.length) var role = ROLE_ID_ARRAY(roleIndex) //generate the region where the user is var regionIndex:Int = rand.nextInt(REGION_ID_ARRAY.length) var region = REGION_ID_ARRAY(regionIndex) writer.write(i + " " + gender + " " + age + " " + registerDate + " " + role + " " + region) writer.write(System.getProperty("line.separator")) } writer.flush() } catch { case e:Exception => println("Error occurred:" + e) } finally { if (writer != null) writer.close() } println("User Data File generated successfully.") } private def getRandomGender() :String = { val rand = new Random() val randNum = rand.nextInt(2) + 1 if (randNum % 2 == 0) { "M" } else { "F" } } } ~~~ 對于交易數據,它是一個包含 5 個列 (交易單號, 交易日期, 產品種類, 價格, 用戶 ID) 的文本文件,具有以下格式。 圖 9\. 案例二測試交易數據格式預覽 ![圖 9\. 案例二測試交易數據格式預覽](https://box.kancloud.cn/2015-09-23_56023f1c96e1e.png) 對于交易數據,我們使用以下 Scala 程序來生成。 清單 4\. 案例二交易測試數據生成類源代碼 ~~~ import java.io.{File, FileWriter} import scala.util.Random object ConsumingDataGenerator { private val FILE_PATH = "C:\\LOCAL_DISK_D\\sample_consuming_data.txt" // how many records to be generated private val MAX_RECORDS = 100000000 // we suppose only 10 kinds of products in the consuming data private val PRODUCT_ID_ARRAY = Array[Int](1,2,3,4,5,6,7,8,9,10) // we suppose the price of most expensive product will not exceed 2000 RMB private val MAX_PRICE = 2000 // we suppose the price of cheapest product will not be lower than 10 RMB private val MIN_PRICE = 10 //the users number which should be same as the one in UserDataGenerator object private val USERS_NUM = 10000000 def main(args:Array[String]): Unit = { generateDataFile(FILE_PATH,MAX_RECORDS); } private def generateDataFile(filePath : String, recordNum: Int): Unit = { var writer:FileWriter = null try { writer = new FileWriter(filePath,true) val rand = new Random() for (i <- 1 to recordNum) { //generate the buying date var year = rand.nextInt(16) + 2000 var month = rand.nextInt(12)+1 //to avoid checking if it is a valid day for specific // month,we always generate a day which is no more than 28 var day = rand.nextInt(28) + 1 var recordDate = year + "-" + month + "-" + day //generate the product ID var index:Int = rand.nextInt(PRODUCT_ID_ARRAY.length) var productID = PRODUCT_ID_ARRAY(index) //generate the product price var price:Int = rand.nextInt(MAX_PRICE) if (price == 0) { price = MIN_PRICE } // which user buys this product val userID = rand.nextInt(10000000)+1 writer.write(i + " " + recordDate + " " + productID + " " + price + " " + userID) writer.write(System.getProperty("line.separator")) } writer.flush() } catch { case e:Exception => println("Error occurred:" + e) } finally { if (writer != null) writer.close() } println("Consuming Data File generated successfully.") } } ~~~ **b.編碼實現** 清單 5\. 案例二示例程序源代碼 ~~~ import org.apache.spark.sql.SQLContext import org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkContext, SparkConf} //define case class for user case class User(userID: String, gender: String, age: Int, registerDate: String,role: String, region: String) //define case class for consuming data case class Order(orderID: String, orderDate: String, productID: Int, price: Int, userID: String) object UserConsumingDataStatistics { def main(args: Array[String]) { if (args.length < 1) { println("Usage:UserConsumingDataStatistics userDataFilePath consumingDataFilePath") System.exit(1) } val conf = new SparkConf().setAppName("Spark Exercise:User Consuming Data Statistics") //Kryo serializer is more quickly by default java serializer conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") val ctx = new SparkContext(conf) val sqlCtx = new SQLContext(ctx) import sqlCtx.implicits._ //Convert user data RDD to a DataFrame and register it as a temp table val userDF = ctx.textFile(args(0)).map(_.split(" ")).map( u => User(u(0), u(1), u(2).toInt,u(3),u(4),u(5))).toDF() userDF.registerTempTable("user") //Convert consuming data RDD to a DataFrame and register it as a temp table val orderDF = ctx.textFile(args(1)).map(_.split(" ")).map(o => Order( o(0), o(1), o(2).toInt,o(3).toInt,o(4))).toDF() orderDF.registerTempTable("orders") //cache the DF in memory with serializer should make the program run much faster userDF.persist(StorageLevel.MEMORY_ONLY_SER) orderDF.persist(StorageLevel.MEMORY_ONLY_SER) //The number of people who have orders in the year 2015 val count = orderDF.filter(orderDF("orderDate").contains("2015")).join( userDF, orderDF("userID").equalTo(userDF("userID"))).count() println("The number of people who have orders in the year 2015:" + count) //total orders produced in the year 2014 val countOfOrders2014 = sqlCtx.sql("SELECT * FROM orders where orderDate like '2014%'").count() println("total orders produced in the year 2014:" + countOfOrders2014) //Orders that are produced by user with ID 1 information overview val countOfOrdersForUser1 = sqlCtx.sql("SELECT o.orderID,o.productID, o.price,u.userID FROM orders o,user u where u.userID = 1 and u.userID = o.userID").show() println("Orders produced by user with ID 1 showed.") //Calculate the max,min,avg prices for the orders that are producted by user with ID 10 val orderStatsForUser10 = sqlCtx.sql("SELECT max(o.price) as maxPrice, min(o.price) as minPrice,avg(o.price) as avgPrice,u.userID FROM orders o, user u where u.userID = 10 and u.userID = o.userID group by u.userID") println("Order statistic result for user with ID 10:") orderStatsForUser10.collect().map(order => "Minimum Price=" + order.getAs("minPrice") + ";Maximum Price=" + order.getAs("maxPrice") + ";Average Price=" + order.getAs("avgPrice") ).foreach(result => println(result)) } } ~~~ **c.提交并執行** ~~~ ./spark-submit –class com.ibm.spark.exercise.sql.UserConsumingDataStatistics \ --master spark://<spark_master_node_ip>:7077 \ --num-executors 6 \ --driver-memory 8g \ --executor-memory 2g \ --executor-cores 2 \ /home/fams/spark_exercise-1.0.jar \ hdfs://<hdfs_namenode_ip>:9000/user/fams/inputfiles/sample_user_data.txt \ hdfs://<hdfs_namenode_ip>:9000/user/fams/inputfiles/sample_consuming_data.txt ~~~ **d.監控執行過程** 程序提交后,讀者可以用案例一描述的方式在 Spark web console 監控執行過程,這樣也能幫助您深入的理解 Spark SQL 程序的執行過程。 **e.運行結果** 圖 10\. 案例二程序運行結果 (部分) ![圖 10\. 案例二程序運行結果 (部分)](https://box.kancloud.cn/2015-09-23_56023f1d3ed2d.png) ## 總結 關于 Spark SQL 程序開發,我們通常有以下需要注意的地方。 1. Spark SQL 程序開發過程中,我們有兩種方式確定 schema,第一種是反射推斷 schema,如本文的案例二,這種方式下,我們需要定義樣本類 (case class) 來對應數據的列;第二種方式是通過編程方式來確定 schema,這種方式主要是通過 Spark SQL 提供的 StructType 和 StructField 等 API 來編程實現,這種方式下我們不需要定義樣本類,如本文中的案例一。 在程序實現中,我們需要使用![](https://box.kancloud.cn/2015-09-23_56023f1e156f6.png)以便隱式的把 RDD 轉化成 DataFrame 來操作。 2. 本文展示的 DataFrame API 使用的方法只是其中很小的一部分,但是一旦讀者掌握了開發的基本流程,就能通過參考?[DataFrame API 文檔](http://spark.apache.org/docs/1.4.1/api/scala/index.html#org.apache.spark.sql.DataFrame)寫出更為復雜的程序。 3. 通常來說,我們有兩種方式了解 Spark 程序的執行流程。第一種是通過在控制臺觀察輸出日志,另一種則更直觀,就是通過 Spark Web Console 來觀察 Driver 程序里各個部分產生的 job 信息以及 job 里包含的 stages 信息。 4. 需要指出的是,熟練的掌握 Spark SQL/DataFrame 的知識對學習最新的 Spark 機器學習庫 ML Pipeline 至關重要,因為 ML Pipeline 使用 DataFrame 作為數據集來支持多種的數據類型。 5. 筆者在測試的過程中發現,處理相同的數據集和類似的操作,Spark SQL/DataFrame 比傳統的 RDD 轉換操作具有更好的性能。這是由于 SQL 模塊的 Catalyst 對用戶 SQL 做了很好的查詢優化。在以后的文章中會向讀者詳細的介紹該組件。 ## 結束語 本文通過兩個案例向讀者詳細的介紹了使用 Spark SQL/DataFrame 處理結構化數據的過程,限于篇幅,我們并沒有在文中向讀者詳細介紹 Spark SQL/DataFrame 程序的執行流程,以及 Catalyst SQL 解析和查詢優化引擎。這個將會在本系列后面的文章中介紹。其實文中提供的測試數據還可以用來做更為復雜的 Spark SQL 測試,讀者可以基于本文,進行更多的工作。需要指出的是,由于我們用到的數據是靠程序隨機生成的,所以部分數據難免有不符合實際的情況,讀者應該關注在使用 Spark SQL/DataFrame 處理這些數據的過程。最后,感謝您耐心的閱讀本文,如果您有任何問題或者想法,請在文末留言,我們可以進行深入的討論。讓我們互相學習,共同進步。 ## 參考資料 ### 學習 * 參考[Spark SQL/DataFrame 官網文檔](http://spark.apache.org/docs/latest/sql-programming-guide.html),了解 Spark SQL/DataFrame 的基本原理和編程模型。 * 參考[Spark Scala API 文檔](http://spark.apache.org/docs/1.4.1/api/scala/index.html#package),了解 Spark SQL/DataFrame 相關 API 的使用。 * [developerWorks 開源技術主題](http://www.ibm.com/developerworks/cn/opensource/):查找豐富的操作信息、工具和項目更新,幫助您掌握開源技術并將其用于 IBM 產品。 ### 討論 * 加入?[developerWorks 中文社區](http://www.ibm.com/developerworks/cn/community/),查看開發人員推動的博客、論壇、組和維基,并與其他 developerWorks 用戶交流。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看