/*
 * ParCommGraph.cpp
 *
 */

#include "moab/ParCommGraph.hpp"
// we need to recompute adjacencies for merging to work
#include "moab/Core.hpp"
#include "AEntityFactory.hpp"

#ifdef MOAB_HAVE_ZOLTAN
#include "moab/ZoltanPartitioner.hpp"
#endif

namespace moab {

ParCommGraph::ParCommGraph(MPI_Comm joincomm, MPI_Group group1, MPI_Group group2, int coid1, int coid2) :
    comm(joincomm), compid1(coid1), compid2(coid2)
{
  // find out the tasks from each group, in the joint communicator
  find_group_ranks(group1, comm, senderTasks);
  find_group_ranks(group2, comm, receiverTasks);

  rootSender = rootReceiver = false;
  rankInGroup1 = rankInGroup2 = rankInJoin = -1; // not initialized, or not part of the group

  int mpierr = MPI_Group_rank(group1, &rankInGroup1);
  if (MPI_SUCCESS != mpierr || rankInGroup1 == MPI_UNDEFINED)
    rankInGroup1 = -1;

  mpierr = MPI_Group_rank(group2, &rankInGroup2);
  if (MPI_SUCCESS != mpierr || rankInGroup2 == MPI_UNDEFINED)
    rankInGroup2 = -1;

  mpierr = MPI_Comm_rank(comm, &rankInJoin);
  if (MPI_SUCCESS != mpierr) // it should be a fatal error
    rankInJoin = -1;

  mpierr = MPI_Comm_size(comm, &joinSize);
  if (MPI_SUCCESS != mpierr) // it should be a fatal error
    joinSize = -1;

  if (0 == rankInGroup1) rootSender = true;
  if (0 == rankInGroup2) rootReceiver = true;
  recomputed_send_graph = false;
  comm_graph = NULL;
}
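
// A minimal usage sketch (illustrative only; the group/communicator names and the
// component ids below are hypothetical, and the joint communicator is assumed to
// span the union of both groups):
//
//   MPI_Group atmGroup, ocnGroup;  // filled by the application
//   MPI_Comm  joint;               // spans both groups
//   ParCommGraph* cgraph = new ParCommGraph(joint, atmGroup, ocnGroup, 5, 17);
//   // tasks in group1 act as senders, tasks in group2 as receivers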

ParCommGraph::~ParCommGraph() {
  // TODO Auto-generated destructor stub
}

// utility to find out the ranks of the processes of a group, with respect to a joint comm,
// which is guaranteed to span the group;
// it is used locally (in the constructor), but it can also serve as a general utility
void ParCommGraph::find_group_ranks(MPI_Group group, MPI_Comm joincomm, std::vector<int> & ranks)
{
  MPI_Group global_grp;
  MPI_Comm_group(joincomm, &global_grp);

  int grp_size;

  MPI_Group_size(group, &grp_size);
  std::vector<int> rks (grp_size);
  ranks.resize(grp_size);

  for (int i = 0; i < grp_size; i++)
    rks[i] = i;

  MPI_Group_translate_ranks(group, grp_size, &rks[0], global_grp, &ranks[0]);
  MPI_Group_free(&global_grp);
  return;
}
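
// Example of the translation (hedged, hypothetical ranks): if the group's tasks
// occupy ranks {3, 5, 8} of the joint communicator, then group-local ranks
// {0, 1, 2} translate to ranks = {3, 5, 8}; i.e., entry k of the output is the
// joint rank of group-local rank k.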

ErrorCode ParCommGraph::compute_trivial_partition (std::vector<int> & numElemsPerTaskInGroup1)
{

  recv_graph.clear(); recv_sizes.clear();
  sender_graph.clear(); sender_sizes.clear();

  if (numElemsPerTaskInGroup1.size() != senderTasks.size())
    return MB_FAILURE; // each sender has a number of elements that it owns

  // first find out the total number of elements to be sent from all senders
  int total_elems = 0;
  std::vector<int> accum;
  accum.push_back(0);

  int num_senders = (int) senderTasks.size();

  for (size_t k = 0; k < numElemsPerTaskInGroup1.size(); k++)
  {
    total_elems += numElemsPerTaskInGroup1[k];
    accum.push_back(total_elems);
  }

  int num_recv = ((int)receiverTasks.size());
  // in the trivial partition, every receiver should get about total_elems/num_recv elements
  int num_per_receiver = (int)(total_elems / num_recv);
  int leftover = total_elems - num_per_receiver * num_recv;

  // so receiver k will receive the interval [starts[k], starts[k+1])
  std::vector<int> starts;
  starts.resize(num_recv + 1);
  starts[0] = 0;
  for (int k = 0; k < num_recv; k++)
  {
    starts[k+1] = starts[k] + num_per_receiver;
    if (k < leftover) starts[k+1]++;
  }

  // each sender will send to a number of receivers, based on how the
  // arrays starts[0:num_recv] and accum[0:num_senders] overlap
  int lastUsedReceiverRank = 0; // first receiver was not treated yet
  for (int j = 0; j < num_senders; j++ )
  {
    // we can start the receiver loop at the latest receiver that received from the previous sender
    for (int k = lastUsedReceiverRank; k < num_recv; k++ )
    {
      // if the intervals [accum[j], accum[j+1]) and [starts[k], starts[k+1]) overlap:
      if (starts[k] < accum[j+1] && starts[k+1] > accum[j] )
      {
        recv_graph[receiverTasks[k]].push_back(senderTasks[j]);
        sender_graph[senderTasks[j]].push_back(receiverTasks[k]);

        // we still need to decide what is the overlap
        int sizeOverlap = 1; // at least 1, for sure
        if ( starts[k] >= accum[j]) // one end is starts[k]
        {
          if (starts[k+1] >= accum[j+1]) // the other end is accum[j+1]
            sizeOverlap = accum[j+1] - starts[k];
          else
            sizeOverlap = starts[k+1] - starts[k];
        }
        else // one end is accum[j]
        {
          if (starts[k+1] >= accum[j+1]) // the other end is accum[j+1]
            sizeOverlap = accum[j+1] - accum[j];
          else
            sizeOverlap = starts[k+1] - accum[j];
        }
        recv_sizes[receiverTasks[k]].push_back(sizeOverlap); // basically, task k will receive from
                                                             // sender j, sizeOverlap elems
        sender_sizes[senderTasks[j]].push_back(sizeOverlap);
        if (starts[k+1] >= accum[j+1]) // receiver k took the rest of sender j's elements
        {
          lastUsedReceiverRank = k; // so the next sender's receiver loop can start at k;
                                    // receivers before k are already fully served
          break ; // break the k loop, we distributed all elements from sender j to some receivers
        }
      }
    }
  }

  return MB_SUCCESS;
}
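
// Worked example (hedged, hypothetical sizes): with 2 senders owning 10 and 6
// elements, accum = {0, 10, 16}; with 3 receivers, num_per_receiver = 5 and
// leftover = 1, so starts = {0, 6, 11, 16}. The interval overlaps then give:
//   receiver 0 gets 6 elements from sender 0,
//   receiver 1 gets 4 from sender 0 and 1 from sender 1,
//   receiver 2 gets 5 from sender 1.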

ErrorCode ParCommGraph::pack_receivers_graph(std::vector<int> & packed_recv_array)
{
  // it basically looks at local data to pack the communication graph; each receiver task will have to
  // post receives for each sender task that will send data to it;
  // the array will be communicated to the root receiver, and eventually distributed to the receiver tasks

  /*
   * packed_array will have receiver, number of senders, then senders, etc
   */
  if (recv_graph.size() < receiverTasks.size())
  {
    // big problem, we have empty partitions in receive
    std::cout << " WARNING: empty partitions, some receiver tasks will receive nothing.\n";
  }
  for (std::map<int, std::vector<int> >::iterator it = recv_graph.begin(); it != recv_graph.end(); it++ )
  {
    int recv = it->first;
    std::vector<int> & senders = it->second;
    packed_recv_array.push_back(recv);
    packed_recv_array.push_back( (int) senders.size() );

    for (int k = 0; k < (int)senders.size(); k++ )
      packed_recv_array.push_back( senders[k] );
  }

  return MB_SUCCESS;
}
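
// Layout example (hedged, hypothetical graph): if receiver task 4 gets data from
// senders {0, 2} and receiver task 5 gets data from sender {1}, the packed array is
//   [ 4, 2, 0, 2,   5, 1, 1 ]
// i.e., for each receiver: its rank, the number of its senders, then the sender ranks.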

ErrorCode ParCommGraph::split_owned_range (int sender_rank, Range & owned)
{
  int senderTask = senderTasks[sender_rank];
  std::vector<int> & distribution = sender_sizes[senderTask];
  std::vector<int> & receivers = sender_graph[senderTask];
  if (distribution.size() != receivers.size())
    return MB_FAILURE;

  Range current = owned; // get the full range first, then we will subtract stuff, for
                         // the following ranges

  Range rleftover = current;
  for (size_t k = 0; k < receivers.size(); k++ )
  {
    Range newr;
    newr.insert(current.begin(), current.begin() + distribution[k]);
    split_ranges[ receivers[k] ] = newr;

    rleftover = subtract(current, newr );
    current = rleftover;
  }

  return MB_SUCCESS;
}

// use for this the corresponding tasks and sizes
ErrorCode ParCommGraph::split_owned_range (Range & owned)
{
  if (corr_tasks.size() != corr_sizes.size())
    return MB_FAILURE;

  Range current = owned; // get the full range first, then we will subtract stuff, for
                         // the following ranges

  Range rleftover = current;
  for (size_t k = 0; k < corr_tasks.size(); k++ )
  {
    Range newr;
    newr.insert(current.begin(), current.begin() + corr_sizes[k]);
    split_ranges[ corr_tasks[k] ] = newr;

    rleftover = subtract(current, newr );
    current = rleftover;
  }

  return MB_SUCCESS;
}
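
// Sketch of the splitting (hedged): if this sender owns 10 cells and must send
// 6 to task A and 4 to task B, the first 6 handles of 'owned' land in
// split_ranges[A] and the remaining 4 in split_ranges[B]; the ordering follows
// the Range (handle) order, which the trivial partition relies on being the
// same on both sides.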

ErrorCode ParCommGraph::send_graph(MPI_Comm jcomm)
{
  if ( is_root_sender())
  {
    int ierr;
    // we need to build a communication graph, because each sender now knows to which receivers to send data;
    // the receivers need to post receives for each sender that will send data to them;
    // we gather on rank 0 of the sender comm the global ranks of the senders, with the receivers they send to,
    // and build the communication matrix: which senders each receiver will receive from

    std::vector<int> packed_recv_array;
    ErrorCode rval = pack_receivers_graph(packed_recv_array);
    if (MB_SUCCESS != rval) return rval;

    int size_pack_array = (int) packed_recv_array.size();
    comm_graph = new int[size_pack_array+1];
    comm_graph[0] = size_pack_array;
    for (int k = 0; k < size_pack_array; k++)
      comm_graph[k+1] = packed_recv_array[k];
    // will add 2 requests
    /// use tag 10 to send the size and tag 20 to send the packed array
    sendReqs.resize(2);
    ierr = MPI_Isend(&comm_graph[0], 1, MPI_INT, receiver(0), 10, jcomm, &sendReqs[0]); // we have to use the global communicator
    if (ierr != 0) return MB_FAILURE;
    ierr = MPI_Isend(&comm_graph[1], size_pack_array, MPI_INT, receiver(0), 20, jcomm, &sendReqs[1]); // we have to use the global communicator
    if (ierr != 0) return MB_FAILURE;
  }
  return MB_SUCCESS;
}
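
// Wire protocol (as implemented above): the root sender posts two nonblocking
// sends to the root receiver on the joint communicator, tag 10 carrying one int
// (the packed array size) and tag 20 carrying the packed array itself; the
// matching blocking receives are posted in receive_comm_graph().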

// pco gives access to MOAB through get_moab()
// do we need to store "method" as a member variable ?
ErrorCode ParCommGraph::send_mesh_parts(MPI_Comm jcomm, ParallelComm * pco, Range & owned)
{

  ErrorCode rval;
  if (split_ranges.empty()) // in trivial partition
  {
    rval = split_owned_range (rankInGroup1, owned);
    if (rval != MB_SUCCESS) return rval;
    // we know this on the sender side:
    corr_tasks = sender_graph[ senderTasks[rankInGroup1]]; // copy
    corr_sizes = sender_sizes[ senderTasks[rankInGroup1]]; // another copy
  }

  int indexReq = 0;
  int ierr; // MPI error
  if (is_root_sender()) indexReq = 2; // the first 2 entries in sendReqs are used by send_graph
  sendReqs.resize(indexReq + 2*split_ranges.size());
  for (std::map<int, Range>::iterator it = split_ranges.begin(); it != split_ranges.end(); it++)
  {
    int receiver_proc = it->first;
    Range ents = it->second;

    // add the necessary vertices too
    Range verts;
    rval = pco->get_moab()->get_adjacencies(ents, 0, false, verts, Interface::UNION);
    if (rval != MB_SUCCESS) return rval;
    ents.merge(verts);
    ParallelComm::Buffer * buffer = new ParallelComm::Buffer(ParallelComm::INITIAL_BUFF_SIZE);
    buffer->reset_ptr(sizeof(int));
    rval = pco->pack_buffer(ents, false, true, false, -1, buffer);
    if (rval != MB_SUCCESS) return rval;
    int size_pack = buffer->get_current_size();

    // TODO there could be an issue with endianness; check !!!!!
    // we are sending the size of the buffer first, as an int!!!
    ierr = MPI_Isend(buffer->mem_ptr, 1, MPI_INT, receiver_proc, 1, jcomm, &sendReqs[indexReq]); // we have to use the global communicator
    if (ierr != 0) return MB_FAILURE;
    indexReq++;

    ierr = MPI_Isend(buffer->mem_ptr, size_pack, MPI_CHAR, receiver_proc, 2, jcomm, &sendReqs[indexReq]); // we have to use the global communicator
    if (ierr != 0) return MB_FAILURE;
    indexReq++;
    localSendBuffs.push_back(buffer);

  }
  return MB_SUCCESS;
}
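
// Message protocol per receiver (as implemented above): tag 1 carries the first
// int of the packed buffer, which holds the buffer size, and tag 2 carries the
// packed mesh itself as MPI_CHAR; the buffers are kept alive in localSendBuffs
// until the nonblocking sends complete (see release_send_buffers()).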

// this is called on the receiver side
ErrorCode ParCommGraph::receive_comm_graph(MPI_Comm jcomm, ParallelComm *pco, std::vector<int> & pack_array)
{
  // first, receive from sender rank 0 the communication graph (matrix), so each receiver
  // knows what data to expect
  MPI_Comm receive = pco->comm();
  int size_pack_array, ierr;
  MPI_Status status;
  if (rootReceiver)
  {
    ierr = MPI_Recv (&size_pack_array, 1, MPI_INT, sender(0), 10, jcomm, &status);
    if (0 != ierr) return MB_FAILURE;
#ifdef VERBOSE
    std::cout << " receive comm graph size: " << size_pack_array << "\n";
#endif
    pack_array.resize (size_pack_array);
    ierr = MPI_Recv (&pack_array[0], size_pack_array, MPI_INT, sender(0), 20, jcomm, &status);
    if (0 != ierr) return MB_FAILURE;
#ifdef VERBOSE
    std::cout << " receive comm graph ";
    for (int k = 0; k < (int)pack_array.size(); k++)
      std::cout << " " << pack_array[k];
    std::cout << "\n";
#endif
  }

  // now broadcast this whole array to all receivers, so they know what to expect
  ierr = MPI_Bcast(&size_pack_array, 1, MPI_INT, 0, receive);
  if (0 != ierr) return MB_FAILURE;
  pack_array.resize(size_pack_array);
  ierr = MPI_Bcast(&pack_array[0], size_pack_array, MPI_INT, 0, receive);
  if (0 != ierr) return MB_FAILURE;
  return MB_SUCCESS;
}
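
// Decoding sketch (hedged): each receiver can walk pack_array with the layout
// produced by pack_receivers_graph() to find its own senders, e.g.:
//
//   size_t pos = 0;
//   while (pos < pack_array.size()) {
//     int recv_rank = pack_array[pos++];
//     int nsenders  = pack_array[pos++];
//     // the next nsenders entries are the joint ranks sending to recv_rank
//     pos += nsenders;
//   }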

ErrorCode ParCommGraph::receive_mesh(MPI_Comm jcomm, ParallelComm *pco, EntityHandle local_set, std::vector<int> & senders_local)
{
  ErrorCode rval;
  int ierr;
  MPI_Status status;
  // we also need to fill the corresponding mesh info on the other side
  corr_tasks = senders_local;
  Range newEnts;

  Tag orgSendProcTag; // this will be a tag set on the received mesh, with info about from what task / PE the
                      // primary element came, in the joint communicator; this will be forwarded by the coverage mesh
  int defaultInt = -1; // no processor, so it was not migrated from somewhere else
  rval = pco->get_moab()->tag_get_handle("orig_sending_processor", 1, MB_TYPE_INTEGER, orgSendProcTag,
      MB_TAG_DENSE | MB_TAG_CREAT, &defaultInt);MB_CHK_SET_ERR(rval, "can't create original sending processor tag");
  if (!senders_local.empty())
  {
    for (size_t k = 0; k < senders_local.size(); k++)
    {
      int sender1 = senders_local[k]; // first receive the size of the buffer

      int size_pack;
      ierr = MPI_Recv (&size_pack, 1, MPI_INT, sender1, 1, jcomm, &status);
      if (0 != ierr) return MB_FAILURE;
      // now allocate a buffer of that size, then receive into it
      ParallelComm::Buffer * buffer = new ParallelComm::Buffer(size_pack);
      //buffer->reserve(size_pack);

      ierr = MPI_Recv (buffer->mem_ptr, size_pack, MPI_CHAR, sender1, 2, jcomm, &status);
      if (0 != ierr) return MB_FAILURE;
      // now unpack the buffer we just received
      Range entities;
      std::vector<std::vector<EntityHandle> > L1hloc, L1hrem;
      std::vector<std::vector<int> > L1p;
      std::vector<EntityHandle> L2hloc, L2hrem;
      std::vector<unsigned int> L2p;

      buffer->reset_ptr(sizeof(int));
      std::vector<EntityHandle> entities_vec(entities.size());
      std::copy(entities.begin(), entities.end(), entities_vec.begin());
      rval = pco->unpack_buffer(buffer->buff_ptr, false, -1, -1, L1hloc, L1hrem, L1p, L2hloc,
          L2hrem, L2p, entities_vec);
      delete buffer;
      if (MB_SUCCESS != rval) return rval;

      std::copy(entities_vec.begin(), entities_vec.end(), range_inserter(entities));
      // we have to add them to the local set
      rval = pco->get_moab()->add_entities(local_set, entities);
      if (MB_SUCCESS != rval) return rval;
      // corr_sizes is the size of primary entities received
      Range verts = entities.subset_by_dimension(0);
      Range local_primary_ents = subtract(entities, verts);
      corr_sizes.push_back( (int) local_primary_ents.size());
      // set a tag with the original sender for each primary entity;
      // it will be used later for the coverage mesh
      std::vector<int> orig_senders(local_primary_ents.size(), sender1);
      rval = pco->get_moab()->tag_set_data(orgSendProcTag, local_primary_ents, &orig_senders[0]);MB_CHK_SET_ERR(rval, "can't set original sending processor tag");
      newEnts.merge(entities);
      // store these in split ranges
      split_ranges[sender1] = local_primary_ents;

#ifdef VERBOSE
      std::ostringstream partial_outFile;

      partial_outFile << "part_send_" << sender1 << "." << "recv" << rankInJoin << ".vtk";

      // the mesh contains ghosts too, but they are not part of mat/neumann set
      // write the file in serial, to see what tags are missing
      std::cout << " writing from receiver " << rankInJoin << " from sender " << sender1 << " entities: " << entities.size() << std::endl;
      rval = pco->get_moab()->write_file(partial_outFile.str().c_str(), 0, 0, &local_set, 1); // everything on local set received
      if (MB_SUCCESS != rval) return rval;
#endif
    }

  }
  // make sure adjacencies are updated on the new elements

  if (newEnts.empty())
  {
    std::cout << " WARNING: this task did not receive any entities \n";
  }
  // in order for the merging to work, we need to be sure that the adjacencies are updated (created)
  Range local_verts = newEnts.subset_by_type(MBVERTEX);
  newEnts = subtract(newEnts, local_verts);
  Core * mb = (Core*)pco->get_moab();
  AEntityFactory* adj_fact = mb->a_entity_factory();
  if (!adj_fact->vert_elem_adjacencies())
    adj_fact->create_vert_elem_adjacencies();
  else
  {
    for (Range::iterator it = newEnts.begin(); it != newEnts.end(); it++)
    {
      EntityHandle eh = *it;
      const EntityHandle * conn = NULL;
      int num_nodes = 0;
      rval = mb->get_connectivity(eh, conn, num_nodes);
      if (MB_SUCCESS != rval) return rval;
      adj_fact->notify_create_entity(eh, conn, num_nodes);
    }
  }

  return MB_SUCCESS;
}

// VSM: Why is the communicator never used? Remove the argument?
ErrorCode ParCommGraph::release_send_buffers(MPI_Comm /*jcomm*/)
{
  int ierr, nsize = (int)sendReqs.size();
  std::vector<MPI_Status> mult_status;
  mult_status.resize(sendReqs.size());
  ierr = MPI_Waitall(nsize, &sendReqs[0], &mult_status[0]);

  if (ierr != 0)
    return MB_FAILURE;
  // now we can free all buffers
  delete [] comm_graph;
  comm_graph = NULL;
  std::vector<ParallelComm::Buffer*>::iterator vit;
  for (vit = localSendBuffs.begin(); vit != localSendBuffs.end(); ++vit)
    delete (*vit);
  localSendBuffs.clear();
  return MB_SUCCESS;
}
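
// Usage note (hedged): this should be called only once the other group has
// posted its matching receives, since MPI_Waitall blocks on every outstanding
// nonblocking send (graph, mesh, and tag buffers) before the memory is freed;
// deleting comm_graph when it is NULL (non-root tasks) is harmless.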

// again, will use the send buffers, for nonblocking sends;
// should the receives be non-blocking too?
ErrorCode ParCommGraph::send_tag_values (MPI_Comm jcomm, ParallelComm *pco, Range & owned,
    std::vector<Tag> & tag_handles )
{
  // basically, owned.size() needs to be equal to sum(corr_sizes)
  // get info about the tag size, type, etc
  int ierr;
  Core * mb = (Core*)pco->get_moab();
  // get info about the tag
  //! Get the size of the specified tag in bytes
  int total_bytes_per_entity = 0; // we need to know, to allocate buffers
  ErrorCode rval;
  std::vector<int> vect_bytes_per_tag;
#ifdef VERBOSE
  std::vector<int> tag_sizes;
#endif
  for (size_t i = 0; i < tag_handles.size(); i++)
  {
    int bytes_per_tag;
    rval = mb->tag_get_bytes(tag_handles[i], bytes_per_tag);MB_CHK_ERR ( rval );
    total_bytes_per_entity += bytes_per_tag;
    vect_bytes_per_tag.push_back(bytes_per_tag);
#ifdef VERBOSE
    int tag_size;
    rval = mb->tag_get_length(tag_handles[i], tag_size);MB_CHK_ERR ( rval );
    tag_sizes.push_back(tag_size);
#endif
  }

  // bool specified_ids = send_IDs_map.size() > 0;
  bool specified_ids = recomputed_send_graph; // in cases when the sender is completely over land, send_IDs_map can still have size 0
  int indexReq = 0;
  if (!specified_ids) // original send
  {
    // use the buffers data structure to allocate memory for sending the tags
    sendReqs.resize(split_ranges.size());

    for (std::map<int, Range>::iterator it = split_ranges.begin(); it != split_ranges.end(); it++)
    {
      int receiver_proc = it->first;
      Range ents = it->second; // primary entities, with the tag data
      int size_buffer = 4 + total_bytes_per_entity*(int)ents.size(); // hopefully, below 2B; if more, we have a big problem ...
      ParallelComm::Buffer * buffer = new ParallelComm::Buffer(size_buffer);

      buffer->reset_ptr(sizeof(int));
      for (size_t i = 0; i < tag_handles.size(); i++)
      {
        // copy tag data to buffer->buff_ptr, and send the buffer (we could have used regular char arrays)
        rval = mb->tag_get_data(tag_handles[i], ents, (void*)(buffer->buff_ptr) );MB_CHK_ERR ( rval );
        // advance the buffer pointer
        buffer->buff_ptr += vect_bytes_per_tag[i]*ents.size();
      }
      *((int*)buffer->mem_ptr) = size_buffer;
      //int size_pack = buffer->get_current_size(); // debug check
      ierr = MPI_Isend(buffer->mem_ptr, size_buffer, MPI_CHAR, receiver_proc, 222, jcomm, &sendReqs[indexReq]); // we have to use the global communicator
      if (ierr != 0) return MB_FAILURE;
      indexReq++;
      localSendBuffs.push_back(buffer); // we will release them after the nonblocking sends are completed
    }
  }
  else
  {
    // we know that we will need to send some tag data in a specific order
    // first, get the ids of the local elements, from the owned Range; arrange the buffer in order of increasing global id
    Tag gidTag;
    rval = mb->tag_get_handle("GLOBAL_ID", gidTag); MB_CHK_ERR ( rval );
    std::vector<int> gids;
    gids.resize(owned.size());
    rval = mb->tag_get_data(gidTag, owned, &gids[0]); MB_CHK_ERR ( rval );
    std::map<int, EntityHandle> gidToHandle;
    size_t i = 0;
    for (Range::iterator it = owned.begin(); it != owned.end(); it++)
    {
      EntityHandle eh = *it;
      gidToHandle[gids[i++]] = eh;
    }
    // now, pack the data and send it
    sendReqs.resize(send_IDs_map.size());
    for (std::map<int, std::vector<int> >::iterator mit = send_IDs_map.begin(); mit != send_IDs_map.end(); mit++)
    {
      int receiver_proc = mit->first;
      std::vector<int> & eids = mit->second;
      int size_buffer = 4 + total_bytes_per_entity*(int)eids.size(); // hopefully, below 2B; if more, we have a big problem ...
      ParallelComm::Buffer * buffer = new ParallelComm::Buffer(size_buffer);
      buffer->reset_ptr(sizeof(int));
#ifdef VERBOSE
      std::ofstream dbfile;
      std::stringstream outf;
      outf << "from_" << rankInJoin << "_send_to_" << receiver_proc << ".txt";
      dbfile.open (outf.str().c_str());
      dbfile << "from " << rankInJoin << " send to " << receiver_proc << "\n";
#endif
      // copy tag data to buffer->buff_ptr, and send the buffer (we could have used regular char arrays)
      // here the data is packed by entity, in the order given by send_IDs_map,
      // with all the tags of one entity stored consecutively

      for (std::vector<int>::iterator it = eids.begin(); it != eids.end(); it++)
      {
        int eID = *it;
        EntityHandle eh = gidToHandle[eID];
        for (i = 0; i < tag_handles.size(); i++)
        {
          rval = mb->tag_get_data(tag_handles[i], &eh, 1, (void*)(buffer->buff_ptr) ); MB_CHK_ERR ( rval );
#ifdef VERBOSE
          dbfile << "global ID " << eID << " local handle " << mb->id_from_handle(eh) << " vals: ";
          double * vals = (double*) (buffer->buff_ptr);
          for (int kk = 0; kk < tag_sizes[i]; kk++)
          {
            dbfile << " " << *vals;
            vals++;
          }
          dbfile << "\n";
#endif
          buffer->buff_ptr += vect_bytes_per_tag[i];
        }
      }

#ifdef VERBOSE
      dbfile.close();
#endif
      *((int*)buffer->mem_ptr) = size_buffer;
      //int size_pack = buffer->get_current_size(); // debug check
      ierr = MPI_Isend(buffer->mem_ptr, size_buffer, MPI_CHAR, receiver_proc, 222, jcomm, &sendReqs[indexReq]); // we have to use the global communicator
      if (ierr != 0) return MB_FAILURE;
      indexReq++;
      localSendBuffs.push_back(buffer); // we will release them after the nonblocking sends are completed
    }
  }

  return MB_SUCCESS;
}
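
// Buffer layout (as implemented above): a 4-byte size header, then the tag data;
// in the original-send branch the payload is tag-major (all entities for tag 0,
// then tag 1, ...), while in the recomputed-graph branch it is entity-major
// (all tags for the first global id, then the next id, ...). The matching
// receive in receive_tag_values() unpacks with the same ordering, so the two
// branches must stay in sync.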

ErrorCode ParCommGraph::receive_tag_values (MPI_Comm jcomm, ParallelComm *pco, Range & owned,
    std::vector<Tag> & tag_handles )
{
  // opposite to sending, we will use blocking receives
  int ierr;
  MPI_Status status;
  // basically, owned.size() needs to be equal to sum(corr_sizes)
  // get info about the tag size, type, etc
  Core * mb = (Core*)pco->get_moab();
  // get info about the tag
  //! Get the size of the specified tag in bytes
  ErrorCode rval;
  int total_bytes_per_entity = 0;
  std::vector<int> vect_bytes_per_tag;
#ifdef VERBOSE
  std::vector<int> tag_sizes;
#endif
  for (size_t i = 0; i < tag_handles.size(); i++)
  {
    int bytes_per_tag;
    rval = mb->tag_get_bytes(tag_handles[i], bytes_per_tag);MB_CHK_ERR ( rval );
    total_bytes_per_entity += bytes_per_tag;
    vect_bytes_per_tag.push_back(bytes_per_tag);
#ifdef VERBOSE
    int tag_size;
    rval = mb->tag_get_length(tag_handles[i], tag_size);MB_CHK_ERR ( rval );
    tag_sizes.push_back(tag_size);
#endif
  }

  bool specified_ids = recv_IDs_map.size() > 0;
  if (!specified_ids)
  {
    //std::map<int, Range> split_ranges;
    //rval = split_owned_range ( owned);MB_CHK_ERR ( rval );

    // use the buffers data structure to allocate memory for receiving the tags
    for (std::map<int, Range>::iterator it = split_ranges.begin(); it != split_ranges.end(); it++)
    {
      int sender_proc = it->first;
      Range ents = it->second; // primary entities, with the tag data, we will receive
      int size_buffer = 4 + total_bytes_per_entity*(int)ents.size(); // hopefully, below 2B; if more, we have a big problem ...
      ParallelComm::Buffer * buffer = new ParallelComm::Buffer(size_buffer);

      buffer->reset_ptr(sizeof(int));

      *((int*)buffer->mem_ptr) = size_buffer;
      //int size_pack = buffer->get_current_size(); // debug check

      ierr = MPI_Recv (buffer->mem_ptr, size_buffer, MPI_CHAR, sender_proc, 222, jcomm, &status);
      if (ierr != 0) return MB_FAILURE;
      // now set the tag
      // copy to tag

      for (size_t i = 0; i < tag_handles.size(); i++)
      {
        rval = mb->tag_set_data(tag_handles[i], ents, (void*)(buffer->buff_ptr) );
        buffer->buff_ptr += vect_bytes_per_tag[i]*ents.size();
      }
      delete buffer; // no need for it afterwards
      MB_CHK_ERR ( rval );

    }
  }
  else // receive the buffer, then extract tag data, in a loop
  {
    // we know that we will need to receive some tag data in a specific order (by the ids stored)
    // first, get the ids of the local elements, from the owned Range; unpack the buffer in order
    Tag gidTag;
    rval = mb->tag_get_handle("GLOBAL_ID", gidTag); MB_CHK_ERR ( rval );
    std::vector<int> gids;
    gids.resize(owned.size());
    rval = mb->tag_get_data(gidTag, owned, &gids[0]); MB_CHK_ERR ( rval );
    std::map<int, EntityHandle> gidToHandle;
    size_t i = 0;
    for (Range::iterator it = owned.begin(); it != owned.end(); it++)
    {
      EntityHandle eh = *it;
      gidToHandle[gids[i++]] = eh;
    }
    //
    // now, unpack the data and set it to the tag
    for (std::map<int, std::vector<int> >::iterator mit = recv_IDs_map.begin(); mit != recv_IDs_map.end(); mit++)
    {
      int sender_proc = mit->first;
      std::vector<int> & eids = mit->second;
      int size_buffer = 4 + total_bytes_per_entity*(int)eids.size(); // hopefully, below 2B; if more, we have a big problem ...
      ParallelComm::Buffer * buffer = new ParallelComm::Buffer(size_buffer);
      buffer->reset_ptr(sizeof(int));
      *((int*)buffer->mem_ptr) = size_buffer; // this is really not necessary, as we receive it too

      // receive the buffer
      ierr = MPI_Recv (buffer->mem_ptr, size_buffer, MPI_CHAR, sender_proc, 222, jcomm, &status);
      if (ierr != 0) return MB_FAILURE;
      // start copy
#ifdef VERBOSE
      std::ofstream dbfile;
      std::stringstream outf;
      outf << "recvFrom_" << sender_proc << "_on_proc_" << rankInJoin << ".txt";
      dbfile.open (outf.str().c_str());
      dbfile << "recvFrom_" << sender_proc << " on proc " << rankInJoin << "\n";
#endif

      // copy tag data from buffer->buff_ptr;
      // the data is arranged by entity: for each entity, loop over its tags
      // (maybe it should be arranged by tag instead, to match the branch above)

      for (std::vector<int>::iterator it = eids.begin(); it != eids.end(); it++)
      {
        int eID = *it;
        EntityHandle eh = gidToHandle[eID];
        for (i = 0; i < tag_handles.size(); i++)
        {
          rval = mb->tag_set_data(tag_handles[i], &eh, 1, (void*)(buffer->buff_ptr) ); MB_CHK_ERR ( rval );
#ifdef VERBOSE
          dbfile << "global ID " << eID << " local handle " << mb->id_from_handle(eh) << " vals: ";
          double * vals = (double*) (buffer->buff_ptr );
          for (int kk = 0; kk < tag_sizes[i]; kk++)
          {
            dbfile << " " << *vals;
            vals++;
          }
          dbfile << "\n";
#endif
          buffer->buff_ptr += vect_bytes_per_tag[i];
        }
      }
#ifdef VERBOSE
      dbfile.close();
#endif
      delete buffer; // no need for it afterwards

    }

  }
  return MB_SUCCESS;
}

ErrorCode ParCommGraph::distribute_sender_graph_info(MPI_Comm senderComm, std::vector<int> & sendingInfo )
{
  // only the root of the sender has all the info;
  // it will use direct sends/receives, for the number of elems to be sent to each receiver, in an array
  if (rootSender)
  {
    // just append the local info to the array, and send some data for each receiver
    int nrecv0 = (int) recv_graph[senderTasks[0]].size();
    for (int k = 0; k < nrecv0; k++)
    {
      sendingInfo.push_back(recv_graph[senderTasks[0]][k]);
      sendingInfo.push_back(recv_sizes[senderTasks[0]][k]);
    }
    // the rest of the info will be sent to each sender task
    for (int j = 1; j < (int) senderTasks.size(); j++)
    {
      std::vector<int> array_to_send;
      // in the sender comm and sender group, the rank to send to is j
      for (int k = 0; k < (int) recv_graph[senderTasks[j]].size(); k++)
      {
        array_to_send.push_back( recv_graph[senderTasks[j]][k]);
        array_to_send.push_back( recv_sizes[senderTasks[j]][k]);
      }

      int ierr = MPI_Send(&array_to_send[0], (int)array_to_send.size(), MPI_INT, j, 11, senderComm);
      if (MPI_SUCCESS != ierr)
        return MB_FAILURE;
    }
  }
  else
  {
    // expect to receive some info about where to send the local data;
    // the local array is at most 2 times the number of receivers in size
    int sizeBuffMax = 2 * (int) receiverTasks.size();
    std::vector<int> data(sizeBuffMax);
    MPI_Status status;
    int ierr = MPI_Recv(&data[0], sizeBuffMax, MPI_INT, 0, 11, senderComm, &status);
    if (MPI_SUCCESS != ierr)
      return MB_FAILURE;
    int count;
    // find out how much data we actually got
    MPI_Get_count(&status, MPI_INT, &count);
    for (int k = 0; k < count; k++)
      sendingInfo.push_back(data[k]);

  }
  return MB_SUCCESS;
}
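
// sendingInfo layout (as assembled above): flat pairs of
//   [ receiver_rank_0, num_elems_0, receiver_rank_1, num_elems_1, ... ]
// one pair per receiver this sender task must serve; MPI_Get_count is what
// tells the non-root tasks how many pairs actually arrived.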

ErrorCode ParCommGraph::settle_send_graph(TupleList & TLcovIDs)
{
  // fill send_IDs_map with data:
  // the "receiving proc" and the global ids of the elements to send to it
  int n = TLcovIDs.get_n();
  recomputed_send_graph = true; // do not rely only on send_IDs_map.size(); this can be 0 in some cases
  for (int i = 0; i < n; i++)
  {
    int to_proc = TLcovIDs.vi_wr[2 * i];
    int globalIdElem = TLcovIDs.vi_wr[2 * i + 1];
    send_IDs_map[to_proc].push_back(globalIdElem);
  }
#ifdef VERBOSE
  for (std::map<int, std::vector<int> >::iterator mit = send_IDs_map.begin(); mit != send_IDs_map.end(); mit++)
  {
    std::cout << " towards task " << mit->first << " send: " << mit->second.size() << " cells " << std::endl;
    for (size_t i = 0; i < mit->second.size(); i++)
    {
      std::cout << " " << mit->second[i];
    }
    std::cout << std::endl;
  }
#endif
  return MB_SUCCESS;
}
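
// TLcovIDs layout assumed here (consistent with the loop above): a TupleList
// with 2 ints per tuple, vi_wr[2*i] = destination task and vi_wr[2*i+1] =
// global id of the cell to send; it is presumably produced by the coverage-mesh
// computation on the receiver side and routed back to the senders.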

// this will set recv_IDs_map, which stores all ids to be received from one sender task
void ParCommGraph::SetReceivingAfterCoverage(std::map<int, std::set<int> > & idsFromProcs) // will make sense only on receivers, right now after cov
{
  for (std::map<int, std::set<int> >::iterator mt = idsFromProcs.begin(); mt != idsFromProcs.end(); mt++)
  {
    int fromProc = mt->first;
    std::set<int> & setIds = mt->second;
    recv_IDs_map[fromProc].resize( setIds.size() );
    std::vector<int> & listIDs = recv_IDs_map[fromProc];
    size_t indx = 0;
    for ( std::set<int>::iterator st = setIds.begin(); st != setIds.end(); st++)
    {
      int valueID = *st;
      listIDs[indx++] = valueID;
    }
  }
  return;
}

// new partition calculation
ErrorCode ParCommGraph::compute_partition (ParallelComm *pco, Range & owned, int met)
{
  // we are on a sender task, and need to compute a new partition;
  // primary cells need to be distributed to nb receiver tasks;
  // we will use a partitioner from zoltan;
  // for the graph that we need to build, the first layer of ghosts is needed;
  // can we avoid that ? For example, we can find out from each boundary edge/face what is the other
  // cell (on the other side), then form the global graph, and call zoltan in parallel;
  // met 1 is the graph partitioner, and met 2 is the geometric one;
  // for method 2 we do not need any ghost exchange

  // find first the edges that are shared
  if (owned.empty())
    return MB_SUCCESS; // nothing to do? an empty partition is not allowed, maybe we should return an error?
  Core * mb = (Core*)pco->get_moab();

  int primaryDim = mb->dimension_from_handle(*owned.rbegin());
  int interfaceDim = primaryDim - 1; // should be 1 or 2
  Range sharedEdges;
  ErrorCode rval = pco->get_shared_entities(/*int other_proc*/ -1, sharedEdges, interfaceDim, /*const bool iface*/ true);MB_CHK_ERR ( rval );

#ifdef VERBOSE
  std::cout << " on sender task " << pco->rank() << " number of shared interface cells " << sharedEdges.size() << "\n";
#endif
  // find to what processors we need to send the ghost info about the edge
  std::vector<int> shprocs(MAX_SHARING_PROCS);
  std::vector<EntityHandle> shhandles(MAX_SHARING_PROCS);

  Tag gidTag;
  rval = mb->tag_get_handle("GLOBAL_ID", gidTag); MB_CHK_ERR ( rval );
  int np;
  unsigned char pstatus;

  std::map<int, int> adjCellsId;
  std::map<int, int> extraCellsProc;
  // if the method is 2, no need to do the exchange for adjacent cells across the partition boundary;
  // the maps above will stay empty for method 2 (geometry)
  if (1 == met)
  {
    // first determine the local graph: what elements are adjacent to each cell in the owned range;
    // cells that share a partition interface edge are identified first, and form a map
    TupleList TLe; // tuple list for cells
    TLe.initialize(2, 0, 1, 0, sharedEdges.size()); // send to, id of adj cell, remote edge
    TLe.enableWriteAccess();

    std::map<EntityHandle, int> edgeToCell; // from local boundary edge to adjacent cell id
    // will be changed after
    for (Range::iterator eit = sharedEdges.begin(); eit != sharedEdges.end(); eit++)
    {
      EntityHandle edge = *eit;
      // get the adjacent cell
      Range adjEnts;
      rval = mb->get_adjacencies(&edge, 1, primaryDim, false, adjEnts); MB_CHK_ERR ( rval );
      if (adjEnts.size() > 0)
      {
        EntityHandle adjCell = adjEnts[0];
        int gid;
        rval = mb->tag_get_data(gidTag, &adjCell, 1, &gid); MB_CHK_ERR ( rval );
        rval = pco->get_sharing_data(edge, &shprocs[0], &shhandles[0], pstatus, np); MB_CHK_ERR ( rval );
        int n = TLe.get_n();
        TLe.vi_wr[2*n] = shprocs[0];
        TLe.vi_wr[2*n+1] = gid;
        TLe.vul_wr[n] = shhandles[0]; // the remote edge corresponding to the shared edge
        edgeToCell[edge] = gid; // store the map between the edge and the gid of the adj cell
        TLe.inc_n();
      }
    }

#ifdef VERBOSE
    std::stringstream ff2;
    ff2 << "TLe_" << pco->rank() << ".txt";
    TLe.print_to_file(ff2.str().c_str());
#endif
    // send the data to the other processors:
    (pco->proc_config().crystal_router())->gs_transfer(1, TLe, 0);
    // on the receiver side, each local edge will have the remote cell adjacent to it!

    int ne = TLe.get_n();
    for (int i = 0; i < ne; i++)
    {
      int sharedProc = TLe.vi_rd[2*i]; // this info is coming from here, originally
      int remoteCellID = TLe.vi_rd[2*i+1];
      EntityHandle localCell = TLe.vul_rd[i]; // this is now the local edge on this proc
      adjCellsId [edgeToCell[localCell]] = remoteCellID;
      extraCellsProc[remoteCellID] = sharedProc;
#ifdef VERBOSE
      std::cout << "local ID " << edgeToCell[localCell] << " remote cell ID: " << remoteCellID << "\n";
#endif
    }
  }
  // we now have the adjacent cell ids; call zoltan for the parallel partition
#ifdef MOAB_HAVE_ZOLTAN
  ZoltanPartitioner * mbZTool = new ZoltanPartitioner(mb);
  if (1 <= met) // partition in zoltan, with either the graph or the geometric partitioner
  {
    std::map<int, Range> distribution; // how to distribute the owned elements by processors in the receiving groups
    // in how many tasks do we want to be distributed?
    int numNewPartitions = (int)receiverTasks.size();
    Range primaryCells = owned.subset_by_dimension(primaryDim);
    rval = mbZTool->partition_owned_cells(primaryCells, pco, adjCellsId, extraCellsProc,
        numNewPartitions, distribution, met); MB_CHK_ERR ( rval );
    for (std::map<int, Range>::iterator mit = distribution.begin(); mit != distribution.end(); mit++)
    {
      int part_index = mit->first;
      assert(part_index < numNewPartitions);
      split_ranges[receiverTasks[part_index]] = mit->second;
    }
  }
  delete mbZTool; // the partitioner is no longer needed
#endif

  return MB_SUCCESS;
}
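
// Hedged note on the two methods, matching the checks above: met == 1 builds
// cell-adjacency info across partition boundaries (via the shared edges and a
// crystal-router exchange) so Zoltan can run its graph partitioner, while
// met == 2 skips that exchange and relies on a purely geometric decomposition;
// both end by filling split_ranges, keyed by receiver task.
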
// at this moment, each sender task has split_ranges formed;
// we need to aggregate that info and send it to the receiver
ErrorCode ParCommGraph::send_graph_partition (ParallelComm *pco, MPI_Comm jcomm)
{
  // first, accumulate the info at the root of the sender, with gatherv:
  // the number of receivers served by each sender
  int numberReceivers = (int) split_ranges.size(); // these are the ranges of elements to be sent to each receiver, from this task
  int nSenders = (int) senderTasks.size();
  // this sender will have to send to this many receivers
  std::vector<int> displs(1); // displacements for gatherv
  std::vector<int> counts(1);
  if ( is_root_sender() )
  {
    displs.resize(nSenders+1);
    counts.resize(nSenders);
  }

  int ierr = MPI_Gather( &numberReceivers, 1, MPI_INT, &counts[0], 1, MPI_INT, 0, pco->comm() );
  if (ierr != MPI_SUCCESS) return MB_FAILURE;
  // compute now the displacements
  if ( is_root_sender() )
  {
    displs[0] = 0;
    for (int k = 0; k < nSenders; k++)
    {
      displs[k+1] = displs[k] + counts[k];
    }
  }
  std::vector<int> buffer;
  if ( is_root_sender() ) buffer.resize(displs[nSenders]); // the last displacement now holds the total count

  std::vector<int> recvs;
  for (std::map<int, Range>::iterator mit = split_ranges.begin(); mit != split_ranges.end(); mit++)
  {
    recvs.push_back( mit->first );
  }
  ierr = MPI_Gatherv(&recvs[0], numberReceivers, MPI_INT, &buffer[0], &counts[0], &displs[0], MPI_INT, 0, pco->comm());
  if (ierr != MPI_SUCCESS) return MB_FAILURE;

  // now form the recv_graph map (receiver -> senders), on root only;
  // it will be packed and sent to the root receiver, with a nonblocking send
  if ( is_root_sender() )
  {
    for (int k = 0; k < nSenders; k++)
    {
      int indexInBuff = displs[k];
      int senderTask = senderTasks[k];
      for (int j = 0; j < counts[k]; j++)
      {
        int recvTask = buffer[indexInBuff+j];
        recv_graph[recvTask].push_back(senderTask);
      }
    }

    // this is the same as for the trivial partition
    ErrorCode rval = send_graph(jcomm); MB_CHK_ERR ( rval );
  }

  return MB_SUCCESS;
}
} // namespace moab