<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                # HBase 與 Spark > 貢獻者:[TsingJyujing](https://github.com/TsingJyujing) [Apache Spark](https://spark.apache.org/) 是一個分布式的、用于在內存中處理數據的軟件框架,在許多場景中用于代替 MapReduce。 Spark 本身已經超出了本文檔的范圍,請參考 Spark 的項目及子項目的網站來獲取更多信息。本文檔將會集中在 4 個主要的 HBase 和 Spark 交互的要點上,這四點分別是: 基礎 Spark 這可以在 Spark DAG 中的任意一點使用 HBase Connection。 Spark Streaming 這可以在 Spark Streaming 應用中的任意一點使用 HBase Connection。 Spark 批量加載 這可以允許在批量插入 HBase 的時候直接寫 HBase 的 HFiles。 SparkSQL/DataFrames 這將提供為 HBase 中定義的表提供寫 SparkSQL 的能力。 下面的部分將會用幾個例子來說明上面幾點交互。 ## 104\. 基礎 Spark 這一部分將會在最底層和最簡單的等級討論 HBase 與 Spark 的整合。其他交互的要點都是基于這些操作構建的,我們會在這里完整描述。 一切 HBase 和 Spark 整合的基礎都是 HBaseContext,HBaseContext 接受 HBase 配置并且會將其推送到 Spark 執行器(executor)中。這允許我們在每個 Spark 執行器(executor)中有一個靜態的 HBase 連接。 作為參考,Spark 執行器(executor)既可以和 Region Server 在同一個節點,也可以在不同的節點,他們不存在共存的依賴關系。 可以認為每個 Spark 執行器(executor)都是一個多線程的客戶端程序,這允許運行在不同的執行器上的 Spark 任務訪問共享的連接對象。 例 31\. HBaseContext 使用例程 這個例子展現了如何使用 Scala 語言在 RDD 的`foreachPartition`方法中使用 HBaseContext。 ``` val sc = new SparkContext("local", "test") val config = new HBaseConfiguration() ... val hbaseContext = new HBaseContext(sc, config) rdd.hbaseForeachPartition(hbaseContext, (it, conn) => { val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1")) it.foreach((putRecord) => { . val put = new Put(putRecord._1) . putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3)) . bufferedMutator.mutate(put) }) bufferedMutator.flush() bufferedMutator.close() }) ``` 這里是使用 Java 編寫的同樣的例子。 ``` JavaSparkContext jsc = new JavaSparkContext(sparkConf); try { List<byte[]> list = new ArrayList<>(); list.add(Bytes.toBytes("1")); ... list.add(Bytes.toBytes("5")); JavaRDD<byte[]> rdd = jsc.parallelize(list); Configuration conf = HBaseConfiguration.create(); JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); hbaseContext.foreachPartition(rdd, new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() { public void call(Tuple2<Iterator<byte[]>, Connection> t) throws Exception { Table table = t._2().getTable(TableName.valueOf(tableName)); BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName)); while (t._1().hasNext()) { byte[] b = t._1().next(); Result r = table.get(new Get(b)); if (r.getExists()) { mutator.mutate(new Put(b)); } } mutator.flush(); mutator.close(); table.close(); } }); } finally { jsc.stop(); } ``` 所有的函數式都同時在 Spark 和 HBase 中,并且都支持用 Scala 或者 Java 開發。除了 SparkSQL 以外,所有 Spark 支持的語言在這里也都支持。 目前在余下的文檔中,我們將會重點關注 Scala 的例程。 上面的例程闡釋了如何在 foreachPartition 操作中使用連接。除此之外,許多 Spark 的基礎函數都是支持的: `bulkPut` 并行的寫入大量數據到 HBase `bulkDelete` 并行的刪除 HBase 中大量數據 `bulkGet` 并行的從 HBase 中獲取大量的數據,并且創建一個新的 RDD `mapPartition` 在 Spark 的 Map 函數中使用連接對象,并且允許使用完整的 HBase 訪問 `hBaseRDD` 簡單的創建一個用于分布式掃描數據的 RDD 想要參看所有機能的例程,參見 HBase-Spark 模塊。 ## 105\. Spark Streaming [Spark Streaming](https://spark.apache.org/streaming/) 是一個基于 Spark 構建的微批流處理框架。 HBase 和 Spark Streaming 的良好配合使得 HBase 可以提供一下益處: * 可以動態的獲取參考或者描述性數據 * 基于 Spark Streaming 提供的恰好一次處理,可以存儲計數或者聚合結果 HBase-Spark 模塊整合的和 Spark Streaming 的相關的點與 Spark 整合的點非常相似, 以下的指令可以在 Spark Streaming DStream 中立刻使用: `bulkPut` 并行的寫入大量數據到 HBase `bulkDelete` 并行的刪除 HBase 中大量數據 `bulkGet` 并行的從 HBase 中獲取大量的數據,并且創建一個新的 RDD `mapPartition` 在 Spark 的 Map 函數中使用連接對象,并且允許使用完整的 HBase 訪問 `hBaseRDD` 簡單的創建一個用于分布式掃描數據的 RDD。 例 32\. `bulkPut`在 DStreams 中使用的例程 以下是 bulkPut 在 DStreams 中的使用例程,感覺上與 RDD 批量插入非常接近。 ``` val sc = new SparkContext("local", "test") val config = new HBaseConfiguration() val hbaseContext = new HBaseContext(sc, config) val ssc = new StreamingContext(sc, Milliseconds(200)) val rdd1 = ... val rdd2 = ... val queue = mutable.Queue[RDD[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])]]() queue += rdd1 queue += rdd2 val dStream = ssc.queueStream(queue) dStream.hbaseBulkPut( hbaseContext, TableName.valueOf(tableName), (putRecord) => { val put = new Put(putRecord._1) putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3)) put }) ``` 這里到`hbaseBulkPut`函數有三個輸入,hbaseContext 攜帶了配置廣播信息,來幫助我們連接到執行器中的 HBase Connections。 表名用于指明我們要往哪個表放數據。一個函數將 DStream 中的記錄轉換為 HBase Put 對象。 ## 106\. 批量加載 使用 Spark 加載大量的數據到 HBase 有兩個選項。 基本的大量數據加載功能適用于你的行有數百萬列數據,以及在 Spark 批量加載之前的 Map 操作列沒有合并和分組的情況。 Spark 中還有一個輕量批量加載選項,這個第二選項設計給每一行少于一萬的情況。 第二個選項的優勢在于更高的吞吐量,以及 Spark 的 shuffle 操作中更輕的負載。 兩種實現都或多或少的類似 MapReduce 批量加載過程, 因為分區器基于 Region 劃分對行鍵進行分區。并且行鍵被順序的發送到 Reducer 所以 HFile 可以在 reduce 階段被直接寫出。 依照 Spark 的術語來說,批量加載將會基于`repartitionAndSortWithinPartitions`實現,并且之后是 Spark 的`foreachPartition`。 讓我們首先看一下使用批量加載功能的例子 例 33\. 批量加載例程 下面的例子展現了 Spark 中的批量加載。 ``` val sc = new SparkContext("local", "test") val config = new HBaseConfiguration() val hbaseContext = new HBaseContext(sc, config) val stagingFolder = ... val rdd = sc.parallelize(Array( (Bytes.toBytes("1"), (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))), (Bytes.toBytes("3"), (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ... rdd.hbaseBulkLoad(TableName.valueOf(tableName), t => { val rowKey = t._1 val family:Array[Byte] = t._2(0)._1 val qualifier = t._2(0)._2 val value = t._2(0)._3 val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier) Seq((keyFamilyQualifier, value)).iterator }, stagingFolder.getPath) val load = new LoadIncrementalHFiles(config) load.doBulkLoad(new Path(stagingFolder.getPath), conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName))) ``` `hbaseBulkLoad` 函數需要三個必要參數: 1. 我們需要從之加載數據的表名 2. 一個函數用于將 RDD 中的某個記錄轉化為一個元組形式的鍵值對。 其中鍵值是一個 KeyFamilyQualifer 對象,值是 cell value。 KeyFamilyQualifer 將會保存行鍵,列族和列標識位。 針對行鍵的隨機操作會根據這三個值來排序。 3. 寫出 HFile 的臨時路徑 接下來的 Spark 批量加載指令,使用 HBase 的 LoadIncrementalHFiles 對象來加載 HBase 中新創建的 HFiles。 使用 Spark 批量加載的附加參數 你可以在 hbaseBulkLoad 中用附加參數設置以下屬性: * HFile 的最大文件大小 * 從壓縮中排除 HFile 的標志 * 列族設置,包含 compression(壓縮), bloomType(布隆(過濾器)類型), blockSize(塊大小), and dataBlockEncoding(數據塊編碼) 例 34\. 使用附加參數 ``` val sc = new SparkContext("local", "test") val config = new HBaseConfiguration() val hbaseContext = new HBaseContext(sc, config) val stagingFolder = ... val rdd = sc.parallelize(Array( (Bytes.toBytes("1"), (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))), (Bytes.toBytes("3"), (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ... val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions] val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX") familyHBaseWriterOptions.put(Bytes.toBytes("columnFamily1"), f1Options) rdd.hbaseBulkLoad(TableName.valueOf(tableName), t => { val rowKey = t._1 val family:Array[Byte] = t._2(0)._1 val qualifier = t._2(0)._2 val value = t._2(0)._3 val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier) Seq((keyFamilyQualifier, value)).iterator }, stagingFolder.getPath, familyHBaseWriterOptions, compactionExclude = false, HConstants.DEFAULT_MAX_FILE_SIZE) val load = new LoadIncrementalHFiles(config) load.doBulkLoad(new Path(stagingFolder.getPath), conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName))) ``` 現在讓我們來看一下如何調用輕量化對象批量加載的實現: 例 35\. 使用輕量批量加載 ``` val sc = new SparkContext("local", "test") val config = new HBaseConfiguration() val hbaseContext = new HBaseContext(sc, config) val stagingFolder = ... val rdd = sc.parallelize(Array( ("1", (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))), ("3", (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ... rdd.hbaseBulkLoadThinRows(hbaseContext, TableName.valueOf(tableName), t => { val rowKey = t._1 val familyQualifiersValues = new FamiliesQualifiersValues t._2.foreach(f => { val family:Array[Byte] = f._1 val qualifier = f._2 val value:Array[Byte] = f._3 familyQualifiersValues +=(family, qualifier, value) }) (new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues) }, stagingFolder.getPath, new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions], compactionExclude = false, 20) val load = new LoadIncrementalHFiles(config) load.doBulkLoad(new Path(stagingFolder.getPath), conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName))) ``` 注意在使用輕量行批量加載的時候函數返回的元組中: 第一個元素是行鍵,第二個元素是 FamiliesQualifiersValues,這個對象中含有這一行里所有的數值,并且包含了所有的列族。 ## 107\. SparkSQL/DataFrames (HBase-Spark 中的)HBase-Spark 連接器 提供了: [DataSource API](https://databricks.com/blog/2015/01/09/spark-sql-data-sources-api-unified-data-access-for-the-spark-platform.html) ([SPARK-3247](https://issues.apache.org/jira/browse/SPARK-3247)) 在 Spark 1.2.0 的時候被引入,連接了簡單的 HBase 的鍵值存儲與復雜的關系型 SQL 查詢,并且使得用戶可以使用 Spark 在 HBase 上施展復雜的數據分析工作。 HBase Dataframe 是 Spark Dataframe 的一個標準,并且它允許和其他任何數據源——例如 Hive, Orc, Parquet, JSON 之類。 HBase-Spark Connector 使用的關鍵技術例如分區修剪,列修剪,推斷后置以及數據本地化。 為了使用 HBase-Spark connector,用戶需要定義 HBase 到 Spark 表中的映射目錄。 準備數據并且填充 HBase 的表,然后將其加載到 HBase DataFrame 中去。 在此之后,用戶可以使用 SQL 查詢語句整合查詢與數據獲取。 接下來的例子說明了最基本的過程 ### 107.1\. 定義目錄 ``` def catalog = s"""{ |"table":{"namespace":"default", "name":"table1"}, |"rowkey":"key", |"columns":{ |"col0":{"cf":"rowkey", "col":"key", "type":"string"}, |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"}, |"col2":{"cf":"cf2", "col":"col2", "type":"double"}, |"col3":{"cf":"cf3", "col":"col3", "type":"float"}, |"col4":{"cf":"cf4", "col":"col4", "type":"int"}, |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"}, |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"}, |"col7":{"cf":"cf7", "col":"col7", "type":"string"}, |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"} |} |}""".stripMargin ``` 目錄定義了從 HBase 到 Spark 表的一個映射。 這個目錄中有兩個關鍵部分。 第一個是行鍵的定義,另一個是將 Spark 表中的列映射到 HBase 的列族和列標識位。 上面的 schema 定義了一個 HBase 中的表,名為 Table1,行鍵作為鍵與一些列(col1 `-` col8)。 注意行鍵也需要被定義為一個列(col0),該列具有特定的列族(rowkey)。 ### 107.2\. 保存 DataFrame ``` case class HBaseRecord( col0: String, col1: Boolean, col2: Double, col3: Float, col4: Int, col5: Long, col6: Short, col7: String, col8: Byte) object HBaseRecord { def apply(i: Int, t: String): HBaseRecord = { val s = s"""row${"%03d".format(i)}""" HBaseRecord(s, i % 2 == 0, i.toDouble, i.toFloat, i, i.toLong, i.toShort, s"String$i: $t", i.toByte) } } val data = (0 to 255).map { i => HBaseRecord(i, "extra")} sc.parallelize(data).toDF.write.options( Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")) .format("org.apache.hadoop.hbase.spark ") .save() ``` 用戶準備的數據(`data`)是一個本地的 Scala 集合,含有 256 個 HBaseRecord 對象。 `sc.parallelize(data)` 函數分發了從 RDD 中來的數據。`toDF`返回了一個 DataFrame。 `write` 函數返回了一個 DataFrameWriter 來將 DataFrame 中的數據到外部存儲(例如這里是 HBase)。 `save` 函數將會創建一個具有 5 個 Region 的 HBase 表來在內部保存 DataFrame。 ### 107.3\. 加載 DataFrame ``` def withCatalog(cat: String): DataFrame = { sqlContext .read .options(Map(HBaseTableCatalog.tableCatalog->cat)) .format("org.apache.hadoop.hbase.spark") .load() } val df = withCatalog(catalog) ``` 在 withCatalog 函數中,sqlContext 是一個 SQLContext 的變量,是一個用于與 Spark 中結構化(行與列)的數據一起工作的一個入口點。 `read` 返回一個 DataFrameReader,他可以用于從 DataFrame 中讀取數據。`option`函數為輸出到 DataFrameReader 的底層的數據源增加了輸入選項。 以及,`format`函數表示了 DataFrameReader 的輸入數據源的格式。 `load()` 函數將其加載為一個 DataFrame, 數據幀 `df`將由`withCatalog`函數返回,用于訪問 HBase 表,例如 4.4 與 4.5. ### 107.4\. Language Integrated Query ``` val s = df.filter(($"col0" <= "row050" && $"col0" > "row040") || $"col0" === "row005" || $"col0" <= "row005") .select("col0", "col1", "col4") s.show ``` DataFrame 可以做很多操作,例如 join, sort, select, filter, orderBy 等等等等。`df.filter` 通過指定的 SQL 表達式提供過濾器,`select`選擇一系列的列:`col0`, `col1` 和 `col4`。 ### 107.5\. SQL 查詢 ``` df.registerTempTable("table1") sqlContext.sql("select count(col1) from table1").show ``` `registerTempTable` 注冊了一個名為 `df` 的 DataFrame 作為臨時表,表名為`table1`,臨時表的生命周期和 SQLContext 有關,用于創建`df`。 `sqlContext.sql`函數允許用戶執行 SQL 查詢。 ### 107.6\. Others 例 36\. 查詢不同的時間戳 在 HBaseSparkConf 中,可以設置 4 個和時間戳有關的參數,它們分別表示為 TIMESTAMP, MIN_TIMESTAMP, MAX_TIMESTAMP 和 MAX_VERSIONS。用戶可以使用不同的時間戳或者利用 MIN_TIMESTAMP 和 MAX_TIMESTAMP 查詢時間范圍內的記錄。同時,下面的例子使用了具體的數值取代了 tsSpecified 和 oldMs。 下例展示了如何使用不同的時間戳加載 df DataFrame。tsSpecified 由用戶定義,HBaseTableCatalog 定義了 HBase 和 Relation 關系的 schema。writeCatalog 定義了 schema 映射的目錄。 ``` val df = sqlContext.read .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMESTAMP -> tsSpecified.toString)) .format("org.apache.hadoop.hbase.spark") .load() ``` 下例展示了如何使用不同的時間范圍加載 df DataFrame。oldMs 由用戶定義。 ``` val df = sqlContext.read .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0", HBaseSparkConf.MAX_TIMESTAMP -> oldMs.toString)) .format("org.apache.hadoop.hbase.spark") .load() ``` 在加載 DataFrame 之后,用戶就可以查詢數據。 ``` df.registerTempTable("table") sqlContext.sql("select count(col1) from table").show ``` 例 37\. 原生 Avro 支持 Example 37\. Native Avro support HBase-Spark Connector 支持不同類型的數據格式例如 Avro, Jason 等等。下面的用例展示了 Spark 如何支持 Avro。用戶可以將 Avro 記錄直接持久化進 HBase。 在內部,Avro schema 自動的轉換為原生的 Spark Catalyst 數據類型。 注意,HBase 表中無論是鍵或者值的部分都可以在 Avro 格式定義。 1) 為 schema 映射定義目錄: ``` def catalog = s"""{ |"table":{"namespace":"default", "name":"Avrotable"}, |"rowkey":"key", |"columns":{ |"col0":{"cf":"rowkey", "col":"key", "type":"string"}, |"col1":{"cf":"cf1", "col":"col1", "type":"binary"} |} |}""".stripMargin ``` `catalog`是一個 HBase 表的 schema,命名為 `Avrotable`。行鍵作為鍵,并且有一個列 col1。行鍵也被定義為詳細的一列(col0),并且指定列族(rowkey)。 2) 準備數據: ``` object AvroHBaseRecord { val schemaString = s"""{"namespace": "example.avro", | "type": "record", "name": "User", | "fields": [ | {"name": "name", "type": "string"}, | {"name": "favorite_number", "type": ["int", "null"]}, | {"name": "favorite_color", "type": ["string", "null"]}, | {"name": "favorite_array", "type": {"type": "array", "items": "string"}}, | {"name": "favorite_map", "type": {"type": "map", "values": "int"}} | ] }""".stripMargin val avroSchema: Schema = { val p = new Schema.Parser p.parse(schemaString) } def apply(i: Int): AvroHBaseRecord = { val user = new GenericData.Record(avroSchema); user.put("name", s"name${"%03d".format(i)}") user.put("favorite_number", i) user.put("favorite_color", s"color${"%03d".format(i)}") val favoriteArray = new GenericData.Array[String](2, avroSchema.getField("favorite_array").schema()) favoriteArray.add(s"number${i}") favoriteArray.add(s"number${i+1}") user.put("favorite_array", favoriteArray) import collection.JavaConverters._ val favoriteMap = Map[String, Int](("key1" -> i), ("key2" -> (i+1))).asJava user.put("favorite_map", favoriteMap) val avroByte = AvroSedes.serialize(user, avroSchema) AvroHBaseRecord(s"name${"%03d".format(i)}", avroByte) } } val data = (0 to 255).map { i => AvroHBaseRecord(i) } ``` 首先定義 `schemaString`,然后它被解析來獲取`avroSchema`,`avroSchema`是用來生成 `AvroHBaseRecord`的。`data` 由用戶準備,是一個有 256 個`AvroHBaseRecord`對象的原生 Scala 集合。 3) 保存 DataFrame: ``` sc.parallelize(data).toDF.write.options( Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")) .format("org.apache.spark.sql.execution.datasources.hbase") .save() ``` 對于由 schema `catalog`提供的已有的數據幀,上述語句將會創建一個具有 5 個分區的 HBase 表,并且將數據存進去。 4) 加載 DataFrame ``` def avroCatalog = s"""{ |"table":{"namespace":"default", "name":"avrotable"}, |"rowkey":"key", |"columns":{ |"col0":{"cf":"rowkey", "col":"key", "type":"string"}, |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"} |} |}""".stripMargin def withCatalog(cat: String): DataFrame = { sqlContext .read .options(Map("avroSchema" -> AvroHBaseRecord.schemaString, HBaseTableCatalog.tableCatalog -> avroCatalog)) .format("org.apache.spark.sql.execution.datasources.hbase") .load() } val df = withCatalog(catalog) ``` 在 `withCatalog` 函數中,`read` 會返回一個可以將數據讀取成 DataFrame 格式的 DataFrameReader。 `option` 函數追加輸入選項來指定 DataFrameReader 使用的底層數據源。這里有兩個選項,一個是設置`avroSchema`為`AvroHBaseRecord.schemaString`,另一個是設置`HBaseTableCatalog.tableCatalog` 為 `avroCatalog`。`load()` 函數加載所有的數據為 DataFrame。數據幀 `df` 由`withCatalog` 函數返回,可用于訪問 HBase 表中的數據。 5) SQL 查詢 ``` df.registerTempTable("avrotable") val c = sqlContext.sql("select count(1) from avrotable"). ``` 在加載 df DataFrame 之后,用戶可以查詢數據。registerTempTable 將 df DataFrame 注冊為一個臨時表,表名為 avrotable。 `sqlContext.sql`函數允許用戶執行 SQL 查詢。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看