1 
2 /******************************************************************************
3  *
4  *  This file is part of meryl-utility, a collection of miscellaneous code
5  *  used by Meryl, Canu and others.
6  *
7  *  This software is based on:
8  *    'Canu' v2.0              (https://github.com/marbl/canu)
9  *  which is based on:
10  *    'Celera Assembler' r4587 (http://wgs-assembler.sourceforge.net)
11  *    the 'kmer package' r1994 (http://kmer.sourceforge.net)
12  *
13  *  Except as indicated otherwise, this is a 'United States Government Work',
14  *  and is released in the public domain.
15  *
16  *  File 'README.licenses' in the root directory of this distribution
17  *  contains full conditions and disclaimers.
18  */
19 
20 #include "sweatShop.H"
21 #include "system.H"
22 
23 #include <sched.h>  //  pthread scheduling stuff
24 
25 
26 class sweatShopWorker {
27 public:
sweatShopWorker()28   sweatShopWorker() {
29     shop            = 0L;
30     threadUserData  = 0L;
31     numComputed     = 0;
32     workerQueue     = 0L;
33     workerQueueLen  = 0L;
34   };
35 
36   sweatShop        *shop;
37   void             *threadUserData;
38   pthread_t         threadID;
39   uint32            numComputed;
40   sweatShopState  **workerQueue;
41   uint32            workerQueueLen;
42 };
43 
44 
//  One unit of work travelling through the shop, kept on a singly linked
//  list.  The loader creates it around a user object, a worker marks it
//  computed, and the writer marks it outputted and finally deletes it.
//  What _user points at is entirely up to the user; a state whose _user
//  is null is the end-of-input marker.
//
class sweatShopState {
public:
  sweatShopState(void *userData)
    : _user(userData),
      _computed(false),
      _outputted(false),
      _next(nullptr) {
  }

  ~sweatShopState() {
  }

  void             *_user;        //  The user's object; null marks end of input.
  bool              _computed;    //  Set once a worker has processed _user.
  bool              _outputted;   //  Set once the writer has emitted _user.
  sweatShopState   *_next;        //  Next state in the list, or null.
};
64 
65 
66 
67 
68 //  Simply forwards control to the class
69 void*
_sweatshop_loaderThread(void * ss_)70 _sweatshop_loaderThread(void *ss_) {
71   sweatShop *ss = (sweatShop *)ss_;
72   return(ss->loader());
73 }
74 
75 void*
_sweatshop_workerThread(void * sw_)76 _sweatshop_workerThread(void *sw_) {
77   sweatShopWorker *sw = (sweatShopWorker *)sw_;
78   return(sw->shop->worker(sw));
79 }
80 
81 void*
_sweatshop_writerThread(void * ss_)82 _sweatshop_writerThread(void *ss_) {
83   sweatShop *ss = (sweatShop *)ss_;
84   return(ss->writer());
85 }
86 
87 void*
_sweatshop_statusThread(void * ss_)88 _sweatshop_statusThread(void *ss_) {
89   sweatShop *ss = (sweatShop *)ss_;
90   return(ss->status());
91 }
92 
93 
94 
sweatShop(void * (* loaderfcn)(void * G),void (* workerfcn)(void * G,void * T,void * S),void (* writerfcn)(void * G,void * S))95 sweatShop::sweatShop(void*(*loaderfcn)(void *G),
96                      void (*workerfcn)(void *G, void *T, void *S),
97                      void (*writerfcn)(void *G, void *S)) {
98 
99   _userLoader       = loaderfcn;
100   _userWorker       = workerfcn;
101   _userWriter       = writerfcn;
102 
103   _globalUserData   = 0L;
104 
105   _writerP          = 0L;
106   _workerP          = 0L;
107   _loaderP          = 0L;
108 
109   _showStatus       = false;
110   _writeInOrder     = true;
111 
112   _loaderQueueSize  = 1024;
113   _loaderQueueMax   = 10240;
114   _loaderQueueMin   = 4;  //  _numberOfWorkers * 2, reset when that changes
115   _loaderBatchSize  = 1;
116   _workerBatchSize  = 1;
117   _writerQueueSize  = 4096;
118   _writerQueueMax   = 10240;
119 
120   _numberOfWorkers  = 2;
121 
122   _workerData       = 0L;
123 
124   _numberLoaded     = 0;
125   _numberComputed   = 0;
126   _numberOutput     = 0;
127 }
128 
129 
~sweatShop()130 sweatShop::~sweatShop() {
131   delete [] _workerData;
132 }
133 
134 
135 
136 void
setThreadData(uint32 t,void * x)137 sweatShop::setThreadData(uint32 t, void *x) {
138   if (_workerData == 0L)
139     _workerData = new sweatShopWorker [_numberOfWorkers];
140 
141   if (t >= _numberOfWorkers)
142     fprintf(stderr, "sweatShop::setThreadData()-- worker ID " F_U32 " more than number of workers=" F_U32 "\n", t, _numberOfWorkers), exit(1);
143 
144   _workerData[t].threadUserData = x;
145 }
146 
147 
148 
149 //  Build a list of states to add in one swoop
150 //
151 void
loaderAddToLocal(sweatShopState * & tail,sweatShopState * & head,sweatShopState * thisState)152 sweatShop::loaderAddToLocal(sweatShopState *&tail, sweatShopState *&head, sweatShopState *thisState) {
153 
154   thisState->_next  = 0L;
155 
156   if (tail) {
157     head->_next = thisState;
158     head        = thisState;
159   } else {
160     tail = head = thisState;
161   }
162 }
163 
164 
165 //  Add a bunch of new states to the queue.
166 //
167 void
loaderAppendToGlobal(sweatShopState * & tail,sweatShopState * & head,uint32 num)168 sweatShop::loaderAppendToGlobal(sweatShopState *&tail, sweatShopState *&head, uint32 num) {
169   int err;
170 
171   if ((tail == 0L) || (head == 0L))
172     return;
173 
174   err = pthread_mutex_lock(&_stateMutex);
175   if (err != 0)
176     fprintf(stderr, "sweatShop::loaderAppend()--  Failed to lock mutex (%d).  Fail.\n", err), exit(1);
177 
178   if (_loaderP == 0L) {
179     _writerP      = tail;
180     _workerP      = tail;
181     _loaderP      = head;
182   } else {
183     _loaderP->_next = tail;
184   }
185   _loaderP        = head;
186 
187   _numberLoaded += num;
188 
189   err = pthread_mutex_unlock(&_stateMutex);
190   if (err != 0)
191     fprintf(stderr, "sweatShop::loaderAppend()--  Failed to unlock mutex (%d).  Fail.\n", err), exit(1);
192 
193   tail = 0L;
194   head = 0L;
195 }
196 
197 
198 
199 void*
loader(void)200 sweatShop::loader(void) {
201   struct timespec   naptime;
202   sweatShopState   *tail       = nullptr;  //  A local list, to reduce the number of times we
203   sweatShopState   *head       = nullptr;  //  lock the global list.
204   uint32            numLoaded  = 0;
205 
206   naptime.tv_sec      = 0;
207   naptime.tv_nsec     = 166666666ULL;  //  1/6 second
208 
209   while (1) {
210     void *object = NULL;
211 
212     while (_numberLoaded > _numberComputed + _loaderQueueSize)  //  Sleep if the queue is too big.
213       nanosleep(&naptime, 0L);
214 
215     //  If a userLoader function exists, use it to load the data object, then
216     //  make a new state for that object.
217 
218     if (_userLoader)
219       object = (*_userLoader)(_globalUserData);
220 
221     sweatShopState  *thisState = new sweatShopState(object);
222 
223     //  If there is no user pointer, we've run out of inputs.
224     //  Push on the empty state to the local list, force an append
225     //  to the global list, and exit this loader function.
226 
227     if (thisState->_user == nullptr) {
228       loaderAddToLocal(tail, head, thisState);
229       loaderAppendToGlobal(tail, head, numLoaded + 1);
230 
231       return(nullptr);
232     }
233 
234     //  Otherwise, we've loaded a user object.  Push it onto the local list,
235     //  then merge into the global list if the local list is long enough.
236 
237     loaderAddToLocal(tail, head, thisState);
238     numLoaded++;
239 
240     if (numLoaded >= _loaderBatchSize) {
241       loaderAppendToGlobal(tail, head, numLoaded);
242       numLoaded = 0;
243     }
244   }
245 
246   return(nullptr);  //  Never returns.
247 }
248 
249 
250 
251 void*
worker(sweatShopWorker * workerData)252 sweatShop::worker(sweatShopWorker *workerData) {
253 
254   struct timespec   naptime;
255   naptime.tv_sec      = 0;
256   naptime.tv_nsec     = 50000000ULL;
257 
258   bool    moreToCompute = true;
259   int     err;
260 
261   while (moreToCompute) {
262 
263     //  Usually beacuse some worker is taking a long time, and the
264     //  output queue isn't big enough.
265     //
266     while (_numberOutput + _writerQueueSize < _numberComputed)
267       nanosleep(&naptime, 0L);
268 
269     //  Grab the next state.  We don't grab it if it's the last in the
270     //  queue (else we would fall off the end) UNLESS it really is the
271     //  last one.
272     //
273     err = pthread_mutex_lock(&_stateMutex);
274     if (err != 0)
275       fprintf(stderr, "sweatShop::worker()--  Failed to lock mutex (%d).  Fail.\n", err), exit(1);
276 
277     for (workerData->workerQueueLen = 0; ((workerData->workerQueueLen < _workerBatchSize) &&
278                                           (_workerP) &&
279                                           ((_workerP->_next != 0L) || (_workerP->_user == 0L))); workerData->workerQueueLen++) {
280       workerData->workerQueue[workerData->workerQueueLen] = _workerP;
281       _workerP = _workerP->_next;
282     }
283 
284     if (_workerP == 0L)
285       moreToCompute = false;
286 
287     err = pthread_mutex_unlock(&_stateMutex);
288     if (err != 0)
289       fprintf(stderr, "sweatShop::worker()--  Failed to lock mutex (%d).  Fail.\n", err), exit(1);
290 
291 
292     if (workerData->workerQueueLen == 0) {
293       //  No work, sleep a bit to prevent thrashing the mutex and resume.
294       nanosleep(&naptime, 0L);
295       continue;
296     }
297 
298     //  Execute
299     //
300     for (uint32 x=0; x<workerData->workerQueueLen; x++) {
301       sweatShopState *ts = workerData->workerQueue[x];
302 
303       if (ts && ts->_user) {
304         if (_userWorker)
305           (*_userWorker)(_globalUserData, workerData->threadUserData, ts->_user);
306         ts->_computed = true;
307         workerData->numComputed++;
308       } else {
309         //  When we really do run out of stuff to do, we'll end up here
310         //  (only one thread will end up in the other case, with
311         //  something to do and moreToCompute=false).  If it's actually
312         //  the end, skip the sleep and just get outta here.
313         //
314         if (moreToCompute == true) {
315           fprintf(stderr, "WARNING!  Worker is sleeping because the reader is slow!\n");
316           nanosleep(&naptime, 0L);
317         }
318       }
319     }
320   }
321 
322   //fprintf(stderr, "sweatShop::worker exits.\n");
323   return(0L);
324 }
325 
326 
327 void
writerWrite(sweatShopState * w)328 sweatShop::writerWrite(sweatShopState *w) {
329 
330   if (_userWriter)
331     (*_userWriter)(_globalUserData, w->_user);
332   _numberOutput++;
333 
334   w->_outputted = true;
335 }
336 
337 
338 void*
writer(void)339 sweatShop::writer(void) {
340   sweatShopState  *deleteState = 0L;
341   struct timespec naptime1 = { .tv_sec = 0, .tv_nsec = 5000000ULL };
342   struct timespec naptime2 = { .tv_sec = 0, .tv_nsec = 5000000ULL };
343 
344 
345   while ((_writerP        != nullptr) &&
346          (_writerP->_user != nullptr)) {
347 
348     //  If a complete result, write it.
349     if ((_writerP->_computed  == true) &&
350         (_writerP->_outputted == false)) {
351       writerWrite(_writerP);
352       continue;
353     }
354 
355     //  If we can write output out-of-order, search ahead
356     //  for any results and output them.
357     //  if (_outOfOrder == true)
358     if (_writeInOrder == false) {
359       for (sweatShopState *ss = _writerP; ss != nullptr; ss = ss->_next)
360         if ((ss->_computed  == true) &&
361             (ss->_outputted == false)) {
362           writerWrite(ss);
363         }
364     }
365 
366     //  If no next, wait for input to appear.  We can't purge this node
367     //  from the list until there is a next, else we lose the list!
368     if (_writerP->_next == nullptr) {
369       nanosleep(&naptime1, 0L);
370       continue;
371     }
372 
373     //  If already output, remove the node.
374     if (_writerP->_outputted == true) {
375       sweatShopState *ds = _writerP;
376       _writerP           = _writerP->_next;
377 
378       delete ds;
379       continue;
380     }
381 
382     //  Otherwise, we need to wait for a state to appear on the queue.
383     nanosleep(&naptime2, 0L);
384   }
385 
386   //  Tell status to stop.
387   _writerP = 0L;
388 
389   return(0L);
390 }
391 
392 
393 //  This thread not only shows a status message, but it also updates the critical shared variable
394 //  _numberComputed.  Worker threads use this to throttle themselves.  Thus, even if _showStatus is
395 //  not set, and this thread doesn't _appear_ to be doing anything useful....it is.
396 //
397 void*
status(void)398 sweatShop::status(void) {
399 
400   struct timespec   naptime;
401   naptime.tv_sec      = 0;
402   naptime.tv_nsec     = 250000000ULL;
403 
404   double  startTime = getTime() - 0.001;
405   double  thisTime  = 0;
406 
407   uint64  deltaOut = 0;
408   uint64  deltaCPU = 0;
409 
410   double  cpuPerSec = 0;
411 
412   uint64  readjustAt = 16384;
413 
414   while (_writerP) {
415     uint32 nc = 0;
416     for (uint32 i=0; i<_numberOfWorkers; i++)
417       nc += _workerData[i].numComputed;
418     _numberComputed = nc;
419 
420     deltaOut = deltaCPU = 0;
421 
422     thisTime = getTime();
423 
424     if (_numberComputed > _numberOutput)
425       deltaOut = _numberComputed - _numberOutput;
426     if (_numberLoaded > _numberComputed)
427       deltaCPU = _numberLoaded - _numberComputed;
428 
429     cpuPerSec = _numberComputed / (thisTime - startTime);
430 
431     if (_showStatus) {
432       fprintf(stderr, " %6.1f/s - %8" F_U64P " loaded; %8" F_U64P " queued for compute; %8" F_U64P " finished; %8" F_U64P " written; %8" F_U64P " queued for output)\r",
433               cpuPerSec, _numberLoaded, deltaCPU, _numberComputed, _numberOutput, deltaOut);
434       fflush(stderr);
435     }
436 
437     //  Readjust queue sizes based on current performance, but don't let it get too big or small.
438     //  In particular, don't let it get below 2*numberOfWorkers.
439     //
440      if (_numberComputed > readjustAt) {
441        readjustAt       += (uint64)(2 * cpuPerSec);
442        _loaderQueueSize  = (uint32)(5 * cpuPerSec);
443      }
444 
445     if (_loaderQueueSize < _loaderQueueMin)
446       _loaderQueueSize = _loaderQueueMin;
447 
448     if (_loaderQueueSize < 2 * _numberOfWorkers)
449       _loaderQueueSize = 2 * _numberOfWorkers;
450 
451     if (_loaderQueueSize > _loaderQueueMax)
452       _loaderQueueSize = _loaderQueueMax;
453 
454     nanosleep(&naptime, 0L);
455   }
456 
457   if (_showStatus) {
458     thisTime = getTime();
459 
460     if (_numberComputed > _numberOutput)
461       deltaOut = _numberComputed - _numberOutput;
462     if (_numberLoaded > _numberComputed)
463       deltaCPU = _numberLoaded - _numberComputed;
464 
465     cpuPerSec = _numberComputed / (thisTime - startTime);
466 
467     fprintf(stderr, " %6.1f/s - %08" F_U64P " queued for compute; %08" F_U64P " finished; %08" F_U64P " queued for output)\n",
468             cpuPerSec, deltaCPU, _numberComputed, deltaOut);
469   }
470 
471   //fprintf(stderr, "sweatShop::status exits.\n");
472   return(0L);
473 }
474 
475 
476 
477 
478 
479 void
run(void * user,bool beVerbose)480 sweatShop::run(void *user, bool beVerbose) {
481   pthread_attr_t      threadAttr;
482   pthread_t           threadIDloader;
483   pthread_t           threadIDwriter;
484   pthread_t           threadIDstats;
485 #if 0
486   int                 threadSchedPolicy = 0;
487   struct sched_param  threadSchedParamDef;
488   struct sched_param  threadSchedParamMax;
489 #endif
490   int                 err = 0;
491 
492   _globalUserData = user;
493   _showStatus     = beVerbose;
494 
495   //  Configure everything ahead of time.
496 
497   if (_workerBatchSize < 1)
498     _workerBatchSize = 1;
499 
500   if (_workerData == 0L)
501     _workerData = new sweatShopWorker [_numberOfWorkers];
502 
503   for (uint32 i=0; i<_numberOfWorkers; i++) {
504     _workerData[i].shop        = this;
505     _workerData[i].workerQueue = new sweatShopState * [_workerBatchSize];
506   }
507 
508   //  Open the doors.
509 
510   errno = 0;
511 
512   err = pthread_mutex_init(&_stateMutex, NULL);
513   if (err)
514     fprintf(stderr, "sweatShop::run()--  Failed to configure pthreads (state mutex): %s.\n", strerror(err)), exit(1);
515 
516   err = pthread_attr_init(&threadAttr);
517   if (err)
518     fprintf(stderr, "sweatShop::run()--  Failed to configure pthreads (attr init): %s.\n", strerror(err)), exit(1);
519 
520   err = pthread_attr_setscope(&threadAttr, PTHREAD_SCOPE_SYSTEM);
521   if (err)
522     fprintf(stderr, "sweatShop::run()--  Failed to configure pthreads (set scope): %s.\n", strerror(err)), exit(1);
523 
524   err = pthread_attr_setdetachstate(&threadAttr, PTHREAD_CREATE_JOINABLE);
525   if (err)
526     fprintf(stderr, "sweatShop::run()--  Failed to configure pthreads (joinable): %s.\n", strerror(err)), exit(1);
527 
528 #if 0
529   err = pthread_attr_getschedparam(&threadAttr, &threadSchedParamDef);
530   if (err)
531     fprintf(stderr, "sweatShop::run()--  Failed to configure pthreads (get default param): %s.\n", strerror(err)), exit(1);
532 
533   err = pthread_attr_getschedparam(&threadAttr, &threadSchedParamMax);
534   if (err)
535     fprintf(stderr, "sweatShop::run()--  Failed to configure pthreads (get max param): %s.\n", strerror(err)), exit(1);
536 #endif
537 
538   //  SCHED_RR needs root privs to run on FreeBSD.
539   //
540   //err = pthread_attr_setschedpolicy(&threadAttr, SCHED_RR);
541   //if (err)
542   //  fprintf(stderr, "sweatShop::run()--  Failed to configure pthreads (sched policy): %s.\n", strerror(err)), exit(1);
543 
544 #if 0
545   err = pthread_attr_getschedpolicy(&threadAttr, &threadSchedPolicy);
546   if (err)
547     fprintf(stderr, "sweatShop::run()--  Failed to configure pthreads (sched policy): %s.\n", strerror(err)), exit(1);
548 
549   errno = 0;
550   threadSchedParamMax.sched_priority = sched_get_priority_max(threadSchedPolicy);
551   if (errno)
552     fprintf(stderr, "sweatShop::run()--  WARNING: Failed to configure pthreads (set max param priority): %s.\n", strerror(errno));
553 
554   //  Fire off the loader
555 
556   err = pthread_attr_setschedparam(&threadAttr, &threadSchedParamMax);
557   if (err)
558     fprintf(stderr, "sweatShop::run()--  Failed to set loader priority: %s.\n", strerror(err)), exit(1);
559 #endif
560 
561   err = pthread_create(&threadIDloader, &threadAttr, _sweatshop_loaderThread, this);
562   if (err)
563     fprintf(stderr, "sweatShop::run()--  Failed to launch loader thread: %s.\n", strerror(err)), exit(1);
564 
565   //  Wait for it to actually load something (otherwise all the
566   //  workers immediately go home)
567 
568   while (!_writerP && !_workerP && !_loaderP) {
569     struct timespec   naptime;
570     naptime.tv_sec      = 0;
571     naptime.tv_nsec     = 250000ULL;
572     nanosleep(&naptime, 0L);
573   }
574 
575   //  Start the statistics and writer
576 
577 #if 0
578   err = pthread_attr_setschedparam(&threadAttr, &threadSchedParamMax);
579   if (err)
580     fprintf(stderr, "sweatShop::run()--  Failed to set status and writer priority: %s.\n", strerror(err)), exit(1);
581 #endif
582 
583   err = pthread_create(&threadIDstats,  &threadAttr, _sweatshop_statusThread, this);
584   if (err)
585     fprintf(stderr, "sweatShop::run()--  Failed to launch status thread: %s.\n", strerror(err)), exit(1);
586 
587   err = pthread_create(&threadIDwriter, &threadAttr, _sweatshop_writerThread, this);
588   if (err)
589     fprintf(stderr, "sweatShop::run()--  Failed to launch writer thread: %s.\n", strerror(err)), exit(1);
590 
591   //  And some labor
592 
593 #if 0
594   err = pthread_attr_setschedparam(&threadAttr, &threadSchedParamDef);
595   if (err)
596     fprintf(stderr, "sweatShop::run()--  Failed to set worker priority: %s.\n", strerror(err)), exit(1);
597 #endif
598 
599   for (uint32 i=0; i<_numberOfWorkers; i++) {
600     err = pthread_create(&_workerData[i].threadID, &threadAttr, _sweatshop_workerThread, _workerData + i);
601     if (err)
602       fprintf(stderr, "sweatShop::run()--  Failed to launch worker thread " F_U32 ": %s.\n", i, strerror(err)), exit(1);
603   }
604 
605   //  Now sit back and relax.
606 
607   err = pthread_join(threadIDloader, 0L);
608   if (err)
609     fprintf(stderr, "sweatShop::run()--  Failed to join loader thread: %s.\n", strerror(err)), exit(1);
610 
611   err = pthread_join(threadIDwriter, 0L);
612   if (err)
613     fprintf(stderr, "sweatShop::run()--  Failed to join writer thread: %s.\n", strerror(err)), exit(1);
614 
615   err = pthread_join(threadIDstats,  0L);
616   if (err)
617     fprintf(stderr, "sweatShop::run()--  Failed to join status thread: %s.\n", strerror(err)), exit(1);
618 
619   for (uint32 i=0; i<_numberOfWorkers; i++) {
620     err = pthread_join(_workerData[i].threadID, 0L);
621     if (err)
622       fprintf(stderr, "sweatShop::run()--  Failed to join worker thread " F_U32 ": %s.\n", i, strerror(err)), exit(1);
623   }
624 
625   //  Cleanup.
626 
627   delete _loaderP;
628   _loaderP = _workerP = _writerP = 0L;
629 }
630