1 /* 2 * Copyright (c) 2019 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * This code uses concepts and configuration based on 'synth', by 8 * John R. Marino <draco@marino.st>, which was written in ada. 9 * 10 * Redistribution and use in source and binary forms, with or without 11 * modification, are permitted provided that the following conditions 12 * are met: 13 * 14 * 1. Redistributions of source code must retain the above copyright 15 * notice, this list of conditions and the following disclaimer. 16 * 2. Redistributions in binary form must reproduce the above copyright 17 * notice, this list of conditions and the following disclaimer in 18 * the documentation and/or other materials provided with the 19 * distribution. 20 * 3. Neither the name of The DragonFly Project nor the names of its 21 * contributors may be used to endorse or promote products derived 22 * from this software without specific, prior written permission. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 25 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 27 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 28 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 29 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 30 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 31 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 32 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 33 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 34 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 35 * SUCH DAMAGE. 
36 */ 37 #include "dsynth.h" 38 39 typedef struct job { 40 pthread_t td; 41 pthread_cond_t cond; 42 bulk_t *active; 43 int terminate : 1; 44 } job_t; 45 46 /* 47 * Most of these globals are locked with BulkMutex 48 */ 49 static int BulkScanJob; 50 static int BulkCurJobs; 51 static int BulkMaxJobs; 52 static job_t JobsAry[MAXBULK]; 53 static void (*BulkFunc)(bulk_t *bulk); 54 static bulk_t *BulkSubmit; 55 static bulk_t **BulkSubmitTail = &BulkSubmit; 56 static bulk_t *BulkResponse; 57 static bulk_t **BulkResponseTail = &BulkResponse; 58 static pthread_cond_t BulkResponseCond; 59 static pthread_mutex_t BulkMutex; 60 61 static void bulkstart(void); 62 #if 0 63 static int readall(int fd, void *buf, size_t bytes); 64 static int writeall(int fd, const void *buf, size_t bytes); 65 #endif 66 static void *bulkthread(void *arg); 67 68 /* 69 * Initialize for bulk scan operations. Always paired with donebulk() 70 */ 71 void 72 initbulk(void (*func)(bulk_t *bulk), int jobs) 73 { 74 int i; 75 76 if (jobs > MAXBULK) 77 jobs = MAXBULK; 78 79 ddassert(BulkSubmit == NULL); 80 BulkCurJobs = 0; 81 BulkMaxJobs = jobs; 82 BulkFunc = func; 83 BulkScanJob = 0; 84 85 addbuildenv("__MAKE_CONF", "/dev/null", BENV_ENVIRONMENT); 86 87 pthread_mutex_init(&BulkMutex, NULL); 88 pthread_cond_init(&BulkResponseCond, NULL); 89 90 pthread_mutex_lock(&BulkMutex); 91 for (i = 0; i < jobs; ++i) { 92 pthread_cond_init(&JobsAry[i].cond, NULL); 93 pthread_create(&JobsAry[i].td, NULL, bulkthread, &JobsAry[i]); 94 } 95 pthread_mutex_unlock(&BulkMutex); 96 } 97 98 void 99 donebulk(void) 100 { 101 bulk_t *bulk; 102 int i; 103 104 pthread_mutex_lock(&BulkMutex); 105 while ((bulk = BulkSubmit) != NULL) { 106 BulkSubmit = bulk->next; 107 freebulk(bulk); 108 } 109 BulkSubmitTail = &BulkSubmit; 110 111 for (i = 0; i < BulkMaxJobs; ++i) { 112 JobsAry[i].terminate = 1; 113 pthread_cond_signal(&JobsAry[i].cond); 114 } 115 pthread_mutex_unlock(&BulkMutex); 116 for (i = 0; i < BulkMaxJobs; ++i) { 117 
pthread_join(JobsAry[i].td, NULL); 118 pthread_cond_destroy(&JobsAry[i].cond); 119 if (JobsAry[i].active) { 120 freebulk(JobsAry[i].active); 121 JobsAry[i].active = NULL; 122 pthread_mutex_lock(&BulkMutex); 123 --BulkCurJobs; 124 pthread_mutex_unlock(&BulkMutex); 125 } 126 JobsAry[i].terminate = 0; 127 } 128 ddassert(BulkCurJobs == 0); 129 130 while ((bulk = BulkResponse) != NULL) { 131 BulkResponse = bulk->next; 132 freebulk(bulk); 133 } 134 BulkResponseTail = &BulkResponse; 135 136 BulkFunc = NULL; 137 138 bzero(JobsAry, sizeof(JobsAry)); 139 140 delbuildenv("__MAKE_CONF"); 141 } 142 143 void 144 queuebulk(const char *s1, const char *s2, const char *s3, const char *s4) 145 { 146 bulk_t *bulk; 147 148 bulk = calloc(1, sizeof(*bulk)); 149 if (s1) 150 bulk->s1 = strdup(s1); 151 if (s2) 152 bulk->s2 = strdup(s2); 153 if (s3) 154 bulk->s3 = strdup(s3); 155 if (s4) 156 bulk->s4 = strdup(s4); 157 bulk->state = ONSUBMIT; 158 159 pthread_mutex_lock(&BulkMutex); 160 *BulkSubmitTail = bulk; 161 BulkSubmitTail = &bulk->next; 162 if (BulkCurJobs < BulkMaxJobs) { 163 pthread_mutex_unlock(&BulkMutex); 164 bulkstart(); 165 } else { 166 pthread_mutex_unlock(&BulkMutex); 167 } 168 } 169 170 /* 171 * Fill any idle job slots with new jobs as available. 
 */
static
void
bulkstart(void)
{
	bulk_t *bulk;
	int i;

	pthread_mutex_lock(&BulkMutex);
	while ((bulk = BulkSubmit) != NULL && BulkCurJobs < BulkMaxJobs) {
		/*
		 * Rotor scan for an idle slot.  BulkCurJobs <
		 * BulkMaxJobs guarantees at least one ->active is NULL,
		 * so this inner loop terminates.
		 */
		i = BulkScanJob + 1;
		for (;;) {
			i = i % BulkMaxJobs;
			if (JobsAry[i].active == NULL)
				break;
			++i;
		}
		BulkScanJob = i;

		/* dequeue from submit, fixing up the tail when emptied */
		BulkSubmit = bulk->next;
		if (BulkSubmit == NULL)
			BulkSubmitTail = &BulkSubmit;

		bulk->state = ONRUN;
		JobsAry[i].active = bulk;
		pthread_cond_signal(&JobsAry[i].cond);
		++BulkCurJobs;
	}
	pthread_mutex_unlock(&BulkMutex);
}

/*
 * Retrieve completed job or job with activity
 *
 * Blocks while jobs are still running and no response is available.
 * Returns NULL once no jobs remain.  The returned bulk is UNLISTED and
 * owned by the caller, who disposes of it with freebulk().
 */
bulk_t *
getbulk(void)
{
	bulk_t *bulk;

	pthread_mutex_lock(&BulkMutex);
	while (BulkCurJobs && BulkResponse == NULL) {
		pthread_cond_wait(&BulkResponseCond, &BulkMutex);
	}
	if (BulkResponse) {
		bulk = BulkResponse;
		ddassert(bulk->state == ONRESPONSE);
		BulkResponse = bulk->next;
		if (BulkResponse == NULL)
			BulkResponseTail = &BulkResponse;
		bulk->state = UNLISTED;
	} else {
		bulk = NULL;
	}
	pthread_mutex_unlock(&BulkMutex);

	/* keep worker slots topped up from the submit queue */
	bulkstart();

	return bulk;
}

/*
 * Free a bulk and any strings it owns.  The bulk must not be on any
 * queue (state UNLISTED).
 */
void
freebulk(bulk_t *bulk)
{
	ddassert(bulk->state == UNLISTED);

	if (bulk->s1) {
		free(bulk->s1);
		bulk->s1 = NULL;
	}
	if (bulk->s2) {
		free(bulk->s2);
		bulk->s2 = NULL;
	}
	if (bulk->s3) {
		free(bulk->s3);
		bulk->s3 = NULL;
	}
	if (bulk->s4) {
		free(bulk->s4);
		bulk->s4 = NULL;
	}
	if (bulk->r1) {
		free(bulk->r1);
		bulk->r1 = NULL;
	}
	if (bulk->r2) {
		free(bulk->r2);
		bulk->r2 = NULL;
	}
	if (bulk->r3) {
		free(bulk->r3);
		bulk->r3 = NULL;
	}
	if (bulk->r4) {
		free(bulk->r4);
		bulk->r4 = NULL;
	}
	free(bulk);
}

#if 0

/*
 * Returns non-zero if unable to read specified number of bytes
 */
static
int
readall(int fd, void *buf, size_t bytes)
{
	ssize_t r;

	for (;;) {
		r = read(fd, buf, bytes);
		if (r == (ssize_t)bytes)
			break;
		if (r > 0) {
			/* short read, advance and retry the remainder */
			buf = (char *)buf + r;
			bytes -= r;
			continue;
		}
		if (r < 0 && errno == EINTR)
			continue;
		return 1;
	}
	return 0;
}

/*
 * Returns non-zero if unable to write specified number of bytes
 */
static
int
writeall(int fd, const void *buf, size_t bytes)
{
	ssize_t r;

	for (;;) {
		r = write(fd, buf, bytes);
		if (r == (ssize_t)bytes)
			break;
		if (r > 0) {
			/* short write, advance and retry the remainder */
			buf = (const char *)buf + r;
			bytes -= r;
			continue;
		}
		if (r < 0 && errno == EINTR)
			continue;
		return 1;
	}
	return 0;
}

#endif

/*
 * Worker thread.  Sleeps until a bulk is assigned (or termination is
 * requested), runs BulkFunc on the bulk with BulkMutex released, then
 * moves it to the response queue.  As an optimization it pulls the
 * next submission directly instead of round-tripping through
 * bulkstart().
 */
static void *
bulkthread(void *arg)
{
	job_t *job = arg;
	bulk_t *bulk;

	pthread_mutex_lock(&BulkMutex);
	for (;;) {
		if (job->terminate)
			break;
		/*
		 * An 'if', not a 'while', deliberately: a spurious
		 * wakeup is harmless (bulk is NULL-checked below), and
		 * the terminate signal from donebulk() must also get us
		 * past this wait so the loop-top check can see it.
		 */
		if (job->active == NULL)
			pthread_cond_wait(&job->cond, &BulkMutex);
		bulk = job->active;
		if (bulk) {
			bulk->state = ISRUNNING;

			/* run the callback without holding BulkMutex */
			pthread_mutex_unlock(&BulkMutex);
			BulkFunc(bulk);
			pthread_mutex_lock(&BulkMutex);

			/* append to the response queue and wake getbulk() */
			bulk->state = ONRESPONSE;
			bulk->next = NULL;
			*BulkResponseTail = bulk;
			BulkResponseTail = &bulk->next;
			--BulkCurJobs;
			pthread_cond_signal(&BulkResponseCond);
		}

		/*
		 * Optimization - automatically fetch the next job
		 */
		if ((bulk = BulkSubmit) != NULL && job->terminate == 0) {
			BulkSubmit = bulk->next;
			if (BulkSubmit == NULL)
				BulkSubmitTail = &BulkSubmit;
			bulk->state = ONRUN;
			job->active = bulk;
			++BulkCurJobs;
		} else {
			job->active = NULL;
		}
	}
	pthread_mutex_unlock(&BulkMutex);

	return NULL;
}