1 /****************************************************************/
2 /* Parallel Combinatorial BLAS Library (for Graph Computations) */
3 /* version 1.6 -------------------------------------------------*/
4 /* date: 6/15/2017 ---------------------------------------------*/
5 /* authors: Ariful Azad, Aydin Buluc --------------------------*/
6 /****************************************************************/
7 /*
8 Copyright (c) 2010-2017, The Regents of the University of California
9
10 Permission is hereby granted, free of charge, to any person obtaining a copy
11 of this software and associated documentation files (the "Software"), to deal
12 in the Software without restriction, including without limitation the rights
13 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 copies of the Software, and to permit persons to whom the Software is
15 furnished to do so, subject to the following conditions:
16
17 The above copyright notice and this permission notice shall be included in
18 all copies or substantial portions of the Software.
19
20 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 THE SOFTWARE.
27 */
28
29 #define DETERMINISTIC
30 #include "CombBLAS/CombBLAS.h"
31 #include <mpi.h>
32 #include <sys/time.h>
33 #include <iostream>
34 #include <functional>
35 #include <algorithm>
36 #include <vector>
37 #include <string>
38 #include <sstream>
39 #ifdef THREADED
40 #ifndef _OPENMP
41 #define _OPENMP
42 #endif
43 #include <omp.h>
44 #endif
45
46
47 double cblas_alltoalltime;
48 double cblas_allgathertime;
49 int cblas_splits;
50
51 #include "TwitterEdge.h"
52
53 #define EDGEFACTOR 5 // For MIS
54 #define ITERS 16
55 #define PERCENTS 4 // testing with 4 different percentiles
56
57 using namespace std;
58 using namespace combblas;
59
60
61 template <typename PARMAT>
Symmetricize(PARMAT & A)62 void Symmetricize(PARMAT & A)
63 {
64 // boolean addition is practically a "logical or"
65 // therefore this doesn't destruct any links
66 PARMAT AT = A;
67 AT.Transpose();
68 A += AT;
69 }
70
71
72 struct DetSymmetricize: public std::binary_function<TwitterEdge, TwitterEdge, TwitterEdge>
73 {
74 // have to deterministically choose between one of the two values.
75 // cannot just add them because that will change the distribution (small values are unlikely to survive)
operator ()DetSymmetricize76 TwitterEdge operator()(const TwitterEdge & g, const TwitterEdge & t)
77 {
78 TwitterEdge toret = g;
79
80 if(((g.latest + t.latest) & 1) == 1)
81 {
82 toret.latest = std::min(g.latest, t.latest);
83 }
84 else
85 {
86 toret.latest = std::max(g.latest, t.latest);
87 }
88 return toret;
89 }
90 };
91
92 typedef SpParMat < int64_t, TwitterEdge, SpDCCols<int64_t, TwitterEdge > > PSpMat_Twitter;
93 typedef SpParMat < int64_t, bool, SpDCCols<int64_t, bool > > PSpMat_Bool;
94
SymmetricizeRands(PSpMat_Twitter & A)95 void SymmetricizeRands(PSpMat_Twitter & A)
96 {
97 PSpMat_Twitter AT = A;
98 AT.Transpose();
99 // SpParMat<IU,RETT,RETDER> EWiseApply (const SpParMat<IU,NU1,UDERA> & A,
100 // const SpParMat<IU,NU2,UDERB> & B, _BinaryOperation __binary_op, bool notB, const NU2& defaultBVal)
101 // Default B value is irrelevant since the structures of the matrices are
102 A = EWiseApply<TwitterEdge, SpDCCols<int64_t, TwitterEdge > >(A, AT, DetSymmetricize(), false, TwitterEdge());
103 }
104
105 #ifdef DETERMINISTIC
106 MTRand GlobalMT(1);
107 #else
108 MTRand GlobalMT; // generate random numbers with Mersenne Twister
109 #endif
110
111
112 struct Twitter_obj_randomizer : public std::unary_function<TwitterEdge, TwitterEdge>
113 {
operator ()Twitter_obj_randomizer114 const TwitterEdge operator()(const TwitterEdge & x) const
115 {
116 short mycount = 1;
117 bool myfollow = 0;
118 time_t mylatest = static_cast<int64_t>(GlobalMT.rand() * 10000); // random.randrange(0,10000)
119
120 return TwitterEdge(mycount, myfollow, mylatest);
121 }
122 };
123
124 struct Twitter_materialize: public std::binary_function<TwitterEdge, time_t, bool>
125 {
operator ()Twitter_materialize126 bool operator()(const TwitterEdge & x, time_t sincedate) const
127 {
128 if(x.isRetwitter() && x.LastTweetBy(sincedate))
129 return false; // false if the edge is going to be kept
130 else
131 return true; // true if the edge is to be pruned
132 }
133 };
134
135 // def rand( verc ):
136 // import random
137 // return random.random()
138 struct randGen : public std::unary_function<double, double>
139 {
operator ()randGen140 const double operator()(const double & ignore)
141 {
142 return GlobalMT.rand();
143 }
144 };
145
146
main(int argc,char * argv[])147 int main(int argc, char* argv[])
148 {
149 int nprocs, myrank;
150 #ifdef _OPENMP
151 int cblas_splits = omp_get_max_threads();
152 int provided, flag, claimed;
153 MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided );
154 MPI_Is_thread_main( &flag );
155 if (!flag)
156 SpParHelper::Print("This thread called init_thread but Is_thread_main gave false\n");
157 MPI_Query_thread( &claimed );
158 if (claimed != provided)
159 SpParHelper::Print("Query thread gave different thread level than requested\n");
160 #else
161 MPI_Init(&argc, &argv);
162 int cblas_splits = 1;
163 #endif
164
165 MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
166 MPI_Comm_rank(MPI_COMM_WORLD,&myrank);
167 if(argc < 2)
168 {
169 if(myrank == 0)
170 {
171 cout << "Usage: ./FilteredMIS <Scale>" << endl;
172 }
173 MPI_Finalize();
174 return -1;
175 }
176 {
177 // Declare objects
178 PSpMat_Twitter A;
179 shared_ptr<CommGrid> fullWorld;
180 fullWorld.reset( new CommGrid(MPI_COMM_WORLD, 0, 0) );
181 FullyDistVec<int64_t, int64_t> indegrees(fullWorld); // in-degrees of vertices (including multi-edges and self-loops)
182 FullyDistVec<int64_t, int64_t> oudegrees(fullWorld); // out-degrees of vertices (including multi-edges and self-loops)
183 FullyDistVec<int64_t, int64_t> degrees(fullWorld); // combined degrees of vertices (including multi-edges and self-loops)
184 PSpMat_Bool * ABool;
185
186 SpParHelper::Print("Using synthetic data, which we ALWAYS permute for load balance\n");
187 SpParHelper::Print("We only balance the original input, we don't repermute after each filter change\n");
188 SpParHelper::Print("BFS is run on UNDIRECTED graph, hence hitting CCs, and TEPS is bidirectional\n");
189
190 double initiator[4] = {.25, .25, .25, .25}; // creating erdos-renyi
191 double t01 = MPI_Wtime();
192 DistEdgeList<int64_t> * DEL = new DistEdgeList<int64_t>();
193
194 unsigned scale = static_cast<unsigned>(atoi(argv[1]));
195 ostringstream outs;
196 outs << "Forcing scale to : " << scale << endl;
197 SpParHelper::Print(outs.str());
198
199 // parameters: (double initiator[4], int log_numverts, int edgefactor, bool scramble, bool packed)
200 DEL->GenGraph500Data(initiator, scale, EDGEFACTOR, true, true ); // generate packed edges
201 SpParHelper::Print("Generated renamed edge lists\n");
202
203 ABool = new PSpMat_Bool(*DEL, false);
204 int64_t removed = ABool->RemoveLoops();
205 ostringstream loopinfo;
206 loopinfo << "Converted to Boolean and removed " << removed << " loops" << endl;
207 SpParHelper::Print(loopinfo.str());
208 ABool->PrintInfo();
209 delete DEL; // free memory
210 A = PSpMat_Twitter(*ABool); // any upcasting generates the default object
211
212 double t02 = MPI_Wtime();
213 ostringstream tinfo;
214 tinfo << "Generation took " << t02-t01 << " seconds" << endl;
215 SpParHelper::Print(tinfo.str());
216
217 // indegrees is sum along rows because A is loaded as "tranposed", similarly oudegrees is sum along columns
218 ABool->PrintInfo();
219 ABool->Reduce(oudegrees, Column, plus<int64_t>(), static_cast<int64_t>(0));
220 ABool->Reduce(indegrees, Row, plus<int64_t>(), static_cast<int64_t>(0));
221
222 // indegrees_filt and oudegrees_filt is used for the real data
223 FullyDistVec<int64_t, int64_t> indegrees_filt(fullWorld);
224 FullyDistVec<int64_t, int64_t> oudegrees_filt(fullWorld);
225
226 typedef FullyDistVec<int64_t, int64_t> IntVec; // used for the synthetic data (symmetricized before randomization)
227 FullyDistVec<int64_t, int64_t> degrees_filt[4] = {IntVec(fullWorld), IntVec(fullWorld), IntVec(fullWorld), IntVec(fullWorld)};
228 int64_t keep[PERCENTS] = {100, 1000, 2500, 10000}; // ratio of edges kept in range (0, 10000)
229
230 degrees = indegrees;
231 degrees.EWiseApply(oudegrees, plus<int64_t>());
232 SpParHelper::Print("All degrees calculated\n");
233 delete ABool;
234
235 float balance = A.LoadImbalance();
236 ostringstream outlb;
237 outlb << "Load balance: " << balance << endl;
238 SpParHelper::Print(outlb.str());
239
240 // We symmetricize before we apply the random generator
241 // Otherwise += will naturally add the random numbers together
242 // hence will create artificially high-permeable filters
243 Symmetricize(A); // A += A';
244 SpParHelper::Print("Symmetricized\n");
245
246 A.Apply(Twitter_obj_randomizer());
247 A.PrintInfo();
248 SymmetricizeRands(A);
249 SpParHelper::Print("Symmetricized Rands\n");
250 A.PrintInfo();
251
252
253 FullyDistVec<int64_t, int64_t> * nonisov = new FullyDistVec<int64_t, int64_t>(degrees.FindInds(bind2nd(greater<int64_t>(), 0)));
254 SpParHelper::Print("Found (and permuted) non-isolated vertices\n");
255 nonisov->RandPerm(); // so that A(v,v) is load-balanced (both memory and time wise)
256 A(*nonisov, *nonisov, true); // in-place permute to save memory
257 SpParHelper::Print("Dropped isolated vertices from input\n");
258
259 indegrees = indegrees(*nonisov); // fix the degrees arrays too
260 oudegrees = oudegrees(*nonisov);
261 degrees = degrees(*nonisov);
262 delete nonisov;
263
264 for (int i=0; i < PERCENTS; i++)
265 {
266 PSpMat_Twitter B = A;
267 B.Prune(bind2nd(Twitter_materialize(), keep[i]));
268 PSpMat_Bool BBool = B;
269 BBool.PrintInfo();
270 float balance = B.LoadImbalance();
271 ostringstream outs;
272 outs << "Load balance of " << static_cast<float>(keep[i])/100 << "% filtered case: " << balance << endl;
273 SpParHelper::Print(outs.str());
274
275 // degrees_filt[i] is by-default generated as permuted
276 BBool.Reduce(degrees_filt[i], Column, plus<int64_t>(), static_cast<int64_t>(0)); // Column=Row since BBool is symmetric
277 }
278
279 float balance_former = A.LoadImbalance();
280 ostringstream outs_former;
281 outs_former << "Load balance: " << balance_former << endl;
282 SpParHelper::Print(outs_former.str());
283
284 for(int trials =0; trials < PERCENTS; trials++)
285 {
286 cblas_allgathertime = 0;
287 cblas_alltoalltime = 0;
288 double MISVS[ITERS]; // numbers of vertices in each MIS
289 double TIMES[ITERS];
290
291 LatestRetwitterMIS::sincedate = keep[trials];
292 LatestRetwitterSelect2nd::sincedate = keep[trials];
293 ostringstream outs;
294 outs << "Initializing since date (only once) to " << LatestRetwitterMIS::sincedate << endl;
295 SpParHelper::Print(outs.str());
296
297 for(int sruns = 0; sruns < ITERS; ++sruns)
298 {
299 double t1 = MPI_Wtime();
300 int64_t nvert = A.getncol();
301
302 //# the final result set. S[i] exists and is 1 if vertex i is in the MIS
303 //S = Vec(nvert, sparse=True)
304 FullyDistSpVec<int64_t, uint8_t> S ( A.getcommgrid(), nvert);
305
306 //# the candidate set. initially all vertices are candidates.
307 //# this vector doubles as 'r', the random value vector.
308 //# i.e. if C[i] exists, then i is a candidate. The value C[i] is i's r for this iteration.
309 //C = Vec.ones(nvert, sparse=True)
310 //FullyDistSpVec's length is not the same as its nnz
311 //Since FullyDistSpVec::Apply only affects nonzeros, nnz should be forced to glen
312 // FullyDistVec ( shared_ptr<CommGrid> grid, IT globallen, NT initval);
313 FullyDistVec<int64_t, double> * denseC = new FullyDistVec<int64_t, double>( A.getcommgrid(), nvert, 1.0);
314 FullyDistSpVec<int64_t, double> C ( *denseC);
315 delete denseC;
316
317 FullyDistSpVec<int64_t, double> min_neighbor_r ( A.getcommgrid(), nvert);
318 FullyDistSpVec<int64_t, uint8_t> new_S_members ( A.getcommgrid(), nvert);
319 FullyDistSpVec<int64_t, uint8_t> new_S_neighbors ( A.getcommgrid(), nvert);
320
321 while (C.getnnz() > 0)
322 {
323 //# label each vertex in C with a random value
324 C.Apply(randGen());
325
326 //# find the smallest random value among a vertex's neighbors
327 //# In other words: min_neighbor_r[i] = min(C[j] for all neighbors j of vertex i)
328 //min_neighbor_r = Gmatrix.SpMV(C, sr(myMin,select2nd)) # could use "min" directly
329 SpMV<LatestRetwitterMIS>(A, C, min_neighbor_r, false); // min_neighbor_r empty OK?
330 #ifdef PRINTITERS
331 min_neighbor_r.PrintInfo("Neighbors");
332 #endif
333
334 #ifdef DEBUG
335 min_neighbor_r.DebugPrint();
336 C.DebugPrint();
337 #endif
338
339 //# The vertices to be added to S this iteration are those whose random value is
340 //# smaller than those of all its neighbors:
341 //# new_S_members[i] exists if C[i] < min_neighbor_r[i]
342 //# If C[i] exists and min_neighbor_r[i] doesn't, still a value is returned with bin_op(NULL,C[i])
343 //new_S_members = min_neighbor_r.eWiseApply(C, return1, doOp=is2ndSmaller, allowANulls=True, allowBNulls=False, inPlace=False, ANull=2)
344 new_S_members = EWiseApply<uint8_t>(min_neighbor_r, C, return1_uint8(), is2ndSmaller(), true, false, (double) 2.0, (double) 2.0, true);
345 //// template <typename RET, typename IU ...>
346 //// FullyDistSpVec<IU,RET> EWiseApply (const FullyDistSpVec<IU,NU1> & V, const FullyDistSpVec<IU,NU2> & W , _BinaryOperation _binary_op, _BinaryPredicate _doOp,
347 //// bool allowVNulls, bool allowWNulls, NU1 Vzero, NU2 Wzero, const bool allowIntersect = true);
348 #ifdef PRINTITERS
349 new_S_members.PrintInfo("New members of the MIS");
350 #endif
351
352 #ifdef DEBUG
353 new_S_members.DebugPrint();
354 #endif
355
356 //# new_S_members are no longer candidates, so remove them from C
357 //C.eWiseApply(new_S_members, return1, allowANulls=False, allowIntersect=False, allowBNulls=True, inPlace=True)
358 C = EWiseApply<double>(C, new_S_members, return1_uint8(), return1_uint8(), false, true, (double) 0.0, (uint8_t) 0, false);
359 #ifdef PRINTITERS
360 C.PrintInfo("Entries to be removed from the Candidates set");
361 #endif
362
363 //# find neighbors of new_S_members
364 //new_S_neighbors = Gmatrix.SpMV(new_S_members, sr(select2nd,select2nd))
365 SpMV<LatestRetwitterSelect2nd>(A, new_S_members, new_S_neighbors, false);
366
367 //# remove neighbors of new_S_members from C, because they cannot be part of the MIS anymore
368 //# If C[i] exists and new_S_neighbors[i] doesn't, still a value is returned with bin_op(NULL,C[i])
369 //C.eWiseApply(new_S_neighbors, return1, allowANulls=False, allowIntersect=False, allowBNulls=True, inPlace=True)
370 C = EWiseApply<double>(C, new_S_neighbors, return1_uint8(), return1_uint8(), false, true, (double) 0.0, (uint8_t) 0, false);
371 #ifdef PRINTITERS
372 C.PrintInfo("Candidates set after neighbors of MIS removed");
373 #endif
374
375 //# add new_S_members to S
376 //S.eWiseApply(new_S_members, return1, allowANulls=True, allowBNulls=True, inPlace=True)
377 S = EWiseApply<uint8_t>(S, new_S_members, return1_uint8(), return1_uint8(), true, true, (uint8_t) 1, (uint8_t) 1, true);
378 S.PrintInfo("The current MIS:");
379 }
380 double t2 = MPI_Wtime();
381
382 ostringstream ositr;
383 ositr << "MIS has " << S.getnnz() << " vertices" << endl;
384 SpParHelper::Print(ositr.str());
385
386 ostringstream ositr2;
387 ositr << "MIS time: " << t2-t1 << " seconds" << endl;
388 SpParHelper::Print(ositr.str());
389
390 TIMES[sruns] = t2-t1;
391 MISVS[sruns] = S.getnnz();
392 } // end for(int sruns = 0; sruns < ITERS; ++sruns)
393
394 ostringstream os;
395 os << "Per iteration communication times: " << endl;
396 os << "AllGatherv: " << cblas_allgathertime / ITERS << endl;
397 os << "AlltoAllv: " << cblas_alltoalltime / ITERS << endl;
398
399 sort(MISVS, MISVS+ITERS);
400 os << "--------------------------" << endl;
401 os << "Min MIS vertices: " << MISVS[0] << endl;
402 os << "Median MIS vertices: " << (MISVS[(ITERS/2)-1] + MISVS[ITERS/2])/2 << endl;
403 os << "Max MIS vertices: " << MISVS[ITERS-1] << endl;
404 double mean = accumulate( MISVS, MISVS+ITERS, 0.0 )/ ITERS;
405 vector<double> zero_mean(ITERS); // find distances to the mean
406 transform(MISVS, MISVS+ITERS, zero_mean.begin(), bind2nd( minus<double>(), mean ));
407 // self inner-product is sum of sum of squares
408 double deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
409 deviation = sqrt( deviation / (ITERS-1) );
410 os << "Mean MIS vertices: " << mean << endl;
411 os << "STDDEV MIS vertices: " << deviation << endl;
412 os << "--------------------------" << endl;
413
414 sort(TIMES,TIMES+ITERS);
415 os << "Filter keeps " << static_cast<double>(keep[trials])/100.0 << " percentage of edges" << endl;
416 os << "Min time: " << TIMES[0] << " seconds" << endl;
417 os << "Median time: " << (TIMES[(ITERS/2)-1] + TIMES[ITERS/2])/2 << " seconds" << endl;
418 os << "Max time: " << TIMES[ITERS-1] << " seconds" << endl;
419 mean = accumulate( TIMES, TIMES+ITERS, 0.0 )/ ITERS;
420 transform(TIMES, TIMES+ITERS, zero_mean.begin(), bind2nd( minus<double>(), mean ));
421 deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
422 deviation = sqrt( deviation / (ITERS-1) );
423 os << "Mean time: " << mean << " seconds" << endl;
424 os << "STDDEV time: " << deviation << " seconds" << endl;
425 os << "--------------------------" << endl;
426 SpParHelper::Print(os.str());
427 } // end for(int trials =0; trials < PERCENTS; trials++)
428 }
429 MPI_Finalize();
430 return 0;
431 }
432
433