1
2 /******************************************************************************
3 *
4 * This file is part of meryl-utility, a collection of miscellaneous code
5 * used by Meryl, Canu and others.
6 *
7 * This software is based on:
8 * 'Canu' v2.0 (https://github.com/marbl/canu)
9 * which is based on:
10 * 'Celera Assembler' r4587 (http://wgs-assembler.sourceforge.net)
11 * the 'kmer package' r1994 (http://kmer.sourceforge.net)
12 *
13 * Except as indicated otherwise, this is a 'United States Government Work',
14 * and is released in the public domain.
15 *
16 * File 'README.licenses' in the root directory of this distribution
17 * contains full conditions and disclaimers.
18 */
19
20 #include "sweatShop.H"
21 #include "system.H"
22
23 #include <sched.h> // pthread scheduling stuff
24
25
26 class sweatShopWorker {
27 public:
sweatShopWorker()28 sweatShopWorker() {
29 shop = 0L;
30 threadUserData = 0L;
31 numComputed = 0;
32 workerQueue = 0L;
33 workerQueueLen = 0L;
34 };
35
36 sweatShop *shop;
37 void *threadUserData;
38 pthread_t threadID;
39 uint32 numComputed;
40 sweatShopState **workerQueue;
41 uint32 workerQueueLen;
42 };
43
44
45 // This gets created by the loader, passed to the worker, and printed
46 // by the writer. userData is controlled by the user.
47 //
class sweatShopState {
public:
  //  Wrap a user object in a fresh, not-yet-computed, not-yet-written,
  //  unlinked list node.
  sweatShopState(void *userData)
    : _user(userData),
      _computed(false),
      _outputted(false),
      _next(nullptr) {
  }

  //  The node does not own _user; nothing to release.
  ~sweatShopState() {
  }

  void            *_user;       //  The user's object; a null pointer marks the end of input.
  bool             _computed;   //  Set once a worker has processed _user.
  bool             _outputted;  //  Set once the writer has handled _user.
  sweatShopState  *_next;       //  Next node in the singly linked list.
};
64
65
66
67
68 // Simply forwards control to the class
69 void*
_sweatshop_loaderThread(void * ss_)70 _sweatshop_loaderThread(void *ss_) {
71 sweatShop *ss = (sweatShop *)ss_;
72 return(ss->loader());
73 }
74
75 void*
_sweatshop_workerThread(void * sw_)76 _sweatshop_workerThread(void *sw_) {
77 sweatShopWorker *sw = (sweatShopWorker *)sw_;
78 return(sw->shop->worker(sw));
79 }
80
81 void*
_sweatshop_writerThread(void * ss_)82 _sweatshop_writerThread(void *ss_) {
83 sweatShop *ss = (sweatShop *)ss_;
84 return(ss->writer());
85 }
86
87 void*
_sweatshop_statusThread(void * ss_)88 _sweatshop_statusThread(void *ss_) {
89 sweatShop *ss = (sweatShop *)ss_;
90 return(ss->status());
91 }
92
93
94
sweatShop(void * (* loaderfcn)(void * G),void (* workerfcn)(void * G,void * T,void * S),void (* writerfcn)(void * G,void * S))95 sweatShop::sweatShop(void*(*loaderfcn)(void *G),
96 void (*workerfcn)(void *G, void *T, void *S),
97 void (*writerfcn)(void *G, void *S)) {
98
99 _userLoader = loaderfcn;
100 _userWorker = workerfcn;
101 _userWriter = writerfcn;
102
103 _globalUserData = 0L;
104
105 _writerP = 0L;
106 _workerP = 0L;
107 _loaderP = 0L;
108
109 _showStatus = false;
110 _writeInOrder = true;
111
112 _loaderQueueSize = 1024;
113 _loaderQueueMax = 10240;
114 _loaderQueueMin = 4; // _numberOfWorkers * 2, reset when that changes
115 _loaderBatchSize = 1;
116 _workerBatchSize = 1;
117 _writerQueueSize = 4096;
118 _writerQueueMax = 10240;
119
120 _numberOfWorkers = 2;
121
122 _workerData = 0L;
123
124 _numberLoaded = 0;
125 _numberComputed = 0;
126 _numberOutput = 0;
127 }
128
129
~sweatShop()130 sweatShop::~sweatShop() {
131 delete [] _workerData;
132 }
133
134
135
136 void
setThreadData(uint32 t,void * x)137 sweatShop::setThreadData(uint32 t, void *x) {
138 if (_workerData == 0L)
139 _workerData = new sweatShopWorker [_numberOfWorkers];
140
141 if (t >= _numberOfWorkers)
142 fprintf(stderr, "sweatShop::setThreadData()-- worker ID " F_U32 " more than number of workers=" F_U32 "\n", t, _numberOfWorkers), exit(1);
143
144 _workerData[t].threadUserData = x;
145 }
146
147
148
149 // Build a list of states to add in one swoop
150 //
151 void
loaderAddToLocal(sweatShopState * & tail,sweatShopState * & head,sweatShopState * thisState)152 sweatShop::loaderAddToLocal(sweatShopState *&tail, sweatShopState *&head, sweatShopState *thisState) {
153
154 thisState->_next = 0L;
155
156 if (tail) {
157 head->_next = thisState;
158 head = thisState;
159 } else {
160 tail = head = thisState;
161 }
162 }
163
164
165 // Add a bunch of new states to the queue.
166 //
167 void
loaderAppendToGlobal(sweatShopState * & tail,sweatShopState * & head,uint32 num)168 sweatShop::loaderAppendToGlobal(sweatShopState *&tail, sweatShopState *&head, uint32 num) {
169 int err;
170
171 if ((tail == 0L) || (head == 0L))
172 return;
173
174 err = pthread_mutex_lock(&_stateMutex);
175 if (err != 0)
176 fprintf(stderr, "sweatShop::loaderAppend()-- Failed to lock mutex (%d). Fail.\n", err), exit(1);
177
178 if (_loaderP == 0L) {
179 _writerP = tail;
180 _workerP = tail;
181 _loaderP = head;
182 } else {
183 _loaderP->_next = tail;
184 }
185 _loaderP = head;
186
187 _numberLoaded += num;
188
189 err = pthread_mutex_unlock(&_stateMutex);
190 if (err != 0)
191 fprintf(stderr, "sweatShop::loaderAppend()-- Failed to unlock mutex (%d). Fail.\n", err), exit(1);
192
193 tail = 0L;
194 head = 0L;
195 }
196
197
198
199 void*
loader(void)200 sweatShop::loader(void) {
201 struct timespec naptime;
202 sweatShopState *tail = nullptr; // A local list, to reduce the number of times we
203 sweatShopState *head = nullptr; // lock the global list.
204 uint32 numLoaded = 0;
205
206 naptime.tv_sec = 0;
207 naptime.tv_nsec = 166666666ULL; // 1/6 second
208
209 while (1) {
210 void *object = NULL;
211
212 while (_numberLoaded > _numberComputed + _loaderQueueSize) // Sleep if the queue is too big.
213 nanosleep(&naptime, 0L);
214
215 // If a userLoader function exists, use it to load the data object, then
216 // make a new state for that object.
217
218 if (_userLoader)
219 object = (*_userLoader)(_globalUserData);
220
221 sweatShopState *thisState = new sweatShopState(object);
222
223 // If there is no user pointer, we've run out of inputs.
224 // Push on the empty state to the local list, force an append
225 // to the global list, and exit this loader function.
226
227 if (thisState->_user == nullptr) {
228 loaderAddToLocal(tail, head, thisState);
229 loaderAppendToGlobal(tail, head, numLoaded + 1);
230
231 return(nullptr);
232 }
233
234 // Otherwise, we've loaded a user object. Push it onto the local list,
235 // then merge into the global list if the local list is long enough.
236
237 loaderAddToLocal(tail, head, thisState);
238 numLoaded++;
239
240 if (numLoaded >= _loaderBatchSize) {
241 loaderAppendToGlobal(tail, head, numLoaded);
242 numLoaded = 0;
243 }
244 }
245
246 return(nullptr); // Never returns.
247 }
248
249
250
251 void*
worker(sweatShopWorker * workerData)252 sweatShop::worker(sweatShopWorker *workerData) {
253
254 struct timespec naptime;
255 naptime.tv_sec = 0;
256 naptime.tv_nsec = 50000000ULL;
257
258 bool moreToCompute = true;
259 int err;
260
261 while (moreToCompute) {
262
263 // Usually beacuse some worker is taking a long time, and the
264 // output queue isn't big enough.
265 //
266 while (_numberOutput + _writerQueueSize < _numberComputed)
267 nanosleep(&naptime, 0L);
268
269 // Grab the next state. We don't grab it if it's the last in the
270 // queue (else we would fall off the end) UNLESS it really is the
271 // last one.
272 //
273 err = pthread_mutex_lock(&_stateMutex);
274 if (err != 0)
275 fprintf(stderr, "sweatShop::worker()-- Failed to lock mutex (%d). Fail.\n", err), exit(1);
276
277 for (workerData->workerQueueLen = 0; ((workerData->workerQueueLen < _workerBatchSize) &&
278 (_workerP) &&
279 ((_workerP->_next != 0L) || (_workerP->_user == 0L))); workerData->workerQueueLen++) {
280 workerData->workerQueue[workerData->workerQueueLen] = _workerP;
281 _workerP = _workerP->_next;
282 }
283
284 if (_workerP == 0L)
285 moreToCompute = false;
286
287 err = pthread_mutex_unlock(&_stateMutex);
288 if (err != 0)
289 fprintf(stderr, "sweatShop::worker()-- Failed to lock mutex (%d). Fail.\n", err), exit(1);
290
291
292 if (workerData->workerQueueLen == 0) {
293 // No work, sleep a bit to prevent thrashing the mutex and resume.
294 nanosleep(&naptime, 0L);
295 continue;
296 }
297
298 // Execute
299 //
300 for (uint32 x=0; x<workerData->workerQueueLen; x++) {
301 sweatShopState *ts = workerData->workerQueue[x];
302
303 if (ts && ts->_user) {
304 if (_userWorker)
305 (*_userWorker)(_globalUserData, workerData->threadUserData, ts->_user);
306 ts->_computed = true;
307 workerData->numComputed++;
308 } else {
309 // When we really do run out of stuff to do, we'll end up here
310 // (only one thread will end up in the other case, with
311 // something to do and moreToCompute=false). If it's actually
312 // the end, skip the sleep and just get outta here.
313 //
314 if (moreToCompute == true) {
315 fprintf(stderr, "WARNING! Worker is sleeping because the reader is slow!\n");
316 nanosleep(&naptime, 0L);
317 }
318 }
319 }
320 }
321
322 //fprintf(stderr, "sweatShop::worker exits.\n");
323 return(0L);
324 }
325
326
327 void
writerWrite(sweatShopState * w)328 sweatShop::writerWrite(sweatShopState *w) {
329
330 if (_userWriter)
331 (*_userWriter)(_globalUserData, w->_user);
332 _numberOutput++;
333
334 w->_outputted = true;
335 }
336
337
338 void*
writer(void)339 sweatShop::writer(void) {
340 sweatShopState *deleteState = 0L;
341 struct timespec naptime1 = { .tv_sec = 0, .tv_nsec = 5000000ULL };
342 struct timespec naptime2 = { .tv_sec = 0, .tv_nsec = 5000000ULL };
343
344
345 while ((_writerP != nullptr) &&
346 (_writerP->_user != nullptr)) {
347
348 // If a complete result, write it.
349 if ((_writerP->_computed == true) &&
350 (_writerP->_outputted == false)) {
351 writerWrite(_writerP);
352 continue;
353 }
354
355 // If we can write output out-of-order, search ahead
356 // for any results and output them.
357 // if (_outOfOrder == true)
358 if (_writeInOrder == false) {
359 for (sweatShopState *ss = _writerP; ss != nullptr; ss = ss->_next)
360 if ((ss->_computed == true) &&
361 (ss->_outputted == false)) {
362 writerWrite(ss);
363 }
364 }
365
366 // If no next, wait for input to appear. We can't purge this node
367 // from the list until there is a next, else we lose the list!
368 if (_writerP->_next == nullptr) {
369 nanosleep(&naptime1, 0L);
370 continue;
371 }
372
373 // If already output, remove the node.
374 if (_writerP->_outputted == true) {
375 sweatShopState *ds = _writerP;
376 _writerP = _writerP->_next;
377
378 delete ds;
379 continue;
380 }
381
382 // Otherwise, we need to wait for a state to appear on the queue.
383 nanosleep(&naptime2, 0L);
384 }
385
386 // Tell status to stop.
387 _writerP = 0L;
388
389 return(0L);
390 }
391
392
393 // This thread not only shows a status message, but it also updates the critical shared variable
394 // _numberComputed. Worker threads use this to throttle themselves. Thus, even if _showStatus is
395 // not set, and this thread doesn't _appear_ to be doing anything useful....it is.
396 //
397 void*
status(void)398 sweatShop::status(void) {
399
400 struct timespec naptime;
401 naptime.tv_sec = 0;
402 naptime.tv_nsec = 250000000ULL;
403
404 double startTime = getTime() - 0.001;
405 double thisTime = 0;
406
407 uint64 deltaOut = 0;
408 uint64 deltaCPU = 0;
409
410 double cpuPerSec = 0;
411
412 uint64 readjustAt = 16384;
413
414 while (_writerP) {
415 uint32 nc = 0;
416 for (uint32 i=0; i<_numberOfWorkers; i++)
417 nc += _workerData[i].numComputed;
418 _numberComputed = nc;
419
420 deltaOut = deltaCPU = 0;
421
422 thisTime = getTime();
423
424 if (_numberComputed > _numberOutput)
425 deltaOut = _numberComputed - _numberOutput;
426 if (_numberLoaded > _numberComputed)
427 deltaCPU = _numberLoaded - _numberComputed;
428
429 cpuPerSec = _numberComputed / (thisTime - startTime);
430
431 if (_showStatus) {
432 fprintf(stderr, " %6.1f/s - %8" F_U64P " loaded; %8" F_U64P " queued for compute; %8" F_U64P " finished; %8" F_U64P " written; %8" F_U64P " queued for output)\r",
433 cpuPerSec, _numberLoaded, deltaCPU, _numberComputed, _numberOutput, deltaOut);
434 fflush(stderr);
435 }
436
437 // Readjust queue sizes based on current performance, but don't let it get too big or small.
438 // In particular, don't let it get below 2*numberOfWorkers.
439 //
440 if (_numberComputed > readjustAt) {
441 readjustAt += (uint64)(2 * cpuPerSec);
442 _loaderQueueSize = (uint32)(5 * cpuPerSec);
443 }
444
445 if (_loaderQueueSize < _loaderQueueMin)
446 _loaderQueueSize = _loaderQueueMin;
447
448 if (_loaderQueueSize < 2 * _numberOfWorkers)
449 _loaderQueueSize = 2 * _numberOfWorkers;
450
451 if (_loaderQueueSize > _loaderQueueMax)
452 _loaderQueueSize = _loaderQueueMax;
453
454 nanosleep(&naptime, 0L);
455 }
456
457 if (_showStatus) {
458 thisTime = getTime();
459
460 if (_numberComputed > _numberOutput)
461 deltaOut = _numberComputed - _numberOutput;
462 if (_numberLoaded > _numberComputed)
463 deltaCPU = _numberLoaded - _numberComputed;
464
465 cpuPerSec = _numberComputed / (thisTime - startTime);
466
467 fprintf(stderr, " %6.1f/s - %08" F_U64P " queued for compute; %08" F_U64P " finished; %08" F_U64P " queued for output)\n",
468 cpuPerSec, deltaCPU, _numberComputed, deltaOut);
469 }
470
471 //fprintf(stderr, "sweatShop::status exits.\n");
472 return(0L);
473 }
474
475
476
477
478
479 void
run(void * user,bool beVerbose)480 sweatShop::run(void *user, bool beVerbose) {
481 pthread_attr_t threadAttr;
482 pthread_t threadIDloader;
483 pthread_t threadIDwriter;
484 pthread_t threadIDstats;
485 #if 0
486 int threadSchedPolicy = 0;
487 struct sched_param threadSchedParamDef;
488 struct sched_param threadSchedParamMax;
489 #endif
490 int err = 0;
491
492 _globalUserData = user;
493 _showStatus = beVerbose;
494
495 // Configure everything ahead of time.
496
497 if (_workerBatchSize < 1)
498 _workerBatchSize = 1;
499
500 if (_workerData == 0L)
501 _workerData = new sweatShopWorker [_numberOfWorkers];
502
503 for (uint32 i=0; i<_numberOfWorkers; i++) {
504 _workerData[i].shop = this;
505 _workerData[i].workerQueue = new sweatShopState * [_workerBatchSize];
506 }
507
508 // Open the doors.
509
510 errno = 0;
511
512 err = pthread_mutex_init(&_stateMutex, NULL);
513 if (err)
514 fprintf(stderr, "sweatShop::run()-- Failed to configure pthreads (state mutex): %s.\n", strerror(err)), exit(1);
515
516 err = pthread_attr_init(&threadAttr);
517 if (err)
518 fprintf(stderr, "sweatShop::run()-- Failed to configure pthreads (attr init): %s.\n", strerror(err)), exit(1);
519
520 err = pthread_attr_setscope(&threadAttr, PTHREAD_SCOPE_SYSTEM);
521 if (err)
522 fprintf(stderr, "sweatShop::run()-- Failed to configure pthreads (set scope): %s.\n", strerror(err)), exit(1);
523
524 err = pthread_attr_setdetachstate(&threadAttr, PTHREAD_CREATE_JOINABLE);
525 if (err)
526 fprintf(stderr, "sweatShop::run()-- Failed to configure pthreads (joinable): %s.\n", strerror(err)), exit(1);
527
528 #if 0
529 err = pthread_attr_getschedparam(&threadAttr, &threadSchedParamDef);
530 if (err)
531 fprintf(stderr, "sweatShop::run()-- Failed to configure pthreads (get default param): %s.\n", strerror(err)), exit(1);
532
533 err = pthread_attr_getschedparam(&threadAttr, &threadSchedParamMax);
534 if (err)
535 fprintf(stderr, "sweatShop::run()-- Failed to configure pthreads (get max param): %s.\n", strerror(err)), exit(1);
536 #endif
537
538 // SCHED_RR needs root privs to run on FreeBSD.
539 //
540 //err = pthread_attr_setschedpolicy(&threadAttr, SCHED_RR);
541 //if (err)
542 // fprintf(stderr, "sweatShop::run()-- Failed to configure pthreads (sched policy): %s.\n", strerror(err)), exit(1);
543
544 #if 0
545 err = pthread_attr_getschedpolicy(&threadAttr, &threadSchedPolicy);
546 if (err)
547 fprintf(stderr, "sweatShop::run()-- Failed to configure pthreads (sched policy): %s.\n", strerror(err)), exit(1);
548
549 errno = 0;
550 threadSchedParamMax.sched_priority = sched_get_priority_max(threadSchedPolicy);
551 if (errno)
552 fprintf(stderr, "sweatShop::run()-- WARNING: Failed to configure pthreads (set max param priority): %s.\n", strerror(errno));
553
554 // Fire off the loader
555
556 err = pthread_attr_setschedparam(&threadAttr, &threadSchedParamMax);
557 if (err)
558 fprintf(stderr, "sweatShop::run()-- Failed to set loader priority: %s.\n", strerror(err)), exit(1);
559 #endif
560
561 err = pthread_create(&threadIDloader, &threadAttr, _sweatshop_loaderThread, this);
562 if (err)
563 fprintf(stderr, "sweatShop::run()-- Failed to launch loader thread: %s.\n", strerror(err)), exit(1);
564
565 // Wait for it to actually load something (otherwise all the
566 // workers immediately go home)
567
568 while (!_writerP && !_workerP && !_loaderP) {
569 struct timespec naptime;
570 naptime.tv_sec = 0;
571 naptime.tv_nsec = 250000ULL;
572 nanosleep(&naptime, 0L);
573 }
574
575 // Start the statistics and writer
576
577 #if 0
578 err = pthread_attr_setschedparam(&threadAttr, &threadSchedParamMax);
579 if (err)
580 fprintf(stderr, "sweatShop::run()-- Failed to set status and writer priority: %s.\n", strerror(err)), exit(1);
581 #endif
582
583 err = pthread_create(&threadIDstats, &threadAttr, _sweatshop_statusThread, this);
584 if (err)
585 fprintf(stderr, "sweatShop::run()-- Failed to launch status thread: %s.\n", strerror(err)), exit(1);
586
587 err = pthread_create(&threadIDwriter, &threadAttr, _sweatshop_writerThread, this);
588 if (err)
589 fprintf(stderr, "sweatShop::run()-- Failed to launch writer thread: %s.\n", strerror(err)), exit(1);
590
591 // And some labor
592
593 #if 0
594 err = pthread_attr_setschedparam(&threadAttr, &threadSchedParamDef);
595 if (err)
596 fprintf(stderr, "sweatShop::run()-- Failed to set worker priority: %s.\n", strerror(err)), exit(1);
597 #endif
598
599 for (uint32 i=0; i<_numberOfWorkers; i++) {
600 err = pthread_create(&_workerData[i].threadID, &threadAttr, _sweatshop_workerThread, _workerData + i);
601 if (err)
602 fprintf(stderr, "sweatShop::run()-- Failed to launch worker thread " F_U32 ": %s.\n", i, strerror(err)), exit(1);
603 }
604
605 // Now sit back and relax.
606
607 err = pthread_join(threadIDloader, 0L);
608 if (err)
609 fprintf(stderr, "sweatShop::run()-- Failed to join loader thread: %s.\n", strerror(err)), exit(1);
610
611 err = pthread_join(threadIDwriter, 0L);
612 if (err)
613 fprintf(stderr, "sweatShop::run()-- Failed to join writer thread: %s.\n", strerror(err)), exit(1);
614
615 err = pthread_join(threadIDstats, 0L);
616 if (err)
617 fprintf(stderr, "sweatShop::run()-- Failed to join status thread: %s.\n", strerror(err)), exit(1);
618
619 for (uint32 i=0; i<_numberOfWorkers; i++) {
620 err = pthread_join(_workerData[i].threadID, 0L);
621 if (err)
622 fprintf(stderr, "sweatShop::run()-- Failed to join worker thread " F_U32 ": %s.\n", i, strerror(err)), exit(1);
623 }
624
625 // Cleanup.
626
627 delete _loaderP;
628 _loaderP = _workerP = _writerP = 0L;
629 }
630