/****************************************************************/
/* Parallel Combinatorial BLAS Library (for Graph Computations) */
/* version 1.6 -------------------------------------------------*/
/* date: 6/15/2017 ---------------------------------------------*/
/* authors: Ariful Azad, Aydin Buluc  --------------------------*/
/****************************************************************/
/*
 Copyright (c) 2010-2017, The Regents of the University of California

 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
 in the Software without restriction, including without limitation the rights
 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the Software is
 furnished to do so, subject to the following conditions:

 The above copyright notice and this permission notice shall be included in
 all copies or substantial portions of the Software.

 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 THE SOFTWARE.
 */


#include "usort/parUtils.h"

namespace combblas {

template <typename IT>
void SpParHelper::ReDistributeToVector(int* & map_scnt, std::vector< std::vector< IT > > & locs_send, std::vector< std::vector< std::string > > & data_send,
					std::vector<std::array<char, MAXVERTNAME>> & distmapper_array, const MPI_Comm & comm)
{
	int nprocs, myrank;
	MPI_Comm_size(comm, &nprocs);
	MPI_Comm_rank(comm, &myrank);

	int * map_rcnt = new int[nprocs];
	MPI_Alltoall(map_scnt, 1, MPI_INT, map_rcnt, 1, MPI_INT, comm);
	int * map_sdspl = new int[nprocs]();
	int * map_rdspl = new int[nprocs]();
	std::partial_sum(map_scnt, map_scnt+nprocs-1, map_sdspl+1);
	std::partial_sum(map_rcnt, map_rcnt+nprocs-1, map_rdspl+1);
	IT totmapsend = map_sdspl[nprocs-1] + map_scnt[nprocs-1];
	IT totmaprecv = map_rdspl[nprocs-1] + map_rcnt[nprocs-1];

	// sendbuf is a pointer to array of MAXVERTNAME chars.
	// Explicit grouping syntax is due to precedence of [] over *
	// char* sendbuf[MAXVERTNAME] would have declared a MAXVERTNAME-length array of char pointers
	char (*sendbuf)[MAXVERTNAME];	// each sendbuf[i] is type char[MAXVERTNAME]
	sendbuf = (char (*)[MAXVERTNAME]) malloc(sizeof(char[MAXVERTNAME])* totmapsend);	// notice that this is allocating a contiguous block of memory
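	// Illustrative note: with this layout, sendbuf[i] is the i-th fixed-width name slot and
	// sendbuf+1 advances by MAXVERTNAME chars, so strings can be packed back to back without
	// any per-element allocation.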

	IT * sendinds =  new IT[totmapsend];
	for(int i=0; i<nprocs; ++i)
	{
	    int loccnt = 0;
	    for(std::string s:data_send[i])
	    {
		    std::strcpy(sendbuf[map_sdspl[i]+loccnt], s.c_str());
		    loccnt++;
	    }
	    std::vector<std::string>().swap(data_send[i]);	// free memory
	}
	for(int i=0; i<nprocs; ++i)	// pack local indices to send; the received indices should be sorted by definition (sanity-checked below)
	{
		std::copy(locs_send[i].begin(), locs_send[i].end(), sendinds+map_sdspl[i]);
		std::vector<IT>().swap(locs_send[i]);	// free memory
	}

	char (*recvbuf)[MAXVERTNAME];	// recvbuf is of type char (*)[MAXVERTNAME]
	recvbuf = (char (*)[MAXVERTNAME]) malloc(sizeof(char[MAXVERTNAME])* totmaprecv);

	MPI_Datatype MPI_STRING;	// this is not necessary (we could just use char) but easier for bookkeeping
	MPI_Type_contiguous(sizeof(char[MAXVERTNAME]), MPI_CHAR, &MPI_STRING);
	MPI_Type_commit(&MPI_STRING);

	MPI_Alltoallv(sendbuf, map_scnt, map_sdspl, MPI_STRING, recvbuf, map_rcnt, map_rdspl, MPI_STRING, comm);
	free(sendbuf);	// can't delete[] so use free
	MPI_Type_free(&MPI_STRING);

	IT * recvinds = new IT[totmaprecv];
	MPI_Alltoallv(sendinds, map_scnt, map_sdspl, MPIType<IT>(), recvinds, map_rcnt, map_rdspl, MPIType<IT>(), comm);
	DeleteAll(sendinds, map_scnt, map_sdspl, map_rcnt, map_rdspl);

	if(!std::is_sorted(recvinds, recvinds+totmaprecv))
		std::cout << "Assertion failed at proc " << myrank << ": Received indices are not sorted, this is unexpected" << std::endl;

	for(IT i=0; i< totmaprecv; ++i)
	{
		assert(i == recvinds[i]);
		std::copy(recvbuf[i], recvbuf[i]+MAXVERTNAME, distmapper_array[i].begin());
	}
	free(recvbuf);
	delete [] recvinds;
}


template<typename KEY, typename VAL, typename IT>
void SpParHelper::MemoryEfficientPSort(std::pair<KEY,VAL> * array, IT length, IT * dist, const MPI_Comm & comm)
{
	int nprocs, myrank;
	MPI_Comm_size(comm, &nprocs);
	MPI_Comm_rank(comm, &myrank);
	int nsize = nprocs / 2;	// new size
	if(nprocs < 10000)
	{
		bool excluded =  false;
		if(dist[myrank] == 0)	excluded = true;

		int nreals = 0;
		for(int i=0; i< nprocs; ++i)
			if(dist[i] != 0) ++nreals;

		//SpParHelper::MemoryEfficientPSort(vecpair, nnz, dist, World);

		if(nreals == nprocs)	// general case
		{
			long * dist_in = new long[nprocs];
			for(int i=0; i< nprocs; ++i)    dist_in[i] = (long) dist[i];
			vpsort::parallel_sort (array, array+length,  dist_in, comm);
			delete [] dist_in;
		}
		else
		{
			long * dist_in = new long[nreals];
			int * dist_out = new int[nprocs-nreals];	// ranks to exclude
			int indin = 0;
			int indout = 0;
			for(int i=0; i< nprocs; ++i)
			{
				if(dist[i] == 0)
					dist_out[indout++] = i;
				else
					dist_in[indin++] = (long) dist[i];
			}

			#ifdef DEBUG
			std::ostringstream outs;
			outs << "To exclude indices: ";
			std::copy(dist_out, dist_out+indout, std::ostream_iterator<int>(outs, " ")); outs << std::endl;
			SpParHelper::Print(outs.str());
			#endif

			MPI_Group sort_group, real_group;
			MPI_Comm_group(comm, &sort_group);
			MPI_Group_excl(sort_group, indout, dist_out, &real_group);
			MPI_Group_free(&sort_group);

			// The Create() function should be executed by all processes in comm,
			// even if they do not belong to the new group (in that case MPI_COMM_NULL is returned as real_comm)
			// MPI::Intracomm MPI::Intracomm::Create(const MPI::Group& group) const;
			MPI_Comm real_comm;
			MPI_Comm_create(comm, real_group, &real_comm);
			if(!excluded)
			{
				vpsort::parallel_sort (array, array+length,  dist_in, real_comm);
				MPI_Comm_free(&real_comm);
			}
			MPI_Group_free(&real_group);
			delete [] dist_in;
			delete [] dist_out;
		}
	}
	else
	{
		IT gl_median = std::accumulate(dist, dist+nsize, static_cast<IT>(0));	// global rank of the first element of the median processor
		sort(array, array+length);	// re-sort because we might have swapped data in previous iterations
		int color = (myrank < nsize)? 0: 1;

		std::pair<KEY,VAL> * low = array;
		std::pair<KEY,VAL> * upp = array;
		GlobalSelect(gl_median, low, upp, array, length, comm);
		BipartiteSwap(low, array, length, nsize, color, comm);

		if(color == 1)	dist = dist + nsize;	// adjust for the second half of processors

		// recursive call; two implicit 'spawn's where half of the processors execute different parameters
		// MPI::Intracomm MPI::Intracomm::Split(int color, int key) const;
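		// Illustration (added note, not from the original source): with dist = {d_0, ..., d_{p-1}},
		// the first half of the ranks ends up holding the globally smallest d_0 + ... + d_{nsize-1}
		// elements after GlobalSelect/BipartiteSwap, and each half then sorts its share independently
		// on its own sub-communicator, falling back to vpsort::parallel_sort once fewer than 10000 ranks remain.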

		MPI_Comm halfcomm;
		MPI_Comm_split(comm, color, myrank, &halfcomm);	// split into two communicators
		MemoryEfficientPSort(array, length, dist, halfcomm);
	}

}
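// Minimal usage sketch (illustrative only; it mirrors the commented-out call above and is not an actual call site):
//   IT locsize = ...;                                   // number of local (key,value) pairs
//   IT * dist = new IT[nprocs];
//   dist[myrank] = locsize;
//   MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), comm);
//   SpParHelper::MemoryEfficientPSort(vecpair, locsize, dist, comm);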


/*
 TODO: This function is just a hack at this moment.
 The payload (VAL) can only be integer at this moment.
 FIX this.
 */
template<typename KEY, typename VAL, typename IT>
std::vector<std::pair<KEY,VAL>> SpParHelper::KeyValuePSort(std::pair<KEY,VAL> * array, IT length, IT * dist, const MPI_Comm & comm)
{
    int nprocs, myrank;
    MPI_Comm_size(comm, &nprocs);
    MPI_Comm_rank(comm, &myrank);
    int nsize = nprocs / 2;	// new size



    bool excluded =  false;
    if(dist[myrank] == 0)	excluded = true;

    int nreals = 0;
    for(int i=0; i< nprocs; ++i)
        if(dist[i] != 0) ++nreals;

    std::vector<IndexHolder<KEY>> in(length);
#ifdef THREADED
#pragma omp parallel for
#endif
    for(int i=0; i< length; ++i)
    {
        in[i] = IndexHolder<KEY>(array[i].first, static_cast<unsigned long>(array[i].second));
    }
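    // Note: the VAL payload rides through the sort inside IndexHolder's unsigned long index field
    // (and is cast back below), which is why the TODO above restricts VAL to integer types for now.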

    if(nreals == nprocs)	// general case
    {
        par::sampleSort(in, comm);
    }
    else
    {
        long * dist_in = new long[nreals];
        int * dist_out = new int[nprocs-nreals];	// ranks to exclude
        int indin = 0;
        int indout = 0;
        for(int i=0; i< nprocs; ++i)
        {
            if(dist[i] == 0)
                dist_out[indout++] = i;
            else
                dist_in[indin++] = (long) dist[i];
        }

#ifdef DEBUG
        std::ostringstream outs;
        outs << "To exclude indices: ";
        std::copy(dist_out, dist_out+indout, std::ostream_iterator<int>(outs, " ")); outs << std::endl;
        SpParHelper::Print(outs.str());
#endif

        MPI_Group sort_group, real_group;
        MPI_Comm_group(comm, &sort_group);
        MPI_Group_excl(sort_group, indout, dist_out, &real_group);
        MPI_Group_free(&sort_group);

        // The Create() function should be executed by all processes in comm,
        // even if they do not belong to the new group (in that case MPI_COMM_NULL is returned as real_comm)
        // MPI::Intracomm MPI::Intracomm::Create(const MPI::Group& group) const;
        MPI_Comm real_comm;
        MPI_Comm_create(comm, real_group, &real_comm);
        if(!excluded)
        {
            par::sampleSort(in, real_comm);
            MPI_Comm_free(&real_comm);
        }
        MPI_Group_free(&real_group);
        delete [] dist_in;
        delete [] dist_out;
    }

    std::vector<std::pair<KEY,VAL>> sorted(in.size());
    for(int i=0; i<in.size(); i++)
    {
        sorted[i].second = static_cast<VAL>(in[i].index);
        sorted[i].first = in[i].value;
    }
    return sorted;
}


template<typename KEY, typename VAL, typename IT>
void SpParHelper::GlobalSelect(IT gl_rank, std::pair<KEY,VAL> * & low,  std::pair<KEY,VAL> * & upp, std::pair<KEY,VAL> * array, IT length, const MPI_Comm & comm)
{
	int nprocs, myrank;
	MPI_Comm_size(comm, &nprocs);
	MPI_Comm_rank(comm, &myrank);
	IT begin = 0;
	IT end = length;	// initially everyone is active
	std::pair<KEY, double> * wmminput = new std::pair<KEY,double>[nprocs];	// (median, #{actives})

	MPI_Datatype MPI_sortType;
	MPI_Type_contiguous (sizeof(std::pair<KEY,double>), MPI_CHAR, &MPI_sortType);
	MPI_Type_commit (&MPI_sortType);

	KEY wmm;	// our median pick
	IT gl_low, gl_upp;
	IT active = end-begin;				// size of the active range
	IT nacts = 0;
	bool found = false;
	int iters = 0;

	/* changes by shan : to add begin0 and end0 for exit condition */
	IT begin0, end0;
	do
	{
		iters++;
		begin0 = begin; end0 = end;
		KEY median = array[(begin + end)/2].first; 	// median of the active range
		wmminput[myrank].first = median;
		wmminput[myrank].second = static_cast<double>(active);
		MPI_Allgather(MPI_IN_PLACE, 0, MPI_sortType, wmminput, 1, MPI_sortType, comm);
		double totact = 0;	// total number of active elements
		for(int i=0; i<nprocs; ++i)
			totact += wmminput[i].second;

		// input to weighted median of medians is a set of (object, weight) pairs
		// the algorithm computes the first set of elements (according to total
		// order of "object"s), whose sum is still less than or equal to 1/2
		for(int i=0; i<nprocs; ++i)
			wmminput[i].second /= totact ;	// normalize the weights

		sort(wmminput, wmminput+nprocs);	// sort w.r.t. medians
		double totweight = 0;
		int wmmloc=0;
		while( wmmloc<nprocs && totweight < 0.5 )
		{
			totweight += wmminput[wmmloc++].second;
		}

		wmm = wmminput[wmmloc-1].first;	// weighted median of medians
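		// Worked example (illustrative): if the sorted (median, weight) pairs are
		// (m1,0.1), (m2,0.2), (m3,0.4), (m4,0.3), the cumulative weights are 0.1, 0.3, 0.7;
		// the loop stops once 0.5 is reached, so wmm = m3, the smallest median whose
		// cumulative weight reaches one half.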

		std::pair<KEY,VAL> wmmpair = std::make_pair(wmm, VAL());
		low =std::lower_bound (array+begin, array+end, wmmpair);
		upp =std::upper_bound (array+begin, array+end, wmmpair);
		IT loc_low = low-array;	// #{elements smaller than wmm}
		IT loc_upp = upp-array;	// #{elements smaller or equal to wmm}

		MPI_Allreduce( &loc_low, &gl_low, 1, MPIType<IT>(), MPI_SUM, comm);
		MPI_Allreduce( &loc_upp, &gl_upp, 1, MPIType<IT>(), MPI_SUM, comm);

		if(gl_upp < gl_rank)
		{
			// our pick was too small; only recurse to the right
			begin = (low - array);
		}
		else if(gl_rank < gl_low)
		{
			// our pick was too big; only recurse to the left
			end = (upp - array);
		}
		else
		{
			found = true;
		}
		active = end-begin;
		MPI_Allreduce(&active, &nacts, 1, MPIType<IT>(), MPI_SUM, comm);
		if (begin0 == begin && end0 == end) break;  // ABAB: Active range did not shrink, so we break (is this kosher?)
	}
	while((nacts > 2*nprocs) && (!found));
	delete [] wmminput;

	MPI_Datatype MPI_pairType;
	MPI_Type_contiguous (sizeof(std::pair<KEY,VAL>), MPI_CHAR, &MPI_pairType);
	MPI_Type_commit (&MPI_pairType);

	int * nactives = new int[nprocs];
	nactives[myrank] = static_cast<int>(active);	// At this point, actives are small enough
	MPI_Allgather(MPI_IN_PLACE, 0, MPI_INT, nactives, 1, MPI_INT, comm);
	int * dpls = new int[nprocs]();	// displacements (zero initialized pid)
	std::partial_sum(nactives, nactives+nprocs-1, dpls+1);
	std::pair<KEY,VAL> * recvbuf = new std::pair<KEY,VAL>[nacts];
	low = array + begin;	// update low to the beginning of the active range
	MPI_Allgatherv(low, active, MPI_pairType, recvbuf, nactives, dpls, MPI_pairType, comm);

	std::pair<KEY,int> * allactives = new std::pair<KEY,int>[nacts];
	int k = 0;
	for(int i=0; i<nprocs; ++i)
	{
		for(int j=0; j<nactives[i]; ++j)
		{
			allactives[k] = std::make_pair(recvbuf[k].first, i);
			k++;
		}
	}
	DeleteAll(recvbuf, dpls, nactives);
	sort(allactives, allactives+nacts);
	MPI_Allreduce(&begin, &gl_low, 1, MPIType<IT>(), MPI_SUM, comm);        // update
	int diff = gl_rank - gl_low;
	for(int k=0; k < diff; ++k)
	{
		if(allactives[k].second == myrank)
			++low;	// increment the local pointer
	}
	delete [] allactives;
	begin = low-array;
	MPI_Allreduce(&begin, &gl_low, 1, MPIType<IT>(), MPI_SUM, comm);        // update
}

template<typename KEY, typename VAL, typename IT>
void SpParHelper::BipartiteSwap(std::pair<KEY,VAL> * low, std::pair<KEY,VAL> * array, IT length, int nfirsthalf, int color, const MPI_Comm & comm)
{
	int nprocs, myrank;
	MPI_Comm_size(comm, &nprocs);
	MPI_Comm_rank(comm, &myrank);

	IT * firsthalves = new IT[nprocs];
	IT * secondhalves = new IT[nprocs];
	firsthalves[myrank] = low-array;
	secondhalves[myrank] = length - (low-array);

	MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), firsthalves, 1, MPIType<IT>(), comm);
	MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), secondhalves, 1, MPIType<IT>(), comm);

	int * sendcnt = new int[nprocs]();	// zero initialize
	int totrecvcnt = 0;

	std::pair<KEY,VAL> * bufbegin = NULL;
	if(color == 0)	// first processor half, only send second half of data
	{
		bufbegin = low;
		totrecvcnt = length - (low-array);
		IT beg_oftransfer = std::accumulate(secondhalves, secondhalves+myrank, static_cast<IT>(0));
		IT spaceafter = firsthalves[nfirsthalf];
		int i=nfirsthalf+1;
		while(i < nprocs && spaceafter < beg_oftransfer)
		{
			spaceafter += firsthalves[i++];		// post-increment
		}
		IT end_oftransfer = beg_oftransfer + secondhalves[myrank];	// global index (within second half) of the end of my data
		IT beg_pour = beg_oftransfer;
		IT end_pour = std::min(end_oftransfer, spaceafter);
		sendcnt[i-1] = end_pour - beg_pour;
		while( i < nprocs && spaceafter < end_oftransfer )	// find other recipients until I run out of data
		{
			beg_pour = end_pour;
			spaceafter += firsthalves[i];
			end_pour = std::min(end_oftransfer, spaceafter);
			sendcnt[i++] = end_pour - beg_pour;	// post-increment
		}
	}
	else if(color == 1)	// second processor half, only send first half of data
	{
		bufbegin = array;
		totrecvcnt = low-array;
		// global index (within the second processor half) of the beginning of my data
		IT beg_oftransfer = std::accumulate(firsthalves+nfirsthalf, firsthalves+myrank, static_cast<IT>(0));
		IT spaceafter = secondhalves[0];
		int i=1;
		while( i< nfirsthalf && spaceafter < beg_oftransfer)
		{
			//spacebefore = spaceafter;
			spaceafter += secondhalves[i++];	// post-increment
		}
		IT end_oftransfer = beg_oftransfer + firsthalves[myrank];	// global index (within second half) of the end of my data
		IT beg_pour = beg_oftransfer;
		IT end_pour = std::min(end_oftransfer, spaceafter);
		sendcnt[i-1] = end_pour - beg_pour;
		while( i < nfirsthalf && spaceafter < end_oftransfer )	// find other recipients until I run out of data
		{
			beg_pour = end_pour;
			spaceafter += secondhalves[i];
			end_pour = std::min(end_oftransfer, spaceafter);
			sendcnt[i++] = end_pour - beg_pour;	// post-increment
		}
	}
	DeleteAll(firsthalves, secondhalves);
	int * recvcnt = new int[nprocs];
	MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, comm);   // get the recv counts
	// Alltoall is actually unnecessary, because sendcnt = recvcnt
	// If I have n_mine > n_yours data to send, then I can send you only n_yours
	// as this is your space, and you'll send me an identical amount.
	// Then I can only receive n_mine - n_yours from the third processor and
	// that processor can only send n_mine - n_yours to me back.
	// The proof follows by induction
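	// Worked illustration (hypothetical counts): with 4 ranks and nfirsthalf = 2, suppose
	// secondhalves = {7, 2, -, -} and firsthalves = {-, -, 5, 4}. Then rank 0 sends 5 to rank 2
	// and 2 to rank 3, rank 1 sends 2 to rank 3; symmetrically rank 2 sends 5 to rank 0, and
	// rank 3 sends 2 to rank 0 and 2 to rank 1, so sendcnt and recvcnt agree pairwise.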

	MPI_Datatype MPI_valueType;
	MPI_Type_contiguous(sizeof(std::pair<KEY,VAL>), MPI_CHAR, &MPI_valueType);
	MPI_Type_commit(&MPI_valueType);

	std::pair<KEY,VAL> * receives = new std::pair<KEY,VAL>[totrecvcnt];
	int * sdpls = new int[nprocs]();	// displacements (zero initialized pid)
	int * rdpls = new int[nprocs]();
	std::partial_sum(sendcnt, sendcnt+nprocs-1, sdpls+1);
	std::partial_sum(recvcnt, recvcnt+nprocs-1, rdpls+1);

	MPI_Alltoallv(bufbegin, sendcnt, sdpls, MPI_valueType, receives, recvcnt, rdpls, MPI_valueType, comm);  // sparse swap

	DeleteAll(sendcnt, recvcnt, sdpls, rdpls);
	std::copy(receives, receives+totrecvcnt, bufbegin);
	delete [] receives;
}


template<typename KEY, typename VAL, typename IT>
void SpParHelper::DebugPrintKeys(std::pair<KEY,VAL> * array, IT length, IT * dist, MPI_Comm & World)
{
	int rank, nprocs;
	MPI_Comm_rank(World, &rank);
	MPI_Comm_size(World, &nprocs);
	MPI_File thefile;

	char _fn[] = "temp_sortedkeys"; // AL: this is to avoid the problem that C++ string literals are const char* while C string literals are char*, leading to a const warning (technically error, but compilers are tolerant)
	MPI_File_open(World, _fn, MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &thefile);

	// The cast in the last parameter is crucial because the signature of the function is
	// T accumulate ( InputIterator first, InputIterator last, T init )
	// Hence if init is of type "int", the output also becomes an int (remember C++ signatures are ignorant of the return value)
	IT sizeuntil = std::accumulate(dist, dist+rank, static_cast<IT>(0));
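	// e.g. std::accumulate(dist, dist+rank, 0) would accumulate into a plain int and could
	// overflow when IT is a 64-bit type, hence the static_cast<IT>(0) initializer above.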

	MPI_Offset disp = sizeuntil * sizeof(KEY);	// displacement is in bytes
	MPI_File_set_view(thefile, disp, MPIType<KEY>(), MPIType<KEY>(), "native", MPI_INFO_NULL);

	KEY * packed = new KEY[length];
	for(int i=0; i<length; ++i)
	{
		packed[i] = array[i].first;
	}
	MPI_File_write(thefile, packed, length, MPIType<KEY>(), MPI_STATUS_IGNORE);
	MPI_File_close(&thefile);
	delete [] packed;

	// Now let processor-0 read the file and print
	if(rank == 0)
	{
		FILE * f = fopen("temp_sortedkeys", "r");
		if(!f)
		{
			std::cerr << "Problem reading binary input file\n";
			return;
		}
		IT maxd = *std::max_element(dist, dist+nprocs);
		KEY * data = new KEY[maxd];

		for(int i=0; i<nprocs; ++i)
		{
			// read n_per_proc integers and print them
			fread(data, sizeof(KEY), dist[i], f);

			std::cout << "Elements stored on proc " << i << ": " << std::endl;
			std::copy(data, data+dist[i], std::ostream_iterator<KEY>(std::cout, "\n"));
		}
		delete [] data;
	}
}


/**
  * @param[in,out] MRecv {an already existing, but empty SpMat<...> object}
  * @param[in] essentials {carries essential information (i.e. required array sizes) about ARecv}
  * @param[in] arrwin {windows array of size equal to the number of built-in arrays in the SpMat data structure}
  * @param[in] ownind {processor index (within this processor row/column) of the owner of the matrix to be received}
  * @remark {The communicator information is implicitly contained in the MPI::Win objects}
 **/
template <class IT, class NT, class DER>
void SpParHelper::FetchMatrix(SpMat<IT,NT,DER> & MRecv, const std::vector<IT> & essentials, std::vector<MPI_Win> & arrwin, int ownind)
{
	MRecv.Create(essentials);		// allocate memory for arrays

	Arr<IT,NT> arrinfo = MRecv.GetArrays();
	assert( (arrwin.size() == arrinfo.totalsize()));

	// C-binding for MPI::Get
	//	int MPI_Get(void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp,
	//    		int target_count, MPI_Datatype target_datatype, MPI_Win win)

	IT essk = 0;
	for(int i=0; i< arrinfo.indarrs.size(); ++i)	// get index arrays
	{
		//arrwin[essk].Lock(MPI::LOCK_SHARED, ownind, 0);
		MPI_Get( arrinfo.indarrs[i].addr, arrinfo.indarrs[i].count, MPIType<IT>(), ownind, 0, arrinfo.indarrs[i].count, MPIType<IT>(), arrwin[essk++]);
	}
	for(int i=0; i< arrinfo.numarrs.size(); ++i)	// get numerical arrays
	{
		//arrwin[essk].Lock(MPI::LOCK_SHARED, ownind, 0);
		MPI_Get(arrinfo.numarrs[i].addr, arrinfo.numarrs[i].count, MPIType<NT>(), ownind, 0, arrinfo.numarrs[i].count, MPIType<NT>(), arrwin[essk++]);
	}
}


/**
  * @param[in] Matrix {For the root processor, the local object to be sent to all others.
  * 		For all others, it is a (yet) empty object to be filled by the received data}
  * @param[in] essentials {irrelevant for the root}
 **/
template<typename IT, typename NT, typename DER>
void SpParHelper::BCastMatrix(MPI_Comm & comm1d, SpMat<IT,NT,DER> & Matrix, const std::vector<IT> & essentials, int root)
{
	int myrank;
	MPI_Comm_rank(comm1d, &myrank);
	if(myrank != root)
	{
		Matrix.Create(essentials);		// allocate memory for arrays
	}

	Arr<IT,NT> arrinfo = Matrix.GetArrays();
	for(unsigned int i=0; i< arrinfo.indarrs.size(); ++i)	// get index arrays
	{
		MPI_Bcast(arrinfo.indarrs[i].addr, arrinfo.indarrs[i].count, MPIType<IT>(), root, comm1d);
	}
	for(unsigned int i=0; i< arrinfo.numarrs.size(); ++i)	// get numerical arrays
	{
		MPI_Bcast(arrinfo.numarrs[i].addr, arrinfo.numarrs[i].count, MPIType<NT>(), root, comm1d);
	}
}


/**
 * Just a test function to see the time to gather a matrix on a single MPI process
 * The ultimate objective would be to create the whole matrix on rank 0 (TODO)
 * @param[in] Matrix {For the root processor, the local object to be sent to all others.
 * 		For all others, it is a (yet) empty object to be filled by the received data}
 * @param[in] essentials {irrelevant for the root}
 **/
template<typename IT, typename NT, typename DER>
void SpParHelper::GatherMatrix(MPI_Comm & comm1d, SpMat<IT,NT,DER> & Matrix, int root)
{
    int myrank, nprocs;
    MPI_Comm_rank(comm1d, &myrank);
    MPI_Comm_size(comm1d,&nprocs);

    /*
    if(myrank != root)
    {
        Matrix.Create(essentials);		// allocate memory for arrays
    }
     */

    Arr<IT,NT> arrinfo = Matrix.GetArrays();
    std::vector<std::vector<int>> recvcnt_ind(arrinfo.indarrs.size());
    std::vector<std::vector<int>> recvcnt_num(arrinfo.numarrs.size());
    for(unsigned int i=0; i< arrinfo.indarrs.size(); ++i)	// get index arrays
    {
        recvcnt_ind[i].resize(nprocs);
        int lcount = (int)arrinfo.indarrs[i].count;
        MPI_Gather(&lcount, 1, MPI_INT, recvcnt_ind[i].data(),1, MPI_INT, root, comm1d);
    }
    for(unsigned int i=0; i< arrinfo.numarrs.size(); ++i)	// get numerical arrays
    {
        recvcnt_num[i].resize(nprocs);
        int lcount = (int) arrinfo.numarrs[i].count;
        MPI_Gather(&lcount, 1, MPI_INT, recvcnt_num[i].data(),1, MPI_INT, root, comm1d);
    }

    // now gather the actual vector
    std::vector<std::vector<int>> recvdsp_ind(arrinfo.indarrs.size());
    std::vector<std::vector<int>> recvdsp_num(arrinfo.numarrs.size());
    std::vector<std::vector<IT>> recvind(arrinfo.indarrs.size());
    std::vector<std::vector<IT>> recvnum(arrinfo.numarrs.size());
    for(unsigned int i=0; i< arrinfo.indarrs.size(); ++i)	// get index arrays
    {
        recvdsp_ind[i].resize(nprocs);
        recvdsp_ind[i][0] = 0;
        for(int j=1; j<nprocs; j++)
            recvdsp_ind[i][j] = recvdsp_ind[i][j-1] + recvcnt_ind[i][j-1];
        recvind[i].resize(recvdsp_ind[i][nprocs-1] + recvcnt_ind[i][nprocs-1]);
        MPI_Gatherv(arrinfo.indarrs[i].addr, arrinfo.indarrs[i].count, MPIType<IT>(), recvind[i].data(),recvcnt_ind[i].data(), recvdsp_ind[i].data(), MPIType<IT>(), root, comm1d);
    }


    for(unsigned int i=0; i< arrinfo.numarrs.size(); ++i)	// gather num arrays
    {
        recvdsp_num[i].resize(nprocs);
        recvdsp_num[i][0] = 0;
        for(int j=1; j<nprocs; j++)
            recvdsp_num[i][j] = recvdsp_num[i][j-1] + recvcnt_num[i][j-1];
        recvnum[i].resize(recvdsp_num[i][nprocs-1] + recvcnt_num[i][nprocs-1]);
        MPI_Gatherv(arrinfo.numarrs[i].addr, arrinfo.numarrs[i].count, MPIType<NT>(), recvnum[i].data(),recvcnt_num[i].data(), recvdsp_num[i].data(), MPIType<NT>(), root, comm1d);
    }
}


template <class IT, class NT, class DER>
void SpParHelper::SetWindows(MPI_Comm & comm1d, const SpMat< IT,NT,DER > & Matrix, std::vector<MPI_Win> & arrwin)
{
	Arr<IT,NT> arrs = Matrix.GetArrays();

	// static MPI::Win MPI::Win::create(const void *base, MPI::Aint size, int disp_unit, MPI::Info info, const MPI_Comm & comm);
	// The displacement unit argument is provided to facilitate address arithmetic in RMA operations
	// *** COLLECTIVE OPERATION ***, everybody exposes its own array to everyone else in the communicator

	for(int i=0; i< arrs.indarrs.size(); ++i)
	{
		MPI_Win nWin;
		MPI_Win_create(arrs.indarrs[i].addr,
			       arrs.indarrs[i].count * sizeof(IT), sizeof(IT), MPI_INFO_NULL, comm1d, &nWin);
		arrwin.push_back(nWin);
	}
	for(int i=0; i< arrs.numarrs.size(); ++i)
	{
		MPI_Win nWin;
		MPI_Win_create(arrs.numarrs[i].addr,
			       arrs.numarrs[i].count * sizeof(NT), sizeof(NT), MPI_INFO_NULL, comm1d, &nWin);
		arrwin.push_back(nWin);
	}
}

inline void SpParHelper::LockWindows(int ownind, std::vector<MPI_Win> & arrwin)
{
	for(std::vector<MPI_Win>::iterator itr = arrwin.begin(); itr != arrwin.end(); ++itr)
	{
		MPI_Win_lock(MPI_LOCK_SHARED, ownind, 0, *itr);
	}
}

inline void SpParHelper::UnlockWindows(int ownind, std::vector<MPI_Win> & arrwin)
{
	for(std::vector<MPI_Win>::iterator itr = arrwin.begin(); itr != arrwin.end(); ++itr)
	{
		MPI_Win_unlock( ownind, *itr);
	}
}


/**
 * @param[in] owner {target processor rank within the processor group}
 * @param[in] arrwin {start access epoch only to owner's arrwin (-windows) }
 */
inline void SpParHelper::StartAccessEpoch(int owner, std::vector<MPI_Win> & arrwin, MPI_Group & group)
{
	/* Now start using the whole comm as a group */
	int acc_ranks[1];
	acc_ranks[0] = owner;
	MPI_Group access;
	MPI_Group_incl(group, 1, acc_ranks, &access);	// take only the owner

	// begin the ACCESS epochs for the arrays of the remote matrices A and B
	// Start() *may* block until all processes in the target group have entered their exposure epoch
	for(unsigned int i=0; i< arrwin.size(); ++i)
	       MPI_Win_start(access, 0, arrwin[i]);

	MPI_Group_free(&access);
}

/**
 * @param[in] self {rank of "this" processor to be excluded when starting the exposure epoch}
 */
inline void SpParHelper::PostExposureEpoch(int self, std::vector<MPI_Win> & arrwin, MPI_Group & group)
{
	// begin the EXPOSURE epochs for the arrays of the local matrices A and B
	for(unsigned int i=0; i< arrwin.size(); ++i)
	       MPI_Win_post(group, MPI_MODE_NOPUT, arrwin[i]);
}

template <class IT, class DER>
void SpParHelper::AccessNFetch(DER * & Matrix, int owner, std::vector<MPI_Win> & arrwin, MPI_Group & group, IT ** sizes)
{
	StartAccessEpoch(owner, arrwin, group);	// start the access epoch to arrwin of owner

	std::vector<IT> ess(DER::esscount);			// pack essentials to a vector
	for(int j=0; j< DER::esscount; ++j)
		ess[j] = sizes[j][owner];

	Matrix = new DER();	// create the object first
	FetchMatrix(*Matrix, ess, arrwin, owner);	// then start fetching its elements
}

template <class IT, class DER>
void SpParHelper::LockNFetch(DER * & Matrix, int owner, std::vector<MPI_Win> & arrwin, MPI_Group & group, IT ** sizes)
{
	LockWindows(owner, arrwin);

	std::vector<IT> ess(DER::esscount);			// pack essentials to a vector
	for(int j=0; j< DER::esscount; ++j)
		ess[j] = sizes[j][owner];

	Matrix = new DER();	// create the object first
	FetchMatrix(*Matrix, ess, arrwin, owner);	// then start fetching its elements
}

/**
 * @param[in] sizes 2D array where
 *  	sizes[i] is an array of size r/s representing the ith essential component of all local blocks within that row/col
 *	sizes[i][j] is the size of the ith essential component of the jth local block within this row/col
 */
template <class IT, class NT, class DER>
void SpParHelper::GetSetSizes(const SpMat<IT,NT,DER> & Matrix, IT ** & sizes, MPI_Comm & comm1d)
{
	std::vector<IT> essentials = Matrix.GetEssentials();
	int index;
	MPI_Comm_rank(comm1d, &index);

	for(IT i=0; (unsigned)i < essentials.size(); ++i)
	{
		sizes[i][index] = essentials[i];
		MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), sizes[i], 1, MPIType<IT>(), comm1d);
	}
}

inline void SpParHelper::PrintFile(const std::string & s, const std::string & filename)
{
	int myrank;
	MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
	if(myrank == 0)
	{
		std::ofstream out(filename.c_str(), std::ofstream::app);
		out << s;
		out.close();
	}
}

inline void SpParHelper::PrintFile(const std::string & s, const std::string & filename, MPI_Comm & world)
{
    int myrank;
    MPI_Comm_rank(world, &myrank);
    if(myrank == 0)
    {
        std::ofstream out(filename.c_str(), std::ofstream::app);
        out << s;
        out.close();
    }
}


inline void SpParHelper::Print(const std::string & s)
{
	int myrank;
	MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
	if(myrank == 0)
	{
		std::cerr << s;
	}
}

inline void SpParHelper::Print(const std::string & s, MPI_Comm & world)
{
    int myrank;
    MPI_Comm_rank(world, &myrank);
    if(myrank == 0)
    {
        std::cerr << s;
    }
}


inline void SpParHelper::check_newline(int *bytes_read, int bytes_requested, char *buf)
{
    if ((*bytes_read) < bytes_requested) {
        // fewer bytes than expected, this means EOF
        if (buf[(*bytes_read) - 1] != '\n') {
            // doesn't terminate with a newline, add one to prevent infinite loop later
            buf[(*bytes_read) - 1] = '\n';
            std::cout << "Error in Matrix Market format, appending missing newline at end of file" << std::endl;
            (*bytes_read)++;
        }
    }
}


inline bool SpParHelper::FetchBatch(MPI_File & infile, MPI_Offset & curpos, MPI_Offset end_fpos, bool firstcall, std::vector<std::string> & lines, int myrank)
{
    size_t bytes2fetch = ONEMILLION;    // we might read more than needed but no problem as we won't process them
    MPI_Status status;
    int bytes_read;
    if(firstcall && myrank != 0)
    {
        curpos -= 1;    // first byte is to check whether we started at the beginning of a line
        bytes2fetch += 1;
    }
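    // Note (clarifying the look-behind above): after this adjustment, buf[0] will hold the byte
    // immediately before this rank's original start offset; if that byte is '\n', the rank's
    // range already begins at a line boundary.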
    char * buf = new char[bytes2fetch]; // needs to happen **after** bytes2fetch is updated
    char * originalbuf = buf;   // so that we can delete it later because "buf" will move

    MPI_File_read_at(infile, curpos, buf, bytes2fetch, MPI_CHAR, &status);
    MPI_Get_count(&status, MPI_CHAR, &bytes_read);  // MPI_Get_count can only return 32-bit integers
    if(!bytes_read)
    {
        delete [] originalbuf;
        return true;    // done
    }
    SpParHelper::check_newline(&bytes_read, bytes2fetch, buf);
    if(firstcall && myrank != 0)
    {
        if(buf[0] == '\n')  // we got super lucky and hit the line break
        {
            buf += 1;
            bytes_read -= 1;
            curpos += 1;
        }
        else    // skip to the next line and let the preceding processor take care of this partial line
        {
            char *c = (char*)memchr(buf, '\n', MAXLINELENGTH); //  return a pointer to the matching byte or NULL if the character does not occur
            if (c == NULL) {
                std::cout << "Unexpected line without a break" << std::endl;
            }
            int n = c - buf + 1;
            bytes_read -= n;
            buf += n;
            curpos += n;
        }
    }
    while(bytes_read > 0 && curpos < end_fpos)  // this will also finish the last line
    {
        char *c = (char*)memchr(buf, '\n', bytes_read); //  return a pointer to the matching byte or NULL if the character does not occur
        if (c == NULL) {
            delete [] originalbuf;
            return false;  // if bytes_read stops in the middle of a line, that line will be re-read next time since curpos has not been moved forward yet
        }
        int n = c - buf + 1;

        // string constructor from char * buffer: copies the first n characters from the array of characters pointed by s
        lines.push_back(std::string(buf, n-1));  // no need to copy the newline character
        bytes_read -= n;   // reduce remaining bytes
        buf += n;   // move forward the buffer
        curpos += n;
    }
    delete [] originalbuf;
    if (curpos >= end_fpos) return true;  // don't call it again, nothing left to read
    else    return false;
}


inline void SpParHelper::WaitNFree(std::vector<MPI_Win> & arrwin)
{
	// End the exposure epochs for the arrays of the local matrices A and B
	// The Wait() call matches calls to Complete() issued by ** EACH OF THE ORIGIN PROCESSES **
	// that were granted access to the window during this epoch.
	for(unsigned int i=0; i< arrwin.size(); ++i)
	{
		MPI_Win_wait(arrwin[i]);
	}
	FreeWindows(arrwin);
}

inline void SpParHelper::FreeWindows(std::vector<MPI_Win> & arrwin)
{
	for(unsigned int i=0; i< arrwin.size(); ++i)
	{
		MPI_Win_free(&arrwin[i]);
	}
}

}
