xref: /dragonfly/usr.bin/dsynth/bulk.c (revision 88ed2a5c)
1 /*
2  * Copyright (c) 2019 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * This code uses concepts and configuration based on 'synth', by
8  * John R. Marino <draco@marino.st>, which was written in ada.
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  * 1. Redistributions of source code must retain the above copyright
15  *    notice, this list of conditions and the following disclaimer.
16  * 2. Redistributions in binary form must reproduce the above copyright
17  *    notice, this list of conditions and the following disclaimer in
18  *    the documentation and/or other materials provided with the
19  *    distribution.
20  * 3. Neither the name of The DragonFly Project nor the names of its
21  *    contributors may be used to endorse or promote products derived
22  *    from this software without specific, prior written permission.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
27  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
28  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
29  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
30  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
31  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
32  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
33  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
34  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
35  * SUCH DAMAGE.
36  */
37 #include "dsynth.h"
38 
39 typedef struct job {
40 	pthread_t td;
41 	pthread_cond_t cond;
42 	bulk_t	*active;
43 	int terminate : 1;
44 } job_t;
45 
46 /*
47  * Most of these globals are locked with BulkMutex
48  */
49 static int BulkScanJob;
50 static int BulkCurJobs;
51 static int BulkMaxJobs;
52 static job_t JobsAry[MAXBULK];
53 static void (*BulkFunc)(bulk_t *bulk);
54 static bulk_t *BulkSubmit;
55 static bulk_t **BulkSubmitTail = &BulkSubmit;
56 static bulk_t *BulkResponse;
57 static bulk_t **BulkResponseTail = &BulkResponse;
58 static pthread_cond_t BulkResponseCond;
59 static pthread_mutex_t BulkMutex;
60 
61 static void bulkstart(void);
62 #if 0
63 static int readall(int fd, void *buf, size_t bytes);
64 static int writeall(int fd, const void *buf, size_t bytes);
65 #endif
66 static void *bulkthread(void *arg);
67 
68 /*
69  * Initialize for bulk scan operations.  Always paired with donebulk()
70  */
71 void
72 initbulk(void (*func)(bulk_t *bulk), int jobs)
73 {
74 	int i;
75 
76 	if (jobs > MAXBULK)
77 		jobs = MAXBULK;
78 
79 	ddassert(BulkSubmit == NULL);
80 	BulkCurJobs = 0;
81 	BulkMaxJobs = jobs;
82 	BulkFunc = func;
83 	BulkScanJob = 0;
84 
85 	addbuildenv("__MAKE_CONF", "/dev/null", BENV_ENVIRONMENT);
86 
87 	pthread_mutex_init(&BulkMutex, NULL);
88 	pthread_cond_init(&BulkResponseCond, NULL);
89 
90 	pthread_mutex_lock(&BulkMutex);
91 	for (i = 0; i < jobs; ++i) {
92 		pthread_cond_init(&JobsAry[i].cond, NULL);
93 		pthread_create(&JobsAry[i].td, NULL, bulkthread, &JobsAry[i]);
94 	}
95 	pthread_mutex_unlock(&BulkMutex);
96 }
97 
98 void
99 donebulk(void)
100 {
101 	bulk_t *bulk;
102 	int i;
103 
104 	pthread_mutex_lock(&BulkMutex);
105 	while ((bulk = BulkSubmit) != NULL) {
106 		BulkSubmit = bulk->next;
107 		freebulk(bulk);
108 	}
109 	BulkSubmitTail = &BulkSubmit;
110 
111 	for (i = 0; i < BulkMaxJobs; ++i) {
112 		JobsAry[i].terminate = 1;
113 		pthread_cond_signal(&JobsAry[i].cond);
114 	}
115 	pthread_mutex_unlock(&BulkMutex);
116 	for (i = 0; i < BulkMaxJobs; ++i) {
117 		pthread_join(JobsAry[i].td, NULL);
118 		pthread_cond_destroy(&JobsAry[i].cond);
119 		if (JobsAry[i].active) {
120 			freebulk(JobsAry[i].active);
121 			JobsAry[i].active = NULL;
122 			pthread_mutex_lock(&BulkMutex);
123 			--BulkCurJobs;
124 			pthread_mutex_unlock(&BulkMutex);
125 		}
126 		JobsAry[i].terminate = 0;
127 	}
128 	ddassert(BulkCurJobs == 0);
129 
130 	while ((bulk = BulkResponse) != NULL) {
131 		BulkResponse = bulk->next;
132 		freebulk(bulk);
133 	}
134 	BulkResponseTail = &BulkResponse;
135 
136 	BulkFunc = NULL;
137 
138 	bzero(JobsAry, sizeof(JobsAry));
139 
140 	delbuildenv("__MAKE_CONF");
141 }
142 
143 void
144 queuebulk(const char *s1, const char *s2, const char *s3, const char *s4)
145 {
146 	bulk_t *bulk;
147 
148 	bulk = calloc(1, sizeof(*bulk));
149 	if (s1)
150 		bulk->s1 = strdup(s1);
151 	if (s2)
152 		bulk->s2 = strdup(s2);
153 	if (s3)
154 		bulk->s3 = strdup(s3);
155 	if (s4)
156 		bulk->s4 = strdup(s4);
157 	bulk->state = ONSUBMIT;
158 
159 	pthread_mutex_lock(&BulkMutex);
160 	*BulkSubmitTail = bulk;
161 	BulkSubmitTail = &bulk->next;
162 	if (BulkCurJobs < BulkMaxJobs) {
163 		pthread_mutex_unlock(&BulkMutex);
164 		bulkstart();
165 	} else {
166 		pthread_mutex_unlock(&BulkMutex);
167 	}
168 }
169 
170 /*
171  * Fill any idle job slots with new jobs as available.
172  */
173 static
174 void
175 bulkstart(void)
176 {
177 	bulk_t *bulk;
178 	int i;
179 
180 	pthread_mutex_lock(&BulkMutex);
181 	while ((bulk = BulkSubmit) != NULL && BulkCurJobs < BulkMaxJobs) {
182 		i = BulkScanJob + 1;
183 		for (;;) {
184 			i = i % BulkMaxJobs;
185 			if (JobsAry[i].active == NULL)
186 				break;
187 			++i;
188 		}
189 		BulkScanJob = i;
190 		BulkSubmit = bulk->next;
191 		if (BulkSubmit == NULL)
192 			BulkSubmitTail = &BulkSubmit;
193 
194 		bulk->state = ONRUN;
195 		JobsAry[i].active = bulk;
196 		pthread_cond_signal(&JobsAry[i].cond);
197 		++BulkCurJobs;
198 	}
199 	pthread_mutex_unlock(&BulkMutex);
200 }
201 
202 /*
203  * Retrieve completed job or job with activity
204  */
205 bulk_t *
206 getbulk(void)
207 {
208 	bulk_t *bulk;
209 
210 	pthread_mutex_lock(&BulkMutex);
211 	while (BulkCurJobs && BulkResponse == NULL) {
212 		pthread_cond_wait(&BulkResponseCond, &BulkMutex);
213 	}
214 	if (BulkResponse) {
215 		bulk = BulkResponse;
216 		ddassert(bulk->state == ONRESPONSE);
217 		BulkResponse = bulk->next;
218 		if (BulkResponse == NULL)
219 			BulkResponseTail = &BulkResponse;
220 		bulk->state = UNLISTED;
221 	} else {
222 		bulk = NULL;
223 	}
224 	pthread_mutex_unlock(&BulkMutex);
225 	bulkstart();
226 
227 	return bulk;
228 }
229 
230 void
231 freebulk(bulk_t *bulk)
232 {
233 	ddassert(bulk->state == UNLISTED);
234 
235 	if (bulk->s1) {
236 		free(bulk->s1);
237 		bulk->s1 = NULL;
238 	}
239 	if (bulk->s2) {
240 		free(bulk->s2);
241 		bulk->s2 = NULL;
242 	}
243 	if (bulk->s3) {
244 		free(bulk->s3);
245 		bulk->s3 = NULL;
246 	}
247 	if (bulk->s4) {
248 		free(bulk->s4);
249 		bulk->s4 = NULL;
250 	}
251 	if (bulk->r1) {
252 		free(bulk->r1);
253 		bulk->r1 = NULL;
254 	}
255 	if (bulk->r2) {
256 		free(bulk->r2);
257 		bulk->r2 = NULL;
258 	}
259 	if (bulk->r3) {
260 		free(bulk->r3);
261 		bulk->r3 = NULL;
262 	}
263 	if (bulk->r4) {
264 		free(bulk->r4);
265 		bulk->r4 = NULL;
266 	}
267 	free(bulk);
268 }
269 
270 #if 0
271 
272 /*
273  * Returns non-zero if unable to read specified number of bytes
274  */
275 static
276 int
277 readall(int fd, void *buf, size_t bytes)
278 {
279 	ssize_t r;
280 
281 	for (;;) {
282 		r = read(fd, buf, bytes);
283 		if (r == (ssize_t)bytes)
284 			break;
285 		if (r > 0) {
286 			buf = (char *)buf + r;
287 			bytes -= r;
288 			continue;
289 		}
290 		if (r < 0 && errno == EINTR)
291 			continue;
292 		return 1;
293 	}
294 	return 0;
295 }
296 
297 static
298 int
299 writeall(int fd, const void *buf, size_t bytes)
300 {
301 	ssize_t r;
302 
303 	for (;;) {
304 		r = write(fd, buf, bytes);
305 		if (r == (ssize_t)bytes)
306 			break;
307 		if (r > 0) {
308 			buf = (const char *)buf + r;
309 			bytes -= r;
310 			continue;
311 		}
312 		if (r < 0 && errno == EINTR)
313 			continue;
314 		return 1;
315 	}
316 	return 0;
317 }
318 
319 #endif
320 
321 static void *
322 bulkthread(void *arg)
323 {
324 	job_t *job = arg;
325 	bulk_t *bulk;
326 
327 	pthread_mutex_lock(&BulkMutex);
328 	for (;;) {
329 		if (job->terminate)
330 			break;
331 		if (job->active == NULL)
332 			pthread_cond_wait(&job->cond, &BulkMutex);
333 		bulk = job->active;
334 		if (bulk) {
335 			bulk->state = ISRUNNING;
336 
337 			pthread_mutex_unlock(&BulkMutex);
338 			BulkFunc(bulk);
339 			pthread_mutex_lock(&BulkMutex);
340 
341 			bulk->state = ONRESPONSE;
342 			bulk->next = NULL;
343 			*BulkResponseTail = bulk;
344 			BulkResponseTail = &bulk->next;
345 			--BulkCurJobs;
346 			pthread_cond_signal(&BulkResponseCond);
347 		}
348 
349 		/*
350 		 * Optimization - automatically fetch the next job
351 		 */
352 		if ((bulk = BulkSubmit) != NULL && job->terminate == 0) {
353 			BulkSubmit = bulk->next;
354 			if (BulkSubmit == NULL)
355 				BulkSubmitTail = &BulkSubmit;
356 			bulk->state = ONRUN;
357 			job->active = bulk;
358 			++BulkCurJobs;
359 		} else {
360 			job->active = NULL;
361 		}
362 	}
363 	pthread_mutex_unlock(&BulkMutex);
364 
365 	return NULL;
366 }
367