<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                [TOC] # 1. 圖常用算子 ![](https://img.kancloud.cn/fc/92/fc92b71f52c778519fbd1cbb3b2b0fbc_1009x396.png) ```scala import org.apache.spark.SparkContext import org.apache.spark.graphx.{Edge, Graph, VertexId} import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession object GraphxFun { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder() .master("local[4]") .appName(this.getClass.getName) .getOrCreate() val sc: SparkContext = spark.sparkContext import spark.implicits._ // 1. 構建點集合 val users: RDD[(Long, (String, Int))] = sc.parallelize(Array( (1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)), (4L, ("David", 42)), (5L, ("Ed", 55)), (6L, ("Fran", 50)) )) // 2. 構建邊集合 val cntCall: RDD[Edge[Int]] = sc.parallelize(Array( Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(3L, 2L, 4), Edge(3L, 6L, 3), Edge(4L, 1L, 1), Edge(5L, 2L, 2), Edge(5L, 3L, 8), Edge(5L, 6L, 3) )) // 3. 構圖 val graph: Graph[(String, Int), Int] = Graph(users, cntCall) graph.triplets.foreach(x => println(x.toString())) // ((5,(Ed,55)),(3,(Charlie,65)),8) // ((2,(Bob,27)),(1,(Alice,28)),7) // ((4,(David,42)),(1,(Alice,28)),1) // ((3,(Charlie,65)),(2,(Bob,27)),4) // ((5,(Ed,55)),(2,(Bob,27)),2) // ((2,(Bob,27)),(4,(David,42)),2) // ((5,(Ed,55)),(6,(Fran,50)),3) // ((3,(Charlie,65)),(6,(Fran,50)),3) println() /** ********** mapXXX算子 ********************/ // 通過遍歷該圖的所有頂點,生成一個新的圖 // 可以改變頂點的attr,并生成一個新的Graph返回,但頂點的Id改變不了 val graph2: Graph[(VertexId, String), Int] = graph.mapVertices((vertexId, attr) => (vertexId * 100, attr._1)) // 也可以采用如下寫法,結果是一樣的 // val graph2: Graph[(VertexId, String), Int] = graph.mapVertices({ case (vertexId, (name, age)) => (vertexId * 100, name) }) graph2.triplets.foreach(x => println(x.toString())) // ((2,(200,Bob)),(1,(100,Alice)),7) // ((2,(200,Bob)),(4,(400,David)),2) // ((5,(500,Ed)),(3,(300,Charlie)),8) // ((3,(300,Charlie)),(2,(200,Bob)),4) // ((4,(400,David)),(1,(100,Alice)),1) // ((5,(500,Ed)),(2,(200,Bob)),2) // ((5,(500,Ed)),(6,(600,Fran)),3) // ((3,(300,Charlie)),(6,(600,Fran)),3) println() // 遍歷該圖的所有邊,生成一個新的圖 // 只能改變邊的attr,點什么都不能改變 val graph3: Graph[(String, Int), Int] = graph.mapEdges(e => e.attr * 100) graph3.triplets.foreach(x => println(x.toString())) // ((2,(Bob,27)),(1,(Alice,28)),700) // ((2,(Bob,27)),(4,(David,42)),200) // ((3,(Charlie,65)),(2,(Bob,27)),400) // ((4,(David,42)),(1,(Alice,28)),100) // ((5,(Ed,55)),(2,(Bob,27)),200) // ((3,(Charlie,65)),(6,(Fran,50)),300) // ((5,(Ed,55)),(3,(Charlie,65)),800) // ((5,(Ed,55)),(6,(Fran,50)),300) println() /** ************ 結構算子 ***************/ // 將該圖所有邊的方向反轉,并生成新的Graph val graph4: Graph[(String, Int), Int] = graph.reverse graph4.triplets.foreach(println) // ((1,(Alice,28)),(2,(Bob,27)),7) // ((2,(Bob,27)),(3,(Charlie,65)),4) // ((3,(Charlie,65)),(5,(Ed,55)),8) // ((1,(Alice,28)),(4,(David,42)),1) // ((6,(Fran,50)),(5,(Ed,55)),3) // ((6,(Fran,50)),(3,(Charlie,65)),3) // ((4,(David,42)),(2,(Bob,27)),2) // ((2,(Bob,27)),(5,(Ed,55)),2) println() // 生成滿足頂點條件的子圖 val graph6: Graph[(String, Int), Int] = graph.subgraph(vpred = (id, attr) => attr._2 < 65) graph6.triplets.foreach(println) // ((5,(Ed,55)),(6,(Fran,50)),3) // ((2,(Bob,27)),(1,(Alice,28)),7) // ((2,(Bob,27)),(4,(David,42)),2) // ((4,(David,42)),(1,(Alice,28)),1) // ((5,(Ed,55)),(2,(Bob,27)),2) println() /** ************* join算子 ****************/ // 構建頂點集合 val vertices: RDD[(VertexId, String)] = sc.parallelize(Array((1L, "kgc.cn"), (2L, "baidu.com"), (3L, "google.com"))) // 內連接,根據頂點Id相等進行join,并生成連接后的圖 // id: 圖中頂點的Id與vertices頂點Id的交集 // attr: 圖中頂點的attr // company: vertices頂點的attr val graph7: Graph[(String, Int), Int] = graph.joinVertices(vertices)((id, attr, company) => (id * 100 + "@" + company, attr._2)) graph7.triplets.foreach(println) // ((3,(300@google.com,65)),(2,(200@baidu.com,27)),4) // ((3,(300@google.com,65)),(6,(Fran,50)),3) // ((5,(Ed,55)),(3,(300@google.com,65)),8) // ((5,(Ed,55)),(6,(Fran,50)),3) // ((4,(David,42)),(1,(100@kgc.cn,28)),1) // ((5,(Ed,55)),(2,(200@baidu.com,27)),2) // ((2,(200@baidu.com,27)),(1,(100@kgc.cn,28)),7) // ((2,(200@baidu.com,27)),(4,(David,42)),2) println() // 外連接, // Id: 圖頂點Id和vertices頂點Id的并集 // attr: 圖頂點的attr // company: vertices頂點attr,圖頂點Id不等于vertices中的頂點Id,則company默認為None val graph8: Graph[(String, Int), Int] = graph.outerJoinVertices(vertices)((id, attr, company) => (id * 100 + "#" + company, attr._2)) graph8.triplets.foreach(println) // ((4,(400#None,42)),(1,(100#Some(kgc.cn),28)),1) // ((5,(500#None,55)),(2,(200#Some(baidu.com),27)),2) // ((5,(500#None,55)),(3,(300#Some(google.com),65)),8) // ((5,(500#None,55)),(6,(600#None,50)),3) // ((2,(200#Some(baidu.com),27)),(1,(100#Some(kgc.cn),28)),7) // ((2,(200#Some(baidu.com),27)),(4,(400#None,42)),2) // ((3,(300#Some(google.com),65)),(2,(200#Some(baidu.com),27)),4) // ((3,(300#Some(google.com),65)),(6,(600#None,50)),3) } } ``` 下面舉兩個圖算子應用的案例,幫助理解算子用途。 <br/> # 2. 案例一:計算用戶的粉絲數量 ![](https://img.kancloud.cn/fc/92/fc92b71f52c778519fbd1cbb3b2b0fbc_1009x396.png) 頂點的入度就是這個用戶的粉絲數量。 ```scala import org.apache.spark.SparkContext import org.apache.spark.graphx.{Edge, Graph} import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession object ComputeFanNum { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder() .master("local[4]") .appName(this.getClass.getName) .getOrCreate() val sc: SparkContext = spark.sparkContext import spark.implicits._ // 1. 構造頂點RDD val users: RDD[(Long, (String, Int))] = sc.parallelize(Array( (1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)), (4L, ("David", 55)), (5L, ("Ed", 55)), (6L, ("Fran", 50)) )) // 2. 構造邊RDD val cntCall: RDD[Edge[Int]] = sc.parallelize(Array( Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(3L, 2L, 4), Edge(3L, 6L, 3), Edge(4L, 1L, 1), Edge(5L, 2L, 2), Edge(5L, 3L, 8), Edge(5L, 6L, 3) )) // 3. 構圖 val graph: Graph[(String, Int), Int] = Graph(users, cntCall) case class User(name: String, age: Int, inDeg: Int, outDeg: Int) // 將頂點attr轉換為User val initUserGraph: Graph[User, Int] = graph.mapVertices({ case (id, (name, age)) => User(name, age, 0, 0) }) val userGraph: Graph[User, Int] = initUserGraph.outerJoinVertices(initUserGraph.inDegrees)({ case (id, attr, inDegOpt) => User(attr.name, attr.age, inDegOpt.getOrElse(0), 0) }).outerJoinVertices(initUserGraph.outDegrees)({ case (id, attr, outDegOpt) => User(attr.name, attr.age, attr.inDeg, outDegOpt.getOrElse(0)) }) userGraph.vertices.foreach(x => println(s"用戶${x._1}是${x._2.name}他擁有${x._2.inDeg}個粉絲.")) // 用戶4是David他擁有1個粉絲. // 用戶1是Alice他擁有2個粉絲. // 用戶6是Fran他擁有2個粉絲. // 用戶3是Charlie他擁有1個粉絲. // 用戶2是Bob他擁有2個粉絲. // 用戶5是Ed他擁有0個粉絲. } } ``` <br/> # 3. 案例二:誰是網絡紅人 (1)案例數據格式 ```txt ((被跟隨者), (跟隨者)) ((User47,86566510),(User83,15647839)) ((User47,86566510),(User42,197134784)) ((User89,74286565),(User49,19315174)) ((User16,22679419),(User69,45705189)) ``` (2)案例要求 創建圖并計算每個用戶的粉絲數量,找出誰才是網絡紅色。 (3)案例代碼 ```scala import org.apache.spark.SparkContext import org.apache.spark.graphx.{Edge, Graph} import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import scala.util.matching.Regex object InternetCelebrityGraphx { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder() .master("local[4]") .appName(this.getClass.getName) .getOrCreate() val sc: SparkContext = spark.sparkContext import spark.implicits._ // 匹配((User47,86566510),(User83,15647839))的正則表達式 val pattern: Regex = """\(\((User\d+,\d+)\),\((User\d+,\d+)\)\)""".r // 加載數據文件,分割出(用戶名, 用戶Id) val twitters: RDD[(Array[String], Array[String])] = sc.textFile("file:///E:\\hadoop\\input\\twitter_graph_data.txt") .map(line => line match { case pattern(followee, follower) => (Some(followee), Some(follower)) case _ => (None, None) // 將None值過濾 }).filter(x => x._1 != None && x._2 != None) // 將(User47,86566510)分割出Array(用戶名, Id) .map(x => (x._1.get.split(","), x._2.get.split(","))) twitters.toDF.show(3) // +------------------+-------------------+ // | _1| _2| // +------------------+-------------------+ // |[User47, 86566510]| [User83, 15647839]| // |[User47, 86566510]|[User42, 197134784]| // |[User89, 74286565]| [User49, 19315174]| // +------------------+-------------------+ // 構建頂點RDD val verts: RDD[(Long, String)] = twitters.flatMap(x => Array((x._1(1).toLong, x._1(0)), (x._2(1).toLong, x._2(0)))).distinct() verts.toDF.show(3) // +--------+------+ // | _1| _2| // +--------+------+ // |38521400|User85| // |14676022| User0| // |24741685|User87| // +--------+------+ // 構建邊RDD val edges: RDD[Edge[String]] = twitters.map(x => Edge(x._2(1).toLong, x._1(1).toLong, "follow")) // 構建圖有可能會出現一種情況, 在邊集合中出現的點在點集合中不存在,所以提供一個默認值 "" val graph: Graph[String, String] = Graph(verts, edges, "") // 誰是網絡紅人,就看哪個頂點的入口多,按照降序排序 graph.inDegrees.repartition(1).sortBy(x => x._2, false).toDF.show(5) // +---------+---+ // | _1| _2| // +---------+---+ // | 36851222| 56| // |123004655| 56| // | 59804598| 54| // | 63644892| 46| // | 14444530| 42| // +---------+---+ } } ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看