1 /*
2  * Copyright (C) 2021 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3  *
4  * This file is part of MooseFS.
5  *
6  * MooseFS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, version 2 (only).
9  *
10  * MooseFS is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with MooseFS; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18  * or visit http://www.gnu.org/licenses/gpl-2.0.html
19  */
20 
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24 
25 #include <stdlib.h>
26 #include <pthread.h>
27 #include <inttypes.h>
28 #include <errno.h>
29 
30 #include "massert.h"
31 
32 typedef struct _qentry {
33 	void *data;
34 	struct _qentry *next;
35 } qentry;
36 
37 typedef struct _queue {
38 	qentry *head,**tail;
39 	uint32_t elements;
40 	uint32_t maxelements;
41 	uint32_t closed;
42 	pthread_cond_t waitfree,waitfull;
43 	pthread_mutex_t lock;
44 } queue;
45 
squeue_new(uint32_t length)46 void* squeue_new(uint32_t length) {
47 	queue *q;
48 	q = (queue*)malloc(sizeof(queue));
49 	passert(q);
50 	q->head = NULL;
51 	q->tail = &(q->head);
52 	q->elements = 0;
53 	q->maxelements = length;
54 	q->closed = 0;
55 	if (length) {
56 		zassert(pthread_cond_init(&(q->waitfull),NULL));
57 	}
58 	zassert(pthread_cond_init(&(q->waitfree),NULL));
59 	zassert(pthread_mutex_init(&(q->lock),NULL));
60 	return q;
61 }
62 
squeue_delete(void * que)63 void squeue_delete(void *que) {
64 	queue *q = (queue*)que;
65 	qentry *qe,*qen;
66 	zassert(pthread_mutex_lock(&(q->lock)));
67 	for (qe = q->head ; qe ; qe = qen) {
68 		qen = qe->next;
69 		free(qe->data);
70 		free(qe);
71 	}
72 	zassert(pthread_mutex_unlock(&(q->lock)));
73 	zassert(pthread_mutex_destroy(&(q->lock)));
74 	zassert(pthread_cond_destroy(&(q->waitfree)));
75 	if (q->maxelements) {
76 		zassert(pthread_cond_destroy(&(q->waitfull)));
77 	}
78 	free(q);
79 }
80 
squeue_close(void * que)81 void squeue_close(void *que) {
82 	queue *q = (queue*)que;
83 	zassert(pthread_mutex_lock(&(q->lock)));
84 	q->closed = 1;
85 	zassert(pthread_cond_broadcast(&(q->waitfree)));
86 	if (q->maxelements) {
87 		zassert(pthread_cond_broadcast(&(q->waitfull)));
88 	}
89 	zassert(pthread_mutex_unlock(&(q->lock)));
90 }
91 
squeue_isempty(void * que)92 int squeue_isempty(void *que) {
93 	queue *q = (queue*)que;
94 	int r;
95 	zassert(pthread_mutex_lock(&(q->lock)));
96 	r=(q->elements==0)?1:0;
97 	zassert(pthread_mutex_unlock(&(q->lock)));
98 	return r;
99 }
100 
squeue_elements(void * que)101 uint32_t squeue_elements(void *que) {
102 	queue *q = (queue*)que;
103 	uint32_t r;
104 	zassert(pthread_mutex_lock(&(q->lock)));
105 	r=q->elements;
106 	zassert(pthread_mutex_unlock(&(q->lock)));
107 	return r;
108 }
109 
squeue_isfull(void * que)110 int squeue_isfull(void *que) {
111 	queue *q = (queue*)que;
112 	int r;
113 	zassert(pthread_mutex_lock(&(q->lock)));
114 	r = (q->maxelements>0 && q->maxelements<=q->elements)?1:0;
115 	zassert(pthread_mutex_unlock(&(q->lock)));
116 	return r;
117 }
118 
squeue_sizeleft(void * que)119 uint32_t squeue_sizeleft(void *que) {
120 	queue *q = (queue*)que;
121 	uint32_t r;
122 	zassert(pthread_mutex_lock(&(q->lock)));
123 	if (q->maxelements>0) {
124 		r = q->maxelements-q->elements;
125 	} else {
126 		r = 0xFFFFFFFF;
127 	}
128 	zassert(pthread_mutex_unlock(&(q->lock)));
129 	return r;
130 }
131 
squeue_put(void * que,void * data)132 int squeue_put(void *que,void *data) {
133 	queue *q = (queue*)que;
134 	qentry *qe;
135 	qe = malloc(sizeof(qentry));
136 	passert(qe);
137 	qe->data = data;
138 	qe->next = NULL;
139 	zassert(pthread_mutex_lock(&(q->lock)));
140 	if (q->maxelements) {
141 		while (q->elements>=q->maxelements && q->closed==0) {
142 			zassert(pthread_cond_wait(&(q->waitfull),&(q->lock)));
143 		}
144 		if (q->closed) {
145 			zassert(pthread_mutex_unlock(&(q->lock)));
146 			free(qe);
147 			errno = EIO;
148 			return -1;
149 		}
150 	}
151 	q->elements++;
152 	*(q->tail) = qe;
153 	q->tail = &(qe->next);
154 	zassert(pthread_cond_signal(&(q->waitfree)));
155 	zassert(pthread_mutex_unlock(&(q->lock)));
156 	return 0;
157 }
158 
squeue_tryput(void * que,void * data)159 int squeue_tryput(void *que,void *data) {
160 	queue *q = (queue*)que;
161 	qentry *qe;
162 	zassert(pthread_mutex_lock(&(q->lock)));
163 	if (q->maxelements) {
164 		if (q->elements>=q->maxelements) {
165 			zassert(pthread_mutex_unlock(&(q->lock)));
166 			errno = EBUSY;
167 			return -1;
168 		}
169 	}
170 	qe = malloc(sizeof(qentry));
171 	passert(qe);
172 	qe->data = data;
173 	qe->next = NULL;
174 	q->elements++;
175 	*(q->tail) = qe;
176 	q->tail = &(qe->next);
177 	zassert(pthread_cond_signal(&(q->waitfree)));
178 	zassert(pthread_mutex_unlock(&(q->lock)));
179 	return 0;
180 }
181 
squeue_get(void * que,void ** data)182 int squeue_get(void *que,void **data) {
183 	queue *q = (queue*)que;
184 	qentry *qe;
185 	zassert(pthread_mutex_lock(&(q->lock)));
186 	while (q->elements==0 && q->closed==0) {
187 		zassert(pthread_cond_wait(&(q->waitfree),&(q->lock)));
188 	}
189 	if (q->closed) {
190 		zassert(pthread_mutex_unlock(&(q->lock)));
191 		if (data) {
192 			*data=NULL;
193 		}
194 		errno = EIO;
195 		return -1;
196 	}
197 	qe = q->head;
198 	q->head = qe->next;
199 	if (q->head==NULL) {
200 		q->tail = &(q->head);
201 	}
202 	q->elements--;
203 	if (q->maxelements) {
204 		zassert(pthread_cond_signal(&(q->waitfull)));
205 	}
206 	zassert(pthread_mutex_unlock(&(q->lock)));
207 	if (data) {
208 		*data = qe->data;
209 	}
210 	free(qe);
211 	return 0;
212 }
213 
squeue_tryget(void * que,void ** data)214 int squeue_tryget(void *que,void **data) {
215 	queue *q = (queue*)que;
216 	qentry *qe;
217 	zassert(pthread_mutex_lock(&(q->lock)));
218 	if (q->elements==0) {
219 		zassert(pthread_mutex_unlock(&(q->lock)));
220 		if (data) {
221 			*data=NULL;
222 		}
223 		errno = EBUSY;
224 		return -1;
225 	}
226 	qe = q->head;
227 	q->head = qe->next;
228 	if (q->head==NULL) {
229 		q->tail = &(q->head);
230 	}
231 	q->elements--;
232 	if (q->maxelements) {
233 		zassert(pthread_cond_signal(&(q->waitfull)));
234 	}
235 	zassert(pthread_mutex_unlock(&(q->lock)));
236 	if (data) {
237 		*data = qe->data;
238 	}
239 	free(qe);
240 	return 0;
241 }
242