1 /*
2  * Copyright (c) 2009, Sangoma Technologies
3  * Moises Silva <moy@sangoma.com>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  *
10  * * Redistributions of source code must retain the above copyright
11  * notice, this list of conditions and the following disclaimer.
12  *
13  * * Redistributions in binary form must reproduce the above copyright
14  * notice, this list of conditions and the following disclaimer in the
15  * documentation and/or other materials provided with the distribution.
16  *
17  * * Neither the name of the original author; nor the names of any contributors
18  * may be used to endorse or promote products derived from this software
19  * without specific prior written permission.
20  *
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
26  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
27  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
28  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
29  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
30  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
31  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
32  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "private/ftdm_core.h"
36 
37 static ftdm_status_t ftdm_std_queue_create(ftdm_queue_t **outqueue, ftdm_size_t capacity);
38 static ftdm_status_t ftdm_std_queue_enqueue(ftdm_queue_t *queue, void *obj);
39 static void *ftdm_std_queue_dequeue(ftdm_queue_t *queue);
40 static ftdm_status_t ftdm_std_queue_wait(ftdm_queue_t *queue, int ms);
41 static ftdm_status_t ftdm_std_queue_get_interrupt(ftdm_queue_t *queue, ftdm_interrupt_t **interrupt);
42 static ftdm_status_t ftdm_std_queue_destroy(ftdm_queue_t **inqueue);
43 
44 struct ftdm_queue {
45 	ftdm_mutex_t *mutex;
46 	ftdm_interrupt_t *interrupt;
47 	ftdm_size_t capacity;
48 	ftdm_size_t size;
49 	unsigned rindex;
50 	unsigned windex;
51 	void **elements;
52 };
53 
54 FT_DECLARE_DATA ftdm_queue_handler_t g_ftdm_queue_handler =
55 {
56 	/*.create = */ ftdm_std_queue_create,
57 	/*.enqueue = */ ftdm_std_queue_enqueue,
58 	/*.dequeue = */ ftdm_std_queue_dequeue,
59 	/*.wait = */ ftdm_std_queue_wait,
60 	/*.get_interrupt = */ ftdm_std_queue_get_interrupt,
61 	/*.destroy = */ ftdm_std_queue_destroy
62 };
63 
ftdm_global_set_queue_handler(ftdm_queue_handler_t * handler)64 FT_DECLARE(ftdm_status_t) ftdm_global_set_queue_handler(ftdm_queue_handler_t *handler)
65 {
66 	if (!handler ||
67 		!handler->create ||
68 		!handler->enqueue ||
69 		!handler->dequeue ||
70 		!handler->wait ||
71 		!handler->get_interrupt ||
72 		!handler->destroy) {
73 		return FTDM_FAIL;
74 	}
75 	memcpy(&g_ftdm_queue_handler, handler, sizeof(*handler));
76 	return FTDM_SUCCESS;
77 }
78 
ftdm_std_queue_create(ftdm_queue_t ** outqueue,ftdm_size_t capacity)79 static ftdm_status_t ftdm_std_queue_create(ftdm_queue_t **outqueue, ftdm_size_t capacity)
80 {
81 	ftdm_queue_t *queue = NULL;
82 	ftdm_assert_return(outqueue, FTDM_FAIL, "Queue double pointer is null\n");
83 	ftdm_assert_return(capacity > 0, FTDM_FAIL, "Queue capacity is not bigger than 0\n");
84 
85 	*outqueue = NULL;
86 	queue = ftdm_calloc(1, sizeof(*queue));
87 	if (!queue) {
88 		return FTDM_FAIL;
89 	}
90 
91 	queue->elements = ftdm_calloc(1, (sizeof(void*)*capacity));
92 	if (!queue->elements) {
93 		goto failed;
94 	}
95 	queue->capacity = capacity;
96 
97 	if (ftdm_mutex_create(&queue->mutex) != FTDM_SUCCESS) {
98 		goto failed;
99 	}
100 
101 	if (ftdm_interrupt_create(&queue->interrupt, FTDM_INVALID_SOCKET, FTDM_NO_FLAGS) != FTDM_SUCCESS) {
102 		goto failed;
103 	}
104 
105 	*outqueue = queue;
106 	return FTDM_SUCCESS;
107 
108 failed:
109 	if (queue) {
110 		if (queue->interrupt) {
111 			ftdm_interrupt_destroy(&queue->interrupt);
112 		}
113 		if (queue->mutex) {
114 			ftdm_mutex_destroy(&queue->mutex);
115 		}
116 		ftdm_safe_free(queue->elements);
117 		ftdm_safe_free(queue);
118 	}
119 	return FTDM_FAIL;
120 }
121 
ftdm_std_queue_enqueue(ftdm_queue_t * queue,void * obj)122 static ftdm_status_t ftdm_std_queue_enqueue(ftdm_queue_t *queue, void *obj)
123 {
124 	ftdm_status_t status = FTDM_FAIL;
125 
126 	ftdm_assert_return(queue != NULL, FTDM_FAIL, "Queue is null!");
127 
128 	ftdm_mutex_lock(queue->mutex);
129 
130 	if (queue->windex == queue->capacity) {
131 		/* try to see if we can wrap around */
132 		queue->windex = 0;
133 	}
134 
135 	if (queue->size != 0 && queue->windex == queue->rindex) {
136 		ftdm_log(FTDM_LOG_ERROR, "Failed to enqueue obj %p in queue %p, no more room! windex == rindex == %d!\n", obj, queue, queue->windex);
137 		goto done;
138 	}
139 
140 	queue->elements[queue->windex++] = obj;
141 	queue->size++;
142 	status = FTDM_SUCCESS;
143 
144 	/* wake up queue reader */
145 	ftdm_interrupt_signal(queue->interrupt);
146 
147 done:
148 
149 	ftdm_mutex_unlock(queue->mutex);
150 
151 	return status;
152 }
153 
ftdm_std_queue_dequeue(ftdm_queue_t * queue)154 static void *ftdm_std_queue_dequeue(ftdm_queue_t *queue)
155 {
156 	void *obj = NULL;
157 
158 	ftdm_assert_return(queue != NULL, NULL, "Queue is null!");
159 
160 	ftdm_mutex_lock(queue->mutex);
161 
162 	if (queue->size == 0) {
163 		goto done;
164 	}
165 
166 	obj = queue->elements[queue->rindex];
167 	queue->elements[queue->rindex++] = NULL;
168 	queue->size--;
169 	if (queue->rindex == queue->capacity) {
170 		queue->rindex = 0;
171 	}
172 
173 done:
174 
175 	ftdm_mutex_unlock(queue->mutex);
176 
177 	return obj;
178 }
179 
ftdm_std_queue_wait(ftdm_queue_t * queue,int ms)180 static ftdm_status_t ftdm_std_queue_wait(ftdm_queue_t *queue, int ms)
181 {
182 	ftdm_status_t ret;
183 	ftdm_assert_return(queue != NULL, FTDM_FAIL, "Queue is null!");
184 
185 	ftdm_mutex_lock(queue->mutex);
186 
187 	/* if there is elements in the queue, no need to wait */
188 	if (queue->size != 0) {
189 		ftdm_mutex_unlock(queue->mutex);
190 		return FTDM_SUCCESS;
191 	}
192 
193 	/* no elements on the queue, wait for someone to write an element */
194 	ret = ftdm_interrupt_wait(queue->interrupt, ms);
195 
196 	/* got an element or timeout, bail out */
197 	ftdm_mutex_unlock(queue->mutex);
198 
199 	return ret;
200 }
201 
ftdm_std_queue_get_interrupt(ftdm_queue_t * queue,ftdm_interrupt_t ** interrupt)202 static ftdm_status_t ftdm_std_queue_get_interrupt(ftdm_queue_t *queue, ftdm_interrupt_t **interrupt)
203 {
204 	ftdm_assert_return(queue != NULL, FTDM_FAIL, "Queue is null!\n");
205 	ftdm_assert_return(interrupt != NULL, FTDM_FAIL, "Queue is null!\n");
206 	*interrupt = queue->interrupt;
207 	return FTDM_SUCCESS;
208 }
209 
ftdm_std_queue_destroy(ftdm_queue_t ** inqueue)210 static ftdm_status_t ftdm_std_queue_destroy(ftdm_queue_t **inqueue)
211 {
212 	ftdm_queue_t *queue = NULL;
213 	ftdm_assert_return(inqueue != NULL, FTDM_FAIL, "Queue is null!\n");
214 	ftdm_assert_return(*inqueue != NULL, FTDM_FAIL, "Queue is null!\n");
215 
216 	queue = *inqueue;
217 	ftdm_interrupt_destroy(&queue->interrupt);
218 	ftdm_mutex_destroy(&queue->mutex);
219 	ftdm_safe_free(queue->elements);
220 	ftdm_safe_free(queue);
221 	*inqueue = NULL;
222 	return FTDM_SUCCESS;
223 }
224 
225 /* For Emacs:
226  * Local Variables:
227  * mode:c
228  * indent-tabs-mode:t
229  * tab-width:4
230  * c-basic-offset:4
231  * End:
232  * For VIM:
233  * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
234  */
235