<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                # Pregel API 圖本身是遞歸數據結構,頂點的屬性依賴于它們鄰居的屬性,這些鄰居的屬性又依賴于自己鄰居的屬性。所以許多重要的圖算法都是迭代的重新計算每個頂點的屬性,直到滿足某個確定的條件。一系列的graph-parallel抽象已經被提出來用來表達這些迭代算法。GraphX公開了一個類似Pregel的操作,它是廣泛使用的Pregel和GraphLab抽象的一個融合。 在GraphX中,更高級的Pregel操作是一個約束到圖拓撲的批量同步(bulk-synchronous)并行消息抽象。Pregel操作者執行一系列的超級步驟(super steps),在這些步驟中,頂點從之前的超級步驟中接收進入(inbound)消息的總和,為頂點屬性計算一個新的值,然后在以后的超級步驟中發送消息到鄰居頂點。不像Pregel而更像GraphLab,消息作為一個邊三元組的函數被并行計算,消息計算既訪問了源頂點特征也訪問了目的頂點特征。在超級步中,沒有收到消息的頂點被跳過。當沒有消息遺留時,Pregel操作停止迭代并返回最終的圖。 注意,與更標準的Pregel實現不同的是,GraphX中的頂點僅僅能發送信息給鄰居頂點,并利用用戶自定義的消息函數構造消息。這些限制允許在GraphX進行額外的優化。 一下是[ Pregel操作](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexId,VD,A)?VD,(EdgeTriplet[VD,ED])?Iterator[(VertexId,A)],(A,A)?A)(ClassTag[A]):Graph[VD,ED])的類型簽名以及實現草圖(注意,訪問graph.cache已經被刪除) ~~~ class GraphOps[VD, ED] { def pregel[A] (initialMsg: A, maxIter: Int = Int.MaxValue, activeDir: EdgeDirection = EdgeDirection.Out) (vprog: (VertexId, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], mergeMsg: (A, A) => A) : Graph[VD, ED] = { // Receive the initial message at each vertex var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache() // compute the messages var messages = g.mapReduceTriplets(sendMsg, mergeMsg) var activeMessages = messages.count() // Loop until no messages remain or maxIterations is achieved var i = 0 while (activeMessages > 0 && i < maxIterations) { // Receive the messages: ----------------------------------------------------------------------- // Run the vertex program on all vertices that receive messages val newVerts = g.vertices.innerJoin(messages)(vprog).cache() // Merge the new vertex values back into the graph g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache() // Send Messages: ------------------------------------------------------------------------------ // Vertices that didn't receive a message above don't appear in newVerts and therefore don't // get to send messages. More precisely the map phase of mapReduceTriplets is only invoked // on edges in the activeDir of vertices in newVerts messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache() activeMessages = messages.count() i += 1 } g } } ~~~ 注意,pregel有兩個參數列表(graph.pregel(list1)(list2))。第一個參數列表包含配置參數初始消息、最大迭代數、發送消息的邊的方向(默認是沿邊方向出)。第二個參數列表包含用戶自定義的函數用來接收消息(vprog)、計算消息(sendMsg)、合并消息(mergeMsg)。 我們可以用Pregel操作表達計算單源最短路徑( single source shortest path)。 ~~~ import org.apache.spark.graphx._ // Import random graph generation library import org.apache.spark.graphx.util.GraphGenerators // A graph with edge attributes containing distances val graph: Graph[Int, Double] = GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble) val sourceId: VertexId = 42 // The ultimate source // Initialize the graph such that all vertices except the root have distance infinity. val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity) val sssp = initialGraph.pregel(Double.PositiveInfinity)( (id, dist, newDist) => math.min(dist, newDist), // Vertex Program triplet => { // Send Message if (triplet.srcAttr + triplet.attr < triplet.dstAttr) { Iterator((triplet.dstId, triplet.srcAttr + triplet.attr)) } else { Iterator.empty } }, (a,b) => math.min(a,b) // Merge Message ) println(sssp.vertices.collect.mkString("\n")) ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看