/*
 * Copyright (c) 2019 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@backplane.com>
 *
 * This code uses concepts and configuration based on 'synth', by
 * John R. Marino <draco@marino.st>, which was written in Ada.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 * 3. Neither the name of The DragonFly Project nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific, prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
#include "dsynth.h"

typedef struct job {
	pthread_t td;		/* worker thread */
	pthread_cond_t cond;	/* signaled when a job is assigned */
	bulk_t	*active;	/* job currently running in this slot */
	int terminate : 1;	/* set by donebulk() to request exit */
} job_t;

/*
 * Most of these globals are locked with BulkMutex
 */
static int BulkScanJob;
static int BulkCurJobs;
static int BulkMaxJobs;
static job_t JobsAry[MAXBULK];
static void (*BulkFunc)(bulk_t *bulk);
static bulk_t *BulkSubmit;
static bulk_t **BulkSubmitTail = &BulkSubmit;
static bulk_t *BulkResponse;
static bulk_t **BulkResponseTail = &BulkResponse;
static pthread_cond_t BulkResponseCond;
static pthread_mutex_t BulkMutex;
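
/*
 * BulkSubmit and BulkResponse are singly-linked FIFO queues using the
 * tail-pointer idiom: the tail pointer always addresses the NULL next
 * field of the last element (or the head pointer itself when the queue
 * is empty), so appends are O(1):
 *
 *	*BulkSubmitTail = bulk;		(link element at the tail)
 *	BulkSubmitTail = &bulk->next;	(advance the tail pointer)
 */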

static void bulkstart(void);
#if 0
static int readall(int fd, void *buf, size_t bytes);
static int writeall(int fd, const void *buf, size_t bytes);
#endif
static void *bulkthread(void *arg);

/*
 * Initialize for bulk scan operations.  Always paired with donebulk().
 */
void
initbulk(void (*func)(bulk_t *bulk), int jobs)
{
	int i;

	if (jobs > MAXBULK)
		jobs = MAXBULK;

	ddassert(BulkSubmit == NULL);
	BulkCurJobs = 0;
	BulkMaxJobs = jobs;
	BulkFunc = func;
	BulkScanJob = 0;

	addbuildenv("__MAKE_CONF", "/dev/null",
		    BENV_ENVIRONMENT | BENV_PKGLIST);

	/*
	 * CCache is a horrible, unreliable hack but... leave the
	 * mechanism in place in case someone has a death wish.
	 */
	if (UseCCache) {
		addbuildenv("WITH_CCACHE_BUILD", "yes", BENV_MAKECONF);
		addbuildenv("CCACHE_DIR", CCachePath, BENV_MAKECONF);
	}

	pthread_mutex_init(&BulkMutex, NULL);
	pthread_cond_init(&BulkResponseCond, NULL);

	pthread_mutex_lock(&BulkMutex);
	for (i = 0; i < jobs; ++i) {
		pthread_cond_init(&JobsAry[i].cond, NULL);
		pthread_create(&JobsAry[i].td, NULL, bulkthread, &JobsAry[i]);
	}
	pthread_mutex_unlock(&BulkMutex);
}
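
/*
 * Example usage (a sketch only; scan_one() and the "editors/vim"
 * origin are hypothetical, not part of dsynth).  The callback runs in
 * a worker thread: it reads the queued s1-s4 strings and leaves its
 * results in the r1-r4 fields for the consumer to pick up via
 * getbulk().
 */
#if 0
static void
scan_one(bulk_t *bulk)
{
	/* worker side: s1 was strdup()d by queuebulk() */
	bulk->r1 = strdup(bulk->s1);
}

static void
scan_all(void)
{
	bulk_t *bulk;

	initbulk(scan_one, 4);			/* up to 4 worker threads */
	queuebulk("editors/vim", NULL, NULL, NULL);
	while ((bulk = getbulk()) != NULL) {	/* NULL once fully drained */
		/* consume bulk->r1 ... */
		freebulk(bulk);
	}
	donebulk();
}
#endif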

/*
 * Tear down after bulk operations.  Flushes the submit queue,
 * terminates and joins the worker threads, and discards any remaining
 * responses.  Always paired with initbulk().
 */
void
donebulk(void)
{
	bulk_t *bulk;
	int i;

	pthread_mutex_lock(&BulkMutex);
	while ((bulk = BulkSubmit) != NULL) {
		BulkSubmit = bulk->next;
		freebulk(bulk);
	}
	BulkSubmitTail = &BulkSubmit;

	for (i = 0; i < BulkMaxJobs; ++i) {
		JobsAry[i].terminate = 1;
		pthread_cond_signal(&JobsAry[i].cond);
	}
	pthread_mutex_unlock(&BulkMutex);
	for (i = 0; i < BulkMaxJobs; ++i) {
		pthread_join(JobsAry[i].td, NULL);
		pthread_cond_destroy(&JobsAry[i].cond);
		if (JobsAry[i].active) {
			freebulk(JobsAry[i].active);
			JobsAry[i].active = NULL;
			pthread_mutex_lock(&BulkMutex);
			--BulkCurJobs;
			pthread_mutex_unlock(&BulkMutex);
		}
		JobsAry[i].terminate = 0;
	}
	ddassert(BulkCurJobs == 0);

	while ((bulk = BulkResponse) != NULL) {
		BulkResponse = bulk->next;
		freebulk(bulk);
	}
	BulkResponseTail = &BulkResponse;

	BulkFunc = NULL;

	bzero(JobsAry, sizeof(JobsAry));

	if (UseCCache) {
		delbuildenv("WITH_CCACHE_BUILD");
		delbuildenv("CCACHE_DIR");
	}
	delbuildenv("__MAKE_CONF");
}

/*
 * Queue a work item (up to four strings) for the worker threads.
 */
void
queuebulk(const char *s1, const char *s2, const char *s3, const char *s4)
{
	bulk_t *bulk;

	bulk = calloc(1, sizeof(*bulk));
	if (s1)
		bulk->s1 = strdup(s1);
	if (s2)
		bulk->s2 = strdup(s2);
	if (s3)
		bulk->s3 = strdup(s3);
	if (s4)
		bulk->s4 = strdup(s4);
	bulk->state = ONSUBMIT;

	pthread_mutex_lock(&BulkMutex);
	*BulkSubmitTail = bulk;
	BulkSubmitTail = &bulk->next;
	if (BulkCurJobs < BulkMaxJobs) {
		pthread_mutex_unlock(&BulkMutex);
		bulkstart();		/* a job slot is idle, kick a worker */
	} else {
		pthread_mutex_unlock(&BulkMutex);
	}
}
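
/*
 * Note that queuebulk() copies its string arguments with strdup() and
 * freebulk() releases the copies, so the caller retains ownership of
 * whatever it passes in.
 */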

/*
 * Fill any idle job slots with new jobs as available.
 */
static
void
bulkstart(void)
{
	bulk_t *bulk;
	int i;

	pthread_mutex_lock(&BulkMutex);
	while ((bulk = BulkSubmit) != NULL && BulkCurJobs < BulkMaxJobs) {
		/*
		 * Rotate round-robin through the job slots, starting
		 * just past the last slot assigned, until an idle one
		 * is found.  At least one slot must be idle because
		 * BulkCurJobs < BulkMaxJobs.
		 */
		i = BulkScanJob + 1;
		for (;;) {
			i = i % BulkMaxJobs;
			if (JobsAry[i].active == NULL)
				break;
			++i;
		}
		BulkScanJob = i;
		BulkSubmit = bulk->next;
		if (BulkSubmit == NULL)
			BulkSubmitTail = &BulkSubmit;

		bulk->state = ONRUN;
		JobsAry[i].active = bulk;
		pthread_cond_signal(&JobsAry[i].cond);
		++BulkCurJobs;
	}
	pthread_mutex_unlock(&BulkMutex);
}

/*
 * Retrieve a completed job, or a job with activity.
 */
bulk_t *
getbulk(void)
{
	bulk_t *bulk;

	pthread_mutex_lock(&BulkMutex);
	while (BulkCurJobs && BulkResponse == NULL) {
		pthread_cond_wait(&BulkResponseCond, &BulkMutex);
	}
	if (BulkResponse) {
		bulk = BulkResponse;
		ddassert(bulk->state == ONRESPONSE);
		BulkResponse = bulk->next;
		if (BulkResponse == NULL)
			BulkResponseTail = &BulkResponse;
		bulk->state = UNLISTED;
	} else {
		bulk = NULL;
	}
	pthread_mutex_unlock(&BulkMutex);
	bulkstart();

	return bulk;
}
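
/*
 * getbulk() blocks while jobs are in flight and returns NULL once
 * BulkCurJobs reaches zero with no queued responses, so the usual
 * consumption pattern is:
 *
 *	while ((bulk = getbulk()) != NULL) {
 *		(use bulk->r1 ... bulk->r4)
 *		freebulk(bulk);
 *	}
 */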

void
freebulk(bulk_t *bulk)
{
	ddassert(bulk->state == UNLISTED);

	if (bulk->s1) {
		free(bulk->s1);
		bulk->s1 = NULL;
	}
	if (bulk->s2) {
		free(bulk->s2);
		bulk->s2 = NULL;
	}
	if (bulk->s3) {
		free(bulk->s3);
		bulk->s3 = NULL;
	}
	if (bulk->s4) {
		free(bulk->s4);
		bulk->s4 = NULL;
	}
	if (bulk->r1) {
		free(bulk->r1);
		bulk->r1 = NULL;
	}
	if (bulk->r2) {
		free(bulk->r2);
		bulk->r2 = NULL;
	}
	if (bulk->r3) {
		free(bulk->r3);
		bulk->r3 = NULL;
	}
	if (bulk->r4) {
		free(bulk->r4);
		bulk->r4 = NULL;
	}
	free(bulk);
}

#if 0

/*
 * Returns non-zero if unable to read the specified number of bytes
 */
static
int
readall(int fd, void *buf, size_t bytes)
{
	ssize_t r;

	for (;;) {
		r = read(fd, buf, bytes);
		if (r == (ssize_t)bytes)
			break;
		if (r > 0) {
			buf = (char *)buf + r;
			bytes -= r;
			continue;
		}
		if (r < 0 && errno == EINTR)
			continue;
		return 1;
	}
	return 0;
}

/*
 * Returns non-zero if unable to write the specified number of bytes
 */
static
int
writeall(int fd, const void *buf, size_t bytes)
{
	ssize_t r;

	for (;;) {
		r = write(fd, buf, bytes);
		if (r == (ssize_t)bytes)
			break;
		if (r > 0) {
			buf = (const char *)buf + r;
			bytes -= r;
			continue;
		}
		if (r < 0 && errno == EINTR)
			continue;
		return 1;
	}
	return 0;
}

#endif

/*
 * Worker thread.  Sleeps until a job is assigned to its slot, runs
 * BulkFunc() on it with BulkMutex released, then moves the job to the
 * response queue and wakes any consumer blocked in getbulk().
 */
static void *
bulkthread(void *arg)
{
	job_t *job = arg;
	bulk_t *bulk;

	pthread_mutex_lock(&BulkMutex);
	for (;;) {
		if (job->terminate)
			break;
		if (job->active == NULL)
			pthread_cond_wait(&job->cond, &BulkMutex);
		bulk = job->active;
		if (bulk) {
			bulk->state = ISRUNNING;

			pthread_mutex_unlock(&BulkMutex);
			BulkFunc(bulk);
			pthread_mutex_lock(&BulkMutex);

			bulk->state = ONRESPONSE;
			bulk->next = NULL;
			*BulkResponseTail = bulk;
			BulkResponseTail = &bulk->next;
			--BulkCurJobs;
			pthread_cond_signal(&BulkResponseCond);
		}

		/*
		 * Optimization - pull the next job off the submit
		 * queue ourselves instead of waiting for bulkstart()
		 * to assign it to a slot.
		 */
		if ((bulk = BulkSubmit) != NULL && job->terminate == 0) {
			BulkSubmit = bulk->next;
			if (BulkSubmit == NULL)
				BulkSubmitTail = &BulkSubmit;
			bulk->state = ONRUN;
			job->active = bulk;
			++BulkCurJobs;
		} else {
			job->active = NULL;
		}
	}
	pthread_mutex_unlock(&BulkMutex);

	return NULL;
}