1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.graphx.lib
19
20import org.apache.spark.SparkFunSuite
21import org.apache.spark.graphx._
22import org.apache.spark.graphx.util.GraphGenerators
23
24
/**
 * Single-machine reference implementation of static PageRank on an `nRows` x `nCols`
 * grid graph, used by the "Grid PageRank" test to check the GraphX results.
 *
 * Vertex (r, c) gets id `r * nCols + c` (row-major order) and has an out-edge to its
 * lower neighbour (r + 1, c) and to its right neighbour (r, c + 1) whenever those exist.
 * Every vertex starts with rank `resetProb`, and each of the `nIter` iterations computes
 * `pr(v) = resetProb + (1 - resetProb) * sum over in-neighbours u of pr(u) / outDegree(u)`
 * using only the previous iteration's ranks. Ranks are returned as computed, without
 * any normalization.
 *
 * @param nRows     number of grid rows
 * @param nCols     number of grid columns
 * @param nIter     number of synchronous PageRank iterations to run
 * @param resetProb random-reset probability
 * @return (vertexId, rank) pairs for every vertex, in increasing vertex-id order
 */
object GridPageRank {
  def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double): Seq[(VertexId, Double)] = {
    val numVertices = nRows * nCols
    // ArrayBuffer rather than MutableList: MutableList was removed in Scala 2.13.
    // Both preserve insertion order, so the per-vertex sums below are unchanged.
    val inNbrs = Array.fill(numVertices)(collection.mutable.ArrayBuffer.empty[Int])
    val outDegree = Array.fill(numVertices)(0)
    // Convert row column address into vertex ids (row major order)
    def sub2ind(r: Int, c: Int): Int = r * nCols + c
    // Make the grid graph: every vertex links down and to the right when possible.
    for (r <- 0 until nRows; c <- 0 until nCols) {
      val ind = sub2ind(r, c)
      if (r + 1 < nRows) {
        outDegree(ind) += 1
        inNbrs(sub2ind(r + 1, c)) += ind
      }
      if (c + 1 < nCols) {
        outDegree(ind) += 1
        inNbrs(sub2ind(r, c + 1)) += ind
      }
    }
    // Compute the PageRank. Updates are synchronous: each new rank reads only oldPr,
    // never ranks already updated in the current iteration.
    var pr = Array.fill(numVertices)(resetProb)
    for (_ <- 0 until nIter) {
      val oldPr = pr
      pr = Array.tabulate(numVertices) { ind =>
        resetProb + (1.0 - resetProb) *
          inNbrs(ind).map(nbr => oldPr(nbr) / outDegree(nbr)).sum
      }
    }
    (0L until numVertices).zip(pr)
  }

}
57
58
class PageRankSuite extends SparkFunSuite with LocalSparkContext {

  /**
   * Sum of squared rank differences between `a` and `b`, matched by vertex id.
   *
   * This is a left join on `a`: a vertex present in `a` but missing from `b` is compared
   * against a rank of 0.0, and vertices that appear only in `b` are ignored.
   *
   * @param a ranks under test
   * @param b reference ranks
   * @return the summed squared error
   */
  def compareRanks(a: VertexRDD[Double], b: VertexRDD[Double]): Double = {
    a.leftJoin(b) { case (_, aRank, bRankOpt) =>
      val diff = aRank - bRankOpt.getOrElse(0.0)
      diff * diff
    }.map { case (_, sqErr) => sqErr }.sum()
  }

  test("Star PageRank") {
    withSpark { sc =>
      val nVertices = 100
      val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
      val resetProb = 0.15
      val errorTol = 1.0e-5

      val staticRanks1 = starGraph.staticPageRank(numIter = 1, resetProb).vertices
      val staticRanks2 = starGraph.staticPageRank(numIter = 2, resetProb).vertices.cache()

      // Static PageRank should only take 2 iterations to converge
      val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (_, pr1, pr2) =>
        if (pr1 != pr2) 1 else 0
      }.map { case (_, mismatch) => mismatch }.sum()
      assert(notMatching === 0)

      // Expected ranks: every leaf (vid > 0) keeps exactly resetProb, and the center
      // (vid 0) gets resetProb plus the damped rank of all nVertices - 1 leaves.
      val expectedCenterRank = resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1))
      val staticErrors = staticRanks2.map { case (vid, pr) =>
        val centerErr = math.abs(pr - expectedCenterRank)
        val correct = (vid > 0 && pr == resetProb) || (vid == 0L && centerErr < errorTol)
        if (!correct) 1 else 0
      }
      // Explicit () on sum: auto-application is deprecated in 2.13 and removed in Scala 3.
      assert(staticErrors.sum() === 0)

      val dynamicRanks = starGraph.pageRank(0, resetProb).vertices.cache()
      assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
    }
  } // end of test Star PageRank

  test("Star PersonalPageRank") {
    withSpark { sc =>
      val nVertices = 100
      val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
      val resetProb = 0.15
      val errorTol = 1.0e-5

      val staticRanks1 = starGraph.staticPersonalizedPageRank(0, numIter = 1, resetProb).vertices
      val staticRanks2 = starGraph.staticPersonalizedPageRank(0, numIter = 2, resetProb)
        .vertices.cache()

      // Static personalized PageRank should only take 2 iterations to converge
      val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (_, pr1, pr2) =>
        if (pr1 != pr2) 1 else 0
      }.map { case (_, mismatch) => mismatch }.sum()
      assert(notMatching === 0)

      // Personalized on vertex 0: only the source keeps rank (exactly resetProb),
      // every other vertex ends at 0.0.
      val staticErrors = staticRanks2.map { case (vid, pr) =>
        val correct = (vid > 0 && pr == 0.0) ||
          (vid == 0 && pr == resetProb)
        if (!correct) 1 else 0
      }
      assert(staticErrors.sum() === 0)

      val dynamicRanks = starGraph.personalizedPageRank(0, 0, resetProb).vertices.cache()
      assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)

      val parallelStaticRanks1 = starGraph
        .staticParallelPersonalizedPageRank(Array(0), 1, resetProb).mapVertices {
          case (_, vector) => vector(0)
        }.vertices.cache()
      assert(compareRanks(staticRanks1, parallelStaticRanks1) < errorTol)

      val parallelStaticRanks2 = starGraph
        .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices {
          case (_, vector) => vector(0)
        }.vertices.cache()
      assert(compareRanks(staticRanks2, parallelStaticRanks2) < errorTol)

      // We have one outbound edge from 1 to 0
      val otherStaticRanks2 = starGraph.staticPersonalizedPageRank(1, numIter = 2, resetProb)
        .vertices.cache()
      val otherDynamicRanks = starGraph.personalizedPageRank(1, 0, resetProb).vertices.cache()
      // Index 1 of each result vector holds the ranks personalized on source vertex 1.
      val otherParallelStaticRanks2 = starGraph
        .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices {
          case (_, vector) => vector(1)
        }.vertices.cache()
      assert(compareRanks(otherDynamicRanks, otherStaticRanks2) < errorTol)
      assert(compareRanks(otherStaticRanks2, otherParallelStaticRanks2) < errorTol)
      assert(compareRanks(otherDynamicRanks, otherParallelStaticRanks2) < errorTol)
    }
  } // end of test Star PersonalPageRank

  test("Grid PageRank") {
    withSpark { sc =>
      val rows = 10
      val cols = 10
      val resetProb = 0.15
      val tol = 0.0001
      val numIter = 50
      val errorTol = 1.0e-5
      val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache()

      val staticRanks = gridGraph.staticPageRank(numIter, resetProb).vertices.cache()
      val dynamicRanks = gridGraph.pageRank(tol, resetProb).vertices.cache()
      // Single-machine reference computed by GridPageRank on the same grid shape.
      val referenceRanks = VertexRDD(
        sc.parallelize(GridPageRank(rows, cols, numIter, resetProb))).cache()

      assert(compareRanks(staticRanks, referenceRanks) < errorTol)
      assert(compareRanks(dynamicRanks, referenceRanks) < errorTol)
    }
  } // end of Grid PageRank

  test("Chain PageRank") {
    withSpark { sc =>
      // Edges 0 -> 1 -> ... -> 9.
      val chain1 = (0 until 9).map(x => (x, x + 1))
      val rawEdges = sc.parallelize(chain1, 1).map { case (s, d) => (s.toLong, d.toLong) }
      val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
      val resetProb = 0.15
      val tol = 0.0001
      val numIter = 10
      val errorTol = 1.0e-5

      val staticRanks = chain.staticPageRank(numIter, resetProb).vertices
      val dynamicRanks = chain.pageRank(tol, resetProb).vertices

      assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
    }
  }

  test("Chain PersonalizedPageRank") {
    withSpark { sc =>
      // Edges 0 -> 1 -> ... -> 9.
      val chain1 = (0 until 9).map(x => (x, x + 1))
      val rawEdges = sc.parallelize(chain1, 1).map { case (s, d) => (s.toLong, d.toLong) }
      val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
      val resetProb = 0.15
      val tol = 0.0001
      val numIter = 10
      val errorTol = 1.0e-1

      val staticRanks = chain.staticPersonalizedPageRank(4, numIter, resetProb).vertices
      val dynamicRanks = chain.personalizedPageRank(4, tol, resetProb).vertices

      assert(compareRanks(staticRanks, dynamicRanks) < errorTol)

      val parallelStaticRanks = chain
        .staticParallelPersonalizedPageRank(Array(4), numIter, resetProb).mapVertices {
          case (_, vector) => vector(0)
        }.vertices.cache()
      assert(compareRanks(staticRanks, parallelStaticRanks) < errorTol)
    }
  }
}
207