/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.graphx.lib

import org.apache.spark.SparkFunSuite
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators


/**
 * Local (non-distributed) reference PageRank on a directed grid, used as ground truth for the
 * GraphX implementations exercised by the "Grid PageRank" test.
 *
 * Vertices are numbered in row-major order. Each vertex has an out-edge to the vertex below it
 * and to the vertex to its right, when those exist.
 */
object GridPageRank {
  /**
   * Runs `nIter` synchronous PageRank updates on an `nRows` x `nCols` grid.
   *
   * @param nRows     number of grid rows
   * @param nCols     number of grid columns
   * @param nIter     number of update iterations
   * @param resetProb reset probability; also the initial rank of every vertex
   * @return `(vertexId, rank)` for every vertex, ordered by vertex id
   */
  def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double): Seq[(VertexId, Double)] = {
    val nVertices = nRows * nCols
    // ArrayBuffer instead of MutableList: MutableList was removed in Scala 2.13.
    val inNbrs = Array.fill(nVertices)(collection.mutable.ArrayBuffer.empty[Int])
    val outDegree = Array.fill(nVertices)(0)
    // Convert a (row, column) address into a vertex id (row-major order).
    def sub2ind(r: Int, c: Int): Int = r * nCols + c
    // Build the grid graph: edges point down and to the right.
    for (r <- 0 until nRows; c <- 0 until nCols) {
      val ind = sub2ind(r, c)
      if (r + 1 < nRows) {
        outDegree(ind) += 1
        inNbrs(sub2ind(r + 1, c)) += ind
      }
      if (c + 1 < nCols) {
        outDegree(ind) += 1
        inNbrs(sub2ind(r, c + 1)) += ind
      }
    }
    // Synchronous updates: each new rank is computed only from the previous iteration's ranks.
    var pr = Array.fill(nVertices)(resetProb)
    for (_ <- 0 until nIter) {
      val oldPr = pr
      pr = Array.tabulate(nVertices) { ind =>
        resetProb + (1.0 - resetProb) * inNbrs(ind).map(nbr => oldPr(nbr) / outDegree(nbr)).sum
      }
    }
    (0L until nVertices).zip(pr)
  }

}


class PageRankSuite extends SparkFunSuite with LocalSparkContext {

  /**
   * Sum of squared per-vertex rank differences between `a` and `b`.
   *
   * Vertices of `a` with no rank in `b` are compared against 0.0. Because this is a left join
   * on `a`, vertices present only in `b` do not contribute.
   */
  def compareRanks(a: VertexRDD[Double], b: VertexRDD[Double]): Double = {
    a.leftJoin(b) { (_, aRank, bOpt) =>
      val diff = aRank - bOpt.getOrElse(0.0)
      diff * diff
    }.map { case (_, error) => error }.sum()
  }

  test("Star PageRank") {
    withSpark { sc =>
      val nVertices = 100
      val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
      val resetProb = 0.15
      val errorTol = 1.0e-5

      val staticRanks1 = starGraph.staticPageRank(numIter = 1, resetProb).vertices
      val staticRanks2 = starGraph.staticPageRank(numIter = 2, resetProb).vertices.cache()

      // On a star, static PageRank has already converged after one iteration, so the ranks
      // after one and two iterations must be identical.
      val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (_, pr1, pr2) =>
        if (pr1 != pr2) 1 else 0
      }.map { case (_, test) => test }.sum()
      assert(notMatching === 0)

      // Expected center (vertex 0) rank, assuming each of the nVertices - 1 leaves
      // contributes resetProb; leaves themselves must stay at resetProb.
      val expectedCenterRank = resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1))
      val staticErrors = staticRanks2.map { case (vid, pr) =>
        val correct = (vid > 0 && pr == resetProb) ||
          (vid == 0L && math.abs(pr - expectedCenterRank) < errorTol)
        if (!correct) 1 else 0
      }
      assert(staticErrors.sum() === 0)

      val dynamicRanks = starGraph.pageRank(0, resetProb).vertices.cache()
      assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
    }
  } // end of test Star PageRank

  test("Star PersonalPageRank") {
    withSpark { sc =>
      val nVertices = 100
      val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
      val resetProb = 0.15
      val errorTol = 1.0e-5

      val staticRanks1 = starGraph.staticPersonalizedPageRank(0, numIter = 1, resetProb).vertices
      val staticRanks2 = starGraph.staticPersonalizedPageRank(0, numIter = 2, resetProb)
        .vertices.cache()

      // Personalized to the center, the ranks after one and two iterations must already agree.
      val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (_, pr1, pr2) =>
        if (pr1 != pr2) 1 else 0
      }.map { case (_, test) => test }.sum()
      assert(notMatching === 0)

      // Personalized to vertex 0: only the source keeps rank (resetProb); all leaves get 0.0.
      val staticErrors = staticRanks2.map { case (vid, pr) =>
        val correct = (vid > 0 && pr == 0.0) ||
          (vid == 0 && pr == resetProb)
        if (!correct) 1 else 0
      }
      assert(staticErrors.sum() === 0)

      val dynamicRanks = starGraph.personalizedPageRank(0, 0, resetProb).vertices.cache()
      assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)

      // Column 0 of the parallel result corresponds to source vertex 0.
      val parallelStaticRanks1 = starGraph
        .staticParallelPersonalizedPageRank(Array(0), 1, resetProb).mapVertices {
          case (_, vector) => vector(0)
        }.vertices.cache()
      assert(compareRanks(staticRanks1, parallelStaticRanks1) < errorTol)

      val parallelStaticRanks2 = starGraph
        .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices {
          case (_, vector) => vector(0)
        }.vertices.cache()
      assert(compareRanks(staticRanks2, parallelStaticRanks2) < errorTol)

      // We have one outbound edge from 1 to 0
      val otherStaticRanks2 = starGraph.staticPersonalizedPageRank(1, numIter = 2, resetProb)
        .vertices.cache()
      val otherDynamicRanks = starGraph.personalizedPageRank(1, 0, resetProb).vertices.cache()
      // Column 1 of the parallel result corresponds to source vertex 1.
      val otherParallelStaticRanks2 = starGraph
        .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices {
          case (_, vector) => vector(1)
        }.vertices.cache()
      assert(compareRanks(otherDynamicRanks, otherStaticRanks2) < errorTol)
      assert(compareRanks(otherStaticRanks2, otherParallelStaticRanks2) < errorTol)
      assert(compareRanks(otherDynamicRanks, otherParallelStaticRanks2) < errorTol)
    }
  } // end of test Star PersonalPageRank

  test("Grid PageRank") {
    withSpark { sc =>
      val rows = 10
      val cols = 10
      val resetProb = 0.15
      val tol = 0.0001
      val numIter = 50
      val errorTol = 1.0e-5
      val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache()

      val staticRanks = gridGraph.staticPageRank(numIter, resetProb).vertices.cache()
      val dynamicRanks = gridGraph.pageRank(tol, resetProb).vertices.cache()
      // Ground truth from the local reference implementation.
      val referenceRanks = VertexRDD(
        sc.parallelize(GridPageRank(rows, cols, numIter, resetProb))).cache()

      assert(compareRanks(staticRanks, referenceRanks) < errorTol)
      assert(compareRanks(dynamicRanks, referenceRanks) < errorTol)
    }
  } // end of Grid PageRank

  test("Chain PageRank") {
    withSpark { sc =>
      // Directed chain 0 -> 1 -> ... -> 9.
      val chain1 = (0 until 9).map(x => (x, x + 1))
      val rawEdges = sc.parallelize(chain1, 1).map { case (s, d) => (s.toLong, d.toLong) }
      val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
      val resetProb = 0.15
      val tol = 0.0001
      val numIter = 10
      val errorTol = 1.0e-5

      val staticRanks = chain.staticPageRank(numIter, resetProb).vertices
      val dynamicRanks = chain.pageRank(tol, resetProb).vertices

      assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
    }
  }

  test("Chain PersonalizedPageRank") {
    withSpark { sc =>
      // Directed chain 0 -> 1 -> ... -> 9, personalized to vertex 4.
      val chain1 = (0 until 9).map(x => (x, x + 1))
      val rawEdges = sc.parallelize(chain1, 1).map { case (s, d) => (s.toLong, d.toLong) }
      val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
      val resetProb = 0.15
      val tol = 0.0001
      val numIter = 10
      // NOTE(review): this tolerance is far looser than in the other tests; confirm it is intended.
      val errorTol = 1.0e-1

      val staticRanks = chain.staticPersonalizedPageRank(4, numIter, resetProb).vertices
      val dynamicRanks = chain.personalizedPageRank(4, tol, resetProb).vertices

      assert(compareRanks(staticRanks, dynamicRanks) < errorTol)

      val parallelStaticRanks = chain
        .staticParallelPersonalizedPageRank(Array(4), numIter, resetProb).mapVertices {
          case (_, vector) => vector(0)
        }.vertices.cache()
      assert(compareRanks(staticRanks, parallelStaticRanks) < errorTol)
    }
  }
}