1 /* Support for MPI parallelization.
2 *
3 * Contents:
4 * 1. Communicating optional arrays.
5 * 2. Communicating ESL_SQ (single biosequences)
6 * 3. Communicating ESL_MSA (multiple sequence alignments).
7 * 4. Communicating ESL_STOPWATCH (process timing).
8 * 5. Unit tests.
9 * 6. Test driver.
10 * 7. Example.
11 */
12 #include "esl_config.h"
13 #if defined(HAVE_MPI)
14 #include <string.h>
15 #include "mpi.h"
16
17 #include "easel.h"
18 #include "esl_msa.h"
19 #include "esl_sq.h"
20 #include "esl_stopwatch.h"
21 #include "esl_mpi.h"
22
23
24
25 /*****************************************************************
26 *# 1. Communicating optional arrays.
27 *****************************************************************/
28
29 /* Function: esl_mpi_PackOpt()
30 * Synopsis: Pack an optional, variable-sized array (or string).
31 * Incept: SRE, Sat Jun 2 08:40:39 2007 [Janelia]
32 *
33 * Purpose: Pack data array <inbuf> of <incount> elements of type <type> into
34 * an MPI packed buffer <pack_buf> of total size <pack_buf_size> destined
35 * for MPI communicator <comm> that is currently filled to position <*position>.
36 *
37 * <inbuf> may be <NULL>, in which case <incount> is
38 * assumed to be 0, and a `null array' is packed that
39 * <esl_mpi_UnpackOpt()> knows how to decode as a <NULL>
40 * pointer.
41 *
42 * As a special case for strings, if <type> is <MPI_CHAR>,
43 * <incount> may be passed as <-1> to indicate `unknown';
44 * the routine will use <strlen(inbuf)+1> to determine the
45 * size of the string including its <NUL> terminator.
46 *
47 * Returns: <eslOK> on success, the array is packed into <pack_buf>,
48 * and the <*position> counter is updated to point to the next byte
49 * in <pack_buf> after the packed array.
50 *
51 * Throws: <eslESYS> if an MPI call fails.
52 */
53 int
esl_mpi_PackOpt(void * inbuf,int incount,MPI_Datatype type,void * pack_buf,int pack_buf_size,int * position,MPI_Comm comm)54 esl_mpi_PackOpt(void *inbuf, int incount, MPI_Datatype type, void *pack_buf, int pack_buf_size, int *position, MPI_Comm comm)
55 {
56 if (inbuf == NULL) {
57 incount = 0;
58 if (MPI_Pack(&incount, 1, MPI_INT, pack_buf, pack_buf_size, position, comm) != 0) ESL_EXCEPTION(eslESYS, "MPI pack failed");
59 } else {
60 if (incount == -1 && type == MPI_CHAR) incount = strlen(inbuf) + 1;
61 if (MPI_Pack(&incount, 1, MPI_INT, pack_buf, pack_buf_size, position, comm) != 0) ESL_EXCEPTION(eslESYS, "MPI pack failed");
62 if (MPI_Pack(inbuf, incount, type, pack_buf, pack_buf_size, position, comm) != 0) ESL_EXCEPTION(eslESYS, "MPI pack failed");
63 }
64 return eslOK;
65 }
66
67 /* Function: esl_mpi_PackOptSize()
68 * Synopsis: Determine the size of a packed optional, variable-sized array.
69 * Incept: SRE, Sat Jun 2 10:09:16 2007 [Janelia]
70 *
71 * Purpose: Determine an upper bound on the size (in bytes) required
72 * to pack an array <inbuf> of <incount> elements of type
73 * <type> destined for MPI communicator <comm> using
74 * <esl_mpi_PackOpt()>, and return it in <*ret_n>.
75 *
76 * If <inbuf> is non-<NULL>, the packed message consists
77 * of 1 integer (the length, <incount>) followed by the array.
78 * If <inbuf> is <NULL>, the packed message consists of one
79 * integer (0).
80 *
81 * As a special case for strings, if <type> is <MPI_CHAR>,
82 * <incount> may be passed as <-1> to indicate `unknown';
83 * in this case, the routine uses <strlen(inbuf)+1> to determine the
84 * size of the string including its <NUL> terminator.
85 *
86 * Returns: <eslOK> on success, and <*ret_n> contains the upper limit size in
87 * bytes.
88 *
89 * Throws: <eslESYS> if an MPI call fails, and <*ret_n> is 0.
90 */
91 int
esl_mpi_PackOptSize(void * inbuf,int incount,MPI_Datatype type,MPI_Comm comm,int * ret_n)92 esl_mpi_PackOptSize(void *inbuf, int incount, MPI_Datatype type, MPI_Comm comm, int *ret_n)
93 {
94 int status;
95 int sz;
96
97 *ret_n = 0;
98 if (inbuf == NULL) {
99 status = MPI_Pack_size(1, MPI_INT, MPI_COMM_WORLD, &sz); *ret_n += sz; if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi pack size failed");
100 } else {
101 if (incount == -1 && type == MPI_CHAR) incount = strlen(inbuf) + 1;
102 status = MPI_Pack_size(1, MPI_INT, MPI_COMM_WORLD, &sz); *ret_n += sz; if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi pack size failed");
103 status = MPI_Pack_size(incount, type, MPI_COMM_WORLD, &sz); *ret_n += sz; if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi pack size failed");
104 }
105 return eslOK;
106
107 ERROR:
108 *ret_n = 0;
109 return status;
110 }
111
112
113
114 /* Function: esl_mpi_UnpackOpt()
115 * Synopsis: Unpack an optional, variable-sized array (or string).
116 * Incept: SRE, Sat Jun 2 08:39:39 2007 [Janelia]
117 *
118 * Purpose: Unpack a packed MPI message in buffer <pack_buf>, of total size
119 * <pack_buf_size>, at current position <*pos> in <pack_buf>,
120 * for MPI communicator <comm>, where the next packed element is an optional
121 * array of type <type>, consisting of a <(n,data)> pair, with <n=0>
122 * indicating no data.
123 *
124 * If array data is present (<n>0>), allocate <*outbuf>,
125 * put the array in it, and optionally return <n> in
126 * <*opt_n>. The caller is responsible for free'ing this
127 * <*outbuf>.
128 *
129 * If data are not present (<n=0>), no allocation is done,
130 * <*outbuf> is set to <NULL>, and the optional <*opt_n> is
131 * 0.
132 *
133 * <*pos> is updated to point at the next element in <pack_buf>
134 * that needs to be unpacked.
135 *
136 * This routine is designed for an optional-array idiom in
137 * which <array==NULL> means the array isn't available, and
138 * otherwise the array contains valid data. For instance,
139 * this is used for optional annotation on multiple
140 * alignments.
141 *
142 * Returns: <eslOK> on success; <*pos> is updated; <*outbuf> is either a newly allocated
143 * array (that caller is responsible for freeing) and optional <*opt_n>
144 * is its length, or <*outbuf> is <NULL> and optional <*opt_n> is 0.
145 *
146 * Throws: <eslESYS> on an MPI call failure; <eslEINVAL> if something's wrong
147 * with the arguments; <eslEMEM> on allocation failure.
148 * In either case, <*outbuf> is <NULL> and optional <*opt_n> is 0.
149 */
150 int
esl_mpi_UnpackOpt(void * pack_buf,int pack_buf_size,int * pos,void ** outbuf,int * opt_n,MPI_Datatype type,MPI_Comm comm)151 esl_mpi_UnpackOpt(void *pack_buf, int pack_buf_size, int *pos, void **outbuf, int *opt_n, MPI_Datatype type, MPI_Comm comm)
152 {
153 int sz;
154 int status;
155
156 if (MPI_Unpack(pack_buf, pack_buf_size, pos, &sz, 1, MPI_INT, comm) != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
157
158 if (sz == 0) {
159 *outbuf = NULL;
160 } else {
161 if (type == MPI_CHAR) ESL_ALLOC(*outbuf, sizeof(char) * sz);
162 else if (type == MPI_SHORT) ESL_ALLOC(*outbuf, sizeof(short) * sz);
163 else if (type == MPI_INT) ESL_ALLOC(*outbuf, sizeof(int) * sz);
164 else if (type == MPI_LONG) ESL_ALLOC(*outbuf, sizeof(long) * sz);
165 else if (type == MPI_UNSIGNED_CHAR) ESL_ALLOC(*outbuf, sizeof(unsigned char) * sz);
166 else if (type == MPI_UNSIGNED_SHORT) ESL_ALLOC(*outbuf, sizeof(unsigned short) * sz);
167 else if (type == MPI_UNSIGNED) ESL_ALLOC(*outbuf, sizeof(unsigned int) * sz);
168 else if (type == MPI_UNSIGNED_LONG) ESL_ALLOC(*outbuf, sizeof(unsigned long) * sz);
169 else if (type == MPI_FLOAT) ESL_ALLOC(*outbuf, sizeof(float) * sz);
170 else if (type == MPI_DOUBLE) ESL_ALLOC(*outbuf, sizeof(double) * sz);
171 else if (type == MPI_LONG_DOUBLE) ESL_ALLOC(*outbuf, sizeof(long double) * sz);
172 else if (type == MPI_BYTE) ESL_ALLOC(*outbuf, sz);
173 else if (type == MPI_PACKED) ESL_ALLOC(*outbuf, sz);
174 else ESL_XEXCEPTION(eslEINVAL, "no such MPI datatype");
175
176 if (MPI_Unpack(pack_buf, pack_buf_size, pos, *outbuf, sz, type, comm) != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
177 }
178 if (opt_n != NULL) *opt_n = sz;
179 return eslOK;
180
181 ERROR:
182 if (*outbuf != NULL) free(*outbuf);
183 *outbuf = NULL;
184 if (opt_n != NULL) *opt_n = 0;
185 return status;
186 }
187 /*--------------------- end, optional arrays -------------------*/
188
189
190
191 /*****************************************************************
192 *# 2. Communicating ESL_SQ (single biosequences)
193 *****************************************************************/
194
195 /* Function: esl_sq_MPISend()
196 * Synopsis: Send an ESL_SQ as an MPI work unit.
197 * Incept: ER, Thu Jun 19 10:39:49 EDT 2008 [Janelia]
198 *
199 * Purpose: Sends an <ESL_SQ> <esl_sq> as a work unit to MPI process
200 * <dest> (where <dest> ranges from 0..<nproc-1>), tagged
201 * with MPI tag <tag>, for MPI communicator <comm>, as
202 * the sole workunit or result.
203 *
204 * Work units are prefixed by a status code. If <esl_sq> is
205 * <non-NULL>, the work unit is an <eslOK> code followed by
206 * the packed <ESL_SQ>. If <esl_sq> is NULL, the work unit is an
207 * <eslEOD> code, which <esl_sq_MPIRecv()> knows how to
208 * interpret; this is typically used for an end-of-data
209 * signal to cleanly shut down worker processes.
210 *
211 * In order to minimize alloc/free cycles in this routine,
212 * caller passes a pointer to a working buffer <*buf> of
213 * size <*nalloc> characters. If necessary (i.e. if <esl_sq> is
214 * too big to fit), <*buf> will be reallocated and <*nalloc>
215 * increased to the new size. As a special case, if <*buf>
216 * is <NULL> and <*nalloc> is 0, the buffer will be
217 * allocated appropriately, but the caller is still
218 * responsible for free'ing it.
219 *
220 * Returns: <eslOK> on success; <*buf> may have been reallocated and
221 * <*nalloc> may have been increased.
222 *
223 * Throws: <eslESYS> if an MPI call fails; <eslEMEM> if a malloc/realloc
224 * fails. In either case, <*buf> and <*nalloc> remain valid and useful
225 * memory (though the contents of <*buf> are undefined).
226 *
227 */
228 int
esl_sq_MPISend(ESL_SQ * sq,int dest,int tag,MPI_Comm comm,char ** buf,int * nalloc)229 esl_sq_MPISend(ESL_SQ *sq, int dest, int tag, MPI_Comm comm, char **buf, int *nalloc)
230 {
231 int status;
232 int code;
233 int sz, n, pos;
234
235 /* Figure out size */
236 if (MPI_Pack_size(1, MPI_INT, comm, &n) != 0) ESL_XEXCEPTION(eslESYS, "mpi pack size failed");
237 if (sq != NULL) {
238 if ((status = esl_sq_MPIPackSize(sq, comm, &sz)) != eslOK) return status;
239 n += sz;
240 }
241 ESL_DPRINTF2(("esl_sq_MPISend(): sq has size %d\n", n));
242
243 /* Make sure the buffer is allocated appropriately */
244 if (*buf == NULL || n > *nalloc) {
245 void *tmp;
246 ESL_RALLOC(*buf, tmp, sizeof(char) * n);
247 *nalloc = n;
248 }
249 ESL_DPRINTF2(("esl_sq_MPISend(): buffer is ready\n"));
250
251 /* Pack the status code and ESL_SQ into the buffer */
252 pos = 0;
253 code = (sq == NULL) ? eslEOD : eslOK;
254 if (MPI_Pack(&code, 1, MPI_INT, *buf, n, &pos, comm) != 0) ESL_EXCEPTION(eslESYS, "mpi pack failed");
255 if (sq != NULL) {
256 if ((status = esl_sq_MPIPack(sq, *buf, n, &pos, comm)) != eslOK) return status;
257 }
258 ESL_DPRINTF2(("esl_sq_MPISend(): sq is packed into %d bytes\n", pos));
259
260 /* Send the packed ESL_SQ to the destination. */
261 if (MPI_Send(*buf, n, MPI_PACKED, dest, tag, comm) != 0) ESL_EXCEPTION(eslESYS, "mpi send failed");
262 ESL_DPRINTF2(("esl_sq_MPISend(): sq is sent.\n"));
263 return eslOK;
264
265 ERROR:
266 return status;
267 }
268
269 /* Function: esl_sq_MPIPackSize()
270 * Synopsis: Calculates size needed to pack an ESL_SQ.
271 * Incept: ER, Thu Jun 19 10:48:25 EDT 2008 [Janelia]
272 *
273 * Purpose: Calculate an upper bound on the number of bytes
274 * that <esl_sq_MPIPack()> will need to pack an <ESL_SQ>
275 * <sq> in a packed MPI message for MPI communicator
276 * <comm>; return that number of bytes in <*ret_n>.
277 *
278 * Returns: <eslOK> on success, and <*ret_n> contains the answer.
279 *
280 * Throws: <eslESYS> if an MPI call fails, and <*ret_n> is 0.
281 */
282 int
esl_sq_MPIPackSize(ESL_SQ * sq,MPI_Comm comm,int * ret_n)283 esl_sq_MPIPackSize(ESL_SQ *sq, MPI_Comm comm, int *ret_n)
284 {
285 int status;
286 int n = 0;
287 int x; /* index for optional extra residue markups */
288 int sz;
289
290 status = MPI_Pack_size ( 1, MPI_INT, comm, &sz); n += 5*sz; if (status != 0) ESL_XEXCEPTION(eslESYS, "pack size failed");
291 status = MPI_Pack_size ( 1, MPI_UNSIGNED_LONG, comm, &sz); n += 7*sz; if (status != 0) ESL_XEXCEPTION(eslESYS, "pack size failed");
292 status = MPI_Pack_size ( sq->nalloc, MPI_CHAR, comm, &sz); n += sz; if (status != 0) ESL_XEXCEPTION(eslESYS, "pack size failed");
293 status = MPI_Pack_size ( sq->aalloc, MPI_CHAR, comm, &sz); n += sz; if (status != 0) ESL_XEXCEPTION(eslESYS, "pack size failed");
294 status = MPI_Pack_size ( sq->dalloc, MPI_CHAR, comm, &sz); n += sz; if (status != 0) ESL_XEXCEPTION(eslESYS, "pack size failed");
295 status = MPI_Pack_size ( sq->srcalloc, MPI_CHAR, comm, &sz); n += sz; if (status != 0) ESL_XEXCEPTION(eslESYS, "pack size failed");
296
297 /* sequence, digital or text; the ss and extra residue markups are optional */
298 if (sq->dsq != NULL) {
299 status = MPI_Pack_size ( sq->n+2, MPI_UNSIGNED_CHAR, comm, &sz); n += sz; if (status != 0) ESL_XEXCEPTION(eslESYS, "pack size failed");
300 status = esl_mpi_PackOptSize(sq->ss, sq->n+2, MPI_CHAR, comm, &sz); n += sz; if (status != eslOK) goto ERROR;
301 for (x = 0; x < sq->nxr; x ++) {
302 status = esl_mpi_PackOptSize(sq->xr_tag[x], sq->nalloc, MPI_CHAR, comm, &sz); n += sz; if (status != eslOK) goto ERROR;
303 status = esl_mpi_PackOptSize(sq->xr[x], sq->n+2, MPI_CHAR, comm, &sz); n += sz; if (status != eslOK) goto ERROR;
304 }
305
306 }
307 else {
308 status = MPI_Pack_size ( sq->n+1, MPI_CHAR, comm, &sz); n += sz; if (status != 0) ESL_XEXCEPTION(eslESYS, "pack size failed");
309 status = esl_mpi_PackOptSize(sq->ss, sq->n+1, MPI_CHAR, comm, &sz); n += sz; if (status != eslOK) goto ERROR;
310 for (x = 0; x < sq->nxr; x ++) {
311 status = esl_mpi_PackOptSize(sq->xr_tag[x], sq->nalloc, MPI_CHAR, comm, &sz); n += sz; if (status != eslOK) goto ERROR;
312 status = esl_mpi_PackOptSize(sq->xr[x], sq->n+1, MPI_CHAR, comm, &sz); n += sz; if (status != eslOK) goto ERROR;
313 }
314
315 }
316
317 *ret_n = n;
318 return eslOK;
319
320 ERROR:
321 *ret_n = 0;
322 return status;
323 }
324
325 /* Function: esl_sq_MPIPack()
326 * Synopsis: Packs an <ESL_SQ> into MPI buffer.
327 * Incept: ER, Thu Jun 19 10:49:10 EDT 2008 [Janelia]
328 *
329 * Purpose: Packs <ESL_SQ> <esl_sq> into an MPI packed message buffer <buf>
330 * of length <n> bytes, starting at byte position <*position>,
331 * for MPI communicator <comm>.
332 *
333 * The caller must know that <buf>'s allocation of <n>
334 * bytes is large enough to append the packed <ESL_SQ> at
335 * position <*pos>. This typically requires a call to
336 * <esl_sq_MPIPackSize()> first, and reallocation if
337 * needed.
338 *
339 * Returns: <eslOK> on success; <buf> now contains the
340 * packed <esl_sq>, and <*position> is set to the byte
341 * immediately following the last byte of the <ESL_SQ>
342 * in <buf>.
343 *
344 * Throws: <eslESYS> if an MPI call fails; or <eslEMEM> if the
345 * buffer's length <n> was overflowed in trying to pack
346 * <sq> into <buf>. In either case, the state of
347 * <buf> and <*position> is undefined, and both should
348 * be considered to be corrupted.
349 */
350 int
esl_sq_MPIPack(ESL_SQ * sq,char * buf,int n,int * pos,MPI_Comm comm)351 esl_sq_MPIPack(ESL_SQ *sq, char *buf, int n, int *pos, MPI_Comm comm)
352 {
353 unsigned long int sq_n;
354 unsigned long int start;
355 unsigned long int end;
356 unsigned long int C;
357 unsigned long int W;
358 unsigned long int L;
359 unsigned long int salloc;
360 int x; /* index for optional extra residue markups */
361 int status;
362
363 sq_n = (unsigned long int)sq->n;
364 start = (unsigned long int)sq->start;
365 end = (unsigned long int)sq->end;
366 C = (unsigned long int)sq->C;
367 W = (unsigned long int)sq->W;
368 L = (unsigned long int)sq->L;
369 salloc = (unsigned long int)sq->salloc;
370
371 /* pack allocation values */
372 status = MPI_Pack (( int *) &(sq->nalloc), 1, MPI_INT, buf, n, pos, comm); if (status != 0) ESL_EXCEPTION(eslESYS, "pack failed");
373 status = MPI_Pack (( int *) &(sq->aalloc), 1, MPI_INT, buf, n, pos, comm); if (status != 0) ESL_EXCEPTION(eslESYS, "pack failed");
374 status = MPI_Pack (( int *) &(sq->dalloc), 1, MPI_INT, buf, n, pos, comm); if (status != 0) ESL_EXCEPTION(eslESYS, "pack failed");
375 status = MPI_Pack (( int *) &(sq->srcalloc), 1, MPI_INT, buf, n, pos, comm); if (status != 0) ESL_EXCEPTION(eslESYS, "pack failed");
376 status = MPI_Pack (( int *) &(sq->nxr), 1, MPI_INT, buf, n, pos, comm); if (status != 0) ESL_EXCEPTION(eslESYS, "pack failed");
377 status = MPI_Pack ((unsigned long int *) &( salloc), 1, MPI_UNSIGNED_LONG, buf, n, pos, comm); if (status != 0) ESL_EXCEPTION(eslESYS, "pack failed");
378
379 /* pack coordenate info */
380 status = MPI_Pack ((unsigned long int *) &(sq_n), 1, MPI_UNSIGNED_LONG, buf, n, pos, comm); if (status != 0) ESL_EXCEPTION(eslESYS, "pack failed");
381 status = MPI_Pack ((unsigned long int *) &(start), 1, MPI_UNSIGNED_LONG, buf, n, pos, comm); if (status != 0) ESL_EXCEPTION(eslESYS, "pack failed");
382 status = MPI_Pack ((unsigned long int *) &(end), 1, MPI_UNSIGNED_LONG, buf, n, pos, comm); if (status != 0) ESL_EXCEPTION(eslESYS, "pack failed");
383 status = MPI_Pack ((unsigned long int *) &(C), 1, MPI_UNSIGNED_LONG, buf, n, pos, comm); if (status != 0) ESL_EXCEPTION(eslESYS, "pack failed");
384 status = MPI_Pack ((unsigned long int *) &(W), 1, MPI_UNSIGNED_LONG, buf, n, pos, comm); if (status != 0) ESL_EXCEPTION(eslESYS, "pack failed");
385 status = MPI_Pack ((unsigned long int *) &(L), 1, MPI_UNSIGNED_LONG, buf, n, pos, comm); if (status != 0) ESL_EXCEPTION(eslESYS, "pack failed");
386
387 /* pack strings */
388 status = MPI_Pack (sq->name, sq->nalloc, MPI_CHAR, buf, n, pos, comm); if (status != 0) ESL_EXCEPTION(eslESYS, "pack failed");
389 status = MPI_Pack (sq->acc, sq->aalloc, MPI_CHAR, buf, n, pos, comm); if (status != 0) ESL_EXCEPTION(eslESYS, "pack failed");
390 status = MPI_Pack (sq->desc, sq->dalloc, MPI_CHAR, buf, n, pos, comm); if (status != 0) ESL_EXCEPTION(eslESYS, "pack failed");
391 status = MPI_Pack (sq->source, sq->srcalloc, MPI_CHAR, buf, n, pos, comm); if (status != 0) ESL_EXCEPTION(eslESYS, "pack failed");
392
393 /* sequences, digital or text; the ss and extra residue markups are optional */
394 if (sq->dsq != NULL) {
395 status = MPI_Pack (sq->dsq, sq->n+2, MPI_UNSIGNED_CHAR, buf, n, pos, comm); if (status != 0) ESL_EXCEPTION(eslESYS, "pack failed");
396 status = esl_mpi_PackOpt(sq->ss, sq->n+2, MPI_CHAR, buf, n, pos, comm); if (status != eslOK) return status;
397 for (x = 0; x < sq->nxr; x ++) {
398 status = esl_mpi_PackOpt(sq->xr_tag[x], sq->nalloc, MPI_CHAR, buf, n, pos, comm); if (status != eslOK) return status;
399 status = esl_mpi_PackOpt(sq->xr[x], sq->n+2, MPI_CHAR, buf, n, pos, comm); if (status != eslOK) return status;
400 }
401 }
402 else {
403 status = MPI_Pack (sq->seq, sq->n+1, MPI_CHAR, buf, n, pos, comm); if (status != 0) ESL_EXCEPTION(eslESYS, "pack failed");
404 status = esl_mpi_PackOpt(sq->ss, sq->n+1, MPI_CHAR, buf, n, pos, comm); if (status != eslOK) return status;
405 for (x = 0; x < sq->nxr; x ++) {
406 status = esl_mpi_PackOpt(sq->xr_tag[x], sq->nalloc, MPI_CHAR, buf, n, pos, comm); if (status != eslOK) return status;
407 status = esl_mpi_PackOpt(sq->xr[x], sq->n+1, MPI_CHAR, buf, n, pos, comm); if (status != eslOK) return status;
408 }
409 }
410
411 if (*pos > n) ESL_EXCEPTION(eslEMEM, "buffer overflow");
412 return eslOK;
413 }
414
415 /* Function: esl_sq_MPIUnpack()
416 * Synopsis: Unpacks an <ESL_SQ> from an MPI buffer.
417 * Incept: SRE, Thu Jun 7 11:04:46 2007 [Janelia]
418 *
419 * Purpose: Unpack a newly allocated <ESL_SQ> from MPI packed buffer
420 * <buf>, starting from position <*pos>, where the total length
421 * of the buffer in bytes is <n>.
422 *
423 * Caller may or may not already know what alphabet the <ESL_SQ>
424 * is expected to be in.
425 *
426 * Returns: <eslOK> on success. <*pos> is updated to the position of
427 * the next element in <buf> to unpack (if any). <*ret_hmm>
428 * contains a newly allocated <ESL_SQ>, which the caller is
429 * responsible for free'ing.
430 *
431 *
432 * Throws: <eslESYS> on an MPI call failure. <eslEMEM> on allocation failure.
433 * In either case, <*ret_esl_sq> is <NULL>, and the state of <buf>
434 * and <*pos> is undefined and should be considered to be corrupted.
435 */
436 int
esl_sq_MPIUnpack(const ESL_ALPHABET * abc,char * buf,int n,int * pos,MPI_Comm comm,ESL_SQ ** ret_sq)437 esl_sq_MPIUnpack(const ESL_ALPHABET *abc, char *buf, int n, int *pos, MPI_Comm comm, ESL_SQ **ret_sq)
438 {
439 ESL_SQ *sq = NULL;
440 unsigned long int sq_n;
441 unsigned long int start;
442 unsigned long int end;
443 unsigned long int C;
444 unsigned long int W;
445 unsigned long int L;
446 unsigned long int salloc;
447 int x; /* index for optional extra residue markups */
448 int do_digital = FALSE;
449 int status;
450
451 if (abc != NULL) do_digital = TRUE;
452
453 /* allocate sq */
454 ESL_ALLOC(sq, sizeof(ESL_SQ));
455
456 /* unpack allocation values */
457 status = MPI_Unpack(buf, n, pos, &(sq->nalloc), 1, MPI_INT, comm); if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
458 status = MPI_Unpack(buf, n, pos, &(sq->aalloc), 1, MPI_INT, comm); if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
459 status = MPI_Unpack(buf, n, pos, &(sq->dalloc), 1, MPI_INT, comm); if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
460 status = MPI_Unpack(buf, n, pos, &(sq->srcalloc), 1, MPI_INT, comm); if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
461 status = MPI_Unpack(buf, n, pos, &(sq->nxr), 1, MPI_INT, comm); if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
462 status = MPI_Unpack(buf, n, pos, &(salloc), 1, MPI_UNSIGNED_LONG, comm); if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
463
464 /* unpack coordenate info */
465 status = MPI_Unpack(buf, n, pos, &(sq_n), 1, MPI_UNSIGNED_LONG, comm); if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
466 status = MPI_Unpack(buf, n, pos, &(start), 1, MPI_UNSIGNED_LONG, comm); if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
467 status = MPI_Unpack(buf, n, pos, &(end), 1, MPI_UNSIGNED_LONG, comm); if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
468 status = MPI_Unpack(buf, n, pos, &(C), 1, MPI_UNSIGNED_LONG, comm); if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
469 status = MPI_Unpack(buf, n, pos, &(W), 1, MPI_UNSIGNED_LONG, comm); if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
470 status = MPI_Unpack(buf, n, pos, &(L), 1, MPI_UNSIGNED_LONG, comm); if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
471
472 sq->salloc = (int64_t) salloc;
473 sq->n = (int64_t) sq_n;
474 sq->start = (int64_t) start;
475 sq->end = (int64_t) end;
476 sq->C = (int64_t) C;
477 sq->W = (int64_t) W;
478 sq->L = (int64_t) L;
479
480 /* allocate strings */
481 sq->name = NULL; ESL_ALLOC(sq->name, sizeof(char) * sq->nalloc);
482 sq->acc = NULL; ESL_ALLOC(sq->acc, sizeof(char) * sq->aalloc);
483 sq->desc = NULL; ESL_ALLOC(sq->desc, sizeof(char) * sq->dalloc);
484 sq->source = NULL; ESL_ALLOC(sq->source, sizeof(char) * sq->srcalloc);
485 sq->seq = NULL; if (!do_digital) ESL_ALLOC(sq->seq, sizeof(char) * sq->salloc);
486 sq->dsq = NULL; if ( do_digital) ESL_ALLOC(sq->dsq, sizeof(ESL_DSQ) * sq->salloc);
487 sq->ss = NULL; /* ss and extra residue markups are optional - they will only be allocated if needed */
488 sq->xr_tag = NULL;
489 sq->xr = NULL;
490
491 /* unpack strings */
492 status = MPI_Unpack (buf, n, pos, sq->name, sq->nalloc, MPI_CHAR, comm); if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
493 status = MPI_Unpack (buf, n, pos, sq->acc, sq->aalloc, MPI_CHAR, comm); if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
494 status = MPI_Unpack (buf, n, pos, sq->desc, sq->dalloc, MPI_CHAR, comm); if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
495 status = MPI_Unpack (buf, n, pos, sq->source, sq->srcalloc, MPI_CHAR, comm); if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
496
497 if (do_digital) {
498 status = MPI_Unpack (buf, n, pos, sq->dsq, sq->n+2, MPI_UNSIGNED_CHAR, comm); if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
499 }
500 else {
501 status = MPI_Unpack (buf, n, pos, sq->seq, sq->n+1, MPI_CHAR, comm); if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
502 }
503
504 /* unpack the optional ss */
505 status = esl_mpi_UnpackOpt(buf, n, pos, (void **) &(sq->ss), NULL, MPI_CHAR, comm); if (status != eslOK) goto ERROR;
506
507 /* unpack the optional extra residue markups */
508 if (sq->nxr > 0) {
509 ESL_ALLOC(sq->xr, sizeof(char *) * sq->nxr);
510 ESL_ALLOC(sq->xr_tag, sizeof(char *) * sq->nxr);
511 for (x = 0; x < sq->nxr; x ++) {
512 sq->xr[x] = NULL;
513 sq->xr_tag[x] = NULL;
514 status = esl_mpi_UnpackOpt(buf, n, pos, (void **) &(sq->xr_tag[x]), NULL, MPI_CHAR, comm); if (status != eslOK) goto ERROR;
515 status = esl_mpi_UnpackOpt(buf, n, pos, (void **) &(sq->xr[x]), NULL, MPI_CHAR, comm); if (status != eslOK) goto ERROR;
516 }
517 }
518
519 /* set disk offset bookkeeping */
520 sq->doff = -1;
521 sq->roff = -1;
522 sq->eoff = -1;
523
524 /* pointer to alphabet */
525 if (do_digital) sq->abc = abc;
526
527 *ret_sq = sq;
528 return eslOK;
529
530 ERROR:
531 if (sq != NULL) esl_sq_Destroy(sq);
532 *ret_sq = NULL;
533 return status;
534 }
535
536
537 /* Function: esl_sq_MPIRecv()
538 * Synopsis: Receives an <ESL_SQ> as a work unit from an MPI sender.
539 * Incept: ER, Thu Jun 19 10:53:40 EDT 2008 [Janelia]
540 *
541 * Purpose: Receive a work unit that consists of a single <ESL_SQ>
542 * sent by MPI <source> (<0..nproc-1>, or
543 * <MPI_ANY_SOURCE>) tagged as <tag> for MPI communicator <comm>.
544 *
545 * Work units are prefixed by a status code. If the unit's
546 * code is <eslOK> and no errors are encountered, this
547 * routine will return <eslOK> and a non-<NULL> <*ret_esl_sq>.
548 * If the unit's code is <eslEOD> (a shutdown signal),
549 * this routine returns <eslEOD> and <*ret_esl_sq> is <NULL>.
550 *
551 * Caller provides a working buffer <*buf> of size
552 * <*nalloc> characters. These are passed by reference, so
553 * that <*buf> can be reallocated and <*nalloc> increased
554 * if necessary. As a special case, if <*buf> is <NULL> and
555 * <*nalloc> is 0, the buffer will be allocated
556 * appropriately, but the caller is still responsible for
557 * free'ing it.
558 *
559 * Returns: <eslOK> on success. <*ret_esl_sq> contains the received <ESL_SQ>;
560 * it is allocated here, and the caller is responsible for
561 * free'ing it. <*buf> may have been reallocated to a
562 * larger size, and <*nalloc> may have been increased.
563 *
564 * Throws: <eslEMEM> on allocation error, in which case <*ret_esl_sq> is
565 * <NULL>.
566 */
567 int
esl_sq_MPIRecv(int source,int tag,MPI_Comm comm,const ESL_ALPHABET * abc,char ** buf,int * nalloc,ESL_SQ ** ret_sq)568 esl_sq_MPIRecv(int source, int tag, MPI_Comm comm, const ESL_ALPHABET *abc, char **buf, int *nalloc, ESL_SQ **ret_sq)
569 {
570 int status;
571 int code;
572 int n;
573 int pos;
574 MPI_Status mpistatus;
575
576 /* Probe first, because we need to know if our buffer is big enough. */
577 MPI_Probe(source, tag, comm, &mpistatus);
578 MPI_Get_count(&mpistatus, MPI_PACKED, &n);
579
580 /* Make sure the buffer is allocated appropriately */
581 if (*buf == NULL || n > *nalloc) {
582 void *tmp;
583 ESL_RALLOC(*buf, tmp, sizeof(char) * n);
584 *nalloc = n;
585 }
586
587 /* Receive the packed work unit */
588 MPI_Recv(*buf, n, MPI_PACKED, source, tag, comm, &mpistatus);
589
590 /* Unpack it, looking at the status code prefix for EOD/EOK */
591 pos = 0;
592 if (MPI_Unpack(*buf, n, &pos, &code, 1, MPI_INT, comm) != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
593 if (code == eslEOD) { *ret_sq = NULL; return eslEOD; }
594
595 return esl_sq_MPIUnpack(abc, *buf, *nalloc, &pos, comm, ret_sq);
596
597 ERROR:
598 return status;
599 }
600 /*----------------- end, ESL_SQ communication -------------------*/
601
602
603 /*****************************************************************
604 *# 3. Communicating ESL_MSA (multiple sequence alignments).
605 *****************************************************************/
606
607 /* Function: esl_msa_MPISend()
608 * Synopsis: Send essential msa info as an MPI work unit.
609 *
610 * Purpose: Sends the essential elements of a multiple alignment <msa>
611 * as a work unit to MPI process <dest> (<dest> ranges from <0..nproc-1>),
612 * tagging the message with MPI tag <tag> for MPI communicator
613 * <comm>. The receiver uses <esl_msa_MPIRecv()> to receive the MSA.
614 *
615 * Work units are prefixed by a status code. If <msa> is
616 * <non-NULL>, the work unit is an <eslOK> code followed by
617 * the packed MSA. If <msa> is NULL, the work unit is an
618 * <eslEOD> code, which <esl_msa_hmm_MPIRecv()> knows how
619 * to interpret; this is typically used for an end-of-data
620 * signal to cleanly shut down worker processes.
621 *
622 * Only an essential subset of the elements in <msa> are
623 * transmitted, sufficient to do computationally intensive
624 * work on the <msa>. Most msa annotation is not
625 * transmitted, for example. Specifically, <name>, <nseq>,
626 * <alen>, <flags>, <wgt>, <ax> or <aseq>, <desc>, <acc>,
627 * <au>, <ss_cons>, <sa_cons>, <rf>, <cutoff>, and <cutset>
628 * are transmitted.
629 *
630 * In order to minimize alloc/free cycles, caller passes a
631 * pointer to a working buffer <*buf> of size <*nalloc>
632 * characters. If necessary (i.e. if <msa> is too big to
633 * fit), <*buf> will be reallocated and <*nalloc> increased
634 * to the new size. As a special case, if <*buf> is <NULL>
635 * and <*nalloc> is 0, the buffer will be allocated
636 * appropriately, but the caller is still responsible for
637 * free'ing it.
638 *
639 * Args: msa - msa to send
640 * dest - MPI destination (0..nproc-1)
641 * tag - MPI tag
642 * buf - pointer to a working buffer
643 * nalloc - current allocated size of <*buf>, in characters
644 *
645 * Returns: <eslOK> on success; <*buf> may have been reallocated and
646 * <*nalloc> may have been increased.
647 *
648 * Throws: <eslESYS> if an MPI call fails; <eslEMEM> if a malloc/realloc
649 * fails. In either case, <*buf> and <*nalloc> remain valid and useful
650 * memory (though the contents of <*buf> are undefined).
651 *
652 * Xref: J1/72.
653 */
654 int
esl_msa_MPISend(const ESL_MSA * msa,int dest,int tag,MPI_Comm comm,char ** buf,int * nalloc)655 esl_msa_MPISend(const ESL_MSA *msa, int dest, int tag, MPI_Comm comm, char **buf, int *nalloc)
656 {
657 int status;
658 int code;
659 int sz, n, position;
660
661 /* First, figure out the size of the MSA */
662 if (MPI_Pack_size(1, MPI_INT, comm, &n) != 0) ESL_EXCEPTION(eslESYS, "mpi pack size failed");
663 if (msa != NULL) {
664 if ((status = esl_msa_MPIPackSize(msa, comm, &sz)) != eslOK) return status;
665 n += sz;
666 }
667 ESL_DPRINTF2(("esl_msa_MPISend(): msa has size %d\n", n));
668
669 /* Make sure the buffer is allocated appropriately */
670 if (*buf == NULL || n > *nalloc) {
671 void *tmp;
672 ESL_RALLOC(*buf, tmp, sizeof(char) * n);
673 *nalloc = n;
674 }
675 ESL_DPRINTF2(("esl_msa_MPISend(): buffer is ready\n"));
676
677 /* Pack the status code and MSA into the buffer */
678 position = 0;
679 code = (msa == NULL) ? eslEOD : eslOK;
680 if (MPI_Pack(&code, 1, MPI_INT, *buf, n, &position, comm) != 0) ESL_EXCEPTION(eslESYS, "mpi pack failed");
681 if (msa != NULL) {
682 if ((status = esl_msa_MPIPack(msa, *buf, n, &position, comm)) != eslOK) return status;
683 }
684 ESL_DPRINTF2(("esl_msa_MPISend(): msa is packed into %d bytes\n", position));
685
686 /* Send the packed profile to destination */
687 if (MPI_Send(*buf, n, MPI_PACKED, dest, tag, comm) != 0) ESL_EXCEPTION(eslESYS, "mpi send failed");
688 ESL_DPRINTF2(("esl_msa_MPISend(): msa is sent.\n"));
689 return eslOK;
690
691 ERROR:
692 return status;
693 }
694
695
696
697 /* Function: esl_msa_MPIPackSize()
698 * Synopsis: Calculates number of bytes needed to pack an MSA.
699 * Incept: SRE, Wed Jun 6 11:36:22 2007 [Janelia]
700 *
701 * Purpose: Calculate an upper bound on the number of bytes
702 * that <esl_msa_MPIPack()> will need to pack an
703 * essential subset of the data in MSA <msa>
704 * in a packed MPI message in communicator <comm>;
705 * return that number of bytes in <*ret_n>.
706 *
707 * Caller will generally use this result to determine how
708 * to allocate a buffer before starting to pack into it.
709 *
710 * If <msa> is <NULL> (which can happen, if <msa> is
711 * optional in the caller), size <*ret_n> is set to 0.
712 *
713 * Returns: <eslOK> on success, and <*ret_n> contains the answer.
714 *
715 * Throws: <eslESYS> if an MPI call fails, and <*ret_n> is set to 0.
716 *
717 * Xref: J1/78-79.
718 *
719 * Note: The sizing calls here need to stay matched up with
720 * the calls in <esl_msa_MPIPack()>.
721 */
722 int
esl_msa_MPIPackSize(const ESL_MSA * msa,MPI_Comm comm,int * ret_n)723 esl_msa_MPIPackSize(const ESL_MSA *msa, MPI_Comm comm, int *ret_n)
724 {
725 int status;
726 int sz;
727 int n = 0;
728 int i;
729
730 if (msa == NULL) { *ret_n = 0; return eslOK; }
731
732 status = MPI_Pack_size ( 1, MPI_INT, comm, &sz); n += 3*sz; if (status != 0) ESL_XEXCEPTION(eslESYS, "pack size failed");
733 status = MPI_Pack_size ( msa->nseq, MPI_DOUBLE, comm, &sz); n += sz; if (status != 0) ESL_XEXCEPTION(eslESYS, "pack size failed");
734 status = esl_mpi_PackOptSize(msa->name, -1, MPI_CHAR, comm, &sz); n += sz; if (status != eslOK) goto ERROR;
735 status = esl_mpi_PackOptSize(msa->desc, -1, MPI_CHAR, comm, &sz); n += sz; if (status != eslOK) goto ERROR;
736 status = esl_mpi_PackOptSize(msa->acc, -1, MPI_CHAR, comm, &sz); n += sz; if (status != eslOK) goto ERROR;
737 status = esl_mpi_PackOptSize(msa->au, -1, MPI_CHAR, comm, &sz); n += sz; if (status != eslOK) goto ERROR;
738 status = esl_mpi_PackOptSize(msa->ss_cons, msa->alen+1, MPI_CHAR, comm, &sz); n += sz; if (status != eslOK) goto ERROR;
739 status = esl_mpi_PackOptSize(msa->sa_cons, msa->alen+1, MPI_CHAR, comm, &sz); n += sz; if (status != eslOK) goto ERROR;
740 status = esl_mpi_PackOptSize(msa->pp_cons, msa->alen+1, MPI_CHAR, comm, &sz); n += sz; if (status != eslOK) goto ERROR;
741 status = esl_mpi_PackOptSize(msa->rf, msa->alen+1, MPI_CHAR, comm, &sz); n += sz; if (status != eslOK) goto ERROR;
742 status = esl_mpi_PackOptSize(msa->mm, msa->alen+1, MPI_CHAR, comm, &sz); n += sz; if (status != eslOK) goto ERROR;
743 status = MPI_Pack_size ( eslMSA_NCUTS, MPI_FLOAT, comm, &sz); n += sz; if (status != 0) ESL_XEXCEPTION(eslESYS, "pack size failed");
744 status = MPI_Pack_size ( eslMSA_NCUTS, MPI_INT, comm, &sz); n += sz; if (status != 0) ESL_XEXCEPTION(eslESYS, "pack size failed");
745
746 /* alignment, digital or text: */
747 if (msa->ax != NULL) {
748 if ((status = MPI_Pack_size ( msa->alen+2, MPI_UNSIGNED_CHAR, comm, &sz)) != 0) ESL_XEXCEPTION(eslESYS, "pack size failed");
749 n += sz*msa->nseq;
750 } else {
751 if ((status = MPI_Pack_size ( msa->alen+1, MPI_CHAR, comm, &sz)) != 0) ESL_XEXCEPTION(eslESYS, "pack size failed");
752 n += sz*msa->nseq;
753 }
754
755 /* seqnames: */
756 for (i = 0; i < msa->nseq; i++) {
757 if ((status = esl_mpi_PackOptSize(msa->sqname[i], -1, MPI_CHAR, comm, &sz)) != eslOK) goto ERROR;
758 n += sz;
759 }
760
761 *ret_n = n;
762 return eslOK;
763
764 ERROR:
765 *ret_n = 0;
766 return status;
767 }
768
769 /* Function: esl_msa_MPIPack()
770 * Synopsis: Packs an MSA into MPI buffer.
771 * Incept: SRE, Wed Jun 6 13:17:45 2007 [Janelia]
772 *
773 * Purpose: Packs essential subset of data in MSA <msa> into an MPI
774 * packed message buffer <buf> of length <n> bytes,
775 * starting at byte position <*position>, for MPI
776 * communicator <comm>.
777 *
778 * If <msa> is <NULL> (which can happen, if <msa> is being
779 * treated as optional in the caller), does nothing, and
 * just returns <eslOK>.
781 *
782 * Returns: <eslOK> on success; <buf> now contains the
783 * packed <msa>, and <*position> is set to the byte
784 * immediately following the last byte of the MSA
785 * in <buf>.
786 *
787 * Throws: <eslESYS> if an MPI call fails; or <eslEMEM> if the
788 * buffer's length <n> is overflowed by trying to pack
789 * <msa> into <buf>. In either case, the state of
790 * <buf> and <*position> is undefined, and both should
791 * be considered to be corrupted.
792 *
793 * Xref: J1/78-79.
794 */
795 int
esl_msa_MPIPack(const ESL_MSA * msa,char * buf,int n,int * position,MPI_Comm comm)796 esl_msa_MPIPack(const ESL_MSA *msa, char *buf, int n, int *position, MPI_Comm comm)
797 {
798 int status;
799 int i;
800
801 ESL_DPRINTF2(("esl_msa_MPIPack(): ready.\n"));
802
803 if (msa == NULL) return eslOK;
804
805 status = MPI_Pack ((void *) &(msa->nseq), 1, MPI_INT, buf, n, position, comm); if (status != 0) ESL_EXCEPTION(eslESYS, "pack failed");
806 status = MPI_Pack ((void *) &(msa->alen), 1, MPI_INT, buf, n, position, comm); if (status != 0) ESL_EXCEPTION(eslESYS, "pack failed");
807 status = MPI_Pack ((void *) &(msa->flags), 1, MPI_INT, buf, n, position, comm); if (status != 0) ESL_EXCEPTION(eslESYS, "pack failed");
808 status = MPI_Pack (msa->wgt, msa->nseq, MPI_DOUBLE, buf, n, position, comm); if (status != 0) ESL_EXCEPTION(eslESYS, "pack failed");
809 status = esl_mpi_PackOpt(msa->name, -1, MPI_CHAR, buf, n, position, comm); if (status != eslOK) return status;
810 status = esl_mpi_PackOpt(msa->desc, -1, MPI_CHAR, buf, n, position, comm); if (status != eslOK) return status;
811 status = esl_mpi_PackOpt(msa->acc, -1, MPI_CHAR, buf, n, position, comm); if (status != eslOK) return status;
812 status = esl_mpi_PackOpt(msa->au, -1, MPI_CHAR, buf, n, position, comm); if (status != eslOK) return status;
813 status = esl_mpi_PackOpt(msa->ss_cons, msa->alen+1, MPI_CHAR, buf, n, position, comm); if (status != eslOK) return status;
814 status = esl_mpi_PackOpt(msa->sa_cons, msa->alen+1, MPI_CHAR, buf, n, position, comm); if (status != eslOK) return status;
815 status = esl_mpi_PackOpt(msa->pp_cons, msa->alen+1, MPI_CHAR, buf, n, position, comm); if (status != eslOK) return status;
816 status = esl_mpi_PackOpt(msa->rf, msa->alen+1, MPI_CHAR, buf, n, position, comm); if (status != eslOK) return status;
817 status = esl_mpi_PackOpt(msa->mm, msa->alen+1, MPI_CHAR, buf, n, position, comm); if (status != eslOK) return status;
818 status = MPI_Pack ((void *) msa->cutoff, eslMSA_NCUTS, MPI_FLOAT, buf, n, position, comm); if (status != 0) ESL_EXCEPTION(eslESYS, "pack failed");
819 status = MPI_Pack ((void *) msa->cutset, eslMSA_NCUTS, MPI_INT, buf, n, position, comm); if (status != 0) ESL_EXCEPTION(eslESYS, "pack failed");
820
821 for (i = 0; i < msa->nseq; i++) {
822 status = esl_mpi_PackOpt(msa->sqname[i], -1, MPI_CHAR, buf, n, position, comm); if (status != eslOK) return status;
823 if (msa->flags & eslMSA_DIGITAL) {
824 if ((status = MPI_Pack (msa->ax[i], msa->alen+2, MPI_UNSIGNED_CHAR, buf, n, position, comm)) != 0) ESL_EXCEPTION(eslESYS, "pack failed");
825 } else {
826 if ((status = MPI_Pack (msa->aseq[i], msa->alen+1, MPI_CHAR, buf, n, position, comm)) != 0) ESL_EXCEPTION(eslESYS, "pack failed");
827 }
828 }
829 ESL_DPRINTF2(("esl_msa_MPIPack(): done. Packed %d bytes into buffer of size %d\n", *position, n));
830
831 if (*position > n) ESL_EXCEPTION(eslEMEM, "buffer overflow");
832 return eslOK;
833 }
834
835 /* Function: esl_msa_MPIUnpack()
836 * Synopsis: Unpacks an MSA from an MPI buffer.
837 * Incept: SRE, Wed Jun 6 15:49:11 2007 [Janelia]
838 *
839 * Purpose: Unpack a newly allocated MSA from MPI packed buffer
840 * <buf>, starting from position <*pos>, where the total length
841 * of the buffer in bytes is <n>.
842 *
843 * MSAs are usually transmitted in digital mode. In digital
844 * mode, caller must provide the alphabet <abc> for this
 * MSA. (Thus the caller already knows it before the MSA
846 * arrives, by an appropriate initialization.) If MSAs are
847 * being transmitted in text mode, <abc> is ignored; caller
848 * may pass <NULL> for it.
849 *
850 * Returns: <eslOK> on success. <*pos> is updated to the position of
851 * the next element in <buf> to unpack (if any). <*ret_msa>
852 * contains a newly allocated MSA, which the caller is
853 * responsible for free'ing.
854 *
855 * Throws: <eslESYS> on an MPI call failure. <eslEMEM> on allocation failure.
856 * In either case, <*ret_msa> is <NULL>, and the state of <buf>
857 * and <*pos> is undefined and should be considered to be corrupted.
858 *
859 * Xref: J1/78-79
860 */
861 int
esl_msa_MPIUnpack(const ESL_ALPHABET * abc,char * buf,int n,int * pos,MPI_Comm comm,ESL_MSA ** ret_msa)862 esl_msa_MPIUnpack(const ESL_ALPHABET *abc, char *buf, int n, int *pos, MPI_Comm comm, ESL_MSA **ret_msa)
863 {
864 int status;
865 ESL_MSA *msa = NULL;
866 int nseq, alen, flags;
867 int i;
868
869 status = MPI_Unpack (buf, n, pos, &nseq, 1, MPI_INT, comm); if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
870 status = MPI_Unpack (buf, n, pos, &alen, 1, MPI_INT, comm); if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
871 status = MPI_Unpack (buf, n, pos, &flags, 1, MPI_INT, comm); if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
872
873 if (flags & eslMSA_DIGITAL) {
874 if ((msa = esl_msa_CreateDigital(abc, nseq, alen)) == NULL) { status = eslEMEM; goto ERROR; }
875 } else {
876 if ((msa = esl_msa_Create(nseq, alen)) == NULL) { status = eslEMEM; goto ERROR; }
877 }
878 msa->flags = flags;
879
880 status = MPI_Unpack (buf, n, pos, msa->wgt, nseq, MPI_DOUBLE, comm); if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
881 status = esl_mpi_UnpackOpt(buf, n, pos, (void **) &(msa->name), NULL, MPI_CHAR, comm); if (status != eslOK) goto ERROR;
882 status = esl_mpi_UnpackOpt(buf, n, pos, (void **) &(msa->desc), NULL, MPI_CHAR, comm); if (status != eslOK) goto ERROR;
883 status = esl_mpi_UnpackOpt(buf, n, pos, (void **) &(msa->acc), NULL, MPI_CHAR, comm); if (status != eslOK) goto ERROR;
884 status = esl_mpi_UnpackOpt(buf, n, pos, (void **) &(msa->au), NULL, MPI_CHAR, comm); if (status != eslOK) goto ERROR;
885 status = esl_mpi_UnpackOpt(buf, n, pos, (void **) &(msa->ss_cons), NULL, MPI_CHAR, comm); if (status != eslOK) goto ERROR;
886 status = esl_mpi_UnpackOpt(buf, n, pos, (void **) &(msa->sa_cons), NULL, MPI_CHAR, comm); if (status != eslOK) goto ERROR;
887 status = esl_mpi_UnpackOpt(buf, n, pos, (void **) &(msa->pp_cons), NULL, MPI_CHAR, comm); if (status != eslOK) goto ERROR;
888 status = esl_mpi_UnpackOpt(buf, n, pos, (void **) &(msa->rf) , NULL, MPI_CHAR, comm); if (status != eslOK) goto ERROR;
889 status = esl_mpi_UnpackOpt(buf, n, pos, (void **) &(msa->mm) , NULL, MPI_CHAR, comm); if (status != eslOK) goto ERROR;
890 status = MPI_Unpack (buf, n, pos, msa->cutoff, eslMSA_NCUTS, MPI_FLOAT, comm); if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
891 status = MPI_Unpack (buf, n, pos, msa->cutset, eslMSA_NCUTS, MPI_INT, comm); if (status != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
892
893 for (i = 0; i < msa->nseq; i++) {
894 status = esl_mpi_UnpackOpt(buf, n, pos, (void **) &(msa->sqname[i]), NULL, MPI_CHAR, comm); if (status != eslOK) goto ERROR;
895 if (msa->flags & eslMSA_DIGITAL) {
896 if ((status = MPI_Unpack (buf, n, pos, msa->ax[i], msa->alen+2, MPI_UNSIGNED_CHAR, comm)) != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
897 } else {
898 if ((status = MPI_Unpack (buf, n, pos, msa->aseq[i], msa->alen+1, MPI_CHAR, comm)) != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
899 }
900 }
901 *ret_msa = msa;
902 return eslOK;
903
904 ERROR:
905 if (msa != NULL) esl_msa_Destroy(msa);
906 *ret_msa = NULL;
907 return status;
908 }
909
910
911
912 /* Function: esl_msa_MPIRecv()
913 * Synopsis: Receive essential MSA info as a work unit from MPI sender.
914 * Incept: SRE, Fri Jun 1 11:01:04 2007 [Janelia]
915 *
916 * Purpose: Receives a work unit that consists of a single MSA from <source> (<0..nproc-1>, or
917 * <MPI_ANY_SOURCE>) tagged as <tag> from communicator <comm>.
918 *
919 * Work units are prefixed by a status code. If the unit's
920 * code is <eslOK> and no errors are encountered, this
921 * routine will return <eslOK> and a non-<NULL> <*ret_msa>.
922 * If the unit's code is <eslEOD> (a shutdown signal),
923 * this routine returns <eslEOD> and <*ret_msa> is <NULL>.
924 *
925 * MSAs are transmitted in digital mode. Caller must know and
926 * provide the alphabet <abc> for this MSA.
927 *
928 * To minimize alloc/free cycles in this routine, caller
929 * passes a pointer to a buffer <*buf> of size <*nalloc>
930 * characters. These are passed by reference, because when
931 * necessary, <*buf> will be reallocated and <*nalloc>
932 * increased to the new size. As a special case, if <*buf>
933 * is <NULL> and <*nalloc> is 0, the buffer will be
934 * allocated appropriately, but the caller is still
935 * responsible for free'ing it.
936 *
937 * If the packed MSA is an end-of-data signal, return
938 * <eslEOD>, and <*ret_msa> is <NULL>.
939 *
940 * Returns: <eslOK> on success. <*ret_msa> contains the new MSA; it
941 * is allocated here, and the caller is responsible for
942 * free'ing it. <*buf> may have been reallocated to a
943 * larger size, and <*nalloc> may have been increased.
944 *
945 *
946 * Throws: <eslESYS> if an MPI call fails; <eslEMEM> if an allocation fails.
947 * In either case, <*ret_msa> is NULL, and the <buf> and its size
948 * <*nalloc> remain valid.
949 * Xref: J1/72.
950 */
951 int
esl_msa_MPIRecv(int source,int tag,MPI_Comm comm,const ESL_ALPHABET * abc,char ** buf,int * nalloc,ESL_MSA ** ret_msa)952 esl_msa_MPIRecv(int source, int tag, MPI_Comm comm, const ESL_ALPHABET *abc, char **buf, int *nalloc, ESL_MSA **ret_msa)
953 {
954 int status, code;
955 ESL_MSA *msa = NULL;
956 int n;
957 int pos;
958 MPI_Status mpistatus;
959
960 /* Probe first, because we need to know if our buffer is big enough. */
961 if (MPI_Probe(source, tag, comm, &mpistatus) != 0) ESL_XEXCEPTION(eslESYS, "mpi probe failed");
962 if (MPI_Get_count(&mpistatus, MPI_PACKED, &n) != 0) ESL_XEXCEPTION(eslESYS, "mpi get count failed");
963
964 /* Make sure the buffer is allocated appropriately */
965 if (*buf == NULL || n > *nalloc) {
966 void *tmp;
967 ESL_RALLOC(*buf, tmp, sizeof(char) * n);
968 *nalloc = n;
969 }
970
971 /* Receive the packed work unit */
972 if (MPI_Recv(*buf, n, MPI_PACKED, source, tag, comm, &mpistatus) != 0) ESL_XEXCEPTION(eslESYS, "mpi recv failed");
973
974 /* Unpack it - where the first integer is a status code, OK or EOD */
975 pos = 0;
976 if (MPI_Unpack (*buf, n, &pos, &code, 1, MPI_INT, comm) != 0) ESL_XEXCEPTION(eslESYS, "mpi unpack failed");
977 if (code == eslEOD) { status = eslEOD; goto ERROR; }
978
979 return esl_msa_MPIUnpack(abc, *buf, *nalloc, &pos, comm, ret_msa);
980
981 ERROR:
982 if (msa != NULL) esl_msa_Destroy(msa);
983 *ret_msa = NULL;
984 return status;
985 }
986 /*-------------------------- end, ESL_MSA -----------------------*/
987
988
989 /*****************************************************************
990 *# 4. Communicating ESL_STOPWATCH (process timing)
991 *****************************************************************/
992
993 /* Function: esl_stopwatch_MPIReduce()
994 * Synopsis: Collect total parallel process time into master watch.
995 * Incept: SRE, Thu Jun 14 13:27:20 2007 [Janelia]
996 *
997 * Purpose: Collect all user/sys times from stopped stopwatch <w> from
998 * all MPI processes, and sum them into the watch on the
999 * master process of rank <root>, for MPI communicator
1000 * <comm>. A subsequent <esl_stopwatch_Display()> will
1001 * then show total user/sys times, not just the master's
1002 * usage.
1003 *
1004 * This routine needs to be called synchronously on all
1005 * processes; it does a collective communication using
1006 * <MPI_Reduce()>.
1007 *
1008 * Returns: <eslOK> on success.
1009 *
1010 * Throws: <eslESYS> on MPI call failure.
1011 */
1012 int
esl_stopwatch_MPIReduce(ESL_STOPWATCH * w,int root,MPI_Comm comm)1013 esl_stopwatch_MPIReduce(ESL_STOPWATCH *w, int root, MPI_Comm comm)
1014 {
1015 double user_total;
1016 double sys_total;
1017
1018 if (MPI_Reduce(&(w->user), &user_total, 1, MPI_DOUBLE, MPI_SUM, root, comm) != 0) ESL_EXCEPTION(eslESYS, "mpi reduce failed");
1019 if (MPI_Reduce(&(w->sys), &sys_total, 1, MPI_DOUBLE, MPI_SUM, root, comm) != 0) ESL_EXCEPTION(eslESYS, "mpi reduce failed");
1020
1021 w->user = user_total;
1022 w->sys = sys_total;
1023 return eslOK;
1024 }
1025
1026
1027 /*****************************************************************
1028 * 5. Unit tests.
1029 *****************************************************************/
1030 #ifdef eslMPI_TESTDRIVE
1031
1032 /* Each MPI unit test for communications routines follows a similar
1033 * pattern:
1034 * - workers and master generate identical objects, possibly using
1035 * the same RNG
1036 * - each worker sends object to master
1037 * - master receives object, compares it to known object, and fails
1038 * if they aren't the same.
1039 *
1040 * This way, master is doing the failing and error output.
1041 */
1042 static void
utest_MSASendRecv(ESL_ALPHABET * abc,ESL_MSA * msa,int my_rank,int nproc)1043 utest_MSASendRecv(ESL_ALPHABET *abc, ESL_MSA *msa, int my_rank, int nproc)
1044 {
1045 ESL_MSA *xmsa = NULL;
1046 char *wbuf = NULL;
1047 int wn = 0;
1048 int i;
1049
1050 if (my_rank == 0)
1051 {
1052 for (i = 1; i < nproc; i++)
1053 {
1054 ESL_DPRINTF1(("Master: receiving test msa\n"));
1055 esl_msa_MPIRecv(MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, abc, &wbuf, &wn, &xmsa);
1056 ESL_DPRINTF1(("Master: test msa received\n"));
1057
1058 if ((esl_msa_CompareMandatory(msa, xmsa) != eslOK) ||
1059 (esl_CCompare(msa->name, xmsa->name) != eslOK) ||
1060 (esl_CCompare(msa->desc, xmsa->desc) != eslOK) ||
1061 (esl_CCompare(msa->acc, xmsa->acc) != eslOK) ||
1062 (esl_CCompare(msa->au, xmsa->au) != eslOK) ||
1063 (esl_CCompare(msa->ss_cons, xmsa->ss_cons) != eslOK) ||
1064 (esl_CCompare(msa->sa_cons, xmsa->sa_cons) != eslOK) ||
1065 (esl_CCompare(msa->pp_cons, xmsa->pp_cons) != eslOK) ||
1066 (esl_CCompare(msa->rf, xmsa->rf) != eslOK) ||
1067 (esl_CCompare(msa->mm, xmsa->mm) != eslOK))
1068 esl_fatal("Received MSA is not identical to what was sent.");
1069
1070 esl_msa_Destroy(xmsa);
1071 }
1072 }
1073 else
1074 {
1075 ESL_DPRINTF1(("Worker %d: sending test msa\n", my_rank));
1076 esl_msa_MPISend(msa, 0, 0, MPI_COMM_WORLD, &wbuf, &wn);
1077 ESL_DPRINTF1(("Worker %d: test msa sent\n", my_rank));
1078 }
1079
1080 free(wbuf);
1081 return;
1082 }
1083
1084 static void
utest_MSAPackUnpack(ESL_ALPHABET * abc,ESL_MSA * msa,int my_rank,int nproc)1085 utest_MSAPackUnpack(ESL_ALPHABET *abc, ESL_MSA *msa, int my_rank, int nproc)
1086 {
1087 ESL_MSA *xmsa = NULL;
1088 char *wbuf = NULL;
1089 int wn = 0;
1090 int pin, pout;
1091
1092 if (my_rank != 0) return; /* only execute this utest on the master. */
1093
1094 esl_msa_MPIPackSize(msa, MPI_COMM_WORLD, &wn);
1095 wbuf = malloc(sizeof(char) * wn);
1096
1097 pin = 0;
1098 esl_msa_MPIPack(msa, wbuf, wn, &pin, MPI_COMM_WORLD);
1099
1100 pout = 0;
1101 esl_msa_MPIUnpack(abc, wbuf, wn, &pout, MPI_COMM_WORLD, &xmsa);
1102
1103 if (pin != pout) esl_fatal("unit test failed: packed and unpacked sizes differ");
1104 if ((esl_msa_CompareMandatory(msa, xmsa) != eslOK) ||
1105 (esl_CCompare(msa->name, xmsa->name) != eslOK) ||
1106 (esl_CCompare(msa->desc, xmsa->desc) != eslOK) ||
1107 (esl_CCompare(msa->acc, xmsa->acc) != eslOK) ||
1108 (esl_CCompare(msa->au, xmsa->au) != eslOK) ||
1109 (esl_CCompare(msa->ss_cons, xmsa->ss_cons) != eslOK) ||
1110 (esl_CCompare(msa->sa_cons, xmsa->sa_cons) != eslOK) ||
1111 (esl_CCompare(msa->pp_cons, xmsa->pp_cons) != eslOK) ||
1112 (esl_CCompare(msa->rf, xmsa->rf) != eslOK) ||
1113 (esl_CCompare(msa->mm, xmsa->mm) != eslOK))
1114 esl_fatal("Unpacked MSA is not identical to what was packed.");
1115
1116 esl_msa_Destroy(xmsa);
1117 free(wbuf);
1118 return;
1119 }
1120
1121
1122
1123 #endif /*eslMPI_TESTDRIVE*/
1124 /*----------------------- end, unit tests -----------------------*/
1125
1126
1127 /*****************************************************************
1128 * 6. Test driver.
1129 *****************************************************************/
1130 #ifdef eslMPI_TESTDRIVE
1131 /* mpicc -o mpi_utest -g -Wall -I. -L. -DeslMPI_TESTDRIVE esl_mpi.c -leasel -lm
1132 * In an MPI environment:
1133 * mpirun C ./mpi_utest
1134 */
1135 #include "esl_getopts.h"
1136
1137 static ESL_OPTIONS options[] = {
1138 /* name type default env range toggles reqs incomp help docgroup*/
1139 { "-h", eslARG_NONE, FALSE, NULL, NULL, NULL, NULL, NULL, "show brief help on version and usage", 0 },
1140 { "-m", eslARG_INFILE, FALSE, NULL, NULL, NULL, NULL, NULL, "read test MSA from file <f>", 0 },
1141 { "-x", eslARG_NONE, FALSE, NULL, NULL, NULL, NULL, NULL, "test digital mode MSA communication", 0 },
1142 { "--stall", eslARG_NONE, FALSE, NULL, NULL, NULL, NULL, NULL, "arrest after start: for debugging MPI under gdb", 0 },
1143 { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 },
1144 };
1145 static char usage[] = "[-options]";
1146 static char banner[] = "test driver for the Easel mpi module";
1147
1148 int
main(int argc,char ** argv)1149 main(int argc, char **argv)
1150 {
1151 ESL_GETOPTS *go = esl_getopts_CreateDefaultApp(options, 0, argc, argv, banner, usage);
1152 ESL_ALPHABET *abc = NULL;
1153 ESL_MSAFILE *afp = NULL;
1154 ESL_MSA *msa = NULL;
1155 int do_stall = FALSE;
1156 int my_rank;
1157 int nproc;
1158
1159 /* For debugging: stall until GDB can be attached */
1160 if (esl_opt_GetBoolean(go, "--stall")) do_stall = TRUE;
1161 while (do_stall);
1162
1163 /* Get a test MSA and alphabet. */
1164 if (esl_opt_GetString(go, "-m") != NULL)
1165 {
1166 if (esl_msafile_Open(&abc, esl_opt_GetString(go, "-m"), eslMSAFILE_UNKNOWN, NULL, &afp) != eslOK) esl_fatal("msa file open failed");
1167 if (esl_msafile_Read(afp, &msa) != eslOK) esl_fatal("msa read failed");
1168 esl_msafile_Close(afp);
1169 }
1170 else
1171 {
1172 abc = esl_alphabet_Create(eslAMINO);
1173 if ( (esl_msafile_OpenMem(&abc,
1174 "# STOCKHOLM 1.0\n\nNIFE_CLOPA GYVGS\nNIFD_AZOVI GFDGF\nNIFD_BRAJA GYDGF\nNIFK_ANASP GYQGG\n//\n",
1175 -1, eslMSAFILE_STOCKHOLM, NULL, &afp)) != eslOK)
1176 esl_fatal("msa creation failed");
1177 if (esl_msafile_Read(afp, &msa) != eslOK) esl_fatal("msa read failed");
1178 esl_msafile_Close(afp);
1179 }
1180
1181
1182 MPI_Init(&argc, &argv);
1183 MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
1184 MPI_Comm_size(MPI_COMM_WORLD, &nproc);
1185
1186 utest_MSAPackUnpack(abc, msa, my_rank, nproc);
1187 utest_MSASendRecv (abc, msa, my_rank, nproc);
1188
1189 MPI_Finalize();
1190
1191 esl_alphabet_Destroy(abc);
1192 esl_msa_Destroy(msa);
1193 return eslOK;
1194 }
1195
1196 #endif /*eslMPI_TESTDRIVE*/
1197 /*---------------------- end, test driver -----------------------*/
1198
1199
1200
1201
1202 /*****************************************************************
1203 * 7. Example.
1204 *****************************************************************/
1205
1206
1207
1208
1209
1210 /*------------------------ end, example -------------------------*/
1211
1212
1213
1214
1215
1216 #else /*!HAVE_MPI*/
1217
1218 /* If we don't have MPI compiled in, provide some nothingness to:
1219 * a. prevent Mac OS/X ranlib from bitching about .o file that "has no symbols"
1220 * b. prevent compiler from bitching about "empty compilation unit"
1221 * c. automatically pass the automated tests.
1222 */
1223 #include "easel.h"
1224
/* Placeholder symbol, so this translation unit is not empty without MPI. */
void esl_mpi_DoAbsolutelyNothing(void) { return; }

/* Trivial driver so test/example/benchmark builds still link and "pass".
 * Each macro is tested with defined(), so a macro that is defined with an
 * empty value works too and undefined ones are never evaluated as expressions.
 */
#if defined(eslMPI_TESTDRIVE) || defined(eslMPI_EXAMPLE) || defined(eslMPI_BENCHMARK)
int main(void) { return 0; }
#endif
1229
1230 #endif /*HAVE_MPI*/
1231
1232