1 /* 2 * Copyright (c) 2019 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * This code uses concepts and configuration based on 'synth', by 8 * John R. Marino <draco@marino.st>, which was written in ada. 9 * 10 * Redistribution and use in source and binary forms, with or without 11 * modification, are permitted provided that the following conditions 12 * are met: 13 * 14 * 1. Redistributions of source code must retain the above copyright 15 * notice, this list of conditions and the following disclaimer. 16 * 2. Redistributions in binary form must reproduce the above copyright 17 * notice, this list of conditions and the following disclaimer in 18 * the documentation and/or other materials provided with the 19 * distribution. 20 * 3. Neither the name of The DragonFly Project nor the names of its 21 * contributors may be used to endorse or promote products derived 22 * from this software without specific, prior written permission. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 25 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 27 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 28 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 29 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 30 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 31 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 32 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 33 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 34 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 35 * SUCH DAMAGE. 36 */ 37 #include "dsynth.h" 38 39 typedef struct job { 40 pthread_t td; 41 pthread_cond_t cond; 42 bulk_t *active; 43 int terminate : 1; 44 } job_t; 45 46 /* 47 * Most of these globals are locked with BulkMutex 48 */ 49 static int BulkScanJob; 50 static int BulkCurJobs; 51 static int BulkMaxJobs; 52 static job_t JobsAry[MAXBULK]; 53 static void (*BulkFunc)(bulk_t *bulk); 54 static bulk_t *BulkSubmit; 55 static bulk_t **BulkSubmitTail = &BulkSubmit; 56 static bulk_t *BulkResponse; 57 static bulk_t **BulkResponseTail = &BulkResponse; 58 static pthread_cond_t BulkResponseCond; 59 static pthread_mutex_t BulkMutex; 60 61 static void bulkstart(void); 62 #if 0 63 static int readall(int fd, void *buf, size_t bytes); 64 static int writeall(int fd, const void *buf, size_t bytes); 65 #endif 66 static void *bulkthread(void *arg); 67 68 /* 69 * Initialize for bulk scan operations. Always paired with donebulk() 70 */ 71 void 72 initbulk(void (*func)(bulk_t *bulk), int jobs) 73 { 74 int i; 75 76 if (jobs > MAXBULK) 77 jobs = MAXBULK; 78 79 ddassert(BulkSubmit == NULL); 80 BulkCurJobs = 0; 81 BulkMaxJobs = jobs; 82 BulkFunc = func; 83 BulkScanJob = 0; 84 85 addbuildenv("__MAKE_CONF", "/dev/null", 86 BENV_ENVIRONMENT | BENV_PKGLIST); 87 88 /* 89 * CCache is a horrible unreliable hack but... leave the 90 * mechanism in-place in case someone has a death wish. 91 */ 92 if (UseCCache) { 93 addbuildenv("WITH_CCACHE_BUILD", "yes", BENV_MAKECONF); 94 addbuildenv("CCACHE_DIR", CCachePath, BENV_MAKECONF); 95 } 96 97 pthread_mutex_init(&BulkMutex, NULL); 98 pthread_cond_init(&BulkResponseCond, NULL); 99 100 pthread_mutex_lock(&BulkMutex); 101 for (i = 0; i < jobs; ++i) { 102 pthread_cond_init(&JobsAry[i].cond, NULL); 103 pthread_create(&JobsAry[i].td, NULL, bulkthread, &JobsAry[i]); 104 } 105 pthread_mutex_unlock(&BulkMutex); 106 } 107 108 void 109 donebulk(void) 110 { 111 bulk_t *bulk; 112 int i; 113 114 pthread_mutex_lock(&BulkMutex); 115 while ((bulk = BulkSubmit) != NULL) { 116 BulkSubmit = bulk->next; 117 freebulk(bulk); 118 } 119 BulkSubmitTail = &BulkSubmit; 120 121 for (i = 0; i < BulkMaxJobs; ++i) { 122 JobsAry[i].terminate = 1; 123 pthread_cond_signal(&JobsAry[i].cond); 124 } 125 pthread_mutex_unlock(&BulkMutex); 126 for (i = 0; i < BulkMaxJobs; ++i) { 127 pthread_join(JobsAry[i].td, NULL); 128 pthread_cond_destroy(&JobsAry[i].cond); 129 if (JobsAry[i].active) { 130 freebulk(JobsAry[i].active); 131 JobsAry[i].active = NULL; 132 pthread_mutex_lock(&BulkMutex); 133 --BulkCurJobs; 134 pthread_mutex_unlock(&BulkMutex); 135 } 136 JobsAry[i].terminate = 0; 137 } 138 ddassert(BulkCurJobs == 0); 139 140 while ((bulk = BulkResponse) != NULL) { 141 BulkResponse = bulk->next; 142 freebulk(bulk); 143 } 144 BulkResponseTail = &BulkResponse; 145 146 BulkFunc = NULL; 147 148 bzero(JobsAry, sizeof(JobsAry)); 149 150 if (UseCCache) { 151 delbuildenv("WITH_CCACHE_BUILD"); 152 delbuildenv("CCACHE_DIR"); 153 } 154 delbuildenv("__MAKE_CONF"); 155 } 156 157 void 158 queuebulk(const char *s1, const char *s2, const char *s3, const char *s4) 159 { 160 bulk_t *bulk; 161 162 bulk = calloc(1, sizeof(*bulk)); 163 if (s1) 164 bulk->s1 = strdup(s1); 165 if (s2) 166 bulk->s2 = strdup(s2); 167 if (s3) 168 bulk->s3 = strdup(s3); 169 if (s4) 170 bulk->s4 = strdup(s4); 171 bulk->state = ONSUBMIT; 172 173 pthread_mutex_lock(&BulkMutex); 174 *BulkSubmitTail = bulk; 175 BulkSubmitTail = &bulk->next; 176 if (BulkCurJobs < BulkMaxJobs) { 177 pthread_mutex_unlock(&BulkMutex); 178 bulkstart(); 179 } else { 180 pthread_mutex_unlock(&BulkMutex); 181 } 182 } 183 184 /* 185 * Fill any idle job slots with new jobs as available. 186 */ 187 static 188 void 189 bulkstart(void) 190 { 191 bulk_t *bulk; 192 int i; 193 194 pthread_mutex_lock(&BulkMutex); 195 while ((bulk = BulkSubmit) != NULL && BulkCurJobs < BulkMaxJobs) { 196 i = BulkScanJob + 1; 197 for (;;) { 198 i = i % BulkMaxJobs; 199 if (JobsAry[i].active == NULL) 200 break; 201 ++i; 202 } 203 BulkScanJob = i; 204 BulkSubmit = bulk->next; 205 if (BulkSubmit == NULL) 206 BulkSubmitTail = &BulkSubmit; 207 208 bulk->state = ONRUN; 209 JobsAry[i].active = bulk; 210 pthread_cond_signal(&JobsAry[i].cond); 211 ++BulkCurJobs; 212 } 213 pthread_mutex_unlock(&BulkMutex); 214 } 215 216 /* 217 * Retrieve completed job or job with activity 218 */ 219 bulk_t * 220 getbulk(void) 221 { 222 bulk_t *bulk; 223 224 pthread_mutex_lock(&BulkMutex); 225 while (BulkCurJobs && BulkResponse == NULL) { 226 pthread_cond_wait(&BulkResponseCond, &BulkMutex); 227 } 228 if (BulkResponse) { 229 bulk = BulkResponse; 230 ddassert(bulk->state == ONRESPONSE); 231 BulkResponse = bulk->next; 232 if (BulkResponse == NULL) 233 BulkResponseTail = &BulkResponse; 234 bulk->state = UNLISTED; 235 } else { 236 bulk = NULL; 237 } 238 pthread_mutex_unlock(&BulkMutex); 239 bulkstart(); 240 241 return bulk; 242 } 243 244 void 245 freebulk(bulk_t *bulk) 246 { 247 ddassert(bulk->state == UNLISTED); 248 249 if (bulk->s1) { 250 free(bulk->s1); 251 bulk->s1 = NULL; 252 } 253 if (bulk->s2) { 254 free(bulk->s2); 255 bulk->s2 = NULL; 256 } 257 if (bulk->s3) { 258 free(bulk->s3); 259 bulk->s3 = NULL; 260 } 261 if (bulk->s4) { 262 free(bulk->s4); 263 bulk->s4 = NULL; 264 } 265 if (bulk->r1) { 266 free(bulk->r1); 267 bulk->r1 = NULL; 268 } 269 if (bulk->r2) { 270 free(bulk->r2); 271 bulk->r2 = NULL; 272 } 273 if (bulk->r3) { 274 free(bulk->r3); 275 bulk->r3 = NULL; 276 } 277 if (bulk->r4) { 278 free(bulk->r4); 279 bulk->r4 = NULL; 280 } 281 free(bulk); 282 } 283 284 #if 0 285 286 /* 287 * Returns non-zero if unable to read specified number of bytes 288 */ 289 static 290 int 291 readall(int fd, void *buf, size_t bytes) 292 { 293 ssize_t r; 294 295 for (;;) { 296 r = read(fd, buf, bytes); 297 if (r == (ssize_t)bytes) 298 break; 299 if (r > 0) { 300 buf = (char *)buf + r; 301 bytes -= r; 302 continue; 303 } 304 if (r < 0 && errno == EINTR) 305 continue; 306 return 1; 307 } 308 return 0; 309 } 310 311 static 312 int 313 writeall(int fd, const void *buf, size_t bytes) 314 { 315 ssize_t r; 316 317 for (;;) { 318 r = write(fd, buf, bytes); 319 if (r == (ssize_t)bytes) 320 break; 321 if (r > 0) { 322 buf = (const char *)buf + r; 323 bytes -= r; 324 continue; 325 } 326 if (r < 0 && errno == EINTR) 327 continue; 328 return 1; 329 } 330 return 0; 331 } 332 333 #endif 334 335 static void * 336 bulkthread(void *arg) 337 { 338 job_t *job = arg; 339 bulk_t *bulk; 340 341 pthread_mutex_lock(&BulkMutex); 342 for (;;) { 343 if (job->terminate) 344 break; 345 if (job->active == NULL) 346 pthread_cond_wait(&job->cond, &BulkMutex); 347 bulk = job->active; 348 if (bulk) { 349 bulk->state = ISRUNNING; 350 351 pthread_mutex_unlock(&BulkMutex); 352 BulkFunc(bulk); 353 pthread_mutex_lock(&BulkMutex); 354 355 bulk->state = ONRESPONSE; 356 bulk->next = NULL; 357 *BulkResponseTail = bulk; 358 BulkResponseTail = &bulk->next; 359 --BulkCurJobs; 360 pthread_cond_signal(&BulkResponseCond); 361 } 362 363 /* 364 * Optimization - automatically fetch the next job 365 */ 366 if ((bulk = BulkSubmit) != NULL && job->terminate == 0) { 367 BulkSubmit = bulk->next; 368 if (BulkSubmit == NULL) 369 BulkSubmitTail = &BulkSubmit; 370 bulk->state = ONRUN; 371 job->active = bulk; 372 ++BulkCurJobs; 373 } else { 374 job->active = NULL; 375 } 376 } 377 pthread_mutex_unlock(&BulkMutex); 378 379 return NULL; 380 } 381