/*
 * Copyright (c) 2013 Andrew Kelley
 *
 * This file is part of libgroove, which is MIT licensed.
 * See http://opensource.org/licenses/MIT
 */
7
8 #include "queue.h"
9
10 #include <libavutil/mem.h>
11 #include <pthread.h>
12
/* One node of the queue's singly linked list. */
struct ItemList {
    void *obj;             /* payload pointer stored by the caller */
    struct ItemList *next; /* following node, or NULL at the tail */
};
17
18 struct GrooveQueuePrivate {
19 struct GrooveQueue externals;
20 struct ItemList *first;
21 struct ItemList *last;
22 pthread_mutex_t mutex;
23 pthread_cond_t cond;
24 int abort_request;
25 };
26
groove_queue_create(void)27 struct GrooveQueue *groove_queue_create(void) {
28 struct GrooveQueuePrivate *q = av_mallocz(sizeof(struct GrooveQueuePrivate));
29 if (!q)
30 return NULL;
31
32 if (pthread_mutex_init(&q->mutex, NULL) != 0) {
33 av_free(q);
34 return NULL;
35 }
36 if (pthread_cond_init(&q->cond, NULL) != 0) {
37 av_free(q);
38 pthread_mutex_destroy(&q->mutex);
39 return NULL;
40 }
41 struct GrooveQueue *queue = &q->externals;
42 queue->cleanup = groove_queue_cleanup_default;
43 return queue;
44 }
45
groove_queue_flush(struct GrooveQueue * queue)46 void groove_queue_flush(struct GrooveQueue *queue) {
47 struct GrooveQueuePrivate *q = (struct GrooveQueuePrivate *) queue;
48
49 pthread_mutex_lock(&q->mutex);
50
51 struct ItemList *el;
52 struct ItemList *el1;
53 for (el = q->first; el != NULL; el = el1) {
54 el1 = el->next;
55 if (queue->cleanup)
56 queue->cleanup(queue, el->obj);
57 av_free(el);
58 }
59 q->first = NULL;
60 q->last = NULL;
61
62 pthread_mutex_unlock(&q->mutex);
63 }
64
groove_queue_destroy(struct GrooveQueue * queue)65 void groove_queue_destroy(struct GrooveQueue *queue) {
66 groove_queue_flush(queue);
67 struct GrooveQueuePrivate *q = (struct GrooveQueuePrivate *) queue;
68 pthread_mutex_destroy(&q->mutex);
69 pthread_cond_destroy(&q->cond);
70 av_free(q);
71 }
72
groove_queue_abort(struct GrooveQueue * queue)73 void groove_queue_abort(struct GrooveQueue *queue) {
74 struct GrooveQueuePrivate *q = (struct GrooveQueuePrivate *) queue;
75
76 pthread_mutex_lock(&q->mutex);
77
78 q->abort_request = 1;
79
80 pthread_cond_signal(&q->cond);
81 pthread_mutex_unlock(&q->mutex);
82 }
83
groove_queue_reset(struct GrooveQueue * queue)84 void groove_queue_reset(struct GrooveQueue *queue) {
85 struct GrooveQueuePrivate *q = (struct GrooveQueuePrivate *) queue;
86
87 pthread_mutex_lock(&q->mutex);
88
89 q->abort_request = 0;
90
91 pthread_mutex_unlock(&q->mutex);
92 }
93
groove_queue_put(struct GrooveQueue * queue,void * obj)94 int groove_queue_put(struct GrooveQueue *queue, void *obj) {
95 struct ItemList * el1 = av_mallocz(sizeof(struct ItemList));
96
97 if (!el1)
98 return -1;
99
100 el1->obj = obj;
101
102 struct GrooveQueuePrivate *q = (struct GrooveQueuePrivate *) queue;
103 pthread_mutex_lock(&q->mutex);
104
105 if (!q->last)
106 q->first = el1;
107 else
108 q->last->next = el1;
109 q->last = el1;
110
111 if (queue->put)
112 queue->put(queue, obj);
113
114 pthread_cond_signal(&q->cond);
115 pthread_mutex_unlock(&q->mutex);
116
117 return 0;
118 }
119
groove_queue_peek(struct GrooveQueue * queue,int block)120 int groove_queue_peek(struct GrooveQueue *queue, int block) {
121 int ret;
122
123 struct GrooveQueuePrivate *q = (struct GrooveQueuePrivate *) queue;
124 pthread_mutex_lock(&q->mutex);
125
126 for (;;) {
127 if (q->abort_request) {
128 ret = -1;
129 break;
130 }
131
132 if (q->first) {
133 ret = 1;
134 break;
135 } else if (!block) {
136 ret = 0;
137 break;
138 } else {
139 pthread_cond_wait(&q->cond, &q->mutex);
140 }
141 }
142
143 pthread_mutex_unlock(&q->mutex);
144 return ret;
145 }
146
groove_queue_get(struct GrooveQueue * queue,void ** obj_ptr,int block)147 int groove_queue_get(struct GrooveQueue *queue, void **obj_ptr, int block) {
148 struct ItemList *ev1;
149 int ret;
150
151 struct GrooveQueuePrivate *q = (struct GrooveQueuePrivate *) queue;
152 pthread_mutex_lock(&q->mutex);
153
154 for (;;) {
155 if (q->abort_request) {
156 ret = -1;
157 break;
158 }
159
160 ev1 = q->first;
161 if (ev1) {
162 q->first = ev1->next;
163 if (!q->first)
164 q->last = NULL;
165
166 if (queue->get)
167 queue->get(queue, ev1->obj);
168
169 *obj_ptr = ev1->obj;
170 av_free(ev1);
171 ret = 1;
172 break;
173 } else if(!block) {
174 ret = 0;
175 break;
176 } else {
177 pthread_cond_wait(&q->cond, &q->mutex);
178 }
179 }
180
181 pthread_mutex_unlock(&q->mutex);
182 return ret;
183 }
184
groove_queue_purge(struct GrooveQueue * queue)185 void groove_queue_purge(struct GrooveQueue *queue) {
186 struct GrooveQueuePrivate *q = (struct GrooveQueuePrivate *) queue;
187
188 pthread_mutex_lock(&q->mutex);
189 struct ItemList *node = q->first;
190 struct ItemList *prev = NULL;
191 while (node) {
192 if (queue->purge(queue, node->obj)) {
193 if (prev) {
194 prev->next = node->next;
195 if (queue->cleanup)
196 queue->cleanup(queue, node->obj);
197 av_free(node);
198 node = prev->next;
199 if (!node)
200 q->last = prev;
201 } else {
202 struct ItemList *next = node->next;
203 if (queue->cleanup)
204 queue->cleanup(queue, node->obj);
205 av_free(node);
206 q->first = next;
207 node = next;
208 if (!node)
209 q->last = NULL;
210 }
211 } else {
212 prev = node;
213 node = node->next;
214 }
215 }
216 pthread_mutex_unlock(&q->mutex);
217 }
218
/*
 * Cleanup callback installed by groove_queue_create: releases obj with
 * av_free. The queue argument is not used.
 */
void groove_queue_cleanup_default(struct GrooveQueue *queue, void *obj) {
    (void) queue;
    av_free(obj);
}
222
223