1 /*
2   Copyright 2021 Northern.tech AS
3 
4   This file is part of CFEngine 3 - written and maintained by Northern.tech AS.
5 
6   This program is free software; you can redistribute it and/or modify it
7   under the terms of the GNU General Public License as published by the
8   Free Software Foundation; version 3.
9 
10   This program is distributed in the hope that it will be useful,
11   but WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   GNU General Public License for more details.
14 
15   You should have received a copy of the GNU General Public License
16   along with this program; if not, write to the Free Software
17   Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA
18 
19   To the extent this program is licensed as part of the Enterprise
20   versions of CFEngine, the applicable Commercial Open Source License
21   (COSL) may apply to this file if you as a licensee so wish it. See
22   included file COSL.txt.
23 */
24 
25 #include <platform.h>
26 #include <threaded_deque.h>
27 #include <alloc.h>
28 #include <logging.h>
29 #include <mutex.h>
30 #include <pthread.h>
31 
32 
/* Factor the capacity is multiplied by when a full deque has to grow. */
#define EXPAND_FACTOR     2
/* Capacity used by ThreadedDequeNew() when the caller asks for 0. */
#define DEFAULT_CAPACITY 16
35 
/** @struct ThreadedDeque_
  @brief An implementation of a thread safe deque based on a circular array

  Can push and pop at both the left and the right end, and give various
  statistics about its contents, like amount of elements, capacity and if it
  is empty. Has the ability to block if deque is empty, waiting for new
  elements to be pushed, and to block until the deque has been emptied.

  Every field below is read and written with `lock` held by the functions in
  this file.
  */
struct ThreadedDeque_ {
    pthread_mutex_t *lock;            /**< Thread lock for accessing data. */
    pthread_cond_t *cond_non_empty;   /**< Signalled by every push; pop
                                           functions wait on it while the
                                           deque is empty.                 */
    pthread_cond_t *cond_empty;       /**< Broadcast by pop functions when
                                           size is 0; waited on by
                                           ThreadedDequeWaitEmpty().       */
    void (*ItemDestroy) (void *item); /**< Data-specific destroy function,
                                           may be NULL (then items are not
                                           destroyed).                     */
    void **data;                      /**< Internal circular array of
                                           `capacity` item pointers.       */
    size_t left;                      /**< Index of the leftmost element.  */
    size_t right;                     /**< Index one past the rightmost
                                           element (wraps around).         */
    size_t size;                      /**< Number of elements stored.      */
    size_t capacity;                  /**< Number of slots allocated in
                                           `data`.                         */
};
54 
/* File-local helpers, defined at the end of this file. Both expect the
 * caller to deal with locking. */
static void DestroyRange(ThreadedDeque *deque, size_t start, size_t end);
static void ExpandIfNecessary(ThreadedDeque *deque);
57 
ThreadedDequeNew(size_t initial_capacity,void (ItemDestroy)(void * item))58 ThreadedDeque *ThreadedDequeNew(size_t initial_capacity,
59                                 void (ItemDestroy) (void *item))
60 {
61     ThreadedDeque *deque = xcalloc(1, sizeof(ThreadedDeque));
62 
63     if (initial_capacity == 0)
64     {
65         initial_capacity = DEFAULT_CAPACITY;
66     }
67 
68     pthread_mutexattr_t attr;
69     pthread_mutexattr_init(&attr);
70     // enables errorchecking for deadlocks
71     int ret = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
72     if (ret != 0)
73     {
74         Log(LOG_LEVEL_ERR,
75             "Failed to use error-checking mutexes for deque, "
76             "falling back to normal ones (pthread_mutexattr_settype: %s)",
77             GetErrorStrFromCode(ret));
78         pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL);
79     }
80 
81     deque->lock = xmalloc(sizeof(pthread_mutex_t));
82     ret = pthread_mutex_init(deque->lock, &attr);
83     if (ret != 0)
84     {
85         Log(LOG_LEVEL_ERR,
86             "Failed to initialize mutex (pthread_mutex_init: %s)",
87             GetErrorStrFromCode(ret));
88         pthread_mutexattr_destroy(&attr);
89         free(deque->lock);
90         free(deque);
91         return NULL;
92     }
93 
94     pthread_mutexattr_destroy(&attr);
95 
96     deque->cond_non_empty = xmalloc(sizeof(pthread_cond_t));
97     ret = pthread_cond_init(deque->cond_non_empty, NULL);
98     if (ret != 0)
99     {
100         Log(LOG_LEVEL_ERR,
101             "Failed to initialize thread condition (pthread_cond_init: %s)",
102             GetErrorStrFromCode(ret));
103         free(deque->lock);
104         free(deque->cond_non_empty);
105         free(deque);
106         return NULL;
107     }
108 
109     deque->cond_empty = xmalloc(sizeof(pthread_cond_t));
110     ret = pthread_cond_init(deque->cond_empty, NULL);
111     if (ret != 0)
112     {
113         Log(LOG_LEVEL_ERR,
114             "Failed to initialize thread condition "
115             "(pthread_cond_init: %s)",
116             GetErrorStrFromCode(ret));
117         free(deque->lock);
118         free(deque->cond_empty);
119         free(deque->cond_non_empty);
120         free(deque);
121         return NULL;
122     }
123 
124     deque->capacity = initial_capacity;
125     deque->left = 0;
126     deque->right = 0;
127     deque->size = 0;
128     deque->data = xmalloc(sizeof(void *) * initial_capacity);
129     deque->ItemDestroy = ItemDestroy;
130 
131     return deque;
132 }
133 
/**
  @brief Destroys the deque together with the items it still holds.

  The stored items are handed to DestroyRange() first, then the deque
  itself is released with ThreadedDequeSoftDestroy(). NULL is accepted and
  ignored.
  */
void ThreadedDequeDestroy(ThreadedDeque *deque)
{
    if (deque == NULL)
    {
        return;
    }

    DestroyRange(deque, deque->left, deque->right);
    ThreadedDequeSoftDestroy(deque);
}
143 
/**
  @brief Releases the deque itself but leaves the stored items alone.

  Destroys and frees the mutex and both condition variables (each only if
  its pointer is set), then frees the internal array and the struct. The
  item destroy function is never called here. NULL is accepted and ignored.
  */
void ThreadedDequeSoftDestroy(ThreadedDeque *deque)
{
    if (deque == NULL)
    {
        return;
    }

    pthread_mutex_t *const mutex = deque->lock;
    if (mutex != NULL)
    {
        pthread_mutex_destroy(mutex);
        free(mutex);
    }

    pthread_cond_t *const non_empty = deque->cond_non_empty;
    if (non_empty != NULL)
    {
        pthread_cond_destroy(non_empty);
        free(non_empty);
    }

    pthread_cond_t *const empty = deque->cond_empty;
    if (empty != NULL)
    {
        pthread_cond_destroy(empty);
        free(empty);
    }

    free(deque->data);
    free(deque);
}
170 
/**
  @brief Removes the leftmost item of the deque and hands it to the caller.
  @param [in]  deque    Deque to pop from, must not be NULL.
  @param [out] item     Receives the popped item. Set to NULL whenever the
                        function returns false.
  @param [in]  timeout  0 means do not wait if the deque is empty; any other
                        value is passed unchanged to ThreadWait(), which
                        defines its meaning.
  @return true if an item was popped, false if the deque was empty and no
          item arrived (no waiting requested, or ThreadWait() returned
          non-zero).
  */
bool ThreadedDequePopLeft(ThreadedDeque *deque,
                          void **item,
                          int timeout)
{
    assert(deque != NULL);

    ThreadLock(deque->lock);

    if (deque->size == 0 && timeout != 0)
    {
        // Predicate is reevaluated to protect against spurious wakeups
        do {
            int res = ThreadWait(deque->cond_non_empty, deque->lock, timeout);

            if (res != 0)
            {
                /* Lock is reacquired even when timed out, so it needs to be
                   released again. */
                ThreadUnlock(deque->lock);
                /* Leave the out-parameter in the same state as the
                   non-waiting failure path below. */
                *item = NULL;
                return false;
            }
        } while (deque->size == 0);
    }

    bool ret = true;
    if (deque->size > 0)
    {
        size_t left = deque->left;
        *item = deque->data[left];

        deque->data[left++] = NULL;

        // Circular array: step right and wrap at the end of the array
        left %= deque->capacity;
        deque->left = left;
        deque->size--;
    }
    else
    {
        ret = false;
        *item = NULL;
    }

    if (deque->size == 0)
    {
        // Signals that the deque is empty for ThreadedDequeWaitEmpty
        pthread_cond_broadcast(deque->cond_empty);
    }

    ThreadUnlock(deque->lock);

    return ret;
}
222 
/**
  @brief Removes the rightmost item of the deque and hands it to the caller.
  @param [in]  deque    Deque to pop from, must not be NULL.
  @param [out] item     Receives the popped item. Set to NULL whenever the
                        function returns false.
  @param [in]  timeout  0 means do not wait if the deque is empty; any other
                        value is passed unchanged to ThreadWait(), which
                        defines its meaning.
  @return true if an item was popped, false if the deque was empty and no
          item arrived (no waiting requested, or ThreadWait() returned
          non-zero).
  */
bool ThreadedDequePopRight(ThreadedDeque *deque,
                           void **item,
                           int timeout)
{
    assert(deque != NULL);

    ThreadLock(deque->lock);

    if (deque->size == 0 && timeout != 0)
    {
        // Predicate is reevaluated to protect against spurious wakeups
        do {
            int res = ThreadWait(deque->cond_non_empty, deque->lock, timeout);

            if (res != 0)
            {
                /* Lock is reacquired even when timed out, so it needs to be
                   released again. */
                ThreadUnlock(deque->lock);
                /* Leave the out-parameter in the same state as the
                   non-waiting failure path below. */
                *item = NULL;
                return false;
            }
        } while (deque->size == 0);
    }

    bool ret = true;
    if (deque->size > 0)
    {
        /* `right` points one past the last element, so step back first,
           wrapping to the end of the array when at index 0. */
        size_t right = deque->right;
        right = right == 0 ? deque->capacity - 1 : right - 1;

        *item = deque->data[right];
        deque->data[right] = NULL;

        deque->right = right;
        deque->size--;
    }
    else
    {
        ret = false;
        *item = NULL;
    }

    if (deque->size == 0)
    {
        // Signals that the deque is empty for ThreadedDequeWaitEmpty
        pthread_cond_broadcast(deque->cond_empty);
    }

    ThreadUnlock(deque->lock);

    return ret;
}
274 
/**
  @brief Pops up to `num` items from the left end into a new array.
  @param [in]  deque       Deque to pop from, must not be NULL.
  @param [out] data_array  Receives a freshly allocated array with the
                           popped items, leftmost first, or NULL if nothing
                           was popped. The array is allocated here and not
                           referenced by the deque afterwards.
  @param [in]  num         Maximum number of items to pop.
  @param [in]  timeout     0 means do not wait if the deque is empty; any
                           other value is passed unchanged to ThreadWait().
  @return Number of items stored in `*data_array`.
  */
size_t ThreadedDequePopLeftN(ThreadedDeque *deque,
                             void ***data_array,
                             size_t num,
                             int timeout)
{
    assert(deque != NULL);

    ThreadLock(deque->lock);

    if (timeout != 0)
    {
        // Checking the size again after each wakeup guards against
        // spurious wakeups
        while (deque->size == 0)
        {
            if (ThreadWait(deque->cond_non_empty, deque->lock, timeout) != 0)
            {
                /* Lock is reacquired even when timed out, so it needs to be
                   released again. */
                ThreadUnlock(deque->lock);
                *data_array = NULL;
                return 0;
            }
        }
    }

    // Never pop more than what is stored
    size_t n_popped = deque->size;
    if (num < n_popped)
    {
        n_popped = num;
    }

    void **popped = NULL;

    if (n_popped > 0)
    {
        popped = xcalloc(n_popped, sizeof(void *));

        size_t pos = deque->left;
        for (size_t i = 0; i < n_popped; i++)
        {
            popped[i] = deque->data[pos];
            deque->data[pos] = NULL;
            pos = (pos + 1) % deque->capacity;
        }

        deque->left = pos;
        deque->size -= n_popped;
    }

    if (deque->size == 0)
    {
        // Signals that the deque is empty for ThreadedDequeWaitEmpty
        pthread_cond_broadcast(deque->cond_empty);
    }

    *data_array = popped;

    ThreadUnlock(deque->lock);

    return n_popped;
}
333 
/**
  @brief Pops up to `num` items from the right end into a new array.
  @param [in]  deque       Deque to pop from, must not be NULL.
  @param [out] data_array  Receives a freshly allocated array with the
                           popped items, rightmost first, or NULL if nothing
                           was popped. The array is allocated here and not
                           referenced by the deque afterwards.
  @param [in]  num         Maximum number of items to pop.
  @param [in]  timeout     0 means do not wait if the deque is empty; any
                           other value is passed unchanged to ThreadWait().
  @return Number of items stored in `*data_array`.
  */
size_t ThreadedDequePopRightN(ThreadedDeque *deque,
                              void ***data_array,
                              size_t num,
                              int timeout)
{
    assert(deque != NULL);

    ThreadLock(deque->lock);

    if (timeout != 0)
    {
        // Checking the size again after each wakeup guards against
        // spurious wakeups
        while (deque->size == 0)
        {
            if (ThreadWait(deque->cond_non_empty, deque->lock, timeout) != 0)
            {
                /* Lock is reacquired even when timed out, so it needs to be
                   released again. */
                ThreadUnlock(deque->lock);
                *data_array = NULL;
                return 0;
            }
        }
    }

    // Never pop more than what is stored
    size_t n_popped = deque->size;
    if (num < n_popped)
    {
        n_popped = num;
    }

    void **popped = NULL;

    if (n_popped > 0)
    {
        popped = xcalloc(n_popped, sizeof(void *));

        size_t pos = deque->right;
        for (size_t i = 0; i < n_popped; i++)
        {
            // Step back before reading, wrapping from index 0 to the end
            if (pos == 0)
            {
                pos = deque->capacity - 1;
            }
            else
            {
                pos--;
            }

            popped[i] = deque->data[pos];
            deque->data[pos] = NULL;
        }

        deque->right = pos;
        deque->size -= n_popped;
    }

    if (deque->size == 0)
    {
        // Signals that the deque is empty for ThreadedDequeWaitEmpty
        pthread_cond_broadcast(deque->cond_empty);
    }

    *data_array = popped;

    ThreadUnlock(deque->lock);

    return n_popped;
}
392 
/**
  @brief Inserts an item at the left end of the deque.
  @param [in] deque  Deque to push to, must not be NULL.
  @param [in] item   Item pointer to store.
  @return Number of items in the deque right after the push.
  */
size_t ThreadedDequePushLeft(ThreadedDeque *deque, void *item)
{
    assert(deque != NULL);

    ThreadLock(deque->lock);

    ExpandIfNecessary(deque);

    // Step the left index back by one slot, wrapping from 0 to the end
    size_t slot = deque->left;
    if (slot == 0)
    {
        slot = deque->capacity - 1;
    }
    else
    {
        slot--;
    }

    deque->left = slot;
    deque->data[slot] = item;
    deque->size++;

    size_t const new_size = deque->size;

    // Wake up one thread waiting in a pop function
    pthread_cond_signal(deque->cond_non_empty);

    ThreadUnlock(deque->lock);

    return new_size;
}
411 
/**
  @brief Inserts an item at the right end of the deque.
  @param [in] deque  Deque to push to, must not be NULL.
  @param [in] item   Item pointer to store.
  @return Number of items in the deque right after the push.
  */
size_t ThreadedDequePushRight(ThreadedDeque *deque, void *item)
{
    assert(deque != NULL);

    ThreadLock(deque->lock);

    ExpandIfNecessary(deque);

    // `right` is the first free slot; store there, then advance and wrap
    size_t const slot = deque->right;
    deque->data[slot] = item;
    deque->right = (slot + 1) % deque->capacity;
    deque->size++;

    size_t const new_size = deque->size;

    // Wake up one thread waiting in a pop function
    pthread_cond_signal(deque->cond_non_empty);

    ThreadUnlock(deque->lock);

    return new_size;
}
431 
/**
  @brief Reads the number of items currently stored, under the deque lock.
  @param [in] deque  Deque to inspect, must not be NULL.
  @return The element count at the time the lock was held.
  */
size_t ThreadedDequeCount(ThreadedDeque const *deque)
{
    assert(deque != NULL);

    size_t n_items;

    ThreadLock(deque->lock);
    n_items = deque->size;
    ThreadUnlock(deque->lock);

    return n_items;
}
442 
/**
  @brief Reads the number of allocated item slots, under the deque lock.
  @param [in] deque  Deque to inspect, must not be NULL.
  @return The capacity at the time the lock was held.
  */
size_t ThreadedDequeCapacity(ThreadedDeque const *deque)
{
    assert(deque != NULL);

    size_t n_slots;

    ThreadLock(deque->lock);
    n_slots = deque->capacity;
    ThreadUnlock(deque->lock);

    return n_slots;
}
453 
/**
  @brief Checks, under the deque lock, whether the deque holds no items.
  @param [in] deque  Deque to inspect, must not be NULL.
  @return true if the element count was 0 while the lock was held.
  */
bool ThreadedDequeIsEmpty(ThreadedDeque const *deque)
{
    assert(deque != NULL);

    bool has_no_items;

    ThreadLock(deque->lock);
    has_no_items = (deque->size == 0);
    ThreadUnlock(deque->lock);

    return has_no_items;
}
464 
/**
  @brief Waits until the deque has been emptied.
  @param [in] deque    Deque to watch, must not be NULL.
  @param [in] timeout  0 means only check, never wait; any other value is
                       passed unchanged to ThreadWait().
  @return true if the deque was empty on entry or became empty while
          waiting, false if it was non-empty and either no waiting was
          requested or ThreadWait() returned non-zero.
  */
bool ThreadedDequeWaitEmpty(ThreadedDeque const *deque, int timeout)
{
    assert(deque != NULL);

    ThreadLock(deque->lock);

    bool emptied = (deque->size == 0);

    if (!emptied && timeout != 0)
    {
        // Assume success; a failed wait below overrides this
        emptied = true;

        // Reevaluate predicate to protect against spurious wakeups
        do {
            if (ThreadWait(deque->cond_empty, deque->lock, timeout) != 0)
            {
                /* Lock is reacquired even when timed out, so it is still
                   released by the common unlock below. */
                emptied = false;
                break;
            }
        } while (deque->size != 0);
    }

    ThreadUnlock(deque->lock);

    return emptied;
}
500 
/**
  @brief Creates a shallow copy of the deque with its own synchronization
         objects.

  The struct and the internal pointer array are duplicated while holding
  the source's lock; the copy then gets a freshly initialized mutex and
  pair of condition variables.

  @note Only the item pointers are copied, not the items, and the copy
        keeps the same ItemDestroy function. Passing both the original and
        the copy to ThreadedDequeDestroy() would therefore call ItemDestroy
        twice on every shared item.
  @param [in] deque  Deque to copy, must not be NULL.
  @return The copy, or NULL if the mutex or one of the condition variables
          could not be initialized (an error is logged and everything
          allocated for the copy is released again).
  */
ThreadedDeque *ThreadedDequeCopy(ThreadedDeque *deque)
{
    assert(deque != NULL);

    ThreadLock(deque->lock);

    ThreadedDeque *new_deque = xmemdup(deque, sizeof(ThreadedDeque));
    new_deque->data = xmalloc(sizeof(void *) * deque->capacity);
    memcpy(new_deque->data, deque->data,
           sizeof(void *) * new_deque->capacity);

    ThreadUnlock(deque->lock);

    /* From here on new_deque->lock / cond_* still hold the SOURCE's
     * pointers (copied by xmemdup) until they are replaced below; the
     * cleanup labels only touch objects created in this function. */

    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    // enables error checking for deadlocks
    int ret = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
    if (ret != 0)
    {
        Log(LOG_LEVEL_ERR,
            "Failed to use error-checking mutexes for deque, "
            "falling back to normal ones (pthread_mutexattr_settype: %s)",
            GetErrorStrFromCode(ret));
        pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL);
    }

    new_deque->lock = xmalloc(sizeof(pthread_mutex_t));
    ret = pthread_mutex_init(new_deque->lock, &attr);
    // The attribute object is only needed for the init call itself
    pthread_mutexattr_destroy(&attr);
    if (ret != 0)
    {
        Log(LOG_LEVEL_ERR,
            "Failed to initialize mutex (pthread_mutex_init: %s)",
            GetErrorStrFromCode(ret));
        goto fail_free_lock;
    }

    new_deque->cond_non_empty = xmalloc(sizeof(pthread_cond_t));
    ret = pthread_cond_init(new_deque->cond_non_empty, NULL);
    if (ret != 0)
    {
        Log(LOG_LEVEL_ERR,
            "Failed to initialize thread condition "
            "(pthread_cond_init: %s)",
            GetErrorStrFromCode(ret));
        goto fail_free_cond_non_empty;
    }

    new_deque->cond_empty = xmalloc(sizeof(pthread_cond_t));
    ret = pthread_cond_init(new_deque->cond_empty, NULL);
    if (ret != 0)
    {
        Log(LOG_LEVEL_ERR,
            "Failed to initialize thread condition "
            "(pthread_cond_init: %s)",
            GetErrorStrFromCode(ret));
        goto fail_free_cond_empty;
    }

    return new_deque;

    /* Unwind in reverse order of setup. Each label is entered with the
     * object named in it allocated but NOT initialized, so it is only
     * freed; objects initialized earlier are destroyed before being freed. */
fail_free_cond_empty:
    free(new_deque->cond_empty);
    pthread_cond_destroy(new_deque->cond_non_empty);
fail_free_cond_non_empty:
    free(new_deque->cond_non_empty);
    pthread_mutex_destroy(new_deque->lock);
fail_free_lock:
    free(new_deque->lock);
    free(new_deque->data);
    free(new_deque);
    return NULL;
}
571 
572 /**
573   @brief Destroys data in range.
574   @warning Assumes that locks are acquired.
575   @note If start == end, this means that all elements in deque will be
576         destroyed. Since the internal array is circular, it will wrap around
577         when reaching the array bounds.
578   @param [in] deque  Pointer to struct.
579   @param [in] start  Position to start destroying from.
580   @param [in] end    First position to not destroy. Can be same as start to
581                      destroy the entire allocated area.
582   */
DestroyRange(ThreadedDeque * deque,size_t start,size_t end)583 static void DestroyRange(ThreadedDeque *deque, size_t start, size_t end)
584 {
585     assert(deque != NULL);
586     if (start > deque->capacity || end > deque->capacity)
587     {
588         Log(LOG_LEVEL_DEBUG,
589             "Failed to destroy ThreadedDeque, index greater than capacity: "
590             "start = %zu, end = %zu, capacity = %zu",
591             start, end, deque->capacity);
592         return;
593     }
594 
595     if ((deque->ItemDestroy != NULL) && deque->size > 0)
596     {
597         do
598         {
599             deque->ItemDestroy(deque->data[start]);
600             start++;
601             start %= deque->capacity;
602         } while (start != end);
603     }
604 }
605 
606 /**
607   @brief Either expands capacity of deque, or shifts tail to beginning.
608   @warning Assumes that locks are acquired.
609   @param [in] deque  Pointer to struct.
610   */
ExpandIfNecessary(ThreadedDeque * deque)611 static void ExpandIfNecessary(ThreadedDeque *deque)
612 {
613     assert(deque != NULL);
614     assert(deque->size <= deque->capacity);
615 
616     if (deque->size == deque->capacity)
617     {
618         if (deque->right <= deque->left)
619         {
620             size_t old_capacity = deque->capacity;
621 
622             deque->capacity *= EXPAND_FACTOR;
623             deque->data = xrealloc(deque->data,
624                                    sizeof(void *) * deque->capacity);
625 
626             /* Move the data that has wrapped around to the newly allocated
627              * part of the deque, since we need a continuous block of memory.
628              * Offset of new placement is `old_capacity`.
629              */
630             memmove(deque->data + old_capacity, deque->data,
631                     sizeof(void *) * deque->right);
632 
633             deque->right += old_capacity;
634         }
635         else
636         {
637             deque->capacity *= EXPAND_FACTOR;
638             deque->data = xrealloc(deque->data,
639                                    sizeof(void *) * deque->capacity);
640         }
641     }
642 }
643