1 ////////////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2008-2012 The Regents of the University of California
4 //
5 // This file is part of Qbox
6 //
7 // Qbox is distributed under the terms of the GNU General Public License
8 // as published by the Free Software Foundation, either version 2 of
9 // the License, or (at your option) any later version.
10 // See the file COPYING in the root directory of this distribution
11 // or <http://www.gnu.org/licenses/>.
12 //
13 ////////////////////////////////////////////////////////////////////////////////
14 //
15 // XMLGFPreprocessor.cpp
16 //
17 ////////////////////////////////////////////////////////////////////////////////
18
19 #include <iostream>
20 #include <fstream>
21 #include <sstream>
22 #include <string>
23 #include <cstring> // memcpy
24 #include <cstdlib>
25 #include <cstdio>
26 #include <cassert>
27 #include <unistd.h> // close
28
29 #include <vector>
30 #include <valarray>
31
32 #include <sys/stat.h>
33 #include <sys/types.h>
34 #include <sys/socket.h>
35 #include <netdb.h>
36
37 #include "Timer.h"
38 #include "Context.h"
39 #include "Base64Transcoder.h"
40 #include "Matrix.h"
41 #include "XMLGFPreprocessor.h"
42 using namespace std;
43
44 ////////////////////////////////////////////////////////////////////////////////
45 //
46 // XMLGFPreprocessor class
47 //
48 // Used to preprocess a sample document.
49 // The contents of all <grid_function> elements is removed from the XML
50 // document and stored in the distributed matrix gfdata.
51 // All <species> elements are expanded from their url or file name and
52 // inserted in the xmlcontent string.
53 //
54 // Input: filename or uri, DoubleMatrix& gfdata, string& xmlcontent
55 // If the uri is a file name, the preprocessor reads the file in parallel,
56 // then processes all <grid_function> elements and stores the values of
57 // the grid_functions in the matrix gfdata which has dimensions
58 // (ngf,maxgridsize), where ngf is the total number of
59 // <grid_function> elements found in the file, and maxgridsize is the size of
60 // the largest grid_function.
61 // On return, the string xmlcontent contains (on all tasks) the XML document
62 // with <grid_function> elements reduced to empty strings and <species>
63 // elements expanded to include all species information.
64 //
65 ////////////////////////////////////////////////////////////////////////////////
process(const char * const uri,DoubleMatrix & gfdata,string & xmlcontent,bool serial)66 int XMLGFPreprocessor::process(const char* const uri,
67 DoubleMatrix& gfdata, string& xmlcontent, bool serial)
68 {
69 cout.precision(4);
70
71 const Context& ctxt = gfdata.context();
72 // define a global single row context for segment manipulations
73 Context rctxt(MPI_COMM_WORLD);
74 #if DEBUG
75 if ( rctxt.onpe0() )
76 {
77 cout << "gfdata.context(): " << ctxt;
78 cout << "rctxt: " << rctxt;
79 }
80 #endif
81
82 Timer tm,ttm;
83 ttm.start();
84 string st;
85
86 // determine if the given uri refers to a local file or to an URL
87 struct stat statbuf;
88 bool read_from_file = !stat(uri,&statbuf);
89
90 if ( read_from_file && rctxt.onpe0() )
91 cout << " XMLGFPreprocessor: reading from "
92 << uri << " size: " << statbuf.st_size << endl;
93
94 #if DEBUG
95 cout << " process " << ctxt.mype() << " found file "
96 << uri << " size: " << statbuf.st_size << endl;
97 #endif
98
99 // check if serial flag: force serial reading even if file is present
100 read_from_file &= !serial;
101
102 string buf;
103
104 // if reading from a file, read the entire file in parallel on all tasks
105 if ( read_from_file )
106 {
107 FILE* infile;
108 infile = fopen(uri,"r");
109 if ( !infile )
110 {
111 cout << " process " << ctxt.mype() << " could not open file "
112 << uri << " for reading" << endl;
113 return 1;
114 }
115
116 off_t sz = statbuf.st_size;
117 // determine local size
118 off_t block_size = sz / ctxt.size();
119 off_t local_size = block_size;
120 off_t max_local_size = local_size + sz % ctxt.size();
121 // adjust local_size on last task
122 if ( ctxt.mype()==ctxt.size()-1 )
123 {
124 local_size = max_local_size;
125 }
126
127 // use contiguous read buffer, to be copied later to a string
128 char *rdbuf = new char[local_size];
129 #if DEBUG
130 cout << ctxt.mype() << ": local_size: " << local_size << endl;
131 #endif
132
133 tm.start();
134 off_t offset = ctxt.mype()*block_size;
135 int fseek_status = fseeko(infile,offset,SEEK_SET);
136 if ( fseek_status != 0 )
137 {
138 cout << "fseeko failed: offset=" << offset << " file_size=" << sz << endl;
139 }
140
141 assert(fseek_status==0);
142 size_t items_read;
143 #if PARALLEL_FS
144 // parallel file system: all nodes read at once
145 items_read = fread(rdbuf,sizeof(char),local_size,infile);
146 #else
147 // On a serial (or NFS) file system: tasks read by increasing row order
148 // to avoid overloading the NFS server
149 // No more than ctxt.npcol() tasks are reading at any given time
150 for ( int irow = 0; irow < ctxt.nprow(); irow++ )
151 {
152 if ( irow == ctxt.myrow() )
153 items_read = fread(rdbuf,sizeof(char),local_size,infile);
154 }
155 #endif
156 assert(items_read==local_size);
157
158 buf.assign(rdbuf,local_size);
159 delete [] rdbuf;
160
161 ctxt.barrier();
162 tm.stop();
163
164 if ( ctxt.onpe0() )
165 {
166 cout << " XMLGFPreprocessor: read time: " << tm.real() << endl;
167 cout << " XMLGFPreprocessor: local read rate: "
168 << local_size/(tm.real()*1024*1024) << " MB/s"
169 << " aggregate read rate: "
170 << sz/(tm.real()*1024*1024) << " MB/s" << endl;
171 }
172
173 } // if (read_from_file)
174 else
175 {
176 // read from a URI
177 // task 0 connects to the server, gets the data and distributes it
178 // to MPI tasks
179 tm.start();
180
181 int mype = rctxt.mype();
182 int nprocs = rctxt.size();
183 bool onpe0 = rctxt.onpe0();
184
185 // get a document from an http server
186 // the host name is e.g. "128.120.80.40" or "fpmd.ucdavis.edu"
187 // the document name is e.g. "/index.html"
188 //
189 // the document is distributed in local strings localdoc
190 //
191 // check that the uri starts with http://
192 string suri(uri);
193 if ( suri.substr(0,7) != string("http://") )
194 {
195 if ( onpe0 )
196 {
197 cout << " URI must start with http://" << endl;
198 cout << " could not access URI: " << uri << endl;
199 }
200 // return with error code
201 return 2;
202 }
203 else
204 {
205 // erase "http://" from the URI
206 suri.erase(0,7);
207 }
208
209 string localdoc;
210 int sockfd;
211 struct addrinfo hints, *servinfo, *p;
212 int rv;
213 const int blocksize = 1024*1024;
214 int nblocal = 0; // local number of blocks received
215 int total_received = 0;
216
217 if ( onpe0 )
218 {
219 cout << " XMLGFPreprocessor: blocksize: " << blocksize << endl;
220 char recvbuffer[blocksize];
221 string buffer;
222
223 // parse URI
224 // e.g. URI = "www.example.com/file.html" or "123.45.678.90/index.html"
225 // find position of first "/" in argument
226 string::size_type first_slash_pos = suri.find_first_of("/");
227 string host = suri.substr(0,first_slash_pos);
228 string docname = suri.substr(first_slash_pos);
229 cout << " XMLGFPreprocessor: host: " << host << endl;
230 cout << " XMLGFPreprocessor: docname: " << docname << endl;
231
232 memset(&hints, 0, sizeof hints);
233 hints.ai_family = AF_UNSPEC;
234 hints.ai_socktype = SOCK_STREAM;
235
236 if ( (rv = getaddrinfo(host.c_str(), "http", &hints, &servinfo)) != 0)
237 {
238 cout << " XMLGFPreprocessor: getaddrinfo: "
239 << gai_strerror(rv) << endl;
240 return 3;
241 }
242
243 // loop through all the results and connect to the first working address
244 for ( p = servinfo; p != NULL; p = p->ai_next )
245 {
246 if ( (sockfd = socket(p->ai_family, p->ai_socktype,
247 p->ai_protocol)) == -1 )
248 {
249 perror("socket");
250 continue;
251 }
252
253 if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1)
254 {
255 close(sockfd);
256 perror("connect");
257 continue;
258 }
259
260 break; // if we get here, we must have connected successfully
261 }
262
263 if (p == NULL)
264 {
265 // looped off the end of the list with no connection
266 cout << " XMLGFPreprocessor: failed to connect" << endl;
267 return 4;
268 }
269
270 cout << " XMLGFPreprocessor: connected to " << host << " .. " << endl;
271
272 // assemble request string
273 string req = "GET " + docname + " HTTP/1.0\r\nHOST:" + host + "\r\n\r\n";
274 // cout << " req = " << req << endl;
275
276 // send request
277 if ( send(sockfd, req.c_str(), req.size(), 0) == -1)
278 {
279 perror("send");
280 return 5;
281 }
282
283 cout << " XMLGFPreprocessor: waiting for response..." << endl;
284
285 // read packets and append to document
286 int ibglobal = 0;
287 int len = 0;
288 do
289 {
290 // read a block into buffer
291 do
292 {
293 len = recv(sockfd, recvbuffer, blocksize, 0);
294 total_received += len;
295 if ( len > 0 )
296 {
297 buffer.append(recvbuffer,len);
298 }
299 } while ( len > 0 && buffer.size() < blocksize );
300
301 if ( len > 0 )
302 {
303 // buffer contains at least blocksize characters
304 // send first blocksize chars of buffer
305 int dest = ibglobal%nprocs;
306 if ( dest == 0 )
307 {
308 localdoc.append(buffer.c_str(),blocksize);
309 nblocal++;
310 }
311 else
312 {
313 int tag = 0;
314 MPI_Send((void*) buffer.c_str(),blocksize,
315 MPI_CHAR,dest,tag,rctxt.comm());
316 }
317 ibglobal++;
318 // remove first blocksize chars from buffer
319 buffer.erase(0,blocksize);
320 }
321 else
322 {
323 // len = 0: reached end of file
324 // send remaining chars in buffer
325 int dest = ibglobal%nprocs;
326 if ( dest == 0 )
327 {
328 localdoc.append(buffer);
329 nblocal++;
330 }
331 else
332 {
333 int tag = 0;
334 MPI_Send((void*) buffer.c_str(),buffer.size(),
335 MPI_CHAR,dest,tag,rctxt.comm());
336 }
337 ibglobal++;
338 }
339 } while ( len != 0 );
340 cout << " XMLGFPreprocessor: received " << total_received
341 << " bytes" << endl;
342
343 freeaddrinfo(servinfo);
344
345 // send empty message to all listening tasks to signify end
346 for ( int i = 1; i < nprocs; i++ )
347 MPI_Send(recvbuffer,0,MPI_CHAR,i,0,rctxt.comm());
348
349 // broadcast total number of characters received
350 MPI_Bcast(&total_received,1,MPI_INT,0,rctxt.comm());
351
352 }
353 else // onpe0
354 {
355 // start listening
356 // cout << mype << ": listening.." << endl;
357 bool done = false;
358 while ( !done )
359 {
360 MPI_Status status;
361 char rbuffer[blocksize];
362 // blocking receive of message from task 0
363 int ierr = MPI_Recv(rbuffer, blocksize, MPI_CHAR, 0, 0,
364 rctxt.comm(), &status);
365 if ( ierr != 0 )
366 {
367 cout << " XMLGFPreprocessor::process: Error in MPI_Recv on node "
368 << mype << endl;
369 MPI_Abort(rctxt.comm(),2);
370 }
371 int count = -1;
372 MPI_Get_count(&status,MPI_CHAR,&count);
373 done = ( count == 0 );
374 if ( !done )
375 {
376 localdoc.append(rbuffer,count);
377 nblocal++;
378 }
379 }
380
381 MPI_Bcast(&total_received,1,MPI_INT,0,rctxt.comm());
382 // cout << "node " << mype << " done" << endl;
383 }
384
385 MPI_Barrier(rctxt.comm());
386
387 if ( onpe0 )
388 cout << " XMLGFPreprocessor: done reading" << endl;
389 // cout << mype << ": localdoc.size(): " << localdoc.size() << endl;
390 // cout << mype << ": localdoc: " << localdoc << endl;
391 // cout << mype << ": nblocal: " << nblocal << endl;
392
393 // redistribute data to all tasks
394 // cyclic to cyclic array redistribution
395
396 // determine nbglobal: total number of blocks
397 int nbglobal;
398 MPI_Allreduce(&nblocal, &nbglobal, 1, MPI_INT, MPI_SUM, rctxt.comm());
399
400 // maximum number of messages received on a task =
401 // maximum number of local blocks nblocalmax
402 int nblocalmax = (nbglobal+nprocs-1)/nprocs;
403
404 // Send messages
405 int nsend = 0;
406 MPI_Request *send_req = new MPI_Request[nblocal];
407 for ( int iblocal = 0; iblocal < nblocal; iblocal++ )
408 {
409 int ibglobal = mype + iblocal * nprocs;
410 int dest = ibglobal / nblocalmax;
411 const char *p = localdoc.c_str();
412 int len = ibglobal < nbglobal-1 ? blocksize : total_received % blocksize;
413 MPI_Isend((void*) &p[iblocal*blocksize],len,MPI_CHAR,dest,ibglobal,
414 rctxt.comm(),&send_req[iblocal]);
415 // cout << mype << ": sending block " << ibglobal
416 // << " to " << dest << endl;
417 nsend++;
418 }
419 int tmpnsend;
420 MPI_Reduce(&nsend,&tmpnsend,1,MPI_INT,MPI_SUM,0,rctxt.comm());
421 nsend = tmpnsend;
422
423 int nrecv = 0;
424 MPI_Request *recv_req = new MPI_Request[nblocalmax];
425 valarray<char> rbuf(nblocalmax*blocksize);
426 int ireq = 0;
427 int localsize = 0;
428 // post non-blocking receives
429 for ( int ibglobal = 0; ibglobal < nbglobal; ibglobal++ )
430 {
431 // coordinates of block ibglobal in final configuration: (i,j)
432 int j = ibglobal / nblocalmax;
433 // check if block (i,j) will be received on this task
434 if ( j == mype )
435 {
436 // determine task sending block (i,j)
437 int src = ibglobal % nprocs;
438 int iblocal = ibglobal % nblocalmax;
439 int len = ibglobal<nbglobal-1 ? blocksize : total_received % blocksize;
440 MPI_Irecv(&rbuf[iblocal*blocksize],len,MPI_CHAR,src,ibglobal,
441 rctxt.comm(),&recv_req[ireq]);
442 // cout << mype << ": receiving block " << ibglobal
443 // << " from " << src << endl;
444 nrecv++;
445 ireq++;
446 localsize += len;
447 }
448 }
449 int tmpnrecv;
450 MPI_Reduce(&nrecv,&tmpnrecv,1,MPI_INT,MPI_SUM,0,rctxt.comm());
451 nrecv = tmpnrecv;
452
453 // if ( onpe0 )
454 // cout << "total msgs sent/received: " << nsend << "/" << nrecv << endl;
455 // cout << mype << ": localsize: " << localsize << endl;
456
457 // wait for send calls to complete
458 MPI_Waitall(nblocal, send_req, MPI_STATUSES_IGNORE);
459 MPI_Waitall(ireq, recv_req, MPI_STATUSES_IGNORE);
460
461 delete [] recv_req;
462 delete [] send_req;
463
464 // the data now resides in rbuf on each task
465
466 buf.assign(&rbuf[0],localsize);
467
468 // on task 0, remove HTTP header
469 // erase characters before "<?xml " header
470 int xml_decl_error = 0;
471 string::size_type xml_start;
472 if ( onpe0 )
473 {
474 xml_start = buf.find("<?xml ");
475 if ( xml_start == string::npos )
476 {
477 cout << " XMLGFPreprocessor: could not find <?xml > declaration"
478 << endl;
479 xml_decl_error = true;
480 }
481 else
482 {
483 cout << " XMLGFPreprocessor: <?xml > declaration found at position "
484 << xml_start << endl;
485 }
486 }
487 MPI_Bcast(&xml_decl_error,1,MPI_INT,0,rctxt.comm());
488 if ( xml_decl_error )
489 return 6;
490
491 // An <?xml > declaration was found
492
493 if ( onpe0 )
494 buf.erase(0,xml_start);
495
496 ctxt.barrier();
497 tm.stop();
498
499 if ( ctxt.onpe0() )
500 {
501 cout << " XMLGFPreprocessor: read time: " << tm.real() << endl;
502 cout << " XMLGFPreprocessor: read rate: "
503 << total_received/(tm.real()*1024*1024) << " MB/s" << endl;
504 }
505 }
506
507 // At this point all tasks hold a fragment of size local_size in buf
508
509 tm.reset();
510 tm.start();
511
512 ////////////////////////////////////////////////////////////////////////////
513 // fix broken tags
514 ////////////////////////////////////////////////////////////////////////////
515 string::size_type first_bracket_pos = buf.find_first_of("<>");
516 string::size_type last_bracket_pos = buf.find_last_of("<>");
517 //cout << ctxt.mype() << ": first_bracket_pos: " << first_bracket_pos
518 // << " bracket is " << buf[first_bracket_pos] << endl;
519 if ( first_bracket_pos == string::npos )
520 {
521 // no bracket found, nothing to fix
522 // cout << " no bracket found" << endl;
523 }
524 else if ( first_bracket_pos == last_bracket_pos )
525 {
526 // only one bracket found
527 char bracket = buf[first_bracket_pos];
528 assert(bracket=='<' || bracket=='>');
529 if ( bracket == '<' )
530 {
531 // receive missing data from mype+1
532 //cout << " found < bracket only: receive data from mype+1" << endl;
533 string right_buf;
534 //cout << "receiving from task " << ctxt.mype()+1 << endl;
535 rctxt.string_recv(right_buf,0,rctxt.mype()+1);
536 //cout << " received: " << right_buf << endl;
537 buf.append(right_buf);
538 }
539 else
540 {
541 // bracket == '>'
542 // send data up to bracket (inclusive) to mype-1
543 string send_buf = buf.substr(0,first_bracket_pos+1);
544 rctxt.string_send(send_buf,0,rctxt.mype()-1);
545 buf.erase(0,first_bracket_pos+1);
546 last_bracket_pos -= first_bracket_pos+1;
547 }
548 }
549 else
550 {
551 // two or more brackets found
552
553 // process first bracket
554 char bracket = buf[first_bracket_pos];
555 assert(bracket=='<' || bracket=='>');
556 if ( bracket == '<' )
557 {
558 // nothing to fix
559 // cout << " found < first: nothing to fix" << endl;
560 }
561 else
562 {
563 // bracket == '>'
564 // send data up to first_bracket (inclusive) to task mype-1
565 // cout << " found > bracket first: sending data to mype-1" << endl;
566 string send_buf = buf.substr(0,first_bracket_pos+1);
567 rctxt.string_send(send_buf,0,rctxt.mype()-1);
568 buf.erase(0,first_bracket_pos+1);
569 last_bracket_pos -= first_bracket_pos+1;
570 }
571
572 // process last bracket
573 bracket = buf[last_bracket_pos];
574 assert(bracket=='<' || bracket=='>');
575 if ( bracket == '<' )
576 {
577 // receive missing data from task mype+1
578 // cout << " found < bracket last: receive data from mype+1" << endl;
579 string right_buf;
580 // cout << "receiving from task " << rctxt.mype()+1 << endl;
581 rctxt.string_recv(right_buf,0,rctxt.mype()+1);
582 // cout << " received: " << right_buf << endl;
583 buf.append(right_buf);
584 }
585 else
586 {
587 // bracket == '>'
588 // nothing to fix
589 // cout << " found > last: nothing to fix" << endl;
590 }
591 }
592
593 tm.stop();
594 if ( ctxt.onpe0() )
595 cout << " XMLGFPreprocessor: tag fixing time: " << tm.real() << endl;
596 tm.reset();
597 tm.start();
598
599 ////////////////////////////////////////////////////////////////////////////
600 // reduce data in <grid_function> elements
601 ////////////////////////////////////////////////////////////////////////////
602
603 // locate grid_function tags
604 string::size_type first_start = buf.find("<grid_function");
605 string::size_type first_end = buf.find("</grid_function>");
606 string::size_type last_start = buf.rfind("<grid_function");
607 string::size_type last_end = buf.rfind("</grid_function>");
608 bool start_tag_found = ( first_start != string::npos ) ;
609 bool end_tag_found = ( first_end != string::npos ) ;
610
611 ////////////////////////////////////////////////////////////////////////////
612 // Determine if this task starts and/or ends within a grid_function element
613 ////////////////////////////////////////////////////////////////////////////
614 int start_within,end_within=0;
615 if ( ctxt.onpe0() )
616 {
617 start_within = 0; // task 0 starts at the beginning of the XML file
618 }
619 else
620 {
621 // on pe > 0
622 rctxt.irecv(1,1,&start_within,1,0,rctxt.mype()-1);
623 }
624
625 if ( !start_within )
626 {
627 end_within =
628 ( ( start_tag_found && !end_tag_found ) ||
629 ( start_tag_found && end_tag_found && last_start > last_end ) );
630 }
631 else
632 {
633 end_within =
634 ( ( !end_tag_found ) ||
635 ( start_tag_found && end_tag_found && last_start > last_end ) );
636 }
637
638 if ( rctxt.mype() < ctxt.size()-1 )
639 rctxt.isend(1,1,&end_within,1,0,rctxt.mype()+1);
640
641 #if DEBUG
642 cout << rctxt.mype() << ": start_within=" << start_within
643 << " end_within=" << end_within << endl;
644 #endif
645
646 ////////////////////////////////////////////////////////////////////////////
647 // Determine intervals of characters to be processed
648 ////////////////////////////////////////////////////////////////////////////
649 // build tables seg_start[i], seg_end[i] that define intervals lying inside
650 // elements
651
652 string::size_type pos = 0;
653 bool done = false;
654 vector<string::size_type> seg_start,seg_end;
655
656 // position of start and end tags on this task
657 vector<string::size_type> local_start_tag_offset, local_end_tag_offset;
658
659 // node on which starting tag igf can be found: starting_node[igf]
660 vector<string::size_type> starting_node, ending_node;
661
662 // treat first the case where the buffer starts within an element
663 if ( start_within )
664 {
665 // get next end tag
666 string::size_type next_end = buf.find("</grid_function>",pos);
667 if ( next_end == string::npos )
668 {
669 // did not find end tag
670 // add segment [pos,buf.size()] to list
671 seg_start.push_back(pos);
672 seg_end.push_back(buf.size());
673 done = true;
674 assert(end_within);
675 }
676 else
677 {
678 // found end tag at position next_end
679 local_end_tag_offset.push_back(next_end);
680 // add segment [pos,next_end] to list
681 seg_start.push_back(pos);
682 seg_end.push_back(next_end);
683 pos = next_end+16; // size of "</grid_function>"
684 }
685 }
686
687 while (!done)
688 {
689 string::size_type next_start = buf.find("<grid_function",pos);
690 if ( next_start == string::npos )
691 {
692 // did not find start tag
693 // no start tag found, done.
694 done = true;
695 assert(!end_within);
696 }
697 else
698 {
699 // found start tag
700 local_start_tag_offset.push_back(next_start);
701 pos = buf.find(">",next_start)+1;
702 string::size_type next_end = buf.find("</grid_function>",pos);
703 if ( next_end == string::npos )
704 {
705 // did not find end tag
706 // add segment [pos,buf.size()] to list
707 seg_start.push_back(pos);
708 seg_end.push_back(buf.size());
709 done = true;
710 assert(end_within);
711 }
712 else
713 {
714 // found end tag
715 local_end_tag_offset.push_back(next_end);
716 // add segment [pos,next_end] to list
717 seg_start.push_back(pos);
718 seg_end.push_back(next_end);
719 pos = next_end+16; // size of "</grid_function>"
720 }
721 }
722 }
723
724 // compute total number of grid functions (total number of start tags)
725 const int ngf_loc = local_start_tag_offset.size();
726 int ngf = ngf_loc;
727 rctxt.isum(1,1,&ngf,1);
728
729 // compute index of grid function starting at the first <grid_function> tag
730 int igfmin, igfmax;
731 if ( rctxt.mype() == 0 )
732 {
733 igfmin = 0;
734 }
735 else
736 {
737 rctxt.irecv(1,1,&igfmin,1,0,rctxt.mype()-1);
738 }
739
740 igfmax = igfmin + local_end_tag_offset.size();
741
742 if ( rctxt.mype() < rctxt.size()-1 )
743 {
744 rctxt.isend(1,1,&igfmax,1,0,rctxt.mype()+1);
745 }
746
747 // associate an igf value to each segment
748 vector<int> igf(seg_start.size());
749 for ( int i = 0; i < seg_start.size(); i++ )
750 igf[i] = i+igfmin;
751
752 // cout << rctxt.mype() << ": igfmin=" << igfmin
753 // << " igfmax=" << igfmax << endl;
754
755 tm.stop();
756 if ( ctxt.onpe0() )
757 cout << " XMLGFPreprocessor: segment definition time: " << tm.real() << endl;
758
759 ////////////////////////////////////////////////////////////////////////////
760 // Adjust segment boundaries to allow for transcoding
761 ////////////////////////////////////////////////////////////////////////////
762 // If encoding is "text", the segment boundary must fall on ' ' or '\n'
763 // If encoding is "base64", the segment boundary must fall between
764 // groups of 4 characters in "A-Za-z0-9+/="
765
766 ////////////////////////////////////////////////////////////////////////////
767 // Determine encoding type of the last segment and send encoding_type
768 // to right neighbour
769
770 string left_encoding = "none";
771 if ( start_within && !rctxt.onpe0() )
772 {
773 rctxt.string_recv(left_encoding,0,rctxt.mype()-1);
774 }
775
776 string right_encoding = "none";
777 // find position of "encoding" string after last_start position
778 if ( end_within )
779 {
780 if ( start_tag_found )
781 {
782 string::size_type encoding_pos = buf.find("encoding",last_start);
783 assert(encoding_pos != string::npos);
784 encoding_pos = buf.find("\"",encoding_pos)+1;
785 string::size_type end = buf.find("\"",encoding_pos);
786 right_encoding = buf.substr(encoding_pos,end-encoding_pos);
787 }
788 else
789 {
790 right_encoding = left_encoding;
791 }
792 }
793
794 if ( end_within && rctxt.mype() < ctxt.size()-1 )
795 {
796 rctxt.string_send(right_encoding,0,rctxt.mype()+1);
797 }
798 #if DEBUG
799 cout << rctxt.mype() << ": left_encoding = " << left_encoding << endl;
800 cout << rctxt.mype() << ": right_encoding = " << right_encoding << endl;
801 #endif
802
803 // build table of encodings for all segments
804 vector<string> encoding(seg_start.size());
805 int iseg = 0;
806 if ( start_within )
807 encoding[iseg++] = left_encoding;
808 for ( int i = 0; i < local_start_tag_offset.size(); i++ )
809 {
810 // determine encoding of tag at local_start_tag_offset[i]
811 string::size_type encoding_pos =
812 buf.find("encoding",local_start_tag_offset[i]);
813 assert(encoding_pos != string::npos);
814 encoding_pos = buf.find("\"",encoding_pos)+1;
815 string::size_type end = buf.find("\"",encoding_pos);
816 encoding[iseg++] = buf.substr(encoding_pos,end-encoding_pos);
817 }
818 assert(iseg == seg_start.size() );
819
820 // print out table of segments [seg_start,seg_end] and corresponding igf
821 assert(seg_start.size()==seg_end.size());
822 #if DEBUG
823 for ( int i = 0; i < seg_start.size(); i++ )
824 {
825 cout << rctxt.mype() << ": segment [" << seg_start[i] << ","
826 << seg_end[i] << "] igf=" << igf[i] << " ("
827 << seg_end[i]-seg_start[i] << " bytes)"
828 << " encoding=" << encoding[i]
829 << endl;
830 }
831 #endif
832
833 tm.reset();
834 tm.start();
835
836 ////////////////////////////////////////////////////////////////////////////
837 // Fix boundaries:
838 ////////////////////////////////////////////////////////////////////////////
839
840 // first fix all text_encoded boundaries
841 // on all nodes involved in a text encoded boundary
842 string from_right, to_left;
843 if ( left_encoding == "text" || right_encoding == "text" )
844 {
845 if ( right_encoding == "text" )
846 {
847 // post string_receive from right
848
849 // next line: cannot have right_encoding = text on last task
850 assert(rctxt.mype() < rctxt.size()-1);
851
852 rctxt.string_recv(from_right,0,rctxt.mype()+1);
853 buf.append(from_right);
854
855 // adjust ending offset of last segment
856 seg_end[seg_end.size()-1] += from_right.size();
857
858 }
859 if (left_encoding == "text")
860 {
861 // send string up to first separator to left
862 // next line: cannot have left_encoding = text on task 0
863 assert(!rctxt.onpe0());
864
865 string::size_type pos = buf.find_first_of(" \n<",0);
866 to_left = buf.substr(0,pos);
867 rctxt.string_send(to_left,0,rctxt.mype()-1);
868
869 // adjust starting offset of first segment
870 seg_start[0] = pos;
871
872 // remove first pos characters and shift all segment indices
873 buf.erase(0,pos);
874 for ( int iseg = 0; iseg < seg_start.size(); iseg++ )
875 {
876 seg_start[iseg] -= pos;
877 seg_end[iseg] -= pos;
878 }
879 }
880 }
881
882 // at this point, all text_encoded boundaries are fixed
883
884 // fix base64 boundaries
885
886 // all nodes having a base64 encoded boundary
887 if ( left_encoding == "base64" || right_encoding == "base64" )
888 {
889 // count valid base64 chars in last segment
890 // count = number of chars in "A-Z,a-z,0-9,+,/,=" in last segment
891 // Note: the last segment may also be the first, if only one segment
892 int count = 0;
893 int last_seg = seg_start.size()-1;
894 // count number of valid base64 chars in segment last_seg
895 for ( int i = seg_start[last_seg]; i < seg_end[last_seg]; i++ )
896 {
897 int ch = buf[i];
898 if ( isalnum(ch) || ch == '+' || ch == '/' || ch == '=' )
899 count++;
900 }
901 #if DEBUG
902 cout << rctxt.mype() << ": valid base64 chars in last seg="
903 << count << endl;
904 #endif
905
906 string send_left;
907 int missing_on_left = 0;
908 if ( left_encoding == "base64" )
909 {
910 // each node except node 0 posts a receive of the number of missing
911 // valid chars from its left neighbour
912 if ( !rctxt.onpe0() )
913 rctxt.irecv(1,1,&missing_on_left,1,0,rctxt.mype()-1);
914
915 // search for missing_on_left valid characters in buf[0]
916 // Note: it is assumed that there are always enough valid
917 // characters in the first segment to align on a 128 char boundary
918 // 3-doubles = 192 bits
919 // one 4-char block encodes 24 bits
920 // boundary at 8 4-char blocks = 32 chars
921 assert(missing_on_left < 32);
922
923 // Note: the number of chars missing on left can be larger than
924 // the total number of chars available in the first segment.
925 // In that case, all characters in the first segment are sent
926 string::size_type pos = 0;
927 int n = 0;
928 while ( pos < seg_end[0] && n < missing_on_left )
929 {
930 char c = buf[pos++];
931 if ( isalnum(c) || c == '+' || c == '/' || c == '=' )
932 {
933 send_left.push_back(c);
934 n++;
935 }
936 }
937 // adjust start of segment offset
938 seg_start[0] = pos;
939 // remove first pos characters and shift all segment indices
940 buf.erase(0,pos);
941 for ( int iseg = 0; iseg < seg_start.size(); iseg++ )
942 {
943 seg_start[iseg] -= pos;
944 seg_end[iseg] -= pos;
945 }
946 }
947
948 // if the last segment is also the first (i.e. only one segment)
949 // remove chars sent to left from valid char count
950 if ( seg_start.size() == 1 )
951 count -= missing_on_left;
952
953 int missing_on_right = 0;
954 if ( right_encoding == "base64" )
955 {
956 // compute number of missing valid chars on the right
957 // Request enough characters to fall on a 32-char boundary
958 // number of missing *valid* chars requested is (32 - count % 32)
959 // Note: there may not be enough chars in the first segment of the
960 // right neighbor to return that many chars. In that case, fewer
961 // chars will be received from the right neighbor
962 int countmod32 = count % 32;
963 if ( countmod32 != 0 )
964 missing_on_right = 32 - countmod32;
965 #if DEBUG
966 cout << rctxt.mype() << ": missing_on_right="
967 << missing_on_right << endl;
968 #endif
969
970 // each node except the last sends the number of missing chars to its
971 // right neighbour (receive is already posted by other nodes)
972 // each node except the last posts a string receive from the right node
973
974 if ( rctxt.mype() < rctxt.size()-1 )
975 {
976 rctxt.isend(1,1,&missing_on_right,1,0,rctxt.mype()+1);
977 // receive string from task mype+1
978 string missing_chars;
979 rctxt.string_recv(missing_chars,0,rctxt.mype()+1);
980 // append to buf
981 buf.append(missing_chars);
982 seg_end[last_seg] += missing_chars.size();
983 }
984 }
985
986 if ( left_encoding == "base64" )
987 {
988 // each node except node 0 sends string "send_left" to left node
989 if ( !rctxt.onpe0() )
990 {
991 rctxt.string_send(send_left,0,rctxt.mype()-1);
992 #if DEBUG
993 cout << rctxt.mype() << ": sent " << send_left.size()
994 << " chars to left task: " << send_left << endl;
995 #endif
996 }
997 }
998 }
999
1000 // print out table of segments [seg_start,seg_end] and corresponding igf
1001 #if DEBUG
1002 for ( int i = 0; i < seg_start.size(); i++ )
1003 {
1004 cout << rctxt.mype() << ": segment [" << seg_start[i] << ","
1005 << seg_end[i] << "] igf=" << igf[i] << " ("
1006 << seg_end[i]-seg_start[i] << " bytes)"
1007 << " encoding=" << encoding[i]
1008 << endl;
1009 }
1010 #endif
1011
1012 tm.stop();
1013 if ( ctxt.onpe0() )
1014 cout << " XMLGFPreprocessor: boundary adjustment time: "
1015 << tm.real() << endl;
1016 tm.reset();
1017 tm.start();
1018
1019 // Transcode segments
1020 // Instantiate a Base64Transcoder
1021 Base64Transcoder xcdr;
1022
1023 vector<vector<double> > dbuf;
1024 dbuf.resize(seg_start.size());
1025 for ( int iseg = 0; iseg < seg_start.size(); iseg++ )
1026 {
1027 int nchars = seg_end[iseg]-seg_start[iseg];
1028 if ( encoding[iseg] == "base64" )
1029 {
1030 // Base64 case:
1031 // the dimension of the dbuf array is computed from the *total* number
1032 // of bytes in the segment, i.e. will be overestimated slightly
1033 // since not all bytes are valid base64 bytes
1034 dbuf[iseg].resize((((nchars/4)*3)/8));
1035 #if DEBUG
1036 cout << rctxt.mype() << ": iseg=" << iseg
1037 << " dbufsize=" << dbuf[iseg].size()
1038 << endl;
1039 #endif
1040 int nbytes = xcdr.decode(nchars,buf.data()+seg_start[iseg],
1041 (byte*)&dbuf[iseg][0]);
1042 #if DEBUG
1043 cout << rctxt.mype() << ": iseg=" << iseg << " nbytes=" << nbytes
1044 << endl;
1045 #endif
1046 assert(nbytes % 8 == 0 );
1047 int ndoubles = nbytes / 8;
1048 assert(ndoubles <= dbuf[iseg].size());
1049 // adjust size of double array
1050 dbuf[iseg].resize(ndoubles);
1051 }
1052 else
1053 {
1054 // text encoding
1055 #if DEBUG
1056 cout << rctxt.mype() << ": text in iseg=" << iseg << ": "
1057 << buf.substr(seg_start[iseg],80) << endl;
1058 #endif
1059 istringstream stst(buf.substr(seg_start[iseg],nchars));
1060 double d;
1061 while ( stst >> d )
1062 {
1063 dbuf[iseg].push_back(d);
1064 }
1065 #if DEBUG
1066 cout << rctxt.mype() << ": doubles read from text: "
1067 << dbuf[iseg].size() << endl;
1068 #endif
1069 }
1070 }
1071
1072 tm.stop();
1073 if ( ctxt.onpe0() )
1074 cout << " XMLGFPreprocessor: transcoding time: " << tm.real() << endl;
1075 tm.reset();
1076 tm.start();
1077
1078 ////////////////////////////////////////////////////////////////////////////
1079 // byte swapping on big-endian platforms
1080 #if PLT_BIG_ENDIAN
1081 for ( int iseg = 0; iseg < seg_start.size(); iseg++ )
1082 xcdr.byteswap_double(dbuf[iseg].size(),&dbuf[iseg][0]);
1083
1084 tm.stop();
1085 if ( ctxt.onpe0() )
1086 cout << " XMLGFPreprocessor: byte swapping time: " << tm.real() << endl;
1087 tm.reset();
1088 tm.start();
1089 #endif
1090
1091 #if DEBUG
1092 for ( int iseg = 0; iseg < seg_start.size(); iseg++ )
1093 {
1094 cout << rctxt.mype() << ": seg_data[iseg=" << iseg << "]: size="
1095 << dbuf[iseg].size() << " ";
1096 if ( dbuf[iseg].size() >=3 )
1097 cout << dbuf[iseg][0] << " " << dbuf[iseg][1]
1098 << " " << dbuf[iseg][2];
1099 cout << endl;
1100 }
1101 #endif
1102
1103 ////////////////////////////////////////////////////////////////////////////
  // Collect all data in dbuf into a DoubleMatrix of dimension maxgfsize x ngf
1105 // Note: the largest grid must be used to dimension the matrix.
1106 // Determine the largest dimension of the grid_functions
1107 // use dmax call
1108
1109 // compute size of largest grid
1110 int maxgfsize = 0;
1111 valarray<int> gfsize(ngf);
1112 gfsize = 0;
1113 for ( int iseg = 0; iseg < seg_start.size(); iseg++ )
1114 {
1115 for ( int i = 0; i < ngf; i++ )
1116 if ( igf[iseg] == i )
1117 gfsize[i] = dbuf[iseg].size();
1118 }
1119 rctxt.isum(ngf,1,&gfsize[0],ngf);
1120 if ( ngf > 0 )
1121 maxgfsize = gfsize.max();
1122 #if DEBUG
1123 cout << rctxt.mype() << ": maxgfsize=" << maxgfsize << endl;
1124 for ( int i = 0; i < ngf; i++ )
1125 cout << rctxt.mype() << ": igf=" << i << " gfsize=" << gfsize[i]
1126 << endl;
1127 #endif
1128
1129 // determine block sizes of matrix gfdata
1130 // note: use ctxt, not rctxt
1131 int gfmb = maxgfsize / ctxt.nprow() +
1132 ( maxgfsize % ctxt.nprow() != 0 ? 1 : 0 );
1133 int gfnb = ngf / ctxt.npcol() +
1134 ( ngf % ctxt.npcol() != 0 ? 1 : 0 );
1135 gfdata.resize(maxgfsize,ngf,gfmb,gfnb);
1136 #if DEBUG
1137 cout << ctxt.mype() << ": gfdata resized: (" << maxgfsize << "x" << ngf
1138 << ") (" << gfmb << "x" << gfnb << ") blocks" << endl;
1139 cout << ctxt.mype() << ": gfdata.context(): " << gfdata.context();
1140 #endif
1141
1142 // prepare buffer sbuf for all_to_all operation
1143 int sbufsize = 0;
1144 for ( int iseg = 0; iseg < seg_start.size(); iseg++ )
1145 sbufsize += dbuf[iseg].size();
1146
1147 valarray<double> sbuf(sbufsize);
1148
1149 // determine offset in the grid of the first element of the first
1150 // segment
1151 // Each task having start_within==true posts a receive from its left
1152 // task
1153 // Each task having end_within==true computes the offset of the last
1154 // element of its last segment and sends it to its right neighbour
1155
1156 int left_offset = 0;
1157 if ( start_within )
1158 {
1159 rctxt.irecv(1,1,&left_offset,1,0,rctxt.mype()-1);
1160 }
1161 if ( end_within )
1162 {
1163 // compute offset of last element of last segment
1164 int right_offset = 0;
1165 if ( dbuf.size() == 1 )
1166 {
1167 // if there is only one segment, take left offset into account
1168 right_offset = left_offset + dbuf[seg_start.size()-1].size();
1169 }
1170 else
1171 {
1172 right_offset = dbuf[seg_start.size()-1].size();
1173 }
1174 #if DEBUG
1175 cout << rctxt.mype() << ": right_offset=" << right_offset << endl;
1176 #endif
1177 rctxt.isend(1,1,&right_offset,1,0,rctxt.mype()+1);
1178 }
1179 #if DEBUG
1180 cout << rctxt.mype() << ": left_offset=" << left_offset << endl;
1181 #endif
1182
1183 // compute array sbuf
1184 // copy into sbuf all data to be sent
1185 // data going to task irow icol is in subsegment irow
1186 // for each task itask
1187 // compute irow,icol of itask, igfmin, igfmax, min_offset, max_offset
1188 // for each segment iseg
1189 // if igf[iseg] falls on task itask
1190 // for each block of segment iseg
1191 // if offset of this block falls in [min_offset,max_offset]
1192 // copy block to sbuf
1193 // adjust scount[itask]
1194 valarray<int> scounts(0,rctxt.size()), sdispl(0,rctxt.size());
1195 valarray<int> rcounts(0,rctxt.size()), rdispl(0,rctxt.size());
1196
1197 int sbuf_pos = 0;
1198 for ( int itask = 0; itask < ctxt.size(); itask++ )
1199 {
1200 // determine block sizes of the block cyclic distribution of gfdata
1201 const int irow = itask % ctxt.nprow();
1202 const int icol = itask / ctxt.nprow();
1203
1204 const int igfmin = icol * gfnb;
    // nb_loc: size of block on task icol
    // nb_loc is gfnb         for icol <  kn
    //           ngf-kn*gfnb  for icol == kn
    //           0            for icol >  kn
    // where kn = int(ngf/gfnb)
1210 const int kn = (gfnb>0) ? ngf / gfnb : 0;
1211 int nb_loc;
1212 if ( icol < kn )
1213 nb_loc = gfnb;
1214 else if ( icol == kn )
1215 nb_loc = ngf - kn * gfnb;
1216 else
1217 nb_loc = 0;
1218 const int igfmax = igfmin + nb_loc;
1219
1220 const int min_offset = irow * gfmb;
    // mb_loc: size of block on task irow
    // mb_loc is gfmb              for irow <  km
    //           maxgfsize-km*gfmb for irow == km
    //           0                 for irow >  km
    // where km = int(maxgfsize/gfmb)
1226 const int km = (gfmb>0) ? maxgfsize / gfmb : 0;
1227 int mb_loc;
1228 if ( irow < km )
1229 mb_loc = gfmb;
1230 else if ( irow == km )
1231 mb_loc = maxgfsize - km * gfmb;
1232 else
1233 mb_loc = 0;
1234
1235 const int max_offset = min_offset + mb_loc;
1236
1237 sdispl[itask] = sbuf_pos;
1238 for ( int iseg = 0; iseg < seg_start.size(); iseg++ )
1239 {
1240 const int segsize = dbuf[iseg].size();
1241
1242 if ( igf[iseg] >= igfmin && igf[iseg] < igfmax )
1243 {
1244 // igf falls within the states stored on itask
1245
1246 // Find if there is an intersection of this segment
1247 // with the range [min_offset,max_offset[ on task itask
1248
1249 int block_start, block_size = 0;
1250
1251 if ( iseg == 0 )
1252 {
1253 // On segment 0, the available range is
1254 // [left_offset,left_offset+segsize[
1255 // There is overlap if min_offset < left_offset+segsize
1256 // and max_offset > left_offset
1257 // If there is overlap, the range to be sent is
1258 // [max(min_offset-left_offset,0) ,
1259 // min(max_offset-left_offset,segsize)[
1260 if ( min_offset < left_offset+segsize && max_offset > left_offset )
1261 {
1262 // there is overlap
1263 block_start = max(min_offset-left_offset,0);
1264 block_size = min(max_offset-left_offset,segsize) - block_start;
1265 }
1266 }
1267 else
1268 {
1269 // On all segments except segment 0, the available range
1270 // is [0,segsize[
1271 // There is overlap if min_offset < segsize
1272 // (and if max_offset > 0, but this is always true)
1273 // If there is overlap, the range to be sent is
          // [min_offset, min(max_offset,segsize)[
1275 if ( min_offset < segsize )
1276 {
1277 // there is overlap
1278 block_start = min_offset;
1279 block_size = min(max_offset,segsize) - block_start;
1280 }
1281 }
1282
1283 if ( block_size > 0 )
1284 {
1285 // copy block to sbuf
1286 memcpy((void*)&sbuf[sbuf_pos],(void*)&dbuf[iseg][block_start],
1287 block_size*sizeof(double));
1288 sbuf_pos += block_size;
1289 scounts[itask] += block_size;
1290
1291 #if DEBUG
1292 cout << rctxt.mype() << ": sbuf: iseg=" << iseg << " sending start="
1293 << block_start << " size=" << block_size << " to task " << itask
1294 << " (irow=" << irow << ",icol=" << icol << ")" << endl;
1295 #endif
1296 }
1297 }
1298 }
1299 }
1300
1301 #if USE_MPI
1302 // send scount array using all_to_all call
1303 valarray<int> a2a_scounts(1,ctxt.size()),a2a_rcounts(1,ctxt.size()),
1304 a2a_sdispl(0,ctxt.size()),a2a_rdispl(0,ctxt.size());
1305 for ( int i = 0; i < ctxt.size(); i++ )
1306 {
1307 a2a_rdispl[i] = a2a_sdispl[i] = i;
1308 }
1309 int status = MPI_Alltoallv(&scounts[0],&a2a_scounts[0],&a2a_sdispl[0],
1310 MPI_INT,&rcounts[0],&a2a_rcounts[0],&a2a_rdispl[0],MPI_INT,
1311 rctxt.comm());
1312 assert(status==0);
1313
1314 rdispl[0] = 0;
1315 for ( int i = 1; i < ctxt.size(); i++ )
1316 rdispl[i] = rdispl[i-1] + rcounts[i-1];
1317
1318 int rbufsize = 0;
1319 for ( int i = 0; i < ctxt.size(); i++ )
1320 rbufsize += rcounts[i];
1321 #if DEBUG
1322 cout << ctxt.mype() << ": "
1323 << " rbufsize: " << rbufsize << endl;
1324 #endif
1325
1326 valarray<double> rbuf(rbufsize);
1327
1328 status = MPI_Alltoallv(&sbuf[0],&scounts[0],&sdispl[0],MPI_DOUBLE,
1329 &rbuf[0],&rcounts[0],&rdispl[0],MPI_DOUBLE,rctxt.comm());
1330 assert(status==0);
1331
1332 #else // USE_MPI
1333 valarray<double> rbuf(sbuf);
1334 #endif // USE_MPI
1335
1336 // copy data from rbuf to gfdata.valptr()
1337 // functions in rbuf can have varying length
1338 // determine bounds igfmin, igfmax of functions in rbuf
1339 // then use gfsize[igf] to copy one function at a time to gfdata
1340 //
1341 // compute block sizes for current node
1342 const int irow = ctxt.myrow();
1343 const int icol = ctxt.mycol();
1344
1345 const int igfminloc = icol * gfnb;
  // nb_loc: size of block on task icol
  // nb_loc is gfnb         for icol <  kn
  //           ngf-kn*gfnb  for icol == kn
  //           0            for icol >  kn
  // where kn = int(ngf/gfnb)
1351 const int kn = (gfnb>0) ? ngf / gfnb : 0;
1352 int nb_loc;
1353 if ( icol < kn )
1354 nb_loc = gfnb;
1355 else if ( icol == kn )
1356 nb_loc = ngf - kn * gfnb;
1357 else
1358 nb_loc = 0;
1359 const int igfmaxloc = igfminloc + nb_loc;
1360
1361 #if DEBUG
1362 cout << ctxt.mype() << ": "
1363 << " igfminloc: " << igfminloc << " igfmaxloc: " << igfmaxloc << endl;
1364 #endif
1365 int rbuf_pos = 0;
1366 int igfloc = 0;
1367 for ( int igf = igfminloc; igf < igfmaxloc; igf++ )
1368 {
    // mb_loc: size of block on task irow
    // mb_loc is gfmb                for irow <  km
    //           gfsize[igf]-km*gfmb for irow == km
    //           0                   for irow >  km
    // where km = int(gfsize[igf]/gfmb)
1374 const int km = (gfmb>0) ? gfsize[igf] / gfmb : 0;
1375 int mb_loc;
1376 if ( irow < km )
1377 mb_loc = gfmb;
1378 else if ( irow == km )
1379 mb_loc = gfsize[igf] - km * gfmb;
1380 else
1381 mb_loc = 0;
1382
1383 #if DEBUG
1384 cout << " copying gfsize[" << igf << "] = " << gfsize[igf]
1385 << " block size: " << mb_loc
1386 << " gfdata.mloc(): " << gfdata.mloc() << endl;
1387 #endif
1388
    // copy mb_loc doubles starting at rbuf[rbuf_pos] to
    // gfdata.valptr(igfloc*gfdata.mloc())
1391 const int dest = igfloc * gfdata.mloc();
1392 memcpy(gfdata.valptr(dest),&rbuf[rbuf_pos],mb_loc*sizeof(double));
1393 rbuf_pos += mb_loc;
1394 igfloc++;
1395 }
1396
1397 tm.stop();
1398 if ( ctxt.onpe0() )
1399 cout << " XMLGFPreprocessor: data redistribution time: "
1400 << tm.real() << endl;
1401 tm.reset();
1402 tm.start();
1403
1404 // compact XML data:
1405 // erase <grid_function> contents from XML data
1406 // Note: when removing the data from <grid_function>, add
1407 // the attribute xsi:null="true"
1408 // Also add to the schema definition: nillable="true" in the
1409 // definition of element grid_function
1410
1411 // delete segment data
1412 for ( int iseg = seg_start.size()-1; iseg >= 0; iseg--)
1413 {
1414 //cout << " erasing segment: ["
1415 // << seg_start[iseg] << "," << seg_end[iseg] << "]" << endl;
1416 buf.erase(seg_start[iseg],seg_end[iseg]-seg_start[iseg]);
1417 }
1418 //cout << " buf.size() after erase: " << buf.size() << endl;
1419
1420 // collect all XML data
1421 // Distribute sizes of local strings to all tasks
1422 // and store in array rcounts
1423
1424 #if USE_MPI
1425 int xmlcontentsize = buf.size();
1426 status = MPI_Allgather(&xmlcontentsize,1,MPI_INT,&rcounts[0],1,
1427 MPI_INT,rctxt.comm());
1428 assert(status==0);
1429 // compute total size
1430 int totalsize = 0;
1431 for ( int i = 0; i < rctxt.size(); i++ )
1432 totalsize += rcounts[i];
1433
1434 rdispl[0] = 0;
1435 for ( int i = 1; i < rctxt.size(); i++ )
1436 rdispl[i] = rdispl[i-1] + rcounts[i-1];
1437
1438 xmlcontent.resize(totalsize);
1439
1440 // Allgatherv xml content
1441 status = MPI_Allgatherv(&buf[0],xmlcontentsize,MPI_CHAR,&xmlcontent[0],
1442 &rcounts[0],&rdispl[0],MPI_CHAR,rctxt.comm());
1443 #else
1444 xmlcontent = buf;
1445 #endif // USE_MPI
1446 tm.stop();
1447 if ( ctxt.onpe0() )
1448 cout << " XMLGFPreprocessor: XML compacting time: " << tm.real() << endl;
1449 tm.reset();
1450 tm.start();
1451
1452 // data is now available in the form of the DoubleMatrix gfdata
1453 //
1454 // Redistribution of the data can be done using the DoubleMatrix::getsub
1455 // and DoubleMatrix::operator= members
1456 //
1457 // 1) Use getsub to get only wf (not dwf) in a DoubleMatrix distributed over
1458 // the global context
1459 // 2) Use operator= to redistribute the gridfunctions to the same
1460 // context as Wavefunction
1461
1462 // Fourier transforms
1463 // WavefunctionHandler can use the real-space data in the redistributed
1464 // gfdata matrix to compute Fourier coefficients, which completes the
1465 // load operation
1466
1467 ////////////////////////////////////////////////////////////////////////////
1468 // end of program
1469
1470 ctxt.barrier();
1471 ttm.stop();
1472 if ( ctxt.onpe0() )
1473 {
1474 cout << " XMLGFPreprocessor: total time: " << ttm.real() << endl;
1475 }
1476 return 0;
1477 }
1478