1 /**
2  * thread-pipe.c
3  *
4  *
5  * Authors:
6  *  Michi <st101564@stud.uni-stuttgart.de>
7  *
8  * Web page: https://ahoi.io/project/oregano
9  *
10  *
11  * This program is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU General Public License as
13  * published by the Free Software Foundation; either version 2 of the
14  * License, or (at your option) any later version.
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19  * General Public License for more details.
20  *
21  * You should have received a copy of the GNU General Public
22  * License along with this program; if not, write to the
23  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
24  * Boston, MA 02110-1301, USA.
25  */
26 
27 /*
28  *
29  * BASICS
30  * ------
31  * ThreadPipe can be used to efficiently communicate large
32  * or small data between 2 threads, whereas the data pops up
33  * bit by bit (or at once).
34  * It consists of a chain of data blocks. New
35  * data is appended at the one end of the chain, old data
36  * can be read at the other end of the pipe.
37  *
38  * If there is data (chain has more than 1 chain link), the
39  * reader gets to know this by locking a mutex and asking
40  * the state of a shared variable.
41  *
42  * If all data is read and the reader needs more data, he
43  * will wait until new data is available by locking a mutex
44  * and waiting for a conditional signal of the writer.
45  *
46  *
 * Locking, unlocking and signaling can slow down the
 * communication considerably if the data blocks are
 * small. To solve this problem, an additional buffer is
 * introduced.
51  *
52  * BUFFERED VERSION
53  * ----------------
54  * The problem of unbuffered ThreadPipe is that locking
55  * and unlocking is a very heavy work that should not
56  * be done too frequently. Now if the data blocks pushed
57  * to ThreadPipe are too small, the lock operation is
58  * done very often. However the positive side of small blocks
59  * is the real time ability.
60  *
 * If you need only weak real time or no real time but an
 * efficient use of time and money, you should use the
 * buffered version of thread pipe with large (but not too
 * large) buffer numbers (you enable buffering by passing
 * buffer numbers other than 1 to the constructor).
66  *
67  * There are some approaches how to realize a buffer in the
68  * case of thread pipe:
69  * - melting incoming blocks together with realloc,
70  * counting the number of bytes or melting operations,
71  * release the block for popping when number is large enough,
72  * - appending incoming blocks together,
73  * counting the number of bytes or/and number of blocks not released,
74  * release the block queue if number is large enough,
75  * - counting time instead of space,
76  * - other shit
77  *
78  * Because blocks are structure elements that can be used to
79  * make the code more efficient AND melting operations
80  * can be heavy work, I think that simply appending some
81  * blocks and not locking/releasing every single block is
82  * a good way to go. Counting time instead of space may be
83  * a good approach for real time applications that want to
84  * be also efficient. But because real time is not demanded
85  * in a non real time simulation like ngspice or gnucap, we don't
86  * need to bother. But you can set the buffer numbers to 1
87  * and then you can use thread pipe in real time applications,
88  * that have far too much nop-time and really small amounts of
89  * data workload.
90  *
 * Furthermore I have decided to count the number of blocks
 * and the total size of the blocks not yet released. As soon
 * as one of these counters reaches its corresponding buffer
 * constant, the whole pipe queue is released for popping
 * (which needs a short locking access).
96  *
97  * You have some possibilities to define the buffer constants:
98  * - define statements in thread-pipe.h
99  * - telling the constructor function what are your wishes
100  *
101  * THE FUTURE
102  * ----------
103  * An enhanced version of this could be an adaptive buffer size
104  * decision maker that detects the frequency of collisions
105  * and uses this information to adapt the optimal buffer size
106  * as a function of time, by extrapolating the behavior of
107  * the program like branch prediction technology that is used in
108  * processors. A good version of this could be real time capable
109  * in low workload times and would be optimal for time, space
110  * and money saving in times of high workload.
111  *
112  */
113 
114 #include <glib.h>
115 #include <string.h>
116 #include <stdio.h>
117 #include <unistd.h>
118 
119 #include "thread-pipe.h"
120 
121 typedef struct _ThreadPipeData ThreadPipeData;
122 
123 /**
124  * chain link/chain element
125  */
126 struct _ThreadPipeData {
127 	// introduced for efficient string support
128 	gpointer malloc_address;
129 	// data block
130 	gpointer data;
131 	// size of data block
132 	gsize size;
133 	// link to next chain element
134 	ThreadPipeData *next;
135 };
136 
137 /**
138  * structuring structure
139  */
140 typedef struct {
141 	// closing/freeing information
142 	gboolean write_eof;
143 	gboolean read_eof;
144 
145 	// buffer state information
146 	gsize size_total;
147 	gsize block_counter;
148 } ThreadPipeBufferData;
149 
150 struct _ThreadPipe {
151 	/**
152 	 * variables of reading thread
153 	 */
154 	ThreadPipeData *read_data;
155 	ThreadPipeBufferData read_buffer_data;
156 
157 	/**
158 	 * variables of writing thread
159 	 */
160 	ThreadPipeData *write_data;
161 	ThreadPipeBufferData write_buffer_data;
162 
163 	/**
164 	 * shared variables
165 	 */
166 	ThreadPipeBufferData ready_buffer_data;
167 
168 	/**
169 	 * synchronizing variables
170 	 */
171 	GMutex mutex;
172 	GCond cond;
173 
174 	/**
175 	 * read-only variables
176 	 */
177 	guint max_block_counter;
178 	gsize max_size_total;
179 };
180 
181 static ThreadPipeData *thread_pipe_data_new(gpointer data, gsize size);
182 static ThreadPipeData *thread_pipe_data_destroy(ThreadPipe *pipe);
183 static void thread_pipe_destroy(ThreadPipe *pipe);
184 
185 /**
186  * Creates a new ThreadPipe structure.
187  *
188  * I recommend using ThreadPipes like normal FIFO pipes, whereas one thread
189  * uses only read functions and another thread uses only write functions.
190  * The reading thread should make sure that read_eof will be set,
191  * the writing thread should make sure that write_eof will be set,
192  * because:
193  *
194  * ThreadPipes have 3 possibilities to close:
195  * 1. call thread_pipe_set_write_eof and after that read the pipe to the end
196  * 2. call thread_pipe_set_write_eof and after that call thread_pipe_set_read_eof
197  * 3. call thread_pipe_set_read_eof and after that call thread_pipe_set_write_eof
198  *
199  * Now if the creating thread of the ThreadPipe wants to close the ThreadPipe,
200  * and he did not touch any reading or writing functions, he does not know
201  * whether the pipe has been closed already automatically and will cause a
202  * segmentation fault eventually if he tries.
203  *
204  * @max_buffer_block_counter: 1 is the lowest number (you can't push no data).
205  * If 0, the default value from thread-pipe.h will be used.
206  * @max_buffer_block_counter: 1 is the lowest value (ThreadPipe does not allow
207  * to push blocks of size 0). If 0, the default value from thread-pipe.h
208  * will be used.
209  *
210  * returns a new ThreadPipe structure
211  */
thread_pipe_new(guint max_buffer_block_counter,gsize max_buffer_size_total)212 ThreadPipe *thread_pipe_new(
213 		guint max_buffer_block_counter,
214 		gsize max_buffer_size_total) {
215 	ThreadPipe *thread_pipe = g_new0(ThreadPipe, 1);
216 
217 	g_mutex_init(&thread_pipe->mutex);
218 	g_cond_init(&thread_pipe->cond);
219 
220 	ThreadPipeData *pipe_data = thread_pipe_data_new(NULL, 0);
221 	thread_pipe->read_data = pipe_data;
222 	thread_pipe->write_data = pipe_data;
223 
224 	thread_pipe->max_block_counter = max_buffer_block_counter != 0 ? max_buffer_block_counter : THREAD_PIPE_MAX_BUFFER_BLOCK_COUNTER_DEFAULT;
225 	thread_pipe->max_size_total = max_buffer_size_total != 0 ? max_buffer_size_total : THREAD_PIPE_MAX_BUFFER_SIZE_TOTAL_DEFAULT;
226 
227 	return thread_pipe;
228 }
229 
230 /**
231  * Pushes a block of size size to the end of the pipe. The data is copied
232  * to heap.
233  *
234  * Don't push, if you set write_eof already.
235  *
236  * return value will be NULL, if
237  * - read_eof has been set by thread_pipe_set_read_eof or
238  * - write_eof has been set by thread_pipe_set_write_eof (with fail message) or
239  * - pipe is NULL (with fail message).
240  *
241  * If read_eof has been set by thread_pipe_set_read_eof before, you can close
242  * the pipe by setting write_eof and you will save much time and money,
243  * so be sure to check return value every call. But you can also call this
244  * function if it makes no sense. The function recognizes then that
245  * pushing makes no sense and returns fast. The function is not closing the
246  * pipe automatically in this case because it is safer for example if the
247  * programmer does not check the return value.
248  *
249  * returns TRUE or FALSE
250  * - FALSE, if the pipe has been closed by read_eof or other events that
251  * indicate that you can close the pipe by setting write_eof
252  * - TRUE, if the pipe is living further on and it makes sense that it
253  * will live further on
254  */
thread_pipe_push(ThreadPipe * pipe,gpointer data,gsize size)255 gboolean thread_pipe_push(ThreadPipe *pipe, gpointer data, gsize size) {
256 	// Give me an object.
257 	g_return_val_if_fail(pipe != NULL, FALSE);
258 
259 	// Don't push, if you set write_eof already.
260 	g_return_val_if_fail(pipe->write_buffer_data.write_eof != TRUE, FALSE);
261 
262 	// Don't push no data to pipe.
263 	g_return_val_if_fail(data != NULL, !pipe->write_buffer_data.read_eof);
264 	g_return_val_if_fail(size != 0, !pipe->write_buffer_data.read_eof);
265 
266 	// pipe not active any more because no reader has interest.
267 	if (pipe->write_buffer_data.read_eof)
268 		return FALSE;
269 
270 
271 	pipe->write_data->next = thread_pipe_data_new(data, size);
272 	pipe->write_data = pipe->write_data->next;
273 
274 	pipe->write_buffer_data.block_counter++;
275 	pipe->write_buffer_data.size_total += size;
276 
277 	if (pipe->write_buffer_data.block_counter < pipe->max_block_counter
278 			&& pipe->write_buffer_data.size_total < pipe->max_size_total)
279 		return TRUE;
280 
281 
282 	g_mutex_lock(&pipe->mutex);
283 
284 	pipe->ready_buffer_data.block_counter += pipe->write_buffer_data.block_counter;
285 	pipe->write_buffer_data.block_counter = 0;
286 
287 	pipe->ready_buffer_data.size_total += pipe->write_buffer_data.size_total;
288 	pipe->write_buffer_data.size_total = 0;
289 
290 	pipe->write_buffer_data.read_eof = pipe->ready_buffer_data.read_eof;
291 
292 	g_cond_signal(&pipe->cond);
293 	g_mutex_unlock(&pipe->mutex);
294 
295 	return !pipe->write_buffer_data.read_eof;
296 }
297 
298 /**
299  * Reads a block of memory that has been pushed earlier by thread_pipe_push.
300  * If there is no block that has been pushed earlier, this function will
301  * wait until a block will be pushed or write_eof will be set.
302  *
303  * pipe will be destroyed automatically if write_eof has been set && pipe is
304  * empty, so be sure to check return value always!
305  *
306  * The data, stored to data_out, will be freed in the next thread_pipe_pop call,
307  * so be sure to copy the data, if you need it longer than one thread_pipe_pop
308  * cycle.
309  *
310  * Don't pop, if you set read_eof already.
311  *
312  * returns pipe or NULL
313  * - NULL, if the pipe has been destroyed
314  * - pipe, if the pipe is living further on
315  */
thread_pipe_pop(ThreadPipe * pipe,gpointer * data_out,gsize * size)316 ThreadPipe *thread_pipe_pop(ThreadPipe *pipe, gpointer *data_out, gsize *size) {
317 	g_return_val_if_fail(pipe != NULL, pipe);
318 	g_return_val_if_fail(data_out != NULL, pipe);
319 	g_return_val_if_fail(size != NULL, pipe);
320 	//Don't pop, if you set read_eof already.
321 	g_return_val_if_fail(pipe->read_buffer_data.read_eof != TRUE, NULL);
322 
323 	*data_out = NULL;
324 	*size = 0;
325 
326 	if (pipe->read_buffer_data.block_counter <= 0) {
327 
328 		g_mutex_lock(&pipe->mutex);
329 
330 		while (pipe->ready_buffer_data.block_counter <= 0 && !pipe->ready_buffer_data.write_eof)
331 			g_cond_wait(&pipe->cond, &pipe->mutex);
332 
333 		pipe->read_buffer_data.block_counter = pipe->ready_buffer_data.block_counter;
334 		pipe->ready_buffer_data.block_counter = 0;
335 
336 		pipe->read_buffer_data.size_total = pipe->ready_buffer_data.size_total;
337 		pipe->ready_buffer_data.size_total = 0;
338 
339 		pipe->read_buffer_data.write_eof = pipe->ready_buffer_data.write_eof;
340 
341 		g_mutex_unlock(&pipe->mutex);
342 
343 	}
344 
345 	if (!thread_pipe_data_destroy(pipe)) {
346 		thread_pipe_destroy(pipe);
347 		return NULL;
348 	}
349 	*data_out = pipe->read_data->data;
350 	*size = pipe->read_data->size;
351 
352 	return pipe;
353 }
354 
355 /**
356  * Reads to the end of a line like fgets and pops it, or reads to the end of pipe.
357  *
358  * size_out will be the length + 1 of the string like strlen.
359  *
360  * Pushed data blocks should be 0 terminated, but don't have to.
361  *
362  * possible independent cases:
363  * - newline at position of block, where position in Po := {nowhere, beginning/middle, end}
364  * - newline at is_first block, where is_first in If := {first, not first}
365  * - newline at is_last block, where is_last in Il := {last, not last}
366  * In total there are 12 pairwise unequal cases by forming the Cartesian product of Po, If and Il
367  * and adding the case where there is no newline at all.
368  *
369  * returns pipe or NULL like thread_pipe_pop
370  */
thread_pipe_pop_line(ThreadPipe * pipe_in,gchar ** string_out,gsize * size_out)371 ThreadPipe *thread_pipe_pop_line(ThreadPipe *pipe_in, gchar **string_out, gsize *size_out) {
372 	g_return_val_if_fail(pipe_in != NULL, pipe_in);
373 	g_return_val_if_fail(string_out != NULL, pipe_in);
374 	g_return_val_if_fail(size_out != NULL, pipe_in);
375 	//Don't pop, if you set read_eof already.
376 	g_return_val_if_fail(pipe_in->read_buffer_data.read_eof != TRUE, NULL);
377 
378 
379 	*string_out = NULL;
380 	*size_out = 0;
381 
382 	size_t line_size;
383 	gchar *line;
384 	FILE *line_file = open_memstream(&line, &line_size);
385 
386 	ThreadPipeData *current = NULL;
387 	gchar *ptr = NULL;
388 
389 	while (TRUE) {
390 
391 		ptr = NULL;
392 
393 		if (pipe_in->read_buffer_data.block_counter <= 0) {
394 
395 			g_mutex_lock(&pipe_in->mutex);
396 
397 			while (pipe_in->ready_buffer_data.block_counter <= 0 && !pipe_in->ready_buffer_data.write_eof)
398 				g_cond_wait(&pipe_in->cond, &pipe_in->mutex);
399 
400 			pipe_in->read_buffer_data.block_counter = pipe_in->ready_buffer_data.block_counter;
401 			pipe_in->ready_buffer_data.block_counter = 0;
402 
403 			pipe_in->read_buffer_data.size_total = pipe_in->ready_buffer_data.size_total;
404 			pipe_in->ready_buffer_data.size_total = 0;
405 
406 			pipe_in->read_buffer_data.write_eof = pipe_in->ready_buffer_data.write_eof;
407 
408 			g_mutex_unlock(&pipe_in->mutex);
409 
410 		}
411 
412 		current = pipe_in->read_data->next;
413 
414 
415 		if (current == NULL)
416 			break;
417 
418 		ptr = current->data;
419 		while (*ptr != '\n' &&
420 		       *ptr != 0 &&
421 		       ptr - (gchar *)current->data < current->size	//somebody forgot to close the string with 0?
422 			   ) {
423 			fputc(*ptr, line_file);
424 			ptr++;
425 		}
426 
427 		if (ptr - (gchar *)current->data < current->size && *ptr == '\n') {
428 			fputc(*ptr, line_file);
429 			ptr++;
430 			break;
431 		}
432 
433 		thread_pipe_data_destroy(pipe_in);
434 	}
435 
436 	fputc(0, line_file);
437 	fclose(line_file);
438 
439 	if (current == NULL) {
440 
441 		if (line_size == 1) {
442 			g_free(line);
443 			thread_pipe_destroy(pipe_in);
444 			return NULL;
445 		}
446 
447 	} else {
448 
449 		gchar **current_data = (gchar **)&current->data;
450 		gsize *current_size = &current->size;
451 
452 		*current_size -= (ptr - *current_data);
453 		pipe_in->read_buffer_data.size_total -= (ptr - *current_data);
454 		*current_data = ptr;
455 
456 		if (*current_size == 0 || (*current_size == 1 && *ptr == 0))
457 			thread_pipe_data_destroy(pipe_in);
458 
459 	}
460 
461 	/**
462 	 * current == NULL && line_size != 1
463 	 * ||
464 	 * current != NULL
465 	 */
466 
467 	gchar **old_data = (gchar **)&pipe_in->read_data->malloc_address;
468 	gsize *old_size = &pipe_in->read_data->size;
469 
470 	g_free(*old_data);
471 	*old_data = line;
472 	*old_size = line_size;
473 
474 	*string_out = *old_data;
475 	*size_out = *old_size;
476 
477 	return pipe_in;
478 }
479 
480 /**
481  * If you are finished with writing, you have to set write_eof so that the
482  * memory of pipe can be freed. You can set write_eof independent of
483  * - read_eof
484  * - emptiness of pipe
485  * - other shit
486  *
487  * Don't push, after you called thread_pipe_set_write_eof.
488  *
489  * The memory of data and pipe will be freed, if read_eof && write_eof == TRUE.
490  */
thread_pipe_set_write_eof(ThreadPipe * pipe)491 void thread_pipe_set_write_eof(ThreadPipe *pipe) {
492 	g_return_if_fail(pipe != NULL);
493 	g_return_if_fail(pipe->write_buffer_data.write_eof != TRUE);
494 
495 	g_mutex_lock(&pipe->mutex);
496 	gboolean destroy = pipe->ready_buffer_data.read_eof;
497 
498 	pipe->ready_buffer_data.write_eof = TRUE;
499 //	pipe->read_buffer_data.write_eof = TRUE;
500 	pipe->write_buffer_data.write_eof = TRUE;
501 
502 	pipe->ready_buffer_data.block_counter += pipe->write_buffer_data.block_counter;
503 	pipe->write_buffer_data.block_counter = 0;
504 
505 	pipe->ready_buffer_data.size_total += pipe->write_buffer_data.size_total;
506 	pipe->write_buffer_data.size_total = 0;
507 
508 	g_cond_signal(&pipe->cond);
509 	g_mutex_unlock(&pipe->mutex);
510 
511 	if (destroy)
512 		thread_pipe_destroy(pipe);
513 }
514 
515 /**
516  * If you are finished with reading, you have to set read_eof so that the
517  * memory of pipe can be freed. You can set read_eof independent of
518  * - write_eof
519  * - emptiness of pipe
520  * - other shit
521  *
522  * Don't pop, after you called thread_pipe_set_read_eof.
523  *
524  * The memory of data waiting in pipe to be popped, will all be freed.
525  *
526  * The memory of pipe will be freed also, if write_eof && write_eof == TRUE.
527  */
thread_pipe_set_read_eof(ThreadPipe * pipe)528 void thread_pipe_set_read_eof(ThreadPipe *pipe) {
529 	g_return_if_fail(pipe != NULL);
530 	g_return_if_fail(pipe->read_buffer_data.read_eof != TRUE);
531 
532 	g_mutex_lock(&pipe->mutex);
533 	gboolean destroy = pipe->ready_buffer_data.write_eof;
534 	pipe->ready_buffer_data.read_eof = TRUE;
535 	pipe->read_buffer_data.read_eof = TRUE;
536 //	pipe->write_buffer_data.read_eof = TRUE;
537 
538 	pipe->read_buffer_data.write_eof = pipe->ready_buffer_data.write_eof;
539 
540 	pipe->read_buffer_data.block_counter += pipe->ready_buffer_data.block_counter;
541 	pipe->ready_buffer_data.block_counter = 0;
542 
543 	pipe->read_buffer_data.size_total += pipe->ready_buffer_data.size_total;
544 	pipe->ready_buffer_data.size_total = 0;
545 
546 	while (pipe->read_buffer_data.block_counter)
547 		thread_pipe_data_destroy(pipe);
548 	g_mutex_unlock(&pipe->mutex);
549 
550 	if (destroy)
551 		thread_pipe_destroy(pipe);
552 }
553 
554 /**
555  * copy data to a new ThreadPipeData structure
556  */
thread_pipe_data_new(gpointer data,gsize size)557 static ThreadPipeData *thread_pipe_data_new(gpointer data, gsize size) {
558 	ThreadPipeData *pipe_data = g_new0(ThreadPipeData, 1);
559 	if (data != NULL && size != 0) {
560 		pipe_data->malloc_address = g_malloc(size);
561 		memcpy(pipe_data->malloc_address, data, size);
562 		pipe_data->data = pipe_data->malloc_address;
563 		pipe_data->size = size;
564 	}
565 
566 	return pipe_data;
567 }
568 
569 /**
570  * free all memory of a ThreadPipeData structure
571  *
572  * returns
573  * - the next block of the linked list, if there is one
574  * - else NULL
575  */
thread_pipe_data_destroy(ThreadPipe * pipe)576 static ThreadPipeData *thread_pipe_data_destroy(ThreadPipe *pipe) {
577 	ThreadPipeData *pipe_data = pipe->read_data;
578 	ThreadPipeData *next = pipe_data->next;
579 	g_free(pipe_data->malloc_address);
580 	g_free(pipe_data);
581 	if (next != NULL) {
582 		pipe->read_buffer_data.block_counter--;
583 		pipe->read_buffer_data.size_total -= next->size;
584 	}
585 	pipe->read_data = next;
586 	return next;
587 }
588 
589 /**
590  * free all memory of a ThreadPipe structure
591  */
thread_pipe_destroy(ThreadPipe * pipe)592 static void thread_pipe_destroy(ThreadPipe *pipe) {
593 	g_return_if_fail(pipe != NULL);
594 
595 	while (pipe->read_data)
596 		thread_pipe_data_destroy(pipe);
597 	g_mutex_clear(&pipe->mutex);
598 	g_cond_clear(&pipe->cond);
599 	g_free(pipe);
600 }
601