1 /*
2 Copyright 2021 Northern.tech AS
3
4 This file is part of CFEngine 3 - written and maintained by Northern.tech AS.
5
6 This program is free software; you can redistribute it and/or modify it
7 under the terms of the GNU General Public License as published by the
8 Free Software Foundation; version 3.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA
18
19 To the extent this program is licensed as part of the Enterprise
20 versions of CFEngine, the applicable Commercial Open Source License
21 (COSL) may apply to this file if you as a licensee so wish it. See
22 included file COSL.txt.
23 */
24
25 #include <platform.h>
26 #include <threaded_deque.h>
27 #include <alloc.h>
28 #include <logging.h>
29 #include <mutex.h>
30 #include <pthread.h>
31
32
33 #define EXPAND_FACTOR 2
34 #define DEFAULT_CAPACITY 16
35
36 /** @struct ThreadedDeque_
37 @brief An implementation of a thread safe deque based on a circular array
38
39 Can push left, push right and give various statistics about its contents,
40 like amount of elements, capacity and if it is empty. Has the ability to
41 block if deque is empty, waiting for new elements to be pushed.
42 */
43 struct ThreadedDeque_ {
44 pthread_mutex_t *lock; /**< Thread lock for accessing data. */
45 pthread_cond_t *cond_non_empty; /**< Blocking condition if empty */
46 pthread_cond_t *cond_empty; /**< Blocking condition if not empty */
47 void (*ItemDestroy) (void *item); /**< Data-specific destroy function. */
48 void **data; /**< Internal array of elements. */
49 size_t left; /**< Current position in deque. */
50 size_t right; /**< Current end of deque. */
51 size_t size; /**< Current size of deque. */
52 size_t capacity; /**< Current memory allocated. */
53 };
54
55 static void DestroyRange(ThreadedDeque *deque, size_t start, size_t end);
56 static void ExpandIfNecessary(ThreadedDeque *deque);
57
ThreadedDequeNew(size_t initial_capacity,void (ItemDestroy)(void * item))58 ThreadedDeque *ThreadedDequeNew(size_t initial_capacity,
59 void (ItemDestroy) (void *item))
60 {
61 ThreadedDeque *deque = xcalloc(1, sizeof(ThreadedDeque));
62
63 if (initial_capacity == 0)
64 {
65 initial_capacity = DEFAULT_CAPACITY;
66 }
67
68 pthread_mutexattr_t attr;
69 pthread_mutexattr_init(&attr);
70 // enables errorchecking for deadlocks
71 int ret = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
72 if (ret != 0)
73 {
74 Log(LOG_LEVEL_ERR,
75 "Failed to use error-checking mutexes for deque, "
76 "falling back to normal ones (pthread_mutexattr_settype: %s)",
77 GetErrorStrFromCode(ret));
78 pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL);
79 }
80
81 deque->lock = xmalloc(sizeof(pthread_mutex_t));
82 ret = pthread_mutex_init(deque->lock, &attr);
83 if (ret != 0)
84 {
85 Log(LOG_LEVEL_ERR,
86 "Failed to initialize mutex (pthread_mutex_init: %s)",
87 GetErrorStrFromCode(ret));
88 pthread_mutexattr_destroy(&attr);
89 free(deque->lock);
90 free(deque);
91 return NULL;
92 }
93
94 pthread_mutexattr_destroy(&attr);
95
96 deque->cond_non_empty = xmalloc(sizeof(pthread_cond_t));
97 ret = pthread_cond_init(deque->cond_non_empty, NULL);
98 if (ret != 0)
99 {
100 Log(LOG_LEVEL_ERR,
101 "Failed to initialize thread condition (pthread_cond_init: %s)",
102 GetErrorStrFromCode(ret));
103 free(deque->lock);
104 free(deque->cond_non_empty);
105 free(deque);
106 return NULL;
107 }
108
109 deque->cond_empty = xmalloc(sizeof(pthread_cond_t));
110 ret = pthread_cond_init(deque->cond_empty, NULL);
111 if (ret != 0)
112 {
113 Log(LOG_LEVEL_ERR,
114 "Failed to initialize thread condition "
115 "(pthread_cond_init: %s)",
116 GetErrorStrFromCode(ret));
117 free(deque->lock);
118 free(deque->cond_empty);
119 free(deque->cond_non_empty);
120 free(deque);
121 return NULL;
122 }
123
124 deque->capacity = initial_capacity;
125 deque->left = 0;
126 deque->right = 0;
127 deque->size = 0;
128 deque->data = xmalloc(sizeof(void *) * initial_capacity);
129 deque->ItemDestroy = ItemDestroy;
130
131 return deque;
132 }
133
ThreadedDequeDestroy(ThreadedDeque * deque)134 void ThreadedDequeDestroy(ThreadedDeque *deque)
135 {
136 if (deque != NULL)
137 {
138 DestroyRange(deque, deque->left, deque->right);
139
140 ThreadedDequeSoftDestroy(deque);
141 }
142 }
143
ThreadedDequeSoftDestroy(ThreadedDeque * deque)144 void ThreadedDequeSoftDestroy(ThreadedDeque *deque)
145 {
146 if (deque != NULL)
147 {
148 if (deque->lock != NULL)
149 {
150 pthread_mutex_destroy(deque->lock);
151 free(deque->lock);
152 }
153
154 if (deque->cond_non_empty != NULL)
155 {
156 pthread_cond_destroy(deque->cond_non_empty);
157 free(deque->cond_non_empty);
158 }
159
160 if (deque->cond_empty != NULL)
161 {
162 pthread_cond_destroy(deque->cond_empty);
163 free(deque->cond_empty);
164 }
165
166 free(deque->data);
167 free(deque);
168 }
169 }
170
ThreadedDequePopLeft(ThreadedDeque * deque,void ** item,int timeout)171 bool ThreadedDequePopLeft(ThreadedDeque *deque,
172 void **item,
173 int timeout)
174 {
175 assert(deque != NULL);
176
177 ThreadLock(deque->lock);
178
179 if (deque->size == 0 && timeout != 0)
180 {
181 int res = 0;
182 do {
183 res = ThreadWait(deque->cond_non_empty, deque->lock, timeout);
184
185 if (res != 0)
186 {
187 /* Lock is reacquired even when timed out, so it needs to be
188 released again. */
189 ThreadUnlock(deque->lock);
190 return false;
191 }
192 } while (deque->size == 0);
193 // Reevaluate predicate to protect against spurious wakeups
194 }
195
196 bool ret = true;
197 if (deque->size > 0)
198 {
199 size_t left = deque->left;
200 *item = deque->data[left];
201
202 deque->data[left++] = NULL;
203
204 left %= deque->capacity;
205 deque->left = left;
206 deque->size--;
207 } else {
208 ret = false;
209 *item = NULL;
210 }
211
212 if (deque->size == 0)
213 {
214 // Signals that the deque is empty for ThreadedDequeWaitEmpty
215 pthread_cond_broadcast(deque->cond_empty);
216 }
217
218 ThreadUnlock(deque->lock);
219
220 return ret;
221 }
222
ThreadedDequePopRight(ThreadedDeque * deque,void ** item,int timeout)223 bool ThreadedDequePopRight(ThreadedDeque *deque,
224 void **item,
225 int timeout)
226 {
227 assert(deque != NULL);
228
229 ThreadLock(deque->lock);
230
231 if (deque->size == 0 && timeout != 0)
232 {
233 int res = 0;
234 do {
235 res = ThreadWait(deque->cond_non_empty, deque->lock, timeout);
236
237 if (res != 0)
238 {
239 /* Lock is reacquired even when timed out, so it needs to be
240 released again. */
241 ThreadUnlock(deque->lock);
242 return false;
243 }
244 } while (deque->size == 0);
245 // Reevaluate predicate to protect against spurious wakeups
246 }
247
248 bool ret = true;
249 if (deque->size > 0)
250 {
251 size_t right = deque->right;
252 right = right == 0 ? deque->capacity - 1 : right - 1;
253
254 *item = deque->data[right];
255 deque->data[right] = NULL;
256
257 deque->right = right;
258 deque->size--;
259 } else {
260 ret = false;
261 *item = NULL;
262 }
263
264 if (deque->size == 0)
265 {
266 // Signals that the deque is empty for ThreadedDequeWaitEmpty
267 pthread_cond_broadcast(deque->cond_empty);
268 }
269
270 ThreadUnlock(deque->lock);
271
272 return ret;
273 }
274
ThreadedDequePopLeftN(ThreadedDeque * deque,void *** data_array,size_t num,int timeout)275 size_t ThreadedDequePopLeftN(ThreadedDeque *deque,
276 void ***data_array,
277 size_t num,
278 int timeout)
279 {
280 assert(deque != NULL);
281
282 ThreadLock(deque->lock);
283
284 if (deque->size == 0 && timeout != 0)
285 {
286 int res = 0;
287 do {
288 res = ThreadWait(deque->cond_non_empty, deque->lock, timeout);
289
290 if (res != 0)
291 {
292 /* Lock is reacquired even when timed out, so it needs to be
293 released again. */
294 ThreadUnlock(deque->lock);
295 *data_array = NULL;
296 return 0;
297 }
298 } while (deque->size == 0);
299 // Reevaluate predicate to protect against spurious wakeups
300 }
301
302 size_t size = num < deque->size ? num : deque->size;
303 void **data = NULL;
304
305 if (size > 0)
306 {
307 data = xcalloc(size, sizeof(void *));
308 size_t left = deque->left;
309
310 for (size_t i = 0; i < size; i++)
311 {
312 data[i] = deque->data[left];
313 deque->data[left++] = NULL;
314 left %= deque->capacity;
315 }
316
317 deque->left = left;
318 deque->size -= size;
319 }
320
321 if (deque->size == 0)
322 {
323 // Signals that the deque is empty for ThreadedDequeWaitEmpty
324 pthread_cond_broadcast(deque->cond_empty);
325 }
326
327 *data_array = data;
328
329 ThreadUnlock(deque->lock);
330
331 return size;
332 }
333
ThreadedDequePopRightN(ThreadedDeque * deque,void *** data_array,size_t num,int timeout)334 size_t ThreadedDequePopRightN(ThreadedDeque *deque,
335 void ***data_array,
336 size_t num,
337 int timeout)
338 {
339 assert(deque != NULL);
340
341 ThreadLock(deque->lock);
342
343 if (deque->size == 0 && timeout != 0)
344 {
345 int res = 0;
346 do {
347 res = ThreadWait(deque->cond_non_empty, deque->lock, timeout);
348
349 if (res != 0)
350 {
351 /* Lock is reacquired even when timed out, so it needs to be
352 released again. */
353 ThreadUnlock(deque->lock);
354 *data_array = NULL;
355 return 0;
356 }
357 } while (deque->size == 0);
358 // Reevaluate predicate to protect against spurious wakeups
359 }
360
361 size_t size = num < deque->size ? num : deque->size;
362 void **data = NULL;
363
364 if (size > 0)
365 {
366 data = xcalloc(size, sizeof(void *));
367 size_t right = deque->right;
368
369 for (size_t i = 0; i < size; i++)
370 {
371 right = right == 0 ? deque->capacity - 1 : right - 1;
372 data[i] = deque->data[right];
373 deque->data[right] = NULL;
374 }
375
376 deque->right = right;
377 deque->size -= size;
378 }
379
380 if (deque->size == 0)
381 {
382 // Signals that the deque is empty for ThreadedDequeWaitEmpty
383 pthread_cond_broadcast(deque->cond_empty);
384 }
385
386 *data_array = data;
387
388 ThreadUnlock(deque->lock);
389
390 return size;
391 }
392
ThreadedDequePushLeft(ThreadedDeque * deque,void * item)393 size_t ThreadedDequePushLeft(ThreadedDeque *deque, void *item)
394 {
395 assert(deque != NULL);
396
397 ThreadLock(deque->lock);
398
399 ExpandIfNecessary(deque);
400
401 deque->left = deque->left == 0 ? deque->capacity - 1 : deque->left - 1;
402 deque->data[deque->left] = item;
403 deque->size++;
404 size_t const size = deque->size;
405 pthread_cond_signal(deque->cond_non_empty);
406
407 ThreadUnlock(deque->lock);
408
409 return size;
410 }
411
ThreadedDequePushRight(ThreadedDeque * deque,void * item)412 size_t ThreadedDequePushRight(ThreadedDeque *deque, void *item)
413 {
414 assert(deque != NULL);
415
416 ThreadLock(deque->lock);
417
418 ExpandIfNecessary(deque);
419
420 deque->data[deque->right++] = item;
421 deque->right %= deque->capacity;
422 deque->size++;
423 size_t const size = deque->size;
424
425 pthread_cond_signal(deque->cond_non_empty);
426
427 ThreadUnlock(deque->lock);
428
429 return size;
430 }
431
ThreadedDequeCount(ThreadedDeque const * deque)432 size_t ThreadedDequeCount(ThreadedDeque const *deque)
433 {
434 assert(deque != NULL);
435
436 ThreadLock(deque->lock);
437 size_t const count = deque->size;
438 ThreadUnlock(deque->lock);
439
440 return count;
441 }
442
ThreadedDequeCapacity(ThreadedDeque const * deque)443 size_t ThreadedDequeCapacity(ThreadedDeque const *deque)
444 {
445 assert(deque != NULL);
446
447 ThreadLock(deque->lock);
448 size_t const capacity = deque->capacity;
449 ThreadUnlock(deque->lock);
450
451 return capacity;
452 }
453
ThreadedDequeIsEmpty(ThreadedDeque const * deque)454 bool ThreadedDequeIsEmpty(ThreadedDeque const *deque)
455 {
456 assert(deque != NULL);
457
458 ThreadLock(deque->lock);
459 bool const empty = (deque->size == 0);
460 ThreadUnlock(deque->lock);
461
462 return empty;
463 }
464
ThreadedDequeWaitEmpty(ThreadedDeque const * deque,int timeout)465 bool ThreadedDequeWaitEmpty(ThreadedDeque const *deque, int timeout)
466 {
467 assert(deque != NULL);
468
469 ThreadLock(deque->lock);
470
471 if (deque->size == 0)
472 {
473 ThreadUnlock(deque->lock);
474 return true;
475 }
476
477 if (timeout == 0)
478 {
479 ThreadUnlock(deque->lock);
480 return false;
481 }
482
483 do {
484 int res = ThreadWait(deque->cond_empty, deque->lock, timeout);
485
486 if (res != 0)
487 {
488 /* Lock is reacquired even when timed out, so it needs to be
489 released again. */
490 ThreadUnlock(deque->lock);
491 return false;
492 }
493 // Reevaluate predicate to protect against spurious wakeups
494 } while (deque->size != 0);
495
496 ThreadUnlock(deque->lock);
497
498 return true;
499 }
500
ThreadedDequeCopy(ThreadedDeque * deque)501 ThreadedDeque *ThreadedDequeCopy(ThreadedDeque *deque)
502 {
503 assert(deque != NULL);
504
505 ThreadLock(deque->lock);
506
507 ThreadedDeque *new_deque = xmemdup(deque, sizeof(ThreadedDeque));
508 new_deque->data = xmalloc(sizeof(void *) * deque->capacity);
509 memcpy(new_deque->data, deque->data,
510 sizeof(void *) * new_deque->capacity);
511
512 ThreadUnlock(deque->lock);
513
514 pthread_mutexattr_t attr;
515 pthread_mutexattr_init(&attr);
516 // enables error checking for deadlocks
517 int ret = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
518 if (ret != 0)
519 {
520 Log(LOG_LEVEL_ERR,
521 "Failed to use error-checking mutexes for deque, "
522 "falling back to normal ones (pthread_mutexattr_settype: %s)",
523 GetErrorStrFromCode(ret));
524 pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL);
525 }
526
527 new_deque->lock = xmalloc(sizeof(pthread_mutex_t));
528 ret = pthread_mutex_init(new_deque->lock, &attr);
529 if (ret != 0)
530 {
531 Log(LOG_LEVEL_ERR,
532 "Failed to initialize mutex (pthread_mutex_init: %s)",
533 GetErrorStrFromCode(ret));
534 pthread_mutexattr_destroy(&attr);
535 free(new_deque->lock);
536 free(new_deque);
537 return NULL;
538 }
539
540 new_deque->cond_non_empty = xmalloc(sizeof(pthread_cond_t));
541 ret = pthread_cond_init(new_deque->cond_non_empty, NULL);
542 if (ret != 0)
543 {
544 Log(LOG_LEVEL_ERR,
545 "Failed to initialize thread condition "
546 "(pthread_cond_init: %s)",
547 GetErrorStrFromCode(ret));
548 free(new_deque->lock);
549 free(new_deque->cond_non_empty);
550 free(new_deque);
551 return NULL;
552 }
553
554 new_deque->cond_empty = xmalloc(sizeof(pthread_cond_t));
555 ret = pthread_cond_init(new_deque->cond_empty, NULL);
556 if (ret != 0)
557 {
558 Log(LOG_LEVEL_ERR,
559 "Failed to initialize thread condition "
560 "(pthread_cond_init: %s)",
561 GetErrorStrFromCode(ret));
562 free(new_deque->lock);
563 free(new_deque->cond_empty);
564 free(new_deque->cond_non_empty);
565 free(new_deque);
566 return NULL;
567 }
568
569 return new_deque;
570 }
571
572 /**
573 @brief Destroys data in range.
574 @warning Assumes that locks are acquired.
575 @note If start == end, this means that all elements in deque will be
576 destroyed. Since the internal array is circular, it will wrap around
577 when reaching the array bounds.
578 @param [in] deque Pointer to struct.
579 @param [in] start Position to start destroying from.
580 @param [in] end First position to not destroy. Can be same as start to
581 destroy the entire allocated area.
582 */
DestroyRange(ThreadedDeque * deque,size_t start,size_t end)583 static void DestroyRange(ThreadedDeque *deque, size_t start, size_t end)
584 {
585 assert(deque != NULL);
586 if (start > deque->capacity || end > deque->capacity)
587 {
588 Log(LOG_LEVEL_DEBUG,
589 "Failed to destroy ThreadedDeque, index greater than capacity: "
590 "start = %zu, end = %zu, capacity = %zu",
591 start, end, deque->capacity);
592 return;
593 }
594
595 if ((deque->ItemDestroy != NULL) && deque->size > 0)
596 {
597 do
598 {
599 deque->ItemDestroy(deque->data[start]);
600 start++;
601 start %= deque->capacity;
602 } while (start != end);
603 }
604 }
605
606 /**
607 @brief Either expands capacity of deque, or shifts tail to beginning.
608 @warning Assumes that locks are acquired.
609 @param [in] deque Pointer to struct.
610 */
ExpandIfNecessary(ThreadedDeque * deque)611 static void ExpandIfNecessary(ThreadedDeque *deque)
612 {
613 assert(deque != NULL);
614 assert(deque->size <= deque->capacity);
615
616 if (deque->size == deque->capacity)
617 {
618 if (deque->right <= deque->left)
619 {
620 size_t old_capacity = deque->capacity;
621
622 deque->capacity *= EXPAND_FACTOR;
623 deque->data = xrealloc(deque->data,
624 sizeof(void *) * deque->capacity);
625
626 /* Move the data that has wrapped around to the newly allocated
627 * part of the deque, since we need a continuous block of memory.
628 * Offset of new placement is `old_capacity`.
629 */
630 memmove(deque->data + old_capacity, deque->data,
631 sizeof(void *) * deque->right);
632
633 deque->right += old_capacity;
634 }
635 else
636 {
637 deque->capacity *= EXPAND_FACTOR;
638 deque->data = xrealloc(deque->data,
639 sizeof(void *) * deque->capacity);
640 }
641 }
642 }
643