1 /*
2 * Copyright (c) 2009, Sangoma Technologies
3 * Moises Silva <moy@sangoma.com>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 *
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 *
17 * * Neither the name of the original author; nor the names of any contributors
18 * may be used to endorse or promote products derived from this software
19 * without specific prior written permission.
20 *
21 *
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
26 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
27 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
28 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
29 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
30 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
31 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
32 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33 */
34
35 #include "private/ftdm_core.h"
36
37 static ftdm_status_t ftdm_std_queue_create(ftdm_queue_t **outqueue, ftdm_size_t capacity);
38 static ftdm_status_t ftdm_std_queue_enqueue(ftdm_queue_t *queue, void *obj);
39 static void *ftdm_std_queue_dequeue(ftdm_queue_t *queue);
40 static ftdm_status_t ftdm_std_queue_wait(ftdm_queue_t *queue, int ms);
41 static ftdm_status_t ftdm_std_queue_get_interrupt(ftdm_queue_t *queue, ftdm_interrupt_t **interrupt);
42 static ftdm_status_t ftdm_std_queue_destroy(ftdm_queue_t **inqueue);
43
44 struct ftdm_queue {
45 ftdm_mutex_t *mutex;
46 ftdm_interrupt_t *interrupt;
47 ftdm_size_t capacity;
48 ftdm_size_t size;
49 unsigned rindex;
50 unsigned windex;
51 void **elements;
52 };
53
54 FT_DECLARE_DATA ftdm_queue_handler_t g_ftdm_queue_handler =
55 {
56 /*.create = */ ftdm_std_queue_create,
57 /*.enqueue = */ ftdm_std_queue_enqueue,
58 /*.dequeue = */ ftdm_std_queue_dequeue,
59 /*.wait = */ ftdm_std_queue_wait,
60 /*.get_interrupt = */ ftdm_std_queue_get_interrupt,
61 /*.destroy = */ ftdm_std_queue_destroy
62 };
63
ftdm_global_set_queue_handler(ftdm_queue_handler_t * handler)64 FT_DECLARE(ftdm_status_t) ftdm_global_set_queue_handler(ftdm_queue_handler_t *handler)
65 {
66 if (!handler ||
67 !handler->create ||
68 !handler->enqueue ||
69 !handler->dequeue ||
70 !handler->wait ||
71 !handler->get_interrupt ||
72 !handler->destroy) {
73 return FTDM_FAIL;
74 }
75 memcpy(&g_ftdm_queue_handler, handler, sizeof(*handler));
76 return FTDM_SUCCESS;
77 }
78
ftdm_std_queue_create(ftdm_queue_t ** outqueue,ftdm_size_t capacity)79 static ftdm_status_t ftdm_std_queue_create(ftdm_queue_t **outqueue, ftdm_size_t capacity)
80 {
81 ftdm_queue_t *queue = NULL;
82 ftdm_assert_return(outqueue, FTDM_FAIL, "Queue double pointer is null\n");
83 ftdm_assert_return(capacity > 0, FTDM_FAIL, "Queue capacity is not bigger than 0\n");
84
85 *outqueue = NULL;
86 queue = ftdm_calloc(1, sizeof(*queue));
87 if (!queue) {
88 return FTDM_FAIL;
89 }
90
91 queue->elements = ftdm_calloc(1, (sizeof(void*)*capacity));
92 if (!queue->elements) {
93 goto failed;
94 }
95 queue->capacity = capacity;
96
97 if (ftdm_mutex_create(&queue->mutex) != FTDM_SUCCESS) {
98 goto failed;
99 }
100
101 if (ftdm_interrupt_create(&queue->interrupt, FTDM_INVALID_SOCKET, FTDM_NO_FLAGS) != FTDM_SUCCESS) {
102 goto failed;
103 }
104
105 *outqueue = queue;
106 return FTDM_SUCCESS;
107
108 failed:
109 if (queue) {
110 if (queue->interrupt) {
111 ftdm_interrupt_destroy(&queue->interrupt);
112 }
113 if (queue->mutex) {
114 ftdm_mutex_destroy(&queue->mutex);
115 }
116 ftdm_safe_free(queue->elements);
117 ftdm_safe_free(queue);
118 }
119 return FTDM_FAIL;
120 }
121
ftdm_std_queue_enqueue(ftdm_queue_t * queue,void * obj)122 static ftdm_status_t ftdm_std_queue_enqueue(ftdm_queue_t *queue, void *obj)
123 {
124 ftdm_status_t status = FTDM_FAIL;
125
126 ftdm_assert_return(queue != NULL, FTDM_FAIL, "Queue is null!");
127
128 ftdm_mutex_lock(queue->mutex);
129
130 if (queue->windex == queue->capacity) {
131 /* try to see if we can wrap around */
132 queue->windex = 0;
133 }
134
135 if (queue->size != 0 && queue->windex == queue->rindex) {
136 ftdm_log(FTDM_LOG_ERROR, "Failed to enqueue obj %p in queue %p, no more room! windex == rindex == %d!\n", obj, queue, queue->windex);
137 goto done;
138 }
139
140 queue->elements[queue->windex++] = obj;
141 queue->size++;
142 status = FTDM_SUCCESS;
143
144 /* wake up queue reader */
145 ftdm_interrupt_signal(queue->interrupt);
146
147 done:
148
149 ftdm_mutex_unlock(queue->mutex);
150
151 return status;
152 }
153
ftdm_std_queue_dequeue(ftdm_queue_t * queue)154 static void *ftdm_std_queue_dequeue(ftdm_queue_t *queue)
155 {
156 void *obj = NULL;
157
158 ftdm_assert_return(queue != NULL, NULL, "Queue is null!");
159
160 ftdm_mutex_lock(queue->mutex);
161
162 if (queue->size == 0) {
163 goto done;
164 }
165
166 obj = queue->elements[queue->rindex];
167 queue->elements[queue->rindex++] = NULL;
168 queue->size--;
169 if (queue->rindex == queue->capacity) {
170 queue->rindex = 0;
171 }
172
173 done:
174
175 ftdm_mutex_unlock(queue->mutex);
176
177 return obj;
178 }
179
ftdm_std_queue_wait(ftdm_queue_t * queue,int ms)180 static ftdm_status_t ftdm_std_queue_wait(ftdm_queue_t *queue, int ms)
181 {
182 ftdm_status_t ret;
183 ftdm_assert_return(queue != NULL, FTDM_FAIL, "Queue is null!");
184
185 ftdm_mutex_lock(queue->mutex);
186
187 /* if there is elements in the queue, no need to wait */
188 if (queue->size != 0) {
189 ftdm_mutex_unlock(queue->mutex);
190 return FTDM_SUCCESS;
191 }
192
193 /* no elements on the queue, wait for someone to write an element */
194 ret = ftdm_interrupt_wait(queue->interrupt, ms);
195
196 /* got an element or timeout, bail out */
197 ftdm_mutex_unlock(queue->mutex);
198
199 return ret;
200 }
201
ftdm_std_queue_get_interrupt(ftdm_queue_t * queue,ftdm_interrupt_t ** interrupt)202 static ftdm_status_t ftdm_std_queue_get_interrupt(ftdm_queue_t *queue, ftdm_interrupt_t **interrupt)
203 {
204 ftdm_assert_return(queue != NULL, FTDM_FAIL, "Queue is null!\n");
205 ftdm_assert_return(interrupt != NULL, FTDM_FAIL, "Queue is null!\n");
206 *interrupt = queue->interrupt;
207 return FTDM_SUCCESS;
208 }
209
ftdm_std_queue_destroy(ftdm_queue_t ** inqueue)210 static ftdm_status_t ftdm_std_queue_destroy(ftdm_queue_t **inqueue)
211 {
212 ftdm_queue_t *queue = NULL;
213 ftdm_assert_return(inqueue != NULL, FTDM_FAIL, "Queue is null!\n");
214 ftdm_assert_return(*inqueue != NULL, FTDM_FAIL, "Queue is null!\n");
215
216 queue = *inqueue;
217 ftdm_interrupt_destroy(&queue->interrupt);
218 ftdm_mutex_destroy(&queue->mutex);
219 ftdm_safe_free(queue->elements);
220 ftdm_safe_free(queue);
221 *inqueue = NULL;
222 return FTDM_SUCCESS;
223 }
224
225 /* For Emacs:
226 * Local Variables:
227 * mode:c
228 * indent-tabs-mode:t
229 * tab-width:4
230 * c-basic-offset:4
231 * End:
232 * For VIM:
233 * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
234 */
235