/****************************************************************/
/* Parallel Combinatorial BLAS Library (for Graph Computations) */
/* version 1.5 -------------------------------------------------*/
/* date: 10/09/2015 ---------------------------------------------*/
/* authors: Ariful Azad, Aydin Buluc, Adam Lugowski ------------*/
/****************************************************************/
/*
Copyright (c) 2010-2015, The Regents of the University of California

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/

#include "CombBLAS/CombBLAS.h"
#include <mpi.h>
#include <sys/time.h>
#include <iostream>
#include <functional>
#include <algorithm>
#include <vector>
#include <string>
#include <sstream>
#include <cstring>    // strcmp
#include <cstdlib>    // atoi
#ifdef _OPENMP
#include <omp.h>
#endif

using namespace combblas;

#ifdef _OPENMP
int cblas_splits = omp_get_max_threads();
#else
int cblas_splits = 1;
#endif

double cblas_alltoalltime;
double cblas_allgathertime;
double cblas_mergeconttime;
double cblas_transvectime;
double cblas_localspmvtime;

// The Graph500-style generators below produce roughly EDGEFACTOR * 2^scale
// edges, i.e. an average degree of 16.
#define EDGEFACTOR 16
int ITERS;
using namespace std;


typedef SelectMaxSRing<bool, int32_t> SR;
typedef SpParMat < int64_t, bool, SpDCCols<int64_t,bool> > PSpMat_Bool;
typedef SpParMat < int64_t, bool, SpDCCols<int32_t,bool> > PSpMat_s32p64;     // sequentially use 32 bits for local matrices, but parallel semantics are 64 bits
typedef SpParMat < int64_t, int, SpDCCols<int32_t,int> > PSpMat_s32p64_Int;   // similarly mixed, but holds integers as opposed to booleans
typedef SpParMat < int64_t, int64_t, SpDCCols<int64_t,int64_t> > PSpMat_Int64;
typedef SpParMat < int64_t, bool, SpCCols<int64_t,bool> > Par_CSC_Bool;
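
// A rough memory argument for the mixed 32/64-bit typedefs above (an
// illustrative back-of-the-envelope estimate, not a measured figure): at
// scale 26 the generator yields about 16 * 2^26 ~= 2^30 directed edges,
// or up to ~2^31 nonzeros once symmetricized. Storing each local index as
// int32_t instead of int64_t saves 4 bytes per nonzero -- on the order of
// 8 GB across all processes -- while the int64_t "parallel" indices still
// address the full distributed dimension.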


template <typename PARMAT>
void Symmetricize(PARMAT & A)
{
    // boolean addition is effectively a logical OR,
    // so this does not destroy any existing edges
    PARMAT AT = A;
    AT.Transpose();
    A += AT;
}
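
// A tiny worked example of the OR semantics (hypothetical 3-vertex graph):
// if A holds directed edges {(0,1), (2,1)}, then AT holds {(1,0), (1,2)},
// and A += AT leaves {(0,1), (1,0), (1,2), (2,1)} -- every edge gains its
// reverse, and an edge present in both A and AT simply stays 'true'.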


struct SelectMinSR
{
    typedef int64_t T_promote;
    static T_promote id() { return -1; }
    static bool returnedSAID() { return false; }
    //static MPI_Op mpi_op() { return MPI_MIN; };

    static T_promote add(const T_promote & arg1, const T_promote & arg2)
    {
        return std::min(arg1, arg2);
    }

    static T_promote multiply(const bool & arg1, const T_promote & arg2)
    {
        return arg2;
    }

    static void axpy(bool a, const T_promote & x, T_promote & y)
    {
        y = std::min(y, x);
    }
};
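
// How this semiring drives BFS (a worked sketch of the algebra, not library
// documentation): with multiply(A(i,j), x_j) = x_j and add = min, the
// product y = A*x computes
//     y_i = min { x_j : A(i,j) != 0 and j is on the frontier }.
// Since the fringe stores each frontier vertex's own index (see the
// setNumToInd() calls below), y_i is the smallest-index frontier neighbor
// of i, i.e. a deterministic parent choice. For example, if vertices 2 and
// 5 are both on the frontier and both reach vertex 7, then
// y_7 = min(2, 5) = 2, and vertex 2 becomes 7's parent.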


void BFS_CSC(PSpMat_s32p64 Aeff, int64_t source, FullyDistVec<int64_t, int64_t> degrees)
{
    int nthreads = 1;
#ifdef THREADED
#pragma omp parallel
    {
        nthreads = omp_get_num_threads();
    }
#endif

    Par_CSC_Bool ABoolCSC(Aeff);
    PreAllocatedSPA<int64_t> SPA(ABoolCSC.seq(), nthreads*4);
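
    // The pre-allocated sparse accumulator (SPA) is reused by every SpMV
    // call below, so no per-iteration buffers are allocated. The bucket
    // count of nthreads*4 is a tuning choice; the assumption here is that a
    // few buckets per thread smooths out load imbalance across buckets, not
    // a documented requirement of PreAllocatedSPA.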

    double tspmvall = 0, tall = 0;
    int iterall = 0;
    int64_t visitedE = 0;    // 64-bit: edge counts summed over iterations can overflow int at large scales
    int64_t visitedV = 0;

    for(int i = 0; i < ITERS; ++i)
    {
        // FullyDistVec ( shared_ptr<CommGrid> grid, IT globallen, NT initval);
        FullyDistVec<int64_t, int64_t> parents(Aeff.getcommgrid(), Aeff.getncol(), (int64_t) -1);    // identity is -1

        // FullyDistSpVec ( shared_ptr<CommGrid> grid, IT glen);
        FullyDistSpVec<int64_t, int64_t> fringe(Aeff.getcommgrid(), Aeff.getncol());    // numerical values are stored 0-based

        MPI_Barrier(MPI_COMM_WORLD);
        double t1 = MPI_Wtime();

        fringe.SetElement(source, source);
        int iterations = 0;

        while(fringe.getnnz() > 0)
        {
            int64_t xnnz = fringe.getnnz();
            fringe.setNumToInd();    // store each frontier vertex's own index as its value
            double tstart = MPI_Wtime();
            SpMV<SelectMinSR>(ABoolCSC, fringe, fringe, false, SPA);    // the fringe serves as both input and output
            double tspmv = MPI_Wtime() - tstart;
            tspmvall += tspmv;
            int64_t ynnz = fringe.getnnz();
            fringe = EWiseMult(fringe, parents, true, (int64_t) -1);    // prune vertices that already have parents

            ostringstream outs1;
            outs1 << "iteration: " << iterations << " xnnz: " << xnnz << " ynnz: " << ynnz << " SpMSpV time: " << tspmv << endl;
            SpParHelper::Print(outs1.str());

            parents.Set(fringe);
            iterations++;
        }
        MPI_Barrier(MPI_COMM_WORLD);
        double t2 = MPI_Wtime();
        tall += (t2 - t1);

        FullyDistSpVec<int64_t, int64_t> parentsp = parents.Find(bind2nd(greater<int64_t>(), -1));
        parentsp.Apply(myset<int64_t>(1));    // set every discovered vertex's value to 1

        // we use degrees on the directed graph, so that we don't count the reverse edges in the TEPS score
        int64_t nedges = EWiseMult(parentsp, degrees, false, (int64_t) 0).Reduce(plus<int64_t>(), (int64_t) 0);

        visitedE += nedges;
        visitedV += parentsp.Reduce(plus<int64_t>(), (int64_t) 0);
        iterall += iterations;
    }

    int myrank;
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
    if(myrank == 0)
    {
        cout << "\nOverall stats:" << endl;
        cout << "  starting vertex: " << source << endl;
        cout << "  Avg number of iterations: " << iterall/ITERS << endl;
        cout << "  Avg number of vertices found: " << visitedV/ITERS << endl;
        cout << "  Avg number of edges traversed: " << visitedE/ITERS << endl;
        cout << "  Avg SpMSpV time: " << tspmvall/ITERS << endl;
        cout << "  Avg total time: " << tall/ITERS << endl;
    }
}
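
// Note on the TEPS metric referenced in the comments above: the Graph500
// "traversed edges per second" score for these runs would be
// (visitedE/ITERS) / (tall/ITERS); this benchmark prints the raw averages
// rather than computing TEPS itself.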


void BFS_DCSC(PSpMat_s32p64 Aeff1, int64_t source, FullyDistVec<int64_t, int64_t> degrees)
{
    PSpMat_Bool Aeff = Aeff1;
    OptBuf<int32_t, int64_t> optbuf;    // let indices be 32 bits
    //Aeff.OptimizeForGraph500(optbuf);
    int nthreads = 1;
#ifdef THREADED
#pragma omp parallel
    {
        nthreads = omp_get_num_threads();
        cblas_splits = nthreads*4;
    }
    Aeff.ActivateThreading(cblas_splits);
#endif
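
    // ActivateThreading partitions the local matrix into cblas_splits pieces
    // that threads can process independently during SpMV. As with the SPA
    // bucket count, choosing nthreads*4 splits (rather than exactly
    // nthreads) is a load-balancing heuristic, not a required value.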

    double tspmvall = 0, tall = 0;
    int iterall = 0;
    int64_t visitedE = 0;    // 64-bit, as in BFS_CSC
    int64_t visitedV = 0;

    for(int i = 0; i < ITERS; ++i)
    {
        // FullyDistVec ( shared_ptr<CommGrid> grid, IT globallen, NT initval);
        FullyDistVec<int64_t, int64_t> parents(Aeff.getcommgrid(), Aeff.getncol(), (int64_t) -1);    // identity is -1

        // FullyDistSpVec ( shared_ptr<CommGrid> grid, IT glen);
        FullyDistSpVec<int64_t, int64_t> fringe(Aeff.getcommgrid(), Aeff.getncol());    // numerical values are stored 0-based

        MPI_Barrier(MPI_COMM_WORLD);
        double t1 = MPI_Wtime();

        fringe.SetElement(source, source);
        int iterations = 0;

        while(fringe.getnnz() > 0)
        {
            int64_t xnnz = fringe.getnnz();
            fringe.setNumToInd();
            double tstart = MPI_Wtime();
            SpMV<SelectMinSR>(Aeff, fringe, fringe, false);
            double tspmv = MPI_Wtime() - tstart;
            tspmvall += tspmv;
            int64_t ynnz = fringe.getnnz();
            fringe = EWiseMult(fringe, parents, true, (int64_t) -1);    // prune vertices that already have parents

            ostringstream outs1;
            outs1 << "iteration: " << iterations << " xnnz: " << xnnz << " ynnz: " << ynnz << " SpMSpV time: " << tspmv << endl;
            SpParHelper::Print(outs1.str());

            parents.Set(fringe);
            iterations++;
        }
        MPI_Barrier(MPI_COMM_WORLD);
        double t2 = MPI_Wtime();
        tall += (t2 - t1);

        FullyDistSpVec<int64_t, int64_t> parentsp = parents.Find(bind2nd(greater<int64_t>(), -1));
        parentsp.Apply(myset<int64_t>(1));

        // we use degrees on the directed graph, so that we don't count the reverse edges in the TEPS score
        int64_t nedges = EWiseMult(parentsp, degrees, false, (int64_t) 0).Reduce(plus<int64_t>(), (int64_t) 0);

        visitedE += nedges;
        visitedV += parentsp.Reduce(plus<int64_t>(), (int64_t) 0);
        iterall += iterations;
    }

    int myrank;
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
    if(myrank == 0)
    {
        cout << "\nOverall stats:" << endl;
        cout << "  starting vertex: " << source << endl;
        cout << "  Avg number of iterations: " << iterall/ITERS << endl;
        cout << "  Avg number of vertices found: " << visitedV/ITERS << endl;
        cout << "  Avg number of edges traversed: " << visitedE/ITERS << endl;
        cout << "  Avg SpMSpV time: " << tspmvall/ITERS << endl;
        cout << "  Avg total time: " << tall/ITERS << endl;
    }
}


void BFS_CSC_Split(PSpMat_s32p64 Aeff, int64_t source, FullyDistVec<int64_t, int64_t> degrees)
{
    int nthreads = 1;

    Par_CSC_Bool ABoolCSC(Aeff);

#ifdef THREADED
#pragma omp parallel
    {
        nthreads = omp_get_num_threads();
        cblas_splits = nthreads*4;
    }
    ABoolCSC.ActivateThreading(cblas_splits);
#endif

    double tspmvall = 0, tall = 0;
    int iterall = 0;
    int64_t visitedE = 0;    // 64-bit, as in BFS_CSC
    int64_t visitedV = 0;

    for(int i = 0; i < ITERS; ++i)
    {
        // FullyDistVec ( shared_ptr<CommGrid> grid, IT globallen, NT initval);
        FullyDistVec<int64_t, int64_t> parents(Aeff.getcommgrid(), Aeff.getncol(), (int64_t) -1);    // identity is -1

        // FullyDistSpVec ( shared_ptr<CommGrid> grid, IT glen);
        FullyDistSpVec<int64_t, int64_t> fringe(Aeff.getcommgrid(), Aeff.getncol());    // numerical values are stored 0-based

        MPI_Barrier(MPI_COMM_WORLD);
        double t1 = MPI_Wtime();

        fringe.SetElement(source, source);
        int iterations = 0;

        while(fringe.getnnz() > 0)
        {
            int64_t xnnz = fringe.getnnz();
            fringe.setNumToInd();
            double tstart = MPI_Wtime();
            SpMV<SelectMinSR>(ABoolCSC, fringe, fringe, false);
            double tspmv = MPI_Wtime() - tstart;
            tspmvall += tspmv;
            int64_t ynnz = fringe.getnnz();
            fringe = EWiseMult(fringe, parents, true, (int64_t) -1);    // prune vertices that already have parents

            ostringstream outs1;
            outs1 << "iteration: " << iterations << " xnnz: " << xnnz << " ynnz: " << ynnz << " SpMSpV time: " << tspmv << endl;
            SpParHelper::Print(outs1.str());

            parents.Set(fringe);
            iterations++;
        }
        MPI_Barrier(MPI_COMM_WORLD);
        double t2 = MPI_Wtime();
        tall += (t2 - t1);

        FullyDistSpVec<int64_t, int64_t> parentsp = parents.Find(bind2nd(greater<int64_t>(), -1));
        parentsp.Apply(myset<int64_t>(1));

        // we use degrees on the directed graph, so that we don't count the reverse edges in the TEPS score
        int64_t nedges = EWiseMult(parentsp, degrees, false, (int64_t) 0).Reduce(plus<int64_t>(), (int64_t) 0);

        visitedE += nedges;
        visitedV += parentsp.Reduce(plus<int64_t>(), (int64_t) 0);
        iterall += iterations;
    }

    int myrank;
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
    if(myrank == 0)
    {
        cout << "\nOverall stats:" << endl;
        cout << "  starting vertex: " << source << endl;
        cout << "  Avg number of iterations: " << iterall/ITERS << endl;
        cout << "  Avg number of vertices found: " << visitedV/ITERS << endl;
        cout << "  Avg number of edges traversed: " << visitedE/ITERS << endl;
        cout << "  Avg SpMSpV time: " << tspmvall/ITERS << endl;
        cout << "  Avg total time: " << tall/ITERS << endl;
    }
}


int main(int argc, char* argv[])
{
    int nprocs, myrank;

    int provided;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &provided);
    if (provided < MPI_THREAD_SERIALIZED)
    {
        printf("ERROR: The MPI library does not have MPI_THREAD_SERIALIZED support\n");
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
    if(argc < 3)
    {
        if(myrank == 0)
        {
            cout << "Usage: ./SpMSpVBench <-input|-rmat|-er> <scale|filename>" << endl;
            cout << "  optional parameters:" << endl;
            cout << "    -source \"source of BFS\" (default: 0)" << endl;
            cout << "    -iter \"number of BFS iterations\" (default: 1)" << endl;
            cout << "Example with a user-supplied matrix:" << endl;
            cout << "    mpirun -np 4 ./SpMSpVBench -input a.mtx -source 2" << endl;
            cout << "Example with a user-supplied matrix (pre-permute the input matrix for load balance):" << endl;
            cout << "    mpirun -np 4 ./SpMSpVBench -input a.mtx -permute" << endl;
            cout << "Example with an RMAT matrix: mpirun -np 4 ./SpMSpVBench -rmat 18" << endl;
            cout << "Example with an Erdos-Renyi matrix: mpirun -np 4 ./SpMSpVBench -er 18" << endl;
        }
        MPI_Finalize();
        return -1;
    }
    {
        shared_ptr<CommGrid> fullWorld;
        fullWorld.reset( new CommGrid(MPI_COMM_WORLD, 0, 0) );    // 0,0: let CommGrid pick a (near-)square processor grid
        int nthreads = 1;

#ifdef THREADED
#pragma omp parallel
        {
            nthreads = omp_get_num_threads();
        }
#endif

        // Declare objects
        PSpMat_Bool A(fullWorld);
        PSpMat_s32p64 Aeff(fullWorld);
        FullyDistVec<int64_t, int64_t> degrees(fullWorld);    // degrees of vertices (including multi-edges and self-loops)
        FullyDistVec<int64_t, int64_t> nonisov(fullWorld);    // ids of non-isolated (connected) vertices
        unsigned scale;
        bool scramble = false;
        int source = 0;
        ITERS = 1;
        bool randpermute = false;
        bool symm = false;
        int maxthreads = nthreads;
        int minthreads = nthreads;
        string filename(argv[2]);
        for (int i = 1; i < argc; i++)
        {
            if (strcmp(argv[i], "-permute") == 0)
            {
                if(myrank == 0) cout << "Randomly permute the matrix" << endl;
                randpermute = true;
            }
            if (strcmp(argv[i], "-source") == 0 && i + 1 < argc)    // guard against a missing value
            {
                source = atoi(argv[i + 1]);
                if(myrank == 0) cout << "Source vertex: " << source << endl;
            }
            if (strcmp(argv[i], "-iter") == 0 && i + 1 < argc)
            {
                ITERS = atoi(argv[i + 1]);
                if(myrank == 0) cout << "Number of iterations: " << ITERS << endl;
            }
        }

        if(string(argv[1]) == string("-input"))    // input option
        {
            //A.ReadDistribute(string(argv[2]), 0);    // read it from file
            A.ParallelReadMM(filename, true, maximum<bool>());
            SpParHelper::Print("Read input\n");

            PSpMat_Int64 * G = new PSpMat_Int64(A);
            G->Reduce(degrees, Row, plus<int64_t>(), static_cast<int64_t>(0));    // identity is 0
            delete G;

            Symmetricize(A);

            FullyDistVec<int64_t, int64_t> * ColSums = new FullyDistVec<int64_t, int64_t>(A.getcommgrid());
            A.Reduce(*ColSums, Column, plus<int64_t>(), static_cast<int64_t>(0));    // plus<int64_t> matches the type of the output vector
            nonisov = ColSums->FindInds(bind2nd(greater<int64_t>(), 0));    // only the indices of non-isolated vertices
            delete ColSums;

            if(randpermute)
                nonisov.RandPerm();
            A(nonisov, nonisov, true);    // in-place permute to save memory
            degrees = degrees(nonisov);    // fix the degrees array too
            FullyDistVec<int64_t, int64_t> newsource = nonisov.FindInds(bind2nd(equal_to<int64_t>(), source));
            source = newsource.GetElement(0);    // fix the source vertex too
            Aeff = PSpMat_s32p64(A);
            A.FreeMemory();
        }
        else if(string(argv[1]) == string("-rmat"))
        {
            unsigned scale = static_cast<unsigned>(atoi(argv[2]));
            double initiator[4] = {.57, .19, .19, .05};    // standard Graph500 RMAT parameters
            DistEdgeList<int64_t> * DEL = new DistEdgeList<int64_t>();
            DEL->GenGraph500Data(initiator, scale, EDGEFACTOR, true, false);
            MPI_Barrier(MPI_COMM_WORLD);

            A = PSpMat_Bool(*DEL, false);
            delete DEL;
            Symmetricize(A);
            Aeff = PSpMat_s32p64(A);
            Aeff.Reduce(degrees, Row, plus<int64_t>(), static_cast<int64_t>(0));    // identity is 0
            A.FreeMemory();
        }
        else if(string(argv[1]) == string("-er"))
        {
            unsigned scale = static_cast<unsigned>(atoi(argv[2]));
            double initiator[4] = {.25, .25, .25, .25};    // uniform quadrants yield an Erdos-Renyi-like graph
            DistEdgeList<int64_t> * DEL = new DistEdgeList<int64_t>();
            DEL->GenGraph500Data(initiator, scale, EDGEFACTOR, true, false);
            MPI_Barrier(MPI_COMM_WORLD);

            A = PSpMat_Bool(*DEL, false);
            delete DEL;
            Symmetricize(A);
            Aeff = PSpMat_s32p64(A);
            Aeff.Reduce(degrees, Row, plus<int64_t>(), static_cast<int64_t>(0));    // identity is 0
            A.FreeMemory();
        }
        else
        {
            SpParHelper::Print("Unknown input option\n");
            MPI_Finalize();
            return -1;
        }

        Aeff.PrintInfo();
        float balance = Aeff.LoadImbalance();
        ostringstream outs;
        outs << "Load balance: " << balance << endl;
        SpParHelper::Print(outs.str());

        MPI_Barrier(MPI_COMM_WORLD);

        SpParHelper::Print("-------------------------------------------------\n");
        SpParHelper::Print("BFS with CSC matrix and SpMSpV-bucket algorithm\n");
        SpParHelper::Print("-------------------------------------------------\n");
        BFS_CSC(Aeff, source, degrees);
        SpParHelper::Print("-------------------------------------------------\n");
        SpParHelper::Print("BFS with split CSC matrix and SpMSpV-heapsort algorithm\n");
        SpParHelper::Print("-------------------------------------------------\n");
        BFS_CSC_Split(Aeff, source, degrees);
        SpParHelper::Print("-------------------------------------------------\n");
        SpParHelper::Print("BFS with DCSC matrix and SpMSpV-SPA algorithm\n");
        SpParHelper::Print("-------------------------------------------------\n");
        BFS_DCSC(Aeff, source, degrees);
    }
    MPI_Finalize();
    return 0;
}
548