1 /*
2 * Copyright (C) 2016 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3 *
4 * This file is part of MooseFS.
5 *
6 * MooseFS is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, version 2 (only).
9 *
10 * MooseFS is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with MooseFS; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18 * or visit http://www.gnu.org/licenses/gpl-2.0.html
19 */
20
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24
25 #include <stdlib.h>
26 #include <pthread.h>
27 #include <inttypes.h>
28 #include <errno.h>
29
30 #include "massert.h"
31
/*
 * One queued item. Entries are chained through 'next' into a singly
 * linked list owned by a queue.
 */
struct _qentry {
	uint32_t id;            /* stored by put, handed back unchanged by get */
	uint32_t op;            /* stored by put, handed back unchanged by get */
	uint8_t *data;          /* payload pointer; released with free() if still queued at queue_delete */
	uint32_t leng;          /* amount charged against the queue's size limit (presumably the byte length of data) */
	struct _qentry *next;   /* following entry, NULL for the last one */
};
typedef struct _qentry qentry;
39
40 typedef struct _queue {
41 qentry *head,**tail;
42 uint32_t elements;
43 uint32_t size;
44 uint32_t maxsize;
45 uint32_t freewaiting;
46 uint32_t fullwaiting;
47 uint32_t closed;
48 pthread_cond_t waitfree,waitfull;
49 pthread_mutex_t lock;
50 } queue;
51
queue_new(uint32_t size)52 void* queue_new(uint32_t size) {
53 queue *q;
54 q = (queue*)malloc(sizeof(queue));
55 passert(q);
56 q->head = NULL;
57 q->tail = &(q->head);
58 q->elements = 0;
59 q->size = 0;
60 q->maxsize = size;
61 q->freewaiting = 0;
62 q->fullwaiting = 0;
63 q->closed = 0;
64 if (size) {
65 zassert(pthread_cond_init(&(q->waitfull),NULL));
66 }
67 zassert(pthread_cond_init(&(q->waitfree),NULL));
68 zassert(pthread_mutex_init(&(q->lock),NULL));
69 return q;
70 }
71
queue_delete(void * que)72 void queue_delete(void *que) {
73 queue *q = (queue*)que;
74 qentry *qe,*qen;
75 zassert(pthread_mutex_lock(&(q->lock)));
76 sassert(q->freewaiting==0);
77 sassert(q->fullwaiting==0);
78 for (qe = q->head ; qe ; qe = qen) {
79 qen = qe->next;
80 free(qe->data);
81 free(qe);
82 }
83 zassert(pthread_mutex_unlock(&(q->lock)));
84 zassert(pthread_mutex_destroy(&(q->lock)));
85 zassert(pthread_cond_destroy(&(q->waitfree)));
86 if (q->maxsize) {
87 zassert(pthread_cond_destroy(&(q->waitfull)));
88 }
89 free(q);
90 }
91
queue_close(void * que)92 void queue_close(void *que) {
93 queue *q = (queue*)que;
94 zassert(pthread_mutex_lock(&(q->lock)));
95 q->closed = 1;
96 if (q->freewaiting>0) {
97 zassert(pthread_cond_broadcast(&(q->waitfree)));
98 q->freewaiting = 0;
99 }
100 if (q->fullwaiting>0) {
101 zassert(pthread_cond_broadcast(&(q->waitfull)));
102 q->fullwaiting = 0;
103 }
104 zassert(pthread_mutex_unlock(&(q->lock)));
105 }
106
queue_isempty(void * que)107 int queue_isempty(void *que) {
108 queue *q = (queue*)que;
109 int r;
110 zassert(pthread_mutex_lock(&(q->lock)));
111 r=(q->elements==0)?1:0;
112 zassert(pthread_mutex_unlock(&(q->lock)));
113 return r;
114 }
115
queue_elements(void * que)116 uint32_t queue_elements(void *que) {
117 queue *q = (queue*)que;
118 uint32_t r;
119 zassert(pthread_mutex_lock(&(q->lock)));
120 r=q->elements;
121 zassert(pthread_mutex_unlock(&(q->lock)));
122 return r;
123 }
124
queue_isfull(void * que)125 int queue_isfull(void *que) {
126 queue *q = (queue*)que;
127 int r;
128 zassert(pthread_mutex_lock(&(q->lock)));
129 r = (q->maxsize>0 && q->maxsize<=q->size)?1:0;
130 zassert(pthread_mutex_unlock(&(q->lock)));
131 return r;
132 }
133
queue_sizeleft(void * que)134 uint32_t queue_sizeleft(void *que) {
135 queue *q = (queue*)que;
136 uint32_t r;
137 zassert(pthread_mutex_lock(&(q->lock)));
138 if (q->maxsize>0) {
139 r = q->maxsize-q->size;
140 } else {
141 r = 0xFFFFFFFF;
142 }
143 zassert(pthread_mutex_unlock(&(q->lock)));
144 return r;
145 }
146
queue_put(void * que,uint32_t id,uint32_t op,uint8_t * data,uint32_t leng)147 int queue_put(void *que,uint32_t id,uint32_t op,uint8_t *data,uint32_t leng) {
148 queue *q = (queue*)que;
149 qentry *qe;
150 qe = malloc(sizeof(qentry));
151 passert(qe);
152 qe->id = id;
153 qe->op = op;
154 qe->data = data;
155 qe->leng = leng;
156 qe->next = NULL;
157 zassert(pthread_mutex_lock(&(q->lock)));
158 if (q->maxsize) {
159 if (leng>q->maxsize) {
160 zassert(pthread_mutex_unlock(&(q->lock)));
161 free(qe);
162 errno = EDEADLK;
163 return -1;
164 }
165 while (q->size+leng>q->maxsize && q->closed==0) {
166 q->fullwaiting++;
167 zassert(pthread_cond_wait(&(q->waitfull),&(q->lock)));
168 }
169 if (q->closed) {
170 zassert(pthread_mutex_unlock(&(q->lock)));
171 errno = EIO;
172 return -1;
173 }
174 }
175 q->elements++;
176 q->size += leng;
177 *(q->tail) = qe;
178 q->tail = &(qe->next);
179 if (q->freewaiting>0) {
180 zassert(pthread_cond_signal(&(q->waitfree)));
181 q->freewaiting--;
182 }
183 zassert(pthread_mutex_unlock(&(q->lock)));
184 return 0;
185 }
186
queue_tryput(void * que,uint32_t id,uint32_t op,uint8_t * data,uint32_t leng)187 int queue_tryput(void *que,uint32_t id,uint32_t op,uint8_t *data,uint32_t leng) {
188 queue *q = (queue*)que;
189 qentry *qe;
190 zassert(pthread_mutex_lock(&(q->lock)));
191 if (q->maxsize) {
192 if (leng>q->maxsize) {
193 zassert(pthread_mutex_unlock(&(q->lock)));
194 errno = EDEADLK;
195 return -1;
196 }
197 if (q->size+leng>q->maxsize) {
198 zassert(pthread_mutex_unlock(&(q->lock)));
199 errno = EBUSY;
200 return -1;
201 }
202 }
203 qe = malloc(sizeof(qentry));
204 passert(qe);
205 qe->id = id;
206 qe->op = op;
207 qe->data = data;
208 qe->leng = leng;
209 qe->next = NULL;
210 q->elements++;
211 q->size += leng;
212 *(q->tail) = qe;
213 q->tail = &(qe->next);
214 if (q->freewaiting>0) {
215 zassert(pthread_cond_signal(&(q->waitfree)));
216 q->freewaiting--;
217 }
218 zassert(pthread_mutex_unlock(&(q->lock)));
219 return 0;
220 }
221
queue_get(void * que,uint32_t * id,uint32_t * op,uint8_t ** data,uint32_t * leng)222 int queue_get(void *que,uint32_t *id,uint32_t *op,uint8_t **data,uint32_t *leng) {
223 queue *q = (queue*)que;
224 qentry *qe;
225 zassert(pthread_mutex_lock(&(q->lock)));
226 while (q->elements==0 && q->closed==0) {
227 q->freewaiting++;
228 zassert(pthread_cond_wait(&(q->waitfree),&(q->lock)));
229 }
230 if (q->closed) {
231 zassert(pthread_mutex_unlock(&(q->lock)));
232 if (id) {
233 *id=0;
234 }
235 if (op) {
236 *op=0;
237 }
238 if (data) {
239 *data=NULL;
240 }
241 if (leng) {
242 *leng=0;
243 }
244 errno = EIO;
245 return -1;
246 }
247 qe = q->head;
248 q->head = qe->next;
249 if (q->head==NULL) {
250 q->tail = &(q->head);
251 }
252 q->elements--;
253 q->size -= qe->leng;
254 if (q->fullwaiting>0) {
255 zassert(pthread_cond_signal(&(q->waitfull)));
256 q->fullwaiting--;
257 }
258 zassert(pthread_mutex_unlock(&(q->lock)));
259 if (id) {
260 *id = qe->id;
261 }
262 if (op) {
263 *op = qe->op;
264 }
265 if (data) {
266 *data = qe->data;
267 }
268 if (leng) {
269 *leng = qe->leng;
270 }
271 free(qe);
272 return 0;
273 }
274
queue_tryget(void * que,uint32_t * id,uint32_t * op,uint8_t ** data,uint32_t * leng)275 int queue_tryget(void *que,uint32_t *id,uint32_t *op,uint8_t **data,uint32_t *leng) {
276 queue *q = (queue*)que;
277 qentry *qe;
278 zassert(pthread_mutex_lock(&(q->lock)));
279 if (q->elements==0) {
280 zassert(pthread_mutex_unlock(&(q->lock)));
281 if (id) {
282 *id=0;
283 }
284 if (op) {
285 *op=0;
286 }
287 if (data) {
288 *data=NULL;
289 }
290 if (leng) {
291 *leng=0;
292 }
293 errno = EBUSY;
294 return -1;
295 }
296 qe = q->head;
297 q->head = qe->next;
298 if (q->head==NULL) {
299 q->tail = &(q->head);
300 }
301 q->elements--;
302 q->size -= qe->leng;
303 if (q->fullwaiting>0) {
304 zassert(pthread_cond_signal(&(q->waitfull)));
305 q->fullwaiting--;
306 }
307 zassert(pthread_mutex_unlock(&(q->lock)));
308 if (id) {
309 *id = qe->id;
310 }
311 if (op) {
312 *op = qe->op;
313 }
314 if (data) {
315 *data = qe->data;
316 }
317 if (leng) {
318 *leng = qe->leng;
319 }
320 free(qe);
321 return 0;
322 }
323