1 /****************************************************************/
2 /* Parallel Combinatorial BLAS Library (for Graph Computations) */
3 /* version 1.6 -------------------------------------------------*/
4 /* date: 6/15/2017 ---------------------------------------------*/
5 /* authors: Ariful Azad, Aydin Buluc  --------------------------*/
6 /****************************************************************/
7 /*
8  Copyright (c) 2010-2017, The Regents of the University of California
9 
10  Permission is hereby granted, free of charge, to any person obtaining a copy
11  of this software and associated documentation files (the "Software"), to deal
12  in the Software without restriction, including without limitation the rights
13  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  copies of the Software, and to permit persons to whom the Software is
15  furnished to do so, subject to the following conditions:
16 
17  The above copyright notice and this permission notice shall be included in
18  all copies or substantial portions of the Software.
19 
20  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  THE SOFTWARE.
27  */
28 
29 #define DETERMINISTIC
30 #include "CombBLAS/CombBLAS.h"
31 #include <mpi.h>
32 #include <sys/time.h>
33 #include <iostream>
34 #include <functional>
35 #include <algorithm>
36 #include <vector>
37 #include <string>
38 #include <sstream>
39 #ifdef THREADED
40 	#ifndef _OPENMP
41 	#define _OPENMP
42 	#endif
43 	#include <omp.h>
44 #endif
45 
46 
47 double cblas_alltoalltime;
48 double cblas_allgathertime;
49 int cblas_splits;
50 
51 #include "TwitterEdge.h"
52 
53 #define EDGEFACTOR 5 // For MIS
54 #define ITERS 16
55 #define PERCENTS 4  // testing with 4 different percentiles
56 
57 using namespace std;
58 using namespace combblas;
59 
60 
61 template <typename PARMAT>
Symmetricize(PARMAT & A)62 void Symmetricize(PARMAT & A)
63 {
64 	// boolean addition is practically a "logical or"
65 	// therefore this doesn't destruct any links
66 	PARMAT AT = A;
67 	AT.Transpose();
68 	A += AT;
69 }
70 
71 
72 struct DetSymmetricize: public std::binary_function<TwitterEdge, TwitterEdge, TwitterEdge>
73 {
74 	// have to deterministically choose between one of the two values.
75 	// cannot just add them because that will change the distribution (small values are unlikely to survive)
operator ()DetSymmetricize76 	TwitterEdge operator()(const TwitterEdge & g, const TwitterEdge & t)
77 	{
78 		TwitterEdge toret = g;
79 
80 		if(((g.latest + t.latest) & 1) == 1)
81 		{
82 			toret.latest = std::min(g.latest, t.latest);
83 		}
84 		else
85 		{
86 			toret.latest = std::max(g.latest, t.latest);
87 		}
88 		return toret;
89 	}
90 };
91 
92 typedef SpParMat < int64_t, TwitterEdge, SpDCCols<int64_t, TwitterEdge > > PSpMat_Twitter;
93 typedef SpParMat < int64_t, bool, SpDCCols<int64_t, bool > > PSpMat_Bool;
94 
SymmetricizeRands(PSpMat_Twitter & A)95 void SymmetricizeRands(PSpMat_Twitter & A)
96 {
97 	PSpMat_Twitter AT = A;
98 	AT.Transpose();
99 	// SpParMat<IU,RETT,RETDER> EWiseApply (const SpParMat<IU,NU1,UDERA> & A,
100 	// const SpParMat<IU,NU2,UDERB> & B, _BinaryOperation __binary_op, bool notB, const NU2& defaultBVal)
101 	// Default B value is irrelevant since the structures of the matrices are
102 	A = EWiseApply<TwitterEdge, SpDCCols<int64_t, TwitterEdge > >(A, AT, DetSymmetricize(), false, TwitterEdge());
103 }
104 
105 #ifdef DETERMINISTIC
106 MTRand GlobalMT(1);
107 #else
108 MTRand GlobalMT;	// generate random numbers with Mersenne Twister
109 #endif
110 
111 
112 struct Twitter_obj_randomizer : public std::unary_function<TwitterEdge, TwitterEdge>
113 {
operator ()Twitter_obj_randomizer114   const TwitterEdge operator()(const TwitterEdge & x) const
115   {
116 	short mycount = 1;
117 	bool myfollow = 0;
118 	time_t mylatest = static_cast<int64_t>(GlobalMT.rand() * 10000);	// random.randrange(0,10000)
119 
120 	return TwitterEdge(mycount, myfollow, mylatest);
121   }
122 };
123 
124 struct Twitter_materialize: public std::binary_function<TwitterEdge, time_t, bool>
125 {
operator ()Twitter_materialize126 	bool operator()(const TwitterEdge & x, time_t sincedate) const
127 	{
128 		if(x.isRetwitter() && x.LastTweetBy(sincedate))
129 			return false;	// false if the edge is going to be kept
130 		else
131 			return true;	// true if the edge is to be pruned
132 	}
133 };
134 
135 //  def rand( verc ):
136 //	import random
137 //	return random.random()
138 struct randGen : public std::unary_function<double, double>
139 {
operator ()randGen140 	const double operator()(const double & ignore)
141 	{
142 		return GlobalMT.rand();
143 	}
144 };
145 
146 
main(int argc,char * argv[])147 int main(int argc, char* argv[])
148 {
149 	int nprocs, myrank;
150 #ifdef _OPENMP
151 	int cblas_splits = omp_get_max_threads();
152     	int provided, flag, claimed;
153     	MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided );
154     	MPI_Is_thread_main( &flag );
155     	if (!flag)
156         	SpParHelper::Print("This thread called init_thread but Is_thread_main gave false\n");
157     	MPI_Query_thread( &claimed );
158     	if (claimed != provided)
159         	SpParHelper::Print("Query thread gave different thread level than requested\n");
160 #else
161 	MPI_Init(&argc, &argv);
162 	int cblas_splits = 1;
163 #endif
164 
165 	MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
166 	MPI_Comm_rank(MPI_COMM_WORLD,&myrank);
167 	if(argc < 2)
168 	{
169 		if(myrank == 0)
170 		{
171 			cout << "Usage: ./FilteredMIS <Scale>" << endl;
172 		}
173 		MPI_Finalize();
174 		return -1;
175 	}
176 	{
177 		// Declare objects
178 		PSpMat_Twitter A;
179 		shared_ptr<CommGrid> fullWorld;
180 		fullWorld.reset( new CommGrid(MPI_COMM_WORLD, 0, 0) );
181 		FullyDistVec<int64_t, int64_t> indegrees(fullWorld);	// in-degrees of vertices (including multi-edges and self-loops)
182 		FullyDistVec<int64_t, int64_t> oudegrees(fullWorld);	// out-degrees of vertices (including multi-edges and self-loops)
183 		FullyDistVec<int64_t, int64_t> degrees(fullWorld);	// combined degrees of vertices (including multi-edges and self-loops)
184 		PSpMat_Bool * ABool;
185 
186 		SpParHelper::Print("Using synthetic data, which we ALWAYS permute for load balance\n");
187 		SpParHelper::Print("We only balance the original input, we don't repermute after each filter change\n");
188 		SpParHelper::Print("BFS is run on UNDIRECTED graph, hence hitting CCs, and TEPS is bidirectional\n");
189 
190 		double initiator[4] = {.25, .25, .25, .25};	// creating erdos-renyi
191 		double t01 = MPI_Wtime();
192 		DistEdgeList<int64_t> * DEL = new DistEdgeList<int64_t>();
193 
194 		unsigned scale = static_cast<unsigned>(atoi(argv[1]));
195 		ostringstream outs;
196 		outs << "Forcing scale to : " << scale << endl;
197 		SpParHelper::Print(outs.str());
198 
199 		// parameters: (double initiator[4], int log_numverts, int edgefactor, bool scramble, bool packed)
200 		DEL->GenGraph500Data(initiator, scale, EDGEFACTOR, true, true );	// generate packed edges
201 		SpParHelper::Print("Generated renamed edge lists\n");
202 
203 		ABool = new PSpMat_Bool(*DEL, false);
204 		int64_t removed  = ABool->RemoveLoops();
205 		ostringstream loopinfo;
206 		loopinfo << "Converted to Boolean and removed " << removed << " loops" << endl;
207 		SpParHelper::Print(loopinfo.str());
208 		ABool->PrintInfo();
209 		delete DEL;	// free memory
210 		A = PSpMat_Twitter(*ABool); // any upcasting generates the default object
211 
212 		double t02 = MPI_Wtime();
213 		ostringstream tinfo;
214 		tinfo << "Generation took " << t02-t01 << " seconds" << endl;
215 		SpParHelper::Print(tinfo.str());
216 
217 		// indegrees is sum along rows because A is loaded as "tranposed", similarly oudegrees is sum along columns
218 		ABool->PrintInfo();
219 		ABool->Reduce(oudegrees, Column, plus<int64_t>(), static_cast<int64_t>(0));
220 		ABool->Reduce(indegrees, Row, plus<int64_t>(), static_cast<int64_t>(0));
221 
222 		// indegrees_filt and oudegrees_filt is used for the real data
223 		FullyDistVec<int64_t, int64_t> indegrees_filt(fullWorld);
224 		FullyDistVec<int64_t, int64_t> oudegrees_filt(fullWorld);
225 
226 		typedef FullyDistVec<int64_t, int64_t> IntVec;	// used for the synthetic data (symmetricized before randomization)
227         	FullyDistVec<int64_t, int64_t> degrees_filt[4] = {IntVec(fullWorld), IntVec(fullWorld), IntVec(fullWorld), IntVec(fullWorld)};
228 		int64_t keep[PERCENTS] = {100, 1000, 2500, 10000}; 	// ratio of edges kept in range (0, 10000)
229 
230 		degrees = indegrees;
231 		degrees.EWiseApply(oudegrees, plus<int64_t>());
232 		SpParHelper::Print("All degrees calculated\n");
233 		delete ABool;
234 
235 		float balance = A.LoadImbalance();
236 		ostringstream outlb;
237 		outlb << "Load balance: " << balance << endl;
238 		SpParHelper::Print(outlb.str());
239 
240 		// We symmetricize before we apply the random generator
241 		// Otherwise += will naturally add the random numbers together
242 		// hence will create artificially high-permeable filters
243 		Symmetricize(A);	// A += A';
244 		SpParHelper::Print("Symmetricized\n");
245 
246 		A.Apply(Twitter_obj_randomizer());
247 		A.PrintInfo();
248 		SymmetricizeRands(A);
249 		SpParHelper::Print("Symmetricized Rands\n");
250 		A.PrintInfo();
251 
252 
253 		FullyDistVec<int64_t, int64_t> * nonisov = new FullyDistVec<int64_t, int64_t>(degrees.FindInds(bind2nd(greater<int64_t>(), 0)));
254 		SpParHelper::Print("Found (and permuted) non-isolated vertices\n");
255 		nonisov->RandPerm();	// so that A(v,v) is load-balanced (both memory and time wise)
256 		A(*nonisov, *nonisov, true);	// in-place permute to save memory
257 		SpParHelper::Print("Dropped isolated vertices from input\n");
258 
259 		indegrees = indegrees(*nonisov);	// fix the degrees arrays too
260 		oudegrees = oudegrees(*nonisov);
261 		degrees = degrees(*nonisov);
262 		delete nonisov;
263 
264 		for (int i=0; i < PERCENTS; i++)
265 		{
266 			PSpMat_Twitter B = A;
267 			B.Prune(bind2nd(Twitter_materialize(), keep[i]));
268 			PSpMat_Bool BBool = B;
269 			BBool.PrintInfo();
270 			float balance = B.LoadImbalance();
271 			ostringstream outs;
272 			outs << "Load balance of " << static_cast<float>(keep[i])/100 << "% filtered case: " << balance << endl;
273 			SpParHelper::Print(outs.str());
274 
275 			// degrees_filt[i] is by-default generated as permuted
276 			BBool.Reduce(degrees_filt[i], Column, plus<int64_t>(), static_cast<int64_t>(0));  // Column=Row since BBool is symmetric
277 		}
278 
279 		float balance_former = A.LoadImbalance();
280 		ostringstream outs_former;
281 		outs_former << "Load balance: " << balance_former << endl;
282 		SpParHelper::Print(outs_former.str());
283 
284 		for(int trials =0; trials < PERCENTS; trials++)
285 		{
286 			cblas_allgathertime = 0;
287 			cblas_alltoalltime = 0;
288 			double MISVS[ITERS]; // numbers of vertices in each MIS
289 			double TIMES[ITERS];
290 
291 			LatestRetwitterMIS::sincedate = keep[trials];
292 			LatestRetwitterSelect2nd::sincedate = keep[trials];
293 			ostringstream outs;
294 			outs << "Initializing since date (only once) to " << LatestRetwitterMIS::sincedate << endl;
295 			SpParHelper::Print(outs.str());
296 
297 			for(int sruns = 0; sruns < ITERS; ++sruns)
298 			{
299 				double t1 = MPI_Wtime();
300 				int64_t nvert = A.getncol();
301 
302 				//# the final result set. S[i] exists and is 1 if vertex i is in the MIS
303 				//S = Vec(nvert, sparse=True)
304 				FullyDistSpVec<int64_t, uint8_t> S ( A.getcommgrid(), nvert);
305 
306 				//# the candidate set. initially all vertices are candidates.
307 				//# this vector doubles as 'r', the random value vector.
308 				//# i.e. if C[i] exists, then i is a candidate. The value C[i] is i's r for this iteration.
309 				//C = Vec.ones(nvert, sparse=True)
310 				//FullyDistSpVec's length is not the same as its nnz
311 				//Since FullyDistSpVec::Apply only affects nonzeros, nnz should be forced to glen
312 				// FullyDistVec ( shared_ptr<CommGrid> grid, IT globallen, NT initval);
313 				FullyDistVec<int64_t, double> * denseC = new FullyDistVec<int64_t, double>( A.getcommgrid(), nvert, 1.0);
314 				FullyDistSpVec<int64_t, double> C ( *denseC);
315 				delete denseC;
316 
317 				FullyDistSpVec<int64_t, double> min_neighbor_r ( A.getcommgrid(), nvert);
318 				FullyDistSpVec<int64_t, uint8_t> new_S_members ( A.getcommgrid(), nvert);
319 				FullyDistSpVec<int64_t, uint8_t> new_S_neighbors ( A.getcommgrid(), nvert);
320 
321 				while (C.getnnz() > 0)
322 				{
323 					//# label each vertex in C with a random value
324 					C.Apply(randGen());
325 
326 					//# find the smallest random value among a vertex's neighbors
327 					//# In other words: min_neighbor_r[i] = min(C[j] for all neighbors j of vertex i)
328 					//min_neighbor_r = Gmatrix.SpMV(C, sr(myMin,select2nd)) # could use "min" directly
329 					SpMV<LatestRetwitterMIS>(A, C, min_neighbor_r, false);	// min_neighbor_r empty OK?
330 					#ifdef PRINTITERS
331 					min_neighbor_r.PrintInfo("Neighbors");
332 					#endif
333 
334 					#ifdef DEBUG
335 					min_neighbor_r.DebugPrint();
336 					C.DebugPrint();
337 					#endif
338 
339 					//# The vertices to be added to S this iteration are those whose random value is
340 					//# smaller than those of all its neighbors:
341 					//# new_S_members[i] exists if C[i] < min_neighbor_r[i]
342 					//# If C[i] exists and min_neighbor_r[i] doesn't, still a value is returned with bin_op(NULL,C[i])
343 					//new_S_members = min_neighbor_r.eWiseApply(C, return1, doOp=is2ndSmaller, allowANulls=True, allowBNulls=False, inPlace=False, ANull=2)
344 					new_S_members = EWiseApply<uint8_t>(min_neighbor_r, C, return1_uint8(), is2ndSmaller(), true, false, (double) 2.0,  (double) 2.0, true);
345 					//// template <typename RET, typename IU ...>
346 					//// FullyDistSpVec<IU,RET> EWiseApply (const FullyDistSpVec<IU,NU1> & V, const FullyDistSpVec<IU,NU2> & W , _BinaryOperation _binary_op, _BinaryPredicate _doOp,
347 					//// bool allowVNulls, bool allowWNulls, NU1 Vzero, NU2 Wzero, const bool allowIntersect = true);
348 					#ifdef PRINTITERS
349 					new_S_members.PrintInfo("New members of the MIS");
350 					#endif
351 
352 					#ifdef DEBUG
353 					new_S_members.DebugPrint();
354 					#endif
355 
356 					//# new_S_members are no longer candidates, so remove them from C
357 					//C.eWiseApply(new_S_members, return1, allowANulls=False, allowIntersect=False, allowBNulls=True, inPlace=True)
358 					C = EWiseApply<double>(C, new_S_members, return1_uint8(), return1_uint8(), false, true, (double) 0.0, (uint8_t) 0, false);
359 					#ifdef PRINTITERS
360 					C.PrintInfo("Entries to be removed from the Candidates set");
361 					#endif
362 
363 					//# find neighbors of new_S_members
364 					//new_S_neighbors = Gmatrix.SpMV(new_S_members, sr(select2nd,select2nd))
365 					SpMV<LatestRetwitterSelect2nd>(A, new_S_members, new_S_neighbors, false);
366 
367 					//# remove neighbors of new_S_members from C, because they cannot be part of the MIS anymore
368 					//# If C[i] exists and new_S_neighbors[i] doesn't, still a value is returned with bin_op(NULL,C[i])
369 					//C.eWiseApply(new_S_neighbors, return1, allowANulls=False, allowIntersect=False, allowBNulls=True, inPlace=True)
370 					C = EWiseApply<double>(C, new_S_neighbors, return1_uint8(), return1_uint8(), false, true, (double) 0.0, (uint8_t) 0, false);
371 					#ifdef PRINTITERS
372 					C.PrintInfo("Candidates set after neighbors of MIS removed");
373 					#endif
374 
375 					//# add new_S_members to S
376 					//S.eWiseApply(new_S_members, return1, allowANulls=True, allowBNulls=True, inPlace=True)
377 					S = EWiseApply<uint8_t>(S, new_S_members, return1_uint8(), return1_uint8(), true, true, (uint8_t) 1, (uint8_t) 1, true);
378 					S.PrintInfo("The current MIS:");
379 				}
380 				double t2 = MPI_Wtime();
381 
382 				ostringstream ositr;
383 				ositr << "MIS has " << S.getnnz() << " vertices" << endl;
384 				SpParHelper::Print(ositr.str());
385 
386 				ostringstream ositr2;
387 				ositr << "MIS time: " << t2-t1 << " seconds" << endl;
388 				SpParHelper::Print(ositr.str());
389 
390 				TIMES[sruns] = t2-t1;
391 				MISVS[sruns] = S.getnnz();
392 			} // end for(int sruns = 0; sruns < ITERS; ++sruns)
393 
394 			ostringstream os;
395 			os << "Per iteration communication times: " << endl;
396 			os << "AllGatherv: " << cblas_allgathertime / ITERS << endl;
397 			os << "AlltoAllv: " << cblas_alltoalltime / ITERS << endl;
398 
399 			sort(MISVS, MISVS+ITERS);
400 			os << "--------------------------" << endl;
401 			os << "Min MIS vertices: " << MISVS[0] << endl;
402 			os << "Median MIS vertices: " << (MISVS[(ITERS/2)-1] + MISVS[ITERS/2])/2 << endl;
403 			os << "Max MIS vertices: " << MISVS[ITERS-1] << endl;
404 			double mean = accumulate( MISVS, MISVS+ITERS, 0.0 )/ ITERS;
405 			vector<double> zero_mean(ITERS);	// find distances to the mean
406 			transform(MISVS, MISVS+ITERS, zero_mean.begin(), bind2nd( minus<double>(), mean ));
407 			// self inner-product is sum of sum of squares
408 			double deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
409 			deviation = sqrt( deviation / (ITERS-1) );
410 			os << "Mean MIS vertices: " << mean << endl;
411 			os << "STDDEV MIS vertices: " << deviation << endl;
412 			os << "--------------------------" << endl;
413 
414 			sort(TIMES,TIMES+ITERS);
415 			os << "Filter keeps " << static_cast<double>(keep[trials])/100.0 << " percentage of edges" << endl;
416 			os << "Min time: " << TIMES[0] << " seconds" << endl;
417 			os << "Median time: " << (TIMES[(ITERS/2)-1] + TIMES[ITERS/2])/2 << " seconds" << endl;
418 			os << "Max time: " << TIMES[ITERS-1] << " seconds" << endl;
419 			mean = accumulate( TIMES, TIMES+ITERS, 0.0 )/ ITERS;
420 			transform(TIMES, TIMES+ITERS, zero_mean.begin(), bind2nd( minus<double>(), mean ));
421 			deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
422 			deviation = sqrt( deviation / (ITERS-1) );
423 			os << "Mean time: " << mean << " seconds" << endl;
424 			os << "STDDEV time: " << deviation << " seconds" << endl;
425 			os << "--------------------------" << endl;
426 			SpParHelper::Print(os.str());
427 		}	// end for(int trials =0; trials < PERCENTS; trials++)
428 	}
429 	MPI_Finalize();
430 	return 0;
431 }
432 
433