/*-
  process.c -- priority scheduling

  Copyright (C) 2012 Mikolaj Izdebski

  This file is part of lbzip2.

  lbzip2 is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  lbzip2 is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with lbzip2.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "common.h"

#include <arpa/inet.h>          /* ntohl() */
#include <pthread.h>            /* pthread_t */
#include <signal.h>             /* SIGUSR2 */
#include <unistd.h>             /* write() */

#include "timespec.h"           /* struct timespec */
#include "main.h"               /* work() */

#include "process.h"            /* struct process */
#include "signals.h"            /* halt() */


/*
  JOB SCHEDULING

  Jobs of different importance to the overall process are divided into
  several categories.  Each category has a distinct priority assigned to it.
  These priorities are static -- they were fixed when lbzip2 was designed and
  cannot change at run time.  A job can be scheduled only if no jobs of
  higher priority are available.  The primary scheduling algorithm of lbzip2
  is therefore static priority scheduling.

  The secondary algorithm is EDF (Earliest Deadline First).  If two jobs of
  the same priority are to be scheduled (which implies they belong to the
  same category), their order is determined by the order of their underlying
  blocks within the bzip2 file.  Blocks that need to be output sooner have
  higher priority than blocks that can be written later.

  The scheduler maintains several priority queues.  Each queue contains jobs
  of the same type, ordered by their deadline.  When some computing power
  becomes available (that is, a worker thread is idle), the scheduler picks
  the first non-empty queue of the highest priority and removes from it the
  job with the earliest deadline.  That job is then passed to the worker
  thread, which executes it.

  The EDF algorithm is proven optimal in a single-worker configuration, in
  which case it uses the least possible amount of resources (time, memory).
  With two or more worker threads considerably more resources may be needed,
  but the overall run time should be no longer than with a single worker.
*/
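
/* For illustration only: the task table consumed by select_task() below is
   an array of struct task entries, ordered from highest to lowest priority
   and terminated by an entry whose ready callback is NULL.  A hypothetical
   table (the real ones live with the compression and decompression code,
   and the exact member order is whatever process.h declares) might look
   like this:

     static const struct task example_tasks[] = {
       { "write",  write_ready,  write_run  },   -- highest priority
       { "encode", encode_ready, encode_run },
       { "read",   read_ready,   read_run   },   -- lowest priority
       { NULL,     NULL,         NULL       },   -- sentinel
     };

   select_task() scans such a table front to back and picks the first entry
   whose ready() predicate returns true. */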


/* Error-checking POSIX thread macros.

   For most pthread functions a non-zero return value indicates a programming
   bug.  In that case aborting seems wiser than printing an error message and
   exiting, because abort() can produce core dumps that are useful in
   debugging.
*/
#define xjoin(t) ((void)(pthread_join((t), NULL) && (abort(), 0)))
#define xlock(m) ((void)(pthread_mutex_lock(m) && (abort(), 0)))
#define xunlock(m) ((void)(pthread_mutex_unlock(m) && (abort(), 0)))
#define xwait(c,m) ((void)(pthread_cond_wait((c),(m)) && (abort(), 0)))
#define xsignal(c) ((void)(pthread_cond_signal(c) && (abort(), 0)))
#define xbroadcast(c) ((void)(pthread_cond_broadcast(c) && (abort(), 0)))

static void *
thread_entry(void *real_entry)
{
  ((void (*)(void))real_entry)();
  return NULL;
}

/* Create a POSIX thread with error checking. */
static pthread_t
xcreate(void (*entry)(void))
{
  int err;
  pthread_t thread;

  err = pthread_create(&thread, NULL, thread_entry, entry);
  if (err != 0)
    failx(err, "unable to create a POSIX thread");

  return thread;
}


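/* Read from the input descriptor into the buffer until either the buffer is
   full or end of input is reached.  On entry *vacant holds the buffer size;
   on return it holds the number of bytes left unfilled (nonzero means EOF
   was reached).  The input byte counter ispec.total is advanced; a read
   error is fatal (reported through failfx()). */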
void
xread(void *vbuf, size_t *vacant)
{
  char *buffer = vbuf;

  assert(*vacant > 0);

  do {
    ssize_t rd;

    rd = read(ispec.fd, buffer, *vacant > (size_t)SSIZE_MAX ?
              (size_t)SSIZE_MAX : *vacant);

    /* End of file. */
    if (0 == rd)
      break;

    /* Read error. */
    if (-1 == rd) {
      failfx(&ispec, errno, "read()");
    }

    *vacant -= (size_t)rd;
    buffer += (size_t)rd;
    ispec.total += (size_t)rd;
  }
  while (*vacant > 0);
}

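/* Write size bytes from the buffer to the output descriptor, retrying short
   writes until everything has been written.  The output byte counter
   ospec.total is always advanced, but nothing is actually written when the
   output descriptor is -1.  A write error is fatal (reported through
   failfx()). */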
void
xwrite(const void *vbuf, size_t size)
{
  const char *buffer = vbuf;

  ospec.total += size;

  if (size > 0 && ospec.fd != -1) {
    do {
      ssize_t wr;

      wr = write(ospec.fd, buffer, size > (size_t)SSIZE_MAX ?
                 (size_t)SSIZE_MAX : size);

      /* Write error. */
      if (-1 == wr) {
        failfx(&ospec, errno, "write()");
      }

      size -= (size_t)wr;
      buffer += (size_t)wr;
    }
    while (size > 0);
  }
}


/* Parent and left child indices. */
#define parent(i) (((i)-1)/2)
#define left(i) ((i)*2+1)

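/* Binary min-heap helpers.  The heap is stored in an array of pointers to
   struct position, ordered by pos_lt()/pos_le(), with the indexing given by
   the parent()/left() macros above (0-based array layout).

   up_heap() inserts the element stored at index size into a heap occupying
   indices [0, size), sifting it up until the heap property is restored. */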
void
up_heap(void *vroot, unsigned size)
{
  struct position **root = vroot;
  struct position *el;
  unsigned j;

  j = size;
  el = root[j];

  while (j > 0 && pos_lt(*el, *root[parent(j)])) {
    root[j] = root[parent(j)];
    j = parent(j);
  }

  root[j] = el;
}

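/* Remove the minimum from a heap of size + 1 elements: the old root is moved
   to index size (just past the shrunken heap) and the former last element is
   sifted down from the root until the heap property again holds for indices
   [0, size). */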
void
down_heap(void *vroot, unsigned size)
{
  struct position **root = vroot;
  struct position *el;
  unsigned j;

  el = root[size];
  root[size] = root[0];

  j = 0;
  while (left(j) < size) {
    unsigned child = left(j);

    if (child + 1 < size && pos_lt(*root[child + 1], *root[child]))
      child++;
    if (pos_le(*el, *root[child]))
      break;
    root[j] = root[child];
    j = child;
  }

  root[j] = el;
}


static pthread_mutex_t source_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t source_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t sink_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t sink_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t sched_cond = PTHREAD_COND_INITIALIZER;

static const struct process *process;

bool eof;
unsigned work_units;
unsigned in_slots;
unsigned out_slots;
unsigned total_in_slots;
unsigned total_out_slots;
size_t in_granul;
size_t out_granul;

static bool request_close;

static pthread_t source_thread;
static pthread_t sink_thread;
static pthread_t *worker_thread;


struct block {
  void *buffer;
  size_t size;
  size_t weight;
};

static struct deque(struct block) output_q;
static bool finish;

unsigned thread_id;
/* Highest priority runnable task or NULL if there are no runnable tasks. */
static const struct task *next_task;


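/* Source thread: as long as input slots are available, allocate a buffer of
   in_granul bytes, fill it from the input file and hand it to the current
   process through its on_block() callback.  Terminates on end of input or
   when a premature close is requested, then sets the global EOF flag. */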
static void
source_thread_proc(void)
{
  Trace((" source: spawned"));

  for (;;) {
    void *buffer;
    size_t vacant, avail;

    xlock(&source_mutex);
    while (in_slots == 0 && !request_close) {
      Trace((" source: stalled"));
      xwait(&source_cond, &source_mutex);
    }

    if (request_close) {
      Trace((" source: received premature close request"));
      xunlock(&source_mutex);
      break;
    }

    Trace((" source: reading data (%u free slots)", in_slots));
    in_slots--;
    xunlock(&source_mutex);

    vacant = in_granul;
    avail = vacant;
    buffer = XNMALLOC(vacant, uint8_t);
    xread(buffer, &vacant);
    avail -= vacant;

    Trace((" source: block of %u bytes read", (unsigned)avail));

    if (avail == 0u)
      source_release_buffer(buffer);
    else
      process->on_block(buffer, avail);

    if (vacant > 0u)
      break;
  }

  sched_lock();
  eof = true;
  sched_unlock();

  Trace((" source: terminating"));
}


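/* Return an input buffer to the system and release its input slot, waking
   the source thread if it was stalled waiting for a free slot. */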
void
source_release_buffer(void *buffer)
{
  free(buffer);

  xlock(&source_mutex);
  if (in_slots++ == 0)
    xsignal(&source_cond);
  xunlock(&source_mutex);
}


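/* Ask the source thread to terminate even though end of input has not been
   reached, for example because the remaining input is not needed. */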
void
source_close(void)
{
  xlock(&source_mutex);
  request_close = true;
  if (in_slots == 0)
    xsignal(&source_cond);
  xunlock(&source_mutex);
}


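/* Queue an output block for the sink thread.  size is the number of bytes to
   write; weight is the amount of input the block represents and is used only
   for progress reporting. */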
void
sink_write_buffer(void *buffer, size_t size, size_t weight)
{
  struct block block;

  block.buffer = buffer;
  block.size = size;
  block.weight = weight;

  xlock(&sink_mutex);
  push(output_q, block);
  xsignal(&sink_cond);
  xunlock(&sink_mutex);
}


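/* Sink thread: pop blocks from the output queue in order, write them to the
   output file and return their buffers to the current process through its
   on_written() callback.  Optionally displays a progress indicator on
   stderr. */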
static void
sink_thread_proc(void)
{
  bool progress_enabled;
  uintmax_t processed;
  struct timespec start_time;
  struct timespec next_time;
  struct timespec update_interval;
  struct block block;
  static const double UPDATE_INTERVAL = 0.1;

  Trace((" sink: spawned"));

  /* Progress info is displayed only if all the following conditions are met:
     1) the user has specified the -v or --verbose option
     2) stderr is connected to a terminal device
     3) the input file is a regular file
     4) the input file is nonempty
  */
  progress_enabled = (verbose && ispec.size > 0 && isatty(STDERR_FILENO));
  processed = 0u;
  gettime(&start_time);
  next_time = start_time;
  update_interval = dtotimespec(UPDATE_INTERVAL);

  for (;;) {
    xlock(&sink_mutex);
    while (empty(output_q) && !finish) {
      Trace((" sink: stalled"));
      xwait(&sink_cond, &sink_mutex);
    }

    if (empty(output_q))
      break;

    block = shift(output_q);
    xunlock(&sink_mutex);

    Trace((" sink: writing data (%u bytes)", (unsigned)block.size));
    xwrite(block.buffer, block.size);
    Trace((" sink: releasing output slot"));
    process->on_written(block.buffer);

    if (progress_enabled) {
      struct timespec time_now;
      double completed, elapsed;

      processed = min(processed + block.weight, ispec.size);

      gettime(&time_now);

      if (timespec_cmp(time_now, next_time) > 0) {
        next_time = timespec_add(time_now, update_interval);
        elapsed = timespectod(timespec_sub(time_now, start_time));
        completed = (double)processed / ispec.size;

        if (elapsed < 5)
          display("progress: %.2f%%\r", 100 * completed);
        else
          display("progress: %.2f%%, ETA: %.0f s \r",
                  100 * completed, elapsed * (1 / completed - 1));
      }
    }
  }

  xunlock(&sink_mutex);

  Trace((" sink: terminating"));
}


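/* Scan the process's task table in priority order and set next_task to the
   first task that reports itself runnable, or to NULL if none does.  Called
   with sched_mutex held (or before the worker threads are started). */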
static void
select_task(void)
{
  const struct task *task;

  for (task = process->tasks; task->ready != NULL; ++task) {
    if (task->ready()) {
      next_task = task;
      return;
    }
  }

  next_task = NULL;
}


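/* Worker thread: repeatedly run the highest priority runnable task,
   selecting a new one after each run.  When no task is runnable the thread
   sleeps on sched_cond; it exits once the process reports completion. */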
static void
worker_thread_proc(void)
{
  unsigned id;

  (void)id;

  xlock(&sched_mutex);
  Trace(("worker[%2u]: spawned", (id = thread_id++)));

  for (;;) {
    while (next_task != NULL) {
      Trace(("worker[%2u]: scheduling task '%s'...", id, next_task->name));
      next_task->run();
      select_task();
    }

    if (process->finished())
      break;

    Trace(("worker[%2u]: stalled", id));
    xwait(&sched_cond, &sched_mutex);
  }

  xbroadcast(&sched_cond);
  xunlock(&sched_mutex);

  Trace(("worker[%2u]: terminating", id));
}


/* Enter scheduler monitor. */
void
sched_lock(void)
{
  xlock(&sched_mutex);
}


/* Leave scheduler monitor.  Re-select the highest priority runnable task and
   wake one worker if there is work to do or the process has finished. */
void
sched_unlock(void)
{
  select_task();

  if (next_task != NULL || process->finished())
    xsignal(&sched_cond);

  xunlock(&sched_mutex);
}


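/* Start the I/O machinery: reset the close/finish flags, create the output
   queue and spawn the sink and source threads. */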
static void
init_io(void)
{
  request_close = false;
  finish = false;
  deque_init(output_q, out_slots);

  sink_thread = xcreate(sink_thread_proc);
  source_thread = xcreate(source_thread_proc);
}


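/* Tear down the I/O machinery: wait for the source thread, tell the sink
   thread to finish once the output queue drains, wait for it, then free the
   queue. */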
static void
uninit_io(void)
{
  xjoin(source_thread);

  xlock(&sink_mutex);
  finish = true;
  xsignal(&sink_cond);
  xunlock(&sink_mutex);

  xjoin(sink_thread);
  deque_uninit(output_q);
}


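/* The primary worker thread: initialize the process and the I/O threads,
   spawn the remaining workers, act as a worker itself, then join the other
   workers, shut everything down and raise SIGUSR2 so that the main thread,
   blocked in halt(), can continue. */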
static void
primary_thread(void)
{
  unsigned i;

  thread_id = 0;

  eof = false;
  in_slots = total_in_slots;
  out_slots = total_out_slots;
  work_units = num_worker;

  process->init();
  select_task();
  init_io();

  for (i = 1u; i < num_worker; ++i)
    worker_thread[i] = xcreate(worker_thread_proc);

  worker_thread_proc();

  for (i = 1u; i < num_worker; ++i)
    xjoin(worker_thread[i]);

  uninit_io();
  process->uninit();

  assert(eof);
  assert(in_slots == total_in_slots);
  assert(out_slots == total_out_slots);
  assert(work_units == num_worker);

  xraise(SIGUSR2);
}


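/* Callbacks for the pass-through pseudo-process used by copy(): input blocks
   are forwarded verbatim to the sink and their buffers are recycled once
   written. */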
static void
copy_on_input_avail(void *buffer, size_t size)
{
  sched_lock();
  out_slots--;
  sched_unlock();

  sink_write_buffer(buffer, size, size);
}


static void
copy_on_write_complete(void *buffer)
{
  source_release_buffer(buffer);

  sched_lock();
  out_slots++;
  sched_unlock();
}


static bool
copy_terminate(void)
{
  if (eof && out_slots == total_out_slots)
    xraise(SIGUSR2);

  return false;
}

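/* Copy the remaining input to the output unchanged.  Used when the input is
   not a valid bzip2 file but force mode was requested and the output is
   stdout; no worker threads are needed, only the source and sink threads. */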
static void
copy(void)
{
  static const struct task null_task = { NULL, NULL, NULL };

  static const struct process pseudo_process = {
    &null_task,
    NULL,
    NULL,
    copy_terminate,
    copy_on_input_avail,
    copy_on_write_complete,
  };

  eof = false;
  in_slots = 2;
  out_slots = 2;
  total_out_slots = 2;
  in_granul = 65536;

  process = &pseudo_process;
  init_io();
  halt();
  uninit_io();
}


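/* Run the given process: create the primary worker thread, wait in halt()
   until it signals completion, then join it. */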
static void
schedule(const struct process *proc)
{
  process = proc;

  worker_thread = XNMALLOC(num_worker, pthread_t);
  *worker_thread = xcreate(primary_thread);
  halt();
  xjoin(*worker_thread);
  free(worker_thread);
}


/* Set the number of input and output slots and the I/O granularity according
   to the operation mode.  TODO: Support -m switch. */
static void
set_memory_constraints(void)
{
  if (!decompress) {
    total_in_slots = 2u * num_worker;
    total_out_slots = 2u * num_worker + /*TRANSM_THRESH*/ 2;
    in_granul = bs100k * 100000u;
    out_granul = -1;            /* ignored during compression */
  }
  else if (!small) {
    total_in_slots = 4u * num_worker;
    total_out_slots = 16u * num_worker;
    in_granul = 256u * 1024u;
    out_granul = MAX_BLOCK_SIZE;
  }
  else {
    total_in_slots = 2u;
    total_out_slots = 2u * num_worker;
    in_granul = 32768u;
    out_granul = 900000u;
  }
}


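/* Top-level entry point for a single (de)compression job.  Compression is
   simply scheduled.  For decompression the 4-byte file header is examined
   first: a valid "BZh1".."BZh9" magic selects the expansion process,
   otherwise the input is either copied through verbatim (force mode with
   output to stdout) or rejected as invalid. */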
void
work(void)
{
  if (verbose) {
    info(decompress ? "decompressing %s%s%s to %s%s%s" :
         "compressing %s%s%s to %s%s%s", ispec.sep, ispec.fmt,
         ispec.sep, ospec.sep, ospec.fmt, ospec.sep);
  }

  set_memory_constraints();

  if (!decompress) {
    schedule(&compression);
  }
  else {
    uint32_t header;
    size_t vacant = sizeof(header);

    xread(&header, &vacant);

    /* MAGIC(k) is the header of a bzip2 file with block size k * 100 kB:
       0x425A6830 is "BZh0" read as a big-endian 32-bit integer. */
#define MAGIC(k) (0x425A6830u + (k))
    if (vacant == 0 && (ntohl(header) >= MAGIC(1) &&
                        ntohl(header) <= MAGIC(9))) {
      bs100k = ntohl(header) - MAGIC(0);
      schedule(&expansion);
    }
    else if (force && ospec.fd == STDOUT_FILENO) {
      xwrite(&header, sizeof(header) - vacant);
      copy();
    }
    else {
      failf(&ispec, "not a valid bzip2 file");
    }
  }
}