1 /*
2 * Copyright (c) 2019 The DragonFly Project. All rights reserved.
3 *
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
6 *
7 * This code uses concepts and configuration based on 'synth', by
8 * John R. Marino <draco@marino.st>, which was written in ada.
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
12 * are met:
13 *
14 * 1. Redistributions of source code must retain the above copyright
15 * notice, this list of conditions and the following disclaimer.
16 * 2. Redistributions in binary form must reproduce the above copyright
17 * notice, this list of conditions and the following disclaimer in
18 * the documentation and/or other materials provided with the
19 * distribution.
20 * 3. Neither the name of The DragonFly Project nor the names of its
21 * contributors may be used to endorse or promote products derived
22 * from this software without specific, prior written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
27 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
28 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
29 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
30 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
31 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
32 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
33 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
34 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
35 * SUCH DAMAGE.
36 */
37 #include "dsynth.h"
38
39 typedef struct job {
40 pthread_t td;
41 pthread_cond_t cond;
42 bulk_t *active;
43 int terminate : 1;
44 } job_t;
45
46 /*
47 * Most of these globals are locked with BulkMutex
48 */
49 static int BulkScanJob;
50 static int BulkCurJobs;
51 static int BulkMaxJobs;
52 static job_t JobsAry[MAXBULK];
53 static void (*BulkFunc)(bulk_t *bulk);
54 static bulk_t *BulkSubmit;
55 static bulk_t **BulkSubmitTail = &BulkSubmit;
56 static bulk_t *BulkResponse;
57 static bulk_t **BulkResponseTail = &BulkResponse;
58 static pthread_cond_t BulkResponseCond;
59 static pthread_mutex_t BulkMutex;
60
/*
 * Forward declarations for file-local helpers.
 */
static void bulkstart(void);
static void *bulkthread(void *arg);
#if 0
static int readall(int fd, void *buf, size_t bytes);
static int writeall(int fd, const void *buf, size_t bytes);
#endif
67
68 /*
69 * Initialize for bulk scan operations. Always paired with donebulk()
70 */
71 void
initbulk(void (* func)(bulk_t * bulk),int jobs)72 initbulk(void (*func)(bulk_t *bulk), int jobs)
73 {
74 int i;
75
76 if (jobs > MAXBULK)
77 jobs = MAXBULK;
78
79 ddassert(BulkSubmit == NULL);
80 BulkCurJobs = 0;
81 BulkMaxJobs = jobs;
82 BulkFunc = func;
83 BulkScanJob = 0;
84
85 addbuildenv("__MAKE_CONF", "/dev/null",
86 BENV_ENVIRONMENT | BENV_PKGLIST);
87
88 /*
89 * CCache is a horrible unreliable hack but... leave the
90 * mechanism in-place in case someone has a death wish.
91 */
92 if (UseCCache) {
93 addbuildenv("WITH_CCACHE_BUILD", "yes", BENV_MAKECONF);
94 addbuildenv("CCACHE_DIR", CCachePath, BENV_MAKECONF);
95 }
96
97 pthread_mutex_init(&BulkMutex, NULL);
98 pthread_cond_init(&BulkResponseCond, NULL);
99
100 pthread_mutex_lock(&BulkMutex);
101 for (i = 0; i < jobs; ++i) {
102 pthread_cond_init(&JobsAry[i].cond, NULL);
103 pthread_create(&JobsAry[i].td, NULL, bulkthread, &JobsAry[i]);
104 }
105 pthread_mutex_unlock(&BulkMutex);
106 }
107
108 void
donebulk(void)109 donebulk(void)
110 {
111 bulk_t *bulk;
112 int i;
113
114 pthread_mutex_lock(&BulkMutex);
115 while ((bulk = BulkSubmit) != NULL) {
116 BulkSubmit = bulk->next;
117 freebulk(bulk);
118 }
119 BulkSubmitTail = &BulkSubmit;
120
121 for (i = 0; i < BulkMaxJobs; ++i) {
122 JobsAry[i].terminate = 1;
123 pthread_cond_signal(&JobsAry[i].cond);
124 }
125 pthread_mutex_unlock(&BulkMutex);
126 for (i = 0; i < BulkMaxJobs; ++i) {
127 pthread_join(JobsAry[i].td, NULL);
128 pthread_cond_destroy(&JobsAry[i].cond);
129 if (JobsAry[i].active) {
130 freebulk(JobsAry[i].active);
131 JobsAry[i].active = NULL;
132 pthread_mutex_lock(&BulkMutex);
133 --BulkCurJobs;
134 pthread_mutex_unlock(&BulkMutex);
135 }
136 JobsAry[i].terminate = 0;
137 }
138 ddassert(BulkCurJobs == 0);
139
140 while ((bulk = BulkResponse) != NULL) {
141 BulkResponse = bulk->next;
142 freebulk(bulk);
143 }
144 BulkResponseTail = &BulkResponse;
145
146 BulkFunc = NULL;
147
148 bzero(JobsAry, sizeof(JobsAry));
149
150 if (UseCCache) {
151 delbuildenv("WITH_CCACHE_BUILD");
152 delbuildenv("CCACHE_DIR");
153 }
154 delbuildenv("__MAKE_CONF");
155 }
156
157 void
queuebulk(const char * s1,const char * s2,const char * s3,const char * s4)158 queuebulk(const char *s1, const char *s2, const char *s3, const char *s4)
159 {
160 bulk_t *bulk;
161
162 bulk = calloc(1, sizeof(*bulk));
163 if (s1)
164 bulk->s1 = strdup(s1);
165 if (s2)
166 bulk->s2 = strdup(s2);
167 if (s3)
168 bulk->s3 = strdup(s3);
169 if (s4)
170 bulk->s4 = strdup(s4);
171 bulk->state = ONSUBMIT;
172
173 pthread_mutex_lock(&BulkMutex);
174 *BulkSubmitTail = bulk;
175 BulkSubmitTail = &bulk->next;
176 if (BulkCurJobs < BulkMaxJobs) {
177 pthread_mutex_unlock(&BulkMutex);
178 bulkstart();
179 } else {
180 pthread_mutex_unlock(&BulkMutex);
181 }
182 }
183
184 /*
185 * Fill any idle job slots with new jobs as available.
186 */
187 static
188 void
bulkstart(void)189 bulkstart(void)
190 {
191 bulk_t *bulk;
192 int i;
193
194 pthread_mutex_lock(&BulkMutex);
195 while ((bulk = BulkSubmit) != NULL && BulkCurJobs < BulkMaxJobs) {
196 i = BulkScanJob + 1;
197 for (;;) {
198 i = i % BulkMaxJobs;
199 if (JobsAry[i].active == NULL)
200 break;
201 ++i;
202 }
203 BulkScanJob = i;
204 BulkSubmit = bulk->next;
205 if (BulkSubmit == NULL)
206 BulkSubmitTail = &BulkSubmit;
207
208 bulk->state = ONRUN;
209 JobsAry[i].active = bulk;
210 pthread_cond_signal(&JobsAry[i].cond);
211 ++BulkCurJobs;
212 }
213 pthread_mutex_unlock(&BulkMutex);
214 }
215
216 /*
217 * Retrieve completed job or job with activity
218 */
219 bulk_t *
getbulk(void)220 getbulk(void)
221 {
222 bulk_t *bulk;
223
224 pthread_mutex_lock(&BulkMutex);
225 while (BulkCurJobs && BulkResponse == NULL) {
226 pthread_cond_wait(&BulkResponseCond, &BulkMutex);
227 }
228 if (BulkResponse) {
229 bulk = BulkResponse;
230 ddassert(bulk->state == ONRESPONSE);
231 BulkResponse = bulk->next;
232 if (BulkResponse == NULL)
233 BulkResponseTail = &BulkResponse;
234 bulk->state = UNLISTED;
235 } else {
236 bulk = NULL;
237 }
238 pthread_mutex_unlock(&BulkMutex);
239 bulkstart();
240
241 return bulk;
242 }
243
244 void
freebulk(bulk_t * bulk)245 freebulk(bulk_t *bulk)
246 {
247 ddassert(bulk->state == UNLISTED);
248
249 if (bulk->s1) {
250 free(bulk->s1);
251 bulk->s1 = NULL;
252 }
253 if (bulk->s2) {
254 free(bulk->s2);
255 bulk->s2 = NULL;
256 }
257 if (bulk->s3) {
258 free(bulk->s3);
259 bulk->s3 = NULL;
260 }
261 if (bulk->s4) {
262 free(bulk->s4);
263 bulk->s4 = NULL;
264 }
265 if (bulk->r1) {
266 free(bulk->r1);
267 bulk->r1 = NULL;
268 }
269 if (bulk->r2) {
270 free(bulk->r2);
271 bulk->r2 = NULL;
272 }
273 if (bulk->r3) {
274 free(bulk->r3);
275 bulk->r3 = NULL;
276 }
277 if (bulk->r4) {
278 free(bulk->r4);
279 bulk->r4 = NULL;
280 }
281 free(bulk);
282 }
283
284 #if 0
285
/*
 * Read exactly 'bytes' bytes from fd into buf, resuming after short
 * reads and after reads interrupted by a signal (EINTR).
 *
 * Returns 0 once the full count has been read, non-zero on end-of-file
 * or on any other read error.
 */
static int
readall(int fd, void *buf, size_t bytes)
{
	char *cursor = buf;
	size_t remaining = bytes;
	ssize_t got;

	for (;;) {
		got = read(fd, cursor, remaining);
		if (got == (ssize_t)remaining)
			return 0;
		if (got > 0) {
			/* Short read, advance and ask for the rest */
			cursor += got;
			remaining -= got;
			continue;
		}
		if (got == 0 || errno != EINTR)
			return 1;
	}
}
310
/*
 * Write exactly 'bytes' bytes from buf to fd, resuming after short
 * writes and after writes interrupted by a signal (EINTR).
 *
 * Returns 0 once everything has been written, non-zero on any other
 * outcome.
 */
static int
writeall(int fd, const void *buf, size_t bytes)
{
	const char *cursor = buf;
	size_t remaining = bytes;
	ssize_t put;

	for (;;) {
		put = write(fd, cursor, remaining);
		if (put == (ssize_t)remaining)
			return 0;
		if (put > 0) {
			/* Short write, advance and send the rest */
			cursor += put;
			remaining -= put;
			continue;
		}
		if (put == 0 || errno != EINTR)
			return 1;
	}
}
332
333 #endif
334
335 static void *
bulkthread(void * arg)336 bulkthread(void *arg)
337 {
338 job_t *job = arg;
339 bulk_t *bulk;
340
341 pthread_mutex_lock(&BulkMutex);
342 for (;;) {
343 if (job->terminate)
344 break;
345 if (job->active == NULL)
346 pthread_cond_wait(&job->cond, &BulkMutex);
347 bulk = job->active;
348 if (bulk) {
349 bulk->state = ISRUNNING;
350
351 pthread_mutex_unlock(&BulkMutex);
352 BulkFunc(bulk);
353 pthread_mutex_lock(&BulkMutex);
354
355 bulk->state = ONRESPONSE;
356 bulk->next = NULL;
357 *BulkResponseTail = bulk;
358 BulkResponseTail = &bulk->next;
359 --BulkCurJobs;
360 pthread_cond_signal(&BulkResponseCond);
361 }
362
363 /*
364 * Optimization - automatically fetch the next job
365 */
366 if ((bulk = BulkSubmit) != NULL && job->terminate == 0) {
367 BulkSubmit = bulk->next;
368 if (BulkSubmit == NULL)
369 BulkSubmitTail = &BulkSubmit;
370 bulk->state = ONRUN;
371 job->active = bulk;
372 ++BulkCurJobs;
373 } else {
374 job->active = NULL;
375 }
376 }
377 pthread_mutex_unlock(&BulkMutex);
378
379 return NULL;
380 }
381