1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21 /*
22 * Copyright 2009 Sun Microsystems, Inc. All rights reserved.
23 * Use is subject to license terms.
24 */
25
26 #include <sys/zfs_context.h>
27 #include <sys/txg_impl.h>
28 #include <sys/dmu_impl.h>
29 #include <sys/dmu_tx.h>
30 #include <sys/dsl_pool.h>
31 #include <sys/callb.h>
32
33 /*
34 * Pool-wide transaction groups.
35 */
36
37 static void txg_sync_thread(void *);
38 static void txg_quiesce_thread(void *);
39
40 int zfs_txg_timeout = 30; /* max seconds worth of delta per txg */
41
42 /*
43 * Prepare the txg subsystem.
44 */
45 void
txg_init(dsl_pool_t * dp,uint64_t txg)46 txg_init(dsl_pool_t *dp, uint64_t txg)
47 {
48 tx_state_t *tx = &dp->dp_tx;
49 int c;
50 bzero(tx, sizeof (tx_state_t));
51
52 tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
53
54 for (c = 0; c < max_ncpus; c++) {
55 int i;
56
57 mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
58 for (i = 0; i < TXG_SIZE; i++) {
59 cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
60 NULL);
61 list_create(&tx->tx_cpu[c].tc_callbacks[i],
62 sizeof (dmu_tx_callback_t),
63 offsetof(dmu_tx_callback_t, dcb_node));
64 }
65 }
66
67 mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
68
69 cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
70 cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
71 cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
72 cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
73 cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);
74
75 tx->tx_open_txg = txg;
76 }
77
78 /*
79 * Close down the txg subsystem.
80 */
81 void
txg_fini(dsl_pool_t * dp)82 txg_fini(dsl_pool_t *dp)
83 {
84 tx_state_t *tx = &dp->dp_tx;
85 int c;
86
87 ASSERT(tx->tx_threads == 0);
88
89 mutex_destroy(&tx->tx_sync_lock);
90
91 cv_destroy(&tx->tx_sync_more_cv);
92 cv_destroy(&tx->tx_sync_done_cv);
93 cv_destroy(&tx->tx_quiesce_more_cv);
94 cv_destroy(&tx->tx_quiesce_done_cv);
95 cv_destroy(&tx->tx_exit_cv);
96
97 for (c = 0; c < max_ncpus; c++) {
98 int i;
99
100 mutex_destroy(&tx->tx_cpu[c].tc_lock);
101 for (i = 0; i < TXG_SIZE; i++) {
102 cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
103 list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
104 }
105 }
106
107 if (tx->tx_commit_cb_taskq != NULL)
108 taskq_destroy(tx->tx_commit_cb_taskq);
109
110 kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
111
112 bzero(tx, sizeof (tx_state_t));
113 }
114
115 /*
116 * Start syncing transaction groups.
117 */
118 void
txg_sync_start(dsl_pool_t * dp)119 txg_sync_start(dsl_pool_t *dp)
120 {
121 tx_state_t *tx = &dp->dp_tx;
122
123 mutex_enter(&tx->tx_sync_lock);
124
125 dprintf("pool %p\n", dp);
126
127 ASSERT(tx->tx_threads == 0);
128
129 tx->tx_threads = 2;
130
131 tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
132 dp, 0, &p0, TS_RUN, minclsyspri);
133
134 /*
135 * The sync thread can need a larger-than-default stack size on
136 * 32-bit x86. This is due in part to nested pools and
137 * scrub_visitbp() recursion.
138 */
139 tx->tx_sync_thread = thread_create(NULL, 12<<10, txg_sync_thread,
140 dp, 0, &p0, TS_RUN, minclsyspri);
141
142 mutex_exit(&tx->tx_sync_lock);
143 }
144
/*
 * Register a txg service thread with the CPR (suspend/resume) framework
 * and take the pool-wide tx_sync_lock.  Both service threads run their
 * main loops with this lock held, dropping it only to wait or to do
 * real work.
 */
static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
	mutex_enter(&tx->tx_sync_lock);
}
151
/*
 * Tear down a txg service thread: clear its thread pointer, decrement
 * the thread count, and wake txg_sync_stop(), which waits on tx_exit_cv
 * for tx_threads to reach zero.  Called with tx_sync_lock held;
 * CALLB_CPR_EXIT() releases it before the thread exits.
 */
static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
	ASSERT(*tpp != NULL);
	*tpp = NULL;
	tx->tx_threads--;
	cv_broadcast(&tx->tx_exit_cv);
	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
	thread_exit();
}
162
163 static void
txg_thread_wait(tx_state_t * tx,callb_cpr_t * cpr,kcondvar_t * cv,uint64_t time)164 txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
165 {
166 CALLB_CPR_SAFE_BEGIN(cpr);
167
168 if (time)
169 (void) cv_timedwait(cv, &tx->tx_sync_lock, time);
170 else
171 cv_wait(cv, &tx->tx_sync_lock);
172
173 CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
174 }
175
/*
 * Stop syncing transaction groups: flush outstanding work, then signal
 * both service threads to exit and wait for them to die.
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	dprintf("pool %p\n", dp);
	/*
	 * Finish off any work in progress.
	 */
	ASSERT(tx->tx_threads == 2);

	/*
	 * We need to ensure that we've vacated the deferred space_maps.
	 */
	txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);

	/*
	 * Wake all sync threads and wait for them to die.
	 */
	mutex_enter(&tx->tx_sync_lock);

	ASSERT(tx->tx_threads == 2);

	tx->tx_exiting = 1;

	/* Kick both threads out of any wait so they notice tx_exiting. */
	cv_broadcast(&tx->tx_quiesce_more_cv);
	cv_broadcast(&tx->tx_quiesce_done_cv);
	cv_broadcast(&tx->tx_sync_more_cv);

	/* txg_thread_exit() broadcasts tx_exit_cv as each thread dies. */
	while (tx->tx_threads != 0)
		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

	tx->tx_exiting = 0;

	mutex_exit(&tx->tx_sync_lock);
}
215
/*
 * Take a hold on the currently open txg, preventing it from quiescing
 * until the hold is released via txg_rele_to_sync().  Returns the held
 * txg number and fills in 'th' for the paired release calls.
 *
 * NOTE: returns with this CPU's tc_lock still held; the caller must
 * drop it with txg_rele_to_quiesce().
 */
uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
	tx_state_t *tx = &dp->dp_tx;
	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
	uint64_t txg;

	mutex_enter(&tc->tc_lock);

	/*
	 * Holding tc_lock blocks txg_quiesce() from advancing
	 * tx_open_txg, so the txg we read and count against is stable.
	 */
	txg = tx->tx_open_txg;
	tc->tc_count[txg & TXG_MASK]++;

	th->th_cpu = tc;
	th->th_txg = txg;

	return (txg);
}
233
/*
 * Drop the tc_lock acquired by txg_hold_open(), allowing the held txg
 * to begin quiescing.  The hold count itself remains until
 * txg_rele_to_sync().
 */
void
txg_rele_to_quiesce(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;

	mutex_exit(&tc->tc_lock);
}
241
242 void
txg_register_callbacks(txg_handle_t * th,list_t * tx_callbacks)243 txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
244 {
245 tx_cpu_t *tc = th->th_cpu;
246 int g = th->th_txg & TXG_MASK;
247
248 mutex_enter(&tc->tc_lock);
249 list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
250 mutex_exit(&tc->tc_lock);
251 }
252
/*
 * Release the hold taken in txg_hold_open().  When the last hold on a
 * txg drops on this CPU, wake the quiesce thread waiting on tc_cv in
 * txg_quiesce().
 */
void
txg_rele_to_sync(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	ASSERT(tc->tc_count[g] != 0);
	if (--tc->tc_count[g] == 0)
		cv_broadcast(&tc->tc_cv[g]);
	mutex_exit(&tc->tc_lock);

	th->th_cpu = NULL;	/* defensive */
}
267
/*
 * Close the open txg 'txg' and wait for all of its holds (taken via
 * txg_hold_open()) to be released.  On return the txg is fully
 * quiesced and tx_open_txg has advanced.  Called from the quiesce
 * thread with tx_sync_lock dropped.
 */
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int g = txg & TXG_MASK;
	int c;

	/*
	 * Grab all tx_cpu locks so nobody else can get into this txg.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_enter(&tx->tx_cpu[c].tc_lock);

	ASSERT(txg == tx->tx_open_txg);
	tx->tx_open_txg++;

	/*
	 * Now that we've incremented tx_open_txg, we can let threads
	 * enter the next transaction group.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_exit(&tx->tx_cpu[c].tc_lock);

	/*
	 * Quiesce the transaction group by waiting for everyone to txg_exit().
	 */
	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		mutex_enter(&tc->tc_lock);
		/* txg_rele_to_sync() broadcasts tc_cv when the count hits 0 */
		while (tc->tc_count[g] != 0)
			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
		mutex_exit(&tc->tc_lock);
	}
}
302
303 static void
txg_do_callbacks(list_t * cb_list)304 txg_do_callbacks(list_t *cb_list)
305 {
306 dmu_tx_do_callbacks(cb_list, 0);
307
308 list_destroy(cb_list);
309
310 kmem_free(cb_list, sizeof (list_t));
311 }
312
313 /*
314 * Dispatch the commit callbacks registered on this txg to worker threads.
315 */
316 static void
txg_dispatch_callbacks(dsl_pool_t * dp,uint64_t txg)317 txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
318 {
319 int c;
320 tx_state_t *tx = &dp->dp_tx;
321 list_t *cb_list;
322
323 for (c = 0; c < max_ncpus; c++) {
324 tx_cpu_t *tc = &tx->tx_cpu[c];
325 /* No need to lock tx_cpu_t at this point */
326
327 int g = txg & TXG_MASK;
328
329 if (list_is_empty(&tc->tc_callbacks[g]))
330 continue;
331
332 if (tx->tx_commit_cb_taskq == NULL) {
333 /*
334 * Commit callback taskq hasn't been created yet.
335 */
336 tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
337 max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
338 TASKQ_PREPOPULATE);
339 }
340
341 cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
342 list_create(cb_list, sizeof (dmu_tx_callback_t),
343 offsetof(dmu_tx_callback_t, dcb_node));
344
345 list_move_tail(&tc->tc_callbacks[g], cb_list);
346
347 (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
348 txg_do_callbacks, cb_list, TQ_SLEEP);
349 }
350 }
351
/*
 * Pool sync thread: repeatedly consumes the quiesced txg handed off by
 * txg_quiesce_thread() and writes it out via spa_sync(), pacing itself
 * so that roughly zfs_txg_timeout seconds elapse per txg when nobody
 * is waiting.  Runs with tx_sync_lock held except while syncing.
 */
static void
txg_sync_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	spa_t *spa = dp->dp_spa;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;
	uint64_t start, delta;	/* lbolt at last sync start / its duration */

	txg_thread_enter(tx, &cpr);
	dprintf("txg_sync_thread called\n");
	start = delta = 0;
	for (;;) {
		uint64_t timer, timeout = zfs_txg_timeout * hz;
		uint64_t txg;
		dprintf("txg_sync_thread thread for\n");
		/*
		 * We sync when we're scrubbing, there's someone waiting
		 * on us, or the quiesce thread has handed off a txg to
		 * us, or we have reached our timeout.
		 */
		timer = (delta >= timeout ? 0 : timeout - delta);
		while ((dp->dp_scrub_func == SCRUB_FUNC_NONE ||
		    spa_load_state(spa) != SPA_LOAD_NONE ||
		    spa_shutting_down(spa)) &&
		    !tx->tx_exiting && timer > 0 &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    tx->tx_quiesced_txg == 0) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
			/* recompute remaining budget after each wakeup */
			delta = ddi_get_lbolt() - start;
			timer = (delta > timeout ? 0 : timeout - delta);
		}

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		cv_broadcast(&tx->tx_quiesce_more_cv);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
		/* Drop the lock across the (long-running) spa_sync(). */
		mutex_exit(&tx->tx_sync_lock);

		start = ddi_get_lbolt();
		spa_sync(spa, txg);
		delta = ddi_get_lbolt() - start;

		mutex_enter(&tx->tx_sync_lock);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		/* Wake anyone blocked in txg_wait_synced(). */
		cv_broadcast(&tx->tx_sync_done_cv);

		/*
		 * Dispatch commit callbacks to worker threads.
		 */
		txg_dispatch_callbacks(dp, txg);
	}
}
430
/*
 * Pool quiesce thread: when someone is waiting on a newer txg, closes
 * the open txg, waits for all holds on it to drain, then hands the
 * fully quiesced txg off to the sync thread.  Runs with tx_sync_lock
 * held except while quiescing.
 */
static void
txg_quiesce_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state. So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    tx->tx_quiesced_txg != 0))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		/* Drop the lock while draining holds on the closing txg. */
		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.
		 */
		dprintf("quiesce done, handing off txg %llu\n", txg);
		tx->tx_quiesced_txg = txg;
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}
475
476 /*
477 * Delay this thread by 'ticks' if we are still in the open transaction
478 * group and there is already a waiting txg quiesing or quiesced. Abort
479 * the delay if this txg stalls or enters the quiesing state.
480 */
481 void
txg_delay(dsl_pool_t * dp,uint64_t txg,int ticks)482 txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
483 {
484 tx_state_t *tx = &dp->dp_tx;
485 int timeout = ddi_get_lbolt() + ticks;
486
487 /* don't delay if this txg could transition to quiesing immediately */
488 if (tx->tx_open_txg > txg ||
489 tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
490 return;
491
492 mutex_enter(&tx->tx_sync_lock);
493 if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
494 mutex_exit(&tx->tx_sync_lock);
495 return;
496 }
497
498 while (ddi_get_lbolt() < timeout &&
499 tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
500 (void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
501 timeout);
502
503 mutex_exit(&tx->tx_sync_lock);
504 }
505
/*
 * Block until 'txg' (or, if txg == 0, the currently open txg plus
 * TXG_DEFER_SIZE) has been synced to disk, prodding the sync thread
 * each iteration so it doesn't wait out its full timeout.  Requires
 * both service threads to be running.
 */
void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + TXG_DEFER_SIZE;
	if (tx->tx_sync_txg_waiting < txg)
		tx->tx_sync_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_synced_txg < txg) {
		dprintf("broadcasting sync more "
		    "tx_synced=%llu waiting=%llu dp=%p\n",
		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
		cv_broadcast(&tx->tx_sync_more_cv);
		/* sync thread broadcasts tx_sync_done_cv after each txg */
		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}
528
/*
 * Block until 'txg' (or, if txg == 0, the txg after the currently open
 * one) becomes the open txg, prodding the quiesce thread as needed.
 * Requires both service threads to be running.
 */
void
txg_wait_open(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + 1;
	if (tx->tx_quiesce_txg_waiting < txg)
		tx->tx_quiesce_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_open_txg < txg) {
		cv_broadcast(&tx->tx_quiesce_more_cv);
		/* quiesce thread broadcasts tx_quiesce_done_cv per hand-off */
		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}
548
549 boolean_t
txg_stalled(dsl_pool_t * dp)550 txg_stalled(dsl_pool_t *dp)
551 {
552 tx_state_t *tx = &dp->dp_tx;
553 return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
554 }
555
556 boolean_t
txg_sync_waiting(dsl_pool_t * dp)557 txg_sync_waiting(dsl_pool_t *dp)
558 {
559 tx_state_t *tx = &dp->dp_tx;
560
561 return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
562 tx->tx_quiesced_txg != 0);
563 }
564
565 /*
566 * Per-txg object lists.
567 */
568 void
txg_list_create(txg_list_t * tl,size_t offset)569 txg_list_create(txg_list_t *tl, size_t offset)
570 {
571 int t;
572
573 mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
574
575 tl->tl_offset = offset;
576
577 for (t = 0; t < TXG_SIZE; t++)
578 tl->tl_head[t] = NULL;
579 }
580
581 void
txg_list_destroy(txg_list_t * tl)582 txg_list_destroy(txg_list_t *tl)
583 {
584 int t;
585
586 for (t = 0; t < TXG_SIZE; t++)
587 ASSERT(txg_list_empty(tl, t));
588
589 mutex_destroy(&tl->tl_lock);
590 }
591
/*
 * Return nonzero if the list for 'txg' has no entries.
 * Reads tl_head without tl_lock; callers tolerate a racy answer
 * or know the list isn't changing.
 */
int
txg_list_empty(txg_list_t *tl, uint64_t txg)
{
	return (tl->tl_head[txg & TXG_MASK] == NULL);
}
597
598 /*
599 * Add an entry to the list.
600 * Returns 0 if it's a new entry, 1 if it's already there.
601 */
602 int
txg_list_add(txg_list_t * tl,void * p,uint64_t txg)603 txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
604 {
605 int t = txg & TXG_MASK;
606 txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
607 int already_on_list;
608
609 mutex_enter(&tl->tl_lock);
610 already_on_list = tn->tn_member[t];
611 if (!already_on_list) {
612 tn->tn_member[t] = 1;
613 tn->tn_next[t] = tl->tl_head[t];
614 tl->tl_head[t] = tn;
615 }
616 mutex_exit(&tl->tl_lock);
617
618 return (already_on_list);
619 }
620
621 /*
622 * Remove the head of the list and return it.
623 */
624 void *
txg_list_remove(txg_list_t * tl,uint64_t txg)625 txg_list_remove(txg_list_t *tl, uint64_t txg)
626 {
627 int t = txg & TXG_MASK;
628 txg_node_t *tn;
629 void *p = NULL;
630
631 mutex_enter(&tl->tl_lock);
632 if ((tn = tl->tl_head[t]) != NULL) {
633 p = (char *)tn - tl->tl_offset;
634 tl->tl_head[t] = tn->tn_next[t];
635 tn->tn_next[t] = NULL;
636 tn->tn_member[t] = 0;
637 }
638 mutex_exit(&tl->tl_lock);
639
640 return (p);
641 }
642
643 /*
644 * Remove a specific item from the list and return it.
645 */
646 void *
txg_list_remove_this(txg_list_t * tl,void * p,uint64_t txg)647 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
648 {
649 int t = txg & TXG_MASK;
650 txg_node_t *tn, **tp;
651
652 mutex_enter(&tl->tl_lock);
653
654 for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
655 if ((char *)tn - tl->tl_offset == p) {
656 *tp = tn->tn_next[t];
657 tn->tn_next[t] = NULL;
658 tn->tn_member[t] = 0;
659 mutex_exit(&tl->tl_lock);
660 return (p);
661 }
662 }
663
664 mutex_exit(&tl->tl_lock);
665
666 return (NULL);
667 }
668
669 int
txg_list_member(txg_list_t * tl,void * p,uint64_t txg)670 txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
671 {
672 int t = txg & TXG_MASK;
673 txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
674
675 return (tn->tn_member[t]);
676 }
677
678 /*
679 * Walk a txg list -- only safe if you know it's not changing.
680 */
681 void *
txg_list_head(txg_list_t * tl,uint64_t txg)682 txg_list_head(txg_list_t *tl, uint64_t txg)
683 {
684 int t = txg & TXG_MASK;
685 txg_node_t *tn = tl->tl_head[t];
686
687 return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
688 }
689
690 void *
txg_list_next(txg_list_t * tl,void * p,uint64_t txg)691 txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
692 {
693 int t = txg & TXG_MASK;
694 txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
695
696 tn = tn->tn_next[t];
697
698 return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
699 }
700