1 #include <test.h>
2 
3 #include <alloc.h>
4 #include <mutex.h>
5 #include <threaded_queue.h>
6 
7 /* Memory illustration legend:          *
8  *      | : memory bounds               *
9  *      > : head                        *
10  *      < : tail                        *
11  *      ^ : head + tail (empty)         *
12  *      v : head + tail (at capacity)   *
13  *      x : used memory                 *
14  *      - : unused memory               */
15 
test_push_pop(void)16 static void test_push_pop(void)
17 {
18     // Initialised with DEFAULT_CAPACITY = 16
19     ThreadedQueue *queue = ThreadedQueueNew(0, free);
20     // |^---------------|
21     ThreadedQueuePush(queue, xstrdup("1"));
22     // |><--------------|
23     ThreadedQueuePush(queue, xstrdup("2"));
24     // |>x<-------------|
25     ThreadedQueuePush(queue, xstrdup("3"));
26     // |>xx<------------|
27 
28     char *str1; ThreadedQueuePop(queue, (void **)&str1, 0);
29     // |->x<------------|
30     char *str2; ThreadedQueuePop(queue, (void **)&str2, 0);
31     // |--><------------|
32     char *str3; ThreadedQueuePop(queue, (void **)&str3, 0);
33     // |---v------------|
34 
35     assert_string_equal(str1, "1");
36     assert_string_equal(str2, "2");
37     assert_string_equal(str3, "3");
38 
39     free(str1);
40     free(str2);
41     free(str3);
42 
43     ThreadedQueueDestroy(queue);
44 }
45 
test_pop_empty_and_push_null(void)46 static void test_pop_empty_and_push_null(void)
47 {
48     ThreadedQueue *queue = ThreadedQueueNew(1, NULL);
49     // |^|
50 
51     assert(ThreadedQueueIsEmpty(queue));
52 
53     void *i_am_null = NULL; bool ret = ThreadedQueuePop(queue, &i_am_null, 0);
54     // |^|
55     assert(i_am_null == NULL);
56     assert_false(ret);
57     ThreadedQueuePush(queue, i_am_null);
58     // |v|
59     ret = ThreadedQueuePop(queue, &i_am_null, 0);
60     assert(i_am_null == NULL);
61     assert_true(ret);
62     // |^|
63 
64     ThreadedQueueDestroy(queue);
65 }
66 
test_copy(void)67 static void test_copy(void)
68 {
69     ThreadedQueue *queue = ThreadedQueueNew(4, free);
70     // queue: |^---|
71 
72     ThreadedQueuePush(queue, xstrdup("1"));
73     // queue: |><--|
74     ThreadedQueuePush(queue, xstrdup("2"));
75     // queue: |>x<-|
76     ThreadedQueuePush(queue, xstrdup("3"));
77     // queue: |>xx<|
78 
79     ThreadedQueue *new_queue = ThreadedQueueCopy(queue);
80     // new_queue: |>xx<|
81 
82     assert(new_queue != NULL);
83     assert_int_equal(ThreadedQueueCount(queue),
84                      ThreadedQueueCount(new_queue));
85     assert_int_equal(ThreadedQueueCapacity(queue),
86                      ThreadedQueueCapacity(new_queue));
87 
88     char *old_str1; ThreadedQueuePop(queue, (void **)&old_str1, 0);
89     // queue: |->x<|
90     char *old_str2; ThreadedQueuePop(queue, (void **)&old_str2, 0);
91     // queue: |--><|
92     char *old_str3; ThreadedQueuePop(queue, (void **)&old_str3, 0);
93     // queue: |---^|
94 
95     char *new_str1; ThreadedQueuePop(new_queue, (void **)&new_str1, 0);
96     // new_queue: |->x<|
97     char *new_str2; ThreadedQueuePop(new_queue, (void **)&new_str2, 0);
98     // new_queue: |--><|
99     char *new_str3; ThreadedQueuePop(new_queue, (void **)&new_str3, 0);
100     // new_queue: |---^|
101 
102     // Check if pointers are equal (since this is a shallow copy)
103     assert(old_str1 == new_str1);
104     assert(old_str2 == new_str2);
105     assert(old_str3 == new_str3);
106 
107     free(old_str1);
108     free(old_str2);
109     free(old_str3);
110 
111     ThreadedQueueSoftDestroy(queue);
112 
113     // Tests expanding the copied queue
114     ThreadedQueuePush(new_queue, xstrdup("1"));
115     // Internal array wraps:
116     // new_queue: |<-->|
117     ThreadedQueuePush(new_queue, xstrdup("2"));
118     // new_queue: |x<->|
119     ThreadedQueuePush(new_queue, xstrdup("3"));
120     // new_queue: |xx<>|
121     ThreadedQueuePush(new_queue, xstrdup("4"));
122     // new_queue: |xxxv|
123     ThreadedQueuePush(new_queue, xstrdup("5"));
124     // Internal array restructured, array moved to end:
125     // new_queue: |<-->xxxx|
126 
127     assert_int_equal(ThreadedQueueCount(new_queue), 5);
128     assert_int_equal(ThreadedQueueCapacity(new_queue), 8);
129 
130     ThreadedQueuePop(new_queue, (void **)&new_str1, 0);
131     // new_queue: |<--->xxx|
132     ThreadedQueuePop(new_queue, (void **)&new_str2, 0);
133     // new_queue: |<---->xx|
134     ThreadedQueuePop(new_queue, (void **)&new_str3, 0);
135     // new_queue: |<----->x|
136     char *new_str4; ThreadedQueuePop(new_queue, (void **)&new_str4, 0);
137     // new_queue: |<------>|
138     char *new_str5; ThreadedQueuePop(new_queue, (void **)&new_str5, 0);
139     // new_queue: |^-------|
140 
141     assert_string_equal(new_str1, "1");
142     assert_string_equal(new_str2, "2");
143     assert_string_equal(new_str3, "3");
144     assert_string_equal(new_str4, "4");
145     assert_string_equal(new_str5, "5");
146 
147     free(new_str1);
148     free(new_str2);
149     free(new_str3);
150     free(new_str4);
151     free(new_str5);
152 
153     ThreadedQueueDestroy(new_queue);
154 }
155 
test_push_report_count(void)156 static void test_push_report_count(void)
157 {
158     ThreadedQueue *queue = ThreadedQueueNew(0, free);
159 
160     size_t size1 = ThreadedQueuePush(queue, xstrdup("1"));
161     size_t size2 = ThreadedQueuePush(queue, xstrdup("2"));
162     size_t size3 = ThreadedQueuePush(queue, xstrdup("3"));
163     size_t size4 = ThreadedQueuePush(queue, xstrdup("4"));
164 
165     assert_int_equal(size1, 1);
166     assert_int_equal(size2, 2);
167     assert_int_equal(size3, 3);
168     assert_int_equal(size4, 4);
169 
170     ThreadedQueueDestroy(queue);
171 }
172 
test_expand(void)173 static void test_expand(void)
174 {
175     ThreadedQueue *queue = ThreadedQueueNew(1, free);
176     // |^|
177 
178     ThreadedQueuePush(queue, xstrdup("spam"));
179     // |v|
180     ThreadedQueuePush(queue, xstrdup("spam"));
181     // |vx|
182 
183     char *tmp; ThreadedQueuePop(queue, (void **)&tmp, 0);
184     // |<>|
185     free(tmp);
186 
187     ThreadedQueuePush(queue, xstrdup("spam"));
188     // |xv|
189     ThreadedQueuePush(queue, xstrdup("spam"));
190     // Internal array restructured:
191     // |<>xx|
192     ThreadedQueuePush(queue, xstrdup("spam"));
193     // |xvxx|
194     ThreadedQueuePush(queue, xstrdup("spam"));
195     // Internal array restructured:
196     // |->xxxx<-|
197     ThreadedQueuePush(queue, xstrdup("spam"));
198     // |->xxxxx<|
199 
200     ThreadedQueuePop(queue, (void **)&tmp, 0);
201     // |-->xxxx<|
202     free(tmp);
203     ThreadedQueuePop(queue, (void **)&tmp, 0);
204     // |--->xxx<|
205     free(tmp);
206 
207     ThreadedQueuePush(queue, xstrdup("spam"));
208     // |<-->xxxx|
209     ThreadedQueuePush(queue, xstrdup("spam"));
210     // |x<->xxxx|
211     ThreadedQueuePush(queue, xstrdup("spam"));
212     // |xx<>xxxx|
213     ThreadedQueuePush(queue, xstrdup("spam"));
214     // |xxxvxxxx|
215     ThreadedQueuePush(queue, xstrdup("spam"));
216     // Internal array restructured
217     // |--->xxxxxxxx<---|
218     ThreadedQueuePush(queue, xstrdup("spam"));
219     // |--->xxxxxxxxx<--|
220     ThreadedQueuePush(queue, xstrdup("spam"));
221     // |--->xxxxxxxxxx<-|
222 
223     assert_int_equal(ThreadedQueueCount(queue), 11);
224     assert_int_equal(ThreadedQueueCapacity(queue), 16);
225 
226     ThreadedQueueDestroy(queue);
227 }
228 
test_pushn(void)229 static void test_pushn(void)
230 {
231     ThreadedQueue *queue = ThreadedQueueNew(0, NULL);
232 
233     char *strs[] = {"spam1", "spam2", "spam3", "spam4", "spam5"};
234     size_t count = ThreadedQueuePushN(queue, (void**) strs, 5);
235     assert_int_equal(count, 5);
236     count = ThreadedQueueCount(queue);
237     assert_int_equal(count, 5);
238 
239     for (int i = 0; i < 5; i++)
240     {
241         char *item;
242         ThreadedQueuePop(queue, (void **)&item, 0);
243         assert_string_equal(item, strs[i]);
244     }
245     count = ThreadedQueueCount(queue);
246     assert_int_equal(count, 0);
247 
248     ThreadedQueueDestroy(queue);
249 }
250 
test_popn(void)251 static void test_popn(void)
252 {
253     ThreadedQueue *queue = ThreadedQueueNew(0, free);
254     // Initialised with default size 16
255     // |^---------------|
256 
257     char *strs[] = {"spam1", "spam2", "spam3", "spam4", "spam5"};
258 
259     for (int i = 0; i < 5; i++)
260     {
261         ThreadedQueuePush(queue, xstrdup(strs[i]));
262     }
263     // |>xxxx<----------|
264 
265     void **data = NULL;
266     size_t count = ThreadedQueuePopN(queue, &data, 5, 0);
267     // |-----^----------|
268 
269     for (size_t i = 0; i < count; i++)
270     {
271         assert_string_equal(data[i], strs[i]);
272         free(data[i]);
273     }
274 
275     free(data);
276     ThreadedQueueDestroy(queue);
277 }
278 
test_clear(void)279 static void test_clear(void)
280 {
281     ThreadedQueue *queue = ThreadedQueueNew(0, free);
282 
283     char *strs[] = {"spam1", "spam2", "spam3", "spam4", "spam5"};
284 
285     for (int i = 0; i < 5; i++)
286     {
287         ThreadedQueuePush(queue, xstrdup(strs[i]));
288     }
289     size_t count = ThreadedQueueCount(queue);
290     assert_int_equal(count, 5);
291 
292     ThreadedQueueClear(queue);
293     count = ThreadedQueueCount(queue);
294     assert_int_equal(count, 0);
295 
296     ThreadedQueuePush(queue, xstrdup(strs[4]));
297     char *item;
298     ThreadedQueuePop(queue, (void **) &item, THREAD_BLOCK_INDEFINITELY);
299     assert_string_equal(item, strs[4]);
300     free(item);
301 
302     ThreadedQueueDestroy(queue);
303 }
304 
test_clear_and_push(void)305 static void test_clear_and_push(void)
306 {
307     ThreadedQueue *queue = ThreadedQueueNew(0, NULL);
308 
309     char *strs[] = {"spam1", "spam2", "spam3", "spam4", "spam5"};
310 
311     for (int i = 0; i < 4; i++)
312     {
313         ThreadedQueuePush(queue, strs[i]);
314     }
315     size_t count = ThreadedQueueCount(queue);
316     assert_int_equal(count, 4);
317 
318     count = ThreadedQueueClearAndPush(queue, strs[4]);
319     assert_int_equal(count, 1);
320     count = ThreadedQueueCount(queue);
321     assert_int_equal(count, 1);
322 
323     char *item;
324     ThreadedQueuePop(queue, (void **)&item, 0);
325     assert_string_equal(item, strs[4]);
326 
327     ThreadedQueueDestroy(queue);
328 }
329 
330 // Thread tests
331 static ThreadedQueue *thread_queue;
332 
thread_pop()333 static void *thread_pop()
334 {
335     char *tmp;
336     ThreadedQueuePop(thread_queue, (void **)&tmp, THREAD_BLOCK_INDEFINITELY);
337     assert_string_equal(tmp, "bla");
338     free(tmp);
339 
340     return NULL;
341 }
342 
343 /**
344  * Used in test_threads_pushn(). Tries to pop 5 items while there should only be
345  * 3 which are all pushed at once. So the attempt should always pop 3 items.
346  */
thread_pop_5_3()347 static void *thread_pop_5_3()
348 {
349     char **items;
350     size_t n_popped = ThreadedQueuePopN(thread_queue, (void***)&items, 5, THREAD_BLOCK_INDEFINITELY);
351     assert_int_equal(n_popped, 3);
352     free(items);
353 
354     return NULL;
355 }
356 
thread_push()357 static void *thread_push()
358 {
359     char *str = "bla";
360     ThreadedQueuePush(thread_queue, xstrdup(str));
361 
362     return NULL;
363 }
364 
thread_wait_empty()365 static void *thread_wait_empty()
366 {
367     ThreadedQueueWaitEmpty(thread_queue, THREAD_BLOCK_INDEFINITELY);
368     ThreadedQueuePush(thread_queue, xstrdup("a_test"));
369 
370     return NULL;
371 }
372 
373 /* Used in the test_threads_clear_empty */
thread_just_wait_empty()374 static void *thread_just_wait_empty()
375 {
376     ThreadedQueueWaitEmpty(thread_queue, THREAD_BLOCK_INDEFINITELY);
377 
378     return NULL;
379 }
380 
test_threads_wait_pop(void)381 static void test_threads_wait_pop(void)
382 {
383 #define POP_ITERATIONS 100
384     thread_queue = ThreadedQueueNew(0, free);
385 
386     pthread_t pops[POP_ITERATIONS] = {0};
387     for (int i = 0; i < POP_ITERATIONS; i++)
388     {
389         int res_create = pthread_create(&(pops[i]), NULL,
390                                         thread_pop, NULL);
391         assert_int_equal(res_create, 0);
392     }
393 
394     pthread_t pushs[POP_ITERATIONS] = {0};
395     for (int i = 0; i < POP_ITERATIONS; i++)
396     {
397         int res_create = pthread_create(&(pushs[i]), NULL,
398                                         thread_push, NULL);
399         assert_int_equal(res_create, 0);
400     }
401 
402     void *retval = NULL;
403     int res;
404 
405     for (int i = 0; i < POP_ITERATIONS; i++)
406     {
407         res = pthread_join(pops[i], retval);
408         assert_int_equal(res, 0);
409         assert(retval == NULL);
410 
411         res = pthread_join(pushs[i], retval);
412         assert_int_equal(res, 0);
413         assert(retval == NULL);
414     }
415 
416     ThreadedQueueDestroy(thread_queue);
417 }
418 
test_threads_wait_empty(void)419 static void test_threads_wait_empty(void)
420 {
421 #define WAIT_ITERATIONS 100
422     thread_queue = ThreadedQueueNew(0, free);
423 
424     pthread_t pushs[WAIT_ITERATIONS] = {0};
425     for (int i = 0; i < WAIT_ITERATIONS; i++)
426     {
427         int res_create = pthread_create(&(pushs[i]), NULL,
428                                         thread_push, NULL);
429         assert_int_equal(res_create, 0);
430     }
431 
432     sleep(1);
433     pthread_t wait_thread = 0;
434     int res_create = pthread_create(&wait_thread, NULL,
435                                     thread_wait_empty, NULL);
436     assert_int_equal(res_create, 0);
437 
438     do {
439         sleep(1);
440     } while (ThreadedQueueCount(thread_queue) != WAIT_ITERATIONS);
441 
442     char **data_array = NULL;
443     size_t arr_size = ThreadedQueuePopN(thread_queue, (void ***)&data_array,
444                                         WAIT_ITERATIONS, 0);
445 
446     for (size_t i = 0; i < arr_size; i++)
447     {
448         free(data_array[i]);
449     }
450 
451     free(data_array);
452 
453     char *waited_str; ThreadedQueuePop(thread_queue, (void **)&waited_str, 1);
454     assert_string_equal(waited_str, "a_test");
455     free(waited_str);
456 
457     void *retval = NULL;
458     int res;
459 
460     for (int i = 0; i < WAIT_ITERATIONS; i++)
461     {
462         res = pthread_join(pushs[i], retval);
463         assert_int_equal(res, 0);
464         assert(retval == NULL);
465     }
466 
467     res = pthread_join(wait_thread, retval);
468     assert_int_equal(res, 0);
469     assert(retval == NULL);
470 
471     ThreadedQueueDestroy(thread_queue);
472 }
473 
test_threads_pushn()474 static void test_threads_pushn()
475 {
476     thread_queue = ThreadedQueueNew(0, NULL);
477 
478     pthread_t pop_thread;
479     int res = pthread_create(&pop_thread, NULL,
480                              thread_pop_5_3, NULL);
481     assert_int_equal(res, 0);
482 
483     /* give the other thread time to start waiting */
484     sleep(1);
485 
486     char *strs[] = {"spam1", "spam2", "spam3"};
487     size_t count = ThreadedQueuePushN(thread_queue, (void **)strs, 3);
488     assert_int_equal(count, 3);
489 
490     res = pthread_join(pop_thread, NULL);
491     assert_int_equal(res, 0);
492 
493     count = ThreadedQueueCount(thread_queue);
494     assert_int_equal(count, 0);
495 
496     ThreadedQueueDestroy(thread_queue);
497 }
498 
test_threads_clear_empty()499 static void test_threads_clear_empty()
500 {
501     thread_queue = ThreadedQueueNew(0, NULL);
502 
503     char *strs[] = {"spam1", "spam2", "spam3", "spam4", "spam5"};
504 
505     for (int i = 0; i < 5; i++)
506     {
507         ThreadedQueuePush(thread_queue, strs[i]);
508     }
509     size_t count = ThreadedQueueCount(thread_queue);
510     assert_int_equal(count, 5);
511 
512     pthread_t wait_thread;
513     int res = pthread_create(&wait_thread, NULL,
514                              thread_just_wait_empty, NULL);
515     assert_int_equal(res, 0);
516 
517     ThreadedQueueClear(thread_queue);
518     count = ThreadedQueueCount(thread_queue);
519     assert_int_equal(count, 0);
520 
521     res = pthread_join(wait_thread, NULL);
522     assert_int_equal(res, 0);
523 
524     ThreadedQueueDestroy(thread_queue);
525 }
526 
main()527 int main()
528 {
529     PRINT_TEST_BANNER();
530     const UnitTest tests[] =
531     {
532         unit_test(test_push_pop),
533         unit_test(test_pop_empty_and_push_null),
534         unit_test(test_copy),
535         unit_test(test_push_report_count),
536         unit_test(test_expand),
537         unit_test(test_popn),
538         unit_test(test_pushn),
539         unit_test(test_clear),
540         unit_test(test_clear_and_push),
541         unit_test(test_threads_wait_pop),
542         unit_test(test_threads_wait_empty),
543         unit_test(test_threads_pushn),
544         unit_test(test_threads_clear_empty),
545     };
546     return run_tests(tests);
547 }
548