xref: /dragonfly/usr.bin/dsynth/bulk.c (revision 0e32b8c5)
1 /*
2  * Copyright (c) 2019 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * This code uses concepts and configuration based on 'synth', by
8  * John R. Marino <draco@marino.st>, which was written in ada.
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  * 1. Redistributions of source code must retain the above copyright
15  *    notice, this list of conditions and the following disclaimer.
16  * 2. Redistributions in binary form must reproduce the above copyright
17  *    notice, this list of conditions and the following disclaimer in
18  *    the documentation and/or other materials provided with the
19  *    distribution.
20  * 3. Neither the name of The DragonFly Project nor the names of its
21  *    contributors may be used to endorse or promote products derived
22  *    from this software without specific, prior written permission.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
27  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
28  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
29  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
30  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
31  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
32  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
33  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
34  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
35  * SUCH DAMAGE.
36  */
37 #include "dsynth.h"
38 
39 typedef struct job {
40 	pthread_t td;
41 	pthread_cond_t cond;
42 	bulk_t	*active;
43 	int terminate : 1;
44 } job_t;
45 
46 /*
47  * Most of these globals are locked with BulkMutex
48  */
49 static int BulkScanJob;
50 static int BulkCurJobs;
51 static int BulkMaxJobs;
52 static job_t JobsAry[MAXBULK];
53 static void (*BulkFunc)(bulk_t *bulk);
54 static bulk_t *BulkSubmit;
55 static bulk_t **BulkSubmitTail = &BulkSubmit;
56 static bulk_t *BulkResponse;
57 static bulk_t **BulkResponseTail = &BulkResponse;
58 static pthread_cond_t BulkResponseCond;
59 static pthread_mutex_t BulkMutex;
60 
61 static void bulkstart(void);
62 #if 0
63 static int readall(int fd, void *buf, size_t bytes);
64 static int writeall(int fd, const void *buf, size_t bytes);
65 #endif
66 static void *bulkthread(void *arg);
67 
68 void
69 initbulk(void (*func)(bulk_t *bulk), int jobs)
70 {
71 	int i;
72 
73 	if (jobs > MAXBULK)
74 		jobs = MAXBULK;
75 
76 	ddassert(BulkSubmit == NULL);
77 	BulkCurJobs = 0;
78 	BulkMaxJobs = jobs;
79 	BulkFunc = func;
80 	BulkScanJob = 0;
81 
82 	pthread_mutex_init(&BulkMutex, NULL);
83 	pthread_cond_init(&BulkResponseCond, NULL);
84 
85 	pthread_mutex_lock(&BulkMutex);
86 	for (i = 0; i < jobs; ++i) {
87 		pthread_cond_init(&JobsAry[i].cond, NULL);
88 		pthread_create(&JobsAry[i].td, NULL, bulkthread, &JobsAry[i]);
89 	}
90 	pthread_mutex_unlock(&BulkMutex);
91 }
92 
93 void
94 donebulk(void)
95 {
96 	bulk_t *bulk;
97 	int i;
98 
99 	pthread_mutex_lock(&BulkMutex);
100 	while ((bulk = BulkSubmit) != NULL) {
101 		BulkSubmit = bulk->next;
102 		freebulk(bulk);
103 	}
104 	BulkSubmitTail = &BulkSubmit;
105 
106 	for (i = 0; i < BulkMaxJobs; ++i) {
107 		JobsAry[i].terminate = 1;
108 		pthread_cond_signal(&JobsAry[i].cond);
109 	}
110 	pthread_mutex_unlock(&BulkMutex);
111 	for (i = 0; i < BulkMaxJobs; ++i) {
112 		pthread_join(JobsAry[i].td, NULL);
113 		pthread_cond_destroy(&JobsAry[i].cond);
114 		if (JobsAry[i].active) {
115 			freebulk(JobsAry[i].active);
116 			JobsAry[i].active = NULL;
117 			pthread_mutex_lock(&BulkMutex);
118 			--BulkCurJobs;
119 			pthread_mutex_unlock(&BulkMutex);
120 		}
121 		JobsAry[i].terminate = 0;
122 	}
123 	ddassert(BulkCurJobs == 0);
124 
125 	while ((bulk = BulkResponse) != NULL) {
126 		BulkResponse = bulk->next;
127 		freebulk(bulk);
128 	}
129 	BulkResponseTail = &BulkResponse;
130 
131 	BulkFunc = NULL;
132 
133 	bzero(JobsAry, sizeof(JobsAry));
134 }
135 
136 void
137 queuebulk(const char *s1, const char *s2, const char *s3, const char *s4)
138 {
139 	bulk_t *bulk;
140 
141 	bulk = calloc(1, sizeof(*bulk));
142 	if (s1)
143 		bulk->s1 = strdup(s1);
144 	if (s2)
145 		bulk->s2 = strdup(s2);
146 	if (s3)
147 		bulk->s3 = strdup(s3);
148 	if (s4)
149 		bulk->s4 = strdup(s4);
150 	bulk->state = ONSUBMIT;
151 
152 	pthread_mutex_lock(&BulkMutex);
153 	*BulkSubmitTail = bulk;
154 	BulkSubmitTail = &bulk->next;
155 	if (BulkCurJobs < BulkMaxJobs) {
156 		pthread_mutex_unlock(&BulkMutex);
157 		bulkstart();
158 	} else {
159 		pthread_mutex_unlock(&BulkMutex);
160 	}
161 }
162 
163 /*
164  * Fill any idle job slots with new jobs as available.
165  */
166 static
167 void
168 bulkstart(void)
169 {
170 	bulk_t *bulk;
171 	int i;
172 
173 	pthread_mutex_lock(&BulkMutex);
174 	while ((bulk = BulkSubmit) != NULL && BulkCurJobs < BulkMaxJobs) {
175 		i = BulkScanJob + 1;
176 		for (;;) {
177 			i = i % BulkMaxJobs;
178 			if (JobsAry[i].active == NULL)
179 				break;
180 			++i;
181 		}
182 		BulkScanJob = i;
183 		BulkSubmit = bulk->next;
184 		if (BulkSubmit == NULL)
185 			BulkSubmitTail = &BulkSubmit;
186 
187 		bulk->state = ONRUN;
188 		JobsAry[i].active = bulk;
189 		pthread_cond_signal(&JobsAry[i].cond);
190 		++BulkCurJobs;
191 	}
192 	pthread_mutex_unlock(&BulkMutex);
193 }
194 
195 /*
196  * Retrieve completed job or job with activity
197  */
198 bulk_t *
199 getbulk(void)
200 {
201 	bulk_t *bulk;
202 
203 	pthread_mutex_lock(&BulkMutex);
204 	while (BulkCurJobs && BulkResponse == NULL) {
205 		pthread_cond_wait(&BulkResponseCond, &BulkMutex);
206 	}
207 	if (BulkResponse) {
208 		bulk = BulkResponse;
209 		ddassert(bulk->state == ONRESPONSE);
210 		BulkResponse = bulk->next;
211 		if (BulkResponse == NULL)
212 			BulkResponseTail = &BulkResponse;
213 		bulk->state = UNLISTED;
214 	} else {
215 		bulk = NULL;
216 	}
217 	pthread_mutex_unlock(&BulkMutex);
218 	bulkstart();
219 
220 	return bulk;
221 }
222 
223 void
224 freebulk(bulk_t *bulk)
225 {
226 	ddassert(bulk->state == UNLISTED);
227 
228 	if (bulk->s1) {
229 		free(bulk->s1);
230 		bulk->s1 = NULL;
231 	}
232 	if (bulk->s2) {
233 		free(bulk->s2);
234 		bulk->s2 = NULL;
235 	}
236 	if (bulk->s3) {
237 		free(bulk->s3);
238 		bulk->s3 = NULL;
239 	}
240 	if (bulk->s4) {
241 		free(bulk->s4);
242 		bulk->s4 = NULL;
243 	}
244 	if (bulk->r1) {
245 		free(bulk->r1);
246 		bulk->r1 = NULL;
247 	}
248 	if (bulk->r2) {
249 		free(bulk->r2);
250 		bulk->r2 = NULL;
251 	}
252 	if (bulk->r3) {
253 		free(bulk->r3);
254 		bulk->r3 = NULL;
255 	}
256 	if (bulk->r4) {
257 		free(bulk->r4);
258 		bulk->r4 = NULL;
259 	}
260 	free(bulk);
261 }
262 
263 #if 0
264 
/*
 * Read exactly 'bytes' bytes from fd into buf, continuing after short
 * reads and retrying when read() is interrupted (EINTR).
 *
 * Returns 0 once everything has been read, 1 on end-of-file or on any
 * other read error.
 */
static
int
readall(int fd, void *buf, size_t bytes)
{
	char *cursor = buf;
	size_t remaining = bytes;
	ssize_t got;

	for (;;) {
		got = read(fd, cursor, remaining);
		if (got == (ssize_t)remaining)
			return 0;
		if (got > 0) {
			/* short read: advance and ask for the rest */
			cursor += got;
			remaining -= got;
			continue;
		}
		/* got <= 0: only an interrupted call is retried */
		if (got == 0 || errno != EINTR)
			return 1;
	}
}
289 
/*
 * Write exactly 'bytes' bytes from buf to fd, continuing after short
 * writes and retrying when write() is interrupted (EINTR).
 *
 * Returns 0 once everything has been written, 1 if write() returns 0
 * or fails with any other error.
 */
static
int
writeall(int fd, const void *buf, size_t bytes)
{
	const char *cursor = buf;
	size_t remaining = bytes;
	ssize_t put;

	for (;;) {
		put = write(fd, cursor, remaining);
		if (put == (ssize_t)remaining)
			return 0;
		if (put > 0) {
			/* short write: advance and send the rest */
			cursor += put;
			remaining -= put;
			continue;
		}
		/* put <= 0: only an interrupted call is retried */
		if (put == 0 || errno != EINTR)
			return 1;
	}
}
311 
312 #endif
313 
314 static void *
315 bulkthread(void *arg)
316 {
317 	job_t *job = arg;
318 	bulk_t *bulk;
319 
320 	pthread_mutex_lock(&BulkMutex);
321 	for (;;) {
322 		if (job->terminate)
323 			break;
324 		if (job->active == NULL)
325 			pthread_cond_wait(&job->cond, &BulkMutex);
326 		bulk = job->active;
327 		if (bulk) {
328 			bulk->state = ISRUNNING;
329 
330 			pthread_mutex_unlock(&BulkMutex);
331 			BulkFunc(bulk);
332 			pthread_mutex_lock(&BulkMutex);
333 
334 			bulk->state = ONRESPONSE;
335 			bulk->next = NULL;
336 			*BulkResponseTail = bulk;
337 			BulkResponseTail = &bulk->next;
338 			--BulkCurJobs;
339 			pthread_cond_signal(&BulkResponseCond);
340 		}
341 
342 		/*
343 		 * Optimization - automatically fetch the next job
344 		 */
345 		if ((bulk = BulkSubmit) != NULL && job->terminate == 0) {
346 			BulkSubmit = bulk->next;
347 			if (BulkSubmit == NULL)
348 				BulkSubmitTail = &BulkSubmit;
349 			bulk->state = ONRUN;
350 			job->active = bulk;
351 			++BulkCurJobs;
352 		} else {
353 			job->active = NULL;
354 		}
355 	}
356 	pthread_mutex_unlock(&BulkMutex);
357 
358 	return NULL;
359 }
360