/*
 * $Id: mpool.i,v 1.2 2011-02-11 05:25:42 dhmunro Exp $
 * Pool of jobs function for mpy.
 */
/* Copyright (c) 2010, David H. Munro.
 * All rights reserved.
 * This file is part of yorick (http://yorick.sourceforge.net).
 * Read the accompanying LICENSE file for details.
 */

/*= SECTION() mpy pool of tasks programming paradigm =======================*/
12 
func mpool(__a__) /* fsow, fwork, freap, use_vsave=, self=, list=, nmax= */
/* DOCUMENT pool_stats = mpool(fsow, fwork, freap)
 *   perform a pool-of-jobs parallel calculation, in which a master
 *   process farms out jobs to many slave processes.  The FSOW function
 *   defines one job, which mpool sends via MPI to an idle slave.  The
 *   slave calls the FWORK function to do the job, which mpool sends
 *   back to the master.  The master calls the FREAP function to assimilate
 *   the result of one job.  The whole cycle repeats until FSOW signals
 *   there are no more jobs.  The return value of mpool is a mpool_t struct
 *   instance containing statistics about the pool.
 *
 *   The mpool function can be called in serial mode on rank 0, or in
 *   parallel mode on all participating ranks (by default that is all ranks).
 *   The FSOW, FWORK, and FREAP functions must conform to the following
 *   templates:
 *   func FSOW(njob, job_define_handle) {
 *     if (no_more_jobs) return 0;
 *     <compute parameters p1,p2,... for job number njob>
 *     vpack, job_define_handle, p1, p2, ...;
 *     return 1;
 *   }
 *   func FWORK(njob, job_define_handle, job_result_handle) {
 *     local p1, p2, ...;
 *     vunpack, job_define_handle, p1, p2, ...;
 *     <do job number njob defined by p1, p2, ...>
 *     <produce results r1, r2, ...>
 *     vpack, job_result_handle, r1, r2, ...;
 *   }
 *   func FREAP(njob, job_result_handle) {
 *     local r1, r2, ...;
 *     vunpack, job_result_handle, r1, r2, ...;
 *     <assimilate results r1,r2,... of job number njob>
 *   }
 *
 *   Several keywords are accepted by mpool:
 *   use_vsave=1   if FSOW, FWORK, and FREAP use vsave instead of vpack
 *     and restore instead of vunpack.
 *   self=1        if the FWORK function should be called on the master
 *     process when all slaves are busy.  (This is usually not a good idea,
 *     because many slaves can become idle while the master is working.)
 *   list=[master_rank, slave_rank1, slave_rank2, ...]
 *     to specify a subset of processes to participate in the pool.  If mpool
 *     is called in serial mode, any processes not in list will be idle.  If
 *     mpool is called in parallel mode, the list= keyword must be identical
 *     in every participating process, but mpool will generate no message
 *     traffic outside the listed processes.  You can use this mechanism to
 *     permit FWORK functions to themselves call mpool.
 *     Without list=, rank 0 is the master and all other ranks are slaves.
 *   nmax=   do not use more than this many processes as slaves
 *
 *   If tsow is the time to generate a job with FSOW, twork is the time to
 *   do a job with FWORK, and treap is the time to reap a job with FREAP,
 *   then mpool will employ twork/tsow slaves before the first slave finishes.
 *   In the long run, however, mpool can employ only twork/(tsow+treap) slaves
 *   in steady state, so unless treap is negligible compared to tsow, you risk
 *   starting more jobs than you can handle.  Since all slaves send messages
 *   back to the master, there is a serious risk that mpool will create a
 *   "denial of service attack" against the master process, if a large number
 *   of slaves is available.  By default, nmax=100, but even this may be too
 *   large, and you should consider carefully raising the limit.  You don't
 *   want more than a couple of dozen slaves whose job results are in the
 *   master's mp_recv queue waiting to be reaped.
 *
 *   The return value of mpool is meaningful only on the master process.
 *   Among other things, it contains the following information:
 *     pool.njobs   total number of jobs done
 *     pool.nused   maximum number of slaves employed
 *     pool.navg    average number of slaves employed = twork/(tsow+treap)
 *     pool.nqmax   largest number of jobs in mp_recv queue
 *     pool.nignore  number of non-pool messages in mp_recv queue
 *     pool.nself   number of jobs done by master
 *     pool.tsow    total FSOW time [cpu,sys,wall] (help,timer)
 *     pool.twork   total FWORK time [cpu,sys,wall]
 *     pool.treap   total FREAP time [cpu,sys,wall]
 *
 *   Algorithm:
 *     if no more jobs,
 *       if no slaves active, quit
 *       else block until message pending from slave
 *     if message pending from slave,
 *       reap pending, then sow next job to that slave
 *     else if nused<nmax
 *       sow to new slave, increment nused
 *     else if self work allowed
 *       sow to self, then reap from self
 *     else
 *       block until message pending from slave
 *
 * SEE ALSO: mpool_test, mpool_stats, mp_exec, mp_recv, vpack, vsave, timer
 */
{
  /* _jobi, _jobo are the sow/reap message handles shared (via extern) with
   * _mpool_fmaster and _mpool_fslave; _pool is the mpool_t being built */
  local _jobi, _jobo, _pool;
  if (mp_exec()) {
    /* serial mode: launch the _mpool parallel task, which rebuilds this
     * call as a command string and re-invokes mpool in parallel mode on
     * every rank (see _mpool below); _pool remains nil here */
    mp_exec, "_mpool";

  } else {
    /* parallel mode: executing on every rank, __a__ is the wrap_args
     * handle; positional args 1,2,3 are the three pool functions */
    _mp_fsow = __a__(1);
    _mp_fwork = __a__(2);
    _mp_freap = __a__(3);
    if (__a__(0)!=3 || !is_func(_mp_fsow)
        || !is_func(_mp_fwork) || !is_func(_mp_freap))
      error, "expecting exactly three function-valued arguments";

    __b__ = __a__("list");
    /* first element of list= is the master rank; default master is rank 0 */
    __c__ = numberof(__b__)? __b__(1) : 0;
    if (__c__==mp_rank) __f__ = _mpool_fmaster;
    else __f__ = _mpool_fslave;
    _pool = mpool_t(master=__c__, list=&__b__, vsave=!(!__a__("use_vsave")),
                    self=!(!__a__("self")));
    __c__ = __a__("nmax");
    if (__c__) _pool.nmax = __c__;
    /* ranks not in list= generate no pool message traffic at all */
    if (!numberof(__b__) || anyof(__b__==mp_rank)) __f__, _pool;
  }

  return _pool;
}
wrap_args, mpool;
130 
func _mpool /* parallel task that mpool launches in serial mode */
/* DOCUMENT _mpool
 *   parallel task behind serial-mode mpool: rank 0 reconstructs the
 *   original mpool call as a command string "_pool=mpool(...)",
 *   broadcasts it (together with any list= value) to all ranks with
 *   mp_handout, and every rank executes it, re-entering mpool in
 *   parallel mode.
 */
{
  if (!mp_rank) {
    /* mpool __a__ argument available as extern on rank 0 */
    __b__ = __a__(-,1);                    /* name if arg was a simple variable */
    if (!__b__) __b__ = nameof(__a__(1));  /* else name of the function value */
    __c__ = __a__(-,2);
    if (!__c__) __c__ = nameof(__a__(2));
    __c__ = "_pool=mpool("+__b__+","+__c__+",";
    __b__ = __a__(-,3);
    if (!__b__) __b__ = nameof(__a__(3));
    __c__ += __b__;
    /* append only the keywords actually supplied to the original call */
    __b__ = __a__("use_vsave");
    if (!is_void(__b__)) __c__ += ",use_vsave="+print(__b__)(1);
    __b__ = __a__("nmax");
    if (!is_void(__b__)) __c__ += ",nmax="+print(__b__)(1);
    __b__ = __a__("self");
    if (!is_void(__b__)) __c__ += ",self="+print(__b__)(1);
    __b__ = __a__("list");
    /* the list= value travels as data in the packed message; the command
     * string refers to it by the extern name __b__ on each rank */
    if (!is_void(__b__)) __c__ += ",list=__b__";
    __c__ += ")";
    __c__ = vpack(__b__, __c__);
  }
  mp_handout, __c__;           /* broadcast packed [list, command] to all ranks */
  vunpack, __c__, __b__, __c__;
  include, [__c__], 1;         /* every rank executes _pool=mpool(...) now */
  if (_pool.master) {
    /* master is not rank 0: handshake so the rank 0 task does not finish
     * before the true master's pool has completed */
    if (mp_rank == _pool.master) mp_send, 0, 0;
    if (!mp_rank) __c__ = mp_recv(_pool.master);
  }
}
162 
struct mpool_t {
  long master;     /* rank of the master process */
  pointer list;    /* &(list= value), or &[] when all ranks participate */
  long nmax;       /* maximum number of slaves to employ */
  int self, vsave; /* flags from the self= and use_vsave= keywords */

  int sow;   /* flag set if op is sow, clear if op is reap */
  int state; /* for _mpool_active */
  long slave, njobs, nused, nself, nqmax, nignore, nactive;
  /* slave   = rank of current sow/reap partner (mp_rank for self jobs)
   * njobs   = total number of jobs sown
   * nused   = maximum number of slaves employed
   * nself   = number of jobs done by the master itself
   * nqmax   = largest number of pool jobs seen in the mp_recv queue
   * nignore = count of leading non-pool messages in the queue
   * nactive = number of slaves currently working a job */
  double navg, t0(3), tsow(3), twork(3), treap(3);
  /* navg = steady state slave count twork/(tsow+treap); t0 = timer mark;
   * tsow/twork/treap = accumulated [cpu,sys,wall] times (see _mpool_timer) */
};
174 
func _mpool_fmaster(_pool)
/* DOCUMENT _mpool_fmaster, _pool
 *   master side of the pool: repeatedly consult the _mpool_active state
 *   machine, sowing jobs (_mp_fsow) or reaping results (_mp_freap) as it
 *   directs, until it returns 0 after all slaves have shut down.
 */
{
  /* _jobi, _jobo local to mpool */
  extern _jobi, _jobo;
  while (_mpool_active(_pool)) {
    if (_pool.sow) {
      /* define the next job and deliver it to the chosen slave */
      _mpool_timer, _pool, 0;                /* mark start time */
      _jobi = _mpool_create(_pool, ++_pool.njobs);
      __c__ = _mp_fsow(_pool.njobs, _jobi);  /* 0 means no more jobs */
      if (!__c__) --_pool.njobs;
      _jobi = _pool.vsave? vsave(_jobi) : vpack(_jobi);  /* close to char array */
      _mpool_timer, _pool, 1;                /* accumulate tsow */
      if (__c__) {
        if (_pool.slave!=mp_rank) mp_send, _pool.slave, _jobi;
        else _mpool_fslave, _pool;           /* self=1: work the job here */
      } else {
        if (_pool.slave!=mp_rank) --_pool.nactive;  /* did no sow */
        _pool.state = 20;                    /* begin drain/shutdown sequence */
      }
    } else {
      /* reap the result pending from _pool.slave */
      _mpool_timer, _pool, 0;
      if (_pool.slave!=mp_rank) mp_recv, _pool.slave, _jobo;
      __c__ = _mpool_open(_pool, _jobo);     /* __c__ = job number */
      _mp_freap, __c__, _jobo;
      _mpool_timer, _pool, 3;                /* accumulate treap */
    }
  }
}
203 
func _mpool_fslave(_pool)
/* DOCUMENT _mpool_fslave, _pool
 *   slave side of the pool: receive job definitions from the master, run
 *   _mp_fwork on each, and send the results back, until the job number 0
 *   shutdown message arrives.  Also called directly by _mpool_fmaster to
 *   do a single job on the master itself when self=1 (states 12/13).
 */
{
  /* _jobi, _jobo local to mpool */
  extern _jobi, _jobo;
  while (_mpool_active(_pool)) {
    _mpool_timer, _pool, 0;
    if (_pool.master!=mp_rank) mp_recv, _pool.master, _jobi;
    __c__ = _mpool_open(_pool, _jobi);   /* __c__ = job number, 0 = shutdown */
    if (__c__ > 0) {
      /* do the job, then return the result handle to the master */
      _jobo = _mpool_create(_pool, __c__);
      _mp_fwork, __c__, _jobi, _jobo;
      _jobo = _pool.vsave? vsave(_jobo) : vpack(_jobo);
      if (_pool.master!=mp_rank) mp_send, _pool.master, _jobo;
    } else {
      _pool.state = 30;   /* shutdown: report timing and exit (state 30) */
    }
    _mpool_timer, _pool, 2;   /* accumulate twork */
  }
}
223 
func _mpool_create(_pool, __f__)
/* DOCUMENT handle = _mpool_create(_pool, njob)
 *   create a fresh message handle recording job number NJOB: an
 *   in-memory binary file when _pool.vsave is set, else a vpack
 *   stream.  _mpool_open recovers NJOB on the receiving side.
 */
{
  if (_pool.vsave) {
    __c__ = createb(char);          /* in-memory binary file */
    vsave, __c__, "_mpool", __f__;  /* job number saved under name "_mpool" */
  } else {
    __c__ = vopen(,1);              /* vpack stream */
    vpack, __c__, __f__;            /* job number is the first packed item */
  }
  return __c__;  /* fixed: terminate with ";" like every other return here */
}
235 
func _mpool_open(_pool, &__f__)
/* DOCUMENT njob = _mpool_open(_pool, handle)
 *   reopen a received job message, converting HANDLE in place from the
 *   char array delivered by mp_recv back into a vsave binary file or a
 *   vunpack stream, and return the job number that _mpool_create
 *   recorded in it.
 */
{
  if (_pool.vsave) {
    __f__ = openb(__f__);       /* reopen the in-memory binary file */
    __c__ = __f__._mpool;       /* job number stored under name "_mpool" */
  } else {
    __c__ = vunpack(__f__, -);  /* open stream, extract first packed item */
  }
  return __c__;
}
246 
if (!mpool_nmax0) mpool_nmax0 = 100;  /* default cap on number of slaves */
func _mpool_active(_pool)
/* DOCUMENT going = _mpool_active(_pool)
 *   advance the pool state machine one step, returning 1 if the calling
 *   loop (_mpool_fmaster or _mpool_fslave) has more to do, else 0.
 *   Sets _pool.sow, _pool.slave, and _pool.state to tell the caller
 *   what its next operation is.
 *   States:  0 = not initialized          1 = master top of loop
 *            2 = just did non-self reap   3 = just did self sow+work
 *           11 = ordinary fslave loop  12,13 = self sow, self work
 *           20 = no more jobs (drain active slaves, then shut all down)
 *           30 = slave got shutdown message (job number 0)
 */
{
  if (!_pool.state) {  /* not initialized */
    if (!_pool.nmax) _pool.nmax = min(mp_size-1, mpool_nmax0);
    if (_pool.list) _pool.nmax = min(_pool.nmax, numberof(*_pool.list)-1);
    _pool.state = (mp_rank==_pool.master)? 1 : 11;
    return _mpool_active(_pool);  /* re-dispatch in the initialized state */

  } else if (_pool.state == 1) { /* just did non-self sow or self reap */
    unused = (_pool.nused < _pool.nmax);  /* more slaves still available? */
    if (_pool.nactive &&
        _mpool_ready(_pool, !unused && !_pool.self)) {
      /* ready to reap a job */
      _pool.sow = 0;
      _pool.state = 2;
    } else {
      /* sow another job without reaping any */
      _pool.sow = 1;
      if (unused) {
        /* employ a brand new slave (n-th listed rank, or rank n) */
        ++_pool.nactive;
        n = ++_pool.nused;
        _pool.slave = _pool.list? (*_pool.list)(1+n) : n;
      } else /* _pool.self */ {
        ++_pool.nself;
        _pool.slave = mp_rank;
        _pool.state = 12;  /* ready for fslave */
      }
    }

  } else if (_pool.state == 2) { /* just did non-self reap */
    /* sow immediately to rank just reaped */
    _pool.sow = 1;
    _pool.state = 1;

  } else if (_pool.state == 3) { /* just did self sow, work */
    _pool.sow = 0;
    _pool.state = 1;
    /* _jobo set by fwork */

  } else if (_pool.state == 11) { /* loop in fslave */

  } else if (_pool.state == 12) { /* did self sow */
    _pool.state = 13;
  } else if (_pool.state == 13) { /* did self work */
    _pool.state = 3;
    return 0;  /* pop out of fslave back into fmaster, which will self reap */

  } else if (_pool.state == 20) { /* no more jobs */
    if (_pool.nactive) {
      /* drain: block until one of the remaining active slaves reports in */
      _mpool_ready, _pool, 1;
      --_pool.nactive;
      _pool.sow = 0;
    } else {
      /* need to shut down all slaves */
      _jobi = _jobo = [];
      _jobi = _mpool_create(_pool, 0);  /* job number 0 = shutdown signal */
      _jobi = _pool.vsave? vsave(_jobi) : vpack(_jobi);
      /* send shutdown like mp_handout, confirm like mp_handin */
      staff = _pool.list? *_pool.list : indgen(0:(mp_size?mp_size-1:0));
      staff = (numberof(staff)>1)? staff(2:) : [];  /* everyone but master */
      if (numberof(staff)) {
        mp_send, staff, _jobi;
        /* collect timing data, confirm shutdown of all slaves */
        for (i=1 ; i<=numberof(staff) ; ++i)
          _pool.twork += mp_recv(staff(i));
      }
      /* compute navg */
      _pool.navg = _mpool_navg(_pool.tsow, _pool.twork, _pool.treap);
      return 0;  /* exit fmaster */
    }

  } else if (_pool.state == 30) { /* shut down slave */
    _jobi = _jobo = [];
    /* send timing info back to boss and exit fslave */
    mp_send, _pool.master, _pool.twork;
    return 0;

  } else {
    error, "pool state corrupted";
  }
  return 1;
}
330 
func _mpool_ready(_pool, block)
/* DOCUMENT pending = _mpool_ready(_pool, block)
 *   test (BLOCK=0) or wait (BLOCK=1) for a message pending from any pool
 *   slave.  Messages from ranks outside _pool.list are never consumed;
 *   they are counted in _pool.nignore and skipped over.  On success set
 *   _pool.slave to the sender's rank and return 1; return 0 only when
 *   not BLOCK and no pool message is pending.
 */
{
  /* test or wait for message from any slave */
  local list;
  eq_nocopy, list, *_pool.list;
  for (;;) {
    i = _pool.nignore;          /* leading queue entries known to be non-pool */
    q = mp_probe(block && !i);  /* ranks with messages in the mp_recv queue */
    if (numberof(q) <= i) {
      if (!block) return 0;
      q = mp_probe(2);          /* wait for another message to arrive */
    }
    _pool.nqmax = max(numberof(q)-i, _pool.nqmax);
    if (is_void(list)) {
      q = q(1);                 /* no list=, every sender is a pool slave */
    } else {
      /* scan past senders that are not pool members */
      for (++i ; i<=numberof(q) ; ++i) if (anyof(list==q(i))) break;
      _pool.nignore = i-1;
      if (i > numberof(q)) {
        if (!block) return 0;
        continue;               /* blocking: probe again for a pool message */
      }
      q = q(i);
    }
    break;
  }

  /* message ready to reap on rank q */
  _pool.slave = q;
  return 1;
}
362 
func _mpool_navg(tsow, twork, treap)
/* DOCUMENT navg = _mpool_navg(tsow, twork, treap)
 *   steady state number of slaves: wall clock work time per job divided
 *   by the master's wall clock overhead (sow plus reap) per job.  Each
 *   argument is a [cpu,sys,wall] triple; only the wall time (index 3)
 *   matters.  Returns 0.0 when the overhead is zero.
 */
{
  t = tsow(3) + treap(3);
  if (!t) return 0.0;
  return twork(3)/t;
}
369 
func mpool_test(_mp_fsow, _mp_fwork, _mp_freap,
                use_vsave=, self=, list=, nmax=)
/* DOCUMENT pool_stats = mpool_test(fsow, fwork, freap)
 *   test FSOW, FWORK, and FREAP functions by conducting an mpool
 *   on a single processor in serial mode.  This can be run in ordinary
 *   serial yorick; it does not require mpy.  The mpool_test function
 *   accepts and uses the use_vsave= keyword.  The other mpool keywords
 *   are accepted but ignored.
 * SEE ALSO: mpool, mpool_stats
 */
{
  /* NOTE(review): extern declaration apparently guards the vsave builtin
   * against shadowing here -- confirm against vsave.i */
  extern vsave;
  _pool = mpool_t(vsave=!(!use_vsave), self=1);

  for (;;) {
    /* sow one job, exactly as _mpool_fmaster would */
    _mpool_timer, _pool, 0;
    _jobi = _mpool_create(_pool, ++_pool.njobs);
    __c__ = _mp_fsow(_pool.njobs, _jobi);  /* 0 means no more jobs */
    if (!__c__) --_pool.njobs;
    _jobi = _pool.vsave? vsave(_jobi) : vpack(_jobi);
    _mpool_timer, _pool, 1;

    /* work the job in place, as _mpool_fslave would */
    _mpool_timer, _pool, 0;
    __b__ = _mpool_open(_pool, _jobi);
    if (__c__ > 0) {
      _jobo = _mpool_create(_pool, __b__);
      _mp_fwork, __b__, _jobi, _jobo;
      _jobo = _pool.vsave? vsave(_jobo) : vpack(_jobo);
    }
    _mpool_timer, _pool, 2;
    if (!__c__) break;   /* fsow signaled no more jobs */

    /* reap the result, as _mpool_fmaster would */
    _mpool_timer, _pool, 0;
    __c__ = _mpool_open(_pool, _jobo);
    _mp_freap, __c__, _jobo;
    _mpool_timer, _pool, 3;
  }
  /* every job was done "by the master" in this serial simulation */
  _pool.nself = _pool.njobs;
  _pool.navg = _mpool_navg(_pool.tsow, _pool.twork, _pool.treap);
  return _pool;
}
411 
/* work around timer misfeature */
func _mpool_timer(_pool, __a__)
/* DOCUMENT _mpool_timer, _pool, which
 *   WHICH=0 marks the current time in _pool.t0; WHICH=1, 2, or 3
 *   accumulates the [cpu,sys,wall] seconds elapsed since _pool.t0 into
 *   _pool.tsow, _pool.twork, or _pool.treap, respectively, and re-marks
 *   _pool.t0.  The copies through locals t, u exist because timer's
 *   output arguments must be simple variables, not struct members.
 */
{
  if (!__a__) {
    t=_pool.t0; timer,t; _pool.t0=t;
  } else if (__a__==1) {
    t=_pool.t0; u=_pool.tsow; timer,t,u; _pool.t0=t; _pool.tsow=u;
  } else if (__a__==2) {
    t=_pool.t0; u=_pool.twork; timer,t,u; _pool.t0=t; _pool.twork=u;
  } else {
    t=_pool.t0; u=_pool.treap; timer,t,u; _pool.t0=t; _pool.treap=u;
  }
}
425 
func mpool_stats(_pool)
/* DOCUMENT mpool_stats, pool_stats
 *   print statistics from POOL_STATS returned by mpool or mpool_test.
 * SEE ALSO: mpool, mpool_test
 */
{
  /* one format string per line of the report */
  f1 = "did %ld jobs employing %ld slaves\n";
  f2 = "    %ld jobs done by master\n";
  f3 = "task   CPU (s)     SYS (s)     WALL(s)\n";
  f4 = " sow  %10.3e  %10.3e  %10.3e\n";
  f5 = "work  %10.3e  %10.3e  %10.3e\n";
  f6 = "reap  %10.3e  %10.3e  %10.3e\n";
  f7 = "could employ %.1f slaves in steady state\n";
  write, format=f1, _pool.njobs, _pool.nused;
  if (_pool.self) write, format=f2, _pool.nself;  /* only meaningful with self= */
  write, format="%s", f3;  /* header printed verbatim, no substitutions */
  write, format=f4, _pool.tsow(1), _pool.tsow(2), _pool.tsow(3);
  write, format=f5, _pool.twork(1), _pool.twork(2), _pool.twork(3);
  write, format=f6, _pool.treap(1), _pool.treap(2), _pool.treap(3);
  write, format=f7, _pool.navg;
}
447