1 /* 2 * Copyright (c) 2019 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * This code uses concepts and configuration based on 'synth', by 8 * John R. Marino <draco@marino.st>, which was written in ada. 9 * 10 * Redistribution and use in source and binary forms, with or without 11 * modification, are permitted provided that the following conditions 12 * are met: 13 * 14 * 1. Redistributions of source code must retain the above copyright 15 * notice, this list of conditions and the following disclaimer. 16 * 2. Redistributions in binary form must reproduce the above copyright 17 * notice, this list of conditions and the following disclaimer in 18 * the documentation and/or other materials provided with the 19 * distribution. 20 * 3. Neither the name of The DragonFly Project nor the names of its 21 * contributors may be used to endorse or promote products derived 22 * from this software without specific, prior written permission. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 25 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 27 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 28 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 29 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 30 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 31 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 32 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 33 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 34 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 35 * SUCH DAMAGE. 36 */ 37 #include "dsynth.h" 38 39 typedef struct job { 40 pthread_t td; 41 pthread_cond_t cond; 42 bulk_t *active; 43 int terminate : 1; 44 } job_t; 45 46 /* 47 * Most of these globals are locked with BulkMutex 48 */ 49 static int BulkScanJob; 50 static int BulkCurJobs; 51 static int BulkMaxJobs; 52 static job_t JobsAry[MAXBULK]; 53 static void (*BulkFunc)(bulk_t *bulk); 54 static bulk_t *BulkSubmit; 55 static bulk_t **BulkSubmitTail = &BulkSubmit; 56 static bulk_t *BulkResponse; 57 static bulk_t **BulkResponseTail = &BulkResponse; 58 static pthread_cond_t BulkResponseCond; 59 static pthread_mutex_t BulkMutex; 60 61 static void bulkstart(void); 62 #if 0 63 static int readall(int fd, void *buf, size_t bytes); 64 static int writeall(int fd, const void *buf, size_t bytes); 65 #endif 66 static void *bulkthread(void *arg); 67 68 /* 69 * Initialize for bulk scan operations. Always paired with donebulk() 70 */ 71 void 72 initbulk(void (*func)(bulk_t *bulk), int jobs) 73 { 74 int i; 75 76 if (jobs > MAXBULK) 77 jobs = MAXBULK; 78 79 ddassert(BulkSubmit == NULL); 80 BulkCurJobs = 0; 81 BulkMaxJobs = jobs; 82 BulkFunc = func; 83 BulkScanJob = 0; 84 85 addbuildenv("__MAKE_CONF", "/dev/null", 86 BENV_ENVIRONMENT | BENV_PKGLIST); 87 88 pthread_mutex_init(&BulkMutex, NULL); 89 pthread_cond_init(&BulkResponseCond, NULL); 90 91 pthread_mutex_lock(&BulkMutex); 92 for (i = 0; i < jobs; ++i) { 93 pthread_cond_init(&JobsAry[i].cond, NULL); 94 pthread_create(&JobsAry[i].td, NULL, bulkthread, &JobsAry[i]); 95 } 96 pthread_mutex_unlock(&BulkMutex); 97 } 98 99 void 100 donebulk(void) 101 { 102 bulk_t *bulk; 103 int i; 104 105 pthread_mutex_lock(&BulkMutex); 106 while ((bulk = BulkSubmit) != NULL) { 107 BulkSubmit = bulk->next; 108 freebulk(bulk); 109 } 110 BulkSubmitTail = &BulkSubmit; 111 112 for (i = 0; i < BulkMaxJobs; ++i) { 113 JobsAry[i].terminate = 1; 114 pthread_cond_signal(&JobsAry[i].cond); 115 } 116 pthread_mutex_unlock(&BulkMutex); 117 for (i = 0; i < BulkMaxJobs; ++i) { 118 pthread_join(JobsAry[i].td, NULL); 119 pthread_cond_destroy(&JobsAry[i].cond); 120 if (JobsAry[i].active) { 121 freebulk(JobsAry[i].active); 122 JobsAry[i].active = NULL; 123 pthread_mutex_lock(&BulkMutex); 124 --BulkCurJobs; 125 pthread_mutex_unlock(&BulkMutex); 126 } 127 JobsAry[i].terminate = 0; 128 } 129 ddassert(BulkCurJobs == 0); 130 131 while ((bulk = BulkResponse) != NULL) { 132 BulkResponse = bulk->next; 133 freebulk(bulk); 134 } 135 BulkResponseTail = &BulkResponse; 136 137 BulkFunc = NULL; 138 139 bzero(JobsAry, sizeof(JobsAry)); 140 141 delbuildenv("__MAKE_CONF"); 142 } 143 144 void 145 queuebulk(const char *s1, const char *s2, const char *s3, const char *s4) 146 { 147 bulk_t *bulk; 148 149 bulk = calloc(1, sizeof(*bulk)); 150 if (s1) 151 bulk->s1 = strdup(s1); 152 if (s2) 153 bulk->s2 = strdup(s2); 154 if (s3) 155 bulk->s3 = strdup(s3); 156 if (s4) 157 bulk->s4 = strdup(s4); 158 bulk->state = ONSUBMIT; 159 160 pthread_mutex_lock(&BulkMutex); 161 *BulkSubmitTail = bulk; 162 BulkSubmitTail = &bulk->next; 163 if (BulkCurJobs < BulkMaxJobs) { 164 pthread_mutex_unlock(&BulkMutex); 165 bulkstart(); 166 } else { 167 pthread_mutex_unlock(&BulkMutex); 168 } 169 } 170 171 /* 172 * Fill any idle job slots with new jobs as available. 173 */ 174 static 175 void 176 bulkstart(void) 177 { 178 bulk_t *bulk; 179 int i; 180 181 pthread_mutex_lock(&BulkMutex); 182 while ((bulk = BulkSubmit) != NULL && BulkCurJobs < BulkMaxJobs) { 183 i = BulkScanJob + 1; 184 for (;;) { 185 i = i % BulkMaxJobs; 186 if (JobsAry[i].active == NULL) 187 break; 188 ++i; 189 } 190 BulkScanJob = i; 191 BulkSubmit = bulk->next; 192 if (BulkSubmit == NULL) 193 BulkSubmitTail = &BulkSubmit; 194 195 bulk->state = ONRUN; 196 JobsAry[i].active = bulk; 197 pthread_cond_signal(&JobsAry[i].cond); 198 ++BulkCurJobs; 199 } 200 pthread_mutex_unlock(&BulkMutex); 201 } 202 203 /* 204 * Retrieve completed job or job with activity 205 */ 206 bulk_t * 207 getbulk(void) 208 { 209 bulk_t *bulk; 210 211 pthread_mutex_lock(&BulkMutex); 212 while (BulkCurJobs && BulkResponse == NULL) { 213 pthread_cond_wait(&BulkResponseCond, &BulkMutex); 214 } 215 if (BulkResponse) { 216 bulk = BulkResponse; 217 ddassert(bulk->state == ONRESPONSE); 218 BulkResponse = bulk->next; 219 if (BulkResponse == NULL) 220 BulkResponseTail = &BulkResponse; 221 bulk->state = UNLISTED; 222 } else { 223 bulk = NULL; 224 } 225 pthread_mutex_unlock(&BulkMutex); 226 bulkstart(); 227 228 return bulk; 229 } 230 231 void 232 freebulk(bulk_t *bulk) 233 { 234 ddassert(bulk->state == UNLISTED); 235 236 if (bulk->s1) { 237 free(bulk->s1); 238 bulk->s1 = NULL; 239 } 240 if (bulk->s2) { 241 free(bulk->s2); 242 bulk->s2 = NULL; 243 } 244 if (bulk->s3) { 245 free(bulk->s3); 246 bulk->s3 = NULL; 247 } 248 if (bulk->s4) { 249 free(bulk->s4); 250 bulk->s4 = NULL; 251 } 252 if (bulk->r1) { 253 free(bulk->r1); 254 bulk->r1 = NULL; 255 } 256 if (bulk->r2) { 257 free(bulk->r2); 258 bulk->r2 = NULL; 259 } 260 if (bulk->r3) { 261 free(bulk->r3); 262 bulk->r3 = NULL; 263 } 264 if (bulk->r4) { 265 free(bulk->r4); 266 bulk->r4 = NULL; 267 } 268 free(bulk); 269 } 270 271 #if 0 272 273 /* 274 * Returns non-zero if unable to read specified number of bytes 275 */ 276 static 277 int 278 readall(int fd, void *buf, size_t bytes) 279 { 280 ssize_t r; 281 282 for (;;) { 283 r = read(fd, buf, bytes); 284 if (r == (ssize_t)bytes) 285 break; 286 if (r > 0) { 287 buf = (char *)buf + r; 288 bytes -= r; 289 continue; 290 } 291 if (r < 0 && errno == EINTR) 292 continue; 293 return 1; 294 } 295 return 0; 296 } 297 298 static 299 int 300 writeall(int fd, const void *buf, size_t bytes) 301 { 302 ssize_t r; 303 304 for (;;) { 305 r = write(fd, buf, bytes); 306 if (r == (ssize_t)bytes) 307 break; 308 if (r > 0) { 309 buf = (const char *)buf + r; 310 bytes -= r; 311 continue; 312 } 313 if (r < 0 && errno == EINTR) 314 continue; 315 return 1; 316 } 317 return 0; 318 } 319 320 #endif 321 322 static void * 323 bulkthread(void *arg) 324 { 325 job_t *job = arg; 326 bulk_t *bulk; 327 328 pthread_mutex_lock(&BulkMutex); 329 for (;;) { 330 if (job->terminate) 331 break; 332 if (job->active == NULL) 333 pthread_cond_wait(&job->cond, &BulkMutex); 334 bulk = job->active; 335 if (bulk) { 336 bulk->state = ISRUNNING; 337 338 pthread_mutex_unlock(&BulkMutex); 339 BulkFunc(bulk); 340 pthread_mutex_lock(&BulkMutex); 341 342 bulk->state = ONRESPONSE; 343 bulk->next = NULL; 344 *BulkResponseTail = bulk; 345 BulkResponseTail = &bulk->next; 346 --BulkCurJobs; 347 pthread_cond_signal(&BulkResponseCond); 348 } 349 350 /* 351 * Optimization - automatically fetch the next job 352 */ 353 if ((bulk = BulkSubmit) != NULL && job->terminate == 0) { 354 BulkSubmit = bulk->next; 355 if (BulkSubmit == NULL) 356 BulkSubmitTail = &BulkSubmit; 357 bulk->state = ONRUN; 358 job->active = bulk; 359 ++BulkCurJobs; 360 } else { 361 job->active = NULL; 362 } 363 } 364 pthread_mutex_unlock(&BulkMutex); 365 366 return NULL; 367 } 368