1 /*
2 * Copyright (C) 2021 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3 *
4 * This file is part of MooseFS.
5 *
6 * MooseFS is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, version 2 (only).
9 *
10 * MooseFS is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with MooseFS; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18 * or visit http://www.gnu.org/licenses/gpl-2.0.html
19 */
20
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24
25 #include <stdlib.h>
26 #include <pthread.h>
27 #include <inttypes.h>
28 #include <errno.h>
29
30 #include "massert.h"
31
/* Single node of the queue's singly linked list. */
typedef struct _qentry {
	/* caller-supplied payload pointer carried by this node */
	void *data;
	/* following node in the list, NULL for the last node */
	struct _qentry *next;
} qentry;
36
37 typedef struct _queue {
38 qentry *head,**tail;
39 uint32_t elements;
40 uint32_t maxelements;
41 uint32_t closed;
42 pthread_cond_t waitfree,waitfull;
43 pthread_mutex_t lock;
44 } queue;
45
squeue_new(uint32_t length)46 void* squeue_new(uint32_t length) {
47 queue *q;
48 q = (queue*)malloc(sizeof(queue));
49 passert(q);
50 q->head = NULL;
51 q->tail = &(q->head);
52 q->elements = 0;
53 q->maxelements = length;
54 q->closed = 0;
55 if (length) {
56 zassert(pthread_cond_init(&(q->waitfull),NULL));
57 }
58 zassert(pthread_cond_init(&(q->waitfree),NULL));
59 zassert(pthread_mutex_init(&(q->lock),NULL));
60 return q;
61 }
62
squeue_delete(void * que)63 void squeue_delete(void *que) {
64 queue *q = (queue*)que;
65 qentry *qe,*qen;
66 zassert(pthread_mutex_lock(&(q->lock)));
67 for (qe = q->head ; qe ; qe = qen) {
68 qen = qe->next;
69 free(qe->data);
70 free(qe);
71 }
72 zassert(pthread_mutex_unlock(&(q->lock)));
73 zassert(pthread_mutex_destroy(&(q->lock)));
74 zassert(pthread_cond_destroy(&(q->waitfree)));
75 if (q->maxelements) {
76 zassert(pthread_cond_destroy(&(q->waitfull)));
77 }
78 free(q);
79 }
80
squeue_close(void * que)81 void squeue_close(void *que) {
82 queue *q = (queue*)que;
83 zassert(pthread_mutex_lock(&(q->lock)));
84 q->closed = 1;
85 zassert(pthread_cond_broadcast(&(q->waitfree)));
86 if (q->maxelements) {
87 zassert(pthread_cond_broadcast(&(q->waitfull)));
88 }
89 zassert(pthread_mutex_unlock(&(q->lock)));
90 }
91
squeue_isempty(void * que)92 int squeue_isempty(void *que) {
93 queue *q = (queue*)que;
94 int r;
95 zassert(pthread_mutex_lock(&(q->lock)));
96 r=(q->elements==0)?1:0;
97 zassert(pthread_mutex_unlock(&(q->lock)));
98 return r;
99 }
100
squeue_elements(void * que)101 uint32_t squeue_elements(void *que) {
102 queue *q = (queue*)que;
103 uint32_t r;
104 zassert(pthread_mutex_lock(&(q->lock)));
105 r=q->elements;
106 zassert(pthread_mutex_unlock(&(q->lock)));
107 return r;
108 }
109
squeue_isfull(void * que)110 int squeue_isfull(void *que) {
111 queue *q = (queue*)que;
112 int r;
113 zassert(pthread_mutex_lock(&(q->lock)));
114 r = (q->maxelements>0 && q->maxelements<=q->elements)?1:0;
115 zassert(pthread_mutex_unlock(&(q->lock)));
116 return r;
117 }
118
squeue_sizeleft(void * que)119 uint32_t squeue_sizeleft(void *que) {
120 queue *q = (queue*)que;
121 uint32_t r;
122 zassert(pthread_mutex_lock(&(q->lock)));
123 if (q->maxelements>0) {
124 r = q->maxelements-q->elements;
125 } else {
126 r = 0xFFFFFFFF;
127 }
128 zassert(pthread_mutex_unlock(&(q->lock)));
129 return r;
130 }
131
squeue_put(void * que,void * data)132 int squeue_put(void *que,void *data) {
133 queue *q = (queue*)que;
134 qentry *qe;
135 qe = malloc(sizeof(qentry));
136 passert(qe);
137 qe->data = data;
138 qe->next = NULL;
139 zassert(pthread_mutex_lock(&(q->lock)));
140 if (q->maxelements) {
141 while (q->elements>=q->maxelements && q->closed==0) {
142 zassert(pthread_cond_wait(&(q->waitfull),&(q->lock)));
143 }
144 if (q->closed) {
145 zassert(pthread_mutex_unlock(&(q->lock)));
146 free(qe);
147 errno = EIO;
148 return -1;
149 }
150 }
151 q->elements++;
152 *(q->tail) = qe;
153 q->tail = &(qe->next);
154 zassert(pthread_cond_signal(&(q->waitfree)));
155 zassert(pthread_mutex_unlock(&(q->lock)));
156 return 0;
157 }
158
squeue_tryput(void * que,void * data)159 int squeue_tryput(void *que,void *data) {
160 queue *q = (queue*)que;
161 qentry *qe;
162 zassert(pthread_mutex_lock(&(q->lock)));
163 if (q->maxelements) {
164 if (q->elements>=q->maxelements) {
165 zassert(pthread_mutex_unlock(&(q->lock)));
166 errno = EBUSY;
167 return -1;
168 }
169 }
170 qe = malloc(sizeof(qentry));
171 passert(qe);
172 qe->data = data;
173 qe->next = NULL;
174 q->elements++;
175 *(q->tail) = qe;
176 q->tail = &(qe->next);
177 zassert(pthread_cond_signal(&(q->waitfree)));
178 zassert(pthread_mutex_unlock(&(q->lock)));
179 return 0;
180 }
181
squeue_get(void * que,void ** data)182 int squeue_get(void *que,void **data) {
183 queue *q = (queue*)que;
184 qentry *qe;
185 zassert(pthread_mutex_lock(&(q->lock)));
186 while (q->elements==0 && q->closed==0) {
187 zassert(pthread_cond_wait(&(q->waitfree),&(q->lock)));
188 }
189 if (q->closed) {
190 zassert(pthread_mutex_unlock(&(q->lock)));
191 if (data) {
192 *data=NULL;
193 }
194 errno = EIO;
195 return -1;
196 }
197 qe = q->head;
198 q->head = qe->next;
199 if (q->head==NULL) {
200 q->tail = &(q->head);
201 }
202 q->elements--;
203 if (q->maxelements) {
204 zassert(pthread_cond_signal(&(q->waitfull)));
205 }
206 zassert(pthread_mutex_unlock(&(q->lock)));
207 if (data) {
208 *data = qe->data;
209 }
210 free(qe);
211 return 0;
212 }
213
squeue_tryget(void * que,void ** data)214 int squeue_tryget(void *que,void **data) {
215 queue *q = (queue*)que;
216 qentry *qe;
217 zassert(pthread_mutex_lock(&(q->lock)));
218 if (q->elements==0) {
219 zassert(pthread_mutex_unlock(&(q->lock)));
220 if (data) {
221 *data=NULL;
222 }
223 errno = EBUSY;
224 return -1;
225 }
226 qe = q->head;
227 q->head = qe->next;
228 if (q->head==NULL) {
229 q->tail = &(q->head);
230 }
231 q->elements--;
232 if (q->maxelements) {
233 zassert(pthread_cond_signal(&(q->waitfull)));
234 }
235 zassert(pthread_mutex_unlock(&(q->lock)));
236 if (data) {
237 *data = qe->data;
238 }
239 free(qe);
240 return 0;
241 }
242