NEX-20218 Backport Illumos #9464 txg_kick() fails to see that we are quiescing, forcing transactions to their next stages without leaving them accumulate changes
MFV illumos-gate@fa41d87de9ec9000964c605eb01d6dc19e4a1abe
    9464 txg_kick() fails to see that we are quiescing, forcing transactions to their next stages without leaving them accumulate changes
    Reviewed by: Matt Ahrens <matt@delphix.com>
    Reviewed by: Brad Lewis <brad.lewis@delphix.com>
    Reviewed by: Andriy Gapon <avg@FreeBSD.org>
    Approved by: Dan McDonald <danmcd@joyent.com>
NEX-20208 Backport Illumos #9993 zil writes can get delayed in zio pipeline
MFV illumos-gate@2258ad0b755b24a55c6173b1e6bb6188389f72dd
    9993 zil writes can get delayed in zio pipeline
    Reviewed by: Prakash Surya <prakash.surya@delphix.com>
    Reviewed by: Brad Lewis <brad.lewis@delphix.com>
    Reviewed by: Matt Ahrens <matt@delphix.com>
    Approved by: Dan McDonald <danmcd@joyent.com>
NEX-9552 zfs_scan_idle throttling harms performance and needs to be removed
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
NEX-15067 KRRP: system panics during ZFS-receive: assertion failed: arc_can_share(hdr, buf)
Reviewed by: Yuri Pankov <yuri.pankov@nexenta.com>
NEX-14571 remove isal support remnants
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
NEX-13140 DVA-throttle support for special-class
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
Reviewed by: Yuri Pankov <yuri.pankov@nexenta.com>
NEX-9752 backport illumos 6950 ARC should cache compressed data
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
Reviewed by: Yuri Pankov <yuri.pankov@nexenta.com>
6950 ARC should cache compressed data
Reviewed by: Prakash Surya <prakash.surya@delphix.com>
Reviewed by: Dan Kimmel <dan.kimmel@delphix.com>
Reviewed by: Matt Ahrens <mahrens@delphix.com>
Reviewed by: Paul Dagnelie <pcd@delphix.com>
Reviewed by: Don Brady <don.brady@intel.com>
Reviewed by: Richard Elling <Richard.Elling@RichardElling.com>
Approved by: Richard Lowe <richlowe@richlowe.net>
NEX-6088 ZFS scrub/resilver take excessively long due to issuing lots of random IO
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
NEX-8065 ZFS doesn't notice when disk vdevs have no write cache
Reviewed by: Dan Fields <dan.fields@nexenta.com>
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
Reviewed by: George Wilson <george.wilson@delphix.com>
NEX-5856 ddt_capped isn't reset when deduped dataset is destroyed
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
NEX-5795 Rename 'wrc' as 'wbc' in the source and in the tech docs
Reviewed by: Alex Aizman <alex.aizman@nexenta.com>
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
NEX-5367 special vdev: sync-write options (NEW)
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
NEX-5318 Cleanup specialclass property (obsolete, not used) and fix related meta-to-special case
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
NEX-5188 Removed special-vdev causes panic on read or on get size of special-bp
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
NEX-5058 WBC: Race between the purging of window and opening new one
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
Reviewed by: Alex Aizman <alex.aizman@nexenta.com>
NEX-2830 ZFS smart compression
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
NEX-4794 Write Back Cache sync and async writes: adjust routing according to watermark limits
Reviewed by: Alex Aizman <alex.aizman@nexenta.com>
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
NEX-4619 Want kstats to monitor TRIM and UNMAP operation
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
Reviewed by: Hans Rosenfeld <hans.rosenfeld@nexenta.com>
Reviewed by: Josef 'Jeff' Sipek <josef.sipek@nexenta.com>
6328 Fix cstyle errors in zfs codebase (fix studio)
6328 Fix cstyle errors in zfs codebase
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: Alex Reece <alex@delphix.com>
Reviewed by: Richard Elling <Richard.Elling@RichardElling.com>
Reviewed by: Jorgen Lundman <lundman@lundman.net>
Approved by: Robert Mustacchi <rm@joyent.com>
4185 add new cryptographic checksums to ZFS: SHA-512, Skein, Edon-R (fix studio build)
4185 add new cryptographic checksums to ZFS: SHA-512, Skein, Edon-R
Reviewed by: George Wilson <george.wilson@delphix.com>
Reviewed by: Prakash Surya <prakash.surya@delphix.com>
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
Reviewed by: Richard Lowe <richlowe@richlowe.net>
Approved by: Garrett D'Amore <garrett@damore.org>
NEX-4582 update wrc test cases to allow use of write back cache per tree of datasets
Reviewed by: Steve Peng <steve.peng@nexenta.com>
Reviewed by: Alex Aizman <alex.aizman@nexenta.com>
5960 zfs recv should prefetch indirect blocks
5925 zfs receive -o origin=
Reviewed by: Prakash Surya <prakash.surya@delphix.com>
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
5438 zfs_blkptr_verify should continue after zfs_panic_recover
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: George Wilson <george@delphix.com>
Reviewed by: Xin LI <delphij@freebsd.org>
Approved by: Dan McDonald <danmcd@omniti.com>
5818 zfs {ref}compressratio is incorrect with 4k sector size
Reviewed by: Alex Reece <alex@delphix.com>
Reviewed by: George Wilson <george@delphix.com>
Reviewed by: Richard Elling <richard.elling@richardelling.com>
Reviewed by: Steven Hartland <killing@multiplay.co.uk>
Reviewed by: Don Brady <dev.fs.zfs@gmail.com>
Approved by: Albert Lee <trisk@omniti.com>
NEX-3502 dedup ceiling should set a pool prop when cap is in effect
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
Reviewed by: Yuri Pankov <yuri.pankov@nexenta.com>
NEX-3984 On-demand TRIM
Reviewed by: Alek Pinchuk <alek@nexenta.com>
Reviewed by: Josef 'Jeff' Sipek <josef.sipek@nexenta.com>
Conflicts:
        usr/src/common/zfs/zpool_prop.c
        usr/src/uts/common/sys/fs/zfs.h
NEX-4003 WRC: System panics on debug build
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
Reviewed by: Josef 'Jeff' Sipek <josef.sipek@nexenta.com>
NEX-3558 KRRP Integration
NEX-3508 CLONE - Port NEX-2946 Add UNMAP/TRIM functionality to ZFS and illumos
Reviewed by: Josef Sipek <josef.sipek@nexenta.com>
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
Conflicts:
    usr/src/uts/common/io/scsi/targets/sd.c
    usr/src/uts/common/sys/scsi/targets/sddef.h
NEX-3411 Removal of small l2arc ddt vdev disables dedup despite enough RAM
Reviewed by: Kirill Davydychev <kirill.davydychev@nexenta.com>
Reviewed by: Tony Nguyen <tony.nguyen@nexenta.com>
NEX-3300 ddt byte count ceiling tunables should not depend on zfs_ddt_limit_type being set
Reviewed by: Josef 'Jeff' Sipek <josef.sipek@nexenta.com>
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
NEX-3165 need some dedup improvements
Reviewed by: Josef 'Jeff' Sipek <josef.sipek@nexenta.com>
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
4370 avoid transmitting holes during zfs send
4371 DMU code clean up
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: George Wilson <george.wilson@delphix.com>
Reviewed by: Christopher Siden <christopher.siden@delphix.com>
Reviewed by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
Approved by: Garrett D'Amore <garrett@damore.org>
NEX-1110 Odd zpool Latency Output
OS-70 remove zio timer code
Moved closed ZFS files to open repo, changed Makefiles accordingly
Removed unneeded weak symbols
Support for secondarycache=data option
Align mutex tables in arc.c and dbuf.c to 64 bytes (cache line), place each kmutex_t on cache line by itself to avoid false sharing
Fixup merge results
re #13989 port of illumos-3805
3805 arc shouldn't cache freed blocks
Reviewed by: George Wilson <george.wilson@delphix.com>
Reviewed by: Christopher Siden <christopher.siden@delphix.com>
Reviewed by: Richard Elling <richard.elling@dey-sys.com>
Reviewed by: Will Andrews <will@firepipe.net>
Approved by: Dan McDonald <danmcd@nexenta.com>
SUP-504 Multiple disks being falsely failed/retired by new zio_timeout handling code
re #12770 rb4121 zio latency reports can produce false positives
re #12645 rb4073 Make vdev delay simulator independent of DEBUG
re #12643 rb4064 ZFS meta refactoring - vdev utilization tracking, auto-dedup
re #12616 rb4051 zfs_log_write()/dmu_sync() write once to special refactoring
re #8279 rb3915 need a mechanism to notify NMS about ZFS config changes (fix lint - courtesy of Yuri Pankov)
re #12584 rb4049 zfsxx latest code merge (fix lint - courtesy of Yuri Pankov)
re #12585 rb4049 ZFS++ work port - refactoring to improve separation of open/closed code, bug fixes, performance improvements - open code
re #12393 rb3935 Kerberos and smbd disagree about who is our AD server (fix elf runtime attributes check)
re #11612 rb3907 Failing vdev of a mirrored pool should not take zfs operations out of action for extended periods of time.
re #8346 rb2639 KT disk failures
Bug 11205: add missing libzfs_closed_stubs.c to fix opensource-only build.
ZFS plus work: special vdevs, cos, cos/vdev properties

@@ -16,15 +16,16 @@
  * fields enclosed by brackets "[]" replaced with your own identifying
  * information: Portions Copyright [yyyy] [name of copyright owner]
  *
  * CDDL HEADER END
  */
+
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  * Copyright (c) 2011, 2017 by Delphix. All rights reserved.
- * Copyright (c) 2011 Nexenta Systems, Inc. All rights reserved.
  * Copyright (c) 2014 Integros [integros.com]
+ * Copyright 2017 Nexenta Systems, Inc. All rights reserved.
  */
 
 #include <sys/sysmacros.h>
 #include <sys/zfs_context.h>
 #include <sys/fm/fs/zfs.h>

@@ -37,14 +38,20 @@
 #include <sys/zio_checksum.h>
 #include <sys/dmu_objset.h>
 #include <sys/arc.h>
 #include <sys/ddt.h>
 #include <sys/blkptr.h>
+#include <sys/special.h>
 #include <sys/zfeature.h>
+#include <sys/dkioc_free_util.h>
+#include <sys/dsl_scan.h>
+
 #include <sys/metaslab_impl.h>
 #include <sys/abd.h>
 
+extern int zfs_txg_timeout;
+
 /*
  * ==========================================================================
  * I/O type descriptions
  * ==========================================================================
  */

@@ -67,16 +75,14 @@
 
 #ifdef _KERNEL
 extern vmem_t *zio_alloc_arena;
 #endif
 
-#define ZIO_PIPELINE_CONTINUE           0x100
-#define ZIO_PIPELINE_STOP               0x101
-
 #define BP_SPANB(indblkshift, level) \
         (((uint64_t)1) << ((level) * ((indblkshift) - SPA_BLKPTRSHIFT)))
 #define COMPARE_META_LEVEL      0x80000000ul
+
 /*
  * The following actions directly effect the spa's sync-to-convergence logic.
  * The values below define the sync pass when we start performing the action.
  * Care should be taken when changing these values as they directly impact
  * spa_sync() performance. Tuning these values may introduce subtle performance

@@ -103,10 +109,25 @@
 int zio_buf_debug_limit = 16384;
 #else
 int zio_buf_debug_limit = 0;
 #endif
 
+/*
+ * Fault insertion for stress testing
+ */
+int zio_faulty_vdev_enabled = 0;
+uint64_t zio_faulty_vdev_guid;
+uint64_t zio_faulty_vdev_delay_us = 1000000;    /* 1 second */
+
+/*
+ * Tunable to allow for debugging SCSI UNMAP/SATA TRIM calls. Disabling
+ * it will prevent ZFS from attempting to issue DKIOCFREE ioctls to the
+ * underlying storage.
+ */
+boolean_t zfs_trim = B_TRUE;
+uint64_t zfs_trim_min_ext_sz = 1 << 20; /* 1 MB */
+
 static void zio_taskq_dispatch(zio_t *, zio_taskq_type_t, boolean_t);
 
 void
 zio_init(void)
 {

@@ -178,10 +199,11 @@
                 if (zio_data_buf_cache[c - 1] == NULL)
                         zio_data_buf_cache[c - 1] = zio_data_buf_cache[c];
         }
 
         zio_inject_init();
+
 }
 
 void
 zio_fini(void)
 {

@@ -440,30 +462,25 @@
 
         kmem_cache_free(zio_link_cache, zl);
 }
 
 static boolean_t
-zio_wait_for_children(zio_t *zio, uint8_t childbits, enum zio_wait_type wait)
+zio_wait_for_children(zio_t *zio, enum zio_child child, enum zio_wait_type wait)
 {
+        uint64_t *countp = &zio->io_children[child][wait];
         boolean_t waiting = B_FALSE;
 
         mutex_enter(&zio->io_lock);
         ASSERT(zio->io_stall == NULL);
-        for (int c = 0; c < ZIO_CHILD_TYPES; c++) {
-                if (!(ZIO_CHILD_BIT_IS_SET(childbits, c)))
-                        continue;
-
-                uint64_t *countp = &zio->io_children[c][wait];
                 if (*countp != 0) {
                         zio->io_stage >>= 1;
                         ASSERT3U(zio->io_stage, !=, ZIO_STAGE_OPEN);
                         zio->io_stall = countp;
                         waiting = B_TRUE;
-                        break;
                 }
-        }
         mutex_exit(&zio->io_lock);
+
         return (waiting);
 }
 
 static void
 zio_notify_parent(zio_t *pio, zio_t *zio, enum zio_wait_type wait)

@@ -617,15 +634,20 @@
 
         if (zb != NULL)
                 zio->io_bookmark = *zb;
 
         if (pio != NULL) {
+                zio->io_mc = pio->io_mc;
                 if (zio->io_logical == NULL)
                         zio->io_logical = pio->io_logical;
                 if (zio->io_child_type == ZIO_CHILD_GANG)
                         zio->io_gang_leader = pio->io_gang_leader;
                 zio_add_child(pio, zio);
+
+                /* copy the smartcomp setting when creating child zio's */
+                bcopy(&pio->io_smartcomp, &zio->io_smartcomp,
+                    sizeof (zio->io_smartcomp));
         }
 
         return (zio);
 }
 

@@ -660,10 +682,18 @@
 }
 
 void
 zfs_blkptr_verify(spa_t *spa, const blkptr_t *bp)
 {
+        /*
+         * A SPECIAL-BP has two DVAs, but DVA[0] is only a
+         * temporary DVA; after migration only DVA[1] contains
+         * valid data. Therefore, for these BPs we start the
+         * DVA walk at DVA[1].
+         */
+        int start_dva = BP_IS_SPECIAL(bp) ? 1 : 0;
+
         if (!DMU_OT_IS_VALID(BP_GET_TYPE(bp))) {
                 zfs_panic_recover("blkptr at %p has invalid TYPE %llu",
                     bp, (longlong_t)BP_GET_TYPE(bp));
         }
         if (BP_GET_CHECKSUM(bp) >= ZIO_CHECKSUM_FUNCTIONS ||

@@ -691,25 +721,18 @@
                             bp, (longlong_t)BPE_GET_ETYPE(bp));
                 }
         }
 
         /*
-         * Do not verify individual DVAs if the config is not trusted. This
-         * will be done once the zio is executed in vdev_mirror_map_alloc.
-         */
-        if (!spa->spa_trust_config)
-                return;
-
-        /*
          * Pool-specific checks.
          *
          * Note: it would be nice to verify that the blk_birth and
          * BP_PHYSICAL_BIRTH() are not too large.  However, spa_freeze()
          * allows the birth time of log blocks (and dmu_sync()-ed blocks
          * that are in the log) to be arbitrarily large.
          */
-        for (int i = 0; i < BP_GET_NDVAS(bp); i++) {
+        for (int i = start_dva; i < BP_GET_NDVAS(bp); i++) {
                 uint64_t vdevid = DVA_GET_VDEV(&bp->blk_dva[i]);
                 if (vdevid >= spa->spa_root_vdev->vdev_children) {
                         zfs_panic_recover("blkptr at %p DVA %u has invalid "
                             "VDEV %llu",
                             bp, i, (longlong_t)vdevid);

@@ -746,40 +769,10 @@
                             bp, i, (longlong_t)offset);
                 }
         }
 }
 
-boolean_t
-zfs_dva_valid(spa_t *spa, const dva_t *dva, const blkptr_t *bp)
-{
-        uint64_t vdevid = DVA_GET_VDEV(dva);
-
-        if (vdevid >= spa->spa_root_vdev->vdev_children)
-                return (B_FALSE);
-
-        vdev_t *vd = spa->spa_root_vdev->vdev_child[vdevid];
-        if (vd == NULL)
-                return (B_FALSE);
-
-        if (vd->vdev_ops == &vdev_hole_ops)
-                return (B_FALSE);
-
-        if (vd->vdev_ops == &vdev_missing_ops) {
-                return (B_FALSE);
-        }
-
-        uint64_t offset = DVA_GET_OFFSET(dva);
-        uint64_t asize = DVA_GET_ASIZE(dva);
-
-        if (BP_IS_GANG(bp))
-                asize = vdev_psize_to_asize(vd, SPA_GANGBLOCKSIZE);
-        if (offset + asize > vd->vdev_asize)
-                return (B_FALSE);
-
-        return (B_TRUE);
-}
-
 zio_t *
 zio_read(zio_t *pio, spa_t *spa, const blkptr_t *bp,
     abd_t *data, uint64_t size, zio_done_func_t *done, void *private,
     zio_priority_t priority, enum zio_flag flags, const zbookmark_phys_t *zb)
 {

@@ -800,11 +793,12 @@
 zio_write(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
     abd_t *data, uint64_t lsize, uint64_t psize, const zio_prop_t *zp,
     zio_done_func_t *ready, zio_done_func_t *children_ready,
     zio_done_func_t *physdone, zio_done_func_t *done,
     void *private, zio_priority_t priority, enum zio_flag flags,
-    const zbookmark_phys_t *zb)
+    const zbookmark_phys_t *zb,
+    const zio_smartcomp_info_t *smartcomp)
 {
         zio_t *zio;
 
         ASSERT(zp->zp_checksum >= ZIO_CHECKSUM_OFF &&
             zp->zp_checksum < ZIO_CHECKSUM_FUNCTIONS &&

@@ -822,10 +816,12 @@
 
         zio->io_ready = ready;
         zio->io_children_ready = children_ready;
         zio->io_physdone = physdone;
         zio->io_prop = *zp;
+        if (smartcomp != NULL)
+                bcopy(smartcomp, &zio->io_smartcomp, sizeof (*smartcomp));
 
         /*
          * Data can be NULL if we are going to call zio_write_override() to
          * provide the already-allocated BP.  But we may need the data to
          * verify a dedup hit (if requested).  In this case, don't try to

@@ -873,12 +869,10 @@
 
 void
 zio_free(spa_t *spa, uint64_t txg, const blkptr_t *bp)
 {
 
-        zfs_blkptr_verify(spa, bp);
-
         /*
          * The check for EMBEDDED is a performance optimization.  We
          * process the free here (by ignoring it) rather than
          * putting it on the list and then processing it in zio_free_sync().
          */

@@ -915,10 +909,11 @@
         if (BP_IS_EMBEDDED(bp))
                 return (zio_null(pio, spa, NULL, NULL, NULL, 0));
 
         metaslab_check_free(spa, bp);
         arc_freed(spa, bp);
+        dsl_scan_freed(spa, bp);
 
         /*
          * GANG and DEDUP blocks can induce a read (for the gang block header,
          * or the DDT), so issue them asynchronously so that this thread is
          * not tied up.

@@ -937,11 +932,11 @@
 zio_claim(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
     zio_done_func_t *done, void *private, enum zio_flag flags)
 {
         zio_t *zio;
 
-        zfs_blkptr_verify(spa, bp);
+        dprintf_bp(bp, "claiming in txg %llu", txg);
 
         if (BP_IS_EMBEDDED(bp))
                 return (zio_null(pio, spa, NULL, NULL, NULL, 0));
 
         /*

@@ -966,35 +961,159 @@
         ASSERT0(zio->io_queued_timestamp);
 
         return (zio);
 }
 
-zio_t *
-zio_ioctl(zio_t *pio, spa_t *spa, vdev_t *vd, int cmd,
-    zio_done_func_t *done, void *private, enum zio_flag flags)
+static zio_t *
+zio_ioctl_with_pipeline(zio_t *pio, spa_t *spa, vdev_t *vd, int cmd,
+    zio_done_func_t *done, void *private, enum zio_flag flags,
+    enum zio_stage pipeline)
 {
         zio_t *zio;
         int c;
 
         if (vd->vdev_children == 0) {
                 zio = zio_create(pio, spa, 0, NULL, NULL, 0, 0, done, private,
                     ZIO_TYPE_IOCTL, ZIO_PRIORITY_NOW, flags, vd, 0, NULL,
-                    ZIO_STAGE_OPEN, ZIO_IOCTL_PIPELINE);
+                    ZIO_STAGE_OPEN, pipeline);
 
                 zio->io_cmd = cmd;
         } else {
-                zio = zio_null(pio, spa, NULL, NULL, NULL, flags);
-
+                zio = zio_null(pio, spa, vd, done, private, flags);
+                /*
+                 * DKIOCFREE ioctls need some special handling on interior
+                 * vdevs. If the device provides an ops function to handle
+                 * recomputing dkioc_free extents, then we call it.
+                 * Otherwise the default behavior applies, which simply fans
+                 * out the ioctl to all component vdevs.
+                 */
+                if (cmd == DKIOCFREE && vd->vdev_ops->vdev_op_trim != NULL) {
+                        vd->vdev_ops->vdev_op_trim(vd, zio, private);
+                } else {
                 for (c = 0; c < vd->vdev_children; c++)
-                        zio_nowait(zio_ioctl(zio, spa, vd->vdev_child[c], cmd,
-                            done, private, flags));
+                                zio_nowait(zio_ioctl_with_pipeline(zio,
+                                    spa, vd->vdev_child[c], cmd, NULL,
+                                    private, flags, pipeline));
         }
+        }
 
         return (zio);
 }
 
 zio_t *
+zio_ioctl(zio_t *pio, spa_t *spa, vdev_t *vd, int cmd,
+    zio_done_func_t *done, void *private, enum zio_flag flags)
+{
+        return (zio_ioctl_with_pipeline(pio, spa, vd, cmd, done,
+            private, flags, ZIO_IOCTL_PIPELINE));
+}
+
+/*
+ * Callback for when a trim zio has completed. This simply frees the
+ * dkioc_free_list_t extent list of the DKIOCFREE ioctl.
+ */
+static void
+zio_trim_done(zio_t *zio)
+{
+        VERIFY(zio->io_private != NULL);
+        dfl_free(zio->io_private);
+}
+
+static void
+zio_trim_check(uint64_t start, uint64_t len, void *msp)
+{
+        metaslab_t *ms = msp;
+        boolean_t held = MUTEX_HELD(&ms->ms_lock);
+        if (!held)
+                mutex_enter(&ms->ms_lock);
+        ASSERT(ms->ms_trimming_ts != NULL);
+        ASSERT(range_tree_contains(ms->ms_trimming_ts->ts_tree,
+            start - VDEV_LABEL_START_SIZE, len));
+        if (!held)
+                mutex_exit(&ms->ms_lock);
+}
+
+/*
+ * Takes a bunch of freed extents and tells the underlying vdevs that the
+ * space associated with these extents can be released.
+ * This is used by flash storage to pre-erase blocks for rapid reuse later
+ * and thin-provisioned block storage to reclaim unused blocks.
+ */
+zio_t *
+zio_trim(spa_t *spa, vdev_t *vd, struct range_tree *tree,
+    zio_done_func_t *done, void *private, enum zio_flag flags,
+    int trim_flags, metaslab_t *msp)
+{
+        dkioc_free_list_t *dfl = NULL;
+        range_seg_t *rs;
+        uint64_t rs_idx;
+        uint64_t num_exts;
+        uint64_t bytes_issued = 0, bytes_skipped = 0, exts_skipped = 0;
+        /*
+         * We need this to invoke the caller's `done' callback with the
+         * correct io_private (not the dkioc_free_list_t, which is needed
+         * by the underlying DKIOCFREE ioctl).
+         */
+        zio_t *sub_pio = zio_root(spa, done, private, flags);
+
+        ASSERT(range_tree_space(tree) != 0);
+
+        if (!zfs_trim)
+                return (sub_pio);
+
+        num_exts = avl_numnodes(&tree->rt_root);
+        dfl = kmem_zalloc(DFL_SZ(num_exts), KM_SLEEP);
+        dfl->dfl_flags = trim_flags;
+        dfl->dfl_num_exts = num_exts;
+        dfl->dfl_offset = VDEV_LABEL_START_SIZE;
+        if (msp) {
+                dfl->dfl_ck_func = zio_trim_check;
+                dfl->dfl_ck_arg = msp;
+        }
+
+        for (rs = avl_first(&tree->rt_root), rs_idx = 0; rs != NULL;
+            rs = AVL_NEXT(&tree->rt_root, rs)) {
+                uint64_t len = rs->rs_end - rs->rs_start;
+
+                if (len < zfs_trim_min_ext_sz) {
+                        bytes_skipped += len;
+                        exts_skipped++;
+                        continue;
+                }
+
+                dfl->dfl_exts[rs_idx].dfle_start = rs->rs_start;
+                dfl->dfl_exts[rs_idx].dfle_length = len;
+
+                /* check we're a multiple of the vdev ashift */
+                ASSERT0(dfl->dfl_exts[rs_idx].dfle_start &
+                    ((1 << vd->vdev_ashift) - 1));
+                ASSERT0(dfl->dfl_exts[rs_idx].dfle_length &
+                    ((1 << vd->vdev_ashift) - 1));
+
+                rs_idx++;
+                bytes_issued += len;
+        }
+
+        spa_trimstats_update(spa, rs_idx, bytes_issued, exts_skipped,
+            bytes_skipped);
+
+        /* the zfs_trim_min_ext_sz filter may have shortened the list */
+        if (dfl->dfl_num_exts != rs_idx) {
+                dkioc_free_list_t *dfl2 = kmem_zalloc(DFL_SZ(rs_idx), KM_SLEEP);
+                bcopy(dfl, dfl2, DFL_SZ(rs_idx));
+                dfl2->dfl_num_exts = rs_idx;
+                dfl_free(dfl);
+                dfl = dfl2;
+        }
+
+        zio_nowait(zio_ioctl_with_pipeline(sub_pio, spa, vd, DKIOCFREE,
+            zio_trim_done, dfl, ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE |
+            ZIO_FLAG_DONT_RETRY, ZIO_TRIM_PIPELINE));
+        return (sub_pio);
+}
+
+zio_t *
 zio_read_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
     abd_t *data, int checksum, zio_done_func_t *done, void *private,
     zio_priority_t priority, enum zio_flag flags, boolean_t labels)
 {
         zio_t *zio;

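As context for the new zio_trim() above: a minimal hypothetical caller could look like the sketch below. The function name and the source of the freed-extent tree are assumptions for illustration, not part of this change.

    /*
     * Sketch only: TRIM all freed extents of a vdev and wait for the
     * resulting DKIOCFREE ioctls. zio_trim() frees the extent list
     * itself via zio_trim_done(), so the caller only waits.
     */
    static int
    example_trim_freed(spa_t *spa, vdev_t *vd, range_tree_t *freed)
    {
            /* zio_trim() asserts a non-empty tree, so check first */
            if (range_tree_space(freed) == 0)
                    return (0);

            return (zio_wait(zio_trim(spa, vd, freed, NULL, NULL,
                ZIO_FLAG_CANFAIL, 0, NULL)));
    }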
@@ -1056,31 +1175,13 @@
     enum zio_flag flags, zio_done_func_t *done, void *private)
 {
         enum zio_stage pipeline = ZIO_VDEV_CHILD_PIPELINE;
         zio_t *zio;
 
-        /*
-         * vdev child I/Os do not propagate their error to the parent.
-         * Therefore, for correct operation the caller *must* check for
-         * and handle the error in the child i/o's done callback.
-         * The only exceptions are i/os that we don't care about
-         * (OPTIONAL or REPAIR).
-         */
-        ASSERT((flags & ZIO_FLAG_OPTIONAL) || (flags & ZIO_FLAG_IO_REPAIR) ||
-            done != NULL);
+        ASSERT(vd->vdev_parent ==
+            (pio->io_vd ? pio->io_vd : pio->io_spa->spa_root_vdev));
 
-        /*
-         * In the common case, where the parent zio was to a normal vdev,
-         * the child zio must be to a child vdev of that vdev.  Otherwise,
-         * the child zio must be to a top-level vdev.
-         */
-        if (pio->io_vd != NULL && pio->io_vd->vdev_ops != &vdev_indirect_ops) {
-                ASSERT3P(vd->vdev_parent, ==, pio->io_vd);
-        } else {
-                ASSERT3P(vd, ==, vd->vdev_top);
-        }
-
         if (type == ZIO_TYPE_READ && bp != NULL) {
                 /*
                  * If we have the bp, then the child should perform the
                  * checksum and the parent need not.  This pushes error
                  * detection as close to the leaves as possible and

@@ -1088,16 +1189,14 @@
                  */
                 pipeline |= ZIO_STAGE_CHECKSUM_VERIFY;
                 pio->io_pipeline &= ~ZIO_STAGE_CHECKSUM_VERIFY;
         }
 
-        if (vd->vdev_ops->vdev_op_leaf) {
-                ASSERT0(vd->vdev_children);
+        if (vd->vdev_children == 0)
                 offset += VDEV_LABEL_START_SIZE;
-        }
 
-        flags |= ZIO_VDEV_CHILD_FLAGS(pio);
+        flags |= ZIO_VDEV_CHILD_FLAGS(pio) | ZIO_FLAG_DONT_PROPAGATE;
 
         /*
          * If we've decided to do a repair, the write is not speculative --
          * even if the original read was.
          */

@@ -1110,11 +1209,11 @@
          * If this is a retried I/O then we ignore it since we will
          * have already processed the original allocating I/O.
          */
         if (flags & ZIO_FLAG_IO_ALLOCATING &&
             (vd != vd->vdev_top || (flags & ZIO_FLAG_IO_RETRY))) {
-                metaslab_class_t *mc = spa_normal_class(pio->io_spa);
+                metaslab_class_t *mc = pio->io_mc;
 
                 ASSERT(mc->mc_alloc_throttle_enabled);
                 ASSERT(type == ZIO_TYPE_WRITE);
                 ASSERT(priority == ZIO_PRIORITY_ASYNC_WRITE);
                 ASSERT(!(flags & ZIO_FLAG_IO_REPAIR));

@@ -1191,12 +1290,10 @@
 static int
 zio_read_bp_init(zio_t *zio)
 {
         blkptr_t *bp = zio->io_bp;
 
-        ASSERT3P(zio->io_bp, ==, &zio->io_bp_copy);
-
         if (BP_GET_COMPRESS(bp) != ZIO_COMPRESS_OFF &&
             zio->io_child_type == ZIO_CHILD_LOGICAL &&
             !(zio->io_flags & ZIO_FLAG_RAW)) {
                 uint64_t psize =
                     BP_IS_EMBEDDED(bp) ? BPE_GET_PSIZE(bp) : BP_GET_PSIZE(bp);

@@ -1211,14 +1308,13 @@
                 void *data = abd_borrow_buf(zio->io_abd, psize);
                 decode_embedded_bp_compressed(bp, data);
                 abd_return_buf_copy(zio->io_abd, data, psize);
         } else {
                 ASSERT(!BP_IS_EMBEDDED(bp));
-                ASSERT3P(zio->io_bp, ==, &zio->io_bp_copy);
         }
 
-        if (!DMU_OT_IS_METADATA(BP_GET_TYPE(bp)) && BP_GET_LEVEL(bp) == 0)
+        if (!BP_IS_METADATA(bp))
                 zio->io_flags |= ZIO_FLAG_DONT_CACHE;
 
         if (BP_GET_TYPE(bp) == DMU_OT_DDT_ZAP)
                 zio->io_flags |= ZIO_FLAG_DONT_CACHE;
 

@@ -1302,14 +1398,13 @@
 
         /*
          * If our children haven't all reached the ready stage,
          * wait for them and then repeat this pipeline stage.
          */
-        if (zio_wait_for_children(zio, ZIO_CHILD_LOGICAL_BIT |
-            ZIO_CHILD_GANG_BIT, ZIO_WAIT_READY)) {
+        if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_READY) ||
+            zio_wait_for_children(zio, ZIO_CHILD_LOGICAL, ZIO_WAIT_READY))
                 return (ZIO_PIPELINE_STOP);
-        }
 
         if (!IO_IS_ALLOCATING(zio))
                 return (ZIO_PIPELINE_CONTINUE);
 
         if (zio->io_children_ready != NULL) {

@@ -1347,12 +1442,14 @@
                 /* Make sure someone doesn't change their mind on overwrites */
                 ASSERT(BP_IS_EMBEDDED(bp) || MIN(zp->zp_copies + BP_IS_GANG(bp),
                     spa_max_replication(spa)) == BP_GET_NDVAS(bp));
         }
 
+        DTRACE_PROBE1(zio_compress_ready, zio_t *, zio);
         /* If it's a compressed write that is not raw, compress the buffer. */
-        if (compress != ZIO_COMPRESS_OFF && psize == lsize) {
+        if (compress != ZIO_COMPRESS_OFF && psize == lsize &&
+            ZIO_SHOULD_COMPRESS(zio)) {
                 void *cbuf = zio_buf_alloc(lsize);
                 psize = zio_compress_data(compress, zio->io_abd, cbuf, lsize);
                 if (psize == 0 || psize == lsize) {
                         compress = ZIO_COMPRESS_OFF;
                         zio_buf_free(cbuf, lsize);

@@ -1367,10 +1464,16 @@
                         zio_buf_free(cbuf, lsize);
                         bp->blk_birth = zio->io_txg;
                         zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
                         ASSERT(spa_feature_is_active(spa,
                             SPA_FEATURE_EMBEDDED_DATA));
+                        if (zio->io_smartcomp.sc_result != NULL) {
+                                zio->io_smartcomp.sc_result(
+                                    zio->io_smartcomp.sc_userinfo, zio);
+                        } else {
+                                ASSERT(zio->io_smartcomp.sc_ask == NULL);
+                        }
                         return (ZIO_PIPELINE_CONTINUE);
                 } else {
                         /*
                          * Round up compressed size up to the ashift
                          * of the smallest-ashift device, and zero the tail.

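The io_smartcomp hooks used above are invoked once the compression decision for a zio is known. The sketch below is an illustration only: the callback signature is inferred from the sc_result(sc_userinfo, zio) call sites in this diff, and the stats structure and names are invented.

    /*
     * Illustration: a smart-compression consumer that counts how many
     * writes ended up compressed (hypothetical structure and names).
     */
    typedef struct example_sc_stats {
            uint64_t ss_compressed;
            uint64_t ss_uncompressed;
    } example_sc_stats_t;

    static void
    example_sc_result(void *userinfo, zio_t *zio)
    {
            example_sc_stats_t *ss = userinfo;

            if (BP_GET_COMPRESS(zio->io_bp) != ZIO_COMPRESS_OFF)
                    atomic_inc_64(&ss->ss_compressed);
            else
                    atomic_inc_64(&ss->ss_uncompressed);
    }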
@@ -1394,19 +1497,37 @@
                                 zio_push_transform(zio, cdata,
                                     psize, lsize, NULL);
                         }
                 }
 
+                if (zio->io_smartcomp.sc_result != NULL) {
+                        zio->io_smartcomp.sc_result(
+                            zio->io_smartcomp.sc_userinfo, zio);
+                } else {
+                        ASSERT(zio->io_smartcomp.sc_ask == NULL);
+                }
+
                 /*
                  * We were unable to handle this as an override bp, treat
                  * it as a regular write I/O.
                  */
                 zio->io_bp_override = NULL;
                 *bp = zio->io_bp_orig;
                 zio->io_pipeline = zio->io_orig_pipeline;
         } else {
                 ASSERT3U(psize, !=, 0);
+
+                /*
+                 * We get here for one of these reasons:
+                 *      - compress == ZIO_COMPRESS_OFF
+                 *      - SmartCompression decided not to compress this data
+                 *      - this is a RAW write
+                 *
+                 *      For a RAW write we must not override "compress".
+                 */
+                if ((zio->io_flags & ZIO_FLAG_RAW) == 0)
+                        compress = ZIO_COMPRESS_OFF;
         }
 
         /*
          * The final pass of spa_sync() must be all rewrites, but the first
          * few passes offer a trade-off: allocating blocks defers convergence,

@@ -1435,10 +1556,14 @@
                         BP_SET_LEVEL(bp, zp->zp_level);
                         BP_SET_BIRTH(bp, zio->io_txg, 0);
                 }
                 zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
         } else {
+                if (zp->zp_dedup) {
+                        /* check the best-effort dedup setting */
+                        zio_best_effort_dedup(zio);
+                }
                 ASSERT(zp->zp_checksum != ZIO_CHECKSUM_GANG_HEADER);
                 BP_SET_LSIZE(bp, lsize);
                 BP_SET_TYPE(bp, zp->zp_type);
                 BP_SET_LEVEL(bp, zp->zp_level);
                 BP_SET_PSIZE(bp, psize);

@@ -1468,12 +1593,10 @@
         if (zio->io_child_type == ZIO_CHILD_LOGICAL) {
                 if (BP_GET_DEDUP(bp))
                         zio->io_pipeline = ZIO_DDT_FREE_PIPELINE;
         }
 
-        ASSERT3P(zio->io_bp, ==, &zio->io_bp_copy);
-
         return (ZIO_PIPELINE_CONTINUE);
 }
 
 /*
  * ==========================================================================

@@ -1504,11 +1627,12 @@
 
         /*
          * If this is a high priority I/O, then use the high priority taskq if
          * available.
          */
-        if (zio->io_priority == ZIO_PRIORITY_NOW &&
+        if ((zio->io_priority == ZIO_PRIORITY_NOW ||
+            zio->io_priority == ZIO_PRIORITY_SYNC_WRITE) &&
             spa->spa_zio_taskq[t][q + 1].stqs_count != 0)
                 q++;
 
         ASSERT3U(q, <, ZIO_TASKQ_TYPES);
 

@@ -1631,10 +1755,11 @@
 
         ASSERT3U(zio->io_queued_timestamp, >, 0);
 
         while (zio->io_stage < ZIO_STAGE_DONE) {
                 enum zio_stage pipeline = zio->io_pipeline;
+                enum zio_stage old_stage = zio->io_stage;
                 enum zio_stage stage = zio->io_stage;
                 int rv;
 
                 ASSERT(!MUTEX_HELD(&zio->io_lock));
                 ASSERT(ISP2(stage));

@@ -1668,10 +1793,16 @@
                 rv = zio_pipeline[highbit64(stage) - 1](zio);
 
                 if (rv == ZIO_PIPELINE_STOP)
                         return;
 
+                if (rv == ZIO_PIPELINE_RESTART_STAGE) {
+                        zio->io_stage = old_stage;
+                        (void) zio_issue_async(zio);
+                        return;
+                }
+
                 ASSERT(rv == ZIO_PIPELINE_CONTINUE);
         }
 }
 
 /*

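The new ZIO_PIPELINE_RESTART_STAGE return value (handled in zio_execute() above) lets a stage back off and retry asynchronously: io_stage is restored and the zio is reissued via zio_issue_async(). A minimal hypothetical stage, with an invented predicate:

    /*
     * Illustration: a stage that cannot make progress yet. Returning
     * ZIO_PIPELINE_RESTART_STAGE makes zio_execute() reset io_stage
     * to this stage and reschedule the zio on a taskq.
     */
    static int
    zio_example_stage(zio_t *zio)
    {
            if (!example_resources_ready(zio))  /* hypothetical check */
                    return (ZIO_PIPELINE_RESTART_STAGE);

            return (ZIO_PIPELINE_CONTINUE);
    }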
@@ -2148,13 +2279,12 @@
 static int
 zio_gang_issue(zio_t *zio)
 {
         blkptr_t *bp = zio->io_bp;
 
-        if (zio_wait_for_children(zio, ZIO_CHILD_GANG_BIT, ZIO_WAIT_DONE)) {
+        if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_DONE))
                 return (ZIO_PIPELINE_STOP);
-        }
 
         ASSERT(BP_IS_GANG(bp) && zio->io_gang_leader == zio);
         ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
 
         if (zio->io_child_error[ZIO_CHILD_GANG] == 0)

@@ -2206,11 +2336,11 @@
 
 static int
 zio_write_gang_block(zio_t *pio)
 {
         spa_t *spa = pio->io_spa;
-        metaslab_class_t *mc = spa_normal_class(spa);
+        metaslab_class_t *mc = pio->io_mc;
         blkptr_t *bp = pio->io_bp;
         zio_t *gio = pio->io_gang_leader;
         zio_t *zio;
         zio_gang_node_t *gn, **gnpp;
         zio_gbh_phys_t *gbh;

@@ -2303,12 +2433,15 @@
 
                 zio_t *cio = zio_write(zio, spa, txg, &gbh->zg_blkptr[g],
                     abd_get_offset(pio->io_abd, pio->io_size - resid), lsize,
                     lsize, &zp, zio_write_gang_member_ready, NULL, NULL,
                     zio_write_gang_done, &gn->gn_child[g], pio->io_priority,
-                    ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
+                    ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark,
+                    &pio->io_smartcomp);
 
+                cio->io_mc = mc;
+
                 if (pio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
                         ASSERT(pio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
                         ASSERT(!(pio->io_flags & ZIO_FLAG_NODATA));
 
                         /*

@@ -2471,13 +2604,12 @@
 static int
 zio_ddt_read_done(zio_t *zio)
 {
         blkptr_t *bp = zio->io_bp;
 
-        if (zio_wait_for_children(zio, ZIO_CHILD_DDT_BIT, ZIO_WAIT_DONE)) {
+        if (zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_DONE))
                 return (ZIO_PIPELINE_STOP);
-        }
 
         ASSERT(BP_GET_DEDUP(bp));
         ASSERT(BP_GET_PSIZE(bp) == zio->io_size);
         ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
 

@@ -2505,10 +2637,11 @@
         ASSERT(zio->io_vsd == NULL);
 
         return (ZIO_PIPELINE_CONTINUE);
 }
 
+/* ARGSUSED */
 static boolean_t
 zio_ddt_collision(zio_t *zio, ddt_t *ddt, ddt_entry_t *dde)
 {
         spa_t *spa = zio->io_spa;
         boolean_t do_raw = (zio->io_flags & ZIO_FLAG_RAW);

@@ -2542,11 +2675,11 @@
                         blkptr_t blk = *zio->io_bp;
                         int error;
 
                         ddt_bp_fill(ddp, &blk, ddp->ddp_phys_birth);
 
-                        ddt_exit(ddt);
+                        dde_exit(dde);
 
                         /*
                          * Intuitively, it would make more sense to compare
                          * io_abd than io_orig_abd in the raw case since you
                          * don't want to look at any transformations that have

@@ -2573,11 +2706,11 @@
                                     zio->io_orig_size) != 0)
                                         error = SET_ERROR(EEXIST);
                                 arc_buf_destroy(abuf, &abuf);
                         }
 
-                        ddt_enter(ddt);
+                        dde_enter(dde);
                         return (error != 0);
                 }
         }
 
         return (B_FALSE);

@@ -2585,40 +2718,38 @@
 
 static void
 zio_ddt_child_write_ready(zio_t *zio)
 {
         int p = zio->io_prop.zp_copies;
-        ddt_t *ddt = ddt_select(zio->io_spa, zio->io_bp);
         ddt_entry_t *dde = zio->io_private;
         ddt_phys_t *ddp = &dde->dde_phys[p];
         zio_t *pio;
 
         if (zio->io_error)
                 return;
 
-        ddt_enter(ddt);
+        dde_enter(dde);
 
         ASSERT(dde->dde_lead_zio[p] == zio);
 
         ddt_phys_fill(ddp, zio->io_bp);
 
         zio_link_t *zl = NULL;
         while ((pio = zio_walk_parents(zio, &zl)) != NULL)
                 ddt_bp_fill(ddp, pio->io_bp, zio->io_txg);
 
-        ddt_exit(ddt);
+        dde_exit(dde);
 }
 
 static void
 zio_ddt_child_write_done(zio_t *zio)
 {
         int p = zio->io_prop.zp_copies;
-        ddt_t *ddt = ddt_select(zio->io_spa, zio->io_bp);
         ddt_entry_t *dde = zio->io_private;
         ddt_phys_t *ddp = &dde->dde_phys[p];
 
-        ddt_enter(ddt);
+        dde_enter(dde);
 
         ASSERT(ddp->ddp_refcnt == 0);
         ASSERT(dde->dde_lead_zio[p] == zio);
         dde->dde_lead_zio[p] = NULL;
 

@@ -2628,11 +2759,11 @@
                         ddt_phys_addref(ddp);
         } else {
                 ddt_phys_clear(ddp);
         }
 
-        ddt_exit(ddt);
+        dde_exit(dde);
 }
 
 static void
 zio_ddt_ditto_write_done(zio_t *zio)
 {

@@ -2642,11 +2773,11 @@
         ddt_t *ddt = ddt_select(zio->io_spa, bp);
         ddt_entry_t *dde = zio->io_private;
         ddt_phys_t *ddp = &dde->dde_phys[p];
         ddt_key_t *ddk = &dde->dde_key;
 
-        ddt_enter(ddt);
+        dde_enter(dde);
 
         ASSERT(ddp->ddp_refcnt == 0);
         ASSERT(dde->dde_lead_zio[p] == zio);
         dde->dde_lead_zio[p] = NULL;
 

@@ -2657,11 +2788,11 @@
                 if (ddp->ddp_phys_birth != 0)
                         ddt_phys_free(ddt, ddk, ddp, zio->io_txg);
                 ddt_phys_fill(ddp, bp);
         }
 
-        ddt_exit(ddt);
+        dde_exit(dde);
 }
 
 static int
 zio_ddt_write(zio_t *zio)
 {

@@ -2680,14 +2811,36 @@
         ASSERT(BP_GET_DEDUP(bp));
         ASSERT(BP_GET_CHECKSUM(bp) == zp->zp_checksum);
         ASSERT(BP_IS_HOLE(bp) || zio->io_bp_override);
         ASSERT(!(zio->io_bp_override && (zio->io_flags & ZIO_FLAG_RAW)));
 
-        ddt_enter(ddt);
         dde = ddt_lookup(ddt, bp, B_TRUE);
-        ddp = &dde->dde_phys[p];
 
+        /*
+         * If we're not using the special tier, then for each new DDE
+         * not yet on disk: disable dedup if the allowed DDT L2/ARC space is exhausted
+         */
+        if ((dde->dde_state & DDE_NEW) && !spa->spa_usesc &&
+            (zfs_ddt_limit_type != DDT_NO_LIMIT || zfs_ddt_byte_ceiling != 0)) {
+                /* turn off dedup if we need to stop DDT growth */
+                if (spa_enable_dedup_cap(spa)) {
+                        dde->dde_state |= DDE_DONT_SYNC;
+
+                        /* disable dedup and use the ordinary write pipeline */
+                        zio_pop_transforms(zio);
+                        zp->zp_dedup = zp->zp_dedup_verify = B_FALSE;
+                        zio->io_stage = ZIO_STAGE_OPEN;
+                        zio->io_pipeline = ZIO_WRITE_PIPELINE;
+                        zio->io_bp_override = NULL;
+                        BP_ZERO(bp);
+                        dde_exit(dde);
+
+                        return (ZIO_PIPELINE_CONTINUE);
+                }
+        }
+        ASSERT(!(dde->dde_state & DDE_DONT_SYNC));
+
         if (zp->zp_dedup_verify && zio_ddt_collision(zio, ddt, dde)) {
                 /*
                  * If we're using a weak checksum, upgrade to a strong checksum
                  * and try again.  If we're already using a strong checksum,
                  * we can't resolve it, so just convert to an ordinary write.

@@ -2703,14 +2856,15 @@
                         zp->zp_dedup = B_FALSE;
                         BP_SET_DEDUP(bp, B_FALSE);
                 }
                 ASSERT(!BP_GET_DEDUP(bp));
                 zio->io_pipeline = ZIO_WRITE_PIPELINE;
-                ddt_exit(ddt);
+                dde_exit(dde);
                 return (ZIO_PIPELINE_CONTINUE);
         }
 
+        ddp = &dde->dde_phys[p];
         ditto_copies = ddt_ditto_copies_needed(ddt, dde, ddp);
         ASSERT(ditto_copies < SPA_DVAS_PER_BP);
 
         if (ditto_copies > ddt_ditto_copies_present(dde) &&
             dde->dde_lead_zio[DDT_PHYS_DITTO] == NULL) {

@@ -2729,18 +2883,18 @@
                         zio_pop_transforms(zio);
                         zio->io_stage = ZIO_STAGE_OPEN;
                         zio->io_pipeline = ZIO_WRITE_PIPELINE;
                         zio->io_bp_override = NULL;
                         BP_ZERO(bp);
-                        ddt_exit(ddt);
+                        dde_exit(dde);
                         return (ZIO_PIPELINE_CONTINUE);
                 }
 
                 dio = zio_write(zio, spa, txg, bp, zio->io_orig_abd,
                     zio->io_orig_size, zio->io_orig_size, &czp, NULL, NULL,
                     NULL, zio_ddt_ditto_write_done, dde, zio->io_priority,
-                    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark);
+                    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark, NULL);
 
                 zio_push_transform(dio, zio->io_abd, zio->io_size, 0, NULL);
                 dde->dde_lead_zio[DDT_PHYS_DITTO] = dio;
         }
 

@@ -2759,17 +2913,17 @@
         } else {
                 cio = zio_write(zio, spa, txg, bp, zio->io_orig_abd,
                     zio->io_orig_size, zio->io_orig_size, zp,
                     zio_ddt_child_write_ready, NULL, NULL,
                     zio_ddt_child_write_done, dde, zio->io_priority,
-                    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark);
+                    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark, NULL);
 
                 zio_push_transform(cio, zio->io_abd, zio->io_size, 0, NULL);
                 dde->dde_lead_zio[p] = cio;
         }
 
-        ddt_exit(ddt);
+        dde_exit(dde);
 
         if (cio)
                 zio_nowait(cio);
         if (dio)
                 zio_nowait(dio);

@@ -2789,15 +2943,15 @@
         ddt_phys_t *ddp;
 
         ASSERT(BP_GET_DEDUP(bp));
         ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
 
-        ddt_enter(ddt);
         freedde = dde = ddt_lookup(ddt, bp, B_TRUE);
         ddp = ddt_phys_select(dde, bp);
+        if (ddp)
         ddt_phys_decref(ddp);
-        ddt_exit(ddt);
+        dde_exit(dde);
 
         return (ZIO_PIPELINE_CONTINUE);
 }
 
 /*

@@ -2805,32 +2959,32 @@
  * Allocate and free blocks
  * ==========================================================================
  */
 
 static zio_t *
-zio_io_to_allocate(spa_t *spa)
+zio_io_to_allocate(metaslab_class_t *mc)
 {
         zio_t *zio;
 
-        ASSERT(MUTEX_HELD(&spa->spa_alloc_lock));
+        ASSERT(MUTEX_HELD(&mc->mc_alloc_lock));
 
-        zio = avl_first(&spa->spa_alloc_tree);
+        zio = avl_first(&mc->mc_alloc_tree);
         if (zio == NULL)
                 return (NULL);
 
         ASSERT(IO_IS_ALLOCATING(zio));
 
         /*
          * Try to place a reservation for this zio. If we're unable to
          * reserve then we throttle.
          */
-        if (!metaslab_class_throttle_reserve(spa_normal_class(spa),
+        if (!metaslab_class_throttle_reserve(mc,
             zio->io_prop.zp_copies, zio, 0)) {
                 return (NULL);
         }
 
-        avl_remove(&spa->spa_alloc_tree, zio);
+        avl_remove(&mc->mc_alloc_tree, zio);
         ASSERT3U(zio->io_stage, <, ZIO_STAGE_DVA_ALLOCATE);
 
         return (zio);
 }
 

@@ -2838,12 +2992,19 @@
 zio_dva_throttle(zio_t *zio)
 {
         spa_t *spa = zio->io_spa;
         zio_t *nio;
 
+        /* If no metaslab class was inherited from the parent, select one */
+        if (zio->io_mc == NULL) {
+                zio->io_mc = spa_select_class(spa, zio);
+                if (zio->io_prop.zp_usewbc)
+                        return (ZIO_PIPELINE_CONTINUE);
+        }
+
         if (zio->io_priority == ZIO_PRIORITY_SYNC_WRITE ||
-            !spa_normal_class(zio->io_spa)->mc_alloc_throttle_enabled ||
+            !zio->io_mc->mc_alloc_throttle_enabled ||
             zio->io_child_type == ZIO_CHILD_GANG ||
             zio->io_flags & ZIO_FLAG_NODATA) {
                 return (ZIO_PIPELINE_CONTINUE);
         }
 

@@ -2850,17 +3011,17 @@
         ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
 
         ASSERT3U(zio->io_queued_timestamp, >, 0);
         ASSERT(zio->io_stage == ZIO_STAGE_DVA_THROTTLE);
 
-        mutex_enter(&spa->spa_alloc_lock);
+        mutex_enter(&zio->io_mc->mc_alloc_lock);
 
         ASSERT(zio->io_type == ZIO_TYPE_WRITE);
-        avl_add(&spa->spa_alloc_tree, zio);
+        avl_add(&zio->io_mc->mc_alloc_tree, zio);
 
-        nio = zio_io_to_allocate(zio->io_spa);
-        mutex_exit(&spa->spa_alloc_lock);
+        nio = zio_io_to_allocate(zio->io_mc);
+        mutex_exit(&zio->io_mc->mc_alloc_lock);
 
         if (nio == zio)
                 return (ZIO_PIPELINE_CONTINUE);
 
         if (nio != NULL) {

@@ -2877,17 +3038,17 @@
         }
         return (ZIO_PIPELINE_STOP);
 }
 
 void
-zio_allocate_dispatch(spa_t *spa)
+zio_allocate_dispatch(metaslab_class_t *mc)
 {
         zio_t *zio;
 
-        mutex_enter(&spa->spa_alloc_lock);
-        zio = zio_io_to_allocate(spa);
-        mutex_exit(&spa->spa_alloc_lock);
+        mutex_enter(&mc->mc_alloc_lock);
+        zio = zio_io_to_allocate(mc);
+        mutex_exit(&mc->mc_alloc_lock);
         if (zio == NULL)
                 return;
 
         ASSERT3U(zio->io_stage, ==, ZIO_STAGE_DVA_THROTTLE);
         ASSERT0(zio->io_error);

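With the allocation throttle now tracked per metaslab class instead of per pool, the expected completion-side pattern is to release the class reservation and dispatch the next queued zio for that class. A sketch, assuming the illumos metaslab_class_throttle_unreserve() interface:

    /*
     * Sketch: on completion of an allocating zio, release its throttle
     * reservation and let the next queued zio of the same class run.
     */
    static void
    example_alloc_done(zio_t *zio)
    {
            metaslab_class_t *mc = zio->io_mc;

            metaslab_class_throttle_unreserve(mc, zio->io_prop.zp_copies,
                zio);
            zio_allocate_dispatch(mc);
    }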
@@ -2896,11 +3057,12 @@
 
 static int
 zio_dva_allocate(zio_t *zio)
 {
         spa_t *spa = zio->io_spa;
-        metaslab_class_t *mc = spa_normal_class(spa);
+        metaslab_class_t *mc = zio->io_mc;
+
         blkptr_t *bp = zio->io_bp;
         int error;
         int flags = 0;
 
         if (zio->io_gang_leader == NULL) {

@@ -2912,30 +3074,49 @@
         ASSERT0(BP_GET_NDVAS(bp));
         ASSERT3U(zio->io_prop.zp_copies, >, 0);
         ASSERT3U(zio->io_prop.zp_copies, <=, spa_max_replication(spa));
         ASSERT3U(zio->io_size, ==, BP_GET_PSIZE(bp));
 
-        if (zio->io_flags & ZIO_FLAG_NODATA) {
+        if (zio->io_flags & ZIO_FLAG_NODATA || zio->io_prop.zp_usewbc) {
                 flags |= METASLAB_DONT_THROTTLE;
         }
         if (zio->io_flags & ZIO_FLAG_GANG_CHILD) {
                 flags |= METASLAB_GANG_CHILD;
         }
-        if (zio->io_priority == ZIO_PRIORITY_ASYNC_WRITE) {
+        if (zio->io_priority == ZIO_PRIORITY_ASYNC_WRITE &&
+            zio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
                 flags |= METASLAB_ASYNC_ALLOC;
         }
 
         error = metaslab_alloc(spa, mc, zio->io_size, bp,
             zio->io_prop.zp_copies, zio->io_txg, NULL, flags,
             &zio->io_alloc_list, zio);
 
+#ifdef _KERNEL
+        DTRACE_PROBE6(zio_dva_allocate,
+            uint64_t, DVA_GET_VDEV(&bp->blk_dva[0]),
+            uint64_t, DVA_GET_VDEV(&bp->blk_dva[1]),
+            uint64_t, BP_GET_LEVEL(bp),
+            boolean_t, BP_IS_SPECIAL(bp),
+            boolean_t, BP_IS_METADATA(bp),
+            int, error);
+#endif
+
         if (error != 0) {
                 spa_dbgmsg(spa, "%s: metaslab allocation failure: zio %p, "
                     "size %llu, error %d", spa_name(spa), zio, zio->io_size,
                     error);
-                if (error == ENOSPC && zio->io_size > SPA_MINBLOCKSIZE)
+                if (error == ENOSPC && zio->io_size > SPA_MINBLOCKSIZE) {
+                        if (zio->io_prop.zp_usewbc) {
+                                zio->io_prop.zp_usewbc = B_FALSE;
+                                zio->io_prop.zp_usesc = B_FALSE;
+                                zio->io_mc = spa_normal_class(spa);
+                        }
+
                         return (zio_write_gang_block(zio));
+                }
+
                 zio->io_error = error;
         }
 
         return (ZIO_PIPELINE_CONTINUE);
 }

@@ -2989,25 +3170,68 @@
 zio_alloc_zil(spa_t *spa, uint64_t txg, blkptr_t *new_bp, blkptr_t *old_bp,
     uint64_t size, boolean_t *slog)
 {
         int error = 1;
         zio_alloc_list_t io_alloc_list;
+        spa_meta_placement_t *mp = &spa->spa_meta_policy;
 
         ASSERT(txg > spa_syncing_txg(spa));
 
         metaslab_trace_init(&io_alloc_list);
-        error = metaslab_alloc(spa, spa_log_class(spa), size, new_bp, 1,
-            txg, old_bp, METASLAB_HINTBP_AVOID, &io_alloc_list, NULL);
-        if (error == 0) {
+
+        /*
+         * ZIL blocks are always contiguous (i.e. not gang blocks)
+         * so we set the METASLAB_HINTBP_AVOID flag so that they
+         * don't "fast gang" when allocating them.
+         * Allocation falls back in order: the slog class first (if the
+         * pool has slogs), then the special class (if it may be used
+         * and is below its watermarks), and finally the normal class.
+         * The *slog out-parameter records whether a slog was used.
+         */
+
+        if (spa_has_slogs(spa)) {
+                error = metaslab_alloc(spa, spa_log_class(spa),
+                    size, new_bp, 1, txg, old_bp,
+                    METASLAB_HINTBP_AVOID, &io_alloc_list, NULL);
+
+                DTRACE_PROBE2(zio_alloc_zil_log,
+                    spa_t *, spa, int, error);
+
+                if (error == 0)
                 *slog = TRUE;
-        } else {
+        }
+
+        /*
+         * Use the special class when allocation from the regular
+         * slog failed, but only if this is allowed and the special
+         * class's used space is below its watermarks.
+         */
+        if (error != 0 && spa_can_special_be_used(spa) &&
+            mp->spa_sync_to_special != SYNC_TO_SPECIAL_DISABLED) {
+                error = metaslab_alloc(spa, spa_special_class(spa),
+                    size, new_bp, 1, txg, old_bp,
+                    METASLAB_HINTBP_AVOID, &io_alloc_list, NULL);
+
+                DTRACE_PROBE2(zio_alloc_zil_special,
+                    spa_t *, spa, int, error);
+
+                if (error == 0)
+                        *slog = FALSE;
+        }
+
+        if (error != 0) {
                 error = metaslab_alloc(spa, spa_normal_class(spa), size,
                     new_bp, 1, txg, old_bp, METASLAB_HINTBP_AVOID,
                     &io_alloc_list, NULL);
+
+                DTRACE_PROBE2(zio_alloc_zil_normal,
+                    spa_t *, spa, int, error);
+
                 if (error == 0)
                         *slog = FALSE;
         }
+
         metaslab_trace_fini(&io_alloc_list);
 
         if (error == 0) {
                 BP_SET_LSIZE(new_bp, size);
                 BP_SET_PSIZE(new_bp, size);

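For reference, a hypothetical zio_alloc_zil() caller showing how the slog out-flag distinguishes where the block landed; the caller name is invented:

    /*
     * Illustration: allocate the next ZIL block and note whether it
     * came from the slog class (vs. the special or normal class).
     */
    static int
    example_zil_alloc(spa_t *spa, uint64_t txg, blkptr_t *new_bp,
        blkptr_t *old_bp, uint64_t size)
    {
            boolean_t slog;
            int error;

            error = zio_alloc_zil(spa, txg, new_bp, old_bp, size, &slog);
            if (error == 0 && !slog) {
                    /* fell back to the special or normal class */
            }
            return (error);
    }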
@@ -3060,10 +3284,12 @@
 zio_vdev_io_start(zio_t *zio)
 {
         vdev_t *vd = zio->io_vd;
         uint64_t align;
         spa_t *spa = zio->io_spa;
+        zio_type_t type = zio->io_type;
+        zio->io_vd_timestamp = gethrtime();
 
         ASSERT(zio->io_error == 0);
         ASSERT(zio->io_child_error[ZIO_CHILD_VDEV] == 0);
 
         if (vd == NULL) {

@@ -3076,49 +3302,20 @@
                 vdev_mirror_ops.vdev_op_io_start(zio);
                 return (ZIO_PIPELINE_STOP);
         }
 
         ASSERT3P(zio->io_logical, !=, zio);
-        if (zio->io_type == ZIO_TYPE_WRITE) {
-                ASSERT(spa->spa_trust_config);
 
-                if (zio->io_vd->vdev_removing) {
-                        ASSERT(zio->io_flags &
-                            (ZIO_FLAG_PHYSICAL | ZIO_FLAG_SELF_HEAL |
-                            ZIO_FLAG_INDUCE_DAMAGE));
-                }
-        }
-
-        /*
-         * We keep track of time-sensitive I/Os so that the scan thread
-         * can quickly react to certain workloads.  In particular, we care
-         * about non-scrubbing, top-level reads and writes with the following
-         * characteristics:
-         *      - synchronous writes of user data to non-slog devices
-         *      - any reads of user data
-         * When these conditions are met, adjust the timestamp of spa_last_io
-         * which allows the scan thread to adjust its workload accordingly.
-         */
-        if (!(zio->io_flags & ZIO_FLAG_SCAN_THREAD) && zio->io_bp != NULL &&
-            vd == vd->vdev_top && !vd->vdev_islog &&
-            zio->io_bookmark.zb_objset != DMU_META_OBJSET &&
-            zio->io_txg != spa_syncing_txg(spa)) {
-                uint64_t old = spa->spa_last_io;
-                uint64_t new = ddi_get_lbolt64();
-                if (old != new)
-                        (void) atomic_cas_64(&spa->spa_last_io, old, new);
-        }
-
         align = 1ULL << vd->vdev_top->vdev_ashift;
 
         if (!(zio->io_flags & ZIO_FLAG_PHYSICAL) &&
             P2PHASE(zio->io_size, align) != 0) {
                 /* Transform logical writes to be a full physical block size. */
                 uint64_t asize = P2ROUNDUP(zio->io_size, align);
                 abd_t *abuf = abd_alloc_sametype(zio->io_abd, asize);
                 ASSERT(vd == vd->vdev_top);
-                if (zio->io_type == ZIO_TYPE_WRITE) {
+                if (type == ZIO_TYPE_WRITE) {
                         abd_copy(abuf, zio->io_abd, zio->io_size);
                         abd_zero_off(abuf, zio->io_size, asize - zio->io_size);
                 }
                 zio_push_transform(zio, abuf, asize, asize, zio_subblock);
         }

@@ -3137,11 +3334,11 @@
                  */
                 ASSERT0(P2PHASE(zio->io_offset, SPA_MINBLOCKSIZE));
                 ASSERT0(P2PHASE(zio->io_size, SPA_MINBLOCKSIZE));
         }
 
-        VERIFY(zio->io_type != ZIO_TYPE_WRITE || spa_writeable(spa));
+        VERIFY(type != ZIO_TYPE_WRITE || spa_writeable(spa));
 
         /*
          * If this is a repair I/O, and there's no self-healing involved --
          * that is, we're just resilvering what we expect to resilver --
          * then don't do the I/O unless zio's txg is actually in vd's DTL.

@@ -3156,19 +3353,18 @@
          */
         if ((zio->io_flags & ZIO_FLAG_IO_REPAIR) &&
             !(zio->io_flags & ZIO_FLAG_SELF_HEAL) &&
             zio->io_txg != 0 && /* not a delegated i/o */
             !vdev_dtl_contains(vd, DTL_PARTIAL, zio->io_txg, 1)) {
-                ASSERT(zio->io_type == ZIO_TYPE_WRITE);
+                ASSERT(type == ZIO_TYPE_WRITE);
                 zio_vdev_io_bypass(zio);
                 return (ZIO_PIPELINE_CONTINUE);
         }
 
         if (vd->vdev_ops->vdev_op_leaf &&
-            (zio->io_type == ZIO_TYPE_READ || zio->io_type == ZIO_TYPE_WRITE)) {
-
-                if (zio->io_type == ZIO_TYPE_READ && vdev_cache_read(zio))
+            (type == ZIO_TYPE_READ || type == ZIO_TYPE_WRITE)) {
+                if (type == ZIO_TYPE_READ && vdev_cache_read(zio))
                         return (ZIO_PIPELINE_CONTINUE);
 
                 if ((zio = vdev_queue_io(zio)) == NULL)
                         return (ZIO_PIPELINE_STOP);
 

@@ -3175,11 +3371,20 @@
                 if (!vdev_accessible(vd, zio)) {
                         zio->io_error = SET_ERROR(ENXIO);
                         zio_interrupt(zio);
                         return (ZIO_PIPELINE_STOP);
                 }
+
+                /*
+                 * Fault injection: when zio_faulty_vdev_enabled is set,
+                 * stall every I/O to the vdev whose guid matches
+                 * zio_faulty_vdev_guid by zio_faulty_vdev_delay_us
+                 * microseconds (converted to clock ticks for delay())
+                 * to simulate a slow or failing device.
+                 */
+                if (zio_faulty_vdev_enabled &&
+                    (zio->io_vd->vdev_guid == zio_faulty_vdev_guid)) {
+                        delay(NSEC_TO_TICK(zio_faulty_vdev_delay_us *
+                            (NANOSEC / MICROSEC)));
+                }
         }
 
         vd->vdev_ops->vdev_op_io_start(zio);
         return (ZIO_PIPELINE_STOP);
 }
 

@@ -3188,18 +3393,16 @@
 {
         vdev_t *vd = zio->io_vd;
         vdev_ops_t *ops = vd ? vd->vdev_ops : &vdev_mirror_ops;
         boolean_t unexpected_error = B_FALSE;
 
-        if (zio_wait_for_children(zio, ZIO_CHILD_VDEV_BIT, ZIO_WAIT_DONE)) {
+        if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE))
                 return (ZIO_PIPELINE_STOP);
-        }
 
         ASSERT(zio->io_type == ZIO_TYPE_READ || zio->io_type == ZIO_TYPE_WRITE);
 
         if (vd != NULL && vd->vdev_ops->vdev_op_leaf) {
-
                 vdev_queue_io_done(zio);
 
                 if (zio->io_type == ZIO_TYPE_WRITE)
                         vdev_cache_write(zio);
 

@@ -3222,10 +3425,16 @@
         ops->vdev_op_io_done(zio);
 
         if (unexpected_error)
                 VERIFY(vdev_probe(vd, zio) == NULL);
 
+        /*
+         * Convert io_vd_timestamp from the start time recorded in
+         * zio_vdev_io_start() into the elapsed vdev service time in
+         * nanoseconds.  gethrtime() is monotonic, so the delta is
+         * never negative, and a 64-bit hrtime_t will not overflow
+         * in practice.
+         */
+        zio->io_vd_timestamp = gethrtime() - zio->io_vd_timestamp;
+
         return (ZIO_PIPELINE_CONTINUE);
 }
 
 /*
  * For non-raidz ZIOs, we can just copy aside the bad data read from the

@@ -3256,13 +3465,12 @@
 static int
 zio_vdev_io_assess(zio_t *zio)
 {
         vdev_t *vd = zio->io_vd;
 
-        if (zio_wait_for_children(zio, ZIO_CHILD_VDEV_BIT, ZIO_WAIT_DONE)) {
+        if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE))
                 return (ZIO_PIPELINE_STOP);
-        }
 
         if (vd == NULL && !(zio->io_flags & ZIO_FLAG_CONFIG_WRITER))
                 spa_config_exit(zio->io_spa, SCL_ZIO, zio);
 
         if (zio->io_vsd != NULL) {

@@ -3473,14 +3681,13 @@
 {
         blkptr_t *bp = zio->io_bp;
         zio_t *pio, *pio_next;
         zio_link_t *zl = NULL;
 
-        if (zio_wait_for_children(zio, ZIO_CHILD_GANG_BIT | ZIO_CHILD_DDT_BIT,
-            ZIO_WAIT_READY)) {
+        if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_READY) ||
+            zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_READY))
                 return (ZIO_PIPELINE_STOP);
-        }
 
         if (zio->io_ready) {
                 ASSERT(IO_IS_ALLOCATING(zio));
                 ASSERT(bp->blk_birth == zio->io_txg || BP_IS_HOLE(bp) ||
                     (zio->io_flags & ZIO_FLAG_NOPWRITE));

@@ -3500,14 +3707,13 @@
                         ASSERT(zio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
                         /*
                          * We were unable to allocate anything, unreserve and
                          * issue the next I/O to allocate.
                          */
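+                        /*
+                         * io_mc is the metaslab class this allocating
+                         * zio was throttled against (normal or special),
+                         * so both the unreserve and the next dispatch
+                         * must target that same class.
+                         */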
-                        metaslab_class_throttle_unreserve(
-                            spa_normal_class(zio->io_spa),
+                        metaslab_class_throttle_unreserve(zio->io_mc,
                             zio->io_prop.zp_copies, zio);
-                        zio_allocate_dispatch(zio->io_spa);
+                        zio_allocate_dispatch(zio->io_mc);
                 }
         }
 
         mutex_enter(&zio->io_lock);
         zio->io_state[ZIO_WAIT_READY] = 1;

@@ -3589,19 +3795,18 @@
 
         mutex_enter(&pio->io_lock);
         metaslab_group_alloc_decrement(zio->io_spa, vd->vdev_id, pio, flags);
         mutex_exit(&pio->io_lock);
 
-        metaslab_class_throttle_unreserve(spa_normal_class(zio->io_spa),
-            1, pio);
+        metaslab_class_throttle_unreserve(pio->io_mc, 1, pio);
 
         /*
          * Call into the pipeline to see if there is more work that
          * needs to be done. If there is work to be done it will be
          * dispatched to another taskq thread.
          */
-        zio_allocate_dispatch(zio->io_spa);
+        zio_allocate_dispatch(pio->io_mc);
 }
 
 static int
 zio_done(zio_t *zio)
 {

@@ -3609,20 +3814,22 @@
         zio_t *lio = zio->io_logical;
         blkptr_t *bp = zio->io_bp;
         vdev_t *vd = zio->io_vd;
         uint64_t psize = zio->io_size;
         zio_t *pio, *pio_next;
-        metaslab_class_t *mc = spa_normal_class(spa);
+        metaslab_class_t *mc = zio->io_mc;
         zio_link_t *zl = NULL;
 
         /*
          * If our children haven't all completed,
          * wait for them and then repeat this pipeline stage.
          */
-        if (zio_wait_for_children(zio, ZIO_CHILD_ALL_BITS, ZIO_WAIT_DONE)) {
+        if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE) ||
+            zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_DONE) ||
+            zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_DONE) ||
+            zio_wait_for_children(zio, ZIO_CHILD_LOGICAL, ZIO_WAIT_DONE))
                 return (ZIO_PIPELINE_STOP);
-        }
 
         /*
          * If the allocation throttle is enabled, then update the accounting.
          * We only track child I/Os that are part of an allocating async
          * write. We must do this since the allocation is performed

@@ -3908,10 +4115,38 @@
         }
 
         return (ZIO_PIPELINE_STOP);
 }
 
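+/*
+ * Create a physical read or write zio on behalf of the write-back
+ * cache (WBC).  The zio targets a raw vdev offset, so checksum
+ * verification is disabled below; reads additionally bypass the
+ * vdev cache via ZIO_FLAG_DONT_CACHE.
+ */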
+zio_t *
+zio_wbc(zio_type_t type, vdev_t *vd, abd_t *data,
+    uint64_t size, uint64_t offset)
+{
+        zio_t *zio = NULL;
+
+        switch (type) {
+        case ZIO_TYPE_WRITE:
+                zio = zio_create(NULL, vd->vdev_spa, 0, NULL, data, size,
+                    size, NULL, NULL, ZIO_TYPE_WRITE, ZIO_PRIORITY_ASYNC_WRITE,
+                    ZIO_FLAG_PHYSICAL, vd, offset,
+                    NULL, ZIO_STAGE_OPEN, ZIO_WRITE_PHYS_PIPELINE);
+                break;
+        case ZIO_TYPE_READ:
+                zio = zio_create(NULL, vd->vdev_spa, 0, NULL, data, size,
+                    size, NULL, NULL, ZIO_TYPE_READ, ZIO_PRIORITY_ASYNC_READ,
+                    ZIO_FLAG_DONT_CACHE | ZIO_FLAG_PHYSICAL, vd, offset,
+                    NULL, ZIO_STAGE_OPEN, ZIO_READ_PHYS_PIPELINE);
+                break;
+        default:
+                /*
+                 * ASSERT() compiles away in non-DEBUG builds, which
+                 * would leave zio NULL and crash below; panic instead.
+                 */
+                panic("zio_wbc(): invalid zio type %d", type);
+        }
+
+        zio->io_prop.zp_checksum = ZIO_CHECKSUM_OFF;
+
+        return (zio);
+}
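+
+/*
+ * Usage sketch (hypothetical caller, for illustration only):
+ *
+ *	abd_t *abd = abd_alloc_for_io(size, B_FALSE);
+ *	zio_t *zio = zio_wbc(ZIO_TYPE_READ, vd, abd, size, offset);
+ *	int error = zio_wait(zio);
+ *	abd_free(abd);
+ *
+ * zio_wait() executes the zio synchronously and returns its error.
+ */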
+
 /*
  * ==========================================================================
  * I/O pipeline definition
  * ==========================================================================
  */