<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                * Pregel是Google提出的用于大規模分布式圖計算框架。可用于: * 圖遍歷(BFS)。 * 單源最短路徑(SSSP)。 * PageRank計算。 * Pregel的計算由一系列迭代組成,稱為supersteps。 * Pregel迭代過程。 (1)每個頂點從上一個superstep接收入站消息。 (2)計算頂點新的屬性值。 (3)在下一個superstep中向相鄰的頂點發送消息。 (4)當沒有剩余消息時,迭代結束。 求下圖中頂點的最小值。 ![](https://img.kancloud.cn/e7/ac/e7ac0c8448e4d4f91255760e59f47868_890x260.png) Pregel的計算過程如下: ![](https://img.kancloud.cn/0c/16/0c16b3d6b8591b6fed8e85abc13f3f88_1303x1091.png) Graphx提供的Pregel API如下: ```scala class GraphOps[VD, ED](graph : org.apache.spark.graphx.Graph[VD, ED]){ def pregel[A]( initialMsg : A, // 在superstep 0之前發送至頂點的初始消息 maxIterations : Int, // 將要執行的最大迭代次數 activeDirection : EdgeDirection // 發送消息方向(默認是出邊方向:EdgeDirection.Out) ) ( vprog : Function3, // 用戶定義函數,用于頂點接收消息 sendMsg : Function1, // 用戶定義的函數,用于確定下一個迭代發送的消息及發往何處 mergeMsg : Function2[A, A, A] // 用戶定義的函數,在vprog前,合并到達頂點的多個消息 ){ } } ``` Pregel求下圖的最小值。 ![](https://img.kancloud.cn/e7/ac/e7ac0c8448e4d4f91255760e59f47868_890x260.png) ```scala import org.apache.spark.SparkContext import org.apache.spark.graphx.{Edge, EdgeDirection, EdgeTriplet, Graph, VertexId} import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession object Pregel { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder() .master("local[4]") .appName(this.getClass.getName) .getOrCreate() val sc: SparkContext = spark.sparkContext import spark.implicits._ // 1. 構建頂點的RDD val verts: RDD[(Long, (Int, Int))] = sc.parallelize(Array( (1L, (7, -1)), (2L, (3, -1)), (3L, (2, -1)), (4L, (6, -1)) )) // 2. 構建邊的RDD val edges: RDD[Edge[Boolean]] = sc.parallelize(Array( Edge(1L, 2L, true), Edge(1L, 4L, true), Edge(2L, 4L, true), Edge(3L, 1L, true), Edge(3L, 4L, true) )) // 3. 構建圖 val graph: Graph[(Int, Int), Boolean] = Graph(verts, edges) graph.triplets.foreach(println) // ((1,(7,-1)),(4,(6,-1)),true) // ((3,(2,-1)),(1,(7,-1)),true) // ((1,(7,-1)),(2,(3,-1)),true) // ((2,(3,-1)),(4,(6,-1)),true) // ((3,(2,-1)),(4,(6,-1)),true) // initialMsg 在superstep 0之前發送至頂點的初始消息 // maxIterations 將要執行的最大迭代次數 // activeDirection 發送消息方向(默認是出邊方向:EdgeDirection.Out) // vprog 用戶定義函數,用于頂點接收消息 // sendMsg 用戶定義的函數,用于確定下一個迭代發送的消息及發往何處 // mergeMsg 用戶定義的函數,在vprog前,合并到達頂點的多個消息 val initialMsg: Int = 9999 val maxIterations: Int = Int.MaxValue // vertexId: 當前頂點的Id // value: 當前頂點的attr // message: 當前頂點要接收的消息 def vprog(vertexId: VertexId, value: (Int, Int), message: Int) = { if (message == initialMsg) value else (message min value._1, value._1) } def sendMsg(triple: EdgeTriplet[(Int, Int), Boolean]) = { val sourceVertex: (Int, Int) = triple.srcAttr if (sourceVertex._1 == sourceVertex._2) Iterator.empty else Iterator((triple.dstId, sourceVertex._1)) } def mergeMsg(msg1: Int, msg2: Int): Int = msg1 min msg2 graph.pregel(initialMsg, maxIterations, EdgeDirection.Out)(vprog, sendMsg, mergeMsg) .vertices.foreach(println) // (1,(2,7)) // (2,(2,3)) // (3,(2,-1)) // (4,(2,2)) } } ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看