<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                在默認情況下,當 Spark 在集群的多個不同節點的多個任務上并行運行一個函數時,它會把函數中涉及到的每個變量,在每個任務上都生成一個副本。但是,有時候需要在多個任務之間共享變量,或者在任務 (Task) 和任務控制節點 (Driver Program) 之間共享變量。 為了滿足這種需求,Spark 提供了兩種類型的變量: * **累加器 (accumulators)**:累加器支持在所有不同節點之間進行累加計算 (比如計數或者求和)。 * **廣播變量 (broadcast variables)**:廣播變量用來把變量在所有節點的內存之間進行共享,在每個機器上緩存一個只讀的變量,而不是為機器上的每個任務都生成一個副本。 ## 廣播變量 廣播變量允許程序員緩存一個只讀的變量在每臺機器上面,而不是每個任務保存一份拷貝。例如,利用廣播變量,我們能夠以一種更有效率的方式將一個大數據量輸入集合的副本分配給每個節點。(Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.)Spark也嘗試著利用有效的廣播算法去分配廣播變量,以減少通信的成本。 一個廣播變量可以通過調用`SparkContext.broadcast(v)`方法從一個初始變量v中創建。廣播變量是v的一個包裝變量,它的值可以通過`value`方法訪問,下面的代碼說明了這個過程: ```scala scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c) scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3) ``` 廣播變量創建以后,我們就能夠在集群的任何函數中使用它來代替變量v,這樣我們就不需要再次傳遞變量v到每個節點上。另外,為了保證所有的節點得到廣播變量具有相同的值,對象v不能在廣播之后被修改。 **一個相對完整的例子** ``` import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object BroadcastVariablesTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) sc.setLogLevel("WARN") //不使用廣播變量 val kvFruit: RDD[(Int, String)] = sc.parallelize(List((1,"apple"),(2,"orange"),(3,"banana"),(4,"grape"))) val fruitMap: collection.Map[Int, String] =kvFruit.collectAsMap //scala.collection.Map[Int,String] = Map(2 -> orange, 4 -> grape, 1 -> apple, 3 -> banana) val fruitIds: RDD[Int] = sc.parallelize(List(2,4,1,3)) //根據水果編號取水果名稱 val fruitNames: RDD[String] = fruitIds.map(x=>fruitMap(x)) fruitNames.foreach(println) //注意:以上代碼看似一點問題沒有,但是考慮到數據量如果較大,且Task數較多, //那么會導致,被各個Task共用到的fruitMap會被多次傳輸 //應該要減少fruitMap的傳輸,一臺機器上一個,被該臺機器中的Task共用即可 //如何做到?---使用廣播變量 //注意:廣播變量的值不能被修改,如需修改可以將數據存到外部數據源,如MySQL、Redis println("=====================") val BroadcastFruitMap: Broadcast[collection.Map[Int, String]] = sc.broadcast(fruitMap) val fruitNames2: RDD[String] = fruitIds.map(x=>BroadcastFruitMap.value(x)) fruitNames2.foreach(println) } } ``` ## 累加器 顧名思義,累加器是一種只能通過關聯操作進行“加”操作的變量,因此它能夠高效的應用于并行操作中。它們能夠用來實現`counters`和`sums`。Spark原生支持數值類型的累加器,開發者可以自己添加支持的類型。 如果創建了一個具名的累加器,它可以在spark的UI中顯示。這對于理解運行階段(running stages)的過程有很重要的作用。(注意:這在python中還不被支持) 一個累加器可以通過調用`SparkContext.accumulator(v)`方法從一個初始變量v中創建。運行在集群上的任務可以通過`add`方法或者使用`+=`操作來給它加值。然而,它們無法讀取這個值。只有驅動程序可以使用`value`方法來讀取累加器的值。 如下的代碼,展示了如何利用累加器將一個數組里面的所有元素相加: ```scala scala> val accum = sc.accumulator(0, "My Accumulator") accum: spark.Accumulator[Int] = 0 scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value res2: Int = 10 ``` 這個例子利用了內置的整數類型累加器。開發者可以利用子類[AccumulatorParam](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.AccumulatorParam)創建自己的 累加器類型。AccumulatorParam接口有兩個方法:`zero`方法為你的數據類型提供一個“0 值”(zero value);`addInPlace`方法計算兩個值的和。例如,假設我們有一個`Vector`類代表數學上的向量,我們能夠 如下定義累加器: ```scala object VectorAccumulatorParam extends AccumulatorParam[Vector] { def zero(initialValue: Vector): Vector = { Vector.zeros(initialValue.size) } def addInPlace(v1: Vector, v2: Vector): Vector = { v1 += v2 } } // Then, create an Accumulator of this type: val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam) ``` 在scala中,Spark支持用更一般的[Accumulable](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.Accumulable)接口來累積數據-結果類型和用于累加的元素類型 不一樣(例如通過收集的元素建立一個列表)。Spark也支持用`SparkContext.accumulableCollection`方法累加一般的scala集合類型。 **一個相對完整的測試的例子** 通常在向 `Spark` 傳遞函數時,比如使用 `map()` 函數或者用`filter()`傳條件時,可以使用驅動器程序中定義的變量,但是集群中運行的每個任務都會得到這些變量的一份新的副本,更新這些副本的值也不會影響驅動器中的對應變量。這時使用累加器就可以實現我們想要的效果: 語法:`val xx: Accumulator[Int] = sc.accumulator(0)` 示例代碼: ``` import org.apache.spark.rdd.RDD import org.apache.spark.{Accumulator, SparkConf, SparkContext} object AccumulatorTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) sc.setLogLevel("WARN") //使用scala集合完成累加 var counter1: Int = 0; var data = Seq(1,2,3) data.foreach(x => counter1 += x ) println(counter1)//6 println("+++++++++++++++++++++++++") //使用RDD進行累加(RDD是分布式的) var counter2: Int = 0; val dataRDD: RDD[Int] = sc.parallelize(data) //分布式集合的[1,2,3] dataRDD.foreach(x => counter2 += x) println(counter2)//0 //注意:上面的RDD操作運行結果是0 //因為foreach中的函數是傳遞給Worker中的Executor執行,用到了counter2變量 //而counter2變量在Driver端定義的,在傳遞給Executor的時候,各個Executor都有了一份counter2 //最后各個Executor將各自個x加到自己的counter2上面了,和Driver端的counter2沒有關系 //那這個問題得解決啊!不能因為使用了Spark連累加都做不了了啊! //如果解決?---使用累加器 val counter3: Accumulator[Int] = sc.accumulator(0) dataRDD.foreach(x => counter3 += x) println(counter3)//6 } } ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看