/*
 * ParCommGraph.cpp
 *
 */

#include "moab/ParCommGraph.hpp"
// we need to recompute adjacencies for merging to work
#include "moab/Core.hpp"
#include "AEntityFactory.hpp"

#ifdef MOAB_HAVE_ZOLTAN
#include "moab/ZoltanPartitioner.hpp"
#endif

namespace moab {

ParCommGraph::ParCommGraph(MPI_Comm joincomm, MPI_Group group1, MPI_Group group2, int coid1, int coid2) :
  comm(joincomm), compid1(coid1), compid2(coid2)
{
  // find out the tasks from each group, in the joint communicator
  find_group_ranks(group1, comm, senderTasks);
  find_group_ranks(group2, comm, receiverTasks);

  rootSender = rootReceiver = false;
  rankInGroup1 = rankInGroup2 = rankInJoin = -1; // not initialized, or not part of the group

  int mpierr = MPI_Group_rank(group1, &rankInGroup1);
  if (MPI_SUCCESS != mpierr || rankInGroup1 == MPI_UNDEFINED)
    rankInGroup1 = -1;

  mpierr = MPI_Group_rank(group2, &rankInGroup2);
  if (MPI_SUCCESS != mpierr || rankInGroup2 == MPI_UNDEFINED)
    rankInGroup2 = -1;

  mpierr = MPI_Comm_rank(comm, &rankInJoin);
  if (MPI_SUCCESS != mpierr) // it should be a fatal error
    rankInJoin = -1;

  mpierr = MPI_Comm_size(comm, &joinSize);
  if (MPI_SUCCESS != mpierr) // it should be a fatal error
    joinSize = -1;

  if (0 == rankInGroup1) rootSender = true;
  if (0 == rankInGroup2) rootReceiver = true;
  recomputed_send_graph = false;
  comm_graph = NULL;
}

ParCommGraph::~ParCommGraph() {
  // TODO Auto-generated destructor stub
}

// utility to find out the ranks of the processes of a group, with respect to a joint comm,
// which is guaranteed to span the group;
// it is used locally (in the constructor), but it can also serve as a general utility
void ParCommGraph::find_group_ranks(MPI_Group group, MPI_Comm joincomm, std::vector<int> & ranks)
{
   MPI_Group global_grp;
   MPI_Comm_group(joincomm, &global_grp);

   int grp_size;

   MPI_Group_size(group, &grp_size);
   std::vector<int> rks (grp_size);
   ranks.resize(grp_size);

   for (int i = 0; i < grp_size; i++)
     rks[i] = i;

   MPI_Group_translate_ranks(group, grp_size, &rks[0], global_grp, &ranks[0]);
   MPI_Group_free(&global_grp);
   return;
}
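
// Illustrative example (hypothetical ranks): if group1 occupies the first 3 tasks of an
// 8-task joint communicator and group2 the last 5, the two calls in the constructor
// would yield
//   senderTasks   = {0, 1, 2}
//   receiverTasks = {3, 4, 5, 6, 7}
// because MPI_Group_translate_ranks maps each group-local rank 0..size-1 to its rank
// in the joint communicator.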

ErrorCode ParCommGraph::compute_trivial_partition (std::vector<int> & numElemsPerTaskInGroup1)
{

  recv_graph.clear(); recv_sizes.clear();
  sender_graph.clear(); sender_sizes.clear();

  if (numElemsPerTaskInGroup1.size() != senderTasks.size())
    return MB_FAILURE; // each sender has a number of elements that it owns

  // first find out the total number of elements to be sent from all senders
  int total_elems = 0;
  std::vector<int> accum;
  accum.push_back(0);

  int num_senders = (int) senderTasks.size();

  for (size_t k = 0; k < numElemsPerTaskInGroup1.size(); k++)
  {
    total_elems += numElemsPerTaskInGroup1[k];
    accum.push_back(total_elems);
  }

  int num_recv = (int) receiverTasks.size();
  // in the trivial partition, every receiver should get about total_elems/num_recv elements
  int num_per_receiver = (int)(total_elems / num_recv);
  int leftover = total_elems - num_per_receiver * num_recv;

  // so receiver k will receive the interval [starts[k], starts[k+1])
  std::vector<int> starts;
  starts.resize(num_recv + 1);
  starts[0] = 0;
  for (int k = 0; k < num_recv; k++)
  {
    starts[k+1] = starts[k] + num_per_receiver;
    if (k < leftover) starts[k+1]++;
  }

  // each sender will send to a number of receivers, based on how the
  // arrays starts[0:num_recv] and accum[0:num_senders] overlap
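  //
  // A worked example with made-up numbers: 2 senders owning 5 and 7 elements, 3
  // receivers. Then accum = {0, 5, 12}, total_elems = 12, num_per_receiver = 4,
  // leftover = 0, so starts = {0, 4, 8, 12}. Sender 0 (interval [0,5)) overlaps
  // receivers 0 ([0,4)) and 1 ([4,8)), sending 4 and 1 elements; sender 1 (interval
  // [5,12)) overlaps receivers 1 and 2, sending 3 and 4 elements.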
  int lastUsedReceiverRank = 0; // first receiver was not treated yet
  for (int j = 0; j < num_senders; j++ )
  {
    // we can start the receiver loop with the latest receiver that received from the previous sender
    for (int k = lastUsedReceiverRank; k < num_recv; k++ )
    {
      // if overlap:
      if (starts[k] < accum[j+1] && starts[k+1] > accum[j] )
      {
        recv_graph[receiverTasks[k]].push_back(senderTasks[j]);
        sender_graph[senderTasks[j]].push_back(receiverTasks[k]);

        // we still need to determine the size of the overlap
        int sizeOverlap = 1; // at least 1, for sure
        if ( starts[k] >= accum[j]) // one end is starts[k]
        {
          if (starts[k+1] >= accum[j+1]) // the other end is accum[j+1]
            sizeOverlap = accum[j+1] - starts[k];
          else
            sizeOverlap = starts[k+1] - starts[k];
        }
        else // one end is accum[j]
        {
          if (starts[k+1] >= accum[j+1]) // the other end is accum[j+1]
            sizeOverlap = accum[j+1] - accum[j];
          else
            sizeOverlap = starts[k+1] - accum[j];
        }
        recv_sizes[receiverTasks[k]].push_back(sizeOverlap); // task k will receive
                                                             //   sizeOverlap elems from sender j
        sender_sizes[senderTasks[j]].push_back(sizeOverlap);
        if (starts[k] > accum[j+1])
        {
          lastUsedReceiverRank = k - 1; // so the next k loop will start a little higher; we probably
                                        // finished with the first few receivers (up to receiver lastUsedReceiverRank)
          break; // break the k loop, we distributed all elements from sender j to some receivers
        }
      }
    }
  }

  return MB_SUCCESS;
}

ErrorCode ParCommGraph::pack_receivers_graph(std::vector<int>& packed_recv_array )
{
  // it will basically look at local data to pack the communication graph; each receiver task will have
  // to post receives for each sender task that will send data to it;
  // the array will be communicated to the root receiver, and eventually distributed to the receiver tasks

  /*
   * packed_recv_array will have the receiver, the number of senders, then the senders, etc.
   */
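  /*
   * For example (hypothetical values): if receiver 3 gets data from senders 0 and 1,
   * and receiver 4 only from sender 2, the packed array would be
   *   { 3, 2, 0, 1,   4, 1, 2 }
   * i.e., for each receiver: its rank, the count of its senders, then the sender ranks.
   */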
  if (recv_graph.size() < receiverTasks.size())
  {
    // big problem, we have empty partitions in receive
    std::cout << " WARNING: empty partitions, some receiver tasks will receive nothing.\n";
  }
  for (std::map<int, std::vector<int> >::iterator it = recv_graph.begin(); it != recv_graph.end(); it++ )
  {
    int recv = it->first;
    std::vector<int> & senders = it->second;
    packed_recv_array.push_back(recv);
    packed_recv_array.push_back( (int) senders.size() );

    for (int k = 0; k < (int)senders.size(); k++ )
      packed_recv_array.push_back( senders[k] );
  }

  return MB_SUCCESS;
}

ErrorCode ParCommGraph::split_owned_range (int sender_rank, Range & owned)
{
  int senderTask = senderTasks[sender_rank];
  std::vector<int> & distribution = sender_sizes[senderTask];
  std::vector<int> & receivers = sender_graph[senderTask];
  if (distribution.size() != receivers.size())
    return MB_FAILURE;

  Range current = owned; // get the full range first, then we will subtract stuff for
  // the following ranges

  Range rleftover = current;
  for (size_t k = 0; k < receivers.size(); k++ )
  {
    Range newr;
    newr.insert(current.begin(), current.begin() + distribution[k]);
    split_ranges[ receivers[k] ] = newr;

    rleftover = subtract(current, newr );
    current = rleftover;
  }

  return MB_SUCCESS;
}
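
// Sketch of how the splitting above behaves, with made-up numbers: if this sender owns
// 10 elements and its sender_sizes entry is {4, 6} with sender_graph entry {5, 7}, the
// first 4 handles of "owned" end up in split_ranges[5] and the remaining 6 in
// split_ranges[7]; each iteration peels distribution[k] entities off the front of the
// leftover range.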

// use for this the corresponding tasks and sizes
ErrorCode ParCommGraph::split_owned_range (Range & owned)
{
  if (corr_tasks.size() != corr_sizes.size())
    return MB_FAILURE;

  Range current = owned; // get the full range first, then we will subtract stuff for
  // the following ranges

  Range rleftover = current;
  for (size_t k = 0; k < corr_tasks.size(); k++ )
  {
    Range newr;
    newr.insert(current.begin(), current.begin() + corr_sizes[k]);
    split_ranges[ corr_tasks[k] ] = newr;

    rleftover = subtract(current, newr );
    current = rleftover;
  }

  return MB_SUCCESS;
}

ErrorCode ParCommGraph::send_graph(MPI_Comm jcomm)
{
  if ( is_root_sender())
  {
    int ierr;
    // we need to build a communication graph, because each sender now knows to which receiver to send data;
    // the receivers need to post receives for each sender that will send data to them;
    // we gather on rank 0 of the sender comm the global ranks of the senders with their receivers,
    // and build the communication matrix: from which sender each receiver will receive

    std::vector<int> packed_recv_array;
    ErrorCode rval = pack_receivers_graph(packed_recv_array );
    if (MB_SUCCESS != rval) return rval;

    int size_pack_array = (int) packed_recv_array.size();
    comm_graph = new int[size_pack_array + 1];
    comm_graph[0] = size_pack_array;
    for (int k = 0; k < size_pack_array; k++)
      comm_graph[k+1] = packed_recv_array[k];
    // will add 2 requests
    /// use tag 10 to send the size and tag 20 to send the packed array
    sendReqs.resize(2);
    ierr = MPI_Isend(&comm_graph[0], 1, MPI_INT, receiver(0), 10, jcomm, &sendReqs[0]); // we have to use the global communicator
    if (ierr != 0) return MB_FAILURE;
    ierr = MPI_Isend(&comm_graph[1], size_pack_array, MPI_INT, receiver(0), 20, jcomm, &sendReqs[1]); // we have to use the global communicator
    if (ierr != 0) return MB_FAILURE;
  }
  return MB_SUCCESS;
}
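
// The graph handshake between the two roots, as implemented by send_graph /
// receive_comm_graph (tags are the ones used above):
//
//   root sender                          root receiver
//   ------------------                   ------------------------------
//   MPI_Isend size         --tag 10-->   MPI_Recv size_pack_array
//   MPI_Isend packed array --tag 20-->   MPI_Recv pack_array
//                                        MPI_Bcast to all receiver tasks
//
// Both nonblocking sends are completed later, in release_send_buffers().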

// pco has access to MOAB through get_moab()
// do we need to store "method" as a member variable ?
ErrorCode ParCommGraph::send_mesh_parts(MPI_Comm jcomm, ParallelComm * pco, Range & owned )
{

  ErrorCode rval;
  if (split_ranges.empty()) // in trivial partition
  {
    rval = split_owned_range (rankInGroup1, owned);
    if (rval != MB_SUCCESS) return rval;
    // we know this on the sender side:
    corr_tasks = sender_graph[ senderTasks[rankInGroup1] ]; // copy
    corr_sizes = sender_sizes[ senderTasks[rankInGroup1] ]; // another copy
  }

  int indexReq = 0;
  int ierr; // MPI error
  if (is_root_sender()) indexReq = 2; // for sendReqs
  sendReqs.resize(indexReq + 2 * split_ranges.size());
  for (std::map<int, Range>::iterator it = split_ranges.begin(); it != split_ranges.end(); it++)
  {
    int receiver_proc = it->first;
    Range ents = it->second;

    // add the necessary vertices too
    Range verts;
    rval = pco->get_moab()->get_adjacencies(ents, 0, false, verts, Interface::UNION);
    if (rval != MB_SUCCESS) return rval;
    ents.merge(verts);
    ParallelComm::Buffer * buffer = new ParallelComm::Buffer(ParallelComm::INITIAL_BUFF_SIZE);
    buffer->reset_ptr(sizeof(int));
    rval = pco->pack_buffer(ents, false, true, false, -1, buffer);
    if (rval != MB_SUCCESS) return rval;
    int size_pack = buffer->get_current_size();

    // TODO there could be an issue with endianness; check !!!!!
    // we are sending the size of the buffer first, as an int!!!
    ierr = MPI_Isend(buffer->mem_ptr, 1, MPI_INT, receiver_proc, 1, jcomm, &sendReqs[indexReq]); // we have to use the global communicator
    if (ierr != 0) return MB_FAILURE;
    indexReq++;

    ierr = MPI_Isend(buffer->mem_ptr, size_pack, MPI_CHAR, receiver_proc, 2, jcomm, &sendReqs[indexReq]); // we have to use the global communicator
    if (ierr != 0) return MB_FAILURE;
    indexReq++;
    localSendBuffs.push_back(buffer);

  }
  return MB_SUCCESS;
}
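
// Per-receiver mesh transfer protocol (paired with receive_mesh below): for every
// receiver in split_ranges, two nonblocking sends are posted on the joint communicator,
//   tag 1: one int, the packed buffer size (the first int of the buffer),
//   tag 2: the packed buffer itself, size_pack bytes of MPI_CHAR.
// The receiver first reads the size with tag 1, allocates a matching buffer, then
// receives the payload with tag 2.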

// this is called on the receiver side
ErrorCode ParCommGraph::receive_comm_graph(MPI_Comm jcomm, ParallelComm *pco, std::vector<int> & pack_array)
{
  // first, receive from sender_rank 0 the communication graph (matrix), so each receiver
  // knows what data to expect
  MPI_Comm receive = pco->comm();
  int size_pack_array, ierr;
  MPI_Status status;
  if (rootReceiver)
  {
    ierr = MPI_Recv (&size_pack_array, 1, MPI_INT, sender(0), 10, jcomm, &status);
    if (0 != ierr) return MB_FAILURE;
#ifdef VERBOSE
    std::cout << " receive comm graph size: " << size_pack_array << "\n";
#endif
    pack_array.resize (size_pack_array);
    ierr = MPI_Recv (&pack_array[0], size_pack_array, MPI_INT, sender(0), 20, jcomm, &status);
    if (0 != ierr) return MB_FAILURE;
#ifdef VERBOSE
    std::cout << " receive comm graph ";
    for (int k = 0; k < (int)pack_array.size(); k++)
      std::cout << " " << pack_array[k];
    std::cout << "\n";
#endif
  }

  // now broadcast this whole array to all receivers, so they know what to expect
  ierr = MPI_Bcast(&size_pack_array, 1, MPI_INT, 0, receive);
  if (0 != ierr) return MB_FAILURE;
  pack_array.resize(size_pack_array);
  ierr = MPI_Bcast(&pack_array[0], size_pack_array, MPI_INT, 0, receive);
  if (0 != ierr) return MB_FAILURE;
  return MB_SUCCESS;
}

ErrorCode ParCommGraph::receive_mesh(MPI_Comm jcomm, ParallelComm *pco, EntityHandle local_set, std::vector<int> &senders_local)
{
  ErrorCode rval;
  int ierr;
  MPI_Status status;
  // we also need to fill the corresponding mesh info on the other side
  corr_tasks = senders_local;
  Range newEnts;

  Tag orgSendProcTag; // this will be a tag set on the received mesh, with info about the task / PE the
  // primary element came from, in the joint communicator; this will be forwarded by the coverage mesh
  int defaultInt = -1; // no processor, so it was not migrated from somewhere else
  rval = pco->get_moab()->tag_get_handle("orig_sending_processor", 1, MB_TYPE_INTEGER, orgSendProcTag,
      MB_TAG_DENSE | MB_TAG_CREAT, &defaultInt);MB_CHK_SET_ERR(rval, "can't create original sending processor tag");
  if (!senders_local.empty())
  {
    for (size_t k = 0; k < senders_local.size(); k++)
    {
      int sender1 = senders_local[k]; // first receive the size of the buffer

      int size_pack;
      ierr = MPI_Recv (&size_pack, 1, MPI_INT, sender1, 1, jcomm, &status);
      if (0 != ierr) return MB_FAILURE;
      // now allocate a buffer of that size, then receive into it
      ParallelComm::Buffer * buffer = new ParallelComm::Buffer(size_pack);
      //buffer->reserve(size_pack);

      ierr = MPI_Recv (buffer->mem_ptr, size_pack, MPI_CHAR, sender1, 2, jcomm, &status);
      if (0 != ierr) return MB_FAILURE;
      // now unpack the buffer we just received
      Range entities;
      std::vector<std::vector<EntityHandle> > L1hloc, L1hrem;
      std::vector<std::vector<int> > L1p;
      std::vector<EntityHandle> L2hloc, L2hrem;
      std::vector<unsigned int> L2p;

      buffer->reset_ptr(sizeof(int));
      std::vector<EntityHandle> entities_vec(entities.size());
      std::copy(entities.begin(), entities.end(), entities_vec.begin());
      rval = pco->unpack_buffer(buffer->buff_ptr, false, -1, -1, L1hloc, L1hrem, L1p, L2hloc,
                                  L2hrem, L2p, entities_vec);
      delete buffer;
      if (MB_SUCCESS != rval) return rval;

      std::copy(entities_vec.begin(), entities_vec.end(), range_inserter(entities));
      // we have to add them to the local set
      rval = pco->get_moab()->add_entities(local_set, entities);
      if (MB_SUCCESS != rval) return rval;
      // corr_sizes is the size of primary entities received
      Range verts = entities.subset_by_dimension(0);
      Range local_primary_ents = subtract(entities, verts);
      corr_sizes.push_back( (int) local_primary_ents.size());
      // set a tag with the original sender for each primary entity;
      // it will be used later for the coverage mesh
      std::vector<int> orig_senders(local_primary_ents.size(), sender1);
      rval = pco->get_moab()->tag_set_data(orgSendProcTag, local_primary_ents, &orig_senders[0]);MB_CHK_SET_ERR(rval, "can't set original sending processor tag");
      newEnts.merge(entities);
      // record these in split ranges
      split_ranges[sender1] = local_primary_ents;

#ifdef VERBOSE
      std::ostringstream partial_outFile;

      partial_outFile << "part_send_" << sender1 << "." << "recv" << rankInJoin << ".vtk";

      // the mesh contains ghosts too, but they are not part of mat/neumann sets
      // write the file in serial, to see what tags are missing
      std::cout << " writing from receiver " << rankInJoin << " from sender " << sender1 << " entities: " << entities.size() << std::endl;
      rval = pco->get_moab()->write_file(partial_outFile.str().c_str(), 0, 0, &local_set, 1); // everything received on the local set
      if (MB_SUCCESS != rval) return rval;
#endif
    }

  }
  // make sure adjacencies are updated on the new elements

  if (newEnts.empty())
  {
    std::cout << " WARNING: this task did not receive any entities \n";
  }
  // in order for the merging to work, we need to be sure that the adjacencies are updated (created)
  Range local_verts = newEnts.subset_by_type(MBVERTEX);
  newEnts = subtract(newEnts, local_verts);
  Core * mb = (Core*)pco->get_moab();
  AEntityFactory* adj_fact = mb->a_entity_factory();
  if (!adj_fact->vert_elem_adjacencies())
    adj_fact->create_vert_elem_adjacencies();
  else
  {
    for (Range::iterator it = newEnts.begin(); it != newEnts.end(); it++)
    {
      EntityHandle eh = *it;
      const EntityHandle * conn = NULL;
      int num_nodes = 0;
      rval = mb->get_connectivity(eh, conn, num_nodes);
      if (MB_SUCCESS != rval) return rval;
      adj_fact->notify_create_entity(eh, conn, num_nodes);
    }
  }

  return MB_SUCCESS;
}

// VSM: why is the communicator never used? Remove the argument?
ErrorCode ParCommGraph::release_send_buffers(MPI_Comm /*jcomm*/)
{
  int ierr, nsize = (int)sendReqs.size();
  std::vector<MPI_Status> mult_status;
  mult_status.resize(sendReqs.size());
  ierr = MPI_Waitall(nsize, &sendReqs[0], &mult_status[0]);

  if (ierr != 0)
    return MB_FAILURE;
  // now we can free all buffers
  delete [] comm_graph;
  comm_graph = NULL;
  std::vector<ParallelComm::Buffer*>::iterator vit;
  for (vit = localSendBuffs.begin(); vit != localSendBuffs.end(); ++vit)
    delete (*vit);
  localSendBuffs.clear();
  return MB_SUCCESS;
}

// again, this will use the send buffers, for nonblocking sends;
// should the receives be non-blocking too?
ErrorCode ParCommGraph::send_tag_values (MPI_Comm jcomm, ParallelComm *pco, Range & owned,
    std::vector<Tag> & tag_handles )
{
  // basically, owned.size() needs to be equal to sum(corr_sizes)
  // get info about the tag size, type, etc
  int ierr;
  Core * mb = (Core*)pco->get_moab();
  // get info about the tag
  //! Get the size of the specified tag in bytes
  int total_bytes_per_entity = 0; // we need to know it, to allocate buffers
  ErrorCode rval;
  std::vector<int> vect_bytes_per_tag;
#ifdef VERBOSE
  std::vector<int> tag_sizes;
#endif
  for (size_t i = 0; i < tag_handles.size(); i++)
  {
    int bytes_per_tag;
    rval = mb->tag_get_bytes(tag_handles[i], bytes_per_tag);MB_CHK_ERR ( rval );
    total_bytes_per_entity += bytes_per_tag;
    vect_bytes_per_tag.push_back(bytes_per_tag);
#ifdef VERBOSE
    int tag_size;
    rval = mb->tag_get_length(tag_handles[i], tag_size);MB_CHK_ERR ( rval );
    tag_sizes.push_back(tag_size);
#endif
  }

  // bool specified_ids = send_IDs_map.size() > 0;
  bool specified_ids = recomputed_send_graph; // in cases when the sender is completely over land, send_IDs_map can still have size 0
  int indexReq = 0;
  if (!specified_ids) // original send
  {
    // use the buffers data structure to allocate memory for sending the tags
    sendReqs.resize(split_ranges.size());

    for (std::map<int, Range>::iterator it = split_ranges.begin(); it != split_ranges.end(); it++)
    {
      int receiver_proc = it->first;
      Range ents = it->second; // primary entities, with the tag data
      int size_buffer = 4 + total_bytes_per_entity * (int)ents.size(); // hopefully below 2GB; if more, we have a big problem ...
      ParallelComm::Buffer * buffer = new ParallelComm::Buffer(size_buffer);

      buffer->reset_ptr(sizeof(int));
      for (size_t i = 0; i < tag_handles.size(); i++)
      {
        // copy tag data to buffer->buff_ptr, and send the buffer (we could have used regular char arrays)
        rval = mb->tag_get_data(tag_handles[i], ents, (void*)(buffer->buff_ptr) );MB_CHK_ERR ( rval );
        // advance the buffer pointer
        buffer->buff_ptr += vect_bytes_per_tag[i] * ents.size();
      }
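      // Resulting buffer layout (illustrative): one leading int with the buffer size,
      // then the data grouped by tag, each group contiguous over all entities:
      //   [ size_buffer | tag0(e0) tag0(e1) ... | tag1(e0) tag1(e1) ... | ... ]
      // receive_tag_values unpacks with the same by-tag ordering.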
      *((int*)buffer->mem_ptr) = size_buffer;
      //int size_pack = buffer->get_current_size(); // debug check
      ierr = MPI_Isend(buffer->mem_ptr, size_buffer, MPI_CHAR, receiver_proc, 222, jcomm, &sendReqs[indexReq]); // we have to use the global communicator
      if (ierr != 0) return MB_FAILURE;
      indexReq++;
      localSendBuffs.push_back(buffer); // we will release them after the nonblocking sends are completed
    }
  }
  else
  {
    // we know that we will need to send some tag data in a specific order
    // first, get the ids of the local elements, from the owned Range; arrange the buffer in order of increasing global id
    Tag gidTag;
    rval = mb->tag_get_handle("GLOBAL_ID", gidTag); MB_CHK_ERR ( rval );
    std::vector<int> gids;
    gids.resize(owned.size());
    rval = mb->tag_get_data(gidTag, owned, &gids[0]);  MB_CHK_ERR ( rval );
    std::map<int, EntityHandle> gidToHandle;
    size_t i = 0;
    for (Range::iterator it = owned.begin(); it != owned.end(); it++)
    {
      EntityHandle eh = *it;
      gidToHandle[gids[i++]] = eh;
    }
    // now, pack the data and send it
    sendReqs.resize(send_IDs_map.size());
    for (std::map<int, std::vector<int> >::iterator mit = send_IDs_map.begin(); mit != send_IDs_map.end(); mit++)
    {
      int receiver_proc = mit->first;
      std::vector<int> & eids = mit->second;
      int size_buffer = 4 + total_bytes_per_entity * (int)eids.size(); // hopefully below 2GB; if more, we have a big problem ...
      ParallelComm::Buffer * buffer = new ParallelComm::Buffer(size_buffer);
      buffer->reset_ptr(sizeof(int));
#ifdef VERBOSE
      std::ofstream dbfile;
      std::stringstream outf;
      outf << "from_" << rankInJoin << "_send_to_" << receiver_proc << ".txt";
      dbfile.open (outf.str().c_str());
      dbfile << "from " << rankInJoin << " send to " << receiver_proc << "\n";
#endif
      // copy tag data to buffer->buff_ptr, and send the buffer (we could have used regular char arrays);
      // here the data is packed by entity (looping over the tags for each entity), unlike the
      // by-tag layout above; the receiver unpacks in the same order

      for (std::vector<int>::iterator it = eids.begin(); it != eids.end(); it++)
      {
        int eID = *it;
        EntityHandle eh = gidToHandle[eID];
        for (i = 0; i < tag_handles.size(); i++)
        {
          rval = mb->tag_get_data(tag_handles[i], &eh, 1, (void*)(buffer->buff_ptr) );  MB_CHK_ERR ( rval );
#ifdef VERBOSE
          dbfile << "global ID " << eID << " local handle " << mb->id_from_handle(eh) << " vals: ";
          double * vals = (double*) (buffer->buff_ptr);
          for (int kk = 0; kk < tag_sizes[i]; kk++)
          {
            dbfile << " " << *vals;
            vals++;
          }
          dbfile << "\n";
#endif
          buffer->buff_ptr += vect_bytes_per_tag[i];
        }
      }

#ifdef VERBOSE
      dbfile.close();
#endif
      *((int*)buffer->mem_ptr) = size_buffer;
      //int size_pack = buffer->get_current_size(); // debug check
      ierr = MPI_Isend(buffer->mem_ptr, size_buffer, MPI_CHAR, receiver_proc, 222, jcomm, &sendReqs[indexReq]); // we have to use the global communicator
      if (ierr != 0) return MB_FAILURE;
      indexReq++;
      localSendBuffs.push_back(buffer); // we will release them after the nonblocking sends are completed
    }
  }

  return MB_SUCCESS;
}

ErrorCode ParCommGraph::receive_tag_values (MPI_Comm jcomm, ParallelComm *pco, Range & owned,
    std::vector<Tag> & tag_handles )
{
  // opposite to sending, we will use blocking receives
  int ierr;
  MPI_Status status;
  // basically, owned.size() needs to be equal to sum(corr_sizes)
  // get info about the tag size, type, etc
  Core * mb = (Core*)pco->get_moab();
  // get info about the tag
  //! Get the size of the specified tag in bytes
  ErrorCode rval;
  int total_bytes_per_entity = 0;
  std::vector<int> vect_bytes_per_tag;
#ifdef VERBOSE
  std::vector<int> tag_sizes;
#endif
  for (size_t i = 0; i < tag_handles.size(); i++)
  {
    int bytes_per_tag;
    rval = mb->tag_get_bytes(tag_handles[i], bytes_per_tag);MB_CHK_ERR ( rval );
    total_bytes_per_entity += bytes_per_tag;
    vect_bytes_per_tag.push_back(bytes_per_tag);
#ifdef VERBOSE
    int tag_size;
    rval = mb->tag_get_length(tag_handles[i], tag_size);MB_CHK_ERR ( rval );
    tag_sizes.push_back(tag_size);
#endif
  }

  bool specified_ids = recv_IDs_map.size() > 0;
  if (!specified_ids)
  {
    //std::map<int, Range> split_ranges;
    //rval = split_owned_range ( owned);MB_CHK_ERR ( rval );

    // use the buffers data structure to allocate memory for receiving the tags
    for (std::map<int, Range>::iterator it = split_ranges.begin(); it != split_ranges.end(); it++)
    {
      int sender_proc = it->first;
      Range ents = it->second; // primary entities, with the tag data, that we will receive
      int size_buffer = 4 + total_bytes_per_entity * (int)ents.size(); // hopefully below 2GB; if more, we have a big problem ...
      ParallelComm::Buffer * buffer = new ParallelComm::Buffer(size_buffer);

      buffer->reset_ptr(sizeof(int));

      *((int*)buffer->mem_ptr) = size_buffer;
      //int size_pack = buffer->get_current_size(); // debug check

      ierr = MPI_Recv (buffer->mem_ptr, size_buffer, MPI_CHAR, sender_proc, 222, jcomm, &status);
      if (ierr != 0) return MB_FAILURE;
      // now set the tag
      // copy from the buffer to the tag

      for (size_t i = 0; i < tag_handles.size(); i++)
      {
        rval = mb->tag_set_data(tag_handles[i], ents, (void*)(buffer->buff_ptr) );
        buffer->buff_ptr += vect_bytes_per_tag[i] * ents.size();
      }
      delete buffer; // no need for it afterwards
      MB_CHK_ERR ( rval );

    }
  }
  else // receive the buffer, then extract the tag data, in a loop
  {
    // we know that we will need to receive some tag data in a specific order (by the ids stored)
    // first, get the ids of the local elements, from the owned Range; unpack the buffer in order
    Tag gidTag;
    rval = mb->tag_get_handle("GLOBAL_ID", gidTag); MB_CHK_ERR ( rval );
    std::vector<int> gids;
    gids.resize(owned.size());
    rval = mb->tag_get_data(gidTag, owned, &gids[0]);  MB_CHK_ERR ( rval );
    std::map<int, EntityHandle> gidToHandle;
    size_t i = 0;
    for (Range::iterator it = owned.begin(); it != owned.end(); it++)
    {
      EntityHandle eh = *it;
      gidToHandle[gids[i++]] = eh;
    }

    // now, unpack the data and set it to the tag
    for (std::map<int, std::vector<int> >::iterator mit = recv_IDs_map.begin(); mit != recv_IDs_map.end(); mit++)
    {
      int sender_proc = mit->first;
      std::vector<int> & eids = mit->second;
      int size_buffer = 4 + total_bytes_per_entity * (int)eids.size(); // hopefully below 2GB; if more, we have a big problem ...
      ParallelComm::Buffer * buffer = new ParallelComm::Buffer(size_buffer);
      buffer->reset_ptr(sizeof(int));
      *((int*)buffer->mem_ptr) = size_buffer; // not really necessary, as we receive it anyway

      // receive the buffer
      ierr = MPI_Recv (buffer->mem_ptr, size_buffer, MPI_CHAR, sender_proc, 222, jcomm, &status);
      if (ierr != 0) return MB_FAILURE;
// start copy
#ifdef VERBOSE
      std::ofstream dbfile;
      std::stringstream outf;
      outf << "recvFrom_" << sender_proc << "_on_proc_" << rankInJoin << ".txt";
      dbfile.open (outf.str().c_str());
      dbfile << "recvFrom_" << sender_proc << " on proc " << rankInJoin << "\n";
#endif

      // copy tag data from buffer->buff_ptr;
      // the data is arranged by entity (a loop over the tags for each entity),
      // mirroring the packing order on the sender side

      for (std::vector<int>::iterator it = eids.begin(); it != eids.end(); it++)
      {
        int eID = *it;
        EntityHandle eh = gidToHandle[eID];
        for (i = 0; i < tag_handles.size(); i++)
        {
          rval = mb->tag_set_data(tag_handles[i], &eh, 1, (void*)(buffer->buff_ptr) );  MB_CHK_ERR ( rval );
#ifdef VERBOSE
          dbfile << "global ID " << eID << " local handle " << mb->id_from_handle(eh) << " vals: ";
          double * vals = (double*) (buffer->buff_ptr);
          for (int kk = 0; kk < tag_sizes[i]; kk++)
          {
            dbfile << " " << *vals;
            vals++;
          }
          dbfile << "\n";
#endif
          buffer->buff_ptr += vect_bytes_per_tag[i];
        }
      }
#ifdef VERBOSE
      dbfile.close();
#endif
      delete buffer; // the receive buffer is no longer needed

    }

  }
  return MB_SUCCESS;
}

ErrorCode ParCommGraph::distribute_sender_graph_info(MPI_Comm senderComm, std::vector<int> &sendingInfo )
{
  // only the root of the sender group has all the info;
  // it will use direct sends/receives to communicate, for each sender task, the number of elems to be sent to each receiver, in a flat array
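  //
  // Layout of sendingInfo on each sender task, as filled below (values illustrative):
  // alternating (receiver task, element count) pairs, e.g. { 3, 40, 4, 25 } would mean
  // this task sends 40 elements to receiver task 3 and 25 to receiver task 4; hence the
  // receive buffer below is sized at 2 * number of receivers.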
  if (rootSender)
  {
    // just append the local info to the array, and send some data to each sender
    int nrecv0 = (int) recv_graph[senderTasks[0]].size();
    for (int k = 0; k < nrecv0; k++)
    {
      sendingInfo.push_back(recv_graph[senderTasks[0]][k]);
      sendingInfo.push_back(recv_sizes[senderTasks[0]][k]);
    }
    // the rest of the info will be sent to each sender task
    for (int j = 1; j < (int) senderTasks.size(); j++)
    {
      std::vector<int> array_to_send;
      // in the sender comm and sender group, the rank to send to is j
      for (int k = 0; k < (int) recv_graph[senderTasks[j]].size(); k++)
      {
        array_to_send.push_back( recv_graph[senderTasks[j]][k]);
        array_to_send.push_back( recv_sizes[senderTasks[j]][k]);
      }

      int ierr = MPI_Send(&array_to_send[0], (int)array_to_send.size(), MPI_INT, j, 11, senderComm);
      if (MPI_SUCCESS != ierr)
        return MB_FAILURE;
    }
  }
  else
  {
    // expect to receive some info about where to send local data;
    // the local array is at most 2 times the number of receivers
    int sizeBuffMax = 2 * (int) receiverTasks.size();
    std::vector<int> data(sizeBuffMax);
    MPI_Status status;
    int ierr = MPI_Recv(&data[0], sizeBuffMax, MPI_INT, 0, 11, senderComm, &status);
    if (MPI_SUCCESS != ierr)
      return MB_FAILURE;
    int count;
    // find out how much data we actually got
    MPI_Get_count(&status, MPI_INT, &count);
    for (int k = 0; k < count; k++)
      sendingInfo.push_back(data[k]);

  }
  return MB_SUCCESS;
}

ErrorCode ParCommGraph::settle_send_graph(TupleList & TLcovIDs)
{
  // fill send_IDs_map with data:
  // each tuple has the receiving proc and the global id of an element
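  //
  // Sketch of the tuple layout assumed here: TLcovIDs stores two ints per tuple,
  //   vi_wr[2*i]   = destination (receiving) proc,
  //   vi_wr[2*i+1] = global id of the element,
  // so e.g. tuples (3, 101), (3, 102), (4, 200) would yield
  //   send_IDs_map = { 3 -> {101, 102}, 4 -> {200} }.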
  int n = TLcovIDs.get_n();
  recomputed_send_graph = true; // do not rely only on send_IDs_map.size(); this can be 0 in some cases
  for (int i = 0; i < n; i++)
  {
    int to_proc = TLcovIDs.vi_wr[2 * i];
    int globalIdElem = TLcovIDs.vi_wr[2 * i + 1];
    send_IDs_map[to_proc].push_back(globalIdElem);
  }
#ifdef VERBOSE
  for (std::map<int, std::vector<int> >::iterator mit = send_IDs_map.begin(); mit != send_IDs_map.end(); mit++)
  {
    std::cout << " towards task " << mit->first << " send: " << mit->second.size() << " cells " << std::endl;
    for (size_t i = 0; i < mit->second.size(); i++)
    {
      std::cout << " " << mit->second[i];
    }
    std::cout << std::endl;
  }
#endif
  return MB_SUCCESS;
}

// this will set recv_IDs_map, which stores all ids to be received from one sender task
void ParCommGraph::SetReceivingAfterCoverage(std::map<int, std::set<int> > & idsFromProcs) // makes sense only on receivers, right now after coverage
{
  for (std::map<int, std::set<int> >::iterator mt = idsFromProcs.begin(); mt != idsFromProcs.end(); mt++)
  {
    int fromProc = mt->first;
    std::set<int> & setIds = mt->second;
    recv_IDs_map[fromProc].resize( setIds.size() );
    std::vector<int> & listIDs = recv_IDs_map[fromProc];
    size_t indx = 0;
    for ( std::set<int>::iterator st = setIds.begin(); st != setIds.end(); st++)
    {
      int valueID = *st;
      listIDs[indx++] = valueID;
    }
  }
  return;
}

// new partition calculation
ErrorCode ParCommGraph::compute_partition (ParallelComm *pco, Range & owned, int met)
{
  // we are on a sender task, and need to compute a new partition;
  // primary cells need to be distributed to the receiver tasks;
  // first, we will use a graph partitioner, with zoltan;
  // in the graph that we need to build, the first layer of ghosts is needed;
  // can we avoid that? For example, we can find out from each boundary edge/face what the
  // cell on the other side is, then form the global graph, and call zoltan in parallel;
  // met 1 is the graph partitioner, and met 2 is the geometric partitioner;
  // for method 2 we do not need any ghost exchange

  // find first the edges that are shared
  if (owned.empty())
    return MB_SUCCESS; // nothing to do? an empty partition is not allowed; maybe we should return an error?
  Core * mb = (Core*)pco->get_moab();

  int primaryDim = mb->dimension_from_handle(*owned.rbegin());
  int interfaceDim = primaryDim - 1; // should be 1 or 2
  Range sharedEdges;
  ErrorCode rval = pco->get_shared_entities(/*int other_proc*/ -1, sharedEdges, interfaceDim, /*const bool iface*/ true);MB_CHK_ERR ( rval );

#ifdef VERBOSE
  std::cout << " on sender task " << pco->rank() << " number of shared interface cells " << sharedEdges.size() << "\n";
#endif
  // find to which processors we need to send the ghost info about the edge
  std::vector<int> shprocs(MAX_SHARING_PROCS);
  std::vector<EntityHandle> shhandles(MAX_SHARING_PROCS);

  Tag gidTag;
  rval = mb->tag_get_handle("GLOBAL_ID", gidTag); MB_CHK_ERR ( rval );
  int np;
  unsigned char pstatus;

  std::map<int, int> adjCellsId;
  std::map<int, int> extraCellsProc;
  // if the method is 2, no need to do the exchange for adjacent cells across the partition boundary;
  // the maps above will be empty for method 2 (geometry)
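  //
  // What the two maps hold after the exchange below (illustrative values): for a local
  // boundary cell with global id 17 whose neighbor across the partition interface is
  // the remote cell with global id 42 owned by task 5,
  //   adjCellsId[17]     = 42   (local cell gid -> adjacent remote cell gid)
  //   extraCellsProc[42] = 5    (remote cell gid -> owning task)
  // This is exactly the extra (ghost) adjacency information the graph partitioner needs.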
  if (1 == met)
  {
    // first determine the local graph: which elements are adjacent to each cell in the owned range;
    // cells sharing a partition interface edge are identified first, and form a map
    TupleList TLe; // tuple list for cells
    TLe.initialize(2, 0, 1, 0, sharedEdges.size()); // send to, id of adj cell, remote edge
    TLe.enableWriteAccess();

    std::map<EntityHandle, int> edgeToCell; // from local boundary edge to adjacent cell id
    // will be changed after
    for (Range::iterator eit = sharedEdges.begin(); eit != sharedEdges.end(); eit++)
    {
      EntityHandle edge = *eit;
      // get the adjacent cell
      Range adjEnts;
      rval = mb->get_adjacencies(&edge, 1, primaryDim, false, adjEnts); MB_CHK_ERR ( rval );
      if (adjEnts.size() > 0)
      {
        EntityHandle adjCell = adjEnts[0];
        int gid;
        rval = mb->tag_get_data(gidTag, &adjCell, 1, &gid);  MB_CHK_ERR ( rval );
        rval = pco->get_sharing_data(edge, &shprocs[0], &shhandles[0], pstatus, np); MB_CHK_ERR ( rval );
        int n = TLe.get_n();
        TLe.vi_wr[2*n] = shprocs[0];
        TLe.vi_wr[2*n+1] = gid;
        TLe.vul_wr[n] = shhandles[0]; // the remote edge corresponding to the shared edge
        edgeToCell[edge] = gid; // store the map between the edge and the local id of the adj cell
        TLe.inc_n();
      }
    }

#ifdef VERBOSE
    std::stringstream ff2;
    ff2 << "TLe_" << pco->rank() << ".txt";
    TLe.print_to_file(ff2.str().c_str());
#endif
    // send the data to the other processors:
    (pco->proc_config().crystal_router())->gs_transfer(1, TLe, 0);
    // on the receiving side, each local edge will have the remote cell adjacent to it!

    int ne = TLe.get_n();
    for (int i = 0; i < ne; i++)
    {
      int sharedProc = TLe.vi_rd[2*i]; // this info is coming from here, originally
      int remoteCellID = TLe.vi_rd[2*i+1];
      EntityHandle localCell = TLe.vul_rd[i]; // the shared edge, now a local handle on this proc
      adjCellsId[edgeToCell[localCell]] = remoteCellID;
      extraCellsProc[remoteCellID] = sharedProc;
#ifdef VERBOSE
      std::cout << "local ID " << edgeToCell[localCell] << " remote cell ID: " << remoteCellID << "\n";
#endif
    }
  }
  // we have the adjacent cell ids; need to call zoltan for the parallel partition
#ifdef MOAB_HAVE_ZOLTAN
  ZoltanPartitioner * mbZTool = new ZoltanPartitioner(mb);
  if (1 <= met) // partition in zoltan, with either the graph or the geometric partitioner
  {
    std::map<int, Range> distribution; // how to distribute owned elements by processors in the receiving groups
    // over how many tasks do we want to be distributed?
    int numNewPartitions = (int)receiverTasks.size();
    Range primaryCells = owned.subset_by_dimension(primaryDim);
    rval = mbZTool->partition_owned_cells(primaryCells, pco, adjCellsId, extraCellsProc,
        numNewPartitions, distribution, met); MB_CHK_ERR ( rval );
    for (std::map<int, Range>::iterator mit = distribution.begin(); mit != distribution.end(); mit++)
    {
      int part_index = mit->first;
      assert(part_index < numNewPartitions);
      split_ranges[receiverTasks[part_index]] = mit->second;
    }
  }
  delete mbZTool; // the partitioner was allocated with new above; avoid leaking it
#endif

  return MB_SUCCESS;
}
// at this moment, each sender task has split_ranges formed;
// we need to aggregate that info and send it to the receiver
ErrorCode ParCommGraph::send_graph_partition (ParallelComm *pco, MPI_Comm jcomm)
{
  // first, accumulate the info on the root of the sender group; use gatherv
  // each sender contributes the number of receivers it will send to
  int numberReceivers = (int) split_ranges.size(); // the number of receivers this task will send ranges of elements to
  int nSenders = (int) senderTasks.size();
  std::vector<int> displs(1); // displacements for gatherv
  std::vector<int> counts(1);
  if ( is_root_sender() )
  {
    displs.resize(nSenders + 1);
    counts.resize(nSenders);
  }

  int ierr = MPI_Gather( &numberReceivers, 1, MPI_INT, &counts[0], 1, MPI_INT, 0, pco->comm() );
  if (ierr != MPI_SUCCESS) return MB_FAILURE;
  // compute now the displacements
  if ( is_root_sender() )
  {
    displs[0] = 0;
    for (int k = 0; k < nSenders; k++)
    {
      displs[k+1] = displs[k] + counts[k];
    }
  }
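  //
  // Example of the gather bookkeeping (made-up counts): with 3 senders reporting
  // numberReceivers = {2, 1, 3}, the root gets counts = {2, 1, 3} and computes
  // displs = {0, 2, 3, 6}; sender k's receiver ranks then occupy
  // buffer[displs[k] .. displs[k+1]-1] after the MPI_Gatherv below.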
  std::vector<int> buffer;
  if ( is_root_sender() ) buffer.resize(displs[nSenders]); // the last displacement is now the total count

  std::vector<int> recvs;
  for (std::map<int, Range>::iterator mit = split_ranges.begin(); mit != split_ranges.end(); mit++)
  {
    recvs.push_back( mit->first );
  }
  ierr = MPI_Gatherv(&recvs[0], numberReceivers, MPI_INT, &buffer[0], &counts[0], &displs[0], MPI_INT, 0, pco->comm());
  if (ierr != MPI_SUCCESS) return MB_FAILURE;

  // now form the recv_graph map, which points from each receiver task to its senders,
  // i.e., the graph to be sent to the other side; only on the root
  if ( is_root_sender() )
  {
    for (int k = 0; k < nSenders; k++)
    {
      int indexInBuff = displs[k];
      int senderTask = senderTasks[k];
      for (int j = 0; j < counts[k]; j++)
      {
        int recvTask = buffer[indexInBuff + j];
        recv_graph[recvTask].push_back(senderTask); // this will be packed and sent to the root receiver, with a nonblocking send
      }
    }

    // from here on, this is the same as for the trivial partition
    ErrorCode rval = send_graph(jcomm); MB_CHK_ERR ( rval );
  }

  return MB_SUCCESS;
}
} // namespace moab