/*
 * $Id: mpool.i,v 1.2 2011-02-11 05:25:42 dhmunro Exp $
 * Pool of jobs function for mpy.
 */
/* Copyright (c) 2010, David H. Munro.
 * All rights reserved.
 * This file is part of yorick (http://yorick.sourceforge.net).
 * Read the accompanying LICENSE file for details.
 */

/*= SECTION() mpy pool of tasks programming paradigm =======================*/

func mpool(__a__) /* fsow, fwork, freap, use_vsave=, self=, list=, nmax= */
/* DOCUMENT pool_stats = mpool(fsow, fwork, freap)
 *   perform a pool-of-jobs parallel calculation, in which a master
 *   process farms out jobs to many slave processes.  The FSOW function
 *   defines one job, which mpool sends via MPI to an idle slave.  The
 *   slave calls the FWORK function to do the job, and mpool sends the
 *   result back to the master.  The master calls the FREAP function to
 *   assimilate the result of one job.  The whole cycle repeats until
 *   FSOW signals there are no more jobs.  The return value of mpool is
 *   an mpool_t struct instance containing statistics about the pool.
 *
 *   The mpool function can be called in serial mode on rank 0, or in
 *   parallel mode on all participating ranks (by default that is all
 *   ranks).  The FSOW, FWORK, and FREAP functions must conform to the
 *   following templates:
 *     func FSOW(njob, job_define_handle) {
 *       if (no_more_jobs) return 0;
 *       <compute parameters p1,p2,... for job number njob>
 *       vpack, job_define_handle, p1, p2, ...;
 *       return 1;
 *     }
 *     func FWORK(njob, job_define_handle, job_result_handle) {
 *       local p1, p2, ...;
 *       vunpack, job_define_handle, p1, p2, ...;
 *       <do job number njob defined by p1, p2, ...>
 *       <produce results r1, r2, ...>
 *       vpack, job_result_handle, r1, r2, ...;
 *     }
 *     func FREAP(njob, job_result_handle) {
 *       local r1, r2, ...;
 *       vunpack, job_result_handle, r1, r2, ...;
 *       <assimilate results r1,r2,... of job number njob>
 *     }
 *   (A filled-in example of these templates appears in the comment
 *   after the mpool definition below.)
 *
 *   Several keywords are accepted by mpool:
 *   use_vsave=1  if FSOW, FWORK, and FREAP use vsave instead of vpack,
 *     and restore instead of vunpack.
 *   self=1  if the FWORK function should be called on the master
 *     process when all slaves are busy.  (This is usually not a good
 *     idea, because many slaves can become idle while the master is
 *     working.)
 *   list=[master_rank, slave_rank1, slave_rank2, ...]
 *     to specify a subset of processes to participate in the pool.  If
 *     mpool is called in serial mode, any processes not in list will be
 *     idle.  If mpool is called in parallel mode, the list= keyword must
 *     be identical in every participating process, but mpool will
 *     generate no message traffic outside the listed processes.  You can
 *     use this mechanism to permit FWORK functions to themselves call
 *     mpool.  Without list=, rank 0 is the master and all other ranks
 *     are slaves.
 *   nmax=  do not use more than this many processes as slaves
 *
 *   If tsow is the time to generate a job with FSOW, twork is the time
 *   to do a job with FWORK, and treap is the time to reap a job with
 *   FREAP, then mpool will employ twork/tsow slaves before the first
 *   slave finishes.  In the long run, however, mpool can employ only
 *   twork/(tsow+treap) slaves in steady state, so unless treap is
 *   negligible compared to tsow, you risk starting more jobs than you
 *   can handle.  Since all slaves send messages back to the master,
 *   there is a serious risk that mpool will create a "denial of service
 *   attack" against the master process if a large number of slaves is
 *   available.  By default, nmax=100, but even this may be too large,
 *   and you should think carefully before raising the limit.  You don't
 *   want more than a couple of dozen slaves whose job results are
 *   sitting in the master's mp_recv queue waiting to be reaped.
 *
 *   The return value of mpool is meaningful only on the master process.
 *   Among other things, it contains the following information:
 *     pool.njobs    total number of jobs done
 *     pool.nused    maximum number of slaves employed
 *     pool.navg     average number of slaves employed = twork/(tsow+treap)
 *     pool.nqmax    largest number of jobs in mp_recv queue
 *     pool.nignore  number of non-pool messages in mp_recv queue
 *     pool.nself    number of jobs done by master
 *     pool.tsow     total FSOW time [cpu,sys,wall] (help, timer)
 *     pool.twork    total FWORK time [cpu,sys,wall]
 *     pool.treap    total FREAP time [cpu,sys,wall]
 *
 *   Algorithm:
 *     if no more jobs,
 *       if no slaves active, quit
 *       else block until message pending from slave
 *     if message pending from slave,
 *       reap pending, then sow next job to that slave
 *     else if nused<nmax
 *       sow to new slave, increment nused
 *     else if self work allowed
 *       sow to self, then reap from self
 *     else
 *       block until message pending from slave
 *
 * SEE ALSO: mpool_test, mpool_stats, mp_exec, mp_recv, vpack, vsave, timer
 */
{
  local _jobi, _jobo, _pool;
  if (mp_exec()) {
    mp_exec, "_mpool";

  } else {
    _mp_fsow = __a__(1);
    _mp_fwork = __a__(2);
    _mp_freap = __a__(3);
    if (__a__(0)!=3 || !is_func(_mp_fsow)
        || !is_func(_mp_fwork) || !is_func(_mp_freap))
      error, "expecting exactly three function-valued arguments";

    __b__ = __a__("list");
    __c__ = numberof(__b__)? __b__(1) : 0;
    if (__c__==mp_rank) __f__ = _mpool_fmaster;
    else __f__ = _mpool_fslave;
    _pool = mpool_t(master=__c__, list=&__b__, vsave=!(!__a__("use_vsave")),
                    self=!(!__a__("self")));
    __c__ = __a__("nmax");
    if (__c__) _pool.nmax = __c__;
    if (!numberof(__b__) || anyof(__b__==mp_rank)) __f__, _pool;
  }

  return _pool;
}
wrap_args, mpool;
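
/* Example (an illustrative sketch, not part of the mpy API): the three
 * functions below fill in the FSOW/FWORK/FREAP templates from the
 * DOCUMENT comment above to square the integers 1..10 in parallel.
 * The names example_sow, example_work, example_reap, and the squares
 * array are hypothetical.
 *
 *   func example_sow(njob, handle) {
 *     if (njob > 10) return 0;    <-- only ten jobs in this pool
 *     x = double(njob);           <-- parameter defining job njob
 *     vpack, handle, x;
 *     return 1;
 *   }
 *   func example_work(njob, hin, hout) {
 *     local x;
 *     vunpack, hin, x;
 *     vpack, hout, x*x;           <-- result of job njob
 *   }
 *   func example_reap(njob, hout) {
 *     extern squares;             <-- assimilate into a global array
 *     local y;
 *     vunpack, hout, y;
 *     squares(njob) = y;
 *   }
 *
 * To run it (FREAP executes on the master, so squares lives on rank 0):
 *   squares = array(0., 10);
 *   pool = mpool(example_sow, example_work, example_reap);
 *   mpool_stats, pool;
 * The same three functions can be checked in ordinary serial yorick
 * with pool = mpool_test(example_sow, example_work, example_reap).
 */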

func _mpool /* parallel task that mpool launches in serial mode */
{
  if (!mp_rank) {
    /* mpool __a__ argument available as extern on rank 0 */
    __b__ = __a__(-,1);
    if (!__b__) __b__ = nameof(__a__(1));
    __c__ = __a__(-,2);
    if (!__c__) __c__ = nameof(__a__(2));
    __c__ = "_pool=mpool("+__b__+","+__c__+",";
    __b__ = __a__(-,3);
    if (!__b__) __b__ = nameof(__a__(3));
    __c__ += __b__;
    __b__ = __a__("use_vsave");
    if (!is_void(__b__)) __c__ += ",use_vsave="+print(__b__)(1);
    __b__ = __a__("nmax");
    if (!is_void(__b__)) __c__ += ",nmax="+print(__b__)(1);
    __b__ = __a__("self");
    if (!is_void(__b__)) __c__ += ",self="+print(__b__)(1);
    __b__ = __a__("list");
    if (!is_void(__b__)) __c__ += ",list=__b__";
    __c__ += ")";
    __c__ = vpack(__b__, __c__);
  }
  mp_handout, __c__;
  vunpack, __c__, __b__, __c__;
  include, [__c__], 1;
  if (_pool.master) {
    if (mp_rank == _pool.master) mp_send, 0, 0;
    if (!mp_rank) __c__ = mp_recv(_pool.master);
  }
}

struct mpool_t {
  long master;
  pointer list;
  long nmax;
  int self, vsave;

  int sow;     /* flag set if op is sow, clear if op is reap */
  int state;   /* for _mpool_active */
  long slave, njobs, nused, nself, nqmax, nignore, nactive;
  double navg, t0(3), tsow(3), twork(3), treap(3);
};

func _mpool_fmaster(_pool)
{
  /* _jobi, _jobo local to mpool */
  extern _jobi, _jobo;
  while (_mpool_active(_pool)) {
    if (_pool.sow) {
      _mpool_timer, _pool, 0;
      _jobi = _mpool_create(_pool, ++_pool.njobs);
      __c__ = _mp_fsow(_pool.njobs, _jobi);
      if (!__c__) --_pool.njobs;
      _jobi = _pool.vsave? vsave(_jobi) : vpack(_jobi);
      _mpool_timer, _pool, 1;
      if (__c__) {
        if (_pool.slave!=mp_rank) mp_send, _pool.slave, _jobi;
        else _mpool_fslave, _pool;
      } else {
        if (_pool.slave!=mp_rank) --_pool.nactive;  /* did no sow */
        _pool.state = 20;
      }
    } else {
      _mpool_timer, _pool, 0;
      if (_pool.slave!=mp_rank) mp_recv, _pool.slave, _jobo;
      __c__ = _mpool_open(_pool, _jobo);
      _mp_freap, __c__, _jobo;
      _mpool_timer, _pool, 3;
    }
  }
}

func _mpool_fslave(_pool)
{
  /* _jobi, _jobo local to mpool */
  extern _jobi, _jobo;
  while (_mpool_active(_pool)) {
    _mpool_timer, _pool, 0;
    if (_pool.master!=mp_rank) mp_recv, _pool.master, _jobi;
    __c__ = _mpool_open(_pool, _jobi);
    if (__c__ > 0) {
      _jobo = _mpool_create(_pool, __c__);
      _mp_fwork, __c__, _jobi, _jobo;
      _jobo = _pool.vsave? vsave(_jobo) : vpack(_jobo);
      if (_pool.master!=mp_rank) mp_send, _pool.master, _jobo;
    } else {
      _pool.state = 30;
    }
    _mpool_timer, _pool, 2;
  }
}

func _mpool_create(_pool, __f__)
{
  if (_pool.vsave) {
    __c__ = createb(char);
    vsave, __c__, "_mpool", __f__;
  } else {
    __c__ = vopen(,1);
    vpack, __c__, __f__;
  }
  return __c__;
}

func _mpool_open(_pool, &__f__)
{
  if (_pool.vsave) {
    __f__ = openb(__f__);
    __c__ = __f__._mpool;
  } else {
    __c__ = vunpack(__f__, -);
  }
  return __c__;
}
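
/* With use_vsave=1, job handles are in-memory binary files (createb,
 * as in _mpool_create above) rather than vpack buffers, so the FSOW,
 * FWORK, and FREAP functions store and retrieve variables by name with
 * vsave and restore.  A minimal sketch, reusing the hypothetical
 * example_* job from the comment after mpool:
 *
 *   func example_sow(njob, handle) {
 *     if (njob > 10) return 0;
 *     x = double(njob);
 *     vsave, handle, x;       <-- store by name instead of vpack
 *     return 1;
 *   }
 *   func example_work(njob, hin, hout) {
 *     restore, hin, x;        <-- retrieve by name instead of vunpack
 *     y = x*x;
 *     vsave, hout, y;
 *   }
 *
 * FREAP likewise uses restore, and the pool is started as
 *   pool = mpool(example_sow, example_work, example_reap, use_vsave=1);
 */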

if (!mpool_nmax0) mpool_nmax0 = 100;
func _mpool_active(_pool)
{
  if (!_pool.state) {             /* not initialized */
    if (!_pool.nmax) _pool.nmax = min(mp_size-1, mpool_nmax0);
    if (_pool.list) _pool.nmax = min(_pool.nmax, numberof(*_pool.list)-1);
    _pool.state = (mp_rank==_pool.master)? 1 : 11;
    return _mpool_active(_pool);

  } else if (_pool.state == 1) {  /* just did non-self sow or self reap */
    unused = (_pool.nused < _pool.nmax);
    if (_pool.nactive &&
        _mpool_ready(_pool, !unused && !_pool.self)) {
      /* ready to reap a job */
      _pool.sow = 0;
      _pool.state = 2;
    } else {
      /* sow another job without reaping any */
      _pool.sow = 1;
      if (unused) {
        ++_pool.nactive;
        n = ++_pool.nused;
        _pool.slave = _pool.list? (*_pool.list)(1+n) : n;
      } else /* _pool.self */ {
        ++_pool.nself;
        _pool.slave = mp_rank;
        _pool.state = 12;         /* ready for fslave */
      }
    }

  } else if (_pool.state == 2) {  /* just did non-self reap */
    /* sow immediately to rank just reaped */
    _pool.sow = 1;
    _pool.state = 1;

  } else if (_pool.state == 3) {  /* just did self sow, work */
    _pool.sow = 0;
    _pool.state = 1;
    /* _jobo set by fwork */

  } else if (_pool.state == 11) { /* loop in fslave */

  } else if (_pool.state == 12) { /* did self sow */
    _pool.state = 13;
  } else if (_pool.state == 13) { /* did self work */
    _pool.state = 3;
    return 0;

  } else if (_pool.state == 20) { /* no more jobs */
    if (_pool.nactive) {
      _mpool_ready, _pool, 1;
      --_pool.nactive;
      _pool.sow = 0;
    } else {
      /* need to shut down all slaves */
      _jobi = _jobo = [];
      _jobi = _mpool_create(_pool, 0);
      _jobi = _pool.vsave? vsave(_jobi) : vpack(_jobi);
      /* send shutdown like mp_handout, confirm like mp_handin */
      staff = _pool.list? *_pool.list : indgen(0:(mp_size?mp_size-1:0));
      staff = (numberof(staff)>1)? staff(2:) : [];
      if (numberof(staff)) {
        mp_send, staff, _jobi;
        /* collect timing data, confirm shutdown of all slaves */
        for (i=1 ; i<=numberof(staff) ; ++i)
          _pool.twork += mp_recv(staff(i));
      }
      /* compute navg */
      _pool.navg = _mpool_navg(_pool.tsow, _pool.twork, _pool.treap);
      return 0;                   /* exit fmaster */
    }

  } else if (_pool.state == 30) { /* shut down slave */
    _jobi = _jobo = [];
    /* send timing info back to boss and exit fslave */
    mp_send, _pool.master, _pool.twork;
    return 0;

  } else {
    error, "pool state corrupted";
  }
  return 1;
}

func _mpool_ready(_pool, block)
{
  /* test or wait for message from any slave */
  local list;
  eq_nocopy, list, *_pool.list;
  for (;;) {
    i = _pool.nignore;
    q = mp_probe(block && !i);
    if (numberof(q) <= i) {
      if (!block) return 0;
      q = mp_probe(2);
    }
    _pool.nqmax = max(numberof(q)-i, _pool.nqmax);
    if (is_void(list)) {
      q = q(1);
    } else {
      for (++i ; i<=numberof(q) ; ++i) if (anyof(list==q(i))) break;
      _pool.nignore = i-1;
      if (i > numberof(q)) {
        if (!block) return 0;
        continue;
      }
      q = q(i);
    }
    break;
  }

  /* message ready to reap on rank q */
  _pool.slave = q;
  return 1;
}

func _mpool_navg(tsow, twork, treap)
{
  /* compute navg = steady state number of slaves */
  t = tsow(3) + treap(3);
  return t? twork(3)/t : 0.0;
}
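
/* For instance (illustrative numbers only): with tsow = 0.1 s and
 * treap = 0.1 s of master wall time per job, and twork = 2.0 s of
 * slave wall time per job, the master completes one sow+reap cycle
 * every 0.2 s, so navg = 2.0/(0.1+0.1) = 10 slaves can be kept busy
 * in steady state. */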

func mpool_test(_mp_fsow, _mp_fwork, _mp_freap,
                use_vsave=, self=, list=, nmax=)
/* DOCUMENT pool_stats = mpool_test(fsow, fwork, freap)
 *   test FSOW, FWORK, and FREAP functions by conducting an mpool
 *   on a single processor in serial mode.  This can be run in ordinary
 *   serial yorick; it does not require mpy.  The mpool_test function
 *   accepts and uses the use_vsave= keyword.  The other mpool keywords
 *   are accepted but ignored.
 * SEE ALSO: mpool, mpool_stats
 */
{
  extern vsave;
  _pool = mpool_t(vsave=!(!use_vsave), self=1);

  for (;;) {
    _mpool_timer, _pool, 0;
    _jobi = _mpool_create(_pool, ++_pool.njobs);
    __c__ = _mp_fsow(_pool.njobs, _jobi);
    if (!__c__) --_pool.njobs;
    _jobi = _pool.vsave? vsave(_jobi) : vpack(_jobi);
    _mpool_timer, _pool, 1;

    _mpool_timer, _pool, 0;
    __b__ = _mpool_open(_pool, _jobi);
    if (__c__ > 0) {
      _jobo = _mpool_create(_pool, __b__);
      _mp_fwork, __b__, _jobi, _jobo;
      _jobo = _pool.vsave? vsave(_jobo) : vpack(_jobo);
    }
    _mpool_timer, _pool, 2;
    if (!__c__) break;

    _mpool_timer, _pool, 0;
    __c__ = _mpool_open(_pool, _jobo);
    _mp_freap, __c__, _jobo;
    _mpool_timer, _pool, 3;
  }
  _pool.nself = _pool.njobs;
  _pool.navg = _mpool_navg(_pool.tsow, _pool.twork, _pool.treap);
  return _pool;
}

/* work around timer misfeature */
func _mpool_timer(_pool, __a__)
{
  if (!__a__) {
    t=_pool.t0; timer,t; _pool.t0=t;
  } else if (__a__==1) {
    t=_pool.t0; u=_pool.tsow; timer,t,u; _pool.t0=t; _pool.tsow=u;
  } else if (__a__==2) {
    t=_pool.t0; u=_pool.twork; timer,t,u; _pool.t0=t; _pool.twork=u;
  } else {
    t=_pool.t0; u=_pool.treap; timer,t,u; _pool.t0=t; _pool.treap=u;
  }
}

func mpool_stats(_pool)
/* DOCUMENT mpool_stats, pool_stats
 *   print statistics from POOL_STATS returned by mpool or mpool_test.
 * SEE ALSO: mpool, mpool_test
 */
{
  f1 = "did %ld jobs employing %ld slaves\n";
  f2 = " %ld jobs done by master\n";
  f3 = "task    CPU (s)    SYS (s)   WALL (s)\n";
  f4 = " sow %10.3e %10.3e %10.3e\n";
  f5 = "work %10.3e %10.3e %10.3e\n";
  f6 = "reap %10.3e %10.3e %10.3e\n";
  f7 = "could employ %.1f slaves in steady state\n";
  write, format=f1, _pool.njobs, _pool.nused;
  if (_pool.self) write, format=f2, _pool.nself;
  write, format="%s", f3;
  write, format=f4, _pool.tsow(1), _pool.tsow(2), _pool.tsow(3);
  write, format=f5, _pool.twork(1), _pool.twork(2), _pool.twork(3);
  write, format=f6, _pool.treap(1), _pool.treap(2), _pool.treap(3);
  write, format=f7, _pool.navg;
}