xref: /netbsd/external/cddl/osnet/dist/uts/common/fs/zfs/zio.c (revision 6550d01e)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #include <sys/zfs_context.h>
27 #include <sys/fm/fs/zfs.h>
28 #include <sys/spa.h>
29 #include <sys/txg.h>
30 #include <sys/spa_impl.h>
31 #include <sys/vdev_impl.h>
32 #include <sys/zio_impl.h>
33 #include <sys/zio_compress.h>
34 #include <sys/zio_checksum.h>
35 #include <sys/dmu_objset.h>
36 #include <sys/arc.h>
37 #include <sys/ddt.h>
38 
39 /*
40  * ==========================================================================
41  * I/O priority table
42  * ==========================================================================
43  */
44 uint8_t zio_priority_table[ZIO_PRIORITY_TABLE_SIZE] = {
45 	0,	/* ZIO_PRIORITY_NOW		*/
46 	0,	/* ZIO_PRIORITY_SYNC_READ	*/
47 	0,	/* ZIO_PRIORITY_SYNC_WRITE	*/
48 	0,	/* ZIO_PRIORITY_LOG_WRITE	*/
49 	1,	/* ZIO_PRIORITY_CACHE_FILL	*/
50 	1,	/* ZIO_PRIORITY_AGG		*/
51 	4,	/* ZIO_PRIORITY_FREE		*/
52 	4,	/* ZIO_PRIORITY_ASYNC_WRITE	*/
53 	6,	/* ZIO_PRIORITY_ASYNC_READ	*/
54 	10,	/* ZIO_PRIORITY_RESILVER	*/
55 	20,	/* ZIO_PRIORITY_SCRUB		*/
56 };
57 
58 /*
59  * ==========================================================================
60  * I/O type descriptions
61  * ==========================================================================
62  */
63 char *zio_type_name[ZIO_TYPES] = {
64 	"zio_null", "zio_read", "zio_write", "zio_free", "zio_claim",
65 	"zio_ioctl"
66 };
67 
68 /*
69  * ==========================================================================
70  * I/O kmem caches
71  * ==========================================================================
72  */
73 kmem_cache_t *zio_cache;
74 kmem_cache_t *zio_link_cache;
75 kmem_cache_t *zio_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
76 kmem_cache_t *zio_data_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
77 
78 #if defined(_KERNEL) && !defined(__NetBSD__)
79 extern vmem_t *zio_alloc_arena;
80 #endif
81 
82 /*
83  * An allocating zio is one that either currently has the DVA allocate
84  * stage set or will have it later in its lifetime.
85  */
86 #define	IO_IS_ALLOCATING(zio) ((zio)->io_orig_pipeline & ZIO_STAGE_DVA_ALLOCATE)
87 
88 boolean_t	zio_requeue_io_start_cut_in_line = B_TRUE;
89 
90 #ifdef ZFS_DEBUG
91 int zio_buf_debug_limit = 16384;
92 #else
93 int zio_buf_debug_limit = 0;
94 #endif
95 
96 void
97 zio_init(void)
98 {
99 	size_t c;
100 	vmem_t *data_alloc_arena = NULL;
101 
102 #if defined(_KERNEL) && !defined(__NetBSD__)
103 	data_alloc_arena = zio_alloc_arena;
104 #endif
105 	zio_cache = kmem_cache_create("zio_cache",
106 	    sizeof (zio_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
107 	zio_link_cache = kmem_cache_create("zio_link_cache",
108 	    sizeof (zio_link_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
109 
110 #ifndef __NetBSD__
111 	/*
112 	 * For small buffers, we want a cache for each multiple of
113 	 * SPA_MINBLOCKSIZE.  For medium-size buffers, we want a cache
114 	 * for each quarter-power of 2.  For large buffers, we want
115 	 * a cache for each multiple of PAGESIZE.
116 	 */
117 	for (c = 0; c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT; c++) {
118 		size_t size = (c + 1) << SPA_MINBLOCKSHIFT;
119 		size_t p2 = size;
120 		size_t align = 0;
121 
122 		while (p2 & (p2 - 1))
123 			p2 &= p2 - 1;
124 
125 		if (size <= 4 * SPA_MINBLOCKSIZE) {
126 			align = SPA_MINBLOCKSIZE;
127 		} else if (P2PHASE(size, PAGESIZE) == 0) {
128 			align = PAGESIZE;
129 		} else if (P2PHASE(size, p2 >> 2) == 0) {
130 			align = p2 >> 2;
131 		}
132 
133 		if (align != 0) {
134 			char name[36];
135 			(void) sprintf(name, "zio_buf_%lu", (ulong_t)size);
136 			zio_buf_cache[c] = kmem_cache_create(name, size,
137 			    align, NULL, NULL, NULL, NULL, NULL,
138 			    size > zio_buf_debug_limit ? KMC_NODEBUG : 0);
139 
140 			(void) sprintf(name, "zio_data_buf_%lu", (ulong_t)size);
141 			zio_data_buf_cache[c] = kmem_cache_create(name, size,
142 			    align, NULL, NULL, NULL, NULL, data_alloc_arena,
143 			    size > zio_buf_debug_limit ? KMC_NODEBUG : 0);
144 		}
145 	}
146 
147 	while (--c != 0) {
148 		ASSERT(zio_buf_cache[c] != NULL);
149 		if (zio_buf_cache[c - 1] == NULL)
150 			zio_buf_cache[c - 1] = zio_buf_cache[c];
151 
152 		ASSERT(zio_data_buf_cache[c] != NULL);
153 		if (zio_data_buf_cache[c - 1] == NULL)
154 			zio_data_buf_cache[c - 1] = zio_data_buf_cache[c];
155 	}
156 #endif /* __NetBSD__ */
157 	zio_inject_init();
158 }
159 
160 void
161 zio_fini(void)
162 {
163 	size_t c;
164 	kmem_cache_t *last_cache = NULL;
165 	kmem_cache_t *last_data_cache = NULL;
166 
167 #ifndef __NetBSD__
168 	for (c = 0; c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT; c++) {
169 		if (zio_buf_cache[c] != last_cache) {
170 			last_cache = zio_buf_cache[c];
171 			kmem_cache_destroy(zio_buf_cache[c]);
172 		}
173 		zio_buf_cache[c] = NULL;
174 
175 		if (zio_data_buf_cache[c] != last_data_cache) {
176 			last_data_cache = zio_data_buf_cache[c];
177 			kmem_cache_destroy(zio_data_buf_cache[c]);
178 		}
179 		zio_data_buf_cache[c] = NULL;
180 	}
181 #endif /* __NetBSD__ */
182 
183 	kmem_cache_destroy(zio_link_cache);
184 	kmem_cache_destroy(zio_cache);
185 
186 	zio_inject_fini();
187 }
188 
189 /*
190  * ==========================================================================
191  * Allocate and free I/O buffers
192  * ==========================================================================
193  */
194 
195 /*
196  * Use zio_buf_alloc to allocate ZFS metadata.  This data will appear in a
197  * crashdump if the kernel panics, so use it judiciously.  Obviously, it's
198  * useful to inspect ZFS metadata, but if possible, we should avoid keeping
199  * excess / transient data in-core during a crashdump.
200  */
201 void *
202 zio_buf_alloc(size_t size)
203 {
204 	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
205 	ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
206 #ifdef __NetBSD__
207 	return (kmem_alloc(size, KM_SLEEP));
208 #else
209 	return (kmem_cache_alloc(zio_buf_cache[c], KM_PUSHPAGE));
210 #endif
211 }
212 
213 /*
214  * Use zio_data_buf_alloc to allocate data.  The data will not appear in a
215  * crashdump if the kernel panics.  This exists so that we can limit the amount
216  * of ZFS data that shows up in a kernel crashdump.  (Thus reducing the amount
217  * of kernel heap dumped to disk when the kernel panics.)
218  */
219 void *
220 zio_data_buf_alloc(size_t size)
221 {
222 	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
223 
224 	ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
225 #ifdef __NetBSD__
226 	return (kmem_alloc(size, KM_SLEEP));
227 #else
228 	return (kmem_cache_alloc(zio_data_buf_cache[c], KM_PUSHPAGE));
229 #endif
230 }
231 
232 void
233 zio_buf_free(void *buf, size_t size)
234 {
235 	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
236 
237 	ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
238 
239 #ifdef __NetBSD__
240 	kmem_free(buf, size);
241 #else
242 	kmem_cache_free(zio_buf_cache[c], buf);
243 #endif
244 }
245 
246 void
247 zio_data_buf_free(void *buf, size_t size)
248 {
249 	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
250 
251 	ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
252 
253 #ifdef __NetBSD__
254 	kmem_free(buf, size);
255 #else
256 	kmem_cache_free(zio_data_buf_cache[c], buf);
257 #endif
258 }
259 
260 /*
261  * ==========================================================================
262  * Push and pop I/O transform buffers
263  * ==========================================================================
264  */
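/*
 * Push a transform onto the zio's transform stack: remember the current
 * io_data/io_size (and the optional callback to apply when the transform
 * is undone), then redirect the zio to the new buffer.
 */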
265 static void
266 zio_push_transform(zio_t *zio, void *data, uint64_t size, uint64_t bufsize,
267 	zio_transform_func_t *transform)
268 {
269 	zio_transform_t *zt = kmem_alloc(sizeof (zio_transform_t), KM_SLEEP);
270 
271 	zt->zt_orig_data = zio->io_data;
272 	zt->zt_orig_size = zio->io_size;
273 	zt->zt_bufsize = bufsize;
274 	zt->zt_transform = transform;
275 
276 	zt->zt_next = zio->io_transform_stack;
277 	zio->io_transform_stack = zt;
278 
279 	zio->io_data = data;
280 	zio->io_size = size;
281 }
282 
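/*
 * Unwind the transform stack: apply each transform callback against the
 * original buffer, free any temporary buffers, and restore io_data/io_size.
 */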
283 static void
284 zio_pop_transforms(zio_t *zio)
285 {
286 	zio_transform_t *zt;
287 
288 	while ((zt = zio->io_transform_stack) != NULL) {
289 		if (zt->zt_transform != NULL)
290 			zt->zt_transform(zio,
291 			    zt->zt_orig_data, zt->zt_orig_size);
292 
293 		if (zt->zt_bufsize != 0)
294 			zio_buf_free(zio->io_data, zt->zt_bufsize);
295 
296 		zio->io_data = zt->zt_orig_data;
297 		zio->io_size = zt->zt_orig_size;
298 		zio->io_transform_stack = zt->zt_next;
299 
300 		kmem_free(zt, sizeof (zio_transform_t));
301 	}
302 }
303 
304 /*
305  * ==========================================================================
306  * I/O transform callbacks for subblocks and decompression
307  * ==========================================================================
308  */
309 static void
310 zio_subblock(zio_t *zio, void *data, uint64_t size)
311 {
312 	ASSERT(zio->io_size > size);
313 
314 	if (zio->io_type == ZIO_TYPE_READ)
315 		bcopy(zio->io_data, data, size);
316 }
317 
318 static void
319 zio_decompress(zio_t *zio, void *data, uint64_t size)
320 {
321 	if (zio->io_error == 0 &&
322 	    zio_decompress_data(BP_GET_COMPRESS(zio->io_bp),
323 	    zio->io_data, data, zio->io_size, size) != 0)
324 		zio->io_error = EIO;
325 }
326 
327 /*
328  * ==========================================================================
329  * I/O parent/child relationships and pipeline interlocks
330  * ==========================================================================
331  */
332 /*
333  * NOTE - Callers to zio_walk_parents() and zio_walk_children() must
334  *        continue calling these functions until they return NULL.
335  *        Otherwise, the next caller will pick up the list walk in
336  *        some indeterminate state.  (The alternative would be to have
337  *        every caller pass in a cookie to keep the state represented by
338  *        io_walk_link, which gets annoying.)
339  */
340 zio_t *
341 zio_walk_parents(zio_t *cio)
342 {
343 	zio_link_t *zl = cio->io_walk_link;
344 	list_t *pl = &cio->io_parent_list;
345 
346 	zl = (zl == NULL) ? list_head(pl) : list_next(pl, zl);
347 	cio->io_walk_link = zl;
348 
349 	if (zl == NULL)
350 		return (NULL);
351 
352 	ASSERT(zl->zl_child == cio);
353 	return (zl->zl_parent);
354 }
355 
356 zio_t *
357 zio_walk_children(zio_t *pio)
358 {
359 	zio_link_t *zl = pio->io_walk_link;
360 	list_t *cl = &pio->io_child_list;
361 
362 	zl = (zl == NULL) ? list_head(cl) : list_next(cl, zl);
363 	pio->io_walk_link = zl;
364 
365 	if (zl == NULL)
366 		return (NULL);
367 
368 	ASSERT(zl->zl_parent == pio);
369 	return (zl->zl_child);
370 }
371 
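/*
 * Return cio's only parent (or NULL if it has none); the VERIFY ensures
 * there is at most one.
 */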
372 zio_t *
373 zio_unique_parent(zio_t *cio)
374 {
375 	zio_t *pio = zio_walk_parents(cio);
376 
377 	VERIFY(zio_walk_parents(cio) == NULL);
378 	return (pio);
379 }
380 
381 void
382 zio_add_child(zio_t *pio, zio_t *cio)
383 {
384 	zio_link_t *zl = kmem_cache_alloc(zio_link_cache, KM_SLEEP);
385 
386 	/*
387 	 * Logical I/Os can have logical, gang, or vdev children.
388 	 * Gang I/Os can have gang or vdev children.
389 	 * Vdev I/Os can only have vdev children.
390 	 * The following ASSERT captures all of these constraints.
391 	 */
392 	ASSERT(cio->io_child_type <= pio->io_child_type);
393 
394 	zl->zl_parent = pio;
395 	zl->zl_child = cio;
396 
397 	mutex_enter(&cio->io_lock);
398 	mutex_enter(&pio->io_lock);
399 
400 	ASSERT(pio->io_state[ZIO_WAIT_DONE] == 0);
401 
402 	for (int w = 0; w < ZIO_WAIT_TYPES; w++)
403 		pio->io_children[cio->io_child_type][w] += !cio->io_state[w];
404 
405 	list_insert_head(&pio->io_child_list, zl);
406 	list_insert_head(&cio->io_parent_list, zl);
407 
408 	pio->io_child_count++;
409 	cio->io_parent_count++;
410 
411 	mutex_exit(&pio->io_lock);
412 	mutex_exit(&cio->io_lock);
413 }
414 
415 static void
416 zio_remove_child(zio_t *pio, zio_t *cio, zio_link_t *zl)
417 {
418 	ASSERT(zl->zl_parent == pio);
419 	ASSERT(zl->zl_child == cio);
420 
421 	mutex_enter(&cio->io_lock);
422 	mutex_enter(&pio->io_lock);
423 
424 	list_remove(&pio->io_child_list, zl);
425 	list_remove(&cio->io_parent_list, zl);
426 
427 	pio->io_child_count--;
428 	cio->io_parent_count--;
429 
430 	mutex_exit(&pio->io_lock);
431 	mutex_exit(&cio->io_lock);
432 
433 	kmem_cache_free(zio_link_cache, zl);
434 }
435 
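/*
 * If the zio still has outstanding children of the given type in the given
 * wait class, record the stall (backing io_stage up so the current stage is
 * retried) and return B_TRUE.  The stalled zio is resumed from
 * zio_notify_parent() when the last such child completes.
 */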
436 static boolean_t
437 zio_wait_for_children(zio_t *zio, enum zio_child child, enum zio_wait_type wait)
438 {
439 	uint64_t *countp = &zio->io_children[child][wait];
440 	boolean_t waiting = B_FALSE;
441 
442 	mutex_enter(&zio->io_lock);
443 	ASSERT(zio->io_stall == NULL);
444 	if (*countp != 0) {
445 		zio->io_stage >>= 1;
446 		zio->io_stall = countp;
447 		waiting = B_TRUE;
448 	}
449 	mutex_exit(&zio->io_lock);
450 
451 	return (waiting);
452 }
453 
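/*
 * Called as each child completes a wait phase: fold the child's error and
 * reexecute flags into the parent and, if the parent was stalled waiting on
 * this count, clear the stall and resume the parent's pipeline.
 */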
454 static void
455 zio_notify_parent(zio_t *pio, zio_t *zio, enum zio_wait_type wait)
456 {
457 	uint64_t *countp = &pio->io_children[zio->io_child_type][wait];
458 	int *errorp = &pio->io_child_error[zio->io_child_type];
459 
460 	mutex_enter(&pio->io_lock);
461 	if (zio->io_error && !(zio->io_flags & ZIO_FLAG_DONT_PROPAGATE))
462 		*errorp = zio_worst_error(*errorp, zio->io_error);
463 	pio->io_reexecute |= zio->io_reexecute;
464 	ASSERT3U(*countp, >, 0);
465 	if (--*countp == 0 && pio->io_stall == countp) {
466 		pio->io_stall = NULL;
467 		mutex_exit(&pio->io_lock);
468 		zio_execute(pio);
469 	} else {
470 		mutex_exit(&pio->io_lock);
471 	}
472 }
473 
474 static void
475 zio_inherit_child_errors(zio_t *zio, enum zio_child c)
476 {
477 	if (zio->io_child_error[c] != 0 && zio->io_error == 0)
478 		zio->io_error = zio->io_child_error[c];
479 }
480 
481 /*
482  * ==========================================================================
483  * Create the various types of I/O (read, write, free, etc)
484  * ==========================================================================
485  */
486 static zio_t *
487 zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
488     void *data, uint64_t size, zio_done_func_t *done, void *private,
489     zio_type_t type, int priority, enum zio_flag flags,
490     vdev_t *vd, uint64_t offset, const zbookmark_t *zb,
491     enum zio_stage stage, enum zio_stage pipeline)
492 {
493 	zio_t *zio;
494 
495 	ASSERT3U(size, <=, SPA_MAXBLOCKSIZE);
496 	ASSERT(P2PHASE(size, SPA_MINBLOCKSIZE) == 0);
497 	ASSERT(P2PHASE(offset, SPA_MINBLOCKSIZE) == 0);
498 
499 	ASSERT(!vd || spa_config_held(spa, SCL_STATE_ALL, RW_READER));
500 	ASSERT(!bp || !(flags & ZIO_FLAG_CONFIG_WRITER));
501 	ASSERT(vd || stage == ZIO_STAGE_OPEN);
502 
503 	zio = kmem_cache_alloc(zio_cache, KM_SLEEP);
504 	bzero(zio, sizeof (zio_t));
505 
506 	mutex_init(&zio->io_lock, NULL, MUTEX_DEFAULT, NULL);
507 	cv_init(&zio->io_cv, NULL, CV_DEFAULT, NULL);
508 
509 	list_create(&zio->io_parent_list, sizeof (zio_link_t),
510 	    offsetof(zio_link_t, zl_parent_node));
511 	list_create(&zio->io_child_list, sizeof (zio_link_t),
512 	    offsetof(zio_link_t, zl_child_node));
513 
514 	if (vd != NULL)
515 		zio->io_child_type = ZIO_CHILD_VDEV;
516 	else if (flags & ZIO_FLAG_GANG_CHILD)
517 		zio->io_child_type = ZIO_CHILD_GANG;
518 	else if (flags & ZIO_FLAG_DDT_CHILD)
519 		zio->io_child_type = ZIO_CHILD_DDT;
520 	else
521 		zio->io_child_type = ZIO_CHILD_LOGICAL;
522 
523 	if (bp != NULL) {
524 		zio->io_bp = (blkptr_t *)bp;
525 		zio->io_bp_copy = *bp;
526 		zio->io_bp_orig = *bp;
527 		if (type != ZIO_TYPE_WRITE ||
528 		    zio->io_child_type == ZIO_CHILD_DDT)
529 			zio->io_bp = &zio->io_bp_copy;	/* so caller can free */
530 		if (zio->io_child_type == ZIO_CHILD_LOGICAL)
531 			zio->io_logical = zio;
532 		if (zio->io_child_type > ZIO_CHILD_GANG && BP_IS_GANG(bp))
533 			pipeline |= ZIO_GANG_STAGES;
534 	}
535 
536 	zio->io_spa = spa;
537 	zio->io_txg = txg;
538 	zio->io_done = done;
539 	zio->io_private = private;
540 	zio->io_type = type;
541 	zio->io_priority = priority;
542 	zio->io_vd = vd;
543 	zio->io_offset = offset;
544 	zio->io_orig_data = zio->io_data = data;
545 	zio->io_orig_size = zio->io_size = size;
546 	zio->io_orig_flags = zio->io_flags = flags;
547 	zio->io_orig_stage = zio->io_stage = stage;
548 	zio->io_orig_pipeline = zio->io_pipeline = pipeline;
549 
550 	zio->io_state[ZIO_WAIT_READY] = (stage >= ZIO_STAGE_READY);
551 	zio->io_state[ZIO_WAIT_DONE] = (stage >= ZIO_STAGE_DONE);
552 
553 	if (zb != NULL)
554 		zio->io_bookmark = *zb;
555 
556 	if (pio != NULL) {
557 		if (zio->io_logical == NULL)
558 			zio->io_logical = pio->io_logical;
559 		if (zio->io_child_type == ZIO_CHILD_GANG)
560 			zio->io_gang_leader = pio->io_gang_leader;
561 		zio_add_child(pio, zio);
562 	}
563 
564 	return (zio);
565 }
566 
567 static void
568 zio_destroy(zio_t *zio)
569 {
570 	list_destroy(&zio->io_parent_list);
571 	list_destroy(&zio->io_child_list);
572 	mutex_destroy(&zio->io_lock);
573 	cv_destroy(&zio->io_cv);
574 	kmem_cache_free(zio_cache, zio);
575 }
576 
577 zio_t *
578 zio_null(zio_t *pio, spa_t *spa, vdev_t *vd, zio_done_func_t *done,
579     void *private, enum zio_flag flags)
580 {
581 	zio_t *zio;
582 
583 	zio = zio_create(pio, spa, 0, NULL, NULL, 0, done, private,
584 	    ZIO_TYPE_NULL, ZIO_PRIORITY_NOW, flags, vd, 0, NULL,
585 	    ZIO_STAGE_OPEN, ZIO_INTERLOCK_PIPELINE);
586 
587 	return (zio);
588 }
589 
590 zio_t *
591 zio_root(spa_t *spa, zio_done_func_t *done, void *private, enum zio_flag flags)
592 {
593 	return (zio_null(NULL, spa, NULL, done, private, flags));
594 }
595 
596 zio_t *
597 zio_read(zio_t *pio, spa_t *spa, const blkptr_t *bp,
598     void *data, uint64_t size, zio_done_func_t *done, void *private,
599     int priority, enum zio_flag flags, const zbookmark_t *zb)
600 {
601 	zio_t *zio;
602 
603 	zio = zio_create(pio, spa, BP_PHYSICAL_BIRTH(bp), bp,
604 	    data, size, done, private,
605 	    ZIO_TYPE_READ, priority, flags, NULL, 0, zb,
606 	    ZIO_STAGE_OPEN, (flags & ZIO_FLAG_DDT_CHILD) ?
607 	    ZIO_DDT_CHILD_READ_PIPELINE : ZIO_READ_PIPELINE);
608 
609 	return (zio);
610 }
611 
612 zio_t *
613 zio_write(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
614     void *data, uint64_t size, const zio_prop_t *zp,
615     zio_done_func_t *ready, zio_done_func_t *done, void *private,
616     int priority, enum zio_flag flags, const zbookmark_t *zb)
617 {
618 	zio_t *zio;
619 
620 	ASSERT(zp->zp_checksum >= ZIO_CHECKSUM_OFF &&
621 	    zp->zp_checksum < ZIO_CHECKSUM_FUNCTIONS &&
622 	    zp->zp_compress >= ZIO_COMPRESS_OFF &&
623 	    zp->zp_compress < ZIO_COMPRESS_FUNCTIONS &&
624 	    zp->zp_type < DMU_OT_NUMTYPES &&
625 	    zp->zp_level < 32 &&
626 	    zp->zp_copies > 0 &&
627 	    zp->zp_copies <= spa_max_replication(spa) &&
628 	    zp->zp_dedup <= 1 &&
629 	    zp->zp_dedup_verify <= 1);
630 
631 	zio = zio_create(pio, spa, txg, bp, data, size, done, private,
632 	    ZIO_TYPE_WRITE, priority, flags, NULL, 0, zb,
633 	    ZIO_STAGE_OPEN, (flags & ZIO_FLAG_DDT_CHILD) ?
634 	    ZIO_DDT_CHILD_WRITE_PIPELINE : ZIO_WRITE_PIPELINE);
635 
636 	zio->io_ready = ready;
637 	zio->io_prop = *zp;
638 
639 	return (zio);
640 }
641 
642 zio_t *
643 zio_rewrite(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp, void *data,
644     uint64_t size, zio_done_func_t *done, void *private, int priority,
645     enum zio_flag flags, zbookmark_t *zb)
646 {
647 	zio_t *zio;
648 
649 	zio = zio_create(pio, spa, txg, bp, data, size, done, private,
650 	    ZIO_TYPE_WRITE, priority, flags, NULL, 0, zb,
651 	    ZIO_STAGE_OPEN, ZIO_REWRITE_PIPELINE);
652 
653 	return (zio);
654 }
655 
656 void
657 zio_write_override(zio_t *zio, blkptr_t *bp, int copies)
658 {
659 	ASSERT(zio->io_type == ZIO_TYPE_WRITE);
660 	ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
661 	ASSERT(zio->io_stage == ZIO_STAGE_OPEN);
662 	ASSERT(zio->io_txg == spa_syncing_txg(zio->io_spa));
663 
664 	zio->io_prop.zp_copies = copies;
665 	zio->io_bp_override = bp;
666 }
667 
668 void
669 zio_free(spa_t *spa, uint64_t txg, const blkptr_t *bp)
670 {
671 	bplist_enqueue_deferred(&spa->spa_free_bplist[txg & TXG_MASK], bp);
672 }
673 
674 zio_t *
675 zio_free_sync(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
676     enum zio_flag flags)
677 {
678 	zio_t *zio;
679 
680 	ASSERT(!BP_IS_HOLE(bp));
681 	ASSERT(spa_syncing_txg(spa) == txg);
682 	ASSERT(spa_sync_pass(spa) <= SYNC_PASS_DEFERRED_FREE);
683 
684 	zio = zio_create(pio, spa, txg, bp, NULL, BP_GET_PSIZE(bp),
685 	    NULL, NULL, ZIO_TYPE_FREE, ZIO_PRIORITY_FREE, flags,
686 	    NULL, 0, NULL, ZIO_STAGE_OPEN, ZIO_FREE_PIPELINE);
687 
688 	return (zio);
689 }
690 
691 zio_t *
692 zio_claim(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
693     zio_done_func_t *done, void *private, enum zio_flag flags)
694 {
695 	zio_t *zio;
696 
697 	/*
698 	 * A claim is an allocation of a specific block.  Claims are needed
699 	 * to support immediate writes in the intent log.  The issue is that
700 	 * immediate writes contain committed data, but in a txg that was
701 	 * *not* committed.  Upon opening the pool after an unclean shutdown,
702 	 * the intent log claims all blocks that contain immediate write data
703 	 * so that the SPA knows they're in use.
704 	 *
705 	 * All claims *must* be resolved in the first txg -- before the SPA
706 	 * starts allocating blocks -- so that nothing is allocated twice.
707 	 * If txg == 0 we just verify that the block is claimable.
708 	 */
709 	ASSERT3U(spa->spa_uberblock.ub_rootbp.blk_birth, <, spa_first_txg(spa));
710 	ASSERT(txg == spa_first_txg(spa) || txg == 0);
711 	ASSERT(!BP_GET_DEDUP(bp) || !spa_writeable(spa));	/* zdb(1M) */
712 
713 	zio = zio_create(pio, spa, txg, bp, NULL, BP_GET_PSIZE(bp),
714 	    done, private, ZIO_TYPE_CLAIM, ZIO_PRIORITY_NOW, flags,
715 	    NULL, 0, NULL, ZIO_STAGE_OPEN, ZIO_CLAIM_PIPELINE);
716 
717 	return (zio);
718 }
719 
720 zio_t *
721 zio_ioctl(zio_t *pio, spa_t *spa, vdev_t *vd, int cmd,
722     zio_done_func_t *done, void *private, int priority, enum zio_flag flags)
723 {
724 	zio_t *zio;
725 	int c;
726 
727 	if (vd->vdev_children == 0) {
728 		zio = zio_create(pio, spa, 0, NULL, NULL, 0, done, private,
729 		    ZIO_TYPE_IOCTL, priority, flags, vd, 0, NULL,
730 		    ZIO_STAGE_OPEN, ZIO_IOCTL_PIPELINE);
731 
732 		zio->io_cmd = cmd;
733 	} else {
734 		zio = zio_null(pio, spa, NULL, NULL, NULL, flags);
735 
736 		for (c = 0; c < vd->vdev_children; c++)
737 			zio_nowait(zio_ioctl(zio, spa, vd->vdev_child[c], cmd,
738 			    done, private, priority, flags));
739 	}
740 
741 	return (zio);
742 }
743 
744 zio_t *
745 zio_read_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
746     void *data, int checksum, zio_done_func_t *done, void *private,
747     int priority, enum zio_flag flags, boolean_t labels)
748 {
749 	zio_t *zio;
750 
751 	ASSERT(vd->vdev_children == 0);
752 	ASSERT(!labels || offset + size <= VDEV_LABEL_START_SIZE ||
753 	    offset >= vd->vdev_psize - VDEV_LABEL_END_SIZE);
754 	ASSERT3U(offset + size, <=, vd->vdev_psize);
755 
756 	zio = zio_create(pio, vd->vdev_spa, 0, NULL, data, size, done, private,
757 	    ZIO_TYPE_READ, priority, flags, vd, offset, NULL,
758 	    ZIO_STAGE_OPEN, ZIO_READ_PHYS_PIPELINE);
759 
760 	zio->io_prop.zp_checksum = checksum;
761 
762 	return (zio);
763 }
764 
765 zio_t *
766 zio_write_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
767     void *data, int checksum, zio_done_func_t *done, void *private,
768     int priority, enum zio_flag flags, boolean_t labels)
769 {
770 	zio_t *zio;
771 
772 	ASSERT(vd->vdev_children == 0);
773 	ASSERT(!labels || offset + size <= VDEV_LABEL_START_SIZE ||
774 	    offset >= vd->vdev_psize - VDEV_LABEL_END_SIZE);
775 	ASSERT3U(offset + size, <=, vd->vdev_psize);
776 
777 	zio = zio_create(pio, vd->vdev_spa, 0, NULL, data, size, done, private,
778 	    ZIO_TYPE_WRITE, priority, flags, vd, offset, NULL,
779 	    ZIO_STAGE_OPEN, ZIO_WRITE_PHYS_PIPELINE);
780 
781 	zio->io_prop.zp_checksum = checksum;
782 
783 	if (zio_checksum_table[checksum].ci_eck) {
784 		/*
785 		 * zec checksums are necessarily destructive -- they modify
786 		 * the end of the write buffer to hold the verifier/checksum.
787 		 * Therefore, we must make a local copy in case the data is
788 		 * being written to multiple places in parallel.
789 		 */
790 		void *wbuf = zio_buf_alloc(size);
791 		bcopy(data, wbuf, size);
792 		zio_push_transform(zio, wbuf, size, size, NULL);
793 	}
794 
795 	return (zio);
796 }
797 
798 /*
799  * Create a child I/O to do some work for us.
800  */
801 zio_t *
802 zio_vdev_child_io(zio_t *pio, blkptr_t *bp, vdev_t *vd, uint64_t offset,
803 	void *data, uint64_t size, int type, int priority, enum zio_flag flags,
804 	zio_done_func_t *done, void *private)
805 {
806 	enum zio_stage pipeline = ZIO_VDEV_CHILD_PIPELINE;
807 	zio_t *zio;
808 
809 	ASSERT(vd->vdev_parent ==
810 	    (pio->io_vd ? pio->io_vd : pio->io_spa->spa_root_vdev));
811 
812 	if (type == ZIO_TYPE_READ && bp != NULL) {
813 		/*
814 		 * If we have the bp, then the child should perform the
815 		 * checksum and the parent need not.  This pushes error
816 		 * detection as close to the leaves as possible and
817 		 * eliminates redundant checksums in the interior nodes.
818 		 */
819 		pipeline |= ZIO_STAGE_CHECKSUM_VERIFY;
820 		pio->io_pipeline &= ~ZIO_STAGE_CHECKSUM_VERIFY;
821 	}
822 
823 	if (vd->vdev_children == 0)
824 		offset += VDEV_LABEL_START_SIZE;
825 
826 	flags |= ZIO_VDEV_CHILD_FLAGS(pio) | ZIO_FLAG_DONT_PROPAGATE;
827 
828 	/*
829 	 * If we've decided to do a repair, the write is not speculative --
830 	 * even if the original read was.
831 	 */
832 	if (flags & ZIO_FLAG_IO_REPAIR)
833 		flags &= ~ZIO_FLAG_SPECULATIVE;
834 
835 	zio = zio_create(pio, pio->io_spa, pio->io_txg, bp, data, size,
836 	    done, private, type, priority, flags, vd, offset, &pio->io_bookmark,
837 	    ZIO_STAGE_VDEV_IO_START >> 1, pipeline);
838 
839 	return (zio);
840 }
841 
842 zio_t *
843 zio_vdev_delegated_io(vdev_t *vd, uint64_t offset, void *data, uint64_t size,
844 	int type, int priority, enum zio_flag flags,
845 	zio_done_func_t *done, void *private)
846 {
847 	zio_t *zio;
848 
849 	ASSERT(vd->vdev_ops->vdev_op_leaf);
850 
851 	zio = zio_create(NULL, vd->vdev_spa, 0, NULL,
852 	    data, size, done, private, type, priority,
853 	    flags | ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_RETRY,
854 	    vd, offset, NULL,
855 	    ZIO_STAGE_VDEV_IO_START >> 1, ZIO_VDEV_CHILD_PIPELINE);
856 
857 	return (zio);
858 }
859 
860 void
861 zio_flush(zio_t *zio, vdev_t *vd)
862 {
863 	zio_nowait(zio_ioctl(zio, zio->io_spa, vd, DKIOCFLUSHWRITECACHE,
864 	    NULL, NULL, ZIO_PRIORITY_NOW,
865 	    ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE | ZIO_FLAG_DONT_RETRY));
866 }
867 
868 void
869 zio_shrink(zio_t *zio, uint64_t size)
870 {
871 	ASSERT(zio->io_executor == NULL);
872 	ASSERT(zio->io_orig_size == zio->io_size);
873 	ASSERT(size <= zio->io_size);
874 
875 	/*
876 	 * We don't shrink for raidz because of problems with the
877 	 * reconstruction when reading back less than the block size.
878 	 * Note, BP_IS_RAIDZ() assumes no compression.
879 	 */
880 	ASSERT(BP_GET_COMPRESS(zio->io_bp) == ZIO_COMPRESS_OFF);
881 	if (!BP_IS_RAIDZ(zio->io_bp))
882 		zio->io_orig_size = zio->io_size = size;
883 }
884 
885 /*
886  * ==========================================================================
887  * Prepare to read and write logical blocks
888  * ==========================================================================
889  */
890 
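/*
 * Prepare a logical read: push a decompression transform if the block is
 * compressed, set ZIO_FLAG_DONT_CACHE where ARC caching isn't useful, and
 * route deduplicated blocks through the DDT read pipeline.
 */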
891 static int
892 zio_read_bp_init(zio_t *zio)
893 {
894 	blkptr_t *bp = zio->io_bp;
895 
896 	if (BP_GET_COMPRESS(bp) != ZIO_COMPRESS_OFF &&
897 	    zio->io_child_type == ZIO_CHILD_LOGICAL &&
898 	    !(zio->io_flags & ZIO_FLAG_RAW)) {
899 		uint64_t psize = BP_GET_PSIZE(bp);
900 		void *cbuf = zio_buf_alloc(psize);
901 
902 		zio_push_transform(zio, cbuf, psize, psize, zio_decompress);
903 	}
904 
905 	if (!dmu_ot[BP_GET_TYPE(bp)].ot_metadata && BP_GET_LEVEL(bp) == 0)
906 		zio->io_flags |= ZIO_FLAG_DONT_CACHE;
907 
908 	if (BP_GET_TYPE(bp) == DMU_OT_DDT_ZAP)
909 		zio->io_flags |= ZIO_FLAG_DONT_CACHE;
910 
911 	if (BP_GET_DEDUP(bp) && zio->io_child_type == ZIO_CHILD_LOGICAL)
912 		zio->io_pipeline = ZIO_DDT_READ_PIPELINE;
913 
914 	return (ZIO_PIPELINE_CONTINUE);
915 }
916 
917 static int
918 zio_write_bp_init(zio_t *zio)
919 {
920 	spa_t *spa = zio->io_spa;
921 	zio_prop_t *zp = &zio->io_prop;
922 	enum zio_compress compress = zp->zp_compress;
923 	blkptr_t *bp = zio->io_bp;
924 	uint64_t lsize = zio->io_size;
925 	uint64_t psize = lsize;
926 	int pass = 1;
927 
928 	/*
929 	 * If our children haven't all reached the ready stage,
930 	 * wait for them and then repeat this pipeline stage.
931 	 */
932 	if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_READY) ||
933 	    zio_wait_for_children(zio, ZIO_CHILD_LOGICAL, ZIO_WAIT_READY))
934 		return (ZIO_PIPELINE_STOP);
935 
936 	if (!IO_IS_ALLOCATING(zio))
937 		return (ZIO_PIPELINE_CONTINUE);
938 
939 	ASSERT(zio->io_child_type != ZIO_CHILD_DDT);
940 
941 	if (zio->io_bp_override) {
942 		ASSERT(bp->blk_birth != zio->io_txg);
943 		ASSERT(BP_GET_DEDUP(zio->io_bp_override) == 0);
944 
945 		*bp = *zio->io_bp_override;
946 		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
947 
948 		if (BP_IS_HOLE(bp) || !zp->zp_dedup)
949 			return (ZIO_PIPELINE_CONTINUE);
950 
951 		ASSERT(zio_checksum_table[zp->zp_checksum].ci_dedup ||
952 		    zp->zp_dedup_verify);
953 
954 		if (BP_GET_CHECKSUM(bp) == zp->zp_checksum) {
955 			BP_SET_DEDUP(bp, 1);
956 			zio->io_pipeline |= ZIO_STAGE_DDT_WRITE;
957 			return (ZIO_PIPELINE_CONTINUE);
958 		}
959 		zio->io_bp_override = NULL;
960 		BP_ZERO(bp);
961 	}
962 
963 	if (bp->blk_birth == zio->io_txg) {
964 		/*
965 		 * We're rewriting an existing block, which means we're
966 		 * working on behalf of spa_sync().  For spa_sync() to
967 		 * converge, it must eventually be the case that we don't
968 		 * have to allocate new blocks.  But compression changes
969 		 * the blocksize, which forces a reallocate, and makes
970 		 * convergence take longer.  Therefore, after the first
971 		 * few passes, stop compressing to ensure convergence.
972 		 */
973 		pass = spa_sync_pass(spa);
974 
975 		ASSERT(zio->io_txg == spa_syncing_txg(spa));
976 		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
977 		ASSERT(!BP_GET_DEDUP(bp));
978 
979 		if (pass > SYNC_PASS_DONT_COMPRESS)
980 			compress = ZIO_COMPRESS_OFF;
981 
982 		/* Make sure someone doesn't change their mind on overwrites */
983 		ASSERT(MIN(zp->zp_copies + BP_IS_GANG(bp),
984 		    spa_max_replication(spa)) == BP_GET_NDVAS(bp));
985 	}
986 
987 	if (compress != ZIO_COMPRESS_OFF) {
988 		void *cbuf = zio_buf_alloc(lsize);
989 		psize = zio_compress_data(compress, zio->io_data, cbuf, lsize);
990 		if (psize == 0 || psize == lsize) {
991 			compress = ZIO_COMPRESS_OFF;
992 			zio_buf_free(cbuf, lsize);
993 		} else {
994 			ASSERT(psize < lsize);
995 			zio_push_transform(zio, cbuf, psize, lsize, NULL);
996 		}
997 	}
998 
999 	/*
1000 	 * The final pass of spa_sync() must be all rewrites, but the first
1001 	 * few passes offer a trade-off: allocating blocks defers convergence,
1002 	 * but newly allocated blocks are sequential, so they can be written
1003 	 * to disk faster.  Therefore, we allow the first few passes of
1004 	 * spa_sync() to allocate new blocks, but force rewrites after that.
1005 	 * There should only be a handful of blocks after pass 1 in any case.
1006 	 */
1007 	if (bp->blk_birth == zio->io_txg && BP_GET_PSIZE(bp) == psize &&
1008 	    pass > SYNC_PASS_REWRITE) {
1009 		ASSERT(psize != 0);
1010 		enum zio_stage gang_stages = zio->io_pipeline & ZIO_GANG_STAGES;
1011 		zio->io_pipeline = ZIO_REWRITE_PIPELINE | gang_stages;
1012 		zio->io_flags |= ZIO_FLAG_IO_REWRITE;
1013 	} else {
1014 		BP_ZERO(bp);
1015 		zio->io_pipeline = ZIO_WRITE_PIPELINE;
1016 	}
1017 
1018 	if (psize == 0) {
1019 		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
1020 	} else {
1021 		ASSERT(zp->zp_checksum != ZIO_CHECKSUM_GANG_HEADER);
1022 		BP_SET_LSIZE(bp, lsize);
1023 		BP_SET_PSIZE(bp, psize);
1024 		BP_SET_COMPRESS(bp, compress);
1025 		BP_SET_CHECKSUM(bp, zp->zp_checksum);
1026 		BP_SET_TYPE(bp, zp->zp_type);
1027 		BP_SET_LEVEL(bp, zp->zp_level);
1028 		BP_SET_DEDUP(bp, zp->zp_dedup);
1029 		BP_SET_BYTEORDER(bp, ZFS_HOST_BYTEORDER);
1030 		if (zp->zp_dedup) {
1031 			ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
1032 			ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REWRITE));
1033 			zio->io_pipeline = ZIO_DDT_WRITE_PIPELINE;
1034 		}
1035 	}
1036 
1037 	return (ZIO_PIPELINE_CONTINUE);
1038 }
1039 
1040 static int
1041 zio_free_bp_init(zio_t *zio)
1042 {
1043 	blkptr_t *bp = zio->io_bp;
1044 
1045 	if (zio->io_child_type == ZIO_CHILD_LOGICAL) {
1046 		if (BP_GET_DEDUP(bp))
1047 			zio->io_pipeline = ZIO_DDT_FREE_PIPELINE;
1048 		else
1049 			arc_free(zio->io_spa, bp);
1050 	}
1051 
1052 	return (ZIO_PIPELINE_CONTINUE);
1053 }
1054 
1055 /*
1056  * ==========================================================================
1057  * Execute the I/O pipeline
1058  * ==========================================================================
1059  */
1060 
1061 static void
1062 zio_taskq_dispatch(zio_t *zio, enum zio_taskq_type q, boolean_t cutinline)
1063 {
1064 	spa_t *spa = zio->io_spa;
1065 	zio_type_t t = zio->io_type;
1066 	int flags = TQ_SLEEP | (cutinline ? TQ_FRONT : 0);
1067 
1068 	/*
1069 	 * If we're a config writer or a probe, the normal issue and
1070 	 * interrupt threads may all be blocked waiting for the config lock.
1071 	 * In this case, select the otherwise-unused taskq for ZIO_TYPE_NULL.
1072 	 */
1073 	if (zio->io_flags & (ZIO_FLAG_CONFIG_WRITER | ZIO_FLAG_PROBE))
1074 		t = ZIO_TYPE_NULL;
1075 
1076 	/*
1077 	 * A similar issue exists for the L2ARC write thread until L2ARC 2.0.
1078 	 */
1079 	if (t == ZIO_TYPE_WRITE && zio->io_vd && zio->io_vd->vdev_aux)
1080 		t = ZIO_TYPE_NULL;
1081 
1082 	/*
1083 	 * If this is a high priority I/O, then use the high priority taskq.
1084 	 */
1085 	if (zio->io_priority == ZIO_PRIORITY_NOW &&
1086 	    spa->spa_zio_taskq[t][q + 1] != NULL)
1087 		q++;
1088 
1089 	ASSERT3U(q, <, ZIO_TASKQ_TYPES);
1090 	(void) taskq_dispatch(spa->spa_zio_taskq[t][q],
1091 	    (task_func_t *)zio_execute, zio, flags);
1092 }
1093 
1094 static boolean_t
1095 zio_taskq_member(zio_t *zio, enum zio_taskq_type q)
1096 {
1097 	kthread_t *executor = zio->io_executor;
1098 	spa_t *spa = zio->io_spa;
1099 
1100 	for (zio_type_t t = 0; t < ZIO_TYPES; t++)
1101 		if (taskq_member(spa->spa_zio_taskq[t][q], executor))
1102 			return (B_TRUE);
1103 
1104 	return (B_FALSE);
1105 }
1106 
1107 static int
1108 zio_issue_async(zio_t *zio)
1109 {
1110 	zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, B_FALSE);
1111 
1112 	return (ZIO_PIPELINE_STOP);
1113 }
1114 
1115 void
1116 zio_interrupt(zio_t *zio)
1117 {
1118 	zio_taskq_dispatch(zio, ZIO_TASKQ_INTERRUPT, B_FALSE);
1119 }
1120 
1121 /*
1122  * Execute the I/O pipeline until one of the following occurs:
1123  * (1) the I/O completes; (2) the pipeline stalls waiting for
1124  * dependent child I/Os; (3) the I/O issues, so we're waiting
1125  * for an I/O completion interrupt; (4) the I/O is delegated by
1126  * vdev-level caching or aggregation; (5) the I/O is deferred
1127  * due to vdev-level queueing; (6) the I/O is handed off to
1128  * another thread.  In all cases, the pipeline stops whenever
1129  * there's no CPU work; it never burns a thread in cv_wait().
1130  *
1131  * There's no locking on io_stage because there's no legitimate way
1132  * for multiple threads to be attempting to process the same I/O.
1133  */
1134 static zio_pipe_stage_t *zio_pipeline[];
1135 
1136 void
1137 zio_execute(zio_t *zio)
1138 {
1139 	zio->io_executor = curthread;
1140 
1141 	while (zio->io_stage < ZIO_STAGE_DONE) {
1142 		enum zio_stage pipeline = zio->io_pipeline;
1143 		enum zio_stage stage = zio->io_stage;
1144 		int rv;
1145 
1146 		ASSERT(!MUTEX_HELD(&zio->io_lock));
1147 		ASSERT(ISP2(stage));
1148 		ASSERT(zio->io_stall == NULL);
1149 
1150 		do {
1151 			stage <<= 1;
1152 		} while ((stage & pipeline) == 0);
1153 
1154 		ASSERT(stage <= ZIO_STAGE_DONE);
1155 
1156 		/*
1157 		 * If we are in interrupt context and this pipeline stage
1158 		 * will grab a config lock that is held across I/O,
1159 		 * or may wait for an I/O that needs an interrupt thread
1160 		 * to complete, issue async to avoid deadlock.
1161 		 *
1162 		 * For VDEV_IO_START, we cut in line so that the io will
1163 		 * be sent to disk promptly.
1164 		 */
1165 		if ((stage & ZIO_BLOCKING_STAGES) && zio->io_vd == NULL &&
1166 		    zio_taskq_member(zio, ZIO_TASKQ_INTERRUPT)) {
1167 			boolean_t cut = (stage == ZIO_STAGE_VDEV_IO_START) ?
1168 			    zio_requeue_io_start_cut_in_line : B_FALSE;
1169 			zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, cut);
1170 			return;
1171 		}
1172 
1173 		zio->io_stage = stage;
1174 		rv = zio_pipeline[highbit(stage) - 1](zio);
1175 
1176 		if (rv == ZIO_PIPELINE_STOP)
1177 			return;
1178 
1179 		ASSERT(rv == ZIO_PIPELINE_CONTINUE);
1180 	}
1181 }
1182 
1183 /*
1184  * ==========================================================================
1185  * Initiate I/O, either sync or async
1186  * ==========================================================================
1187  */
1188 int
1189 zio_wait(zio_t *zio)
1190 {
1191 	int error;
1192 
1193 	ASSERT(zio->io_stage == ZIO_STAGE_OPEN);
1194 	ASSERT(zio->io_executor == NULL);
1195 
1196 	zio->io_waiter = curthread;
1197 
1198 	zio_execute(zio);
1199 
1200 	mutex_enter(&zio->io_lock);
1201 	while (zio->io_executor != NULL)
1202 		cv_wait(&zio->io_cv, &zio->io_lock);
1203 	mutex_exit(&zio->io_lock);
1204 
1205 	error = zio->io_error;
1206 	zio_destroy(zio);
1207 
1208 	return (error);
1209 }
1210 
1211 void
1212 zio_nowait(zio_t *zio)
1213 {
1214 	ASSERT(zio->io_executor == NULL);
1215 
1216 	if (zio->io_child_type == ZIO_CHILD_LOGICAL &&
1217 	    zio_unique_parent(zio) == NULL) {
1218 		/*
1219 		 * This is a logical async I/O with no parent to wait for it.
1220 		 * We add it to the spa_async_zio_root "Godfather" I/O, which
1221 		 * will ensure it completes prior to unloading the pool.
1222 		 */
1223 		spa_t *spa = zio->io_spa;
1224 
1225 		zio_add_child(spa->spa_async_zio_root, zio);
1226 	}
1227 
1228 	zio_execute(zio);
1229 }
1230 
1231 /*
1232  * ==========================================================================
1233  * Reexecute or suspend/resume failed I/O
1234  * ==========================================================================
1235  */
1236 
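/*
 * Reset pio and, recursively, all of its children back to their original
 * (open) state and run them through the pipeline again, e.g. when a
 * suspended pool is resumed.
 */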
1237 static void
1238 zio_reexecute(zio_t *pio)
1239 {
1240 	zio_t *cio, *cio_next;
1241 
1242 	ASSERT(pio->io_child_type == ZIO_CHILD_LOGICAL);
1243 	ASSERT(pio->io_orig_stage == ZIO_STAGE_OPEN);
1244 	ASSERT(pio->io_gang_leader == NULL);
1245 	ASSERT(pio->io_gang_tree == NULL);
1246 
1247 	pio->io_flags = pio->io_orig_flags;
1248 	pio->io_stage = pio->io_orig_stage;
1249 	pio->io_pipeline = pio->io_orig_pipeline;
1250 	pio->io_reexecute = 0;
1251 	pio->io_error = 0;
1252 	for (int w = 0; w < ZIO_WAIT_TYPES; w++)
1253 		pio->io_state[w] = 0;
1254 	for (int c = 0; c < ZIO_CHILD_TYPES; c++)
1255 		pio->io_child_error[c] = 0;
1256 
1257 	if (IO_IS_ALLOCATING(pio))
1258 		BP_ZERO(pio->io_bp);
1259 
1260 	/*
1261 	 * As we reexecute pio's children, new children could be created.
1262 	 * New children go to the head of pio's io_child_list, however,
1263 	 * so we will (correctly) not reexecute them.  The key is that
1264 	 * the remainder of pio's io_child_list, from 'cio_next' onward,
1265 	 * cannot be affected by any side effects of reexecuting 'cio'.
1266 	 */
1267 	for (cio = zio_walk_children(pio); cio != NULL; cio = cio_next) {
1268 		cio_next = zio_walk_children(pio);
1269 		mutex_enter(&pio->io_lock);
1270 		for (int w = 0; w < ZIO_WAIT_TYPES; w++)
1271 			pio->io_children[cio->io_child_type][w]++;
1272 		mutex_exit(&pio->io_lock);
1273 		zio_reexecute(cio);
1274 	}
1275 
1276 	/*
1277 	 * Now that all children have been reexecuted, execute the parent.
1278 	 * We don't reexecute "The Godfather" I/O here as it's the
1279 	 * responsibility of the caller to wait on him.
1280 	 */
1281 	if (!(pio->io_flags & ZIO_FLAG_GODFATHER))
1282 		zio_execute(pio);
1283 }
1284 
1285 void
1286 zio_suspend(spa_t *spa, zio_t *zio)
1287 {
1288 	if (spa_get_failmode(spa) == ZIO_FAILURE_MODE_PANIC)
1289 		fm_panic("Pool '%s' has encountered an uncorrectable I/O "
1290 		    "failure and the failure mode property for this pool "
1291 		    "is set to panic.", spa_name(spa));
1292 
1293 	zfs_ereport_post(FM_EREPORT_ZFS_IO_FAILURE, spa, NULL, NULL, 0, 0);
1294 
1295 	mutex_enter(&spa->spa_suspend_lock);
1296 
1297 	if (spa->spa_suspend_zio_root == NULL)
1298 		spa->spa_suspend_zio_root = zio_root(spa, NULL, NULL,
1299 		    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE |
1300 		    ZIO_FLAG_GODFATHER);
1301 
1302 	spa->spa_suspended = B_TRUE;
1303 
1304 	if (zio != NULL) {
1305 		ASSERT(!(zio->io_flags & ZIO_FLAG_GODFATHER));
1306 		ASSERT(zio != spa->spa_suspend_zio_root);
1307 		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
1308 		ASSERT(zio_unique_parent(zio) == NULL);
1309 		ASSERT(zio->io_stage == ZIO_STAGE_DONE);
1310 		zio_add_child(spa->spa_suspend_zio_root, zio);
1311 	}
1312 
1313 	mutex_exit(&spa->spa_suspend_lock);
1314 }
1315 
1316 int
1317 zio_resume(spa_t *spa)
1318 {
1319 	zio_t *pio;
1320 
1321 	/*
1322 	 * Reexecute all previously suspended i/o.
1323 	 */
1324 	mutex_enter(&spa->spa_suspend_lock);
1325 	spa->spa_suspended = B_FALSE;
1326 	cv_broadcast(&spa->spa_suspend_cv);
1327 	pio = spa->spa_suspend_zio_root;
1328 	spa->spa_suspend_zio_root = NULL;
1329 	mutex_exit(&spa->spa_suspend_lock);
1330 
1331 	if (pio == NULL)
1332 		return (0);
1333 
1334 	zio_reexecute(pio);
1335 	return (zio_wait(pio));
1336 }
1337 
1338 void
1339 zio_resume_wait(spa_t *spa)
1340 {
1341 	mutex_enter(&spa->spa_suspend_lock);
1342 	while (spa_suspended(spa))
1343 		cv_wait(&spa->spa_suspend_cv, &spa->spa_suspend_lock);
1344 	mutex_exit(&spa->spa_suspend_lock);
1345 }
1346 
1347 /*
1348  * ==========================================================================
1349  * Gang blocks.
1350  *
1351  * A gang block is a collection of small blocks that looks to the DMU
1352  * like one large block.  When zio_dva_allocate() cannot find a block
1353  * of the requested size, due to either severe fragmentation or the pool
1354  * being nearly full, it calls zio_write_gang_block() to construct the
1355  * block from smaller fragments.
1356  *
1357  * A gang block consists of a gang header (zio_gbh_phys_t) and up to
1358  * three (SPA_GBH_NBLKPTRS) gang members.  The gang header is just like
1359  * an indirect block: it's an array of block pointers.  It consumes
1360  * only one sector and hence is allocatable regardless of fragmentation.
1361  * The gang header's bps point to its gang members, which hold the data.
1362  *
1363  * Gang blocks are self-checksumming, using the bp's <vdev, offset, txg>
1364  * as the verifier to ensure uniqueness of the SHA256 checksum.
1365  * Critically, the gang block bp's blk_cksum is the checksum of the data,
1366  * not the gang header.  This ensures that data block signatures (needed for
1367  * deduplication) are independent of how the block is physically stored.
1368  *
1369  * Gang blocks can be nested: a gang member may itself be a gang block.
1370  * Thus every gang block is a tree in which root and all interior nodes are
1371  * gang headers, and the leaves are normal blocks that contain user data.
1372  * The root of the gang tree is called the gang leader.
1373  *
1374  * To perform any operation (read, rewrite, free, claim) on a gang block,
1375  * zio_gang_assemble() first assembles the gang tree (minus data leaves)
1376  * in the io_gang_tree field of the original logical i/o by recursively
1377  * reading the gang leader and all gang headers below it.  This yields
1378  * an in-core tree containing the contents of every gang header and the
1379  * bps for every constituent of the gang block.
1380  *
1381  * With the gang tree now assembled, zio_gang_issue() just walks the gang tree
1382  * and invokes a callback on each bp.  To free a gang block, zio_gang_issue()
1383  * calls zio_free_gang() -- a trivial wrapper around zio_free() -- for each bp.
1384  * zio_claim_gang() provides a similarly trivial wrapper for zio_claim().
1385  * zio_read_gang() is a wrapper around zio_read() that omits reading gang
1386  * headers, since we already have those in io_gang_tree.  zio_rewrite_gang()
1387  * performs a zio_rewrite() of the data or, for gang headers, a zio_rewrite()
1388  * of the gang header plus zio_checksum_compute() of the data to update the
1389  * gang header's blk_cksum as described above.
1390  *
1391  * The two-phase assemble/issue model solves the problem of partial failure --
1392  * what if you'd freed part of a gang block but then couldn't read the
1393  * gang header for another part?  Assembling the entire gang tree first
1394  * ensures that all the necessary gang header I/O has succeeded before
1395  * starting the actual work of free, claim, or write.  Once the gang tree
1396  * is assembled, free and claim are in-memory operations that cannot fail.
1397  *
1398  * In the event that a gang write fails, zio_dva_unallocate() walks the
1399  * gang tree to immediately free (i.e. insert back into the space map)
1400  * everything we've allocated.  This ensures that we don't get ENOSPC
1401  * errors during repeated suspend/resume cycles due to a flaky device.
1402  *
1403  * Gang rewrites only happen during sync-to-convergence.  If we can't assemble
1404  * the gang tree, we won't modify the block, so we can safely defer the free
1405  * (knowing that the block is still intact).  If we *can* assemble the gang
1406  * tree, then even if some of the rewrites fail, zio_dva_unallocate() will free
1407  * each constituent bp and we can allocate a new block on the next sync pass.
1408  *
1409  * In all cases, the gang tree allows complete recovery from partial failure.
1410  * ==========================================================================
1411  */
1412 
1413 static zio_t *
1414 zio_read_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, void *data)
1415 {
1416 	if (gn != NULL)
1417 		return (pio);
1418 
1419 	return (zio_read(pio, pio->io_spa, bp, data, BP_GET_PSIZE(bp),
1420 	    NULL, NULL, pio->io_priority, ZIO_GANG_CHILD_FLAGS(pio),
1421 	    &pio->io_bookmark));
1422 }
1423 
1424 zio_t *
1425 zio_rewrite_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, void *data)
1426 {
1427 	zio_t *zio;
1428 
1429 	if (gn != NULL) {
1430 		zio = zio_rewrite(pio, pio->io_spa, pio->io_txg, bp,
1431 		    gn->gn_gbh, SPA_GANGBLOCKSIZE, NULL, NULL, pio->io_priority,
1432 		    ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
1433 		/*
1434 		 * As we rewrite each gang header, the pipeline will compute
1435 		 * a new gang block header checksum for it; but no one will
1436 		 * compute a new data checksum, so we do that here.  The one
1437 		 * exception is the gang leader: the pipeline already computed
1438 		 * its data checksum because that stage precedes gang assembly.
1439 		 * (Presently, nothing actually uses interior data checksums;
1440 		 * this is just good hygiene.)
1441 		 */
1442 		if (gn != pio->io_gang_leader->io_gang_tree) {
1443 			zio_checksum_compute(zio, BP_GET_CHECKSUM(bp),
1444 			    data, BP_GET_PSIZE(bp));
1445 		}
1446 		/*
1447 		 * If we are here to damage data for testing purposes,
1448 		 * leave the GBH alone so that we can detect the damage.
1449 		 */
1450 		if (pio->io_gang_leader->io_flags & ZIO_FLAG_INDUCE_DAMAGE)
1451 			zio->io_pipeline &= ~ZIO_VDEV_IO_STAGES;
1452 	} else {
1453 		zio = zio_rewrite(pio, pio->io_spa, pio->io_txg, bp,
1454 		    data, BP_GET_PSIZE(bp), NULL, NULL, pio->io_priority,
1455 		    ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
1456 	}
1457 
1458 	return (zio);
1459 }
1460 
1461 /* ARGSUSED */
1462 zio_t *
1463 zio_free_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, void *data)
1464 {
1465 	return (zio_free_sync(pio, pio->io_spa, pio->io_txg, bp,
1466 	    ZIO_GANG_CHILD_FLAGS(pio)));
1467 }
1468 
1469 /* ARGSUSED */
1470 zio_t *
1471 zio_claim_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, void *data)
1472 {
1473 	return (zio_claim(pio, pio->io_spa, pio->io_txg, bp,
1474 	    NULL, NULL, ZIO_GANG_CHILD_FLAGS(pio)));
1475 }
1476 
1477 static zio_gang_issue_func_t *zio_gang_issue_func[ZIO_TYPES] = {
1478 	NULL,
1479 	zio_read_gang,
1480 	zio_rewrite_gang,
1481 	zio_free_gang,
1482 	zio_claim_gang,
1483 	NULL
1484 };
1485 
1486 static void zio_gang_tree_assemble_done(zio_t *zio);
1487 
1488 static zio_gang_node_t *
1489 zio_gang_node_alloc(zio_gang_node_t **gnpp)
1490 {
1491 	zio_gang_node_t *gn;
1492 
1493 	ASSERT(*gnpp == NULL);
1494 
1495 	gn = kmem_zalloc(sizeof (*gn), KM_SLEEP);
1496 	gn->gn_gbh = zio_buf_alloc(SPA_GANGBLOCKSIZE);
1497 	*gnpp = gn;
1498 
1499 	return (gn);
1500 }
1501 
1502 static void
1503 zio_gang_node_free(zio_gang_node_t **gnpp)
1504 {
1505 	zio_gang_node_t *gn = *gnpp;
1506 
1507 	for (int g = 0; g < SPA_GBH_NBLKPTRS; g++)
1508 		ASSERT(gn->gn_child[g] == NULL);
1509 
1510 	zio_buf_free(gn->gn_gbh, SPA_GANGBLOCKSIZE);
1511 	kmem_free(gn, sizeof (*gn));
1512 	*gnpp = NULL;
1513 }
1514 
1515 static void
1516 zio_gang_tree_free(zio_gang_node_t **gnpp)
1517 {
1518 	zio_gang_node_t *gn = *gnpp;
1519 
1520 	if (gn == NULL)
1521 		return;
1522 
1523 	for (int g = 0; g < SPA_GBH_NBLKPTRS; g++)
1524 		zio_gang_tree_free(&gn->gn_child[g]);
1525 
1526 	zio_gang_node_free(gnpp);
1527 }
1528 
1529 static void
1530 zio_gang_tree_assemble(zio_t *gio, blkptr_t *bp, zio_gang_node_t **gnpp)
1531 {
1532 	zio_gang_node_t *gn = zio_gang_node_alloc(gnpp);
1533 
1534 	ASSERT(gio->io_gang_leader == gio);
1535 	ASSERT(BP_IS_GANG(bp));
1536 
1537 	zio_nowait(zio_read(gio, gio->io_spa, bp, gn->gn_gbh,
1538 	    SPA_GANGBLOCKSIZE, zio_gang_tree_assemble_done, gn,
1539 	    gio->io_priority, ZIO_GANG_CHILD_FLAGS(gio), &gio->io_bookmark));
1540 }
1541 
1542 static void
1543 zio_gang_tree_assemble_done(zio_t *zio)
1544 {
1545 	zio_t *gio = zio->io_gang_leader;
1546 	zio_gang_node_t *gn = zio->io_private;
1547 	blkptr_t *bp = zio->io_bp;
1548 
1549 	ASSERT(gio == zio_unique_parent(zio));
1550 	ASSERT(zio->io_child_count == 0);
1551 
1552 	if (zio->io_error)
1553 		return;
1554 
1555 	if (BP_SHOULD_BYTESWAP(bp))
1556 		byteswap_uint64_array(zio->io_data, zio->io_size);
1557 
1558 	ASSERT(zio->io_data == gn->gn_gbh);
1559 	ASSERT(zio->io_size == SPA_GANGBLOCKSIZE);
1560 	ASSERT(gn->gn_gbh->zg_tail.zec_magic == ZEC_MAGIC);
1561 
1562 	for (int g = 0; g < SPA_GBH_NBLKPTRS; g++) {
1563 		blkptr_t *gbp = &gn->gn_gbh->zg_blkptr[g];
1564 		if (!BP_IS_GANG(gbp))
1565 			continue;
1566 		zio_gang_tree_assemble(gio, gbp, &gn->gn_child[g]);
1567 	}
1568 }
1569 
1570 static void
1571 zio_gang_tree_issue(zio_t *pio, zio_gang_node_t *gn, blkptr_t *bp, void *data)
1572 {
1573 	zio_t *gio = pio->io_gang_leader;
1574 	zio_t *zio;
1575 
1576 	ASSERT(BP_IS_GANG(bp) == !!gn);
1577 	ASSERT(BP_GET_CHECKSUM(bp) == BP_GET_CHECKSUM(gio->io_bp));
1578 	ASSERT(BP_GET_LSIZE(bp) == BP_GET_PSIZE(bp) || gn == gio->io_gang_tree);
1579 
1580 	/*
1581 	 * If you're a gang header, your data is in gn->gn_gbh.
1582 	 * If you're a gang member, your data is in 'data' and gn == NULL.
1583 	 */
1584 	zio = zio_gang_issue_func[gio->io_type](pio, bp, gn, data);
1585 
1586 	if (gn != NULL) {
1587 		ASSERT(gn->gn_gbh->zg_tail.zec_magic == ZEC_MAGIC);
1588 
1589 		for (int g = 0; g < SPA_GBH_NBLKPTRS; g++) {
1590 			blkptr_t *gbp = &gn->gn_gbh->zg_blkptr[g];
1591 			if (BP_IS_HOLE(gbp))
1592 				continue;
1593 			zio_gang_tree_issue(zio, gn->gn_child[g], gbp, data);
1594 			data = (char *)data + BP_GET_PSIZE(gbp);
1595 		}
1596 	}
1597 
1598 	if (gn == gio->io_gang_tree)
1599 		ASSERT3P((char *)gio->io_data + gio->io_size, ==, data);
1600 
1601 	if (zio != pio)
1602 		zio_nowait(zio);
1603 }
1604 
1605 static int
1606 zio_gang_assemble(zio_t *zio)
1607 {
1608 	blkptr_t *bp = zio->io_bp;
1609 
1610 	ASSERT(BP_IS_GANG(bp) && zio->io_gang_leader == NULL);
1611 	ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
1612 
1613 	zio->io_gang_leader = zio;
1614 
1615 	zio_gang_tree_assemble(zio, bp, &zio->io_gang_tree);
1616 
1617 	return (ZIO_PIPELINE_CONTINUE);
1618 }
1619 
1620 static int
1621 zio_gang_issue(zio_t *zio)
1622 {
1623 	blkptr_t *bp = zio->io_bp;
1624 
1625 	if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_DONE))
1626 		return (ZIO_PIPELINE_STOP);
1627 
1628 	ASSERT(BP_IS_GANG(bp) && zio->io_gang_leader == zio);
1629 	ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
1630 
1631 	if (zio->io_child_error[ZIO_CHILD_GANG] == 0)
1632 		zio_gang_tree_issue(zio, zio->io_gang_tree, bp, zio->io_data);
1633 	else
1634 		zio_gang_tree_free(&zio->io_gang_tree);
1635 
1636 	zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
1637 
1638 	return (ZIO_PIPELINE_CONTINUE);
1639 }
1640 
1641 static void
1642 zio_write_gang_member_ready(zio_t *zio)
1643 {
1644 	zio_t *pio = zio_unique_parent(zio);
1645 	zio_t *gio = zio->io_gang_leader;
1646 	dva_t *cdva = zio->io_bp->blk_dva;
1647 	dva_t *pdva = pio->io_bp->blk_dva;
1648 	uint64_t asize;
1649 
1650 	if (BP_IS_HOLE(zio->io_bp))
1651 		return;
1652 
1653 	ASSERT(BP_IS_HOLE(&zio->io_bp_orig));
1654 
1655 	ASSERT(zio->io_child_type == ZIO_CHILD_GANG);
1656 	ASSERT3U(zio->io_prop.zp_copies, ==, gio->io_prop.zp_copies);
1657 	ASSERT3U(zio->io_prop.zp_copies, <=, BP_GET_NDVAS(zio->io_bp));
1658 	ASSERT3U(pio->io_prop.zp_copies, <=, BP_GET_NDVAS(pio->io_bp));
1659 	ASSERT3U(BP_GET_NDVAS(zio->io_bp), <=, BP_GET_NDVAS(pio->io_bp));
1660 
1661 	mutex_enter(&pio->io_lock);
1662 	for (int d = 0; d < BP_GET_NDVAS(zio->io_bp); d++) {
1663 		ASSERT(DVA_GET_GANG(&pdva[d]));
1664 		asize = DVA_GET_ASIZE(&pdva[d]);
1665 		asize += DVA_GET_ASIZE(&cdva[d]);
1666 		DVA_SET_ASIZE(&pdva[d], asize);
1667 	}
1668 	mutex_exit(&pio->io_lock);
1669 }
1670 
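/*
 * Called when zio_dva_allocate() cannot find a single block of the requested
 * size: allocate a gang header and issue up to SPA_GBH_NBLKPTRS smaller
 * child writes to hold pio's data.
 */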
1671 static int
1672 zio_write_gang_block(zio_t *pio)
1673 {
1674 	spa_t *spa = pio->io_spa;
1675 	blkptr_t *bp = pio->io_bp;
1676 	zio_t *gio = pio->io_gang_leader;
1677 	zio_t *zio;
1678 	zio_gang_node_t *gn, **gnpp;
1679 	zio_gbh_phys_t *gbh;
1680 	uint64_t txg = pio->io_txg;
1681 	uint64_t resid = pio->io_size;
1682 	uint64_t lsize;
1683 	int copies = gio->io_prop.zp_copies;
1684 	int gbh_copies = MIN(copies + 1, spa_max_replication(spa));
1685 	zio_prop_t zp;
1686 	int error;
1687 
1688 	error = metaslab_alloc(spa, spa_normal_class(spa), SPA_GANGBLOCKSIZE,
1689 	    bp, gbh_copies, txg, pio == gio ? NULL : gio->io_bp,
1690 	    METASLAB_HINTBP_FAVOR | METASLAB_GANG_HEADER);
1691 	if (error) {
1692 		pio->io_error = error;
1693 		return (ZIO_PIPELINE_CONTINUE);
1694 	}
1695 
1696 	if (pio == gio) {
1697 		gnpp = &gio->io_gang_tree;
1698 	} else {
1699 		gnpp = pio->io_private;
1700 		ASSERT(pio->io_ready == zio_write_gang_member_ready);
1701 	}
1702 
1703 	gn = zio_gang_node_alloc(gnpp);
1704 	gbh = gn->gn_gbh;
1705 	bzero(gbh, SPA_GANGBLOCKSIZE);
1706 
1707 	/*
1708 	 * Create the gang header.
1709 	 */
1710 	zio = zio_rewrite(pio, spa, txg, bp, gbh, SPA_GANGBLOCKSIZE, NULL, NULL,
1711 	    pio->io_priority, ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
1712 
1713 	/*
1714 	 * Create and nowait the gang children.
1715 	 */
1716 	for (int g = 0; resid != 0; resid -= lsize, g++) {
1717 		lsize = P2ROUNDUP(resid / (SPA_GBH_NBLKPTRS - g),
1718 		    SPA_MINBLOCKSIZE);
1719 		ASSERT(lsize >= SPA_MINBLOCKSIZE && lsize <= resid);
1720 
1721 		zp.zp_checksum = gio->io_prop.zp_checksum;
1722 		zp.zp_compress = ZIO_COMPRESS_OFF;
1723 		zp.zp_type = DMU_OT_NONE;
1724 		zp.zp_level = 0;
1725 		zp.zp_copies = gio->io_prop.zp_copies;
1726 		zp.zp_dedup = 0;
1727 		zp.zp_dedup_verify = 0;
1728 
1729 		zio_nowait(zio_write(zio, spa, txg, &gbh->zg_blkptr[g],
1730 		    (char *)pio->io_data + (pio->io_size - resid), lsize, &zp,
1731 		    zio_write_gang_member_ready, NULL, &gn->gn_child[g],
1732 		    pio->io_priority, ZIO_GANG_CHILD_FLAGS(pio),
1733 		    &pio->io_bookmark));
1734 	}
1735 
1736 	/*
1737 	 * Set pio's pipeline to just wait for zio to finish.
1738 	 */
1739 	pio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
1740 
1741 	zio_nowait(zio);
1742 
1743 	return (ZIO_PIPELINE_CONTINUE);
1744 }
1745 
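/*
 * A worked example (hypothetical numbers, assuming the usual 512-byte gang
 * header holding SPA_GBH_NBLKPTRS == 3 block pointers) of how the loop above
 * splits the remaining size: each child takes an even share of what is left,
 * rounded up to SPA_MINBLOCKSIZE, so rounding slack in early children comes
 * out of the later ones.
 */
#if 0
	uint64_t resid = 5120;		/* ten 512-byte sectors left to write */
	for (int g = 0; resid != 0; g++) {
		uint64_t lsize = P2ROUNDUP(resid / (SPA_GBH_NBLKPTRS - g),
		    SPA_MINBLOCKSIZE);
		/* g = 0: lsize = 2048; g = 1: 1536; g = 2: 1536 */
		resid -= lsize;
	}
#endif
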
1746 /*
1747  * ==========================================================================
1748  * Dedup
1749  * ==========================================================================
1750  */
1751 static void
1752 zio_ddt_child_read_done(zio_t *zio)
1753 {
1754 	blkptr_t *bp = zio->io_bp;
1755 	ddt_entry_t *dde = zio->io_private;
1756 	ddt_phys_t *ddp;
1757 	zio_t *pio = zio_unique_parent(zio);
1758 
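	/*
	 * Under the parent's lock: a successful read means this physical
	 * copy does not need repair, so clear its ddt_phys; the first good
	 * buffer is kept as dde_repair_data for fixing the copies that do.
	 */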
1759 	mutex_enter(&pio->io_lock);
1760 	ddp = ddt_phys_select(dde, bp);
1761 	if (zio->io_error == 0)
1762 		ddt_phys_clear(ddp);	/* this ddp doesn't need repair */
1763 	if (zio->io_error == 0 && dde->dde_repair_data == NULL)
1764 		dde->dde_repair_data = zio->io_data;
1765 	else
1766 		zio_buf_free(zio->io_data, zio->io_size);
1767 	mutex_exit(&pio->io_lock);
1768 }
1769 
1770 static int
1771 zio_ddt_read_start(zio_t *zio)
1772 {
1773 	blkptr_t *bp = zio->io_bp;
1774 
1775 	ASSERT(BP_GET_DEDUP(bp));
1776 	ASSERT(BP_GET_PSIZE(bp) == zio->io_size);
1777 	ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
1778 
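	/*
	 * If an earlier DDT child read failed, look up the block's dedup
	 * entry and read every other physical copy recorded there; a good
	 * copy becomes repair data in zio_ddt_read_done(), and the damaged
	 * copies can then be rewritten.
	 */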
1779 	if (zio->io_child_error[ZIO_CHILD_DDT]) {
1780 		ddt_t *ddt = ddt_select(zio->io_spa, bp);
1781 		ddt_entry_t *dde = ddt_repair_start(ddt, bp);
1782 		ddt_phys_t *ddp = dde->dde_phys;
1783 		ddt_phys_t *ddp_self = ddt_phys_select(dde, bp);
1784 		blkptr_t blk;
1785 
1786 		ASSERT(zio->io_vsd == NULL);
1787 		zio->io_vsd = dde;
1788 
1789 		if (ddp_self == NULL)
1790 			return (ZIO_PIPELINE_CONTINUE);
1791 
1792 		for (int p = 0; p < DDT_PHYS_TYPES; p++, ddp++) {
1793 			if (ddp->ddp_phys_birth == 0 || ddp == ddp_self)
1794 				continue;
1795 			ddt_bp_create(ddt->ddt_checksum, &dde->dde_key, ddp,
1796 			    &blk);
1797 			zio_nowait(zio_read(zio, zio->io_spa, &blk,
1798 			    zio_buf_alloc(zio->io_size), zio->io_size,
1799 			    zio_ddt_child_read_done, dde, zio->io_priority,
1800 			    ZIO_DDT_CHILD_FLAGS(zio) | ZIO_FLAG_DONT_PROPAGATE,
1801 			    &zio->io_bookmark));
1802 		}
1803 		return (ZIO_PIPELINE_CONTINUE);
1804 	}
1805 
1806 	zio_nowait(zio_read(zio, zio->io_spa, bp,
1807 	    zio->io_data, zio->io_size, NULL, NULL, zio->io_priority,
1808 	    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark));
1809 
1810 	return (ZIO_PIPELINE_CONTINUE);
1811 }
1812 
1813 static int
1814 zio_ddt_read_done(zio_t *zio)
1815 {
1816 	blkptr_t *bp = zio->io_bp;
1817 
1818 	if (zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_DONE))
1819 		return (ZIO_PIPELINE_STOP);
1820 
1821 	ASSERT(BP_GET_DEDUP(bp));
1822 	ASSERT(BP_GET_PSIZE(bp) == zio->io_size);
1823 	ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
1824 
1825 	if (zio->io_child_error[ZIO_CHILD_DDT]) {
1826 		ddt_t *ddt = ddt_select(zio->io_spa, bp);
1827 		ddt_entry_t *dde = zio->io_vsd;
1828 		if (ddt == NULL) {
1829 			ASSERT(spa_load_state(zio->io_spa) != SPA_LOAD_NONE);
1830 			return (ZIO_PIPELINE_CONTINUE);
1831 		}
1832 		if (dde == NULL) {
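			/*
			 * The error came from the ordinary (non-repair) read
			 * issued by zio_ddt_read_start(), so there is no
			 * repair entry yet.  Back io_stage up one stage and
			 * requeue so that zio_ddt_read_start() runs again and
			 * takes the repair path this time.
			 */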
1833 			zio->io_stage = ZIO_STAGE_DDT_READ_START >> 1;
1834 			zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, B_FALSE);
1835 			return (ZIO_PIPELINE_STOP);
1836 		}
1837 		if (dde->dde_repair_data != NULL) {
1838 			bcopy(dde->dde_repair_data, zio->io_data, zio->io_size);
1839 			zio->io_child_error[ZIO_CHILD_DDT] = 0;
1840 		}
1841 		ddt_repair_done(ddt, dde);
1842 		zio->io_vsd = NULL;
1843 	}
1844 
1845 	ASSERT(zio->io_vsd == NULL);
1846 
1847 	return (ZIO_PIPELINE_CONTINUE);
1848 }
1849 
1850 static boolean_t
1851 zio_ddt_collision(zio_t *zio, ddt_t *ddt, ddt_entry_t *dde)
1852 {
1853 	spa_t *spa = zio->io_spa;
1854 
1855 	/*
1856 	 * Note: we compare the original data, not the transformed data,
1857 	 * because when zio->io_bp is an override bp, we will not have
1858 	 * pushed the I/O transforms.  That's an important optimization
1859 	 * because otherwise we'd compress/encrypt all dmu_sync() data twice.
1860 	 */
1861 	for (int p = DDT_PHYS_SINGLE; p <= DDT_PHYS_TRIPLE; p++) {
1862 		zio_t *lio = dde->dde_lead_zio[p];
1863 
1864 		if (lio != NULL) {
1865 			return (lio->io_orig_size != zio->io_orig_size ||
1866 			    bcmp(zio->io_orig_data, lio->io_orig_data,
1867 			    zio->io_orig_size) != 0);
1868 		}
1869 	}
1870 
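	/*
	 * No write of this entry is in flight, so compare against a copy
	 * already on disk: drop the DDT lock, read the block back through
	 * the ARC, and report a collision if the contents differ from what
	 * we are about to write (or if the read itself fails).
	 */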
1871 	for (int p = DDT_PHYS_SINGLE; p <= DDT_PHYS_TRIPLE; p++) {
1872 		ddt_phys_t *ddp = &dde->dde_phys[p];
1873 
1874 		if (ddp->ddp_phys_birth != 0) {
1875 			arc_buf_t *abuf = NULL;
1876 			uint32_t aflags = ARC_WAIT;
1877 			blkptr_t blk = *zio->io_bp;
1878 			int error;
1879 
1880 			ddt_bp_fill(ddp, &blk, ddp->ddp_phys_birth);
1881 
1882 			ddt_exit(ddt);
1883 
1884 			error = arc_read_nolock(NULL, spa, &blk,
1885 			    arc_getbuf_func, &abuf, ZIO_PRIORITY_SYNC_READ,
1886 			    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE,
1887 			    &aflags, &zio->io_bookmark);
1888 
1889 			if (error == 0) {
1890 				if (arc_buf_size(abuf) != zio->io_orig_size ||
1891 				    bcmp(abuf->b_data, zio->io_orig_data,
1892 				    zio->io_orig_size) != 0)
1893 					error = EEXIST;
1894 				VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
1895 			}
1896 
1897 			ddt_enter(ddt);
1898 			return (error != 0);
1899 		}
1900 	}
1901 
1902 	return (B_FALSE);
1903 }
1904 
1905 static void
1906 zio_ddt_child_write_ready(zio_t *zio)
1907 {
1908 	int p = zio->io_prop.zp_copies;
1909 	ddt_t *ddt = ddt_select(zio->io_spa, zio->io_bp);
1910 	ddt_entry_t *dde = zio->io_private;
1911 	ddt_phys_t *ddp = &dde->dde_phys[p];
1912 	zio_t *pio;
1913 
1914 	if (zio->io_error)
1915 		return;
1916 
1917 	ddt_enter(ddt);
1918 
1919 	ASSERT(dde->dde_lead_zio[p] == zio);
1920 
1921 	ddt_phys_fill(ddp, zio->io_bp);
1922 
1923 	while ((pio = zio_walk_parents(zio)) != NULL)
1924 		ddt_bp_fill(ddp, pio->io_bp, zio->io_txg);
1925 
1926 	ddt_exit(ddt);
1927 }
1928 
1929 static void
1930 zio_ddt_child_write_done(zio_t *zio)
1931 {
1932 	int p = zio->io_prop.zp_copies;
1933 	ddt_t *ddt = ddt_select(zio->io_spa, zio->io_bp);
1934 	ddt_entry_t *dde = zio->io_private;
1935 	ddt_phys_t *ddp = &dde->dde_phys[p];
1936 
1937 	ddt_enter(ddt);
1938 
1939 	ASSERT(ddp->ddp_refcnt == 0);
1940 	ASSERT(dde->dde_lead_zio[p] == zio);
1941 	dde->dde_lead_zio[p] = NULL;
1942 
1943 	if (zio->io_error == 0) {
1944 		while (zio_walk_parents(zio) != NULL)
1945 			ddt_phys_addref(ddp);
1946 	} else {
1947 		ddt_phys_clear(ddp);
1948 	}
1949 
1950 	ddt_exit(ddt);
1951 }
1952 
1953 static void
1954 zio_ddt_ditto_write_done(zio_t *zio)
1955 {
1956 	int p = DDT_PHYS_DITTO;
1957 	zio_prop_t *zp = &zio->io_prop;
1958 	blkptr_t *bp = zio->io_bp;
1959 	ddt_t *ddt = ddt_select(zio->io_spa, bp);
1960 	ddt_entry_t *dde = zio->io_private;
1961 	ddt_phys_t *ddp = &dde->dde_phys[p];
1962 	ddt_key_t *ddk = &dde->dde_key;
1963 
1964 	ddt_enter(ddt);
1965 
1966 	ASSERT(ddp->ddp_refcnt == 0);
1967 	ASSERT(dde->dde_lead_zio[p] == zio);
1968 	dde->dde_lead_zio[p] = NULL;
1969 
1970 	if (zio->io_error == 0) {
1971 		ASSERT(ZIO_CHECKSUM_EQUAL(bp->blk_cksum, ddk->ddk_cksum));
1972 		ASSERT(zp->zp_copies < SPA_DVAS_PER_BP);
1973 		ASSERT(zp->zp_copies == BP_GET_NDVAS(bp) - BP_IS_GANG(bp));
1974 		if (ddp->ddp_phys_birth != 0)
1975 			ddt_phys_free(ddt, ddk, ddp, zio->io_txg);
1976 		ddt_phys_fill(ddp, bp);
1977 	}
1978 
1979 	ddt_exit(ddt);
1980 }
1981 
1982 static int
1983 zio_ddt_write(zio_t *zio)
1984 {
1985 	spa_t *spa = zio->io_spa;
1986 	blkptr_t *bp = zio->io_bp;
1987 	uint64_t txg = zio->io_txg;
1988 	zio_prop_t *zp = &zio->io_prop;
1989 	int p = zp->zp_copies;
1990 	int ditto_copies;
1991 	zio_t *cio = NULL;
1992 	zio_t *dio = NULL;
1993 	ddt_t *ddt = ddt_select(spa, bp);
1994 	ddt_entry_t *dde;
1995 	ddt_phys_t *ddp;
1996 
1997 	ASSERT(BP_GET_DEDUP(bp));
1998 	ASSERT(BP_GET_CHECKSUM(bp) == zp->zp_checksum);
1999 	ASSERT(BP_IS_HOLE(bp) || zio->io_bp_override);
2000 
2001 	ddt_enter(ddt);
2002 	dde = ddt_lookup(ddt, bp, B_TRUE);
2003 	ddp = &dde->dde_phys[p];
2004 
2005 	if (zp->zp_dedup_verify && zio_ddt_collision(zio, ddt, dde)) {
2006 		/*
2007 		 * If we're using a weak checksum, upgrade to a strong checksum
2008 		 * and try again.  If we're already using a strong checksum,
2009 		 * we can't resolve it, so just convert to an ordinary write.
2010 		 * (And automatically e-mail a paper to Nature?)
2011 		 */
2012 		if (!zio_checksum_table[zp->zp_checksum].ci_dedup) {
2013 			zp->zp_checksum = spa_dedup_checksum(spa);
2014 			zio_pop_transforms(zio);
2015 			zio->io_stage = ZIO_STAGE_OPEN;
2016 			BP_ZERO(bp);
2017 		} else {
2018 			zp->zp_dedup = 0;
2019 		}
2020 		zio->io_pipeline = ZIO_WRITE_PIPELINE;
2021 		ddt_exit(ddt);
2022 		return (ZIO_PIPELINE_CONTINUE);
2023 	}
2024 
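	/*
	 * If this entry is referenced often enough to cross a dedupditto
	 * threshold, arrange to write an extra "ditto" copy so heavily
	 * shared blocks get additional protection.
	 */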
2025 	ditto_copies = ddt_ditto_copies_needed(ddt, dde, ddp);
2026 	ASSERT(ditto_copies < SPA_DVAS_PER_BP);
2027 
2028 	if (ditto_copies > ddt_ditto_copies_present(dde) &&
2029 	    dde->dde_lead_zio[DDT_PHYS_DITTO] == NULL) {
2030 		zio_prop_t czp = *zp;
2031 
2032 		czp.zp_copies = ditto_copies;
2033 
2034 		/*
2035 		 * If we arrived here with an override bp, we won't have run
2036 		 * the transform stack, so we won't have the data we need to
2037 		 * generate a child i/o.  So, toss the override bp and restart.
2038 		 * This is safe, because using the override bp is just an
2039 		 * optimization; and it's rare, so the cost doesn't matter.
2040 		 */
2041 		if (zio->io_bp_override) {
2042 			zio_pop_transforms(zio);
2043 			zio->io_stage = ZIO_STAGE_OPEN;
2044 			zio->io_pipeline = ZIO_WRITE_PIPELINE;
2045 			zio->io_bp_override = NULL;
2046 			BP_ZERO(bp);
2047 			ddt_exit(ddt);
2048 			return (ZIO_PIPELINE_CONTINUE);
2049 		}
2050 
2051 		dio = zio_write(zio, spa, txg, bp, zio->io_orig_data,
2052 		    zio->io_orig_size, &czp, NULL,
2053 		    zio_ddt_ditto_write_done, dde, zio->io_priority,
2054 		    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark);
2055 
2056 		zio_push_transform(dio, zio->io_data, zio->io_size, 0, NULL);
2057 		dde->dde_lead_zio[DDT_PHYS_DITTO] = dio;
2058 	}
2059 
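	/*
	 * Three cases: the block already exists on disk or is being written
	 * by another zio (borrow it: become a child of the in-flight lead
	 * zio, or just take another reference); we arrived with a matching
	 * override bp from dmu_sync() (record it and take a reference); or
	 * nobody has written it yet, so issue the lead write ourselves.
	 */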
2060 	if (ddp->ddp_phys_birth != 0 || dde->dde_lead_zio[p] != NULL) {
2061 		if (ddp->ddp_phys_birth != 0)
2062 			ddt_bp_fill(ddp, bp, txg);
2063 		if (dde->dde_lead_zio[p] != NULL)
2064 			zio_add_child(zio, dde->dde_lead_zio[p]);
2065 		else
2066 			ddt_phys_addref(ddp);
2067 	} else if (zio->io_bp_override) {
2068 		ASSERT(bp->blk_birth == txg);
2069 		ASSERT(BP_EQUAL(bp, zio->io_bp_override));
2070 		ddt_phys_fill(ddp, bp);
2071 		ddt_phys_addref(ddp);
2072 	} else {
2073 		cio = zio_write(zio, spa, txg, bp, zio->io_orig_data,
2074 		    zio->io_orig_size, zp, zio_ddt_child_write_ready,
2075 		    zio_ddt_child_write_done, dde, zio->io_priority,
2076 		    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark);
2077 
2078 		zio_push_transform(cio, zio->io_data, zio->io_size, 0, NULL);
2079 		dde->dde_lead_zio[p] = cio;
2080 	}
2081 
2082 	ddt_exit(ddt);
2083 
2084 	if (cio)
2085 		zio_nowait(cio);
2086 	if (dio)
2087 		zio_nowait(dio);
2088 
2089 	return (ZIO_PIPELINE_CONTINUE);
2090 }
2091 
2092 static int
2093 zio_ddt_free(zio_t *zio)
2094 {
2095 	spa_t *spa = zio->io_spa;
2096 	blkptr_t *bp = zio->io_bp;
2097 	ddt_t *ddt = ddt_select(spa, bp);
2098 	ddt_entry_t *dde;
2099 	ddt_phys_t *ddp;
2100 
2101 	ASSERT(BP_GET_DEDUP(bp));
2102 	ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
2103 
2104 	ddt_enter(ddt);
2105 	dde = ddt_lookup(ddt, bp, B_TRUE);
2106 	ddp = ddt_phys_select(dde, bp);
2107 	ddt_phys_decref(ddp);
2108 	ddt_exit(ddt);
2109 
2110 	return (ZIO_PIPELINE_CONTINUE);
2111 }
2112 
2113 /*
2114  * ==========================================================================
2115  * Allocate and free blocks
2116  * ==========================================================================
2117  */
2118 static int
2119 zio_dva_allocate(zio_t *zio)
2120 {
2121 	spa_t *spa = zio->io_spa;
2122 	metaslab_class_t *mc = spa_normal_class(spa);
2123 	blkptr_t *bp = zio->io_bp;
2124 	int error;
2125 
2126 	if (zio->io_gang_leader == NULL) {
2127 		ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
2128 		zio->io_gang_leader = zio;
2129 	}
2130 
2131 	ASSERT(BP_IS_HOLE(bp));
2132 	ASSERT3U(BP_GET_NDVAS(bp), ==, 0);
2133 	ASSERT3U(zio->io_prop.zp_copies, >, 0);
2134 	ASSERT3U(zio->io_prop.zp_copies, <=, spa_max_replication(spa));
2135 	ASSERT3U(zio->io_size, ==, BP_GET_PSIZE(bp));
2136 
2137 	error = metaslab_alloc(spa, mc, zio->io_size, bp,
2138 	    zio->io_prop.zp_copies, zio->io_txg, NULL, 0);
2139 
2140 	if (error) {
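		/*
		 * If the pool is merely too fragmented to satisfy a block
		 * this large (ENOSPC on anything bigger than the minimum
		 * block size), fall back to building a gang block out of
		 * smaller allocations; otherwise report the error.
		 */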
2141 		if (error == ENOSPC && zio->io_size > SPA_MINBLOCKSIZE)
2142 			return (zio_write_gang_block(zio));
2143 		zio->io_error = error;
2144 	}
2145 
2146 	return (ZIO_PIPELINE_CONTINUE);
2147 }
2148 
2149 static int
2150 zio_dva_free(zio_t *zio)
2151 {
2152 	metaslab_free(zio->io_spa, zio->io_bp, zio->io_txg, B_FALSE);
2153 
2154 	return (ZIO_PIPELINE_CONTINUE);
2155 }
2156 
2157 static int
2158 zio_dva_claim(zio_t *zio)
2159 {
2160 	int error;
2161 
2162 	error = metaslab_claim(zio->io_spa, zio->io_bp, zio->io_txg);
2163 	if (error)
2164 		zio->io_error = error;
2165 
2166 	return (ZIO_PIPELINE_CONTINUE);
2167 }
2168 
2169 /*
2170  * Undo an allocation.  This is used by zio_done() when an I/O fails
2171  * and we want to give back the block we just allocated.
2172  * This handles both normal blocks and gang blocks.
2173  */
2174 static void
2175 zio_dva_unallocate(zio_t *zio, zio_gang_node_t *gn, blkptr_t *bp)
2176 {
2177 	ASSERT(bp->blk_birth == zio->io_txg || BP_IS_HOLE(bp));
2178 	ASSERT(zio->io_bp_override == NULL);
2179 
2180 	if (!BP_IS_HOLE(bp))
2181 		metaslab_free(zio->io_spa, bp, bp->blk_birth, B_TRUE);
2182 
2183 	if (gn != NULL) {
2184 		for (int g = 0; g < SPA_GBH_NBLKPTRS; g++) {
2185 			zio_dva_unallocate(zio, gn->gn_child[g],
2186 			    &gn->gn_gbh->zg_blkptr[g]);
2187 		}
2188 	}
2189 }
2190 
2191 /*
2192  * Try to allocate an intent log block.  Return 0 on success, errno on failure.
2193  */
2194 int
2195 zio_alloc_zil(spa_t *spa, uint64_t txg, blkptr_t *new_bp, blkptr_t *old_bp,
2196     uint64_t size, boolean_t use_slog)
2197 {
2198 	int error = 1;
2199 
2200 	ASSERT(txg > spa_syncing_txg(spa));
2201 
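	/*
	 * Prefer the separate log class when asked.  'error' starts out
	 * nonzero so that if the slog is not tried, or its allocation fails,
	 * we fall through to the normal class below.
	 */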
2202 	if (use_slog)
2203 		error = metaslab_alloc(spa, spa_log_class(spa), size,
2204 		    new_bp, 1, txg, old_bp, METASLAB_HINTBP_AVOID);
2205 
2206 	if (error)
2207 		error = metaslab_alloc(spa, spa_normal_class(spa), size,
2208 		    new_bp, 1, txg, old_bp, METASLAB_HINTBP_AVOID);
2209 
2210 	if (error == 0) {
2211 		BP_SET_LSIZE(new_bp, size);
2212 		BP_SET_PSIZE(new_bp, size);
2213 		BP_SET_COMPRESS(new_bp, ZIO_COMPRESS_OFF);
2214 		BP_SET_CHECKSUM(new_bp,
2215 		    spa_version(spa) >= SPA_VERSION_SLIM_ZIL
2216 		    ? ZIO_CHECKSUM_ZILOG2 : ZIO_CHECKSUM_ZILOG);
2217 		BP_SET_TYPE(new_bp, DMU_OT_INTENT_LOG);
2218 		BP_SET_LEVEL(new_bp, 0);
2219 		BP_SET_DEDUP(new_bp, 0);
2220 		BP_SET_BYTEORDER(new_bp, ZFS_HOST_BYTEORDER);
2221 	}
2222 
2223 	return (error);
2224 }
2225 
2226 /*
2227  * Free an intent log block.
2228  */
2229 void
2230 zio_free_zil(spa_t *spa, uint64_t txg, blkptr_t *bp)
2231 {
2232 	ASSERT(BP_GET_TYPE(bp) == DMU_OT_INTENT_LOG);
2233 	ASSERT(!BP_IS_GANG(bp));
2234 
2235 	zio_free(spa, txg, bp);
2236 }
2237 
2238 /*
2239  * ==========================================================================
2240  * Read and write to physical devices
2241  * ==========================================================================
2242  */
2243 static int
2244 zio_vdev_io_start(zio_t *zio)
2245 {
2246 	vdev_t *vd = zio->io_vd;
2247 	uint64_t align;
2248 	spa_t *spa = zio->io_spa;
2249 
2250 	ASSERT(zio->io_error == 0);
2251 	ASSERT(zio->io_child_error[ZIO_CHILD_VDEV] == 0);
2252 
2253 	if (vd == NULL) {
2254 		if (!(zio->io_flags & ZIO_FLAG_CONFIG_WRITER))
2255 			spa_config_enter(spa, SCL_ZIO, zio, RW_READER);
2256 
2257 		/*
2258 		 * The mirror_ops handle multiple DVAs in a single BP.
2259 		 */
2260 		return (vdev_mirror_ops.vdev_op_io_start(zio));
2261 	}
2262 
2263 	align = 1ULL << vd->vdev_top->vdev_ashift;
2264 
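	/*
	 * Pad any I/O that is smaller than the top-level vdev's sector size
	 * (1 << ashift): copy writes into a zero-filled aligned buffer and
	 * push a transform so that reads are copied back into the caller's
	 * buffer when the transform is popped.
	 */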
2265 	if (P2PHASE(zio->io_size, align) != 0) {
2266 		uint64_t asize = P2ROUNDUP(zio->io_size, align);
2267 		char *abuf = zio_buf_alloc(asize);
2268 		ASSERT(vd == vd->vdev_top);
2269 		if (zio->io_type == ZIO_TYPE_WRITE) {
2270 			bcopy(zio->io_data, abuf, zio->io_size);
2271 			bzero(abuf + zio->io_size, asize - zio->io_size);
2272 		}
2273 		zio_push_transform(zio, abuf, asize, asize, zio_subblock);
2274 	}
2275 
2276 	ASSERT(P2PHASE(zio->io_offset, align) == 0);
2277 	ASSERT(P2PHASE(zio->io_size, align) == 0);
2278 	ASSERT(zio->io_type != ZIO_TYPE_WRITE || spa_writeable(spa));
2279 
2280 	/*
2281 	 * If this is a repair I/O, and there's no self-healing involved --
2282 	 * that is, we're just resilvering what we expect to resilver --
2283 	 * then don't do the I/O unless zio's txg is actually in vd's DTL.
2284 	 * This prevents spurious resilvering with nested replication.
2285 	 * For example, given a mirror of mirrors, (A+B)+(C+D), if only
2286 	 * A is out of date, we'll read from C+D, then use the data to
2287 	 * resilver A+B -- but we don't actually want to resilver B, just A.
2288 	 * The top-level mirror has no way to know this, so instead we just
2289 	 * discard unnecessary repairs as we work our way down the vdev tree.
2290 	 * The same logic applies to any form of nested replication:
2291 	 * ditto + mirror, RAID-Z + replacing, etc.  This covers them all.
2292 	 */
2293 	if ((zio->io_flags & ZIO_FLAG_IO_REPAIR) &&
2294 	    !(zio->io_flags & ZIO_FLAG_SELF_HEAL) &&
2295 	    zio->io_txg != 0 &&	/* not a delegated i/o */
2296 	    !vdev_dtl_contains(vd, DTL_PARTIAL, zio->io_txg, 1)) {
2297 		ASSERT(zio->io_type == ZIO_TYPE_WRITE);
2298 		zio_vdev_io_bypass(zio);
2299 		return (ZIO_PIPELINE_CONTINUE);
2300 	}
2301 
2302 	if (vd->vdev_ops->vdev_op_leaf &&
2303 	    (zio->io_type == ZIO_TYPE_READ || zio->io_type == ZIO_TYPE_WRITE)) {
2304 
2305 		if (zio->io_type == ZIO_TYPE_READ && vdev_cache_read(zio) == 0)
2306 			return (ZIO_PIPELINE_CONTINUE);
2307 
2308 		if ((zio = vdev_queue_io(zio)) == NULL)
2309 			return (ZIO_PIPELINE_STOP);
2310 
2311 		if (!vdev_accessible(vd, zio)) {
2312 			zio->io_error = ENXIO;
2313 			zio_interrupt(zio);
2314 			return (ZIO_PIPELINE_STOP);
2315 		}
2316 	}
2317 
2318 	return (vd->vdev_ops->vdev_op_io_start(zio));
2319 }
2320 
2321 static int
2322 zio_vdev_io_done(zio_t *zio)
2323 {
2324 	vdev_t *vd = zio->io_vd;
2325 	vdev_ops_t *ops = vd ? vd->vdev_ops : &vdev_mirror_ops;
2326 	boolean_t unexpected_error = B_FALSE;
2327 
2328 	if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE))
2329 		return (ZIO_PIPELINE_STOP);
2330 
2331 	ASSERT(zio->io_type == ZIO_TYPE_READ || zio->io_type == ZIO_TYPE_WRITE);
2332 
2333 	if (vd != NULL && vd->vdev_ops->vdev_op_leaf) {
2334 
2335 		vdev_queue_io_done(zio);
2336 
2337 		if (zio->io_type == ZIO_TYPE_WRITE)
2338 			vdev_cache_write(zio);
2339 
2340 		if (zio_injection_enabled && zio->io_error == 0)
2341 			zio->io_error = zio_handle_device_injection(vd,
2342 			    zio, EIO);
2343 
2344 		if (zio_injection_enabled && zio->io_error == 0)
2345 			zio->io_error = zio_handle_label_injection(zio, EIO);
2346 
2347 		if (zio->io_error) {
2348 			if (!vdev_accessible(vd, zio)) {
2349 				zio->io_error = ENXIO;
2350 			} else {
2351 				unexpected_error = B_TRUE;
2352 			}
2353 		}
2354 	}
2355 
2356 	ops->vdev_op_io_done(zio);
2357 
2358 	if (unexpected_error)
2359 		VERIFY(vdev_probe(vd, zio) == NULL);
2360 
2361 	return (ZIO_PIPELINE_CONTINUE);
2362 }
2363 
2364 /*
2365  * For non-raidz ZIOs, we can just copy aside the bad data read from the
2366  * disk, and use that to finish the checksum ereport later.
2367  */
2368 static void
2369 zio_vsd_default_cksum_finish(zio_cksum_report_t *zcr,
2370     const void *good_buf)
2371 {
2372 	/* no processing needed */
2373 	zfs_ereport_finish_checksum(zcr, good_buf, zcr->zcr_cbdata, B_FALSE);
2374 }
2375 
2376 /*ARGSUSED*/
2377 void
2378 zio_vsd_default_cksum_report(zio_t *zio, zio_cksum_report_t *zcr, void *ignored)
2379 {
2380 	void *buf = zio_buf_alloc(zio->io_size);
2381 
2382 	bcopy(zio->io_data, buf, zio->io_size);
2383 
2384 	zcr->zcr_cbinfo = zio->io_size;
2385 	zcr->zcr_cbdata = buf;
2386 	zcr->zcr_finish = zio_vsd_default_cksum_finish;
2387 	zcr->zcr_free = zio_buf_free;
2388 }
2389 
2390 static int
2391 zio_vdev_io_assess(zio_t *zio)
2392 {
2393 	vdev_t *vd = zio->io_vd;
2394 
2395 	if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE))
2396 		return (ZIO_PIPELINE_STOP);
2397 
2398 	if (vd == NULL && !(zio->io_flags & ZIO_FLAG_CONFIG_WRITER))
2399 		spa_config_exit(zio->io_spa, SCL_ZIO, zio);
2400 
2401 	if (zio->io_vsd != NULL) {
2402 		zio->io_vsd_ops->vsd_free(zio);
2403 		zio->io_vsd = NULL;
2404 	}
2405 
2406 	if (zio_injection_enabled && zio->io_error == 0)
2407 		zio->io_error = zio_handle_fault_injection(zio, EIO);
2408 
2409 	/*
2410 	 * If the I/O failed, determine whether we should attempt to retry it.
2411 	 *
2412 	 * On retry, we cut in line in the issue queue, since we don't want
2413 	 * compression/checksumming/etc. work to prevent our (cheap) IO reissue.
2414 	 */
2415 	if (zio->io_error && vd == NULL &&
2416 	    !(zio->io_flags & (ZIO_FLAG_DONT_RETRY | ZIO_FLAG_IO_RETRY))) {
2417 		ASSERT(!(zio->io_flags & ZIO_FLAG_DONT_QUEUE));	/* not a leaf */
2418 		ASSERT(!(zio->io_flags & ZIO_FLAG_IO_BYPASS));	/* not a leaf */
2419 		zio->io_error = 0;
2420 		zio->io_flags |= ZIO_FLAG_IO_RETRY |
2421 		    ZIO_FLAG_DONT_CACHE | ZIO_FLAG_DONT_AGGREGATE;
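		/*
		 * Rewind io_stage so the next pipeline advance re-runs
		 * ZIO_STAGE_VDEV_IO_START, then requeue on the issue taskq
		 * (cutting in line ahead of freshly issued I/O if enabled).
		 */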
2422 		zio->io_stage = ZIO_STAGE_VDEV_IO_START >> 1;
2423 		zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE,
2424 		    zio_requeue_io_start_cut_in_line);
2425 		return (ZIO_PIPELINE_STOP);
2426 	}
2427 
2428 	/*
2429 	 * If we got an error on a leaf device, convert it to ENXIO
2430 	 * if the device is not accessible at all.
2431 	 */
2432 	if (zio->io_error && vd != NULL && vd->vdev_ops->vdev_op_leaf &&
2433 	    !vdev_accessible(vd, zio))
2434 		zio->io_error = ENXIO;
2435 
2436 	/*
2437 	 * If we can't write to an interior vdev (mirror or RAID-Z),
2438 	 * set vdev_cant_write so that we stop trying to allocate from it.
2439 	 */
2440 	if (zio->io_error == ENXIO && zio->io_type == ZIO_TYPE_WRITE &&
2441 	    vd != NULL && !vd->vdev_ops->vdev_op_leaf)
2442 		vd->vdev_cant_write = B_TRUE;
2443 
2444 	if (zio->io_error)
2445 		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
2446 
2447 	return (ZIO_PIPELINE_CONTINUE);
2448 }
2449 
2450 void
2451 zio_vdev_io_reissue(zio_t *zio)
2452 {
2453 	ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_START);
2454 	ASSERT(zio->io_error == 0);
2455 
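	/*
	 * Stages are one-hot bits, so shifting right one bit rewinds to the
	 * previous stage; the next pipeline advance then executes
	 * ZIO_STAGE_VDEV_IO_START again.
	 */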
2456 	zio->io_stage >>= 1;
2457 }
2458 
2459 void
2460 zio_vdev_io_redone(zio_t *zio)
2461 {
2462 	ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_DONE);
2463 
2464 	zio->io_stage >>= 1;
2465 }
2466 
2467 void
2468 zio_vdev_io_bypass(zio_t *zio)
2469 {
2470 	ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_START);
2471 	ASSERT(zio->io_error == 0);
2472 
2473 	zio->io_flags |= ZIO_FLAG_IO_BYPASS;
2474 	zio->io_stage = ZIO_STAGE_VDEV_IO_ASSESS >> 1;
2475 }
2476 
2477 /*
2478  * ==========================================================================
2479  * Generate and verify checksums
2480  * ==========================================================================
2481  */
2482 static int
2483 zio_checksum_generate(zio_t *zio)
2484 {
2485 	blkptr_t *bp = zio->io_bp;
2486 	enum zio_checksum checksum;
2487 
2488 	if (bp == NULL) {
2489 		/*
2490 		 * This is zio_write_phys().
2491 		 * We're either generating a label checksum, or none at all.
2492 		 */
2493 		checksum = zio->io_prop.zp_checksum;
2494 
2495 		if (checksum == ZIO_CHECKSUM_OFF)
2496 			return (ZIO_PIPELINE_CONTINUE);
2497 
2498 		ASSERT(checksum == ZIO_CHECKSUM_LABEL);
2499 	} else {
2500 		if (BP_IS_GANG(bp) && zio->io_child_type == ZIO_CHILD_GANG) {
2501 			ASSERT(!IO_IS_ALLOCATING(zio));
2502 			checksum = ZIO_CHECKSUM_GANG_HEADER;
2503 		} else {
2504 			checksum = BP_GET_CHECKSUM(bp);
2505 		}
2506 	}
2507 
2508 	zio_checksum_compute(zio, checksum, zio->io_data, zio->io_size);
2509 
2510 	return (ZIO_PIPELINE_CONTINUE);
2511 }
2512 
2513 static int
2514 zio_checksum_verify(zio_t *zio)
2515 {
2516 	zio_bad_cksum_t info;
2517 	blkptr_t *bp = zio->io_bp;
2518 	int error;
2519 
2520 	ASSERT(zio->io_vd != NULL);
2521 
2522 	if (bp == NULL) {
2523 		/*
2524 		 * This is zio_read_phys().
2525 		 * We're either verifying a label checksum, or nothing at all.
2526 		 */
2527 		if (zio->io_prop.zp_checksum == ZIO_CHECKSUM_OFF)
2528 			return (ZIO_PIPELINE_CONTINUE);
2529 
2530 		ASSERT(zio->io_prop.zp_checksum == ZIO_CHECKSUM_LABEL);
2531 	}
2532 
2533 	if ((error = zio_checksum_error(zio, &info)) != 0) {
2534 		zio->io_error = error;
2535 		if (!(zio->io_flags & ZIO_FLAG_SPECULATIVE)) {
2536 			zfs_ereport_start_checksum(zio->io_spa,
2537 			    zio->io_vd, zio, zio->io_offset,
2538 			    zio->io_size, NULL, &info);
2539 		}
2540 	}
2541 
2542 	return (ZIO_PIPELINE_CONTINUE);
2543 }
2544 
2545 /*
2546  * Called by RAID-Z to ensure we don't compute the checksum twice.
2547  */
2548 void
2549 zio_checksum_verified(zio_t *zio)
2550 {
2551 	zio->io_pipeline &= ~ZIO_STAGE_CHECKSUM_VERIFY;
2552 }
2553 
2554 /*
2555  * ==========================================================================
2556  * Error rank.  Errors are ranked in the order 0, ENXIO, ECKSUM, EIO, other.
2557  * An error of 0 indicates success.  ENXIO indicates whole-device failure,
2558  * which may be transient (e.g. unplugged) or permanent.  ECKSUM and EIO
2559  * indicate errors that are specific to one I/O, and most likely permanent.
2560  * Any other error is presumed to be worse because we weren't expecting it.
2561  * ==========================================================================
2562  */
2563 int
2564 zio_worst_error(int e1, int e2)
2565 {
2566 	static int zio_error_rank[] = { 0, ENXIO, ECKSUM, EIO };
2567 	int r1, r2;
2568 
2569 	for (r1 = 0; r1 < sizeof (zio_error_rank) / sizeof (int); r1++)
2570 		if (e1 == zio_error_rank[r1])
2571 			break;
2572 
2573 	for (r2 = 0; r2 < sizeof (zio_error_rank) / sizeof (int); r2++)
2574 		if (e2 == zio_error_rank[r2])
2575 			break;
2576 
2577 	return (r1 > r2 ? e1 : e2);
2578 }
2579 
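/*
 * A minimal illustration (hypothetical caller, not part of the pipeline) of
 * the ranking above: later entries in zio_error_rank[] outrank earlier ones,
 * and an errno that is not in the table outranks everything that is.
 */
#if 0
	int worst = 0;
	worst = zio_worst_error(worst, ENXIO);	/* ENXIO outranks success */
	worst = zio_worst_error(worst, ECKSUM);	/* ECKSUM outranks ENXIO */
	worst = zio_worst_error(worst, EIO);	/* EIO outranks ECKSUM */
	worst = zio_worst_error(worst, EINVAL);	/* unlisted errno outranks all */
#endif
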
2580 /*
2581  * ==========================================================================
2582  * I/O completion
2583  * ==========================================================================
2584  */
2585 static int
2586 zio_ready(zio_t *zio)
2587 {
2588 	blkptr_t *bp = zio->io_bp;
2589 	zio_t *pio, *pio_next;
2590 
2591 	if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_READY) ||
2592 	    zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_READY))
2593 		return (ZIO_PIPELINE_STOP);
2594 
2595 	if (zio->io_ready) {
2596 		ASSERT(IO_IS_ALLOCATING(zio));
2597 		ASSERT(bp->blk_birth == zio->io_txg || BP_IS_HOLE(bp));
2598 		ASSERT(zio->io_children[ZIO_CHILD_GANG][ZIO_WAIT_READY] == 0);
2599 
2600 		zio->io_ready(zio);
2601 	}
2602 
2603 	if (bp != NULL && bp != &zio->io_bp_copy)
2604 		zio->io_bp_copy = *bp;
2605 
2606 	if (zio->io_error)
2607 		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
2608 
2609 	mutex_enter(&zio->io_lock);
2610 	zio->io_state[ZIO_WAIT_READY] = 1;
2611 	pio = zio_walk_parents(zio);
2612 	mutex_exit(&zio->io_lock);
2613 
2614 	/*
2615 	 * As we notify zio's parents, new parents could be added.
2616 	 * New parents go to the head of zio's io_parent_list, however,
2617 	 * so we will (correctly) not notify them.  The remainder of zio's
2618 	 * io_parent_list, from 'pio_next' onward, cannot change because
2619 	 * all parents must wait for us to be done before they can be done.
2620 	 */
2621 	for (; pio != NULL; pio = pio_next) {
2622 		pio_next = zio_walk_parents(zio);
2623 		zio_notify_parent(pio, zio, ZIO_WAIT_READY);
2624 	}
2625 
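	/*
	 * A NODATA zio carries no payload to send to disk.  Gang blocks
	 * still need their children issued, so just clear the flag for
	 * them; for ordinary blocks, strip the vdev I/O stages from the
	 * pipeline entirely.
	 */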
2626 	if (zio->io_flags & ZIO_FLAG_NODATA) {
2627 		if (BP_IS_GANG(bp)) {
2628 			zio->io_flags &= ~ZIO_FLAG_NODATA;
2629 		} else {
2630 			ASSERT((uintptr_t)zio->io_data < SPA_MAXBLOCKSIZE);
2631 			zio->io_pipeline &= ~ZIO_VDEV_IO_STAGES;
2632 		}
2633 	}
2634 
2635 	if (zio_injection_enabled &&
2636 	    zio->io_spa->spa_syncing_txg == zio->io_txg)
2637 		zio_handle_ignored_writes(zio);
2638 
2639 	return (ZIO_PIPELINE_CONTINUE);
2640 }
2641 
2642 static int
2643 zio_done(zio_t *zio)
2644 {
2645 	spa_t *spa = zio->io_spa;
2646 	zio_t *lio = zio->io_logical;
2647 	blkptr_t *bp = zio->io_bp;
2648 	vdev_t *vd = zio->io_vd;
2649 	uint64_t psize = zio->io_size;
2650 	zio_t *pio, *pio_next;
2651 
2652 	/*
2653 	 * If our children haven't all completed,
2654 	 * wait for them and then repeat this pipeline stage.
2655 	 */
2656 	if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE) ||
2657 	    zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_DONE) ||
2658 	    zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_DONE) ||
2659 	    zio_wait_for_children(zio, ZIO_CHILD_LOGICAL, ZIO_WAIT_DONE))
2660 		return (ZIO_PIPELINE_STOP);
2661 
2662 	for (int c = 0; c < ZIO_CHILD_TYPES; c++)
2663 		for (int w = 0; w < ZIO_WAIT_TYPES; w++)
2664 			ASSERT(zio->io_children[c][w] == 0);
2665 
2666 	if (bp != NULL) {
2667 		ASSERT(bp->blk_pad[0] == 0);
2668 		ASSERT(bp->blk_pad[1] == 0);
2669 		ASSERT(bcmp(bp, &zio->io_bp_copy, sizeof (blkptr_t)) == 0 ||
2670 		    (bp == zio_unique_parent(zio)->io_bp));
2671 		if (zio->io_type == ZIO_TYPE_WRITE && !BP_IS_HOLE(bp) &&
2672 		    zio->io_bp_override == NULL &&
2673 		    !(zio->io_flags & ZIO_FLAG_IO_REPAIR)) {
2674 			ASSERT(!BP_SHOULD_BYTESWAP(bp));
2675 			ASSERT3U(zio->io_prop.zp_copies, <=, BP_GET_NDVAS(bp));
2676 			ASSERT(BP_COUNT_GANG(bp) == 0 ||
2677 			    (BP_COUNT_GANG(bp) == BP_GET_NDVAS(bp)));
2678 		}
2679 	}
2680 
2681 	/*
2682 	 * If there were child vdev/gang/ddt errors, they apply to us now.
2683 	 */
2684 	zio_inherit_child_errors(zio, ZIO_CHILD_VDEV);
2685 	zio_inherit_child_errors(zio, ZIO_CHILD_GANG);
2686 	zio_inherit_child_errors(zio, ZIO_CHILD_DDT);
2687 
2688 	/*
2689 	 * If the I/O on the transformed data was successful, generate any
2690 	 * checksum reports now while we still have the transformed data.
2691 	 */
2692 	if (zio->io_error == 0) {
2693 		while (zio->io_cksum_report != NULL) {
2694 			zio_cksum_report_t *zcr = zio->io_cksum_report;
2695 			uint64_t align = zcr->zcr_align;
2696 			uint64_t asize = P2ROUNDUP(psize, align);
2697 			char *abuf = zio->io_data;
2698 
2699 			if (asize != psize) {
2700 				abuf = zio_buf_alloc(asize);
2701 				bcopy(zio->io_data, abuf, psize);
2702 				bzero(abuf + psize, asize - psize);
2703 			}
2704 
2705 			zio->io_cksum_report = zcr->zcr_next;
2706 			zcr->zcr_next = NULL;
2707 			zcr->zcr_finish(zcr, abuf);
2708 			zfs_ereport_free_checksum(zcr);
2709 
2710 			if (asize != psize)
2711 				zio_buf_free(abuf, asize);
2712 		}
2713 	}
2714 
2715 	zio_pop_transforms(zio);	/* note: may set zio->io_error */
2716 
2717 	vdev_stat_update(zio, psize);
2718 
2719 	if (zio->io_error) {
2720 		/*
2721 		 * If this I/O is attached to a particular vdev,
2722 		 * generate an error message describing the I/O failure
2723 		 * at the block level.  We ignore these errors if the
2724 		 * device is currently unavailable.
2725 		 */
2726 		if (zio->io_error != ECKSUM && vd != NULL && !vdev_is_dead(vd))
2727 			zfs_ereport_post(FM_EREPORT_ZFS_IO, spa, vd, zio, 0, 0);
2728 
2729 		if ((zio->io_error == EIO || !(zio->io_flags &
2730 		    (ZIO_FLAG_SPECULATIVE | ZIO_FLAG_DONT_PROPAGATE))) &&
2731 		    zio == lio) {
2732 			/*
2733 			 * For logical I/O requests, tell the SPA to log the
2734 			 * error and generate a logical data ereport.
2735 			 */
2736 			spa_log_error(spa, zio);
2737 			zfs_ereport_post(FM_EREPORT_ZFS_DATA, spa, NULL, zio,
2738 			    0, 0);
2739 		}
2740 	}
2741 
2742 	if (zio->io_error && zio == lio) {
2743 		/*
2744 		 * Determine whether zio should be reexecuted.  This will
2745 		 * propagate all the way to the root via zio_notify_parent().
2746 		 */
2747 		ASSERT(vd == NULL && bp != NULL);
2748 		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
2749 
2750 		if (IO_IS_ALLOCATING(zio) &&
2751 		    !(zio->io_flags & ZIO_FLAG_CANFAIL)) {
2752 			if (zio->io_error != ENOSPC)
2753 				zio->io_reexecute |= ZIO_REEXECUTE_NOW;
2754 			else
2755 				zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;
2756 		}
2757 
2758 		if ((zio->io_type == ZIO_TYPE_READ ||
2759 		    zio->io_type == ZIO_TYPE_FREE) &&
2760 		    zio->io_error == ENXIO &&
2761 		    spa_load_state(spa) == SPA_LOAD_NONE &&
2762 		    spa_get_failmode(spa) != ZIO_FAILURE_MODE_CONTINUE)
2763 			zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;
2764 
2765 		if (!(zio->io_flags & ZIO_FLAG_CANFAIL) && !zio->io_reexecute)
2766 			zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;
2767 
2768 		/*
2769 		 * Here is a possibly good place to attempt to do
2770 		 * either combinatorial reconstruction or error correction
2771 		 * based on checksums.  It also might be a good place
2772 		 * to send out preliminary ereports before we suspend
2773 		 * processing.
2774 		 */
2775 	}
2776 
2777 	/*
2778 	 * If there were logical child errors, they apply to us now.
2779 	 * We defer this until now to avoid conflating logical child
2780 	 * errors with errors that happened to the zio itself when
2781 	 * updating vdev stats and reporting FMA events above.
2782 	 */
2783 	zio_inherit_child_errors(zio, ZIO_CHILD_LOGICAL);
2784 
2785 	if ((zio->io_error || zio->io_reexecute) &&
2786 	    IO_IS_ALLOCATING(zio) && zio->io_gang_leader == zio &&
2787 	    !(zio->io_flags & ZIO_FLAG_IO_REWRITE))
2788 		zio_dva_unallocate(zio, zio->io_gang_tree, bp);
2789 
2790 	zio_gang_tree_free(&zio->io_gang_tree);
2791 
2792 	/*
2793 	 * Godfather I/Os should never suspend.
2794 	 */
2795 	if ((zio->io_flags & ZIO_FLAG_GODFATHER) &&
2796 	    (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND))
2797 		zio->io_reexecute = 0;
2798 
2799 	if (zio->io_reexecute) {
2800 		/*
2801 		 * This is a logical I/O that wants to reexecute.
2802 		 *
2803 		 * Reexecute is top-down.  When an i/o fails, if it's not
2804 		 * the root, it simply notifies its parent and sticks around.
2805 		 * The parent, seeing that it still has children in zio_done(),
2806 		 * does the same.  This percolates all the way up to the root.
2807 		 * The root i/o will reexecute or suspend the entire tree.
2808 		 *
2809 		 * This approach ensures that zio_reexecute() honors
2810 		 * all the original i/o dependency relationships, e.g.
2811 		 * parents not executing until children are ready.
2812 		 */
2813 		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
2814 
2815 		zio->io_gang_leader = NULL;
2816 
2817 		mutex_enter(&zio->io_lock);
2818 		zio->io_state[ZIO_WAIT_DONE] = 1;
2819 		mutex_exit(&zio->io_lock);
2820 
2821 		/*
2822 		 * "The Godfather" I/O monitors its children but is
2823 		 * not a true parent to them. It will track them through
2824 		 * the pipeline but severs its ties whenever they get into
2825 		 * trouble (e.g. suspended). This allows "The Godfather"
2826 		 * I/O to return status without blocking.
2827 		 */
2828 		for (pio = zio_walk_parents(zio); pio != NULL; pio = pio_next) {
2829 			zio_link_t *zl = zio->io_walk_link;
2830 			pio_next = zio_walk_parents(zio);
2831 
2832 			if ((pio->io_flags & ZIO_FLAG_GODFATHER) &&
2833 			    (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND)) {
2834 				zio_remove_child(pio, zio, zl);
2835 				zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
2836 			}
2837 		}
2838 
2839 		if ((pio = zio_unique_parent(zio)) != NULL) {
2840 			/*
2841 			 * We're not a root i/o, so there's nothing to do
2842 			 * but notify our parent.  Don't propagate errors
2843 			 * upward since we haven't permanently failed yet.
2844 			 */
2845 			ASSERT(!(zio->io_flags & ZIO_FLAG_GODFATHER));
2846 			zio->io_flags |= ZIO_FLAG_DONT_PROPAGATE;
2847 			zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
2848 		} else if (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND) {
2849 			/*
2850 			 * We'd fail again if we reexecuted now, so suspend
2851 			 * until conditions improve (e.g. device comes online).
2852 			 */
2853 			zio_suspend(spa, zio);
2854 		} else {
2855 			/*
2856 			 * Reexecution is potentially a huge amount of work.
2857 			 * Hand it off to the otherwise-unused claim taskq.
2858 			 */
2859 			(void) taskq_dispatch(
2860 			    spa->spa_zio_taskq[ZIO_TYPE_CLAIM][ZIO_TASKQ_ISSUE],
2861 			    (task_func_t *)zio_reexecute, zio, TQ_SLEEP);
2862 		}
2863 		return (ZIO_PIPELINE_STOP);
2864 	}
2865 
2866 	ASSERT(zio->io_child_count == 0);
2867 	ASSERT(zio->io_reexecute == 0);
2868 	ASSERT(zio->io_error == 0 || (zio->io_flags & ZIO_FLAG_CANFAIL));
2869 
2870 	/*
2871 	 * Report any checksum errors, since the I/O is complete.
2872 	 */
2873 	while (zio->io_cksum_report != NULL) {
2874 		zio_cksum_report_t *zcr = zio->io_cksum_report;
2875 		zio->io_cksum_report = zcr->zcr_next;
2876 		zcr->zcr_next = NULL;
2877 		zcr->zcr_finish(zcr, NULL);
2878 		zfs_ereport_free_checksum(zcr);
2879 	}
2880 
2881 	/*
2882 	 * It is the responsibility of the done callback to ensure that this
2883 	 * particular zio is no longer discoverable for adoption, and as
2884 	 * such, cannot acquire any new parents.
2885 	 */
2886 	if (zio->io_done)
2887 		zio->io_done(zio);
2888 
2889 	mutex_enter(&zio->io_lock);
2890 	zio->io_state[ZIO_WAIT_DONE] = 1;
2891 	mutex_exit(&zio->io_lock);
2892 
2893 	for (pio = zio_walk_parents(zio); pio != NULL; pio = pio_next) {
2894 		zio_link_t *zl = zio->io_walk_link;
2895 		pio_next = zio_walk_parents(zio);
2896 		zio_remove_child(pio, zio, zl);
2897 		zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
2898 	}
2899 
2900 	if (zio->io_waiter != NULL) {
2901 		mutex_enter(&zio->io_lock);
2902 		zio->io_executor = NULL;
2903 		cv_broadcast(&zio->io_cv);
2904 		mutex_exit(&zio->io_lock);
2905 	} else {
2906 		zio_destroy(zio);
2907 	}
2908 
2909 	return (ZIO_PIPELINE_STOP);
2910 }
2911 
2912 /*
2913  * ==========================================================================
2914  * I/O pipeline definition
2915  * ==========================================================================
2916  */
2917 static zio_pipe_stage_t *zio_pipeline[] = {
2918 	NULL,
2919 	zio_read_bp_init,
2920 	zio_free_bp_init,
2921 	zio_issue_async,
2922 	zio_write_bp_init,
2923 	zio_checksum_generate,
2924 	zio_ddt_read_start,
2925 	zio_ddt_read_done,
2926 	zio_ddt_write,
2927 	zio_ddt_free,
2928 	zio_gang_assemble,
2929 	zio_gang_issue,
2930 	zio_dva_allocate,
2931 	zio_dva_free,
2932 	zio_dva_claim,
2933 	zio_ready,
2934 	zio_vdev_io_start,
2935 	zio_vdev_io_done,
2936 	zio_vdev_io_assess,
2937 	zio_checksum_verify,
2938 	zio_done
2939 };
2940