# GraphX Programming Guide
* [概述](#概述)
* [入門](#入門)
* [屬性 Graph](#屬性-graph)
* [示例屬性 Graph](#示例屬性-graph)
* [Graph 運算符](#graph-運算符)
* [運算符的匯總表](#運算符的匯總表)
* [Property 運算符](#property-運算符)
* [Structural 運算符](#structural-運算符)
* [Join 運算符](#join-運算符)
* [鄰域聚合](#鄰域聚合)
* [聚合消息(aggregateMessages)](#聚合消息-aggregatemessages)
* [Map Reduce Triplets Transition Guide (Legacy)](#map-reduce-triplets-transition-guide-legacy)
* [計算級別信息](#計算級別信息)
* [收集相鄰點](#收集相鄰點)
* [Caching and Uncaching](#caching-and-uncaching)
* [Pregel API](#pregel-api)
* [Graph 建造者](#graph-建造者)
* [Vertex and Edge RDDs](#vertex-and-edge-rdds)
* [VertexRDDs](#vertexrdds)
* [EdgeRDDs](#edgerdds)
* [優化表示](#優化表示)
* [Graph 算法](#graph-算法)
* [PageRank](#pagerank)
* [連接組件](#連接組件)
* [Triangle 計數](#triangle-計數)
* [示例](#示例)

# 概述
GraphX 是 Spark 中用于圖形和圖形并行計算的新組件。在高層次上,GraphX 通過引入一個新的[圖形](#property_graph)抽象來擴展 Spark [RDD](api/scala/index.html#org.apache.spark.rdd.RDD):一種具有附加到每個頂點和邊緣的屬性的定向多重圖形。為了支持圖計算,GraphX 公開了一組基本運算符(例如: [subgraph](#structural_operators),[joinVertices](#join_operators) 和 [aggregateMessages](#aggregateMessages))以及 [Pregel](#pregel) API 的優化變體。此外,GraphX 還包括越來越多的圖形[算法](#graph_algorithms) 和 [構建器](#graph_builders),以簡化圖形分析任務。
# 入門
首先需要將 Spark 和 GraphX 導入到項目中,如下所示:
```
import org.apache.spark._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD
```
如果您不使用 Spark 外殼,您還需要一個 `SparkContext`。要了解有關 Spark 入門的更多信息,請參考 [Spark快速入門指南](quick-start.html)。
# 屬性 Graph
[屬性 Graph](api/scala/index.html#org.apache.spark.graphx.Graph) 是一個定向多重圖形,用戶定義的對象附加到每個頂點和邊緣。定向多圖是具有共享相同源和目標頂點的潛在多個平行邊緣的有向圖。支持平行邊緣的能力簡化了在相同頂點之間可以有多個關系(例如: 同事和朋友)的建模場景。每個頂點都由唯一的 64 位長標識符(`VertexId`)鍵入。GraphX 不對頂點標識符施加任何排序約束。類似地,邊緣具有對應的源和目標頂點標識符。
屬性圖是通過 vertex(`VD`)和 edge(`ED`)類型進行參數化的。這些是分別與每個頂點和邊緣相關聯的對象的類型。
> 當它們是原始數據類型(例如: int,double 等等)時,GraphX 優化頂點和邊緣類型的表示,通過將其存儲在專門的數組中來減少內存占用。
在某些情況下,可能希望在同一個圖形中具有不同屬性類型的頂點。這可以通過繼承來實現。例如,將用戶和產品建模為二分圖,我們可能會執行以下操作:
```
class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty
case class ProductProperty(val name: String, val price: Double) extends VertexProperty
// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null
```
像 RDD 一樣,屬性圖是不可變的,分布式的和容錯的。通過生成具有所需更改的新圖形來完成對圖表的值或結構的更改。請注意,原始圖形的大部分(即,未受影響的結構,屬性和索引)在新圖表中重復使用,可降低此內在功能數據結構的成本。使用一系列頂點分割啟發式方法,在執行器之間劃分圖形。與 RDD 一樣,在發生故障的情況下,可以在不同的機器上重新創建圖形的每個分區。
邏輯上,屬性圖對應于一對編碼每個頂點和邊緣的屬性的類型集合(RDD)。因此,圖類包含訪問圖形頂點和邊的成員:
```
class Graph[VD, ED] {
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
}
```
`VertexRDD[VD]` 和 `EdgeRDD[ED]` 分別擴展了 `RDD[(VertexId, VD)]` 和 `RDD[Edge[ED]]` 的優化版本。`VertexRDD[VD]` 和 `EdgeRDD[ED]` 都提供了圍繞圖計算和利用內部優化的附加功能。我們在[頂點和邊緣 RDD](#vertex_and_edge_rdds) 部分更詳細地討論了 [`VertexRDD`](api/scala/index.html#org.apache.spark.graphx.VertexRDD) 和 [`EdgeRDD`](api/scala/index.html#org.apache.spark.graphx.EdgeRDD) API,但現在它們可以被認為是 `RDD[(VertexId, VD)]` 和 `RDD[Edge[ED]]` 的簡單 RDD。
### 示例屬性 Graph
假設我們要構建一個由 GraphX 項目中的各種協作者組成的屬性圖。頂點屬性可能包含用戶名和職業。我們可以用描述協作者之間關系的字符串來注釋邊:

生成的圖形將具有類型簽名:
```
val userGraph: Graph[(String, String), String]
```
從原始文件,RDD 甚至合成生成器構建屬性圖有許多方法,這些在[圖形構建器](#graph_builders)的一節中有更詳細的討論。最普遍的方法是使用 [Graph 對象](api/scala/index.html#org.apache.spark.graphx.Graph$)。例如,以下代碼從 RDD 集合中構建一個圖:
```
// Assume the SparkContext has already been constructed
val sc: SparkContext
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
```
在上面的例子中,我們使用了 [`Edge`](api/scala/index.html#org.apache.spark.graphx.Edge) case 類。邊緣具有 `srcId` 和 `dstId` 對應于源和目標頂點標識符。此外,`Edge` 該類有一個 `attr` 存儲邊緣屬性的成員。
我們可以分別使用 `graph.vertices` 和 `graph.edges` 成員將圖形解構成相應的頂點和邊緣視圖。
```
val graph: Graph[(String, String), String] // Constructed from above
// Count all users which are postdocs
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
// Count all the edges where src > dst
graph.edges.filter(e => e.srcId > e.dstId).count
```
> 注意,`graph.vertices` 返回一個 `VertexRDD[(String, String)]` 擴展 `RDD[(VertexId, (String, String))]`,所以我們使用 scala `case` 表達式來解構元組。另一方面,`graph.edges` 返回一個 `EdgeRDD` 包含 `Edge[String]` 對象。我們也可以使用 case 類型構造函數,如下所示:
```
graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count
```
除了屬性圖的頂點和邊緣視圖之外,GraphX 還暴露了三元組視圖。三元組視圖邏輯上連接頂點和邊緣屬性,生成 `RDD[EdgeTriplet[VD, ED]]` 包含 [`EdgeTriplet`](api/scala/index.html#org.apache.spark.graphx.EdgeTriplet) 該類的實例。此 連接可以用以下SQL表達式表示:
```
SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
ON e.srcId = src.Id AND e.dstId = dst.Id
```
或圖形為:

[`EdgeTriplet`](api/scala/index.html#org.apache.spark.graphx.EdgeTriplet) 類通過分別添加包含源和目標屬性的 `srcAttr` 和 `dstAttr` 成員來擴展 [`Edge`](api/scala/index.html#org.apache.spark.graphx.Edge) 類。我們可以使用圖形的三元組視圖來渲染描述用戶之間關系的字符串集合。
```
val graph: Graph[(String, String), String] // Constructed from above
// Use the triplets view to create an RDD of facts.
val facts: RDD[String] =
graph.triplets.map(triplet =>
triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println(_))
```
# Graph 運算符
正如 RDDs 有這樣的基本操作 `map`,`filter`,以及 `reduceByKey`,性能圖表也有采取用戶定義的函數基本運算符的集合,產生具有轉化特性和結構的新圖。定義了優化實現的核心運算符,并定義了 [`Graph`](api/scala/index.html#org.apache.spark.graphx.Graph) 表示為核心運算符組合的方便運算符 [`GraphOps`](api/scala/index.html#org.apache.spark.graphx.GraphOps)。不過,由于 Scala 的含義,操作員 `GraphOps` 可自動作為成員使用 `Graph`。例如,我們可以通過以下方法計算每個頂點的入度(定義 `GraphOps`):
```
val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
val inDegrees: VertexRDD[Int] = graph.inDegrees
```
區分核心圖形操作的原因 [`GraphOps`](api/scala/index.html#org.apache.spark.graphx.GraphOps) 是能夠在將來支持不同的圖形表示。每個圖形表示必須提供核心操作的實現,并重用許多有用的操作 [`GraphOps`](api/scala/index.html#org.apache.spark.graphx.GraphOps)。
### 運算符的匯總表
以下是兩個定義的功能的簡要摘要,但為簡單起見 [`Graph`](api/scala/index.html#org.apache.spark.graphx.Graph),[`GraphOps`](api/scala/index.html#org.apache.spark.graphx.GraphOps) 它作為 Graph 的成員呈現。請注意,已經簡化了一些功能簽名(例如,刪除了默認參數和類型約束),并且已經刪除了一些更高級的功能,因此請參閱 API 文檔以獲取正式的操作列表。
```
/** Summary of the functionality in the property graph */
class Graph[VD, ED] {
// Information about the Graph ===================================================================
val numEdges: Long
val numVertices: Long
val inDegrees: VertexRDD[Int]
val outDegrees: VertexRDD[Int]
val degrees: VertexRDD[Int]
// Views of the graph as collections =============================================================
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
val triplets: RDD[EdgeTriplet[VD, ED]]
// Functions for caching graphs ==================================================================
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
def cache(): Graph[VD, ED]
def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
// Change the partitioning heuristic ============================================================
def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
// Transform vertex and edge attributes ==========================================================
def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
: Graph[VD, ED2]
// Modify the graph structure ====================================================================
def reverse: Graph[VD, ED]
def subgraph(
epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
vpred: (VertexId, VD) => Boolean = ((v, d) => true))
: Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
// Join RDDs with the graph ======================================================================
def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
(mapFunc: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
// Aggregate information about adjacent triplets =================================================
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
def aggregateMessages[Msg: ClassTag](
sendMsg: EdgeContext[VD, ED, Msg] => Unit,
mergeMsg: (Msg, Msg) => Msg,
tripletFields: TripletFields = TripletFields.All)
: VertexRDD[A]
// Iterative graph-parallel computation ==========================================================
def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId,A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED]
// Basic graph algorithms ========================================================================
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
def connectedComponents(): Graph[VertexId, ED]
def triangleCount(): Graph[Int, ED]
def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}
```
## Property 運算符
與 RDD `map` 運算符一樣,屬性圖包含以下內容:
```
class Graph[VD, ED] {
def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
```
這些運算符中的每一個產生一個新的圖形,其中頂點或邊緣屬性被用戶定義的 `map` 函數修改。
> 請注意,在每種情況下,圖形結構都不受影響。這是這些運算符的一個關鍵特征,它允許生成的圖形重用原始圖形的結構索引。以下代碼段在邏輯上是等效的,但是第一個代碼片段不保留結構索引,并且不會從 GraphX 系統優化中受益:
```
val newVertices = graph.vertices.map { case (id, attr) => (id, mapUdf(id, attr)) }
val newGraph = Graph(newVertices, graph.edges)
```
> 而是 [`mapVertices`](api/scala/index.html#org.apache.spark.graphx.Graph@mapVertices[VD2]((VertexId,VD)?VD2)(ClassTag[VD2]):Graph[VD2,ED]) 用來保存索引:
```
val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))
```
這些運算符通常用于初始化特定計算或項目的圖形以避免不必要的屬性。例如,給出一個以度為頂點屬性的圖(我們稍后將描述如何構建這樣一個圖),我們為PageRank初始化它:
```
// Given a graph where the vertex property is the out degree
val inputGraph: Graph[Int, String] =
graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))
// Construct a graph where each edge contains the weight
// and each vertex is the initial PageRank
val outputGraph: Graph[Double, Double] =
inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)
```
## Structural 運算符
目前GraphX只支持一套簡單的常用結構運算符,我們預計將來會增加更多。以下是基本結構運算符的列表。
```
class Graph[VD, ED] {
def reverse: Graph[VD, ED]
def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}
```
該 [`reverse`](api/scala/index.html#org.apache.spark.graphx.Graph@reverse:Graph[VD,ED]) 運算符將返回逆轉的所有邊緣方向上的新圖。這在例如嘗試計算逆 PageRank 時是有用的。由于反向操作不會修改頂點或邊緣屬性或更改邊緣數量,因此可以在沒有數據移動或重復的情況下高效地實現。
在 [`subgraph`](api/scala/index.html#org.apache.spark.graphx.Graph@subgraph((EdgeTriplet[VD,ED])?Boolean,(VertexId,VD)?Boolean):Graph[VD,ED]) 操作者需要的頂點和邊緣的謂詞,并返回包含只有滿足謂詞頂點的頂點的曲線圖(評估為真),并且滿足謂詞邊緣邊緣 _并連接滿足頂點謂詞頂點_。所述 `subgraph` 操作員可在情況編號被用來限制圖形以頂點和感興趣的邊緣或消除斷開的鏈接。例如,在以下代碼中,我們刪除了斷開的鏈接:
```
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
(4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users
// 4 (peter) and 5 (franklin).
graph.triplets.map(
triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// The valid subgraph will disconnect users 4 and 5 by removing user 0
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
```
> 注意在上面的例子中只提供了頂點謂詞。如果未提供頂點或邊緣謂詞,則 `subgraph` 運算符默認為 `true`。
在 [`mask`](api/scala/index.html#org.apache.spark.graphx.Graph@mask[VD2,ED2](Graph[VD2,ED2])(ClassTag[VD2],ClassTag[ED2]):Graph[VD,ED]) 操作者通過返回包含該頂點和邊,它們也在輸入圖形中發現的曲線構造一個子圖。這可以與 `subgraph` 運算符一起使用,以便根據另一個相關圖中的屬性限制圖形。例如,我們可以使用缺少頂點的圖運行連接的組件,然后將答案限制為有效的子圖。
```
// Run Connected Components
val ccGraph = graph.connectedComponents() // No longer contains missing field
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Restrict the answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)
```
[`groupEdges`](api/scala/index.html#org.apache.spark.graphx.Graph@groupEdges((ED,ED)?ED):Graph[VD,ED]) 操作符將多邊形中的平行邊(即,頂點對之間的重復邊)合并。在許多數值應用中,可以將平行邊緣(它們的權重組合)合并成單個邊緣,從而減小圖形的大小。
## Join 運算符
在許多情況下,有必要使用圖形連接來自外部收集(RDD)的數據。例如,我們可能有額外的用戶屬性,我們要與現有的圖形合并,或者我們可能希望將頂點屬性從一個圖形拉到另一個。這些任務可以使用 _join_ 運算符完成。下面我們列出關鍵 join 運算符:
```
class Graph[VD, ED] {
def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
: Graph[VD, ED]
def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
}
```
[`joinVertices`](api/scala/index.html#org.apache.spark.graphx.GraphOps@joinVertices[U](RDD[(VertexId,U)])((VertexId,VD,U)?VD)(ClassTag[U]):Graph[VD,ED]) 操作符將頂點與輸入 RDD 相連,并返回一個新的圖形,其中通過將用戶定義的 `map` 函數應用于已連接頂點的結果而獲得的頂點屬性。RDD 中沒有匹配值的頂點保留其原始值。
> 請注意,如果 RDD 包含給定頂點的多個值,則只能使用一個值。因此,建議使用以下命令使輸入 RDD 變得獨一無二,這也將對結果值進行 _pre-index_,以顯著加速后續連接。
```
val nonUniqueCosts: RDD[(VertexId, Double)]
val uniqueCosts: VertexRDD[Double] =
graph.vertices.aggregateUsingIndex(nonUnique, (a,b) => a + b)
val joinedGraph = graph.joinVertices(uniqueCosts)(
(id, oldCost, extraCost) => oldCost + extraCost)
```
除了將用戶定義的 `map` 函數應用于所有頂點并且可以更改頂點屬性類型之外,更一般的 [`outerJoinVertices`](api/scala/index.html#org.apache.spark.graphx.Graph@outerJoinVertices[U,VD2](RDD[(VertexId,U)])((VertexId,VD,Option[U])?VD2)(ClassTag[U],ClassTag[VD2]):Graph[VD2,ED]) 的行為類似于 `joinVertices`。因為不是所有的頂點都可能在輸入 RDD 中具有匹配的值,所以 `map` 函數采用 `Option` 類型。例如,我們可以通過使用 `outDegree` 初始化頂點屬性來為 PageRank 設置一個圖。
```
val outDegrees: VertexRDD[Int] = graph.outDegrees
val degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt) =>
outDegOpt match {
case Some(outDeg) => outDeg
case None => 0 // No outDegree means zero outDegree
}
}
```
> 您可能已經注意到上述示例中使用的多個參數列表(例如: `f(a)(b)` curried 函數模式。雖然我們可以將 `f(a)(b)` 同樣地寫成 `f(a,b)`,這意味著 `b` 上的類型推斷不依賴于 `a`。因此,用戶需要為用戶定義的函數提供類型注釋:
```
val joinedGraph = graph.joinVertices(uniqueCosts,
(id: VertexId, oldCost: Double, extraCost: Double) => oldCost + extraCost)
```
## 鄰域聚合
許多圖形分析任務的關鍵步驟是聚合關于每個頂點鄰域的信息。例如,我們可能想知道每個用戶擁有的關注者數量或每個用戶的追隨者的平均年齡。許多迭代圖表算法(例如:網頁級別,最短路徑,以及連接成分)相鄰頂點(例如:電流值的 PageRank,最短到源路徑,和最小可達頂點 ID)的重復聚合性質。
> 為了提高性能,主聚合操作員 `graph.mapReduceTriplets` 從新的更改 `graph.AggregateMessages`。雖然 API 的變化相對較小,但我們在下面提供了一個轉換指南。
### 聚合消息 (aggregateMessages)
GraphX 中的核心聚合操作是 [`aggregateMessages`](api/scala/index.html#org.apache.spark.graphx.Graph@aggregateMessages[A]((EdgeContext[VD,ED,A])?Unit,(A,A)?A,TripletFields)(ClassTag[A]):VertexRDD[A])。該運算符將用戶定義的 `sendMsg` 函數應用于圖中的每個 _邊緣三元組_,然后使用該 `mergeMsg` 函數在其目標頂點聚合這些消息。
```
class Graph[VD, ED] {
def aggregateMessages[Msg: ClassTag](
sendMsg: EdgeContext[VD, ED, Msg] => Unit,
mergeMsg: (Msg, Msg) => Msg,
tripletFields: TripletFields = TripletFields.All)
: VertexRDD[Msg]
}
```
用戶定義的 `sendMsg` 函數接受一個 [`EdgeContext`](api/scala/index.html#org.apache.spark.graphx.EdgeContext),它將源和目標屬性以及 edge 屬性和函數 ([`sendToSrc`](api/scala/index.html#org.apache.spark.graphx.EdgeContext@sendToSrc(msg:A):Unit),和 [`sendToDst`](api/scala/index.html#org.apache.spark.graphx.EdgeContext@sendToDst(msg:A):Unit)) 一起發送到源和目標屬性。在 map-reduce 中,將 `sendMsg` 作為 _map_ 函數。用戶定義的 `mergeMsg` 函數需要兩個發往同一頂點的消息,并產生一條消息。想想 `mergeMsg` 是 map-reduce 中的 _reduce_ 函數。[`aggregateMessages`](api/scala/index.html#org.apache.spark.graphx.Graph@aggregateMessages[A]((EdgeContext[VD,ED,A])?Unit,(A,A)?A,TripletFields)(ClassTag[A]):VertexRDD[A]) 運算符返回一個 `VertexRDD[Msg]`,其中包含去往每個頂點的聚合消息(Msg類型)。沒有收到消息的頂點不包括在返回的 `VertexRDD`[VertexRDD](api/scala/index.html#org.apache.spark.graphx.VertexRDD) 中。
另外,[`aggregateMessages`](api/scala/index.html#org.apache.spark.graphx.Graph@aggregateMessages[A]((EdgeContext[VD,ED,A])?Unit,(A,A)?A,TripletFields)(ClassTag[A]):VertexRDD[A]) 采用一個可選的`tripletsFields`,它們指示在 [`EdgeContext`](api/scala/index.html#org.apache.spark.graphx.EdgeContext) 中訪問哪些數據(即源頂點屬性,而不是目標頂點屬性)。`tripletsFields` 定義的可能選項,[`TripletFields`](api/java/org/apache/spark/graphx/TripletFields.html) 默認值是 [`TripletFields.All`](api/java/org/apache/spark/graphx/TripletFields.html#All) 指示用戶定義的 `sendMsg` 函數可以訪問的任何字段[`EdgeContext`](api/scala/index.html#org.apache.spark.graphx.EdgeContext)。該 `tripletFields` 參數可用于通知 GraphX,只有部分 [`EdgeContext`](api/scala/index.html#org.apache.spark.graphx.EdgeContext) 需要允許 GraphX 選擇優化的連接策略。例如,如果我們計算每個用戶的追隨者的平均年齡,我們只需要源字段,因此我們將用于 [`TripletFields.Src`](api/java/org/apache/spark/graphx/TripletFields.html#Src) 表示我們只需要源字段。
> 在早期版本的 GraphX 中,我們使用字節碼檢測來推斷,[`TripletFields`](api/java/org/apache/spark/graphx/TripletFields.html) 但是我們發現字節碼檢查稍微不可靠,而是選擇了更明確的用戶控制。
在下面的例子中,我們使用 [`aggregateMessages`](api/scala/index.html#org.apache.spark.graphx.Graph@aggregateMessages[A]((EdgeContext[VD,ED,A])?Unit,(A,A)?A,TripletFields)(ClassTag[A]):VertexRDD[A]) 運算符來計算每個用戶的資深追蹤者的平均年齡。
```
import org.apache.spark.graphx.{Graph, VertexRDD}
import org.apache.spark.graphx.util.GraphGenerators
// Create a graph with "age" as the vertex property.
// Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
triplet => { // Map Function
if (triplet.srcAttr > triplet.dstAttr) {
// Send message to destination vertex containing counter and age
triplet.sendToDst(1, triplet.srcAttr)
}
},
// Add counter and age
(a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
// Divide total age by number of older followers to get average age of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
olderFollowers.mapValues( (id, value) =>
value match { case (count, totalAge) => totalAge / count } )
// Display the results
avgAgeOfOlderFollowers.collect.foreach(println(_))
```
<small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala" in the Spark repo.</small>
> `aggregateMessages` 當消息(和消息的總和)是恒定大小(例如:浮動和加法而不是列表和級聯)時,該操作最佳地執行。
### Map Reduce Triplets Transition Guide (Legacy)
在早期版本的 GraphX 中,鄰域聚合是使用 `mapReduceTriplets` 運算符完成的:
```
class Graph[VD, ED] {
def mapReduceTriplets[Msg](
map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)],
reduce: (Msg, Msg) => Msg)
: VertexRDD[Msg]
}
```
`mapReduceTriplets` 操作符接受用戶定義的映射函數,該函數應用于每個三元組,并且可以使用用戶定義的縮減函數來生成聚合的 _消息_。然而,我們發現返回的迭代器的用戶是昂貴的,并且它阻止了我們應用其他優化(例如:局部頂點重新編號)的能力。在 [`aggregateMessages`](api/scala/index.html#org.apache.spark.graphx.Graph@aggregateMessages[A]((EdgeContext[VD,ED,A])?Unit,(A,A)?A,TripletFields)(ClassTag[A]):VertexRDD[A]) 中,我們引入了 EdgeContext,它暴露了三元組字段,并且還顯示了向源和目標頂點發送消息的功能。此外,我們刪除了字節碼檢查,而是要求用戶指出三元組中實際需要哪些字段。
以下代碼塊使用 `mapReduceTriplets`:
```
val graph: Graph[Int, Float] = ...
def msgFun(triplet: Triplet[Int, Float]): Iterator[(Int, String)] = {
Iterator((triplet.dstId, "Hi"))
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.mapReduceTriplets[String](msgFun, reduceFun)
```
可以使用 `aggregateMessages`:?
```
val graph: Graph[Int, Float] = ...
def msgFun(triplet: EdgeContext[Int, Float, String]) {
triplet.sendToDst("Hi")
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.aggregateMessages[String](msgFun, reduceFun)
```
### 計算級別信息
常見的聚合任務是計算每個頂點的程度:與每個頂點相鄰的邊數。在有向圖的上下文中,通常需要知道每個頂點的度數,外部程度和總程度。本 [`GraphOps`](api/scala/index.html#org.apache.spark.graphx.GraphOps) 類包含運營商計算度數每個頂點的集合。例如,在下面我們將計算最大值,最大和最大級別:
```
// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)
```
### 收集相鄰點
在某些情況下,通過在每個頂點處收集相鄰頂點及其屬性可以更容易地表達計算。這可以使用 [`collectNeighborIds`](api/scala/index.html#org.apache.spark.graphx.GraphOps@collectNeighborIds(EdgeDirection):VertexRDD[Array[VertexId]]) 和 [`collectNeighbors`](api/scala/index.html#org.apache.spark.graphx.GraphOps@collectNeighbors(EdgeDirection):VertexRDD[Array[(VertexId,VD)]]) 運算符輕松實現。
```
class GraphOps[VD, ED] {
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
}
```
> 這些操作可能相當昂貴,因為它們重復信息并需要大量通信。如果可能,請直接使用 [`aggregateMessages`](api/scala/index.html#org.apache.spark.graphx.Graph@aggregateMessages[A]((EdgeContext[VD,ED,A])?Unit,(A,A)?A,TripletFields)(ClassTag[A]):VertexRDD[A]) 操作來表達相同的計算。
## Caching and Uncaching
在 Spark 中,默認情況下,RDD 不會保留在內存中。為了避免重新計算,在多次使用它們時,必須明確緩存它們(參見 [Spark Programming Guide](programming-guide.html#rdd-persistence))。GraphX 中的圖形表現方式相同。**當多次使用圖表時,請務必先調用 [`Graph.cache()`](api/scala/index.html#org.apache.spark.graphx.Graph@cache():Graph[VD,ED])。**
在迭代計算中,_uncaching_ 也可能是最佳性能所必需的。默認情況下,緩存的 RDD 和圖形將保留在內存中,直到內存壓力迫使它們以 LRU 順序逐出。對于迭代計算,來自先前迭代的中間結果將填滿緩存。雖然它們最終被驅逐出來,但存儲在內存中的不必要的數據會減慢垃圾收集速度。一旦不再需要中間結果,就會更有效率。這涉及每次迭代實現(緩存和強制)圖形或 RDD,取消所有其他數據集,并且僅在將來的迭代中使用實例化數據集。然而,由于圖形由多個 RDD 組成,所以很難將它們正確地分開。**對于迭代計算,我們建議使用 Pregel API,它可以正確地解析中間結果。**
# Pregel API
圖形是固有的遞歸數據結構,因為頂點的屬性取決于其鄰居的屬性,而鄰居的屬性又依賴于 _其_ 鄰居的屬性。因此,許多重要的圖算法迭代地重新計算每個頂點的屬性,直到達到一個固定點條件。已經提出了一系列圖并行抽象來表達這些迭代算法。GraphX 公開了 Pregel API 的變體。
在高層次上,GraphX 中的 Pregel 運算符是 _限制到圖形拓撲的_ 批量同步并行消息抽象。Pregel 操作符在一系列超級步驟中執行,其中頂點接收來自先前超級步驟的入站消息的 _總和_,計算頂點屬性的新值,然后在下一個超級步驟中將消息發送到相鄰頂點。與 Pregel 不同,消息作為邊緣三元組的函數并行計算,消息計算可以訪問源和目標頂點屬性。在超級步驟中跳過不接收消息的頂點。Pregel 運算符終止迭代,并在沒有剩余的消息時返回最終的圖。
> 注意,與更多的標準 Pregel 實現不同,GraphX 中的頂點只能將消息發送到相鄰頂點,并且使用用戶定義的消息傳遞功能并行完成消息構造。這些約束允許在 GraphX 中進行額外優化。
以下是 [Pregel 運算符](api/scala/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexId,VD,A)?VD,(EdgeTriplet[VD,ED])?Iterator[(VertexId,A)],(A,A)?A)(ClassTag[A]):Graph[VD,ED]) 的類型簽名以及 其實現的 _草圖_(注意:為了避免由于長譜系鏈引起的 stackOverflowError,pregel 支持周期性檢查點圖和消息,將 “spark.graphx.pregel.checkpointInterval” 設置為正數,說10。并使用 SparkContext.setCheckpointDir(directory: String)) 設置 checkpoint 目錄):
```
class GraphOps[VD, ED] {
def pregel[A]
(initialMsg: A,
maxIter: Int = Int.MaxValue,
activeDir: EdgeDirection = EdgeDirection.Out)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
// Receive the initial message at each vertex
var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
// compute the messages
var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
var activeMessages = messages.count()
// Loop until no messages remain or maxIterations is achieved
var i = 0
while (activeMessages > 0 && i < maxIterations) {
// Receive the messages and update the vertices.
g = g.joinVertices(messages)(vprog).cache()
val oldMessages = messages
// Send new messages, skipping edges where neither side received a message. We must cache
// messages so it can be materialized on the next line, allowing us to uncache the previous
// iteration.
messages = GraphXUtils.mapReduceTriplets(
g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
activeMessages = messages.count()
i += 1
}
g
}
}
```
請注意,Pregel 需要兩個參數列表(即:`graph.pregel(list1)(list2)`。第一個參數列表包含配置參數,包括初始消息,最大迭代次數以及發送消息的邊緣方向(默認情況下為邊緣)。第二個參數列表包含用于接收消息(頂點程序 `vprog`),計算消息(`sendMsg`)和組合消息的用戶定義函數 `mergeMsg`。
在以下示例中,我們可以使用 Pregel 運算符來表達單源最短路徑的計算。
```
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators
// A graph with edge attributes containing distances
val graph: Graph[Long, Double] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) =>
if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist), // Vertex Program
triplet => { // Send Message
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
(a, b) => math.min(a, b) // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))
```
<small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala" in the Spark repo.</small>
# Graph 建造者
GraphX 提供了從 RDD 或磁盤上的頂點和邊的集合構建圖形的幾種方法。默認情況下,圖形構建器都不會重新分配圖形邊; 相反,邊緣保留在其默認分區(例如 HDFS 中的原始塊)中。[`Graph.groupEdges`](api/scala/index.html#org.apache.spark.graphx.Graph@groupEdges((ED,ED)?ED):Graph[VD,ED]) 需要重新分區圖,因為它假定相同的邊將被共同定位在同一分區上,因此您必須在調用 [`Graph.partitionBy`](api/scala/index.html#org.apache.spark.graphx.Graph@partitionBy(PartitionStrategy):Graph[VD,ED]) 之前調用 `groupEdges`。
```
object GraphLoader {
def edgeListFile(
sc: SparkContext,
path: String,
canonicalOrientation: Boolean = false,
minEdgePartitions: Int = 1)
: Graph[Int, Int]
}
```
[`GraphLoader.edgeListFile`](api/scala/index.html#org.apache.spark.graphx.GraphLoader$@edgeListFile(SparkContext,String,Boolean,Int):Graph[Int,Int]) 提供了從磁盤邊緣列表中加載圖形的方法。它解析以下形式的(源頂點 ID,目標頂點 ID)對的鄰接列表,跳過以下開始的注釋行 `#`:
```
# This is a comment
2 1
4 1
1 2
```
它 `Graph` 從指定的邊緣創建一個,自動創建邊緣提到的任何頂點。所有頂點和邊緣屬性默認為1\. `canonicalOrientation` 參數允許在正方向(`srcId < dstId`)重新定向邊,這是[連接的組件](api/scala/index.html#org.apache.spark.graphx.lib.ConnectedComponents$)算法所要求的。該 `minEdgePartitions` 參數指定要生成的邊緣分區的最小數量;如果例如 HDFS 文件具有更多塊,則可能存在比指定更多的邊緣分區。
```
object Graph {
def apply[VD, ED](
vertices: RDD[(VertexId, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD = null)
: Graph[VD, ED]
def fromEdges[VD, ED](
edges: RDD[Edge[ED]],
defaultValue: VD): Graph[VD, ED]
def fromEdgeTuples[VD](
rawEdges: RDD[(VertexId, VertexId)],
defaultValue: VD,
uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int]
}
```
[`Graph.apply`](api/scala/index.html#org.apache.spark.graphx.Graph$@apply[VD,ED](RDD[(VertexId,VD)],RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED]) 允許從頂點和邊緣的 RDD 創建圖形。重復的頂點被任意挑選,并且邊緣 RDD 中找到的頂點,而不是頂點 RDD 被分配了默認屬性。
[`Graph.fromEdges`](api/scala/index.html#org.apache.spark.graphx.Graph$@fromEdges[VD,ED](RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED]) 允許僅從 RDD 的邊緣創建圖形,自動創建邊緣提到的任何頂點并將其分配給默認值。
[`Graph.fromEdgeTuples`](api/scala/index.html#org.apache.spark.graphx.Graph$@fromEdgeTuples[VD](RDD[(VertexId,VertexId)],VD,Option[PartitionStrategy])(ClassTag[VD]):Graph[VD,Int]) 允許僅從邊緣元組的 RDD 創建圖形,將邊緣分配為值1,并自動創建邊緣提到的任何頂點并將其分配給默認值。它還支持重復數據刪除邊緣; 重復數據刪除,將`某些` [`PartitionStrategy`](api/scala/index.html#org.apache.spark.graphx.PartitionStrategy) 作為 `uniqueEdges` 參數傳遞(例如:`uniqueEdges = Some(PartitionStrategy.RandomVertexCut)`)。分區策略是必須的,以便在相同的分區上共同使用相同的邊,以便可以進行重復數據刪除。
# Vertex and Edge RDDs
GraphX 公開 `RDD` 了圖中存儲的頂點和邊的視圖。然而,由于 GraphX 在優化的數據結構中維護頂點和邊,并且這些數據結構提供了附加功能,所以頂點和邊分別作為[`VertexRDD`](api/scala/index.html#org.apache.spark.graphx.VertexRDD) 和 [`EdgeRDD`](api/scala/index.html#org.apache.spark.graphx.EdgeRDD) 返回。在本節中,我們將回顧一些這些類型中的其他有用功能。請注意,這只是一個不完整的列表,請參閱API文檔中的正式操作列表。
## VertexRDDs
該 `VertexRDD[A]` 擴展 `RDD[(VertexId, A)]` 并增加了額外的限制,每個 `VertexId` 只發生一次。此外,`VertexRDD[A]` 表示一組頂點,每個頂點的屬性類型A。在內部,這是通過將頂點屬性存儲在可重用的散列圖數據結構中來實現的。因此,如果兩個 `VertexRDD` 派生自相同的基礎 [`VertexRDD`](api/scala/index.html#org.apache.spark.graphx.VertexRDD)(例如:`filter`或 `mapValues`),則可以在不使用散列評估的情況下連續連接。為了利用這個索引的數據結構,[`VertexRDD`](api/scala/index.html#org.apache.spark.graphx.VertexRDD) 公開了以下附加功能:
```
class VertexRDD[VD] extends RDD[(VertexId, VD)] {
// Filter the vertex set but preserves the internal index
def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
// Transform the values without changing the ids (preserves the internal index)
def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
// Show only vertices unique to this set based on their VertexId's
def minus(other: RDD[(VertexId, VD)])
// Remove vertices from this set that appear in the other set
def diff(other: VertexRDD[VD]): VertexRDD[VD]
// Join operators that take advantage of the internal indexing to accelerate joins (substantially)
def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
// Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
}
```
請注意,例如,`filter` 運算符 如何返回 [`VertexRDD`](api/scala/index.html#org.apache.spark.graphx.VertexRDD)。過濾器實際上是通過 `BitSet` 使用索引重新實現的,并保留與其他`VertexRDD` 進行快速連接的能力。同樣,`mapValues` 運算符不允許 `map` 功能改變,`VertexId` 從而使相同的 `HashMap` 數據結構能夠被重用。無論是 `leftJoin` 和 `innerJoin` 能夠連接兩個時識別 `VertexRDD` 來自同一來源的小號 `HashMap` 和落實線性掃描,而不是昂貴的點查找的加入。
`aggregateUsingIndex` 運算符對于從 `RDD[(VertexId, A)]` 有效構建新的 [`VertexRDD`](api/scala/index.html#org.apache.spark.graphx.VertexRDD) 非常有用。在概念上,如果我在一組頂點上構造了一個 `VertexRDD[B]`,這是一些 `RDD[(VertexId, A)]` 中的頂點的 _超集_,那么我可以重用索引來聚合然后再索引 `RDD[(VertexId, A)]`。例如:
```
val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
// There should be 200 entries in rddB
rddB.count
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
// There should be 100 entries in setB
setB.count
// Joining A and B should now be fast!
val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
```
## EdgeRDDs
該 `EdgeRDD[ED]` 擴展 `RDD[Edge[ED]]` 使用其中定義的各種分區策略之一來組織塊中的邊 [`PartitionStrategy`](api/scala/index.html#org.apache.spark.graphx.PartitionStrategy)。在每個分區中,邊緣屬性和鄰接結構分別存儲,可以在更改屬性值時進行最大限度的重用。
`EdgeRDD`[EdgeRDD](api/scala/index.html#org.apache.spark.graphx.EdgeRDD) 公開的三個附加功能是:
```
// Transform the edge attributes while preserving the structure
def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
// Reverse the edges reusing both attributes and structure
def reverse: EdgeRDD[ED]
// Join two `EdgeRDD`s partitioned using the same partitioning strategy.
def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
```
在大多數應用中,我們發現在 `EdgeRDD`[EdgeRDD](api/scala/index.html#org.apache.spark.graphx.EdgeRDD) 上的操作是通過圖形運算符完成的,或者依賴基 `RDD` 類中定義的操作。
# 優化表示
雖然在分布式圖形的 GraphX 表示中使用的優化的詳細描述超出了本指南的范圍,但一些高級理解可能有助于可擴展算法的設計以及 API 的最佳使用。GraphX 采用頂點切分方式進行分布式圖分割:

GraphX 不是沿著邊沿分割圖形,而是沿著頂點分割圖形,這可以減少通信和存儲開銷。在邏輯上,這對應于將邊緣分配給機器并允許頂點跨越多臺機器。分配邊緣的確切方法取決于 [`PartitionStrategy`](api/scala/index.html#org.apache.spark.graphx.PartitionStrategy) 各種啟發式的幾種折衷。用戶可以通過與 [`Graph.partitionBy`](api/scala/index.html#org.apache.spark.graphx.Graph@partitionBy(PartitionStrategy):Graph[VD,ED]) 運算符重新分區圖來選擇不同的策略。默認分區策略是使用圖形構建中提供的邊的初始分區。然而,用戶可以輕松切換到 GraphX 中包含的 2D 劃分或其他啟發式算法。

一旦邊緣被劃分,高效的圖形并行計算的關鍵挑戰就是有效地將頂點屬性與邊緣連接起來。因為真實世界的圖形通常具有比頂點更多的邊緣,所以我們將頂點屬性移動到邊緣。因為不是所有的分區都將包含鄰近的所有頂點的邊緣,我們內部維護標識在哪里執行所需的連接像操作時,廣播頂點的路由表 `triplets` 和 `aggregateMessages`。
# Graph 算法
GraphX 包括一組簡化分析任務的圖算法。該算法被包含在 `org.apache.spark.graphx.lib` 包可直接作為方法來訪問 `Graph` 通過 [`GraphOps`](api/scala/index.html#org.apache.spark.graphx.GraphOps)。本節介紹算法及其使用方法。
## PageRank
PageRank 測量在圖中每個頂點的重要性,假設從邊緣 _u_ 到 _v_ 表示的認可 _v_ 通過的重要性 _u_。例如,如果 Twitter 用戶遵循許多其他用戶,則用戶將被高度排名。
GraphX 附帶了 PageRank 的靜態和動態實現方法作[`PageRank 對象`](api/scala/index.html#org.apache.spark.graphx.lib.PageRank$)上的方法。靜態 PageRank 運行固定次數的迭代,而動態 PageRank 運行直到排列收斂(即,停止改變超過指定的公差)。[`GraphOps`](api/scala/index.html#org.apache.spark.graphx.GraphOps) 允許直接調用這些算法作為方法 `Graph`。
GraphX還包括一個可以運行 PageRank 的社交網絡數據集示例。給出了一組用戶 `data/graphx/users.txt`,并給出了一組用戶之間的關系 `data/graphx/followers.txt`。我們計算每個用戶的 PageRank 如下:
```
import org.apache.spark.graphx.GraphLoader
// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
case (id, (username, rank)) => (username, rank)
}
// Print the result
println(ranksByUsername.collect().mkString("\n"))
```
<small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala" in the Spark repo.</small>
## 連接組件
連接的組件算法將圖中每個連接的組件與其最低編號頂點的ID進行標記。例如,在社交網絡中,連接的組件可以近似群集。GraphX包含[`ConnectedComponents object`](api/scala/index.html#org.apache.spark.graphx.lib.ConnectedComponents$) 中算法的實現,我們從 [PageRank 部分](#pagerank) 計算示例社交網絡數據集的連接組件如下:
```
import org.apache.spark.graphx.GraphLoader
// Load the graph as in the PageRank example
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Find the connected components
val cc = graph.connectedComponents().vertices
// Join the connected components with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val ccByUsername = users.join(cc).map {
case (id, (username, cc)) => (username, cc)
}
// Print the result
println(ccByUsername.collect().mkString("\n"))
```
<small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala" in the Spark repo.</small>
## Triangle 計數
頂點是三角形的一部分,當它有兩個相鄰的頂點之間有一個邊。GraphX 在 [`TriangleCount 對象`](api/scala/index.html#org.apache.spark.graphx.lib.TriangleCount$) 中實現一個三角計數算法,用于確定通過每個頂點的三角形數量,提供聚類度量。我們從 [PageRank 部分](#pagerank) 計算社交網絡數據集的三角形數。_需要注意的是 `TriangleCount` 邊緣要處于規范方向(`srcId < dstId`),而圖形要使用 [`Graph.partitionBy`](api/scala/index.html#org.apache.spark.graphx.Graph@partitionBy(PartitionStrategy):Graph[VD,ED])。_
```
import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
// Load the edges in canonical order and partition the graph for triangle count
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true)
.partitionBy(PartitionStrategy.RandomVertexCut)
// Find the triangle count for each vertex
val triCounts = graph.triangleCount().vertices
// Join the triangle counts with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
(username, tc)
}
// Print the result
println(triCountByUsername.collect().mkString("\n"))
```
<small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala" in the Spark repo.</small>
# 示例
假設我想從一些文本文件中構建圖形,將圖形限制為重要的關系和用戶,在 sub-graph 上運行 page-rank,然后返回與頂級用戶關聯的屬性。我可以用 GraphX 在幾行內完成所有這些:
```
import org.apache.spark.graphx.GraphLoader
// Load my user data and parse into tuples of user id and attribute list
val users = (sc.textFile("data/graphx/users.txt")
.map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
// Parse the edge data which is already in userId -> userId format
val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Attach the user attributes
val graph = followerGraph.outerJoinVertices(users) {
case (uid, deg, Some(attrList)) => attrList
// Some users may not have attributes so we set them as empty
case (uid, deg, None) => Array.empty[String]
}
// Restrict the graph to users with usernames and names
val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
// Compute the PageRank
val pagerankGraph = subgraph.pageRank(0.001)
// Get the attributes of the top pagerank users
val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
case (uid, attrList, Some(pr)) => (pr, attrList.toList)
case (uid, attrList, None) => (0.0, attrList.toList)
}
println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
```
<small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala" in the Spark repo.</small>
- Spark 概述
- 編程指南
- 快速入門
- Spark 編程指南
- 構建在 Spark 之上的模塊
- Spark Streaming 編程指南
- Spark SQL, DataFrames and Datasets Guide
- MLlib
- GraphX Programming Guide
- API 文檔
- 部署指南
- 集群模式概述
- Submitting Applications
- 部署模式
- Spark Standalone Mode
- 在 Mesos 上運行 Spark
- Running Spark on YARN
- 其它
- 更多
- Spark 配置
- Monitoring and Instrumentation
- Tuning Spark
- 作業調度
- Spark 安全
- 硬件配置
- Accessing OpenStack Swift from Spark
- 構建 Spark
- 其它
- 外部資源
- Spark RDD(Resilient Distributed Datasets)論文
- 翻譯進度