/* Copyright (C) 2013  The PARI group.

This file is part of the PARI/GP package.

PARI/GP is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation; either version 2 of the License, or (at your option) any later
version. It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY WHATSOEVER.

Check the License for details. You should have received a copy of it, along
with the package; see the file 'COPYING'. If not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
#include <mpi.h>
#include "pari.h"
#include "paripriv.h"
#include "../src/language/anal.h"
#include "mt.h"

static THREAD int pari_MPI_size, pari_MPI_rank;
static THREAD long nbreq = 0, mt_issingle = 0;

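/* Commands sent by the master (rank 0) to the workers. Each request starts
 * with one of these codes sent as a long, optionally followed by a payload:
 * a long, a vector of longs, or a serialized GEN. */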
enum PMPI_cmd { PMPI_close, PMPI_worker, PMPI_work, PMPI_parisizemax,
                PMPI_parisize, PMPI_precreal, PMPI_primetab,
                PMPI_varpriority, PMPI_eval,
                PMPI_exportadd, PMPI_exportdel};

struct mt_mstate
{
  long n;
  int source;
  long nbint;
  long *workid;
};

static struct mt_mstate pari_mt_data;
static struct mt_mstate *pari_mt;

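/* Low-level send helpers: blocking MPI_Send calls wrapped in BLOCK_SIGINT
 * so that an interrupt cannot leave a message half-sent. */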
static void
send_long(long a, long dest)
{
  BLOCK_SIGINT_START
  MPI_Send(&a, 1, MPI_LONG, dest, 0, MPI_COMM_WORLD);
  BLOCK_SIGINT_END
}

static void
send_vlong(long *a, long n, long dest)
{
  BLOCK_SIGINT_START
  MPI_Send(a, n, MPI_LONG, dest, 0, MPI_COMM_WORLD);
  BLOCK_SIGINT_END
}

static void
send_request(enum PMPI_cmd ecmd, long dest)
{
  send_long((long)ecmd, dest);
}

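/* Serialize a GEN and send it as raw bytes: copybin_unlink collects the
 * relocation data for clone pointers referenced by elt, then the pair
 * (elt, relocation data) is flattened into a canonical GENbin buffer that
 * the receiver can rebase to its own addresses. */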
static void
send_GEN(GEN elt, int dest)
{
  pari_sp av = avma;
  int size;
  GEN reloc = copybin_unlink(elt);
  GENbin *buf = copy_bin_canon(mkvec2(elt,reloc));
  size = sizeof(GENbin) + buf->len*sizeof(ulong);
  {
    BLOCK_SIGINT_START
    MPI_Send(buf, size, MPI_CHAR, dest, 0, MPI_COMM_WORLD);
    BLOCK_SIGINT_END
  }
  pari_free(buf); set_avma(av);
}

static void
send_request_GEN(enum PMPI_cmd ecmd, GEN elt, int dest)
{
  send_request(ecmd, dest);
  send_GEN(elt, dest);
}

static void
send_request_long(enum PMPI_cmd ecmd, long elt, int dest)
{
  send_request(ecmd, dest);
  send_long(elt, dest);
}

static void
send_request_vlong(enum PMPI_cmd ecmd, long *a, long n, int dest)
{
  send_request(ecmd, dest);
  send_vlong(a, n, dest);
}

static long
recvfrom_long(int src)
{
  long a;
  BLOCK_SIGINT_START
  MPI_Recv(&a, 1, MPI_LONG, src, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  BLOCK_SIGINT_END
  return a;
}

static void
recvfrom_vlong(long *a, long n, int src)
{
  BLOCK_SIGINT_START
  MPI_Recv(a, n, MPI_LONG, src, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  BLOCK_SIGINT_END
}

static enum PMPI_cmd
recvfrom_request(int src)
{
  return (enum PMPI_cmd) recvfrom_long(src);
}

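/* Receive a serialized GEN whose size is not known in advance: the caller
 * probes first, then MPI_Get_count on the resulting status gives the exact
 * byte count needed to allocate the buffer for the matching MPI_Recv. */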
static GENbin *
recvstatus_buf(int source, MPI_Status *status)
{
  int size;
  GENbin *buf;
  BLOCK_SIGINT_START

  MPI_Get_count(status, MPI_CHAR, &size);
  buf = (GENbin *)pari_malloc(size);
  MPI_Recv(buf, size, MPI_CHAR, source, 0/* tag */,
          MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  BLOCK_SIGINT_END
  return buf;
}

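/* Rebuild a GEN from the received buffer: rebase the canonical form to local
 * addresses, then restore the clone pointers recorded by the sender. */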
static GEN
recvstatus_GEN(int source, MPI_Status *status)
{
  GEN res;
  GENbin *buf = recvstatus_buf(source, status);
  buf->rebase = &shiftaddress_canon;
  res = bin_copy(buf);
  bincopy_relink(gel(res,1),gel(res,2));
  return gel(res,1);
}

static void
recvstatus_void(int source, MPI_Status *status)
{
  GENbin *buf = recvstatus_buf(source, status);
  free(buf);
}

static GEN
recvfrom_GEN(int src)
{
  MPI_Status status;
  BLOCK_SIGINT_START
  MPI_Probe(src, 0, MPI_COMM_WORLD, &status);
  BLOCK_SIGINT_END
  return recvstatus_GEN(src, &status);
}

static GEN
recvany_GEN(int *source)
{
  MPI_Status status;
  BLOCK_SIGINT_START
  MPI_Probe(MPI_ANY_SOURCE, 0 /* tag */, MPI_COMM_WORLD, &status);
  *source = status.MPI_SOURCE;
  BLOCK_SIGINT_END
  return recvstatus_GEN(*source, &status);
}

static void
recvany_void(int *source)
{
  MPI_Status status;
  BLOCK_SIGINT_START
  MPI_Probe(MPI_ANY_SOURCE, 0 /* tag */, MPI_COMM_WORLD, &status);
  *source = status.MPI_SOURCE;
  BLOCK_SIGINT_END
  recvstatus_void(*source, &status);
}

static jmp_buf child_env;

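/* Main loop of a worker process (MPI rank > 0): block on requests from the
 * master, dispatch on the command code, and send results back to rank 0.
 * On error, mt_err_recover longjmps back here so that the error object is
 * reported to the master and the worker's context is restored. */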
static void
pari_MPI_child(void)
{
  pari_sp av = avma;
  ulong rsize = 0, vsize = 0;
  GEN worker = NULL, work, done;
  struct gp_context rec;
  pari_mt_nbthreads = 1;
  gp_context_save(&rec);
  if (setjmp(child_env))
  {
    send_GEN(pari_err_last(), 0);
    gp_context_restore(&rec);
  }
  while (1)
    switch (recvfrom_request(0))
    {
    case PMPI_worker:
      paristack_setsize(rsize, vsize);
      gp_context_save(&rec);
      worker = recvfrom_GEN(0);
      av = avma;
      break;
    case PMPI_work:
      work = recvfrom_GEN(0);
      done = closure_callgenvec(worker, work);
      send_GEN(done, 0);
      set_avma(av);
      break;
    case PMPI_parisizemax:
      vsize = recvfrom_long(0);
      break;
    case PMPI_parisize:
      rsize = recvfrom_long(0);
      break;
    case PMPI_precreal:
      precreal = recvfrom_long(0);
      break;
    case PMPI_primetab:
      {
        pari_sp ltop = avma;
        GEN tab = recvfrom_GEN(0);
        if (!gequal(tab, primetab))
        {
          long i, l = lg(tab);
          GEN old = primetab, t = cgetg_block(l, t_VEC);
          for (i = 1; i < l; i++) gel(t,i) = gclone(gel(tab,i));
          primetab = t;
          gunclone_deep(old);
        }
        set_avma(ltop);
      }
      break;
    case PMPI_eval:
      (void) closure_evalgen(recvfrom_GEN(0));
      set_avma(av);
      break;
    case PMPI_varpriority:
      recvfrom_vlong(varpriority-1,MAXVARN+2,0);
      break;
    case PMPI_exportadd:
    {
      GEN str = recvfrom_GEN(0);
      GEN val = recvfrom_GEN(0);
      entree *ep = fetch_entry(GSTR(str));
      export_add(ep->name, val);
      set_avma(av);
      break;
    }
    case PMPI_exportdel:
    {
      GEN str = recvfrom_GEN(0);
      entree *ep = fetch_entry(GSTR(str));
      export_del(ep->name);
      set_avma(av);
      break;
    }
    case PMPI_close:
      MPI_Barrier(MPI_COMM_WORLD);
      MPI_Finalize();
      exit(0);
      break;
    }
}

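/* In a worker, jump back to the dispatch loop in pari_MPI_child; in the
 * master under single (fallback) mode, delegate to the mtsingle engine. */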
void
mt_err_recover(long er)
{
  if (pari_MPI_rank) longjmp(child_env,er);
  else if (mt_issingle) mtsingle_err_recover(er);
}
void mt_sigint_block(void) { }
void mt_sigint_unblock(void) { }
void mt_sigint(void) {}

int
mt_is_parallel(void)
{
  return !!pari_mt;
}

int
mt_is_thread(void)
{
  return pari_MPI_rank ? 1 : mt_issingle ? mtsingle_is_thread(): 0;
}

long
mt_nbthreads(void)
{
  return pari_mt || pari_MPI_rank || pari_MPI_size <= 2 ? 1: pari_mt_nbthreads;
}

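/* Propagate an export to every worker: register it locally, then send the
 * name and value to each of the pari_MPI_size-1 worker ranks. */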
void
mt_export_add(const char *str, GEN val)
{
  pari_sp av = avma;
  long i, n = pari_MPI_size-1;
  GEN s;
  if (pari_mt || pari_MPI_rank)
    pari_err(e_MISC,"export not allowed during parallel sections");
  export_add(str, val);
  s = strtoGENstr(str);
  for (i=1; i <= n; i++)
  {
    send_request(PMPI_exportadd, i);
    send_GEN(s, i);
    send_GEN(val, i);
  }
  set_avma(av);
}

void
mt_export_del(const char *str)
{
  pari_sp av = avma;
  long i, n = pari_MPI_size-1;
  GEN s;
  if (pari_MPI_rank)
    pari_err(e_MISC,"unexport not allowed during parallel sections");
  export_del(str);
  s = strtoGENstr(str);
  for (i=1; i <= n; i++)
    send_request_GEN(PMPI_exportdel, s, i);
  set_avma(av);
}

void
mt_broadcast(GEN code)
{
  long i;
  if (!pari_MPI_rank && !pari_mt)
    for (i=1;i<pari_MPI_size;i++)
      send_request_GEN(PMPI_eval, code, i);
}

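/* Initialize the MPI engine. Rank 0 becomes the master and returns; every
 * other rank enters pari_MPI_child and never comes back (it exits on
 * PMPI_close). If MPI cannot be initialized, fall back to a single-process
 * configuration. */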
void
pari_mt_init(void)
{
  int res = MPI_Init(0, NULL);
  if (res == MPI_SUCCESS)
  {
    MPI_Comm_size(MPI_COMM_WORLD, &pari_MPI_size);
    MPI_Comm_rank(MPI_COMM_WORLD, &pari_MPI_rank);
    if (pari_MPI_rank) pari_MPI_child();
#ifdef _IOFBF
  /* HACK: most MPI implementations do not handle stdin well.
     A 128kB stdin buffer is sufficient for the largest test file to fit. */
  setvbuf(stdin,pari_malloc(128*1024),_IOFBF,128*1024);
#endif
    if (!pari_mt_nbthreads)
      pari_mt_nbthreads = maxss(1, pari_MPI_size-1);
  }
  else
  {
    pari_MPI_size = 0;
    pari_MPI_rank = 0;
    pari_mt_nbthreads = 1;
  }
  pari_mt = NULL;
}

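/* Shut down the engine: the master tells every worker to close, then all
 * ranks meet at the barrier before MPI_Finalize. */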
void
pari_mt_close(void)
{
  long i;
  if (!pari_MPI_rank)
    for (i = 1; i < pari_MPI_size; i++)
      send_request(PMPI_close, i);
  MPI_Barrier(MPI_COMM_WORLD);
  MPI_Finalize();
}

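/* Fetch the next result. While some workers have not been seeded yet
 * (nbint <= n), return NULL so the scheduler submits to a fresh worker;
 * otherwise block until any worker answers, record its rank in mt->source,
 * and propagate errors raised on the worker side. */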
static GEN
mtmpi_queue_get(struct mt_state *junk, long *workid, long *pending)
{
  struct mt_mstate *mt = pari_mt;
  GEN done;
  (void) junk;
  if (mt->nbint<=mt->n) { mt->source=mt->nbint; *pending = nbreq; return NULL; }
  done = recvany_GEN(&mt->source);
  nbreq--; *pending = nbreq;
  if (workid) *workid = mt->workid[mt->source];
  if (typ(done) == t_ERROR)
  {
    if (err_get_num(done)==e_STACK)
      pari_err(e_STACKTHREAD);
    else
      pari_err(0,done);
  }
  return done;
}

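/* Send a task to the worker selected by the previous get. A NULL work marks
 * the queue as fully seeded; otherwise record which workid went to that
 * rank and ship the arguments with PMPI_work. */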
static void
mtmpi_queue_submit(struct mt_state *junk, long workid, GEN work)
{
  struct mt_mstate *mt = pari_mt;
  (void) junk;
  if (!work) { mt->nbint=mt->n+1; return; }
  if (mt->nbint<=mt->n) mt->nbint++;
  nbreq++;
  mt->workid[mt->source] = workid;
  send_request_GEN(PMPI_work, work, mt->source);
}

void
mt_queue_reset(void)
{
  struct mt_mstate *mt = pari_mt;
  if (DEBUGLEVEL>0 && nbreq)
    pari_warn(warner,"%ld discarded threads (MPI)",nbreq);
  for(  ;nbreq>0;  nbreq--) recvany_void(&mt->source);
  pari_free(mt->workid);
  pari_mt = NULL;
}

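/* Start a parallel queue using at most lim workers (0 means no limit beyond
 * pari_mt_nbthreads). Fall back to the single-process engine when we are
 * already inside a parallel section, running on a worker rank, or when fewer
 * than two workers would be used; otherwise configure each worker (stack
 * sizes, real precision, variable priorities, prime table, worker closure). */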
void
mt_queue_start_lim(struct pari_mt *pt, GEN worker, long lim)
{
  if (lim==0) lim = pari_mt_nbthreads;
  else        lim = minss(pari_mt_nbthreads, lim);
  if (pari_mt || pari_MPI_rank || pari_MPI_size <= 2 || lim <= 1)
  {
    mt_issingle = 1;
    mtsingle_queue_start(pt, worker);
  }
  else
  {
    struct mt_mstate *mt = &pari_mt_data;
    long i, n = minss(lim, pari_MPI_size-1);
    long mtparisize = GP_DATA->threadsize? GP_DATA->threadsize: pari_mainstack->rsize;
    long mtparisizemax = GP_DATA->threadsizemax;
    pari_mt = mt;
    mt_issingle = 0;
    mt->workid = (long*) pari_malloc(sizeof(long)*(n+1));
    for (i=1; i <= n; i++)
    {
      send_request_long(PMPI_parisize, mtparisize, i);
      send_request_long(PMPI_parisizemax, mtparisizemax, i);
      send_request_long(PMPI_precreal, get_localbitprec(), i);
      send_request_vlong(PMPI_varpriority,varpriority-1,MAXVARN+2, i);
      send_request_GEN(PMPI_primetab, primetab, i);
      send_request_GEN(PMPI_worker, worker, i);
    }
    mt->n = n;
    mt->nbint = 1;
    mt->source = 1;
    pt->get=&mtmpi_queue_get;
    pt->submit=&mtmpi_queue_submit;
    pt->end=&mt_queue_reset;
  }
}