1 /**
2 * thread-pipe.c
3 *
4 *
5 * Authors:
6 * Michi <st101564@stud.uni-stuttgart.de>
7 *
8 * Web page: https://ahoi.io/project/oregano
9 *
10 *
11 * This program is free software; you can redistribute it and/or
12 * modify it under the terms of the GNU General Public License as
13 * published by the Free Software Foundation; either version 2 of the
14 * License, or (at your option) any later version.
15 *
16 * This program is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 * General Public License for more details.
20 *
21 * You should have received a copy of the GNU General Public
22 * License along with this program; if not, write to the
23 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
24 * Boston, MA 02110-1301, USA.
25 */
26
27 /*
28 *
29 * BASICS
30 * ------
31 * ThreadPipe can be used to efficiently communicate large
32 * or small data between 2 threads, whereas the data pops up
33 * bit by bit (or at once).
34 * It consists of a chain of data blocks. New
35 * data is appended at the one end of the chain, old data
36 * can be read at the other end of the pipe.
37 *
38 * If there is data (chain has more than 1 chain link), the
39 * reader gets to know this by locking a mutex and asking
40 * the state of a shared variable.
41 *
42 * If all data is read and the reader needs more data, he
43 * will wait until new data is available by locking a mutex
44 * and waiting for a conditional signal of the writer.
45 *
46 *
47 * Locking, unlocking and signaling can slow down the
48 * communication process very hard, if the data blocks are
49 * small. To solve this problem, an additional buffer is
50 * introduced.
51 *
52 * BUFFERED VERSION
53 * ----------------
54 * The problem of unbuffered ThreadPipe is that locking
55 * and unlocking is a very heavy work that should not
56 * be done too frequently. Now if the data blocks pushed
57 * to ThreadPipe are too small, the lock operation is
58 * done very often. However the positive side of small blocks
59 * is the real time ability.
60 *
61 * If you need only weak real time or no real time but an
62 * efficient use of time and money, you should use the
63 * buffered version of thread pipe with large (but not too
64 * large) buffer numbers (you can use a buffer by setting
65 * the buffer numbers in the constructor unequal 1).
66 *
67 * There are some approaches how to realize a buffer in the
68 * case of thread pipe:
69 * - melting incoming blocks together with realloc,
70 * counting the number of bytes or melting operations,
71 * release the block for popping when number is large enough,
72 * - appending incoming blocks together,
73 * counting the number of bytes or/and number of blocks not released,
74 * release the block queue if number is large enough,
75 * - counting time instead of space,
76 * - other shit
77 *
78 * Because blocks are structure elements that can be used to
79 * make the code more efficient AND melting operations
80 * can be heavy work, I think that simply appending some
81 * blocks and not locking/releasing every single block is
82 * a good way to go. Counting time instead of space may be
83 * a good approach for real time applications that want to
84 * be also efficient. But because real time is not demanded
85 * in a non real time simulation like ngspice or gnucap, we don't
86 * need to bother. But you can set the buffer numbers to 1
87 * and then you can use thread pipe in real time applications,
88 * that have far too much nop-time and really small amounts of
89 * data workload.
90 *
91 * Furthermore I have decided to count the number of blocks
92 * and the total size of not released blocks. When one of
93 * the counting numbers are larger than their corresponding
94 * buffer constants, the whole pipe queue is released for
95 * popping (what needs a small locking access).
96 *
97 * You have some possibilities to define the buffer constants:
98 * - define statements in thread-pipe.h
99 * - telling the constructor function what are your wishes
100 *
101 * THE FUTURE
102 * ----------
103 * An enhanced version of this could be an adaptive buffer size
104 * decision maker that detects the frequency of collisions
105 * and uses this information to adapt the optimal buffer size
106 * as a function of time, by extrapolating the behavior of
107 * the program like branch prediction technology that is used in
108 * processors. A good version of this could be real time capable
109 * in low workload times and would be optimal for time, space
110 * and money saving in times of high workload.
111 *
112 */
113
114 #include <glib.h>
115 #include <string.h>
116 #include <stdio.h>
117 #include <unistd.h>
118
119 #include "thread-pipe.h"
120
121 typedef struct _ThreadPipeData ThreadPipeData;
122
123 /**
124 * chain link/chain element
125 */
126 struct _ThreadPipeData {
127 // introduced for efficient string support
128 gpointer malloc_address;
129 // data block
130 gpointer data;
131 // size of data block
132 gsize size;
133 // link to next chain element
134 ThreadPipeData *next;
135 };
136
137 /**
138 * structuring structure
139 */
140 typedef struct {
141 // closing/freeing information
142 gboolean write_eof;
143 gboolean read_eof;
144
145 // buffer state information
146 gsize size_total;
147 gsize block_counter;
148 } ThreadPipeBufferData;
149
150 struct _ThreadPipe {
151 /**
152 * variables of reading thread
153 */
154 ThreadPipeData *read_data;
155 ThreadPipeBufferData read_buffer_data;
156
157 /**
158 * variables of writing thread
159 */
160 ThreadPipeData *write_data;
161 ThreadPipeBufferData write_buffer_data;
162
163 /**
164 * shared variables
165 */
166 ThreadPipeBufferData ready_buffer_data;
167
168 /**
169 * synchronizing variables
170 */
171 GMutex mutex;
172 GCond cond;
173
174 /**
175 * read-only variables
176 */
177 guint max_block_counter;
178 gsize max_size_total;
179 };
180
181 static ThreadPipeData *thread_pipe_data_new(gpointer data, gsize size);
182 static ThreadPipeData *thread_pipe_data_destroy(ThreadPipe *pipe);
183 static void thread_pipe_destroy(ThreadPipe *pipe);
184
185 /**
186 * Creates a new ThreadPipe structure.
187 *
188 * I recommend using ThreadPipes like normal FIFO pipes, whereas one thread
189 * uses only read functions and another thread uses only write functions.
190 * The reading thread should make sure that read_eof will be set,
191 * the writing thread should make sure that write_eof will be set,
192 * because:
193 *
194 * ThreadPipes have 3 possibilities to close:
195 * 1. call thread_pipe_set_write_eof and after that read the pipe to the end
196 * 2. call thread_pipe_set_write_eof and after that call thread_pipe_set_read_eof
197 * 3. call thread_pipe_set_read_eof and after that call thread_pipe_set_write_eof
198 *
199 * Now if the creating thread of the ThreadPipe wants to close the ThreadPipe,
200 * and he did not touch any reading or writing functions, he does not know
201 * whether the pipe has been closed already automatically and will cause a
202 * segmentation fault eventually if he tries.
203 *
204 * @max_buffer_block_counter: 1 is the lowest number (you can't push no data).
205 * If 0, the default value from thread-pipe.h will be used.
206 * @max_buffer_block_counter: 1 is the lowest value (ThreadPipe does not allow
207 * to push blocks of size 0). If 0, the default value from thread-pipe.h
208 * will be used.
209 *
210 * returns a new ThreadPipe structure
211 */
thread_pipe_new(guint max_buffer_block_counter,gsize max_buffer_size_total)212 ThreadPipe *thread_pipe_new(
213 guint max_buffer_block_counter,
214 gsize max_buffer_size_total) {
215 ThreadPipe *thread_pipe = g_new0(ThreadPipe, 1);
216
217 g_mutex_init(&thread_pipe->mutex);
218 g_cond_init(&thread_pipe->cond);
219
220 ThreadPipeData *pipe_data = thread_pipe_data_new(NULL, 0);
221 thread_pipe->read_data = pipe_data;
222 thread_pipe->write_data = pipe_data;
223
224 thread_pipe->max_block_counter = max_buffer_block_counter != 0 ? max_buffer_block_counter : THREAD_PIPE_MAX_BUFFER_BLOCK_COUNTER_DEFAULT;
225 thread_pipe->max_size_total = max_buffer_size_total != 0 ? max_buffer_size_total : THREAD_PIPE_MAX_BUFFER_SIZE_TOTAL_DEFAULT;
226
227 return thread_pipe;
228 }
229
230 /**
231 * Pushes a block of size size to the end of the pipe. The data is copied
232 * to heap.
233 *
234 * Don't push, if you set write_eof already.
235 *
236 * return value will be NULL, if
237 * - read_eof has been set by thread_pipe_set_read_eof or
238 * - write_eof has been set by thread_pipe_set_write_eof (with fail message) or
239 * - pipe is NULL (with fail message).
240 *
241 * If read_eof has been set by thread_pipe_set_read_eof before, you can close
242 * the pipe by setting write_eof and you will save much time and money,
243 * so be sure to check return value every call. But you can also call this
244 * function if it makes no sense. The function recognizes then that
245 * pushing makes no sense and returns fast. The function is not closing the
246 * pipe automatically in this case because it is safer for example if the
247 * programmer does not check the return value.
248 *
249 * returns TRUE or FALSE
250 * - FALSE, if the pipe has been closed by read_eof or other events that
251 * indicate that you can close the pipe by setting write_eof
252 * - TRUE, if the pipe is living further on and it makes sense that it
253 * will live further on
254 */
thread_pipe_push(ThreadPipe * pipe,gpointer data,gsize size)255 gboolean thread_pipe_push(ThreadPipe *pipe, gpointer data, gsize size) {
256 // Give me an object.
257 g_return_val_if_fail(pipe != NULL, FALSE);
258
259 // Don't push, if you set write_eof already.
260 g_return_val_if_fail(pipe->write_buffer_data.write_eof != TRUE, FALSE);
261
262 // Don't push no data to pipe.
263 g_return_val_if_fail(data != NULL, !pipe->write_buffer_data.read_eof);
264 g_return_val_if_fail(size != 0, !pipe->write_buffer_data.read_eof);
265
266 // pipe not active any more because no reader has interest.
267 if (pipe->write_buffer_data.read_eof)
268 return FALSE;
269
270
271 pipe->write_data->next = thread_pipe_data_new(data, size);
272 pipe->write_data = pipe->write_data->next;
273
274 pipe->write_buffer_data.block_counter++;
275 pipe->write_buffer_data.size_total += size;
276
277 if (pipe->write_buffer_data.block_counter < pipe->max_block_counter
278 && pipe->write_buffer_data.size_total < pipe->max_size_total)
279 return TRUE;
280
281
282 g_mutex_lock(&pipe->mutex);
283
284 pipe->ready_buffer_data.block_counter += pipe->write_buffer_data.block_counter;
285 pipe->write_buffer_data.block_counter = 0;
286
287 pipe->ready_buffer_data.size_total += pipe->write_buffer_data.size_total;
288 pipe->write_buffer_data.size_total = 0;
289
290 pipe->write_buffer_data.read_eof = pipe->ready_buffer_data.read_eof;
291
292 g_cond_signal(&pipe->cond);
293 g_mutex_unlock(&pipe->mutex);
294
295 return !pipe->write_buffer_data.read_eof;
296 }
297
298 /**
299 * Reads a block of memory that has been pushed earlier by thread_pipe_push.
300 * If there is no block that has been pushed earlier, this function will
301 * wait until a block will be pushed or write_eof will be set.
302 *
303 * pipe will be destroyed automatically if write_eof has been set && pipe is
304 * empty, so be sure to check return value always!
305 *
306 * The data, stored to data_out, will be freed in the next thread_pipe_pop call,
307 * so be sure to copy the data, if you need it longer than one thread_pipe_pop
308 * cycle.
309 *
310 * Don't pop, if you set read_eof already.
311 *
312 * returns pipe or NULL
313 * - NULL, if the pipe has been destroyed
314 * - pipe, if the pipe is living further on
315 */
thread_pipe_pop(ThreadPipe * pipe,gpointer * data_out,gsize * size)316 ThreadPipe *thread_pipe_pop(ThreadPipe *pipe, gpointer *data_out, gsize *size) {
317 g_return_val_if_fail(pipe != NULL, pipe);
318 g_return_val_if_fail(data_out != NULL, pipe);
319 g_return_val_if_fail(size != NULL, pipe);
320 //Don't pop, if you set read_eof already.
321 g_return_val_if_fail(pipe->read_buffer_data.read_eof != TRUE, NULL);
322
323 *data_out = NULL;
324 *size = 0;
325
326 if (pipe->read_buffer_data.block_counter <= 0) {
327
328 g_mutex_lock(&pipe->mutex);
329
330 while (pipe->ready_buffer_data.block_counter <= 0 && !pipe->ready_buffer_data.write_eof)
331 g_cond_wait(&pipe->cond, &pipe->mutex);
332
333 pipe->read_buffer_data.block_counter = pipe->ready_buffer_data.block_counter;
334 pipe->ready_buffer_data.block_counter = 0;
335
336 pipe->read_buffer_data.size_total = pipe->ready_buffer_data.size_total;
337 pipe->ready_buffer_data.size_total = 0;
338
339 pipe->read_buffer_data.write_eof = pipe->ready_buffer_data.write_eof;
340
341 g_mutex_unlock(&pipe->mutex);
342
343 }
344
345 if (!thread_pipe_data_destroy(pipe)) {
346 thread_pipe_destroy(pipe);
347 return NULL;
348 }
349 *data_out = pipe->read_data->data;
350 *size = pipe->read_data->size;
351
352 return pipe;
353 }
354
355 /**
356 * Reads to the end of a line like fgets and pops it, or reads to the end of pipe.
357 *
358 * size_out will be the length + 1 of the string like strlen.
359 *
360 * Pushed data blocks should be 0 terminated, but don't have to.
361 *
362 * possible independent cases:
363 * - newline at position of block, where position in Po := {nowhere, beginning/middle, end}
364 * - newline at is_first block, where is_first in If := {first, not first}
365 * - newline at is_last block, where is_last in Il := {last, not last}
366 * In total there are 12 pairwise unequal cases by forming the Cartesian product of Po, If and Il
367 * and adding the case where there is no newline at all.
368 *
369 * returns pipe or NULL like thread_pipe_pop
370 */
thread_pipe_pop_line(ThreadPipe * pipe_in,gchar ** string_out,gsize * size_out)371 ThreadPipe *thread_pipe_pop_line(ThreadPipe *pipe_in, gchar **string_out, gsize *size_out) {
372 g_return_val_if_fail(pipe_in != NULL, pipe_in);
373 g_return_val_if_fail(string_out != NULL, pipe_in);
374 g_return_val_if_fail(size_out != NULL, pipe_in);
375 //Don't pop, if you set read_eof already.
376 g_return_val_if_fail(pipe_in->read_buffer_data.read_eof != TRUE, NULL);
377
378
379 *string_out = NULL;
380 *size_out = 0;
381
382 size_t line_size;
383 gchar *line;
384 FILE *line_file = open_memstream(&line, &line_size);
385
386 ThreadPipeData *current = NULL;
387 gchar *ptr = NULL;
388
389 while (TRUE) {
390
391 ptr = NULL;
392
393 if (pipe_in->read_buffer_data.block_counter <= 0) {
394
395 g_mutex_lock(&pipe_in->mutex);
396
397 while (pipe_in->ready_buffer_data.block_counter <= 0 && !pipe_in->ready_buffer_data.write_eof)
398 g_cond_wait(&pipe_in->cond, &pipe_in->mutex);
399
400 pipe_in->read_buffer_data.block_counter = pipe_in->ready_buffer_data.block_counter;
401 pipe_in->ready_buffer_data.block_counter = 0;
402
403 pipe_in->read_buffer_data.size_total = pipe_in->ready_buffer_data.size_total;
404 pipe_in->ready_buffer_data.size_total = 0;
405
406 pipe_in->read_buffer_data.write_eof = pipe_in->ready_buffer_data.write_eof;
407
408 g_mutex_unlock(&pipe_in->mutex);
409
410 }
411
412 current = pipe_in->read_data->next;
413
414
415 if (current == NULL)
416 break;
417
418 ptr = current->data;
419 while (*ptr != '\n' &&
420 *ptr != 0 &&
421 ptr - (gchar *)current->data < current->size //somebody forgot to close the string with 0?
422 ) {
423 fputc(*ptr, line_file);
424 ptr++;
425 }
426
427 if (ptr - (gchar *)current->data < current->size && *ptr == '\n') {
428 fputc(*ptr, line_file);
429 ptr++;
430 break;
431 }
432
433 thread_pipe_data_destroy(pipe_in);
434 }
435
436 fputc(0, line_file);
437 fclose(line_file);
438
439 if (current == NULL) {
440
441 if (line_size == 1) {
442 g_free(line);
443 thread_pipe_destroy(pipe_in);
444 return NULL;
445 }
446
447 } else {
448
449 gchar **current_data = (gchar **)¤t->data;
450 gsize *current_size = ¤t->size;
451
452 *current_size -= (ptr - *current_data);
453 pipe_in->read_buffer_data.size_total -= (ptr - *current_data);
454 *current_data = ptr;
455
456 if (*current_size == 0 || (*current_size == 1 && *ptr == 0))
457 thread_pipe_data_destroy(pipe_in);
458
459 }
460
461 /**
462 * current == NULL && line_size != 1
463 * ||
464 * current != NULL
465 */
466
467 gchar **old_data = (gchar **)&pipe_in->read_data->malloc_address;
468 gsize *old_size = &pipe_in->read_data->size;
469
470 g_free(*old_data);
471 *old_data = line;
472 *old_size = line_size;
473
474 *string_out = *old_data;
475 *size_out = *old_size;
476
477 return pipe_in;
478 }
479
480 /**
481 * If you are finished with writing, you have to set write_eof so that the
482 * memory of pipe can be freed. You can set write_eof independent of
483 * - read_eof
484 * - emptiness of pipe
485 * - other shit
486 *
487 * Don't push, after you called thread_pipe_set_write_eof.
488 *
489 * The memory of data and pipe will be freed, if read_eof && write_eof == TRUE.
490 */
thread_pipe_set_write_eof(ThreadPipe * pipe)491 void thread_pipe_set_write_eof(ThreadPipe *pipe) {
492 g_return_if_fail(pipe != NULL);
493 g_return_if_fail(pipe->write_buffer_data.write_eof != TRUE);
494
495 g_mutex_lock(&pipe->mutex);
496 gboolean destroy = pipe->ready_buffer_data.read_eof;
497
498 pipe->ready_buffer_data.write_eof = TRUE;
499 // pipe->read_buffer_data.write_eof = TRUE;
500 pipe->write_buffer_data.write_eof = TRUE;
501
502 pipe->ready_buffer_data.block_counter += pipe->write_buffer_data.block_counter;
503 pipe->write_buffer_data.block_counter = 0;
504
505 pipe->ready_buffer_data.size_total += pipe->write_buffer_data.size_total;
506 pipe->write_buffer_data.size_total = 0;
507
508 g_cond_signal(&pipe->cond);
509 g_mutex_unlock(&pipe->mutex);
510
511 if (destroy)
512 thread_pipe_destroy(pipe);
513 }
514
515 /**
516 * If you are finished with reading, you have to set read_eof so that the
517 * memory of pipe can be freed. You can set read_eof independent of
518 * - write_eof
519 * - emptiness of pipe
520 * - other shit
521 *
522 * Don't pop, after you called thread_pipe_set_read_eof.
523 *
524 * The memory of data waiting in pipe to be popped, will all be freed.
525 *
526 * The memory of pipe will be freed also, if write_eof && write_eof == TRUE.
527 */
thread_pipe_set_read_eof(ThreadPipe * pipe)528 void thread_pipe_set_read_eof(ThreadPipe *pipe) {
529 g_return_if_fail(pipe != NULL);
530 g_return_if_fail(pipe->read_buffer_data.read_eof != TRUE);
531
532 g_mutex_lock(&pipe->mutex);
533 gboolean destroy = pipe->ready_buffer_data.write_eof;
534 pipe->ready_buffer_data.read_eof = TRUE;
535 pipe->read_buffer_data.read_eof = TRUE;
536 // pipe->write_buffer_data.read_eof = TRUE;
537
538 pipe->read_buffer_data.write_eof = pipe->ready_buffer_data.write_eof;
539
540 pipe->read_buffer_data.block_counter += pipe->ready_buffer_data.block_counter;
541 pipe->ready_buffer_data.block_counter = 0;
542
543 pipe->read_buffer_data.size_total += pipe->ready_buffer_data.size_total;
544 pipe->ready_buffer_data.size_total = 0;
545
546 while (pipe->read_buffer_data.block_counter)
547 thread_pipe_data_destroy(pipe);
548 g_mutex_unlock(&pipe->mutex);
549
550 if (destroy)
551 thread_pipe_destroy(pipe);
552 }
553
554 /**
555 * copy data to a new ThreadPipeData structure
556 */
thread_pipe_data_new(gpointer data,gsize size)557 static ThreadPipeData *thread_pipe_data_new(gpointer data, gsize size) {
558 ThreadPipeData *pipe_data = g_new0(ThreadPipeData, 1);
559 if (data != NULL && size != 0) {
560 pipe_data->malloc_address = g_malloc(size);
561 memcpy(pipe_data->malloc_address, data, size);
562 pipe_data->data = pipe_data->malloc_address;
563 pipe_data->size = size;
564 }
565
566 return pipe_data;
567 }
568
569 /**
570 * free all memory of a ThreadPipeData structure
571 *
572 * returns
573 * - the next block of the linked list, if there is one
574 * - else NULL
575 */
thread_pipe_data_destroy(ThreadPipe * pipe)576 static ThreadPipeData *thread_pipe_data_destroy(ThreadPipe *pipe) {
577 ThreadPipeData *pipe_data = pipe->read_data;
578 ThreadPipeData *next = pipe_data->next;
579 g_free(pipe_data->malloc_address);
580 g_free(pipe_data);
581 if (next != NULL) {
582 pipe->read_buffer_data.block_counter--;
583 pipe->read_buffer_data.size_total -= next->size;
584 }
585 pipe->read_data = next;
586 return next;
587 }
588
589 /**
590 * free all memory of a ThreadPipe structure
591 */
thread_pipe_destroy(ThreadPipe * pipe)592 static void thread_pipe_destroy(ThreadPipe *pipe) {
593 g_return_if_fail(pipe != NULL);
594
595 while (pipe->read_data)
596 thread_pipe_data_destroy(pipe);
597 g_mutex_clear(&pipe->mutex);
598 g_cond_clear(&pipe->cond);
599 g_free(pipe);
600 }
601