NEX-18589 checksum errors on SSD-based pool
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
NEX-14242 Panic in module "zfs" due to a NULL pointer dereference
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
NEX-17716 reoccurring checksum errors on pool
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
NEX-9552 zfs_scan_idle throttling harms performance and needs to be removed
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
NEX-15266 Default resilver throttling values are too aggressive
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
NEX-10069 ZFS_READONLY is a little too strict (fix test lint)
NEX-9553 Move ss_fill gap logic from scan algorithm into range_tree.c
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
Reviewed by: Yuri Pankov <yuri.pankov@nexenta.com>
NEX-9752 backport illumos 6950 ARC should cache compressed data
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
Reviewed by: Yuri Pankov <yuri.pankov@nexenta.com>
6950 ARC should cache compressed data
Reviewed by: Prakash Surya <prakash.surya@delphix.com>
Reviewed by: Dan Kimmel <dan.kimmel@delphix.com>
Reviewed by: Matt Ahrens <mahrens@delphix.com>
Reviewed by: Paul Dagnelie <pcd@delphix.com>
Reviewed by: Don Brady <don.brady@intel.com>
Reviewed by: Richard Elling <Richard.Elling@RichardElling.com>
Approved by: Richard Lowe <richlowe@richlowe.net>
NEX-9719 Reorganize scan_io_t to make it smaller and improve scan performance
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
NEX-9658 Resilver code leaks the block sorting queues
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
NEX-9651 Resilver code leaks a bit of memory in the dataset processing queue
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
NEX-9551 Resilver algorithm should properly sort metadata and data with copies > 1
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
Reviewed by: Yuri Pankov <yuri.pankov@nexenta.com>
NEX-9601 New resilver algorithm causes scrub errors on WBC devices
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
NEX-9593 New resilvering algorithm can panic when WBC mirrors are in use
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
Reviewed by: Yuri Pankov <yuri.pankov@nexenta.com>
NEX-9554 dsl_scan.c internals contain some confusingly similar function names for handling the dataset and block sorting queues
Reviewed by: Yuri Pankov <yuri.pankov@nexenta.com>
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
NEX-9562 Attaching a vdev while resilver/scrub is running causes panic.
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
NEX-6088 ZFS scrub/resilver take excessively long due to issuing lots of random IO
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
NEX-5553 ZFS auto-trim, manual-trim and scrub can race and deadlock
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
Reviewed by: Rob Gittins <rob.gittins@nexenta.com>
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
NEX-4940 Special Vdev operation in presence (or absence) of IO Errors
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
Reviewed by: Alex Aizman <alex.aizman@nexenta.com>
NEX-4705 WRC: kernel panic while destroying a pool with activated WRC
Reviewed by: Alex Aizman <alex.aizman@nexenta.com>
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
6450 scrub/resilver unnecessarily traverses snapshots created after the scrub started
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
Reviewed by: Josef 'Jeff' Sipek <josef.sipek@nexenta.com>
6292 exporting a pool while an async destroy is running can leave entries in the deferred tree
Reviewed by: Paul Dagnelie <pcd@delphix.com>
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: Andriy Gapon <avg@FreeBSD.org>
Reviewed by: Fabian Keil <fk@fabiankeil.de>
Approved by: Gordon Ross <gordon.ross@nexenta.com>
4185 add new cryptographic checksums to ZFS: SHA-512, Skein, Edon-R (fix multi-proto)
6251 add tunable to disable free_bpobj processing
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: Prakash Surya <prakash.surya@delphix.com>
Reviewed by: Simon Klinkert <simon.klinkert@gmail.com>
Reviewed by: Richard Elling <Richard.Elling@RichardElling.com>
Reviewed by: Albert Lee <trisk@omniti.com>
Reviewed by: Xin Li <delphij@freebsd.org>
Approved by: Garrett D'Amore <garrett@damore.org>
NEX-4582 update wrc test cases to allow use of write back cache per tree of datasets
Reviewed by: Steve Peng <steve.peng@nexenta.com>
Reviewed by: Alex Aizman <alex.aizman@nexenta.com>
5960 zfs recv should prefetch indirect blocks
5925 zfs receive -o origin=
Reviewed by: Prakash Surya <prakash.surya@delphix.com>
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
NEX-3984 On-demand TRIM
Reviewed by: Alek Pinchuk <alek@nexenta.com>
Reviewed by: Josef 'Jeff' Sipek <josef.sipek@nexenta.com>
Conflicts:
        usr/src/common/zfs/zpool_prop.c
        usr/src/uts/common/sys/fs/zfs.h
4391 panic system rather than corrupting pool if we hit bug 4390
Reviewed by: Adam Leventhal <ahl@delphix.com>
Reviewed by: Christopher Siden <christopher.siden@delphix.com>
Approved by: Gordon Ross <gwr@nexenta.com>
4370 avoid transmitting holes during zfs send
4371 DMU code clean up
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: George Wilson <george.wilson@delphix.com>
Reviewed by: Christopher Siden <christopher.siden@delphix.com>
Reviewed by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
Approved by: Garrett D'Amore <garrett@damore.org>
OS-80 support for vdev and CoS properties for the new I/O scheduler
OS-95 lint warning introduced by OS-61
Issue #26: partial scrub
Added partial scrub options:
-M for MOS only scrub
-m for metadata scrub
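(For illustration only, assuming these options surface through the zpool scrub
command, whose wiring is not shown in this excerpt: zpool scrub -M tank would
scrub just the MOS, while zpool scrub -m tank would scrub all metadata.)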
Issue #2: optimize DDE lookup in DDT objects
Added an option to control the number of DDE classes in the DDT.
The new default is one, i.e. all DDEs are stored together
regardless of refcount.
re #12619 rb4429 More dp->dp_config_rwlock holds
re #12585 rb4049 ZFS++ work port - refactoring to improve separation of open/closed code, bug fixes, performance improvements - open code

@@ -18,12 +18,13 @@
  *
  * CDDL HEADER END
  */
 /*
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Copyright 2017 Nexenta Systems, Inc.  All rights reserved.
  * Copyright 2016 Gary Mills
- * Copyright (c) 2011, 2017 by Delphix. All rights reserved.
+ * Copyright (c) 2011, 2016 by Delphix. All rights reserved.
  * Copyright 2017 Joyent, Inc.
  * Copyright (c) 2017 Datto Inc.
  */
 
 #include <sys/dsl_scan.h>

@@ -51,38 +52,117 @@
 #include <sys/zfeature.h>
 #include <sys/abd.h>
 #ifdef _KERNEL
 #include <sys/zfs_vfsops.h>
 #endif
+#include <sys/range_tree.h>
 
+extern int zfs_vdev_async_write_active_min_dirty_percent;
+
+typedef struct {
+        uint64_t        sds_dsobj;
+        uint64_t        sds_txg;
+        avl_node_t      sds_node;
+} scan_ds_t;
+
+typedef struct {
+        dsl_scan_io_queue_t     *qri_queue;
+        uint64_t                qri_limit;
+} io_queue_run_info_t;
+
+/*
+ * This controls what conditions are placed on dsl_scan_sync_state():
+ * SYNC_OPTIONAL) write out scn_phys iff scn_bytes_pending == 0
+ * SYNC_MANDATORY) write out scn_phys always. scn_bytes_pending must be 0.
+ * SYNC_CACHED) if scn_bytes_pending == 0, write out scn_phys. Otherwise
+ *      write out the scn_phys_cached version.
+ * See dsl_scan_sync_state for details.
+ */
+typedef enum {
+        SYNC_OPTIONAL,
+        SYNC_MANDATORY,
+        SYNC_CACHED
+} state_sync_type_t;
+
 typedef int (scan_cb_t)(dsl_pool_t *, const blkptr_t *,
     const zbookmark_phys_t *);
 
 static scan_cb_t dsl_scan_scrub_cb;
 static void dsl_scan_cancel_sync(void *, dmu_tx_t *);
-static void dsl_scan_sync_state(dsl_scan_t *, dmu_tx_t *);
+static void dsl_scan_sync_state(dsl_scan_t *scn, dmu_tx_t *tx,
+    state_sync_type_t sync_type);
 static boolean_t dsl_scan_restarting(dsl_scan_t *, dmu_tx_t *);
 
-int zfs_top_maxinflight = 32;           /* maximum I/Os per top-level */
-int zfs_resilver_delay = 2;             /* number of ticks to delay resilver */
-int zfs_scrub_delay = 4;                /* number of ticks to delay scrub */
-int zfs_scan_idle = 50;                 /* idle window in clock ticks */
+static int scan_ds_queue_compar(const void *a, const void *b);
+static void scan_ds_queue_empty(dsl_scan_t *scn, boolean_t destroy);
+static boolean_t scan_ds_queue_contains(dsl_scan_t *scn, uint64_t dsobj,
+    uint64_t *txg);
+static int scan_ds_queue_insert(dsl_scan_t *scn, uint64_t dsobj, uint64_t txg);
+static void scan_ds_queue_remove(dsl_scan_t *scn, uint64_t dsobj);
+static boolean_t scan_ds_queue_first(dsl_scan_t *scn, uint64_t *dsobj,
+    uint64_t *txg);
+static void scan_ds_queue_sync(dsl_scan_t *scn, dmu_tx_t *tx);
 
+/*
+ * Maximum number of concurrently executing I/Os per top-level vdev.
+ * Tune with care. Very high settings (hundreds) are known to trigger
+ * some firmware bugs and resets on certain SSDs.
+ */
+int zfs_top_maxinflight = 32;
+
+/*
+ * Minimum amount of data we dequeue if our queues are full and the
+ * dirty data limit for a txg has been reached.
+ */
+uint64_t zfs_scan_dequeue_min =                 16 << 20;
+/*
+ * The duration target we aim for a dsl_scan_sync to take to process our
+ * dequeued data. If we go over that value, we lower the amount we dequeue
+ * each run, and vice versa. The bonus value below is just something we add
+ * on top of the target value so that we have a little bit of slack in case
+ * some top-level vdevs finish before others; we want to keep the vdevs as
+ * hot as possible.
+ */
+uint64_t zfs_scan_dequeue_run_target_ms =       2000;
+uint64_t zfs_dequeue_run_bonus_ms =             1000;
+#define DEQUEUE_BONUS_MS_MAX                    100000
+
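
As a rough sketch of the feedback loop these tunables describe (illustrative
only: the real adjustment lives in the queue-run code further below, and
adjust_dequeue_limit/run_limit are hypothetical names):

        static uint64_t
        adjust_dequeue_limit(uint64_t run_limit, uint64_t run_duration_ms)
        {
                uint64_t target_ms = zfs_scan_dequeue_run_target_ms +
                    zfs_dequeue_run_bonus_ms;

                if (run_duration_ms > target_ms) {
                        /* Ran long: dequeue less on the next pass. */
                        run_limit = MAX(run_limit / 2, zfs_scan_dequeue_min);
                } else {
                        /* Finished early: dequeue more to keep vdevs busy. */
                        run_limit += run_limit / 2;
                }
                return (run_limit);
        }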
+boolean_t zfs_scan_direct = B_FALSE;    /* don't queue & sort zios, go direct */
+uint64_t zfs_scan_max_ext_gap = 2 << 20;        /* bytes */
+/* See scan_io_queue_mem_lim for details on the memory limit tunables */
+uint64_t zfs_scan_mem_lim_fact = 20;            /* fraction of physmem */
+uint64_t zfs_scan_mem_lim_soft_fact = 20;       /* fraction of mem lim above */
+uint64_t zfs_scan_checkpoint_intval = 7200;     /* seconds */
+/*
+ * fill_weight is non-tunable at runtime, so we copy it at module init from
+ * zfs_scan_fill_weight. Runtime adjustments to zfs_scan_fill_weight would
+ * break queue sorting.
+ */
+uint64_t zfs_scan_fill_weight = 3;
+static uint64_t fill_weight = 3;
+
+/* See scan_io_queue_mem_lim for details on the memory limit tunables */
+uint64_t zfs_scan_mem_lim_min = 16 << 20;       /* bytes */
+uint64_t zfs_scan_mem_lim_soft_max = 128 << 20; /* bytes */
+
+#define ZFS_SCAN_CHECKPOINT_INTVAL      SEC_TO_TICK(zfs_scan_checkpoint_intval)
+
 int zfs_scan_min_time_ms = 1000; /* min millisecs to scrub per txg */
 int zfs_free_min_time_ms = 1000; /* min millisecs to free per txg */
-int zfs_obsolete_min_time_ms = 500; /* min millisecs to obsolete per txg */
 int zfs_resilver_min_time_ms = 3000; /* min millisecs to resilver per txg */
 boolean_t zfs_no_scrub_io = B_FALSE; /* set to disable scrub i/o */
 boolean_t zfs_no_scrub_prefetch = B_FALSE; /* set to disable scrub prefetch */
 enum ddt_class zfs_scrub_ddt_class_max = DDT_CLASS_DUPLICATE;
 int dsl_scan_delay_completion = B_FALSE; /* set to delay scan completion */
 /* max number of blocks to free in a single TXG */
-uint64_t zfs_async_block_max_blocks = UINT64_MAX;
+uint64_t zfs_free_max_blocks = UINT64_MAX;
 
 #define DSL_SCAN_IS_SCRUB_RESILVER(scn) \
         ((scn)->scn_phys.scn_func == POOL_SCAN_SCRUB || \
-        (scn)->scn_phys.scn_func == POOL_SCAN_RESILVER)
+        (scn)->scn_phys.scn_func == POOL_SCAN_RESILVER || \
+        (scn)->scn_phys.scn_func == POOL_SCAN_MOS || \
+        (scn)->scn_phys.scn_func == POOL_SCAN_META)
 
 extern int zfs_txg_timeout;
 
 /*
  * Enable/disable the processing of the free_bpobj object.

@@ -92,12 +172,134 @@
 /* the order has to match pool_scan_type */
 static scan_cb_t *scan_funcs[POOL_SCAN_FUNCS] = {
         NULL,
         dsl_scan_scrub_cb,      /* POOL_SCAN_SCRUB */
         dsl_scan_scrub_cb,      /* POOL_SCAN_RESILVER */
+        dsl_scan_scrub_cb,      /* POOL_SCAN_MOS */
+        dsl_scan_scrub_cb,      /* POOL_SCAN_META */
 };
 
+typedef struct scan_io {
+        uint64_t                sio_prop;
+        uint64_t                sio_phys_birth;
+        uint64_t                sio_birth;
+        zio_cksum_t             sio_cksum;
+        zbookmark_phys_t        sio_zb;
+        union {
+                avl_node_t      sio_addr_node;
+                list_node_t     sio_list_node;
+        } sio_nodes;
+        uint64_t                sio_dva_word1;
+        uint32_t                sio_asize;
+        int                     sio_flags;
+} scan_io_t;
+
+struct dsl_scan_io_queue {
+        dsl_scan_t      *q_scn;
+        vdev_t          *q_vd;
+
+        kcondvar_t      q_cv;
+
+        range_tree_t    *q_exts_by_addr;
+        avl_tree_t      q_zios_by_addr;
+        avl_tree_t      q_exts_by_size;
+
+        /* number of bytes in queued zios - atomic ops */
+        uint64_t        q_zio_bytes;
+
+        range_seg_t     q_issuing_rs;
+        uint64_t        q_num_issuing_zios;
+};
+
+#define SCAN_IO_GET_OFFSET(sio) \
+        BF64_GET_SB((sio)->sio_dva_word1, 0, 63, SPA_MINBLOCKSHIFT, 0)
+#define SCAN_IO_SET_OFFSET(sio, offset) \
+        BF64_SET_SB((sio)->sio_dva_word1, 0, 63, SPA_MINBLOCKSHIFT, 0, offset)
+
+static void scan_io_queue_insert_cb(range_tree_t *rt, range_seg_t *rs,
+    void *arg);
+static void scan_io_queue_remove_cb(range_tree_t *rt, range_seg_t *rs,
+    void *arg);
+static void scan_io_queue_vacate_cb(range_tree_t *rt, void *arg);
+static int ext_size_compar(const void *x, const void *y);
+static int io_addr_compar(const void *x, const void *y);
+
+static struct range_tree_ops scan_io_queue_ops = {
+        .rtop_create = NULL,
+        .rtop_destroy = NULL,
+        .rtop_add = scan_io_queue_insert_cb,
+        .rtop_remove = scan_io_queue_remove_cb,
+        .rtop_vacate = scan_io_queue_vacate_cb
+};
+
+typedef enum {
+        MEM_LIM_NONE,
+        MEM_LIM_SOFT,
+        MEM_LIM_HARD
+} mem_lim_t;
+
+static void dsl_scan_enqueue(dsl_pool_t *dp, const blkptr_t *bp,
+    int zio_flags, const zbookmark_phys_t *zb);
+static void scan_exec_io(dsl_pool_t *dp, const blkptr_t *bp, int zio_flags,
+    const zbookmark_phys_t *zb, boolean_t limit_inflight);
+static void scan_io_queue_insert(dsl_scan_t *scn, dsl_scan_io_queue_t *queue,
+    const blkptr_t *bp, int dva_i, int zio_flags, const zbookmark_phys_t *zb);
+
+static void scan_io_queues_run_one(io_queue_run_info_t *info);
+static void scan_io_queues_run(dsl_scan_t *scn);
+static mem_lim_t scan_io_queue_mem_lim(dsl_scan_t *scn);
+
+static dsl_scan_io_queue_t *scan_io_queue_create(vdev_t *vd);
+static void scan_io_queues_destroy(dsl_scan_t *scn);
+static void dsl_scan_freed_dva(spa_t *spa, const blkptr_t *bp, int dva_i);
+
+static inline boolean_t
+dsl_scan_is_running(const dsl_scan_t *scn)
+{
+        return (scn->scn_phys.scn_state == DSS_SCANNING ||
+            scn->scn_phys.scn_state == DSS_FINISHING);
+}
+
+static inline void
+sio2bp(const scan_io_t *sio, blkptr_t *bp, uint64_t vdev_id)
+{
+        bzero(bp, sizeof (*bp));
+        DVA_SET_ASIZE(&bp->blk_dva[0], sio->sio_asize);
+        DVA_SET_VDEV(&bp->blk_dva[0], vdev_id);
+        bp->blk_dva[0].dva_word[1] = sio->sio_dva_word1;
+        bp->blk_prop = sio->sio_prop;
+        /*
+         * We must reset the special flag: the rebuilt BP lacks a second
+         * DVA, so wbc_select_dva must not be allowed to run.
+         */
+        BP_SET_SPECIAL(bp, 0);
+        bp->blk_phys_birth = sio->sio_phys_birth;
+        bp->blk_birth = sio->sio_birth;
+        bp->blk_fill = 1;       /* we always only work with data pointers */
+        bp->blk_cksum = sio->sio_cksum;
+}
+
+static inline void
+bp2sio(const blkptr_t *bp, scan_io_t *sio, int dva_i)
+{
+        if (BP_IS_SPECIAL(bp))
+                ASSERT3S(dva_i, ==, WBC_NORMAL_DVA);
+        /* we discard the vdev guid, since we can deduce it from the queue */
+        sio->sio_dva_word1 = bp->blk_dva[dva_i].dva_word[1];
+        sio->sio_asize = DVA_GET_ASIZE(&bp->blk_dva[dva_i]);
+        sio->sio_prop = bp->blk_prop;
+        sio->sio_phys_birth = bp->blk_phys_birth;
+        sio->sio_birth = bp->blk_birth;
+        sio->sio_cksum = bp->blk_cksum;
+}
+
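
A quick round-trip sketch of the two helpers above (hedged; assumes bp is not
a special BP, and in the real code the vdev id is taken from the owning
queue's q_vd rather than recomputed from bp):

        scan_io_t sio;
        blkptr_t rebuilt;

        /* Compact DVA 0 of bp into a queue entry; the vdev guid is dropped. */
        bp2sio(bp, &sio, 0);

        /* Later, rebuild a single-DVA blkptr, re-supplying the vdev id. */
        sio2bp(&sio, &rebuilt, DVA_GET_VDEV(&bp->blk_dva[0]));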
+void
+dsl_scan_global_init()
+{
+        fill_weight = zfs_scan_fill_weight;
+}
+
 int
 dsl_scan_init(dsl_pool_t *dp, uint64_t txg)
 {
         int err;
         dsl_scan_t *scn;

@@ -105,27 +307,37 @@
         uint64_t f;
 
         scn = dp->dp_scan = kmem_zalloc(sizeof (dsl_scan_t), KM_SLEEP);
         scn->scn_dp = dp;
 
+        mutex_init(&scn->scn_sorted_lock, NULL, MUTEX_DEFAULT, NULL);
+        mutex_init(&scn->scn_status_lock, NULL, MUTEX_DEFAULT, NULL);
+
         /*
          * It's possible that we're resuming a scan after a reboot so
          * make sure that the scan_async_destroying flag is initialized
          * appropriately.
          */
         ASSERT(!scn->scn_async_destroying);
         scn->scn_async_destroying = spa_feature_is_active(dp->dp_spa,
             SPA_FEATURE_ASYNC_DESTROY);
 
+        bcopy(&scn->scn_phys, &scn->scn_phys_cached, sizeof (scn->scn_phys));
+        mutex_init(&scn->scn_queue_lock, NULL, MUTEX_DEFAULT, NULL);
+        avl_create(&scn->scn_queue, scan_ds_queue_compar, sizeof (scan_ds_t),
+            offsetof(scan_ds_t, sds_node));
+
         err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
             "scrub_func", sizeof (uint64_t), 1, &f);
         if (err == 0) {
                 /*
                  * There was an old-style scrub in progress.  Restart a
                  * new-style scrub from the beginning.
                  */
                 scn->scn_restart_txg = txg;
+                DTRACE_PROBE2(scan_init__old2new, dsl_scan_t *, scn,
+                    uint64_t, txg);
                 zfs_dbgmsg("old-style scrub was in progress; "
                     "restarting new-style scrub in txg %llu",
                     scn->scn_restart_txg);
 
                 /*

@@ -142,34 +354,68 @@
                 if (err == ENOENT)
                         return (0);
                 else if (err)
                         return (err);
 
-                if (scn->scn_phys.scn_state == DSS_SCANNING &&
+                /*
+                 * We might be restarting after a reboot, so jump the issued
+                 * counter to how far we've scanned. We know we're consistent
+                 * up to here.
+                 */
+                scn->scn_bytes_issued = scn->scn_phys.scn_examined;
+
+                if (dsl_scan_is_running(scn) &&
                     spa_prev_software_version(dp->dp_spa) < SPA_VERSION_SCAN) {
                         /*
                          * A new-type scrub was in progress on an old
                          * pool, and the pool was accessed by old
                          * software.  Restart from the beginning, since
                          * the old software may have changed the pool in
                          * the meantime.
                          */
                         scn->scn_restart_txg = txg;
+                        DTRACE_PROBE2(scan_init__new2old2new,
+                            dsl_scan_t *, scn, uint64_t, txg);
                         zfs_dbgmsg("new-style scrub was modified "
                             "by old software; restarting in txg %llu",
                             scn->scn_restart_txg);
                 }
         }
 
+        /* reload the queue into the in-core state */
+        if (scn->scn_phys.scn_queue_obj != 0) {
+                zap_cursor_t zc;
+                zap_attribute_t za;
+
+                for (zap_cursor_init(&zc, dp->dp_meta_objset,
+                    scn->scn_phys.scn_queue_obj);
+                    zap_cursor_retrieve(&zc, &za) == 0;
+                    (void) zap_cursor_advance(&zc)) {
+                        VERIFY0(scan_ds_queue_insert(scn,
+                            zfs_strtonum(za.za_name, NULL),
+                            za.za_first_integer));
+                }
+                zap_cursor_fini(&zc);
+        }
+
         spa_scan_stat_init(spa);
         return (0);
 }
 
 void
 dsl_scan_fini(dsl_pool_t *dp)
 {
-        if (dp->dp_scan) {
+        if (dp->dp_scan != NULL) {
+                dsl_scan_t *scn = dp->dp_scan;
+
+                mutex_destroy(&scn->scn_sorted_lock);
+                mutex_destroy(&scn->scn_status_lock);
+                if (scn->scn_taskq != NULL)
+                        taskq_destroy(scn->scn_taskq);
+                scan_ds_queue_empty(scn, B_TRUE);
+                mutex_destroy(&scn->scn_queue_lock);
+
                 kmem_free(dp->dp_scan, sizeof (dsl_scan_t));
                 dp->dp_scan = NULL;
         }
 }
 

@@ -177,11 +423,11 @@
 static int
 dsl_scan_setup_check(void *arg, dmu_tx_t *tx)
 {
         dsl_scan_t *scn = dmu_tx_pool(tx)->dp_scan;
 
-        if (scn->scn_phys.scn_state == DSS_SCANNING)
+        if (dsl_scan_is_running(scn))
                 return (SET_ERROR(EBUSY));
 
         return (0);
 }
 

@@ -192,27 +438,32 @@
         pool_scan_func_t *funcp = arg;
         dmu_object_type_t ot = 0;
         dsl_pool_t *dp = scn->scn_dp;
         spa_t *spa = dp->dp_spa;
 
-        ASSERT(scn->scn_phys.scn_state != DSS_SCANNING);
+        ASSERT(!dsl_scan_is_running(scn));
         ASSERT(*funcp > POOL_SCAN_NONE && *funcp < POOL_SCAN_FUNCS);
         bzero(&scn->scn_phys, sizeof (scn->scn_phys));
         scn->scn_phys.scn_func = *funcp;
         scn->scn_phys.scn_state = DSS_SCANNING;
         scn->scn_phys.scn_min_txg = 0;
         scn->scn_phys.scn_max_txg = tx->tx_txg;
-        scn->scn_phys.scn_ddt_class_max = DDT_CLASSES - 1; /* the entire DDT */
+        /* the entire DDT */
+        scn->scn_phys.scn_ddt_class_max = spa->spa_ddt_class_max;
         scn->scn_phys.scn_start_time = gethrestime_sec();
         scn->scn_phys.scn_errors = 0;
         scn->scn_phys.scn_to_examine = spa->spa_root_vdev->vdev_stat.vs_alloc;
         scn->scn_restart_txg = 0;
         scn->scn_done_txg = 0;
+        scn->scn_bytes_issued = 0;
+        scn->scn_checkpointing = B_FALSE;
+        scn->scn_last_checkpoint = 0;
         spa_scan_stat_init(spa);
 
         if (DSL_SCAN_IS_SCRUB_RESILVER(scn)) {
-                scn->scn_phys.scn_ddt_class_max = zfs_scrub_ddt_class_max;
+                scn->scn_phys.scn_ddt_class_max =
+                    MIN(zfs_scrub_ddt_class_max, spa->spa_ddt_class_max);
 
                 /* rewrite all disk labels */
                 vdev_config_dirty(spa->spa_root_vdev);
 
                 if (vdev_resilver_needed(spa->spa_root_vdev,

@@ -228,11 +479,12 @@
                  * If this is an incremental scrub, limit the DDT scrub phase
                  * to just the auto-ditto class (for correctness); the rest
                  * of the scrub should go faster using top-down pruning.
                  */
                 if (scn->scn_phys.scn_min_txg > TXG_INITIAL)
-                        scn->scn_phys.scn_ddt_class_max = DDT_CLASS_DITTO;
+                        scn->scn_phys.scn_ddt_class_max =
+                            MIN(DDT_CLASS_DITTO, spa->spa_ddt_class_max);
 
         }
 
         /* back to the generic stuff */
 

@@ -246,12 +498,14 @@
                 ot = DMU_OT_ZAP_OTHER;
 
         scn->scn_phys.scn_queue_obj = zap_create(dp->dp_meta_objset,
             ot ? ot : DMU_OT_SCAN_QUEUE, DMU_OT_NONE, 0, tx);
 
-        dsl_scan_sync_state(scn, tx);
+        bcopy(&scn->scn_phys, &scn->scn_phys_cached, sizeof (scn->scn_phys));
 
+        dsl_scan_sync_state(scn, tx, SYNC_MANDATORY);
+
         spa_history_log_internal(spa, "scan setup", tx,
             "func=%u mintxg=%llu maxtxg=%llu",
             *funcp, scn->scn_phys.scn_min_txg, scn->scn_phys.scn_max_txg);
 }
 

@@ -280,29 +534,39 @@
                 (void) zap_remove(dp->dp_meta_objset,
                     DMU_POOL_DIRECTORY_OBJECT, old_names[i], tx);
         }
 
         if (scn->scn_phys.scn_queue_obj != 0) {
-                VERIFY(0 == dmu_object_free(dp->dp_meta_objset,
+                VERIFY0(dmu_object_free(dp->dp_meta_objset,
                     scn->scn_phys.scn_queue_obj, tx));
                 scn->scn_phys.scn_queue_obj = 0;
         }
+        scan_ds_queue_empty(scn, B_FALSE);
 
         scn->scn_phys.scn_flags &= ~DSF_SCRUB_PAUSED;
 
         /*
          * If we were "restarted" from a stopped state, don't bother
          * with anything else.
          */
-        if (scn->scn_phys.scn_state != DSS_SCANNING)
+        if (!dsl_scan_is_running(scn)) {
+                ASSERT(!scn->scn_is_sorted);
                 return;
+        }
 
-        if (complete)
-                scn->scn_phys.scn_state = DSS_FINISHED;
-        else
-                scn->scn_phys.scn_state = DSS_CANCELED;
+        if (scn->scn_is_sorted) {
+                scan_io_queues_destroy(scn);
+                scn->scn_is_sorted = B_FALSE;
 
+                if (scn->scn_taskq != NULL) {
+                        taskq_destroy(scn->scn_taskq);
+                        scn->scn_taskq = NULL;
+                }
+        }
+
+        scn->scn_phys.scn_state = complete ? DSS_FINISHED : DSS_CANCELED;
+
         if (dsl_scan_restarting(scn, tx))
                 spa_history_log_internal(spa, "scan aborted, restarting", tx,
                     "errors=%llu", spa_get_errlog_size(spa));
         else if (!complete)
                 spa_history_log_internal(spa, "scan cancelled", tx,

@@ -341,19 +605,28 @@
                  */
                 spa_async_request(spa, SPA_ASYNC_RESILVER_DONE);
         }
 
         scn->scn_phys.scn_end_time = gethrestime_sec();
+
+        ASSERT(!dsl_scan_is_running(scn));
+
+        /*
+         * If the special-vdev does not have any errors after
+         * SCRUB/RESILVER we need to drop flag that does not
+         * allow to write to special
+         */
+        spa_special_check_errors(spa);
 }
 
 /* ARGSUSED */
 static int
 dsl_scan_cancel_check(void *arg, dmu_tx_t *tx)
 {
         dsl_scan_t *scn = dmu_tx_pool(tx)->dp_scan;
 
-        if (scn->scn_phys.scn_state != DSS_SCANNING)
+        if (!dsl_scan_is_running(scn))
                 return (SET_ERROR(ENOENT));
         return (0);
 }
 
 /* ARGSUSED */

@@ -361,12 +634,11 @@
 dsl_scan_cancel_sync(void *arg, dmu_tx_t *tx)
 {
         dsl_scan_t *scn = dmu_tx_pool(tx)->dp_scan;
 
         dsl_scan_done(scn, B_FALSE, tx);
-        dsl_scan_sync_state(scn, tx);
-        spa_event_notify(scn->scn_dp->dp_spa, NULL, NULL, ESC_ZFS_SCRUB_ABORT);
+        dsl_scan_sync_state(scn, tx, SYNC_MANDATORY);
 }
 
 int
 dsl_scan_cancel(dsl_pool_t *dp)
 {

@@ -416,12 +688,12 @@
 
         if (*cmd == POOL_SCRUB_PAUSE) {
                 /* can't pause a scrub when there is no in-progress scrub */
                 spa->spa_scan_pass_scrub_pause = gethrestime_sec();
                 scn->scn_phys.scn_flags |= DSF_SCRUB_PAUSED;
-                dsl_scan_sync_state(scn, tx);
-                spa_event_notify(spa, NULL, NULL, ESC_ZFS_SCRUB_PAUSED);
+                scn->scn_phys_cached.scn_flags |= DSF_SCRUB_PAUSED;
+                dsl_scan_sync_state(scn, tx, SYNC_CACHED);
         } else {
                 ASSERT3U(*cmd, ==, POOL_SCRUB_NORMAL);
                 if (dsl_scan_is_paused_scrub(scn)) {
                         /*
                          * We need to keep track of how much time we spend

@@ -430,11 +702,12 @@
                          */
                         spa->spa_scan_pass_scrub_spent_paused +=
                             gethrestime_sec() - spa->spa_scan_pass_scrub_pause;
                         spa->spa_scan_pass_scrub_pause = 0;
                         scn->scn_phys.scn_flags &= ~DSF_SCRUB_PAUSED;
-                        dsl_scan_sync_state(scn, tx);
+                        scn->scn_phys_cached.scn_flags &= ~DSF_SCRUB_PAUSED;
+                        dsl_scan_sync_state(scn, tx, SYNC_CACHED);
                 }
         }
 }
 
 /*

@@ -451,11 +724,12 @@
 boolean_t
 dsl_scan_scrubbing(const dsl_pool_t *dp)
 {
         dsl_scan_t *scn = dp->dp_scan;
 
-        if (scn->scn_phys.scn_state == DSS_SCANNING &&
+        if ((scn->scn_phys.scn_state == DSS_SCANNING ||
+            scn->scn_phys.scn_state == DSS_FINISHING) &&
             scn->scn_phys.scn_func == POOL_SCAN_SCRUB)
                 return (B_TRUE);
 
         return (B_FALSE);
 }

@@ -487,21 +761,192 @@
         if (ds->ds_is_snapshot)
                 return (MIN(smt, dsl_dataset_phys(ds)->ds_creation_txg));
         return (smt);
 }
 
+/*
+ * This is the dataset processing "queue", i.e. the datasets that are to be
+ * scanned for data locations and inserted into the LBA reordering tree.
+ * Please note that even though we call this a "queue", the actual
+ * implementation uses an avl tree (to detect double insertion). The tree
+ * uses the dataset object number as the sorting criterion, so
+ * scan_ds_queue_insert CANNOT be guaranteed to always append entries at the
+ * end (datasets are inserted by the scanner in discovery order, i.e.
+ * parent-child relationships). Consequently, the scanner must never step
+ * through the AVL tree in a naively sequential fashion using AVL_NEXT.
+ * We must always use scan_ds_queue_first to pick the first dataset in the
+ * list, process it, remove it using scan_ds_queue_remove and pick the next
+ * first dataset, again using scan_ds_queue_first.
+ */
+static int
+scan_ds_queue_compar(const void *a, const void *b)
+{
+        const scan_ds_t *sds_a = a, *sds_b = b;
+
+        if (sds_a->sds_dsobj < sds_b->sds_dsobj)
+                return (-1);
+        if (sds_a->sds_dsobj == sds_b->sds_dsobj)
+                return (0);
+        return (1);
+}
+
 static void
-dsl_scan_sync_state(dsl_scan_t *scn, dmu_tx_t *tx)
+scan_ds_queue_empty(dsl_scan_t *scn, boolean_t destroy)
 {
+        void *cookie = NULL;
+        scan_ds_t *sds;
+
+        mutex_enter(&scn->scn_queue_lock);
+        while ((sds = avl_destroy_nodes(&scn->scn_queue, &cookie)) != NULL)
+                kmem_free(sds, sizeof (*sds));
+        mutex_exit(&scn->scn_queue_lock);
+
+        if (destroy)
+                avl_destroy(&scn->scn_queue);
+}
+
+static boolean_t
+scan_ds_queue_contains(dsl_scan_t *scn, uint64_t dsobj, uint64_t *txg)
+{
+        scan_ds_t *sds;
+        scan_ds_t srch = { .sds_dsobj = dsobj };
+
+        mutex_enter(&scn->scn_queue_lock);
+        sds = avl_find(&scn->scn_queue, &srch, NULL);
+        if (sds != NULL && txg != NULL)
+                *txg = sds->sds_txg;
+        mutex_exit(&scn->scn_queue_lock);
+
+        return (sds != NULL);
+}
+
+static int
+scan_ds_queue_insert(dsl_scan_t *scn, uint64_t dsobj, uint64_t txg)
+{
+        scan_ds_t *sds;
+        avl_index_t where;
+
+        sds = kmem_zalloc(sizeof (*sds), KM_SLEEP);
+        sds->sds_dsobj = dsobj;
+        sds->sds_txg = txg;
+
+        mutex_enter(&scn->scn_queue_lock);
+        if (avl_find(&scn->scn_queue, sds, &where) != NULL) {
+                kmem_free(sds, sizeof (*sds));
+                return (EEXIST);
+        }
+        avl_insert(&scn->scn_queue, sds, where);
+        mutex_exit(&scn->scn_queue_lock);
+
+        return (0);
+}
+
+static void
+scan_ds_queue_remove(dsl_scan_t *scn, uint64_t dsobj)
+{
+        scan_ds_t srch, *sds;
+
+        srch.sds_dsobj = dsobj;
+
+        mutex_enter(&scn->scn_queue_lock);
+        sds = avl_find(&scn->scn_queue, &srch, NULL);
+        VERIFY(sds != NULL);
+        avl_remove(&scn->scn_queue, sds);
+        mutex_exit(&scn->scn_queue_lock);
+
+        kmem_free(sds, sizeof (*sds));
+}
+
+static boolean_t
+scan_ds_queue_first(dsl_scan_t *scn, uint64_t *dsobj, uint64_t *txg)
+{
+        scan_ds_t *sds;
+
+        mutex_enter(&scn->scn_queue_lock);
+        sds = avl_first(&scn->scn_queue);
+        if (sds != NULL) {
+                *dsobj = sds->sds_dsobj;
+                *txg = sds->sds_txg;
+        }
+        mutex_exit(&scn->scn_queue_lock);
+
+        return (sds != NULL);
+}
+
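
A minimal sketch of the peek-process-remove pattern the queue comment above
prescribes (illustrative; the real scan loop also interleaves suspension
checks):

        uint64_t dsobj, txg;

        while (scan_ds_queue_first(scn, &dsobj, &txg)) {
                /* Process the lowest-numbered dataset currently queued. */
                dsl_scan_visitds(scn, dsobj, tx);
                /* Drop it and re-peek; never walk the tree with AVL_NEXT. */
                scan_ds_queue_remove(scn, dsobj);
        }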
+static void
+scan_ds_queue_sync(dsl_scan_t *scn, dmu_tx_t *tx)
+{
+        dsl_pool_t *dp = scn->scn_dp;
+        spa_t *spa = dp->dp_spa;
+        dmu_object_type_t ot = (spa_version(spa) >= SPA_VERSION_DSL_SCRUB) ?
+            DMU_OT_SCAN_QUEUE : DMU_OT_ZAP_OTHER;
+
+        ASSERT0(scn->scn_bytes_pending);
+        ASSERT(scn->scn_phys.scn_queue_obj != 0);
+
+        VERIFY0(dmu_object_free(dp->dp_meta_objset,
+            scn->scn_phys.scn_queue_obj, tx));
+        scn->scn_phys.scn_queue_obj = zap_create(dp->dp_meta_objset, ot,
+            DMU_OT_NONE, 0, tx);
+
+        mutex_enter(&scn->scn_queue_lock);
+        for (scan_ds_t *sds = avl_first(&scn->scn_queue);
+            sds != NULL; sds = AVL_NEXT(&scn->scn_queue, sds)) {
+                VERIFY0(zap_add_int_key(dp->dp_meta_objset,
+                    scn->scn_phys.scn_queue_obj, sds->sds_dsobj,
+                    sds->sds_txg, tx));
+        }
+        mutex_exit(&scn->scn_queue_lock);
+}
+
+/*
+ * Writes out a persistent dsl_scan_phys_t record to the pool directory.
+ * Because we can be running in the block sorting algorithm, we do not always
+ * want to write out the record, only when it is "safe" to do so. This safety
+ * condition is achieved by making sure that the sorting queues are empty
+ * (scn_bytes_pending==0). The sync'ed state could be inconsistent with how
+ * much actual scanning progress has been made. What kind of sync is performed
+ * specified by the sync_type argument. If the sync is optional, we only
+ * sync if the queues are empty. If the sync is mandatory, we do a hard VERIFY
+ * to make sure that the queues are empty. The third possible state is a
+ * "cached" sync. This is done in response to:
+ * 1) The dataset that was in the last sync'ed dsl_scan_phys_t having been
+ *      destroyed, so we wouldn't be able to restart scanning from it.
+ * 2) The snapshot that was in the last sync'ed dsl_scan_phys_t having been
+ *      superseded by a newer snapshot.
+ * 3) The dataset that was in the last sync'ed dsl_scan_phys_t having been
+ *      swapped with its clone.
+ * In all cases, a cached sync simply rewrites the last record we've written,
+ * just slightly modified. For the modifications that are performed to the
+ * last written dsl_scan_phys_t, see dsl_scan_ds_destroyed,
+ * dsl_scan_ds_snapshotted and dsl_scan_ds_clone_swapped.
+ */
+static void
+dsl_scan_sync_state(dsl_scan_t *scn, dmu_tx_t *tx, state_sync_type_t sync_type)
+{
+        mutex_enter(&scn->scn_status_lock);
+        ASSERT(sync_type != SYNC_MANDATORY || scn->scn_bytes_pending == 0);
+        if (scn->scn_bytes_pending == 0) {
+                if (scn->scn_phys.scn_queue_obj != 0)
+                        scan_ds_queue_sync(scn, tx);
         VERIFY0(zap_update(scn->scn_dp->dp_meta_objset,
             DMU_POOL_DIRECTORY_OBJECT,
             DMU_POOL_SCAN, sizeof (uint64_t), SCAN_PHYS_NUMINTS,
             &scn->scn_phys, tx));
+                bcopy(&scn->scn_phys, &scn->scn_phys_cached,
+                    sizeof (scn->scn_phys));
+                scn->scn_checkpointing = B_FALSE;
+                scn->scn_last_checkpoint = ddi_get_lbolt();
+        } else if (sync_type == SYNC_CACHED) {
+                VERIFY0(zap_update(scn->scn_dp->dp_meta_objset,
+                    DMU_POOL_DIRECTORY_OBJECT,
+                    DMU_POOL_SCAN, sizeof (uint64_t), SCAN_PHYS_NUMINTS,
+                    &scn->scn_phys_cached, tx));
+        }
+        mutex_exit(&scn->scn_status_lock);
 }
 
-extern int zfs_vdev_async_write_active_min_dirty_percent;
-
 static boolean_t
 dsl_scan_check_suspend(dsl_scan_t *scn, const zbookmark_phys_t *zb)
 {
         /* we never skip user/group accounting objects */
         if (zb && (int64_t)zb->zb_object < 0)

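
For illustration, the three sync types correspond to call sites like these
(the first two appear in hunks above; the routine per-txg SYNC_OPTIONAL call
is in code not shown in this excerpt):

        /* Scan setup/teardown: the sorting queues must already be empty. */
        dsl_scan_sync_state(scn, tx, SYNC_MANDATORY);

        /* Pause/unpause: rewrite the cached record even with I/Os queued. */
        dsl_scan_sync_state(scn, tx, SYNC_CACHED);

        /* Routine checkpoint: write only if scn_bytes_pending == 0. */
        dsl_scan_sync_state(scn, tx, SYNC_OPTIONAL);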
@@ -528,27 +973,43 @@
          *    (default 30%), or someone is explicitly waiting for this txg
          *    to complete.
          *  or
          *  - the spa is shutting down because this pool is being exported
          *    or the machine is rebooting.
+         *  or
+         *  - the scan queue has reached its memory use limit
          */
         int mintime = (scn->scn_phys.scn_func == POOL_SCAN_RESILVER) ?
             zfs_resilver_min_time_ms : zfs_scan_min_time_ms;
         uint64_t elapsed_nanosecs = gethrtime() - scn->scn_sync_start_time;
         int dirty_pct = scn->scn_dp->dp_dirty_total * 100 / zfs_dirty_data_max;
         if (elapsed_nanosecs / NANOSEC >= zfs_txg_timeout ||
             (NSEC2MSEC(elapsed_nanosecs) > mintime &&
             (txg_sync_waiting(scn->scn_dp) ||
             dirty_pct >= zfs_vdev_async_write_active_min_dirty_percent)) ||
-            spa_shutting_down(scn->scn_dp->dp_spa)) {
+            spa_shutting_down(scn->scn_dp->dp_spa) || scn->scn_clearing ||
+            scan_io_queue_mem_lim(scn) == MEM_LIM_HARD) {
                 if (zb) {
+                        DTRACE_PROBE1(scan_pause, zbookmark_phys_t *, zb);
                         dprintf("suspending at bookmark %llx/%llx/%llx/%llx\n",
                             (longlong_t)zb->zb_objset,
                             (longlong_t)zb->zb_object,
                             (longlong_t)zb->zb_level,
                             (longlong_t)zb->zb_blkid);
                         scn->scn_phys.scn_bookmark = *zb;
+                } else {
+                        DTRACE_PROBE1(scan_pause_ddt, ddt_bookmark_t *,
+                            &scn->scn_phys.scn_ddt_bookmark);
+                        dprintf("pausing at DDT bookmark %llx/%llx/%llx/%llx\n",
+                            (longlong_t)scn->scn_phys.scn_ddt_bookmark.
+                            ddb_class,
+                            (longlong_t)scn->scn_phys.scn_ddt_bookmark.
+                            ddb_type,
+                            (longlong_t)scn->scn_phys.scn_ddt_bookmark.
+                            ddb_checksum,
+                            (longlong_t)scn->scn_phys.scn_ddt_bookmark.
+                            ddb_cursor);
                 }
                 dprintf("suspending at DDT bookmark %llx/%llx/%llx/%llx\n",
                     (longlong_t)scn->scn_phys.scn_ddt_bookmark.ddb_class,
                     (longlong_t)scn->scn_phys.scn_ddt_bookmark.ddb_type,
                     (longlong_t)scn->scn_phys.scn_ddt_bookmark.ddb_checksum,

@@ -694,10 +1155,11 @@
                  * indicate that it's OK to start checking for suspending
                  * again.
                  */
                 if (bcmp(zb, &scn->scn_phys.scn_bookmark, sizeof (*zb)) == 0 ||
                     zb->zb_object > scn->scn_phys.scn_bookmark.zb_object) {
+                        DTRACE_PROBE1(scan_resume, zbookmark_phys_t *, zb);
                         dprintf("resuming at %llx/%llx/%llx/%llx\n",
                             (longlong_t)zb->zb_objset,
                             (longlong_t)zb->zb_object,
                             (longlong_t)zb->zb_level,
                             (longlong_t)zb->zb_blkid);

@@ -728,11 +1190,11 @@
                 arc_buf_t *buf;
 
                 err = arc_read(NULL, dp->dp_spa, bp, arc_getbuf_func, &buf,
                     ZIO_PRIORITY_ASYNC_READ, zio_flags, &flags, zb);
                 if (err) {
-                        scn->scn_phys.scn_errors++;
+                        atomic_inc_64(&scn->scn_phys.scn_errors);
                         return (err);
                 }
                 for (i = 0, cbp = buf->b_data; i < epb; i++, cbp++) {
                         dsl_scan_prefetch(scn, buf, cbp, zb->zb_objset,
                             zb->zb_object, zb->zb_blkid * epb + i);

@@ -755,11 +1217,11 @@
                 arc_buf_t *buf;
 
                 err = arc_read(NULL, dp->dp_spa, bp, arc_getbuf_func, &buf,
                     ZIO_PRIORITY_ASYNC_READ, zio_flags, &flags, zb);
                 if (err) {
-                        scn->scn_phys.scn_errors++;
+                        atomic_inc_64(&scn->scn_phys.scn_errors);
                         return (err);
                 }
                 for (i = 0, cdnp = buf->b_data; i < epb; i++, cdnp++) {
                         for (j = 0; j < cdnp->dn_nblkptr; j++) {
                                 blkptr_t *cbp = &cdnp->dn_blkptr[j];

@@ -779,11 +1241,11 @@
                 arc_buf_t *buf;
 
                 err = arc_read(NULL, dp->dp_spa, bp, arc_getbuf_func, &buf,
                     ZIO_PRIORITY_ASYNC_READ, zio_flags, &flags, zb);
                 if (err) {
-                        scn->scn_phys.scn_errors++;
+                        atomic_inc_64(&scn->scn_phys.scn_errors);
                         return (err);
                 }
 
                 osp = buf->b_data;
 

@@ -859,10 +1321,15 @@
         if (BP_IS_HOLE(bp))
                 return;
 
         scn->scn_visited_this_txg++;
 
+#ifdef  _KERNEL
+        DTRACE_PROBE7(scan_visitbp, blkptr_t *, bp, zbookmark_phys_t *, zb,
+            dnode_phys_t *, dnp, dsl_dataset_t *, ds, dsl_scan_t *, scn,
+            dmu_objset_type_t, ostype, dmu_tx_t *, tx);
+#endif  /* _KERNEL */
         dprintf_bp(bp,
             "visiting ds=%p/%llu zb=%llx/%llx/%llx/%llx bp=%p",
             ds, ds ? ds->ds_object : 0,
             zb->zb_objset, zb->zb_object, zb->zb_level, zb->zb_blkid,
             bp);

@@ -905,24 +1372,19 @@
         SET_BOOKMARK(&zb, ds ? ds->ds_object : DMU_META_OBJSET,
             ZB_ROOT_OBJECT, ZB_ROOT_LEVEL, ZB_ROOT_BLKID);
         dsl_scan_visitbp(bp, &zb, NULL,
             ds, scn, DMU_OST_NONE, tx);
 
+        DTRACE_PROBE4(scan_finished, dsl_scan_t *, scn, dsl_dataset_t *, ds,
+            blkptr_t *, bp, dmu_tx_t *, tx);
         dprintf_ds(ds, "finished scan%s", "");
 }
 
-void
-dsl_scan_ds_destroyed(dsl_dataset_t *ds, dmu_tx_t *tx)
+static void
+ds_destroyed_scn_phys(dsl_dataset_t *ds, dsl_scan_phys_t *scn_phys)
 {
-        dsl_pool_t *dp = ds->ds_dir->dd_pool;
-        dsl_scan_t *scn = dp->dp_scan;
-        uint64_t mintxg;
-
-        if (scn->scn_phys.scn_state != DSS_SCANNING)
-                return;
-
-        if (scn->scn_phys.scn_bookmark.zb_objset == ds->ds_object) {
+        if (scn_phys->scn_bookmark.zb_objset == ds->ds_object) {
                 if (ds->ds_is_snapshot) {
                         /*
                          * Note:
                          *  - scn_cur_{min,max}_txg stays the same.
                          *  - Setting the flag is not really necessary if

@@ -930,27 +1392,64 @@
                          *    is nothing after this snapshot that we care
                          *    about.  However, we set it anyway and then
                          *    ignore it when we retraverse it in
                          *    dsl_scan_visitds().
                          */
-                        scn->scn_phys.scn_bookmark.zb_objset =
+                        scn_phys->scn_bookmark.zb_objset =
                             dsl_dataset_phys(ds)->ds_next_snap_obj;
                         zfs_dbgmsg("destroying ds %llu; currently traversing; "
                             "reset zb_objset to %llu",
                             (u_longlong_t)ds->ds_object,
                             (u_longlong_t)dsl_dataset_phys(ds)->
                             ds_next_snap_obj);
-                        scn->scn_phys.scn_flags |= DSF_VISIT_DS_AGAIN;
+                        scn_phys->scn_flags |= DSF_VISIT_DS_AGAIN;
                 } else {
-                        SET_BOOKMARK(&scn->scn_phys.scn_bookmark,
+                        SET_BOOKMARK(&scn_phys->scn_bookmark,
                             ZB_DESTROYED_OBJSET, 0, 0, 0);
                         zfs_dbgmsg("destroying ds %llu; currently traversing; "
                             "reset bookmark to -1,0,0,0",
                             (u_longlong_t)ds->ds_object);
                 }
-        } else if (zap_lookup_int_key(dp->dp_meta_objset,
-            scn->scn_phys.scn_queue_obj, ds->ds_object, &mintxg) == 0) {
+        }
+}
+
+/*
+ * Invoked when a dataset is destroyed. We need to make sure that:
+ *
+ * 1) If it is the dataset that was currently being scanned, we write
+ *      a new dsl_scan_phys_t, marking the objset reference in it
+ *      as destroyed.
+ * 2) Remove it from the work queue, if it was present.
+ *
+ * If the dataset was actually a snapshot, instead of marking the dataset
+ * as destroyed, we instead substitute the next snapshot in line.
+ */
+void
+dsl_scan_ds_destroyed(dsl_dataset_t *ds, dmu_tx_t *tx)
+{
+        dsl_pool_t *dp = ds->ds_dir->dd_pool;
+        dsl_scan_t *scn = dp->dp_scan;
+        uint64_t mintxg;
+
+        if (!dsl_scan_is_running(scn))
+                return;
+
+        ds_destroyed_scn_phys(ds, &scn->scn_phys);
+        ds_destroyed_scn_phys(ds, &scn->scn_phys_cached);
+
+        if (scan_ds_queue_contains(scn, ds->ds_object, &mintxg)) {
+                scan_ds_queue_remove(scn, ds->ds_object);
+                if (ds->ds_is_snapshot) {
+                        VERIFY0(scan_ds_queue_insert(scn,
+                            dsl_dataset_phys(ds)->ds_next_snap_obj, mintxg));
+                }
+        }
+
+        if (zap_lookup_int_key(dp->dp_meta_objset, scn->scn_phys.scn_queue_obj,
+            ds->ds_object, &mintxg) == 0) {
+                DTRACE_PROBE3(scan_ds_destroyed__in_queue,
+                    dsl_scan_t *, scn, dsl_dataset_t *, ds, dmu_tx_t *, tx);
                 ASSERT3U(dsl_dataset_phys(ds)->ds_num_children, <=, 1);
                 VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
                     scn->scn_phys.scn_queue_obj, ds->ds_object, tx));
                 if (ds->ds_is_snapshot) {
                         /*

@@ -975,34 +1474,58 @@
 
         /*
          * dsl_scan_sync() should be called after this, and should sync
          * out our changed state, but just to be safe, do it here.
          */
-        dsl_scan_sync_state(scn, tx);
+        dsl_scan_sync_state(scn, tx, SYNC_CACHED);
 }
 
+static void
+ds_snapshotted_bookmark(dsl_dataset_t *ds, zbookmark_phys_t *scn_bookmark)
+{
+        if (scn_bookmark->zb_objset == ds->ds_object) {
+                scn_bookmark->zb_objset =
+                    dsl_dataset_phys(ds)->ds_prev_snap_obj;
+                zfs_dbgmsg("snapshotting ds %llu; currently traversing; "
+                    "reset zb_objset to %llu",
+                    (u_longlong_t)ds->ds_object,
+                    (u_longlong_t)dsl_dataset_phys(ds)->ds_prev_snap_obj);
+        }
+}
+
+/*
+ * Called when a dataset is snapshotted. If we were currently traversing
+ * this snapshot, we reset our bookmark to point at the newly created
+ * snapshot. We also modify our work queue to remove the old snapshot and
+ * replace it with the new one.
+ */
 void
 dsl_scan_ds_snapshotted(dsl_dataset_t *ds, dmu_tx_t *tx)
 {
         dsl_pool_t *dp = ds->ds_dir->dd_pool;
         dsl_scan_t *scn = dp->dp_scan;
         uint64_t mintxg;
 
-        if (scn->scn_phys.scn_state != DSS_SCANNING)
+        if (!dsl_scan_is_running(scn))
                 return;
 
         ASSERT(dsl_dataset_phys(ds)->ds_prev_snap_obj != 0);
 
-        if (scn->scn_phys.scn_bookmark.zb_objset == ds->ds_object) {
-                scn->scn_phys.scn_bookmark.zb_objset =
-                    dsl_dataset_phys(ds)->ds_prev_snap_obj;
-                zfs_dbgmsg("snapshotting ds %llu; currently traversing; "
-                    "reset zb_objset to %llu",
-                    (u_longlong_t)ds->ds_object,
-                    (u_longlong_t)dsl_dataset_phys(ds)->ds_prev_snap_obj);
-        } else if (zap_lookup_int_key(dp->dp_meta_objset,
-            scn->scn_phys.scn_queue_obj, ds->ds_object, &mintxg) == 0) {
+        ds_snapshotted_bookmark(ds, &scn->scn_phys.scn_bookmark);
+        ds_snapshotted_bookmark(ds, &scn->scn_phys_cached.scn_bookmark);
+
+        if (scan_ds_queue_contains(scn, ds->ds_object, &mintxg)) {
+                scan_ds_queue_remove(scn, ds->ds_object);
+                VERIFY0(scan_ds_queue_insert(scn,
+                    dsl_dataset_phys(ds)->ds_prev_snap_obj, mintxg));
+        }
+
+        if (zap_lookup_int_key(dp->dp_meta_objset, scn->scn_phys.scn_queue_obj,
+            ds->ds_object, &mintxg) == 0) {
+                DTRACE_PROBE3(scan_ds_snapshotted__in_queue,
+                    dsl_scan_t *, scn, dsl_dataset_t *, ds, dmu_tx_t *, tx);
+
                 VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
                     scn->scn_phys.scn_queue_obj, ds->ds_object, tx));
                 VERIFY(zap_add_int_key(dp->dp_meta_objset,
                     scn->scn_phys.scn_queue_obj,
                     dsl_dataset_phys(ds)->ds_prev_snap_obj, mintxg, tx) == 0);

@@ -1009,31 +1532,69 @@
                 zfs_dbgmsg("snapshotting ds %llu; in queue; "
                     "replacing with %llu",
                     (u_longlong_t)ds->ds_object,
                     (u_longlong_t)dsl_dataset_phys(ds)->ds_prev_snap_obj);
         }
-        dsl_scan_sync_state(scn, tx);
+
+        dsl_scan_sync_state(scn, tx, SYNC_CACHED);
 }
 
+static void
+ds_clone_swapped_bookmark(dsl_dataset_t *ds1, dsl_dataset_t *ds2,
+    zbookmark_phys_t *scn_bookmark)
+{
+        if (scn_bookmark->zb_objset == ds1->ds_object) {
+                scn_bookmark->zb_objset = ds2->ds_object;
+                zfs_dbgmsg("clone_swap ds %llu; currently traversing; "
+                    "reset zb_objset to %llu",
+                    (u_longlong_t)ds1->ds_object,
+                    (u_longlong_t)ds2->ds_object);
+        } else if (scn_bookmark->zb_objset == ds2->ds_object) {
+                scn_bookmark->zb_objset = ds1->ds_object;
+                zfs_dbgmsg("clone_swap ds %llu; currently traversing; "
+                    "reset zb_objset to %llu",
+                    (u_longlong_t)ds2->ds_object,
+                    (u_longlong_t)ds1->ds_object);
+        }
+}
+
+/*
+ * Called when a parent dataset and its clone are swapped. If we were
+ * currently traversing the dataset, we need to switch to traversing the
+ * newly promoted parent.
+ */
 void
 dsl_scan_ds_clone_swapped(dsl_dataset_t *ds1, dsl_dataset_t *ds2, dmu_tx_t *tx)
 {
         dsl_pool_t *dp = ds1->ds_dir->dd_pool;
         dsl_scan_t *scn = dp->dp_scan;
         uint64_t mintxg;
 
-        if (scn->scn_phys.scn_state != DSS_SCANNING)
+        if (!dsl_scan_is_running(scn))
                 return;
 
-        if (scn->scn_phys.scn_bookmark.zb_objset == ds1->ds_object) {
-                scn->scn_phys.scn_bookmark.zb_objset = ds2->ds_object;
+        ds_clone_swapped_bookmark(ds1, ds2, &scn->scn_phys.scn_bookmark);
+        ds_clone_swapped_bookmark(ds1, ds2, &scn->scn_phys_cached.scn_bookmark);
+
+        if (scan_ds_queue_contains(scn, ds1->ds_object, &mintxg)) {
+                int err;
+
+                scan_ds_queue_remove(scn, ds1->ds_object);
+                err = scan_ds_queue_insert(scn, ds2->ds_object, mintxg);
+                VERIFY(err == 0 || err == EEXIST);
+                if (err == EEXIST) {
+                        /* Both were there to begin with */
+                        VERIFY0(scan_ds_queue_insert(scn, ds1->ds_object,
+                            mintxg));
+                }
                 zfs_dbgmsg("clone_swap ds %llu; currently traversing; "
                     "reset zb_objset to %llu",
                     (u_longlong_t)ds1->ds_object,
                     (u_longlong_t)ds2->ds_object);
-        } else if (scn->scn_phys.scn_bookmark.zb_objset == ds2->ds_object) {
-                scn->scn_phys.scn_bookmark.zb_objset = ds1->ds_object;
+        } else if (scan_ds_queue_contains(scn, ds2->ds_object, &mintxg)) {
+                scan_ds_queue_remove(scn, ds2->ds_object);
+                VERIFY0(scan_ds_queue_insert(scn, ds1->ds_object, mintxg));
                 zfs_dbgmsg("clone_swap ds %llu; currently traversing; "
                     "reset zb_objset to %llu",
                     (u_longlong_t)ds2->ds_object,
                     (u_longlong_t)ds1->ds_object);
         }

@@ -1040,10 +1601,13 @@
 
         if (zap_lookup_int_key(dp->dp_meta_objset, scn->scn_phys.scn_queue_obj,
             ds1->ds_object, &mintxg) == 0) {
                 int err;
 
+                DTRACE_PROBE4(scan_ds_clone_swapped__in_queue_ds1,
+                    dsl_scan_t *, scn, dsl_dataset_t *, ds1,
+                    dsl_dataset_t *, ds2, dmu_tx_t *, tx);
                 ASSERT3U(mintxg, ==, dsl_dataset_phys(ds1)->ds_prev_snap_txg);
                 ASSERT3U(mintxg, ==, dsl_dataset_phys(ds2)->ds_prev_snap_txg);
                 VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
                     scn->scn_phys.scn_queue_obj, ds1->ds_object, tx));
                 err = zap_add_int_key(dp->dp_meta_objset,

@@ -1059,10 +1623,13 @@
                     "replacing with %llu",
                     (u_longlong_t)ds1->ds_object,
                     (u_longlong_t)ds2->ds_object);
         } else if (zap_lookup_int_key(dp->dp_meta_objset,
             scn->scn_phys.scn_queue_obj, ds2->ds_object, &mintxg) == 0) {
+                DTRACE_PROBE4(scan_ds_clone_swapped__in_queue_ds2,
+                    dsl_scan_t *, scn, dsl_dataset_t *, ds1,
+                    dsl_dataset_t *, ds2, dmu_tx_t *, tx);
                 ASSERT3U(mintxg, ==, dsl_dataset_phys(ds1)->ds_prev_snap_txg);
                 ASSERT3U(mintxg, ==, dsl_dataset_phys(ds2)->ds_prev_snap_txg);
                 VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
                     scn->scn_phys.scn_queue_obj, ds2->ds_object, tx));
                 VERIFY(0 == zap_add_int_key(dp->dp_meta_objset,

@@ -1071,56 +1638,51 @@
                     "replacing with %llu",
                     (u_longlong_t)ds2->ds_object,
                     (u_longlong_t)ds1->ds_object);
         }
 
-        dsl_scan_sync_state(scn, tx);
+        dsl_scan_sync_state(scn, tx, SYNC_CACHED);
 }
 
-struct enqueue_clones_arg {
-        dmu_tx_t *tx;
-        uint64_t originobj;
-};
-
 /* ARGSUSED */
 static int
 enqueue_clones_cb(dsl_pool_t *dp, dsl_dataset_t *hds, void *arg)
 {
-        struct enqueue_clones_arg *eca = arg;
+        uint64_t originobj = *(uint64_t *)arg;
         dsl_dataset_t *ds;
         int err;
         dsl_scan_t *scn = dp->dp_scan;
 
-        if (dsl_dir_phys(hds->ds_dir)->dd_origin_obj != eca->originobj)
+        if (dsl_dir_phys(hds->ds_dir)->dd_origin_obj != originobj)
                 return (0);
 
         err = dsl_dataset_hold_obj(dp, hds->ds_object, FTAG, &ds);
         if (err)
                 return (err);
 
-        while (dsl_dataset_phys(ds)->ds_prev_snap_obj != eca->originobj) {
+        while (dsl_dataset_phys(ds)->ds_prev_snap_obj != originobj) {
                 dsl_dataset_t *prev;
                 err = dsl_dataset_hold_obj(dp,
                     dsl_dataset_phys(ds)->ds_prev_snap_obj, FTAG, &prev);
 
                 dsl_dataset_rele(ds, FTAG);
                 if (err)
                         return (err);
                 ds = prev;
         }
-        VERIFY(zap_add_int_key(dp->dp_meta_objset,
-            scn->scn_phys.scn_queue_obj, ds->ds_object,
-            dsl_dataset_phys(ds)->ds_prev_snap_txg, eca->tx) == 0);
+        VERIFY0(scan_ds_queue_insert(scn, ds->ds_object,
+            dsl_dataset_phys(ds)->ds_prev_snap_txg));
         dsl_dataset_rele(ds, FTAG);
         return (0);
 }
 
 static void
 dsl_scan_visitds(dsl_scan_t *scn, uint64_t dsobj, dmu_tx_t *tx)
 {
         dsl_pool_t *dp = scn->scn_dp;
         dsl_dataset_t *ds;
+        objset_t *os;
 
         VERIFY3U(0, ==, dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
 
         if (scn->scn_phys.scn_cur_min_txg >=
             scn->scn_phys.scn_max_txg) {

@@ -1160,27 +1722,22 @@
                 kmem_free(dsname, MAXNAMELEN);
 
                 goto out;
         }
 
+        if (dmu_objset_from_ds(ds, &os))
+                goto out;
+
         /*
          * Only the ZIL in the head (non-snapshot) is valid. Even though
          * snapshots can have ZIL block pointers (which may be the same
-         * BP as in the head), they must be ignored. In addition, $ORIGIN
-         * doesn't have a objset (i.e. its ds_bp is a hole) so we don't
-         * need to look for a ZIL in it either. So we traverse the ZIL here,
-         * rather than in scan_recurse(), because the regular snapshot
-         * block-sharing rules don't apply to it.
+         * BP as in the head), they must be ignored.  So we traverse the
+         * ZIL here, rather than in scan_recurse(), because the regular
+         * snapshot block-sharing rules don't apply to it.
          */
-        if (DSL_SCAN_IS_SCRUB_RESILVER(scn) && !dsl_dataset_is_snapshot(ds) &&
-            ds->ds_dir != dp->dp_origin_snap->ds_dir) {
-                objset_t *os;
-                if (dmu_objset_from_ds(ds, &os) != 0) {
-                        goto out;
-                }
+        if (DSL_SCAN_IS_SCRUB_RESILVER(scn) && !ds->ds_is_snapshot)
                 dsl_scan_zil(dp, &os->os_zil_header);
-        }
 
         /*
          * Iterate over the bps in this ds.
          */
         dmu_buf_will_dirty(ds->ds_dbuf, tx);

@@ -1196,10 +1753,13 @@
             (longlong_t)scn->scn_phys.scn_cur_min_txg,
             (longlong_t)scn->scn_phys.scn_cur_max_txg,
             (int)scn->scn_suspending);
         kmem_free(dsname, ZFS_MAX_DATASET_NAME_LEN);
 
+        DTRACE_PROBE3(scan_done, dsl_scan_t *, scn, dsl_dataset_t *, ds,
+            dmu_tx_t *, tx);
+
         if (scn->scn_suspending)
                 goto out;
 
         /*
          * We've finished this pass over this dataset.

@@ -1207,26 +1767,26 @@
 
         /*
          * If we did not completely visit this dataset, do another pass.
          */
         if (scn->scn_phys.scn_flags & DSF_VISIT_DS_AGAIN) {
+                DTRACE_PROBE3(scan_incomplete, dsl_scan_t *, scn,
+                    dsl_dataset_t *, ds, dmu_tx_t *, tx);
                 zfs_dbgmsg("incomplete pass; visiting again");
                 scn->scn_phys.scn_flags &= ~DSF_VISIT_DS_AGAIN;
-                VERIFY(zap_add_int_key(dp->dp_meta_objset,
-                    scn->scn_phys.scn_queue_obj, ds->ds_object,
-                    scn->scn_phys.scn_cur_max_txg, tx) == 0);
+                VERIFY0(scan_ds_queue_insert(scn, ds->ds_object,
+                    scn->scn_phys.scn_cur_max_txg));
                 goto out;
         }
 
         /*
          * Add descendent datasets to work queue.
          */
         if (dsl_dataset_phys(ds)->ds_next_snap_obj != 0) {
-                VERIFY(zap_add_int_key(dp->dp_meta_objset,
-                    scn->scn_phys.scn_queue_obj,
+                VERIFY0(scan_ds_queue_insert(scn,
                     dsl_dataset_phys(ds)->ds_next_snap_obj,
-                    dsl_dataset_phys(ds)->ds_creation_txg, tx) == 0);
+                    dsl_dataset_phys(ds)->ds_creation_txg));
         }
         if (dsl_dataset_phys(ds)->ds_num_children > 1) {
                 boolean_t usenext = B_FALSE;
                 if (dsl_dataset_phys(ds)->ds_next_clones_obj != 0) {
                         uint64_t count;

@@ -1243,21 +1803,25 @@
                             count == dsl_dataset_phys(ds)->ds_num_children - 1)
                                 usenext = B_TRUE;
                 }
 
                 if (usenext) {
-                        VERIFY0(zap_join_key(dp->dp_meta_objset,
-                            dsl_dataset_phys(ds)->ds_next_clones_obj,
-                            scn->scn_phys.scn_queue_obj,
-                            dsl_dataset_phys(ds)->ds_creation_txg, tx));
+                        zap_cursor_t zc;
+                        zap_attribute_t za;
+                        for (zap_cursor_init(&zc, dp->dp_meta_objset,
+                            dsl_dataset_phys(ds)->ds_next_clones_obj);
+                            zap_cursor_retrieve(&zc, &za) == 0;
+                            (void) zap_cursor_advance(&zc)) {
+                                VERIFY0(scan_ds_queue_insert(scn,
+                                    zfs_strtonum(za.za_name, NULL),
+                                    dsl_dataset_phys(ds)->ds_creation_txg));
+                        }
+                        zap_cursor_fini(&zc);
                 } else {
-                        struct enqueue_clones_arg eca;
-                        eca.tx = tx;
-                        eca.originobj = ds->ds_object;
-
                         VERIFY0(dmu_objset_find_dp(dp, dp->dp_root_dir_obj,
-                            enqueue_clones_cb, &eca, DS_FIND_CHILDREN));
+                            enqueue_clones_cb, &ds->ds_object,
+                            DS_FIND_CHILDREN));
                 }
         }
 
 out:
         dsl_dataset_rele(ds, FTAG);

@@ -1265,11 +1829,10 @@
 
 /* ARGSUSED */
 static int
 enqueue_cb(dsl_pool_t *dp, dsl_dataset_t *hds, void *arg)
 {
-        dmu_tx_t *tx = arg;
         dsl_dataset_t *ds;
         int err;
         dsl_scan_t *scn = dp->dp_scan;
 
         err = dsl_dataset_hold_obj(dp, hds->ds_object, FTAG, &ds);

@@ -1295,12 +1858,12 @@
                 }
                 dsl_dataset_rele(ds, FTAG);
                 ds = prev;
         }
 
-        VERIFY(zap_add_int_key(dp->dp_meta_objset, scn->scn_phys.scn_queue_obj,
-            ds->ds_object, dsl_dataset_phys(ds)->ds_prev_snap_txg, tx) == 0);
+        VERIFY0(scan_ds_queue_insert(scn, ds->ds_object,
+            dsl_dataset_phys(ds)->ds_prev_snap_txg));
         dsl_dataset_rele(ds, FTAG);
         return (0);
 }
 
 /*

@@ -1347,27 +1910,31 @@
         while ((error = ddt_walk(scn->scn_dp->dp_spa, ddb, &dde)) == 0) {
                 ddt_t *ddt;
 
                 if (ddb->ddb_class > scn->scn_phys.scn_ddt_class_max)
                         break;
+                DTRACE_PROBE1(scan_ddb, ddt_bookmark_t *, ddb);
                 dprintf("visiting ddb=%llu/%llu/%llu/%llx\n",
                     (longlong_t)ddb->ddb_class,
                     (longlong_t)ddb->ddb_type,
                     (longlong_t)ddb->ddb_checksum,
                     (longlong_t)ddb->ddb_cursor);
 
                 /* There should be no pending changes to the dedup table */
                 ddt = scn->scn_dp->dp_spa->spa_ddt[ddb->ddb_checksum];
-                ASSERT(avl_first(&ddt->ddt_tree) == NULL);
-
+#ifdef ZFS_DEBUG
+                for (uint_t i = 0; i < DDT_HASHSZ; i++)
+                        ASSERT(avl_first(&ddt->ddt_tree[i]) == NULL);
+#endif
                 dsl_scan_ddt_entry(scn, ddb->ddb_checksum, &dde, tx);
                 n++;
 
                 if (dsl_scan_check_suspend(scn, NULL))
                         break;
         }
 
+        DTRACE_PROBE2(scan_ddt_done, dsl_scan_t *, scn, uint64_t, n);
         zfs_dbgmsg("scanned %llu ddt entries with class_max = %u; "
             "suspending=%u", (longlong_t)n,
             (int)scn->scn_phys.scn_ddt_class_max, (int)scn->scn_suspending);
 
         ASSERT(error == 0 || error == ENOENT);

@@ -1401,12 +1968,11 @@
 
 static void
 dsl_scan_visit(dsl_scan_t *scn, dmu_tx_t *tx)
 {
         dsl_pool_t *dp = scn->scn_dp;
-        zap_cursor_t zc;
-        zap_attribute_t za;
+        uint64_t dsobj, txg;
 
         if (scn->scn_phys.scn_ddt_bookmark.ddb_class <=
             scn->scn_phys.scn_ddt_class_max) {
                 scn->scn_phys.scn_cur_min_txg = scn->scn_phys.scn_min_txg;
                 scn->scn_phys.scn_cur_max_txg = scn->scn_phys.scn_max_txg;

@@ -1426,25 +1992,26 @@
                 if (scn->scn_suspending)
                         return;
 
                 if (spa_version(dp->dp_spa) < SPA_VERSION_DSL_SCRUB) {
                         VERIFY0(dmu_objset_find_dp(dp, dp->dp_root_dir_obj,
-                            enqueue_cb, tx, DS_FIND_CHILDREN));
+                            enqueue_cb, NULL, DS_FIND_CHILDREN));
                 } else {
                         dsl_scan_visitds(scn,
                             dp->dp_origin_snap->ds_object, tx);
                 }
                 ASSERT(!scn->scn_suspending);
         } else if (scn->scn_phys.scn_bookmark.zb_objset !=
             ZB_DESTROYED_OBJSET) {
+                uint64_t dsobj = scn->scn_phys.scn_bookmark.zb_objset;
                 /*
                  * If we were suspended, continue from here.  Note if the
                  * ds we were suspended on was deleted, the zb_objset may
                  * be -1, so we will skip this and find a new objset
                  * below.
                  */
-                dsl_scan_visitds(scn, scn->scn_phys.scn_bookmark.zb_objset, tx);
+                dsl_scan_visitds(scn, dsobj, tx);
                 if (scn->scn_suspending)
                         return;
         }
 
         /*

@@ -1452,56 +2019,51 @@
          * bookmark so we don't think that we're still trying to resume.
          */
         bzero(&scn->scn_phys.scn_bookmark, sizeof (zbookmark_phys_t));
 
         /* keep pulling things out of the zap-object-as-queue */
-        while (zap_cursor_init(&zc, dp->dp_meta_objset,
-            scn->scn_phys.scn_queue_obj),
-            zap_cursor_retrieve(&zc, &za) == 0) {
+        while (scan_ds_queue_first(scn, &dsobj, &txg)) {
                 dsl_dataset_t *ds;
-                uint64_t dsobj;
 
-                dsobj = zfs_strtonum(za.za_name, NULL);
-                VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
-                    scn->scn_phys.scn_queue_obj, dsobj, tx));
+                scan_ds_queue_remove(scn, dsobj);
 
                 /* Set up min/max txg */
                 VERIFY3U(0, ==, dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
-                if (za.za_first_integer != 0) {
+                if (txg != 0) {
                         scn->scn_phys.scn_cur_min_txg =
-                            MAX(scn->scn_phys.scn_min_txg,
-                            za.za_first_integer);
+                            MAX(scn->scn_phys.scn_min_txg, txg);
                 } else {
                         scn->scn_phys.scn_cur_min_txg =
                             MAX(scn->scn_phys.scn_min_txg,
                             dsl_dataset_phys(ds)->ds_prev_snap_txg);
                 }
                 scn->scn_phys.scn_cur_max_txg = dsl_scan_ds_maxtxg(ds);
                 dsl_dataset_rele(ds, FTAG);
 
                 dsl_scan_visitds(scn, dsobj, tx);
-                zap_cursor_fini(&zc);
                 if (scn->scn_suspending)
                         return;
         }
-        zap_cursor_fini(&zc);
+        /* No more objsets to fetch, we're done */
+        scn->scn_phys.scn_bookmark.zb_objset = ZB_DESTROYED_OBJSET;
+        ASSERT0(scn->scn_suspending);
 }
 
 static boolean_t
-dsl_scan_async_block_should_pause(dsl_scan_t *scn)
+dsl_scan_free_should_suspend(dsl_scan_t *scn)
 {
         uint64_t elapsed_nanosecs;
 
         if (zfs_recover)
                 return (B_FALSE);
 
-        if (scn->scn_visited_this_txg >= zfs_async_block_max_blocks)
+        if (scn->scn_visited_this_txg >= zfs_free_max_blocks)
                 return (B_TRUE);
 
         elapsed_nanosecs = gethrtime() - scn->scn_sync_start_time;
         return (elapsed_nanosecs / NANOSEC > zfs_txg_timeout ||
-            (NSEC2MSEC(elapsed_nanosecs) > scn->scn_async_block_min_time_ms &&
+            (NSEC2MSEC(elapsed_nanosecs) > zfs_free_min_time_ms &&
             txg_sync_waiting(scn->scn_dp)) ||
             spa_shutting_down(scn->scn_dp->dp_spa));
 }
 
 static int

@@ -1509,11 +2071,11 @@
 {
         dsl_scan_t *scn = arg;
 
         if (!scn->scn_is_bptree ||
             (BP_GET_LEVEL(bp) == 0 && BP_GET_TYPE(bp) != DMU_OT_OBJSET)) {
-                if (dsl_scan_async_block_should_pause(scn))
+                if (dsl_scan_free_should_suspend(scn))
                         return (SET_ERROR(ERESTART));
         }
 
         zio_nowait(zio_free_sync(scn->scn_zio_root, scn->scn_dp->dp_spa,
             dmu_tx_get_txg(tx), bp, 0));

@@ -1522,26 +2084,10 @@
             -BP_GET_PSIZE(bp), -BP_GET_UCSIZE(bp), tx);
         scn->scn_visited_this_txg++;
         return (0);
 }
 
-static int
-dsl_scan_obsolete_block_cb(void *arg, const blkptr_t *bp, dmu_tx_t *tx)
-{
-        dsl_scan_t *scn = arg;
-        const dva_t *dva = &bp->blk_dva[0];
-
-        if (dsl_scan_async_block_should_pause(scn))
-                return (SET_ERROR(ERESTART));
-
-        spa_vdev_indirect_mark_obsolete(scn->scn_dp->dp_spa,
-            DVA_GET_VDEV(dva), DVA_GET_OFFSET(dva),
-            DVA_GET_ASIZE(dva), tx);
-        scn->scn_visited_this_txg++;
-        return (0);
-}
-
 boolean_t
 dsl_scan_active(dsl_scan_t *scn)
 {
         spa_t *spa = scn->scn_dp->dp_spa;
         uint64_t used = 0, comp, uncomp;

@@ -1548,12 +2094,11 @@
 
         if (spa->spa_load_state != SPA_LOAD_NONE)
                 return (B_FALSE);
         if (spa_shutting_down(spa))
                 return (B_FALSE);
-        if ((scn->scn_phys.scn_state == DSS_SCANNING &&
-            !dsl_scan_is_paused_scrub(scn)) ||
+        if ((dsl_scan_is_running(scn) && !dsl_scan_is_paused_scrub(scn)) ||
             (scn->scn_async_destroying && !scn->scn_async_stalled))
                 return (B_TRUE);
 
         if (spa_version(scn->scn_dp->dp_spa) >= SPA_VERSION_DEADLISTS) {
                 (void) bpobj_space(&scn->scn_dp->dp_free_bpobj,

@@ -1618,11 +2163,10 @@
          * blocks than to scrub them.
          */
         if (zfs_free_bpobj_enabled &&
             spa_version(dp->dp_spa) >= SPA_VERSION_DEADLISTS) {
                 scn->scn_is_bptree = B_FALSE;
-                scn->scn_async_block_min_time_ms = zfs_free_min_time_ms;
                 scn->scn_zio_root = zio_root(dp->dp_spa, NULL,
                     NULL, ZIO_FLAG_MUSTSUCCEED);
                 err = bpobj_iterate(&dp->dp_free_bpobj,
                     dsl_scan_free_block_cb, scn, tx);
                 VERIFY3U(0, ==, zio_wait(scn->scn_zio_root));

@@ -1716,46 +2260,39 @@
                 dsl_dir_diduse_space(dp->dp_free_dir, DD_USED_HEAD,
                     -dsl_dir_phys(dp->dp_free_dir)->dd_used_bytes,
                     -dsl_dir_phys(dp->dp_free_dir)->dd_compressed_bytes,
                     -dsl_dir_phys(dp->dp_free_dir)->dd_uncompressed_bytes, tx);
         }
-
         if (dp->dp_free_dir != NULL && !scn->scn_async_destroying) {
                 /* finished; verify that space accounting went to zero */
                 ASSERT0(dsl_dir_phys(dp->dp_free_dir)->dd_used_bytes);
                 ASSERT0(dsl_dir_phys(dp->dp_free_dir)->dd_compressed_bytes);
                 ASSERT0(dsl_dir_phys(dp->dp_free_dir)->dd_uncompressed_bytes);
         }
 
-        EQUIV(bpobj_is_open(&dp->dp_obsolete_bpobj),
-            0 == zap_contains(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
-            DMU_POOL_OBSOLETE_BPOBJ));
-        if (err == 0 && bpobj_is_open(&dp->dp_obsolete_bpobj)) {
-                ASSERT(spa_feature_is_active(dp->dp_spa,
-                    SPA_FEATURE_OBSOLETE_COUNTS));
+        if (!dsl_scan_is_running(scn))
+                return;
 
-                scn->scn_is_bptree = B_FALSE;
-                scn->scn_async_block_min_time_ms = zfs_obsolete_min_time_ms;
-                err = bpobj_iterate(&dp->dp_obsolete_bpobj,
-                    dsl_scan_obsolete_block_cb, scn, tx);
-                if (err != 0 && err != ERESTART)
-                        zfs_panic_recover("error %u from bpobj_iterate()", err);
-
-                if (bpobj_is_empty(&dp->dp_obsolete_bpobj))
-                        dsl_pool_destroy_obsolete_bpobj(dp, tx);
+        if (!zfs_scan_direct) {
+                if (!scn->scn_is_sorted)
+                        scn->scn_last_queue_run_time = 0;
+                scn->scn_is_sorted = B_TRUE;
         }
 
-        if (scn->scn_phys.scn_state != DSS_SCANNING)
-                return;
-
-        if (scn->scn_done_txg == tx->tx_txg) {
+        if (scn->scn_done_txg == tx->tx_txg ||
+            scn->scn_phys.scn_state == DSS_FINISHING) {
                 ASSERT(!scn->scn_suspending);
+                if (scn->scn_bytes_pending != 0) {
+                        ASSERT(scn->scn_is_sorted);
+                        scn->scn_phys.scn_state = DSS_FINISHING;
+                        goto finish;
+                }
                 /* finished with scan. */
                 zfs_dbgmsg("txg %llu scan complete", tx->tx_txg);
                 dsl_scan_done(scn, B_TRUE, tx);
                 ASSERT3U(spa->spa_scrub_inflight, ==, 0);
-                dsl_scan_sync_state(scn, tx);
+                dsl_scan_sync_state(scn, tx, SYNC_MANDATORY);
                 return;
         }
 
         if (dsl_scan_is_paused_scrub(scn))
                 return;

@@ -1780,10 +2317,34 @@
                     (longlong_t)scn->scn_phys.scn_bookmark.zb_object,
                     (longlong_t)scn->scn_phys.scn_bookmark.zb_level,
                     (longlong_t)scn->scn_phys.scn_bookmark.zb_blkid);
         }
 
+        if (scn->scn_is_sorted) {
+                /*
+                 * This is the out-of-order queue handling. We determine our
+                 * memory usage and based on that switch states between normal
+                 * operation (i.e. don't issue queued up I/O unless we've
+                 * reached the end of scanning) and 'clearing' (issue queued
+                 * extents just to clear up some memory).
+                 */
+                mem_lim_t mlim = scan_io_queue_mem_lim(scn);
+
+                if (mlim == MEM_LIM_HARD && !scn->scn_clearing)
+                        scn->scn_clearing = B_TRUE;
+                else if (mlim == MEM_LIM_NONE && scn->scn_clearing)
+                        scn->scn_clearing = B_FALSE;
+
+                if ((scn->scn_checkpointing || ddi_get_lbolt() -
+                    scn->scn_last_checkpoint > ZFS_SCAN_CHECKPOINT_INTVAL) &&
+                    scn->scn_phys.scn_state != DSS_FINISHING &&
+                    !scn->scn_clearing) {
+                        scn->scn_checkpointing = B_TRUE;
+                }
+        }
+
+        if (!scn->scn_clearing && !scn->scn_checkpointing) {
         scn->scn_zio_root = zio_root(dp->dp_spa, NULL,
             NULL, ZIO_FLAG_CANFAIL);
         dsl_pool_config_enter(dp, FTAG);
         dsl_scan_visit(scn, tx);
         dsl_pool_config_exit(dp, FTAG);

@@ -1790,17 +2351,30 @@
         (void) zio_wait(scn->scn_zio_root);
         scn->scn_zio_root = NULL;
 
         zfs_dbgmsg("visited %llu blocks in %llums",
             (longlong_t)scn->scn_visited_this_txg,
-            (longlong_t)NSEC2MSEC(gethrtime() - scn->scn_sync_start_time));
+                    (longlong_t)NSEC2MSEC(gethrtime() -
+                    scn->scn_sync_start_time));
 
-        if (!scn->scn_suspending) {
-                scn->scn_done_txg = tx->tx_txg + 1;
-                zfs_dbgmsg("txg %llu traversal complete, waiting till txg %llu",
-                    tx->tx_txg, scn->scn_done_txg);
-        }
+                if (!scn->scn_suspending) {
+                        scn->scn_done_txg = tx->tx_txg + 1;
+                        zfs_dbgmsg("txg %llu traversal complete, waiting "
+                            "till txg %llu", tx->tx_txg, scn->scn_done_txg);
+                }
+        }
+finish:
+        if (scn->scn_is_sorted) {
+                dsl_pool_config_enter(dp, FTAG);
+                scan_io_queues_run(scn);
+                dsl_pool_config_exit(dp, FTAG);
+        }
 
         if (DSL_SCAN_IS_SCRUB_RESILVER(scn)) {
                 mutex_enter(&spa->spa_scrub_lock);
                 while (spa->spa_scrub_inflight > 0) {
                         cv_wait(&spa->spa_scrub_io_cv,

@@ -1807,19 +2381,22 @@
                             &spa->spa_scrub_lock);
                 }
                 mutex_exit(&spa->spa_scrub_lock);
         }
 
-        dsl_scan_sync_state(scn, tx);
+        dsl_scan_sync_state(scn, tx, SYNC_OPTIONAL);
 }
 
 /*
  * This will start a new scan, or restart an existing one.
  */
 void
 dsl_resilver_restart(dsl_pool_t *dp, uint64_t txg)
 {
+        /* Stop any ongoing TRIMs */
+        spa_man_trim_stop(dp->dp_spa);
+
         if (txg == 0) {
                 dmu_tx_t *tx;
                 tx = dmu_tx_create_dd(dp->dp_mos_dir);
                 VERIFY(0 == dmu_tx_assign(tx, TXG_WAIT));
 

@@ -1833,23 +2410,27 @@
 }
 
 boolean_t
 dsl_scan_resilvering(dsl_pool_t *dp)
 {
-        return (dp->dp_scan->scn_phys.scn_state == DSS_SCANNING &&
+        return (dsl_scan_is_running(dp->dp_scan) &&
             dp->dp_scan->scn_phys.scn_func == POOL_SCAN_RESILVER);
 }
 
 /*
  * scrub consumers
  */
 
 static void
-count_block(zfs_all_blkstats_t *zab, const blkptr_t *bp)
+count_block(dsl_scan_t *scn, zfs_all_blkstats_t *zab, const blkptr_t *bp)
 {
         int i;
 
+        for (i = 0; i < BP_GET_NDVAS(bp); i++)
+                atomic_add_64(&scn->scn_bytes_issued,
+                    DVA_GET_ASIZE(&bp->blk_dva[i]));
+
         /*
          * If we resume after a reboot, zab will be NULL; don't record
          * incomplete stats in that case.
          */
         if (zab == NULL)

@@ -1899,68 +2480,92 @@
         abd_free(zio->io_abd);
 
         mutex_enter(&spa->spa_scrub_lock);
         spa->spa_scrub_inflight--;
         cv_broadcast(&spa->spa_scrub_io_cv);
+        mutex_exit(&spa->spa_scrub_lock);
 
         if (zio->io_error && (zio->io_error != ECKSUM ||
             !(zio->io_flags & ZIO_FLAG_SPECULATIVE))) {
-                spa->spa_dsl_pool->dp_scan->scn_phys.scn_errors++;
+                atomic_inc_64(&spa->spa_dsl_pool->dp_scan->scn_phys.scn_errors);
+                DTRACE_PROBE1(scan_error, zio_t *, zio);
         }
-        mutex_exit(&spa->spa_scrub_lock);
 }
 
 static int
 dsl_scan_scrub_cb(dsl_pool_t *dp,
     const blkptr_t *bp, const zbookmark_phys_t *zb)
 {
         dsl_scan_t *scn = dp->dp_scan;
-        size_t size = BP_GET_PSIZE(bp);
         spa_t *spa = dp->dp_spa;
         uint64_t phys_birth = BP_PHYSICAL_BIRTH(bp);
         boolean_t needs_io;
         int zio_flags = ZIO_FLAG_SCAN_THREAD | ZIO_FLAG_RAW | ZIO_FLAG_CANFAIL;
-        int scan_delay = 0;
+        boolean_t ignore_dva0;
 
         if (phys_birth <= scn->scn_phys.scn_min_txg ||
             phys_birth >= scn->scn_phys.scn_max_txg)
                 return (0);
 
-        count_block(dp->dp_blkstats, bp);
-
-        if (BP_IS_EMBEDDED(bp))
+        if (BP_IS_EMBEDDED(bp)) {
+                count_block(scn, dp->dp_blkstats, bp);
                 return (0);
+        }
 
         ASSERT(DSL_SCAN_IS_SCRUB_RESILVER(scn));
-        if (scn->scn_phys.scn_func == POOL_SCAN_SCRUB) {
+        if (scn->scn_phys.scn_func == POOL_SCAN_SCRUB ||
+            scn->scn_phys.scn_func == POOL_SCAN_MOS ||
+            scn->scn_phys.scn_func == POOL_SCAN_META) {
                 zio_flags |= ZIO_FLAG_SCRUB;
                 needs_io = B_TRUE;
-                scan_delay = zfs_scrub_delay;
         } else {
                 ASSERT3U(scn->scn_phys.scn_func, ==, POOL_SCAN_RESILVER);
                 zio_flags |= ZIO_FLAG_RESILVER;
                 needs_io = B_FALSE;
-                scan_delay = zfs_resilver_delay;
         }
 
         /* If it's an intent log block, failure is expected. */
         if (zb->zb_level == ZB_ZIL_LEVEL)
                 zio_flags |= ZIO_FLAG_SPECULATIVE;
 
+        if (scn->scn_phys.scn_func == POOL_SCAN_MOS)
+                needs_io = (zb->zb_objset == 0);
+
+        if (scn->scn_phys.scn_func == POOL_SCAN_META)
+                needs_io = zb->zb_objset == 0 || BP_GET_LEVEL(bp) != 0 ||
+                    DMU_OT_IS_METADATA(BP_GET_TYPE(bp));
+
+        DTRACE_PROBE3(scan_needs_io, boolean_t, needs_io,
+            const blkptr_t *, bp, spa_t *, spa);
+
+        /*
+         * WBC will invalidate DVA[0] after migrating the block to the main
+         * pool. If the user subsequently disables WBC and removes the special
+         * device, DVA[0] can now point to a hole vdev. We won't try to do
+         * I/O to it, but we must also avoid doing DTL checks.
+         */
+        ignore_dva0 = (BP_IS_SPECIAL(bp) &&
+            wbc_bp_is_migrated(spa_get_wbc_data(spa), bp));
+
         for (int d = 0; d < BP_GET_NDVAS(bp); d++) {
-                vdev_t *vd = vdev_lookup_top(spa,
-                    DVA_GET_VDEV(&bp->blk_dva[d]));
+                vdev_t *vd;
 
                 /*
                  * Keep track of how much data we've examined so that
                  * zpool(1M) status can make useful progress reports.
                  */
                 scn->scn_phys.scn_examined += DVA_GET_ASIZE(&bp->blk_dva[d]);
                 spa->spa_scan_pass_exam += DVA_GET_ASIZE(&bp->blk_dva[d]);
 
+                /* WBC-invalidated DVA post-migration, so skip it */
+                if (d == 0 && ignore_dva0)
+                        continue;
+                vd = vdev_lookup_top(spa, DVA_GET_VDEV(&bp->blk_dva[d]));
+
                 /* if it's a resilver, this may not be in the target range */
-                if (!needs_io) {
+                if (!needs_io && scn->scn_phys.scn_func != POOL_SCAN_MOS &&
+                    scn->scn_phys.scn_func != POOL_SCAN_META) {
                         if (DVA_GET_GANG(&bp->blk_dva[d])) {
                                 /*
                                  * Gang members may be spread across multiple
                                  * vdevs, so the best estimate we have is the
                                  * scrub range, which has already been checked.

@@ -1967,37 +2572,26 @@
                                  * XXX -- it would be better to change our
                                  * allocation policy to ensure that all
                                  * gang members reside on the same vdev.
                                  */
                                 needs_io = B_TRUE;
+                                DTRACE_PROBE2(gang_bp, const blkptr_t *, bp,
+                                    spa_t *, spa);
                         } else {
                                 needs_io = vdev_dtl_contains(vd, DTL_PARTIAL,
                                     phys_birth, 1);
+                                if (needs_io)
+                                        DTRACE_PROBE2(dtl, const blkptr_t *,
+                                            bp, spa_t *, spa);
                         }
                 }
         }
 
         if (needs_io && !zfs_no_scrub_io) {
-                vdev_t *rvd = spa->spa_root_vdev;
-                uint64_t maxinflight = rvd->vdev_children * zfs_top_maxinflight;
-
-                mutex_enter(&spa->spa_scrub_lock);
-                while (spa->spa_scrub_inflight >= maxinflight)
-                        cv_wait(&spa->spa_scrub_io_cv, &spa->spa_scrub_lock);
-                spa->spa_scrub_inflight++;
-                mutex_exit(&spa->spa_scrub_lock);
-
-                /*
-                 * If we're seeing recent (zfs_scan_idle) "important" I/Os
-                 * then throttle our workload to limit the impact of a scan.
-                 */
-                if (ddi_get_lbolt64() - spa->spa_last_io <= zfs_scan_idle)
-                        delay(scan_delay);
-
-                zio_nowait(zio_read(NULL, spa, bp,
-                    abd_alloc_for_io(size, B_FALSE), size, dsl_scan_scrub_done,
-                    NULL, ZIO_PRIORITY_SCRUB, zio_flags, zb));
+                dsl_scan_enqueue(dp, bp, zio_flags, zb);
+        } else {
+                count_block(scn, dp->dp_blkstats, bp);
         }
 
         /* do not relocate this block */
         return (0);
 }

@@ -2027,14 +2621,12 @@
 
         if (func == POOL_SCAN_SCRUB && dsl_scan_is_paused_scrub(scn)) {
                 /* got scrub start cmd, resume paused scrub */
                 int err = dsl_scrub_set_pause_resume(scn->scn_dp,
                     POOL_SCRUB_NORMAL);
-                if (err == 0) {
-                        spa_event_notify(spa, NULL, NULL, ESC_ZFS_SCRUB_RESUME);
+                if (err == 0)
                         return (ECANCELED);
-                }
 
                 return (SET_ERROR(err));
         }
 
         return (dsl_sync_task(spa_name(spa), dsl_scan_setup_check,

@@ -2044,6 +2636,990 @@
 static boolean_t
 dsl_scan_restarting(dsl_scan_t *scn, dmu_tx_t *tx)
 {
         return (scn->scn_restart_txg != 0 &&
             scn->scn_restart_txg <= tx->tx_txg);
+}
+
+/*
+ * Grand theory statement on scan queue sorting
+ *
+ * Scanning is implemented by recursively traversing all indirection levels
+ * in an object and reading all blocks referenced from said objects. This
+ * results in us approximately traversing the object from lowest logical
+ * offset to the highest. Naturally, if we were simply read all blocks in
+ * this order, we would require that the blocks be also physically arranged
+ * in sort of a linear fashion on the vdevs. However, this is frequently
+ * not the case on pools. So we instead stick the I/Os into a reordering
+ * queue and issue them out of logical order and in a way that most benefits
+ * physical disks (LBA-order).
+ *
+ * This sorting algorithm is subject to limitations. We can't do this with
+ * blocks that are non-leaf, because the scanner itself depends on these
+ * being available ASAP for further metadata traversal. So we exclude any
+ * block that is bp_level > 0. Fortunately, this usually represents only
+ * around 1% of our data volume, so no great loss.
+ *
+ * As a further limitation, we cannot sort blocks which have more than
+ * one DVA present (copies > 1), because there's no sensible way to sort
+ * these (how do you sort a queue based on multiple contradictory
+ * criteria?). So we exclude those as well. Again, these are very rarely
+ * used for leaf blocks, usually only on metadata.
+ *
+ * WBC consideration: we can't sort blocks which have not yet been fully
+ * migrated to normal devices, because their data can reside purely on the
+ * special device or on both normal and special. This would require larger
+ * data structures to track both DVAs in our queues, and we need the
+ * smallest possible in-core structures to achieve good sorting
+ * performance. Therefore, blocks which have not yet been fully migrated
+ * out of the WBC are processed as non-sortable and issued immediately.
+ *
+ * Queue management:
+ *
+ * Ideally, we would want to scan all metadata and queue up all leaf block
+ * I/O prior to starting to issue it, because that allows us to do an
+ * optimal sorting job. This can however consume large amounts of memory.
+ * Therefore we continuously monitor the size of the queues and constrain
+ * them to 5% (zfs_scan_mem_lim_fact) of physmem. If the queues grow larger
+ * than this limit, we clear out a few of the largest extents at the head
+ * of the queues to make room for more scanning. Hopefully, these extents
+ * will be fairly large and contiguous, allowing us to approach sequential
+ * I/O throughput even without a fully sorted tree.
+ *
+ * Metadata scanning takes place in dsl_scan_visit(), which is called from
+ * dsl_scan_sync() every spa_sync(). If we have either fully scanned all
+ * metadata on the pool, or we need to make room in memory because our
+ * queues are too large, dsl_scan_visit() is postponed and
+ * scan_io_queues_run() is called from dsl_scan_sync() instead. That means,
+ * metadata scanning and queued I/O issuing are mutually exclusive. This is
+ * to provide maximum sequential I/O throughput for the queued I/O issue
+ * process. Sequential I/O performance is significantly negatively impacted
+ * if it is interleaved with random I/O.
+ *
+ * Backwards compatibility
+ *
+ * This new algorithm is backwards compatible with the legacy on-disk data
+ * structures. If imported on a machine without the new sorting algorithm,
+ * the scan simply resumes from the last checkpoint.
+ */
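+
+/*
+ * Sketch of the queue states described above, as driven by the memory
+ * limit checks in dsl_scan_sync() and consumed by scan_io_queue_fetch_ext():
+ *
+ *      MEM_LIM_HARD reached      -> scn_clearing = B_TRUE; issue the
+ *                                   largest extents to shed memory
+ *      back down to MEM_LIM_NONE -> scn_clearing = B_FALSE; resume the
+ *                                   metadata scan
+ *      checkpoint interval hit   -> scn_checkpointing = B_TRUE; drain
+ *                                   the queues in LBA order
+ *      DSS_FINISHING             -> metadata traversal complete; drain
+ *                                   all remaining queued I/O in LBA order
+ */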
+
+/*
+ * Given a set of I/O parameters as discovered by the metadata traversal
+ * process, attempts to place the I/O into the reordering queue (if
+ * possible), or immediately executes the I/O. The check for whether an
+ * I/O is suitable for sorting is performed here.
+ */
+static void
+dsl_scan_enqueue(dsl_pool_t *dp, const blkptr_t *bp, int zio_flags,
+    const zbookmark_phys_t *zb)
+{
+        spa_t *spa = dp->dp_spa;
+
+        ASSERT(!BP_IS_EMBEDDED(bp));
+        if (!dp->dp_scan->scn_is_sorted || (BP_IS_SPECIAL(bp) &&
+            !wbc_bp_is_migrated(spa_get_wbc_data(spa), bp))) {
+                scan_exec_io(dp, bp, zio_flags, zb, B_TRUE);
+                return;
+        }
+
+        for (int i = 0; i < BP_GET_NDVAS(bp); i++) {
+                dva_t dva;
+                vdev_t *vdev;
+
+                /* On special BPs we only support handling the normal DVA */
+                if (BP_IS_SPECIAL(bp) && i != WBC_NORMAL_DVA)
+                        continue;
+
+                dva = bp->blk_dva[i];
+                vdev = vdev_lookup_top(spa, DVA_GET_VDEV(&dva));
+                ASSERT(vdev != NULL);
+
+                mutex_enter(&vdev->vdev_scan_io_queue_lock);
+                if (vdev->vdev_scan_io_queue == NULL)
+                        vdev->vdev_scan_io_queue = scan_io_queue_create(vdev);
+                ASSERT(dp->dp_scan != NULL);
+                scan_io_queue_insert(dp->dp_scan, vdev->vdev_scan_io_queue, bp,
+                    i, zio_flags, zb);
+                mutex_exit(&vdev->vdev_scan_io_queue_lock);
+        }
+}
+
+/*
+ * Given a scanning zio's information, executes the zio. The zio need
+ * not necessarily be only sortable, this function simply executes the
+ * zio, no matter what it is. The limit_inflight flag controls whether
+ * we limit the number of concurrently executing scan zio's to
+ * zfs_top_maxinflight times the number of top-level vdevs. This is
+ * used during metadata discovery to pace the generation of I/O and
+ * properly time the pausing of the scanning algorithm. The queue
+ * processing part uses a different method of controlling timing and
+ * so doesn't need this limit applied to its zio's.
+ */
+static void
+scan_exec_io(dsl_pool_t *dp, const blkptr_t *bp, int zio_flags,
+    const zbookmark_phys_t *zb, boolean_t limit_inflight)
+{
+        spa_t *spa = dp->dp_spa;
+        size_t size = BP_GET_PSIZE(bp);
+        vdev_t *rvd = spa->spa_root_vdev;
+        uint64_t maxinflight = rvd->vdev_children * zfs_top_maxinflight;
+        dsl_scan_t *scn = dp->dp_scan;
+        zio_priority_t prio;
+
+        mutex_enter(&spa->spa_scrub_lock);
+        while (limit_inflight && spa->spa_scrub_inflight >= maxinflight)
+                cv_wait(&spa->spa_scrub_io_cv, &spa->spa_scrub_lock);
+        spa->spa_scrub_inflight++;
+        mutex_exit(&spa->spa_scrub_lock);
+
+        for (int i = 0; i < BP_GET_NDVAS(bp); i++)
+                atomic_add_64(&spa->spa_scan_pass_work,
+                    DVA_GET_ASIZE(&bp->blk_dva[i]));
+
+        count_block(dp->dp_scan, dp->dp_blkstats, bp);
+        DTRACE_PROBE3(do_io, uint64_t, dp->dp_scan->scn_phys.scn_func,
+            boolean_t, B_TRUE, spa_t *, spa);
+        prio = (scn->scn_phys.scn_func == POOL_SCAN_RESILVER ?
+            ZIO_PRIORITY_RESILVER : ZIO_PRIORITY_SCRUB);
+        zio_nowait(zio_read(NULL, spa, bp, abd_alloc_for_io(size, B_FALSE),
+            size, dsl_scan_scrub_done, NULL, prio, zio_flags, zb));
+}
+
+/*
+ * Given all the info we got from our metadata scanning process, we
+ * construct a scan_io_t and insert it into the scan sorting queue. The
+ * I/O must already be suitable for us to process. This is controlled
+ * by dsl_scan_enqueue().
+ */
+static void
+scan_io_queue_insert(dsl_scan_t *scn, dsl_scan_io_queue_t *queue,
+    const blkptr_t *bp, int dva_i, int zio_flags, const zbookmark_phys_t *zb)
+{
+        scan_io_t *sio = kmem_zalloc(sizeof (*sio), KM_SLEEP);
+        avl_index_t idx;
+        uint64_t offset, asize;
+
+        ASSERT(MUTEX_HELD(&queue->q_vd->vdev_scan_io_queue_lock));
+
+        bp2sio(bp, sio, dva_i);
+        sio->sio_flags = zio_flags;
+        sio->sio_zb = *zb;
+        offset = SCAN_IO_GET_OFFSET(sio);
+        asize = sio->sio_asize;
+
+        if (avl_find(&queue->q_zios_by_addr, sio, &idx) != NULL) {
+                /* block is already scheduled for reading */
+                kmem_free(sio, sizeof (*sio));
+                return;
+        }
+        avl_insert(&queue->q_zios_by_addr, sio, idx);
+        atomic_add_64(&queue->q_zio_bytes, asize);
+
+        /*
+         * Increment the bytes-pending counter now; otherwise the worker
+         * could process the zio and decrement the counter before we
+         * increment it, causing an integer underflow.
+         */
+        mutex_enter(&scn->scn_status_lock);
+        scn->scn_bytes_pending += asize;
+        mutex_exit(&scn->scn_status_lock);
+
+        range_tree_set_gap(queue->q_exts_by_addr, zfs_scan_max_ext_gap);
+        range_tree_add_fill(queue->q_exts_by_addr, offset, asize, asize);
+}
+
+/* q_exts_by_addr segment add callback. */
+/*ARGSUSED*/
+static void
+scan_io_queue_insert_cb(range_tree_t *rt, range_seg_t *rs, void *arg)
+{
+        dsl_scan_io_queue_t *queue = arg;
+        avl_index_t idx;
+        ASSERT(MUTEX_HELD(&queue->q_vd->vdev_scan_io_queue_lock));
+        VERIFY3P(avl_find(&queue->q_exts_by_size, rs, &idx), ==, NULL);
+        avl_insert(&queue->q_exts_by_size, rs, idx);
+}
+
+/* q_exts_by_addr segment remove callback. */
+/*ARGSUSED*/
+static void
+scan_io_queue_remove_cb(range_tree_t *rt, range_seg_t *rs, void *arg)
+{
+        dsl_scan_io_queue_t *queue = arg;
+        avl_remove(&queue->q_exts_by_size, rs);
+}
+
+/* q_exts_by_addr vacate callback. */
+/*ARGSUSED*/
+static void
+scan_io_queue_vacate_cb(range_tree_t *rt, void *arg)
+{
+        dsl_scan_io_queue_t *queue = arg;
+        void *cookie = NULL;
+        while (avl_destroy_nodes(&queue->q_exts_by_size, &cookie) != NULL)
+                ;
+}
+
+/*
+ * This is the primary extent sorting algorithm. We balance two parameters:
+ * 1) how many bytes of I/O are in an extent
+ * 2) how well the extent is filled with I/O (as a fraction of its total size)
+ * Since we allow extents to have gaps between their constituent I/Os, it's
+ * possible to have a fairly large extent that contains the same amount of
+ * I/O bytes as a much smaller extent that simply packs the I/O more tightly.
+ * The algorithm sorts based on a score calculated from the extent's size,
+ * the relative fill volume (in %) and a "fill weight" parameter that controls
+ * the split between whether we prefer larger extents or more well populated
+ * extents:
+ *
+ * SCORE = FILL_IN_BYTES +
+ *     (FILL_IN_PERCENT * FILL_IN_BYTES * FILL_WEIGHT) / 100
+ *
+ * Example:
+ * 1) assume extsz = 64 MiB
+ * 2) assume fill = 32 MiB (extent is half full)
+ * 3) assume fill_weight = 3
+ * 4)   SCORE = 32M + (((32M * 100) / 64M) * 3 * 32M) / 100
+ *      SCORE = 32M + (50 * 3 * 32M) / 100
+ *      SCORE = 32M + (4800M / 100)
+ *      SCORE = 32M + 48M
+ *               ^     ^
+ *               |     +--- final total relative fill-based score
+ *               +--------- final total fill-based score
+ *      SCORE = 80M
+ *
+ * As can be seen, at fill_weight=3, the algorithm is slightly biased towards
+ * extents that are more completely filled (in a 3:2 ratio) vs just larger.
+ */
+static int
+ext_size_compar(const void *x, const void *y)
+{
+        const range_seg_t *rsa = x, *rsb = y;
+        uint64_t sa = rsa->rs_end - rsa->rs_start,
+            sb = rsb->rs_end - rsb->rs_start;
+        uint64_t score_a, score_b;
+
+        score_a = rsa->rs_fill + (((rsa->rs_fill * 100) / sa) *
+            fill_weight * rsa->rs_fill) / 100;
+        score_b = rsb->rs_fill + (((rsb->rs_fill * 100) / sb) *
+            fill_weight * rsb->rs_fill) / 100;
+
+        if (score_a > score_b)
+                return (-1);
+        if (score_a == score_b) {
+                if (rsa->rs_start < rsb->rs_start)
+                        return (-1);
+                if (rsa->rs_start == rsb->rs_start)
+                        return (0);
+                return (1);
+        }
+        return (1);
+}
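+
+/*
+ * To illustrate the bias noted above, compare a fully-filled 32 MiB
+ * extent with the half-filled 64 MiB extent from the example (both
+ * carry the same 32 MiB of queued I/O, fill_weight = 3):
+ *
+ *      SCORE(extsz = 32M, fill = 32M) = 32M + (100 * 3 * 32M) / 100 = 128M
+ *      SCORE(extsz = 64M, fill = 32M) = 32M + (50 * 3 * 32M) / 100  = 80M
+ *
+ * The tightly packed extent scores higher, so ext_size_compar() sorts
+ * it earlier in q_exts_by_size.
+ */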
+
+/*
+ * Comparator for the q_zios_by_addr tree. Sorting is simply performed
+ * based on LBA-order (from lowest to highest).
+ */
+static int
+io_addr_compar(const void *x, const void *y)
+{
+        const scan_io_t *a = x, *b = y;
+        uint64_t off_a = SCAN_IO_GET_OFFSET(a);
+        uint64_t off_b = SCAN_IO_GET_OFFSET(b);
+        if (off_a < off_b)
+                return (-1);
+        if (off_a == off_b)
+                return (0);
+        return (1);
+}
+
+static dsl_scan_io_queue_t *
+scan_io_queue_create(vdev_t *vd)
+{
+        dsl_scan_t *scn = vd->vdev_spa->spa_dsl_pool->dp_scan;
+        dsl_scan_io_queue_t *q = kmem_zalloc(sizeof (*q), KM_SLEEP);
+
+        q->q_scn = scn;
+        q->q_vd = vd;
+        cv_init(&q->q_cv, NULL, CV_DEFAULT, NULL);
+        q->q_exts_by_addr = range_tree_create(&scan_io_queue_ops, q,
+            &q->q_vd->vdev_scan_io_queue_lock);
+        avl_create(&q->q_exts_by_size, ext_size_compar,
+            sizeof (range_seg_t), offsetof(range_seg_t, rs_pp_node));
+        avl_create(&q->q_zios_by_addr, io_addr_compar,
+            sizeof (scan_io_t), offsetof(scan_io_t, sio_nodes.sio_addr_node));
+
+        return (q);
+}
+
+/*
+ * Destroys a scan queue and all segments and scan_io_t's contained in it.
+ * No further execution of I/O occurs, anything pending in the queue is
+ * simply dropped. Prior to calling this, the queue should have been
+ * removed from its parent top-level vdev, hence holding the queue's
+ * lock is not permitted.
+ */
+void
+dsl_scan_io_queue_destroy(dsl_scan_io_queue_t *queue)
+{
+        dsl_scan_t *scn = queue->q_scn;
+        scan_io_t *sio;
+        uint64_t bytes_dequeued = 0;
+        kmutex_t *q_lock = &queue->q_vd->vdev_scan_io_queue_lock;
+
+        ASSERT(!MUTEX_HELD(q_lock));
+
+#ifdef  DEBUG   /* This is for the ASSERT(range_tree_contains... below */
+        mutex_enter(q_lock);
+#endif
+        while ((sio = avl_first(&queue->q_zios_by_addr)) != NULL) {
+                ASSERT(range_tree_contains(queue->q_exts_by_addr,
+                    SCAN_IO_GET_OFFSET(sio), sio->sio_asize));
+                bytes_dequeued += sio->sio_asize;
+                avl_remove(&queue->q_zios_by_addr, sio);
+                kmem_free(sio, sizeof (*sio));
+        }
+#ifdef  DEBUG
+        mutex_exit(q_lock);
+#endif
+
+        mutex_enter(&scn->scn_status_lock);
+        ASSERT3U(scn->scn_bytes_pending, >=, bytes_dequeued);
+        scn->scn_bytes_pending -= bytes_dequeued;
+        mutex_exit(&scn->scn_status_lock);
+
+        /* lock here to avoid tripping assertion in range_tree_vacate */
+        mutex_enter(q_lock);
+        range_tree_vacate(queue->q_exts_by_addr, NULL, queue);
+        mutex_exit(q_lock);
+
+        range_tree_destroy(queue->q_exts_by_addr);
+        avl_destroy(&queue->q_exts_by_size);
+        avl_destroy(&queue->q_zios_by_addr);
+        cv_destroy(&queue->q_cv);
+
+        kmem_free(queue, sizeof (*queue));
+}
+
+/*
+ * Properly transfers a dsl_scan_io_queue_t from `svd' to `tvd'. This is
+ * called on behalf of vdev_top_transfer when creating or destroying
+ * a mirror vdev due to zpool attach/detach.
+ */
+void
+dsl_scan_io_queue_vdev_xfer(vdev_t *svd, vdev_t *tvd)
+{
+        mutex_enter(&svd->vdev_scan_io_queue_lock);
+        mutex_enter(&tvd->vdev_scan_io_queue_lock);
+
+        VERIFY3P(tvd->vdev_scan_io_queue, ==, NULL);
+        tvd->vdev_scan_io_queue = svd->vdev_scan_io_queue;
+        svd->vdev_scan_io_queue = NULL;
+        if (tvd->vdev_scan_io_queue != NULL) {
+                tvd->vdev_scan_io_queue->q_vd = tvd;
+                range_tree_set_lock(tvd->vdev_scan_io_queue->q_exts_by_addr,
+                    &tvd->vdev_scan_io_queue_lock);
+        }
+
+        mutex_exit(&tvd->vdev_scan_io_queue_lock);
+        mutex_exit(&svd->vdev_scan_io_queue_lock);
+}
+
+static void
+scan_io_queues_destroy(dsl_scan_t *scn)
+{
+        vdev_t *rvd = scn->scn_dp->dp_spa->spa_root_vdev;
+
+        for (uint64_t i = 0; i < rvd->vdev_children; i++) {
+                vdev_t *tvd = rvd->vdev_child[i];
+                dsl_scan_io_queue_t *queue;
+
+                mutex_enter(&tvd->vdev_scan_io_queue_lock);
+                queue = tvd->vdev_scan_io_queue;
+                tvd->vdev_scan_io_queue = NULL;
+                mutex_exit(&tvd->vdev_scan_io_queue_lock);
+
+                if (queue != NULL)
+                        dsl_scan_io_queue_destroy(queue);
+        }
+}
+
+/*
+ * Computes the memory limit state that we're currently in. A sorted scan
+ * needs quite a bit of memory to hold the sorting queues, so we need to
+ * reasonably constrain their size so they don't impact overall system
+ * performance. We compute two limits:
+ * 1) Hard memory limit: if the amount of memory used by the sorting
+ *      queues on a pool gets above this value, we stop the metadata
+ *      scanning portion and start issuing the queued up and sorted
+ *      I/Os to reduce memory usage.
+ *      This limit is calculated as a fraction of physmem (by default 5%).
+ *      We constrain the lower bound of the hard limit to an absolute
+ *      minimum of zfs_scan_mem_lim_min (default: 16 MiB). We also constrain
+ *      the upper bound to 5% of the pool's allocated space - no chance we'll
+ *      ever need that much memory, but just to keep the value in check.
+ * 2) Soft memory limit: once we hit the hard memory limit, we start
+ *      issuing I/O to lower queue memory usage, but we don't want to
+ *      completely empty them out, as having more in the queues allows
+ *      us to make better sorting decisions. So we stop the issuing of
+ *      I/Os once the amount of memory used drops below the soft limit
+ *      (at which point we stop issuing I/O and start scanning metadata
+ *      again).
+ *      The limit is calculated by subtracting a fraction of the hard
+ *      limit from the hard limit. By default this fraction is 10%, so
+ *      the soft limit is 90% of the hard limit. We cap the size of the
+ *      difference between the hard and soft limits at an absolute
+ *      maximum of zfs_scan_mem_lim_soft_max (default: 128 MiB) - this is
+ *      sufficient to not cause too frequent switching between the
+ *      metadata scan and I/O issue (even at 2k recordsize, 128 MiB's
+ *      worth of queues is about 1.2 GiB of on-pool data, so scanning
+ *      that should take at least a decent fraction of a second).
+ */
+static mem_lim_t
+scan_io_queue_mem_lim(dsl_scan_t *scn)
+{
+        vdev_t *rvd = scn->scn_dp->dp_spa->spa_root_vdev;
+        uint64_t mlim_hard, mlim_soft, mused;
+        uint64_t alloc = metaslab_class_get_alloc(spa_normal_class(
+            scn->scn_dp->dp_spa));
+
+        mlim_hard = MAX((physmem / zfs_scan_mem_lim_fact) * PAGESIZE,
+            zfs_scan_mem_lim_min);
+        mlim_hard = MIN(mlim_hard, alloc / 20);
+        mlim_soft = mlim_hard - MIN(mlim_hard / zfs_scan_mem_lim_soft_fact,
+            zfs_scan_mem_lim_soft_max);
+        mused = 0;
+        for (uint64_t i = 0; i < rvd->vdev_children; i++) {
+                vdev_t *tvd = rvd->vdev_child[i];
+                dsl_scan_io_queue_t *queue;
+
+                mutex_enter(&tvd->vdev_scan_io_queue_lock);
+                queue = tvd->vdev_scan_io_queue;
+                if (queue != NULL) {
+                        /* #extents in exts_by_size = # in exts_by_addr */
+                        mused += avl_numnodes(&queue->q_exts_by_size) *
+                            sizeof (range_seg_t) +
+                            (avl_numnodes(&queue->q_zios_by_addr) +
+                            queue->q_num_issuing_zios) * sizeof (scan_io_t);
+                }
+                mutex_exit(&tvd->vdev_scan_io_queue_lock);
+        }
+        DTRACE_PROBE4(queue_mem_lim, dsl_scan_t *, scn, uint64_t, mlim_hard,
+            uint64_t, mlim_soft, uint64_t, mused);
+
+        if (mused >= mlim_hard)
+                return (MEM_LIM_HARD);
+        else if (mused >= mlim_soft)
+                return (MEM_LIM_SOFT);
+        else
+                return (MEM_LIM_NONE);
+}
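+
+/*
+ * Worked example of the limits above, assuming (hypothetically) 8 GiB of
+ * physmem, a pool with well over 8 GiB allocated, and the defaults
+ * zfs_scan_mem_lim_fact = 20 (5%) and zfs_scan_mem_lim_soft_fact = 10 (10%):
+ *
+ *      mlim_hard = MAX(8 GiB / 20, 16 MiB)        = ~410 MiB
+ *      mlim_soft = 410 MiB - MIN(41 MiB, 128 MiB) = ~369 MiB
+ *
+ * Above ~410 MiB of queue memory we report MEM_LIM_HARD and start
+ * clearing; once usage drops below ~369 MiB we report MEM_LIM_NONE and
+ * metadata scanning resumes.
+ */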
+
+/*
+ * Given a list of scan_io_t's in io_list, this issues the io's out to
+ * disk. This consumes the io_list and frees the scan_io_t's.
+ * This is called when emptying queues, either when we're up against
+ * the memory limit or we have finished scanning.
+ */
+static void
+scan_io_queue_issue(list_t *io_list, dsl_scan_io_queue_t *queue)
+{
+        dsl_scan_t *scn = queue->q_scn;
+        scan_io_t *sio;
+        int64_t bytes_issued = 0;
+
+        while ((sio = list_head(io_list)) != NULL) {
+                blkptr_t bp;
+
+                sio2bp(sio, &bp, queue->q_vd->vdev_id);
+                bytes_issued += sio->sio_asize;
+                scan_exec_io(scn->scn_dp, &bp, sio->sio_flags, &sio->sio_zb,
+                    B_FALSE);
+                (void) list_remove_head(io_list);
+                ASSERT(queue->q_num_issuing_zios > 0);
+                atomic_dec_64(&queue->q_num_issuing_zios);
+                kmem_free(sio, sizeof (*sio));
+        }
+
+        mutex_enter(&scn->scn_status_lock);
+        ASSERT3U(scn->scn_bytes_pending, >=, bytes_issued);
+        scn->scn_bytes_pending -= bytes_issued;
+        mutex_exit(&scn->scn_status_lock);
+
+        ASSERT3U(queue->q_zio_bytes, >=, bytes_issued);
+        atomic_add_64(&queue->q_zio_bytes, -bytes_issued);
+
+        list_destroy(io_list);
+}
+
+/*
+ * Given a range_seg_t (extent) and a list, this function passes over a
+ * scan queue and gathers up the appropriate ios which fit into that
+ * scan seg (starting from lowest LBA). While doing so, we ensure that
+ * we do not exceed the `limit' on the total amount of scan_io_t bytes
+ * gathered. At the end, we remove the appropriate amount of space
+ * from the q_exts_by_addr. If we have consumed the entire scan seg, we
+ * remove it completely from q_exts_by_addr. If we've only consumed a
+ * portion of it, we shorten the scan seg appropriately. A future call
+ * will consume more of the scan seg's constituent io's until
+ * consuming the extent completely. If we've reduced the size of the
+ * scan seg, we of course reinsert it in the appropriate spot in the
+ * q_exts_by_size tree.
+ */
+static uint64_t
+scan_io_queue_gather(const range_seg_t *rs, list_t *list,
+    dsl_scan_io_queue_t *queue, uint64_t limit)
+{
+        scan_io_t srch_sio, *sio, *next_sio;
+        avl_index_t idx;
+        int64_t num_zios = 0, bytes = 0;
+        boolean_t size_limited = B_FALSE;
+
+        ASSERT(rs != NULL);
+        ASSERT3U(limit, !=, 0);
+        ASSERT(MUTEX_HELD(&queue->q_vd->vdev_scan_io_queue_lock));
+
+        list_create(list, sizeof (scan_io_t),
+            offsetof(scan_io_t, sio_nodes.sio_list_node));
+        SCAN_IO_SET_OFFSET(&srch_sio, rs->rs_start);
+
+        /*
+         * The exact start of the extent might not contain any matching zios,
+         * so if that's the case, examine the next one in the tree.
+         */
+        sio = avl_find(&queue->q_zios_by_addr, &srch_sio, &idx);
+        if (sio == NULL)
+                sio = avl_nearest(&queue->q_zios_by_addr, idx, AVL_AFTER);
+
+        while (sio != NULL && SCAN_IO_GET_OFFSET(sio) < rs->rs_end) {
+                if (bytes >= limit) {
+                        size_limited = B_TRUE;
+                        break;
+                }
+                ASSERT3U(SCAN_IO_GET_OFFSET(sio), >=, rs->rs_start);
+                ASSERT3U(SCAN_IO_GET_OFFSET(sio) + sio->sio_asize, <=,
+                    rs->rs_end);
+
+                next_sio = AVL_NEXT(&queue->q_zios_by_addr, sio);
+                avl_remove(&queue->q_zios_by_addr, sio);
+                list_insert_tail(list, sio);
+                num_zios++;
+                bytes += sio->sio_asize;
+                sio = next_sio;
+        }
+
+        if (size_limited) {
+                uint64_t end;
+                sio = list_tail(list);
+                end = SCAN_IO_GET_OFFSET(sio) + sio->sio_asize;
+                range_tree_remove_fill(queue->q_exts_by_addr, rs->rs_start,
+                    end - rs->rs_start, bytes, 0);
+        } else {
+                /*
+                 * Whole extent consumed, remove it all, including any head
+                 * or tail overhang.
+                 */
+                range_tree_remove_fill(queue->q_exts_by_addr, rs->rs_start,
+                    rs->rs_end - rs->rs_start, bytes, 0);
+        }
+        atomic_add_64(&queue->q_num_issuing_zios, num_zios);
+
+        return (bytes);
+}
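+
+/*
+ * Gathering example: suppose an extent spans [0, 64M) and `limit' allows
+ * only 16M of zio bytes this pass. We walk q_zios_by_addr from offset 0
+ * and stop once 16M worth of scan_io_t's are listed; say the last one
+ * ends at 20M due to gaps. We then trim [0, 20M) out of q_exts_by_addr,
+ * and the shortened [20M, 64M) segment is re-inserted into
+ * q_exts_by_size; a later pass consumes the remainder.
+ */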
+
+/*
+ * This is called from the queue emptying thread and selects the next
+ * extent from which we are to issue io's. The behavior of this function
+ * depends on the state of the scan, the current memory consumption and
+ * whether or not we are performing a scan shutdown.
+ * 1) We select extents in an elevator algorithm (LBA-order) if:
+ *      a) the scan has finished traversing metadata (DSS_FINISHING)
+ *      b) the scan needs to perform a checkpoint
+ * 2) We select the largest available extent if we are up against the
+ *      memory limit.
+ * 3) Otherwise we don't select any extents.
+ */
+static const range_seg_t *
+scan_io_queue_fetch_ext(dsl_scan_io_queue_t *queue)
+{
+        dsl_scan_t *scn = queue->q_scn;
+
+        ASSERT(MUTEX_HELD(&queue->q_vd->vdev_scan_io_queue_lock));
+        ASSERT0(queue->q_issuing_rs.rs_start);
+        ASSERT0(queue->q_issuing_rs.rs_end);
+        ASSERT(scn->scn_is_sorted);
+
+        if (scn->scn_phys.scn_state == DSS_FINISHING ||
+            scn->scn_checkpointing) {
+                /*
+                 * When the scan has finished traversing all metadata and is
+                 * in the DSS_FINISHING state or a checkpoint has been
+                 * requested, no new extents will be added to the sorting
+                 * queue, so the way we are sorted now is as good as it'll
+                 * get. So instead, switch to issuing extents in linear order.
+                 */
+                return (range_tree_first(queue->q_exts_by_addr));
+        } else if (scn->scn_clearing) {
+                return (avl_first(&queue->q_exts_by_size));
+        } else {
+                return (NULL);
+        }
+}
+
+/*
+ * Empties a scan queue until we have issued at least info->qri_limit
+ * bytes, or the queue is empty. This is called via the scn_taskq so as
+ * to parallelize processing of all top-level vdevs as much as possible.
+ */
+static void
+scan_io_queues_run_one(io_queue_run_info_t *info)
+{
+        dsl_scan_io_queue_t *queue = info->qri_queue;
+        uint64_t limit = info->qri_limit;
+        dsl_scan_t *scn = queue->q_scn;
+        kmutex_t *q_lock = &queue->q_vd->vdev_scan_io_queue_lock;
+        list_t zio_list;
+        const range_seg_t *rs;
+        uint64_t issued = 0;
+
+        ASSERT(scn->scn_is_sorted);
+
+        /* loop until we have issued as much I/O as was requested */
+        while (issued < limit) {
+                scan_io_t *first_io, *last_io;
+
+                mutex_enter(q_lock);
+                /* First we select the extent we'll be issuing from next. */
+                rs = scan_io_queue_fetch_ext(queue);
+                DTRACE_PROBE2(queue_fetch_ext, range_seg_t *, rs,
+                    dsl_scan_io_queue_t *, queue);
+                if (rs == NULL) {
+                        mutex_exit(q_lock);
+                        break;
+                }
+
+                /*
+                 * We have selected which extent needs to be processed next,
+                 * gather up the corresponding zio's, taking care not to step
+                 * over the limit.
+                 */
+                issued += scan_io_queue_gather(rs, &zio_list, queue,
+                    limit - issued);
+                first_io = list_head(&zio_list);
+                last_io = list_tail(&zio_list);
+                if (first_io != NULL) {
+                        /*
+                         * We have zio's to issue. Construct a fake range
+                         * seg that covers the whole list of zio's to issue
+                         * (the list is guaranteed to be LBA-ordered) and
+                         * save that in the queue's "in flight" segment.
+                         * This is used to prevent freeing I/O from hitting
+                         * that range while we're working on it.
+                         */
+                        ASSERT(last_io != NULL);
+                        queue->q_issuing_rs.rs_start =
+                            SCAN_IO_GET_OFFSET(first_io);
+                        queue->q_issuing_rs.rs_end =
+                            SCAN_IO_GET_OFFSET(last_io) + last_io->sio_asize;
+                }
+                mutex_exit(q_lock);
+
+                /*
+                 * Issuing zio's can take a long time (especially because
+                 * we are constrained by zfs_top_maxinflight), so drop the
+                 * queue lock.
+                 */
+                scan_io_queue_issue(&zio_list, queue);
+
+                mutex_enter(q_lock);
+                /* invalidate the in-flight I/O range */
+                bzero(&queue->q_issuing_rs, sizeof (queue->q_issuing_rs));
+                cv_broadcast(&queue->q_cv);
+                mutex_exit(q_lock);
+        }
+}
+
+/*
+ * Performs an emptying run on all scan queues in the pool. This just
+ * punches out one thread per top-level vdev, each of which processes
+ * only that vdev's scan queue. We can parallelize the I/O here because
+ * we know that each queue's zio's only affect its own top-level vdev.
+ * The amount of I/O dequeued per run of this function is calibrated
+ * dynamically so that its total run time doesn't exceed
+ * zfs_scan_dequeue_run_target_ms + zfs_dequeue_run_bonus_ms. The
+ * timing algorithm aims to hit the target value, but still
+ * oversubscribes the amount of data that it is allowed to fetch by
+ * the bonus value. This is to allow for non-equal completion times
+ * across top-level vdevs.
+ *
+ * This function waits for the queue runs to complete, and must be
+ * called from dsl_scan_sync (or in general, syncing context).
+ */
+static void
+scan_io_queues_run(dsl_scan_t *scn)
+{
+        spa_t *spa = scn->scn_dp->dp_spa;
+        uint64_t dirty_limit, total_limit, total_bytes;
+        io_queue_run_info_t *info;
+        uint64_t dequeue_min = zfs_scan_dequeue_min *
+            spa->spa_root_vdev->vdev_children;
+
+        ASSERT(scn->scn_is_sorted);
+        ASSERT(spa_config_held(spa, SCL_CONFIG, RW_READER));
+
+        if (scn->scn_taskq == NULL) {
+                char *tq_name = kmem_zalloc(ZFS_MAX_DATASET_NAME_LEN + 16,
+                    KM_SLEEP);
+                const int nthreads = spa->spa_root_vdev->vdev_children;
+
+                /*
+                 * This taskq must *always* be able to execute as many
+                 * threads in parallel as there are top-level vdevs, and no
+                 * fewer; otherwise calls to scan_io_queues_run_one can
+                 * serialize unexpectedly during spa_sync runs, which
+                 * significantly hurts performance.
+                 */
+                (void) snprintf(tq_name, ZFS_MAX_DATASET_NAME_LEN + 16,
+                    "dsl_scan_tq_%s", spa->spa_name);
+                scn->scn_taskq = taskq_create(tq_name, nthreads, minclsyspri,
+                    nthreads, nthreads, TASKQ_PREPOPULATE);
+                kmem_free(tq_name, ZFS_MAX_DATASET_NAME_LEN + 16);
+        }
+
+        /*
+         * This is the automatic run time calibration algorithm. We gauge
+         * how long spa_sync took since last time we were invoked. If it
+         * took longer than our target + bonus values, we reduce the
+         * amount of data that the queues are allowed to process in this
+         * iteration. Conversely, if it took less than target + bonus,
+         * we increase the amount of data the queues are allowed to process.
+         * This is designed as a partial load-following algorithm, so if
+         * other ZFS users start issuing I/O, we back off until we hit our
+         * minimum issue amount (per-TL-vdev) of zfs_scan_dequeue_min bytes.
+         */
+        if (scn->scn_last_queue_run_time != 0) {
+                uint64_t now = ddi_get_lbolt64();
+                uint64_t delta_ms = TICK_TO_MSEC(now -
+                    scn->scn_last_queue_run_time);
+                uint64_t bonus = zfs_dequeue_run_bonus_ms;
+
+                bonus = MIN(bonus, DEQUEUE_BONUS_MS_MAX);
+                if (delta_ms <= bonus)
+                        delta_ms = bonus + 1;
+
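+                /*
+                 * The previous limit is scaled by target / (delta - bonus).
+                 * For example, with a 50ms target and a 50ms bonus, a
+                 * spa_sync pass arriving 150ms after our last run scales
+                 * the limit by 50 / (150 - 50), i.e. halves it, subject to
+                 * the dequeue_min floor.
+                 */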
+                scn->scn_last_dequeue_limit = MAX(dequeue_min,
+                    (scn->scn_last_dequeue_limit *
+                    zfs_scan_dequeue_run_target_ms) / (delta_ms - bonus));
+                scn->scn_last_queue_run_time = now;
+        } else {
+                scn->scn_last_queue_run_time = ddi_get_lbolt64();
+                scn->scn_last_dequeue_limit = dequeue_min;
+        }
+
+        /*
+         * We also constrain the amount of data we are allowed to issue
+         * by the zfs_dirty_data_max value - this is essentially a sanity
+         * check to keep us from dequeueing huge amounts of data in a
+         * single run.
+         */
+        dirty_limit = (zfs_vdev_async_write_active_min_dirty_percent *
+            zfs_dirty_data_max) / 100;
+        if (dirty_limit >= scn->scn_dp->dp_dirty_total)
+                dirty_limit -= scn->scn_dp->dp_dirty_total;
+        else
+                dirty_limit = 0;
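+        /*
+         * For example, assuming a zfs_dirty_data_max of 4GB and the
+         * async-write active-min percentage at 30, the computation above
+         * starts dirty_limit at 1.2GB and then subtracts whatever dirty
+         * data is currently outstanding (bottoming out at zero).
+         */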
+
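+        /*
+         * Note that the dequeue_min floor is applied after the dirty data
+         * clamp, so we always allow at least zfs_scan_dequeue_min bytes
+         * per top-level vdev even when the pool is busy.
+         */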
+        total_limit = MAX(MIN(scn->scn_last_dequeue_limit, dirty_limit),
+            dequeue_min);
+
+        /*
+         * We use this to determine how much data each queue is allowed to
+         * issue this run. We take the amount of dirty data available in
+         * the current txg and split it among the queues in proportion to
+         * how full each queue is. No need to lock here: new data can't
+         * enter the queues, since enqueueing is only done from our own
+         * sync thread.
+         */
+        total_bytes = scn->scn_bytes_pending;
+        if (total_bytes == 0)
+                return;
+
+        info = kmem_zalloc(sizeof (*info) * spa->spa_root_vdev->vdev_children,
+            KM_SLEEP);
+        for (uint64_t i = 0; i < spa->spa_root_vdev->vdev_children; i++) {
+                vdev_t *vd = spa->spa_root_vdev->vdev_child[i];
+                dsl_scan_io_queue_t *queue;
+                uint64_t limit;
+
+                /*
+                 * No need to lock to check if the queue exists, since this
+                 * is called from sync context only and queues are only
+                 * created in sync context also.
+                 */
+                queue = vd->vdev_scan_io_queue;
+                if (queue == NULL)
+                        continue;
+
+                /*
+                 * Compute the per-queue limit as a fraction of the queue's
+                 * size relative to the total amount of zio bytes in all
+                 * queues. 1000 here is the fixed-point precision. If there
+                 * are ever more than 1000 top-level vdevs, this code might
+                 * misbehave.
+                 */
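+                /*
+                 * For example, a queue holding 200MB of the 1GB of zio
+                 * bytes pending across all queues, with a total_limit of
+                 * 50MB, gets ((200MB * 1000) / 1GB) * 50MB / 1000 = 10MB,
+                 * subject to the zfs_scan_dequeue_min floor.
+                 */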
+                limit = MAX((((queue->q_zio_bytes * 1000) / total_bytes) *
+                    total_limit) / 1000, zfs_scan_dequeue_min);
+
+                info[i].qri_queue = queue;
+                info[i].qri_limit = limit;
+
+                VERIFY(taskq_dispatch(scn->scn_taskq,
+                    (void (*)(void *))scan_io_queues_run_one, &info[i],
+                    TQ_SLEEP) != NULL);
+        }
+
+        /*
+         * We need to wait for all queues to finish their run, just to keep
+         * things nice and consistent. This doesn't necessarily mean that
+         * all I/O generated by emptying the queues has finished (there may
+         * be up to zfs_top_maxinflight zio's still in flight on behalf of
+         * each queue).
+         */
+        taskq_wait(scn->scn_taskq);
+
+        kmem_free(info, sizeof (*info) * spa->spa_root_vdev->vdev_children);
+}
+
+/*
+ * Callback invoked when a zio_free() zio is executing. The free needs to
+ * be intercepted to prevent a portion of disk space from being deallocated
+ * and then reallocated and overwritten while we still have the block queued
+ * up for processing, or even while we're actively trying to scrub or
+ * resilver it.
+ */
+void
+dsl_scan_freed(spa_t *spa, const blkptr_t *bp)
+{
+        dsl_pool_t *dp = spa->spa_dsl_pool;
+        dsl_scan_t *scn = dp->dp_scan;
+
+        ASSERT(!BP_IS_EMBEDDED(bp));
+        ASSERT(scn != NULL);
+        if (!dsl_scan_is_running(scn))
+                return;
+
+        for (int i = 0; i < BP_GET_NDVAS(bp); i++) {
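+                /*
+                 * For special (WBC) blocks, only the normal-class DVA is
+                 * tracked in the scan queues; skip the others.
+                 */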
+                if (BP_IS_SPECIAL(bp) && i != WBC_NORMAL_DVA)
+                        continue;
+                dsl_scan_freed_dva(spa, bp, i);
+        }
+}
+
+static void
+dsl_scan_freed_dva(spa_t *spa, const blkptr_t *bp, int dva_i)
+{
+        dsl_pool_t *dp = spa->spa_dsl_pool;
+        dsl_scan_t *scn = dp->dp_scan;
+        vdev_t *vdev;
+        kmutex_t *q_lock;
+        dsl_scan_io_queue_t *queue;
+        scan_io_t srch, *sio;
+        avl_index_t idx;
+        uint64_t offset;
+        int64_t asize;
+
+        ASSERT(!BP_IS_EMBEDDED(bp));
+        ASSERT(scn != NULL);
+        ASSERT(!BP_IS_SPECIAL(bp) || dva_i == WBC_NORMAL_DVA);
+
+        vdev = vdev_lookup_top(spa, DVA_GET_VDEV(&bp->blk_dva[dva_i]));
+        ASSERT(vdev != NULL);
+        q_lock = &vdev->vdev_scan_io_queue_lock;
+        queue = vdev->vdev_scan_io_queue;
+
+        mutex_enter(q_lock);
+        if (queue == NULL) {
+                mutex_exit(q_lock);
+                return;
+        }
+
+        bp2sio(bp, &srch, dva_i);
+        offset = SCAN_IO_GET_OFFSET(&srch);
+        asize = srch.sio_asize;
+
+        /*
+         * We can find the zio in two states:
+         * 1) Cold, just sitting in the queue of zio's to be issued at
+         *      some point in the future. In this case, all we do is
+         *      remove the zio from the q_zios_by_addr tree, decrement
+         *      its data volume from the containing range_seg_t and
+         *      resort the q_exts_by_size tree to reflect that the
+         *      range_seg_t has lost some of its 'fill'. We don't shorten
+         *      the range_seg_t itself - frees here are usually rare
+         *      enough that it isn't worth the extra hassle of trying to
+         *      keep track of precise extent boundaries.
+         * 2) Hot, where the zio is currently in-flight in
+         *      dsl_scan_issue_ios. In this case, we can't simply
+         *      reach in and stop the in-flight zio's, so we instead
+         *      block the caller. Eventually, dsl_scan_issue_ios will
+         *      be done with issuing the zio's it gathered and will
+         *      signal us.
+         */
+        sio = avl_find(&queue->q_zios_by_addr, &srch, &idx);
+        if (sio != NULL) {
+                range_seg_t *rs;
+
+                /* Got it while it was cold in the queue */
+                ASSERT3U(srch.sio_asize, ==, sio->sio_asize);
+                DTRACE_PROBE2(dequeue_now, const blkptr_t *, bp,
+                    dsl_scan_io_queue_t *, queue);
+                count_block(scn, dp->dp_blkstats, bp);
+                ASSERT(range_tree_contains(queue->q_exts_by_addr, offset,
+                    asize));
+                avl_remove(&queue->q_zios_by_addr, sio);
+
+                /*
+                 * Since we're taking this scan_io_t out of its parent
+                 * range_seg_t, we need to decrement the range_seg_t's
+                 * rs_fill value. Because rs_fill determines the segment's
+                 * ordering in q_exts_by_size, we remove the segment,
+                 * adjust rs_fill and then reinsert it at its new position.
+                 */
+                rs = range_tree_find(queue->q_exts_by_addr,
+                    SCAN_IO_GET_OFFSET(sio), sio->sio_asize);
+                ASSERT(rs != NULL);
+                ASSERT3U(rs->rs_fill, >=, sio->sio_asize);
+                avl_remove(&queue->q_exts_by_size, rs);
+                rs->rs_fill -= sio->sio_asize;
+                VERIFY3P(avl_find(&queue->q_exts_by_size, rs, &idx), ==, NULL);
+                avl_insert(&queue->q_exts_by_size, rs, idx);
+
+                /*
+                 * We only update the queue byte counter in the cold path,
+                 * otherwise it will already have been accounted for as
+                 * part of the zio's execution.
+                 */
+                ASSERT3U(queue->q_zio_bytes, >=, asize);
+                atomic_add_64(&queue->q_zio_bytes, -asize);
+
+                mutex_enter(&scn->scn_status_lock);
+                ASSERT3U(scn->scn_bytes_pending, >=, asize);
+                scn->scn_bytes_pending -= asize;
+                mutex_exit(&scn->scn_status_lock);
+
+                kmem_free(sio, sizeof (*sio));
+        } else {
+                /*
+                 * If it's part of an extent that's currently being issued,
+                 * wait until that extent has been consumed. In this case we
+                 * are not the ones dequeueing this zio, so there's no need
+                 * to subtract its size from scn_bytes_pending or from the
+                 * queue's byte count.
+                 */
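+                /*
+                 * cv_wait() drops q_lock while sleeping and reacquires it
+                 * before returning, so the in-flight range must be
+                 * re-checked on each iteration.
+                 */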
+                while (queue->q_issuing_rs.rs_start <= offset &&
+                    queue->q_issuing_rs.rs_end >= offset + asize) {
+                        DTRACE_PROBE2(dequeue_wait, const blkptr_t *, bp,
+                            dsl_scan_io_queue_t *, queue);
+                        cv_wait(&queue->q_cv, &vdev->vdev_scan_io_queue_lock);
+                }
+        }
+        mutex_exit(q_lock);
 }