# 頂點和邊RDDs
GraphX暴露保存在圖中的頂點和邊的RDD。然而,因為GraphX包含的頂點和邊擁有優化的數據結構,這些數據結構提供了額外的功能。頂點和邊分別返回`VertexRDD`和`EdgeRDD`。這一章我們將學習它們的一些有用的功能。
### VertexRDDs
`VertexRDD[A]`繼承自`RDD[(VertexID, A)]`并且添加了額外的限制,那就是每個`VertexID`只能出現一次。此外,`VertexRDD[A]`代表了一組屬性類型為A的頂點。在內部,這通過保存頂點屬性到一個可重復使用的hash-map數據結構來獲得。所以,如果兩個`VertexRDDs`從相同的基本`VertexRDD`獲得(如通過filter或者mapValues),它們能夠在固定的時間內連接而不需要hash評價。為了利用這個索引數據結構,`VertexRDD`暴露了一下附加的功能:
~~~
class VertexRDD[VD] extends RDD[(VertexID, VD)] {
// Filter the vertex set but preserves the internal index
def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
// Transform the values without changing the ids (preserves the internal index)
def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
// Remove vertices from this set that appear in the other set
def diff(other: VertexRDD[VD]): VertexRDD[VD]
// Join operators that take advantage of the internal indexing to accelerate joins (substantially)
def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
// Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
}
~~~
舉個例子,`filter`操作如何返回一個VertexRDD。過濾器實際使用一個`BitSet`實現,因此它能夠重用索引以及保留和其它`VertexRDDs`做連接時速度快的能力。同樣的,`mapValues`操作不允許`map`函數改變`VertexID`,因此可以保證相同的`HashMap`數據結構能夠重用。當連接兩個從相同的`hashmap`獲取的VertexRDDs和使用線性掃描而不是昂貴的點查找實現連接操作時,`leftJoin`和`innerJoin`都能夠使用。
從一個`RDD[(VertexID, A)]`高效地構建一個新的`VertexRDD`,`aggregateUsingIndex`操作是有用的。概念上,如果我通過一組頂點構造了一個`VertexRDD[B]`,而`VertexRDD[B]`是一些`RDD[(VertexID, A)]`中頂點的超集,那么我們就可以在聚合以及隨后索引`RDD[(VertexID, A)]`中重用索引。例如:
~~~
val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
// There should be 200 entries in rddB
rddB.count
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
// There should be 100 entries in setB
setB.count
// Joining A and B should now be fast!
val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
~~~
### EdgeRDDs
`EdgeRDD[ED]`繼承自`RDD[Edge[ED]]`,使用定義在[PartitionStrategy](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.PartitionStrategy)的各種分區策略中的一個在塊分區中組織邊。在每個分區中,邊屬性和相鄰結構被分別保存,當屬性值改變時,它們可以最大化的重用。
`EdgeRDD`暴露了三個額外的函數
~~~
// Transform the edge attributes while preserving the structure
def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
// Revere the edges reusing both attributes and structure
def reverse: EdgeRDD[ED]
// Join two `EdgeRDD`s partitioned using the same partitioning strategy.
def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
~~~
在大多數的應用中,我們發現,EdgeRDD操作可以通過圖操作者(graph operators)或者定義在基本RDD中的操作來完成。
- Introduction
- 快速上手
- Spark Shell
- 獨立應用程序
- 開始翻滾吧!
- 編程指南
- 引入 Spark
- 初始化 Spark
- Spark RDDs
- 并行集合
- 外部數據集
- RDD 操作
- RDD持久化
- 共享變量
- 從這里開始
- Spark Streaming
- 一個快速的例子
- 基本概念
- 關聯
- 初始化StreamingContext
- 離散流
- 輸入DStreams
- DStream中的轉換
- DStream的輸出操作
- 緩存或持久化
- Checkpointing
- 部署應用程序
- 監控應用程序
- 性能調優
- 減少批數據的執行時間
- 設置正確的批容量
- 內存調優
- 容錯語義
- Spark SQL
- 開始
- 數據源
- RDDs
- parquet文件
- JSON數據集
- Hive表
- 性能調優
- 其它SQL接口
- 編寫語言集成(Language-Integrated)的相關查詢
- Spark SQL數據類型
- GraphX編程指南
- 開始
- 屬性圖
- 圖操作符
- Pregel API
- 圖構造者
- 頂點和邊RDDs
- 圖算法
- 例子
- 提交應用程序
- 獨立運行Spark
- 在yarn上運行Spark
- Spark配置