1 /*
2  * Copyright (c) 2013 Andrew Kelley
3  *
4  * This file is part of libgroove, which is MIT licensed.
5  * See http://opensource.org/licenses/MIT
6  */
7 
8 #include "queue.h"
9 
10 #include <libavutil/mem.h>
11 #include <pthread.h>
12 
/* One node of the queue's singly linked list. */
struct ItemList {
    /* Payload pointer exactly as handed to groove_queue_put. */
    void *obj;
    /* Following node, or NULL when this node is the tail. */
    struct ItemList *next;
};
17 
18 struct GrooveQueuePrivate {
19     struct GrooveQueue externals;
20     struct ItemList *first;
21     struct ItemList *last;
22     pthread_mutex_t mutex;
23     pthread_cond_t cond;
24     int abort_request;
25 };
26 
groove_queue_create(void)27 struct GrooveQueue *groove_queue_create(void) {
28     struct GrooveQueuePrivate *q = av_mallocz(sizeof(struct GrooveQueuePrivate));
29     if (!q)
30         return NULL;
31 
32     if (pthread_mutex_init(&q->mutex, NULL) != 0) {
33         av_free(q);
34         return NULL;
35     }
36     if (pthread_cond_init(&q->cond, NULL) != 0) {
37         av_free(q);
38         pthread_mutex_destroy(&q->mutex);
39         return NULL;
40     }
41     struct GrooveQueue *queue = &q->externals;
42     queue->cleanup = groove_queue_cleanup_default;
43     return queue;
44 }
45 
groove_queue_flush(struct GrooveQueue * queue)46 void groove_queue_flush(struct GrooveQueue *queue) {
47     struct GrooveQueuePrivate *q = (struct GrooveQueuePrivate *) queue;
48 
49     pthread_mutex_lock(&q->mutex);
50 
51     struct ItemList *el;
52     struct ItemList *el1;
53     for (el = q->first; el != NULL; el = el1) {
54         el1 = el->next;
55         if (queue->cleanup)
56             queue->cleanup(queue, el->obj);
57         av_free(el);
58     }
59     q->first = NULL;
60     q->last = NULL;
61 
62     pthread_mutex_unlock(&q->mutex);
63 }
64 
groove_queue_destroy(struct GrooveQueue * queue)65 void groove_queue_destroy(struct GrooveQueue *queue) {
66     groove_queue_flush(queue);
67     struct GrooveQueuePrivate *q = (struct GrooveQueuePrivate *) queue;
68     pthread_mutex_destroy(&q->mutex);
69     pthread_cond_destroy(&q->cond);
70     av_free(q);
71 }
72 
groove_queue_abort(struct GrooveQueue * queue)73 void groove_queue_abort(struct GrooveQueue *queue) {
74     struct GrooveQueuePrivate *q = (struct GrooveQueuePrivate *) queue;
75 
76     pthread_mutex_lock(&q->mutex);
77 
78     q->abort_request = 1;
79 
80     pthread_cond_signal(&q->cond);
81     pthread_mutex_unlock(&q->mutex);
82 }
83 
groove_queue_reset(struct GrooveQueue * queue)84 void groove_queue_reset(struct GrooveQueue *queue) {
85     struct GrooveQueuePrivate *q = (struct GrooveQueuePrivate *) queue;
86 
87     pthread_mutex_lock(&q->mutex);
88 
89     q->abort_request = 0;
90 
91     pthread_mutex_unlock(&q->mutex);
92 }
93 
groove_queue_put(struct GrooveQueue * queue,void * obj)94 int groove_queue_put(struct GrooveQueue *queue, void *obj) {
95     struct ItemList * el1 = av_mallocz(sizeof(struct ItemList));
96 
97     if (!el1)
98         return -1;
99 
100     el1->obj = obj;
101 
102     struct GrooveQueuePrivate *q = (struct GrooveQueuePrivate *) queue;
103     pthread_mutex_lock(&q->mutex);
104 
105     if (!q->last)
106         q->first = el1;
107     else
108         q->last->next = el1;
109     q->last = el1;
110 
111     if (queue->put)
112         queue->put(queue, obj);
113 
114     pthread_cond_signal(&q->cond);
115     pthread_mutex_unlock(&q->mutex);
116 
117     return 0;
118 }
119 
groove_queue_peek(struct GrooveQueue * queue,int block)120 int groove_queue_peek(struct GrooveQueue *queue, int block) {
121     int ret;
122 
123     struct GrooveQueuePrivate *q = (struct GrooveQueuePrivate *) queue;
124     pthread_mutex_lock(&q->mutex);
125 
126     for (;;) {
127         if (q->abort_request) {
128             ret = -1;
129             break;
130         }
131 
132         if (q->first) {
133             ret = 1;
134             break;
135         } else if (!block) {
136             ret = 0;
137             break;
138         } else {
139             pthread_cond_wait(&q->cond, &q->mutex);
140         }
141     }
142 
143     pthread_mutex_unlock(&q->mutex);
144     return ret;
145 }
146 
groove_queue_get(struct GrooveQueue * queue,void ** obj_ptr,int block)147 int groove_queue_get(struct GrooveQueue *queue, void **obj_ptr, int block) {
148     struct ItemList *ev1;
149     int ret;
150 
151     struct GrooveQueuePrivate *q = (struct GrooveQueuePrivate *) queue;
152     pthread_mutex_lock(&q->mutex);
153 
154     for (;;) {
155         if (q->abort_request) {
156             ret = -1;
157             break;
158         }
159 
160         ev1 = q->first;
161         if (ev1) {
162             q->first = ev1->next;
163             if (!q->first)
164                 q->last = NULL;
165 
166             if (queue->get)
167                 queue->get(queue, ev1->obj);
168 
169             *obj_ptr = ev1->obj;
170             av_free(ev1);
171             ret = 1;
172             break;
173         } else if(!block) {
174             ret = 0;
175             break;
176         } else {
177             pthread_cond_wait(&q->cond, &q->mutex);
178         }
179     }
180 
181     pthread_mutex_unlock(&q->mutex);
182     return ret;
183 }
184 
groove_queue_purge(struct GrooveQueue * queue)185 void groove_queue_purge(struct GrooveQueue *queue) {
186     struct GrooveQueuePrivate *q = (struct GrooveQueuePrivate *) queue;
187 
188     pthread_mutex_lock(&q->mutex);
189     struct ItemList *node = q->first;
190     struct ItemList *prev = NULL;
191     while (node) {
192         if (queue->purge(queue, node->obj)) {
193             if (prev) {
194                 prev->next = node->next;
195                 if (queue->cleanup)
196                     queue->cleanup(queue, node->obj);
197                 av_free(node);
198                 node = prev->next;
199                 if (!node)
200                     q->last = prev;
201             } else {
202                 struct ItemList *next = node->next;
203                 if (queue->cleanup)
204                     queue->cleanup(queue, node->obj);
205                 av_free(node);
206                 q->first = next;
207                 node = next;
208                 if (!node)
209                     q->last = NULL;
210             }
211         } else {
212             prev = node;
213             node = node->next;
214         }
215     }
216     pthread_mutex_unlock(&q->mutex);
217 }
218 
/*
 * Default cleanup callback installed by groove_queue_create: releases the
 * payload with av_free. The queue argument is not used.
 */
void groove_queue_cleanup_default(struct GrooveQueue *queue, void *obj) {
    (void) queue;
    av_free(obj);
}
222 
223