1 ////////////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2008-2012 The Regents of the University of California
4 //
5 // This file is part of Qbox
6 //
7 // Qbox is distributed under the terms of the GNU General Public License
8 // as published by the Free Software Foundation, either version 2 of
9 // the License, or (at your option) any later version.
10 // See the file COPYING in the root directory of this distribution
11 // or <http://www.gnu.org/licenses/>.
12 //
13 ////////////////////////////////////////////////////////////////////////////////
14 //
15 // XMLGFPreprocessor.cpp
16 //
17 ////////////////////////////////////////////////////////////////////////////////
18 
19 #include <iostream>
20 #include <fstream>
21 #include <sstream>
22 #include <string>
23 #include <cstring> // memcpy
24 #include <cstdlib>
25 #include <cstdio>
26 #include <cassert>
27 #include <unistd.h> // close
28 
29 #include <vector>
30 #include <valarray>
31 
32 #include <sys/stat.h>
33 #include <sys/types.h>
34 #include <sys/socket.h>
35 #include <netdb.h>
36 
37 #include "Timer.h"
38 #include "Context.h"
39 #include "Base64Transcoder.h"
40 #include "Matrix.h"
41 #include "XMLGFPreprocessor.h"
42 using namespace std;
43 
44 ////////////////////////////////////////////////////////////////////////////////
45 //
46 // XMLGFPreprocessor class
47 //
48 // Used to preprocess a sample document.
// The contents of all <grid_function> elements are removed from the XML
50 // document and stored in the distributed matrix gfdata.
51 // All <species> elements are expanded from their url or file name and
52 // inserted in the xmlcontent string.
53 //
54 // Input: filename or uri, DoubleMatrix& gfdata, string xmlcontent
55 // If the uri is a file name, the preprocessor reads the file in parallel,
56 // then processes all <grid_function> elements and stores the values of
57 // the grid_functions in the matrix gfdata which has dimensions
58 // (ngf,maxgridsize), where ngf is the total number of
59 // <grid_function> elements found in the file, and maxgridsize is the size of
60 // the largest grid_function.
61 // On return, the string xmlcontent contains (on all tasks) the XML document
62 // with <grid_function> elements reduced to empty strings and <species>
63 // elements expanded to include all species information.
64 //
65 ////////////////////////////////////////////////////////////////////////////////
int XMLGFPreprocessor::process(const char* const uri,
67     DoubleMatrix& gfdata, string& xmlcontent, bool serial)
68 {
69   cout.precision(4);
70 
71   const Context& ctxt = gfdata.context();
72   // define a global single row context for segment manipulations
73   Context rctxt(MPI_COMM_WORLD);
74 #if DEBUG
75   if ( rctxt.onpe0() )
76   {
77     cout << "gfdata.context(): " << ctxt;
78     cout << "rctxt: " << rctxt;
79   }
80 #endif
81 
82   Timer tm,ttm;
83   ttm.start();
84   string st;
85 
86   // determine if the given uri refers to a local file or to an URL
87   struct stat statbuf;
88   bool read_from_file = !stat(uri,&statbuf);
89 
90   if ( read_from_file && rctxt.onpe0() )
91     cout << " XMLGFPreprocessor: reading from "
92            << uri << " size: " << statbuf.st_size << endl;
93 
94 #if DEBUG
95     cout << " process " << ctxt.mype() << " found file "
96          << uri << " size: " << statbuf.st_size << endl;
97 #endif
98 
99   // check if serial flag: force serial reading even if file is present
100   read_from_file &= !serial;
101 
102   string buf;
103 
104   // if reading from a file, read the entire file in parallel on all tasks
105   if ( read_from_file )
106   {
107     FILE* infile;
108     infile = fopen(uri,"r");
109     if ( !infile )
110     {
111       cout << " process " << ctxt.mype() << " could not open file "
112            << uri << " for reading" << endl;
113       return 1;
114     }
115 
116     off_t sz = statbuf.st_size;
117     // determine local size
118     off_t block_size = sz / ctxt.size();
119     off_t local_size = block_size;
120     off_t max_local_size = local_size + sz % ctxt.size();
121     // adjust local_size on last task
122     if ( ctxt.mype()==ctxt.size()-1 )
123     {
124       local_size = max_local_size;
125     }
126 
127     // use contiguous read buffer, to be copied later to a string
128     char *rdbuf = new char[local_size];
129 #if DEBUG
130     cout << ctxt.mype() << ": local_size: " << local_size << endl;
131 #endif
132 
133     tm.start();
134     off_t offset = ctxt.mype()*block_size;
135     int fseek_status = fseeko(infile,offset,SEEK_SET);
136     if ( fseek_status != 0 )
137     {
138       cout << "fseeko failed: offset=" << offset << " file_size=" << sz << endl;
139     }
140 
141     assert(fseek_status==0);
142     size_t items_read;
143 #if PARALLEL_FS
144     // parallel file system: all nodes read at once
145     items_read = fread(rdbuf,sizeof(char),local_size,infile);
146 #else
147     // On a serial (or NFS) file system: tasks read by increasing row order
148     // to avoid overloading the NFS server
149     // No more than ctxt.npcol() tasks are reading at any given time
150     for ( int irow = 0; irow < ctxt.nprow(); irow++ )
151     {
152       if ( irow == ctxt.myrow() )
153         items_read = fread(rdbuf,sizeof(char),local_size,infile);
154     }
155 #endif
156     assert(items_read==local_size);
157 
158     buf.assign(rdbuf,local_size);
159     delete [] rdbuf;
160 
161     ctxt.barrier();
162     tm.stop();
163 
164     if ( ctxt.onpe0() )
165     {
166       cout << " XMLGFPreprocessor: read time: " << tm.real() << endl;
167       cout << " XMLGFPreprocessor: local read rate: "
168            << local_size/(tm.real()*1024*1024) << " MB/s"
169            << "  aggregate read rate: "
170            << sz/(tm.real()*1024*1024) << " MB/s" << endl;
171     }
172 
173   } // if (read_from_file)
174   else
175   {
176     // read from a URI
177     // task 0 connects to the server, gets the data and distributes it
178     // to MPI tasks
179     tm.start();
180 
181     int mype = rctxt.mype();
182     int nprocs = rctxt.size();
183     bool onpe0 = rctxt.onpe0();
184 
185     // get a document from an http server
186     // the host name is e.g. "128.120.80.40" or "fpmd.ucdavis.edu"
187     // the document name is e.g. "/index.html"
188     //
189     // the document is distributed in local strings localdoc
190     //
191     // check that the uri starts with http://
192     string suri(uri);
193     if ( suri.substr(0,7) != string("http://") )
194     {
195       if ( onpe0 )
196       {
197         cout << " URI must start with http://" << endl;
198         cout << " could not access URI: " << uri << endl;
199       }
200       // return with error code
201       return 2;
202     }
203     else
204     {
205       // erase "http://" from the URI
206       suri.erase(0,7);
207     }
208 
209     string localdoc;
210     int sockfd;
211     struct addrinfo hints, *servinfo, *p;
212     int rv;
213     const int blocksize = 1024*1024;
214     int nblocal = 0; // local number of blocks received
215     int total_received = 0;
216 
217     if ( onpe0 )
218     {
219       cout << " XMLGFPreprocessor: blocksize: " << blocksize << endl;
220       char recvbuffer[blocksize];
221       string buffer;
222 
223       // parse URI
224       // e.g. URI = "www.example.com/file.html" or "123.45.678.90/index.html"
225       // find position of first "/" in argument
226       string::size_type first_slash_pos = suri.find_first_of("/");
227       string host = suri.substr(0,first_slash_pos);
228       string docname = suri.substr(first_slash_pos);
229       cout << " XMLGFPreprocessor: host: " << host << endl;
230       cout << " XMLGFPreprocessor: docname: " << docname << endl;
231 
232       memset(&hints, 0, sizeof hints);
233       hints.ai_family = AF_UNSPEC;
234       hints.ai_socktype = SOCK_STREAM;
235 
236       if ( (rv = getaddrinfo(host.c_str(), "http", &hints, &servinfo)) != 0)
237       {
238         cout << " XMLGFPreprocessor: getaddrinfo: "
239              << gai_strerror(rv) << endl;
240         return 3;
241       }
242 
243       // loop through all the results and connect to the first working address
244       for ( p = servinfo; p != NULL; p = p->ai_next )
245       {
246         if ( (sockfd = socket(p->ai_family, p->ai_socktype,
247               p->ai_protocol)) == -1 )
248         {
249           perror("socket");
250           continue;
251         }
252 
253         if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1)
254         {
255           close(sockfd);
256           perror("connect");
257           continue;
258         }
259 
260         break; // if we get here, we must have connected successfully
261       }
262 
263       if (p == NULL)
264       {
265         // looped off the end of the list with no connection
266         cout << " XMLGFPreprocessor: failed to connect" << endl;
267         return 4;
268       }
269 
270       cout << " XMLGFPreprocessor: connected to " << host << " .. " << endl;
271 
272       // assemble request string
273       string req = "GET " + docname + " HTTP/1.0\r\nHOST:" + host + "\r\n\r\n";
274       // cout << " req = " << req << endl;
275 
276       // send request
277       if ( send(sockfd, req.c_str(), req.size(), 0) == -1)
278       {
279         perror("send");
280         return 5;
281       }
282 
283       cout << " XMLGFPreprocessor: waiting for response..." << endl;
284 
285       // read packets and append to document
286       int ibglobal = 0;
287       int len = 0;
288       do
289       {
290         // read a block into buffer
291         do
292         {
293           len = recv(sockfd, recvbuffer, blocksize, 0);
294           total_received += len;
295           if ( len > 0 )
296           {
297             buffer.append(recvbuffer,len);
298           }
299         } while ( len > 0 && buffer.size() < blocksize );
300 
301         if ( len > 0 )
302         {
303           // buffer contains at least blocksize characters
304           // send first blocksize chars of buffer
305           int dest = ibglobal%nprocs;
306           if ( dest == 0 )
307           {
308             localdoc.append(buffer.c_str(),blocksize);
309             nblocal++;
310           }
311           else
312           {
313             int tag = 0;
314             MPI_Send((void*) buffer.c_str(),blocksize,
315                      MPI_CHAR,dest,tag,rctxt.comm());
316           }
317           ibglobal++;
          // remove first blocksize chars from buffer
319           buffer.erase(0,blocksize);
320         }
321         else
322         {
323           // len = 0: reached end of file
324           // send remaining chars in buffer
325           int dest = ibglobal%nprocs;
326           if ( dest == 0 )
327           {
328             localdoc.append(buffer);
329             nblocal++;
330           }
331           else
332           {
333             int tag = 0;
334             MPI_Send((void*) buffer.c_str(),buffer.size(),
335                      MPI_CHAR,dest,tag,rctxt.comm());
336           }
337           ibglobal++;
338         }
339       } while ( len != 0 );
340       cout << " XMLGFPreprocessor: received " << total_received
341            << " bytes" << endl;
342 
343       freeaddrinfo(servinfo);
344 
345       // send empty message to all listening tasks to signify end
346       for ( int i = 1; i < nprocs; i++ )
347         MPI_Send(recvbuffer,0,MPI_CHAR,i,0,rctxt.comm());
348 
349       // broadcast total number of characters received
350       MPI_Bcast(&total_received,1,MPI_INT,0,rctxt.comm());
351 
352     }
353     else // onpe0
354     {
355       // start listening
356       // cout << mype << ": listening.." << endl;
357       bool done = false;
358       while ( !done )
359       {
360         MPI_Status status;
361         char rbuffer[blocksize];
362         // blocking receive of message from task 0
363         int ierr = MPI_Recv(rbuffer, blocksize, MPI_CHAR, 0, 0,
364                             rctxt.comm(), &status);
365         if ( ierr != 0 )
366         {
367           cout << " XMLGFPreprocessor::process: Error in MPI_Recv on node "
368                << mype << endl;
369           MPI_Abort(rctxt.comm(),2);
370         }
371         int count = -1;
372         MPI_Get_count(&status,MPI_CHAR,&count);
373         done = ( count == 0 );
374         if ( !done )
375         {
376           localdoc.append(rbuffer,count);
377           nblocal++;
378         }
379       }
380 
381       MPI_Bcast(&total_received,1,MPI_INT,0,rctxt.comm());
382       // cout << "node " << mype << " done" << endl;
383     }
384 
385     MPI_Barrier(rctxt.comm());
386 
387     if ( onpe0 )
388       cout << " XMLGFPreprocessor: done reading" << endl;
389     // cout << mype << ": localdoc.size(): " << localdoc.size() << endl;
390     // cout << mype << ": localdoc: " << localdoc << endl;
391     // cout << mype << ": nblocal: " << nblocal << endl;
392 
393     // redistribute data to all tasks
394     // cyclic to cyclic array redistribution
395 
396     // determine nbglobal: total number of blocks
397     int nbglobal;
398     MPI_Allreduce(&nblocal, &nbglobal, 1, MPI_INT, MPI_SUM, rctxt.comm());
399 
400     // maximum number of messages received on a task =
401     // maximum number of local blocks nblocalmax
402     int nblocalmax = (nbglobal+nprocs-1)/nprocs;
403 
404     // Send messages
405     int nsend = 0;
406     MPI_Request *send_req = new MPI_Request[nblocal];
407     for ( int iblocal = 0; iblocal < nblocal; iblocal++ )
408     {
409       int ibglobal = mype + iblocal * nprocs;
410       int dest = ibglobal / nblocalmax;
411       const char *p = localdoc.c_str();
412       int len = ibglobal < nbglobal-1 ? blocksize : total_received % blocksize;
413       MPI_Isend((void*) &p[iblocal*blocksize],len,MPI_CHAR,dest,ibglobal,
414                 rctxt.comm(),&send_req[iblocal]);
415       // cout << mype << ": sending block " << ibglobal
416       //      << " to " << dest << endl;
417       nsend++;
418     }
419     int tmpnsend;
420     MPI_Reduce(&nsend,&tmpnsend,1,MPI_INT,MPI_SUM,0,rctxt.comm());
421     nsend = tmpnsend;
422 
423     int nrecv = 0;
424     MPI_Request *recv_req = new MPI_Request[nblocalmax];
425     valarray<char> rbuf(nblocalmax*blocksize);
426     int ireq = 0;
427     int localsize = 0;
428     // post non-blocking receives
429     for ( int ibglobal = 0; ibglobal < nbglobal; ibglobal++ )
430     {
431       // coordinates of block ibglobal in final configuration: (i,j)
432       int j = ibglobal / nblocalmax;
433       // check if block (i,j) will be received on this task
434       if ( j == mype )
435       {
436         // determine task sending block (i,j)
437         int src = ibglobal % nprocs;
438         int iblocal = ibglobal % nblocalmax;
439         int len = ibglobal<nbglobal-1 ? blocksize : total_received % blocksize;
440         MPI_Irecv(&rbuf[iblocal*blocksize],len,MPI_CHAR,src,ibglobal,
441                   rctxt.comm(),&recv_req[ireq]);
442         // cout << mype << ": receiving block " << ibglobal
443         //      << " from " << src << endl;
444         nrecv++;
445         ireq++;
446         localsize += len;
447       }
448     }
449     int tmpnrecv;
450     MPI_Reduce(&nrecv,&tmpnrecv,1,MPI_INT,MPI_SUM,0,rctxt.comm());
451     nrecv = tmpnrecv;
452 
453     // if ( onpe0 )
454     //   cout << "total msgs sent/received: " << nsend << "/" << nrecv << endl;
455     // cout << mype << ": localsize: " << localsize << endl;
456 
457     // wait for send calls to complete
458     MPI_Waitall(nblocal, send_req, MPI_STATUSES_IGNORE);
459     MPI_Waitall(ireq, recv_req, MPI_STATUSES_IGNORE);
460 
461     delete [] recv_req;
462     delete [] send_req;
463 
464     // the data now resides in rbuf on each task
465 
466     buf.assign(&rbuf[0],localsize);
467 
468     // on task 0, remove HTTP header
469     // erase characters before "<?xml " header
470     int xml_decl_error = 0;
471     string::size_type xml_start;
472     if ( onpe0 )
473     {
474       xml_start = buf.find("<?xml ");
475       if ( xml_start == string::npos )
476       {
477         cout << " XMLGFPreprocessor: could not find <?xml > declaration"
478              << endl;
479         xml_decl_error = true;
480       }
481       else
482       {
483         cout << " XMLGFPreprocessor:  <?xml > declaration found at position "
484              << xml_start << endl;
485       }
486     }
487     MPI_Bcast(&xml_decl_error,1,MPI_INT,0,rctxt.comm());
488     if ( xml_decl_error )
489       return 6;
490 
491     // An <?xml > declaration was found
492 
493     if ( onpe0 )
494       buf.erase(0,xml_start);
495 
496     ctxt.barrier();
497     tm.stop();
498 
499     if ( ctxt.onpe0() )
500     {
501       cout << " XMLGFPreprocessor: read time: " << tm.real() << endl;
502       cout << " XMLGFPreprocessor: read rate: "
503            << total_received/(tm.real()*1024*1024) << " MB/s" << endl;
504     }
505   }
506 
507   // At this point all tasks hold a fragment of size local_size in buf
508 
509   tm.reset();
510   tm.start();
511 
512   ////////////////////////////////////////////////////////////////////////////
513   // fix broken tags
514   ////////////////////////////////////////////////////////////////////////////
515   string::size_type first_bracket_pos = buf.find_first_of("<>");
516   string::size_type last_bracket_pos = buf.find_last_of("<>");
517   //cout << ctxt.mype() << ": first_bracket_pos: " << first_bracket_pos
518   //     << " bracket is " << buf[first_bracket_pos] << endl;
519   if ( first_bracket_pos == string::npos )
520   {
521     // no bracket found, nothing to fix
522     // cout << " no bracket found" << endl;
523   }
524   else if ( first_bracket_pos == last_bracket_pos )
525   {
526     // only one bracket found
527     char bracket = buf[first_bracket_pos];
528     assert(bracket=='<' || bracket=='>');
529     if ( bracket == '<' )
530     {
531       // receive missing data from mype+1
532       //cout << " found < bracket only: receive data from mype+1" << endl;
533       string right_buf;
534       //cout << "receiving from task " << ctxt.mype()+1 << endl;
535       rctxt.string_recv(right_buf,0,rctxt.mype()+1);
536       //cout << " received: " << right_buf << endl;
537       buf.append(right_buf);
538     }
539     else
540     {
541       // bracket == '>'
542       // send data up to bracket (inclusive) to mype-1
543       string send_buf = buf.substr(0,first_bracket_pos+1);
544       rctxt.string_send(send_buf,0,rctxt.mype()-1);
545       buf.erase(0,first_bracket_pos+1);
546       last_bracket_pos -= first_bracket_pos+1;
547     }
548   }
549   else
550   {
551     // two or more brackets found
552 
553     // process first bracket
554     char bracket = buf[first_bracket_pos];
555     assert(bracket=='<' || bracket=='>');
556     if ( bracket == '<' )
557     {
558       // nothing to fix
559       // cout << " found < first: nothing to fix" << endl;
560     }
561     else
562     {
563       // bracket == '>'
564       // send data up to first_bracket (inclusive) to task mype-1
565       // cout << " found > bracket first: sending data to mype-1" << endl;
566       string send_buf = buf.substr(0,first_bracket_pos+1);
567       rctxt.string_send(send_buf,0,rctxt.mype()-1);
568       buf.erase(0,first_bracket_pos+1);
569       last_bracket_pos -= first_bracket_pos+1;
570     }
571 
572     // process last bracket
573     bracket = buf[last_bracket_pos];
574     assert(bracket=='<' || bracket=='>');
575     if ( bracket == '<' )
576     {
577       // receive missing data from task mype+1
578       // cout << " found < bracket last: receive data from mype+1" << endl;
579       string right_buf;
580       // cout << "receiving from task " << rctxt.mype()+1 << endl;
581       rctxt.string_recv(right_buf,0,rctxt.mype()+1);
582       // cout << " received: " << right_buf << endl;
583       buf.append(right_buf);
584     }
585     else
586     {
587       // bracket == '>'
588       // nothing to fix
589       // cout << " found > last: nothing to fix" << endl;
590     }
591   }
592 
593   tm.stop();
594   if ( ctxt.onpe0() )
595     cout << " XMLGFPreprocessor: tag fixing time: " << tm.real() << endl;
596   tm.reset();
597   tm.start();
598 
599   ////////////////////////////////////////////////////////////////////////////
600   // reduce data in <grid_function> elements
601   ////////////////////////////////////////////////////////////////////////////
602 
603   // locate grid_function tags
604   string::size_type first_start = buf.find("<grid_function");
605   string::size_type first_end = buf.find("</grid_function>");
606   string::size_type last_start = buf.rfind("<grid_function");
607   string::size_type last_end = buf.rfind("</grid_function>");
608   bool start_tag_found = ( first_start != string::npos ) ;
609   bool end_tag_found = ( first_end != string::npos ) ;
610 
611   ////////////////////////////////////////////////////////////////////////////
612   // Determine if this task starts and/or ends within a grid_function element
613   ////////////////////////////////////////////////////////////////////////////
614   int start_within,end_within=0;
615   if ( ctxt.onpe0() )
616   {
617     start_within = 0; // task 0 starts at the beginning of the XML file
618   }
619   else
620   {
621     // on pe > 0
622     rctxt.irecv(1,1,&start_within,1,0,rctxt.mype()-1);
623   }
624 
625   if ( !start_within )
626   {
627     end_within =
628     ( ( start_tag_found && !end_tag_found ) ||
629       ( start_tag_found && end_tag_found && last_start > last_end ) );
630   }
631   else
632   {
633     end_within =
634     ( ( !end_tag_found ) ||
635       ( start_tag_found && end_tag_found && last_start > last_end ) );
636   }
637 
638   if ( rctxt.mype() < ctxt.size()-1 )
639     rctxt.isend(1,1,&end_within,1,0,rctxt.mype()+1);
640 
641 #if DEBUG
642   cout << rctxt.mype() << ": start_within=" << start_within
643        << " end_within=" << end_within << endl;
644 #endif
645 
646   ////////////////////////////////////////////////////////////////////////////
647   // Determine intervals of characters to be processed
648   ////////////////////////////////////////////////////////////////////////////
649   // build tables seg_start[i], seg_end[i] that define intervals lying inside
650   // elements
651 
652   string::size_type pos = 0;
653   bool done = false;
654   vector<string::size_type> seg_start,seg_end;
655 
656   // position of start and end tags on this task
657   vector<string::size_type> local_start_tag_offset, local_end_tag_offset;
658 
659   // node on which starting tag igf can be found: starting_node[igf]
660   vector<string::size_type> starting_node, ending_node;
661 
662   // treat first the case where the buffer starts within an element
663   if ( start_within )
664   {
665     // get next end tag
666     string::size_type next_end = buf.find("</grid_function>",pos);
667     if ( next_end == string::npos )
668     {
      // did not find ending tag
670       // add segment [pos,buf.size()] to list
671       seg_start.push_back(pos);
672       seg_end.push_back(buf.size());
673       done = true;
674       assert(end_within);
675     }
676     else
677     {
678       // found end tag at position next_end
679       local_end_tag_offset.push_back(next_end);
680       // add segment [pos,next_end] to list
681       seg_start.push_back(pos);
682       seg_end.push_back(next_end);
683       pos = next_end+16; // size of "</grid_function>"
684     }
685   }
686 
687   while (!done)
688   {
689     string::size_type next_start = buf.find("<grid_function",pos);
690     if ( next_start == string::npos )
691     {
692       // did not find start tag
693       // no start tag found, done.
694       done = true;
695       assert(!end_within);
696     }
697     else
698     {
699       // found start tag
700       local_start_tag_offset.push_back(next_start);
701       pos = buf.find(">",next_start)+1;
702       string::size_type next_end = buf.find("</grid_function>",pos);
703       if ( next_end == string::npos )
704       {
705         // did not find end tag
706         // add segment [pos,buf.size()] to list
707         seg_start.push_back(pos);
708         seg_end.push_back(buf.size());
709         done = true;
710         assert(end_within);
711       }
712       else
713       {
714         // found end tag
715         local_end_tag_offset.push_back(next_end);
716         // add segment [pos,next_end] to list
717         seg_start.push_back(pos);
718         seg_end.push_back(next_end);
719         pos = next_end+16; // size of "</grid_function>"
720       }
721     }
722   }
723 
724   // compute total number of grid functions (total number of start tags)
725   const int ngf_loc = local_start_tag_offset.size();
726   int ngf = ngf_loc;
727   rctxt.isum(1,1,&ngf,1);
728 
729   // compute index of grid function starting at the first <grid_function> tag
730   int igfmin, igfmax;
731   if ( rctxt.mype() == 0 )
732   {
733     igfmin = 0;
734   }
735   else
736   {
737     rctxt.irecv(1,1,&igfmin,1,0,rctxt.mype()-1);
738   }
739 
740   igfmax = igfmin + local_end_tag_offset.size();
741 
742   if ( rctxt.mype() < rctxt.size()-1 )
743   {
744     rctxt.isend(1,1,&igfmax,1,0,rctxt.mype()+1);
745   }
746 
747   // associate an igf value to each segment
748   vector<int> igf(seg_start.size());
749   for ( int i = 0; i < seg_start.size(); i++ )
750     igf[i] = i+igfmin;
751 
752   // cout << rctxt.mype() << ": igfmin=" << igfmin
753   //      << " igfmax=" << igfmax << endl;
754 
755   tm.stop();
756   if ( ctxt.onpe0() )
757     cout << " XMLGFPreprocessor: segment definition time: " << tm.real() << endl;
758 
759   ////////////////////////////////////////////////////////////////////////////
760   // Adjust segment boundaries to allow for transcoding
761   ////////////////////////////////////////////////////////////////////////////
762   // If encoding is "text", the segment boundary must fall on ' ' or '\n'
763   // If encoding is "base64", the segment boundary must fall between
764   // groups of 4 characters in "A-Za-z0-9+/="
765 
766   ////////////////////////////////////////////////////////////////////////////
767   // Determine encoding type of the last segment and send encoding_type
768   // to right neighbour
769 
770   string left_encoding = "none";
771   if ( start_within && !rctxt.onpe0() )
772   {
773     rctxt.string_recv(left_encoding,0,rctxt.mype()-1);
774   }
775 
776   string right_encoding = "none";
777   // find position of "encoding" string after last_start position
778   if ( end_within )
779   {
780     if ( start_tag_found )
781     {
782       string::size_type encoding_pos = buf.find("encoding",last_start);
783       assert(encoding_pos != string::npos);
784       encoding_pos = buf.find("\"",encoding_pos)+1;
785       string::size_type end = buf.find("\"",encoding_pos);
786       right_encoding = buf.substr(encoding_pos,end-encoding_pos);
787     }
788     else
789     {
790       right_encoding = left_encoding;
791     }
792   }
793 
794   if ( end_within && rctxt.mype() < ctxt.size()-1 )
795   {
796     rctxt.string_send(right_encoding,0,rctxt.mype()+1);
797   }
798 #if DEBUG
799   cout << rctxt.mype() << ": left_encoding = " << left_encoding << endl;
800   cout << rctxt.mype() << ": right_encoding = " << right_encoding << endl;
801 #endif
802 
803   // build table of encodings for all segments
804   vector<string> encoding(seg_start.size());
805   int iseg = 0;
806   if ( start_within )
807     encoding[iseg++] = left_encoding;
808   for ( int i = 0; i < local_start_tag_offset.size(); i++ )
809   {
810     // determine encoding of tag at local_start_tag_offset[i]
811     string::size_type encoding_pos =
812       buf.find("encoding",local_start_tag_offset[i]);
813     assert(encoding_pos != string::npos);
814     encoding_pos = buf.find("\"",encoding_pos)+1;
815     string::size_type end = buf.find("\"",encoding_pos);
816     encoding[iseg++] = buf.substr(encoding_pos,end-encoding_pos);
817   }
818   assert(iseg == seg_start.size() );
819 
820   // print out table of segments [seg_start,seg_end] and corresponding igf
821   assert(seg_start.size()==seg_end.size());
822 #if DEBUG
823   for ( int i = 0; i < seg_start.size(); i++ )
824   {
825     cout << rctxt.mype() << ": segment [" << seg_start[i] << ","
826          << seg_end[i] << "] igf=" << igf[i] << " ("
827          << seg_end[i]-seg_start[i] << " bytes)"
828          << " encoding=" << encoding[i]
829          << endl;
830   }
831 #endif
832 
833   tm.reset();
834   tm.start();
835 
836   ////////////////////////////////////////////////////////////////////////////
837   // Fix boundaries:
838   ////////////////////////////////////////////////////////////////////////////
839 
840   // first fix all text_encoded boundaries
841   // on all nodes involved in a text encoded boundary
842   string from_right, to_left;
843   if ( left_encoding == "text" || right_encoding == "text" )
844   {
845     if ( right_encoding == "text" )
846     {
847       // post string_receive from right
848 
849       // next line: cannot have right_encoding = text on last task
850       assert(rctxt.mype() < rctxt.size()-1);
851 
852       rctxt.string_recv(from_right,0,rctxt.mype()+1);
853       buf.append(from_right);
854 
855       // adjust ending offset of last segment
856       seg_end[seg_end.size()-1] += from_right.size();
857 
858     }
859     if (left_encoding == "text")
860     {
861       // send string up to first separator to left
862       // next line: cannot have left_encoding = text on task 0
863       assert(!rctxt.onpe0());
864 
865       string::size_type pos = buf.find_first_of(" \n<",0);
866       to_left = buf.substr(0,pos);
867       rctxt.string_send(to_left,0,rctxt.mype()-1);
868 
869       // adjust starting offset of first segment
870       seg_start[0] = pos;
871 
872       // remove first pos characters and shift all segment indices
873       buf.erase(0,pos);
874       for ( int iseg = 0; iseg < seg_start.size(); iseg++ )
875       {
876         seg_start[iseg] -= pos;
877         seg_end[iseg] -= pos;
878       }
879     }
880   }
881 
882   // at this point, all text_encoded boundaries are fixed
883 
884   // fix base64 boundaries
885 
886   // all nodes having a base64 encoded boundary
887   if ( left_encoding == "base64" || right_encoding == "base64" )
888   {
889     // count valid base64 chars in last segment
    // count = number of chars in "A-Z,a-z,0-9,+,/,=" in last segment
891     // Note: the last segment may also be the first, if only one segment
892     int count = 0;
893     int last_seg = seg_start.size()-1;
894     // count number of valid base64 chars in segment last_seg
895     for ( int i = seg_start[last_seg]; i < seg_end[last_seg]; i++ )
896     {
897       int ch = buf[i];
898       if ( isalnum(ch) || ch == '+' || ch == '/' || ch == '=' )
899         count++;
900     }
901 #if DEBUG
902     cout << rctxt.mype() << ": valid base64 chars in last seg="
903          << count << endl;
904 #endif
905 
906     string send_left;
907     int missing_on_left = 0;
908     if ( left_encoding == "base64" )
909     {
910       // each node except node 0 posts a receive of the number of missing
911       // valid chars from its left neighbour
912       if ( !rctxt.onpe0() )
913         rctxt.irecv(1,1,&missing_on_left,1,0,rctxt.mype()-1);
914 
915       // search for missing_on_left valid characters in buf[0]
916       // Note: it is assumed that there are always enough valid
917       // characters in the first segment to align on a 128 char boundary
918       // 3-doubles = 192 bits
919       // one 4-char block encodes 24 bits
920       // boundary at 8 4-char blocks = 32 chars
921       assert(missing_on_left < 32);
922 
923       // Note: the number of chars missing on left can be larger than
924       // the total number of chars available in the first segment.
925       // In that case, all characters in the first segment are sent
926       string::size_type pos = 0;
927       int n = 0;
928       while ( pos < seg_end[0] && n < missing_on_left )
929       {
930         char c = buf[pos++];
931         if ( isalnum(c) || c == '+' || c == '/' || c == '=' )
932         {
933           send_left.push_back(c);
934           n++;
935         }
936       }
937       // adjust start of segment offset
938       seg_start[0] = pos;
939       // remove first pos characters and shift all segment indices
940       buf.erase(0,pos);
941       for ( int iseg = 0; iseg < seg_start.size(); iseg++ )
942       {
943         seg_start[iseg] -= pos;
944         seg_end[iseg] -= pos;
945       }
946     }
947 
948     // if the last segment is also the first (i.e. only one segment)
949     // remove chars sent to left from valid char count
950     if ( seg_start.size() == 1 )
951       count -= missing_on_left;
952 
953     int missing_on_right = 0;
954     if ( right_encoding == "base64" )
955     {
956       // compute number of missing valid chars on the right
957       // Request enough characters to fall on a 32-char boundary
958       // number of missing *valid* chars requested is (32 - count % 32)
959       // Note: there may not be enough chars in the first segment of the
960       // right neighbor to return that many chars. In that case, fewer
961       // chars will be received from the right neighbor
962       int countmod32 = count % 32;
963       if ( countmod32 != 0 )
964         missing_on_right = 32 - countmod32;
965 #if DEBUG
966       cout << rctxt.mype() << ": missing_on_right="
967       << missing_on_right << endl;
968 #endif
969 
970       // each node except the last sends the number of missing chars to its
971       // right neighbour (receive is already posted by other nodes)
972       // each node except the last posts a string receive from the right node
973 
974       if ( rctxt.mype() < rctxt.size()-1 )
975       {
976         rctxt.isend(1,1,&missing_on_right,1,0,rctxt.mype()+1);
977         // receive string from task mype+1
978         string missing_chars;
979         rctxt.string_recv(missing_chars,0,rctxt.mype()+1);
980         // append to buf
981         buf.append(missing_chars);
982         seg_end[last_seg] += missing_chars.size();
983       }
984     }
985 
986     if ( left_encoding == "base64" )
987     {
988       // each node except node 0 sends string "send_left" to left node
989       if ( !rctxt.onpe0() )
990       {
991         rctxt.string_send(send_left,0,rctxt.mype()-1);
992 #if DEBUG
993         cout << rctxt.mype() << ": sent " << send_left.size()
994              << " chars to left task: " << send_left << endl;
995 #endif
996       }
997     }
998   }
999 
  // print out table of segments [seg_start,seg_end] and corresponding igf
#if DEBUG
  for ( int i = 0; i < seg_start.size(); i++ )
  {
    cout << rctxt.mype() << ": segment [" << seg_start[i] << ","
         << seg_end[i] << "] igf=" << igf[i] << " ("
         << seg_end[i]-seg_start[i] << " bytes)"
         << " encoding=" << encoding[i]
         << endl;
  }
#endif

  tm.stop();
  if ( ctxt.onpe0() )
    cout << " XMLGFPreprocessor: boundary adjustment time: "
         << tm.real() << endl;
  tm.reset();
  tm.start();

  // Transcode segments
  // Instantiate a Base64Transcoder
  Base64Transcoder xcdr;

  // dbuf[iseg]: doubles decoded from segment iseg
  vector<vector<double> > dbuf;
  dbuf.resize(seg_start.size());
  for ( int iseg = 0; iseg < seg_start.size(); iseg++ )
  {
    int nchars = seg_end[iseg]-seg_start[iseg];
    if ( encoding[iseg] == "base64" )
    {
      // Base64 case:
      // the dimension of the dbuf array is computed from the *total* number
      // of bytes in the segment, i.e. will be overestimated slightly
      // since not all bytes are valid base64 bytes
      // (4 base64 chars decode to 3 bytes; 8 bytes per double)
      dbuf[iseg].resize((((nchars/4)*3)/8));
#if DEBUG
      cout << rctxt.mype() << ": iseg=" << iseg
           << " dbufsize=" << dbuf[iseg].size()
           << endl;
#endif
      int nbytes = xcdr.decode(nchars,buf.data()+seg_start[iseg],
                               (byte*)&dbuf[iseg][0]);
#if DEBUG
      cout << rctxt.mype() << ": iseg=" << iseg << " nbytes=" << nbytes
           << endl;
#endif
      // the boundary adjustment above guarantees whole doubles per segment
      assert(nbytes % 8 == 0 );
      int ndoubles = nbytes / 8;
      assert(ndoubles <= dbuf[iseg].size());
      // adjust size of double array
      dbuf[iseg].resize(ndoubles);
    }
    else
    {
      // text encoding: parse whitespace-separated doubles
#if DEBUG
      cout << rctxt.mype() << ": text in iseg=" << iseg << ": "
           << buf.substr(seg_start[iseg],80) << endl;
#endif
      istringstream stst(buf.substr(seg_start[iseg],nchars));
      double d;
      while ( stst >> d )
      {
        dbuf[iseg].push_back(d);
      }
#if DEBUG
      cout << rctxt.mype() << ": doubles read from text: "
          << dbuf[iseg].size() << endl;
#endif
    }
  }

  tm.stop();
  if ( ctxt.onpe0() )
    cout << " XMLGFPreprocessor: transcoding time: " << tm.real() << endl;
  tm.reset();
  tm.start();
1077 
1078   ////////////////////////////////////////////////////////////////////////////
1079   // byte swapping on big-endian platforms
1080 #if PLT_BIG_ENDIAN
1081   for ( int iseg = 0; iseg < seg_start.size(); iseg++ )
1082     xcdr.byteswap_double(dbuf[iseg].size(),&dbuf[iseg][0]);
1083 
1084   tm.stop();
1085   if ( ctxt.onpe0() )
1086     cout << " XMLGFPreprocessor: byte swapping time: " << tm.real() << endl;
1087   tm.reset();
1088   tm.start();
1089 #endif
1090 
1091 #if DEBUG
1092   for ( int iseg = 0; iseg < seg_start.size(); iseg++ )
1093   {
1094     cout << rctxt.mype() << ": seg_data[iseg=" << iseg << "]: size="
1095          << dbuf[iseg].size() << "  ";
1096     if ( dbuf[iseg].size() >=3 )
1097       cout << dbuf[iseg][0] << " " << dbuf[iseg][1]
1098            << " " << dbuf[iseg][2];
1099     cout << endl;
1100   }
1101 #endif
1102 
1103   ////////////////////////////////////////////////////////////////////////////
1104   // Collect all data in dbuf into a DoubleMatrix of dimension maxgfdim x ngf
1105   // Note: the largest grid must be used to dimension the matrix.
1106   // Determine the largest dimension of the grid_functions
1107   // use dmax call
1108 
1109   // compute size of largest grid
1110   int maxgfsize = 0;
1111   valarray<int> gfsize(ngf);
1112   gfsize = 0;
1113   for ( int iseg = 0; iseg < seg_start.size(); iseg++ )
1114   {
1115     for ( int i = 0; i < ngf; i++ )
1116       if ( igf[iseg] == i )
1117         gfsize[i] = dbuf[iseg].size();
1118   }
1119   rctxt.isum(ngf,1,&gfsize[0],ngf);
1120   if ( ngf > 0 )
1121     maxgfsize = gfsize.max();
1122 #if DEBUG
1123   cout << rctxt.mype() << ": maxgfsize=" << maxgfsize << endl;
1124   for ( int i = 0; i < ngf; i++ )
1125     cout << rctxt.mype() << ": igf=" << i << " gfsize=" << gfsize[i]
1126          << endl;
1127 #endif
1128 
1129   // determine block sizes of matrix gfdata
1130   // note: use ctxt, not rctxt
1131   int gfmb = maxgfsize / ctxt.nprow() +
1132            ( maxgfsize % ctxt.nprow() != 0 ? 1 : 0 );
1133   int gfnb = ngf / ctxt.npcol() +
1134            ( ngf % ctxt.npcol() != 0 ? 1 : 0 );
1135   gfdata.resize(maxgfsize,ngf,gfmb,gfnb);
1136 #if DEBUG
1137   cout << ctxt.mype() << ": gfdata resized: (" << maxgfsize << "x" << ngf
1138        << ")  (" << gfmb << "x" << gfnb << ") blocks" << endl;
1139   cout << ctxt.mype() << ": gfdata.context(): " << gfdata.context();
1140 #endif
1141 
1142   // prepare buffer sbuf for all_to_all operation
1143   int sbufsize = 0;
1144   for ( int iseg = 0; iseg < seg_start.size(); iseg++ )
1145     sbufsize += dbuf[iseg].size();
1146 
1147   valarray<double> sbuf(sbufsize);
1148 
  // determine offset in the grid of the first element of the first
  // segment
  // Each task having start_within==true posts a receive from its left
  // task
  // Each task having end_within==true computes the offset of the last
  // element of its last segment and sends it to its right neighbour

  int left_offset = 0;
  if ( start_within )
  {
    rctxt.irecv(1,1,&left_offset,1,0,rctxt.mype()-1);
  }
  if ( end_within )
  {
    // compute offset of last element of last segment
    int right_offset = 0;
    if ( dbuf.size() == 1 )
    {
      // if there is only one segment, take left offset into account
      right_offset = left_offset + dbuf[seg_start.size()-1].size();
    }
    else
    {
      right_offset = dbuf[seg_start.size()-1].size();
    }
#if DEBUG
    cout << rctxt.mype() << ": right_offset=" << right_offset << endl;
#endif
    rctxt.isend(1,1,&right_offset,1,0,rctxt.mype()+1);
  }
#if DEBUG
  cout << rctxt.mype() << ": left_offset=" << left_offset << endl;
#endif

  // compute array sbuf
  // copy into sbuf all data to be sent
  // data going to task irow icol is in subsegment irow
  // for each task itask
  //   compute irow,icol of itask, igfmin, igfmax, min_offset, max_offset
  //   for each segment iseg
  //      if igf[iseg] falls on task itask
  //        for each block of segment iseg
  //          if offset of this block falls in [min_offset,max_offset]
  //            copy block to sbuf
  //            adjust scount[itask]
  // Note: valarray(value,count) ctor: rctxt.size() elements initialized to 0
  valarray<int> scounts(0,rctxt.size()), sdispl(0,rctxt.size());
  valarray<int> rcounts(0,rctxt.size()), rdispl(0,rctxt.size());

  int sbuf_pos = 0;
  for ( int itask = 0; itask < ctxt.size(); itask++ )
  {
    // determine block sizes of the block cyclic distribution of gfdata
    const int irow = itask % ctxt.nprow();
    const int icol = itask / ctxt.nprow();

    const int igfmin = icol * gfnb;
    // nb_loc: size of block on task icol
    // nb_loc is gfnb       for icol < kn
    //           ngf-k*gfnb for icol == kn
    //           0          for icol > kn
    // where kn = int(ngf/gfnb)
    const int kn = (gfnb>0) ? ngf / gfnb : 0;
    int nb_loc;
    if ( icol < kn )
      nb_loc = gfnb;
    else if ( icol == kn )
      nb_loc = ngf - kn * gfnb;
    else
      nb_loc = 0;
    const int igfmax = igfmin + nb_loc;

    const int min_offset = irow * gfmb;
    // mb_loc: size of block on task irow
    // mb_loc is gfmb             for irow < km
    //           maxgfsize-k*gfmb for irow == km
    //           0                for irow > km
    // where km = int(maxgfsize/gfmb)
    const int km = (gfmb>0) ? maxgfsize / gfmb : 0;
    int mb_loc;
    if ( irow < km )
      mb_loc = gfmb;
    else if ( irow == km )
      mb_loc = maxgfsize - km * gfmb;
    else
      mb_loc = 0;

    const int max_offset = min_offset + mb_loc;

    sdispl[itask] = sbuf_pos;
    for ( int iseg = 0; iseg < seg_start.size(); iseg++ )
    {
      const int segsize = dbuf[iseg].size();

      if ( igf[iseg] >= igfmin && igf[iseg] < igfmax )
      {
        // igf falls within the states stored on itask

        // Find if there is an intersection of this segment
        // with the range [min_offset,max_offset[ on task itask

        // block_start is assigned only on the paths that set block_size > 0
        int block_start, block_size = 0;

        if ( iseg == 0 )
        {
          // On segment 0, the available range is
          // [left_offset,left_offset+segsize[
          // There is overlap if min_offset < left_offset+segsize
          // and max_offset > left_offset
          // If there is overlap, the range to be sent is
          // [max(min_offset-left_offset,0) ,
          //  min(max_offset-left_offset,segsize)[
          if ( min_offset < left_offset+segsize && max_offset > left_offset )
          {
            // there is overlap
            block_start = max(min_offset-left_offset,0);
            block_size = min(max_offset-left_offset,segsize) - block_start;
          }
        }
        else
        {
          // On all segments except segment 0, the available range
          // is [0,segsize[
          // There is overlap if min_offset < segsize
          // (and if max_offset > 0, but this is always true)
          // If there is overlap, the range to be sent is
          // [min_offset, min(max_offset,segsize)[
          if ( min_offset < segsize )
          {
            // there is overlap
            block_start = min_offset;
            block_size = min(max_offset,segsize) - block_start;
          }
        }

        if ( block_size > 0 )
        {
          // copy block to sbuf
          memcpy((void*)&sbuf[sbuf_pos],(void*)&dbuf[iseg][block_start],
                 block_size*sizeof(double));
          sbuf_pos += block_size;
          scounts[itask] += block_size;

#if DEBUG
          cout << rctxt.mype() << ": sbuf: iseg=" << iseg << " sending start="
          << block_start << " size=" << block_size << " to task " << itask
          << " (irow=" << irow << ",icol=" << icol << ")" << endl;
#endif
        }
      }
    }
  }
1300 
#if USE_MPI
  // send scount array using all_to_all call
  // (one int is exchanged with each task: counts are all 1,
  //  displacements are the task index)
  valarray<int> a2a_scounts(1,ctxt.size()),a2a_rcounts(1,ctxt.size()),
                a2a_sdispl(0,ctxt.size()),a2a_rdispl(0,ctxt.size());
  for ( int i = 0; i < ctxt.size(); i++ )
  {
    a2a_rdispl[i] = a2a_sdispl[i] = i;
  }
  int status = MPI_Alltoallv(&scounts[0],&a2a_scounts[0],&a2a_sdispl[0],
     MPI_INT,&rcounts[0],&a2a_rcounts[0],&a2a_rdispl[0],MPI_INT,
     rctxt.comm());
  assert(status==0);

  // receive displacements: prefix sums of the receive counts
  rdispl[0] = 0;
  for ( int i = 1; i < ctxt.size(); i++ )
    rdispl[i] = rdispl[i-1] + rcounts[i-1];

  int rbufsize = 0;
  for ( int i = 0; i < ctxt.size(); i++ )
    rbufsize += rcounts[i];
#if DEBUG
  cout << ctxt.mype() << ": "
       << " rbufsize: " << rbufsize << endl;
#endif

  valarray<double> rbuf(rbufsize);

  // exchange the actual data blocks
  status = MPI_Alltoallv(&sbuf[0],&scounts[0],&sdispl[0],MPI_DOUBLE,
           &rbuf[0],&rcounts[0],&rdispl[0],MPI_DOUBLE,rctxt.comm());
  assert(status==0);

#else // USE_MPI
  valarray<double> rbuf(sbuf);
#endif // USE_MPI

  // copy data from rbuf to gfdata.valptr()
  // functions in rbuf can have varying length
  // determine bounds igfmin, igfmax of functions in rbuf
  // then use gfsize[igf] to copy one function at a time to gfdata
  //
  // compute block sizes for current node
  const int irow = ctxt.myrow();
  const int icol = ctxt.mycol();

  const int igfminloc = icol * gfnb;
  // nb_loc: size of block on task icol
  // nb_loc is gfnb       for icol < kn
  //           ngf-k*gfnb for icol == kn
  //           0          for icol > kn
  // where kn = int(ngf/gfnb)
  const int kn = (gfnb>0) ? ngf / gfnb : 0;
  int nb_loc;
  if ( icol < kn )
    nb_loc = gfnb;
  else if ( icol == kn )
    nb_loc = ngf - kn * gfnb;
  else
    nb_loc = 0;
  const int igfmaxloc = igfminloc + nb_loc;

#if DEBUG
  cout << ctxt.mype() << ": "
       << " igfminloc: " << igfminloc << " igfmaxloc: " << igfmaxloc << endl;
#endif
  int rbuf_pos = 0;
  int igfloc = 0;
  for ( int igf = igfminloc; igf < igfmaxloc; igf++ )
  {
    // mb_loc: size of block on task irow
    // mb_loc is gfmb             for irow < km
    //           gfsize-k*gfmb for irow == km
    //           0                for irow > km
    // where km = int(gfsize/gfmb)
    const int km = (gfmb>0) ? gfsize[igf] / gfmb : 0;
    int mb_loc;
    if ( irow < km )
      mb_loc = gfmb;
    else if ( irow == km )
      mb_loc = gfsize[igf] - km * gfmb;
    else
      mb_loc = 0;

#if DEBUG
    cout << " copying gfsize[" << igf << "] = " << gfsize[igf]
         << " block size: " << mb_loc
         << " gfdata.mloc(): " << gfdata.mloc() << endl;
#endif

    // copy rbuf[rbuf_pos] size mb_loc to
    // gfdata.valptr(icol*mloc())
    const int dest = igfloc * gfdata.mloc();
    memcpy(gfdata.valptr(dest),&rbuf[rbuf_pos],mb_loc*sizeof(double));
    rbuf_pos += mb_loc;
    igfloc++;
  }
1396 
  tm.stop();
  if ( ctxt.onpe0() )
    cout << " XMLGFPreprocessor: data redistribution time: "
         << tm.real() << endl;
  tm.reset();
  tm.start();

  // compact XML data:
  // erase <grid_function> contents from XML data
  // Note: when removing the data from <grid_function>, add
  // the attribute xsi:null="true"
  // Also add to the schema definition: nillable="true" in the
  // definition of element grid_function

  // delete segment data
  // (iterate from the last segment backwards so that earlier
  //  segment offsets remain valid while erasing)
  for ( int iseg = seg_start.size()-1; iseg >= 0; iseg--)
  {
    //cout << " erasing segment: ["
    //     << seg_start[iseg] << "," << seg_end[iseg] << "]" << endl;
    buf.erase(seg_start[iseg],seg_end[iseg]-seg_start[iseg]);
  }
  //cout << " buf.size() after erase: " << buf.size() << endl;

  // collect all XML data
  // Distribute sizes of local strings to all tasks
  // and store in array rcounts

#if USE_MPI
  int xmlcontentsize = buf.size();
  status = MPI_Allgather(&xmlcontentsize,1,MPI_INT,&rcounts[0],1,
             MPI_INT,rctxt.comm());
  assert(status==0);
  // compute total size
  int totalsize = 0;
  for ( int i = 0; i < rctxt.size(); i++ )
    totalsize += rcounts[i];

  // displacements: prefix sums of the gathered sizes
  rdispl[0] = 0;
  for ( int i = 1; i < rctxt.size(); i++ )
    rdispl[i] = rdispl[i-1] + rcounts[i-1];

  xmlcontent.resize(totalsize);

  // Allgatherv xml content
  status = MPI_Allgatherv(&buf[0],xmlcontentsize,MPI_CHAR,&xmlcontent[0],
    &rcounts[0],&rdispl[0],MPI_CHAR,rctxt.comm());
#else
  xmlcontent = buf;
#endif // USE_MPI
  tm.stop();
  if ( ctxt.onpe0() )
    cout << " XMLGFPreprocessor: XML compacting time: " << tm.real() << endl;
  tm.reset();
  tm.start();

  // data is now available in the form of the DoubleMatrix gfdata
  //
  // Redistribution of the data can be done using the DoubleMatrix::getsub
  // and DoubleMatrix::operator= members
  //
  // 1) Use getsub to get only wf (not dwf) in a DoubleMatrix distributed over
  //    the global context
  // 2) Use operator= to redistribute the gridfunctions to the same
  //    context as Wavefunction
  //
  // Fourier transforms
  // WavefunctionHandler can use the real-space data in the redistributed
  // gfdata matrix to compute Fourier coefficients, which completes the
  // load operation

  ////////////////////////////////////////////////////////////////////////////
  // end of program

  ctxt.barrier();
  ttm.stop();
  if ( ctxt.onpe0() )
  {
    cout << " XMLGFPreprocessor: total time: " << ttm.real() << endl;
  }
  return 0;
}
1478