1 /*
2 * This file and its contents are supplied under the terms of the
3 * Common Development and Distribution License ("CDDL"), version 1.0.
4 * You may only use this file in accordance with the terms of version
5 * 1.0 of the CDDL.
6 *
7 * A full copy of the text of the CDDL should have accompanied this
8 * source. A copy of the CDDL is also available via the Internet at
9 * http://www.illumos.org/license/CDDL.
10 */
11
12 /*
13 * Copyright 2015 Joyent, Inc.
14 */
15
16 /*
17 * Merge queue
18 *
19 * A multi-threaded merging queue.
20 *
21 * The general constraint of the merge queue is that if a set of items are
22 * inserted into the queue in the same order, then no matter how many threads
23 * are on the scene, we will always process the items in the same order. The
24 * secondary constraint is that to support environments that must be
25 * single-threaded, we explicitly *must not* create a thread in the case where
26 * the number of requested threads is just one.
27 *
28 * To that end, we've designed our queue as a circular buffer. We will grow that
29 * buffer to contain enough space for all the input items, after which we'll
30 * then treat it as a circular buffer.
31 *
 * Items will be issued to a processing function two at a time, until there is
 * only one item remaining in the queue, at which point we will be done with
 * any merging work.
35 *
36 * A given queue has three different entries that we care about tracking:
37 *
38 * o mq_nproc - What is the slot of the next item to process for something
39 * looking for work.
40 *
41 * o mq_next - What is the slot of the next item that should be inserted into
42 * the queue.
43 *
44 * o mq_ncommit - What is the slot of the next item that should be committed.
45 *
46 * When a thread comes and looks for work, we pop entries off of the queue based
47 * on the index provided by mq_nproc. At the same time, it also gets the slot
48 * that it should place the result in, which is mq_next. However, because we
49 * have multiple threads that are operating on the system, we want to make sure
50 * that we push things onto the queue in order. We do that by allocating a slot
51 * to each task and when it completes, it waits for its slot to be ready based
52 * on it being the value of mq_ncommit.
53 *
54 * In addition, we keep track of the number of items in the queue as well as the
55 * number of active workers. There's also a generation count that is used to
56 * figure out when the various values might lap one another.
57 *
58 * The following images show what happens when we have a queue with six items
59 * and whose capacity has been shrunk to six, to better fit in the screen.
60 *
61 *
62 * 1) This is the initial configuration of the queue right before any processing
63 * is done in the context of mergeq_merge(). Every box has an initial item for
64 * merging in it (represented by an 'x'). Here, the mq_nproc, mq_next, and
65 * mq_ncommit will all point at the initial entry. However, the mq_next has
66 * already lapped around the array and thus has a generation count of one.
67 *
 * The '+' characters indicate which bucket each of mq_next, mq_ncommit, and
 * mq_nproc currently refers to.
70 *
71 * +---++---++---++---++---++---+
72 * | X || X || X || X || X || X |
73 * +---++---++---++---++---++---+
74 * mq_next (g1) +
75 * mq_ncommit (g0) +
76 * mq_nproc (g0) +
77 *
78 * 2) This shows the state right as the first thread begins to process an entry.
79 * Note in this example we will have two threads processing this queue. Note,
80 * mq_ncommit has not advanced. This is because the first thread has started
81 * processing entries, but it has not finished, and thus we can't commit it.
82 * We've incremented mq_next by one because it has gone ahead and assigned a
83 * single entry. We've incremented mq_nproc by two, because we have removed two
84 * entries and thus will have another set available.
85 *
86 * +---++---++---++---++---++---+ t1 - slot 0
87 * | || || X || X || X || X | t2 - idle
88 * +---++---++---++---++---++---+
89 * mq_next (g1) +
90 * mq_ncommit (g0) +
91 * mq_nproc (g0) +
92 *
93 *
94 * 3) This shows the state right after the second thread begins to process an
95 * entry, note that the first thread has not finished. The changes are very
96 * similar to the previous state, we've advanced, mq_nproc and mq_next, but not
97 * mq_ncommit.
98 *
99 * +---++---++---++---++---++---+ t1 - slot 0
100 * | || || || || X || X | t2 - slot 1
101 * +---++---++---++---++---++---+
102 * mq_next (g1) +
103 * mq_ncommit (g0) +
104 * mq_nproc (g0) +
105 *
106 * 4) This shows the state after thread one has finished processing an item, but
107 * before it does anything else. Note that even if thread two finishes early, it
108 * cannot commit its item until thread one finishes. Here 'Y' refers to the
109 * result of merging the first two 'X's.
110 *
111 * +---++---++---++---++---++---+ t1 - idle
112 * | Y || || || || X || X | t2 - slot 1
113 * +---++---++---++---++---++---+
114 * mq_next (g1) +
115 * mq_ncommit (g0) +
116 * mq_nproc (g0) +
117 *
118 * 5) This shows the state after thread one has begun to process the next round
119 * and after thread two has committed, but before it begins processing the next
120 * item. Note that mq_nproc has wrapped around and we've bumped its generation
121 * counter.
122 *
123 * +---++---++---++---++---++---+ t1 - slot 2
124 * | Y || Y || || || || | t2 - idle
125 * +---++---++---++---++---++---+
126 * mq_next (g1) +
127 * mq_ncommit (g0) +
128 * mq_nproc (g0) +
129 *
130 * 6) Here, thread two, will take the next two Y values and thread 1 will commit
131 * its 'Y'. Thread one now must wait until thread two finishes such that it can
132 * do additional work.
133 *
134 * +---++---++---++---++---++---+ t1 - waiting
135 * | || || Y || || || | t2 - slot 3
136 * +---++---++---++---++---++---+
137 * mq_next (g1) +
138 * mq_ncommit (g0) +
139 * mq_nproc (g0) +
140 *
141 * 7) Here, thread two has committed and thread one is about to go process the
142 * final entry. The character 'Z' represents the results of merging two 'Y's.
143 *
144 * +---++---++---++---++---++---+ t1 - idle
145 * | || || Y || Z || || | t2 - idle
146 * +---++---++---++---++---++---+
147 * mq_next (g1) +
148 * mq_ncommit (g0) +
149 * mq_nproc (g0) +
150 *
151 * 8) Here, thread one is processing the final item. Thread two is waiting in
152 * mergeq_pop() for enough items to be available. In this case, it will never
153 * happen; however, once all threads have finished it will break out.
154 *
155 * +---++---++---++---++---++---+ t1 - slot 4
156 * | || || || || || | t2 - idle
157 * +---++---++---++---++---++---+
158 * mq_next (g1) +
159 * mq_ncommit (g0) +
160 * mq_nproc (g0) +
161 *
162 * 9) This is the final state of the queue, it has a single '*' item which is
163 * the final merge result. At this point, both thread one and thread two would
164 * stop processing and we'll return the result to the user.
165 *
166 * +---++---++---++---++---++---+ t1 - slot 4
167 * | || || || || * || | t2 - idle
168 * +---++---++---++---++---++---+
169 * mq_next (g1) +
170 * mq_ncommit (g0) +
171 * mq_nproc (g0) +
172 *
173 *
174 * Note, that if at any point in time the processing function fails, then all
175 * the merges will quiesce and that error will be propagated back to the user.
176 */
177
178 #include <strings.h>
179 #include <sys/debug.h>
180 #include <thread.h>
181 #include <synch.h>
182 #include <errno.h>
183 #include <limits.h>
184 #include <stdlib.h>
185
186 #include "mergeq.h"
187
188 struct mergeq {
189 mutex_t mq_lock; /* Protects items below */
190 cond_t mq_cond; /* Condition variable */
191 void **mq_items; /* Array of items to process */
192 size_t mq_nitems; /* Number of items in the queue */
193 size_t mq_cap; /* Capacity of the items */
194 size_t mq_next; /* Place to put next entry */
195 size_t mq_gnext; /* Generation for next */
196 size_t mq_nproc; /* Index of next thing to process */
197 size_t mq_gnproc; /* Generation for next proc */
198 size_t mq_ncommit; /* Index of the next thing to commit */
199 size_t mq_gncommit; /* Commit generation */
200 uint_t mq_nactthrs; /* Number of active threads */
201 uint_t mq_ndthreads; /* Desired number of threads */
202 thread_t *mq_thrs; /* Actual threads */
203 mergeq_proc_f *mq_func; /* Processing function */
204 void *mq_arg; /* Argument for processing */
205 boolean_t mq_working; /* Are we working on processing */
206 boolean_t mq_iserror; /* Have we encountered an error? */
207 int mq_error;
208 };
209
210 #define MERGEQ_DEFAULT_CAP 64
211
212 static int
mergeq_error(int err)213 mergeq_error(int err)
214 {
215 errno = err;
216 return (MERGEQ_ERROR);
217 }
218
219 void
mergeq_fini(mergeq_t * mqp)220 mergeq_fini(mergeq_t *mqp)
221 {
222 if (mqp == NULL)
223 return;
224
225 VERIFY(mqp->mq_working != B_TRUE);
226
227 if (mqp->mq_items != NULL)
228 mergeq_free(mqp->mq_items, sizeof (void *) * mqp->mq_cap);
229 if (mqp->mq_ndthreads > 0) {
230 mergeq_free(mqp->mq_thrs, sizeof (thread_t) *
231 mqp->mq_ndthreads);
232 }
233 VERIFY0(cond_destroy(&mqp->mq_cond));
234 VERIFY0(mutex_destroy(&mqp->mq_lock));
235 mergeq_free(mqp, sizeof (mergeq_t));
236 }
237
238 int
mergeq_init(mergeq_t ** outp,uint_t nthrs)239 mergeq_init(mergeq_t **outp, uint_t nthrs)
240 {
241 int ret;
242 mergeq_t *mqp;
243
244 mqp = mergeq_alloc(sizeof (mergeq_t));
245 if (mqp == NULL)
246 return (mergeq_error(ENOMEM));
247
248 bzero(mqp, sizeof (mergeq_t));
249 mqp->mq_items = mergeq_alloc(sizeof (void *) * MERGEQ_DEFAULT_CAP);
250 if (mqp->mq_items == NULL) {
251 mergeq_free(mqp, sizeof (mergeq_t));
252 return (mergeq_error(ENOMEM));
253 }
254 bzero(mqp->mq_items, sizeof (void *) * MERGEQ_DEFAULT_CAP);
255
256 mqp->mq_ndthreads = nthrs - 1;
257 if (mqp->mq_ndthreads > 0) {
258 mqp->mq_thrs = mergeq_alloc(sizeof (thread_t) *
259 mqp->mq_ndthreads);
260 if (mqp->mq_thrs == NULL) {
261 mergeq_free(mqp->mq_items, sizeof (void *) *
262 MERGEQ_DEFAULT_CAP);
263 mergeq_free(mqp, sizeof (mergeq_t));
264 return (mergeq_error(ENOMEM));
265 }
266 }
267
268 if ((ret = mutex_init(&mqp->mq_lock, USYNC_THREAD | LOCK_ERRORCHECK,
269 NULL)) != 0) {
270 if (mqp->mq_ndthreads > 0) {
271 mergeq_free(mqp->mq_thrs,
272 sizeof (thread_t) * mqp->mq_ndthreads);
273 }
274 mergeq_free(mqp->mq_items, sizeof (void *) *
275 MERGEQ_DEFAULT_CAP);
276 mergeq_free(mqp, sizeof (mergeq_t));
277 return (mergeq_error(ret));
278 }
279
280 if ((ret = cond_init(&mqp->mq_cond, USYNC_THREAD, NULL)) != 0) {
281 VERIFY0(mutex_destroy(&mqp->mq_lock));
282 if (mqp->mq_ndthreads > 0) {
283 mergeq_free(mqp->mq_thrs,
284 sizeof (thread_t) * mqp->mq_ndthreads);
285 }
286 mergeq_free(mqp->mq_items, sizeof (void *) *
287 MERGEQ_DEFAULT_CAP);
288 mergeq_free(mqp, sizeof (mergeq_t));
289 return (mergeq_error(ret));
290 }
291
292 mqp->mq_cap = MERGEQ_DEFAULT_CAP;
293 *outp = mqp;
294 return (0);
295 }
296
297 static void
mergeq_reset(mergeq_t * mqp)298 mergeq_reset(mergeq_t *mqp)
299 {
300 VERIFY(MUTEX_HELD(&mqp->mq_lock));
301 VERIFY(mqp->mq_working == B_FALSE);
302 if (mqp->mq_cap != 0)
303 bzero(mqp->mq_items, sizeof (void *) * mqp->mq_cap);
304 mqp->mq_nitems = 0;
305 mqp->mq_next = 0;
306 mqp->mq_gnext = 0;
307 mqp->mq_nproc = 0;
308 mqp->mq_gnproc = 0;
309 mqp->mq_ncommit = 0;
310 mqp->mq_gncommit = 0;
311 mqp->mq_func = NULL;
312 mqp->mq_arg = NULL;
313 mqp->mq_iserror = B_FALSE;
314 mqp->mq_error = 0;
315 }
316
317 static int
mergeq_grow(mergeq_t * mqp)318 mergeq_grow(mergeq_t *mqp)
319 {
320 size_t ncap;
321 void **items;
322
323 VERIFY(MUTEX_HELD(&mqp->mq_lock));
324 VERIFY(mqp->mq_working == B_FALSE);
325
326 if (SIZE_MAX - mqp->mq_cap < MERGEQ_DEFAULT_CAP)
327 return (ENOSPC);
328
329 ncap = mqp->mq_cap + MERGEQ_DEFAULT_CAP;
330 items = mergeq_alloc(ncap * sizeof (void *));
331 if (items == NULL)
332 return (ENOMEM);
333
334 bzero(items, ncap * sizeof (void *));
335 bcopy(mqp->mq_items, items, mqp->mq_cap * sizeof (void *));
336 mergeq_free(mqp->mq_items, sizeof (mqp->mq_cap) * sizeof (void *));
337 mqp->mq_items = items;
338 mqp->mq_cap = ncap;
339 return (0);
340 }
341
342 int
mergeq_add(mergeq_t * mqp,void * item)343 mergeq_add(mergeq_t *mqp, void *item)
344 {
345 VERIFY0(mutex_lock(&mqp->mq_lock));
346 if (mqp->mq_working == B_TRUE) {
347 VERIFY0(mutex_unlock(&mqp->mq_lock));
348 return (mergeq_error(ENXIO));
349 }
350
351 if (mqp->mq_next == mqp->mq_cap) {
352 int ret;
353
354 if ((ret = mergeq_grow(mqp)) != 0) {
355 VERIFY0(mutex_unlock(&mqp->mq_lock));
356 return (mergeq_error(ret));
357 }
358 }
359 mqp->mq_items[mqp->mq_next] = item;
360 mqp->mq_next++;
361 mqp->mq_nitems++;
362
363 VERIFY0(mutex_unlock(&mqp->mq_lock));
364 return (0);
365 }
366
367 static size_t
mergeq_slot(mergeq_t * mqp)368 mergeq_slot(mergeq_t *mqp)
369 {
370 size_t s;
371
372 VERIFY(MUTEX_HELD(&mqp->mq_lock));
373 VERIFY(mqp->mq_next < mqp->mq_cap);
374
375 /*
376 * This probably should be a cv / wait thing.
377 */
378 VERIFY(mqp->mq_nproc != (mqp->mq_next + 1) % mqp->mq_cap);
379
380 s = mqp->mq_next;
381 mqp->mq_next++;
382 if (mqp->mq_next == mqp->mq_cap) {
383 mqp->mq_next %= mqp->mq_cap;
384 mqp->mq_gnext++;
385 }
386
387 return (s);
388 }
389
390 /*
391 * Internal function to push items onto the queue which is now a circular
392 * buffer. This should only be used once we begin working on the queue.
393 */
394 static void
mergeq_push(mergeq_t * mqp,size_t slot,void * item)395 mergeq_push(mergeq_t *mqp, size_t slot, void *item)
396 {
397 VERIFY(MUTEX_HELD(&mqp->mq_lock));
398 VERIFY(slot < mqp->mq_cap);
399
400 /*
401 * We need to verify that we don't push over something that exists.
402 * Based on the design, this should never happen. However, in the face
403 * of bugs, anything is possible.
404 */
405 while (mqp->mq_ncommit != slot && mqp->mq_iserror == B_FALSE)
406 (void) cond_wait(&mqp->mq_cond, &mqp->mq_lock);
407
408 if (mqp->mq_iserror == B_TRUE)
409 return;
410
411 mqp->mq_items[slot] = item;
412 mqp->mq_nitems++;
413 mqp->mq_ncommit++;
414 if (mqp->mq_ncommit == mqp->mq_cap) {
415 mqp->mq_ncommit %= mqp->mq_cap;
416 mqp->mq_gncommit++;
417 }
418 (void) cond_broadcast(&mqp->mq_cond);
419 }
420
421 static void *
mergeq_pop_one(mergeq_t * mqp)422 mergeq_pop_one(mergeq_t *mqp)
423 {
424 void *out;
425
426 /*
427 * We can't move mq_nproc beyond mq_next if they're on the same
428 * generation.
429 */
430 VERIFY(mqp->mq_gnext != mqp->mq_gnproc ||
431 mqp->mq_nproc != mqp->mq_next);
432
433 out = mqp->mq_items[mqp->mq_nproc];
434
435 mqp->mq_items[mqp->mq_nproc] = NULL;
436 mqp->mq_nproc++;
437 if (mqp->mq_nproc == mqp->mq_cap) {
438 mqp->mq_nproc %= mqp->mq_cap;
439 mqp->mq_gnproc++;
440 }
441 mqp->mq_nitems--;
442
443 return (out);
444 }
445
446 /*
447 * Pop a set of two entries from the queue. We may not have anything to process
448 * at the moment, eg. be waiting for someone to add something. In which case,
449 * we'll be sitting and waiting.
450 */
451 static boolean_t
mergeq_pop(mergeq_t * mqp,void ** first,void ** second)452 mergeq_pop(mergeq_t *mqp, void **first, void **second)
453 {
454 VERIFY(MUTEX_HELD(&mqp->mq_lock));
455 VERIFY(mqp->mq_nproc < mqp->mq_cap);
456
457 while (mqp->mq_nitems < 2 && mqp->mq_nactthrs > 0 &&
458 mqp->mq_iserror == B_FALSE)
459 (void) cond_wait(&mqp->mq_cond, &mqp->mq_lock);
460
461 if (mqp->mq_iserror == B_TRUE)
462 return (B_FALSE);
463
464 if (mqp->mq_nitems < 2 && mqp->mq_nactthrs == 0) {
465 VERIFY(mqp->mq_iserror == B_TRUE || mqp->mq_nitems == 1);
466 return (B_FALSE);
467 }
468 VERIFY(mqp->mq_nitems >= 2);
469
470 *first = mergeq_pop_one(mqp);
471 *second = mergeq_pop_one(mqp);
472
473 return (B_TRUE);
474 }
475
476 static void *
mergeq_thr_merge(void * arg)477 mergeq_thr_merge(void *arg)
478 {
479 mergeq_t *mqp = arg;
480
481 VERIFY0(mutex_lock(&mqp->mq_lock));
482
483 /*
484 * Check to make sure creation worked and if not, fail fast.
485 */
486 if (mqp->mq_iserror == B_TRUE) {
487 VERIFY0(mutex_unlock(&mqp->mq_lock));
488 return (NULL);
489 }
490
491 for (;;) {
492 void *first, *second, *out;
493 int ret;
494 size_t slot;
495
496 if (mqp->mq_nitems == 1 && mqp->mq_nactthrs == 0) {
497 VERIFY0(mutex_unlock(&mqp->mq_lock));
498 return (NULL);
499 }
500
501 if (mergeq_pop(mqp, &first, &second) == B_FALSE) {
502 VERIFY0(mutex_unlock(&mqp->mq_lock));
503 return (NULL);
504 }
505 slot = mergeq_slot(mqp);
506
507 mqp->mq_nactthrs++;
508
509 VERIFY0(mutex_unlock(&mqp->mq_lock));
510 ret = mqp->mq_func(first, second, &out, mqp->mq_arg);
511 VERIFY0(mutex_lock(&mqp->mq_lock));
512
513 if (ret != 0) {
514 if (mqp->mq_iserror == B_FALSE) {
515 mqp->mq_iserror = B_TRUE;
516 mqp->mq_error = ret;
517 (void) cond_broadcast(&mqp->mq_cond);
518 }
519 mqp->mq_nactthrs--;
520 VERIFY0(mutex_unlock(&mqp->mq_lock));
521 return (NULL);
522 }
523 mergeq_push(mqp, slot, out);
524 mqp->mq_nactthrs--;
525 }
526 }
527
528 int
mergeq_merge(mergeq_t * mqp,mergeq_proc_f * func,void * arg,void ** outp,int * errp)529 mergeq_merge(mergeq_t *mqp, mergeq_proc_f *func, void *arg, void **outp,
530 int *errp)
531 {
532 int ret, i;
533 boolean_t seterr = B_FALSE;
534
535 if (mqp == NULL || func == NULL || outp == NULL) {
536 return (mergeq_error(EINVAL));
537 }
538
539 VERIFY0(mutex_lock(&mqp->mq_lock));
540 if (mqp->mq_working == B_TRUE) {
541 VERIFY0(mutex_unlock(&mqp->mq_lock));
542 return (mergeq_error(EBUSY));
543 }
544
545 if (mqp->mq_nitems == 0) {
546 *outp = NULL;
547 mergeq_reset(mqp);
548 VERIFY0(mutex_unlock(&mqp->mq_lock));
549 return (0);
550 }
551
552 /*
553 * Now that we've finished adding items to the queue, turn it into a
554 * circular buffer.
555 */
556 mqp->mq_func = func;
557 mqp->mq_arg = arg;
558 mqp->mq_nproc = 0;
559 mqp->mq_working = B_TRUE;
560 if (mqp->mq_next == mqp->mq_cap) {
561 mqp->mq_next %= mqp->mq_cap;
562 mqp->mq_gnext++;
563 }
564 mqp->mq_ncommit = mqp->mq_next;
565
566 ret = 0;
567 for (i = 0; i < mqp->mq_ndthreads; i++) {
568 ret = thr_create(NULL, 0, mergeq_thr_merge, mqp, 0,
569 &mqp->mq_thrs[i]);
570 if (ret != 0) {
571 mqp->mq_iserror = B_TRUE;
572 break;
573 }
574 }
575
576 VERIFY0(mutex_unlock(&mqp->mq_lock));
577 if (ret == 0)
578 (void) mergeq_thr_merge(mqp);
579
580 for (i = 0; i < mqp->mq_ndthreads; i++) {
581 VERIFY0(thr_join(mqp->mq_thrs[i], NULL, NULL));
582 }
583
584 VERIFY0(mutex_lock(&mqp->mq_lock));
585
586 VERIFY(mqp->mq_nactthrs == 0);
587 mqp->mq_working = B_FALSE;
588 if (ret == 0 && mqp->mq_iserror == B_FALSE) {
589 VERIFY(mqp->mq_nitems == 1);
590 *outp = mergeq_pop_one(mqp);
591 } else if (ret == 0 && mqp->mq_iserror == B_TRUE) {
592 ret = MERGEQ_UERROR;
593 if (errp != NULL)
594 *errp = mqp->mq_error;
595 } else {
596 seterr = B_TRUE;
597 }
598
599 mergeq_reset(mqp);
600 VERIFY0(mutex_unlock(&mqp->mq_lock));
601
602 if (seterr == B_TRUE)
603 return (mergeq_error(ret));
604
605 return (ret);
606 }
607