/****************************************************************/
/* Parallel Combinatorial BLAS Library (for Graph Computations) */
/* version 1.5 -------------------------------------------------*/
/* date: 10/09/2015 ---------------------------------------------*/
/* authors: Ariful Azad, Aydin Buluc, Adam Lugowski ------------*/
/****************************************************************/
/*
 Copyright (c) 2010-2015, The Regents of the University of California

 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
 in the Software without restriction, including without limitation the rights
 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the Software is
 furnished to do so, subject to the following conditions:

 The above copyright notice and this permission notice shall be included in
 all copies or substantial portions of the Software.

 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 THE SOFTWARE.
 */

#include "CombBLAS/CombBLAS.h"
#include <mpi.h>
#include <sys/time.h>
#include <iostream>
#include <functional>
#include <algorithm>
#include <vector>
#include <string>
#include <sstream>
#include <cstring>
#ifdef _OPENMP
#include <omp.h>
#endif

using namespace combblas;

#ifdef _OPENMP
int cblas_splits = omp_get_max_threads();
#else
int cblas_splits = 1;
#endif

double cblas_alltoalltime;
double cblas_allgathertime;
double cblas_mergeconttime;
double cblas_transvectime;
double cblas_localspmvtime;


#define EDGEFACTOR 16
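// EDGEFACTOR is the Graph500 edge factor: the synthetic generators used below create
// roughly EDGEFACTOR * 2^scale edges on 2^scale vertices (duplicates and self-loops included).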
int ITERS;
using namespace std;


typedef SelectMaxSRing<bool, int32_t> SR;
typedef SpParMat < int64_t, bool, SpDCCols<int64_t,bool> > PSpMat_Bool;
typedef SpParMat < int64_t, bool, SpDCCols<int32_t,bool> > PSpMat_s32p64;	// local matrices use 32-bit indices, but parallel semantics are 64-bit
typedef SpParMat < int64_t, int, SpDCCols<int32_t,int> > PSpMat_s32p64_Int;	// similarly mixed, but holds integers as opposed to booleans
typedef SpParMat < int64_t, int64_t, SpDCCols<int64_t,int64_t> > PSpMat_Int64;
typedef SpParMat < int64_t, bool, SpCCols<int64_t,bool> > Par_CSC_Bool;


template <typename PARMAT>
void Symmetricize(PARMAT & A)
{
    // boolean addition is effectively a logical OR,
    // so symmetrizing does not destroy any existing edges
    PARMAT AT = A;
    AT.Transpose();
    A += AT;
}



struct SelectMinSR
{
    typedef int64_t T_promote;
    static T_promote id(){ return -1; };
    static bool returnedSAID() { return false; }
    //static MPI_Op mpi_op() { return MPI_MIN; };

    static T_promote add(const T_promote & arg1, const T_promote & arg2)
    {
        return std::min(arg1, arg2);
    }

    static T_promote multiply(const bool & arg1, const T_promote & arg2)
    {
        return arg2;
    }

    static void axpy(bool a, const T_promote & x, T_promote & y)
    {
        y = std::min(y, x);
    }
};
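// How SelectMinSR drives BFS (a sketch of the intended semantics, inferred from the code above):
// SpMV over this semiring computes, for every row i of the boolean matrix,
//     y(i) = min { x(j) : A(i,j) != 0 and x(j) is present in the sparse vector }
// because multiply() discards the boolean matrix value and passes the vector entry through,
// while add()/axpy() keep the minimum. Since the fringe stores each frontier vertex's own
// index as its value (see setNumToInd() in the BFS loops below), every newly reached vertex
// ends up with the smallest-numbered frontier neighbor as its tentative parent.
// Worked example: if row i has nonzeros in columns 3 and 7 and the fringe holds x(3) = 3 and
// x(7) = 7, then y(i) = min(3, 7) = 3, so vertex i adopts vertex 3 as its parent candidate.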




void BFS_CSC(PSpMat_s32p64 Aeff, int64_t source, FullyDistVec<int64_t, int64_t> degrees)
{

    int nthreads=1;
#ifdef THREADED
#pragma omp parallel
    {
        nthreads = omp_get_num_threads();
    }
#endif

    Par_CSC_Bool ABoolCSC (Aeff);
    PreAllocatedSPA<int64_t> SPA(ABoolCSC.seq(), nthreads*4);
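    // PreAllocatedSPA is a pre-allocated sparse accumulator that the bucket-based SpMSpV
    // reuses across all BFS iterations instead of allocating scratch space inside every
    // SpMV call. The second argument (nthreads*4 here) sets how many pieces/buckets the
    // accumulator is split into for threading; 4 per thread is a heuristic choice of this
    // benchmark rather than a requirement of the library.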

    double tspmvall=0, tall=0;
    int iterall = 0;
    int64_t visitedE = 0;
    int64_t visitedV = 0;


    for(int i=0; i<ITERS; ++i)
    {
        // FullyDistVec ( shared_ptr<CommGrid> grid, IT globallen, NT initval);
        FullyDistVec<int64_t, int64_t> parents ( Aeff.getcommgrid(), Aeff.getncol(), (int64_t) -1);	// identity is -1

        // FullyDistSpVec ( shared_ptr<CommGrid> grid, IT glen);
        FullyDistSpVec<int64_t, int64_t> fringe(Aeff.getcommgrid(), Aeff.getncol());	// numerical values are stored 0-based

        MPI_Barrier(MPI_COMM_WORLD);
        double t1 = MPI_Wtime();

        fringe.SetElement(source, source);
        int iterations = 0;

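        // One BFS level per loop iteration, expressed with sparse vector primitives:
        //   1. setNumToInd() makes each fringe entry carry its own vertex index,
        //      so frontier vertices propose themselves as parents;
        //   2. SpMV over SelectMinSR expands the frontier by one hop, overwriting
        //      the fringe with (vertex, tentative parent) pairs;
        //   3. EWiseMult(..., true, -1) prunes vertices whose parents entry is
        //      already set (exclude semantics against the dense parents vector);
        //   4. parents.Set(fringe) records the surviving discoveries.
        // The loop ends when the frontier becomes empty.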
        while(fringe.getnnz() > 0)
        {
            int64_t xnnz = fringe.getnnz();
            fringe.setNumToInd();
            double tstart = MPI_Wtime();
            SpMV<SelectMinSR>(ABoolCSC, fringe, fringe, false, SPA);
            double tspmv = MPI_Wtime()-tstart;
            tspmvall += tspmv;
            int64_t ynnz = fringe.getnnz();
            fringe = EWiseMult(fringe, parents, true, (int64_t) -1);	// clean up vertices that already have parents

            ostringstream outs1;
            outs1 << "iteration: " << iterations << " xnnz: "<< xnnz << " ynnz: " << ynnz << " SpMSpV time: " << tspmv << endl;
            SpParHelper::Print(outs1.str());

            parents.Set(fringe);
            iterations++;
        }
        MPI_Barrier(MPI_COMM_WORLD);
        double t2 = MPI_Wtime();
        tall += (t2 - t1);

        FullyDistSpVec<int64_t, int64_t> parentsp = parents.Find(bind2nd(greater<int64_t>(), -1));
        parentsp.Apply(myset<int64_t>(1));

        // we use degrees from the directed graph so that we don't count the reverse edges in the TEPS score
        int64_t nedges = EWiseMult(parentsp, degrees, false, (int64_t) 0).Reduce(plus<int64_t>(), (int64_t) 0);

        visitedE += nedges;
        visitedV += parentsp.Reduce(plus<int64_t>(), (int64_t) 0);
        iterall += iterations;
    }

    int myrank;
    MPI_Comm_rank(MPI_COMM_WORLD,&myrank);
    if(myrank == 0)
    {
        cout << "\nOverall stats:"  << endl;
        cout << "  starting vertex: " << source << endl;
        cout << "  Avg number of iterations: " << iterall/ITERS << endl;
        cout << "  Avg number of vertices found: " << visitedV/ITERS << endl;
        cout << "  Avg number of edges traversed: " << visitedE/ITERS << endl;
        cout << "  Avg SpMSpV time: " << tspmvall/ITERS << endl;
        cout << "  Avg total time: " << tall/ITERS << endl;
    }

}


void BFS_DCSC(PSpMat_s32p64 Aeff1, int64_t source, FullyDistVec<int64_t, int64_t> degrees)
{

    PSpMat_Bool Aeff = Aeff1;
    OptBuf<int32_t, int64_t> optbuf;	// let indices be 32-bits
    //Aeff.OptimizeForGraph500(optbuf);
    int nthreads=1;
#ifdef THREADED
#pragma omp parallel
    {
        nthreads = omp_get_num_threads();
        cblas_splits = nthreads*4;
    }
    Aeff.ActivateThreading(cblas_splits);
#endif
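    // ActivateThreading(cblas_splits) splits the local DCSC matrix into cblas_splits pieces
    // so that the threaded SpMV can process the pieces concurrently. Using nthreads*4 splits
    // (a modest oversubscription for load balance) mirrors the bucket count chosen for the
    // CSC variants; it is a tuning knob of this benchmark, not a library requirement.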

    double tspmvall=0, tall=0;
    int iterall = 0;
    int64_t visitedE = 0;
    int64_t visitedV = 0;


    for(int i=0; i<ITERS; ++i)
    {
        // FullyDistVec ( shared_ptr<CommGrid> grid, IT globallen, NT initval);
        FullyDistVec<int64_t, int64_t> parents ( Aeff.getcommgrid(), Aeff.getncol(), (int64_t) -1);	// identity is -1

        // FullyDistSpVec ( shared_ptr<CommGrid> grid, IT glen);
        FullyDistSpVec<int64_t, int64_t> fringe(Aeff.getcommgrid(), Aeff.getncol());	// numerical values are stored 0-based

        MPI_Barrier(MPI_COMM_WORLD);
        double t1 = MPI_Wtime();

        fringe.SetElement(source, source);
        int iterations = 0;

        while(fringe.getnnz() > 0)
        {
            int64_t xnnz = fringe.getnnz();
            fringe.setNumToInd();
            double tstart = MPI_Wtime();
            SpMV<SelectMinSR>(Aeff, fringe, fringe, false);
            double tspmv = MPI_Wtime()-tstart;
            tspmvall += tspmv;
            int64_t ynnz = fringe.getnnz();
            fringe = EWiseMult(fringe, parents, true, (int64_t) -1);	// clean up vertices that already have parents

            ostringstream outs1;
            outs1 << "iteration: " << iterations << " xnnz: "<< xnnz << " ynnz: " << ynnz << " SpMSpV time: " << tspmv << endl;
            SpParHelper::Print(outs1.str());


            parents.Set(fringe);
            iterations++;
        }
        MPI_Barrier(MPI_COMM_WORLD);
        double t2 = MPI_Wtime();
        tall += (t2 - t1);

        FullyDistSpVec<int64_t, int64_t> parentsp = parents.Find(bind2nd(greater<int64_t>(), -1));
        parentsp.Apply(myset<int64_t>(1));

        // we use degrees from the directed graph so that we don't count the reverse edges in the TEPS score
        int64_t nedges = EWiseMult(parentsp, degrees, false, (int64_t) 0).Reduce(plus<int64_t>(), (int64_t) 0);

        visitedE += nedges;
        visitedV += parentsp.Reduce(plus<int64_t>(), (int64_t) 0);
        iterall += iterations;
    }

    int myrank;
    MPI_Comm_rank(MPI_COMM_WORLD,&myrank);
    if(myrank == 0)
    {
        cout << "\nOverall stats:" << endl;
        cout << "  starting vertex: " << source << endl;
        cout << "  Avg number of iterations: " << iterall/ITERS << endl;
        cout << "  Avg number of vertices found: " << visitedV/ITERS << endl;
        cout << "  Avg number of edges traversed: " << visitedE/ITERS << endl;
        cout << "  Avg SpMSpV time: " << tspmvall/ITERS << endl;
        cout << "  Avg total time: " << tall/ITERS << endl;
    }


}




void BFS_CSC_Split(PSpMat_s32p64 Aeff, int64_t source, FullyDistVec<int64_t, int64_t> degrees)
{
    int nthreads=1;


    Par_CSC_Bool ABoolCSC(Aeff);




#ifdef THREADED
#pragma omp parallel
    {
        nthreads = omp_get_num_threads();
        cblas_splits = nthreads*4;
    }
    ABoolCSC.ActivateThreading(cblas_splits);
#endif

    double tspmvall=0, tall=0;
    int iterall = 0;
    int64_t visitedE = 0;
    int64_t visitedV = 0;


    for(int i=0; i<ITERS; ++i)
    {
        // FullyDistVec ( shared_ptr<CommGrid> grid, IT globallen, NT initval);
        FullyDistVec<int64_t, int64_t> parents ( Aeff.getcommgrid(), Aeff.getncol(), (int64_t) -1);	// identity is -1

        // FullyDistSpVec ( shared_ptr<CommGrid> grid, IT glen);
        FullyDistSpVec<int64_t, int64_t> fringe(Aeff.getcommgrid(), Aeff.getncol());	// numerical values are stored 0-based

        MPI_Barrier(MPI_COMM_WORLD);
        double t1 = MPI_Wtime();

        fringe.SetElement(source, source);
        int iterations = 0;

        while(fringe.getnnz() > 0)
        {
            int64_t xnnz = fringe.getnnz();
            fringe.setNumToInd();
            double tstart = MPI_Wtime();
            SpMV<SelectMinSR>(ABoolCSC, fringe, fringe, false);
            double tspmv = MPI_Wtime()-tstart;
            tspmvall += tspmv;
            int64_t ynnz = fringe.getnnz();
            fringe = EWiseMult(fringe, parents, true, (int64_t) -1);	// clean up vertices that already have parents
            ostringstream outs1;
            outs1 << "iteration: " << iterations << " xnnz: "<< xnnz << " ynnz: " << ynnz << " SpMSpV time: " << tspmv << endl;
            SpParHelper::Print(outs1.str());

            parents.Set(fringe);
            iterations++;
        }
        MPI_Barrier(MPI_COMM_WORLD);
        double t2 = MPI_Wtime();
        tall += (t2-t1);

        FullyDistSpVec<int64_t, int64_t> parentsp = parents.Find(bind2nd(greater<int64_t>(), -1));
        parentsp.Apply(myset<int64_t>(1));

        // we use degrees from the directed graph so that we don't count the reverse edges in the TEPS score
        int64_t nedges = EWiseMult(parentsp, degrees, false, (int64_t) 0).Reduce(plus<int64_t>(), (int64_t) 0);

        visitedE += nedges;
        visitedV += parentsp.Reduce(plus<int64_t>(), (int64_t) 0);
        iterall += iterations;
    }

    int myrank;
    MPI_Comm_rank(MPI_COMM_WORLD,&myrank);
    if(myrank == 0)
    {
        cout << "\nOverall stats:" << endl;
        cout << "  starting vertex: " << source << endl;
        cout << "  Avg number of iterations: " << iterall/ITERS << endl;
        cout << "  Avg number of vertices found: " << visitedV/ITERS << endl;
        cout << "  Avg number of edges traversed: " << visitedE/ITERS << endl;
        cout << "  Avg SpMSpV time: " << tspmvall/ITERS << endl;
        cout << "  Avg total time: " << tall/ITERS << endl;


    }


}




int main(int argc, char* argv[])
{
    int nprocs, myrank;

    int provided;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &provided);
    if (provided < MPI_THREAD_SERIALIZED)
    {
        printf("ERROR: The MPI library does not have MPI_THREAD_SERIALIZED support\n");
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

    MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD,&myrank);
    if(argc < 3)
    {
        if(myrank == 0)
        {
            cout << "Usage: ./SpMSpVBench <-input|-rmat|-er> <scale|filename> " << endl;
            cout << "    optional parameters:" << endl;
            cout << "       -source \"source of BFS\" (default: 0) " << endl;
            cout << "       -iter \"number of BFS iterations\" (default: 1)" << endl;
            cout << "       -permute (randomly permute the input matrix for load balance)" << endl;
            cout << "Example with a user-supplied matrix:" << endl;
            cout << "    mpirun -np 4 ./SpMSpVBench -input a.mtx -source 2" << endl;
            cout << "Example with a user-supplied matrix (pre-permute the input matrix for load balance):" << endl;
            cout << "    mpirun -np 4 ./SpMSpVBench -input a.mtx -permute" << endl;
            cout << "Example with an RMAT matrix: mpirun -np 4 ./SpMSpVBench -rmat 18" << endl;
            cout << "Example with an Erdos-Renyi matrix: mpirun -np 4 ./SpMSpVBench -er 18" << endl;
        }
        MPI_Finalize();
        return -1;
    }
    {
        shared_ptr<CommGrid> fullWorld;
        fullWorld.reset( new CommGrid(MPI_COMM_WORLD, 0, 0) );
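        // Passing 0 for both grid dimensions lets CombBLAS choose the processor grid itself;
        // it favors a square grid, so this benchmark is normally run on a square number of
        // MPI processes (e.g., 4, 16, 64).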
        int nthreads=1;

#ifdef THREADED
#pragma omp parallel
        {
            nthreads = omp_get_num_threads();
        }
#endif


        // Declare objects
        PSpMat_Bool A(fullWorld);
        PSpMat_s32p64 Aeff(fullWorld);
        FullyDistVec<int64_t, int64_t> degrees(fullWorld);	// degrees of vertices (including multi-edges and self-loops)
        FullyDistVec<int64_t, int64_t> nonisov(fullWorld);	// ids of non-isolated (connected) vertices
        unsigned scale;
        bool scramble = false;
        int source = 0;
        ITERS = 1;
        bool randpermute = false;
        bool symm = false;
        int maxthreads = nthreads;
        int minthreads = nthreads;
        string filename(argv[2]);
        for (int i = 1; i < argc; i++)
        {
            if (strcmp(argv[i],"-permute")==0)
            {
                if(myrank == 0) cout << "Randomly permute the matrix " << endl;
                randpermute = true;
            }
            if (strcmp(argv[i],"-source")==0 && i + 1 < argc)
            {
                source = atoi(argv[i + 1]);
                if(myrank == 0) cout << "Source vertex: " << source << endl;
            }
            if (strcmp(argv[i],"-iter")==0 && i + 1 < argc)
            {
                ITERS = atoi(argv[i + 1]);
                if(myrank == 0) cout << "Number of iterations: " << ITERS << endl;
            }
        }



        if(string(argv[1]) == string("-input")) // input option
        {

            //A.ReadDistribute(string(argv[2]), 0);	// read it from file
            A.ParallelReadMM(filename, true, maximum<bool>());
            SpParHelper::Print("Read input\n");

            PSpMat_Int64 * G = new PSpMat_Int64(A);
            G->Reduce(degrees, Row, plus<int64_t>(), static_cast<int64_t>(0));	// identity is 0
            delete G;

            Symmetricize(A);

            FullyDistVec<int64_t, int64_t> * ColSums = new FullyDistVec<int64_t, int64_t>(A.getcommgrid());
            A.Reduce(*ColSums, Column, plus<int64_t>(), static_cast<int64_t>(0)); 	// plus<int64_t> matches the type of the output vector
            nonisov = ColSums->FindInds(bind2nd(greater<int64_t>(), 0));	// only the indices of non-isolated vertices
            delete ColSums;

            if(randpermute)
                nonisov.RandPerm();
            A(nonisov, nonisov, true);	// in-place permute to save memory
            degrees = degrees(nonisov);	// fix the degrees array too
            FullyDistVec<int64_t, int64_t> newsource = nonisov.FindInds(bind2nd(equal_to<int64_t>(), source));	// fix the source vertex too
            source = newsource.GetElement(0);
            Aeff = PSpMat_s32p64(A);
            A.FreeMemory();
        }
        else if(string(argv[1]) == string("-rmat"))
        {
            unsigned scale;
            scale = static_cast<unsigned>(atoi(argv[2]));
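            // {.57, .19, .19, .05} are the standard Graph500 R-MAT initiator probabilities
            // (a, b, c, d); the -er option below uses the uniform {.25, .25, .25, .25}
            // initiator, which reduces the generator to an Erdos-Renyi-style random graph.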
            double initiator[4] = {.57, .19, .19, .05};
            DistEdgeList<int64_t> * DEL = new DistEdgeList<int64_t>();
            DEL->GenGraph500Data(initiator, scale, EDGEFACTOR, true, false );
            MPI_Barrier(MPI_COMM_WORLD);

            A = PSpMat_Bool(*DEL, false);
            delete DEL;
            Symmetricize(A);
            Aeff = PSpMat_s32p64(A);
            Aeff.Reduce(degrees, Row, plus<int64_t>(), static_cast<int64_t>(0));	// identity is 0
            A.FreeMemory();


        }
        else if(string(argv[1]) == string("-er"))
        {
            unsigned scale;
            scale = static_cast<unsigned>(atoi(argv[2]));
            double initiator[4] = {.25, .25, .25, .25};
            DistEdgeList<int64_t> * DEL = new DistEdgeList<int64_t>();
            DEL->GenGraph500Data(initiator, scale, EDGEFACTOR, true, false );
            MPI_Barrier(MPI_COMM_WORLD);

            A = PSpMat_Bool(*DEL, false);
            delete DEL;
            Symmetricize(A);
            Aeff = PSpMat_s32p64(A);
            Aeff.Reduce(degrees, Row, plus<int64_t>(), static_cast<int64_t>(0));	// identity is 0
            A.FreeMemory();
        }
        else
        {
            SpParHelper::Print("Unknown input option\n");
            MPI_Finalize();
            return -1;
        }




        Aeff.PrintInfo();
        float balance = Aeff.LoadImbalance();
        ostringstream outs;
        outs << "Load imbalance: " << balance << endl;
        SpParHelper::Print(outs.str());

        MPI_Barrier(MPI_COMM_WORLD);


        SpParHelper::Print("-------------------------------------------------\n");
        SpParHelper::Print("BFS with CSC matrix and SpMSpV-bucket algorithm \n");
        SpParHelper::Print("-------------------------------------------------\n");
        BFS_CSC(Aeff, source, degrees);
        SpParHelper::Print("-------------------------------------------------\n");
        SpParHelper::Print("BFS with split CSC matrix and SpMSpV-heapsort algorithm \n");
        SpParHelper::Print("-------------------------------------------------\n");
        BFS_CSC_Split(Aeff, source, degrees);
        SpParHelper::Print("-------------------------------------------------\n");
        SpParHelper::Print("BFS with DCSC matrix and SpMSpV-SPA algorithm \n");
        SpParHelper::Print("-------------------------------------------------\n");
        BFS_DCSC(Aeff, source, degrees);

    }
    MPI_Finalize();
    return 0;

}