/****************************************************************/
/* Parallel Combinatorial BLAS Library (for Graph Computations) */
/* version 1.6 -------------------------------------------------*/
/* date: 6/15/2017 ---------------------------------------------*/
/* authors: Ariful Azad, Aydin Buluc --------------------------*/
/****************************************************************/
/*
Copyright (c) 2010-2017, The Regents of the University of California

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/


#include "usort/parUtils.h"

namespace combblas {

template <typename IT>
void SpParHelper::ReDistributeToVector(int * & map_scnt, std::vector< std::vector< IT > > & locs_send, std::vector< std::vector< std::string > > & data_send,
                                       std::vector<std::array<char, MAXVERTNAME>> & distmapper_array, const MPI_Comm & comm)
{
    int nprocs, myrank;
    MPI_Comm_size(comm, &nprocs);
    MPI_Comm_rank(comm, &myrank);

    int * map_rcnt = new int[nprocs];
    MPI_Alltoall(map_scnt, 1, MPI_INT, map_rcnt, 1, MPI_INT, comm);
    int * map_sdspl = new int[nprocs]();
    int * map_rdspl = new int[nprocs]();
    std::partial_sum(map_scnt, map_scnt+nprocs-1, map_sdspl+1);
    std::partial_sum(map_rcnt, map_rcnt+nprocs-1, map_rdspl+1);
    IT totmapsend = map_sdspl[nprocs-1] + map_scnt[nprocs-1];
    IT totmaprecv = map_rdspl[nprocs-1] + map_rcnt[nprocs-1];
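
    // Worked example (illustrative numbers, not taken from a real run): with nprocs = 4 and
    // map_scnt = {2, 0, 3, 1}, the exclusive prefix sums above yield map_sdspl = {0, 2, 2, 5},
    // so totmapsend = map_sdspl[3] + map_scnt[3] = 6; map_rdspl and totmaprecv are built the same
    // way from the counts returned by MPI_Alltoall.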

    // sendbuf is a pointer to array of MAXVERTNAME chars.
    // Explicit grouping syntax is due to precedence of [] over *
    // char* sendbuf[MAXVERTNAME] would have declared a MAXVERTNAME-length array of char pointers
    char (*sendbuf)[MAXVERTNAME];    // each sendbuf[i] is of type char[MAXVERTNAME]
    sendbuf = (char (*)[MAXVERTNAME]) malloc(sizeof(char[MAXVERTNAME]) * totmapsend);    // notice that this is allocating a contiguous block of memory
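    // Layout sketch (assuming, say, MAXVERTNAME == 64; the actual value is defined elsewhere in the
    // library): sendbuf[i] is then the 64-byte slot starting at byte offset 64*i of the single
    // malloc'd block, so pointer arithmetic on sendbuf advances in whole MAXVERTNAME-sized strings.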

    IT * sendinds = new IT[totmapsend];
    for(int i=0; i<nprocs; ++i)
    {
        int loccnt = 0;
        for(std::string s : data_send[i])
        {
            std::strcpy(sendbuf[map_sdspl[i]+loccnt], s.c_str());
            loccnt++;
        }
        std::vector<std::string>().swap(data_send[i]);    // free memory
    }
    for(int i=0; i<nprocs; ++i)    // sanity check: received indices should be sorted by definition
    {
        std::copy(locs_send[i].begin(), locs_send[i].end(), sendinds+map_sdspl[i]);
        std::vector<IT>().swap(locs_send[i]);    // free memory
    }

    char (*recvbuf)[MAXVERTNAME];    // recvbuf is of type char (*)[MAXVERTNAME]
    recvbuf = (char (*)[MAXVERTNAME]) malloc(sizeof(char[MAXVERTNAME]) * totmaprecv);

    MPI_Datatype MPI_STRING;    // this is not necessary (we could just use char) but easier for bookkeeping
    MPI_Type_contiguous(sizeof(char[MAXVERTNAME]), MPI_CHAR, &MPI_STRING);
    MPI_Type_commit(&MPI_STRING);

    MPI_Alltoallv(sendbuf, map_scnt, map_sdspl, MPI_STRING, recvbuf, map_rcnt, map_rdspl, MPI_STRING, comm);
    free(sendbuf);    // can't delete[] so use free
    MPI_Type_free(&MPI_STRING);

    IT * recvinds = new IT[totmaprecv];
    MPI_Alltoallv(sendinds, map_scnt, map_sdspl, MPIType<IT>(), recvinds, map_rcnt, map_rdspl, MPIType<IT>(), comm);
    DeleteAll(sendinds, map_scnt, map_sdspl, map_rcnt, map_rdspl);

    if(!std::is_sorted(recvinds, recvinds+totmaprecv))
        std::cout << "Assertion failed at proc " << myrank << ": Received indices are not sorted, this is unexpected" << std::endl;

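    // The loop below expects recvinds to be the identity permutation 0..totmaprecv-1: every received
    // string targets exactly one local slot of distmapper_array and senders fill their buffers in
    // increasing index order, so the assert should hold whenever the inputs are well formed.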
    for(IT i=0; i< totmaprecv; ++i)
    {
        assert(i == recvinds[i]);
        std::copy(recvbuf[i], recvbuf[i]+MAXVERTNAME, distmapper_array[i].begin());
    }
    free(recvbuf);
    delete [] recvinds;
}


template<typename KEY, typename VAL, typename IT>
void SpParHelper::MemoryEfficientPSort(std::pair<KEY,VAL> * array, IT length, IT * dist, const MPI_Comm & comm)
{
    int nprocs, myrank;
    MPI_Comm_size(comm, &nprocs);
    MPI_Comm_rank(comm, &myrank);
    int nsize = nprocs / 2;    // new size
    if(nprocs < 10000)
    {
        bool excluded = false;
        if(dist[myrank] == 0) excluded = true;

        int nreals = 0;
        for(int i=0; i< nprocs; ++i)
            if(dist[i] != 0) ++nreals;
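
        // Ranks with dist[rank] == 0 own no elements; they are counted here so that they can be
        // excluded from the sort communicator below (presumably because the underlying parallel
        // sort expects every participating rank to contribute data).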

        //SpParHelper::MemoryEfficientPSort(vecpair, nnz, dist, World);

        if(nreals == nprocs)    // general case
        {
            long * dist_in = new long[nprocs];
            for(int i=0; i< nprocs; ++i) dist_in[i] = (long) dist[i];
            vpsort::parallel_sort(array, array+length, dist_in, comm);
            delete [] dist_in;
        }
        else
        {
            long * dist_in = new long[nreals];
            int * dist_out = new int[nprocs-nreals];    // ranks to exclude
            int indin = 0;
            int indout = 0;
            for(int i=0; i< nprocs; ++i)
            {
                if(dist[i] == 0)
                    dist_out[indout++] = i;
                else
                    dist_in[indin++] = (long) dist[i];
            }

#ifdef DEBUG
            std::ostringstream outs;
            outs << "To exclude indices: ";
            std::copy(dist_out, dist_out+indout, std::ostream_iterator<int>(outs, " ")); outs << std::endl;
            SpParHelper::Print(outs.str());
#endif

            MPI_Group sort_group, real_group;
            MPI_Comm_group(comm, &sort_group);
            MPI_Group_excl(sort_group, indout, dist_out, &real_group);
            MPI_Group_free(&sort_group);

            // The Create() function should be executed by all processes in comm,
            // even if they do not belong to the new group (in that case MPI_COMM_NULL is returned as real_comm)
            // MPI::Intracomm MPI::Intracomm::Create(const MPI::Group& group) const;
            MPI_Comm real_comm;
            MPI_Comm_create(comm, real_group, &real_comm);
            if(!excluded)
            {
                vpsort::parallel_sort(array, array+length, dist_in, real_comm);
                MPI_Comm_free(&real_comm);
            }
            MPI_Group_free(&real_group);
            delete [] dist_in;
            delete [] dist_out;
        }
    }
    else
    {
        IT gl_median = std::accumulate(dist, dist+nsize, static_cast<IT>(0));    // global rank of the first element of the median processor
        sort(array, array+length);    // re-sort because we might have swapped data in previous iterations
        int color = (myrank < nsize)? 0: 1;

        std::pair<KEY,VAL> * low = array;
        std::pair<KEY,VAL> * upp = array;
        GlobalSelect(gl_median, low, upp, array, length, comm);
        BipartiteSwap(low, array, length, nsize, color, comm);

        if(color == 1) dist = dist + nsize;    // adjust for the second half of processors

        // recursive call; two implicit 'spawn's where half of the processors execute with different parameters
        // MPI::Intracomm MPI::Intracomm::Split(int color, int key) const;
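        // Recursion sketch: GlobalSelect positions 'low' so that exactly gl_median elements (globally)
        // fall before it, BipartiteSwap then exchanges the misplaced halves between the two processor
        // groups, and each group sorts its own half recursively on a communicator of half the size;
        // the recursion bottoms out once nprocs < 10000, where vpsort handles the sort directly.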

        MPI_Comm halfcomm;
        MPI_Comm_split(comm, color, myrank, &halfcomm);    // split into two communicators
        MemoryEfficientPSort(array, length, dist, halfcomm);
    }

}


/*
  TODO: This function is currently a hack:
  the payload (VAL) can only be an integer type for now.
  Fix this.
*/
template<typename KEY, typename VAL, typename IT>
std::vector<std::pair<KEY,VAL>> SpParHelper::KeyValuePSort(std::pair<KEY,VAL> * array, IT length, IT * dist, const MPI_Comm & comm)
{
    int nprocs, myrank;
    MPI_Comm_size(comm, &nprocs);
    MPI_Comm_rank(comm, &myrank);
    int nsize = nprocs / 2;    // new size

    bool excluded = false;
    if(dist[myrank] == 0) excluded = true;

    int nreals = 0;
    for(int i=0; i< nprocs; ++i)
        if(dist[i] != 0) ++nreals;

    std::vector<IndexHolder<KEY>> in(length);
#ifdef THREADED
#pragma omp parallel for
#endif
    for(int i=0; i< length; ++i)
    {
        in[i] = IndexHolder<KEY>(array[i].first, static_cast<unsigned long>(array[i].second));
    }
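    // Note: the VAL payload is carried through the 64-bit 'index' field of IndexHolder (cast to
    // unsigned long here and cast back after the sort), which is why the TODO above restricts
    // VAL to integer types.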

    if(nreals == nprocs)    // general case
    {
        par::sampleSort(in, comm);
    }
    else
    {
        long * dist_in = new long[nreals];
        int * dist_out = new int[nprocs-nreals];    // ranks to exclude
        int indin = 0;
        int indout = 0;
        for(int i=0; i< nprocs; ++i)
        {
            if(dist[i] == 0)
                dist_out[indout++] = i;
            else
                dist_in[indin++] = (long) dist[i];
        }

#ifdef DEBUG
        std::ostringstream outs;
        outs << "To exclude indices: ";
        std::copy(dist_out, dist_out+indout, std::ostream_iterator<int>(outs, " ")); outs << std::endl;
        SpParHelper::Print(outs.str());
#endif

        MPI_Group sort_group, real_group;
        MPI_Comm_group(comm, &sort_group);
        MPI_Group_excl(sort_group, indout, dist_out, &real_group);
        MPI_Group_free(&sort_group);

        // The Create() function should be executed by all processes in comm,
        // even if they do not belong to the new group (in that case MPI_COMM_NULL is returned as real_comm)
        // MPI::Intracomm MPI::Intracomm::Create(const MPI::Group& group) const;
        MPI_Comm real_comm;
        MPI_Comm_create(comm, real_group, &real_comm);
        if(!excluded)
        {
            par::sampleSort(in, real_comm);
            MPI_Comm_free(&real_comm);
        }
        MPI_Group_free(&real_group);
        delete [] dist_in;
        delete [] dist_out;
    }

    std::vector<std::pair<KEY,VAL>> sorted(in.size());
    for(size_t i=0; i<in.size(); i++)
    {
        sorted[i].second = static_cast<VAL>(in[i].index);
        sorted[i].first = in[i].value;
    }
    return sorted;
}


template<typename KEY, typename VAL, typename IT>
void SpParHelper::GlobalSelect(IT gl_rank, std::pair<KEY,VAL> * & low, std::pair<KEY,VAL> * & upp, std::pair<KEY,VAL> * array, IT length, const MPI_Comm & comm)
{
    int nprocs, myrank;
    MPI_Comm_size(comm, &nprocs);
    MPI_Comm_rank(comm, &myrank);
    IT begin = 0;
    IT end = length;    // initially everyone is active
    std::pair<KEY, double> * wmminput = new std::pair<KEY,double>[nprocs];    // (median, #{actives})

    MPI_Datatype MPI_sortType;
    MPI_Type_contiguous(sizeof(std::pair<KEY,double>), MPI_CHAR, &MPI_sortType);
    MPI_Type_commit(&MPI_sortType);

    KEY wmm;    // our median pick
    IT gl_low, gl_upp;
    IT active = end-begin;    // size of the active range
    IT nacts = 0;
    bool found = false;
    int iters = 0;

    /* changes by shan : to add begin0 and end0 for exit condition */
    IT begin0, end0;
    do
    {
        iters++;
        begin0 = begin; end0 = end;
        KEY median = array[(begin + end)/2].first;    // median of the active range
        wmminput[myrank].first = median;
        wmminput[myrank].second = static_cast<double>(active);
        MPI_Allgather(MPI_IN_PLACE, 0, MPI_sortType, wmminput, 1, MPI_sortType, comm);
        double totact = 0;    // total number of active elements
        for(int i=0; i<nprocs; ++i)
            totact += wmminput[i].second;

        // input to weighted median of medians is a set of (object, weight) pairs
        // the algorithm computes the first set of elements (according to total
        // order of "object"s), whose sum is still less than or equal to 1/2
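        // Worked example (illustrative numbers): suppose nprocs = 4 and the gathered (median, weight)
        // pairs normalize to (5, 0.25), (9, 0.20), (2, 0.30), (7, 0.25). Sorting by median gives
        // (2, 0.30), (5, 0.25), (7, 0.25), (9, 0.20); the cumulative weights 0.30, 0.55 first reach 0.5
        // at the second entry, so the weighted median of medians picked below is wmm = 5.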
        for(int i=0; i<nprocs; ++i)
            wmminput[i].second /= totact;    // normalize the weights

        sort(wmminput, wmminput+nprocs);    // sort w.r.t. medians
        double totweight = 0;
        int wmmloc=0;
        while( wmmloc<nprocs && totweight < 0.5 )
        {
            totweight += wmminput[wmmloc++].second;
        }

        wmm = wmminput[wmmloc-1].first;    // weighted median of medians

        std::pair<KEY,VAL> wmmpair = std::make_pair(wmm, VAL());
        low = std::lower_bound(array+begin, array+end, wmmpair);
        upp = std::upper_bound(array+begin, array+end, wmmpair);
        IT loc_low = low-array;    // #{elements smaller than wmm}
        IT loc_upp = upp-array;    // #{elements smaller or equal to wmm}

        MPI_Allreduce(&loc_low, &gl_low, 1, MPIType<IT>(), MPI_SUM, comm);
        MPI_Allreduce(&loc_upp, &gl_upp, 1, MPIType<IT>(), MPI_SUM, comm);

        if(gl_upp < gl_rank)
        {
            // our pick was too small; only recurse to the right
            begin = (low - array);
        }
        else if(gl_rank < gl_low)
        {
            // our pick was too big; only recurse to the left
            end = (upp - array);
        }
        else
        {
            found = true;
        }
        active = end-begin;
        MPI_Allreduce(&active, &nacts, 1, MPIType<IT>(), MPI_SUM, comm);
        if (begin0 == begin && end0 == end) break;    // ABAB: Active range did not shrink, so we break (is this kosher?)
    }
    while((nacts > 2*nprocs) && (!found));
    delete [] wmminput;

    MPI_Datatype MPI_pairType;
    MPI_Type_contiguous(sizeof(std::pair<KEY,VAL>), MPI_CHAR, &MPI_pairType);
    MPI_Type_commit(&MPI_pairType);

    int * nactives = new int[nprocs];
    nactives[myrank] = static_cast<int>(active);    // At this point, actives are small enough
    MPI_Allgather(MPI_IN_PLACE, 0, MPI_INT, nactives, 1, MPI_INT, comm);
    int * dpls = new int[nprocs]();    // displacements (zero initialized pid)
    std::partial_sum(nactives, nactives+nprocs-1, dpls+1);
    std::pair<KEY,VAL> * recvbuf = new std::pair<KEY,VAL>[nacts];
    low = array + begin;    // update low to the beginning of the active range
    MPI_Allgatherv(low, active, MPI_pairType, recvbuf, nactives, dpls, MPI_pairType, comm);

    std::pair<KEY,int> * allactives = new std::pair<KEY,int>[nacts];
    int k = 0;
    for(int i=0; i<nprocs; ++i)
    {
        for(int j=0; j<nactives[i]; ++j)
        {
            allactives[k] = std::make_pair(recvbuf[k].first, i);
            k++;
        }
    }
    DeleteAll(recvbuf, dpls, nactives);
    sort(allactives, allactives+nacts);
    MPI_Allreduce(&begin, &gl_low, 1, MPIType<IT>(), MPI_SUM, comm);    // update
    int diff = gl_rank - gl_low;
    for(int k=0; k < diff; ++k)
    {
        if(allactives[k].second == myrank)
            ++low;    // increment the local pointer
    }
    delete [] allactives;
    begin = low-array;
    MPI_Allreduce(&begin, &gl_low, 1, MPIType<IT>(), MPI_SUM, comm);    // update
}

template<typename KEY, typename VAL, typename IT>
void SpParHelper::BipartiteSwap(std::pair<KEY,VAL> * low, std::pair<KEY,VAL> * array, IT length, int nfirsthalf, int color, const MPI_Comm & comm)
{
    int nprocs, myrank;
    MPI_Comm_size(comm, &nprocs);
    MPI_Comm_rank(comm, &myrank);

    IT * firsthalves = new IT[nprocs];
    IT * secondhalves = new IT[nprocs];
    firsthalves[myrank] = low-array;
    secondhalves[myrank] = length - (low-array);

    MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), firsthalves, 1, MPIType<IT>(), comm);
    MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), secondhalves, 1, MPIType<IT>(), comm);
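
    // firsthalves[i] is the number of elements on rank i that precede the selected split point
    // (i.e., elements destined for the first processor group) and secondhalves[i] is the rest;
    // after the two allgathers every rank sees the full distribution and can work out where to
    // pour its data below.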

    int * sendcnt = new int[nprocs]();    // zero initialize
    int totrecvcnt = 0;

    std::pair<KEY,VAL> * bufbegin = NULL;
    if(color == 0)    // first processor half, only send second half of data
    {
        bufbegin = low;
        totrecvcnt = length - (low-array);
        IT beg_oftransfer = std::accumulate(secondhalves, secondhalves+myrank, static_cast<IT>(0));
        IT spaceafter = firsthalves[nfirsthalf];
        int i=nfirsthalf+1;
        while(i < nprocs && spaceafter < beg_oftransfer)
        {
            spaceafter += firsthalves[i++];    // post-increment
        }
        IT end_oftransfer = beg_oftransfer + secondhalves[myrank];    // global index (within second half) of the end of my data
        IT beg_pour = beg_oftransfer;
        IT end_pour = std::min(end_oftransfer, spaceafter);
        sendcnt[i-1] = end_pour - beg_pour;
        while( i < nprocs && spaceafter < end_oftransfer )    // find other recipients until I run out of data
        {
            beg_pour = end_pour;
            spaceafter += firsthalves[i];
            end_pour = std::min(end_oftransfer, spaceafter);
            sendcnt[i++] = end_pour - beg_pour;    // post-increment
        }
    }
    else if(color == 1)    // second processor half, only send first half of data
    {
        bufbegin = array;
        totrecvcnt = low-array;
        // global index (within the second processor half) of the beginning of my data
        IT beg_oftransfer = std::accumulate(firsthalves+nfirsthalf, firsthalves+myrank, static_cast<IT>(0));
        IT spaceafter = secondhalves[0];
        int i=1;
        while( i< nfirsthalf && spaceafter < beg_oftransfer)
        {
            //spacebefore = spaceafter;
            spaceafter += secondhalves[i++];    // post-increment
        }
        IT end_oftransfer = beg_oftransfer + firsthalves[myrank];    // global index (within second half) of the end of my data
        IT beg_pour = beg_oftransfer;
        IT end_pour = std::min(end_oftransfer, spaceafter);
        sendcnt[i-1] = end_pour - beg_pour;
        while( i < nfirsthalf && spaceafter < end_oftransfer )    // find other recipients until I run out of data
        {
            beg_pour = end_pour;
            spaceafter += secondhalves[i];
            end_pour = std::min(end_oftransfer, spaceafter);
            sendcnt[i++] = end_pour - beg_pour;    // post-increment
        }
    }
    DeleteAll(firsthalves, secondhalves);
    int * recvcnt = new int[nprocs];
    MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, comm);    // get the recv counts
    // Alltoall is actually unnecessary, because sendcnt = recvcnt
    // If I have n_mine > n_yours data to send, then I can send you only n_yours
    // as this is your space, and you'll send me an identical amount.
    // Then I can only receive n_mine - n_yours from the third processor and
    // that processor can only send n_mine - n_yours back to me.
    // The proof follows from induction.
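    // Pouring illustration (hypothetical counts): if my outgoing block holds 10 elements and the first
    // matched partner only has room for 7 of them, sendcnt for that partner becomes 7 and the remaining
    // 3 spill over to the next partner; the symmetric argument on the receiving side is what makes
    // sendcnt and recvcnt agree pairwise, as the comment above argues.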

    MPI_Datatype MPI_valueType;
    MPI_Type_contiguous(sizeof(std::pair<KEY,VAL>), MPI_CHAR, &MPI_valueType);
    MPI_Type_commit(&MPI_valueType);

    std::pair<KEY,VAL> * receives = new std::pair<KEY,VAL>[totrecvcnt];
    int * sdpls = new int[nprocs]();    // displacements (zero initialized pid)
    int * rdpls = new int[nprocs]();
    std::partial_sum(sendcnt, sendcnt+nprocs-1, sdpls+1);
    std::partial_sum(recvcnt, recvcnt+nprocs-1, rdpls+1);

    MPI_Alltoallv(bufbegin, sendcnt, sdpls, MPI_valueType, receives, recvcnt, rdpls, MPI_valueType, comm);    // sparse swap

    DeleteAll(sendcnt, recvcnt, sdpls, rdpls);
    std::copy(receives, receives+totrecvcnt, bufbegin);
    delete [] receives;
}


template<typename KEY, typename VAL, typename IT>
void SpParHelper::DebugPrintKeys(std::pair<KEY,VAL> * array, IT length, IT * dist, MPI_Comm & World)
{
    int rank, nprocs;
    MPI_Comm_rank(World, &rank);
    MPI_Comm_size(World, &nprocs);
    MPI_File thefile;

    char _fn[] = "temp_sortedkeys";    // AL: this avoids the problem that C++ string literals are const char* while C string literals are char*, leading to a const warning (technically an error, but compilers are tolerant)
    MPI_File_open(World, _fn, MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &thefile);

    // The cast in the last parameter is crucial because the signature of the function is
    // T accumulate ( InputIterator first, InputIterator last, T init )
    // Hence if init is of type "int", the output also becomes an int (remember C++ signatures are ignorant of the return value)
    IT sizeuntil = std::accumulate(dist, dist+rank, static_cast<IT>(0));

    MPI_Offset disp = sizeuntil * sizeof(KEY);    // displacement is in bytes
    MPI_File_set_view(thefile, disp, MPIType<KEY>(), MPIType<KEY>(), "native", MPI_INFO_NULL);
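    // Offset illustration (made-up distribution): with dist = {3, 5, 2} and 8-byte keys, rank 1 has
    // sizeuntil = 3 and therefore sets its view to start at byte 24, so the per-rank writes below
    // land contiguously in global key order.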

    KEY * packed = new KEY[length];
    for(int i=0; i<length; ++i)
    {
        packed[i] = array[i].first;
    }
    MPI_File_write(thefile, packed, length, MPIType<KEY>(), NULL);
    MPI_File_close(&thefile);
    delete [] packed;

    // Now let processor 0 read the file and print
    if(rank == 0)
    {
        FILE * f = fopen("temp_sortedkeys", "r");
        if(!f)
        {
            std::cerr << "Problem reading binary input file\n";
            return;
        }
        IT maxd = *std::max_element(dist, dist+nprocs);
        KEY * data = new KEY[maxd];

        for(int i=0; i<nprocs; ++i)
        {
            // read dist[i] keys and print them
            fread(data, sizeof(KEY), dist[i], f);

            std::cout << "Elements stored on proc " << i << ": " << std::endl;
            std::copy(data, data+dist[i], std::ostream_iterator<KEY>(std::cout, "\n"));
        }
        delete [] data;
    }
}


/**
 * @param[in,out] MRecv {an already existing, but empty SpMat<...> object}
 * @param[in] essentials {carries essential information (i.e. required array sizes) about ARecv}
 * @param[in] arrwin {windows array of size equal to the number of built-in arrays in the SpMat data structure}
 * @param[in] ownind {processor index (within this processor row/column) of the owner of the matrix to be received}
 * @remark {The communicator information is implicitly contained in the MPI::Win objects}
 **/
template <class IT, class NT, class DER>
void SpParHelper::FetchMatrix(SpMat<IT,NT,DER> & MRecv, const std::vector<IT> & essentials, std::vector<MPI_Win> & arrwin, int ownind)
{
    MRecv.Create(essentials);    // allocate memory for arrays

    Arr<IT,NT> arrinfo = MRecv.GetArrays();
    assert( (arrwin.size() == arrinfo.totalsize()));

    // C binding for MPI_Get:
    // int MPI_Get(void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp,
    //             int target_count, MPI_Datatype target_datatype, MPI_Win win)

    IT essk = 0;
    for(unsigned int i=0; i< arrinfo.indarrs.size(); ++i)    // get index arrays
    {
        //arrwin[essk].Lock(MPI::LOCK_SHARED, ownind, 0);
        MPI_Get(arrinfo.indarrs[i].addr, arrinfo.indarrs[i].count, MPIType<IT>(), ownind, 0, arrinfo.indarrs[i].count, MPIType<IT>(), arrwin[essk++]);
    }
    for(unsigned int i=0; i< arrinfo.numarrs.size(); ++i)    // get numerical arrays
    {
        //arrwin[essk].Lock(MPI::LOCK_SHARED, ownind, 0);
        MPI_Get(arrinfo.numarrs[i].addr, arrinfo.numarrs[i].count, MPIType<NT>(), ownind, 0, arrinfo.numarrs[i].count, MPIType<NT>(), arrwin[essk++]);
    }
}


/**
 * @param[in] Matrix {For the root processor, the local object to be sent to all others.
 *            For all others, it is a (yet) empty object to be filled by the received data}
 * @param[in] essentials {irrelevant for the root}
 **/
template<typename IT, typename NT, typename DER>
void SpParHelper::BCastMatrix(MPI_Comm & comm1d, SpMat<IT,NT,DER> & Matrix, const std::vector<IT> & essentials, int root)
{
    int myrank;
    MPI_Comm_rank(comm1d, &myrank);
    if(myrank != root)
    {
        Matrix.Create(essentials);    // allocate memory for arrays
    }

    Arr<IT,NT> arrinfo = Matrix.GetArrays();
    for(unsigned int i=0; i< arrinfo.indarrs.size(); ++i)    // broadcast index arrays
    {
        MPI_Bcast(arrinfo.indarrs[i].addr, arrinfo.indarrs[i].count, MPIType<IT>(), root, comm1d);
    }
    for(unsigned int i=0; i< arrinfo.numarrs.size(); ++i)    // broadcast numerical arrays
    {
        MPI_Bcast(arrinfo.numarrs[i].addr, arrinfo.numarrs[i].count, MPIType<NT>(), root, comm1d);
    }
}


/**
 * Just a test function to see the time to gather a matrix on an MPI process.
 * The ultimate objective would be to create the whole matrix on rank 0 (TODO).
 * @param[in] Matrix {For the root processor, the local object to be sent to all others.
 *            For all others, it is a (yet) empty object to be filled by the received data}
 * @param[in] essentials {irrelevant for the root}
 **/
template<typename IT, typename NT, typename DER>
void SpParHelper::GatherMatrix(MPI_Comm & comm1d, SpMat<IT,NT,DER> & Matrix, int root)
{
    int myrank, nprocs;
    MPI_Comm_rank(comm1d, &myrank);
    MPI_Comm_size(comm1d, &nprocs);

    /*
    if(myrank != root)
    {
        Matrix.Create(essentials);    // allocate memory for arrays
    }
    */

    Arr<IT,NT> arrinfo = Matrix.GetArrays();
    std::vector<std::vector<int>> recvcnt_ind(arrinfo.indarrs.size());
    std::vector<std::vector<int>> recvcnt_num(arrinfo.numarrs.size());
    for(unsigned int i=0; i< arrinfo.indarrs.size(); ++i)    // gather index array counts
    {
        recvcnt_ind[i].resize(nprocs);
        int lcount = (int) arrinfo.indarrs[i].count;
        MPI_Gather(&lcount, 1, MPI_INT, recvcnt_ind[i].data(), 1, MPI_INT, root, comm1d);
    }
    for(unsigned int i=0; i< arrinfo.numarrs.size(); ++i)    // gather numerical array counts
    {
        recvcnt_num[i].resize(nprocs);
        int lcount = (int) arrinfo.numarrs[i].count;
        MPI_Gather(&lcount, 1, MPI_INT, recvcnt_num[i].data(), 1, MPI_INT, root, comm1d);
    }

    // now gather the actual arrays
    std::vector<std::vector<int>> recvdsp_ind(arrinfo.indarrs.size());
    std::vector<std::vector<int>> recvdsp_num(arrinfo.numarrs.size());
    std::vector<std::vector<IT>> recvind(arrinfo.indarrs.size());
    std::vector<std::vector<NT>> recvnum(arrinfo.numarrs.size());
    for(unsigned int i=0; i< arrinfo.indarrs.size(); ++i)    // gather index arrays
    {
        recvdsp_ind[i].resize(nprocs);
        recvdsp_ind[i][0] = 0;
        for(int j=1; j<nprocs; j++)
            recvdsp_ind[i][j] = recvdsp_ind[i][j-1] + recvcnt_ind[i][j-1];
        recvind[i].resize(recvdsp_ind[i][nprocs-1] + recvcnt_ind[i][nprocs-1]);
        MPI_Gatherv(arrinfo.indarrs[i].addr, arrinfo.indarrs[i].count, MPIType<IT>(), recvind[i].data(), recvcnt_ind[i].data(), recvdsp_ind[i].data(), MPIType<IT>(), root, comm1d);
    }

    for(unsigned int i=0; i< arrinfo.numarrs.size(); ++i)    // gather numerical arrays
    {
        recvdsp_num[i].resize(nprocs);
        recvdsp_num[i][0] = 0;
        for(int j=1; j<nprocs; j++)
            recvdsp_num[i][j] = recvdsp_num[i][j-1] + recvcnt_num[i][j-1];
        recvnum[i].resize(recvdsp_num[i][nprocs-1] + recvcnt_num[i][nprocs-1]);
        MPI_Gatherv(arrinfo.numarrs[i].addr, arrinfo.numarrs[i].count, MPIType<NT>(), recvnum[i].data(), recvcnt_num[i].data(), recvdsp_num[i].data(), MPIType<NT>(), root, comm1d);
    }
}


template <class IT, class NT, class DER>
void SpParHelper::SetWindows(MPI_Comm & comm1d, const SpMat< IT,NT,DER > & Matrix, std::vector<MPI_Win> & arrwin)
{
    Arr<IT,NT> arrs = Matrix.GetArrays();

    // static MPI::Win MPI::Win::create(const void *base, MPI::Aint size, int disp_unit, MPI::Info info, const MPI_Comm & comm);
    // The displacement unit argument is provided to facilitate address arithmetic in RMA operations
    // *** COLLECTIVE OPERATION ***, everybody exposes its own array to everyone else in the communicator

    for(unsigned int i=0; i< arrs.indarrs.size(); ++i)
    {
        MPI_Win nWin;
        MPI_Win_create(arrs.indarrs[i].addr,
                       arrs.indarrs[i].count * sizeof(IT), sizeof(IT), MPI_INFO_NULL, comm1d, &nWin);
        arrwin.push_back(nWin);
    }
    for(unsigned int i=0; i< arrs.numarrs.size(); ++i)
    {
        MPI_Win nWin;
        MPI_Win_create(arrs.numarrs[i].addr,
                       arrs.numarrs[i].count * sizeof(NT), sizeof(NT), MPI_INFO_NULL, comm1d, &nWin);
        arrwin.push_back(nWin);
    }
}

inline void SpParHelper::LockWindows(int ownind, std::vector<MPI_Win> & arrwin)
{
    for(std::vector<MPI_Win>::iterator itr = arrwin.begin(); itr != arrwin.end(); ++itr)
    {
        MPI_Win_lock(MPI_LOCK_SHARED, ownind, 0, *itr);
    }
}

inline void SpParHelper::UnlockWindows(int ownind, std::vector<MPI_Win> & arrwin)
{
    for(std::vector<MPI_Win>::iterator itr = arrwin.begin(); itr != arrwin.end(); ++itr)
    {
        MPI_Win_unlock(ownind, *itr);
    }
}


/**
 * @param[in] owner {target processor rank within the processor group}
 * @param[in] arrwin {start access epoch only to owner's arrwin (-windows) }
 */
inline void SpParHelper::StartAccessEpoch(int owner, std::vector<MPI_Win> & arrwin, MPI_Group & group)
{
    /* Now start using the whole comm as a group */
    int acc_ranks[1];
    acc_ranks[0] = owner;
    MPI_Group access;
    MPI_Group_incl(group, 1, acc_ranks, &access);    // take only the owner

    // begin the ACCESS epochs for the arrays of the remote matrices A and B
    // Start() *may* block until all processes in the target group have entered their exposure epoch
    for(unsigned int i=0; i< arrwin.size(); ++i)
        MPI_Win_start(access, 0, arrwin[i]);

    MPI_Group_free(&access);
}
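
// Synchronization note: these helpers follow MPI's general active-target (post/start/complete/wait)
// pattern: origins call StartAccessEpoch (MPI_Win_start) before issuing MPI_Get, targets expose their
// windows via PostExposureEpoch (MPI_Win_post), and WaitNFree (MPI_Win_wait) later ends the exposure
// epoch once every granted origin has completed its access epoch.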

/**
 * @param[in] self {rank of "this" processor to be excluded when starting the exposure epoch}
 */
inline void SpParHelper::PostExposureEpoch(int self, std::vector<MPI_Win> & arrwin, MPI_Group & group)
{
    // begin the EXPOSURE epochs for the arrays of the local matrices A and B
    for(unsigned int i=0; i< arrwin.size(); ++i)
        MPI_Win_post(group, MPI_MODE_NOPUT, arrwin[i]);
}

template <class IT, class DER>
void SpParHelper::AccessNFetch(DER * & Matrix, int owner, std::vector<MPI_Win> & arrwin, MPI_Group & group, IT ** sizes)
{
    StartAccessEpoch(owner, arrwin, group);    // start the access epoch to arrwin of owner

    std::vector<IT> ess(DER::esscount);    // pack essentials to a vector
    for(int j=0; j< DER::esscount; ++j)
        ess[j] = sizes[j][owner];

    Matrix = new DER();    // create the object first
    FetchMatrix(*Matrix, ess, arrwin, owner);    // then start fetching its elements
}

template <class IT, class DER>
void SpParHelper::LockNFetch(DER * & Matrix, int owner, std::vector<MPI_Win> & arrwin, MPI_Group & group, IT ** sizes)
{
    LockWindows(owner, arrwin);

    std::vector<IT> ess(DER::esscount);    // pack essentials to a vector
    for(int j=0; j< DER::esscount; ++j)
        ess[j] = sizes[j][owner];

    Matrix = new DER();    // create the object first
    FetchMatrix(*Matrix, ess, arrwin, owner);    // then start fetching its elements
}

/**
 * @param[in] sizes 2D array where
 *            sizes[i] is an array of size r/s representing the ith essential component of all local blocks within that row/col
 *            sizes[i][j] is the size of the ith essential component of the jth local block within this row/col
 */
template <class IT, class NT, class DER>
void SpParHelper::GetSetSizes(const SpMat<IT,NT,DER> & Matrix, IT ** & sizes, MPI_Comm & comm1d)
{
    std::vector<IT> essentials = Matrix.GetEssentials();
    int index;
    MPI_Comm_rank(comm1d, &index);

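    // With MPI_IN_PLACE, each rank's own contribution to the allgather is taken from its slot of the
    // receive buffer, so writing essentials[i] into sizes[i][index] beforehand is what gets shared
    // with the other ranks.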
    for(IT i=0; (unsigned)i < essentials.size(); ++i)
    {
        sizes[i][index] = essentials[i];
        MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), sizes[i], 1, MPIType<IT>(), comm1d);
    }
}

inline void SpParHelper::PrintFile(const std::string & s, const std::string & filename)
{
    int myrank;
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
    if(myrank == 0)
    {
        std::ofstream out(filename.c_str(), std::ofstream::app);
        out << s;
        out.close();
    }
}

inline void SpParHelper::PrintFile(const std::string & s, const std::string & filename, MPI_Comm & world)
{
    int myrank;
    MPI_Comm_rank(world, &myrank);
    if(myrank == 0)
    {
        std::ofstream out(filename.c_str(), std::ofstream::app);
        out << s;
        out.close();
    }
}


inline void SpParHelper::Print(const std::string & s)
{
    int myrank;
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
    if(myrank == 0)
    {
        std::cerr << s;
    }
}

inline void SpParHelper::Print(const std::string & s, MPI_Comm & world)
{
    int myrank;
    MPI_Comm_rank(world, &myrank);
    if(myrank == 0)
    {
        std::cerr << s;
    }
}


inline void SpParHelper::check_newline(int *bytes_read, int bytes_requested, char *buf)
{
    if ((*bytes_read) < bytes_requested) {
        // fewer bytes than expected, this means EOF
        if (buf[(*bytes_read) - 1] != '\n') {
            // doesn't terminate with a newline, add one to prevent infinite loop later
            buf[(*bytes_read) - 1] = '\n';
            std::cout << "Error in Matrix Market format, appending missing newline at end of file" << std::endl;
            (*bytes_read)++;
        }
    }
}


inline bool SpParHelper::FetchBatch(MPI_File & infile, MPI_Offset & curpos, MPI_Offset end_fpos, bool firstcall, std::vector<std::string> & lines, int myrank)
{
    size_t bytes2fetch = ONEMILLION;    // we might read more than needed but no problem as we won't process them
    MPI_Status status;
    int bytes_read;
    if(firstcall && myrank != 0)
    {
        curpos -= 1;    // first byte is to check whether we started at the beginning of a line
        bytes2fetch += 1;
    }
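
    // Boundary protocol: every rank except 0 backs up one byte here so that, after the read below, it
    // can check whether its chunk begins exactly at a line break; if not, it skips ahead to the next
    // '\n' and lets the preceding rank finish that straddling line, so no line is parsed twice or split in half.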
    char * buf = new char[bytes2fetch];    // needs to happen **after** bytes2fetch is updated
    char * originalbuf = buf;    // so that we can delete it later because "buf" will move

    MPI_File_read_at(infile, curpos, buf, bytes2fetch, MPI_CHAR, &status);
    MPI_Get_count(&status, MPI_CHAR, &bytes_read);    // MPI_Get_count can only return 32-bit integers
    if(!bytes_read)
    {
        delete [] originalbuf;
        return true;    // done
    }
    SpParHelper::check_newline(&bytes_read, bytes2fetch, buf);
    if(firstcall && myrank != 0)
    {
        if(buf[0] == '\n')    // we got super lucky and hit the line break
        {
            buf += 1;
            bytes_read -= 1;
            curpos += 1;
        }
        else    // skip to the next line and let the preceding processor take care of this partial line
        {
            char *c = (char*)memchr(buf, '\n', MAXLINELENGTH);    // returns a pointer to the matching byte or NULL if the character does not occur
            if (c == NULL) {
                std::cout << "Unexpected line without a break" << std::endl;
            }
            int n = c - buf + 1;
            bytes_read -= n;
            buf += n;
            curpos += n;
        }
    }
    while(bytes_read > 0 && curpos < end_fpos)    // this will also finish the last line
    {
        char *c = (char*)memchr(buf, '\n', bytes_read);    // returns a pointer to the matching byte or NULL if the character does not occur
        if (c == NULL) {
            delete [] originalbuf;
            return false;    // if bytes_read stops in the middle of a line, that line will be re-read next time since curpos has not been moved forward yet
        }
        int n = c - buf + 1;

        // string constructor from char * buffer: copies the first n characters from the array of characters pointed to by buf
        lines.push_back(std::string(buf, n-1));    // no need to copy the newline character
        bytes_read -= n;    // reduce remaining bytes
        buf += n;    // move forward the buffer
        curpos += n;
    }
    delete [] originalbuf;
    if (curpos >= end_fpos) return true;    // don't call it again, nothing left to read
    else return false;
}


inline void SpParHelper::WaitNFree(std::vector<MPI_Win> & arrwin)
{
    // End the exposure epochs for the arrays of the local matrices A and B
    // The Wait() call matches calls to Complete() issued by ** EACH OF THE ORIGIN PROCESSES **
    // that were granted access to the window during this epoch.
    for(unsigned int i=0; i< arrwin.size(); ++i)
    {
        MPI_Win_wait(arrwin[i]);
    }
    FreeWindows(arrwin);
}

inline void SpParHelper::FreeWindows(std::vector<MPI_Win> & arrwin)
{
    for(unsigned int i=0; i< arrwin.size(); ++i)
    {
        MPI_Win_free(&arrwin[i]);
    }
}

}    // namespace combblas