1 /** @file threads.c
2  *
3  *  Routines for the interface of FORM with the pthreads library
4  *
5  *	This is the main part of the parallelization of TFORM.
6  *	It is important to also look in the files structs.h and variable.h
7  *	because the treatment of the A and B structs is essential (these
8  *	structs are used by means of the macros AM, AP, AC, AS, AR, AT, AN,
9  *	AO and AX). Also the definitions and use of the macros BHEAD and PHEAD
10  *	should be looked up.
11  *
12  *	The sources are set up in such a way that if WITHPTHREADS isn't defined
13  *	there is no trace of pthread parallelization.
14  *	The reason is that TFORM is far more memory hungry than sequential FORM.
15  *
16  *	Special attention should also go to the locks. The proper use of the
17  *	locks is essential and determines whether TFORM can work at all.
18  *	We use the LOCK/UNLOCK macros which are empty in the case of sequential FORM
19  *	These locks are at many places in the source files when workers can
20  *	interfere with each others data or with the data of the master.
21  */
22 /* #[ License : */
23 /*
24  *   Copyright (C) 1984-2017 J.A.M. Vermaseren
25  *   When using this file you are requested to refer to the publication
26  *   J.A.M.Vermaseren "New features of FORM" math-ph/0010025
27  *   This is considered a matter of courtesy as the development was paid
28  *   for by FOM the Dutch physics granting agency and we would like to
29  *   be able to track its scientific use to convince FOM of its value
30  *   for the community.
31  *
32  *   This file is part of FORM.
33  *
34  *   FORM is free software: you can redistribute it and/or modify it under the
35  *   terms of the GNU General Public License as published by the Free Software
36  *   Foundation, either version 3 of the License, or (at your option) any later
37  *   version.
38  *
39  *   FORM is distributed in the hope that it will be useful, but WITHOUT ANY
40  *   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
41  *   FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
42  *   details.
43  *
44  *   You should have received a copy of the GNU General Public License along
45  *   with FORM.  If not, see <http://www.gnu.org/licenses/>.
46  */
47 /* #] License : */
48 
49 #ifdef WITHPTHREADS
50 
51 #define WHOLEBRACKETS
52 /*
53   	#[ Variables :
54 
55 	The sortbot additions are from 17-may-2007 and after. They consitute
56 	an attempt to make the final merge sorting faster for the master.
57 	This way the master has only one compare per term.
58 	It does add some complexity, but the final merge routine (MasterMerge)
59 	is much simpler for the sortbots. On the other hand the original merging is
60 	for a large part a copy of the MergePatches routine in sort.c and hence
61 	even though complex the bad part has been thoroughly debugged.
62 */
63 
64 #include "form3.h"
65 
66 static int numberofthreads;
67 static int numberofworkers;
68 static int identityofthreads = 0;
69 static int *listofavailables;
70 static int topofavailables = 0;
71 static pthread_key_t identitykey;
72 static INILOCK(numberofthreadslock)
73 static INILOCK(availabilitylock)
74 static pthread_t *threadpointers = 0;
75 static pthread_mutex_t *wakeuplocks;
76 static pthread_mutex_t *wakeupmasterthreadlocks;
77 static pthread_cond_t *wakeupconditions;
78 static pthread_condattr_t *wakeupconditionattributes;
79 static int *wakeup;
80 static int *wakeupmasterthread;
81 static INILOCK(wakeupmasterlock)
82 static pthread_cond_t wakeupmasterconditions = PTHREAD_COND_INITIALIZER;
83 static pthread_cond_t *wakeupmasterthreadconditions;
84 static int wakeupmaster = 0;
85 static int identityretval;
86 /* static INILOCK(clearclocklock) */
87 static LONG *timerinfo;
88 static LONG *sumtimerinfo;
89 static int numberclaimed;
90 
91 static THREADBUCKET **threadbuckets, **freebuckets;
92 static int numthreadbuckets;
93 static int numberoffullbuckets;
94 
95 /* static int numberbusy = 0; */
96 
97 INILOCK(dummylock)
98 INIRWLOCK(dummyrwlock)
99 static pthread_cond_t dummywakeupcondition = PTHREAD_COND_INITIALIZER;
100 
101 #ifdef WITHSORTBOTS
102 static POSITION SortBotPosition;
103 static int numberofsortbots;
104 static INILOCK(wakeupsortbotlock)
105 static pthread_cond_t wakeupsortbotconditions = PTHREAD_COND_INITIALIZER;
106 static int topsortbotavailables = 0;
107 static LONG numberofterms;
108 #endif
109 
110 /*
111   	#] Variables :
112   	#[ Identity :
113  		#[ StartIdentity :
114 */
115 /**
116  *	To be called once when we start up the threads.
117  *	Starts our identity administration.
118  */
119 
StartIdentity()120 void StartIdentity()
121 {
122 	pthread_key_create(&identitykey,FinishIdentity);
123 }
124 
125 /*
126  		#] StartIdentity :
127  		#[ FinishIdentity :
128 */
129 /**
130  *	The library needs a finishing routine
131  */
132 
FinishIdentity(void * keyp)133 void FinishIdentity(void *keyp)
134 {
135 	DUMMYUSE(keyp);
136 /*	free(keyp); */
137 }
138 
139 /*
140  		#] FinishIdentity :
141  		#[ SetIdentity :
142 */
143 /**
144  *	Assigns an integer value to a thread, starting at zero.
145  */
146 
SetIdentity(int * identityretval)147 int SetIdentity(int *identityretval)
148 {
149 /*
150 #ifdef _MSC_VER
151 	printf("addr %d\n",&numberofthreadslock);
152 	printf("size %d\n",sizeof(numberofthreadslock));
153 #endif
154 */
155 	LOCK(numberofthreadslock);
156 	*identityretval = identityofthreads++;
157 	UNLOCK(numberofthreadslock);
158 	pthread_setspecific(identitykey,(void *)identityretval);
159 	return(*identityretval);
160 }
161 
162 /*
163  		#] SetIdentity :
164  		#[ WhoAmI :
165 */
166 
167 /**
168  *	Returns the number of the current thread in our administration
169  *
170  *	This routine is to be called in routines that need access to the thread
171  *	specific data and that don't get their B-struct passed as an argument.
172  *	Routines that get called frequently need their B-struct passed.
173  *	This is done with BHEAD and the argumentfield gets declared with
174  *	one of the BARG macros rather than the ARG macros.
175  */
176 
WhoAmI()177 int WhoAmI()
178 {
179 	int *identity;
180 /*
181 	First a fast exit for when there is at most one thread
182 */
183 	if ( identityofthreads <= 1 ) return(0);
184 /*
185 	Now the reading of the key.
186 	According to the book the statement should read:
187 
188 	pthread_getspecific(identitykey,(void **)(&identity));
189 
190 	but according to the information in pthread.h it is:
191 */
192 	identity = (int *)pthread_getspecific(identitykey);
193 	return(*identity);
194 }
195 
196 /*
197  		#] WhoAmI :
198  		#[ BeginIdentities :
199 */
200 /**
201  *	Starts up the identity registration. This is the routine to be called
202  *	at the startup of TFORM.
203  */
204 
BeginIdentities()205 VOID BeginIdentities()
206 {
207 	StartIdentity();
208 	SetIdentity(&identityretval);
209 }
210 
211 /*
212  		#] BeginIdentities :
213   	#] Identity :
214   	#[ StartHandleLock :
215 */
216 /**
217  *	Routine to be called at the startup of TFORM.
218  *	We have this routine because we would like to keep all access to TFORM
219  *	specific data in this file.
220  */
221 
StartHandleLock()222 void StartHandleLock()
223 {
224 	AM.handlelock = dummyrwlock;
225 }
226 
227 /*
228   	#] StartHandleLock :
229   	#[ StartAllThreads :
230 */
231 /**
232  *	In this routine we start 'number' threats
233  *	The routine that runs the show for each worker is called RunThread.
234  *	It will call the allocations and all the worker specific action.
235  *	Then the master has to wait till all workers are asleep before continuing.
236  *	If we use SortBots (special threads to help the master during the
237  *	final stages of a big sort) they are started and their routine is
238  *	called RunSortBot.
239  *	The master then waits till all sortbots are asleep before continuing.
240  *	Finally the sort buffers of the master are parcelled up for the final
241  *	merge in big sorts in which the workers have to feed the master.
242  *
243  *	@param number The number of main threads (including the master)
244  *	              The number of workers is number-1.
245  *	@return  Standard return conventions (OK -> 0)
246  */
247 
StartAllThreads(int number)248 int StartAllThreads(int number)
249 {
250 	int identity, j, dummy, mul;
251 	ALLPRIVATES *B;
252 	pthread_t thethread;
253 	identity = WhoAmI();
254 
255 #ifdef WITHSORTBOTS
256 	timerinfo = (LONG *)Malloc1(sizeof(LONG)*number*2,"timerinfo");
257 	sumtimerinfo = (LONG *)Malloc1(sizeof(LONG)*number*2,"sumtimerinfo");
258 	for ( j = 0; j < number*2; j++ ) { timerinfo[j] = 0; sumtimerinfo[j] = 0; }
259 	mul = 2;
260 #else
261 	timerinfo = (LONG *)Malloc1(sizeof(LONG)*number,"timerinfo");
262 	sumtimerinfo = (LONG *)Malloc1(sizeof(LONG)*number,"sumtimerinfo");
263 	for ( j = 0; j < number; j++ ) { timerinfo[j] = 0; sumtimerinfo[j] = 0; }
264 	mul = 1;
265 #endif
266 
267 	listofavailables = (int *)Malloc1(sizeof(int)*(number+1),"listofavailables");
268 	threadpointers = (pthread_t *)Malloc1(sizeof(pthread_t)*number*mul,"threadpointers");
269 	AB = (ALLPRIVATES **)Malloc1(sizeof(ALLPRIVATES *)*number*mul,"Private structs");
270 
271 	wakeup = (int *)Malloc1(sizeof(int)*number*mul,"wakeup");
272 	wakeuplocks = (pthread_mutex_t *)Malloc1(sizeof(pthread_mutex_t)*number*mul,"wakeuplocks");
273 	wakeupconditions = (pthread_cond_t *)Malloc1(sizeof(pthread_cond_t)*number*mul,"wakeupconditions");
274 	wakeupconditionattributes = (pthread_condattr_t *)
275 			Malloc1(sizeof(pthread_condattr_t)*number*mul,"wakeupconditionattributes");
276 
277 	wakeupmasterthread = (int *)Malloc1(sizeof(int)*number*mul,"wakeupmasterthread");
278 	wakeupmasterthreadlocks = (pthread_mutex_t *)Malloc1(sizeof(pthread_mutex_t)*number*mul,"wakeupmasterthreadlocks");
279 	wakeupmasterthreadconditions = (pthread_cond_t *)Malloc1(sizeof(pthread_cond_t)*number*mul,"wakeupmasterthread");
280 
281 	numberofthreads = number;
282 	numberofworkers = number - 1;
283 	threadpointers[identity] = pthread_self();
284 	topofavailables = 0;
285 	for ( j = 1; j < number; j++ ) {
286 		if ( pthread_create(&thethread,NULL,RunThread,(void *)(&dummy)) )
287 			goto failure;
288 	}
289 /*
290 	Now we initialize the master at the same time that the workers are doing so.
291 */
292 	B = InitializeOneThread(identity);
293 	AR.infile = &(AR.Fscr[0]);
294 	AR.outfile = &(AR.Fscr[1]);
295 	AR.hidefile = &(AR.Fscr[2]);
296 	AM.sbuflock = dummylock;
297 	AS.inputslock = dummylock;
298 	AS.outputslock = dummylock;
299 	AS.MaxExprSizeLock = dummylock;
300 	AP.PreVarLock = dummylock;
301 	AC.halfmodlock = dummylock;
302 	MakeThreadBuckets(number,0);
303 /*
304 	Now we wait for the workers to finish their startup.
305 	We don't want to initialize the sortbots yet and run the risk that
306 	some of them may end up with a lower number than one of the workers.
307 */
308 	MasterWaitAll();
309 #ifdef WITHSORTBOTS
310 	if ( numberofworkers > 2 ) {
311 		numberofsortbots = numberofworkers-2;
312 		for ( j = numberofworkers+1; j < 2*numberofworkers-1; j++ ) {
313 			if ( pthread_create(&thethread,NULL,RunSortBot,(void *)(&dummy)) )
314 				goto failure;
315 		}
316 	}
317 	else {
318 		numberofsortbots = 0;
319 	}
320 	MasterWaitAllSortBots();
321 	DefineSortBotTree();
322 #endif
323 	IniSortBlocks(number-1);
324 	AS.MasterSort = 0;
325 	AM.storefilelock = dummylock;
326 /*
327 MesPrint("AB = %x %x %x  %d",AB[0],AB[1],AB[2], identityofthreads);
328 */
329 	return(0);
330 failure:
331 	MesPrint("Cannot start %d threads",number);
332 	Terminate(-1);
333 	return(-1);
334 }
335 
336 /*
337   	#] StartAllThreads :
338   	#[ InitializeOneThread :
339 */
340 /**
341  *	Array for putting a label on memory allocations and error messages.
342  */
343 UBYTE *scratchname[] = { (UBYTE *)"scratchsize",
344                          (UBYTE *)"scratchsize",
345                          (UBYTE *)"hidesize" };
346 /**
347  *	Initializes one thread. This includes the allocation of its private
348  *	space and all its buffers. Also the initialization of variables.
349  *
350  *	@param identity The (TFORM defined) integer identifier of the thread.
351  *	@return A pointer to the struct with all private data of the thread.
352  *	We call this struct B and we have a system of macros
353  *	(defined in variable.h) that allows us to access its substructs in
354  *	the same way as the corresponding substructs in sequential FORM are
355  *	accessed. Example:
356  *		In TFORM  AR is defined as B->R
357  *		In FORM   AR is defined as A.R  (here it is part of the A struct)
358  *
359  *	One complication:
360  *		AM.ScratSize can be rather big. We don't want all the workers
361  *		to have an allocation of that size. Some computers may run out
362  *		of allocations.
363  *		We need on the workers:
364  *			AR.Fscr[0] : input for keep brackets and expressions in rhs
365  *			AR.Fscr[1] : output of the sorting to be fed to the master
366  *			AR.Fscr[2] : input for keep brackets and expressions in rhs
367  *		Hence the 0 and 2 channels can use a rather small buffer like
368  *			10*AM.MaxTer.
369  *		The 1 channel needs a buffer roughly AM.ScratSize/#ofworkers.
370  */
371 
InitializeOneThread(int identity)372 ALLPRIVATES *InitializeOneThread(int identity)
373 {
374 	WORD *t, *ScratchBuf;
375 	int i, j, bsize, *bp;
376 	LONG ScratchSize[3], IOsize;
377 	ALLPRIVATES *B;
378 	UBYTE *s;
379 
380 	wakeup[identity] = 0;
381 	wakeuplocks[identity] = dummylock;
382 	pthread_condattr_init(&(wakeupconditionattributes[identity]));
383 	pthread_condattr_setpshared(&(wakeupconditionattributes[identity]),PTHREAD_PROCESS_PRIVATE);
384 	wakeupconditions[identity] = dummywakeupcondition;
385 	pthread_cond_init(&(wakeupconditions[identity]),&(wakeupconditionattributes[identity]));
386 	wakeupmasterthread[identity] = 0;
387 	wakeupmasterthreadlocks[identity] = dummylock;
388 	wakeupmasterthreadconditions[identity] = dummywakeupcondition;
389 
390 	bsize = sizeof(ALLPRIVATES);
391 	bsize = (bsize+sizeof(int)-1)/sizeof(int);
392 	B = (ALLPRIVATES *)Malloc1(sizeof(int)*bsize,"B struct");
393 	for ( bp = (int *)B, j = 0; j < bsize; j++ ) *bp++ = 0;
394 
395 	AB[identity] = B;
396 /*
397 			12-jun-2007 JV:
398 	For the timing one has to know a few things:
399 	The POSIX standard is that there is only a single process ID and that
400 	getrusage returns the time of all the threads together.
401 	Under Linux there are two methods though: The older LinuxThreads and NPTL.
402 	LinuxThreads gives each thread its own process id. This makes that we
403 	can time the threads with getrusage, and hence this was done. Under NPTL
404 	this has been 'corrected' and suddenly getruage doesn't work anymore the
405 	way it used to. Now we need
406 		clock_gettime(CLOCK_THREAD_CPUTIME_ID,&timing)
407 	which is declared in <time.h> and we need -lrt extra in the link statement.
408 	(this is at least the case on blade02 at DESY-Zeuthen).
409 	See also the code in tools.c at the routine Timer.
410 	We may still have to include more stuff there.
411 */
412 	if ( identity > 0 ) TimeCPU(0);
413 
414 #ifdef WITHSORTBOTS
415 
416 	if ( identity > numberofworkers ) {
417 /*
418 		Some workspace is needed when we have a PolyFun and we have to add
419 		two terms and the new result is going to be longer than the old result.
420 */
421 		LONG length = AM.WorkSize*sizeof(WORD)/8+AM.MaxTer*2;
422 		AT.WorkSpace = (WORD *)Malloc1(length,"WorkSpace");
423 		AT.WorkTop = AT.WorkSpace + length/sizeof(WORD);
424 		AT.WorkPointer = AT.WorkSpace;
425 		AT.identity = identity;
426 /*
427 		The SB struct gets treated in IniSortBlocks.
428 		The SortBotIn variables will be defined DefineSortBotTree.
429 */
430 		if ( AN.SoScratC == 0 ) {
431 			AN.SoScratC = (UWORD *)Malloc1(2*(AM.MaxTal+2)*sizeof(UWORD),"Scratch in SortBot");
432 		}
433 		AT.SS = (SORTING *)Malloc1(sizeof(SORTING),"dummy sort buffer");
434 		AT.SS->PolyFlag = 0;
435 
436 		AT.comsym[0] = 8;
437 		AT.comsym[1] = SYMBOL;
438 		AT.comsym[2] = 4;
439 		AT.comsym[3] = 0;
440 		AT.comsym[4] = 1;
441 		AT.comsym[5] = 1;
442 		AT.comsym[6] = 1;
443 		AT.comsym[7] = 3;
444 		AT.comnum[0] = 4;
445 		AT.comnum[1] = 1;
446 		AT.comnum[2] = 1;
447 		AT.comnum[3] = 3;
448 		AT.comfun[0] = FUNHEAD+4;
449 		AT.comfun[1] = FUNCTION;
450 		AT.comfun[2] = FUNHEAD;
451 		AT.comfun[3] = 0;
452 #if FUNHEAD > 3
453 		for ( i = 4; i <= FUNHEAD; i++ )
454 			AT.comfun[i] = 0;
455 #endif
456 		AT.comfun[FUNHEAD+1] = 1;
457 		AT.comfun[FUNHEAD+2] = 1;
458 		AT.comfun[FUNHEAD+3] = 3;
459 		AT.comind[0] = 7;
460 		AT.comind[1] = INDEX;
461 		AT.comind[2] = 3;
462 		AT.comind[3] = 0;
463 		AT.comind[4] = 1;
464 		AT.comind[5] = 1;
465 		AT.comind[6] = 3;
466 
467 		AT.inprimelist = -1;
468 		AT.sizeprimelist = 0;
469 		AT.primelist = 0;
470 		AT.bracketinfo = 0;
471 
472 		AR.CompareRoutine = &Compare1;
473 
474 		AR.sLevel = 0;
475 		AR.wranfia = 0;
476 		AR.wranfcall = 0;
477 		AR.wranfnpair1 = NPAIR1;
478 		AR.wranfnpair2 = NPAIR2;
479 		AN.NumFunSorts = 5;
480 		AN.MaxFunSorts = 5;
481 		AN.SplitScratch = 0;
482 		AN.SplitScratchSize = AN.InScratch = 0;
483 		AN.SplitScratch1 = 0;
484 		AN.SplitScratchSize1 = AN.InScratch1 = 0;
485 
486 		AN.FunSorts = (SORTING **)Malloc1((AN.NumFunSorts+1)*sizeof(SORTING *),"FunSort pointers");
487 		for ( i = 0; i <= AN.NumFunSorts; i++ ) AN.FunSorts[i] = 0;
488 		AN.FunSorts[0] = AT.S0 = AT.SS;
489 		AN.idfunctionflag = 0;
490 		AN.tryterm = 0;
491 
492 		return(B);
493 	}
494 	if ( identity == 0 && AN.SoScratC == 0 ) {
495 		AN.SoScratC = (UWORD *)Malloc1(2*(AM.MaxTal+2)*sizeof(UWORD),"Scratch in SortBot");
496 	}
497 #endif
498 	AR.CurDum = AM.IndDum;
499 	for ( j = 0; j < 3; j++ ) {
500 		if ( identity == 0 ) {
501 			if ( j == 2 ) {
502 				ScratchSize[j] = AM.HideSize;
503 			}
504 			else {
505 				ScratchSize[j] = AM.ScratSize;
506 			}
507 			if ( ScratchSize[j] < 10*AM.MaxTer ) ScratchSize[j] = 10 * AM.MaxTer;
508 		}
509 		else {
510 /*
511 			ScratchSize[j] = AM.ScratSize / (numberofthreads-1);
512 			ScratchSize[j] = ScratchSize[j] / 20;
513 			if ( ScratchSize[j] < 10*AM.MaxTer ) ScratchSize[j] = 10 * AM.MaxTer;
514 */
515 			if ( j == 1 ) ScratchSize[j] = AM.ThreadScratOutSize;
516 			else          ScratchSize[j] = AM.ThreadScratSize;
517 			if ( ScratchSize[j] < 4*AM.MaxTer ) ScratchSize[j] = 4 * AM.MaxTer;
518 			AR.Fscr[j].name = 0;
519 		}
520 		ScratchSize[j] = ( ScratchSize[j] + 255 ) / 256;
521 		ScratchSize[j] = ScratchSize[j] * 256;
522 		ScratchBuf = (WORD *)Malloc1(ScratchSize[j]*sizeof(WORD),(char *)(scratchname[j]));
523 		AR.Fscr[j].POsize = ScratchSize[j] * sizeof(WORD);
524 		AR.Fscr[j].POfull = AR.Fscr[j].POfill = AR.Fscr[j].PObuffer = ScratchBuf;
525 		AR.Fscr[j].POstop = AR.Fscr[j].PObuffer + ScratchSize[j];
526 		PUTZERO(AR.Fscr[j].POposition);
527 		AR.Fscr[j].pthreadslock = dummylock;
528 		AR.Fscr[j].wPOsize = AR.Fscr[j].POsize;
529 		AR.Fscr[j].wPObuffer = AR.Fscr[j].PObuffer;
530 		AR.Fscr[j].wPOfill = AR.Fscr[j].POfill;
531 		AR.Fscr[j].wPOfull = AR.Fscr[j].POfull;
532 		AR.Fscr[j].wPOstop = AR.Fscr[j].POstop;
533 	}
534 	AR.InInBuf = 0;
535 	AR.InHiBuf = 0;
536 	AR.Fscr[0].handle = -1;
537 	AR.Fscr[1].handle = -1;
538 	AR.Fscr[2].handle = -1;
539 	AR.FoStage4[0].handle = -1;
540 	AR.FoStage4[1].handle = -1;
541 	IOsize = AM.S0->file.POsize;
542 #ifdef WITHZLIB
543 	AR.FoStage4[0].ziosize = IOsize;
544 	AR.FoStage4[1].ziosize = IOsize;
545 	AR.FoStage4[0].ziobuffer = 0;
546 	AR.FoStage4[1].ziobuffer = 0;
547 #endif
548 	AR.FoStage4[0].POsize  = ((IOsize+sizeof(WORD)-1)/sizeof(WORD))*sizeof(WORD);
549 	AR.FoStage4[1].POsize  = ((IOsize+sizeof(WORD)-1)/sizeof(WORD))*sizeof(WORD);
550 
551 	AR.hidefile = &(AR.Fscr[2]);
552 	AR.StoreData.Handle = -1;
553 	AR.SortType = AC.SortType;
554 
555 	AN.IndDum = AM.IndDum;
556 
557 	if ( identity > 0 ) {
558 		s = (UBYTE *)(FG.fname); i = 0;
559 		while ( *s ) { s++; i++; }
560 		s = (UBYTE *)Malloc1(sizeof(char)*(i+12),"name for Fscr[0] file");
561 		sprintf((char *)s,"%s.%d",FG.fname,identity);
562 		s[i-3] = 's'; s[i-2] = 'c'; s[i-1] = '0';
563 		AR.Fscr[0].name = (char *)s;
564 		s = (UBYTE *)(FG.fname); i = 0;
565 		while ( *s ) { s++; i++; }
566 		s = (UBYTE *)Malloc1(sizeof(char)*(i+12),"name for Fscr[1] file");
567 		sprintf((char *)s,"%s.%d",FG.fname,identity);
568 		s[i-3] = 's'; s[i-2] = 'c'; s[i-1] = '1';
569 		AR.Fscr[1].name = (char *)s;
570 	}
571 
572 	AR.CompressBuffer = (WORD *)Malloc1((AM.CompressSize+10)*sizeof(WORD),"compresssize");
573 	AR.ComprTop = AR.CompressBuffer + AM.CompressSize;
574 	AR.CompareRoutine = &Compare1;
575 /*
576 	Here we make all allocations for the struct AT
577 	(which is AB[identity].T or B->T with B = AB+identity).
578 */
579 	AT.WorkSpace = (WORD *)Malloc1(AM.WorkSize*sizeof(WORD),"WorkSpace");
580 	AT.WorkTop = AT.WorkSpace + AM.WorkSize;
581 	AT.WorkPointer = AT.WorkSpace;
582 
583 	AT.Nest = (NESTING)Malloc1((LONG)sizeof(struct NeStInG)*AM.maxFlevels,"functionlevels");
584 	AT.NestStop = AT.Nest + AM.maxFlevels;
585 	AT.NestPoin = AT.Nest;
586 
587 	AT.WildMask = (WORD *)Malloc1((LONG)AM.MaxWildcards*sizeof(WORD),"maxwildcards");
588 
589 	LOCK(availabilitylock);
590 	AT.ebufnum = inicbufs();		/* Buffer for extras during execution */
591 	AT.fbufnum = inicbufs();		/* Buffer for caching in factorization */
592 	AT.allbufnum = inicbufs();		/* Buffer for id,all */
593 	AT.aebufnum = inicbufs();		/* Buffer for id,all */
594 	UNLOCK(availabilitylock);
595 
596 	AT.RepCount = (int *)Malloc1((LONG)((AM.RepMax+3)*sizeof(int)),"repeat buffers");
597 	AN.RepPoint = AT.RepCount;
598 	AN.polysortflag = 0;
599 	AN.subsubveto = 0;
600 	AN.tryterm = 0;
601 	AT.RepTop = AT.RepCount + AM.RepMax;
602 
603 	AT.WildArgTaken = (WORD *)Malloc1((LONG)AC.WildcardBufferSize*sizeof(WORD)/2
604 				,"argument list names");
605 	AT.WildcardBufferSize = AC.WildcardBufferSize;
606 	AT.previousEfactor = 0;
607 
608 	AT.identity = identity;
609 	AT.LoadBalancing = 0;
610 /*
611 	Still to do: the SS stuff.
612 	             the Fscr[3]
613 	             the FoStage4[2]
614 */
615 	if ( AT.WorkSpace == 0 ||
616 	     AT.Nest == 0 ||
617 	     AT.WildMask == 0 ||
618 	     AT.RepCount == 0 ||
619 	     AT.WildArgTaken == 0 ) goto OnError;
620 /*
621 	And initializations
622 */
623 	AT.comsym[0] = 8;
624 	AT.comsym[1] = SYMBOL;
625 	AT.comsym[2] = 4;
626 	AT.comsym[3] = 0;
627 	AT.comsym[4] = 1;
628 	AT.comsym[5] = 1;
629 	AT.comsym[6] = 1;
630 	AT.comsym[7] = 3;
631 	AT.comnum[0] = 4;
632 	AT.comnum[1] = 1;
633 	AT.comnum[2] = 1;
634 	AT.comnum[3] = 3;
635 	AT.comfun[0] = FUNHEAD+4;
636 	AT.comfun[1] = FUNCTION;
637 	AT.comfun[2] = FUNHEAD;
638 	AT.comfun[3] = 0;
639 #if FUNHEAD > 3
640 	for ( i = 4; i <= FUNHEAD; i++ )
641 		AT.comfun[i] = 0;
642 #endif
643 	AT.comfun[FUNHEAD+1] = 1;
644 	AT.comfun[FUNHEAD+2] = 1;
645 	AT.comfun[FUNHEAD+3] = 3;
646 	AT.comind[0] = 7;
647 	AT.comind[1] = INDEX;
648 	AT.comind[2] = 3;
649 	AT.comind[3] = 0;
650 	AT.comind[4] = 1;
651 	AT.comind[5] = 1;
652 	AT.comind[6] = 3;
653 	AT.locwildvalue[0] = SUBEXPRESSION;
654 	AT.locwildvalue[1] = SUBEXPSIZE;
655 	for ( i = 2; i < SUBEXPSIZE; i++ ) AT.locwildvalue[i] = 0;
656 	AT.mulpat[0] = TYPEMULT;
657 	AT.mulpat[1] = SUBEXPSIZE+3;
658 	AT.mulpat[2] = 0;
659 	AT.mulpat[3] = SUBEXPRESSION;
660 	AT.mulpat[4] = SUBEXPSIZE;
661 	AT.mulpat[5] = 0;
662 	AT.mulpat[6] = 1;
663 	for ( i = 7; i < SUBEXPSIZE+5; i++ ) AT.mulpat[i] = 0;
664 	AT.proexp[0] = SUBEXPSIZE+4;
665 	AT.proexp[1] = EXPRESSION;
666 	AT.proexp[2] = SUBEXPSIZE;
667 	AT.proexp[3] = -1;
668 	AT.proexp[4] = 1;
669 	for ( i = 5; i < SUBEXPSIZE+1; i++ ) AT.proexp[i] = 0;
670 	AT.proexp[SUBEXPSIZE+1] = 1;
671 	AT.proexp[SUBEXPSIZE+2] = 1;
672 	AT.proexp[SUBEXPSIZE+3] = 3;
673 	AT.proexp[SUBEXPSIZE+4] = 0;
674 	AT.dummysubexp[0] = SUBEXPRESSION;
675 	AT.dummysubexp[1] = SUBEXPSIZE+4;
676 	for ( i = 2; i < SUBEXPSIZE; i++ ) AT.dummysubexp[i] = 0;
677 	AT.dummysubexp[SUBEXPSIZE] = WILDDUMMY;
678 	AT.dummysubexp[SUBEXPSIZE+1] = 4;
679 	AT.dummysubexp[SUBEXPSIZE+2] = 0;
680 	AT.dummysubexp[SUBEXPSIZE+3] = 0;
681 
682 	AT.MinVecArg[0] = 7+ARGHEAD;
683 	AT.MinVecArg[ARGHEAD] = 7;
684 	AT.MinVecArg[1+ARGHEAD] = INDEX;
685 	AT.MinVecArg[2+ARGHEAD] = 3;
686 	AT.MinVecArg[3+ARGHEAD] = 0;
687 	AT.MinVecArg[4+ARGHEAD] = 1;
688 	AT.MinVecArg[5+ARGHEAD] = 1;
689 	AT.MinVecArg[6+ARGHEAD] = -3;
690 	t = AT.FunArg;
691 	*t++ = 4+ARGHEAD+FUNHEAD;
692 	for ( i = 1; i < ARGHEAD; i++ ) *t++ = 0;
693 	*t++ = 4+FUNHEAD;
694 	*t++ = 0;
695 	*t++ = FUNHEAD;
696 	for ( i = 2; i < FUNHEAD; i++ ) *t++ = 0;
697 	*t++ = 1; *t++ = 1; *t++ = 3;
698 
699 	AT.inprimelist = -1;
700 	AT.sizeprimelist = 0;
701 	AT.primelist = 0;
702 	AT.nfac = AT.nBer = 0;
703 	AT.factorials = 0;
704 	AT.bernoullis = 0;
705 	AR.wranfia = 0;
706 	AR.wranfcall = 0;
707 	AR.wranfnpair1 = NPAIR1;
708 	AR.wranfnpair2 = NPAIR2;
709 	AR.wranfseed = 0;
710 	AN.SplitScratch = 0;
711 	AN.SplitScratchSize = AN.InScratch = 0;
712 	AN.SplitScratch1 = 0;
713 	AN.SplitScratchSize1 = AN.InScratch1 = 0;
714 /*
715 	Now the sort buffers. They depend on which thread. The master
716 	inherits the sortbuffer from AM.S0
717 */
718 	if ( identity == 0 ) {
719 		AT.S0 = AM.S0;
720 	}
721 	else {
722 /*
723 		For the moment we don't have special settings.
724 		They may become costly in virtual memory.
725 */
726 		AT.S0 = AllocSort(AM.S0->LargeSize*sizeof(WORD)/numberofworkers
727 						 ,AM.S0->SmallSize*sizeof(WORD)/numberofworkers
728 						 ,AM.S0->SmallEsize*sizeof(WORD)/numberofworkers
729 						 ,AM.S0->TermsInSmall
730 						 ,AM.S0->MaxPatches
731 /*						 ,AM.S0->MaxPatches/numberofworkers  */
732 						 ,AM.S0->MaxFpatches/numberofworkers
733 						 ,AM.S0->file.POsize);
734 	}
735 	AR.CompressPointer = AR.CompressBuffer;
736 /*
737 	Install the store caches (15-aug-2006 JV)
738 */
739 	AT.StoreCache = AT.StoreCacheAlloc = 0;
740 	if ( AM.NumStoreCaches > 0 ) {
741 		STORECACHE sa, sb;
742 		LONG size;
743 		size = sizeof(struct StOrEcAcHe)+AM.SizeStoreCache;
744 		size = ((size-1)/sizeof(size_t)+1)*sizeof(size_t);
745 		AT.StoreCacheAlloc = (STORECACHE)Malloc1(size*AM.NumStoreCaches,"StoreCaches");
746 		sa = AT.StoreCache = AT.StoreCacheAlloc;
747 		for ( i = 0; i < AM.NumStoreCaches; i++ ) {
748 			sb = (STORECACHE)(VOID *)((UBYTE *)sa+size);
749 			if ( i == AM.NumStoreCaches-1 ) {
750 				sa->next = 0;
751 			}
752 			else {
753 				sa->next = sb;
754 			}
755 			SETBASEPOSITION(sa->position,-1);
756 			SETBASEPOSITION(sa->toppos,-1);
757 			sa = sb;
758 		}
759 	}
760 
761 	ReserveTempFiles(2);
762 	return(B);
763 OnError:;
764 	MLOCK(ErrorMessageLock);
765 	MesPrint("Error initializing thread %d",identity);
766 	MUNLOCK(ErrorMessageLock);
767 	Terminate(-1);
768 	return(B);
769 }
770 
771 /*
772   	#] InitializeOneThread :
773   	#[ FinalizeOneThread :
774 */
775 /**
776  *	To be called at the end of the run to give the final time statistics for
777  *	this thread.
778  *
779  *	@param identity The TFORM defined integer identity of the thread.
780  *	                In principle we could find it out from here with a call
781  *	                to WhoAmI but because this is to be called at a very
782  *	                late stage during clean up, we don't want to run any risks.
783  */
784 
FinalizeOneThread(int identity)785 void FinalizeOneThread(int identity)
786 {
787 	timerinfo[identity] = TimeCPU(1);
788 }
789 
790 /*
791   	#] FinalizeOneThread :
792   	#[ ClearAllThreads :
793 */
794 /**
795  *	To be called at the end of running TFORM.
796  *	Theoretically the system can clean up after up, but it may be better
797  *	to do it ourselves.
798  */
799 
ClearAllThreads()800 VOID ClearAllThreads()
801 {
802 	int i;
803 	MasterWaitAll();
804 	for ( i = 1; i <= numberofworkers; i++ ) {
805 		WakeupThread(i,CLEARCLOCK);
806 	}
807 #ifdef WITHSORTBOTS
808 	for ( i = numberofworkers+1; i <= numberofworkers+numberofsortbots; i++ ) {
809 		WakeupThread(i,CLEARCLOCK);
810 	}
811 #endif
812 }
813 
814 /*
815   	#] ClearAllThreads :
816   	#[ TerminateAllThreads :
817 */
818 /**
819  *	To be called at the end of running TFORM.
820  *	Theoretically the system can clean up after up, but it may be better
821  *	to do it ourselves.
822  */
823 
TerminateAllThreads()824 VOID TerminateAllThreads()
825 {
826 	int i;
827 	for ( i = 1; i <= numberofworkers; i++ ) {
828 		GetThread(i);
829 		WakeupThread(i,TERMINATETHREAD);
830 	}
831 #ifdef WITHSORTBOTS
832 	for ( i = numberofworkers+1; i <= numberofworkers+numberofsortbots; i++ ) {
833 		WakeupThread(i,TERMINATETHREAD);
834 	}
835 #endif
836 	for ( i = 1; i <= numberofworkers; i++ ) {
837 		pthread_join(threadpointers[i],NULL);
838 	}
839 #ifdef WITHSORTBOTS
840 	for ( i = numberofworkers+1; i <= numberofworkers+numberofsortbots; i++ ) {
841 		pthread_join(threadpointers[i],NULL);
842 	}
843 #endif
844 }
845 
846 /*
847   	#] TerminateAllThreads :
848   	#[ MakeThreadBuckets :
849 */
850 /**
851  *	Creates 2*number thread buckets. We want double the number because
852  *	we want to prepare number of them while another number are occupied.
853  *
854  *	Each bucket should have about AC.ThreadBucketSize*AM.MaxTerm words.
855  *
856  *	When loading a thread we only have to pass the address of a full bucket.
857  *	This gives more overlap between the master and the workers and hence
858  *	less waiting.
859  *
860  *	The buckets are used because sending terms one by one to the workers
861  *	costs too much overhead. Hence we put a number of terms in each bucket
862  *	and then pass the whole bucket. In the ideal case the master loads the
863  *	buckets while the workers are processing the contents of the buckets
864  *	they have been assigned. In practise often the processing can go faster
865  *	than that the master can fill the buckets for all workers.
866  *	It should be possible to improve this bucket system, but the trivial
867  *	idea
868  *
869  *	@param number The number of workers
870  *	@param par    par = 0: First allocation
871  *	              par = 1: Reallocation when we change the bucket size with the
872  *	                       threadbucketsize statement.
873  */
874 
MakeThreadBuckets(int number,int par)875 int MakeThreadBuckets(int number, int par)
876 {
877 	int i;
878 	LONG sizethreadbuckets;
879 	THREADBUCKET *thr;
880 /*
881 	First we need a decent estimate. Not all terms should be maximal.
882 	Note that AM.MaxTer is in bytes!!!
883 	Maybe we should try to limit the size here a bit more effectively.
884 	This is a great consumer of memory.
885 */
886 	sizethreadbuckets = ( AC.ThreadBucketSize + 1 ) * AM.MaxTer + 2*sizeof(WORD);
887 	if ( AC.ThreadBucketSize >= 250 )      sizethreadbuckets /= 4;
888 	else if ( AC.ThreadBucketSize >= 90 )  sizethreadbuckets /= 3;
889 	else if ( AC.ThreadBucketSize >= 40 )  sizethreadbuckets /= 2;
890 	sizethreadbuckets /= sizeof(WORD);
891 
892 	if ( par == 0 ) {
893 		numthreadbuckets = 2*(number-1);
894 		threadbuckets = (THREADBUCKET **)Malloc1(numthreadbuckets*sizeof(THREADBUCKET *),"threadbuckets");
895 		freebuckets = (THREADBUCKET **)Malloc1(numthreadbuckets*sizeof(THREADBUCKET *),"threadbuckets");
896 	}
897 	if ( par > 0 ) {
898 		if ( sizethreadbuckets <= threadbuckets[0]->threadbuffersize ) return(0);
899 		for ( i = 0; i < numthreadbuckets; i++ ) {
900 			thr = threadbuckets[i];
901 			M_free(thr->deferbuffer,"deferbuffer");
902 		}
903 	}
904 	else {
905 		for ( i = 0; i < numthreadbuckets; i++ ) {
906 			threadbuckets[i] = (THREADBUCKET *)Malloc1(sizeof(THREADBUCKET),"threadbuckets");
907 			threadbuckets[i]->lock = dummylock;
908 		}
909 	}
910 	for ( i = 0; i < numthreadbuckets; i++ ) {
911 		thr = threadbuckets[i];
912 		thr->threadbuffersize = sizethreadbuckets;
913 		thr->free = BUCKETFREE;
914 		thr->deferbuffer = (POSITION *)Malloc1(2*sizethreadbuckets*sizeof(WORD)
915 					+(AC.ThreadBucketSize+1)*sizeof(POSITION),"deferbuffer");
916 		thr->threadbuffer = (WORD *)(thr->deferbuffer+AC.ThreadBucketSize+1);
917 		thr->compressbuffer = (WORD *)(thr->threadbuffer+sizethreadbuckets);
918 		thr->busy = BUCKETPREPARINGTERM;
919 		thr->usenum = thr->totnum = 0;
920 		thr->type = BUCKETDOINGTERMS;
921 	}
922 	return(0);
923 }
924 
925 /*
926   	#] MakeThreadBuckets :
927   	#[ GetTimerInfo :
928 */
929 
930 /**
931  *  Returns a pointer to the static timerinfo together with information about
932  *  its size. This is used by the checkpoint code to save this information in
933  *  the recovery file.
934  */
GetTimerInfo(LONG ** ti,LONG ** sti)935 int GetTimerInfo(LONG** ti,LONG** sti)
936 {
937 	*ti = timerinfo;
938 	*sti = sumtimerinfo;
939 #ifdef WITHSORTBOTS
940 	return AM.totalnumberofthreads*2;
941 #else
942 	return AM.totalnumberofthreads;
943 #endif
944 }
945 
946 /*
947   	#] GetTimerInfo :
948   	#[ WriteTimerInfo :
949 */
950 
951 /**
952  *  Writes data into the static timerinfo variable. This is used by the
953  *  checkpoint code to restore the correct timings for the individual threads.
954  */
WriteTimerInfo(LONG * ti,LONG * sti)955 void WriteTimerInfo(LONG* ti,LONG* sti)
956 {
957 	int i;
958 #ifdef WITHSORTBOTS
959 	int max = AM.totalnumberofthreads*2;
960 #else
961 	int max = AM.totalnumberofthreads;
962 #endif
963 	for ( i=0; i<max; ++i ) {
964 		timerinfo[i] = ti[i];
965 		sumtimerinfo[i] = sti[i];
966 	}
967 }
968 
969 /*
970   	#] WriteTimerInfo :
971   	#[ GetWorkerTimes :
972 */
973 /**
974  *	Gets the total CPU time of all workers together.
975  *	To be called at the end of the TFORM run.
976  */
977 
GetWorkerTimes()978 LONG GetWorkerTimes()
979 {
980 	LONG retval = 0;
981 	int i;
982 	for ( i = 1; i <= numberofworkers; i++ ) retval += timerinfo[i] + sumtimerinfo[i];
983 #ifdef WITHSORTBOTS
984 	for ( i = numberofworkers+1; i <= numberofworkers+numberofsortbots; i++ )
985 		retval += timerinfo[i] + sumtimerinfo[i];
986 #endif
987 	return(retval);
988 }
989 
990 /*
991   	#] GetWorkerTimes :
992   	#[ UpdateOneThread :
993 */
994 /**
995  *	Fix up some of the things that happened at compiler time.
996  *
997  *	@param identity The TFORM defined integer thread identifier.
998  */
999 
UpdateOneThread(int identity)1000 int UpdateOneThread(int identity)
1001 {
1002 	ALLPRIVATES *B = AB[identity], *B0 = AB[0];
1003 	AR.GetFile = AR0.GetFile;
1004 	AR.KeptInHold = AR0.KeptInHold;
1005 	AR.CurExpr = AR0.CurExpr;
1006 	AR.SortType = AC.SortType;
1007 	if ( AT.WildcardBufferSize < AC.WildcardBufferSize ) {
1008 		M_free(AT.WildArgTaken,"argument list names");
1009 		AT.WildcardBufferSize = AC.WildcardBufferSize;
1010 		AT.WildArgTaken = (WORD *)Malloc1((LONG)AC.WildcardBufferSize*sizeof(WORD)/2
1011 				,"argument list names");
1012 		if ( AT.WildArgTaken == 0 ) return(-1);
1013 	}
1014 	return(0);
1015 }
1016 
1017 /*
1018   	#] UpdateOneThread :
1019   	#[ LoadOneThread :
1020 */
1021 /**
1022  *	Loads all relevant variables from thread 'from' into thread 'identity'
1023  *	This is to be done just prior to waking up the thread.
1024  *	It is important to keep the number of variables to be copied to a minimum
1025  *	because this is part of the 'overhead'.
1026  *
1027  *	@param from     the source thread which has all the variables already
1028  *	@param identity the TFORM defined integer thread identitier of the thread that needs the copy
1029  *	@param thr      the bucket that contains the terms to be processed by 'identity'
1030  *	@param par		if 1 copies the already active pieces in the (de)compress buffer
1031  *	@return Standard return convention (OK -> 0)
1032  */
1033 
LoadOneThread(int from,int identity,THREADBUCKET * thr,int par)1034 int LoadOneThread(int from, int identity, THREADBUCKET *thr, int par)
1035 {
1036 	WORD *t1, *t2;
1037 	ALLPRIVATES *B = AB[identity], *B0 = AB[from];
1038 
1039 	AR.DefPosition = AR0.DefPosition;
1040 	AR.NoCompress = AR0.NoCompress;
1041 	AR.gzipCompress = AR0.gzipCompress;
1042 	AR.BracketOn = AR0.BracketOn;
1043 	AR.CurDum = AR0.CurDum;
1044 	AR.DeferFlag = AR0.DeferFlag;
1045 	AR.TePos = 0;
1046 	AR.sLevel = AR0.sLevel;
1047 	AR.Stage4Name = AR0.Stage4Name;
1048 	AR.GetOneFile = AR0.GetOneFile;
1049 	AR.PolyFun = AR0.PolyFun;
1050 	AR.PolyFunInv = AR0.PolyFunInv;
1051 	AR.PolyFunType = AR0.PolyFunType;
1052 	AR.PolyFunExp = AR0.PolyFunExp;
1053 	AR.PolyFunVar = AR0.PolyFunVar;
1054 	AR.PolyFunPow = AR0.PolyFunPow;
1055 	AR.Eside = AR0.Eside;
1056 	AR.Cnumlhs = AR0.Cnumlhs;
1057 /*
1058 	AR.MaxBracket = AR0.MaxBracket;
1059 
1060 	The compressbuffer contents are mainly relevant for keep brackets
1061 	We should do this only if there is a keep brackets statement
1062 	We may however still need the compressbuffer for expressions in the rhs.
1063 */
1064 	if ( par >= 1 ) {
1065 /*
1066 		We may not need this %%%%% 7-apr-2006
1067 */
1068 		t1 = AR.CompressBuffer; t2 = AR0.CompressBuffer;
1069 		while ( t2 < AR0.CompressPointer ) *t1++ = *t2++;
1070 		AR.CompressPointer = t1;
1071 
1072 	}
1073 	else {
1074 		AR.CompressPointer = AR.CompressBuffer;
1075 	}
1076 	if ( AR.DeferFlag ) {
1077 		if ( AR.infile->handle < 0 ) {
1078 			AR.infile->POfill = AR0.infile->POfill;
1079 		}
1080 		else {
1081 /*
1082 			We have to set the value of POposition to something that will
1083 			force a read in the first try.
1084 */
1085 			AR.infile->POfull = AR.infile->POfill = AR.infile->PObuffer;
1086 		}
1087 	}
1088 	if ( par == 0 ) {
1089 		AN.threadbuck = thr;
1090 		AN.ninterms = thr->firstterm;
1091 	}
1092 	else if ( par == 1 ) {
1093 		WORD *tstop;
1094 		t1 = thr->threadbuffer; tstop = t1 + *t1;
1095 		t2 = AT.WorkPointer;
1096 		while ( t1 < tstop ) *t2++ = *t1++;
1097 		AN.ninterms = thr->firstterm;
1098 	}
1099 	AN.TeInFun = 0;
1100 	AN.ncmod = AC.ncmod;
1101 	AT.BrackBuf = AT0.BrackBuf;
1102 	AT.bracketindexflag = AT0.bracketindexflag;
1103 	AN.PolyFunTodo = 0;
1104 /*
1105 	The relevant variables and the term are in their place.
1106 	There is nothing more to do.
1107 */
1108 	return(0);
1109 }
1110 
1111 /*
1112   	#] LoadOneThread :
1113   	#[ BalanceRunThread :
1114 */
1115 /**
1116  *	To start a thread from the Generator routine we need to pass a number
1117  *	of variables.
1118  *	This is part of the second stage load balancing. The second stage is
1119  *	when we interfere with the expansion tree in Generator and let branches
1120  *	of the tree be treated by other workers.
1121  *	Early experiments show disappointing results and hence the system is
1122  *	currently disabled.
1123  *
1124  *	@param identity  The identity of the thread that will receive the term.
1125  *	@param term      The term to be passed to thread 'identity'
1126  *	@param level     The level at which we are in the tree. Defines the statement.
1127  *	@return Standard return convention (OK -> 0)
1128  */
1129 
BalanceRunThread(PHEAD int identity,WORD * term,WORD level)1130 int BalanceRunThread(PHEAD int identity, WORD *term, WORD level)
1131 {
1132 	GETBIDENTITY
1133 	ALLPRIVATES *BB;
1134 	WORD *t, *tt;
1135 	int i, *ti, *tti;
1136 
1137 	LoadOneThread(AT.identity,identity,0,2);
1138 /*
1139 	Extra loading if needed. Quantities changed in Generator.
1140 	Like the level that has to be passed.
1141 */
1142 	BB = AB[identity];
1143 	BB->R.level = level;
1144 	BB->T.TMbuff = AT.TMbuff;
1145 	ti = AT.RepCount; tti = BB->T.RepCount;
1146 	i = AN.RepPoint - AT.RepCount;
1147 	BB->N.RepPoint = BB->T.RepCount + i;
1148 	for ( ; i >= 0; i-- ) tti[i] = ti[i];
1149 
1150 	t = term; i = *term;
1151 	tt = BB->T.WorkSpace;
1152 	NCOPY(tt,t,i);
1153 	BB->T.WorkPointer = tt;
1154 
1155 	WakeupThread(identity,HIGHERLEVELGENERATION);
1156 
1157 	return(0);
1158 }
1159 
1160 /*
1161   	#] BalanceRunThread :
1162   	#[ SetWorkerFiles :
1163 */
1164 /**
1165  *	Initializes the scratch files at the start of the execution of a module.
1166  */
1167 
SetWorkerFiles()1168 void SetWorkerFiles()
1169 {
1170 	int id;
1171 	ALLPRIVATES *B, *B0 = AB[0];
1172 	for ( id = 1; id < AM.totalnumberofthreads; id++ ) {
1173 		B = AB[id];
1174 		AR.infile = &(AR.Fscr[0]);
1175 		AR.outfile = &(AR.Fscr[1]);
1176 		AR.hidefile = &(AR.Fscr[2]);
1177 		AR.infile->handle = AR0.infile->handle;
1178 		AR.hidefile->handle = AR0.hidefile->handle;
1179 		if ( AR.infile->handle < 0 ) {
1180 			AR.infile->PObuffer = AR0.infile->PObuffer;
1181 			AR.infile->POstop = AR0.infile->POstop;
1182 			AR.infile->POfill = AR0.infile->POfill;
1183 			AR.infile->POfull = AR0.infile->POfull;
1184 			AR.infile->POsize = AR0.infile->POsize;
1185 			AR.InInBuf = AR0.InInBuf;
1186 			AR.infile->POposition = AR0.infile->POposition;
1187 			AR.infile->filesize = AR0.infile->filesize;
1188 		}
1189 		else {
1190 			AR.infile->PObuffer = AR.infile->wPObuffer;
1191 			AR.infile->POstop = AR.infile->wPOstop;
1192 			AR.infile->POfill = AR.infile->wPOfill;
1193 			AR.infile->POfull = AR.infile->wPOfull;
1194 			AR.infile->POsize = AR.infile->wPOsize;
1195 			AR.InInBuf = 0;
1196 			PUTZERO(AR.infile->POposition);
1197 		}
1198 /*
1199 		If there is some writing, it betters happens to ones own outfile.
1200 		Currently this is to be done only for InParallel.
1201 		Merging of the outputs is then done by the CopyExpression routine.
1202 */
1203 		{
1204 			AR.outfile->PObuffer = AR.outfile->wPObuffer;
1205 			AR.outfile->POstop = AR.outfile->wPOstop;
1206 			AR.outfile->POfill = AR.outfile->wPOfill;
1207 			AR.outfile->POfull = AR.outfile->wPOfull;
1208 			AR.outfile->POsize = AR.outfile->wPOsize;
1209 			PUTZERO(AR.outfile->POposition);
1210 		}
1211 		if ( AR.hidefile->handle < 0 ) {
1212 			AR.hidefile->PObuffer = AR0.hidefile->PObuffer;
1213 			AR.hidefile->POstop = AR0.hidefile->POstop;
1214 			AR.hidefile->POfill = AR0.hidefile->POfill;
1215 			AR.hidefile->POfull = AR0.hidefile->POfull;
1216 			AR.hidefile->POsize = AR0.hidefile->POsize;
1217 			AR.InHiBuf = AR0.InHiBuf;
1218 			AR.hidefile->POposition = AR0.hidefile->POposition;
1219 			AR.hidefile->filesize = AR0.hidefile->filesize;
1220 		}
1221 		else {
1222 			AR.hidefile->PObuffer = AR.hidefile->wPObuffer;
1223 			AR.hidefile->POstop = AR.hidefile->wPOstop;
1224 			AR.hidefile->POfill = AR.hidefile->wPOfill;
1225 			AR.hidefile->POfull = AR.hidefile->wPOfull;
1226 			AR.hidefile->POsize = AR.hidefile->wPOsize;
1227 			AR.InHiBuf = 0;
1228 			PUTZERO(AR.hidefile->POposition);
1229 		}
1230 	}
1231 	if ( AR0.StoreData.dirtyflag ) {
1232 		for ( id = 1; id < AM.totalnumberofthreads; id++ ) {
1233 			B = AB[id];
1234 			AR.StoreData = AR0.StoreData;
1235 		}
1236 	}
1237 }
1238 
1239 /*
1240   	#] SetWorkerFiles :
1241   	#[ RunThread :
1242 */
1243 /**
1244  *	This is the routine that represents each worker.
1245  *	The model is that the worker waits for a 'signal'.
1246  *	If there is a signal it wakes up, looks at what signal and then takes
1247  *	the corresponding action. After this it goes back to sleep.
1248  */
1249 
RunThread(void * dummy)1250 void *RunThread(void *dummy)
1251 {
1252 	WORD *term, *ttin, *tt, *ttco, *oldwork;
1253 	int identity, wakeupsignal, identityretv, i, tobereleased, errorcode;
1254 	ALLPRIVATES *B;
1255 	THREADBUCKET *thr;
1256 	POSITION *ppdef;
1257 	EXPRESSIONS e;
1258 	DUMMYUSE(dummy);
1259 	identity = SetIdentity(&identityretv);
1260 	threadpointers[identity] = pthread_self();
1261 	B = InitializeOneThread(identity);
1262 	while ( ( wakeupsignal = ThreadWait(identity) ) > 0 ) {
1263 		switch ( wakeupsignal ) {
1264 /*
1265 			#[ STARTNEWEXPRESSION :
1266 */
1267 			case STARTNEWEXPRESSION:
1268 /*
1269 				Set up the sort routines etc.
1270 				Start with getting some buffers synchronized with the compiler
1271 */
1272 				if ( UpdateOneThread(identity) ) {
1273 					MLOCK(ErrorMessageLock);
1274 					MesPrint("Update error in starting expression in thread %d in module %d",identity,AC.CModule);
1275 					MUNLOCK(ErrorMessageLock);
1276 					Terminate(-1);
1277 				}
1278 				AR.DeferFlag = AC.ComDefer;
1279 				AR.sLevel = AS.sLevel;
1280 				AR.MaxDum = AM.IndDum;
1281 				AR.expchanged = AB[0]->R.expchanged;
1282 				AR.expflags = AB[0]->R.expflags;
1283 				AR.PolyFun = AB[0]->R.PolyFun;
1284 				AR.PolyFunInv = AB[0]->R.PolyFunInv;
1285 				AR.PolyFunType = AB[0]->R.PolyFunType;
1286 				AR.PolyFunExp = AB[0]->R.PolyFunExp;
1287 				AR.PolyFunVar = AB[0]->R.PolyFunVar;
1288 				AR.PolyFunPow = AB[0]->R.PolyFunPow;
1289 /*
1290 				Now fire up the sort buffer.
1291 */
1292 				NewSort(BHEAD0);
1293 				break;
1294 /*
1295 			#] STARTNEWEXPRESSION :
1296 			#[ LOWESTLEVELGENERATION :
1297 */
1298 			case LOWESTLEVELGENERATION:
1299 #ifdef INNERTEST
1300 				if ( AC.InnerTest ) {
1301 					if ( StrCmp(AC.TestValue,(UBYTE *)INNERTEST) == 0 ) {
1302 						MesPrint("Testing(Worker%d): value = %s",AT.identity,AC.TestValue);
1303 					}
1304 				}
1305 #endif
1306 				e = Expressions + AR.CurExpr;
1307 				thr = AN.threadbuck;
1308 				ppdef = thr->deferbuffer;
1309 				ttin = thr->threadbuffer;
1310 				ttco = thr->compressbuffer;
1311 				term = AT.WorkPointer;
1312 				thr->usenum = 0;
1313 				tobereleased = 0;
1314 				AN.inputnumber = thr->firstterm;
1315 				AN.ninterms = thr->firstterm;
1316 				do {
1317 				  thr->usenum++;	/* For if the master wants to steal the bucket */
1318 				  tt = term; i = *ttin;
1319 				  NCOPY(tt,ttin,i);
1320 				  AT.WorkPointer = tt;
1321 				  if ( AR.DeferFlag ) {
1322 					tt = AR.CompressBuffer; i = *ttco;
1323 					NCOPY(tt,ttco,i);
1324 					AR.CompressPointer = tt;
1325 					AR.DefPosition = ppdef[0]; ppdef++;
1326 				  }
1327 				  if ( thr->free == BUCKETTERMINATED ) {
1328 /*
1329 				    The next statement allows the master to steal the bucket
1330 				    for load balancing purposes. We do still execute the current
1331 					term, but afterwards we drop out.
1332 					Once we have written the release code, we cannot use this
1333 					bucket anymore. Hence the exit to the label bucketstolen.
1334 */
1335 					if ( thr->usenum == thr->totnum ) {
1336 						thr->free = BUCKETCOMINGFREE;
1337 					}
1338 					else {
1339 						thr->free = BUCKETRELEASED;
1340 						tobereleased = 1;
1341 					}
1342 				  }
1343 /*
1344 					What if we want to steal and we set thr->free while
1345 					the thread is inside the next code for a long time?
1346 				  if ( AT.LoadBalancing ) {
1347 */
1348 					LOCK(thr->lock);
1349 					thr->busy = BUCKETDOINGTERM;
1350 					UNLOCK(thr->lock);
1351 /*
1352 				  }
1353 				  else {
1354 					thr->busy = BUCKETDOINGTERM;
1355 				  }
1356 */
1357 				  AN.RepPoint = AT.RepCount + 1;
1358 
1359 				  if ( ( e->vflags & ISFACTORIZED ) != 0 && term[1] == HAAKJE ) {
1360 				    StoreTerm(BHEAD term);
1361 				  }
1362 				  else {
1363 				  if ( AR.DeferFlag ) {
1364 					AR.CurDum = AN.IndDum = Expressions[AR.CurExpr].numdummies + AM.IndDum;
1365 				  }
1366 				  else {
1367 					AN.IndDum = AM.IndDum;
1368 					AR.CurDum = ReNumber(BHEAD term);
1369 				  }
1370 				  if ( AC.SymChangeFlag ) MarkDirty(term,DIRTYSYMFLAG);
1371 				  if ( AN.ncmod ) {
1372 					if ( ( AC.modmode & ALSOFUNARGS ) != 0 ) MarkDirty(term,DIRTYFLAG);
1373 					else if ( AR.PolyFun ) PolyFunDirty(BHEAD term);
1374 				  }
1375 				  else if ( AC.PolyRatFunChanged ) PolyFunDirty(BHEAD term);
1376 				  if ( ( AP.PreDebug & THREADSDEBUG ) != 0 ) {
1377 					MLOCK(ErrorMessageLock);
1378 					MesPrint("Thread %w executing term:");
1379 					PrintTerm(term,"LLG");
1380 					MUNLOCK(ErrorMessageLock);
1381 				  }
1382 				  if ( ( AR.PolyFunType == 2 ) && ( AC.PolyRatFunChanged == 0 )
1383 						&& ( e->status == LOCALEXPRESSION || e->status == GLOBALEXPRESSION ) ) {
1384 						PolyFunClean(BHEAD term);
1385 				  }
1386 				  if ( Generator(BHEAD term,0) ) {
1387 					LowerSortLevel();
1388 					MLOCK(ErrorMessageLock);
1389 					MesPrint("Error in processing one term in thread %d in module %d",identity,AC.CModule);
1390 					MUNLOCK(ErrorMessageLock);
1391 					Terminate(-1);
1392 				  }
1393 				  AN.ninterms++;
1394 				  }
1395 /*				  if ( AT.LoadBalancing ) { */
1396 					LOCK(thr->lock);
1397 					thr->busy = BUCKETPREPARINGTERM;
1398 					UNLOCK(thr->lock);
1399 /*
1400 				  }
1401 				  else {
1402 					thr->busy = BUCKETPREPARINGTERM;
1403 				  }
1404 */
1405 				  if ( thr->free == BUCKETTERMINATED ) {
1406 					if ( thr->usenum == thr->totnum ) {
1407 						thr->free = BUCKETCOMINGFREE;
1408 					}
1409 					else {
1410 						thr->free = BUCKETRELEASED;
1411 						tobereleased = 1;
1412 					}
1413 				  }
1414 				  if ( tobereleased ) goto bucketstolen;
1415 				} while ( *ttin );
1416 				thr->free = BUCKETCOMINGFREE;
1417 bucketstolen:;
1418 /*				if ( AT.LoadBalancing ) { */
1419 					LOCK(thr->lock);
1420 					thr->busy = BUCKETTOBERELEASED;
1421 					UNLOCK(thr->lock);
1422 /*				}
1423 				else {
1424 					thr->busy = BUCKETTOBERELEASED;
1425 				}
1426 */
1427 				AT.WorkPointer = term;
1428 				break;
1429 /*
1430 			#] LOWESTLEVELGENERATION :
1431 			#[ FINISHEXPRESSION :
1432 */
1433 #ifdef WITHSORTBOTS
1434 			case CLAIMOUTPUT:
1435 				LOCK(AT.SB.MasterBlockLock[1]);
1436 				break;
1437 #endif
1438 			case FINISHEXPRESSION:
1439 /*
1440 				Finish the sort
1441 
1442 				Start with claiming the first block
1443 				Once we have claimed it we can let the master know that
1444 				everything is all right.
1445 */
1446 				LOCK(AT.SB.MasterBlockLock[1]);
1447 				ThreadClaimedBlock(identity);
1448 /*
1449 				Entry for when we work with sortbots
1450 */
1451 #ifdef WITHSORTBOTS
1452 				/* fall through */
1453 			case FINISHEXPRESSION2:
1454 #endif
1455 /*
1456 				Now we may need here an fsync on the sort file
1457 */
1458 				if ( AC.ThreadSortFileSynch ) {
1459 				  if ( AT.S0->file.handle >= 0 ) {
1460 					SynchFile(AT.S0->file.handle);
1461 				  }
1462 				}
1463 				AT.SB.FillBlock = 1;
1464 				AT.SB.MasterFill[1] = AT.SB.MasterStart[1];
1465 				errorcode = EndSort(BHEAD AT.S0->sBuffer,0);
1466 				UNLOCK(AT.SB.MasterBlockLock[AT.SB.FillBlock]);
1467 				UpdateMaxSize();
1468 				if ( errorcode ) {
1469 					MLOCK(ErrorMessageLock);
1470 					MesPrint("Error terminating sort in thread %d in module %d",identity,AC.CModule);
1471 					MUNLOCK(ErrorMessageLock);
1472 					Terminate(-1);
1473 				}
1474 				break;
1475 /*
1476 			#] FINISHEXPRESSION :
1477 			#[ CLEANUPEXPRESSION :
1478 */
1479 			case CLEANUPEXPRESSION:
1480 /*
1481 				Cleanup everything and wait for the next expression
1482 */
1483 				if ( AR.outfile->handle >= 0 ) {
1484 					CloseFile(AR.outfile->handle);
1485 					AR.outfile->handle = -1;
1486 					remove(AR.outfile->name);
1487 					AR.outfile->POfill = AR.outfile->POfull = AR.outfile->PObuffer;
1488 					PUTZERO(AR.outfile->POposition);
1489 					PUTZERO(AR.outfile->filesize);
1490 				}
1491 				else {
1492 					AR.outfile->POfill = AR.outfile->POfull = AR.outfile->PObuffer;
1493 					PUTZERO(AR.outfile->POposition);
1494 					PUTZERO(AR.outfile->filesize);
1495 				}
1496 				{
1497 					CBUF *C = cbuf+AT.ebufnum;
1498 					WORD **w, ii;
1499 					if ( C->numrhs > 0 || C->numlhs > 0 ) {
1500 						if ( C->rhs ) {
1501 							w = C->rhs; ii = C->numrhs;
1502 							do { *w++ = 0; } while ( --ii > 0 );
1503 						}
1504 						if ( C->lhs ) {
1505 							w = C->lhs; ii = C->numlhs;
1506 							do { *w++ = 0; } while ( --ii > 0 );
1507 						}
1508 						C->numlhs = C->numrhs = 0;
1509 						ClearTree(AT.ebufnum);
1510 						C->Pointer = C->Buffer;
1511 					}
1512 				}
1513 				break;
1514 /*
1515 			#] CLEANUPEXPRESSION :
1516 			#[ HIGHERLEVELGENERATION :
1517 */
1518 			case HIGHERLEVELGENERATION:
1519 /*
1520 				When foliating halfway the tree.
1521 				This should only be needed in a second level load balancing
1522 */
1523 				term = AT.WorkSpace; AT.WorkPointer = term + *term;
1524 				if ( Generator(BHEAD term,AR.level) ) {
1525 					LowerSortLevel();
1526 					MLOCK(ErrorMessageLock);
1527 					MesPrint("Error in load balancing one term at level %d in thread %d in module %d",AR.level,AT.identity,AC.CModule);
1528 					MUNLOCK(ErrorMessageLock);
1529 					Terminate(-1);
1530 				}
1531 				AT.WorkPointer = term;
1532 				break;
1533 /*
1534 			#] HIGHERLEVELGENERATION :
1535 			#[ STARTNEWMODULE :
1536 */
1537 			case STARTNEWMODULE:
1538 /*
1539 				For resetting variables.
1540 */
1541 				SpecialCleanup(B);
1542 				break;
1543 /*
1544 			#] STARTNEWMODULE :
1545 			#[ TERMINATETHREAD :
1546 */
1547 			case TERMINATETHREAD:
1548 				goto EndOfThread;
1549 /*
1550 			#] TERMINATETHREAD :
1551 			#[ DOONEEXPRESSION :
1552 
1553 				When a thread has to do a complete (not too big) expression.
1554 				The number of the expression to be done is in AR.exprtodo.
1555 				The code is mostly taken from Processor. The only difference
1556 				is with what to do with the output.
1557 				The output should go to the scratch buffer of the worker
1558 				(which is free at the right moment). If this buffer is too
1559 				small we have a problem. We could write to file or give the
1560 				master what we have and from now on the master has to collect
1561 				pieces until things are complete.
1562 				Note: this assumes that the expressions don't keep their order.
1563 				If they have to keep their order, don't use this feature.
1564 */
1565 			case DOONEEXPRESSION: {
1566 
1567 				POSITION position, outposition;
1568 				FILEHANDLE *fi, *fout, *oldoutfile;
1569 				LONG dd = 0;
1570 				WORD oldBracketOn = AR.BracketOn;
1571 				WORD *oldBrackBuf = AT.BrackBuf;
1572 				WORD oldbracketindexflag = AT.bracketindexflag;
1573 				WORD fromspectator = 0;
1574 				e = Expressions + AR.exprtodo;
1575 				i = AR.exprtodo;
1576 				AR.CurExpr = i;
1577 				AR.SortType = AC.SortType;
1578 				AR.expchanged = 0;
1579 				if ( ( e->vflags & ISFACTORIZED ) != 0 ) {
1580 					AR.BracketOn = 1;
1581 					AT.BrackBuf = AM.BracketFactors;
1582 					AT.bracketindexflag = 1;
1583 				}
1584 
1585 				position = AS.OldOnFile[i];
1586 				if ( e->status == HIDDENLEXPRESSION || e->status == HIDDENGEXPRESSION ) {
1587 					AR.GetFile = 2; fi = AR.hidefile;
1588 				}
1589 				else {
1590 					AR.GetFile = 0; fi = AR.infile;
1591 				}
1592 /*
1593 				PUTZERO(fi->POposition);
1594 				if ( fi->handle >= 0 ) {
1595 					fi->POfill = fi->POfull = fi->PObuffer;
1596 				}
1597 */
1598 				SetScratch(fi,&position);
1599 				term = oldwork = AT.WorkPointer;
1600 				AR.CompressPointer = AR.CompressBuffer;
1601 				AR.CompressPointer[0] = 0;
1602 				AR.KeptInHold = 0;
1603 				if ( GetTerm(BHEAD term) <= 0 ) {
1604 					MLOCK(ErrorMessageLock);
1605 					MesPrint("Expression %d has problems in scratchfile (t)",i);
1606 					MUNLOCK(ErrorMessageLock);
1607 					Terminate(-1);
1608 				}
1609 				if ( AT.bracketindexflag > 0 ) OpenBracketIndex(i);
1610 				term[3] = i;
1611 				if ( term[5] < 0 ) {
1612 					fromspectator = -term[5];
1613 					PUTZERO(AM.SpectatorFiles[fromspectator-1].readpos);
1614 					term[5] = AC.cbufnum;
1615 				}
1616 				PUTZERO(outposition);
1617 				fout = AR.outfile;
1618 				fout->POfill = fout->POfull = fout->PObuffer;
1619 				fout->POposition = outposition;
1620 				if ( fout->handle >= 0 ) {
1621 					fout->POposition = outposition;
1622 				}
1623 /*
1624 				The next statement is needed because we need the system
1625 				to believe that the expression is at position zero for
1626 				the moment. In this worker, with no memory of other expressions,
1627 				it is. This is needed for when a bracket index is made
1628 				because there e->onfile is an offset. Afterwards, when the
1629 				expression is written to its final location in the masters
1630 				output e->onfile will get its real value.
1631 */
1632 				PUTZERO(e->onfile);
1633 				if ( PutOut(BHEAD term,&outposition,fout,0) < 0 ) goto ProcErr;
1634 
1635 				AR.DeferFlag = AC.ComDefer;
1636 
1637 				AR.sLevel = AB[0]->R.sLevel;
1638 				term = AT.WorkPointer;
1639 				NewSort(BHEAD0);
1640 				AR.MaxDum = AM.IndDum;
1641 				AN.ninterms = 0;
1642 				if ( fromspectator ) {
1643 				 while ( GetFromSpectator(term,fromspectator-1) ) {
1644 				  AT.WorkPointer = term + *term;
1645 				  AN.RepPoint = AT.RepCount + 1;
1646 				  AN.IndDum = AM.IndDum;
1647 				  AR.CurDum = ReNumber(BHEAD term);
1648 				  if ( AC.SymChangeFlag ) MarkDirty(term,DIRTYSYMFLAG);
1649 				  if ( AN.ncmod ) {
1650 					if ( ( AC.modmode & ALSOFUNARGS ) != 0 ) MarkDirty(term,DIRTYFLAG);
1651 					else if ( AR.PolyFun ) PolyFunDirty(BHEAD term);
1652 				  }
1653 				  else if ( AC.PolyRatFunChanged ) PolyFunDirty(BHEAD term);
1654 				  if ( ( AR.PolyFunType == 2 ) && ( AC.PolyRatFunChanged == 0 )
1655 						&& ( e->status == LOCALEXPRESSION || e->status == GLOBALEXPRESSION ) ) {
1656 						PolyFunClean(BHEAD term);
1657 				  }
1658 				  if ( Generator(BHEAD term,0) ) {
1659 					LowerSortLevel(); goto ProcErr;
1660 				  }
1661 				 }
1662 				}
1663 				else {
1664 				 while ( GetTerm(BHEAD term) ) {
1665 				  SeekScratch(fi,&position);
1666 				  AN.ninterms++; dd = AN.deferskipped;
1667 				  if ( ( e->vflags & ISFACTORIZED ) != 0 && term[1] == HAAKJE ) {
1668 					  StoreTerm(BHEAD term);
1669 				  }
1670 				  else {
1671 				  if ( AC.CollectFun && *term <= (AM.MaxTer/(2*(LONG)sizeof(WORD))) ) {
1672 					if ( GetMoreTerms(term) < 0 ) {
1673 					  LowerSortLevel(); goto ProcErr;
1674 					}
1675 				    SeekScratch(fi,&position);
1676 				  }
1677 				  AT.WorkPointer = term + *term;
1678 				  AN.RepPoint = AT.RepCount + 1;
1679 				  if ( AR.DeferFlag ) {
1680 					AR.CurDum = AN.IndDum = Expressions[AR.exprtodo].numdummies;
1681 				  }
1682 				  else {
1683 					AN.IndDum = AM.IndDum;
1684 					AR.CurDum = ReNumber(BHEAD term);
1685 				  }
1686 				  if ( AC.SymChangeFlag ) MarkDirty(term,DIRTYSYMFLAG);
1687 				  if ( AN.ncmod ) {
1688 					if ( ( AC.modmode & ALSOFUNARGS ) != 0 ) MarkDirty(term,DIRTYFLAG);
1689 					else if ( AR.PolyFun ) PolyFunDirty(BHEAD term);
1690 				  }
1691 				  else if ( AC.PolyRatFunChanged ) PolyFunDirty(BHEAD term);
1692 				  if ( ( AR.PolyFunType == 2 ) && ( AC.PolyRatFunChanged == 0 )
1693 						&& ( e->status == LOCALEXPRESSION || e->status == GLOBALEXPRESSION ) ) {
1694 						PolyFunClean(BHEAD term);
1695 				  }
1696 				  if ( Generator(BHEAD term,0) ) {
1697 					LowerSortLevel(); goto ProcErr;
1698 				  }
1699 				  AN.ninterms += dd;
1700 				  }
1701 				  SetScratch(fi,&position);
1702 				  if ( fi == AR.hidefile ) {
1703 					AR.InHiBuf = (fi->POfull-fi->PObuffer)
1704 						-DIFBASE(position,fi->POposition)/sizeof(WORD);
1705 				  }
1706 				  else {
1707 					AR.InInBuf = (fi->POfull-fi->PObuffer)
1708 						-DIFBASE(position,fi->POposition)/sizeof(WORD);
1709 				  }
1710 				 }
1711 				}
1712 				AN.ninterms += dd;
1713 				if ( EndSort(BHEAD AT.S0->sBuffer,0) < 0 ) goto ProcErr;
1714 				e->numdummies = AR.MaxDum - AM.IndDum;
1715 				AR.BracketOn = oldBracketOn;
1716 				AT.BrackBuf = oldBrackBuf;
1717 				if ( ( e->vflags & TOBEFACTORED ) != 0 )
1718 						poly_factorize_expression(e);
1719 				else if ( ( ( e->vflags & TOBEUNFACTORED ) != 0 )
1720 				 && ( ( e->vflags & ISFACTORIZED ) != 0 ) )
1721 						poly_unfactorize_expression(e);
1722 				if ( AT.S0->TermsLeft )   e->vflags &= ~ISZERO;
1723 				else                      e->vflags |= ISZERO;
1724 				if ( AR.expchanged == 0 ) e->vflags |= ISUNMODIFIED;
1725 				if ( AT.S0->TermsLeft ) AR.expflags |= ISZERO;
1726 				if ( AR.expchanged )    AR.expflags |= ISUNMODIFIED;
1727 				AR.GetFile = 0;
1728 				AT.bracketindexflag = oldbracketindexflag;
1729 /*
1730 				Now copy the whole thing from fout to AR0.outfile
1731 				Do this in one go to keep the lock occupied as short as possible
1732 */
1733 				SeekScratch(fout,&outposition);
1734 				LOCK(AS.outputslock);
1735 				oldoutfile = AB[0]->R.outfile;
1736 				if ( e->status == INTOHIDELEXPRESSION || e->status == INTOHIDEGEXPRESSION ) {
1737 					AB[0]->R.outfile = AB[0]->R.hidefile;
1738 				}
1739 				SeekScratch(AB[0]->R.outfile,&position);
1740 				e->onfile = position;
1741 				if ( CopyExpression(fout,AB[0]->R.outfile) < 0 ) {
1742 					AB[0]->R.outfile = oldoutfile;
1743 					UNLOCK(AS.outputslock);
1744 					MLOCK(ErrorMessageLock);
1745 					MesPrint("Error copying output of 'InParallel' expression to master. Thread: %d",identity);
1746 					MUNLOCK(ErrorMessageLock);
1747 					goto ProcErr;
1748 				}
1749 				AB[0]->R.outfile = oldoutfile;
1750 				AB[0]->R.hidefile->POfull = AB[0]->R.hidefile->POfill;
1751 				AB[0]->R.expflags = AR.expflags;
1752 				UNLOCK(AS.outputslock);
1753 
1754 				if ( fout->handle >= 0 ) {	/* Now get rid of the file */
1755 					CloseFile(fout->handle);
1756 					fout->handle = -1;
1757 					remove(fout->name);
1758 					PUTZERO(fout->POposition);
1759 					PUTZERO(fout->filesize);
1760 					fout->POfill = fout->POfull = fout->PObuffer;
1761 				}
1762 				UpdateMaxSize();
1763 
1764 				AT.WorkPointer = oldwork;
1765 
1766 				} break;
1767 /*
1768 			#] DOONEEXPRESSION :
1769 			#[ DOBRACKETS :
1770 
1771 				In case we have a bracket index we can have the worker treat
1772 				one or more of the entries in the bracket index.
1773 				The advantage is that identical terms will meet each other
1774 				sooner in the sorting and hence fewer compares will be needed.
1775 				Also this way the master doesn't need to fill the buckets.
1776 				The main problem is the load balancing which can become very
1777 				bad when there is a long tail without things outside the bracket.
1778 
1779 				We get sent:
1780 				1: The number of the first bracket to be done
1781 				2: The number of the last bracket to be done
1782 */
1783 			case DOBRACKETS: {
1784 				BRACKETINFO *binfo;
1785 				BRACKETINDEX *bi;
1786 				FILEHANDLE *fi;
1787 				POSITION stoppos,where;
1788 				e = Expressions + AR.CurExpr;
1789 				binfo = e->bracketinfo;
1790 				thr = AN.threadbuck;
1791 				bi = &(binfo->indexbuffer[thr->firstbracket]);
1792 				if ( AR.GetFile == 2 ) fi = AR.hidefile;
1793 				else                   fi = AR.infile;
1794 				where = bi->start;
1795 				ADD2POS(where,AS.OldOnFile[AR.CurExpr]);
1796 				SetScratch(fi,&(where));
1797 				stoppos = binfo->indexbuffer[thr->lastbracket].next;
1798 				ADD2POS(stoppos,AS.OldOnFile[AR.CurExpr]);
1799 				AN.ninterms = thr->firstterm;
1800 /*
1801 				Now we have to put the 'value' of the bracket in the
1802 				Compress buffer.
1803 */
1804 				ttco = AR.CompressBuffer;
1805 				tt = binfo->bracketbuffer + bi->bracket;
1806 				i = *tt;
1807 				NCOPY(ttco,tt,i)
1808 				AR.CompressPointer = ttco;
1809 				term = AT.WorkPointer;
1810 				while ( GetTerm(BHEAD term) ) {
1811 					SeekScratch(fi,&where);
1812 					AT.WorkPointer = term + *term;
1813 					AN.IndDum = AM.IndDum;
1814 					AR.CurDum = ReNumber(BHEAD term);
1815 					if ( AC.SymChangeFlag ) MarkDirty(term,DIRTYSYMFLAG);
1816 					if ( AN.ncmod ) {
1817 						if ( ( AC.modmode & ALSOFUNARGS ) != 0 ) MarkDirty(term,DIRTYFLAG);
1818 						else if ( AR.PolyFun ) PolyFunDirty(BHEAD term);
1819 					}
1820 					else if ( AC.PolyRatFunChanged ) PolyFunDirty(BHEAD term);
1821 					if ( ( AR.PolyFunType == 2 ) && ( AC.PolyRatFunChanged == 0 )
1822 						&& ( e->status == LOCALEXPRESSION || e->status == GLOBALEXPRESSION ) ) {
1823 						PolyFunClean(BHEAD term);
1824 					}
1825 					if ( ( AP.PreDebug & THREADSDEBUG ) != 0 ) {
1826 						MLOCK(ErrorMessageLock);
1827 						MesPrint("Thread %w executing term:");
1828 						PrintTerm(term,"DoBrackets");
1829 						MUNLOCK(ErrorMessageLock);
1830 					}
1831 					AT.WorkPointer = term + *term;
1832 					if ( Generator(BHEAD term,0) ) {
1833 						LowerSortLevel();
1834 						MLOCK(ErrorMessageLock);
1835 						MesPrint("Error in processing one term in thread %d in module %d",identity,AC.CModule);
1836 						MUNLOCK(ErrorMessageLock);
1837 						Terminate(-1);
1838 					}
1839 					AN.ninterms++;
1840 					SetScratch(fi,&(where));
1841 					if ( ISGEPOS(where,stoppos) ) break;
1842 				}
1843 				AT.WorkPointer = term;
1844 				thr->free = BUCKETCOMINGFREE;
1845 				break;
1846 			}
1847 /*
1848 			#] DOBRACKETS :
1849 			#[ CLEARCLOCK :
1850 
1851 			The program only comes here after a .clear
1852 */
1853 			case CLEARCLOCK:
1854 /*				LOCK(clearclocklock); */
1855 				sumtimerinfo[identity] += TimeCPU(1);
1856 				timerinfo[identity] = TimeCPU(0);
1857 /*				UNLOCK(clearclocklock); */
1858 				break;
1859 /*
1860 			#] CLEARCLOCK :
1861 			#[ MCTSEXPANDTREE :
1862 */
1863 			case MCTSEXPANDTREE:
1864 				AT.optimtimes = AB[0]->T.optimtimes;
1865 				find_Horner_MCTS_expand_tree();
1866 				break;
1867 /*
1868 			#] MCTSEXPANDTREE :
1869 			#[ OPTIMIZEEXPRESSION :
1870 */
1871 			case OPTIMIZEEXPRESSION:
1872 				optimize_expression_given_Horner();
1873 				break;
1874 /*
1875 			#] OPTIMIZEEXPRESSION :
1876 */
1877 			default:
1878 				MLOCK(ErrorMessageLock);
1879 				MesPrint("Illegal wakeup signal %d for thread %d",wakeupsignal,identity);
1880 				MUNLOCK(ErrorMessageLock);
1881 				Terminate(-1);
1882 				break;
1883 		}
1884 		/* we need the following update in case we are using checkpoints. then we
1885 		   need to readjust the clocks when recovering using this information */
1886 		timerinfo[identity] = TimeCPU(1);
1887 	}
1888 EndOfThread:;
1889 /*
1890 	This is the end of the thread. We cleanup and exit.
1891 */
1892 	FinalizeOneThread(identity);
1893 	return(0);
1894 ProcErr:
1895 	Terminate(-1);
1896 	return(0);
1897 }
1898 
1899 /*
1900   	#] RunThread :
1901   	#[ RunSortBot :
1902 */
1903 /**
1904  *	This is the routine that represents each sortbot.
1905  *	The model is that the sortbot waits for a 'signal'.
1906  *	If there is a signal it wakes up, looks at what signal and then takes
1907  *	the corresponding action. After this it goes back to sleep.
1908  */
1909 
1910 #ifdef WITHSORTBOTS
1911 
RunSortBot(void * dummy)1912 void *RunSortBot(void *dummy)
1913 {
1914 	int identity, wakeupsignal, identityretv;
1915 	ALLPRIVATES *B, *BB;
1916 	DUMMYUSE(dummy);
1917 	identity = SetIdentity(&identityretv);
1918 	threadpointers[identity] = pthread_self();
1919 	B = InitializeOneThread(identity);
1920 	while ( ( wakeupsignal = SortBotWait(identity) ) > 0 ) {
1921 		switch ( wakeupsignal ) {
1922 /*
1923 			#[ INISORTBOT :
1924 */
1925 			case INISORTBOT:
1926 				AR.CurExpr = AB[0]->R.CurExpr;
1927 				AR.PolyFun = AB[0]->R.PolyFun;
1928 				AR.PolyFunInv = AB[0]->R.PolyFunInv;
1929 				AR.PolyFunType = AB[0]->R.PolyFunType;
1930 				AR.PolyFunExp = AB[0]->R.PolyFunExp;
1931 				AR.PolyFunVar = AB[0]->R.PolyFunVar;
1932 				AR.PolyFunPow = AB[0]->R.PolyFunPow;
1933 				AR.SortType = AC.SortType;
1934 				if ( AR.PolyFun == 0 ) { AT.SS->PolyFlag = 0; }
1935 				else if ( AR.PolyFunType == 1 ) { AT.SS->PolyFlag = 1; }
1936 				else if ( AR.PolyFunType == 2 ) {
1937 					if ( AR.PolyFunExp == 2
1938 					  || AR.PolyFunExp == 3 ) AT.SS->PolyFlag = 1;
1939 					else                      AT.SS->PolyFlag = 2;
1940 				}
1941 				AT.SS->PolyWise = 0;
1942 				AN.ncmod = AC.ncmod;
1943 				LOCK(AT.SB.MasterBlockLock[1]);
1944 				BB = AB[AT.SortBotIn1];
1945 				LOCK(BB->T.SB.MasterBlockLock[BB->T.SB.MasterNumBlocks]);
1946 				BB = AB[AT.SortBotIn2];
1947 				LOCK(BB->T.SB.MasterBlockLock[BB->T.SB.MasterNumBlocks]);
1948 				AT.SB.FillBlock = 1;
1949 				AT.SB.MasterFill[1] = AT.SB.MasterStart[1];
1950 				SETBASEPOSITION(AN.theposition,0);
1951 				break;
1952 /*
1953 			#] INISORTBOT :
1954 			#[ RUNSORTBOT :
1955 */
1956 			case RUNSORTBOT:
1957 				SortBotMerge(B);
1958 				break;
1959 /*
1960 			#] RUNSORTBOT :
1961 			#[ TERMINATETHREAD :
1962 */
1963 			case TERMINATETHREAD:
1964 				goto EndOfThread;
1965 /*
1966 			#] TERMINATETHREAD :
1967 			#[ CLEARCLOCK :
1968 
1969 			The program only comes here after a .clear
1970 */
1971 			case CLEARCLOCK:
1972 /*				LOCK(clearclocklock); */
1973 				sumtimerinfo[identity] += TimeCPU(1);
1974 				timerinfo[identity] = TimeCPU(0);
1975 /*				UNLOCK(clearclocklock); */
1976 				break;
1977 /*
1978 			#] CLEARCLOCK :
1979 */
1980 			default:
1981 				MLOCK(ErrorMessageLock);
1982 				MesPrint("Illegal wakeup signal %d for thread %d",wakeupsignal,identity);
1983 				MUNLOCK(ErrorMessageLock);
1984 				Terminate(-1);
1985 				break;
1986 		}
1987 	}
1988 EndOfThread:;
1989 /*
1990 	This is the end of the thread. We cleanup and exit.
1991 */
1992 	FinalizeOneThread(identity);
1993 	return(0);
1994 }
1995 
1996 #endif
1997 
1998 /*
1999   	#] RunSortBot :
2000   	#[ IAmAvailable :
2001 */
2002 /**
2003  *	To be called by a thread when it becomes available.
2004  *	Puts it on a stack.
2005  *	We use a stack model. It is also possible to define a circular queue.
2006  *	This will be tried out at a later stage.
2007  *	One advantage of a stack could be that if we cannot feed all threads
2008  *	more sorting is done at the threads and the master has to do less.
2009  *
2010  *	@param identity The identity thread that signals its availability.
2011  */
2012 
IAmAvailable(int identity)2013 void IAmAvailable(int identity)
2014 {
2015 	int top;
2016 	LOCK(availabilitylock);
2017 	top = topofavailables;
2018 	listofavailables[topofavailables++] = identity;
2019 	if ( top == 0 ) {
2020 		UNLOCK(availabilitylock);
2021 		LOCK(wakeupmasterlock);
2022 		wakeupmaster = identity;
2023 		pthread_cond_signal(&wakeupmasterconditions);
2024 		UNLOCK(wakeupmasterlock);
2025 	}
2026 	else {
2027 		UNLOCK(availabilitylock);
2028 	}
2029 }
2030 
2031 /*
2032   	#] IAmAvailable :
2033   	#[ GetAvailableThread :
2034 */
2035 /**
2036  *	Gets an available thread from the top of the stack.
2037  *	Maybe a circular buffer model would work better. This would mean that
2038  *	we take the lowest available worker, rather than the highest.
2039  *	We then have to work with high water marks and low water marks.
2040  *	(writing point and reading point). Still to be investigated.
2041  */
2042 
GetAvailableThread()2043 int GetAvailableThread()
2044 {
2045 	int retval = -1;
2046 	LOCK(availabilitylock);
2047 	if ( topofavailables > 0 ) retval = listofavailables[--topofavailables];
2048 	UNLOCK(availabilitylock);
2049 	if ( retval >= 0 ) {
2050 /*
2051 		Make sure the thread is indeed waiting and not between
2052 		saying that it is available and starting to wait.
2053 */
2054 		LOCK(wakeuplocks[retval]);
2055 		UNLOCK(wakeuplocks[retval]);
2056 	}
2057 	return(retval);
2058 }
2059 
2060 /*
2061   	#] GetAvailableThread :
2062   	#[ ConditionalGetAvailableThread :
2063 */
2064 /**
2065  *	Looks whether a thread is available.
2066  *	If a thread is available it is taken from the stack of available threads.
2067  *
2068  *	@return the identity of an available thread or -1 if none is available.
2069  */
2070 
ConditionalGetAvailableThread()2071 int ConditionalGetAvailableThread()
2072 {
2073 	int retval = -1;
2074 	if ( topofavailables > 0 ) {
2075 		LOCK(availabilitylock);
2076 		if ( topofavailables > 0 ) {
2077 			retval = listofavailables[--topofavailables];
2078 		}
2079 		UNLOCK(availabilitylock);
2080 		if ( retval >= 0 ) {
2081 /*
2082 			Make sure the thread is indeed waiting and not between
2083 			saying that it is available and starting to wait.
2084 */
2085 			LOCK(wakeuplocks[retval]);
2086 			UNLOCK(wakeuplocks[retval]);
2087 		}
2088 	}
2089 	return(retval);
2090 }
2091 
2092 /*
2093   	#] ConditionalGetAvailableThread :
2094   	#[ GetThread :
2095 */
2096 /**
2097  *	Gets a given thread from the list of available threads, even if
2098  *	it isn't on the top of the stack.
2099  *
2100  *	@param identity The number of the thread that we want to remove from the
2101  *	                list of available threads.
2102  *	@return The number of the thread if it was available. -1 otherwise.
2103  */
2104 
GetThread(int identity)2105 int GetThread(int identity)
2106 {
2107 	int retval = -1, j;
2108 	LOCK(availabilitylock);
2109 	for ( j = 0; j < topofavailables; j++ ) {
2110 		if ( identity == listofavailables[j] ) break;
2111 	}
2112 	if ( j < topofavailables ) {
2113 		--topofavailables;
2114 		for ( ; j < topofavailables; j++ ) {
2115 			listofavailables[j] = listofavailables[j+1];
2116 		}
2117 		retval = identity;
2118 	}
2119 	UNLOCK(availabilitylock);
2120 	return(retval);
2121 }
2122 
2123 /*
2124   	#] GetThread :
2125   	#[ ThreadWait :
2126 */
2127 /**
2128  *	To be called by a thread when it has nothing to do.
2129  *	It goes to sleep and waits for a wakeup call.
2130  *	The return value is the number of the wakeup signal.
2131  *
2132  *	@param identity The number of the thread.
2133  *	@return The number of the wake-up signal.
2134  */
2135 
ThreadWait(int identity)2136 int ThreadWait(int identity)
2137 {
2138 	int retval, top, j;
2139 	LOCK(wakeuplocks[identity]);
2140 	LOCK(availabilitylock);
2141 	top = topofavailables;
2142 	for ( j = topofavailables; j > 0; j-- )
2143 		listofavailables[j] = listofavailables[j-1];
2144 	listofavailables[0] = identity;
2145 	topofavailables++;
2146 	if ( top == 0 || topofavailables == numberofworkers ) {
2147 		UNLOCK(availabilitylock);
2148 		LOCK(wakeupmasterlock);
2149 		wakeupmaster = identity;
2150 		pthread_cond_signal(&wakeupmasterconditions);
2151 		UNLOCK(wakeupmasterlock);
2152 	}
2153 	else {
2154 		UNLOCK(availabilitylock);
2155 	}
2156 	while ( wakeup[identity] == 0 ) {
2157 		pthread_cond_wait(&(wakeupconditions[identity]),&(wakeuplocks[identity]));
2158 	}
2159 	retval = wakeup[identity];
2160 	wakeup[identity] = 0;
2161 	UNLOCK(wakeuplocks[identity]);
2162 	return(retval);
2163 }
2164 
2165 /*
2166   	#] ThreadWait :
2167   	#[ SortBotWait :
2168 */
2169 
2170 #ifdef WITHSORTBOTS
2171 /**
2172  *	To be called by a sortbot thread when it has nothing to do.
2173  *	It goes to sleep and waits for a wakeup call.
2174  *	The return value is the number of the wakeup signal.
2175  *
2176  *	@param identity The number of the sortbot thread.
2177  *	@return The number of the wake-up signal.
2178  */
2179 
SortBotWait(int identity)2180 int SortBotWait(int identity)
2181 {
2182 	int retval;
2183 	LOCK(wakeuplocks[identity]);
2184 	LOCK(availabilitylock);
2185 	topsortbotavailables++;
2186 	if ( topsortbotavailables >= numberofsortbots ) {
2187 		UNLOCK(availabilitylock);
2188 		LOCK(wakeupsortbotlock);
2189 		wakeupmaster = identity;
2190 		pthread_cond_signal(&wakeupsortbotconditions);
2191 		UNLOCK(wakeupsortbotlock);
2192 	}
2193 	else {
2194 		UNLOCK(availabilitylock);
2195 	}
2196 	while ( wakeup[identity] == 0 ) {
2197 		pthread_cond_wait(&(wakeupconditions[identity]),&(wakeuplocks[identity]));
2198 	}
2199 	retval = wakeup[identity];
2200 	wakeup[identity] = 0;
2201 	UNLOCK(wakeuplocks[identity]);
2202 	return(retval);
2203 }
2204 
2205 #endif
2206 
2207 /*
2208   	#] SortBotWait :
2209   	#[ ThreadClaimedBlock :
2210 */
2211 /**
2212  *	When the final sort of an expression starts the workers have to claim
2213  *	the first block in the buffers of the master for their output.
2214  *	The master may only continue after all workers have claimed their block
2215  *	because otherwise it is possible that the master may claim this block for
2216  *	reading before it has been written in.
2217  *	Hence the master must wait till all blocks have been claimed. Then the
2218  *	master will get signalled that it can continue.
2219  */
2220 
ThreadClaimedBlock(int identity)2221 int ThreadClaimedBlock(int identity)
2222 {
2223 	LOCK(availabilitylock);
2224 	numberclaimed++;
2225 	if ( numberclaimed >= numberofworkers ) {
2226 		UNLOCK(availabilitylock);
2227 		LOCK(wakeupmasterlock);
2228 		wakeupmaster = identity;
2229 		pthread_cond_signal(&wakeupmasterconditions);
2230 		UNLOCK(wakeupmasterlock);
2231 	}
2232 	else {
2233 		UNLOCK(availabilitylock);
2234 	}
2235 	return(0);
2236 }
2237 
2238 /*
2239   	#] ThreadClaimedBlock :
2240   	#[ MasterWait :
2241 */
2242 /**
2243  *	To be called by the master when it has to wait for one of the
2244  *	workers to become available.
2245  *	It goes to sleep and waits for a wakeupmaster call.
2246  *	The return value is the identity of the process that wakes up the master.
2247  */
2248 
MasterWait()2249 int MasterWait()
2250 {
2251 	int retval;
2252 	LOCK(wakeupmasterlock);
2253 	while ( wakeupmaster == 0 ) {
2254 		pthread_cond_wait(&wakeupmasterconditions,&wakeupmasterlock);
2255 	}
2256 	retval = wakeupmaster;
2257 	wakeupmaster = 0;
2258 	UNLOCK(wakeupmasterlock);
2259 	return(retval);
2260 }
2261 
2262 /*
2263   	#] MasterWait :
2264   	#[ MasterWaitThread :
2265 */
2266 /**
2267  *	To be called by the master when it has to wait for a specific one of the
2268  *	workers to become available.
2269  *	The return value is the value of the signal.
2270  */
2271 
MasterWaitThread(int identity)2272 int MasterWaitThread(int identity)
2273 {
2274 	int retval;
2275 	LOCK(wakeupmasterthreadlocks[identity]);
2276 	while ( wakeupmasterthread[identity] == 0 ) {
2277 		pthread_cond_wait(&(wakeupmasterthreadconditions[identity])
2278 				,&(wakeupmasterthreadlocks[identity]));
2279 	}
2280 	retval = wakeupmasterthread[identity];
2281 	wakeupmasterthread[identity] = 0;
2282 	UNLOCK(wakeupmasterthreadlocks[identity]);
2283 	return(retval);
2284 }
2285 
2286 /*
2287   	#] MasterWaitThread :
2288   	#[ MasterWaitAll :
2289 */
2290 /**
2291  *	To be called by the master when it has to wait for all of the
2292  *	workers to finish a given task.
2293  *	It goes to sleep and waits for a wakeup call in ThreadWait
2294  */
2295 
MasterWaitAll()2296 void MasterWaitAll()
2297 {
2298 	LOCK(wakeupmasterlock);
2299 	while ( topofavailables < numberofworkers ) {
2300 		pthread_cond_wait(&wakeupmasterconditions,&wakeupmasterlock);
2301 	}
2302 	UNLOCK(wakeupmasterlock);
2303 	return;
2304 }
2305 
2306 /*
2307   	#] MasterWaitAll :
2308   	#[ MasterWaitAllSortBots :
2309 */
2310 
2311 #ifdef WITHSORTBOTS
2312 
2313 /**
2314  *	To be called by the master when it has to wait for all of the
2315  *	sortbots to start their task.
2316  */
2317 
MasterWaitAllSortBots()2318 void MasterWaitAllSortBots()
2319 {
2320 	LOCK(wakeupsortbotlock);
2321 	while ( topsortbotavailables < numberofsortbots ) {
2322 		pthread_cond_wait(&wakeupsortbotconditions,&wakeupsortbotlock);
2323 	}
2324 	UNLOCK(wakeupsortbotlock);
2325 	return;
2326 }
2327 
2328 #endif
2329 
2330 /*
2331   	#] MasterWaitAllSortBots :
2332   	#[ MasterWaitAllBlocks :
2333 */
2334 /**
2335  *	To be called by the master when it has to wait for all of the
2336  *	workers to claim their first block in the sort buffers of the master.
2337  *	It goes to sleep and waits for a wakeup call.
2338  */
2339 
MasterWaitAllBlocks()2340 void MasterWaitAllBlocks()
2341 {
2342 	LOCK(wakeupmasterlock);
2343 	while ( numberclaimed < numberofworkers ) {
2344 		pthread_cond_wait(&wakeupmasterconditions,&wakeupmasterlock);
2345 	}
2346 	UNLOCK(wakeupmasterlock);
2347 	return;
2348 }
2349 
2350 /*
2351   	#] MasterWaitAllBlocks :
2352   	#[ WakeupThread :
2353 */
2354 /**
2355  *	To be called when the indicated thread needs waking up.
2356  *	The signal number should be nonzero!
2357  *
2358  *	@param identity     The number of the worker to be woken up
2359  *	@param signalnumber The signal with which it should be woken up.
2360  */
2361 
WakeupThread(int identity,int signalnumber)2362 void WakeupThread(int identity, int signalnumber)
2363 {
2364 	if ( signalnumber == 0 ) {
2365 		MLOCK(ErrorMessageLock);
2366 		MesPrint("Illegal wakeup signal for thread %d",identity);
2367 		MUNLOCK(ErrorMessageLock);
2368 		Terminate(-1);
2369 	}
2370 	LOCK(wakeuplocks[identity]);
2371 	wakeup[identity] = signalnumber;
2372 	pthread_cond_signal(&(wakeupconditions[identity]));
2373 	UNLOCK(wakeuplocks[identity]);
2374 }
2375 
2376 /*
2377   	#] WakeupThread :
2378   	#[ WakeupMasterFromThread :
2379 */
2380 /**
2381  *	To be called when the indicated thread needs to wake up the master.
2382  *	The signal number should be nonzero!
2383  *
2384  *	@param identity     The number of the worker who wakes up the master.
2385  *	@param signalnumber The signal with which the master should be woken up.
2386  */
2387 
WakeupMasterFromThread(int identity,int signalnumber)2388 void WakeupMasterFromThread(int identity, int signalnumber)
2389 {
2390 	if ( signalnumber == 0 ) {
2391 		MLOCK(ErrorMessageLock);
2392 		MesPrint("Illegal wakeup signal for master %d",identity);
2393 		MUNLOCK(ErrorMessageLock);
2394 		Terminate(-1);
2395 	}
2396 	LOCK(wakeupmasterthreadlocks[identity]);
2397 	wakeupmasterthread[identity] = signalnumber;
2398 	pthread_cond_signal(&(wakeupmasterthreadconditions[identity]));
2399 	UNLOCK(wakeupmasterthreadlocks[identity]);
2400 }
2401 
2402 /*
2403   	#] WakeupMasterFromThread :
2404   	#[ SendOneBucket :
2405 */
2406 /**
2407  *	To be called when there is a full bucket and an available thread
2408  *	It prepares the thread and then wakes it up.
2409  */
2410 
SendOneBucket(int type)2411 int SendOneBucket(int type)
2412 {
2413 	ALLPRIVATES *B0 = AB[0];
2414 	THREADBUCKET *thr = 0;
2415 	int j, k, id;
2416 	for ( j = 0; j < numthreadbuckets; j++ ) {
2417 		if ( threadbuckets[j]->free == BUCKETFILLED ) {
2418 			thr = threadbuckets[j];
2419 			for ( k = j+1; k < numthreadbuckets; k++ )
2420 				threadbuckets[k-1] = threadbuckets[k];
2421 			threadbuckets[numthreadbuckets-1] = thr;
2422 			break;
2423 		}
2424 	}
2425 	AN0.ninterms++;
2426 	while ( ( id = GetAvailableThread() ) < 0 ) { MasterWait(); }
2427 /*
2428 	Prepare the thread. Give it the term and variables.
2429 */
2430 	LoadOneThread(0,id,thr,0);
2431 	thr->busy = BUCKETASSIGNED;
2432 	thr->free = BUCKETINUSE;
2433 	numberoffullbuckets--;
2434 /*
2435 	And signal the thread to run.
2436 	Form now on we may only interfere with this bucket
2437 	1: after it has been marked BUCKETCOMINGFREE
2438 	2: when thr->busy == BUCKETDOINGTERM and then only when protected by
2439 	   thr->lock. This would be for load balancing.
2440 */
2441 	WakeupThread(id,type);
2442 /*	AN0.ninterms += thr->ddterms; */
2443 	return(0);
2444 }
2445 
2446 /*
2447   	#] SendOneBucket :
2448   	#[ InParallelProcessor :
2449 */
2450 /**
2451  *	We divide the expressions marked by partodo over the workers.
2452  *	The workers are responsible for writing their results into the buffers
2453  *	of the master (output). This is to be controled by locks.
2454  *	The order of the expressions may get changed this way.
2455  *
2456  *	The InParallel statement allows the execution of complete expressions
2457  *	in a single worker simultaneously. This is useful for when there are
2458  *	many short expressions. This way we don't need the bottleneck of the
2459  *	merging by the master. The complete sort for each expression is done
2460  *	inside its own single worker. The bottleneck here is the writing of the
2461  *	result into the scratch system. This is now done by the workers themselves.
2462  *	Because each expression must be contiguous, the writing should be done
2463  *	as quickly as possible and be protected by locks.
2464  *
2465  *	The implementation of this statement gave a significant increase in
2466  *	efficiency in the running of the Multiple Zeta Values program.
2467  */
2468 
InParallelProcessor()2469 int InParallelProcessor()
2470 {
2471 	GETIDENTITY
2472 	int i, id, retval = 0, num = 0;
2473 	EXPRESSIONS e;
2474 	if ( numberofworkers >= 2 ) {
2475 		SetWorkerFiles();
2476 		for ( i = 0; i < NumExpressions; i++ ) {
2477 			e = Expressions+i;
2478 			if ( e->partodo <= 0 ) continue;
2479 			if ( e->status == LOCALEXPRESSION || e->status == GLOBALEXPRESSION
2480 			|| e->status == UNHIDELEXPRESSION || e->status == UNHIDEGEXPRESSION
2481 			|| e->status == INTOHIDELEXPRESSION || e->status == INTOHIDEGEXPRESSION ) {
2482 			}
2483 			else {
2484 				e->partodo = 0;
2485 				continue;
2486 			}
2487 			if ( e->counter == 0 ) { /* Expression with zero terms */
2488 				e->partodo = 0;
2489 				continue;
2490 			}
2491 /*
2492 			This expression should go to an idle worker
2493 */
2494 			while ( ( id = GetAvailableThread() ) < 0 ) { MasterWait(); }
2495 			LoadOneThread(0,id,0,-1);
2496 			AB[id]->R.exprtodo = i;
2497 			WakeupThread(id,DOONEEXPRESSION);
2498 			num++;
2499 		}
2500 /*
2501 		Now we have to wait for all workers to finish
2502 */
2503 		if ( num > 0 ) MasterWaitAll();
2504 
2505 		if ( AC.CollectFun ) AR.DeferFlag = 0;
2506 	}
2507 	else {
2508 		for ( i = 0; i < NumExpressions; i++ ) {
2509 			Expressions[i].partodo = 0;
2510 		}
2511 	}
2512 	return(retval);
2513 }
2514 
2515 /*
2516   	#] InParallelProcessor :
2517   	#[ ThreadsProcessor :
2518 */
2519 /**
2520  *	This routine takes the role of the central part of the Processor routine
2521  *	in the file proces.c when multiple threads are available.
2522  *	It deals with the expressions that are not marked in the InParallel
2523  *	statement. These are usually the large expressions. It will divide
2524  *	the terms of these expressions over the workers, using a bucket system
2525  *	to reduce overhead (buckets are collections of a number of terms that
2526  *	are transfered together).
2527  *	At the end of the expression when all terms have been assigned and
2528  *	workers become available again, there is a load balancing system to
2529  *	take terms from the buckets of workers that still have to do many terms
2530  *	and give them to idle workers. This is called first level load balancing.
2531  *
2532  *	A new feature is that for expressions with a bracket index the terms
2533  *	can be distributed in collections of complete brackets (12-nov-2009).
2534  *
2535  *	The routine is called for each expression separately by Processor.
2536  *
2537  *	@param e              The expression to be executed
2538  *	@param LastExpression Indicates whether it is the last expression in which case
2539  *	                      in the end the input scratch file can be deleted before
2540  *	                      the output is written. This saves diskspace.
2541  */
2542 
ThreadsProcessor(EXPRESSIONS e,WORD LastExpression,WORD fromspectator)2543 int ThreadsProcessor(EXPRESSIONS e, WORD LastExpression, WORD fromspectator)
2544 {
2545 	ALLPRIVATES *B0 = AB[0], *B = B0;
2546 	int id, oldgzipCompress, endofinput = 0, j, still, k, defcount = 0, bra = 0, first = 1;
2547 	LONG dd = 0, ddd, thrbufsiz, thrbufsiz0, thrbufsiz2, numbucket = 0, numpasses;
2548 	LONG num, i;
2549 	WORD *oldworkpointer = AT0.WorkPointer, *tt, *ttco = 0, *t1 = 0, ter, *tstop = 0, *t2;
2550 	THREADBUCKET *thr = 0;
2551 	FILEHANDLE *oldoutfile = AR0.outfile;
2552 	GETTERM GetTermP = &GetTerm;
2553 	POSITION eonfile = AS.OldOnFile[e-Expressions];
2554 	numberoffullbuckets = 0;
2555 /*
2556 	Start up all threads. The lock needs to be around the whole loop
2557 	to keep processes from terminating quickly and putting themselves
2558 	in the list of available threads again.
2559 */
2560 	AM.tracebackflag = 1;
2561 
2562 	AS.sLevel = AR0.sLevel;
2563 	LOCK(availabilitylock);
2564 	topofavailables = 0;
2565 	for ( id = 1; id <= numberofworkers; id++ ) {
2566 		WakeupThread(id,STARTNEWEXPRESSION);
2567 	}
2568 	UNLOCK(availabilitylock);
2569 	NewSort(BHEAD0);
2570 	AN0.ninterms = 1;
2571 /*
2572 	Now for redefine
2573 */
2574 	if ( AC.numpfirstnum > 0 ) {
2575 		for ( j = 0; j < AC.numpfirstnum; j++ ) {
2576 			AC.inputnumbers[j] = -1;
2577 		}
2578 	}
2579 	MasterWaitAll();
2580 /*
2581 	Determine a reasonable bucketsize.
2582 	This is based on the value of AC.ThreadBucketSize and the number
2583 	of terms. We want at least 5 buckets per worker at the moment.
2584 	Some research should show whether this is reasonable.
2585 
2586 	The number of terms in the expression is in e->counter
2587 */
2588 	thrbufsiz2 = thrbufsiz = AC.ThreadBucketSize-1;
2589 	if ( ( e->counter / ( numberofworkers * 5 ) ) < thrbufsiz ) {
2590 		thrbufsiz = e->counter / ( numberofworkers * 5 ) - 1;
2591 		if ( thrbufsiz < 0 ) thrbufsiz = 0;
2592 	}
2593 	thrbufsiz0 = thrbufsiz;
2594 	numpasses = 5; /* this is just for trying */
2595 	thrbufsiz = thrbufsiz0 / (2 << numpasses);
2596 /*
2597 	Mark all buckets as free and take the first.
2598 */
2599 	for ( j = 0; j < numthreadbuckets; j++ )
2600 		threadbuckets[j]->free = BUCKETFREE;
2601 	thr = threadbuckets[0];
2602 /*
2603   	#[ Whole brackets :
2604 
2605 	First we look whether we have to work with entire brackets
2606 	This is the case when there is a non-NULL address in e->bracketinfo.
2607 	Of course we shouldn't have interference from a collect or keep statement.
2608 */
2609 #ifdef WHOLEBRACKETS
2610 	if ( e->bracketinfo && AC.CollectFun == 0 && AR0.DeferFlag == 0 ) {
2611 		FILEHANDLE *curfile;
2612 		int didone = 0;
2613 		LONG num, n;
2614 		AN0.expr = e;
2615 		for ( n = 0; n < e->bracketinfo->indexfill; n++ ) {
2616 			num = TreatIndexEntry(B0,n);
2617 			if ( num > 0 ) {
2618 				didone = 1;
2619 /*
2620 				This bracket can be sent off.
2621 				1: Look for an empty bucket
2622 */
2623 ReTry:;
2624 				for ( j = 0; j < numthreadbuckets; j++ ) {
2625 					switch ( threadbuckets[j]->free ) {
2626 						case BUCKETFREE:
2627 							thr = threadbuckets[j];
2628 							goto Found1;
2629 						case BUCKETCOMINGFREE:
2630 							thr = threadbuckets[j];
2631 							thr->free = BUCKETFREE;
2632 							for ( k = j+1; k < numthreadbuckets; k++ )
2633 								threadbuckets[k-1] = threadbuckets[k];
2634 							threadbuckets[numthreadbuckets-1] = thr;
2635 							j--;
2636 							break;
2637 						default:
2638 							break;
2639 					}
2640 				}
2641 Found1:;
2642 				if ( j < numthreadbuckets ) {
2643 /*
2644 					Found an empty bucket. Fill it.
2645 */
2646 					thr->firstbracket = n;
2647 					thr->lastbracket = n + num - 1;
2648 					thr->type = BUCKETDOINGBRACKET;
2649 					thr->free = BUCKETFILLED;
2650 					thr->firstterm = AN0.ninterms;
2651 					for ( j = n; j < n+num; j++ ) {
2652 						AN0.ninterms += e->bracketinfo->indexbuffer[j].termsinbracket;
2653 					}
2654 					n += num-1;
2655 					numberoffullbuckets++;
2656 					if ( topofavailables > 0 ) {
2657 						SendOneBucket(DOBRACKETS);
2658 					}
2659 				}
2660 /*
2661 					All buckets are in use.
2662 					Look/wait for an idle worker. Give it a bucket.
2663 					After that, retry for a bucket
2664 */
2665 				else {
2666 					while ( topofavailables <= 0 ) {
2667 						MasterWait();
2668 					}
2669 					SendOneBucket(DOBRACKETS);
2670 					goto ReTry;
2671 				}
2672 			}
2673 		}
2674 		if ( didone ) {
2675 /*
2676 			And now put the input back in the original position.
2677 */
2678 			switch ( e->status ) {
2679 				case UNHIDELEXPRESSION:
2680 				case UNHIDEGEXPRESSION:
2681 				case DROPHLEXPRESSION:
2682 				case DROPHGEXPRESSION:
2683 				case HIDDENLEXPRESSION:
2684 				case HIDDENGEXPRESSION:
2685 					curfile = AR0.hidefile;
2686 					break;
2687 				default:
2688 					curfile = AR0.infile;
2689 					break;
2690 			}
2691 			SetScratch(curfile,&eonfile);
2692 			GetTerm(B0,AT0.WorkPointer);
2693 /*
2694 			Now we point the GetTerm that is used to the one that is selective
2695 */
2696 			GetTermP = &GetTerm2;
2697 /*
2698 			Next wait till there is a bucket available and initialize thr to it.
2699 */
2700 			for(;;) {
2701 				for ( j = 0; j < numthreadbuckets; j++ ) {
2702 					switch ( threadbuckets[j]->free ) {
2703 						case BUCKETFREE:
2704 							thr = threadbuckets[j];
2705 							goto Found2;
2706 						case BUCKETCOMINGFREE:
2707 							thr = threadbuckets[j];
2708 							thr->free = BUCKETFREE;
2709 							for ( k = j+1; k < numthreadbuckets; k++ )
2710 								threadbuckets[k-1] = threadbuckets[k];
2711 							threadbuckets[numthreadbuckets-1] = thr;
2712 							j--;
2713 							break;
2714 						default:
2715 							break;
2716 					}
2717 				}
2718 				while ( topofavailables <= 0 ) {
2719 					MasterWait();
2720 				}
2721 				while ( topofavailables > 0 && numberoffullbuckets > 0 ) {
2722 					SendOneBucket(DOBRACKETS);
2723 				}
2724 			}
2725 Found2:;
2726 			while ( numberoffullbuckets > 0 ) {
2727 				while ( topofavailables <= 0 ) {
2728 					MasterWait();
2729 				}
2730 				while ( topofavailables > 0 && numberoffullbuckets > 0 ) {
2731 					SendOneBucket(DOBRACKETS);
2732 				}
2733 			}
2734 /*
2735 			Disable the 'warming up' with smaller buckets.
2736 
2737 			numpasses = 0;
2738 			thrbufsiz = thrbufsiz0;
2739 */
2740 			AN0.lastinindex = -1;
2741 		}
2742 		MasterWaitAll();
2743 	}
2744 #endif
2745 /*
2746   	#] Whole brackets :
2747 
2748 	Now the loop to start a bucket
2749 */
2750 	for(;;) {
2751 		if ( fromspectator ) {
2752 			ter = GetFromSpectator(thr->threadbuffer,fromspectator-1);
2753 			if ( ter == 0 ) fromspectator = 0;
2754 		}
2755 		else {
2756 			ter = GetTermP(B0,thr->threadbuffer);
2757 		}
2758 		if ( ter < 0 ) break;
2759 		if ( ter == 0 ) { endofinput = 1; goto Finalize; }
2760 		dd = AN0.deferskipped;
2761 		if ( AR0.DeferFlag ) {
2762 			defcount = 0;
2763 			thr->deferbuffer[defcount++] = AR0.DefPosition;
2764 			ttco = thr->compressbuffer; t1 = AR0.CompressBuffer; j = *t1;
2765 			NCOPY(ttco,t1,j);
2766 		}
2767 		else if ( first && ( AC.CollectFun == 0 ) ) { /* Brackets ? */
2768 			first = 0;
2769 			t1 = tstop = thr->threadbuffer;
2770 			tstop += *tstop; tstop -= ABS(tstop[-1]);
2771 			t1++;
2772 			while ( t1 < tstop ) {
2773 				if ( t1[0] == HAAKJE ) { bra = 1; break; }
2774 				t1 += t1[1];
2775 			}
2776 			t1 = thr->threadbuffer;
2777 		}
2778 /*
2779 		Check whether we have a collect,function going. If so execute it.
2780 */
2781 		if ( AC.CollectFun && *(thr->threadbuffer) < (AM.MaxTer/((LONG)sizeof(WORD))-10) ) {
2782 			if ( ( dd = GetMoreTerms(thr->threadbuffer) ) < 0 ) {
2783 				LowerSortLevel(); goto ProcErr;
2784 			}
2785 		}
2786 /*
2787 		Check whether we have a priority task:
2788 */
2789 		if ( topofavailables > 0 && numberoffullbuckets > 0 ) SendOneBucket(LOWESTLEVELGENERATION);
2790 /*
2791 		Now put more terms in the bucket. Position tt after the first term
2792 */
2793 		tt = thr->threadbuffer; tt += *tt;
2794 		thr->totnum = 1;
2795 		thr->usenum = 0;
2796 /*
2797 		Next we worry about the 'slow startup' in which we make the initial
2798 		buckets smaller, so that we get all threads busy as soon as possible.
2799 */
2800 		if ( numpasses > 0 ) {
2801 			numbucket++;
2802 			if ( numbucket >= numberofworkers ) {
2803 				numbucket = 0;
2804 				numpasses--;
2805 				if ( numpasses == 0 ) thrbufsiz = thrbufsiz0;
2806 				else                  thrbufsiz = thrbufsiz0 / (2 << numpasses);
2807 			}
2808 			thrbufsiz2 = thrbufsiz + thrbufsiz/5; /* for completing brackets */
2809 		}
2810 /*
2811 		we have already 1+dd terms
2812 */
2813 		while ( ( dd < thrbufsiz ) &&
2814 			( tt - thr->threadbuffer ) < ( thr->threadbuffersize - AM.MaxTer/((LONG)sizeof(WORD)) - 2 ) ) {
2815 /*
2816 			First check:
2817 */
2818 			if ( topofavailables > 0 && numberoffullbuckets > 0 ) SendOneBucket(LOWESTLEVELGENERATION);
2819 /*
2820 			There is room in the bucket. Fill yet another term.
2821 */
2822 			if ( GetTermP(B0,tt) == 0 ) { endofinput = 1; break; }
2823 			dd++;
2824 			thr->totnum++;
2825 			dd += AN0.deferskipped;
2826 			if ( AR0.DeferFlag ) {
2827 				thr->deferbuffer[defcount++] = AR0.DefPosition;
2828 				t1 = AR0.CompressBuffer; j = *t1;
2829 				NCOPY(ttco,t1,j);
2830 			}
2831 			if ( AC.CollectFun && *tt < (AM.MaxTer/((LONG)sizeof(WORD))-10) ) {
2832 				if ( ( ddd = GetMoreTerms(tt) ) < 0 ) {
2833 					LowerSortLevel(); goto ProcErr;
2834 				}
2835 				dd += ddd;
2836 			}
2837 			t1 = tt;
2838 			tt += *tt;
2839 		}
2840 /*
2841 		Check whether there are regular brackets and if we have no DeferFlag
2842 		and no collect, we try to add more terms till we finish the current
2843 		bracket. We should however not overdo it. Let us say: up to 20%
2844 		more terms are allowed.
2845 */
2846 		if ( bra ) {
2847 			tstop = t1 + *t1; tstop -= ABS(tstop[-1]);
2848 			t2 = t1+1;
2849 			while ( t2 < tstop ) {
2850 				if ( t2[0] == HAAKJE ) { break; }
2851 				t2 += t2[1];
2852 			}
2853 			if ( t2[0] == HAAKJE ) {
2854 			  t2 += t2[1]; num = t2 - t1;
2855 			  while ( ( dd < thrbufsiz2 ) &&
2856 				( tt - thr->threadbuffer ) < ( thr->threadbuffersize - AM.MaxTer - 2 ) ) {
2857 /*
2858 				First check:
2859 */
2860 				if ( topofavailables > 0 && numberoffullbuckets > 0 ) SendOneBucket(LOWESTLEVELGENERATION);
2861 /*
2862 				There is room in the bucket. Fill yet another term.
2863 */
2864 				if ( GetTermP(B0,tt) == 0 ) { endofinput = 1; break; }
2865 /*
2866 				Same bracket?
2867 */
2868 				tstop = tt + *tt; tstop -= ABS(tstop[-1]);
2869 				if ( tstop-tt < num ) { /* Different: abort */
2870 					AR0.KeptInHold = 1;
2871 					break;
2872 				}
2873 				for ( i = 1; i < num; i++ ) {
2874 					if ( t1[i] != tt[i] ) break;
2875 				}
2876 				if ( i < num ) { /* Different: abort */
2877 					AR0.KeptInHold = 1;
2878 					break;
2879 				}
2880 /*
2881 				Same bracket. We need this term.
2882 */
2883 				dd++;
2884 				thr->totnum++;
2885 				tt += *tt;
2886 			  }
2887 			}
2888 		}
2889 		thr->ddterms = dd; /* total number of terms including keep brackets */
2890 		thr->firstterm = AN0.ninterms;
2891 		AN0.ninterms += dd;
2892 		*tt = 0;           /* mark end of bucket */
2893 		thr->free = BUCKETFILLED;
2894 		thr->type = BUCKETDOINGTERMS;
2895 		numberoffullbuckets++;
2896 		if ( topofavailables <= 0 && endofinput == 0 ) {
2897 /*
2898 			Problem: topofavailables may already be > 0, but the
2899 			thread has not yet gone into waiting. Can the signal get lost?
2900 			How can we tell that a thread is waiting for a signal?
2901 
2902 			All threads are busy. Try to load up another bucket.
2903 			In the future we could be more sophisticated.
2904 			At the moment we load a complete bucket which could be
2905 			1000 terms or even more.
2906 			In principle it is better to keep a full bucket ready
2907 			and check after each term we put in the next bucket. That
2908 			way we don't waste time of the workers.
2909 */
2910 			for ( j = 0; j < numthreadbuckets; j++ ) {
2911 				switch ( threadbuckets[j]->free ) {
2912 					case BUCKETFREE:
2913 						thr = threadbuckets[j];
2914 						if ( !endofinput ) goto NextBucket;
2915 /*
2916 						If we are at the end of the input we mark
2917 						the free buckets in a special way. That way
2918 						we don't keep running into them.
2919 */
2920 						thr->free = BUCKETATEND;
2921 						break;
2922 					case BUCKETCOMINGFREE:
2923 						thr = threadbuckets[j];
2924 						thr->free = BUCKETFREE;
2925 /*
2926 						Bucket has just been finished.
2927 						Put at the end of the list. We don't want
2928 						an early bucket to wait to be treated last.
2929 */
2930 						for ( k = j+1; k < numthreadbuckets; k++ )
2931 							threadbuckets[k-1] = threadbuckets[k];
2932 						threadbuckets[numthreadbuckets-1] = thr;
2933 						j--;   /* we have to redo the same number j. */
2934 						break;
2935 					default:
2936 						break;
2937 				}
2938 			}
2939 /*
2940 			We have no free bucket or we are at the end.
2941 			The only thing we can do now is wait for a worker to come free,
2942 			provided there are still buckets to send.
2943 */
2944 		}
2945 /*
2946 		Look for the next bucket to send. There is at least one full bucket!
2947 */
2948 		for ( j = 0; j < numthreadbuckets; j++ ) {
2949 			if ( threadbuckets[j]->free == BUCKETFILLED ) {
2950 				thr = threadbuckets[j];
2951 				for ( k = j+1; k < numthreadbuckets; k++ )
2952 					threadbuckets[k-1] = threadbuckets[k];
2953 				threadbuckets[numthreadbuckets-1] = thr;
2954 				break;
2955 			}
2956 		}
2957 /*
2958 		Wait for a thread to become available
2959 		The bucket we are going to use is in thr.
2960 */
2961 DoBucket:;
2962 		AN0.ninterms++;
2963 		while ( ( id = GetAvailableThread() ) < 0 ) { MasterWait(); }
2964 /*
2965 		Prepare the thread. Give it the term and variables.
2966 */
2967 		LoadOneThread(0,id,thr,0);
2968 		LOCK(thr->lock);
2969 		thr->busy = BUCKETASSIGNED;
2970 		UNLOCK(thr->lock);
2971 		thr->free = BUCKETINUSE;
2972 		numberoffullbuckets--;
2973 /*
2974 		And signal the thread to run.
2975 		Form now on we may only interfere with this bucket
2976 		1: after it has been marked BUCKETCOMINGFREE
2977 		2: when thr->busy == BUCKETDOINGTERM and then only when protected by
2978 		   thr->lock. This would be for load balancing.
2979 */
2980 		WakeupThread(id,LOWESTLEVELGENERATION);
2981 /*		AN0.ninterms += thr->ddterms; */
2982 /*
2983 		Now look whether there is another bucket filled and a worker available
2984 */
2985 		if ( topofavailables > 0 ) {  /* there is a worker */
2986 			for ( j = 0; j < numthreadbuckets; j++ ) {
2987 				if ( threadbuckets[j]->free == BUCKETFILLED ) {
2988 					thr = threadbuckets[j];
2989 					for ( k = j+1; k < numthreadbuckets; k++ )
2990 						threadbuckets[k-1] = threadbuckets[k];
2991 					threadbuckets[numthreadbuckets-1] = thr;
2992 					goto DoBucket; /* and we found a bucket */
2993 				}
2994 			}
2995 /*
2996 			no bucket is loaded but there is a thread available
2997 			find a bucket to load. If there is none (all are USED or ATEND)
2998 			we jump out of the loop.
2999 */
3000 			for ( j = 0; j < numthreadbuckets; j++ ) {
3001 				switch ( threadbuckets[j]->free ) {
3002 					case BUCKETFREE:
3003 						thr = threadbuckets[j];
3004 						if ( !endofinput ) goto NextBucket;
3005 						thr->free = BUCKETATEND;
3006 						break;
3007 					case BUCKETCOMINGFREE:
3008 						thr = threadbuckets[j];
3009 						if ( endofinput ) {
3010 							thr->free = BUCKETATEND;
3011 						}
3012 						else {
3013 							thr->free = BUCKETFREE;
3014 							for ( k = j+1; k < numthreadbuckets; k++ )
3015 								threadbuckets[k-1] = threadbuckets[k];
3016 							threadbuckets[numthreadbuckets-1] = thr;
3017 							j--;
3018 						}
3019 						break;
3020 					default:
3021 						break;
3022 				}
3023 			}
3024 			if ( j >= numthreadbuckets ) break;
3025 		}
3026 		else {
3027 /*
3028 			No worker available.
3029 			Look for a bucket to load.
3030 			Its number will be in "still"
3031 */
3032 Finalize:;
3033 			still = -1;
3034 			for ( j = 0; j < numthreadbuckets; j++ ) {
3035 				switch ( threadbuckets[j]->free ) {
3036 					case BUCKETFREE:
3037 						thr = threadbuckets[j];
3038 						if ( !endofinput ) goto NextBucket;
3039 						thr->free = BUCKETATEND;
3040 						break;
3041 					case BUCKETCOMINGFREE:
3042 						thr = threadbuckets[j];
3043 						if ( endofinput ) thr->free = BUCKETATEND;
3044 						else {
3045 							thr->free = BUCKETFREE;
3046 							for ( k = j+1; k < numthreadbuckets; k++ )
3047 								threadbuckets[k-1] = threadbuckets[k];
3048 							threadbuckets[numthreadbuckets-1] = thr;
3049 							j--;
3050 						}
3051 						break;
3052 					case BUCKETFILLED:
3053 						if ( still < 0 ) still = j;
3054 						break;
3055 					default:
3056 						break;
3057 				}
3058 			}
3059 			if ( still < 0 ) {
3060 /*
3061 				No buckets to be executed and no buckets FREE.
3062 				We must be at the end. Break out of the loop.
3063 */
3064 				break;
3065 			}
3066 			thr = threadbuckets[still];
3067 			for ( k = still+1; k < numthreadbuckets; k++ )
3068 				threadbuckets[k-1] = threadbuckets[k];
3069 			threadbuckets[numthreadbuckets-1] = thr;
3070 			goto DoBucket;
3071 		}
3072 NextBucket:;
3073 	}
3074 /*
3075 	Now the stage one load balancing.
3076 	If the load has been readjusted we have again filled buckets.
3077 	In that case we jump back in the loop.
3078 
3079 	Tricky point: when do the workers see the new value of AT.LoadBalancing?
3080 	It should activate the locks on thr->busy
3081 */
3082 	if ( AC.ThreadBalancing ) {
3083 		for ( id = 1; id <= numberofworkers; id++ ) {
3084 			AB[id]->T.LoadBalancing = 1;
3085 		}
3086 		if ( LoadReadjusted() ) goto Finalize;
3087 		for ( id = 1; id <= numberofworkers; id++ ) {
3088 			AB[id]->T.LoadBalancing = 0;
3089 		}
3090 	}
3091 	if ( AC.ThreadBalancing ) {
3092 /*
3093 		The AS.Balancing flag should have Generator look for
3094 		free workers and apply the "buro" method.
3095 
3096 		There is still a serious problem.
3097 		When for instance a sum_, there may be space created in a local
3098 		compiler buffer for a wildcard substitution or whatever.
3099 		Compiler buffer execution scribble space.....
3100 		This isn't copied along?
3101 		Look up ebufnum. There are 12 places with AddRHS!
3102 		Problem: one process alloces in ebuf. Then term is given to
3103 		other process. It would like to use from this ebuf, but the sender
3104 		finishes first and removes the ebuf (and/or overwrites it).
3105 
3106 		Other problem: local $ variables aren't copied along.
3107 */
3108 		AS.Balancing = 0;
3109 	}
3110 	MasterWaitAll();
3111 	AS.Balancing = 0;
3112 /*
3113 	When we deal with the last expression we can now remove the input
3114 	scratch file. This saves potentially much disk space (up to 1/3)
3115 */
3116 	if ( LastExpression ) {
3117 		UpdateMaxSize();
3118 		if ( AR0.infile->handle >= 0 ) {
3119 			CloseFile(AR0.infile->handle);
3120 			AR0.infile->handle = -1;
3121 			remove(AR0.infile->name);
3122 			PUTZERO(AR0.infile->POposition);
3123 			AR0.infile->POfill = AR0.infile->POfull = AR0.infile->PObuffer;
3124 		}
3125 	}
3126 /*
3127 	We order the threads to finish in the MasterMerge routine
3128 	It will start with waiting for all threads to finish.
3129 	One could make an administration in which threads that have
3130 	finished can start already with the final sort but
3131 	1: The load balancing should not make this super urgent
3132 	2: It would definitely not be very compatible with the second
3133 	   stage load balancing.
3134 */
3135 	oldgzipCompress = AR0.gzipCompress;
3136 	AR0.gzipCompress = 0;
3137 	if ( AR0.outtohide ) AR0.outfile = AR0.hidefile;
3138 	if ( MasterMerge() < 0 ) {
3139 		if ( AR0.outtohide ) AR0.outfile = oldoutfile;
3140 		AR0.gzipCompress = oldgzipCompress;
3141 		goto ProcErr;
3142 	}
3143 	if ( AR0.outtohide ) AR0.outfile = oldoutfile;
3144 	AR0.gzipCompress = oldgzipCompress;
3145 /*
3146 	Now wait for all threads to be ready to give them the cleaning up signal.
3147 	With the new MasterMerge routine we can do the cleanup already automatically
3148 	avoiding having to send these signals.
3149 */
3150 	MasterWaitAll();
3151 	AR0.sLevel--;
3152 	for ( id = 1; id < AM.totalnumberofthreads; id++ ) {
3153 		if ( GetThread(id) > 0 ) WakeupThread(id,CLEANUPEXPRESSION);
3154 	}
3155 	e->numdummies = 0;
3156 	for ( id = 1; id < AM.totalnumberofthreads; id++ ) {
3157 		if ( AB[id]->R.MaxDum - AM.IndDum > e->numdummies )
3158 			e->numdummies = AB[id]->R.MaxDum - AM.IndDum;
3159 		AR0.expchanged |= AB[id]->R.expchanged;
3160 	}
3161 /*
3162 	And wait for all to be clean.
3163 */
3164 	MasterWaitAll();
3165 	AT0.WorkPointer = oldworkpointer;
3166 	return(0);
3167 ProcErr:;
3168 	return(-1);
3169 }
3170 
3171 /*
3172   	#] ThreadsProcessor :
3173   	#[ LoadReadjusted :
3174 */
3175 /**
3176  *	This routine does the load readjustment at the end of a module.
3177  *	It may be that there are still some threads that have a bucket full of
3178  *	difficult terms. In that case we steal the bucket from such a thread
3179  *	and redistribute the terms over the available buckets to be sent to
3180  *	the free threads. As we steal all remaining terms from the bucket
3181  *	it can happen that eventually the same worker gets some of the terms
3182  *	back at a later stage.
3183  *
3184  *	The only tricky point is the stealing process. We have to do this
3185  *	without having to send signals or testing locks for each term processed.
3186  *	The lock is set around thr->busy when AT.LoadBalancing == 1 but
3187  *	when does the worker see this? (caching?)
3188  *
3189  *	Remark: the thr->busy == BUCKETASSIGNED flag is to prevent stealing
3190  *	from a thread that has not done anything yet.
3191  */
3192 
LoadReadjusted()3193 int LoadReadjusted()
3194 {
3195 	ALLPRIVATES *B0 = AB[0];
3196 	THREADBUCKET *thr = 0, *thrtogo = 0;
3197 	int numtogo, numfree, numbusy, n, nperbucket, extra, i, j, u, bus;
3198 	LONG numinput;
3199 	WORD *t1, *c1, *t2, *c2, *t3;
3200 /*
3201 	Start with waiting for at least one free processor.
3202 	We don't want the master competing for time when all are busy.
3203 */
3204 	while ( topofavailables <= 0 ) MasterWait();
3205 /*
3206 	Now look for the fullest bucket and make a list of free buckets
3207 	The bad part is that most numbers can change at any moment.
3208 */
3209 restart:;
3210 	numtogo = 0;
3211 	numfree = 0;
3212 	numbusy = 0;
3213 	for ( j = 0; j < numthreadbuckets; j++ ) {
3214 		thr = threadbuckets[j];
3215 		if ( thr->free == BUCKETFREE || thr->free == BUCKETATEND
3216 		|| thr->free == BUCKETCOMINGFREE ) {
3217 			freebuckets[numfree++] = thr;
3218 		}
3219 		else if ( thr->type != BUCKETDOINGTERMS ) {}
3220 		else if ( thr->totnum > 1 ) { /* never steal from a bucket with one term */
3221 			LOCK(thr->lock);
3222 			bus = thr->busy;
3223 			UNLOCK(thr->lock);
3224 			if ( thr->free == BUCKETINUSE ) {
3225 				n = thr->totnum-thr->usenum;
3226 				if ( bus == BUCKETASSIGNED ) numbusy++;
3227 				else if ( ( bus != BUCKETASSIGNED )
3228 					   && ( n > numtogo ) ) {
3229 					numtogo = n;
3230 					thrtogo = thr;
3231 				}
3232 			}
3233 			else if ( bus == BUCKETTOBERELEASED
3234 			 && thr->free == BUCKETRELEASED ) {
3235 				freebuckets[numfree++] = thr;
3236 				thr->free = BUCKETATEND;
3237 				LOCK(thr->lock);
3238 				thr->busy = BUCKETPREPARINGTERM;
3239 				UNLOCK(thr->lock);
3240 			}
3241 		}
3242 	}
3243 	if ( numfree == 0 ) return(0); /* serious problem */
3244 	if ( numtogo > 0 ) {   /* provisionally there is something to be stolen */
3245 		thr = thrtogo;
3246 /*
3247 		If the number has changed there is good progress.
3248 		Maybe there is another thread that needs assistence.
3249 		We start all over.
3250 */
3251 		if ( thr->totnum-thr->usenum < numtogo ) goto restart;
3252 /*
3253 		If the thread is in the term loading phace
3254 		(thr->busy == BUCKETPREPARINGTERM) we better stay away from it.
3255 		We wait now for the thread to be busy, and don't allow it
3256 		now to drop out of this state till we are done here.
3257 		This all depends on whether AT.LoadBalancing == 1 is seen by
3258 		the thread.
3259 */
3260 		LOCK(thr->lock);
3261 		if ( thr->busy != BUCKETDOINGTERM ) {
3262 			UNLOCK(thr->lock);
3263 			goto restart;
3264 		}
3265 		if ( thr->totnum-thr->usenum < numtogo ) {
3266 			UNLOCK(thr->lock);
3267 			goto restart;
3268 		}
3269 		thr->free = BUCKETTERMINATED;
3270 /*
3271 		The above will signal the thread we want to terminate.
3272 		Next all effort goes into making sure the landing is soft.
3273 		Unfortunately we don't want to wait for a signal, because the thread
3274 		may be working for a long time on a single term.
3275 */
3276 		if ( thr->usenum == thr->totnum ) {
3277 /*
3278 			Terminated in the mean time or by now working on the
3279 			last term. Try again.
3280 */
3281 			thr->free = BUCKETATEND;
3282 			UNLOCK(thr->lock);
3283 			goto restart;
3284 		}
3285 		goto intercepted;
3286 	}
3287 /*	UNLOCK(thr->lock); */
3288 	if ( numbusy > 0 ) return(1); /* Wait a bit.... */
3289 	return(0);
3290 intercepted:;
3291 /*
3292 	We intercepted one successfully. Now it becomes interesting. Action:
3293 	1: determine how many terms per free bucket.
3294 	2: find the first untreated term.
3295 	3: put the terms in the free buckets.
3296 
3297 	Remember: we have the lock to avoid interference from the thread
3298 	that is being robbed.
3299 */
3300 	numinput = thr->firstterm + thr->usenum;
3301 	nperbucket = numtogo / numfree;
3302 	extra = numtogo - nperbucket*numfree;
3303 	if ( AR0.DeferFlag ) {
3304 		t1 = thr->threadbuffer; c1 = thr->compressbuffer; u = thr->usenum;
3305 		for ( n = 0; n < thr->usenum; n++ ) { t1 += *t1; c1 += *c1; }
3306 		t3 = t1;
3307 		if ( extra > 0 ) {
3308 		  for ( i = 0; i < extra; i++ ) {
3309 			thrtogo = freebuckets[i];
3310 			t2 = thrtogo->threadbuffer;
3311 			c2 = thrtogo->compressbuffer;
3312 			thrtogo->free = BUCKETFILLED;
3313 			thrtogo->type = BUCKETDOINGTERMS;
3314 			thrtogo->totnum = nperbucket+1;
3315 			thrtogo->ddterms = 0;
3316 			thrtogo->usenum = 0;
3317 			thrtogo->busy = BUCKETASSIGNED;
3318 			thrtogo->firstterm = numinput;
3319 			numinput += nperbucket+1;
3320 			for ( n = 0; n <= nperbucket; n++ ) {
3321 				j = *t1; NCOPY(t2,t1,j);
3322 				j = *c1; NCOPY(c2,c1,j);
3323 				thrtogo->deferbuffer[n] = thr->deferbuffer[u++];
3324 			}
3325 			*t2 = *c2 = 0;
3326 		  }
3327 		}
3328 		if ( nperbucket > 0 ) {
3329 		  for ( i = extra; i < numfree; i++ ) {
3330 			thrtogo = freebuckets[i];
3331 			t2 = thrtogo->threadbuffer;
3332 			c2 = thrtogo->compressbuffer;
3333 			thrtogo->free = BUCKETFILLED;
3334 			thrtogo->type = BUCKETDOINGTERMS;
3335 			thrtogo->totnum = nperbucket;
3336 			thrtogo->ddterms = 0;
3337 			thrtogo->usenum = 0;
3338 			thrtogo->busy = BUCKETASSIGNED;
3339 			thrtogo->firstterm = numinput;
3340 			numinput += nperbucket;
3341 			for ( n = 0; n < nperbucket; n++ ) {
3342 				j = *t1; NCOPY(t2,t1,j);
3343 				j = *c1; NCOPY(c2,c1,j);
3344 				thrtogo->deferbuffer[n] = thr->deferbuffer[u++];
3345 			}
3346 			*t2 = *c2 = 0;
3347 		  }
3348 		}
3349 	}
3350 	else {
3351 		t1 = thr->threadbuffer;
3352 		for ( n = 0; n < thr->usenum; n++ ) { t1 += *t1; }
3353 		t3 = t1;
3354 		if ( extra > 0 ) {
3355 		  for ( i = 0; i < extra; i++ ) {
3356 			thrtogo = freebuckets[i];
3357 			t2 = thrtogo->threadbuffer;
3358 			thrtogo->free = BUCKETFILLED;
3359 			thrtogo->type = BUCKETDOINGTERMS;
3360 			thrtogo->totnum = nperbucket+1;
3361 			thrtogo->ddterms = 0;
3362 			thrtogo->usenum = 0;
3363 			thrtogo->busy = BUCKETASSIGNED;
3364 			thrtogo->firstterm = numinput;
3365 			numinput += nperbucket+1;
3366 			for ( n = 0; n <= nperbucket; n++ ) {
3367 				j = *t1; NCOPY(t2,t1,j);
3368 			}
3369 			*t2 = 0;
3370 		  }
3371 		}
3372 		if ( nperbucket > 0 ) {
3373 		  for ( i = extra; i < numfree; i++ ) {
3374 			thrtogo = freebuckets[i];
3375 			t2 = thrtogo->threadbuffer;
3376 			thrtogo->free = BUCKETFILLED;
3377 			thrtogo->type = BUCKETDOINGTERMS;
3378 			thrtogo->totnum = nperbucket;
3379 			thrtogo->ddterms = 0;
3380 			thrtogo->usenum = 0;
3381 			thrtogo->busy = BUCKETASSIGNED;
3382 			thrtogo->firstterm = numinput;
3383 			numinput += nperbucket;
3384 			for ( n = 0; n < nperbucket; n++ ) {
3385 				j = *t1; NCOPY(t2,t1,j);
3386 			}
3387 			*t2 = 0;
3388 		  }
3389 		}
3390 	}
3391 	*t3 = 0;   /* This is some form of extra insurance */
3392 	if ( thr->free == BUCKETRELEASED && thr->busy == BUCKETTOBERELEASED ) {
3393 		thr->free = BUCKETATEND; thr->busy = BUCKETPREPARINGTERM;
3394 	}
3395 	UNLOCK(thr->lock);
3396 	return(1);
3397 }
3398 
3399 /*
3400   	#] LoadReadjusted :
3401   	#[ SortStrategy :
3402 */
3403 /**
3404  *	When the final sort to the scratch file should take place
3405  *	in a thread we should redirect to a different PutOut say PutToMaster.
3406  *	The buffer in the Master should be an integer number times the size
3407  *	of the buffer for PutToMaster (the PObuffersize in the 'scratchfile').
3408  *	The action should be (assume the multiple is 3):
3409  *		Once the worker has its buffer full it fills block 1. Next 2. etc.
3410  *		After filling block 3 the next fill will be at 1 when it is available
3411  *		again. Becarefull to have a locked variable that indicates whether the
3412  *		Master has started to claim all blocks 1.
3413  *		The Master starts working once all blocks 1 are full.
3414  *		Each Worker has an array for the blocks that tells their status. ???
3415  *		(Maybe better the lock on the whole block).
3416  *		There should be a lock on them. The locks will make the threads
3417  *		wait properly. When the Master finished a block, it marks it as
3418  *		empty. When the master reaches the end of the last block it moves
3419  *		the remainder to the piece before block 1. Etc.
3420  *		Once terminated the worker can do the same as currently after
3421  *		the call to EndSort (leave control to the master).
3422  *		The master starts after there is a signal that all blocks 1 have
3423  *		been filled. The tricky point is this signal without having
3424  *		threads spend time in a waiting loop.
3425  *	Don't compress the terms. It costs more time and serves here no real
3426  *	purpose. It only makes things slower for the master.
3427  *
3428  *	At the moment the scratch buffer of the workers is 1/N times the scratch
3429  *	buffer of the master which is usually about the size of the Large buffer
3430  *	of the master. This way we can save a factor on the scratch buffer size
3431  *	of the workers. Alternative: let PutToMaster write directly into the
3432  *	buffer/block of the master and leave out the scratch of the worker
3433  *	completely.
3434 */
3435 /*
3436   	#] SortStrategy :
3437   	#[ PutToMaster :
3438 */
3439 /**
3440  *		Writes the term (uncompressed) to the masters buffers.
3441  *		We put it inside a block. The blocks have locks. This makes
3442  *		that we have to wait automatically when all blocks are full.
3443  *		This routine takes the place of PutOut when making the final
3444  *		sort in a thread.
3445  *		It takes the place of FlushOut when the argument is NULL.
3446  *
3447  *		We need an initialization first in which the first MasterBlockLock
3448  *		is set and MasterBlock is set to 1.
3449  *		At the end we need to unlock the last block. Both actions can
3450  *		be done in the routine that calls EndSort for the thread.
3451  *
3452  *		The initialization of the variables in SB is done in
3453  *		IniSortBlocks. This is done only once but it has to wait till
3454  *		all threads exist and the masters sort buffers have been allocated.
3455  *
3456  *		Note: the zero block is reserved for leftovers at the end of the
3457  *		last block that get moved back to the front to keep the terms
3458  *		contiguous (done in MasterMerge).
3459  */
3460 
PutToMaster(PHEAD WORD * term)3461 int PutToMaster(PHEAD WORD *term)
3462 {
3463 	int i,j,nexti,ret = 0;
3464 	WORD *t, *fill, *top, zero = 0;
3465 	if ( term == 0 ) { /* Mark the end of the expression */
3466 		t = &zero; j = 1;
3467 	}
3468 	else {
3469 		t = term; ret = j = *term;
3470 		if ( j == 0 ) { j = 1; } /* Just in case there is a spurious end */
3471 	}
3472 	i = AT.SB.FillBlock;          /* The block we are working at */
3473 	fill = AT.SB.MasterFill[i];     /* Where we are filling */
3474 	top = AT.SB.MasterStop[i];      /* End of the block */
3475 	while ( j > 0 ) {
3476 		while ( j > 0 && fill < top ) {
3477 			*fill++ = *t++; j--;
3478 		}
3479 		if ( j > 0 ) {
3480 /*
3481 			We reached the end of the block.
3482 			Get the next block and release this block.
3483 			The order is important. This is why there should be at least
3484 			4 blocks or deadlocks can occur.
3485 */
3486 			nexti = i+1;
3487 			if ( nexti > AT.SB.MasterNumBlocks ) {
3488 				nexti = 1;
3489 			}
3490 			LOCK(AT.SB.MasterBlockLock[nexti]);
3491 			UNLOCK(AT.SB.MasterBlockLock[i]);
3492 			AT.SB.MasterFill[i] = AT.SB.MasterStart[i];
3493 			AT.SB.FillBlock = i = nexti;
3494 			fill = AT.SB.MasterStart[i];
3495 			top = AT.SB.MasterStop[i];
3496 		}
3497 	}
3498 	AT.SB.MasterFill[i] = fill;
3499 	return(ret);
3500 }
3501 
3502 /*
3503   	#] PutToMaster :
3504   	#[ SortBotOut :
3505 */
3506 
3507 #ifdef WITHSORTBOTS
3508 
3509 /**
3510  *		This is the output routine of the SortBots.
3511  *		It can run PutToMaster, except for the final merge.
3512  *		In that case we need to do special things like calling PutOut.
3513  *		Hence the first thing we have to do is to figure out where our
3514  *		output should be going.
3515  */
3516 
3517 int
SortBotOut(PHEAD WORD * term)3518 SortBotOut(PHEAD WORD *term)
3519 {
3520 	WORD im;
3521 
3522 	if ( AT.identity != 0 ) return(PutToMaster(BHEAD term));
3523 
3524 	if ( term == 0 ) {
3525 		if ( FlushOut(&SortBotPosition,AR.outfile,1) ) return(-1);
3526 		ADDPOS(AT.SS->SizeInFile[0],1);
3527 		return(0);
3528 	}
3529 	else {
3530 		numberofterms++;
3531 		if ( ( im = PutOut(BHEAD term,&SortBotPosition,AR.outfile,1) ) < 0 ) {
3532 			MLOCK(ErrorMessageLock);
3533 			MesPrint("Called from MasterMerge/SortBotOut");
3534 			MUNLOCK(ErrorMessageLock);
3535 			return(-1);
3536 		}
3537 		ADDPOS(AT.SS->SizeInFile[0],im);
3538 		return(im);
3539 	}
3540 }
3541 
3542 #endif
3543 
3544 /*
3545   	#] SortBotOut :
3546   	#[ MasterMerge :
3547 */
3548 /**
3549  *	This is the routine in which the master merges the sorted output that
3550  *	comes from the workers. It is similar to MergePatches in sort.c from which
3551  *	it takes much code.
3552  *	The important concept here is that we want the master to be working as
3553  *	little as possible because it constitutes the bottleneck.
3554  *	The workers fill the buffers of the master. These buffers are divided
3555  *	into parts for each worker as is done with the file patches in MergePatches
3556  *	but now also each worker part is divided into blocks. This allows the
3557  *	worker to fill blocks while the master is already working on blocks that
3558  *	were filled before. The blocks are arranged in a circular fashion.
3559  *	The whole is controled by locks which seems faster than setting it up
3560  *	with signals.
3561  *
3562  *	This routine is run by the master when we don't use the sortbots.
3563  */
3564 
MasterMerge()3565 int MasterMerge()
3566 {
3567 	ALLPRIVATES *B0 = AB[0], *B = 0;
3568 	SORTING *S = AT0.SS;
3569 	WORD **poin, **poin2, ul, k, i, im, *m1, j;
3570 	WORD lpat, mpat, level, l1, l2, r1, r2, r3, c;
3571 	WORD *m2, *m3, r31, r33, ki, *rr;
3572 	UWORD *coef;
3573 	POSITION position;
3574 	FILEHANDLE *fin, *fout;
3575 #ifdef WITHSORTBOTS
3576 	if ( numberofworkers > 2 ) return(SortBotMasterMerge());
3577 #endif
3578 	fin = &S->file;
3579 	if ( AR0.PolyFun == 0 ) { S->PolyFlag = 0; }
3580 	else if ( AR0.PolyFunType == 1 ) { S->PolyFlag = 1; }
3581 	else if ( AR0.PolyFunType == 2 ) {
3582 		if ( AR0.PolyFunExp == 2
3583 		  || AR0.PolyFunExp == 3 ) S->PolyFlag = 1;
3584 		else                       S->PolyFlag = 2;
3585 	}
3586 	S->TermsLeft = 0;
3587 	coef = AN0.SoScratC;
3588 	poin = S->poina; poin2 = S->poin2a;
3589 	rr = AR0.CompressPointer;
3590 	*rr = 0;
3591 /*
3592  		#[ Setup :
3593 */
3594 	S->inNum = numberofthreads;
3595 	fout = AR0.outfile;
3596 /*
3597 	Load the patches. The threads have to finish their sort first.
3598 */
3599 	S->lPatch = S->inNum - 1;
3600 /*
3601 	Claim all zero blocks. We need them anyway.
3602 	In principle the workers should never get into these.
3603 	We also claim all last blocks. This is a safety procedure that
3604 	should prevent the workers from working their way around the clock
3605 	before the master gets started again.
3606 */
3607 	AS.MasterSort = 1;
3608 	numberclaimed = 0;
3609 	for ( i = 1; i <= S->lPatch; i++ ) {
3610 		B = AB[i];
3611 		LOCK(AT.SB.MasterBlockLock[0]);
3612 		LOCK(AT.SB.MasterBlockLock[AT.SB.MasterNumBlocks]);
3613 	}
3614 /*
3615 	Now wake up the threads and have them start their final sorting.
3616 	They should start with claiming their block and the master is
3617 	not allowed to continue until that has been done.
3618 	This waiting of the master will be done below in MasterWaitAllBlocks
3619 */
3620 	for ( i = 0; i < S->lPatch; i++ ) {
3621 		GetThread(i+1);
3622 		WakeupThread(i+1,FINISHEXPRESSION);
3623 	}
3624 /*
3625 	Prepare the output file.
3626 */
3627 	if ( fout->handle >= 0 ) {
3628 		PUTZERO(position);
3629 		SeekFile(fout->handle,&position,SEEK_END);
3630 		ADDPOS(position,((fout->POfill-fout->PObuffer)*sizeof(WORD)));
3631 	}
3632 	else {
3633 		SETBASEPOSITION(position,(fout->POfill-fout->PObuffer)*sizeof(WORD));
3634 	}
3635 /*
3636 	Wait for all threads to finish loading their first block.
3637 */
3638 	MasterWaitAllBlocks();
3639 /*
3640 	Claim all first blocks.
3641 	We don't release the last blocks.
3642 	The strategy is that we always keep the previous block.
3643 	In principle it looks like it isn't needed for the last block but
3644 	actually it is to keep the front from overrunning the tail when writing.
3645 */
3646 	for ( i = 1; i <= S->lPatch; i++ ) {
3647 		B = AB[i];
3648 		LOCK(AT.SB.MasterBlockLock[1]);
3649 		AT.SB.MasterBlock = 1;
3650 	}
3651 /*
3652  		#] Setup :
3653 
3654 	Now construct the tree:
3655 */
3656 	lpat = 1;
3657 	do { lpat <<= 1; } while ( lpat < S->lPatch );
3658 	mpat = ( lpat >> 1 ) - 1;
3659 	k = lpat - S->lPatch;
3660 /*
3661 	k is the number of empty places in the tree. they will
3662 	be at the even positions from 2 to 2*k
3663 */
3664 	for ( i = 1; i < lpat; i++ ) { S->tree[i] = -1; }
3665 	for ( i = 1; i <= k; i++ ) {
3666 		im = ( i * 2 ) - 1;
3667 		poin[im] = AB[i]->T.SB.MasterStart[AB[i]->T.SB.MasterBlock];
3668 		poin2[im] = poin[im] + *(poin[im]);
3669 		S->used[i] = im;
3670 		S->ktoi[im] = i-1;
3671 		S->tree[mpat+i] = 0;
3672 		poin[im-1] = poin2[im-1] = 0;
3673 	}
3674 	for ( i = (k*2)+1; i <= lpat; i++ ) {
3675 		S->used[i-k] = i;
3676 		S->ktoi[i] = i-k-1;
3677 		poin[i] = AB[i-k]->T.SB.MasterStart[AB[i-k]->T.SB.MasterBlock];
3678 		poin2[i] = poin[i] + *(poin[i]);
3679 	}
3680 /*
3681 	the array poin tells the position of the i-th element of the S->tree
3682 	'S->used' is a stack with the S->tree elements that need to be entered
3683 	into the S->tree. at the beginning this is S->lPatch. during the
3684 	sort there will be only very few elements.
3685 	poin2 is the next value of poin. it has to be determined
3686 	before the comparisons as the position or the size of the
3687 	term indicated by poin may change.
3688 	S->ktoi translates a S->tree element back to its stream number.
3689 
3690 	start the sort
3691 */
3692 	level = S->lPatch;
3693 /*
3694 	introduce one term
3695 */
3696 OneTerm:
3697 	k = S->used[level];
3698 	i = k + lpat - 1;
3699 	if ( !*(poin[k]) ) {
3700 		do { if ( !( i >>= 1 ) ) goto EndOfMerge; } while ( !S->tree[i] );
3701 		if ( S->tree[i] == -1 ) {
3702 			S->tree[i] = 0;
3703 			level--;
3704 			goto OneTerm;
3705 		}
3706 		k = S->tree[i];
3707 		S->used[level] = k;
3708 		S->tree[i] = 0;
3709 	}
3710 /*
3711 	move terms down the tree
3712 */
3713 	while ( i >>= 1 ) {
3714 		if ( S->tree[i] > 0 ) {
3715 /*
3716 			In the old setup we had here B0 for the first argument
3717 */
3718 			if ( ( c = CompareTerms(poin[S->tree[i]],poin[k],(WORD)0) ) > 0 ) {
3719 /*
3720 				S->tree[i] is the smaller. Exchange and go on.
3721 */
3722 				S->used[level] = S->tree[i];
3723 				S->tree[i] = k;
3724 				k = S->used[level];
3725 			}
3726 			else if ( !c ) {	/* Terms are equal */
3727 /*
3728 				S->TermsLeft--;
3729 					Here the terms are equal and their coefficients
3730 					have to be added.
3731 */
3732 				l1 = *( m1 = poin[S->tree[i]] );
3733 				l2 = *( m2 = poin[k] );
3734 				if ( S->PolyWise ) {  /* Here we work with PolyFun */
3735 					WORD *tt1, *w;
3736 					tt1 = m1;
3737 					m1 += S->PolyWise;
3738 					m2 += S->PolyWise;
3739 					if ( S->PolyFlag == 2 ) {
3740 						w = poly_ratfun_add(B0,m1,m2);
3741 						if ( *tt1 + w[1] - m1[1] > AM.MaxTer/((LONG)sizeof(WORD)) ) {
3742 							MLOCK(ErrorMessageLock);
3743 							MesPrint("Term too complex in PolyRatFun addition. MaxTermSize of %10l is too small",AM.MaxTer);
3744 							MUNLOCK(ErrorMessageLock);
3745 							Terminate(-1);
3746 						}
3747 						AT0.WorkPointer = w;
3748 						if ( w[FUNHEAD] == -SNUMBER && w[FUNHEAD+1] == 0 && w[1] > FUNHEAD ) {
3749 							goto cancelled;
3750 						}
3751 					}
3752 					else {
3753 						w = AT0.WorkPointer;
3754 						if ( w + m1[1] + m2[1] > AT0.WorkTop ) {
3755 							MLOCK(ErrorMessageLock);
3756 							MesPrint("MasterMerge: A WorkSpace of %10l is too small",AM.WorkSize);
3757 							MUNLOCK(ErrorMessageLock);
3758 							Terminate(-1);
3759 						}
3760 						AddArgs(B0,m1,m2,w);
3761 					}
3762 					r1 = w[1];
3763 					if ( r1 <= FUNHEAD
3764 						|| ( w[FUNHEAD] == -SNUMBER && w[FUNHEAD+1] == 0 ) )
3765 							 { goto cancelled; }
3766 					if ( r1 == m1[1] ) {
3767 						NCOPY(m1,w,r1);
3768 					}
3769 					else if ( r1 < m1[1] ) {
3770 						r2 = m1[1] - r1;
3771 						m2 = w + r1;
3772 						m1 += m1[1];
3773 						while ( --r1 >= 0 ) *--m1 = *--m2;
3774 						m2 = m1 - r2;
3775 						r1 = S->PolyWise;
3776 						while ( --r1 >= 0 ) *--m1 = *--m2;
3777 						*m1 -= r2;
3778 						poin[S->tree[i]] = m1;
3779 					}
3780 					else {
3781 						r2 = r1 - m1[1];
3782 						m2 = tt1 - r2;
3783 						r1 = S->PolyWise;
3784 						m1 = tt1;
3785 						*m1 += r2;
3786 						poin[S->tree[i]] = m2;
3787 						NCOPY(m2,m1,r1);
3788 						r1 = w[1];
3789 						NCOPY(m2,w,r1);
3790 					}
3791 				}
3792 				else {
3793 					r1 = *( m1 += l1 - 1 );
3794 					m1 -= ABS(r1) - 1;
3795 					r1 = ( ( r1 > 0 ) ? (r1-1) : (r1+1) ) >> 1;
3796 					r2 = *( m2 += l2 - 1 );
3797 					m2 -= ABS(r2) - 1;
3798 					r2 = ( ( r2 > 0 ) ? (r2-1) : (r2+1) ) >> 1;
3799 
3800 					if ( AddRat(B0,(UWORD *)m1,r1,(UWORD *)m2,r2,coef,&r3) ) {
3801 						MLOCK(ErrorMessageLock);
3802 						MesCall("MasterMerge");
3803 						MUNLOCK(ErrorMessageLock);
3804 						SETERROR(-1)
3805 					}
3806 
3807 					if ( AN.ncmod != 0 ) {
3808 						if ( ( AC.modmode & POSNEG ) != 0 ) {
3809 							NormalModulus(coef,&r3);
3810 						}
3811 						else if ( BigLong(coef,r3,(UWORD *)AC.cmod,ABS(AN.ncmod)) >= 0 ) {
3812 							WORD ii;
3813 							SubPLon(coef,r3,(UWORD *)AC.cmod,ABS(AN.ncmod),coef,&r3);
3814 							coef[r3] = 1;
3815 							for ( ii = 1; ii < r3; ii++ ) coef[r3+ii] = 0;
3816 						}
3817 					}
3818 					r3 *= 2;
3819 					r33 = ( r3 > 0 ) ? ( r3 + 1 ) : ( r3 - 1 );
3820 					if ( r3 < 0 ) r3 = -r3;
3821 					if ( r1 < 0 ) r1 = -r1;
3822 					r1 *= 2;
3823 					r31 = r3 - r1;
3824 					if ( !r3 ) {		/* Terms cancel */
3825 cancelled:
3826 						ul = S->used[level] = S->tree[i];
3827 						S->tree[i] = -1;
3828 /*
3829 						We skip to the next term in stream ul
3830 */
3831 						im = *poin2[ul];
3832 						poin[ul] = poin2[ul];
3833 						ki = S->ktoi[ul];
3834 						if ( (poin[ul] + im + COMPINC) >=
3835 						AB[ki+1]->T.SB.MasterStop[AB[ki+1]->T.SB.MasterBlock]
3836 						&& im > 0 ) {
3837 /*
3838 							We made it to the end of the block. We have to
3839 							release the previous block and claim the next.
3840 */
3841 							B = AB[ki+1];
3842 							i = AT.SB.MasterBlock;
3843 							if ( i == 1 ) {
3844 								UNLOCK(AT.SB.MasterBlockLock[AT.SB.MasterNumBlocks]);
3845 							}
3846 							else {
3847 								UNLOCK(AT.SB.MasterBlockLock[i-1]);
3848 							}
3849 							if ( i == AT.SB.MasterNumBlocks ) {
3850 /*
3851 								Move the remainder down into block 0
3852 */
3853 								WORD *from, *to;
3854 								to = AT.SB.MasterStart[1];
3855 								from = AT.SB.MasterStop[i];
3856 								while ( from > poin[ul] ) *--to = *--from;
3857 								poin[ul] = to;
3858 								i = 1;
3859 							}
3860 							else { i++; }
3861 							LOCK(AT.SB.MasterBlockLock[i]);
3862 							AT.SB.MasterBlock = i;
3863 							poin2[ul] = poin[ul] + im;
3864 						}
3865 						else {
3866 							poin2[ul] += im;
3867 						}
3868 						S->used[++level] = k;
3869 /*						S->TermsLeft--; */
3870 					}
3871 					else if ( !r31 ) {		/* copy coef into term1 */
3872 						goto CopCof2;
3873 					}
3874 					else if ( r31 < 0 ) {		/* copy coef into term1
3875 											and adjust the length of term1 */
3876 						goto CopCoef;
3877 					}
3878 					else {
3879 /*
3880 							this is the dreaded calamity.
3881 							is there enough space?
3882 */
3883 						if( (poin[S->tree[i]]+l1+r31) >= poin2[S->tree[i]] ) {
3884 /*
3885 								no space! now the special trick for which
3886 								we left 2*maxlng spaces open at the beginning
3887 								of each patch.
3888 */
3889 							if ( (l1 + r31)*((LONG)sizeof(WORD)) >= AM.MaxTer ) {
3890 								MLOCK(ErrorMessageLock);
3891 								MesPrint("MasterMerge: Coefficient overflow during sort");
3892 								MUNLOCK(ErrorMessageLock);
3893 								goto ReturnError;
3894 							}
3895 							m2 = poin[S->tree[i]];
3896 							m3 = ( poin[S->tree[i]] -= r31 );
3897 							do { *m3++ = *m2++; } while ( m2 < m1 );
3898 							m1 = m3;
3899 						}
3900 CopCoef:
3901 						*(poin[S->tree[i]]) += r31;
3902 CopCof2:
3903 						m2 = (WORD *)coef; im = r3;
3904 						NCOPY(m1,m2,im);
3905 						*m1 = r33;
3906 					}
3907 				}
3908 /*
3909 				Now skip to the next term in stream k.
3910 */
3911 NextTerm:
3912 				im = poin2[k][0];
3913 				poin[k] = poin2[k];
3914 				ki = S->ktoi[k];
3915 				if ( (poin[k] + im + COMPINC) >=
3916 				AB[ki+1]->T.SB.MasterStop[AB[ki+1]->T.SB.MasterBlock]
3917 				&& im > 0 ) {
3918 /*
3919 				We made it to the end of the block. We have to
3920 				release the previous block and claim the next.
3921 */
3922 					B = AB[ki+1];
3923 					i = AT.SB.MasterBlock;
3924 					if ( i == 1 ) {
3925 						UNLOCK(AT.SB.MasterBlockLock[AT.SB.MasterNumBlocks]);
3926 					}
3927 					else {
3928 						UNLOCK(AT.SB.MasterBlockLock[i-1]);
3929 					}
3930 					if ( i == AT.SB.MasterNumBlocks ) {
3931 /*
3932 						Move the remainder down into block 0
3933 */
3934 						WORD *from, *to;
3935 						to = AT.SB.MasterStart[1];
3936 						from = AT.SB.MasterStop[i];
3937 						while ( from > poin[k] ) *--to = *--from;
3938 						poin[k] = to;
3939 						i = 1;
3940 					}
3941 					else { i++; }
3942 					LOCK(AT.SB.MasterBlockLock[i]);
3943 					AT.SB.MasterBlock = i;
3944 					poin2[k] = poin[k] + im;
3945 				}
3946 				else {
3947 					poin2[k] += im;
3948 				}
3949 				goto OneTerm;
3950 			}
3951 		}
3952 		else if ( S->tree[i] < 0 ) {
3953 			S->tree[i] = k;
3954 			level--;
3955 			goto OneTerm;
3956 		}
3957 	}
3958 /*
3959 	found the smallest in the set. indicated by k.
3960 	write to its destination.
3961 */
3962 	S->TermsLeft++;
3963 	if ( ( im = PutOut(B0,poin[k],&position,fout,1) ) < 0 ) {
3964 		MLOCK(ErrorMessageLock);
3965 		MesPrint("Called from MasterMerge with k = %d (stream %d)",k,S->ktoi[k]);
3966 		MUNLOCK(ErrorMessageLock);
3967 		goto ReturnError;
3968 	}
3969 	ADDPOS(S->SizeInFile[0],im);
3970 	goto NextTerm;
3971 EndOfMerge:
3972 	if ( FlushOut(&position,fout,1) ) goto ReturnError;
3973 	ADDPOS(S->SizeInFile[0],1);
3974 	CloseFile(fin->handle);
3975 	remove(fin->name);
3976 	fin->handle = -1;
3977 	position = S->SizeInFile[0];
3978 	MULPOS(position,sizeof(WORD));
3979 	S->GenTerms = 0;
3980 	for ( j = 1; j <= numberofworkers; j++ ) {
3981 		S->GenTerms += AB[j]->T.SS->GenTerms;
3982 	}
3983 	WriteStats(&position,2);
3984 	Expressions[AR0.CurExpr].counter = S->TermsLeft;
3985 	Expressions[AR0.CurExpr].size = position;
3986 /*
3987 	Release all locks
3988 */
3989 	for ( i = 1; i <= S->lPatch; i++ ) {
3990 		B = AB[i];
3991 		UNLOCK(AT.SB.MasterBlockLock[0]);
3992 		if ( AT.SB.MasterBlock == 1 ) {
3993 			UNLOCK(AT.SB.MasterBlockLock[AT.SB.MasterNumBlocks]);
3994 		}
3995 		else {
3996 			UNLOCK(AT.SB.MasterBlockLock[AT.SB.MasterBlock-1]);
3997 		}
3998 		UNLOCK(AT.SB.MasterBlockLock[AT.SB.MasterBlock]);
3999 	}
4000 	AS.MasterSort = 0;
4001 	return(0);
4002 ReturnError:
4003 	for ( i = 1; i <= S->lPatch; i++ ) {
4004 		B = AB[i];
4005 		UNLOCK(AT.SB.MasterBlockLock[0]);
4006 		if ( AT.SB.MasterBlock == 1 ) {
4007 			UNLOCK(AT.SB.MasterBlockLock[AT.SB.MasterNumBlocks]);
4008 		}
4009 		else {
4010 			UNLOCK(AT.SB.MasterBlockLock[AT.SB.MasterBlock-1]);
4011 		}
4012 		UNLOCK(AT.SB.MasterBlockLock[AT.SB.MasterBlock]);
4013 	}
4014 	AS.MasterSort = 0;
4015 	return(-1);
4016 }
4017 
4018 /*
4019   	#] MasterMerge :
4020   	#[ SortBotMasterMerge :
4021 */
4022 
4023 #ifdef WITHSORTBOTS
4024 
4025 /**
4026  *	This is the master routine for the final stage in a sortbot merge.
4027  *	A sortbot merge is a merge in which the output of two workers is
4028  *	merged into a single output which then can be given as one of two
4029  *	streams to another sortbot. The idea is that each sortbot is responsible
4030  *	for one one compare per term. In the end the master does the last
4031  *	merge of only two streams and writes the result to the output.
4032  *	There doesn't seem to be an advantage to splitting this last task.
4033  *
4034  *	The use of the sortbots gives a measurable improvement but it isn't
4035  *	optimal yet.
4036  *
4037  *	This routine is run as master. Hence B = B0. Etc.
4038  */
4039 
SortBotMasterMerge()4040 int SortBotMasterMerge()
4041 {
4042 	FILEHANDLE *fin, *fout;
4043 	ALLPRIVATES *B = AB[0], *BB;
4044 	POSITION position;
4045 	SORTING *S = AT.SS;
4046 	int i, j;
4047 /*
4048 	Get the sortbots get to claim their writing blocks.
4049 	We have to wait till all have been claimed because they also have to
4050 	claim the last writing blocks of the workers to prevent the head of
4051 	the circular buffer to overrun the tail.
4052 
4053 	Before waiting we can do some needed initializations.
4054 	Also the master has to claim the last writing blocks of its input.
4055 */
4056 	topsortbotavailables = 0;
4057 	for ( i = numberofworkers+1; i <= numberofworkers+numberofsortbots; i++ ) {
4058 		WakeupThread(i,INISORTBOT);
4059 	}
4060 
4061 	AS.MasterSort = 1;
4062 	fout = AR.outfile;
4063 	numberofterms = 0;
4064 	AR.CompressPointer[0] = 0;
4065 	numberclaimed = 0;
4066 	BB = AB[AT.SortBotIn1];
4067 	LOCK(BB->T.SB.MasterBlockLock[BB->T.SB.MasterNumBlocks]);
4068 	BB = AB[AT.SortBotIn2];
4069 	LOCK(BB->T.SB.MasterBlockLock[BB->T.SB.MasterNumBlocks]);
4070 
4071 	MasterWaitAllSortBots();
4072 /*
4073 	Now we can start up the workers. They will claim their writing blocks.
4074 	Here the master will wait till all writing blocks have been claimed.
4075 */
4076 	for ( i = 1; i <= numberofworkers; i++ ) {
4077 		j = GetThread(i);
4078 		WakeupThread(i,FINISHEXPRESSION);
4079 	}
4080 /*
4081 	Prepare the output file in the mean time.
4082 */
4083 	if ( fout->handle >= 0 ) {
4084 		PUTZERO(SortBotPosition);
4085 		SeekFile(fout->handle,&SortBotPosition,SEEK_END);
4086 		ADDPOS(SortBotPosition,((fout->POfill-fout->PObuffer)*sizeof(WORD)));
4087 	}
4088 	else {
4089 		SETBASEPOSITION(SortBotPosition,(fout->POfill-fout->PObuffer)*sizeof(WORD));
4090 	}
4091 	MasterWaitAllBlocks();
4092 /*
4093 	Now we can start the sortbots after which the master goes in
4094 	sortbot mode to do its part of the job (the very final merge and
4095 	the writing to output file).
4096 */
4097 	topsortbotavailables = 0;
4098 	for ( i = numberofworkers+1; i <= numberofworkers+numberofsortbots; i++ ) {
4099 		WakeupThread(i,RUNSORTBOT);
4100 	}
4101 	if ( SortBotMerge(BHEAD0) ) {
4102 		MLOCK(ErrorMessageLock);
4103 		MesPrint("Called from SortBotMasterMerge");
4104 		MUNLOCK(ErrorMessageLock);
4105 		AS.MasterSort = 0;
4106 		return(-1);
4107 	}
4108 /*
4109 	And next the cleanup
4110 */
4111 	if ( S->file.handle >= 0 )
4112 	{
4113 		fin = &S->file;
4114 		CloseFile(fin->handle);
4115 		remove(fin->name);
4116 		fin->handle = -1;
4117 	}
4118 	position = S->SizeInFile[0];
4119 	MULPOS(position,sizeof(WORD));
4120 	S->GenTerms = 0;
4121 	for ( j = 1; j <= numberofworkers; j++ ) {
4122 		S->GenTerms += AB[j]->T.SS->GenTerms;
4123 	}
4124 	S->TermsLeft = numberofterms;
4125 	WriteStats(&position,2);
4126 	Expressions[AR.CurExpr].counter = S->TermsLeft;
4127 	Expressions[AR.CurExpr].size = position;
4128 	AS.MasterSort = 0;
4129 /*
4130 	The next statement is to prevent one of the sortbots not having
4131 	completely cleaned up before the next module starts.
4132 	If this statement is omitted every once in a while one of the sortbots
4133 	is still running when the next expression starts and misses its
4134 	initialization. The result is usually disastrous.
4135 */
4136 	MasterWaitAllSortBots();
4137 
4138 	return(0);
4139 }
4140 
4141 #endif
4142 
4143 /*
4144   	#] SortBotMasterMerge :
4145   	#[ SortBotMerge :
4146 */
4147 
4148 #ifdef WITHSORTBOTS
4149 
4150 /**
4151  *	This routine is run by a sortbot and merges two sorted output streams into
4152  *	a single sorted stream.
4153  */
4154 
SortBotMerge(PHEAD0)4155 int SortBotMerge(PHEAD0)
4156 {
4157 	GETBIDENTITY
4158 	ALLPRIVATES *Bin1 = AB[AT.SortBotIn1],*Bin2 = AB[AT.SortBotIn2];
4159 	WORD *term1, *term2, *next, *wp;
4160 	int blin1, blin2;	/* Current block numbers */
4161 	int error = 0;
4162 	WORD l1, l2, *m1, *m2, *w, r1, r2, r3, r33, r31, *tt1, ii;
4163 	WORD *to, *from, im, c;
4164 	UWORD *coef;
4165 	SORTING *S = AT.SS;
4166 /*
4167 	Set the pointers to the input terms and the output space
4168 */
4169 	coef = AN.SoScratC;
4170 	blin1 = 1;
4171 	blin2 = 1;
4172 	if ( AT.identity == 0 ) {
4173 		wp = AT.WorkPointer;
4174 	}
4175 	else {
4176 		wp = AT.WorkPointer = AT.WorkSpace;
4177 	}
4178 /*
4179 	Get the locks for reading the input
4180 	This means that we can start once these locks have been cleared
4181 	which means that there will be input.
4182 */
4183 	LOCK(Bin1->T.SB.MasterBlockLock[blin1]);
4184 	LOCK(Bin2->T.SB.MasterBlockLock[blin2]);
4185 
4186 	term1 = Bin1->T.SB.MasterStart[blin1];
4187 	term2 = Bin2->T.SB.MasterStart[blin2];
4188 	AT.SB.FillBlock = 1;
4189 /*
4190 	Now the main loop. Keep going until one of the two hits the end.
4191 */
4192 	while ( *term1 && *term2 ) {
4193 		if ( ( c = CompareTerms(term1,term2,(WORD)0) ) > 0 ) {
4194 /*
4195 			#[ One is smallest :
4196 */
4197 			if ( SortBotOut(BHEAD term1) < 0 ) {
4198 				MLOCK(ErrorMessageLock);
4199 				MesPrint("Called from SortBotMerge with thread = %d",AT.identity);
4200 				MUNLOCK(ErrorMessageLock);
4201 				error = -1;
4202 				goto ReturnError;
4203 			}
4204 			im = *term1;
4205 			next = term1 + im;
4206 			if ( next >= Bin1->T.SB.MasterStop[blin1] || ( *next &&
4207 			next+*next+COMPINC > Bin1->T.SB.MasterStop[blin1] ) ) {
4208 				if ( blin1 == 1 ) {
4209 					UNLOCK(Bin1->T.SB.MasterBlockLock[Bin1->T.SB.MasterNumBlocks]);
4210 				}
4211 				else {
4212 					UNLOCK(Bin1->T.SB.MasterBlockLock[blin1-1]);
4213 				}
4214 				if ( blin1 == Bin1->T.SB.MasterNumBlocks ) {
4215 /*
4216 					Move the remainder down into block 0
4217 */
4218 					to = Bin1->T.SB.MasterStart[1];
4219 					from = Bin1->T.SB.MasterStop[Bin1->T.SB.MasterNumBlocks];
4220 					while ( from > next ) *--to = *--from;
4221 					next = to;
4222 					blin1 = 1;
4223 				}
4224 				else {
4225 					blin1++;
4226 				}
4227 				LOCK(Bin1->T.SB.MasterBlockLock[blin1]);
4228 				Bin1->T.SB.MasterBlock = blin1;
4229 			}
4230 			term1 = next;
4231 /*
4232 			#] One is smallest :
4233 */
4234 		}
4235 		else if ( c < 0 ) {
4236 /*
4237 			#[ Two is smallest :
4238 */
4239 			if ( SortBotOut(BHEAD term2) < 0 ) {
4240 				MLOCK(ErrorMessageLock);
4241 				MesPrint("Called from SortBotMerge with thread = %d",AT.identity);
4242 				MUNLOCK(ErrorMessageLock);
4243 				error = -1;
4244 				goto ReturnError;
4245 			}
4246 next2:		im = *term2;
4247 			next = term2 + im;
4248 			if ( next >= Bin2->T.SB.MasterStop[blin2] || ( *next
4249 			&& next+*next+COMPINC > Bin2->T.SB.MasterStop[blin2] ) ) {
4250 				if ( blin2 == 1 ) {
4251 					UNLOCK(Bin2->T.SB.MasterBlockLock[Bin2->T.SB.MasterNumBlocks]);
4252 				}
4253 				else {
4254 					UNLOCK(Bin2->T.SB.MasterBlockLock[blin2-1]);
4255 				}
4256 				if ( blin2 == Bin2->T.SB.MasterNumBlocks ) {
4257 /*
4258 					Move the remainder down into block 0
4259 */
4260 					to = Bin2->T.SB.MasterStart[1];
4261 					from = Bin2->T.SB.MasterStop[Bin2->T.SB.MasterNumBlocks];
4262 					while ( from > next ) *--to = *--from;
4263 					next = to;
4264 					blin2 = 1;
4265 				}
4266 				else {
4267 					blin2++;
4268 				}
4269 				LOCK(Bin2->T.SB.MasterBlockLock[blin2]);
4270 				Bin2->T.SB.MasterBlock = blin2;
4271 			}
4272 			term2 = next;
4273 /*
4274 			#] Two is smallest :
4275 */
4276 		}
4277 		else {
4278 /*
4279 			#[ Equal :
4280 */
4281 			l1 = *( m1 = term1 );
4282 			l2 = *( m2 = term2 );
4283 			if ( S->PolyWise ) {  /* Here we work with PolyFun */
4284 				tt1 = m1;
4285 				m1 += S->PolyWise;
4286 				m2 += S->PolyWise;
4287 				if ( S->PolyFlag == 2 ) {
4288 					AT.WorkPointer = wp;
4289 					w = poly_ratfun_add(BHEAD m1,m2);
4290 					if ( *tt1 + w[1] - m1[1] > AM.MaxTer/((LONG)sizeof(WORD)) ) {
4291 						MLOCK(ErrorMessageLock);
4292 						MesPrint("Term too complex in PolyRatFun addition. MaxTermSize of %10l is too small",AM.MaxTer);
4293 						MUNLOCK(ErrorMessageLock);
4294 						Terminate(-1);
4295 					}
4296 					AT.WorkPointer = wp;
4297 					if ( w[FUNHEAD] == -SNUMBER && w[FUNHEAD+1] == 0 && w[1] > FUNHEAD ) {
4298 						goto cancelled;
4299 					}
4300 				}
4301 				else {
4302 					w = wp;
4303 					if ( w + m1[1] + m2[1] > AT.WorkTop ) {
4304 						MLOCK(ErrorMessageLock);
4305 						MesPrint("SortBotMerge(%d): A Maxtermsize of %10l is too small",
4306 								AT.identity,AM.MaxTer/sizeof(WORD));
4307 						MesPrint("m1[1] = %d, m2[1] = %d, Space = %l",m1[1],m2[1],(LONG)(AT.WorkTop-wp));
4308 						PrintTerm(term1,"term1");
4309 						PrintTerm(term2,"term2");
4310 						MesPrint("PolyWise = %d",S->PolyWise);
4311 						MUNLOCK(ErrorMessageLock);
4312 						Terminate(-1);
4313 					}
4314 					AddArgs(BHEAD m1,m2,w);
4315 				}
4316 				r1 = w[1];
4317 				if ( r1 <= FUNHEAD
4318 					|| ( w[FUNHEAD] == -SNUMBER && w[FUNHEAD+1] == 0 ) )
4319 						 { goto cancelled; }
4320 				if ( r1 == m1[1] ) {
4321 					NCOPY(m1,w,r1);
4322 				}
4323 				else if ( r1 < m1[1] ) {
4324 					r2 = m1[1] - r1;
4325 					m2 = w + r1;
4326 					m1 += m1[1];
4327 					while ( --r1 >= 0 ) *--m1 = *--m2;
4328 					m2 = m1 - r2;
4329 					r1 = S->PolyWise;
4330 					while ( --r1 >= 0 ) *--m1 = *--m2;
4331 					*m1 -= r2;
4332 					term1 = m1;
4333 				}
4334 				else {
4335 					r2 = r1 - m1[1];
4336 					m2 = tt1 - r2;
4337 					r1 = S->PolyWise;
4338 					m1 = tt1;
4339 					*m1 += r2;
4340 					term1 = m2;
4341 					NCOPY(m2,m1,r1);
4342 					r1 = w[1];
4343 					NCOPY(m2,w,r1);
4344 				}
4345 			}
4346 			else {
4347 				r1 = *( m1 += l1 - 1 );
4348 				m1 -= ABS(r1) - 1;
4349 				r1 = ( ( r1 > 0 ) ? (r1-1) : (r1+1) ) >> 1;
4350 				r2 = *( m2 += l2 - 1 );
4351 				m2 -= ABS(r2) - 1;
4352 				r2 = ( ( r2 > 0 ) ? (r2-1) : (r2+1) ) >> 1;
4353 
4354 				if ( AddRat(BHEAD (UWORD *)m1,r1,(UWORD *)m2,r2,coef,&r3) ) {
4355 					MLOCK(ErrorMessageLock);
4356 					MesCall("SortBotMerge");
4357 					MUNLOCK(ErrorMessageLock);
4358 					SETERROR(-1)
4359 				}
4360 
4361 				if ( AN.ncmod != 0 ) {
4362 					if ( ( AC.modmode & POSNEG ) != 0 ) {
4363 						NormalModulus(coef,&r3);
4364 					}
4365 					else if ( BigLong(coef,r3,(UWORD *)AC.cmod,ABS(AN.ncmod)) >= 0 ) {
4366 						SubPLon(coef,r3,(UWORD *)AC.cmod,ABS(AN.ncmod),coef,&r3);
4367 						coef[r3] = 1;
4368 						for ( ii = 1; ii < r3; ii++ ) coef[r3+ii] = 0;
4369 					}
4370 				}
4371 				if ( !r3 ) { goto cancelled; }
4372 				r3 *= 2;
4373 				r33 = ( r3 > 0 ) ? ( r3 + 1 ) : ( r3 - 1 );
4374 				if ( r3 < 0 ) r3 = -r3;
4375 				if ( r1 < 0 ) r1 = -r1;
4376 				r1 *= 2;
4377 				r31 = r3 - r1;
4378 				if ( !r31 ) {		/* copy coef into term1 */
4379 					m2 = (WORD *)coef; im = r3;
4380 					NCOPY(m1,m2,im);
4381 					*m1 = r33;
4382 				}
4383 /*
4384 				else if ( r31 < 0 ) {
4385 					*term1 += r31;
4386 					m2 = (WORD *)coef; im = r3;
4387 					NCOPY(m1,m2,im);
4388 					*m1 = r33;
4389 				}
4390 */
4391 				else {
4392 					to = wp; from = term1;
4393 					while ( from < m1 ) *to++ = *from++;
4394 					from = (WORD *)coef; im = r3;
4395 					NCOPY(to,from,im);
4396 					*to++ = r33;
4397 					wp[0] = to - wp;
4398 					if ( SortBotOut(BHEAD wp) < 0 ) {
4399 						MLOCK(ErrorMessageLock);
4400 						MesPrint("Called from SortBotMerge with thread = %d",AT.identity);
4401 						MUNLOCK(ErrorMessageLock);
4402 						error = -1;
4403 						goto ReturnError;
4404 					}
4405 					goto cancelled;
4406 				}
4407 			}
4408 			if ( SortBotOut(BHEAD term1) < 0 ) {
4409 				MLOCK(ErrorMessageLock);
4410 				MesPrint("Called from SortBotMerge with thread = %d",AT.identity);
4411 				MUNLOCK(ErrorMessageLock);
4412 				error = -1;
4413 				goto ReturnError;
4414 			}
4415 cancelled:;		/* Now we need two new terms */
4416 			im = *term1;
4417 			next = term1 + im;
4418 			if ( next >= Bin1->T.SB.MasterStop[blin1] || ( *next &&
4419 			next+*next+COMPINC > Bin1->T.SB.MasterStop[blin1] ) ) {
4420 				if ( blin1 == 1 ) {
4421 					UNLOCK(Bin1->T.SB.MasterBlockLock[Bin1->T.SB.MasterNumBlocks]);
4422 				}
4423 				else {
4424 					UNLOCK(Bin1->T.SB.MasterBlockLock[blin1-1]);
4425 				}
4426 				if ( blin1 == Bin1->T.SB.MasterNumBlocks ) {
4427 /*
4428 					Move the remainder down into block 0
4429 */
4430 					to = Bin1->T.SB.MasterStart[1];
4431 					from = Bin1->T.SB.MasterStop[Bin1->T.SB.MasterNumBlocks];
4432 					while ( from > next ) *--to = *--from;
4433 					next = to;
4434 					blin1 = 1;
4435 				}
4436 				else {
4437 					blin1++;
4438 				}
4439 				LOCK(Bin1->T.SB.MasterBlockLock[blin1]);
4440 				Bin1->T.SB.MasterBlock = blin1;
4441 			}
4442 			term1 = next;
4443 			goto next2;
4444 /*
4445 			#] Equal :
4446 */
4447 		}
4448 	}
4449 /*
4450 	Copy the tail
4451 */
4452 	if ( *term1 ) {
4453 /*
4454 			#[ Tail in one :
4455 */
4456 		while ( *term1 ) {
4457 			if ( SortBotOut(BHEAD term1) < 0 ) {
4458 				MLOCK(ErrorMessageLock);
4459 				MesPrint("Called from SortBotMerge with thread = %d",AT.identity);
4460 				MUNLOCK(ErrorMessageLock);
4461 				error = -1;
4462 				goto ReturnError;
4463 			}
4464 			im = *term1;
4465 			next = term1 + im;
4466 			if ( next >= Bin1->T.SB.MasterStop[blin1] || ( *next &&
4467 			next+*next+COMPINC > Bin1->T.SB.MasterStop[blin1] ) ) {
4468 				if ( blin1 == 1 ) {
4469 					UNLOCK(Bin1->T.SB.MasterBlockLock[Bin1->T.SB.MasterNumBlocks]);
4470 				}
4471 				else {
4472 					UNLOCK(Bin1->T.SB.MasterBlockLock[blin1-1]);
4473 				}
4474 				if ( blin1 == Bin1->T.SB.MasterNumBlocks ) {
4475 /*
4476 					Move the remainder down into block 0
4477 */
4478 					to = Bin1->T.SB.MasterStart[1];
4479 					from = Bin1->T.SB.MasterStop[Bin1->T.SB.MasterNumBlocks];
4480 					while ( from > next ) *--to = *--from;
4481 					next = to;
4482 					blin1 = 1;
4483 				}
4484 				else {
4485 					blin1++;
4486 				}
4487 				LOCK(Bin1->T.SB.MasterBlockLock[blin1]);
4488 				Bin1->T.SB.MasterBlock = blin1;
4489 			}
4490 			term1 = next;
4491 		}
4492 /*
4493 			#] Tail in one :
4494 */
4495 	}
4496 	else if ( *term2 ) {
4497 /*
4498 			#[ Tail in two :
4499 */
4500 		while ( *term2 ) {
4501 			if ( SortBotOut(BHEAD term2) < 0 ) {
4502 				MLOCK(ErrorMessageLock);
4503 				MesPrint("Called from SortBotMerge with thread = %d",AT.identity);
4504 				MUNLOCK(ErrorMessageLock);
4505 				error = -1;
4506 				goto ReturnError;
4507 			}
4508 			im = *term2;
4509 			next = term2 + im;
4510 			if ( next >= Bin2->T.SB.MasterStop[blin2] || ( *next
4511 			&& next+*next+COMPINC > Bin2->T.SB.MasterStop[blin2] ) ) {
4512 				if ( blin2 == 1 ) {
4513 					UNLOCK(Bin2->T.SB.MasterBlockLock[Bin2->T.SB.MasterNumBlocks]);
4514 				}
4515 				else {
4516 					UNLOCK(Bin2->T.SB.MasterBlockLock[blin2-1]);
4517 				}
4518 				if ( blin2 == Bin2->T.SB.MasterNumBlocks ) {
4519 /*
4520 					Move the remainder down into block 0
4521 */
4522 					to = Bin2->T.SB.MasterStart[1];
4523 					from = Bin2->T.SB.MasterStop[Bin2->T.SB.MasterNumBlocks];
4524 					while ( from > next ) *--to = *--from;
4525 					next = to;
4526 					blin2 = 1;
4527 				}
4528 				else {
4529 					blin2++;
4530 				}
4531 				LOCK(Bin2->T.SB.MasterBlockLock[blin2]);
4532 				Bin2->T.SB.MasterBlock = blin2;
4533 			}
4534 			term2 = next;
4535 		}
4536 /*
4537 			#] Tail in two :
4538 */
4539 	}
4540 	SortBotOut(BHEAD 0);
4541 ReturnError:;
4542 /*
4543 	Release all locks
4544 */
4545 	UNLOCK(Bin1->T.SB.MasterBlockLock[blin1]);
4546 	if ( blin1 > 1 ) {
4547 		UNLOCK(Bin1->T.SB.MasterBlockLock[blin1-1]);
4548 	}
4549 	else {
4550 		UNLOCK(Bin1->T.SB.MasterBlockLock[Bin1->T.SB.MasterNumBlocks]);
4551 	}
4552 	UNLOCK(Bin2->T.SB.MasterBlockLock[blin2]);
4553 	if ( blin2 > 1 ) {
4554 		UNLOCK(Bin2->T.SB.MasterBlockLock[blin2-1]);
4555 	}
4556 	else {
4557 		UNLOCK(Bin2->T.SB.MasterBlockLock[Bin2->T.SB.MasterNumBlocks]);
4558 	}
4559 	if ( AT.identity > 0 ) {
4560 		UNLOCK(AT.SB.MasterBlockLock[AT.SB.FillBlock]);
4561 	}
4562 /*
4563 	And that was all folks
4564 */
4565 	return(error);
4566 }
4567 
4568 #endif
4569 
4570 /*
4571   	#] SortBotMerge :
4572   	#[ IniSortBlocks :
4573 */
4574 
4575 static int SortBlocksInitialized = 0;
4576 
4577 /**
4578  *	Initializes the blocks in the sort buffers of the master.
4579  *	These blocks are needed to keep both the workers and the master working
4580  *	simultaneously. See also the commentary at the routine MasterMerge.
4581  */
4582 
IniSortBlocks(int numworkers)4583 int IniSortBlocks(int numworkers)
4584 {
4585 	ALLPRIVATES *B;
4586 	SORTING *S;
4587 	LONG totalsize, workersize, blocksize, numberofterms;
4588 	int maxter, id, j;
4589 	int numberofblocks = NUMBEROFBLOCKSINSORT, numparts;
4590 	WORD *w;
4591 
4592 	if ( SortBlocksInitialized ) return(0);
4593 	SortBlocksInitialized = 1;
4594 	if ( numworkers == 0 ) return(0);
4595 
4596 #ifdef WITHSORTBOTS
4597 	if ( numworkers > 2 ) {
4598 		numparts = 2*numworkers - 2;
4599 		numberofblocks = numberofblocks/2;
4600 	}
4601 	else {
4602 		numparts = numworkers;
4603 	}
4604 #else
4605 	numparts = numworkers;
4606 #endif
4607 	S = AM.S0;
4608 	totalsize = S->LargeSize + S->SmallEsize;
4609 	workersize = totalsize / numparts;
4610 	maxter = AM.MaxTer/sizeof(WORD);
4611 	blocksize = ( workersize - maxter )/numberofblocks;
4612 	numberofterms = blocksize / maxter;
4613 	if ( numberofterms < MINIMUMNUMBEROFTERMS ) {
4614 /*
4615 		This should have been taken care of in RecalcSetups.
4616 */
4617 		MesPrint("We have a problem with the size of the blocks in IniSortBlocks");
4618 		Terminate(-1);
4619 	}
4620 /*
4621 	Layout:  For each worker
4622 				block 0: size is maxter WORDS
4623 				numberofblocks blocks of size blocksize WORDS
4624 */
4625 	w = S->lBuffer;
4626 	if ( w == 0 ) w = S->sBuffer;
4627 	for ( id = 1; id <= numparts; id++ ) {
4628 		B = AB[id];
4629 		AT.SB.MasterBlockLock = (pthread_mutex_t *)Malloc1(
4630 			sizeof(pthread_mutex_t)*(numberofblocks+1),"MasterBlockLock");
4631 		AT.SB.MasterStart = (WORD **)Malloc1(sizeof(WORD *)*(numberofblocks+1)*3,"MasterBlock");
4632 		AT.SB.MasterFill = AT.SB.MasterStart + (numberofblocks+1);
4633 		AT.SB.MasterStop = AT.SB.MasterFill  + (numberofblocks+1);
4634 		AT.SB.MasterNumBlocks = numberofblocks;
4635 		AT.SB.MasterBlock = 0;
4636 		AT.SB.FillBlock = 0;
4637 		AT.SB.MasterFill[0] = AT.SB.MasterStart[0] = w;
4638 		w += maxter;
4639 		AT.SB.MasterStop[0] = w;
4640 		AT.SB.MasterBlockLock[0] = dummylock;
4641 		for ( j = 1; j <= numberofblocks; j++ ) {
4642 			AT.SB.MasterFill[j] = AT.SB.MasterStart[j] = w;
4643 			w += blocksize;
4644 			AT.SB.MasterStop[j] = w;
4645 			AT.SB.MasterBlockLock[j] = dummylock;
4646 		}
4647 	}
4648 	if ( w > S->sTop2 ) {
4649 		MesPrint("Counting problem in IniSortBlocks");
4650 		Terminate(-1);
4651 	}
4652 	return(0);
4653 }
4654 
4655 /*
4656   	#] IniSortBlocks :
4657   	#[ DefineSortBotTree :
4658 */
4659 
4660 #ifdef WITHSORTBOTS
4661 
4662 /**
4663  *	To be used in a sortbot merge. It initializes the whole sortbot
4664  *	system by telling the sortbot which threads provide their input.
4665  */
4666 
DefineSortBotTree()4667 void DefineSortBotTree()
4668 {
4669 	ALLPRIVATES *B;
4670 	int n, i, from;
4671 	if ( numberofworkers <= 2 ) return;
4672 	n = numberofworkers*2-2;
4673 	for ( i = numberofworkers+1, from = 1; i <= n; i++ ) {
4674 		B = AB[i];
4675 		AT.SortBotIn1 = from++;
4676 		AT.SortBotIn2 = from++;
4677 	}
4678 	B = AB[0];
4679 	AT.SortBotIn1 = from++;
4680 	AT.SortBotIn2 = from++;
4681 }
4682 
4683 #endif
4684 
4685 /*
4686   	#] DefineSortBotTree :
4687   	#[ GetTerm2 :
4688 
4689 	Routine does a GetTerm but only when a bracket index is involved and
4690 	only from brackets that have been judged not suitable for treatment
4691 	as complete brackets by a single worker. Whether or not a bracket should
4692 	be treated by a single worker is decided by TreatIndexEntry
4693 */
4694 
GetTerm2(PHEAD WORD * term)4695 WORD GetTerm2(PHEAD WORD *term)
4696 {
4697 	WORD *ttco, *tt, retval;
4698 	LONG n,i;
4699 	FILEHANDLE *fi;
4700 	EXPRESSIONS e = AN.expr;
4701 	BRACKETINFO *b  = e->bracketinfo;
4702 	BRACKETINDEX *bi = b->indexbuffer;
4703 	POSITION where, eonfile = AS.OldOnFile[e-Expressions], bstart, bnext;
4704 /*
4705 	1: Get the current position.
4706 */
4707 	switch ( e->status ) {
4708 		case UNHIDELEXPRESSION:
4709 		case UNHIDEGEXPRESSION:
4710 		case DROPHLEXPRESSION:
4711 		case DROPHGEXPRESSION:
4712 		case HIDDENLEXPRESSION:
4713 		case HIDDENGEXPRESSION:
4714 			fi = AR.hidefile;
4715 			break;
4716 		default:
4717 			fi = AR.infile;
4718 			break;
4719 	}
4720 	if ( AR.KeptInHold ) {
4721 		retval = GetTerm(BHEAD term);
4722 		return(retval);
4723 	}
4724 	SeekScratch(fi,&where);
4725 	if ( AN.lastinindex < 0 ) {
4726 /*
4727 		We have to test whether we have to do the first bracket
4728 */
4729 		if ( ( n = TreatIndexEntry(BHEAD 0) ) <= 0 ) {
4730 			AN.lastinindex = n;
4731 			where = bi[n].start;
4732 			ADD2POS(where,eonfile);
4733 			SetScratch(fi,&where);
4734 /*
4735 			Put the bracket in the Compress buffer.
4736 */
4737 			ttco = AR.CompressBuffer;
4738 			tt = b->bracketbuffer + bi[0].bracket;
4739 			i = *tt;
4740 			NCOPY(ttco,tt,i)
4741 			AR.CompressPointer = ttco;
4742 			retval = GetTerm(BHEAD term);
4743 			return(retval);
4744 		}
4745 		else AN.lastinindex = n-1;
4746 	}
4747 /*
4748 	2: Find the corresponding index number
4749 	   a: test whether it is in the current bracket
4750 */
4751 	n = AN.lastinindex;
4752 	bstart = bi[n].start;
4753 	ADD2POS(bstart,eonfile);
4754 	bnext = bi[n].next;
4755 	ADD2POS(bnext,eonfile);
4756 	if ( ISLESSPOS(bstart,where) && ISLESSPOS(where,bnext) ) {
4757 		retval = GetTerm(BHEAD term);
4758 		return(retval);
4759 	}
4760 	for ( n++ ; n < b->indexfill; n++ ) {
4761 		i = TreatIndexEntry(BHEAD n);
4762 		if ( i <= 0 ) {
4763 /*
4764 			Put the bracket in the Compress buffer.
4765 */
4766 			ttco = AR.CompressBuffer;
4767 			tt = b->bracketbuffer + bi[n].bracket;
4768 			i = *tt;
4769 			NCOPY(ttco,tt,i)
4770 			AR.CompressPointer = ttco;
4771 			AN.lastinindex = n;
4772 			where = bi[n].start;
4773 			ADD2POS(where,eonfile);
4774 			SetScratch(fi,&(where));
4775 			retval = GetTerm(BHEAD term);
4776 			return(retval);
4777 		}
4778 		else n += i - 1;
4779 	}
4780 	return(0);
4781 }
4782 
4783 /*
4784   	#] GetTerm2 :
4785   	#[ TreatIndexEntry :
4786 */
4787 /**
4788  *	Routine has to decide whether a bracket has to be sent as a complete
4789  *	bracket to a worker or whether it has to be treated by the bucket system.
4790  *	Return value is positive when we should send it as a complete bracket and
4791  *	0 when it should be done via the buckets.
4792  *	The positive return value indicates how many brackets should be treated.
4793  */
4794 
TreatIndexEntry(PHEAD LONG n)4795 int TreatIndexEntry(PHEAD LONG n)
4796 {
4797 	BRACKETINFO *b  = AN.expr->bracketinfo;
4798 	LONG numbra = b->indexfill - 1, i;
4799 	LONG totterms;
4800 	BRACKETINDEX *bi;
4801 	POSITION pos1, average;
4802 /*
4803 	1: number of the bracket should be such that there is one bucket
4804 	   for each worker remaining.
4805 */
4806 	if ( ( numbra - n ) <= numberofworkers ) return(0);
4807 /*
4808 	2: size of the bracket contents should be less than what remains in
4809 	   the expression divided by the number of workers.
4810 */
4811 	bi = b->indexbuffer;
4812 	DIFPOS(pos1,bi[numbra].next,bi[n].next);  /* Size of what remains */
4813 	BASEPOSITION(average) = DIVPOS(pos1,(3*numberofworkers));
4814 	DIFPOS(pos1,bi[n].next,bi[n].start);      /* Size of the current bracket */
4815 	if ( ISLESSPOS(average,pos1) ) return(0);
4816 /*
4817 	It passes.
4818 	Now check whether we can do more brackets
4819 */
4820 	totterms = bi->termsinbracket;
4821 	if ( totterms > 2*AC.ThreadBucketSize ) return(1);
4822 	for ( i = 1; i < numbra-n; i++ ) {
4823 		DIFPOS(pos1,bi[n+i].next,bi[n].start); /* Size of the combined brackets */
4824 		if ( ISLESSPOS(average,pos1) ) return(i);
4825 		totterms += bi->termsinbracket;
4826 		if ( totterms > 2*AC.ThreadBucketSize ) return(i+1);
4827 	}
4828 /*
4829 	We have a problem at the end of the system. Just do one.
4830 */
4831 	return(1);
4832 }
4833 
4834 /*
4835   	#] TreatIndexEntry :
4836   	#[ SetHideFiles :
4837 */
4838 
SetHideFiles()4839 void SetHideFiles() {
4840 	int i;
4841 	ALLPRIVATES *B, *B0 = AB[0];
4842 	for ( i = 1; i <= numberofworkers; i++ ) {
4843 		B = AB[i];
4844 		AR.hidefile->handle = AR0.hidefile->handle;
4845 		if ( AR.hidefile->handle < 0 ) {
4846 			AR.hidefile->PObuffer = AR0.hidefile->PObuffer;
4847 			AR.hidefile->POstop = AR0.hidefile->POstop;
4848 			AR.hidefile->POfill = AR0.hidefile->POfill;
4849 			AR.hidefile->POfull = AR0.hidefile->POfull;
4850 			AR.hidefile->POsize = AR0.hidefile->POsize;
4851 			AR.hidefile->POposition = AR0.hidefile->POposition;
4852 			AR.hidefile->filesize = AR0.hidefile->filesize;
4853 		}
4854 		else {
4855 			AR.hidefile->PObuffer = AR.hidefile->wPObuffer;
4856 			AR.hidefile->POstop = AR.hidefile->wPOstop;
4857 			AR.hidefile->POfill = AR.hidefile->wPOfill;
4858 			AR.hidefile->POfull = AR.hidefile->wPOfull;
4859 			AR.hidefile->POsize = AR.hidefile->wPOsize;
4860 			PUTZERO(AR.hidefile->POposition);
4861 		}
4862 	}
4863 }
4864 
4865 /*
4866   	#] SetHideFiles :
4867   	#[ IniFbufs :
4868 */
4869 
IniFbufs(VOID)4870 void IniFbufs(VOID)
4871 {
4872 	int i;
4873 	for ( i = 0; i < AM.totalnumberofthreads; i++ ) {
4874 		IniFbuffer(AB[i]->T.fbufnum);
4875 	}
4876 }
4877 
4878 /*
4879   	#] IniFbufs :
4880   	#[ SetMods :
4881 */
4882 
SetMods()4883 void SetMods()
4884 {
4885 	ALLPRIVATES *B;
4886 	int i, n, j;
4887 	for ( j = 0; j < AM.totalnumberofthreads; j++ ) {
4888 		B = AB[j];
4889 		AN.ncmod = AC.ncmod;
4890 		if ( AN.cmod != 0 ) M_free(AN.cmod,"AN.cmod");
4891 		n = ABS(AN.ncmod);
4892 		AN.cmod = (UWORD *)Malloc1(sizeof(WORD)*n,"AN.cmod");
4893 		for ( i = 0; i < n; i++ ) AN.cmod[i] = AC.cmod[i];
4894 	}
4895 }
4896 
4897 /*
4898   	#] SetMods :
4899   	#[ UnSetMods :
4900 */
4901 
UnSetMods()4902 void UnSetMods()
4903 {
4904 	ALLPRIVATES *B;
4905 	int j;
4906 	for ( j = 0; j < AM.totalnumberofthreads; j++ ) {
4907 		B = AB[j];
4908 		if ( AN.cmod != 0 ) M_free(AN.cmod,"AN.cmod");
4909 		AN.cmod = 0;
4910 	}
4911 }
4912 
4913 /*
4914   	#] UnSetMods :
4915   	#[ find_Horner_MCTS_expand_tree_threaded :
4916 */
4917 
find_Horner_MCTS_expand_tree_threaded()4918 void find_Horner_MCTS_expand_tree_threaded() {
4919 	int id;
4920 	while (( id = GetAvailableThread() ) < 0)
4921 		MasterWait();
4922 	WakeupThread(id,MCTSEXPANDTREE);
4923 }
4924 
4925 /*
4926   	#] find_Horner_MCTS_expand_tree_threaded :
4927   	#[ optimize_expression_given_Horner_threaded :
4928 */
4929 
optimize_expression_given_Horner_threaded()4930 extern void optimize_expression_given_Horner_threaded() {
4931 	int id;
4932 	while (( id = GetAvailableThread() ) < 0)
4933 		MasterWait();
4934 	WakeupThread(id,OPTIMIZEEXPRESSION);
4935 }
4936 
4937 /*
4938   	#] optimize_expression_given_Horner_threaded :
4939 */
4940 
4941 #endif
4942