1 /* 2 * Copyright (c) 2019 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * This code uses concepts and configuration based on 'synth', by 8 * John R. Marino <draco@marino.st>, which was written in ada. 9 * 10 * Redistribution and use in source and binary forms, with or without 11 * modification, are permitted provided that the following conditions 12 * are met: 13 * 14 * 1. Redistributions of source code must retain the above copyright 15 * notice, this list of conditions and the following disclaimer. 16 * 2. Redistributions in binary form must reproduce the above copyright 17 * notice, this list of conditions and the following disclaimer in 18 * the documentation and/or other materials provided with the 19 * distribution. 20 * 3. Neither the name of The DragonFly Project nor the names of its 21 * contributors may be used to endorse or promote products derived 22 * from this software without specific, prior written permission. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 25 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 27 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 28 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 29 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 30 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 31 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 32 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 33 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 34 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 35 * SUCH DAMAGE. 
36 */ 37 #include "dsynth.h" 38 39 typedef struct job { 40 pthread_t td; 41 pthread_cond_t cond; 42 bulk_t *active; 43 int terminate : 1; 44 } job_t; 45 46 /* 47 * Most of these globals are locked with BulkMutex 48 */ 49 static int BulkScanJob; 50 static int BulkCurJobs; 51 static int BulkMaxJobs; 52 static job_t JobsAry[MAXBULK]; 53 static void (*BulkFunc)(bulk_t *bulk); 54 static bulk_t *BulkSubmit; 55 static bulk_t **BulkSubmitTail = &BulkSubmit; 56 static bulk_t *BulkResponse; 57 static bulk_t **BulkResponseTail = &BulkResponse; 58 static pthread_cond_t BulkResponseCond; 59 static pthread_mutex_t BulkMutex; 60 61 static void bulkstart(void); 62 #if 0 63 static int readall(int fd, void *buf, size_t bytes); 64 static int writeall(int fd, const void *buf, size_t bytes); 65 #endif 66 static void *bulkthread(void *arg); 67 68 void 69 initbulk(void (*func)(bulk_t *bulk), int jobs) 70 { 71 int i; 72 73 if (jobs > MAXBULK) 74 jobs = MAXBULK; 75 76 ddassert(BulkSubmit == NULL); 77 BulkCurJobs = 0; 78 BulkMaxJobs = jobs; 79 BulkFunc = func; 80 BulkScanJob = 0; 81 82 pthread_mutex_init(&BulkMutex, NULL); 83 pthread_cond_init(&BulkResponseCond, NULL); 84 85 pthread_mutex_lock(&BulkMutex); 86 for (i = 0; i < jobs; ++i) { 87 pthread_cond_init(&JobsAry[i].cond, NULL); 88 pthread_create(&JobsAry[i].td, NULL, bulkthread, &JobsAry[i]); 89 } 90 pthread_mutex_unlock(&BulkMutex); 91 } 92 93 void 94 donebulk(void) 95 { 96 bulk_t *bulk; 97 int i; 98 99 pthread_mutex_lock(&BulkMutex); 100 while ((bulk = BulkSubmit) != NULL) { 101 BulkSubmit = bulk->next; 102 freebulk(bulk); 103 } 104 BulkSubmitTail = &BulkSubmit; 105 106 for (i = 0; i < BulkMaxJobs; ++i) { 107 JobsAry[i].terminate = 1; 108 pthread_cond_signal(&JobsAry[i].cond); 109 } 110 pthread_mutex_unlock(&BulkMutex); 111 for (i = 0; i < BulkMaxJobs; ++i) { 112 pthread_join(JobsAry[i].td, NULL); 113 pthread_cond_destroy(&JobsAry[i].cond); 114 if (JobsAry[i].active) { 115 freebulk(JobsAry[i].active); 116 JobsAry[i].active = NULL; 
117 pthread_mutex_lock(&BulkMutex); 118 --BulkCurJobs; 119 pthread_mutex_unlock(&BulkMutex); 120 } 121 JobsAry[i].terminate = 0; 122 } 123 ddassert(BulkCurJobs == 0); 124 125 while ((bulk = BulkResponse) != NULL) { 126 BulkResponse = bulk->next; 127 freebulk(bulk); 128 } 129 BulkResponseTail = &BulkResponse; 130 131 BulkFunc = NULL; 132 133 bzero(JobsAry, sizeof(JobsAry)); 134 } 135 136 void 137 queuebulk(const char *s1, const char *s2, const char *s3, const char *s4) 138 { 139 bulk_t *bulk; 140 141 bulk = calloc(1, sizeof(*bulk)); 142 if (s1) 143 bulk->s1 = strdup(s1); 144 if (s2) 145 bulk->s2 = strdup(s2); 146 if (s3) 147 bulk->s3 = strdup(s3); 148 if (s4) 149 bulk->s4 = strdup(s4); 150 bulk->state = ONSUBMIT; 151 152 pthread_mutex_lock(&BulkMutex); 153 *BulkSubmitTail = bulk; 154 BulkSubmitTail = &bulk->next; 155 if (BulkCurJobs < BulkMaxJobs) { 156 pthread_mutex_unlock(&BulkMutex); 157 bulkstart(); 158 } else { 159 pthread_mutex_unlock(&BulkMutex); 160 } 161 } 162 163 /* 164 * Fill any idle job slots with new jobs as available. 
 */
static
void
bulkstart(void)
{
	bulk_t *bulk;
	int i;

	pthread_mutex_lock(&BulkMutex);
	while ((bulk = BulkSubmit) != NULL && BulkCurJobs < BulkMaxJobs) {
		/*
		 * Find an idle slot, scanning from just past the slot
		 * used last time (round-robin).  The BulkCurJobs check
		 * above guarantees at least one idle slot exists, so
		 * this scan terminates.
		 */
		i = BulkScanJob + 1;
		for (;;) {
			i = i % BulkMaxJobs;
			if (JobsAry[i].active == NULL)
				break;
			++i;
		}
		BulkScanJob = i;

		/* Dequeue from the submit FIFO, fixing up the tail. */
		BulkSubmit = bulk->next;
		if (BulkSubmit == NULL)
			BulkSubmitTail = &BulkSubmit;

		/* Hand the bulk to the worker and wake it. */
		bulk->state = ONRUN;
		JobsAry[i].active = bulk;
		pthread_cond_signal(&JobsAry[i].cond);
		++BulkCurJobs;
	}
	pthread_mutex_unlock(&BulkMutex);
}

/*
 * Retrieve completed job or job with activity
 *
 * Blocks while workers are busy and no response has been queued yet.
 * Returns NULL once no jobs are running and the response queue is
 * empty.  Ownership of the returned bulk passes to the caller, who
 * must eventually freebulk() it.  Also restarts the submit pipeline
 * in case slots opened up.
 */
bulk_t *
getbulk(void)
{
	bulk_t *bulk;

	pthread_mutex_lock(&BulkMutex);
	while (BulkCurJobs && BulkResponse == NULL) {
		pthread_cond_wait(&BulkResponseCond, &BulkMutex);
	}
	if (BulkResponse) {
		/* Dequeue from the response FIFO, fixing up the tail. */
		bulk = BulkResponse;
		ddassert(bulk->state == ONRESPONSE);
		BulkResponse = bulk->next;
		if (BulkResponse == NULL)
			BulkResponseTail = &BulkResponse;
		bulk->state = UNLISTED;
	} else {
		bulk = NULL;
	}
	pthread_mutex_unlock(&BulkMutex);

	/* Refill idle worker slots from the submit queue. */
	bulkstart();

	return bulk;
}

/*
 * Release a bulk and all strings attached to it.  The bulk must not be
 * on any queue (state UNLISTED, asserted in debug builds).
 */
void
freebulk(bulk_t *bulk)
{
	ddassert(bulk->state == UNLISTED);

	if (bulk->s1) {
		free(bulk->s1);
		bulk->s1 = NULL;
	}
	if (bulk->s2) {
		free(bulk->s2);
		bulk->s2 = NULL;
	}
	if (bulk->s3) {
		free(bulk->s3);
		bulk->s3 = NULL;
	}
	if (bulk->s4) {
		free(bulk->s4);
		bulk->s4 = NULL;
	}
	if (bulk->r1) {
		free(bulk->r1);
		bulk->r1 = NULL;
	}
	if (bulk->r2) {
		free(bulk->r2);
		bulk->r2 = NULL;
	}
	if (bulk->r3) {
		free(bulk->r3);
		bulk->r3 = NULL;
	}
	if (bulk->r4) {
		free(bulk->r4);
		bulk->r4 = NULL;
	}
	free(bulk);
}

#if 0

/*
 * Returns non-zero if unable to read specified number of bytes
 */
static
int
readall(int fd, void *buf, size_t bytes)
{
	ssize_t r;

	for (;;) {
		r = read(fd, buf, bytes);
		if (r == (ssize_t)bytes)
			break;
		if (r > 0) {
			/* Partial read; advance and retry for the rest. */
			buf = (char *)buf + r;
			bytes -= r;
			continue;
		}
		if (r < 0 && errno == EINTR)
			continue;
		return 1;
	}
	return 0;
}

/*
 * Returns non-zero if unable to write specified number of bytes
 */
static
int
writeall(int fd, const void *buf, size_t bytes)
{
	ssize_t r;

	for (;;) {
		r = write(fd, buf, bytes);
		if (r == (ssize_t)bytes)
			break;
		if (r > 0) {
			/* Partial write; advance and retry for the rest. */
			buf = (const char *)buf + r;
			bytes -= r;
			continue;
		}
		if (r < 0 && errno == EINTR)
			continue;
		return 1;
	}
	return 0;
}

#endif

/*
 * Worker thread main loop.  BulkMutex is held for the entire loop
 * except while running BulkFunc().  Waits on job->cond for work,
 * runs the callback with the mutex dropped, then queues the bulk on
 * the response FIFO and wakes getbulk().
 */
static void *
bulkthread(void *arg)
{
	job_t *job = arg;
	bulk_t *bulk;

	pthread_mutex_lock(&BulkMutex);
	for (;;) {
		if (job->terminate)
			break;
		/*
		 * job->active may have been handed to us before we got
		 * here (or by the pickup below), so only wait when idle.
		 * bulk can still be NULL after a wakeup (e.g. terminate
		 * or spurious wakeup) — handled by the check below.
		 */
		if (job->active == NULL)
			pthread_cond_wait(&job->cond, &BulkMutex);
		bulk = job->active;
		if (bulk) {
			bulk->state = ISRUNNING;

			/* Run the callback without holding BulkMutex. */
			pthread_mutex_unlock(&BulkMutex);
			BulkFunc(bulk);
			pthread_mutex_lock(&BulkMutex);

			/* Append to the response FIFO and wake getbulk(). */
			bulk->state = ONRESPONSE;
			bulk->next = NULL;
			*BulkResponseTail = bulk;
			BulkResponseTail = &bulk->next;
			--BulkCurJobs;
			pthread_cond_signal(&BulkResponseCond);
		}

		/*
		 * Optimization - automatically fetch the next job
		 * directly from the submit queue, avoiding a signal
		 * round-trip through bulkstart().
		 */
		if ((bulk = BulkSubmit) != NULL && job->terminate == 0) {
			BulkSubmit = bulk->next;
			if (BulkSubmit == NULL)
				BulkSubmitTail = &BulkSubmit;
			bulk->state = ONRUN;
			job->active = bulk;
			++BulkCurJobs;
		} else {
			job->active = NULL;
		}
	}
	pthread_mutex_unlock(&BulkMutex);

	return NULL;
}