1 /*-
2   process.c -- priority scheduling
3 
4   Copyright (C) 2012 Mikolaj Izdebski
5 
6   This file is part of lbzip2.
7 
8   lbzip2 is free software: you can redistribute it and/or modify
9   it under the terms of the GNU General Public License as published by
10   the Free Software Foundation, either version 3 of the License, or
11   (at your option) any later version.
12 
13   lbzip2 is distributed in the hope that it will be useful,
14   but WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16   GNU General Public License for more details.
17 
18   You should have received a copy of the GNU General Public License
19   along with lbzip2.  If not, see <http://www.gnu.org/licenses/>.
20 */
21 
22 #include "common.h"
23 
24 #include <arpa/inet.h>          /* ntohl() */
25 #include <pthread.h>            /* pthread_t */
26 #include <signal.h>             /* SIGUSR2 */
27 #include <unistd.h>             /* write() */
28 
29 #include "timespec.h"           /* struct timespec */
30 #include "main.h"               /* work() */
31 
32 #include "process.h"            /* struct process */
33 #include "signals.h"            /* halt() */
34 
35 
36 /*
37   JOB SCHEDULING
38 
39   Jobs of different importance for the whole process are divided into several
40   categories. Each category has a distinct priority assigned to it. Those
41   priorities are static -- they were defined at lbzip2 design time and they
42   can't change during run time. Any job can be scheduled only if there are no
43   available jobs of higher priority. Therefore the primary scheduling
44   algorithm of lbzip2 is static priority scheduling.
45 
  The secondary algorithm is EDF (Earliest Deadline First). If two jobs of the
  same priority are to be scheduled (which implies they fall into the same job
  category), their scheduling order is determined based on the order of their
  underlying blocks within the bzip2 file. Blocks that need to be output sooner
  have correspondingly higher priority than blocks that can be written later.
51 
52   The scheduler maintains several priority queues. Each queue contains jobs of
53   the same type, ordered by their deadline. When there is some free computing
54   power available (that is a worker thread is idle), the scheduler picks the
55   first non-empty priority queue of the highest priority and removes from it
56   the job of the earliest deadline. This job is then passed to the worker
57   thread, which is executing it.
58 
59   The EDF algorithm is proven to be optimal in single-worker configuration.
60   In this case it uses the least possible amount of resources (time, memory).
61   In case of two or more worker threads much more resources may be needed, but
62   the overall time should be no longer than in the case with one worker thread.
63 */
64 
65 
/* Error-checking POSIX thread macros.

   For most pthread functions, non-zero return value means a programming bug.
   In this case aborting seems wiser than printing error message and exiting
   because abort() can produce core dumps that can be useful in debugging.

   Each macro evaluates the pthread call; a non-zero result short-circuits
   into abort().  The (void) cast discards the &&-expression's value.
*/
#define xjoin(t)      ((void)(pthread_join((t), NULL)    && (abort(), 0)))
#define xlock(m)      ((void)(pthread_mutex_lock(m)      && (abort(), 0)))
#define xunlock(m)    ((void)(pthread_mutex_unlock(m)    && (abort(), 0)))
#define xwait(c,m)    ((void)(pthread_cond_wait((c),(m)) && (abort(), 0)))
#define xsignal(c)    ((void)(pthread_cond_signal(c)     && (abort(), 0)))
#define xbroadcast(c) ((void)(pthread_cond_broadcast(c)  && (abort(), 0)))
78 
/* Trampoline passed to pthread_create(): converts the generic void pointer
   argument back into a parameterless procedure and invokes it.  Always
   returns NULL -- thread results are not used in this program. */
static void *
thread_entry(void *real_entry)
{
  void (*proc)(void) = (void (*)(void))real_entry;

  proc();
  return NULL;
}
85 
86 /* Create a POSIX thread with error checking. */
87 static pthread_t
xcreate(void (* entry)(void))88 xcreate(void (*entry)(void))
89 {
90   int err;
91   pthread_t thread;
92 
93   err = pthread_create(&thread, NULL, thread_entry, entry);
94   if (err != 0)
95     failx(err, "unable to create a POSIX thread");
96 
97   return thread;
98 }
99 
100 
101 void
xread(void * vbuf,size_t * vacant)102 xread(void *vbuf, size_t *vacant)
103 {
104   char *buffer = vbuf;
105 
106   assert(*vacant > 0);
107 
108   do {
109     ssize_t rd;
110 
111     rd = read(ispec.fd, buffer, *vacant > (size_t)SSIZE_MAX ?
112               (size_t)SSIZE_MAX : *vacant);
113 
114     /* End of file. */
115     if (0 == rd)
116       break;
117 
118     /* Read error. */
119     if (-1 == rd) {
120       failfx(&ispec, errno, "read()");
121     }
122 
123     *vacant -= (size_t)rd;
124     buffer += (size_t)rd;
125     ispec.total += (size_t)rd;
126   }
127   while (*vacant > 0);
128 }
129 
130 void
xwrite(const void * vbuf,size_t size)131 xwrite(const void *vbuf, size_t size)
132 {
133   const char *buffer = vbuf;
134 
135   ospec.total += size;
136 
137   if (size > 0 && ospec.fd != -1) {
138     do {
139       ssize_t wr;
140 
141       wr = write(ospec.fd, buffer, size > (size_t)SSIZE_MAX ?
142                  (size_t)SSIZE_MAX : size);
143 
144       /* Write error. */
145       if (-1 == wr) {
146         failfx(&ospec, errno, "write()");
147       }
148 
149       size -= (size_t)wr;
150       buffer += (size_t)wr;
151     }
152     while (size > 0);
153   }
154 }
155 
156 
/* Parent and left child indices (array-based binary heap rooted at 0). */
#define parent(i) (((i)-1)/2)
#define left(i) ((i)*2+1)
160 
161 void
up_heap(void * vroot,unsigned size)162 up_heap(void *vroot, unsigned size)
163 {
164   struct position **root = vroot;
165   struct position *el;
166   unsigned j;
167 
168   j = size;
169   el = root[j];
170 
171   while (j > 0 && pos_lt(*el, *root[parent(j)])) {
172     root[j] = root[parent(j)];
173     j = parent(j);
174   }
175 
176   root[j] = el;
177 }
178 
179 void
down_heap(void * vroot,unsigned size)180 down_heap(void *vroot, unsigned size)
181 {
182   struct position **root = vroot;
183   struct position *el;
184   unsigned j;
185 
186   el = root[size];
187   root[size] = root[0];
188 
189   j = 0;
190   while (left(j) < size) {
191     unsigned child = left(j);
192 
193     if (child + 1 < size && pos_lt(*root[child + 1], *root[child]))
194       child++;
195     if (pos_le(*el, *root[child]))
196       break;
197     root[j] = root[child];
198     j = child;
199   }
200 
201   root[j] = el;
202 }
203 
204 
/* Monitor guarding the input slot counter and the close request flag. */
static pthread_mutex_t source_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  source_cond  = PTHREAD_COND_INITIALIZER;
/* Monitor guarding the output queue and the finish flag. */
static pthread_mutex_t sink_mutex   = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  sink_cond    = PTHREAD_COND_INITIALIZER;
/* Scheduler monitor; guards next_task and the scheduling state below. */
static pthread_mutex_t sched_mutex  = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  sched_cond   = PTHREAD_COND_INITIALIZER;

/* The process currently being run (compression, expansion, or the copy
   pseudo-process). */
static const struct process *process;

bool eof;                  /* set by the source thread when input is exhausted */
unsigned work_units;       /* work units; initialized to num_worker and
                              asserted back to num_worker on completion */
unsigned in_slots;         /* currently free input buffer slots */
unsigned out_slots;        /* currently free output buffer slots */
unsigned total_in_slots;   /* total number of input slots */
unsigned total_out_slots;  /* total number of output slots */
size_t in_granul;          /* size in bytes of each input read */
size_t out_granul;         /* output granularity -- consumed by the process
                              modules; see set_memory_constraints() */

/* Set by source_close() to make the source thread stop reading early. */
static bool request_close;

static pthread_t source_thread;
static pthread_t sink_thread;
static pthread_t *worker_thread;   /* array of num_worker thread handles */


/* One output unit queued for writing by the sink thread. */
struct block {
  void *buffer;   /* bytes to write; handed back via process->on_written() */
  size_t size;    /* number of bytes to write */
  size_t weight;  /* input bytes this block accounts for (progress display) */
};

/* FIFO of blocks awaiting output, in file order. */
static struct deque(struct block) output_q;
/* Set by uninit_io() to tell the sink thread no more blocks will arrive. */
static bool finish;

unsigned thread_id;   /* sequential id handed out to worker threads (tracing) */
/* Highest priority runnable task or NULL if there are no runnable tasks. */
static const struct task *next_task;
242 
243 
244 static void
source_thread_proc(void)245 source_thread_proc(void)
246 {
247   Trace(("    source: spawned"));
248 
249   for (;;) {
250     void *buffer;
251     size_t vacant, avail;
252 
253     xlock(&source_mutex);
254     while (in_slots == 0 && !request_close) {
255       Trace(("    source: stalled"));
256       xwait(&source_cond, &source_mutex);
257     }
258 
259     if (request_close) {
260       Trace(("    source: received premature close requtest"));
261       xunlock(&source_mutex);
262       break;
263     }
264 
265     Trace(("    source: reading data (%u free slots)", in_slots));
266     in_slots--;
267     xunlock(&source_mutex);
268 
269     vacant = in_granul;
270     avail = vacant;
271     buffer = XNMALLOC(vacant, uint8_t);
272     xread(buffer, &vacant);
273     avail -= vacant;
274 
275     Trace(("    source: block of %u bytes read", (unsigned)avail));
276 
277     if (avail == 0u)
278       source_release_buffer(buffer);
279     else
280       process->on_block(buffer, avail);
281 
282     if (vacant > 0u)
283       break;
284   }
285 
286   sched_lock();
287   eof = 1;
288   sched_unlock();
289 
290   Trace(("    source: terminating"));
291 }
292 
293 
294 void
source_release_buffer(void * buffer)295 source_release_buffer(void *buffer)
296 {
297   free(buffer);
298 
299   xlock(&source_mutex);
300   if (in_slots++ == 0)
301     xsignal(&source_cond);
302   xunlock(&source_mutex);
303 }
304 
305 
/* Ask the source thread to stop reading input.  If it is currently blocked
   waiting for a free slot (in_slots == 0), wake it so it can notice the
   request and exit. */
void
source_close(void)
{
  xlock(&source_mutex);
  request_close = true;
  if (in_slots == 0)
    xsignal(&source_cond);
  xunlock(&source_mutex);
}
315 
316 
317 void
sink_write_buffer(void * buffer,size_t size,size_t weight)318 sink_write_buffer(void *buffer, size_t size, size_t weight)
319 {
320   struct block block;
321 
322   block.buffer = buffer;
323   block.size = size;
324   block.weight = weight;
325 
326   xlock(&sink_mutex);
327   push(output_q, block);
328   xsignal(&sink_cond);
329   xunlock(&sink_mutex);
330 }
331 
332 
/* Sink thread: dequeues blocks from output_q in order, writes them out, and
   returns each buffer to the process via on_written().  Optionally displays
   progress on stderr.  Exits once `finish' is set and the queue drains. */
static void
sink_thread_proc(void)
{
  bool progress_enabled;
  uintmax_t processed;            /* input bytes accounted for so far */
  struct timespec start_time;
  struct timespec next_time;      /* earliest time of the next progress line */
  struct timespec update_interval;
  struct block block;
  static const double UPDATE_INTERVAL = 0.1;   /* seconds between updates */

  Trace(("      sink: spawned"));

  /* Progress info is displayed only if all the following conditions are met:
     1) the user has specified -v or --verbose option
     2) stderr is connected to a terminal device
     3) the input file is a regular file
     4) the input file is nonempty
   */
  progress_enabled = (verbose && ispec.size > 0 && isatty(STDERR_FILENO));
  processed = 0u;
  gettime(&start_time);
  next_time = start_time;
  update_interval = dtotimespec(UPDATE_INTERVAL);

  for (;;) {
    /* Wait for a block to write or for the finish notification. */
    xlock(&sink_mutex);
    while (empty(output_q) && !finish) {
      Trace(("      sink: stalled"));
      xwait(&sink_cond, &sink_mutex);
    }

    /* Queue drained after finish was requested -- leave the loop with the
       mutex still held; it is released after the loop. */
    if (empty(output_q))
      break;

    block = shift(output_q);
    xunlock(&sink_mutex);

    Trace(("      sink: writing data (%u bytes)", (unsigned)block.size));
    xwrite(block.buffer, block.size);
    Trace(("      sink: releasing output slot"));
    process->on_written(block.buffer);

    if (progress_enabled) {
      struct timespec time_now;
      double completed, elapsed;

      /* Never report more than the total input size. */
      processed = min(processed + block.weight, ispec.size);

      gettime(&time_now);

      /* Rate-limit the display to one update per UPDATE_INTERVAL. */
      if (timespec_cmp(time_now, next_time) > 0) {
        next_time = timespec_add(time_now, update_interval);
        elapsed = timespectod(timespec_sub(time_now, start_time));
        completed = (double)processed / ispec.size;

        /* During the first few seconds the ETA estimate is too noisy. */
        if (elapsed < 5)
          display("progress: %.2f%%\r", 100 * completed);
        else
          display("progress: %.2f%%, ETA: %.0f s    \r",
                  100 * completed, elapsed * (1 / completed - 1));
      }
    }
  }

  xunlock(&sink_mutex);

  Trace(("      sink: terminating"));
}
402 
403 
404 static void
select_task(void)405 select_task(void)
406 {
407   const struct task *task;
408 
409   for (task = process->tasks; task->ready != NULL; ++task) {
410     if (task->ready()) {
411       next_task = task;
412       return;
413     }
414   }
415 
416   next_task = NULL;
417 }
418 
419 
/* Worker thread body: inside the scheduler monitor, repeatedly run the
   currently selected task and re-select, sleeping on sched_cond when
   nothing is runnable, until the process reports completion. */
static void
worker_thread_proc(void)
{
  unsigned id;

  /* id is assigned inside Trace(); keep -Wunused-but-set quiet when tracing
     is compiled out (note: thread_id is then not incremented either -- it is
     only used for trace output). */
  (void)id;

  xlock(&sched_mutex);
  Trace(("worker[%2u]: spawned", (id = thread_id++)));

  for (;;) {
    /* Drain runnable tasks; select_task() refreshes next_task after each. */
    while (next_task != NULL) {
      Trace(("worker[%2u]: scheduling task '%s'...", id, next_task->name));
      next_task->run();
      select_task();
    }

    if (process->finished())
      break;

    /* Nothing runnable -- wait for sched_unlock() to signal a change. */
    Trace(("worker[%2u]: stalled", id));
    xwait(&sched_cond, &sched_mutex);
  }

  /* Wake any sibling workers still waiting so they too can observe
     completion and terminate. */
  xbroadcast(&sched_cond);
  xunlock(&sched_mutex);

  Trace(("worker[%2u]: terminating", id));
}
449 
450 
/* Enter the scheduler monitor.  Every call must be paired with a later
   sched_unlock(), which also re-evaluates the runnable task. */
void
sched_lock(void)
{
  xlock(&sched_mutex);
}
457 
458 
/* Leave the scheduler monitor.  The caller may have changed scheduling
   state, so re-select the runnable task and wake one waiting worker if a
   task became runnable or the process has finished. */
void
sched_unlock(void)
{
  select_task();

  if (next_task != NULL || process->finished())
    xsignal(&sched_cond);

  xunlock(&sched_mutex);
}
470 
471 
/* Reset I/O state, size the output queue, and spawn the sink and source
   threads.  Counterpart of uninit_io(). */
static void
init_io(void)
{
  request_close = false;
  finish = false;
  deque_init(output_q, out_slots);

  sink_thread = xcreate(sink_thread_proc);
  source_thread = xcreate(source_thread_proc);
}
482 
483 
/* Tear down I/O: join the source thread, then notify the sink that no more
   blocks will be queued and join it too.  Finally release the output queue.
   Counterpart of init_io(). */
static void
uninit_io(void)
{
  xjoin(source_thread);

  xlock(&sink_mutex);
  finish = true;
  xsignal(&sink_cond);
  xunlock(&sink_mutex);

  xjoin(sink_thread);
  deque_uninit(output_q);
}
497 
498 
/* Body of worker thread 0.  Initializes scheduling state, the process, and
   I/O; spawns the remaining workers; participates as a worker itself; then
   tears everything down and raises SIGUSR2 to notify the main thread
   (which is blocked in halt() -- see schedule()). */
static void
primary_thread(void)
{
  unsigned i;

  thread_id = 0;

  eof = false;
  in_slots = total_in_slots;
  out_slots = total_out_slots;
  work_units = num_worker;

  process->init();
  select_task();
  init_io();

  /* Slot 0 of worker_thread[] is this thread; create workers 1..n-1. */
  for (i = 1u; i < num_worker; ++i)
    worker_thread[i] = xcreate(worker_thread_proc);

  worker_thread_proc();

  for (i = 1u; i < num_worker; ++i)
    xjoin(worker_thread[i]);

  uninit_io();
  process->uninit();

  /* On a clean run every resource must have been returned. */
  assert(eof);
  assert(in_slots == total_in_slots);
  assert(out_slots == total_out_slots);
  assert(work_units == num_worker);

  xraise(SIGUSR2);
}
533 
534 
/* Copy pseudo-process: forward each input block straight to the sink,
   consuming one output slot per block.  The block's weight equals its size
   (copying is byte-for-byte). */
static void
copy_on_input_avail(void *buffer, size_t size)
{
  sched_lock();
  out_slots--;
  sched_unlock();

  sink_write_buffer(buffer, size, size);
}
544 
545 
/* Copy pseudo-process: a block was written; recycle its buffer as an input
   slot and release the output slot it occupied. */
static void
copy_on_write_complete(void *buffer)
{
  source_release_buffer(buffer);

  sched_lock();
  out_slots++;
  sched_unlock();
}
555 
556 
/* Copy pseudo-process "finished" predicate.  Raises SIGUSR2 (waking the
   main thread out of halt() in copy()) once all input was read and every
   output slot was returned.  Always returns false -- copy mode runs no
   worker threads, so nothing depends on a true result. */
static bool
copy_terminate(void)
{
  if (eof && out_slots == total_out_slots)
    xraise(SIGUSR2);

  return false;
}
565 
/* Copy input to output verbatim using only the source and sink threads (no
   workers).  Used by work() when forcing pass-through of non-bzip2 input.
   halt() blocks until copy_terminate() raises SIGUSR2. */
static void
copy(void)
{
  /* Task table containing only the NULL terminator: nothing schedulable. */
  static const struct task null_task = { NULL, NULL, NULL };

  /* NOTE(review): the two NULL members are presumably the init/uninit
     callbacks, unused in copy mode -- confirm against struct process in
     process.h. */
  static const struct process pseudo_process = {
    &null_task,
    NULL,
    NULL,
    copy_terminate,
    copy_on_input_avail,
    copy_on_write_complete,
  };

  eof = false;
  in_slots = 2;
  out_slots = 2;
  total_out_slots = 2;
  in_granul = 65536;

  process = &pseudo_process;
  init_io();
  halt();
  uninit_io();
}
591 
592 
/* Run the given process: allocate the worker handle array, spawn the
   primary worker thread, and wait in halt() -- presumably until the
   SIGUSR2 raised by primary_thread(); see signals.h -- before joining it
   and releasing the array. */
static void
schedule(const struct process *proc)
{
  process = proc;

  worker_thread = XNMALLOC(num_worker, pthread_t);
  *worker_thread = xcreate(primary_thread);
  halt();
  xjoin(*worker_thread);
  free(worker_thread);
}
604 
605 
/* TODO: Support -m switch */
/* Choose slot counts and buffer granularities for the selected mode --
   compression, regular decompression, or small-mode (-s) decompression --
   scaled by the number of worker threads. */
static void
set_memory_constraints(void)
{
  if (!decompress) {
    total_in_slots = 2u * num_worker;
    total_out_slots = 2u * num_worker + /*TRANSM_THRESH*/2;
    in_granul = bs100k * 100000u;   /* one bzip2 block's worth per read */
    out_granul = -1;            /* ignored during compression */
  }
  else if (!small) {
    total_in_slots = 4u * num_worker;
    total_out_slots = 16u * num_worker;
    in_granul = 256u * 1024u;
    out_granul = MAX_BLOCK_SIZE;
  }
  else {
    /* Small mode: far fewer and smaller input buffers. */
    total_in_slots = 2u;
    total_out_slots = 2u * num_worker;
    in_granul = 32768u;
    out_granul = 900000u;
  }
}
629 
630 
/* Perform one coding operation on the currently open input/output pair:
   compress, or -- after sniffing the 4-byte magic -- decompress, pass the
   data through verbatim (--force to stdout), or fail. */
void
work(void)
{
  if (verbose) {
    info(decompress ? "decompressing %s%s%s to %s%s%s" :
         "compressing %s%s%s to %s%s%s", ispec.sep, ispec.fmt,
         ispec.sep, ospec.sep, ospec.fmt, ospec.sep);
  }

  set_memory_constraints();

  if (!decompress) {
    schedule(&compression);
  }
  else {
    uint32_t header;
    size_t vacant = sizeof(header);

    /* Read the first four bytes; vacant stays nonzero on a short read. */
    xread(&header, &vacant);

/* 0x425A6830 is "BZh0"; valid streams are "BZh1" through "BZh9". */
#define MAGIC(k) (0x425A6830u + (k))
    if (vacant == 0 && (ntohl(header) >= MAGIC(1) &&
                        ntohl(header) <= MAGIC(9))) {
      /* The last magic character encodes the block size (1-9 x 100 kB). */
      bs100k = ntohl(header) - MAGIC(0);
      schedule(&expansion);
    }
    else if (force && ospec.fd == STDOUT_FILENO) {
      /* Not bzip2 data: pass it through verbatim, starting with the bytes
         already consumed by the magic check. */
      xwrite(&header, sizeof(header) - vacant);
      copy();
    }
    else {
      failf(&ispec, "not a valid bzip2 file");
    }
  }
}
666