1 /* 2 * This file and its contents are supplied under the terms of the 3 * Common Development and Distribution License ("CDDL"), version 1.0. 4 * You may only use this file in accordance with the terms of version 5 * 1.0 of the CDDL. 6 * 7 * A full copy of the text of the CDDL should have accompanied this 8 * source. A copy of the CDDL is also available via the Internet at 9 * http://www.illumos.org/license/CDDL. 10 */ 11 12 /* 13 * Copyright 2015 Joyent, Inc. 14 */ 15 16 /* 17 * Merge queue 18 * 19 * A multi-threaded merging queue. 20 * 21 * The general constraint of the merge queue is that if a set of items are 22 * inserted into the queue in the same order, then no matter how many threads 23 * are on the scene, we will always process the items in the same order. The 24 * secondary constraint is that to support environments that must be 25 * single-threaded, we explicitly *must not* create a thread in the case where 26 * the number of requested threads is just one. 27 * 28 * To that end, we've designed our queue as a circular buffer. We will grow that 29 * buffer to contain enough space for all the input items, after which we'll 30 * then treat it as a circular buffer. 31 * 32 * Items will be issued to a processing function two at a time, until there is 33 * only one item remaining in the queue, at which point we are done with all 34 * merging work. 35 * 36 * A given queue has three different entries that we care about tracking: 37 * 38 * o mq_nproc - What is the slot of the next item to process for something 39 * looking for work. 40 * 41 * o mq_next - What is the slot of the next item that should be inserted into 42 * the queue. 43 * 44 * o mq_ncommit - What is the slot of the next item that should be committed. 45 * 46 * When a thread comes and looks for work, we pop entries off of the queue based 47 * on the index provided by mq_nproc. At the same time, it also gets the slot 48 * that it should place the result in, which is mq_next.
However, because we 49 * have multiple threads that are operating on the system, we want to make sure 50 * that we push things onto the queue in order. We do that by allocating a slot 51 * to each task and when it completes, it waits for its slot to be ready based 52 * on it being the value of mq_ncommit. 53 * 54 * In addition, we keep track of the number of items in the queue as well as the 55 * number of active workers. There's also a generation count that is used to 56 * figure out when the various values might lap one another. 57 * 58 * The following images show what happens when we have a queue with six items 59 * and whose capacity has been shrunk to six, to better fit on the screen. 60 * 61 * 62 * 1) This is the initial configuration of the queue right before any processing 63 * is done in the context of mergeq_merge(). Every box has an initial item for 64 * merging in it (represented by an 'X'). Here, the mq_nproc, mq_next, and 65 * mq_ncommit will all point at the initial entry. However, the mq_next has 66 * already lapped around the array and thus has a generation count of one. 67 * 68 * The '+' characters indicate which bucket the corresponding values of mq_next, 69 * mq_ncommit, and mq_nproc point to. 70 * 71 * +---++---++---++---++---++---+ 72 * | X || X || X || X || X || X | 73 * +---++---++---++---++---++---+ 74 * mq_next (g1) + 75 * mq_ncommit (g0) + 76 * mq_nproc (g0) + 77 * 78 * 2) This shows the state right as the first thread begins to process an entry. 79 * Note in this example we will have two threads processing this queue. Note, 80 * mq_ncommit has not advanced. This is because the first thread has started 81 * processing entries, but it has not finished, and thus we can't commit it. 82 * We've incremented mq_next by one because it has gone ahead and assigned a 83 * single entry. We've incremented mq_nproc by two, because we have removed two 84 * entries and thus will have another set available.
85 * 86 * +---++---++---++---++---++---+ t1 - slot 0 87 * | || || X || X || X || X | t2 - idle 88 * +---++---++---++---++---++---+ 89 * mq_next (g1) + 90 * mq_ncommit (g0) + 91 * mq_nproc (g0) + 92 * 93 * 94 * 3) This shows the state right after the second thread begins to process an 95 * entry, note that the first thread has not finished. The changes are very 96 * similar to the previous state, we've advanced, mq_nproc and mq_next, but not 97 * mq_ncommit. 98 * 99 * +---++---++---++---++---++---+ t1 - slot 0 100 * | || || || || X || X | t2 - slot 1 101 * +---++---++---++---++---++---+ 102 * mq_next (g1) + 103 * mq_ncommit (g0) + 104 * mq_nproc (g0) + 105 * 106 * 4) This shows the state after thread one has finished processing an item, but 107 * before it does anything else. Note that even if thread two finishes early, it 108 * cannot commit its item until thread one finishes. Here 'Y' refers to the 109 * result of merging the first two 'X's. 110 * 111 * +---++---++---++---++---++---+ t1 - idle 112 * | Y || || || || X || X | t2 - slot 1 113 * +---++---++---++---++---++---+ 114 * mq_next (g1) + 115 * mq_ncommit (g0) + 116 * mq_nproc (g0) + 117 * 118 * 5) This shows the state after thread one has begun to process the next round 119 * and after thread two has committed, but before it begins processing the next 120 * item. Note that mq_nproc has wrapped around and we've bumped its generation 121 * counter. 122 * 123 * +---++---++---++---++---++---+ t1 - slot 2 124 * | Y || Y || || || || | t2 - idle 125 * +---++---++---++---++---++---+ 126 * mq_next (g1) + 127 * mq_ncommit (g0) + 128 * mq_nproc (g0) + 129 * 130 * 6) Here, thread two, will take the next two Y values and thread 1 will commit 131 * its 'Y'. Thread one now must wait until thread two finishes such that it can 132 * do additional work. 
133 * 134 * +---++---++---++---++---++---+ t1 - waiting 135 * | || || Y || || || | t2 - slot 3 136 * +---++---++---++---++---++---+ 137 * mq_next (g1) + 138 * mq_ncommit (g0) + 139 * mq_nproc (g0) + 140 * 141 * 7) Here, thread two has committed and thread one is about to go process the 142 * final entry. The character 'Z' represents the results of merging two 'Y's. 143 * 144 * +---++---++---++---++---++---+ t1 - idle 145 * | || || Y || Z || || | t2 - idle 146 * +---++---++---++---++---++---+ 147 * mq_next (g1) + 148 * mq_ncommit (g0) + 149 * mq_nproc (g0) + 150 * 151 * 8) Here, thread one is processing the final item. Thread two is waiting in 152 * mergeq_pop() for enough items to be available. In this case, it will never 153 * happen; however, once all threads have finished it will break out. 154 * 155 * +---++---++---++---++---++---+ t1 - slot 4 156 * | || || || || || | t2 - idle 157 * +---++---++---++---++---++---+ 158 * mq_next (g1) + 159 * mq_ncommit (g0) + 160 * mq_nproc (g0) + 161 * 162 * 9) This is the final state of the queue, it has a single '*' item which is 163 * the final merge result. At this point, both thread one and thread two would 164 * stop processing and we'll return the result to the user. 165 * 166 * +---++---++---++---++---++---+ t1 - slot 4 167 * | || || || || * || | t2 - idle 168 * +---++---++---++---++---++---+ 169 * mq_next (g1) + 170 * mq_ncommit (g0) + 171 * mq_nproc (g0) + 172 * 173 * 174 * Note, that if at any point in time the processing function fails, then all 175 * the merges will quiesce and that error will be propagated back to the user. 
176 */ 177 178 #include <strings.h> 179 #include <sys/debug.h> 180 #include <thread.h> 181 #include <synch.h> 182 #include <errno.h> 183 #include <limits.h> 184 #include <stdlib.h> 185 186 #include "mergeq.h" 187 188 struct mergeq { 189 mutex_t mq_lock; /* Protects items below */ 190 cond_t mq_cond; /* Condition variable */ 191 void **mq_items; /* Array of items to process */ 192 size_t mq_nitems; /* Number of items in the queue */ 193 size_t mq_cap; /* Capacity of the items */ 194 size_t mq_next; /* Place to put next entry */ 195 size_t mq_gnext; /* Generation for next */ 196 size_t mq_nproc; /* Index of next thing to process */ 197 size_t mq_gnproc; /* Generation for next proc */ 198 size_t mq_ncommit; /* Index of the next thing to commit */ 199 size_t mq_gncommit; /* Commit generation */ 200 uint_t mq_nactthrs; /* Number of active threads */ 201 uint_t mq_ndthreads; /* Desired number of threads */ 202 thread_t *mq_thrs; /* Actual threads */ 203 mergeq_proc_f *mq_func; /* Processing function */ 204 void *mq_arg; /* Argument for processing */ 205 boolean_t mq_working; /* Are we working on processing */ 206 boolean_t mq_iserror; /* Have we encountered an error? 
*/ 207 int mq_error; 208 }; 209 210 #define MERGEQ_DEFAULT_CAP 64 211 212 static int 213 mergeq_error(int err) 214 { 215 errno = err; 216 return (MERGEQ_ERROR); 217 } 218 219 void 220 mergeq_fini(mergeq_t *mqp) 221 { 222 if (mqp == NULL) 223 return; 224 225 VERIFY(mqp->mq_working != B_TRUE); 226 227 if (mqp->mq_items != NULL) 228 mergeq_free(mqp->mq_items, sizeof (void *) * mqp->mq_cap); 229 if (mqp->mq_ndthreads > 0) { 230 mergeq_free(mqp->mq_thrs, sizeof (thread_t) * 231 mqp->mq_ndthreads); 232 } 233 VERIFY0(cond_destroy(&mqp->mq_cond)); 234 VERIFY0(mutex_destroy(&mqp->mq_lock)); 235 mergeq_free(mqp, sizeof (mergeq_t)); 236 } 237 238 int 239 mergeq_init(mergeq_t **outp, uint_t nthrs) 240 { 241 int ret; 242 mergeq_t *mqp; 243 244 mqp = mergeq_alloc(sizeof (mergeq_t)); 245 if (mqp == NULL) 246 return (mergeq_error(ENOMEM)); 247 248 bzero(mqp, sizeof (mergeq_t)); 249 mqp->mq_items = mergeq_alloc(sizeof (void *) * MERGEQ_DEFAULT_CAP); 250 if (mqp->mq_items == NULL) { 251 mergeq_free(mqp, sizeof (mergeq_t)); 252 return (mergeq_error(ENOMEM)); 253 } 254 bzero(mqp->mq_items, sizeof (void *) * MERGEQ_DEFAULT_CAP); 255 256 mqp->mq_ndthreads = nthrs - 1; 257 if (mqp->mq_ndthreads > 0) { 258 mqp->mq_thrs = mergeq_alloc(sizeof (thread_t) * 259 mqp->mq_ndthreads); 260 if (mqp->mq_thrs == NULL) { 261 mergeq_free(mqp->mq_items, sizeof (void *) * 262 MERGEQ_DEFAULT_CAP); 263 mergeq_free(mqp, sizeof (mergeq_t)); 264 return (mergeq_error(ENOMEM)); 265 } 266 } 267 268 if ((ret = mutex_init(&mqp->mq_lock, USYNC_THREAD | LOCK_ERRORCHECK, 269 NULL)) != 0) { 270 if (mqp->mq_ndthreads > 0) { 271 mergeq_free(mqp->mq_thrs, 272 sizeof (thread_t) * mqp->mq_ndthreads); 273 } 274 mergeq_free(mqp->mq_items, sizeof (void *) * 275 MERGEQ_DEFAULT_CAP); 276 mergeq_free(mqp, sizeof (mergeq_t)); 277 return (mergeq_error(ret)); 278 } 279 280 if ((ret = cond_init(&mqp->mq_cond, USYNC_THREAD, NULL)) != 0) { 281 VERIFY0(mutex_destroy(&mqp->mq_lock)); 282 if (mqp->mq_ndthreads > 0) { 283 
mergeq_free(mqp->mq_thrs, 284 sizeof (thread_t) * mqp->mq_ndthreads); 285 } 286 mergeq_free(mqp->mq_items, sizeof (void *) * 287 MERGEQ_DEFAULT_CAP); 288 mergeq_free(mqp, sizeof (mergeq_t)); 289 return (mergeq_error(ret)); 290 } 291 292 mqp->mq_cap = MERGEQ_DEFAULT_CAP; 293 *outp = mqp; 294 return (0); 295 } 296 297 static void 298 mergeq_reset(mergeq_t *mqp) 299 { 300 VERIFY(MUTEX_HELD(&mqp->mq_lock)); 301 VERIFY(mqp->mq_working == B_FALSE); 302 if (mqp->mq_cap != 0) 303 bzero(mqp->mq_items, sizeof (void *) * mqp->mq_cap); 304 mqp->mq_nitems = 0; 305 mqp->mq_next = 0; 306 mqp->mq_gnext = 0; 307 mqp->mq_nproc = 0; 308 mqp->mq_gnproc = 0; 309 mqp->mq_ncommit = 0; 310 mqp->mq_gncommit = 0; 311 mqp->mq_func = NULL; 312 mqp->mq_arg = NULL; 313 mqp->mq_iserror = B_FALSE; 314 mqp->mq_error = 0; 315 } 316 317 static int 318 mergeq_grow(mergeq_t *mqp) 319 { 320 size_t ncap; 321 void **items; 322 323 VERIFY(MUTEX_HELD(&mqp->mq_lock)); 324 VERIFY(mqp->mq_working == B_FALSE); 325 326 if (SIZE_MAX - mqp->mq_cap < MERGEQ_DEFAULT_CAP) 327 return (ENOSPC); 328 329 ncap = mqp->mq_cap + MERGEQ_DEFAULT_CAP; 330 items = mergeq_alloc(ncap * sizeof (void *)); 331 if (items == NULL) 332 return (ENOMEM); 333 334 bzero(items, ncap * sizeof (void *)); 335 bcopy(mqp->mq_items, items, mqp->mq_cap * sizeof (void *)); 336 mergeq_free(mqp->mq_items, sizeof (mqp->mq_cap) * sizeof (void *)); 337 mqp->mq_items = items; 338 mqp->mq_cap = ncap; 339 return (0); 340 } 341 342 int 343 mergeq_add(mergeq_t *mqp, void *item) 344 { 345 VERIFY0(mutex_lock(&mqp->mq_lock)); 346 if (mqp->mq_working == B_TRUE) { 347 VERIFY0(mutex_unlock(&mqp->mq_lock)); 348 return (mergeq_error(ENXIO)); 349 } 350 351 if (mqp->mq_next == mqp->mq_cap) { 352 int ret; 353 354 if ((ret = mergeq_grow(mqp)) != 0) { 355 VERIFY0(mutex_unlock(&mqp->mq_lock)); 356 return (mergeq_error(ret)); 357 } 358 } 359 mqp->mq_items[mqp->mq_next] = item; 360 mqp->mq_next++; 361 mqp->mq_nitems++; 362 363 VERIFY0(mutex_unlock(&mqp->mq_lock)); 364 
return (0); 365 } 366 367 static size_t 368 mergeq_slot(mergeq_t *mqp) 369 { 370 size_t s; 371 372 VERIFY(MUTEX_HELD(&mqp->mq_lock)); 373 VERIFY(mqp->mq_next < mqp->mq_cap); 374 375 /* 376 * This probably should be a cv / wait thing. 377 */ 378 VERIFY(mqp->mq_nproc != (mqp->mq_next + 1) % mqp->mq_cap); 379 380 s = mqp->mq_next; 381 mqp->mq_next++; 382 if (mqp->mq_next == mqp->mq_cap) { 383 mqp->mq_next %= mqp->mq_cap; 384 mqp->mq_gnext++; 385 } 386 387 return (s); 388 } 389 390 /* 391 * Internal function to push items onto the queue which is now a circular 392 * buffer. This should only be used once we begin working on the queue. 393 */ 394 static void 395 mergeq_push(mergeq_t *mqp, size_t slot, void *item) 396 { 397 VERIFY(MUTEX_HELD(&mqp->mq_lock)); 398 VERIFY(slot < mqp->mq_cap); 399 400 /* 401 * We need to verify that we don't push over something that exists. 402 * Based on the design, this should never happen. However, in the face 403 * of bugs, anything is possible. 404 */ 405 while (mqp->mq_ncommit != slot && mqp->mq_iserror == B_FALSE) 406 (void) cond_wait(&mqp->mq_cond, &mqp->mq_lock); 407 408 if (mqp->mq_iserror == B_TRUE) 409 return; 410 411 mqp->mq_items[slot] = item; 412 mqp->mq_nitems++; 413 mqp->mq_ncommit++; 414 if (mqp->mq_ncommit == mqp->mq_cap) { 415 mqp->mq_ncommit %= mqp->mq_cap; 416 mqp->mq_gncommit++; 417 } 418 (void) cond_broadcast(&mqp->mq_cond); 419 } 420 421 static void * 422 mergeq_pop_one(mergeq_t *mqp) 423 { 424 void *out; 425 426 /* 427 * We can't move mq_nproc beyond mq_next if they're on the same 428 * generation. 429 */ 430 VERIFY(mqp->mq_gnext != mqp->mq_gnproc || 431 mqp->mq_nproc != mqp->mq_next); 432 433 out = mqp->mq_items[mqp->mq_nproc]; 434 435 mqp->mq_items[mqp->mq_nproc] = NULL; 436 mqp->mq_nproc++; 437 if (mqp->mq_nproc == mqp->mq_cap) { 438 mqp->mq_nproc %= mqp->mq_cap; 439 mqp->mq_gnproc++; 440 } 441 mqp->mq_nitems--; 442 443 return (out); 444 } 445 446 /* 447 * Pop a set of two entries from the queue. 
We may not have anything to process 448 * at the moment, eg. be waiting for someone to add something. In which case, 449 * we'll be sitting and waiting. 450 */ 451 static boolean_t 452 mergeq_pop(mergeq_t *mqp, void **first, void **second) 453 { 454 VERIFY(MUTEX_HELD(&mqp->mq_lock)); 455 VERIFY(mqp->mq_nproc < mqp->mq_cap); 456 457 while (mqp->mq_nitems < 2 && mqp->mq_nactthrs > 0 && 458 mqp->mq_iserror == B_FALSE) 459 (void) cond_wait(&mqp->mq_cond, &mqp->mq_lock); 460 461 if (mqp->mq_iserror == B_TRUE) 462 return (B_FALSE); 463 464 if (mqp->mq_nitems < 2 && mqp->mq_nactthrs == 0) { 465 VERIFY(mqp->mq_iserror == B_TRUE || mqp->mq_nitems == 1); 466 return (B_FALSE); 467 } 468 VERIFY(mqp->mq_nitems >= 2); 469 470 *first = mergeq_pop_one(mqp); 471 *second = mergeq_pop_one(mqp); 472 473 return (B_TRUE); 474 } 475 476 static void * 477 mergeq_thr_merge(void *arg) 478 { 479 mergeq_t *mqp = arg; 480 481 VERIFY0(mutex_lock(&mqp->mq_lock)); 482 483 /* 484 * Check to make sure creation worked and if not, fail fast. 
485 */ 486 if (mqp->mq_iserror == B_TRUE) { 487 VERIFY0(mutex_unlock(&mqp->mq_lock)); 488 return (NULL); 489 } 490 491 for (;;) { 492 void *first, *second, *out; 493 int ret; 494 size_t slot; 495 496 if (mqp->mq_nitems == 1 && mqp->mq_nactthrs == 0) { 497 VERIFY0(mutex_unlock(&mqp->mq_lock)); 498 return (NULL); 499 } 500 501 if (mergeq_pop(mqp, &first, &second) == B_FALSE) { 502 VERIFY0(mutex_unlock(&mqp->mq_lock)); 503 return (NULL); 504 } 505 slot = mergeq_slot(mqp); 506 507 mqp->mq_nactthrs++; 508 509 VERIFY0(mutex_unlock(&mqp->mq_lock)); 510 ret = mqp->mq_func(first, second, &out, mqp->mq_arg); 511 VERIFY0(mutex_lock(&mqp->mq_lock)); 512 513 if (ret != 0) { 514 if (mqp->mq_iserror == B_FALSE) { 515 mqp->mq_iserror = B_TRUE; 516 mqp->mq_error = ret; 517 (void) cond_broadcast(&mqp->mq_cond); 518 } 519 mqp->mq_nactthrs--; 520 VERIFY0(mutex_unlock(&mqp->mq_lock)); 521 return (NULL); 522 } 523 mergeq_push(mqp, slot, out); 524 mqp->mq_nactthrs--; 525 } 526 } 527 528 int 529 mergeq_merge(mergeq_t *mqp, mergeq_proc_f *func, void *arg, void **outp, 530 int *errp) 531 { 532 int ret, i; 533 boolean_t seterr = B_FALSE; 534 535 if (mqp == NULL || func == NULL || outp == NULL) { 536 return (mergeq_error(EINVAL)); 537 } 538 539 VERIFY0(mutex_lock(&mqp->mq_lock)); 540 if (mqp->mq_working == B_TRUE) { 541 VERIFY0(mutex_unlock(&mqp->mq_lock)); 542 return (mergeq_error(EBUSY)); 543 } 544 545 if (mqp->mq_nitems == 0) { 546 *outp = NULL; 547 mergeq_reset(mqp); 548 VERIFY0(mutex_unlock(&mqp->mq_lock)); 549 return (0); 550 } 551 552 /* 553 * Now that we've finished adding items to the queue, turn it into a 554 * circular buffer. 
555 */ 556 mqp->mq_func = func; 557 mqp->mq_arg = arg; 558 mqp->mq_nproc = 0; 559 mqp->mq_working = B_TRUE; 560 if (mqp->mq_next == mqp->mq_cap) { 561 mqp->mq_next %= mqp->mq_cap; 562 mqp->mq_gnext++; 563 } 564 mqp->mq_ncommit = mqp->mq_next; 565 566 ret = 0; 567 for (i = 0; i < mqp->mq_ndthreads; i++) { 568 ret = thr_create(NULL, 0, mergeq_thr_merge, mqp, 0, 569 &mqp->mq_thrs[i]); 570 if (ret != 0) { 571 mqp->mq_iserror = B_TRUE; 572 break; 573 } 574 } 575 576 VERIFY0(mutex_unlock(&mqp->mq_lock)); 577 if (ret == 0) 578 (void) mergeq_thr_merge(mqp); 579 580 for (i = 0; i < mqp->mq_ndthreads; i++) { 581 VERIFY0(thr_join(mqp->mq_thrs[i], NULL, NULL)); 582 } 583 584 VERIFY0(mutex_lock(&mqp->mq_lock)); 585 586 VERIFY(mqp->mq_nactthrs == 0); 587 mqp->mq_working = B_FALSE; 588 if (ret == 0 && mqp->mq_iserror == B_FALSE) { 589 VERIFY(mqp->mq_nitems == 1); 590 *outp = mergeq_pop_one(mqp); 591 } else if (ret == 0 && mqp->mq_iserror == B_TRUE) { 592 ret = MERGEQ_UERROR; 593 if (errp != NULL) 594 *errp = mqp->mq_error; 595 } else { 596 seterr = B_TRUE; 597 } 598 599 mergeq_reset(mqp); 600 VERIFY0(mutex_unlock(&mqp->mq_lock)); 601 602 if (seterr == B_TRUE) 603 return (mergeq_error(ret)); 604 605 return (ret); 606 } 607