1 /* Copyright (C) 2013 The PARI group.
2
3 This file is part of the PARI/GP package.
4
5 PARI/GP is free software; you can redistribute it and/or modify it under the
6 terms of the GNU General Public License as published by the Free Software
7 Foundation; either version 2 of the License, or (at your option) any later
8 version. It is distributed in the hope that it will be useful, but WITHOUT
9 ANY WARRANTY WHATSOEVER.
10
11 Check the License for details. You should have received a copy of it, along
12 with the package; see the file 'COPYING'. If not, write to the Free Software
13 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
14 #include <mpi.h>
15 #include "pari.h"
16 #include "paripriv.h"
17 #include "../src/language/anal.h"
18 #include "mt.h"
19
/* MPI communicator geometry: total number of ranks and this process' rank.
   Rank 0 is the master; every other rank runs the worker loop
   pari_MPI_child() and never returns from it. */
static THREAD int pari_MPI_size, pari_MPI_rank;
/* nbreq: number of jobs submitted to workers and not yet collected.
   mt_issingle: nonzero when the sequential fallback engine is in use. */
static THREAD long nbreq = 0, mt_issingle = 0;
22
/* Master -> worker wire protocol: each request is a command tag sent as a
   single long, optionally followed by a payload (a long, a vector of longs,
   or a serialized GEN, depending on the command).  The numeric values are
   part of the protocol shared by all ranks: do not reorder. */
enum PMPI_cmd { PMPI_close, PMPI_worker, PMPI_work, PMPI_parisizemax,
                PMPI_parisize, PMPI_precreal, PMPI_primetab,
                PMPI_varpriority, PMPI_eval,
                PMPI_exportadd, PMPI_exportdel};
27
/* Master-side state of the MPI job queue. */
struct mt_mstate
{
  long n;       /* number of worker ranks used by the current section */
  int source;   /* rank targeted by the next submit / answered last */
  long nbint;   /* next worker to receive its first job; > n once all busy */
  long *workid; /* workid[i]: caller-supplied id of the job sent to rank i */
};

static struct mt_mstate pari_mt_data;
/* Non-NULL iff an MPI parallel section is open (then = &pari_mt_data). */
static struct mt_mstate *pari_mt;
38
/* Send the single long <a> to rank <dest>; SIGINT is blocked around the
   MPI call so an interrupt cannot leave a message half-sent. */
static void
send_long(long a, long dest)
{
  BLOCK_SIGINT_START
  MPI_Send(&a, 1, MPI_LONG, dest, 0, MPI_COMM_WORLD);
  BLOCK_SIGINT_END
}
46
/* Send the n-element long vector a[0..n-1] to rank <dest>. */
static void
send_vlong(long *a, long n, long dest)
{
  BLOCK_SIGINT_START
  MPI_Send(a, n, MPI_LONG, dest, 0, MPI_COMM_WORLD);
  BLOCK_SIGINT_END
}
54
55 static void
send_request(enum PMPI_cmd ecmd,long dest)56 send_request(enum PMPI_cmd ecmd, long dest)
57 {
58 send_long((long)ecmd, dest);
59 }
60
/* Serialize GEN <elt> and ship it to rank <dest>.  The GEN is unlinked
   from local clones (copybin_unlink) and written in canonical form so the
   receiver can rebase it in its own address space; the pair
   (object, relocation data) travels as one binary blob of <size> bytes. */
static void
send_GEN(GEN elt, int dest)
{
  pari_sp av = avma;
  int size;
  GEN reloc = copybin_unlink(elt);
  GENbin *buf = copy_bin_canon(mkvec2(elt,reloc));
  size = sizeof(GENbin) + buf->len*sizeof(ulong);
  {
    BLOCK_SIGINT_START
    MPI_Send(buf, size, MPI_CHAR, dest, 0, MPI_COMM_WORLD);
    BLOCK_SIGINT_END
  }
  pari_free(buf); set_avma(av); /* blob copied out by MPI: release it */
}
76
/* Ship command <ecmd> followed by its GEN payload <elt> to rank <dest>.
   The tag must precede the payload: the worker dispatches on it. */
static void
send_request_GEN(enum PMPI_cmd ecmd, GEN elt, int dest)
{
  send_request(ecmd, dest);
  send_GEN(elt, dest);
}
83
/* Ship command <ecmd> followed by a single long payload to rank <dest>. */
static void
send_request_long(enum PMPI_cmd ecmd, long elt, int dest)
{
  send_request(ecmd, dest);
  send_long(elt, dest);
}
90
/* Ship command <ecmd> followed by the long vector a[0..n-1] to <dest>. */
static void
send_request_vlong(enum PMPI_cmd ecmd, long *a, long n, int dest)
{
  send_request(ecmd, dest);
  send_vlong(a, n, dest);
}
97
/* Blocking receive of a single long from rank <src>. */
static long
recvfrom_long(int src)
{
  long a;
  BLOCK_SIGINT_START
  MPI_Recv(&a, 1, MPI_LONG, src, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  BLOCK_SIGINT_END
  return a;
}
107
/* Blocking receive of n longs from rank <src> into a[0..n-1]. */
static void
recvfrom_vlong(long *a, long n, int src)
{
  BLOCK_SIGINT_START
  MPI_Recv(a, n, MPI_LONG, src, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  BLOCK_SIGINT_END
}
115
116 static enum PMPI_cmd
recvfrom_request(int src)117 recvfrom_request(int src)
118 {
119 return (enum PMPI_cmd) recvfrom_long(src);
120 }
121
/* Receive the binary blob announced by a prior MPI_Probe (<status>) from
   rank <source>.  The blob size is read from <status>, a buffer of that
   size is allocated and filled.  Caller owns the returned buffer. */
static GENbin *
recvstatus_buf(int source, MPI_Status *status)
{
  int size;
  GENbin *buf;
  BLOCK_SIGINT_START

  MPI_Get_count(status, MPI_CHAR, &size);
  buf = (GENbin *)pari_malloc(size);
  MPI_Recv(buf, size, MPI_CHAR, source, 0/* tag */,
          MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  BLOCK_SIGINT_END
  return buf;
}
136
/* Receive and deserialize a GEN announced by <status> from rank <source>.
   The blob holds the pair (object, relocation data) produced by send_GEN;
   after rebasing (shiftaddress_canon) and relinking clones, return the
   object itself. */
static GEN
recvstatus_GEN(int source, MPI_Status *status)
{
  GEN res;
  GENbin *buf = recvstatus_buf(source, status);
  buf->rebase = &shiftaddress_canon;
  res = bin_copy(buf);
  bincopy_relink(gel(res,1),gel(res,2));
  return gel(res,1);
}
147
148 static void
recvstatus_void(int source,MPI_Status * status)149 recvstatus_void(int source, MPI_Status *status)
150 {
151 GENbin *buf = recvstatus_buf(source, status);
152 free(buf);
153 }
154
/* Blocking receive of a serialized GEN from rank <src>: probe first to
   learn the message size, then deserialize. */
static GEN
recvfrom_GEN(int src)
{
  MPI_Status status;
  BLOCK_SIGINT_START
  MPI_Probe(src, 0, MPI_COMM_WORLD, &status);
  BLOCK_SIGINT_END
  return recvstatus_GEN(src, &status);
}
164
/* Blocking receive of a serialized GEN from whichever rank answers first;
   the answering rank is reported in *source. */
static GEN
recvany_GEN(int *source)
{
  MPI_Status status;
  BLOCK_SIGINT_START
  MPI_Probe(MPI_ANY_SOURCE, 0 /* tag */, MPI_COMM_WORLD, &status);
  *source = status.MPI_SOURCE; /* drain the message from that very rank */
  BLOCK_SIGINT_END
  return recvstatus_GEN(*source, &status);
}
175
/* Receive and discard a message from whichever rank answers first; the
   answering rank is reported in *source (used when flushing the queue). */
static void
recvany_void(int *source)
{
  MPI_Status status;
  BLOCK_SIGINT_START
  MPI_Probe(MPI_ANY_SOURCE, 0 /* tag */, MPI_COMM_WORLD, &status);
  *source = status.MPI_SOURCE;
  BLOCK_SIGINT_END
  recvstatus_void(*source, &status);
}
186
/* Worker-side recovery point: errors raised while running a job longjmp
   back into pari_MPI_child's loop (see mt_err_recover). */
static jmp_buf child_env;
188
/* Main loop of an MPI worker (any rank != 0): receive commands from the
   master (rank 0) and act on them until PMPI_close.  Never returns. */
static void
pari_MPI_child(void)
{
  pari_sp av = avma;
  ulong rsize = 0, vsize = 0; /* stack sizes requested by the master */
  GEN worker = NULL, work, done;
  struct gp_context rec;
  pari_mt_nbthreads = 1; /* a worker is itself sequential */
  gp_context_save(&rec);
  if (setjmp(child_env))
  { /* an error escaped while running a job: report it to the master and
       restore a sane interpreter state */
    send_GEN(pari_err_last(), 0);
    gp_context_restore(&rec);
  }
  while (1)
    switch (recvfrom_request(0))
    {
    case PMPI_worker:
      /* install the closure evaluated by subsequent PMPI_work requests;
         stack sizes must have been set by PMPI_parisize(max) before */
      paristack_setsize(rsize, vsize);
      gp_context_save(&rec);
      worker = recvfrom_GEN(0);
      av = avma; /* results of jobs are collected relative to this point */
      break;
    case PMPI_work:
      /* one job: evaluate worker(work) and ship the result back */
      work = recvfrom_GEN(0);
      done = closure_callgenvec(worker, work);
      send_GEN(done, 0);
      set_avma(av);
      break;
    case PMPI_parisizemax:
      vsize = recvfrom_long(0);
      break;
    case PMPI_parisize:
      rsize = recvfrom_long(0);
      break;
    case PMPI_precreal:
      precreal = recvfrom_long(0);
      break;
    case PMPI_primetab:
      /* synchronize the local user prime table with the master's copy,
         cloning entries into blocks so they survive stack resets */
      {
        pari_sp ltop = avma;
        GEN tab = recvfrom_GEN(0);
        if (!gequal(tab, primetab))
        {
          long i, l = lg(tab);
          GEN old = primetab, t = cgetg_block(l, t_VEC);
          for (i = 1; i < l; i++) gel(t,i) = gclone(gel(tab,i));
          primetab = t;
          gunclone_deep(old);
        }
        set_avma(ltop);
      }
      break;
    case PMPI_eval:
      /* evaluate a closure for side effects only (mt_broadcast) */
      (void) closure_evalgen(recvfrom_GEN(0));
      set_avma(av);
      break;
    case PMPI_varpriority:
      /* varpriority-1: the table is indexed from -1 to MAXVARN */
      recvfrom_vlong(varpriority-1,MAXVARN+2,0);
      break;
    case PMPI_exportadd:
      /* bind an exported value under the given name in this worker */
      {
        GEN str = recvfrom_GEN(0);
        GEN val = recvfrom_GEN(0);
        entree *ep = fetch_entry(GSTR(str));
        export_add(ep->name, val);
        set_avma(av);
        break;
      }
    case PMPI_exportdel:
      {
        GEN str = recvfrom_GEN(0);
        entree *ep = fetch_entry(GSTR(str));
        export_del(ep->name);
        set_avma(av);
        break;
      }
    case PMPI_close:
      /* orderly shutdown: rendezvous with all ranks, then exit */
      MPI_Barrier(MPI_COMM_WORLD);
      MPI_Finalize();
      exit(0);
      break;
    }
}
273
274 void
mt_err_recover(long er)275 mt_err_recover(long er)
276 {
277 if (pari_MPI_rank) longjmp(child_env,er);
278 else if (mt_issingle) mtsingle_err_recover(er);
279 }
/* With MPI, SIGINT protection is handled locally by BLOCK_SIGINT_START/END
   around each MPI call: nothing to do at the engine level. */
void mt_sigint_block(void) { }
void mt_sigint_unblock(void) { }
void mt_sigint(void) {}
283
284 int
mt_is_parallel(void)285 mt_is_parallel(void)
286 {
287 return !!pari_mt;
288 }
289
290 int
mt_is_thread(void)291 mt_is_thread(void)
292 {
293 return pari_MPI_rank ? 1 : mt_issingle ? mtsingle_is_thread(): 0;
294 }
295
296 long
mt_nbthreads(void)297 mt_nbthreads(void)
298 {
299 return pari_mt || pari_MPI_rank || pari_MPI_size <= 2 ? 1: pari_mt_nbthreads;
300 }
301
302 void
mt_export_add(const char * str,GEN val)303 mt_export_add(const char *str, GEN val)
304 {
305 pari_sp av = avma;
306 long i, n = pari_MPI_size-1;
307 GEN s;
308 if (pari_mt || pari_MPI_rank)
309 pari_err(e_MISC,"export not allowed during parallel sections");
310 export_add(str, val);
311 s = strtoGENstr(str);
312 for (i=1; i <= n; i++)
313 {
314 send_request(PMPI_exportadd, i);
315 send_GEN(s, i);
316 send_GEN(val, i);
317 }
318 set_avma(av);
319 }
320
321 void
mt_export_del(const char * str)322 mt_export_del(const char *str)
323 {
324 pari_sp av = avma;
325 long i, n = pari_MPI_size-1;
326 GEN s;
327 if (pari_MPI_rank)
328 pari_err(e_MISC,"unexport not allowed during parallel sections");
329 export_del(str);
330 s = strtoGENstr(str);
331 for (i=1; i <= n; i++)
332 send_request_GEN(PMPI_exportdel, s, i);
333 set_avma(av);
334 }
335
336 void
mt_broadcast(GEN code)337 mt_broadcast(GEN code)
338 {
339 long i;
340 if (!pari_MPI_rank && !pari_mt)
341 for (i=1;i<pari_MPI_size;i++)
342 send_request_GEN(PMPI_eval, code, i);
343 }
344
/* Initialize the MPI mt engine.  On success, worker ranks enter
   pari_MPI_child() and never return; the master continues.  On failure,
   fall back to a sequential configuration (size = rank = 0, 1 thread). */
void
pari_mt_init(void)
{
  int res = MPI_Init(0, NULL);
  if (res == MPI_SUCCESS)
  {
    MPI_Comm_size(MPI_COMM_WORLD, &pari_MPI_size);
    MPI_Comm_rank(MPI_COMM_WORLD, &pari_MPI_rank);
    if (pari_MPI_rank) pari_MPI_child(); /* workers never come back */
#ifdef _IOFBF
    /* HACK: most MPI implementation does not handle stdin well.
       stdinsize is sufficient for the largest test file to fit */
    setvbuf(stdin,pari_malloc(128*1024),_IOFBF,128*1024);
#endif
    if (!pari_mt_nbthreads)
      pari_mt_nbthreads = maxss(1, pari_MPI_size-1);
  }
  else
  {
    pari_MPI_size = 0;
    pari_MPI_rank = 0;
    pari_mt_nbthreads = 1;
  }
  pari_mt = NULL; /* no parallel section open yet */
}
370
371 void
pari_mt_close(void)372 pari_mt_close(void)
373 {
374 long i;
375 if (!pari_MPI_rank)
376 for (i = 1; i < pari_MPI_size; i++)
377 send_request(PMPI_close, i);
378 MPI_Barrier(MPI_COMM_WORLD);
379 MPI_Finalize();
380 }
381
/* 'get' callback of the MPI queue.  While some workers have not yet
   received a first job (nbint <= n), return NULL so the caller submits to
   the idle worker selected in mt->source.  Otherwise block until any
   worker answers, update the pending count, report the caller-supplied id
   of the finished job, and forward worker errors (a worker stack overflow
   becomes e_STACKTHREAD). */
static GEN
mtmpi_queue_get(struct mt_state *junk, long *workid, long *pending)
{
  struct mt_mstate *mt = pari_mt;
  GEN done;
  (void) junk;
  if (mt->nbint<=mt->n) { mt->source=mt->nbint; *pending = nbreq; return NULL; }
  done = recvany_GEN(&mt->source);
  nbreq--; *pending = nbreq;
  if (workid) *workid = mt->workid[mt->source];
  if (typ(done) == t_ERROR)
  {
    if (err_get_num(done)==e_STACK)
      pari_err(e_STACKTHREAD);
    else
      pari_err(0,done);
  }
  return done;
}
401
/* 'submit' callback of the MPI queue.  A NULL <work> signals that there
   are no more jobs: mark all workers initialized so 'get' only drains
   answers from now on.  Otherwise send the job to the worker selected in
   mt->source (chosen by the previous 'get') and remember its caller id. */
static void
mtmpi_queue_submit(struct mt_state *junk, long workid, GEN work)
{
  struct mt_mstate *mt = pari_mt;
  (void) junk;
  if (!work) { mt->nbint=mt->n+1; return; }
  if (mt->nbint<=mt->n) mt->nbint++;
  nbreq++;
  mt->workid[mt->source] = workid;
  send_request_GEN(PMPI_work, work, mt->source);
}
413
/* Close the current MPI parallel section: drain (and discard) the answers
   of any still-pending jobs, release the workid table and mark the engine
   idle again. */
void
mt_queue_reset(void)
{
  struct mt_mstate *mt = pari_mt;
  if (DEBUGLEVEL>0 && nbreq)
    pari_warn(warner,"%ld discarded threads (MPI)",nbreq);
  for(  ;nbreq>0; nbreq--) recvany_void(&mt->source);
  pari_free(mt->workid);
  pari_mt = NULL;
}
424
/* Open a parallel section using at most <lim> workers (lim = 0 means no
   limit beyond pari_mt_nbthreads).  Fall back to the sequential engine
   when a section is already open, on a worker rank, or when fewer than
   two worker ranks exist.  Otherwise configure each worker -- stack
   sizes, real precision, variable priorities, prime table, then the
   worker closure itself (the order matters: PMPI_worker applies the stack
   sizes) -- and install the MPI queue callbacks in <pt>. */
void
mt_queue_start_lim(struct pari_mt *pt, GEN worker, long lim)
{
  if (lim==0) lim = pari_mt_nbthreads;
  else        lim = minss(pari_mt_nbthreads, lim);
  if (pari_mt || pari_MPI_rank || pari_MPI_size <= 2 || lim <= 1)
  {
    mt_issingle = 1;
    mtsingle_queue_start(pt, worker);
  }
  else
  {
    struct mt_mstate *mt = &pari_mt_data;
    long i, n = minss(lim, pari_MPI_size-1);
    long mtparisize = GP_DATA->threadsize? GP_DATA->threadsize: pari_mainstack->rsize;
    long mtparisizemax = GP_DATA->threadsizemax;
    pari_mt = mt;
    mt_issingle = 0;
    mt->workid = (long*) pari_malloc(sizeof(long)*(n+1));
    for (i=1; i <= n; i++)
    {
      send_request_long(PMPI_parisize, mtparisize, i);
      send_request_long(PMPI_parisizemax, mtparisizemax, i);
      send_request_long(PMPI_precreal, get_localbitprec(), i);
      send_request_vlong(PMPI_varpriority,varpriority-1,MAXVARN+2, i);
      send_request_GEN(PMPI_primetab, primetab, i);
      send_request_GEN(PMPI_worker, worker, i);
    }
    mt->n = n;
    mt->nbint = 1;  /* first 'get' selects worker 1 for its initial job */
    mt->source = 1;
    pt->get=&mtmpi_queue_get;
    pt->submit=&mtmpi_queue_submit;
    pt->end=&mt_queue_reset;
  }
}
461