xref: /dragonfly/usr.bin/dsynth/bulk.c (revision 0212bfce)
1 /*
2  * Copyright (c) 2019 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * This code uses concepts and configuration based on 'synth', by
8  * John R. Marino <draco@marino.st>, which was written in ada.
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  * 1. Redistributions of source code must retain the above copyright
15  *    notice, this list of conditions and the following disclaimer.
16  * 2. Redistributions in binary form must reproduce the above copyright
17  *    notice, this list of conditions and the following disclaimer in
18  *    the documentation and/or other materials provided with the
19  *    distribution.
20  * 3. Neither the name of The DragonFly Project nor the names of its
21  *    contributors may be used to endorse or promote products derived
22  *    from this software without specific, prior written permission.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
27  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
28  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
29  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
30  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
31  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
32  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
33  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
34  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
35  * SUCH DAMAGE.
36  */
37 #include "dsynth.h"
38 
39 typedef struct job {
40 	pthread_t td;
41 	pthread_cond_t cond;
42 	bulk_t	*active;
43 	int terminate : 1;
44 } job_t;
45 
46 /*
47  * Most of these globals are locked with BulkMutex
48  */
49 static int BulkScanJob;
50 static int BulkCurJobs;
51 static int BulkMaxJobs;
52 static job_t JobsAry[MAXBULK];
53 static void (*BulkFunc)(bulk_t *bulk);
54 static bulk_t *BulkSubmit;
55 static bulk_t **BulkSubmitTail = &BulkSubmit;
56 static bulk_t *BulkResponse;
57 static bulk_t **BulkResponseTail = &BulkResponse;
58 static pthread_cond_t BulkResponseCond;
59 static pthread_mutex_t BulkMutex;
60 
/* Forward declarations for file-local helpers */
static void *bulkthread(void *arg);
static void bulkstart(void);
#if 0
/* Currently compiled out together with their definitions */
static int readall(int fd, void *buf, size_t bytes);
static int writeall(int fd, const void *buf, size_t bytes);
#endif
67 
68 /*
69  * Initialize for bulk scan operations.  Always paired with donebulk()
70  */
71 void
72 initbulk(void (*func)(bulk_t *bulk), int jobs)
73 {
74 	int i;
75 
76 	if (jobs > MAXBULK)
77 		jobs = MAXBULK;
78 
79 	ddassert(BulkSubmit == NULL);
80 	BulkCurJobs = 0;
81 	BulkMaxJobs = jobs;
82 	BulkFunc = func;
83 	BulkScanJob = 0;
84 
85 	addbuildenv("__MAKE_CONF", "/dev/null",
86 		    BENV_ENVIRONMENT | BENV_PKGLIST);
87 
88 	pthread_mutex_init(&BulkMutex, NULL);
89 	pthread_cond_init(&BulkResponseCond, NULL);
90 
91 	pthread_mutex_lock(&BulkMutex);
92 	for (i = 0; i < jobs; ++i) {
93 		pthread_cond_init(&JobsAry[i].cond, NULL);
94 		pthread_create(&JobsAry[i].td, NULL, bulkthread, &JobsAry[i]);
95 	}
96 	pthread_mutex_unlock(&BulkMutex);
97 }
98 
99 void
100 donebulk(void)
101 {
102 	bulk_t *bulk;
103 	int i;
104 
105 	pthread_mutex_lock(&BulkMutex);
106 	while ((bulk = BulkSubmit) != NULL) {
107 		BulkSubmit = bulk->next;
108 		freebulk(bulk);
109 	}
110 	BulkSubmitTail = &BulkSubmit;
111 
112 	for (i = 0; i < BulkMaxJobs; ++i) {
113 		JobsAry[i].terminate = 1;
114 		pthread_cond_signal(&JobsAry[i].cond);
115 	}
116 	pthread_mutex_unlock(&BulkMutex);
117 	for (i = 0; i < BulkMaxJobs; ++i) {
118 		pthread_join(JobsAry[i].td, NULL);
119 		pthread_cond_destroy(&JobsAry[i].cond);
120 		if (JobsAry[i].active) {
121 			freebulk(JobsAry[i].active);
122 			JobsAry[i].active = NULL;
123 			pthread_mutex_lock(&BulkMutex);
124 			--BulkCurJobs;
125 			pthread_mutex_unlock(&BulkMutex);
126 		}
127 		JobsAry[i].terminate = 0;
128 	}
129 	ddassert(BulkCurJobs == 0);
130 
131 	while ((bulk = BulkResponse) != NULL) {
132 		BulkResponse = bulk->next;
133 		freebulk(bulk);
134 	}
135 	BulkResponseTail = &BulkResponse;
136 
137 	BulkFunc = NULL;
138 
139 	bzero(JobsAry, sizeof(JobsAry));
140 
141 	delbuildenv("__MAKE_CONF");
142 }
143 
144 void
145 queuebulk(const char *s1, const char *s2, const char *s3, const char *s4)
146 {
147 	bulk_t *bulk;
148 
149 	bulk = calloc(1, sizeof(*bulk));
150 	if (s1)
151 		bulk->s1 = strdup(s1);
152 	if (s2)
153 		bulk->s2 = strdup(s2);
154 	if (s3)
155 		bulk->s3 = strdup(s3);
156 	if (s4)
157 		bulk->s4 = strdup(s4);
158 	bulk->state = ONSUBMIT;
159 
160 	pthread_mutex_lock(&BulkMutex);
161 	*BulkSubmitTail = bulk;
162 	BulkSubmitTail = &bulk->next;
163 	if (BulkCurJobs < BulkMaxJobs) {
164 		pthread_mutex_unlock(&BulkMutex);
165 		bulkstart();
166 	} else {
167 		pthread_mutex_unlock(&BulkMutex);
168 	}
169 }
170 
171 /*
172  * Fill any idle job slots with new jobs as available.
173  */
174 static
175 void
176 bulkstart(void)
177 {
178 	bulk_t *bulk;
179 	int i;
180 
181 	pthread_mutex_lock(&BulkMutex);
182 	while ((bulk = BulkSubmit) != NULL && BulkCurJobs < BulkMaxJobs) {
183 		i = BulkScanJob + 1;
184 		for (;;) {
185 			i = i % BulkMaxJobs;
186 			if (JobsAry[i].active == NULL)
187 				break;
188 			++i;
189 		}
190 		BulkScanJob = i;
191 		BulkSubmit = bulk->next;
192 		if (BulkSubmit == NULL)
193 			BulkSubmitTail = &BulkSubmit;
194 
195 		bulk->state = ONRUN;
196 		JobsAry[i].active = bulk;
197 		pthread_cond_signal(&JobsAry[i].cond);
198 		++BulkCurJobs;
199 	}
200 	pthread_mutex_unlock(&BulkMutex);
201 }
202 
203 /*
204  * Retrieve completed job or job with activity
205  */
206 bulk_t *
207 getbulk(void)
208 {
209 	bulk_t *bulk;
210 
211 	pthread_mutex_lock(&BulkMutex);
212 	while (BulkCurJobs && BulkResponse == NULL) {
213 		pthread_cond_wait(&BulkResponseCond, &BulkMutex);
214 	}
215 	if (BulkResponse) {
216 		bulk = BulkResponse;
217 		ddassert(bulk->state == ONRESPONSE);
218 		BulkResponse = bulk->next;
219 		if (BulkResponse == NULL)
220 			BulkResponseTail = &BulkResponse;
221 		bulk->state = UNLISTED;
222 	} else {
223 		bulk = NULL;
224 	}
225 	pthread_mutex_unlock(&BulkMutex);
226 	bulkstart();
227 
228 	return bulk;
229 }
230 
231 void
232 freebulk(bulk_t *bulk)
233 {
234 	ddassert(bulk->state == UNLISTED);
235 
236 	if (bulk->s1) {
237 		free(bulk->s1);
238 		bulk->s1 = NULL;
239 	}
240 	if (bulk->s2) {
241 		free(bulk->s2);
242 		bulk->s2 = NULL;
243 	}
244 	if (bulk->s3) {
245 		free(bulk->s3);
246 		bulk->s3 = NULL;
247 	}
248 	if (bulk->s4) {
249 		free(bulk->s4);
250 		bulk->s4 = NULL;
251 	}
252 	if (bulk->r1) {
253 		free(bulk->r1);
254 		bulk->r1 = NULL;
255 	}
256 	if (bulk->r2) {
257 		free(bulk->r2);
258 		bulk->r2 = NULL;
259 	}
260 	if (bulk->r3) {
261 		free(bulk->r3);
262 		bulk->r3 = NULL;
263 	}
264 	if (bulk->r4) {
265 		free(bulk->r4);
266 		bulk->r4 = NULL;
267 	}
268 	free(bulk);
269 }
270 
271 #if 0
272 
/*
 * Read exactly 'bytes' bytes from fd into buf, continuing after short
 * reads and retrying reads interrupted by EINTR.
 *
 * Returns 0 on success, non-zero if the requested number of bytes
 * could not be read (EOF or any other read error).
 */
static
int
readall(int fd, void *buf, size_t bytes)
{
	char *dst = buf;
	size_t left = bytes;
	ssize_t n;

	for (;;) {
		n = read(fd, dst, left);
		if (n == (ssize_t)left)
			return 0;
		if (n > 0) {
			/* short read, advance and keep going */
			dst += n;
			left -= n;
			continue;
		}
		if (n == 0 || errno != EINTR)
			return 1;
	}
}
297 
/*
 * Write exactly 'bytes' bytes from buf to fd, continuing after short
 * writes and retrying writes interrupted by EINTR.
 *
 * Returns 0 on success, non-zero if the data could not be fully
 * written.
 */
static
int
writeall(int fd, const void *buf, size_t bytes)
{
	const char *src = buf;
	size_t left = bytes;
	ssize_t n;

	for (;;) {
		n = write(fd, src, left);
		if (n == (ssize_t)left)
			return 0;
		if (n > 0) {
			/* short write, advance and keep going */
			src += n;
			left -= n;
			continue;
		}
		if (n == 0 || errno != EINTR)
			return 1;
	}
}
319 
320 #endif
321 
322 static void *
323 bulkthread(void *arg)
324 {
325 	job_t *job = arg;
326 	bulk_t *bulk;
327 
328 	pthread_mutex_lock(&BulkMutex);
329 	for (;;) {
330 		if (job->terminate)
331 			break;
332 		if (job->active == NULL)
333 			pthread_cond_wait(&job->cond, &BulkMutex);
334 		bulk = job->active;
335 		if (bulk) {
336 			bulk->state = ISRUNNING;
337 
338 			pthread_mutex_unlock(&BulkMutex);
339 			BulkFunc(bulk);
340 			pthread_mutex_lock(&BulkMutex);
341 
342 			bulk->state = ONRESPONSE;
343 			bulk->next = NULL;
344 			*BulkResponseTail = bulk;
345 			BulkResponseTail = &bulk->next;
346 			--BulkCurJobs;
347 			pthread_cond_signal(&BulkResponseCond);
348 		}
349 
350 		/*
351 		 * Optimization - automatically fetch the next job
352 		 */
353 		if ((bulk = BulkSubmit) != NULL && job->terminate == 0) {
354 			BulkSubmit = bulk->next;
355 			if (BulkSubmit == NULL)
356 				BulkSubmitTail = &BulkSubmit;
357 			bulk->state = ONRUN;
358 			job->active = bulk;
359 			++BulkCurJobs;
360 		} else {
361 			job->active = NULL;
362 		}
363 	}
364 	pthread_mutex_unlock(&BulkMutex);
365 
366 	return NULL;
367 }
368