1 /*
2  * Copyright (C) 2016 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3  *
4  * This file is part of MooseFS.
5  *
6  * MooseFS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, version 2 (only).
9  *
10  * MooseFS is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with MooseFS; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18  * or visit http://www.gnu.org/licenses/gpl-2.0.html
19  */
20 
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24 
25 #include <stdlib.h>
26 #include <pthread.h>
27 #include <inttypes.h>
28 #include <errno.h>
29 
30 #include "massert.h"
31 
32 typedef struct _qentry {
33 	uint32_t id;
34 	uint32_t op;
35 	uint8_t *data;
36 	uint32_t leng;
37 	struct _qentry *next;
38 } qentry;
39 
40 typedef struct _queue {
41 	qentry *head,**tail;
42 	uint32_t elements;
43 	uint32_t size;
44 	uint32_t maxsize;
45 	uint32_t freewaiting;
46 	uint32_t fullwaiting;
47 	uint32_t closed;
48 	pthread_cond_t waitfree,waitfull;
49 	pthread_mutex_t lock;
50 } queue;
51 
queue_new(uint32_t size)52 void* queue_new(uint32_t size) {
53 	queue *q;
54 	q = (queue*)malloc(sizeof(queue));
55 	passert(q);
56 	q->head = NULL;
57 	q->tail = &(q->head);
58 	q->elements = 0;
59 	q->size = 0;
60 	q->maxsize = size;
61 	q->freewaiting = 0;
62 	q->fullwaiting = 0;
63 	q->closed = 0;
64 	if (size) {
65 		zassert(pthread_cond_init(&(q->waitfull),NULL));
66 	}
67 	zassert(pthread_cond_init(&(q->waitfree),NULL));
68 	zassert(pthread_mutex_init(&(q->lock),NULL));
69 	return q;
70 }
71 
queue_delete(void * que)72 void queue_delete(void *que) {
73 	queue *q = (queue*)que;
74 	qentry *qe,*qen;
75 	zassert(pthread_mutex_lock(&(q->lock)));
76 	sassert(q->freewaiting==0);
77 	sassert(q->fullwaiting==0);
78 	for (qe = q->head ; qe ; qe = qen) {
79 		qen = qe->next;
80 		free(qe->data);
81 		free(qe);
82 	}
83 	zassert(pthread_mutex_unlock(&(q->lock)));
84 	zassert(pthread_mutex_destroy(&(q->lock)));
85 	zassert(pthread_cond_destroy(&(q->waitfree)));
86 	if (q->maxsize) {
87 		zassert(pthread_cond_destroy(&(q->waitfull)));
88 	}
89 	free(q);
90 }
91 
queue_close(void * que)92 void queue_close(void *que) {
93 	queue *q = (queue*)que;
94 	zassert(pthread_mutex_lock(&(q->lock)));
95 	q->closed = 1;
96 	if (q->freewaiting>0) {
97 		zassert(pthread_cond_broadcast(&(q->waitfree)));
98 		q->freewaiting = 0;
99 	}
100 	if (q->fullwaiting>0) {
101 		zassert(pthread_cond_broadcast(&(q->waitfull)));
102 		q->fullwaiting = 0;
103 	}
104 	zassert(pthread_mutex_unlock(&(q->lock)));
105 }
106 
queue_isempty(void * que)107 int queue_isempty(void *que) {
108 	queue *q = (queue*)que;
109 	int r;
110 	zassert(pthread_mutex_lock(&(q->lock)));
111 	r=(q->elements==0)?1:0;
112 	zassert(pthread_mutex_unlock(&(q->lock)));
113 	return r;
114 }
115 
queue_elements(void * que)116 uint32_t queue_elements(void *que) {
117 	queue *q = (queue*)que;
118 	uint32_t r;
119 	zassert(pthread_mutex_lock(&(q->lock)));
120 	r=q->elements;
121 	zassert(pthread_mutex_unlock(&(q->lock)));
122 	return r;
123 }
124 
queue_isfull(void * que)125 int queue_isfull(void *que) {
126 	queue *q = (queue*)que;
127 	int r;
128 	zassert(pthread_mutex_lock(&(q->lock)));
129 	r = (q->maxsize>0 && q->maxsize<=q->size)?1:0;
130 	zassert(pthread_mutex_unlock(&(q->lock)));
131 	return r;
132 }
133 
queue_sizeleft(void * que)134 uint32_t queue_sizeleft(void *que) {
135 	queue *q = (queue*)que;
136 	uint32_t r;
137 	zassert(pthread_mutex_lock(&(q->lock)));
138 	if (q->maxsize>0) {
139 		r = q->maxsize-q->size;
140 	} else {
141 		r = 0xFFFFFFFF;
142 	}
143 	zassert(pthread_mutex_unlock(&(q->lock)));
144 	return r;
145 }
146 
queue_put(void * que,uint32_t id,uint32_t op,uint8_t * data,uint32_t leng)147 int queue_put(void *que,uint32_t id,uint32_t op,uint8_t *data,uint32_t leng) {
148 	queue *q = (queue*)que;
149 	qentry *qe;
150 	qe = malloc(sizeof(qentry));
151 	passert(qe);
152 	qe->id = id;
153 	qe->op = op;
154 	qe->data = data;
155 	qe->leng = leng;
156 	qe->next = NULL;
157 	zassert(pthread_mutex_lock(&(q->lock)));
158 	if (q->maxsize) {
159 		if (leng>q->maxsize) {
160 			zassert(pthread_mutex_unlock(&(q->lock)));
161 			free(qe);
162 			errno = EDEADLK;
163 			return -1;
164 		}
165 		while (q->size+leng>q->maxsize && q->closed==0) {
166 			q->fullwaiting++;
167 			zassert(pthread_cond_wait(&(q->waitfull),&(q->lock)));
168 		}
169 		if (q->closed) {
170 			zassert(pthread_mutex_unlock(&(q->lock)));
171 			errno = EIO;
172 			return -1;
173 		}
174 	}
175 	q->elements++;
176 	q->size += leng;
177 	*(q->tail) = qe;
178 	q->tail = &(qe->next);
179 	if (q->freewaiting>0) {
180 		zassert(pthread_cond_signal(&(q->waitfree)));
181 		q->freewaiting--;
182 	}
183 	zassert(pthread_mutex_unlock(&(q->lock)));
184 	return 0;
185 }
186 
queue_tryput(void * que,uint32_t id,uint32_t op,uint8_t * data,uint32_t leng)187 int queue_tryput(void *que,uint32_t id,uint32_t op,uint8_t *data,uint32_t leng) {
188 	queue *q = (queue*)que;
189 	qentry *qe;
190 	zassert(pthread_mutex_lock(&(q->lock)));
191 	if (q->maxsize) {
192 		if (leng>q->maxsize) {
193 			zassert(pthread_mutex_unlock(&(q->lock)));
194 			errno = EDEADLK;
195 			return -1;
196 		}
197 		if (q->size+leng>q->maxsize) {
198 			zassert(pthread_mutex_unlock(&(q->lock)));
199 			errno = EBUSY;
200 			return -1;
201 		}
202 	}
203 	qe = malloc(sizeof(qentry));
204 	passert(qe);
205 	qe->id = id;
206 	qe->op = op;
207 	qe->data = data;
208 	qe->leng = leng;
209 	qe->next = NULL;
210 	q->elements++;
211 	q->size += leng;
212 	*(q->tail) = qe;
213 	q->tail = &(qe->next);
214 	if (q->freewaiting>0) {
215 		zassert(pthread_cond_signal(&(q->waitfree)));
216 		q->freewaiting--;
217 	}
218 	zassert(pthread_mutex_unlock(&(q->lock)));
219 	return 0;
220 }
221 
queue_get(void * que,uint32_t * id,uint32_t * op,uint8_t ** data,uint32_t * leng)222 int queue_get(void *que,uint32_t *id,uint32_t *op,uint8_t **data,uint32_t *leng) {
223 	queue *q = (queue*)que;
224 	qentry *qe;
225 	zassert(pthread_mutex_lock(&(q->lock)));
226 	while (q->elements==0 && q->closed==0) {
227 		q->freewaiting++;
228 		zassert(pthread_cond_wait(&(q->waitfree),&(q->lock)));
229 	}
230 	if (q->closed) {
231 		zassert(pthread_mutex_unlock(&(q->lock)));
232 		if (id) {
233 			*id=0;
234 		}
235 		if (op) {
236 			*op=0;
237 		}
238 		if (data) {
239 			*data=NULL;
240 		}
241 		if (leng) {
242 			*leng=0;
243 		}
244 		errno = EIO;
245 		return -1;
246 	}
247 	qe = q->head;
248 	q->head = qe->next;
249 	if (q->head==NULL) {
250 		q->tail = &(q->head);
251 	}
252 	q->elements--;
253 	q->size -= qe->leng;
254 	if (q->fullwaiting>0) {
255 		zassert(pthread_cond_signal(&(q->waitfull)));
256 		q->fullwaiting--;
257 	}
258 	zassert(pthread_mutex_unlock(&(q->lock)));
259 	if (id) {
260 		*id = qe->id;
261 	}
262 	if (op) {
263 		*op = qe->op;
264 	}
265 	if (data) {
266 		*data = qe->data;
267 	}
268 	if (leng) {
269 		*leng = qe->leng;
270 	}
271 	free(qe);
272 	return 0;
273 }
274 
queue_tryget(void * que,uint32_t * id,uint32_t * op,uint8_t ** data,uint32_t * leng)275 int queue_tryget(void *que,uint32_t *id,uint32_t *op,uint8_t **data,uint32_t *leng) {
276 	queue *q = (queue*)que;
277 	qentry *qe;
278 	zassert(pthread_mutex_lock(&(q->lock)));
279 	if (q->elements==0) {
280 		zassert(pthread_mutex_unlock(&(q->lock)));
281 		if (id) {
282 			*id=0;
283 		}
284 		if (op) {
285 			*op=0;
286 		}
287 		if (data) {
288 			*data=NULL;
289 		}
290 		if (leng) {
291 			*leng=0;
292 		}
293 		errno = EBUSY;
294 		return -1;
295 	}
296 	qe = q->head;
297 	q->head = qe->next;
298 	if (q->head==NULL) {
299 		q->tail = &(q->head);
300 	}
301 	q->elements--;
302 	q->size -= qe->leng;
303 	if (q->fullwaiting>0) {
304 		zassert(pthread_cond_signal(&(q->waitfull)));
305 		q->fullwaiting--;
306 	}
307 	zassert(pthread_mutex_unlock(&(q->lock)));
308 	if (id) {
309 		*id = qe->id;
310 	}
311 	if (op) {
312 		*op = qe->op;
313 	}
314 	if (data) {
315 		*data = qe->data;
316 	}
317 	if (leng) {
318 		*leng = qe->leng;
319 	}
320 	free(qe);
321 	return 0;
322 }
323