1 // cslmpi.cpp Copyright (C) 1997-2021
2
3 //
4 // Interfaces for mpi from CSL. The bulk of this code was written by
5 // M O Seymour (1997-98) who has released it for inclusion as part of
6 // this Lisp system.
7 //
8
9 // I have left this using malloc/free rather than new/delete since it is
10 // self-contained and the memory allocated does not get passed out to
11 // other parts of CSL.
12
13 /**************************************************************************
14 * Copyright (C) 2021, Codemist. A C Norman *
15 * *
16 * Redistribution and use in source and binary forms, with or without *
17 * modification, are permitted provided that the following conditions are *
18 * met: *
19 * *
20 * * Redistributions of source code must retain the relevant *
21 * copyright notice, this list of conditions and the following *
22 * disclaimer. *
23 * * Redistributions in binary form must reproduce the above *
24 * copyright notice, this list of conditions and the following *
25 * disclaimer in the documentation and/or other materials provided *
26 * with the distribution. *
27 * *
28 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS *
29 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT *
30 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS *
31 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE *
32 * COPYRIGHT OWNERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, *
33 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, *
34 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS *
35 * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND *
36 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
37 * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF *
38 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH *
39 * DAMAGE. *
40 *************************************************************************/
41
42
43 // $Id: cslmpi.cpp 5609 2021-01-23 22:02:30Z arthurcnorman $
44
45 #include "headers.h"
46
47
48 // Note VERY WELL....
49 // I have not even compiled this code at all recently, though I made
// significant changes to it to adjust for calling conventions. Anybody
51 // trying to use it should expect to need to make adjustments and
52 // corrections. ACN August 2017.
53
54
55 #ifdef USE_MPI
56
57 #include "mpipack.c"
58
get_fix_arg(LispObject & v,const char * fun_name)59 inline LispObject get_fix_arg(LispObject& v, const char *fun_name)
60 { if (!is_fixnum(v)) return aerror1(fun_name, v)
61 v = int_of_fixnum(v);
62 }
63
64
65 /************************ Environmental functions *******************/
66
67 // Returns process rank
68 // (mpi_comm_rank comm)
69 //
70 // For now, I assume that comm will fit into a fixnum.
71 // This appears to be the case with MPICH (values in the hundreds),
72 // but assumptions like this should not be made.
73 //
Lmpi_comm_rank(LispObject,LispObject comm)74 static LispObject Lmpi_comm_rank(LispObject, LispObject comm)
75 { int rank;
76 static char fun_name[] = "mpi_comm_rank";
77 if (!is_fixnum(comm)) return aerror1(fun_name, v)
78 MPI_Comm_rank(int_of_fixnum(comm),&rank);
79 return onevalue(fixnum_of_int(rank));
80 }
81
82 // returns size of communicator
83 // (mpi_comm_size comm)
84 //
85 // Same assumption about comm.
Lmpi_comm_size(LispObject,LispObject comm)86 static LispObject Lmpi_comm_size(LispObject, LispObject comm)
87 { int size;
88 static char fun_name[] = "mpi_comm_size";
89 if (!is_fixnum(comm)) return aerror1(fun_name, v);
90 MPI_Comm_size(int_of_fixnum(comm),&size);
91 return onevalue(fixnum_of_int(size));
92 }
93
94 /********************** Blocking point-to-point functions *************/
95
96 // Standard blocking send
97 // (mpi_send message dest tag comm)
98 // returns nil.
99 //
100 // Same assumption about comm.
Lmpi_send(LispObject env,LispObject message,LispObject dest,LispObject tag,LispObject comm)101 static LispObject Lmpi_send(LispObject env, LispObject message,
102 LispObject dest,
103 LispObject tag, LispObject comm)
104 { static char fun_name[] = "mpi_send";
105
106 get_fix_arg(dest, fun_name);
107 get_fix_arg(tag, fun_name);
108 if (cdr(comm) != nil) return aerror("too many args for mpi-send");
109 comm = car(comm);
110 get_fix_arg(comm, fun_name);
111
112 pack_object(message);
113 MPI_Send(mpi_pack_buffer, mpi_pack_position, MPI_PACKED,
114 dest, tag, comm);
115 std::free(mpi_pack_buffer);
116 return onevalue(nil);
117 }
118
119 // Standard blocking receive
120 // (mpi_recv source tag comm)
121 // returns (message (source tag error)).
122 //
Lmpi_recv(LispObject,LispObject source,LispObject tag,LispObject comm)123 static LispObject Lmpi_recv(LispObject, LispObject source,
124 LispObject tag, LispObject comm)
125 { static char fun_name[] = "mpi_recv";
126
127 MPI_Status status;
128 LispObject Lstatus;
129 std::va_list a;
130
131 get_fix_arg(source, fun_name);
132 get_fix_arg(tag, fun_name);
133 get_fix_arg(comm, fun_name);
134
135 MPI_Probe(source, tag, comm, &status);
136 MPI_Get_count(&status, MPI_PACKED, &mpi_pack_size);
137 mpi_pack_buffer = reinterpret_cast<char*>(std::malloc(mpi_pack_size));
138
139 MPI_Recv(mpi_pack_buffer, mpi_pack_size, MPI_PACKED,
140 source, tag, comm, &status);
141
142 // The only relevant status things are the 3 public fields, so I'll
143 // stick them in a list and return them as the 2nd value
144 //
145 LispObject r = unpack_object();
146 Save save(r);
147 std::free(mpi_pack_buffer);
148 Lstatus = list3(fixnum_of_int(status.MPI_SOURCE),
149 fixnum_of_int(status.MPI_TAG),
150 fixnum_of_int(status.MPI_ERROR));
151 errexit();
152 save.restore(r);
153 return onevalue(list2(r, Lstatus));
154 }
155
156 // Standard blocking simultaneous send and receive
157 // (mpi_sendrecv send_message dest send_tag source recv_tag comm)
158 // returns (recv_message (source recv_tag error))
159 //
160 // THERE IS A LIMIT OF 1024 BYTES FOR THE RECEIVE BUFFER (sorry.)
161 // THIS WILL BE REMOVED ASAP.
162 //
Lmpi_sendrecv(LispObject,LispObject s_mess,LispObject dest,LispObject tag,LispObject a4up)163 static LispObject Lmpi_sendrecv(LispObject, LispObject s_mess,
164 LispObject dest,
165 LispObject tag, LispObject a4up)
166 { static char fun_name[] = "mpi_sendrecv";
167
168 MPI_Status status;
169 LispObject Lstatus;
170 LispObject s_mess;
171 int r_tag, source, comm;
172 char r_buffer[1024];
173
174 get_fix_arg(dest, fun_name);
175 get_fix_arg(s_tag, fun_name);
176 source = car(a4up);
177 a4up = cdr(a4up);
178 get_fix_arg(source);
179 if (a4up == nil) return aerror("not enough arguments for mpi_sendrecv");
180 r_tag = car(a4up);
181 a4up = cdr(a4up);
182 get_fix_arg(r_tag);
183 if (a4up == nil) return aerror("not enough arguments for mpi_sendrecv");
184 com = car(a4up);
185 a4up = cdr(a4up);
186 get_fix_arg(comm);
187 if (a4up != nil) return aerror("too many arguments for mpi_sendrecv");
188
189 pack_object(s_mess);
190 MPI_Sendrecv(mpi_pack_buffer, mpi_pack_position, MPI_PACKED,
191 dest, s_tag,
192 r_buffer, 1024, MPI_PACKED,
193 source, r_tag, comm, &status);
194 std::free(mpi_pack_buffer);
195 mpi_pack_buffer = r_buffer;
196 LispObject r = unpack_object();
197 Save save(r);
198 Lstatus = list3(fixnum_of_int(status.MPI_SOURCE),
199 fixnum_of_int(status.MPI_TAG),
200 fixnum_of_int(status.MPI_ERROR));
201 errexit();
202 save.restore(r);
203 return onevalue(list2(r, Lstatus));
204 }
205
206 /************** Non-Blocking point-to-point functions ***********/
207
208 // Standard non-blocking send post
209 // (mpi_isend message dest tag comm)
210 // returns request handle
211 //
Lmpi_isend(LispObject,LispObject message,LispObject dest,LispObject tag,LispObject comm)212 static LispObject Lmpi_isend(LispObject, LispObject message,
213 LispObject dest,
214 LispObject tag, LispObject comm)
215 { static char fun_name[] = "mpi_isend";
216
217 LispObject message, request;
218 int dest, tag, comm;
219
220 // For now, we assume type MPI_Request to be 32 bits.
221 request = Lmkvect32(nil,fixnum_of_int(2));
222
223 get_fix_arg(dest, fun_name);
224 get_fix_arg(tag, fun_name);
225 if (cdr(comm) != nil) return aerror("too many args for mpi_isend");
226 comm = car(comm);
227 get_fix_arg(comm, fun_name);
228
229 pack_object(message);
230 MPI_Isend(mpi_pack_buffer, mpi_pack_position, MPI_PACKED,
231 dest, tag, comm, (MPI_Request*)&elt(request,0));
232 elt(request,1) = static_cast<int>(mpi_pack_buffer);
233 return onevalue(request);
234 }
235
236 // Standard non-blocking receive post
237 // (mpi_irecv source tag comm)
238 // returns request handle
239 //
240 // I actually cheat horribly by not posting the request at all (at least
241 // not via MPI), but rather create my own "dummy" request structure.
242 // Then, to complete the request, I MPI_(I)Probe for a matching message,
243 // and receive it if it is there.
244 // This is unsatisfactory since the operation is only non-blocking until the
// first lump of the message arrives; for a long message, there may be
246 // a lot of latency after this.
247 //
// Record of a receive that has been requested but not yet posted to MPI.
// It is filled in by Lmpi_irecv and read back by Lmpi_wait and Lmpi_test,
// which pass the three fields to MPI_Probe/MPI_Iprobe and MPI_Recv.
struct dummy_request
{   int source;   // source rank to match
    int tag;      // message tag to match
    int comm;     // communicator handle
};
253
254 static LispObject Lmpi_irecv(LispObject, LispObject aource,
255 LispObject tag, LispObject comm
256 { static char fun_name[] = "mpi_irecv";
257
258 LispObject request;
259 char* buffer;
260
261 // For now, we assume type MPI_Request to be 32 bits.
262 request = Lmkvect32(nil,fixnum_of_int(2));
263
264 get_fix_arg(source, fun_name);
265 get_fix_arg(tag, fun_name);
266 get_fix_arg(comm, fun_name);
267
268 elt(request,1) = 0; // There is no buffer yet
269 elt(request,0) = static_cast<int>(std::malloc(sizeof(struct dummy_request)));
270 ((struct dummy_request*)elt(request,0))->source = source;
271 ((struct dummy_request*)elt(request,0))->tag = tag;
272 ((struct dummy_request*)elt(request,0))->comm = comm;
273
274 return onevalue(request);
275 }
276
277 // Wait to complete operation, and deallocate buffer.
278 // (mpi_wait request)
279 // for send, returns nil
280 // for recv, returns (message (source tag error))
281 //
282 static LispObject Lmpi_wait(LispObject env, LispObject request)
283 { MPI_Status status;
284 LispObject message, Lstatus;
285 if ( !(is_vector(request) &&
286 type_of_header(vechdr(request)) == TYPE_VEC32 &&
287 length_of_header(vechdr(request)) == 3*CELL) )
288 return aerror1("mpi_wait",request);
289 if ( elt(request,1))
290 { status.MPI_ERROR = MPI_UNDEFINED;
291 mpi_pack_buffer = reinterpret_cast<void*>(elt(request,1));
292 MPI_Wait( (MPI_Request*)&elt(request,0), &status);
293 if (status.MPI_ERROR == MPI_UNDEFINED) // i.e. send request
294 { std::free(mpi_pack_buffer);
295 return onevalue(nil);
296 }
297 else // old-style receive
298 { LispObject r = unpack_object();
299 Save save(r);
300 std::free(mpi_pack_buffer);
301 Lstatus = list3(fixnum_of_int(status.MPI_SOURCE),
302 fixnum_of_int(status.MPI_TAG),
303 fixnum_of_int(status.MPI_ERROR));
304 errexit();
305 save.restore(r);
306 return onevalue(list2(r, Lstatus));
307 }
308 }
309 else // new-style receive
310 { int source = ((struct dummy_request*)elt(request,0))->source,
311 tag = ((struct dummy_request*)elt(request,0))->tag,
312 comm = ((struct dummy_request*)elt(request,0))->comm;
313 MPI_Probe(source, tag, comm, &status);
314 std::free((struct dummy_request*)elt(request,0));
315 MPI_Get_count(&status, MPI_PACKED, &mpi_pack_size);
316 mpi_pack_buffer = reinterpret_cast<char*>(std::malloc(mpi_pack_size));
317
318 MPI_Recv(mpi_pack_buffer, mpi_pack_size, MPI_PACKED,
319 source, tag, comm, &status);
320
321 // The only relevant status things are the 3 public fields, so I'll
322 // stick them in a list and return them as the 2nd value
323 //
324 LispObject r = unpack_object());
325 Save save(r);
326 std::free(mpi_pack_buffer);
327 Lstatus = list3(fixnum_of_int(status.MPI_SOURCE),
328 fixnum_of_int(status.MPI_TAG),
329 fixnum_of_int(status.MPI_ERROR));
330 errexit();
331 save.restore(r);
332 return onevalue(list2(r, Lstatus));
333 }
334 }
335
336
337 // Test for completion, deallocate buffer if so
338 // (mpi_test request)
339 // for send, returns flag
340 // for recv, returns nil or (message (source tag error))
341 //
342 static LispObject Lmpi_test(LispObject env, LispObject request)
343 { MPI_Status status;
344 LispObject message, Lstatus;
345 int flag;
346 if ( !(is_vector(request) &&
347 type_of_header(vechdr(request)) == TYPE_VEC32 &&
348 length_of_header(vechdr(request)) == 3*CELL) )
349 return aerror1("mpi_wait",request);
350 if (elt(request,1))
351 { status.MPI_ERROR = MPI_UNDEFINED;
352 mpi_pack_buffer = reinterpret_cast<void*>(elt(request,1));
353 MPI_Test( (MPI_Request*)&elt(request,0), &flag, &status);
354 if (!flag) return onevalue(nil);
355 if (status.MPI_ERROR == MPI_UNDEFINED) // send request
356 { std::free(mpi_pack_buffer);
357 return onevalue(Lispify_predicate(YES));
358 }
359 else // old-style receive
360 { LispObject r = unpack_object();
361 Save save(r);
362 std::free(mpi_pack_buffer);
363 Lstatus = list3(fixnum_of_int(status.MPI_SOURCE),
364 fixnum_of_int(status.MPI_TAG),
365 fixnum_of_int(status.MPI_ERROR));
366 errexit();
367 save.restore(r);
368 return onevalue(list2(r, Lstatus));
369 }
370 }
371 else // new-style receive
372 { int source = ((struct dummy_request*)elt(request,0))->source,
373 tag = ((struct dummy_request*)elt(request,0))->tag,
374 comm = ((struct dummy_request*)elt(request,0))->comm, flag;
375 MPI_Iprobe(source, tag, comm, &flag, &status);
376
377 if (!flag) return onevalue(nil);
378
379 std::free((struct dummy_request*)elt(request,0));
380 MPI_Get_count(&status, MPI_PACKED, &mpi_pack_size);
381 mpi_pack_buffer = reinterpret_cast<char*>(std::malloc(mpi_pack_size));
382
383 MPI_Recv(mpi_pack_buffer, mpi_pack_size, MPI_PACKED,
384 source, tag, comm, &status);
385
386 // The only relevant status things are the 3 public fields, so I'll
387 // stick them in a list and return them as the 2nd value
388 //
389 LispObject r = unpack_object();
390 Save save(r);
391 std::free(mpi_pack_buffer);
392 Lstatus = list3(fixnum_of_int(status.MPI_SOURCE),
393 fixnum_of_int(status.MPI_TAG),
394 fixnum_of_int(status.MPI_ERROR));
395 errexit();
396 save.restore(r);
397 return onevalue(list2(r, Lstatus));
398 }
399 }
400
401 /************** Probe functions *******************/
402 // Non-blocking probe
403 // (mpi_iprobe source tag comm)
404 // returns (flag (source tag error))
405 //
406 static LispObject Lmpi_iprobe(LispObject, LispObject source,
407 LispObject tag,
408 LispObject comm)
409 { static char fun_name[] = "impi_probe";
410
411 MPI_Status status;
412 int flag;
413 LispObject Lstatus;
414 get_fix_arg(source, fun_name);
415 get_fix_arg(tag, fun_name);
416 get_fix_arg(comm, fun_name);
417
418 MPI_Iprobe(source, tag, comm, &flag, &status);
419 Lstatus = list3(fixnum_of_int(status.MPI_SOURCE),
420 fixnum_of_int(status.MPI_TAG),
421 fixnum_of_int(status.MPI_ERROR));
422 return onevalue(list2(Lispify_predicate(flag), Lstatus));
423 }
424
425 // Blocking probe
426 // (mpi_probe source tag comm)
427 // returns (source tag error)
428 //
429 static LispObject Lmpi_probe(LispObject, LispObject source,
430 LispObject tag,
431 LispObject comm)
432 { static char fun_name[] = "mpi_probe";
433
434 MPI_Status status;
435 int source, tag, comm;
436 LispObject Lstatus;
437 get_fix_arg(source, fun_name);
438 get_fix_arg(tag, fun_name);
439 get_fix_arg(comm, fun_name);
440
441 MPI_Probe(source, tag, comm, &status);
442 Lstatus = list3(fixnum_of_int(status.MPI_SOURCE),
443 fixnum_of_int(status.MPI_TAG),
444 fixnum_of_int(status.MPI_ERROR));
445 return onevalue(Lstatus);
446 }
447
448 /************** Collective Communications *********/
449
450 // Barrier; blocks until all processes have called
451 // (mpi_barrier comm)
452 // returns nil
453 //
454 static LispObject Lmpi_barrier(LispObject env, LispObject comm)
455 { int rank;
456 static char fun_name[] = "mpi_barrier";
457 if (!is_fixnum(comm)) return aerror1(fun_name, v);
458 MPI_Barrier(int_of_fixnum(comm));
459 return onevalue(nil);
460 }
461
462 // Broadcast; sends buffer of root to buffers of others.
463 // (mpi_bcast message root comm) [message ignored if not root]
464 // returns message
465 //
466 static LispObject Lmpi_bcast(LispObject, LispObject message,
467 LispObject root, LispObject comm)
468 { static char fun_name[] = "mpi_bcast";
469
470 int rank;
471 get_arg(message, fun_name);
472 get_fix_arg(root, fun_name);
473 get_fix_arg(comm, fun_name);
474
475 MPI_Comm_rank(comm,&rank);
476 if (rank == root)
477 { pack_object(message);
478 MPI_Bcast(&mpi_pack_position, 1, MPI_LONG, root, comm);
479 MPI_Bcast(mpi_pack_buffer, mpi_pack_position, MPI_PACKED, root, comm);
480 std::free(mpi_pack_buffer);
481 }
482 else
483 { MPI_Bcast(&mpi_pack_size, 1, MPI_LONG, root, comm);
484 mpi_pack_buffer = reinterpret_cast<char*>(std::malloc(mpi_pack_size));
485 MPI_Bcast(mpi_pack_buffer, mpi_pack_size, MPI_PACKED, root, comm);
486 message = unpack_object();
487 std::free(mpi_pack_buffer);
488 }
489 return onevalue(message);
490 }
491
492 // Gather: root receives messages from others.
493 // (mpi_gather message root comm)
494 // returns vector of messages if root, else nil.
495 //
496 static LispObject Lmpi_gather(LispObject, LispObject message,
497 LispObject root,
498 LispObject comm)
499 { static char fun_name[] = "mpi_gather";
500
501 int rank;
502 get_arg(message, fun_name);
503 get_fix_arg(root, fun_name);
504 get_fix_arg(comm, fun_name);
505
506 MPI_Comm_rank(comm,&rank);
507 pack_object(message);
508 if (rank == root)
509 { int commsize, count;
510 int *recvcounts, *displs;
511 char *recvbuffer;
512
513 MPI_Comm_size(comm,&commsize);
514 recvcounts = reinterpret_cast<int*>(std::calloc(commsize,
515 sizeof(int)));
516 displs = reinterpret_cast<int*>(std::calloc(commsize+1, sizeof(int)));
517 MPI_Gather(&mpi_pack_position, 1, MPI_LONG,
518 recvcounts, 1, MPI_LONG, root, comm);
519
520 displs[0] = 0;
521 for (count = 0; count < commsize; ++count)
522 displs[count+1] = displs[count] + recvcounts[count];
523
524 recvbuffer = reinterpret_cast<char*>(std::malloc(displs[commsize]));
525
526 MPI_Gatherv(mpi_pack_buffer, mpi_pack_position, MPI_PACKED,
527 recvbuffer, recvcounts, displs, MPI_PACKED, root, comm);
528 std::free(mpi_pack_buffer);
529
530 message = Lmkvect(nil, fixnum_of_int(commsize-1));
531 for (count = 0; count < commsize; ++count)
532 { mpi_pack_buffer = recvbuffer + displs[count];
533 mpi_pack_size = recvcounts[count];
534 elt(message, count) = unpack_object();
535 }
536 std::free(recvbuffer); std::free(recvcounts); std::free(displs);
537 }
538 else
539 { MPI_Gather(&mpi_pack_position, 1, MPI_LONG, 0, 0, MPI_LONG, root,
540 comm);
541 MPI_Gatherv(mpi_pack_buffer, mpi_pack_position, MPI_PACKED,
542 0,0,0,MPI_PACKED, root, comm);
543 std::free(mpi_pack_buffer);
544 message = nil;
545 }
546 return onevalue(message);
547 }
548
549 // Scatter: inverse of gather.
550 // (mpi_scatter vector_of_messages root comm) [messages ignored if not root]
551 // returns message
552 //
553 static LispObject Lmpi_scatter(LispObject, LispObject messages,
554 LispObject root,
555 LispObject comm)
556 { static char fun_name[] = "mpi_scatter";
557
558 LispObject message;
559 int rank;
560 std::va_list a;
561
562 get_fix_arg(root, fun_name);
563 get_fix_arg(comm, fun_name);
564
565 MPI_Comm_rank(comm,&rank);
566 if (rank == root)
567 { int commsize, count, *sendcounts, *displs, recvcount;
568 char* recvbuffer;
569
570 MPI_Comm_size(comm,&commsize);
571 sendcounts = reinterpret_cast<int*>(std::calloc(commsize,
572 sizeof(int)));
573 displs = reinterpret_cast<int*>(std::calloc(commsize+1, sizeof(int)));
574 displs[0] = 0;
575
576 // Call private functions in mpi_packing for consecutive packs
577 check_buffer = scatter_check_buffer;
578 mpi_pack_offset = 0;
579 mpi_pack_position = 0;
580 mpi_pack_size = 0;
581 mpi_buffer_bottom = 0;
582 mpi_real_size = 0;
583 for (count = 0; count < commsize; ++count)
584 { pack_cell(elt(messages,count));
585 sendcounts[count] = mpi_pack_position;
586 mpi_pack_size -= mpi_pack_position;
587 mpi_pack_offset += mpi_pack_position;
588 mpi_pack_buffer += mpi_pack_position;
589 displs[count+1] = mpi_pack_offset;
590 mpi_pack_position = 0;
591 }
592 check_buffer = default_check_buffer;
593 MPI_Scatter(sendcounts, 1, MPI_LONG, &recvcount, 1, MPI_LONG, root,
594 comm);
595 recvbuffer = reinterpret_cast<char*>(std::malloc(recvcount));
596 MPI_Scatterv(mpi_buffer_bottom, sendcounts, displs, MPI_PACKED,
597 recvbuffer, recvcount, MPI_PACKED, root, comm);
598 std::free(recvbuffer);
599 std::free(sendcounts);
600 std::free(displs);
601 std::free(mpi_buffer_bottom);
602 message = elt(messages, root);
603 }
604 else
605 { MPI_Scatter(0,0,MPI_LONG,&mpi_pack_size,1,MPI_LONG,root,comm);
606 mpi_pack_buffer = reinterpret_cast<char*>(std::malloc(mpi_pack_size));
607 MPI_Scatterv(0,0,0,MPI_PACKED,
608 mpi_pack_buffer,mpi_pack_size,MPI_PACKED,root,comm);
609 message = unpack_object();
610 std::free(mpi_pack_buffer);
611 }
612 return onevalue(message);
613 }
614
615
616 // Allgather: just like gather, only everyone gets the result.
617 // (mpi_allgather message comm)
618 // returns vector of messages
619 //
620 static LispObject Lmpi_allgather(LispObject,
621 LispObject message,
622 LispObject comm)
623 { static char fun_name[] = "mpi_gather";
624 int commsize, buffersize, count;
625 int *recvcounts, *displs;
626 char *recvbuffer;
627
628 if (!is_fixnum(comm)) return aerror1(fun_name, v);
629 comm = int_of_fixnum(comm);
630
631 pack_object(message);
632
633 MPI_Comm_size(comm,&commsize);
634 recvcounts = reinterpret_cast<int*>(std::calloc(commsize,
635 sizeof(int)));
636 displs = reinterpret_cast<int*>(std::calloc(commsize+1, sizeof(int)));
637 MPI_Allgather(&mpi_pack_position, 1, MPI_LONG, recvcounts, 1,
638 MPI_LONG, comm);
639
640 displs[0] = 0;
641 for (count = 0; count < commsize; ++count)
642 displs[count+1] = displs[count] + recvcounts[count];
643
644 recvbuffer = reinterpret_cast<char*>(std::malloc(displs[commsize]));
645
646 MPI_Allgatherv(mpi_pack_buffer, mpi_pack_position, MPI_PACKED,
647 recvbuffer, recvcounts, displs, MPI_PACKED, comm);
648 std::free(mpi_pack_buffer); std::free(recvcounts); std::free(displs);
649
650 message = Lmkvect(nil, fixnum_of_int(commsize-1));
651 for (count = 0; count < commsize; ++count)
652 { mpi_pack_buffer = recvbuffer + displs[count];
653 mpi_pack_size = recvcounts[count];
654 elt(message, count) = unpack_object();
655 }
656 std::free(recvbuffer);
657 return onevalue(message);
658 }
659
660 // All to all scatter/gather.
661 // (mpi_alltoall vector_of_messages comm)
662 // returns vector of messages.
663 //
664 static LispObject Lmpi_alltoall(LispObject,
665 LispObject smessages, LispObject Lcomm)
666 { static char fun_name[] = "mpi_alltoall";
667
668 LispObject rmessages;
669 int rank,comm, commsize, count;
670 int *sendcounts, *recvcounts, *sdispls, *rdispls;
671 char* recvbuffer;
672
673 if (!is_fixnum(comm)) return aerror1(fun_name, v);
674 comm = int_of_fixnum(Lcomm);
675
676 MPI_Comm_size(comm,&commsize);
677 sendcounts = reinterpret_cast<int*>(std::calloc(commsize, sizeof(int)));
678 recvcounts = reinterpret_cast<int*>(std::calloc(commsize, sizeof(int)));
679 sdispls = reinterpret_cast<int*>(std::calloc(commsize+1, sizeof(int)));
680 rdispls = reinterpret_cast<int*>(std::calloc(commsize+1, sizeof(int)));
681
682 // Call private functions in mpi_packing for consecutive packs
683 check_buffer = scatter_check_buffer;
684 mpi_pack_offset = 0;
685 mpi_pack_position = 0;
686 mpi_pack_size = 0;
687 mpi_buffer_bottom = 0;
688 mpi_real_size = 0;
689 for (count = 0; count < commsize; ++count)
690 { pack_cell(elt(smessages,count));
691 sendcounts[count] = mpi_pack_position;
692 mpi_pack_size -= mpi_pack_position;
693 mpi_pack_offset += mpi_pack_position;
694 mpi_pack_buffer += mpi_pack_position;
695 sdispls[count+1] = mpi_pack_offset;
696 mpi_pack_position = 0;
697 }
698 check_buffer = default_check_buffer;
699
700 MPI_Comm_rank(comm,&rank);
701
702 MPI_Alltoall(sendcounts, 1, MPI_LONG, recvcounts, 1, MPI_LONG, comm);
703
704 rdispls[0] = 0;
705 for (count = 0; count < commsize; ++count)
706 rdispls[count+1] = rdispls[count] + recvcounts[count];
707
708 recvbuffer = reinterpret_cast<char*>(std::malloc(rdispls[commsize]));
709
710 MPI_Alltoallv(mpi_buffer_bottom, sendcounts, sdispls, MPI_PACKED,
711 recvbuffer, recvcounts, rdispls, MPI_PACKED, comm);
712
713 std::free(mpi_buffer_bottom); std::free(sendcounts);
714 std::free(sdispls);
715
716 rmessages = Lmkvect(nil, fixnum_of_int(commsize-1));
717 for (count = 0; count < commsize; ++count)
718 { mpi_pack_buffer = recvbuffer + rdispls[count];
719 mpi_pack_size = recvcounts[count];
720 elt(rmessages, count) = unpack_object();
721 }
722 std::free(recvbuffer); std::free(recvcounts); std::free(rdispls);
723 return onevalue(rmessages);
724 }
725
726 #else // USE_MPI
727
728 static LispObject Lmpi_comm_rank(LispObject, LispObject)
729 { return aerror0("mpi support not built into this version of CSL");
730 }
731
732 static LispObject Lmpi_comm_size(LispObject, LispObject)
733 { return aerror0("mpi support not built into this version of CSL");
734 }
735
736 static LispObject Lmpi_send(LispObject, LispObject, LispObject,
737 LispObject, LispObject)
738 { return aerror0("mpi support not built into this version of CSL");
739 }
740
741 static LispObject Lmpi_recv(LispObject, LispObject, LispObject,
742 LispObject)
743 { return aerror0("mpi support not built into this version of CSL");
744 }
745
746 static LispObject Lmpi_sendrecv(LispObject, LispObject, LispObject,
747 LispObject, LispObject)
748 { return aerror0("mpi support not built into this version of CSL");
749 }
750
751 static LispObject Lmpi_isend(LispObject, LispObject, LispObject,
752 LispObject, LispObject)
753 { return aerror0("mpi support not built into this version of CSL");
754 }
755
756 static LispObject Lmpi_irecv(LispObject, LispObject, LispObject,
757 LispObject)
758 { return aerror0("mpi support not built into this version of CSL");
759 }
760
761 static LispObject Lmpi_wait(LispObject, LispObject)
762 { return aerror0("mpi support not built into this version of CSL");
763 }
764
765
766 static LispObject Lmpi_test(LispObject, LispObject)
767 { return aerror0("mpi support not built into this version of CSL");
768 }
769
770 static LispObject Lmpi_iprobe(LispObject, LispObject, LispObject,
771 LispObject)
772 { return aerror0("mpi support not built into this version of CSL");
773 }
774
775 static LispObject Lmpi_probe(LispObject, LispObject, LispObject,
776 LispObject)
777 { return aerror0("mpi support not built into this version of CSL");
778 }
779
780 static LispObject Lmpi_barrier(LispObject, LispObject)
781 { return aerror0("mpi support not built into this version of CSL");
782 }
783
784 static LispObject Lmpi_bcast(LispObject, LispObject, LispObject,
785 LispObject)
786 { return aerror0("mpi support not built into this version of CSL");
787 }
788
789 static LispObject Lmpi_gather(LispObject, LispObject, LispObject,
790 LispObject)
791 { return aerror0("mpi support not built into this version of CSL");
792 }
793
794 static LispObject Lmpi_scatter(LispObject, LispObject, LispObject,
795 LispObject)
796 { return aerror0("mpi support not built into this version of CSL");
797 }
798
799
800 static LispObject Lmpi_allgather(LispObject,
801 LispObject,
802 LispObject)
803 { return aerror0("mpi support not built into this version of CSL");
804 }
805
806 static LispObject Lmpi_alltoall(LispObject,
807 LispObject, LispObject)
808 { return aerror0("mpi support not built into this version of CSL");
809 }
810
811 #endif // USE_MPI
812
813
// Table that associates each Lisp-visible MPI function name with the C++
// function implementing it. The same table is used whether or not USE_MPI
// is defined, since both branches above define every function named here.
// The DEF_n / DEF_4up macros presumably encode the argument count of each
// entry - confirm against their definition. The list is terminated by an
// entry consisting solely of null pointers.
setup_type const mpi_setup[] =
{   DEF_1("mpi_comm_rank", Lmpi_comm_rank),
    DEF_1("mpi_comm_size", Lmpi_comm_size),
    DEF_4up("mpi_send", Lmpi_send),
    DEF_3("mpi_recv", Lmpi_recv),
    DEF_4up("mpi_sendrecv", Lmpi_sendrecv),
    DEF_4up("mpi_isend", Lmpi_isend),
    DEF_3("mpi_irecv", Lmpi_irecv),
    DEF_1("mpi_barrier", Lmpi_barrier),
    DEF_1("mpi_wait", Lmpi_wait),
    DEF_1("mpi_test", Lmpi_test),
    DEF_3("mpi_probe", Lmpi_probe),
    DEF_3("mpi_iprobe", Lmpi_iprobe),
    DEF_3("mpi_bcast", Lmpi_bcast),
    DEF_3("mpi_gather", Lmpi_gather),
    DEF_2("mpi_allgather", Lmpi_allgather),
    DEF_3("mpi_scatter", Lmpi_scatter),
    DEF_2("mpi_alltoall", Lmpi_alltoall),
    // End-of-table marker.
    {nullptr, nullptr, nullptr, nullptr, nullptr, nullptr}
};
834
835
836 // end of cslmpi.cpp
837