1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  23  * Copyright 2017 Nexenta Systems, Inc.  All rights reserved.
  24  * Copyright 2016 Gary Mills
  25  * Copyright (c) 2011, 2016 by Delphix. All rights reserved.
  26  * Copyright 2017 Joyent, Inc.
  27  * Copyright (c) 2017 Datto Inc.
  28  */
  29 
  30 #include <sys/dsl_scan.h>
  31 #include <sys/dsl_pool.h>
  32 #include <sys/dsl_dataset.h>
  33 #include <sys/dsl_prop.h>
  34 #include <sys/dsl_dir.h>
  35 #include <sys/dsl_synctask.h>
  36 #include <sys/dnode.h>
  37 #include <sys/dmu_tx.h>
  38 #include <sys/dmu_objset.h>
  39 #include <sys/arc.h>
  40 #include <sys/zap.h>
  41 #include <sys/zio.h>
  42 #include <sys/zfs_context.h>
  43 #include <sys/fs/zfs.h>
  44 #include <sys/zfs_znode.h>
  45 #include <sys/spa_impl.h>
  46 #include <sys/vdev_impl.h>
  47 #include <sys/zil_impl.h>
  48 #include <sys/zio_checksum.h>
  49 #include <sys/ddt.h>
  50 #include <sys/sa.h>
  51 #include <sys/sa_impl.h>
  52 #include <sys/zfeature.h>
  53 #include <sys/abd.h>
  54 #ifdef _KERNEL
  55 #include <sys/zfs_vfsops.h>
  56 #endif
  57 #include <sys/range_tree.h>
  58 
  59 extern int zfs_vdev_async_write_active_min_dirty_percent;
  60 
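/*
 * An entry in the in-core dataset scan queue (scn_queue): sds_dsobj is
 * the dataset's object number, sds_txg is the txg value recorded for it
 * (and persisted as the value in the on-disk queue ZAP), and sds_node
 * links the entry into the queue's AVL tree.
 */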
  61 typedef struct {
  62         uint64_t        sds_dsobj;
  63         uint64_t        sds_txg;
  64         avl_node_t      sds_node;
  65 } scan_ds_t;
  66 
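/*
 * Argument bundle for scan_io_queues_run_one(): the per-vdev I/O queue
 * to service and the limit on how much it may issue during this run.
 */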
  67 typedef struct {
  68         dsl_scan_io_queue_t     *qri_queue;
  69         uint64_t                qri_limit;
  70 } io_queue_run_info_t;
  71 
  72 /*
  73  * This controls what conditions are placed on dsl_scan_sync_state():
  74  * SYNC_OPTIONAL) write out scn_phys iff scn_bytes_pending == 0
  75  * SYNC_MANDATORY) write out scn_phys always. scn_bytes_pending must be 0.
  76  * SYNC_CACHED) if scn_bytes_pending == 0, write out scn_phys. Otherwise
  77  *      write out the scn_phys_cached version.
  78  * See dsl_scan_sync_state for details.
  79  */
  80 typedef enum {
  81         SYNC_OPTIONAL,
  82         SYNC_MANDATORY,
  83         SYNC_CACHED
  84 } state_sync_type_t;
  85 
  86 typedef int (scan_cb_t)(dsl_pool_t *, const blkptr_t *,
  87     const zbookmark_phys_t *);
  88 
  89 static scan_cb_t dsl_scan_scrub_cb;
  90 static void dsl_scan_cancel_sync(void *, dmu_tx_t *);
  91 static void dsl_scan_sync_state(dsl_scan_t *scn, dmu_tx_t *tx,
  92     state_sync_type_t sync_type);
  93 static boolean_t dsl_scan_restarting(dsl_scan_t *, dmu_tx_t *);
  94 
  95 static int scan_ds_queue_compar(const void *a, const void *b);
  96 static void scan_ds_queue_empty(dsl_scan_t *scn, boolean_t destroy);
  97 static boolean_t scan_ds_queue_contains(dsl_scan_t *scn, uint64_t dsobj,
  98     uint64_t *txg);
  99 static int scan_ds_queue_insert(dsl_scan_t *scn, uint64_t dsobj, uint64_t txg);
 100 static void scan_ds_queue_remove(dsl_scan_t *scn, uint64_t dsobj);
 101 static boolean_t scan_ds_queue_first(dsl_scan_t *scn, uint64_t *dsobj,
 102     uint64_t *txg);
 103 static void scan_ds_queue_sync(dsl_scan_t *scn, dmu_tx_t *tx);
 104 
 105 /*
 * Maximum number of concurrently executing I/Os per top-level vdev.
 107  * Tune with care. Very high settings (hundreds) are known to trigger
 108  * some firmware bugs and resets on certain SSDs.
 109  */
 110 int zfs_top_maxinflight = 32;
 111 
 112 /*
 113  * Minimum amount of data we dequeue if our queues are full and the
 114  * dirty data limit for a txg has been reached.
 115  */
 116 uint64_t zfs_scan_dequeue_min =                 16 << 20;
 117 /*
 * The duration we aim for a dsl_scan_sync to take when issuing our
 119  * dequeued data. If we go over that value, we lower the amount we dequeue
 120  * each run and vice versa. The bonus value below is just something we add
 * on top of the target value so that we have a little bit of fudging in case
 122  * some top-level vdevs finish before others - we want to keep the vdevs as
 123  * hot as possible.
 124  */
 125 uint64_t zfs_scan_dequeue_run_target_ms =       2000;
 126 uint64_t zfs_dequeue_run_bonus_ms =             1000;
 127 #define DEQUEUE_BONUS_MS_MAX                    100000
 128 
 129 boolean_t zfs_scan_direct = B_FALSE;    /* don't queue & sort zios, go direct */
 130 uint64_t zfs_scan_max_ext_gap = 2 << 20;  /* bytes */
 131 /* See scan_io_queue_mem_lim for details on the memory limit tunables */
 132 uint64_t zfs_scan_mem_lim_fact = 20;            /* fraction of physmem */
 133 uint64_t zfs_scan_mem_lim_soft_fact = 20;       /* fraction of mem lim above */
 134 uint64_t zfs_scan_checkpoint_intval = 7200;     /* seconds */
 135 /*
 136  * fill_weight is non-tunable at runtime, so we copy it at module init from
 137  * zfs_scan_fill_weight. Runtime adjustments to zfs_scan_fill_weight would
 138  * break queue sorting.
 139  */
 140 uint64_t zfs_scan_fill_weight = 3;
 141 static uint64_t fill_weight = 3;
 142 
 143 /* See scan_io_queue_mem_lim for details on the memory limit tunables */
 144 uint64_t zfs_scan_mem_lim_min = 16 << 20; /* bytes */
 145 uint64_t zfs_scan_mem_lim_soft_max = 128 << 20;   /* bytes */
 146 
 147 #define ZFS_SCAN_CHECKPOINT_INTVAL      SEC_TO_TICK(zfs_scan_checkpoint_intval)
 148 
 149 int zfs_scan_min_time_ms = 1000; /* min millisecs to scrub per txg */
 150 int zfs_free_min_time_ms = 1000; /* min millisecs to free per txg */
 151 int zfs_resilver_min_time_ms = 3000; /* min millisecs to resilver per txg */
 152 boolean_t zfs_no_scrub_io = B_FALSE; /* set to disable scrub i/o */
 153 boolean_t zfs_no_scrub_prefetch = B_FALSE; /* set to disable scrub prefetch */
 154 enum ddt_class zfs_scrub_ddt_class_max = DDT_CLASS_DUPLICATE;
 155 int dsl_scan_delay_completion = B_FALSE; /* set to delay scan completion */
 156 /* max number of blocks to free in a single TXG */
 157 uint64_t zfs_free_max_blocks = UINT64_MAX;
 158 
 159 #define DSL_SCAN_IS_SCRUB_RESILVER(scn) \
 160         ((scn)->scn_phys.scn_func == POOL_SCAN_SCRUB || \
 161         (scn)->scn_phys.scn_func == POOL_SCAN_RESILVER || \
 162         (scn)->scn_phys.scn_func == POOL_SCAN_MOS || \
 163         (scn)->scn_phys.scn_func == POOL_SCAN_META)
 164 
 165 extern int zfs_txg_timeout;
 166 
 167 /*
 168  * Enable/disable the processing of the free_bpobj object.
 169  */
 170 boolean_t zfs_free_bpobj_enabled = B_TRUE;
 171 
 172 /* the order has to match pool_scan_type */
 173 static scan_cb_t *scan_funcs[POOL_SCAN_FUNCS] = {
 174         NULL,
 175         dsl_scan_scrub_cb,      /* POOL_SCAN_SCRUB */
 176         dsl_scan_scrub_cb,      /* POOL_SCAN_RESILVER */
 177         dsl_scan_scrub_cb,      /* POOL_SCAN_MOS */
 178         dsl_scan_scrub_cb,      /* POOL_SCAN_META */
 179 };
 180 
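/*
 * In-core representation of a single queued scan I/O. It keeps just
 * enough of the original block pointer (one DVA word, asize, blk_prop,
 * birth txgs and checksum) to rebuild a blkptr_t later via sio2bp(),
 * along with the bookmark and zio flags needed to issue the I/O.
 */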
 181 typedef struct scan_io {
 182         uint64_t                sio_prop;
 183         uint64_t                sio_phys_birth;
 184         uint64_t                sio_birth;
 185         zio_cksum_t             sio_cksum;
 186         zbookmark_phys_t        sio_zb;
 187         union {
 188                 avl_node_t      sio_addr_node;
 189                 list_node_t     sio_list_node;
 190         } sio_nodes;
 191         uint64_t                sio_dva_word1;
 192         uint32_t                sio_asize;
 193         int                     sio_flags;
 194 } scan_io_t;
 195 
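/*
 * Per-top-level-vdev queue of pending scan I/Os. Queued I/Os are indexed
 * by on-disk address (q_zios_by_addr and the q_exts_by_addr range tree)
 * and the resulting extents are also indexed by size (q_exts_by_size),
 * so that large, mostly contiguous runs can be selected for issue.
 */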
 196 struct dsl_scan_io_queue {
 197         dsl_scan_t      *q_scn;
 198         vdev_t          *q_vd;
 199 
 200         kcondvar_t      q_cv;
 201 
 202         range_tree_t    *q_exts_by_addr;
 203         avl_tree_t      q_zios_by_addr;
 204         avl_tree_t      q_exts_by_size;
 205 
 206         /* number of bytes in queued zios - atomic ops */
 207         uint64_t        q_zio_bytes;
 208 
 209         range_seg_t     q_issuing_rs;
 210         uint64_t        q_num_issuing_zios;
 211 };
 212 
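/*
 * The DVA offset lives in the low 63 bits of sio_dva_word1 (shifted by
 * SPA_MINBLOCKSHIFT), mirroring the on-disk DVA layout; these macros
 * extract and set it.
 */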
 213 #define SCAN_IO_GET_OFFSET(sio) \
 214         BF64_GET_SB((sio)->sio_dva_word1, 0, 63, SPA_MINBLOCKSHIFT, 0)
 215 #define SCAN_IO_SET_OFFSET(sio, offset) \
 216         BF64_SET_SB((sio)->sio_dva_word1, 0, 63, SPA_MINBLOCKSHIFT, 0, offset)
 217 
 218 static void scan_io_queue_insert_cb(range_tree_t *rt, range_seg_t *rs,
 219     void *arg);
 220 static void scan_io_queue_remove_cb(range_tree_t *rt, range_seg_t *rs,
 221     void *arg);
 222 static void scan_io_queue_vacate_cb(range_tree_t *rt, void *arg);
 223 static int ext_size_compar(const void *x, const void *y);
 224 static int io_addr_compar(const void *x, const void *y);
 225 
 226 static struct range_tree_ops scan_io_queue_ops = {
 227         .rtop_create = NULL,
 228         .rtop_destroy = NULL,
 229         .rtop_add = scan_io_queue_insert_cb,
 230         .rtop_remove = scan_io_queue_remove_cb,
 231         .rtop_vacate = scan_io_queue_vacate_cb
 232 };
 233 
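/*
 * Result of scan_io_queue_mem_lim(): how close the queues' memory usage
 * is to the configured soft and hard limits.
 */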
 234 typedef enum {
 235         MEM_LIM_NONE,
 236         MEM_LIM_SOFT,
 237         MEM_LIM_HARD
 238 } mem_lim_t;
 239 
 240 static void dsl_scan_enqueue(dsl_pool_t *dp, const blkptr_t *bp,
 241     int zio_flags, const zbookmark_phys_t *zb);
 242 static void scan_exec_io(dsl_pool_t *dp, const blkptr_t *bp, int zio_flags,
 243     const zbookmark_phys_t *zb, boolean_t limit_inflight);
 244 static void scan_io_queue_insert(dsl_scan_t *scn, dsl_scan_io_queue_t *queue,
 245     const blkptr_t *bp, int dva_i, int zio_flags, const zbookmark_phys_t *zb);
 246 
 247 static void scan_io_queues_run_one(io_queue_run_info_t *info);
 248 static void scan_io_queues_run(dsl_scan_t *scn);
 249 static mem_lim_t scan_io_queue_mem_lim(dsl_scan_t *scn);
 250 
 251 static dsl_scan_io_queue_t *scan_io_queue_create(vdev_t *vd);
 252 static void scan_io_queues_destroy(dsl_scan_t *scn);
 253 static void dsl_scan_freed_dva(spa_t *spa, const blkptr_t *bp, int dva_i);
 254 
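/*
 * Returns B_TRUE if a scan is currently in progress (either actively
 * scanning or finishing up).
 */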
 255 static inline boolean_t
 256 dsl_scan_is_running(const dsl_scan_t *scn)
 257 {
 258         return (scn->scn_phys.scn_state == DSS_SCANNING ||
 259             scn->scn_phys.scn_state == DSS_FINISHING);
 260 }
 261 
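/*
 * Reconstruct a minimal blkptr_t from a queued scan_io_t. Only the
 * single DVA that was queued is restored; the vdev id is supplied by
 * the caller, since it is implied by the queue the sio came from.
 */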
 262 static inline void
 263 sio2bp(const scan_io_t *sio, blkptr_t *bp, uint64_t vdev_id)
 264 {
 265         bzero(bp, sizeof (*bp));
 266         DVA_SET_ASIZE(&bp->blk_dva[0], sio->sio_asize);
 267         DVA_SET_VDEV(&bp->blk_dva[0], vdev_id);
 268         bp->blk_dva[0].dva_word[1] = sio->sio_dva_word1;
 269         bp->blk_prop = sio->sio_prop;
 270         /*
 271          * We must reset the special flag, because the rebuilt BP lacks
 272          * a second DVA, so wbc_select_dva must not be allowed to run.
 273          */
 274         BP_SET_SPECIAL(bp, 0);
 275         bp->blk_phys_birth = sio->sio_phys_birth;
 276         bp->blk_birth = sio->sio_birth;
 277         bp->blk_fill = 1;    /* we always only work with data pointers */
 278         bp->blk_cksum = sio->sio_cksum;
 279 }
 280 
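/*
 * Condense the parts of a block pointer that we need for scanning into
 * a scan_io_t, keeping only the DVA selected by dva_i.
 */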
 281 static inline void
 282 bp2sio(const blkptr_t *bp, scan_io_t *sio, int dva_i)
 283 {
 284         if (BP_IS_SPECIAL(bp))
 285                 ASSERT3S(dva_i, ==, WBC_NORMAL_DVA);
 286         /* we discard the vdev guid, since we can deduce it from the queue */
 287         sio->sio_dva_word1 = bp->blk_dva[dva_i].dva_word[1];
 288         sio->sio_asize = DVA_GET_ASIZE(&bp->blk_dva[dva_i]);
 289         sio->sio_prop = bp->blk_prop;
 290         sio->sio_phys_birth = bp->blk_phys_birth;
 291         sio->sio_birth = bp->blk_birth;
 292         sio->sio_cksum = bp->blk_cksum;
 293 }
 294 
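/*
 * Called once at module init to latch tunables that must not change at
 * runtime (see the comment above zfs_scan_fill_weight).
 */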
 295 void
 296 dsl_scan_global_init()
 297 {
 298         fill_weight = zfs_scan_fill_weight;
 299 }
 300 
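/*
 * Set up the in-core scan state when a pool is loaded: allocate and
 * initialize dsl_scan_t, read any persisted scan state and dataset
 * queue from the MOS, and arrange for an old-style (pre-SPA_VERSION_SCAN)
 * scrub to be restarted as a new-style scrub.
 */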
 301 int
 302 dsl_scan_init(dsl_pool_t *dp, uint64_t txg)
 303 {
 304         int err;
 305         dsl_scan_t *scn;
 306         spa_t *spa = dp->dp_spa;
 307         uint64_t f;
 308 
 309         scn = dp->dp_scan = kmem_zalloc(sizeof (dsl_scan_t), KM_SLEEP);
 310         scn->scn_dp = dp;
 311 
 312         mutex_init(&scn->scn_sorted_lock, NULL, MUTEX_DEFAULT, NULL);
 313         mutex_init(&scn->scn_status_lock, NULL, MUTEX_DEFAULT, NULL);
 314 
 315         /*
 316          * It's possible that we're resuming a scan after a reboot so
 317          * make sure that the scan_async_destroying flag is initialized
 318          * appropriately.
 319          */
 320         ASSERT(!scn->scn_async_destroying);
 321         scn->scn_async_destroying = spa_feature_is_active(dp->dp_spa,
 322             SPA_FEATURE_ASYNC_DESTROY);
 323 
 324         bcopy(&scn->scn_phys, &scn->scn_phys_cached, sizeof (scn->scn_phys));
 325         mutex_init(&scn->scn_queue_lock, NULL, MUTEX_DEFAULT, NULL);
 326         avl_create(&scn->scn_queue, scan_ds_queue_compar, sizeof (scan_ds_t),
 327             offsetof(scan_ds_t, sds_node));
 328 
 329         err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
 330             "scrub_func", sizeof (uint64_t), 1, &f);
 331         if (err == 0) {
 332                 /*
 333                  * There was an old-style scrub in progress.  Restart a
 334                  * new-style scrub from the beginning.
 335                  */
 336                 scn->scn_restart_txg = txg;
 337                 DTRACE_PROBE2(scan_init__old2new, dsl_scan_t *, scn,
 338                     uint64_t, txg);
 339                 zfs_dbgmsg("old-style scrub was in progress; "
 340                     "restarting new-style scrub in txg %llu",
 341                     scn->scn_restart_txg);
 342 
 343                 /*
 344                  * Load the queue obj from the old location so that it
 345                  * can be freed by dsl_scan_done().
 346                  */
 347                 (void) zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
 348                     "scrub_queue", sizeof (uint64_t), 1,
 349                     &scn->scn_phys.scn_queue_obj);
 350         } else {
 351                 err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
 352                     DMU_POOL_SCAN, sizeof (uint64_t), SCAN_PHYS_NUMINTS,
 353                     &scn->scn_phys);
 354                 if (err == ENOENT)
 355                         return (0);
 356                 else if (err)
 357                         return (err);
 358 
 359                 /*
 360                  * We might be restarting after a reboot, so jump the issued
 361                  * counter to how far we've scanned. We know we're consistent
 362                  * up to here.
 363                  */
 364                 scn->scn_bytes_issued = scn->scn_phys.scn_examined;
 365 
 366                 if (dsl_scan_is_running(scn) &&
 367                     spa_prev_software_version(dp->dp_spa) < SPA_VERSION_SCAN) {
 368                         /*
 369                          * A new-type scrub was in progress on an old
 370                          * pool, and the pool was accessed by old
 371                          * software.  Restart from the beginning, since
 372                          * the old software may have changed the pool in
 373                          * the meantime.
 374                          */
 375                         scn->scn_restart_txg = txg;
 376                         DTRACE_PROBE2(scan_init__new2old2new,
 377                             dsl_scan_t *, scn, uint64_t, txg);
 378                         zfs_dbgmsg("new-style scrub was modified "
 379                             "by old software; restarting in txg %llu",
 380                             scn->scn_restart_txg);
 381                 }
 382         }
 383 
 384         /* reload the queue into the in-core state */
 385         if (scn->scn_phys.scn_queue_obj != 0) {
 386                 zap_cursor_t zc;
 387                 zap_attribute_t za;
 388 
 389                 for (zap_cursor_init(&zc, dp->dp_meta_objset,
 390                     scn->scn_phys.scn_queue_obj);
 391                     zap_cursor_retrieve(&zc, &za) == 0;
 392                     (void) zap_cursor_advance(&zc)) {
 393                         VERIFY0(scan_ds_queue_insert(scn,
 394                             zfs_strtonum(za.za_name, NULL),
 395                             za.za_first_integer));
 396                 }
 397                 zap_cursor_fini(&zc);
 398         }
 399 
 400         spa_scan_stat_init(spa);
 401         return (0);
 402 }
 403 
 404 void
 405 dsl_scan_fini(dsl_pool_t *dp)
 406 {
 407         if (dp->dp_scan != NULL) {
 408                 dsl_scan_t *scn = dp->dp_scan;
 409 
 410                 mutex_destroy(&scn->scn_sorted_lock);
 411                 mutex_destroy(&scn->scn_status_lock);
 412                 if (scn->scn_taskq != NULL)
 413                         taskq_destroy(scn->scn_taskq);
 414                 scan_ds_queue_empty(scn, B_TRUE);
 415                 mutex_destroy(&scn->scn_queue_lock);
 416 
 417                 kmem_free(dp->dp_scan, sizeof (dsl_scan_t));
 418                 dp->dp_scan = NULL;
 419         }
 420 }
 421 
 422 /* ARGSUSED */
 423 static int
 424 dsl_scan_setup_check(void *arg, dmu_tx_t *tx)
 425 {
 426         dsl_scan_t *scn = dmu_tx_pool(tx)->dp_scan;
 427 
 428         if (dsl_scan_is_running(scn))
 429                 return (SET_ERROR(EBUSY));
 430 
 431         return (0);
 432 }
 433 
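/*
 * Sync task that starts a new scan: initializes scn_phys for the
 * requested scan function, creates the on-disk scan queue object and
 * persists the new state.
 */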
 434 static void
 435 dsl_scan_setup_sync(void *arg, dmu_tx_t *tx)
 436 {
 437         dsl_scan_t *scn = dmu_tx_pool(tx)->dp_scan;
 438         pool_scan_func_t *funcp = arg;
 439         dmu_object_type_t ot = 0;
 440         dsl_pool_t *dp = scn->scn_dp;
 441         spa_t *spa = dp->dp_spa;
 442 
 443         ASSERT(!dsl_scan_is_running(scn));
 444         ASSERT(*funcp > POOL_SCAN_NONE && *funcp < POOL_SCAN_FUNCS);
 445         bzero(&scn->scn_phys, sizeof (scn->scn_phys));
 446         scn->scn_phys.scn_func = *funcp;
 447         scn->scn_phys.scn_state = DSS_SCANNING;
 448         scn->scn_phys.scn_min_txg = 0;
 449         scn->scn_phys.scn_max_txg = tx->tx_txg;
 450         /* the entire DDT */
 451         scn->scn_phys.scn_ddt_class_max = spa->spa_ddt_class_max;
 452         scn->scn_phys.scn_start_time = gethrestime_sec();
 453         scn->scn_phys.scn_errors = 0;
 454         scn->scn_phys.scn_to_examine = spa->spa_root_vdev->vdev_stat.vs_alloc;
 455         scn->scn_restart_txg = 0;
 456         scn->scn_done_txg = 0;
 457         scn->scn_bytes_issued = 0;
 458         scn->scn_checkpointing = B_FALSE;
 459         scn->scn_last_checkpoint = 0;
 460         spa_scan_stat_init(spa);
 461 
 462         if (DSL_SCAN_IS_SCRUB_RESILVER(scn)) {
 463                 scn->scn_phys.scn_ddt_class_max =
 464                     MIN(zfs_scrub_ddt_class_max, spa->spa_ddt_class_max);
 465 
 466                 /* rewrite all disk labels */
 467                 vdev_config_dirty(spa->spa_root_vdev);
 468 
 469                 if (vdev_resilver_needed(spa->spa_root_vdev,
 470                     &scn->scn_phys.scn_min_txg, &scn->scn_phys.scn_max_txg)) {
 471                         spa_event_notify(spa, NULL, NULL,
 472                             ESC_ZFS_RESILVER_START);
 473                 } else {
 474                         spa_event_notify(spa, NULL, NULL, ESC_ZFS_SCRUB_START);
 475                 }
 476 
 477                 spa->spa_scrub_started = B_TRUE;
 478                 /*
 479                  * If this is an incremental scrub, limit the DDT scrub phase
 480                  * to just the auto-ditto class (for correctness); the rest
 481                  * of the scrub should go faster using top-down pruning.
 482                  */
 483                 if (scn->scn_phys.scn_min_txg > TXG_INITIAL)
 484                         scn->scn_phys.scn_ddt_class_max =
 485                             MIN(DDT_CLASS_DITTO, spa->spa_ddt_class_max);
 486 
 487         }
 488 
 489         /* back to the generic stuff */
 490 
 491         if (dp->dp_blkstats == NULL) {
 492                 dp->dp_blkstats =
 493                     kmem_alloc(sizeof (zfs_all_blkstats_t), KM_SLEEP);
 494         }
 495         bzero(dp->dp_blkstats, sizeof (zfs_all_blkstats_t));
 496 
 497         if (spa_version(spa) < SPA_VERSION_DSL_SCRUB)
 498                 ot = DMU_OT_ZAP_OTHER;
 499 
 500         scn->scn_phys.scn_queue_obj = zap_create(dp->dp_meta_objset,
 501             ot ? ot : DMU_OT_SCAN_QUEUE, DMU_OT_NONE, 0, tx);
 502 
 503         bcopy(&scn->scn_phys, &scn->scn_phys_cached, sizeof (scn->scn_phys));
 504 
 505         dsl_scan_sync_state(scn, tx, SYNC_MANDATORY);
 506 
 507         spa_history_log_internal(spa, "scan setup", tx,
 508             "func=%u mintxg=%llu maxtxg=%llu",
 509             *funcp, scn->scn_phys.scn_min_txg, scn->scn_phys.scn_max_txg);
 510 }
 511 
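/*
 * Tear down a scan, either because it completed or because it was
 * cancelled or restarted. Removes remnants of old-style scrubs, frees
 * the on-disk queue object, updates DTLs for scrubs/resilvers and
 * records the outcome in the pool history.
 */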
 512 /* ARGSUSED */
 513 static void
 514 dsl_scan_done(dsl_scan_t *scn, boolean_t complete, dmu_tx_t *tx)
 515 {
 516         static const char *old_names[] = {
 517                 "scrub_bookmark",
 518                 "scrub_ddt_bookmark",
 519                 "scrub_ddt_class_max",
 520                 "scrub_queue",
 521                 "scrub_min_txg",
 522                 "scrub_max_txg",
 523                 "scrub_func",
 524                 "scrub_errors",
 525                 NULL
 526         };
 527 
 528         dsl_pool_t *dp = scn->scn_dp;
 529         spa_t *spa = dp->dp_spa;
 530         int i;
 531 
 532         /* Remove any remnants of an old-style scrub. */
 533         for (i = 0; old_names[i]; i++) {
 534                 (void) zap_remove(dp->dp_meta_objset,
 535                     DMU_POOL_DIRECTORY_OBJECT, old_names[i], tx);
 536         }
 537 
 538         if (scn->scn_phys.scn_queue_obj != 0) {
 539                 VERIFY0(dmu_object_free(dp->dp_meta_objset,
 540                     scn->scn_phys.scn_queue_obj, tx));
 541                 scn->scn_phys.scn_queue_obj = 0;
 542         }
 543         scan_ds_queue_empty(scn, B_FALSE);
 544 
 545         scn->scn_phys.scn_flags &= ~DSF_SCRUB_PAUSED;
 546 
 547         /*
 548          * If we were "restarted" from a stopped state, don't bother
 549          * with anything else.
 550          */
 551         if (!dsl_scan_is_running(scn)) {
 552                 ASSERT(!scn->scn_is_sorted);
 553                 return;
 554         }
 555 
 556         if (scn->scn_is_sorted) {
 557                 scan_io_queues_destroy(scn);
 558                 scn->scn_is_sorted = B_FALSE;
 559 
 560                 if (scn->scn_taskq != NULL) {
 561                         taskq_destroy(scn->scn_taskq);
 562                         scn->scn_taskq = NULL;
 563                 }
 564         }
 565 
 566         scn->scn_phys.scn_state = complete ? DSS_FINISHED : DSS_CANCELED;
 567 
 568         if (dsl_scan_restarting(scn, tx))
 569                 spa_history_log_internal(spa, "scan aborted, restarting", tx,
 570                     "errors=%llu", spa_get_errlog_size(spa));
 571         else if (!complete)
 572                 spa_history_log_internal(spa, "scan cancelled", tx,
 573                     "errors=%llu", spa_get_errlog_size(spa));
 574         else
 575                 spa_history_log_internal(spa, "scan done", tx,
 576                     "errors=%llu", spa_get_errlog_size(spa));
 577 
 578         if (DSL_SCAN_IS_SCRUB_RESILVER(scn)) {
 579                 mutex_enter(&spa->spa_scrub_lock);
 580                 while (spa->spa_scrub_inflight > 0) {
 581                         cv_wait(&spa->spa_scrub_io_cv,
 582                             &spa->spa_scrub_lock);
 583                 }
 584                 mutex_exit(&spa->spa_scrub_lock);
 585                 spa->spa_scrub_started = B_FALSE;
 586                 spa->spa_scrub_active = B_FALSE;
 587 
 588                 /*
 589                  * If the scrub/resilver completed, update all DTLs to
 590                  * reflect this.  Whether it succeeded or not, vacate
 591                  * all temporary scrub DTLs.
 592                  */
 593                 vdev_dtl_reassess(spa->spa_root_vdev, tx->tx_txg,
 594                     complete ? scn->scn_phys.scn_max_txg : 0, B_TRUE);
 595                 if (complete) {
 596                         spa_event_notify(spa, NULL, NULL,
 597                             scn->scn_phys.scn_min_txg ?
 598                             ESC_ZFS_RESILVER_FINISH : ESC_ZFS_SCRUB_FINISH);
 599                 }
 600                 spa_errlog_rotate(spa);
 601 
 602                 /*
 603                  * We may have finished replacing a device.
 604                  * Let the async thread assess this and handle the detach.
 605                  */
 606                 spa_async_request(spa, SPA_ASYNC_RESILVER_DONE);
 607         }
 608 
 609         scn->scn_phys.scn_end_time = gethrestime_sec();
 610 
 611         ASSERT(!dsl_scan_is_running(scn));
 612 
 613         /*
	 * If the special vdev has no errors after the scrub/resilver, we
	 * need to clear the flag that prevents writes to the special vdev.
 617          */
 618         spa_special_check_errors(spa);
 619 }
 620 
 621 /* ARGSUSED */
 622 static int
 623 dsl_scan_cancel_check(void *arg, dmu_tx_t *tx)
 624 {
 625         dsl_scan_t *scn = dmu_tx_pool(tx)->dp_scan;
 626 
 627         if (!dsl_scan_is_running(scn))
 628                 return (SET_ERROR(ENOENT));
 629         return (0);
 630 }
 631 
 632 /* ARGSUSED */
 633 static void
 634 dsl_scan_cancel_sync(void *arg, dmu_tx_t *tx)
 635 {
 636         dsl_scan_t *scn = dmu_tx_pool(tx)->dp_scan;
 637 
 638         dsl_scan_done(scn, B_FALSE, tx);
 639         dsl_scan_sync_state(scn, tx, SYNC_MANDATORY);
 640 }
 641 
 642 int
 643 dsl_scan_cancel(dsl_pool_t *dp)
 644 {
 645         return (dsl_sync_task(spa_name(dp->dp_spa), dsl_scan_cancel_check,
 646             dsl_scan_cancel_sync, NULL, 3, ZFS_SPACE_CHECK_RESERVED));
 647 }
 648 
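/*
 * Returns B_TRUE if a scrub is in progress and currently paused.
 */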
 649 boolean_t
 650 dsl_scan_is_paused_scrub(const dsl_scan_t *scn)
 651 {
 652         if (dsl_scan_scrubbing(scn->scn_dp) &&
 653             scn->scn_phys.scn_flags & DSF_SCRUB_PAUSED)
 654                 return (B_TRUE);
 655 
 656         return (B_FALSE);
 657 }
 658 
 659 static int
 660 dsl_scrub_pause_resume_check(void *arg, dmu_tx_t *tx)
 661 {
 662         pool_scrub_cmd_t *cmd = arg;
 663         dsl_pool_t *dp = dmu_tx_pool(tx);
 664         dsl_scan_t *scn = dp->dp_scan;
 665 
 666         if (*cmd == POOL_SCRUB_PAUSE) {
 667                 /* can't pause a scrub when there is no in-progress scrub */
 668                 if (!dsl_scan_scrubbing(dp))
 669                         return (SET_ERROR(ENOENT));
 670 
 671                 /* can't pause a paused scrub */
 672                 if (dsl_scan_is_paused_scrub(scn))
 673                         return (SET_ERROR(EBUSY));
 674         } else if (*cmd != POOL_SCRUB_NORMAL) {
 675                 return (SET_ERROR(ENOTSUP));
 676         }
 677 
 678         return (0);
 679 }
 680 
 681 static void
 682 dsl_scrub_pause_resume_sync(void *arg, dmu_tx_t *tx)
 683 {
 684         pool_scrub_cmd_t *cmd = arg;
 685         dsl_pool_t *dp = dmu_tx_pool(tx);
 686         spa_t *spa = dp->dp_spa;
 687         dsl_scan_t *scn = dp->dp_scan;
 688 
 689         if (*cmd == POOL_SCRUB_PAUSE) {
		/* the pause was validated by dsl_scrub_pause_resume_check() */
 691                 spa->spa_scan_pass_scrub_pause = gethrestime_sec();
 692                 scn->scn_phys.scn_flags |= DSF_SCRUB_PAUSED;
 693                 scn->scn_phys_cached.scn_flags |= DSF_SCRUB_PAUSED;
 694                 dsl_scan_sync_state(scn, tx, SYNC_CACHED);
 695         } else {
 696                 ASSERT3U(*cmd, ==, POOL_SCRUB_NORMAL);
 697                 if (dsl_scan_is_paused_scrub(scn)) {
 698                         /*
 699                          * We need to keep track of how much time we spend
 700                          * paused per pass so that we can adjust the scrub rate
 701                          * shown in the output of 'zpool status'
 702                          */
 703                         spa->spa_scan_pass_scrub_spent_paused +=
 704                             gethrestime_sec() - spa->spa_scan_pass_scrub_pause;
 705                         spa->spa_scan_pass_scrub_pause = 0;
 706                         scn->scn_phys.scn_flags &= ~DSF_SCRUB_PAUSED;
 707                         scn->scn_phys_cached.scn_flags &= ~DSF_SCRUB_PAUSED;
 708                         dsl_scan_sync_state(scn, tx, SYNC_CACHED);
 709                 }
 710         }
 711 }
 712 
 713 /*
 714  * Set scrub pause/resume state if it makes sense to do so
 715  */
 716 int
 717 dsl_scrub_set_pause_resume(const dsl_pool_t *dp, pool_scrub_cmd_t cmd)
 718 {
 719         return (dsl_sync_task(spa_name(dp->dp_spa),
 720             dsl_scrub_pause_resume_check, dsl_scrub_pause_resume_sync, &cmd, 3,
 721             ZFS_SPACE_CHECK_RESERVED));
 722 }
 723 
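/*
 * Returns B_TRUE if a scrub (as opposed to a resilver or other scan
 * function) is in progress on this pool.
 */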
 724 boolean_t
 725 dsl_scan_scrubbing(const dsl_pool_t *dp)
 726 {
 727         dsl_scan_t *scn = dp->dp_scan;
 728 
 729         if ((scn->scn_phys.scn_state == DSS_SCANNING ||
 730             scn->scn_phys.scn_state == DSS_FINISHING) &&
 731             scn->scn_phys.scn_func == POOL_SCAN_SCRUB)
 732                 return (B_TRUE);
 733 
 734         return (B_FALSE);
 735 }
 736 
 737 static void dsl_scan_visitbp(blkptr_t *bp, const zbookmark_phys_t *zb,
 738     dnode_phys_t *dnp, dsl_dataset_t *ds, dsl_scan_t *scn,
 739     dmu_objset_type_t ostype, dmu_tx_t *tx);
 740 static void dsl_scan_visitdnode(dsl_scan_t *, dsl_dataset_t *ds,
 741     dmu_objset_type_t ostype,
 742     dnode_phys_t *dnp, uint64_t object, dmu_tx_t *tx);
 743 
 744 void
 745 dsl_free(dsl_pool_t *dp, uint64_t txg, const blkptr_t *bp)
 746 {
 747         zio_free(dp->dp_spa, txg, bp);
 748 }
 749 
 750 void
 751 dsl_free_sync(zio_t *pio, dsl_pool_t *dp, uint64_t txg, const blkptr_t *bpp)
 752 {
 753         ASSERT(dsl_pool_sync_context(dp));
 754         zio_nowait(zio_free_sync(pio, dp->dp_spa, txg, bpp, pio->io_flags));
 755 }
 756 
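/*
 * The effective max txg when scanning a dataset: for snapshots we never
 * need to look past their creation txg.
 */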
 757 static uint64_t
 758 dsl_scan_ds_maxtxg(dsl_dataset_t *ds)
 759 {
 760         uint64_t smt = ds->ds_dir->dd_pool->dp_scan->scn_phys.scn_max_txg;
 761         if (ds->ds_is_snapshot)
 762                 return (MIN(smt, dsl_dataset_phys(ds)->ds_creation_txg));
 763         return (smt);
 764 }
 765 
 766 /*
 767  * This is the dataset processing "queue", i.e. the datasets that are to be
 768  * scanned for data locations and inserted into the LBA reordering tree.
 * Note that even though we call this a "queue", the actual implementation
 * is an AVL tree (so that double insertion can be detected). The tree is
 * sorted by dataset object number, so scan_ds_queue_insert CANNOT be
 * relied upon to append entries at the end (the scanner inserts datasets
 * in discovery order, i.e. following parent-child relationships).
 * Consequently, the scanner must never step through the AVL tree in a
 * naively sequential fashion using AVL_NEXT.
 776  * We must always use scan_ds_queue_first to pick the first dataset in the
 777  * list, process it, remove it using scan_ds_queue_remove and pick the next
 778  * first dataset, again using scan_ds_queue_first.
 779  */
 780 static int
 781 scan_ds_queue_compar(const void *a, const void *b)
 782 {
 783         const scan_ds_t *sds_a = a, *sds_b = b;
 784 
 785         if (sds_a->sds_dsobj < sds_b->sds_dsobj)
 786                 return (-1);
 787         if (sds_a->sds_dsobj == sds_b->sds_dsobj)
 788                 return (0);
 789         return (1);
 790 }
 791 
 792 static void
 793 scan_ds_queue_empty(dsl_scan_t *scn, boolean_t destroy)
 794 {
 795         void *cookie = NULL;
 796         scan_ds_t *sds;
 797 
 798         mutex_enter(&scn->scn_queue_lock);
 799         while ((sds = avl_destroy_nodes(&scn->scn_queue, &cookie)) != NULL)
 800                 kmem_free(sds, sizeof (*sds));
 801         mutex_exit(&scn->scn_queue_lock);
 802 
 803         if (destroy)
 804                 avl_destroy(&scn->scn_queue);
 805 }
 806 
 807 static boolean_t
 808 scan_ds_queue_contains(dsl_scan_t *scn, uint64_t dsobj, uint64_t *txg)
 809 {
 810         scan_ds_t *sds;
 811         scan_ds_t srch = { .sds_dsobj = dsobj };
 812 
 813         mutex_enter(&scn->scn_queue_lock);
 814         sds = avl_find(&scn->scn_queue, &srch, NULL);
 815         if (sds != NULL && txg != NULL)
 816                 *txg = sds->sds_txg;
 817         mutex_exit(&scn->scn_queue_lock);
 818 
 819         return (sds != NULL);
 820 }
 821 
 822 static int
 823 scan_ds_queue_insert(dsl_scan_t *scn, uint64_t dsobj, uint64_t txg)
 824 {
 825         scan_ds_t *sds;
 826         avl_index_t where;
 827 
 828         sds = kmem_zalloc(sizeof (*sds), KM_SLEEP);
 829         sds->sds_dsobj = dsobj;
 830         sds->sds_txg = txg;
 831 
 832         mutex_enter(&scn->scn_queue_lock);
 833         if (avl_find(&scn->scn_queue, sds, &where) != NULL) {
 834                 kmem_free(sds, sizeof (*sds));
 835                 return (EEXIST);
 836         }
 837         avl_insert(&scn->scn_queue, sds, where);
 838         mutex_exit(&scn->scn_queue_lock);
 839 
 840         return (0);
 841 }
 842 
 843 static void
 844 scan_ds_queue_remove(dsl_scan_t *scn, uint64_t dsobj)
 845 {
 846         scan_ds_t srch, *sds;
 847 
 848         srch.sds_dsobj = dsobj;
 849 
 850         mutex_enter(&scn->scn_queue_lock);
 851         sds = avl_find(&scn->scn_queue, &srch, NULL);
 852         VERIFY(sds != NULL);
 853         avl_remove(&scn->scn_queue, sds);
 854         mutex_exit(&scn->scn_queue_lock);
 855 
 856         kmem_free(sds, sizeof (*sds));
 857 }
 858 
 859 static boolean_t
 860 scan_ds_queue_first(dsl_scan_t *scn, uint64_t *dsobj, uint64_t *txg)
 861 {
 862         scan_ds_t *sds;
 863 
 864         mutex_enter(&scn->scn_queue_lock);
 865         sds = avl_first(&scn->scn_queue);
 866         if (sds != NULL) {
 867                 *dsobj = sds->sds_dsobj;
 868                 *txg = sds->sds_txg;
 869         }
 870         mutex_exit(&scn->scn_queue_lock);
 871 
 872         return (sds != NULL);
 873 }
 874 
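/*
 * Rewrites the on-disk dataset queue ZAP from the in-core AVL tree: the
 * old queue object is freed, a fresh one is created, and each queued
 * dataset is recorded as a dsobj -> txg entry.
 */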
 875 static void
 876 scan_ds_queue_sync(dsl_scan_t *scn, dmu_tx_t *tx)
 877 {
 878         dsl_pool_t *dp = scn->scn_dp;
 879         spa_t *spa = dp->dp_spa;
 880         dmu_object_type_t ot = (spa_version(spa) >= SPA_VERSION_DSL_SCRUB) ?
 881             DMU_OT_SCAN_QUEUE : DMU_OT_ZAP_OTHER;
 882 
 883         ASSERT0(scn->scn_bytes_pending);
 884         ASSERT(scn->scn_phys.scn_queue_obj != 0);
 885 
 886         VERIFY0(dmu_object_free(dp->dp_meta_objset,
 887             scn->scn_phys.scn_queue_obj, tx));
 888         scn->scn_phys.scn_queue_obj = zap_create(dp->dp_meta_objset, ot,
 889             DMU_OT_NONE, 0, tx);
 890 
 891         mutex_enter(&scn->scn_queue_lock);
 892         for (scan_ds_t *sds = avl_first(&scn->scn_queue);
 893             sds != NULL; sds = AVL_NEXT(&scn->scn_queue, sds)) {
 894                 VERIFY0(zap_add_int_key(dp->dp_meta_objset,
 895                     scn->scn_phys.scn_queue_obj, sds->sds_dsobj,
 896                     sds->sds_txg, tx));
 897         }
 898         mutex_exit(&scn->scn_queue_lock);
 899 }
 900 
 901 /*
 902  * Writes out a persistent dsl_scan_phys_t record to the pool directory.
 903  * Because we can be running in the block sorting algorithm, we do not always
 904  * want to write out the record, only when it is "safe" to do so. This safety
 905  * condition is achieved by making sure that the sorting queues are empty
 906  * (scn_bytes_pending==0). The sync'ed state could be inconsistent with how
 * much actual scanning progress has been made. The kind of sync performed
 * is specified by the sync_type argument. If the sync is optional, we only
 909  * sync if the queues are empty. If the sync is mandatory, we do a hard VERIFY
 910  * to make sure that the queues are empty. The third possible state is a
 911  * "cached" sync. This is done in response to:
 912  * 1) The dataset that was in the last sync'ed dsl_scan_phys_t having been
 913  *      destroyed, so we wouldn't be able to restart scanning from it.
 914  * 2) The snapshot that was in the last sync'ed dsl_scan_phys_t having been
 915  *      superseded by a newer snapshot.
 916  * 3) The dataset that was in the last sync'ed dsl_scan_phys_t having been
 917  *      swapped with its clone.
 918  * In all cases, a cached sync simply rewrites the last record we've written,
 919  * just slightly modified. For the modifications that are performed to the
 920  * last written dsl_scan_phys_t, see dsl_scan_ds_destroyed,
 921  * dsl_scan_ds_snapshotted and dsl_scan_ds_clone_swapped.
 922  */
 923 static void
 924 dsl_scan_sync_state(dsl_scan_t *scn, dmu_tx_t *tx, state_sync_type_t sync_type)
 925 {
 926         mutex_enter(&scn->scn_status_lock);
 927         ASSERT(sync_type != SYNC_MANDATORY || scn->scn_bytes_pending == 0);
 928         if (scn->scn_bytes_pending == 0) {
 929                 if (scn->scn_phys.scn_queue_obj != 0)
 930                         scan_ds_queue_sync(scn, tx);
 931                 VERIFY0(zap_update(scn->scn_dp->dp_meta_objset,
 932                     DMU_POOL_DIRECTORY_OBJECT,
 933                     DMU_POOL_SCAN, sizeof (uint64_t), SCAN_PHYS_NUMINTS,
 934                     &scn->scn_phys, tx));
 935                 bcopy(&scn->scn_phys, &scn->scn_phys_cached,
 936                     sizeof (scn->scn_phys));
 937                 scn->scn_checkpointing = B_FALSE;
 938                 scn->scn_last_checkpoint = ddi_get_lbolt();
 939         } else if (sync_type == SYNC_CACHED) {
 940                 VERIFY0(zap_update(scn->scn_dp->dp_meta_objset,
 941                     DMU_POOL_DIRECTORY_OBJECT,
 942                     DMU_POOL_SCAN, sizeof (uint64_t), SCAN_PHYS_NUMINTS,
 943                     &scn->scn_phys_cached, tx));
 944         }
 945         mutex_exit(&scn->scn_status_lock);
 946 }
 947 
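/*
 * Decide whether the scan should suspend for the rest of this txg and,
 * if so, record the bookmark to resume from (when one is supplied).
 * The suspension conditions are spelled out in the comment inside the
 * function.
 */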
 948 static boolean_t
 949 dsl_scan_check_suspend(dsl_scan_t *scn, const zbookmark_phys_t *zb)
 950 {
 951         /* we never skip user/group accounting objects */
 952         if (zb && (int64_t)zb->zb_object < 0)
 953                 return (B_FALSE);
 954 
 955         if (scn->scn_suspending)
 956                 return (B_TRUE); /* we're already suspending */
 957 
 958         if (!ZB_IS_ZERO(&scn->scn_phys.scn_bookmark))
 959                 return (B_FALSE); /* we're resuming */
 960 
 961         /* We only know how to resume from level-0 blocks. */
 962         if (zb && zb->zb_level != 0)
 963                 return (B_FALSE);
 964 
 965         /*
 966          * We suspend if:
 967          *  - we have scanned for the maximum time: an entire txg
 968          *    timeout (default 5 sec)
 969          *  or
 970          *  - we have scanned for at least the minimum time (default 1 sec
 971          *    for scrub, 3 sec for resilver), and either we have sufficient
 972          *    dirty data that we are starting to write more quickly
 973          *    (default 30%), or someone is explicitly waiting for this txg
 974          *    to complete.
 975          *  or
 976          *  - the spa is shutting down because this pool is being exported
 977          *    or the machine is rebooting.
 978          *  or
 979          *  - the scan queue has reached its memory use limit
 980          */
 981         int mintime = (scn->scn_phys.scn_func == POOL_SCAN_RESILVER) ?
 982             zfs_resilver_min_time_ms : zfs_scan_min_time_ms;
 983         uint64_t elapsed_nanosecs = gethrtime() - scn->scn_sync_start_time;
 984         int dirty_pct = scn->scn_dp->dp_dirty_total * 100 / zfs_dirty_data_max;
 985         if (elapsed_nanosecs / NANOSEC >= zfs_txg_timeout ||
 986             (NSEC2MSEC(elapsed_nanosecs) > mintime &&
 987             (txg_sync_waiting(scn->scn_dp) ||
 988             dirty_pct >= zfs_vdev_async_write_active_min_dirty_percent)) ||
 989             spa_shutting_down(scn->scn_dp->dp_spa) || scn->scn_clearing ||
 990             scan_io_queue_mem_lim(scn) == MEM_LIM_HARD) {
 991                 if (zb) {
 992                         DTRACE_PROBE1(scan_pause, zbookmark_phys_t *, zb);
 993                         dprintf("suspending at bookmark %llx/%llx/%llx/%llx\n",
 994                             (longlong_t)zb->zb_objset,
 995                             (longlong_t)zb->zb_object,
 996                             (longlong_t)zb->zb_level,
 997                             (longlong_t)zb->zb_blkid);
 998                         scn->scn_phys.scn_bookmark = *zb;
 999                 } else {
1000                         DTRACE_PROBE1(scan_pause_ddt, ddt_bookmark_t *,
1001                             &scn->scn_phys.scn_ddt_bookmark);
1002                         dprintf("pausing at DDT bookmark %llx/%llx/%llx/%llx\n",
1003                             (longlong_t)scn->scn_phys.scn_ddt_bookmark.
1004                             ddb_class,
1005                             (longlong_t)scn->scn_phys.scn_ddt_bookmark.
1006                             ddb_type,
1007                             (longlong_t)scn->scn_phys.scn_ddt_bookmark.
1008                             ddb_checksum,
1009                             (longlong_t)scn->scn_phys.scn_ddt_bookmark.
1010                             ddb_cursor);
1011                 }
1012                 dprintf("suspending at DDT bookmark %llx/%llx/%llx/%llx\n",
1013                     (longlong_t)scn->scn_phys.scn_ddt_bookmark.ddb_class,
1014                     (longlong_t)scn->scn_phys.scn_ddt_bookmark.ddb_type,
1015                     (longlong_t)scn->scn_phys.scn_ddt_bookmark.ddb_checksum,
1016                     (longlong_t)scn->scn_phys.scn_ddt_bookmark.ddb_cursor);
1017                 scn->scn_suspending = B_TRUE;
1018                 return (B_TRUE);
1019         }
1020         return (B_FALSE);
1021 }
1022 
1023 typedef struct zil_scan_arg {
1024         dsl_pool_t      *zsa_dp;
1025         zil_header_t    *zsa_zh;
1026 } zil_scan_arg_t;
1027 
1028 /* ARGSUSED */
1029 static int
1030 dsl_scan_zil_block(zilog_t *zilog, blkptr_t *bp, void *arg, uint64_t claim_txg)
1031 {
1032         zil_scan_arg_t *zsa = arg;
1033         dsl_pool_t *dp = zsa->zsa_dp;
1034         dsl_scan_t *scn = dp->dp_scan;
1035         zil_header_t *zh = zsa->zsa_zh;
1036         zbookmark_phys_t zb;
1037 
1038         if (BP_IS_HOLE(bp) || bp->blk_birth <= scn->scn_phys.scn_cur_min_txg)
1039                 return (0);
1040 
1041         /*
	 * One block ("stubby") may have been allocated a long time ago; we
1043          * want to visit that one because it has been allocated
1044          * (on-disk) even if it hasn't been claimed (even though for
1045          * scrub there's nothing to do to it).
1046          */
1047         if (claim_txg == 0 && bp->blk_birth >= spa_first_txg(dp->dp_spa))
1048                 return (0);
1049 
1050         SET_BOOKMARK(&zb, zh->zh_log.blk_cksum.zc_word[ZIL_ZC_OBJSET],
1051             ZB_ZIL_OBJECT, ZB_ZIL_LEVEL, bp->blk_cksum.zc_word[ZIL_ZC_SEQ]);
1052 
1053         VERIFY(0 == scan_funcs[scn->scn_phys.scn_func](dp, bp, &zb));
1054         return (0);
1055 }
1056 
1057 /* ARGSUSED */
1058 static int
1059 dsl_scan_zil_record(zilog_t *zilog, lr_t *lrc, void *arg, uint64_t claim_txg)
1060 {
1061         if (lrc->lrc_txtype == TX_WRITE) {
1062                 zil_scan_arg_t *zsa = arg;
1063                 dsl_pool_t *dp = zsa->zsa_dp;
1064                 dsl_scan_t *scn = dp->dp_scan;
1065                 zil_header_t *zh = zsa->zsa_zh;
1066                 lr_write_t *lr = (lr_write_t *)lrc;
1067                 blkptr_t *bp = &lr->lr_blkptr;
1068                 zbookmark_phys_t zb;
1069 
1070                 if (BP_IS_HOLE(bp) ||
1071                     bp->blk_birth <= scn->scn_phys.scn_cur_min_txg)
1072                         return (0);
1073 
1074                 /*
1075                  * birth can be < claim_txg if this record's txg is
1076                  * already txg sync'ed (but this log block contains
1077                  * other records that are not synced)
1078                  */
1079                 if (claim_txg == 0 || bp->blk_birth < claim_txg)
1080                         return (0);
1081 
1082                 SET_BOOKMARK(&zb, zh->zh_log.blk_cksum.zc_word[ZIL_ZC_OBJSET],
1083                     lr->lr_foid, ZB_ZIL_LEVEL,
1084                     lr->lr_offset / BP_GET_LSIZE(bp));
1085 
1086                 VERIFY(0 == scan_funcs[scn->scn_phys.scn_func](dp, bp, &zb));
1087         }
1088         return (0);
1089 }
1090 
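/*
 * Scan a dataset's intent log (ZIL) by walking its blocks and records
 * with zil_parse().
 */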
1091 static void
1092 dsl_scan_zil(dsl_pool_t *dp, zil_header_t *zh)
1093 {
1094         uint64_t claim_txg = zh->zh_claim_txg;
1095         zil_scan_arg_t zsa = { dp, zh };
1096         zilog_t *zilog;
1097 
1098         /*
1099          * We only want to visit blocks that have been claimed but not yet
1100          * replayed (or, in read-only mode, blocks that *would* be claimed).
1101          */
1102         if (claim_txg == 0 && spa_writeable(dp->dp_spa))
1103                 return;
1104 
1105         zilog = zil_alloc(dp->dp_meta_objset, zh);
1106 
1107         (void) zil_parse(zilog, dsl_scan_zil_block, dsl_scan_zil_record, &zsa,
1108             claim_txg);
1109 
1110         zil_free(zilog);
1111 }
1112 
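/*
 * Issue an ARC prefetch for a block we are about to visit, unless scrub
 * prefetch is disabled or the block is of no interest (a hole, born
 * before the scan's min txg, or a level-0 block that does not contain
 * dnodes).
 */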
1113 /* ARGSUSED */
1114 static void
1115 dsl_scan_prefetch(dsl_scan_t *scn, arc_buf_t *buf, blkptr_t *bp,
1116     uint64_t objset, uint64_t object, uint64_t blkid)
1117 {
1118         zbookmark_phys_t czb;
1119         arc_flags_t flags = ARC_FLAG_NOWAIT | ARC_FLAG_PREFETCH;
1120 
1121         if (zfs_no_scrub_prefetch)
1122                 return;
1123 
1124         if (BP_IS_HOLE(bp) || bp->blk_birth <= scn->scn_phys.scn_min_txg ||
1125             (BP_GET_LEVEL(bp) == 0 && BP_GET_TYPE(bp) != DMU_OT_DNODE))
1126                 return;
1127 
1128         SET_BOOKMARK(&czb, objset, object, BP_GET_LEVEL(bp), blkid);
1129 
1130         (void) arc_read(scn->scn_zio_root, scn->scn_dp->dp_spa, bp,
1131             NULL, NULL, ZIO_PRIORITY_ASYNC_READ,
1132             ZIO_FLAG_CANFAIL | ZIO_FLAG_SCAN_THREAD, &flags, &czb);
1133 }
1134 
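/*
 * When resuming a suspended scan, skip subtrees we already visited in a
 * prior txg until we reach the saved bookmark; once we find it (or go
 * past it) the bookmark is cleared so suspension checks can resume.
 */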
1135 static boolean_t
1136 dsl_scan_check_resume(dsl_scan_t *scn, const dnode_phys_t *dnp,
1137     const zbookmark_phys_t *zb)
1138 {
1139         /*
1140          * We never skip over user/group accounting objects (obj<0)
1141          */
1142         if (!ZB_IS_ZERO(&scn->scn_phys.scn_bookmark) &&
1143             (int64_t)zb->zb_object >= 0) {
1144                 /*
1145                  * If we already visited this bp & everything below (in
1146                  * a prior txg sync), don't bother doing it again.
1147                  */
1148                 if (zbookmark_subtree_completed(dnp, zb,
1149                     &scn->scn_phys.scn_bookmark))
1150                         return (B_TRUE);
1151 
1152                 /*
1153                  * If we found the block we're trying to resume from, or
1154                  * we went past it to a different object, zero it out to
1155                  * indicate that it's OK to start checking for suspending
1156                  * again.
1157                  */
1158                 if (bcmp(zb, &scn->scn_phys.scn_bookmark, sizeof (*zb)) == 0 ||
1159                     zb->zb_object > scn->scn_phys.scn_bookmark.zb_object) {
1160                         DTRACE_PROBE1(scan_resume, zbookmark_phys_t *, zb);
1161                         dprintf("resuming at %llx/%llx/%llx/%llx\n",
1162                             (longlong_t)zb->zb_objset,
1163                             (longlong_t)zb->zb_object,
1164                             (longlong_t)zb->zb_level,
1165                             (longlong_t)zb->zb_blkid);
1166                         bzero(&scn->scn_phys.scn_bookmark, sizeof (*zb));
1167                 }
1168         }
1169         return (B_FALSE);
1170 }
1171 
/*
 * Visit the children of an indirect block, dnode block array or objset
 * block.  Return nonzero on i/o error.
 */
1176 static int
1177 dsl_scan_recurse(dsl_scan_t *scn, dsl_dataset_t *ds, dmu_objset_type_t ostype,
1178     dnode_phys_t *dnp, const blkptr_t *bp,
1179     const zbookmark_phys_t *zb, dmu_tx_t *tx)
1180 {
1181         dsl_pool_t *dp = scn->scn_dp;
1182         int zio_flags = ZIO_FLAG_CANFAIL | ZIO_FLAG_SCAN_THREAD;
1183         int err;
1184 
1185         if (BP_GET_LEVEL(bp) > 0) {
1186                 arc_flags_t flags = ARC_FLAG_WAIT;
1187                 int i;
1188                 blkptr_t *cbp;
1189                 int epb = BP_GET_LSIZE(bp) >> SPA_BLKPTRSHIFT;
1190                 arc_buf_t *buf;
1191 
1192                 err = arc_read(NULL, dp->dp_spa, bp, arc_getbuf_func, &buf,
1193                     ZIO_PRIORITY_ASYNC_READ, zio_flags, &flags, zb);
1194                 if (err) {
1195                         atomic_inc_64(&scn->scn_phys.scn_errors);
1196                         return (err);
1197                 }
1198                 for (i = 0, cbp = buf->b_data; i < epb; i++, cbp++) {
1199                         dsl_scan_prefetch(scn, buf, cbp, zb->zb_objset,
1200                             zb->zb_object, zb->zb_blkid * epb + i);
1201                 }
1202                 for (i = 0, cbp = buf->b_data; i < epb; i++, cbp++) {
1203                         zbookmark_phys_t czb;
1204 
1205                         SET_BOOKMARK(&czb, zb->zb_objset, zb->zb_object,
1206                             zb->zb_level - 1,
1207                             zb->zb_blkid * epb + i);
1208                         dsl_scan_visitbp(cbp, &czb, dnp,
1209                             ds, scn, ostype, tx);
1210                 }
1211                 arc_buf_destroy(buf, &buf);
1212         } else if (BP_GET_TYPE(bp) == DMU_OT_DNODE) {
1213                 arc_flags_t flags = ARC_FLAG_WAIT;
1214                 dnode_phys_t *cdnp;
1215                 int i, j;
1216                 int epb = BP_GET_LSIZE(bp) >> DNODE_SHIFT;
1217                 arc_buf_t *buf;
1218 
1219                 err = arc_read(NULL, dp->dp_spa, bp, arc_getbuf_func, &buf,
1220                     ZIO_PRIORITY_ASYNC_READ, zio_flags, &flags, zb);
1221                 if (err) {
1222                         atomic_inc_64(&scn->scn_phys.scn_errors);
1223                         return (err);
1224                 }
1225                 for (i = 0, cdnp = buf->b_data; i < epb; i++, cdnp++) {
1226                         for (j = 0; j < cdnp->dn_nblkptr; j++) {
1227                                 blkptr_t *cbp = &cdnp->dn_blkptr[j];
1228                                 dsl_scan_prefetch(scn, buf, cbp,
1229                                     zb->zb_objset, zb->zb_blkid * epb + i, j);
1230                         }
1231                 }
1232                 for (i = 0, cdnp = buf->b_data; i < epb; i++, cdnp++) {
1233                         dsl_scan_visitdnode(scn, ds, ostype,
1234                             cdnp, zb->zb_blkid * epb + i, tx);
1235                 }
1236 
1237                 arc_buf_destroy(buf, &buf);
1238         } else if (BP_GET_TYPE(bp) == DMU_OT_OBJSET) {
1239                 arc_flags_t flags = ARC_FLAG_WAIT;
1240                 objset_phys_t *osp;
1241                 arc_buf_t *buf;
1242 
1243                 err = arc_read(NULL, dp->dp_spa, bp, arc_getbuf_func, &buf,
1244                     ZIO_PRIORITY_ASYNC_READ, zio_flags, &flags, zb);
1245                 if (err) {
1246                         atomic_inc_64(&scn->scn_phys.scn_errors);
1247                         return (err);
1248                 }
1249 
1250                 osp = buf->b_data;
1251 
1252                 dsl_scan_visitdnode(scn, ds, osp->os_type,
1253                     &osp->os_meta_dnode, DMU_META_DNODE_OBJECT, tx);
1254 
1255                 if (OBJSET_BUF_HAS_USERUSED(buf)) {
1256                         /*
1257                          * We also always visit user/group accounting
1258                          * objects, and never skip them, even if we are
1259                          * suspending.  This is necessary so that the space
1260                          * deltas from this txg get integrated.
1261                          */
1262                         dsl_scan_visitdnode(scn, ds, osp->os_type,
1263                             &osp->os_groupused_dnode,
1264                             DMU_GROUPUSED_OBJECT, tx);
1265                         dsl_scan_visitdnode(scn, ds, osp->os_type,
1266                             &osp->os_userused_dnode,
1267                             DMU_USERUSED_OBJECT, tx);
1268                 }
1269                 arc_buf_destroy(buf, &buf);
1270         }
1271 
1272         return (0);
1273 }
1274 
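/*
 * Visit every block pointer of a dnode, including its spill block
 * pointer if one is present.
 */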
1275 static void
1276 dsl_scan_visitdnode(dsl_scan_t *scn, dsl_dataset_t *ds,
1277     dmu_objset_type_t ostype, dnode_phys_t *dnp,
1278     uint64_t object, dmu_tx_t *tx)
1279 {
1280         int j;
1281 
1282         for (j = 0; j < dnp->dn_nblkptr; j++) {
1283                 zbookmark_phys_t czb;
1284 
1285                 SET_BOOKMARK(&czb, ds ? ds->ds_object : 0, object,
1286                     dnp->dn_nlevels - 1, j);
1287                 dsl_scan_visitbp(&dnp->dn_blkptr[j],
1288                     &czb, dnp, ds, scn, ostype, tx);
1289         }
1290 
1291         if (dnp->dn_flags & DNODE_FLAG_SPILL_BLKPTR) {
1292                 zbookmark_phys_t czb;
1293                 SET_BOOKMARK(&czb, ds ? ds->ds_object : 0, object,
1294                     0, DMU_SPILL_BLKID);
1295                 dsl_scan_visitbp(&dnp->dn_spill,
1296                     &czb, dnp, ds, scn, ostype, tx);
1297         }
1298 }
1299 
1300 /*
1301  * The arguments are in this order because mdb can only print the
1302  * first 5; we want them to be useful.
1303  */
1304 static void
1305 dsl_scan_visitbp(blkptr_t *bp, const zbookmark_phys_t *zb,
1306     dnode_phys_t *dnp, dsl_dataset_t *ds, dsl_scan_t *scn,
1307     dmu_objset_type_t ostype, dmu_tx_t *tx)
1308 {
1309         dsl_pool_t *dp = scn->scn_dp;
1310         arc_buf_t *buf = NULL;
1311         blkptr_t bp_toread = *bp;
1312 
1313         /* ASSERT(pbuf == NULL || arc_released(pbuf)); */
1314 
1315         if (dsl_scan_check_suspend(scn, zb))
1316                 return;
1317 
1318         if (dsl_scan_check_resume(scn, dnp, zb))
1319                 return;
1320 
1321         if (BP_IS_HOLE(bp))
1322                 return;
1323 
1324         scn->scn_visited_this_txg++;
1325 
1326 #ifdef  _KERNEL
1327         DTRACE_PROBE7(scan_visitbp, blkptr_t *, bp, zbookmark_phys_t *, zb,
1328             dnode_phys_t *, dnp, dsl_dataset_t *, ds, dsl_scan_t *, scn,
1329             dmu_objset_type_t, ostype, dmu_tx_t *, tx);
1330 #endif  /* _KERNEL */
1331         dprintf_bp(bp,
1332             "visiting ds=%p/%llu zb=%llx/%llx/%llx/%llx bp=%p",
1333             ds, ds ? ds->ds_object : 0,
1334             zb->zb_objset, zb->zb_object, zb->zb_level, zb->zb_blkid,
1335             bp);
1336 
1337         if (bp->blk_birth <= scn->scn_phys.scn_cur_min_txg)
1338                 return;
1339 
1340         if (dsl_scan_recurse(scn, ds, ostype, dnp, &bp_toread, zb, tx) != 0)
1341                 return;
1342 
1343         /*
1344          * If dsl_scan_ddt() has already visited this block, it will have
1345          * already done any translations or scrubbing, so don't call the
1346          * callback again.
1347          */
1348         if (ddt_class_contains(dp->dp_spa,
1349             scn->scn_phys.scn_ddt_class_max, bp)) {
1350                 ASSERT(buf == NULL);
1351                 return;
1352         }
1353 
1354         /*
1355          * If this block is from the future (after cur_max_txg), then we
1356          * are doing this on behalf of a deleted snapshot, and we will
1357          * revisit the future block on the next pass of this dataset.
1358          * Don't scan it now unless we need to because something
1359          * under it was modified.
1360          */
1361         if (BP_PHYSICAL_BIRTH(bp) <= scn->scn_phys.scn_cur_max_txg) {
1362                 scan_funcs[scn->scn_phys.scn_func](dp, bp, zb);
1363         }
1364 }
1365 
1366 static void
1367 dsl_scan_visit_rootbp(dsl_scan_t *scn, dsl_dataset_t *ds, blkptr_t *bp,
1368     dmu_tx_t *tx)
1369 {
1370         zbookmark_phys_t zb;
1371 
1372         SET_BOOKMARK(&zb, ds ? ds->ds_object : DMU_META_OBJSET,
1373             ZB_ROOT_OBJECT, ZB_ROOT_LEVEL, ZB_ROOT_BLKID);
1374         dsl_scan_visitbp(bp, &zb, NULL,
1375             ds, scn, DMU_OST_NONE, tx);
1376 
1377         DTRACE_PROBE4(scan_finished, dsl_scan_t *, scn, dsl_dataset_t *, ds,
1378             blkptr_t *, bp, dmu_tx_t *, tx);
1379         dprintf_ds(ds, "finished scan%s", "");
1380 }
1381 
1382 static void
1383 ds_destroyed_scn_phys(dsl_dataset_t *ds, dsl_scan_phys_t *scn_phys)
1384 {
1385         if (scn_phys->scn_bookmark.zb_objset == ds->ds_object) {
1386                 if (ds->ds_is_snapshot) {
1387                         /*
1388                          * Note:
1389                          *  - scn_cur_{min,max}_txg stays the same.
1390                          *  - Setting the flag is not really necessary if
1391                          *    scn_cur_max_txg == scn_max_txg, because there
1392                          *    is nothing after this snapshot that we care
1393                          *    about.  However, we set it anyway and then
1394                          *    ignore it when we retraverse it in
1395                          *    dsl_scan_visitds().
1396                          */
1397                         scn_phys->scn_bookmark.zb_objset =
1398                             dsl_dataset_phys(ds)->ds_next_snap_obj;
1399                         zfs_dbgmsg("destroying ds %llu; currently traversing; "
1400                             "reset zb_objset to %llu",
1401                             (u_longlong_t)ds->ds_object,
1402                             (u_longlong_t)dsl_dataset_phys(ds)->
1403                             ds_next_snap_obj);
1404                         scn_phys->scn_flags |= DSF_VISIT_DS_AGAIN;
1405                 } else {
1406                         SET_BOOKMARK(&scn_phys->scn_bookmark,
1407                             ZB_DESTROYED_OBJSET, 0, 0, 0);
1408                         zfs_dbgmsg("destroying ds %llu; currently traversing; "
1409                             "reset bookmark to -1,0,0,0",
1410                             (u_longlong_t)ds->ds_object);
1411                 }
1412         }
1413 }
1414 
1415 /*
1416  * Invoked when a dataset is destroyed. We need to make sure that:
1417  *
1418  * 1) If it is the dataset that is currently being scanned, we write
1419  *      a new dsl_scan_phys_t and mark the objset reference in it
1420  *      as destroyed.
1421  * 2) Remove it from the work queue, if it was present.
1422  *
1423  * If the dataset was actually a snapshot, instead of marking the dataset
1424  * as destroyed, we substitute the next snapshot in line.
1425  */
1426 void
1427 dsl_scan_ds_destroyed(dsl_dataset_t *ds, dmu_tx_t *tx)
1428 {
1429         dsl_pool_t *dp = ds->ds_dir->dd_pool;
1430         dsl_scan_t *scn = dp->dp_scan;
1431         uint64_t mintxg;
1432 
1433         if (!dsl_scan_is_running(scn))
1434                 return;
1435 
1436         ds_destroyed_scn_phys(ds, &scn->scn_phys);
1437         ds_destroyed_scn_phys(ds, &scn->scn_phys_cached);
1438 
1439         if (scan_ds_queue_contains(scn, ds->ds_object, &mintxg)) {
1440                 scan_ds_queue_remove(scn, ds->ds_object);
1441                 if (ds->ds_is_snapshot) {
1442                         VERIFY0(scan_ds_queue_insert(scn,
1443                             dsl_dataset_phys(ds)->ds_next_snap_obj, mintxg));
1444                 }
1445         }
1446 
1447         if (zap_lookup_int_key(dp->dp_meta_objset, scn->scn_phys.scn_queue_obj,
1448             ds->ds_object, &mintxg) == 0) {
1449                 DTRACE_PROBE3(scan_ds_destroyed__in_queue,
1450                     dsl_scan_t *, scn, dsl_dataset_t *, ds, dmu_tx_t *, tx);
1451                 ASSERT3U(dsl_dataset_phys(ds)->ds_num_children, <=, 1);
1452                 VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
1453                     scn->scn_phys.scn_queue_obj, ds->ds_object, tx));
1454                 if (ds->ds_is_snapshot) {
1455                         /*
1456                          * We keep the same mintxg; it could be >
1457                          * ds_creation_txg if the previous snapshot was
1458                          * deleted too.
1459                          */
1460                         VERIFY(zap_add_int_key(dp->dp_meta_objset,
1461                             scn->scn_phys.scn_queue_obj,
1462                             dsl_dataset_phys(ds)->ds_next_snap_obj,
1463                             mintxg, tx) == 0);
1464                         zfs_dbgmsg("destroying ds %llu; in queue; "
1465                             "replacing with %llu",
1466                             (u_longlong_t)ds->ds_object,
1467                             (u_longlong_t)dsl_dataset_phys(ds)->
1468                             ds_next_snap_obj);
1469                 } else {
1470                         zfs_dbgmsg("destroying ds %llu; in queue; removing",
1471                             (u_longlong_t)ds->ds_object);
1472                 }
1473         }
1474 
1475         /*
1476          * dsl_scan_sync() should be called after this, and should sync
1477          * out our changed state, but just to be safe, do it here.
1478          */
1479         dsl_scan_sync_state(scn, tx, SYNC_CACHED);
1480 }
1481 
1482 static void
1483 ds_snapshotted_bookmark(dsl_dataset_t *ds, zbookmark_phys_t *scn_bookmark)
1484 {
1485         if (scn_bookmark->zb_objset == ds->ds_object) {
1486                 scn_bookmark->zb_objset =
1487                     dsl_dataset_phys(ds)->ds_prev_snap_obj;
1488                 zfs_dbgmsg("snapshotting ds %llu; currently traversing; "
1489                     "reset zb_objset to %llu",
1490                     (u_longlong_t)ds->ds_object,
1491                     (u_longlong_t)dsl_dataset_phys(ds)->ds_prev_snap_obj);
1492         }
1493 }
1494 
1495 /*
1496  * Called when a dataset is snapshotted. If we were currently traversing
1497  * this dataset, we reset our bookmark to point at the newly created
1498  * snapshot. We also modify our work queue to remove the old dataset
1499  * entry and replace it with the new snapshot's entry.
1500  */
1501 void
1502 dsl_scan_ds_snapshotted(dsl_dataset_t *ds, dmu_tx_t *tx)
1503 {
1504         dsl_pool_t *dp = ds->ds_dir->dd_pool;
1505         dsl_scan_t *scn = dp->dp_scan;
1506         uint64_t mintxg;
1507 
1508         if (!dsl_scan_is_running(scn))
1509                 return;
1510 
1511         ASSERT(dsl_dataset_phys(ds)->ds_prev_snap_obj != 0);
1512 
1513         ds_snapshotted_bookmark(ds, &scn->scn_phys.scn_bookmark);
1514         ds_snapshotted_bookmark(ds, &scn->scn_phys_cached.scn_bookmark);
1515 
1516         if (scan_ds_queue_contains(scn, ds->ds_object, &mintxg)) {
1517                 scan_ds_queue_remove(scn, ds->ds_object);
1518                 VERIFY0(scan_ds_queue_insert(scn,
1519                     dsl_dataset_phys(ds)->ds_prev_snap_obj, mintxg));
1520         }
1521 
1522         if (zap_lookup_int_key(dp->dp_meta_objset, scn->scn_phys.scn_queue_obj,
1523             ds->ds_object, &mintxg) == 0) {
1524                 DTRACE_PROBE3(scan_ds_snapshotted__in_queue,
1525                     dsl_scan_t *, scn, dsl_dataset_t *, ds, dmu_tx_t *, tx);
1526 
1527                 VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
1528                     scn->scn_phys.scn_queue_obj, ds->ds_object, tx));
1529                 VERIFY(zap_add_int_key(dp->dp_meta_objset,
1530                     scn->scn_phys.scn_queue_obj,
1531                     dsl_dataset_phys(ds)->ds_prev_snap_obj, mintxg, tx) == 0);
1532                 zfs_dbgmsg("snapshotting ds %llu; in queue; "
1533                     "replacing with %llu",
1534                     (u_longlong_t)ds->ds_object,
1535                     (u_longlong_t)dsl_dataset_phys(ds)->ds_prev_snap_obj);
1536         }
1537 
1538         dsl_scan_sync_state(scn, tx, SYNC_CACHED);
1539 }
1540 
1541 static void
1542 ds_clone_swapped_bookmark(dsl_dataset_t *ds1, dsl_dataset_t *ds2,
1543     zbookmark_phys_t *scn_bookmark)
1544 {
1545         if (scn_bookmark->zb_objset == ds1->ds_object) {
1546                 scn_bookmark->zb_objset = ds2->ds_object;
1547                 zfs_dbgmsg("clone_swap ds %llu; currently traversing; "
1548                     "reset zb_objset to %llu",
1549                     (u_longlong_t)ds1->ds_object,
1550                     (u_longlong_t)ds2->ds_object);
1551         } else if (scn_bookmark->zb_objset == ds2->ds_object) {
1552                 scn_bookmark->zb_objset = ds1->ds_object;
1553                 zfs_dbgmsg("clone_swap ds %llu; currently traversing; "
1554                     "reset zb_objset to %llu",
1555                     (u_longlong_t)ds2->ds_object,
1556                     (u_longlong_t)ds1->ds_object);
1557         }
1558 }
1559 
1560 /*
1561  * Called when a parent dataset and its clone are swapped. If we were
1562  * currently traversing the dataset, we need to switch to traversing the
1563  * newly promoted parent.
1564  */
1565 void
1566 dsl_scan_ds_clone_swapped(dsl_dataset_t *ds1, dsl_dataset_t *ds2, dmu_tx_t *tx)
1567 {
1568         dsl_pool_t *dp = ds1->ds_dir->dd_pool;
1569         dsl_scan_t *scn = dp->dp_scan;
1570         uint64_t mintxg;
1571 
1572         if (!dsl_scan_is_running(scn))
1573                 return;
1574 
1575         ds_clone_swapped_bookmark(ds1, ds2, &scn->scn_phys.scn_bookmark);
1576         ds_clone_swapped_bookmark(ds1, ds2, &scn->scn_phys_cached.scn_bookmark);
1577 
1578         if (scan_ds_queue_contains(scn, ds1->ds_object, &mintxg)) {
1579                 int err;
1580 
1581                 scan_ds_queue_remove(scn, ds1->ds_object);
1582                 err = scan_ds_queue_insert(scn, ds2->ds_object, mintxg);
1583                 VERIFY(err == 0 || err == EEXIST);
1584                 if (err == EEXIST) {
1585                         /* Both were there to begin with */
1586                         VERIFY0(scan_ds_queue_insert(scn, ds1->ds_object,
1587                             mintxg));
1588                 }
1589                 zfs_dbgmsg("clone_swap ds %llu; currently traversing; "
1590                     "reset zb_objset to %llu",
1591                     (u_longlong_t)ds1->ds_object,
1592                     (u_longlong_t)ds2->ds_object);
1593         } else if (scan_ds_queue_contains(scn, ds2->ds_object, &mintxg)) {
1594                 scan_ds_queue_remove(scn, ds2->ds_object);
1595                 VERIFY0(scan_ds_queue_insert(scn, ds1->ds_object, mintxg));
1596                 zfs_dbgmsg("clone_swap ds %llu; currently traversing; "
1597                     "reset zb_objset to %llu",
1598                     (u_longlong_t)ds2->ds_object,
1599                     (u_longlong_t)ds1->ds_object);
1600         }
1601 
1602         if (zap_lookup_int_key(dp->dp_meta_objset, scn->scn_phys.scn_queue_obj,
1603             ds1->ds_object, &mintxg) == 0) {
1604                 int err;
1605 
1606                 DTRACE_PROBE4(scan_ds_clone_swapped__in_queue_ds1,
1607                     dsl_scan_t *, scn, dsl_dataset_t *, ds1,
1608                     dsl_dataset_t *, ds2, dmu_tx_t *, tx);
1609                 ASSERT3U(mintxg, ==, dsl_dataset_phys(ds1)->ds_prev_snap_txg);
1610                 ASSERT3U(mintxg, ==, dsl_dataset_phys(ds2)->ds_prev_snap_txg);
1611                 VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
1612                     scn->scn_phys.scn_queue_obj, ds1->ds_object, tx));
1613                 err = zap_add_int_key(dp->dp_meta_objset,
1614                     scn->scn_phys.scn_queue_obj, ds2->ds_object, mintxg, tx);
1615                 VERIFY(err == 0 || err == EEXIST);
1616                 if (err == EEXIST) {
1617                         /* Both were there to begin with */
1618                         VERIFY(0 == zap_add_int_key(dp->dp_meta_objset,
1619                             scn->scn_phys.scn_queue_obj,
1620                             ds1->ds_object, mintxg, tx));
1621                 }
1622                 zfs_dbgmsg("clone_swap ds %llu; in queue; "
1623                     "replacing with %llu",
1624                     (u_longlong_t)ds1->ds_object,
1625                     (u_longlong_t)ds2->ds_object);
1626         } else if (zap_lookup_int_key(dp->dp_meta_objset,
1627             scn->scn_phys.scn_queue_obj, ds2->ds_object, &mintxg) == 0) {
1628                 DTRACE_PROBE4(scan_ds_clone_swapped__in_queue_ds2,
1629                     dsl_scan_t *, scn, dsl_dataset_t *, ds1,
1630                     dsl_dataset_t *, ds2, dmu_tx_t *, tx);
1631                 ASSERT3U(mintxg, ==, dsl_dataset_phys(ds1)->ds_prev_snap_txg);
1632                 ASSERT3U(mintxg, ==, dsl_dataset_phys(ds2)->ds_prev_snap_txg);
1633                 VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
1634                     scn->scn_phys.scn_queue_obj, ds2->ds_object, tx));
1635                 VERIFY(0 == zap_add_int_key(dp->dp_meta_objset,
1636                     scn->scn_phys.scn_queue_obj, ds1->ds_object, mintxg, tx));
1637                 zfs_dbgmsg("clone_swap ds %llu; in queue; "
1638                     "replacing with %llu",
1639                     (u_longlong_t)ds2->ds_object,
1640                     (u_longlong_t)ds1->ds_object);
1641         }
1642 
1643         dsl_scan_sync_state(scn, tx, SYNC_CACHED);
1644 }
1645 
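     /*
      * dmu_objset_find_dp() callback.  For each dataset whose dd_origin_obj
      * matches *arg (the origin snapshot's object number), walk back through
      * its snapshot chain to the snapshot immediately following the origin
      * and add that snapshot to the in-core scan queue.
      */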
1646 /* ARGSUSED */
1647 static int
1648 enqueue_clones_cb(dsl_pool_t *dp, dsl_dataset_t *hds, void *arg)
1649 {
1650         uint64_t originobj = *(uint64_t *)arg;
1651         dsl_dataset_t *ds;
1652         int err;
1653         dsl_scan_t *scn = dp->dp_scan;
1654 
1655         if (dsl_dir_phys(hds->ds_dir)->dd_origin_obj != originobj)
1656                 return (0);
1657 
1658         err = dsl_dataset_hold_obj(dp, hds->ds_object, FTAG, &ds);
1659         if (err)
1660                 return (err);
1661 
1662         while (dsl_dataset_phys(ds)->ds_prev_snap_obj != originobj) {
1663                 dsl_dataset_t *prev;
1664                 err = dsl_dataset_hold_obj(dp,
1665                     dsl_dataset_phys(ds)->ds_prev_snap_obj, FTAG, &prev);
1666 
1667                 dsl_dataset_rele(ds, FTAG);
1668                 if (err)
1669                         return (err);
1670                 ds = prev;
1671         }
1672         VERIFY0(scan_ds_queue_insert(scn, ds->ds_object,
1673             dsl_dataset_phys(ds)->ds_prev_snap_txg));
1674         dsl_dataset_rele(ds, FTAG);
1675         return (0);
1676 }
1677 
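     /*
      * Scan a single dataset: traverse its root block pointer (and, for
      * heads during a scrub/resilver, its ZIL) and then, unless we are
      * suspending, add the next snapshot and any clones to the scan queue.
      * Datasets whose cur_min_txg is already at or past scn_max_txg are
      * skipped entirely.
      */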
1678 static void
1679 dsl_scan_visitds(dsl_scan_t *scn, uint64_t dsobj, dmu_tx_t *tx)
1680 {
1681         dsl_pool_t *dp = scn->scn_dp;
1682         dsl_dataset_t *ds;
1683         objset_t *os;
1684 
1685         VERIFY3U(0, ==, dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
1686 
1687         if (scn->scn_phys.scn_cur_min_txg >=
1688             scn->scn_phys.scn_max_txg) {
1689                 /*
1690                  * This can happen if this snapshot was created after the
1691                  * scan started, and we already completed a previous snapshot
1692                  * that was created after the scan started.  This snapshot
1693                  * only references blocks with:
1694                  *
1695                  *      birth < our ds_creation_txg
1696                  *      cur_min_txg is no less than ds_creation_txg.
1697                  *      We have already visited these blocks.
1698                  * or
1699                  *      birth > scn_max_txg
1700                  *      The scan requested not to visit these blocks.
1701                  *
1702                  * Subsequent snapshots (and clones) can reference our
1703                  * blocks, or blocks with even higher birth times.
1704                  * Therefore we do not need to visit them either,
1705                  * so we do not add them to the work queue.
1706                  *
1707                  * Note that checking for cur_min_txg >= cur_max_txg
1708                  * is not sufficient, because in that case we may need to
1709                  * visit subsequent snapshots.  This happens when min_txg > 0,
1710                  * which raises cur_min_txg.  In this case we will visit
1711                  * this dataset but skip all of its blocks, because the
1712                  * rootbp's birth time is < cur_min_txg.  Then we will
1713                  * add the next snapshots/clones to the work queue.
1714                  */
1715                 char *dsname = kmem_alloc(MAXNAMELEN, KM_SLEEP);
1716                 dsl_dataset_name(ds, dsname);
1717                 zfs_dbgmsg("scanning dataset %llu (%s) is unnecessary because "
1718                     "cur_min_txg (%llu) >= max_txg (%llu)",
1719                     dsobj, dsname,
1720                     scn->scn_phys.scn_cur_min_txg,
1721                     scn->scn_phys.scn_max_txg);
1722                 kmem_free(dsname, MAXNAMELEN);
1723 
1724                 goto out;
1725         }
1726 
1727         if (dmu_objset_from_ds(ds, &os))
1728                 goto out;
1729 
1730         /*
1731          * Only the ZIL in the head (non-snapshot) is valid.  Even though
1732          * snapshots can have ZIL block pointers (which may be the same
1733          * BP as in the head), they must be ignored.  So we traverse the
1734  * ZIL here, rather than in dsl_scan_recurse(), because the regular
1735          * snapshot block-sharing rules don't apply to it.
1736          */
1737         if (DSL_SCAN_IS_SCRUB_RESILVER(scn) && !ds->ds_is_snapshot)
1738                 dsl_scan_zil(dp, &os->os_zil_header);
1739 
1740         /*
1741          * Iterate over the bps in this ds.
1742          */
1743         dmu_buf_will_dirty(ds->ds_dbuf, tx);
1744         rrw_enter(&ds->ds_bp_rwlock, RW_READER, FTAG);
1745         dsl_scan_visit_rootbp(scn, ds, &dsl_dataset_phys(ds)->ds_bp, tx);
1746         rrw_exit(&ds->ds_bp_rwlock, FTAG);
1747 
1748         char *dsname = kmem_alloc(ZFS_MAX_DATASET_NAME_LEN, KM_SLEEP);
1749         dsl_dataset_name(ds, dsname);
1750         zfs_dbgmsg("scanned dataset %llu (%s) with min=%llu max=%llu; "
1751             "suspending=%u",
1752             (longlong_t)dsobj, dsname,
1753             (longlong_t)scn->scn_phys.scn_cur_min_txg,
1754             (longlong_t)scn->scn_phys.scn_cur_max_txg,
1755             (int)scn->scn_suspending);
1756         kmem_free(dsname, ZFS_MAX_DATASET_NAME_LEN);
1757 
1758         DTRACE_PROBE3(scan_done, dsl_scan_t *, scn, dsl_dataset_t *, ds,
1759             dmu_tx_t *, tx);
1760 
1761         if (scn->scn_suspending)
1762                 goto out;
1763 
1764         /*
1765          * We've finished this pass over this dataset.
1766          */
1767 
1768         /*
1769          * If we did not completely visit this dataset, do another pass.
1770          */
1771         if (scn->scn_phys.scn_flags & DSF_VISIT_DS_AGAIN) {
1772                 DTRACE_PROBE3(scan_incomplete, dsl_scan_t *, scn,
1773                     dsl_dataset_t *, ds, dmu_tx_t *, tx);
1774                 zfs_dbgmsg("incomplete pass; visiting again");
1775                 scn->scn_phys.scn_flags &= ~DSF_VISIT_DS_AGAIN;
1776                 VERIFY0(scan_ds_queue_insert(scn, ds->ds_object,
1777                     scn->scn_phys.scn_cur_max_txg));
1778                 goto out;
1779         }
1780 
1781         /*
1782          * Add descendent datasets to work queue.
1783          */
1784         if (dsl_dataset_phys(ds)->ds_next_snap_obj != 0) {
1785                 VERIFY0(scan_ds_queue_insert(scn,
1786                     dsl_dataset_phys(ds)->ds_next_snap_obj,
1787                     dsl_dataset_phys(ds)->ds_creation_txg));
1788         }
1789         if (dsl_dataset_phys(ds)->ds_num_children > 1) {
1790                 boolean_t usenext = B_FALSE;
1791                 if (dsl_dataset_phys(ds)->ds_next_clones_obj != 0) {
1792                         uint64_t count;
1793                         /*
1794                          * A bug in a previous version of the code could
1795                          * cause upgrade_clones_cb() to not set
1796                          * ds_next_snap_obj when it should, leading to a
1797                          * missing entry.  Therefore we can only use the
1798                          * next_clones_obj when its count is correct.
1799                          */
1800                         int err = zap_count(dp->dp_meta_objset,
1801                             dsl_dataset_phys(ds)->ds_next_clones_obj, &count);
1802                         if (err == 0 &&
1803                             count == dsl_dataset_phys(ds)->ds_num_children - 1)
1804                                 usenext = B_TRUE;
1805                 }
1806 
1807                 if (usenext) {
1808                         zap_cursor_t zc;
1809                         zap_attribute_t za;
1810                         for (zap_cursor_init(&zc, dp->dp_meta_objset,
1811                             dsl_dataset_phys(ds)->ds_next_clones_obj);
1812                             zap_cursor_retrieve(&zc, &za) == 0;
1813                             (void) zap_cursor_advance(&zc)) {
1814                                 VERIFY0(scan_ds_queue_insert(scn,
1815                                     zfs_strtonum(za.za_name, NULL),
1816                                     dsl_dataset_phys(ds)->ds_creation_txg));
1817                         }
1818                         zap_cursor_fini(&zc);
1819                 } else {
1820                         VERIFY0(dmu_objset_find_dp(dp, dp->dp_root_dir_obj,
1821                             enqueue_clones_cb, &ds->ds_object,
1822                             DS_FIND_CHILDREN));
1823                 }
1824         }
1825 
1826 out:
1827         dsl_dataset_rele(ds, FTAG);
1828 }
1829 
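     /*
      * dmu_objset_find_dp() callback used to seed the scan queue.  For each
      * head dataset, walk back to the oldest snapshot in its chain and add
      * that to the in-core scan queue; clones are skipped here, since they
      * are picked up through their origin snapshot instead.
      */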
1830 /* ARGSUSED */
1831 static int
1832 enqueue_cb(dsl_pool_t *dp, dsl_dataset_t *hds, void *arg)
1833 {
1834         dsl_dataset_t *ds;
1835         int err;
1836         dsl_scan_t *scn = dp->dp_scan;
1837 
1838         err = dsl_dataset_hold_obj(dp, hds->ds_object, FTAG, &ds);
1839         if (err)
1840                 return (err);
1841 
1842         while (dsl_dataset_phys(ds)->ds_prev_snap_obj != 0) {
1843                 dsl_dataset_t *prev;
1844                 err = dsl_dataset_hold_obj(dp,
1845                     dsl_dataset_phys(ds)->ds_prev_snap_obj, FTAG, &prev);
1846                 if (err) {
1847                         dsl_dataset_rele(ds, FTAG);
1848                         return (err);
1849                 }
1850 
1851                 /*
1852                  * If this is a clone, we don't need to worry about it for now.
1853                  */
1854                 if (dsl_dataset_phys(prev)->ds_next_snap_obj != ds->ds_object) {
1855                         dsl_dataset_rele(ds, FTAG);
1856                         dsl_dataset_rele(prev, FTAG);
1857                         return (0);
1858                 }
1859                 dsl_dataset_rele(ds, FTAG);
1860                 ds = prev;
1861         }
1862 
1863         VERIFY0(scan_ds_queue_insert(scn, ds->ds_object,
1864             dsl_dataset_phys(ds)->ds_prev_snap_txg));
1865         dsl_dataset_rele(ds, FTAG);
1866         return (0);
1867 }
1868 
1869 /*
1870  * Scrub/dedup interaction.
1871  *
1872  * If there are N references to a deduped block, we don't want to scrub it
1873  * N times -- ideally, we should scrub it exactly once.
1874  *
1875  * We leverage the fact that the dde's replication class (enum ddt_class)
1876  * is ordered from highest replication class (DDT_CLASS_DITTO) to lowest
1877  * (DDT_CLASS_UNIQUE) so that we may walk the DDT in that order.
1878  *
1879  * To prevent excess scrubbing, the scrub begins by walking the DDT
1880  * to find all blocks with refcnt > 1, and scrubs each of these once.
1881  * Since there are two replication classes which contain blocks with
1882  * refcnt > 1, we scrub the highest replication class (DDT_CLASS_DITTO) first.
1883  * Finally the top-down scrub begins, only visiting blocks with refcnt == 1.
1884  *
1885  * There would be nothing more to say if a block's refcnt couldn't change
1886  * during a scrub, but of course it can so we must account for changes
1887  * in a block's replication class.
1888  *
1889  * Here's an example of what can occur:
1890  *
1891  * If a block has refcnt > 1 during the DDT scrub phase, but has refcnt == 1
1892  * when visited during the top-down scrub phase, it will be scrubbed twice.
1893  * This negates our scrub optimization, but is otherwise harmless.
1894  *
1895  * If a block has refcnt == 1 during the DDT scrub phase, but has refcnt > 1
1896  * on each visit during the top-down scrub phase, it will never be scrubbed.
1897  * To catch this, ddt_sync_entry() notifies the scrub code whenever a block's
1898  * reference class transitions to a higher level (i.e. DDT_CLASS_UNIQUE to
1899  * DDT_CLASS_DUPLICATE); if it transitions from refcnt == 1 to refcnt > 1
1900  * while a scrub is in progress, it scrubs the block right then.
1901  */
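     /*
      * Walk the portion of the DDT covered by this scan (all classes up to
      * and including scn_ddt_class_max), feeding each entry to
      * dsl_scan_ddt_entry().  The walk position is kept in
      * scn_phys.scn_ddt_bookmark, and we stop early if
      * dsl_scan_check_suspend() asks us to suspend.
      */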
1902 static void
1903 dsl_scan_ddt(dsl_scan_t *scn, dmu_tx_t *tx)
1904 {
1905         ddt_bookmark_t *ddb = &scn->scn_phys.scn_ddt_bookmark;
1906         ddt_entry_t dde = { 0 };
1907         int error;
1908         uint64_t n = 0;
1909 
1910         while ((error = ddt_walk(scn->scn_dp->dp_spa, ddb, &dde)) == 0) {
1911                 ddt_t *ddt;
1912 
1913                 if (ddb->ddb_class > scn->scn_phys.scn_ddt_class_max)
1914                         break;
1915                 DTRACE_PROBE1(scan_ddb, ddt_bookmark_t *, ddb);
1916                 dprintf("visiting ddb=%llu/%llu/%llu/%llx\n",
1917                     (longlong_t)ddb->ddb_class,
1918                     (longlong_t)ddb->ddb_type,
1919                     (longlong_t)ddb->ddb_checksum,
1920                     (longlong_t)ddb->ddb_cursor);
1921 
1922                 /* There should be no pending changes to the dedup table */
1923                 ddt = scn->scn_dp->dp_spa->spa_ddt[ddb->ddb_checksum];
1924 #ifdef ZFS_DEBUG
1925                 for (uint_t i = 0; i < DDT_HASHSZ; i++)
1926                         ASSERT(avl_first(&ddt->ddt_tree[i]) == NULL);
1927 #endif
1928                 dsl_scan_ddt_entry(scn, ddb->ddb_checksum, &dde, tx);
1929                 n++;
1930 
1931                 if (dsl_scan_check_suspend(scn, NULL))
1932                         break;
1933         }
1934 
1935         DTRACE_PROBE2(scan_ddt_done, dsl_scan_t *, scn, uint64_t, n);
1936         zfs_dbgmsg("scanned %llu ddt entries with class_max = %u; "
1937             "suspending=%u", (longlong_t)n,
1938             (int)scn->scn_phys.scn_ddt_class_max, (int)scn->scn_suspending);
1939 
1940         ASSERT(error == 0 || error == ENOENT);
1941         ASSERT(error != ENOENT ||
1942             ddb->ddb_class > scn->scn_phys.scn_ddt_class_max);
1943 }
1944 
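     /*
      * Scan a single DDT entry: issue the scan callback for every physical
      * variant (ddt_phys_t) whose birth txg falls within the scan range.
      * Besides dsl_scan_ddt() above, this is also reached when an entry's
      * reference class changes during a scan (see the block comment on
      * scrub/dedup interaction above).
      */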
1945 /* ARGSUSED */
1946 void
1947 dsl_scan_ddt_entry(dsl_scan_t *scn, enum zio_checksum checksum,
1948     ddt_entry_t *dde, dmu_tx_t *tx)
1949 {
1950         const ddt_key_t *ddk = &dde->dde_key;
1951         ddt_phys_t *ddp = dde->dde_phys;
1952         blkptr_t bp;
1953         zbookmark_phys_t zb = { 0 };
1954 
1955         if (scn->scn_phys.scn_state != DSS_SCANNING)
1956                 return;
1957 
1958         for (int p = 0; p < DDT_PHYS_TYPES; p++, ddp++) {
1959                 if (ddp->ddp_phys_birth == 0 ||
1960                     ddp->ddp_phys_birth > scn->scn_phys.scn_max_txg)
1961                         continue;
1962                 ddt_bp_create(checksum, ddk, ddp, &bp);
1963 
1964                 scn->scn_visited_this_txg++;
1965                 scan_funcs[scn->scn_phys.scn_func](scn->scn_dp, &bp, &zb);
1966         }
1967 }
1968 
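     /*
      * Perform one sync pass worth of traversal: finish the DDT phase if
      * any classes remain, then visit the MOS/origin, resume from the saved
      * bookmark if we were suspended mid-dataset, and finally keep pulling
      * datasets off the in-core scan queue until it is empty or we decide
      * to suspend.
      */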
1969 static void
1970 dsl_scan_visit(dsl_scan_t *scn, dmu_tx_t *tx)
1971 {
1972         dsl_pool_t *dp = scn->scn_dp;
1973         uint64_t dsobj, txg;
1974 
1975         if (scn->scn_phys.scn_ddt_bookmark.ddb_class <=
1976             scn->scn_phys.scn_ddt_class_max) {
1977                 scn->scn_phys.scn_cur_min_txg = scn->scn_phys.scn_min_txg;
1978                 scn->scn_phys.scn_cur_max_txg = scn->scn_phys.scn_max_txg;
1979                 dsl_scan_ddt(scn, tx);
1980                 if (scn->scn_suspending)
1981                         return;
1982         }
1983 
1984         if (scn->scn_phys.scn_bookmark.zb_objset == DMU_META_OBJSET) {
1985                 /* First do the MOS & ORIGIN */
1986 
1987                 scn->scn_phys.scn_cur_min_txg = scn->scn_phys.scn_min_txg;
1988                 scn->scn_phys.scn_cur_max_txg = scn->scn_phys.scn_max_txg;
1989                 dsl_scan_visit_rootbp(scn, NULL,
1990                     &dp->dp_meta_rootbp, tx);
1991                 spa_set_rootblkptr(dp->dp_spa, &dp->dp_meta_rootbp);
1992                 if (scn->scn_suspending)
1993                         return;
1994 
1995                 if (spa_version(dp->dp_spa) < SPA_VERSION_DSL_SCRUB) {
1996                         VERIFY0(dmu_objset_find_dp(dp, dp->dp_root_dir_obj,
1997                             enqueue_cb, NULL, DS_FIND_CHILDREN));
1998                 } else {
1999                         dsl_scan_visitds(scn,
2000                             dp->dp_origin_snap->ds_object, tx);
2001                 }
2002                 ASSERT(!scn->scn_suspending);
2003         } else if (scn->scn_phys.scn_bookmark.zb_objset !=
2004             ZB_DESTROYED_OBJSET) {
2005                 uint64_t dsobj = scn->scn_phys.scn_bookmark.zb_objset;
2006                 /*
2007                  * If we were suspended, continue from here.  Note if the
2008                  * ds we were suspended on was deleted, the zb_objset may
2009                  * be -1, so we will skip this and find a new objset
2010                  * below.
2011                  */
2012                 dsl_scan_visitds(scn, dsobj, tx);
2013                 if (scn->scn_suspending)
2014                         return;
2015         }
2016 
2017         /*
2018          * In case we were suspended right at the end of the ds, zero the
2019          * bookmark so we don't think that we're still trying to resume.
2020          */
2021         bzero(&scn->scn_phys.scn_bookmark, sizeof (zbookmark_phys_t));
2022 
2023         /* keep pulling things out of the zap-object-as-queue */
2024         while (scan_ds_queue_first(scn, &dsobj, &txg)) {
2025                 dsl_dataset_t *ds;
2026 
2027                 scan_ds_queue_remove(scn, dsobj);
2028 
2029                 /* Set up min/max txg */
2030                 VERIFY3U(0, ==, dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
2031                 if (txg != 0) {
2032                         scn->scn_phys.scn_cur_min_txg =
2033                             MAX(scn->scn_phys.scn_min_txg, txg);
2034                 } else {
2035                         scn->scn_phys.scn_cur_min_txg =
2036                             MAX(scn->scn_phys.scn_min_txg,
2037                             dsl_dataset_phys(ds)->ds_prev_snap_txg);
2038                 }
2039                 scn->scn_phys.scn_cur_max_txg = dsl_scan_ds_maxtxg(ds);
2040                 dsl_dataset_rele(ds, FTAG);
2041 
2042                 dsl_scan_visitds(scn, dsobj, tx);
2043                 if (scn->scn_suspending)
2044                         return;
2045         }
2046         /* No more objsets to fetch, we're done */
2047         scn->scn_phys.scn_bookmark.zb_objset = ZB_DESTROYED_OBJSET;
2048         ASSERT0(scn->scn_suspending);
2049 }
2050 
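     /*
      * Decide whether the async-destroy work done this txg has gone on long
      * enough: suspend once we have freed zfs_free_max_blocks, exceeded
      * zfs_txg_timeout, spent more than zfs_free_min_time_ms while a txg
      * sync is waiting, or the pool is shutting down.  We never suspend
      * when zfs_recover is set.
      */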
2051 static boolean_t
2052 dsl_scan_free_should_suspend(dsl_scan_t *scn)
2053 {
2054         uint64_t elapsed_nanosecs;
2055 
2056         if (zfs_recover)
2057                 return (B_FALSE);
2058 
2059         if (scn->scn_visited_this_txg >= zfs_free_max_blocks)
2060                 return (B_TRUE);
2061 
2062         elapsed_nanosecs = gethrtime() - scn->scn_sync_start_time;
2063         return (elapsed_nanosecs / NANOSEC > zfs_txg_timeout ||
2064             (NSEC2MSEC(elapsed_nanosecs) > zfs_free_min_time_ms &&
2065             txg_sync_waiting(scn->scn_dp)) ||
2066             spa_shutting_down(scn->scn_dp->dp_spa));
2067 }
2068 
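     /*
      * bpobj/bptree iteration callback for async destroy: frees the block,
      * removes the freed space from dp_free_dir's accounting, and returns
      * ERESTART if dsl_scan_free_should_suspend() decides we have done
      * enough work for this txg.
      */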
2069 static int
2070 dsl_scan_free_block_cb(void *arg, const blkptr_t *bp, dmu_tx_t *tx)
2071 {
2072         dsl_scan_t *scn = arg;
2073 
2074         if (!scn->scn_is_bptree ||
2075             (BP_GET_LEVEL(bp) == 0 && BP_GET_TYPE(bp) != DMU_OT_OBJSET)) {
2076                 if (dsl_scan_free_should_suspend(scn))
2077                         return (SET_ERROR(ERESTART));
2078         }
2079 
2080         zio_nowait(zio_free_sync(scn->scn_zio_root, scn->scn_dp->dp_spa,
2081             dmu_tx_get_txg(tx), bp, 0));
2082         dsl_dir_diduse_space(tx->tx_pool->dp_free_dir, DD_USED_HEAD,
2083             -bp_get_dsize_sync(scn->scn_dp->dp_spa, bp),
2084             -BP_GET_PSIZE(bp), -BP_GET_UCSIZE(bp), tx);
2085         scn->scn_visited_this_txg++;
2086         return (0);
2087 }
2088 
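     /*
      * Returns B_TRUE if there is scan-related work outstanding: a
      * scrub/resilver that is running and not paused, an async destroy that
      * is not stalled, or space still waiting to be freed in the
      * free_bpobj.
      */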
2089 boolean_t
2090 dsl_scan_active(dsl_scan_t *scn)
2091 {
2092         spa_t *spa = scn->scn_dp->dp_spa;
2093         uint64_t used = 0, comp, uncomp;
2094 
2095         if (spa->spa_load_state != SPA_LOAD_NONE)
2096                 return (B_FALSE);
2097         if (spa_shutting_down(spa))
2098                 return (B_FALSE);
2099         if ((dsl_scan_is_running(scn) && !dsl_scan_is_paused_scrub(scn)) ||
2100             (scn->scn_async_destroying && !scn->scn_async_stalled))
2101                 return (B_TRUE);
2102 
2103         if (spa_version(scn->scn_dp->dp_spa) >= SPA_VERSION_DEADLISTS) {
2104                 (void) bpobj_space(&scn->scn_dp->dp_free_bpobj,
2105                     &used, &comp, &uncomp);
2106         }
2107         return (used != 0);
2108 }
2109 
2110 /* Called whenever a txg syncs. */
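     /*
      * The work done here each txg, in order: restart the scan if one was
      * requested, process async destroys (the free_bpobj and, if the
      * feature is active, the bptree of destroyed datasets), transfer any
      * leaked space to dp_leak_dir, and then do the scan work proper --
      * traversal via dsl_scan_visit() unless we are clearing or
      * checkpointing, followed by issuing the sorted I/O queues and
      * waiting for in-flight scrub I/O to complete.
      */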
2111 void
2112 dsl_scan_sync(dsl_pool_t *dp, dmu_tx_t *tx)
2113 {
2114         dsl_scan_t *scn = dp->dp_scan;
2115         spa_t *spa = dp->dp_spa;
2116         int err = 0;
2117 
2118         /*
2119          * Check for scn_restart_txg before checking spa_load_state, so
2120          * that we can restart an old-style scan while the pool is being
2121          * imported (see dsl_scan_init).
2122          */
2123         if (dsl_scan_restarting(scn, tx)) {
2124                 pool_scan_func_t func = POOL_SCAN_SCRUB;
2125                 dsl_scan_done(scn, B_FALSE, tx);
2126                 if (vdev_resilver_needed(spa->spa_root_vdev, NULL, NULL))
2127                         func = POOL_SCAN_RESILVER;
2128                 zfs_dbgmsg("restarting scan func=%u txg=%llu",
2129                     func, tx->tx_txg);
2130                 dsl_scan_setup_sync(&func, tx);
2131         }
2132 
2133         /*
2134          * Only process scans in sync pass 1.
2135          */
2136         if (spa_sync_pass(dp->dp_spa) > 1)
2137                 return;
2138 
2139         /*
2140          * If the spa is shutting down, then stop scanning. This will
2141          * ensure that the scan does not dirty any new data during the
2142          * shutdown phase.
2143          */
2144         if (spa_shutting_down(spa))
2145                 return;
2146 
2147         /*
2148          * If the scan is inactive due to a stalled async destroy, try again.
2149          */
2150         if (!scn->scn_async_stalled && !dsl_scan_active(scn))
2151                 return;
2152 
2153         scn->scn_visited_this_txg = 0;
2154         scn->scn_suspending = B_FALSE;
2155         scn->scn_sync_start_time = gethrtime();
2156         spa->spa_scrub_active = B_TRUE;
2157 
2158         /*
2159          * First process the async destroys.  If we suspend, don't do
2160          * any scrubbing or resilvering.  This ensures that there are no
2161          * async destroys while we are scanning, so the scan code doesn't
2162          * have to worry about traversing it.  It is also faster to free the
2163          * blocks than to scrub them.
2164          */
2165         if (zfs_free_bpobj_enabled &&
2166             spa_version(dp->dp_spa) >= SPA_VERSION_DEADLISTS) {
2167                 scn->scn_is_bptree = B_FALSE;
2168                 scn->scn_zio_root = zio_root(dp->dp_spa, NULL,
2169                     NULL, ZIO_FLAG_MUSTSUCCEED);
2170                 err = bpobj_iterate(&dp->dp_free_bpobj,
2171                     dsl_scan_free_block_cb, scn, tx);
2172                 VERIFY3U(0, ==, zio_wait(scn->scn_zio_root));
2173 
2174                 if (err != 0 && err != ERESTART)
2175                         zfs_panic_recover("error %u from bpobj_iterate()", err);
2176         }
2177 
2178         if (err == 0 && spa_feature_is_active(spa, SPA_FEATURE_ASYNC_DESTROY)) {
2179                 ASSERT(scn->scn_async_destroying);
2180                 scn->scn_is_bptree = B_TRUE;
2181                 scn->scn_zio_root = zio_root(dp->dp_spa, NULL,
2182                     NULL, ZIO_FLAG_MUSTSUCCEED);
2183                 err = bptree_iterate(dp->dp_meta_objset,
2184                     dp->dp_bptree_obj, B_TRUE, dsl_scan_free_block_cb, scn, tx);
2185                 VERIFY0(zio_wait(scn->scn_zio_root));
2186 
2187                 if (err == EIO || err == ECKSUM) {
2188                         err = 0;
2189                 } else if (err != 0 && err != ERESTART) {
2190                         zfs_panic_recover("error %u from "
2191                             "traverse_dataset_destroyed()", err);
2192                 }
2193 
2194                 if (bptree_is_empty(dp->dp_meta_objset, dp->dp_bptree_obj)) {
2195                         /* finished; deactivate async destroy feature */
2196                         spa_feature_decr(spa, SPA_FEATURE_ASYNC_DESTROY, tx);
2197                         ASSERT(!spa_feature_is_active(spa,
2198                             SPA_FEATURE_ASYNC_DESTROY));
2199                         VERIFY0(zap_remove(dp->dp_meta_objset,
2200                             DMU_POOL_DIRECTORY_OBJECT,
2201                             DMU_POOL_BPTREE_OBJ, tx));
2202                         VERIFY0(bptree_free(dp->dp_meta_objset,
2203                             dp->dp_bptree_obj, tx));
2204                         dp->dp_bptree_obj = 0;
2205                         scn->scn_async_destroying = B_FALSE;
2206                         scn->scn_async_stalled = B_FALSE;
2207                 } else {
2208                         /*
2209                          * If we didn't make progress, mark the async
2210                          * destroy as stalled, so that we will not initiate
2211                          * a spa_sync() on its behalf.  Note that we only
2212                          * check this if we are not finished, because if the
2213                          * bptree had no blocks for us to visit, we can
2214                          * finish without "making progress".
2215                          */
2216                         scn->scn_async_stalled =
2217                             (scn->scn_visited_this_txg == 0);
2218                 }
2219         }
2220         if (scn->scn_visited_this_txg) {
2221                 zfs_dbgmsg("freed %llu blocks in %llums from "
2222                     "free_bpobj/bptree txg %llu; err=%u",
2223                     (longlong_t)scn->scn_visited_this_txg,
2224                     (longlong_t)
2225                     NSEC2MSEC(gethrtime() - scn->scn_sync_start_time),
2226                     (longlong_t)tx->tx_txg, err);
2227                 scn->scn_visited_this_txg = 0;
2228 
2229                 /*
2230                  * Write out changes to the DDT that may be required as a
2231                  * result of the blocks freed.  This ensures that the DDT
2232                  * is clean when a scrub/resilver runs.
2233                  */
2234                 ddt_sync(spa, tx->tx_txg);
2235         }
2236         if (err != 0)
2237                 return;
2238         if (dp->dp_free_dir != NULL && !scn->scn_async_destroying &&
2239             zfs_free_leak_on_eio &&
2240             (dsl_dir_phys(dp->dp_free_dir)->dd_used_bytes != 0 ||
2241             dsl_dir_phys(dp->dp_free_dir)->dd_compressed_bytes != 0 ||
2242             dsl_dir_phys(dp->dp_free_dir)->dd_uncompressed_bytes != 0)) {
2243                 /*
2244                  * We have finished background destroying, but there is still
2245                  * some space left in the dp_free_dir. Transfer this leaked
2246                  * space to the dp_leak_dir.
2247                  */
2248                 if (dp->dp_leak_dir == NULL) {
2249                         rrw_enter(&dp->dp_config_rwlock, RW_WRITER, FTAG);
2250                         (void) dsl_dir_create_sync(dp, dp->dp_root_dir,
2251                             LEAK_DIR_NAME, tx);
2252                         VERIFY0(dsl_pool_open_special_dir(dp,
2253                             LEAK_DIR_NAME, &dp->dp_leak_dir));
2254                         rrw_exit(&dp->dp_config_rwlock, FTAG);
2255                 }
2256                 dsl_dir_diduse_space(dp->dp_leak_dir, DD_USED_HEAD,
2257                     dsl_dir_phys(dp->dp_free_dir)->dd_used_bytes,
2258                     dsl_dir_phys(dp->dp_free_dir)->dd_compressed_bytes,
2259                     dsl_dir_phys(dp->dp_free_dir)->dd_uncompressed_bytes, tx);
2260                 dsl_dir_diduse_space(dp->dp_free_dir, DD_USED_HEAD,
2261                     -dsl_dir_phys(dp->dp_free_dir)->dd_used_bytes,
2262                     -dsl_dir_phys(dp->dp_free_dir)->dd_compressed_bytes,
2263                     -dsl_dir_phys(dp->dp_free_dir)->dd_uncompressed_bytes, tx);
2264         }
2265         if (dp->dp_free_dir != NULL && !scn->scn_async_destroying) {
2266                 /* finished; verify that space accounting went to zero */
2267                 ASSERT0(dsl_dir_phys(dp->dp_free_dir)->dd_used_bytes);
2268                 ASSERT0(dsl_dir_phys(dp->dp_free_dir)->dd_compressed_bytes);
2269                 ASSERT0(dsl_dir_phys(dp->dp_free_dir)->dd_uncompressed_bytes);
2270         }
2271 
2272         if (!dsl_scan_is_running(scn))
2273                 return;
2274 
2275         if (!zfs_scan_direct) {
2276                 if (!scn->scn_is_sorted)
2277                         scn->scn_last_queue_run_time = 0;
2278                 scn->scn_is_sorted = B_TRUE;
2279         }
2280 
2281         if (scn->scn_done_txg == tx->tx_txg ||
2282             scn->scn_phys.scn_state == DSS_FINISHING) {
2283                 ASSERT(!scn->scn_suspending);
2284                 if (scn->scn_bytes_pending != 0) {
2285                         ASSERT(scn->scn_is_sorted);
2286                         scn->scn_phys.scn_state = DSS_FINISHING;
2287                         goto finish;
2288                 }
2289                 /* finished with scan. */
2290                 zfs_dbgmsg("txg %llu scan complete", tx->tx_txg);
2291                 dsl_scan_done(scn, B_TRUE, tx);
2292                 ASSERT3U(spa->spa_scrub_inflight, ==, 0);
2293                 dsl_scan_sync_state(scn, tx, SYNC_MANDATORY);
2294                 return;
2295         }
2296 
2297         if (dsl_scan_is_paused_scrub(scn))
2298                 return;
2299 
2300         if (scn->scn_phys.scn_ddt_bookmark.ddb_class <=
2301             scn->scn_phys.scn_ddt_class_max) {
2302                 zfs_dbgmsg("doing scan sync txg %llu; "
2303                     "ddt bm=%llu/%llu/%llu/%llx",
2304                     (longlong_t)tx->tx_txg,
2305                     (longlong_t)scn->scn_phys.scn_ddt_bookmark.ddb_class,
2306                     (longlong_t)scn->scn_phys.scn_ddt_bookmark.ddb_type,
2307                     (longlong_t)scn->scn_phys.scn_ddt_bookmark.ddb_checksum,
2308                     (longlong_t)scn->scn_phys.scn_ddt_bookmark.ddb_cursor);
2309                 ASSERT(scn->scn_phys.scn_bookmark.zb_objset == 0);
2310                 ASSERT(scn->scn_phys.scn_bookmark.zb_object == 0);
2311                 ASSERT(scn->scn_phys.scn_bookmark.zb_level == 0);
2312                 ASSERT(scn->scn_phys.scn_bookmark.zb_blkid == 0);
2313         } else {
2314                 zfs_dbgmsg("doing scan sync txg %llu; bm=%llu/%llu/%llu/%llu",
2315                     (longlong_t)tx->tx_txg,
2316                     (longlong_t)scn->scn_phys.scn_bookmark.zb_objset,
2317                     (longlong_t)scn->scn_phys.scn_bookmark.zb_object,
2318                     (longlong_t)scn->scn_phys.scn_bookmark.zb_level,
2319                     (longlong_t)scn->scn_phys.scn_bookmark.zb_blkid);
2320         }
2321 
2322         if (scn->scn_is_sorted) {
2323                 /*
2324                  * This is the out-of-order queue handling. We determine our
2325                  * memory usage and, based on that, switch between normal
2326                  * operation (i.e. don't issue queued-up I/O unless we've
2327                  * reached the end of scanning) and 'clearing' (issue queued
2328                  * extents just to free up some memory).
2329                  */
2330                 mem_lim_t mlim = scan_io_queue_mem_lim(scn);
2331 
2332                 if (mlim == MEM_LIM_HARD && !scn->scn_clearing)
2333                         scn->scn_clearing = B_TRUE;
2334                 else if (mlim == MEM_LIM_NONE && scn->scn_clearing)
2335                         scn->scn_clearing = B_FALSE;
2336 
2337                 if ((scn->scn_checkpointing || ddi_get_lbolt() -
2338                     scn->scn_last_checkpoint > ZFS_SCAN_CHECKPOINT_INTVAL) &&
2339                     scn->scn_phys.scn_state != DSS_FINISHING &&
2340                     !scn->scn_clearing) {
2341                         scn->scn_checkpointing = B_TRUE;
2342                 }
2343         }
2344 
2345         if (!scn->scn_clearing && !scn->scn_checkpointing) {
2346                 scn->scn_zio_root = zio_root(dp->dp_spa, NULL,
2347                     NULL, ZIO_FLAG_CANFAIL);
2348                 dsl_pool_config_enter(dp, FTAG);
2349                 dsl_scan_visit(scn, tx);
2350                 dsl_pool_config_exit(dp, FTAG);
2351                 (void) zio_wait(scn->scn_zio_root);
2352                 scn->scn_zio_root = NULL;
2353 
2354                 zfs_dbgmsg("visited %llu blocks in %llums",
2355                     (longlong_t)scn->scn_visited_this_txg,
2356                     (longlong_t)NSEC2MSEC(gethrtime() -
2357                     scn->scn_sync_start_time));
2358 
2359                 if (!scn->scn_suspending) {
2360                         scn->scn_done_txg = tx->tx_txg + 1;
2361                         zfs_dbgmsg("txg %llu traversal complete, waiting "
2362                             "till txg %llu", tx->tx_txg, scn->scn_done_txg);
2363                 }
2364         }
2370 finish:
2371         if (scn->scn_is_sorted) {
2372                 dsl_pool_config_enter(dp, FTAG);
2373                 scan_io_queues_run(scn);
2374                 dsl_pool_config_exit(dp, FTAG);
2375         }
2376 
2377         if (DSL_SCAN_IS_SCRUB_RESILVER(scn)) {
2378                 mutex_enter(&spa->spa_scrub_lock);
2379                 while (spa->spa_scrub_inflight > 0) {
2380                         cv_wait(&spa->spa_scrub_io_cv,
2381                             &spa->spa_scrub_lock);
2382                 }
2383                 mutex_exit(&spa->spa_scrub_lock);
2384         }
2385 
2386         dsl_scan_sync_state(scn, tx, SYNC_OPTIONAL);
2387 }
2388 
2389 /*
2390  * This will start a new scan, or restart an existing one.
2391  */
2392 void
2393 dsl_resilver_restart(dsl_pool_t *dp, uint64_t txg)
2394 {
2395         /* Stop any ongoing TRIMs */
2396         spa_man_trim_stop(dp->dp_spa);
2397 
2398         if (txg == 0) {
2399                 dmu_tx_t *tx;
2400                 tx = dmu_tx_create_dd(dp->dp_mos_dir);
2401                 VERIFY(0 == dmu_tx_assign(tx, TXG_WAIT));
2402 
2403                 txg = dmu_tx_get_txg(tx);
2404                 dp->dp_scan->scn_restart_txg = txg;
2405                 dmu_tx_commit(tx);
2406         } else {
2407                 dp->dp_scan->scn_restart_txg = txg;
2408         }
2409         zfs_dbgmsg("restarting resilver txg=%llu", txg);
2410 }
2411 
2412 boolean_t
2413 dsl_scan_resilvering(dsl_pool_t *dp)
2414 {
2415         return (dsl_scan_is_running(dp->dp_scan) &&
2416             dp->dp_scan->scn_phys.scn_func == POOL_SCAN_RESILVER);
2417 }
2418 
2419 /*
2420  * scrub consumers
2421  */
2422 
2423 static void
2424 count_block(dsl_scan_t *scn, zfs_all_blkstats_t *zab, const blkptr_t *bp)
2425 {
2426         int i;
2427 
2428         for (i = 0; i < BP_GET_NDVAS(bp); i++)
2429                 atomic_add_64(&scn->scn_bytes_issued,
2430                     DVA_GET_ASIZE(&bp->blk_dva[i]));
2431 
2432         /*
2433          * If we resume after a reboot, zab will be NULL; don't record
2434          * incomplete stats in that case.
2435          */
2436         if (zab == NULL)
2437                 return;
2438 
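             /*
              * Each block is counted four times in zab_type: once under its
              * (level, type) pair and once each with the level and/or type
              * replaced by the DN_MAX_LEVELS / DMU_OT_TOTAL "total" buckets.
              * The NDVAS switch below also counts ditto copies that landed
              * on the same top-level vdev.
              */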
2439         for (i = 0; i < 4; i++) {
2440                 int l = (i < 2) ? BP_GET_LEVEL(bp) : DN_MAX_LEVELS;
2441                 int t = (i & 1) ? BP_GET_TYPE(bp) : DMU_OT_TOTAL;
2442                 if (t & DMU_OT_NEWTYPE)
2443                         t = DMU_OT_OTHER;
2444                 zfs_blkstat_t *zb = &zab->zab_type[l][t];
2445                 int equal;
2446 
2447                 zb->zb_count++;
2448                 zb->zb_asize += BP_GET_ASIZE(bp);
2449                 zb->zb_lsize += BP_GET_LSIZE(bp);
2450                 zb->zb_psize += BP_GET_PSIZE(bp);
2451                 zb->zb_gangs += BP_COUNT_GANG(bp);
2452 
2453                 switch (BP_GET_NDVAS(bp)) {
2454                 case 2:
2455                         if (DVA_GET_VDEV(&bp->blk_dva[0]) ==
2456                             DVA_GET_VDEV(&bp->blk_dva[1]))
2457                                 zb->zb_ditto_2_of_2_samevdev++;
2458                         break;
2459                 case 3:
2460                         equal = (DVA_GET_VDEV(&bp->blk_dva[0]) ==
2461                             DVA_GET_VDEV(&bp->blk_dva[1])) +
2462                             (DVA_GET_VDEV(&bp->blk_dva[0]) ==
2463                             DVA_GET_VDEV(&bp->blk_dva[2])) +
2464                             (DVA_GET_VDEV(&bp->blk_dva[1]) ==
2465                             DVA_GET_VDEV(&bp->blk_dva[2]));
2466                         if (equal == 1)
2467                                 zb->zb_ditto_2_of_3_samevdev++;
2468                         else if (equal == 3)
2469                                 zb->zb_ditto_3_of_3_samevdev++;
2470                         break;
2471                 }
2472         }
2473 }
2474 
2475 static void
2476 dsl_scan_scrub_done(zio_t *zio)
2477 {
2478         spa_t *spa = zio->io_spa;
2479 
2480         abd_free(zio->io_abd);
2481 
2482         mutex_enter(&spa->spa_scrub_lock);
2483         spa->spa_scrub_inflight--;
2484         cv_broadcast(&spa->spa_scrub_io_cv);
2485         mutex_exit(&spa->spa_scrub_lock);
2486 
2487         if (zio->io_error && (zio->io_error != ECKSUM ||
2488             !(zio->io_flags & ZIO_FLAG_SPECULATIVE))) {
2489                 atomic_inc_64(&spa->spa_dsl_pool->dp_scan->scn_phys.scn_errors);
2490                 DTRACE_PROBE1(scan_error, zio_t *, zio);
2491         }
2492 }
2493 
2494 static int
2495 dsl_scan_scrub_cb(dsl_pool_t *dp,
2496     const blkptr_t *bp, const zbookmark_phys_t *zb)
2497 {
2498         dsl_scan_t *scn = dp->dp_scan;
2499         spa_t *spa = dp->dp_spa;
2500         uint64_t phys_birth = BP_PHYSICAL_BIRTH(bp);
2501         boolean_t needs_io;
2502         int zio_flags = ZIO_FLAG_SCAN_THREAD | ZIO_FLAG_RAW | ZIO_FLAG_CANFAIL;
2503         boolean_t ignore_dva0;
2504 
2505         if (phys_birth <= scn->scn_phys.scn_min_txg ||
2506             phys_birth >= scn->scn_phys.scn_max_txg)
2507                 return (0);
2508 
2509         if (BP_IS_EMBEDDED(bp)) {
2510                 count_block(scn, dp->dp_blkstats, bp);
2511                 return (0);
2512         }
2513 
2514         ASSERT(DSL_SCAN_IS_SCRUB_RESILVER(scn));
2515         if (scn->scn_phys.scn_func == POOL_SCAN_SCRUB ||
2516             scn->scn_phys.scn_func == POOL_SCAN_MOS ||
2517             scn->scn_phys.scn_func == POOL_SCAN_META) {
2518                 zio_flags |= ZIO_FLAG_SCRUB;
2519                 needs_io = B_TRUE;
2520         } else {
2521                 ASSERT3U(scn->scn_phys.scn_func, ==, POOL_SCAN_RESILVER);
2522                 zio_flags |= ZIO_FLAG_RESILVER;
2523                 needs_io = B_FALSE;
2524         }
2525 
2526         /* If it's an intent log block, failure is expected. */
2527         if (zb->zb_level == ZB_ZIL_LEVEL)
2528                 zio_flags |= ZIO_FLAG_SPECULATIVE;
2529 
2530         if (scn->scn_phys.scn_func == POOL_SCAN_MOS)
2531                 needs_io = (zb->zb_objset == 0);
2532 
2533         if (scn->scn_phys.scn_func == POOL_SCAN_META)
2534                 needs_io = zb->zb_objset == 0 || BP_GET_LEVEL(bp) != 0 ||
2535                     DMU_OT_IS_METADATA(BP_GET_TYPE(bp));
2536 
2537         DTRACE_PROBE3(scan_needs_io, boolean_t, needs_io,
2538             const blkptr_t *, bp, spa_t *, spa);
2539 
2540         /*
2541          * WBC will invalidate DVA[0] after migrating the block to the main
2542          * pool. If the user subsequently disables WBC and removes the special
2543          * device, DVA[0] can now point to a hole vdev. We won't try to do
2544          * I/O to it, but we must also avoid doing DTL checks.
2545          */
2546         ignore_dva0 = (BP_IS_SPECIAL(bp) &&
2547             wbc_bp_is_migrated(spa_get_wbc_data(spa), bp));
2548 
2549         for (int d = 0; d < BP_GET_NDVAS(bp); d++) {
2550                 vdev_t *vd;
2551 
2552                 /*
2553                  * Keep track of how much data we've examined so that
2554                  * zpool(1M) status can make useful progress reports.
2555                  */
2556                 scn->scn_phys.scn_examined += DVA_GET_ASIZE(&bp->blk_dva[d]);
2557                 spa->spa_scan_pass_exam += DVA_GET_ASIZE(&bp->blk_dva[d]);
2558 
2559                 /* WBC-invalidated DVA post-migration, so skip it */
2560                 if (d == 0 && ignore_dva0)
2561                         continue;
2562                 vd = vdev_lookup_top(spa, DVA_GET_VDEV(&bp->blk_dva[d]));
2563 
2564                 /* if it's a resilver, this may not be in the target range */
2565                 if (!needs_io && scn->scn_phys.scn_func != POOL_SCAN_MOS &&
2566                     scn->scn_phys.scn_func != POOL_SCAN_META) {
2567                         if (DVA_GET_GANG(&bp->blk_dva[d])) {
2568                                 /*
2569                                  * Gang members may be spread across multiple
2570                                  * vdevs, so the best estimate we have is the
2571                                  * scrub range, which has already been checked.
2572                                  * XXX -- it would be better to change our
2573                                  * allocation policy to ensure that all
2574                                  * gang members reside on the same vdev.
2575                                  */
2576                                 needs_io = B_TRUE;
2577                                 DTRACE_PROBE2(gang_bp, const blkptr_t *, bp,
2578                                     spa_t *, spa);
2579                         } else {
2580                                 needs_io = vdev_dtl_contains(vd, DTL_PARTIAL,
2581                                     phys_birth, 1);
2582                                 if (needs_io)
2583                                         DTRACE_PROBE2(dtl, const blkptr_t *,
2584                                             bp, spa_t *, spa);
2585                         }
2586                 }
2587         }
2588 
2589         if (needs_io && !zfs_no_scrub_io) {
2590                 dsl_scan_enqueue(dp, bp, zio_flags, zb);
2591         } else {
2592                 count_block(scn, dp->dp_blkstats, bp);
2593         }
2594 
2595         /* do not relocate this block */
2596         return (0);
2597 }
2598 
2599 /*
2600  * Called by the ZFS_IOC_POOL_SCAN ioctl to start a scrub or resilver.
2601  * Can also be called to resume a paused scrub.
2602  */
2603 int
2604 dsl_scan(dsl_pool_t *dp, pool_scan_func_t func)
2605 {
2606         spa_t *spa = dp->dp_spa;
2607         dsl_scan_t *scn = dp->dp_scan;
2608 
2609         /*
2610          * Purge all vdev caches and probe all devices.  We do this here
2611          * rather than in sync context because this requires a writer lock
2612          * on the spa_config lock, which we can't do from sync context.  The
2613          * spa_scrub_reopen flag indicates that vdev_open() should not
2614          * attempt to start another scrub.
2615          */
2616         spa_vdev_state_enter(spa, SCL_NONE);
2617         spa->spa_scrub_reopen = B_TRUE;
2618         vdev_reopen(spa->spa_root_vdev);
2619         spa->spa_scrub_reopen = B_FALSE;
2620         (void) spa_vdev_state_exit(spa, NULL, 0);
2621 
2622         if (func == POOL_SCAN_SCRUB && dsl_scan_is_paused_scrub(scn)) {
2623                 /* got scrub start cmd, resume paused scrub */
2624                 int err = dsl_scrub_set_pause_resume(scn->scn_dp,
2625                     POOL_SCRUB_NORMAL);
2626                 if (err == 0)
2627                         return (ECANCELED);
2628 
2629                 return (SET_ERROR(err));
2630         }
2631 
2632         return (dsl_sync_task(spa_name(spa), dsl_scan_setup_check,
2633             dsl_scan_setup_sync, &func, 0, ZFS_SPACE_CHECK_NONE));
2634 }
2635 
2636 static boolean_t
2637 dsl_scan_restarting(dsl_scan_t *scn, dmu_tx_t *tx)
2638 {
2639         return (scn->scn_restart_txg != 0 &&
2640             scn->scn_restart_txg <= tx->tx_txg);
2641 }
2642 
2643 /*
2644  * Grand theory statement on scan queue sorting
2645  *
2646  * Scanning is implemented by recursively traversing all indirection levels
2647  * in an object and reading all blocks referenced from said objects. This
2648  * results in us approximately traversing the object from lowest logical
2649  * offset to the highest. Naturally, if we were to simply read all blocks in
2650  * this order, we would require that the blocks also be physically arranged
2651  * in a roughly linear fashion on the vdevs. However, this is frequently
2652  * not the case on pools. So we instead stick the I/Os into a reordering
2653  * queue and issue them out of logical order and in a way that most benefits
2654  * physical disks (LBA-order).
2655  *
2656  * This sorting algorithm is subject to limitations. We can't do this with
2657  * blocks that are non-leaf, because the scanner itself depends on these
2658  * being available ASAP for further metadata traversal. So we exclude any
2659  * block that is bp_level > 0. Fortunately, this usually represents only
2660  * around 1% of our data volume, so no great loss.
2661  *
2662  * As a further limitation, we cannot sort blocks which have more than
2663  * one DVA present (copies > 1), because there's no sensible way to sort
2664  * these (how do you sort a queue based on multiple contradictory
2665  * criteria?). So we exclude those as well. Again, these are very rarely
2666  * used for leaf blocks, usually only on metadata.
2667  *
2668  * WBC consideration: we can't sort blocks which have not yet been fully
2669  * migrated to normal devices, because their data can reside purely on the
2670  * special device or on both normal and special. This would require larger
2671  * data structures to track both DVAs in our queues, and we need the
2672  * smallest possible in-core structures in order to achieve good sorting
2673  * performance. Therefore, blocks which have not yet been fully migrated
2674  * out of the WBC are processed as non-sortable and issued immediately.
2675  *
2676  * Queue management:
2677  *
2678  * Ideally, we would want to scan all metadata and queue up all leaf block
2679  * I/O prior to starting to issue it, because that allows us to do an
2680  * optimal sorting job. This can however consume large amounts of memory.
2681  * Therefore we continuously monitor the size of the queues and constrain
2682  * them to 5% of physmem (see zfs_scan_mem_lim_fact). If the queues grow larger
2683  * than this limit, we clear out a few of the largest extents at the head
2684  * of the queues to make room for more scanning. Hopefully, these extents
2685  * will be fairly large and contiguous, allowing us to approach sequential
2686  * I/O throughput even without a fully sorted tree.
2687  *
2688  * Metadata scanning takes place in dsl_scan_visit(), which is called from
2689  * dsl_scan_sync() every spa_sync(). If we have either fully scanned all
2690  * metadata on the pool, or we need to make room in memory because our
2691  * queues are too large, dsl_scan_visit() is postponed and
2692  * scan_io_queues_run() is called from dsl_scan_sync() instead. That means
2693  * metadata scanning and queued I/O issuing are mutually exclusive. This is
2694  * to provide maximum sequential I/O throughput for the queued I/O issue
2695  * process. Sequential I/O performance is significantly negatively impacted
2696  * if it is interleaved with random I/O.
2697  *
2698  * Backwards compatibility
2699  *
2700  * This new algorithm is backwards compatible with the legacy on-disk data
2701  * structures. If imported on a machine without the new sorting algorithm,
2702  * the scan simply resumes from the last checkpoint.
2703  */
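     /*
      * Illustrative flow of the above (a sketch only, assuming the default
      * tunables described in this file): during one spa_sync() pass,
      * dsl_scan_visit() walks metadata and leaf-block I/Os accumulate in the
      * per-top-level-vdev queues. Once the queues hit the hard memory limit
      * (or metadata traversal completes), subsequent passes call
      * scan_io_queues_run() instead, draining the queues in as sequential an
      * order as we can manage until memory usage drops below the soft limit
      * (or the queues empty out), at which point metadata scanning resumes.
      */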
2704 
2705 /*
2706  * Given a set of I/O parameters as discovered by the metadata traversal
2707  * process, attempts to place the I/O into the reordering queue (if
2708  * possible), or immediately executes the I/O. The check for whether an
2709  * I/O is suitable for sorting is performed here.
2710  */
2711 static void
2712 dsl_scan_enqueue(dsl_pool_t *dp, const blkptr_t *bp, int zio_flags,
2713     const zbookmark_phys_t *zb)
2714 {
2715         spa_t *spa = dp->dp_spa;
2716 
2717         ASSERT(!BP_IS_EMBEDDED(bp));
2718         if (!dp->dp_scan->scn_is_sorted || (BP_IS_SPECIAL(bp) &&
2719             !wbc_bp_is_migrated(spa_get_wbc_data(spa), bp))) {
2720                 scan_exec_io(dp, bp, zio_flags, zb, B_TRUE);
2721                 return;
2722         }
2723 
2724         for (int i = 0; i < BP_GET_NDVAS(bp); i++) {
2725                 dva_t dva;
2726                 vdev_t *vdev;
2727 
2728                 /* On special BPs we only support handling the normal DVA */
2729                 if (BP_IS_SPECIAL(bp) && i != WBC_NORMAL_DVA)
2730                         continue;
2731 
2732                 dva = bp->blk_dva[i];
2733                 vdev = vdev_lookup_top(spa, DVA_GET_VDEV(&dva));
2734                 ASSERT(vdev != NULL);
2735 
2736                 mutex_enter(&vdev->vdev_scan_io_queue_lock);
2737                 if (vdev->vdev_scan_io_queue == NULL)
2738                         vdev->vdev_scan_io_queue = scan_io_queue_create(vdev);
2739                 ASSERT(dp->dp_scan != NULL);
2740                 scan_io_queue_insert(dp->dp_scan, vdev->vdev_scan_io_queue, bp,
2741                     i, zio_flags, zb);
2742                 mutex_exit(&vdev->vdev_scan_io_queue_lock);
2743         }
2744 }
2745 
2746 /*
2747  * Given a scanning zio's information, executes the zio. The zio need
2748  * not necessarily be sortable; this function simply executes the
2749  * zio, no matter what it is. The limit_inflight flag controls whether
2750  * we limit the number of concurrently executing scan zio's to
2751  * zfs_top_maxinflight times the number of top-level vdevs. This is
2752  * used during metadata discovery to pace the generation of I/O and
2753  * properly time the pausing of the scanning algorithm. The queue
2754  * processing part uses a different method of controlling timing and
2755  * so doesn't need this limit applied to its zio's.
2756  */
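     /*
      * For illustration: with 4 top-level vdevs and zfs_top_maxinflight set
      * to 32 (numbers assumed purely for this example), at most 4 * 32 = 128
      * scan zio's would be in flight at once during metadata discovery.
      */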
2757 static void
2758 scan_exec_io(dsl_pool_t *dp, const blkptr_t *bp, int zio_flags,
2759     const zbookmark_phys_t *zb, boolean_t limit_inflight)
2760 {
2761         spa_t *spa = dp->dp_spa;
2762         size_t size = BP_GET_PSIZE(bp);
2763         vdev_t *rvd = spa->spa_root_vdev;
2764         uint64_t maxinflight = rvd->vdev_children * zfs_top_maxinflight;
2765         dsl_scan_t *scn = dp->dp_scan;
2766         zio_priority_t prio;
2767 
2768         mutex_enter(&spa->spa_scrub_lock);
2769         while (limit_inflight && spa->spa_scrub_inflight >= maxinflight)
2770                 cv_wait(&spa->spa_scrub_io_cv, &spa->spa_scrub_lock);
2771         spa->spa_scrub_inflight++;
2772         mutex_exit(&spa->spa_scrub_lock);
2773 
2774         for (int i = 0; i < BP_GET_NDVAS(bp); i++)
2775                 atomic_add_64(&spa->spa_scan_pass_work,
2776                     DVA_GET_ASIZE(&bp->blk_dva[i]));
2777 
2778         count_block(dp->dp_scan, dp->dp_blkstats, bp);
2779         DTRACE_PROBE3(do_io, uint64_t, dp->dp_scan->scn_phys.scn_func,
2780             boolean_t, B_TRUE, spa_t *, spa);
2781         prio = (scn->scn_phys.scn_func == POOL_SCAN_RESILVER ?
2782             ZIO_PRIORITY_RESILVER : ZIO_PRIORITY_SCRUB);
2783         zio_nowait(zio_read(NULL, spa, bp, abd_alloc_for_io(size, B_FALSE),
2784             size, dsl_scan_scrub_done, NULL, prio, zio_flags, zb));
2785 }
2786 
2787 /*
2788  * Given all the info we got from our metadata scanning process, we
2789  * construct a scan_io_t and insert it into the scan sorting queue. The
2790  * I/O must already be suitable for us to process. This is controlled
2791  * by dsl_scan_enqueue().
2792  */
2793 static void
2794 scan_io_queue_insert(dsl_scan_t *scn, dsl_scan_io_queue_t *queue,
2795     const blkptr_t *bp, int dva_i, int zio_flags, const zbookmark_phys_t *zb)
2796 {
2797         scan_io_t *sio = kmem_zalloc(sizeof (*sio), KM_SLEEP);
2798         avl_index_t idx;
2799         uint64_t offset, asize;
2800 
2801         ASSERT(MUTEX_HELD(&queue->q_vd->vdev_scan_io_queue_lock));
2802 
2803         bp2sio(bp, sio, dva_i);
2804         sio->sio_flags = zio_flags;
2805         sio->sio_zb = *zb;
2806         offset = SCAN_IO_GET_OFFSET(sio);
2807         asize = sio->sio_asize;
2808 
2809         if (avl_find(&queue->q_zios_by_addr, sio, &idx) != NULL) {
2810                 /* block is already scheduled for reading */
2811                 kmem_free(sio, sizeof (*sio));
2812                 return;
2813         }
2814         avl_insert(&queue->q_zios_by_addr, sio, idx);
2815         atomic_add_64(&queue->q_zio_bytes, asize);
2816 
2817         /*
2818          * Increment the bytes pending counter now so that we can't
2819          * get an integer underflow in case the worker processes the
2820          * zio before we get to incrementing this counter.
2821          */
2822         mutex_enter(&scn->scn_status_lock);
2823         scn->scn_bytes_pending += asize;
2824         mutex_exit(&scn->scn_status_lock);
2825 
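             /*
              * Finally, record the I/O in the address-ordered extent tree.
              * I/Os that fall within zfs_scan_max_ext_gap of an existing
              * extent are merged into it, and the I/O's size is added to the
              * extent's fill, which feeds the scoring described at
              * ext_size_compar().
              */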
2826         range_tree_set_gap(queue->q_exts_by_addr, zfs_scan_max_ext_gap);
2827         range_tree_add_fill(queue->q_exts_by_addr, offset, asize, asize);
2828 }
2829 
2830 /* q_exts_by_addr segment add callback. */
2831 /*ARGSUSED*/
2832 static void
2833 scan_io_queue_insert_cb(range_tree_t *rt, range_seg_t *rs, void *arg)
2834 {
2835         dsl_scan_io_queue_t *queue = arg;
2836         avl_index_t idx;
2837         ASSERT(MUTEX_HELD(&queue->q_vd->vdev_scan_io_queue_lock));
2838         VERIFY3P(avl_find(&queue->q_exts_by_size, rs, &idx), ==, NULL);
2839         avl_insert(&queue->q_exts_by_size, rs, idx);
2840 }
2841 
2842 /* q_exts_by_addr segment remove callback. */
2843 /*ARGSUSED*/
2844 static void
2845 scan_io_queue_remove_cb(range_tree_t *rt, range_seg_t *rs, void *arg)
2846 {
2847         dsl_scan_io_queue_t *queue = arg;
2848         avl_remove(&queue->q_exts_by_size, rs);
2849 }
2850 
2851 /* q_exts_by_addr vacate callback. */
2852 /*ARGSUSED*/
2853 static void
2854 scan_io_queue_vacate_cb(range_tree_t *rt, void *arg)
2855 {
2856         dsl_scan_io_queue_t *queue = arg;
2857         void *cookie = NULL;
2858         while (avl_destroy_nodes(&queue->q_exts_by_size, &cookie) != NULL)
2859                 ;
2860 }
2861 
2862 /*
2863  * This is the primary extent sorting algorithm. We balance two parameters:
2864  * 1) how many bytes of I/O are in an extent
2865  * 2) how well the extent is filled with I/O (as a fraction of its total size)
2866  * Since we allow extents to have gaps between their constituent I/Os, it's
2867  * possible to have a fairly large extent that contains the same amount of
2868  * I/O bytes as a much smaller extent, which just packs the I/O more tightly.
2869  * The algorithm sorts based on a score calculated from the extent's size,
2870  * the relative fill volume (in %) and a "fill weight" parameter that controls
2871  * the split between whether we prefer larger extents or more well populated
2872  * extents:
2873  *
2874  * SCORE = FILL_IN_BYTES + (FILL_IN_PERCENT * FILL_IN_BYTES * FILL_WEIGHT)
2875  *
2876  * Example:
2877  * 1) assume extsz = 64 MiB
2878  * 2) assume fill = 32 MiB (extent is half full)
2879  * 3) assume fill_weight = 3
2880  * 4)   SCORE = 32M + (((32M * 100) / 64M) * 3 * 32M) / 100
2881  *      SCORE = 32M + (50 * 3 * 32M) / 100
2882  *      SCORE = 32M + (4800M / 100)
2883  *      SCORE = 32M + 48M
2884  *               ^     ^
2885  *               |     +--- final total relative fill-based score
2886  *               +--------- final total fill-based score
2887  *      SCORE = 80M
2888  *
2889  * As can be seen, at fill_weight=3, the algorithm is slightly biased towards
2890  * extents that are more completely filled (in a 3:2 ratio) vs just larger.
2891  */
2892 static int
2893 ext_size_compar(const void *x, const void *y)
2894 {
2895         const range_seg_t *rsa = x, *rsb = y;
2896         uint64_t sa = rsa->rs_end - rsa->rs_start,
2897             sb = rsb->rs_end - rsb->rs_start;
2898         uint64_t score_a, score_b;
2899 
2900         score_a = rsa->rs_fill + (((rsa->rs_fill * 100) / sa) *
2901             fill_weight * rsa->rs_fill) / 100;
2902         score_b = rsb->rs_fill + (((rsb->rs_fill * 100) / sb) *
2903             fill_weight * rsb->rs_fill) / 100;
2904 
2905         if (score_a > score_b)
2906                 return (-1);
2907         if (score_a == score_b) {
2908                 if (rsa->rs_start < rsb->rs_start)
2909                         return (-1);
2910                 if (rsa->rs_start == rsb->rs_start)
2911                         return (0);
2912                 return (1);
2913         }
2914         return (1);
2915 }
2916 
2917 /*
2918  * Comparator for the q_zios_by_addr tree. Sorting is simply performed
2919  * based on LBA-order (from lowest to highest).
2920  */
2921 static int
2922 io_addr_compar(const void *x, const void *y)
2923 {
2924         const scan_io_t *a = x, *b = y;
2925         uint64_t off_a = SCAN_IO_GET_OFFSET(a);
2926         uint64_t off_b = SCAN_IO_GET_OFFSET(b);
2927         if (off_a < off_b)
2928                 return (-1);
2929         if (off_a == off_b)
2930                 return (0);
2931         return (1);
2932 }
2933 
2934 static dsl_scan_io_queue_t *
2935 scan_io_queue_create(vdev_t *vd)
2936 {
2937         dsl_scan_t *scn = vd->vdev_spa->spa_dsl_pool->dp_scan;
2938         dsl_scan_io_queue_t *q = kmem_zalloc(sizeof (*q), KM_SLEEP);
2939 
2940         q->q_scn = scn;
2941         q->q_vd = vd;
2942         cv_init(&q->q_cv, NULL, CV_DEFAULT, NULL);
2943         q->q_exts_by_addr = range_tree_create(&scan_io_queue_ops, q,
2944             &q->q_vd->vdev_scan_io_queue_lock);
2945         avl_create(&q->q_exts_by_size, ext_size_compar,
2946             sizeof (range_seg_t), offsetof(range_seg_t, rs_pp_node));
2947         avl_create(&q->q_zios_by_addr, io_addr_compar,
2948             sizeof (scan_io_t), offsetof(scan_io_t, sio_nodes.sio_addr_node));
2949 
2950         return (q);
2951 }
2952 
2953 /*
2954  * Destroys a scan queue and all segments and scan_io_t's contained in it.
2955  * No further execution of I/O occurs, anything pending in the queue is
2956  * simply dropped. Prior to calling this, the queue should have been
2957  * removed from its parent top-level vdev, hence holding the queue's
2958  * lock is not permitted.
2959  */
2960 void
2961 dsl_scan_io_queue_destroy(dsl_scan_io_queue_t *queue)
2962 {
2963         dsl_scan_t *scn = queue->q_scn;
2964         scan_io_t *sio;
2965         uint64_t bytes_dequeued = 0;
2966         kmutex_t *q_lock = &queue->q_vd->vdev_scan_io_queue_lock;
2967 
2968         ASSERT(!MUTEX_HELD(q_lock));
2969 
2970 #ifdef  DEBUG   /* This is for the ASSERT(range_tree_contains... below */
2971         mutex_enter(q_lock);
2972 #endif
2973         while ((sio = avl_first(&queue->q_zios_by_addr)) != NULL) {
2974                 ASSERT(range_tree_contains(queue->q_exts_by_addr,
2975                     SCAN_IO_GET_OFFSET(sio), sio->sio_asize));
2976                 bytes_dequeued += sio->sio_asize;
2977                 avl_remove(&queue->q_zios_by_addr, sio);
2978                 kmem_free(sio, sizeof (*sio));
2979         }
2980 #ifdef  DEBUG
2981         mutex_exit(q_lock);
2982 #endif
2983 
2984         mutex_enter(&scn->scn_status_lock);
2985         ASSERT3U(scn->scn_bytes_pending, >=, bytes_dequeued);
2986         scn->scn_bytes_pending -= bytes_dequeued;
2987         mutex_exit(&scn->scn_status_lock);
2988 
2989         /* lock here to avoid tripping assertion in range_tree_vacate */
2990         mutex_enter(q_lock);
2991         range_tree_vacate(queue->q_exts_by_addr, NULL, queue);
2992         mutex_exit(q_lock);
2993 
2994         range_tree_destroy(queue->q_exts_by_addr);
2995         avl_destroy(&queue->q_exts_by_size);
2996         avl_destroy(&queue->q_zios_by_addr);
2997         cv_destroy(&queue->q_cv);
2998 
2999         kmem_free(queue, sizeof (*queue));
3000 }
3001 
3002 /*
3003  * Properly transfers a dsl_scan_io_queue_t from `svd' to `tvd'. This is
3004  * called on behalf of vdev_top_transfer when creating or destroying
3005  * a mirror vdev due to zpool attach/detach.
3006  */
3007 void
3008 dsl_scan_io_queue_vdev_xfer(vdev_t *svd, vdev_t *tvd)
3009 {
3010         mutex_enter(&svd->vdev_scan_io_queue_lock);
3011         mutex_enter(&tvd->vdev_scan_io_queue_lock);
3012 
3013         VERIFY3P(tvd->vdev_scan_io_queue, ==, NULL);
3014         tvd->vdev_scan_io_queue = svd->vdev_scan_io_queue;
3015         svd->vdev_scan_io_queue = NULL;
3016         if (tvd->vdev_scan_io_queue != NULL) {
3017                 tvd->vdev_scan_io_queue->q_vd = tvd;
3018                 range_tree_set_lock(tvd->vdev_scan_io_queue->q_exts_by_addr,
3019                     &tvd->vdev_scan_io_queue_lock);
3020         }
3021 
3022         mutex_exit(&tvd->vdev_scan_io_queue_lock);
3023         mutex_exit(&svd->vdev_scan_io_queue_lock);
3024 }
3025 
3026 static void
3027 scan_io_queues_destroy(dsl_scan_t *scn)
3028 {
3029         vdev_t *rvd = scn->scn_dp->dp_spa->spa_root_vdev;
3030 
3031         for (uint64_t i = 0; i < rvd->vdev_children; i++) {
3032                 vdev_t *tvd = rvd->vdev_child[i];
3033                 dsl_scan_io_queue_t *queue;
3034 
3035                 mutex_enter(&tvd->vdev_scan_io_queue_lock);
3036                 queue = tvd->vdev_scan_io_queue;
3037                 tvd->vdev_scan_io_queue = NULL;
3038                 mutex_exit(&tvd->vdev_scan_io_queue_lock);
3039 
3040                 if (queue != NULL)
3041                         dsl_scan_io_queue_destroy(queue);
3042         }
3043 }
3044 
3045 /*
3046  * Computes the memory limit state that we're currently in. A sorted scan
3047  * needs quite a bit of memory to hold the sorting queues, so we need to
3048  * reasonably constrain their size so they don't impact overall system
3049  * performance. We compute two limits:
3050  * 1) Hard memory limit: if the amount of memory used by the sorting
3051  *      queues on a pool gets above this value, we stop the metadata
3052  *      scanning portion and start issuing the queued up and sorted
3053  *      I/Os to reduce memory usage.
3054  *      This limit is calculated as a fraction of physmem (by default 5%).
3055  *      We constrain the lower bound of the hard limit to an absolute
3056  *      minimum of zfs_scan_mem_lim_min (default: 16 MiB). We also constrain
3057  *      the upper bound to 5% of the pool's allocated space - no chance we'll
3058  *      ever need that much memory, but just to keep the value in check.
3059  * 2) Soft memory limit: once we hit the hard memory limit, we start
3060  *      issuing I/O to lower queue memory usage, but we don't want to
3061  *      completely empty them out, as having more in the queues allows
3062  *      us to make better sorting decisions. So we stop the issuing of
3063  *      I/Os once the amount of memory used drops below the soft limit
3064  *      (at which point we stop issuing I/O and start scanning metadata
3065  *      again).
3066  *      The limit is calculated by subtracting a fraction of the hard
3067  *      limit from the hard limit. By default this fraction is 10%, so
3068  *      the soft limit is 90% of the hard limit. We cap the size of the
3069  *      difference between the hard and soft limits at an absolute
3070  *      maximum of zfs_scan_mem_lim_soft_max (default: 128 MiB) - this is
3071  *      sufficient to not cause too frequent switching between the
3072  *      metadata scan and I/O issue (even at 2k recordsize, 128 MiB's
3073  *      worth of queues is about 1.2 GiB of on-pool data, so scanning
3074  *      that should take at least a decent fraction of a second).
3075  */
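     /*
      * Worked example (a sketch only, with assumed values): on a system with
      * 8 GiB of physmem and the defaults described above, the hard limit
      * would be MAX(8 GiB / 20, 16 MiB) ~= 410 MiB (further capped at 5% of
      * the pool's allocated space), and the soft limit would be
      * 410 MiB - MIN(410 MiB / 10, 128 MiB) ~= 369 MiB.
      */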
3076 static mem_lim_t
3077 scan_io_queue_mem_lim(dsl_scan_t *scn)
3078 {
3079         vdev_t *rvd = scn->scn_dp->dp_spa->spa_root_vdev;
3080         uint64_t mlim_hard, mlim_soft, mused;
3081         uint64_t alloc = metaslab_class_get_alloc(spa_normal_class(
3082             scn->scn_dp->dp_spa));
3083 
3084         mlim_hard = MAX((physmem / zfs_scan_mem_lim_fact) * PAGESIZE,
3085             zfs_scan_mem_lim_min);
3086         mlim_hard = MIN(mlim_hard, alloc / 20);
3087         mlim_soft = mlim_hard - MIN(mlim_hard / zfs_scan_mem_lim_soft_fact,
3088             zfs_scan_mem_lim_soft_max);
3089         mused = 0;
3090         for (uint64_t i = 0; i < rvd->vdev_children; i++) {
3091                 vdev_t *tvd = rvd->vdev_child[i];
3092                 dsl_scan_io_queue_t *queue;
3093 
3094                 mutex_enter(&tvd->vdev_scan_io_queue_lock);
3095                 queue = tvd->vdev_scan_io_queue;
3096                 if (queue != NULL) {
3097                         /* #extents in exts_by_size = # in exts_by_addr */
3098                         mused += avl_numnodes(&queue->q_exts_by_size) *
3099                             sizeof (range_seg_t) +
3100                             (avl_numnodes(&queue->q_zios_by_addr) +
3101                             queue->q_num_issuing_zios) * sizeof (scan_io_t);
3102                 }
3103                 mutex_exit(&tvd->vdev_scan_io_queue_lock);
3104         }
3105         DTRACE_PROBE4(queue_mem_lim, dsl_scan_t *, scn, uint64_t, mlim_hard,
3106             uint64_t, mlim_soft, uint64_t, mused);
3107 
3108         if (mused >= mlim_hard)
3109                 return (MEM_LIM_HARD);
3110         else if (mused >= mlim_soft)
3111                 return (MEM_LIM_SOFT);
3112         else
3113                 return (MEM_LIM_NONE);
3114 }
3115 
3116 /*
3117  * Given a list of scan_io_t's in io_list, this issues the io's out to
3118  * disk. This consumes the io_list and frees the scan_io_t's.
3120  * This is called when emptying queues, either when we're up against
3121  * the memory limit or we have finished scanning.
3122  */
3123 static void
3124 scan_io_queue_issue(list_t *io_list, dsl_scan_io_queue_t *queue)
3125 {
3126         dsl_scan_t *scn = queue->q_scn;
3127         scan_io_t *sio;
3128         int64_t bytes_issued = 0;
3129 
3130         while ((sio = list_head(io_list)) != NULL) {
3131                 blkptr_t bp;
3132 
3133                 sio2bp(sio, &bp, queue->q_vd->vdev_id);
3134                 bytes_issued += sio->sio_asize;
3135                 scan_exec_io(scn->scn_dp, &bp, sio->sio_flags, &sio->sio_zb,
3136                     B_FALSE);
3137                 (void) list_remove_head(io_list);
3138                 ASSERT(queue->q_num_issuing_zios > 0);
3139                 atomic_dec_64(&queue->q_num_issuing_zios);
3140                 kmem_free(sio, sizeof (*sio));
3141         }
3142 
3143         mutex_enter(&scn->scn_status_lock);
3144         ASSERT3U(scn->scn_bytes_pending, >=, bytes_issued);
3145         scn->scn_bytes_pending -= bytes_issued;
3146         mutex_exit(&scn->scn_status_lock);
3147 
3148         ASSERT3U(queue->q_zio_bytes, >=, bytes_issued);
3149         atomic_add_64(&queue->q_zio_bytes, -bytes_issued);
3150 
3151         list_destroy(io_list);
3152 }
3153 
3154 /*
3155  * Given a range_seg_t (extent) and a list, this function passes over a
3156  * scan queue and gathers up the appropriate ios which fit into that
3157  * scan seg (starting from lowest LBA). While doing so, we try not to
3158  * exceed `limit' in the total amount of scan_io_t bytes gathered. At
3159  * the end, we remove the appropriate amount of space from
3160  * q_exts_by_addr. If we have consumed the entire scan seg, we remove it
3161  * completely from q_exts_by_addr. If we've only consumed a portion of
3162  * it, we shorten the scan seg appropriately. A future call will consume
3163  * more of the scan seg's constituent io's until the extent has been
3164  * consumed completely. If we've reduced the size of the
3165  * scan seg, we of course reinsert it in the appropriate spot in the
3166  * q_exts_by_size tree.
3167  */
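     /*
      * Small example with hypothetical numbers: given an extent covering
      * [0, 1M) holding four queued 128K zio's at offsets 0, 256K, 512K and
      * 768K, and limit = 300K, we gather the first three zio's (384K; the
      * limit is checked before each zio is added, so we can overshoot it by
      * at most one zio), then trim the extent down to [640K, 1M) so that a
      * later call picks up the remaining zio at 768K.
      */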
3168 static uint64_t
3169 scan_io_queue_gather(const range_seg_t *rs, list_t *list,
3170     dsl_scan_io_queue_t *queue, uint64_t limit)
3171 {
3172         scan_io_t srch_sio, *sio, *next_sio;
3173         avl_index_t idx;
3174         int64_t num_zios = 0, bytes = 0;
3175         boolean_t size_limited = B_FALSE;
3176 
3177         ASSERT(rs != NULL);
3178         ASSERT3U(limit, !=, 0);
3179         ASSERT(MUTEX_HELD(&queue->q_vd->vdev_scan_io_queue_lock));
3180 
3181         list_create(list, sizeof (scan_io_t),
3182             offsetof(scan_io_t, sio_nodes.sio_list_node));
3183         SCAN_IO_SET_OFFSET(&srch_sio, rs->rs_start);
3184 
3185         /*
3186          * The exact start of the extent might not contain any matching zios,
3187          * so if that's the case, examine the next one in the tree.
3188          */
3189         sio = avl_find(&queue->q_zios_by_addr, &srch_sio, &idx);
3190         if (sio == NULL)
3191                 sio = avl_nearest(&queue->q_zios_by_addr, idx, AVL_AFTER);
3192 
3193         while (sio != NULL && SCAN_IO_GET_OFFSET(sio) < rs->rs_end) {
3194                 if (bytes >= limit) {
3195                         size_limited = B_TRUE;
3196                         break;
3197                 }
3198                 ASSERT3U(SCAN_IO_GET_OFFSET(sio), >=, rs->rs_start);
3199                 ASSERT3U(SCAN_IO_GET_OFFSET(sio) + sio->sio_asize, <=,
3200                     rs->rs_end);
3201 
3202                 next_sio = AVL_NEXT(&queue->q_zios_by_addr, sio);
3203                 avl_remove(&queue->q_zios_by_addr, sio);
3204                 list_insert_tail(list, sio);
3205                 num_zios++;
3206                 bytes += sio->sio_asize;
3207                 sio = next_sio;
3208         }
3209 
3210         if (size_limited) {
3211                 uint64_t end;
3212                 sio = list_tail(list);
3213                 end = SCAN_IO_GET_OFFSET(sio) + sio->sio_asize;
3214                 range_tree_remove_fill(queue->q_exts_by_addr, rs->rs_start,
3215                     end - rs->rs_start, bytes, 0);
3216         } else {
3217                 /*
3218                  * Whole extent consumed, remove it all, including any head
3219                  * or tail overhang.
3220                  */
3221                 range_tree_remove_fill(queue->q_exts_by_addr, rs->rs_start,
3222                     rs->rs_end - rs->rs_start, bytes, 0);
3223         }
3224         atomic_add_64(&queue->q_num_issuing_zios, num_zios);
3225 
3226         return (bytes);
3227 }
3228 
3229 /*
3230  * This is called from the queue emptying thread and selects the next
3231  * extent from which we are to issue io's. The behavior of this function
3232  * depends on the state of the scan, the current memory consumption and
3233  * whether or not we are performing a scan shutdown.
3234  * 1) We select extents in an elevator algorithm (LBA-order) if:
3235  *      a) the scan has finished traversing metadata (DSS_FINISHING)
3236  *      b) the scan needs to perform a checkpoint
3237  * 2) We select the highest-scoring extent (see ext_size_compar()) if we
3238  *      are up against the memory limit.
3239  * 3) Otherwise we don't select any extents.
3240  */
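     /*
      * Note that case 1 pulls from q_exts_by_addr (range_tree_first(), i.e.
      * lowest LBA first), while case 2 pulls from q_exts_by_size, whose
      * comparator (ext_size_compar()) places the highest-scoring extent
      * first.
      */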
3241 static const range_seg_t *
3242 scan_io_queue_fetch_ext(dsl_scan_io_queue_t *queue)
3243 {
3244         dsl_scan_t *scn = queue->q_scn;
3245 
3246         ASSERT(MUTEX_HELD(&queue->q_vd->vdev_scan_io_queue_lock));
3247         ASSERT0(queue->q_issuing_rs.rs_start);
3248         ASSERT0(queue->q_issuing_rs.rs_end);
3249         ASSERT(scn->scn_is_sorted);
3250 
3251         if (scn->scn_phys.scn_state == DSS_FINISHING ||
3252             scn->scn_checkpointing) {
3253                 /*
3254                  * When the scan has finished traversing all metadata and is
3255                  * in the DSS_FINISHING state or a checkpoint has been
3256                  * requested, no new extents will be added to the sorting
3257                  * queue, so the way we are sorted now is as good as it'll
3258                  * get. So instead, switch to issuing extents in linear order.
3259                  */
3260                 return (range_tree_first(queue->q_exts_by_addr));
3261         } else if (scn->scn_clearing) {
3262                 return (avl_first(&queue->q_exts_by_size));
3263         } else {
3264                 return (NULL);
3265         }
3266 }
3267 
3268 /*
3269  * Empties a scan queue until we have issued at least info->qri_limit
3270  * bytes, or the queue is empty. This is called via the scn_taskq so as
3271  * to parallelize processing of all top-level vdevs as much as possible.
3272  */
3273 static void
3274 scan_io_queues_run_one(io_queue_run_info_t *info)
3275 {
3276         dsl_scan_io_queue_t *queue = info->qri_queue;
3277         uint64_t limit = info->qri_limit;
3278         dsl_scan_t *scn = queue->q_scn;
3279         kmutex_t *q_lock = &queue->q_vd->vdev_scan_io_queue_lock;
3280         list_t zio_list;
3281         const range_seg_t *rs;
3282         uint64_t issued = 0;
3283 
3284         ASSERT(scn->scn_is_sorted);
3285 
3286         /* loop until we have issued as much I/O as was requested */
3287         while (issued < limit) {
3288                 scan_io_t *first_io, *last_io;
3289 
3290                 mutex_enter(q_lock);
3291                 /* First we select the extent we'll be issuing from next. */
3292                 rs = scan_io_queue_fetch_ext(queue);
3293                 DTRACE_PROBE2(queue_fetch_ext, range_seg_t *, rs,
3294                     dsl_scan_io_queue_t *, queue);
3295                 if (rs == NULL) {
3296                         mutex_exit(q_lock);
3297                         break;
3298                 }
3299 
3300                 /*
3301                  * We have selected which extent needs to be processed next,
3302                  * gather up the corresponding zio's, taking care not to step
3303                  * over the limit.
3304                  */
3305                 issued += scan_io_queue_gather(rs, &zio_list, queue,
3306                     limit - issued);
3307                 first_io = list_head(&zio_list);
3308                 last_io = list_tail(&zio_list);
3309                 if (first_io != NULL) {
3310                         /*
3311                          * We have zio's to issue. Construct a fake range
3312                          * seg that covers the whole list of zio's to issue
3313                          * (the list is guaranteed to be LBA-ordered) and
3314                          * save that in the queue's "in flight" segment.
3315                          * This is used to make block frees that land in
3316                          * that range wait until we have finished issuing it.
3317                          */
3318                         ASSERT(last_io != NULL);
3319                         queue->q_issuing_rs.rs_start =
3320                             SCAN_IO_GET_OFFSET(first_io);
3321                         queue->q_issuing_rs.rs_end =
3322                             SCAN_IO_GET_OFFSET(last_io) + last_io->sio_asize;
3323                 }
3324                 mutex_exit(q_lock);
3325 
3326                 /*
3327                  * Issuing zio's can take a long time (especially because
3328                  * we are constrained by zfs_top_maxinflight), so drop the
3329                  * queue lock.
3330                  */
3331                 scan_io_queue_issue(&zio_list, queue);
3332 
3333                 mutex_enter(q_lock);
3334                 /* invalidate the in-flight I/O range */
3335                 bzero(&queue->q_issuing_rs, sizeof (queue->q_issuing_rs));
3336                 cv_broadcast(&queue->q_cv);
3337                 mutex_exit(q_lock);
3338         }
3339 }
3340 
3341 /*
3342  * Performs an emptying run on all scan queues in the pool. This just
3343  * punches out one thread per top-level vdev, each of which processes
3344  * only that vdev's scan queue. We can parallelize the I/O here because
3345  * we know that each queue's io's only affect its own top-level vdev.
3346  * The amount of I/O dequeued per run of this function is calibrated
3347  * dynamically so that its total run time doesn't exceed
3348  * zfs_scan_dequeue_run_target_ms + zfs_dequeue_run_bonus_ms. The
3349  * timing algorithm aims to hit the target value, but still
3350  * oversubscribes the amount of data that it is allowed to fetch by
3351  * the bonus value. This is to allow for non-equal completion times
3352  * across top-level vdevs.
3353  *
3354  * This function waits for the queue runs to complete, and must be
3355  * called from dsl_scan_sync (or in general, syncing context).
3356  */
3357 static void
3358 scan_io_queues_run(dsl_scan_t *scn)
3359 {
3360         spa_t *spa = scn->scn_dp->dp_spa;
3361         uint64_t dirty_limit, total_limit, total_bytes;
3362         io_queue_run_info_t *info;
3363         uint64_t dequeue_min = zfs_scan_dequeue_min *
3364             spa->spa_root_vdev->vdev_children;
3365 
3366         ASSERT(scn->scn_is_sorted);
3367         ASSERT(spa_config_held(spa, SCL_CONFIG, RW_READER));
3368 
3369         if (scn->scn_taskq == NULL) {
3370                 char *tq_name = kmem_zalloc(ZFS_MAX_DATASET_NAME_LEN + 16,
3371                     KM_SLEEP);
3372                 const int nthreads = spa->spa_root_vdev->vdev_children;
3373 
3374                 /*
3375                  * We need to make this taskq *always* execute as many
3376                  * threads in parallel as we have top-level vdevs and no
3377                  * less; otherwise strange serialization of the calls to
3378                  * scan_io_queues_run_one() can occur during spa_sync runs,
3379                  * which significantly impacts performance.
3380                  */
3381                 (void) snprintf(tq_name, ZFS_MAX_DATASET_NAME_LEN + 16,
3382                     "dsl_scan_tq_%s", spa->spa_name);
3383                 scn->scn_taskq = taskq_create(tq_name, nthreads, minclsyspri,
3384                     nthreads, nthreads, TASKQ_PREPOPULATE);
3385                 kmem_free(tq_name, ZFS_MAX_DATASET_NAME_LEN + 16);
3386         }
3387 
3388         /*
3389          * This is the automatic run time calibration algorithm. We gauge
3390          * how long spa_sync took since last time we were invoked. If it
3391          * took longer than our target + bonus values, we reduce the
3392          * amount of data that the queues are allowed to process in this
3393          * iteration. Conversely, if it took less than target + bonus,
3394          * we increase the amount of data the queues are allowed to process.
3395          * This is designed as a partial load-following algorithm, so if
3396          * other ZFS users start issuing I/O, we back off, until we hit our
3397          * minimum issue amount (per-TL-vdev) of zfs_scan_dequeue_min bytes.
3398          */
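             /*
              * Sketch of the calibration math (numbers assumed purely for
              * illustration, since the tunable defaults aren't spelled out
              * here): if the previous limit was 100 MiB, the target is
              * 3000 ms, the bonus is 100 ms and the last sync-to-sync
              * interval was 6100 ms, the new limit becomes
              * 100 MiB * 3000 / (6100 - 100) = 50 MiB, but never less than
              * zfs_scan_dequeue_min bytes per top-level vdev.
              */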
3399         if (scn->scn_last_queue_run_time != 0) {
3400                 uint64_t now = ddi_get_lbolt64();
3401                 uint64_t delta_ms = TICK_TO_MSEC(now -
3402                     scn->scn_last_queue_run_time);
3403                 uint64_t bonus = zfs_dequeue_run_bonus_ms;
3404 
3405                 bonus = MIN(bonus, DEQUEUE_BONUS_MS_MAX);
3406                 if (delta_ms <= bonus)
3407                         delta_ms = bonus + 1;
3408 
3409                 scn->scn_last_dequeue_limit = MAX(dequeue_min,
3410                     (scn->scn_last_dequeue_limit *
3411                     zfs_scan_dequeue_run_target_ms) / (delta_ms - bonus));
3412                 scn->scn_last_queue_run_time = now;
3413         } else {
3414                 scn->scn_last_queue_run_time = ddi_get_lbolt64();
3415                 scn->scn_last_dequeue_limit = dequeue_min;
3416         }
3417 
3418         /*
3419          * We also constrain the amount of data we are allowed to issue
3420          * by the zfs_dirty_data_max value - this serves as basically a
3421          * sanity check just to prevent us from issuing huge amounts of
3422          * data to be dequeued per run.
3423          */
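             /*
              * For instance (values assumed for illustration only): with
              * zfs_dirty_data_max = 4 GiB, a min-dirty percentage of 30 and
              * 200 MiB of dirty data currently outstanding, dirty_limit
              * would come out to 1.2 GiB - 200 MiB ~= 1 GiB.
              */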
3424         dirty_limit = (zfs_vdev_async_write_active_min_dirty_percent *
3425             zfs_dirty_data_max) / 100;
3426         if (dirty_limit >= scn->scn_dp->dp_dirty_total)
3427                 dirty_limit -= scn->scn_dp->dp_dirty_total;
3428         else
3429                 dirty_limit = 0;
3430 
3431         total_limit = MAX(MIN(scn->scn_last_dequeue_limit, dirty_limit),
3432             dequeue_min);
3433 
3434         /*
3435          * We use this to determine how much data each queue is allowed to
3436          * issue this run. We take the amount of dirty data available in
3437          * the current txg and proportionally split it among the queues,
3438          * depending on how full a given queue is. No need to lock here,
3439          * new data can't enter the queue, since that's only done in our
3440          * sync thread.
3441          */
3442         total_bytes = scn->scn_bytes_pending;
3443         if (total_bytes == 0)
3444                 return;
3445 
3446         info = kmem_zalloc(sizeof (*info) * spa->spa_root_vdev->vdev_children,
3447             KM_SLEEP);
3448         for (uint64_t i = 0; i < spa->spa_root_vdev->vdev_children; i++) {
3449                 vdev_t *vd = spa->spa_root_vdev->vdev_child[i];
3450                 dsl_scan_io_queue_t *queue;
3451                 uint64_t limit;
3452 
3453                 /*
3454                  * No need to lock to check if the queue exists, since this
3455                  * is called from sync context only and queues are only
3456                  * created in sync context also.
3457                  */
3458                 queue = vd->vdev_scan_io_queue;
3459                 if (queue == NULL)
3460                         continue;
3461 
3462                 /*
3463                  * Compute the per-queue limit as a fraction of the queue's
3464                  * size, relative to the total amount of zio bytes in all
3465                  * queues. 1000 here is the fixed-point precision. If
3466                  * there are ever more than 1000 top-level vdevs, this
3467                  * code might misbehave.
3468                  */
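                     /*
                      * For example (hypothetical numbers): if this queue
                      * holds 30% of all pending zio bytes and total_limit is
                      * 100 MiB, the queue's limit is
                      * ((300 * 100 MiB) / 1000) = 30 MiB, but never less
                      * than zfs_scan_dequeue_min.
                      */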
3469                 limit = MAX((((queue->q_zio_bytes * 1000) / total_bytes) *
3470                     total_limit) / 1000, zfs_scan_dequeue_min);
3471 
3472                 info[i].qri_queue = queue;
3473                 info[i].qri_limit = limit;
3474 
3475                 VERIFY(taskq_dispatch(scn->scn_taskq,
3476                     (void (*)(void *))scan_io_queues_run_one, &info[i],
3477                     TQ_SLEEP) != NULL);
3478         }
3479 
3480         /*
3481          * We need to wait for all queues to finish their run, just to keep
3482          * things nice and consistent. This doesn't necessarily mean all
3483          * I/O generated by the queues emptying has finished (there may be
3484          * up to zfs_top_maxinflight zio's still processing on behalf of
3485          * each queue).
3486          */
3487         taskq_wait(scn->scn_taskq);
3488 
3489         kmem_free(info, sizeof (*info) * spa->spa_root_vdev->vdev_children);
3490 }
3491 
3492 /*
3493  * Callback invoked when a zio_free() zio is executing. This needs to be
3494  * intercepted to prevent a portion of disk space from being freed
3495  * and then reallocated and written to while we still have it queued
3496  * up for processing, or even while we're trying to scrub or
3497  * resilver it.
3498  */
3499 void
3500 dsl_scan_freed(spa_t *spa, const blkptr_t *bp)
3501 {
3502         dsl_pool_t *dp = spa->spa_dsl_pool;
3503         dsl_scan_t *scn = dp->dp_scan;
3504 
3505         ASSERT(!BP_IS_EMBEDDED(bp));
3506         ASSERT(scn != NULL);
3507         if (!dsl_scan_is_running(scn))
3508                 return;
3509 
3510         for (int i = 0; i < BP_GET_NDVAS(bp); i++) {
3511                 if (BP_IS_SPECIAL(bp) && i != WBC_NORMAL_DVA)
3512                         continue;
3513                 dsl_scan_freed_dva(spa, bp, i);
3514         }
3515 }
3516 
3517 static void
3518 dsl_scan_freed_dva(spa_t *spa, const blkptr_t *bp, int dva_i)
3519 {
3520         dsl_pool_t *dp = spa->spa_dsl_pool;
3521         dsl_scan_t *scn = dp->dp_scan;
3522         vdev_t *vdev;
3523         kmutex_t *q_lock;
3524         dsl_scan_io_queue_t *queue;
3525         scan_io_t srch, *sio;
3526         avl_index_t idx;
3527         uint64_t offset;
3528         int64_t asize;
3529 
3530         ASSERT(!BP_IS_EMBEDDED(bp));
3531         ASSERT(scn != NULL);
3532         ASSERT(!BP_IS_SPECIAL(bp) || dva_i == WBC_NORMAL_DVA);
3533 
3534         vdev = vdev_lookup_top(spa, DVA_GET_VDEV(&bp->blk_dva[dva_i]));
3535         ASSERT(vdev != NULL);
3536         q_lock = &vdev->vdev_scan_io_queue_lock;
3537         queue = vdev->vdev_scan_io_queue;
3538 
3539         mutex_enter(q_lock);
3540         if (queue == NULL) {
3541                 mutex_exit(q_lock);
3542                 return;
3543         }
3544 
3545         bp2sio(bp, &srch, dva_i);
3546         offset = SCAN_IO_GET_OFFSET(&srch);
3547         asize = srch.sio_asize;
3548 
3549         /*
3550          * We can find the zio in two states:
3551          * 1) Cold, just sitting in the queue of zio's to be issued at
3552          *      some point in the future. In this case, all we do is
3553          *      remove the zio from the q_zios_by_addr tree, decrement
3554          *      its data volume from the containing range_seg_t and
3555          *      resort the q_exts_by_size tree to reflect that the
3556          *      range_seg_t has lost some of its 'fill'. We don't shorten
3557          *      the range_seg_t - this is usually rare enough not to be
3558          *      worth the extra hassle of trying to keep track of precise
3559          *      extent boundaries.
3560          * 2) Hot, where the zio is currently in-flight in
3561          *      the queue-emptying path (scan_io_queues_run_one()). In
3562          *      this case, we can't simply reach in and stop the
3563          *      in-flight zio's, so we instead block the caller.
3564          *      Eventually, the issuing thread will be done with the
3565          *      zio's it gathered and will signal us.
3566          */
3567         sio = avl_find(&queue->q_zios_by_addr, &srch, &idx);
3568         if (sio != NULL) {
3569                 range_seg_t *rs;
3570 
3571                 /* Got it while it was cold in the queue */
3572                 ASSERT3U(srch.sio_asize, ==, sio->sio_asize);
3573                 DTRACE_PROBE2(dequeue_now, const blkptr_t *, bp,
3574                     dsl_scan_io_queue_t *, queue);
3575                 count_block(scn, dp->dp_blkstats, bp);
3576                 ASSERT(range_tree_contains(queue->q_exts_by_addr, offset,
3577                     asize));
3578                 avl_remove(&queue->q_zios_by_addr, sio);
3579 
3580                 /*
3581                  * Since we're taking this scan_io_t out of its parent
3582                  * range_seg_t, we need to alter the range_seg_t's rs_fill
3583                  * value, so this changes its ordering position. We need
3584          * to reinsert it in its appropriate place in q_exts_by_size.
3585                  */
3586                 rs = range_tree_find(queue->q_exts_by_addr,
3587                     SCAN_IO_GET_OFFSET(sio), sio->sio_asize);
3588                 ASSERT(rs != NULL);
3589                 ASSERT3U(rs->rs_fill, >=, sio->sio_asize);
3590                 avl_remove(&queue->q_exts_by_size, rs);
3591                 ASSERT3U(rs->rs_fill, >=, sio->sio_asize);
3592                 rs->rs_fill -= sio->sio_asize;
3593                 VERIFY3P(avl_find(&queue->q_exts_by_size, rs, &idx), ==, NULL);
3594                 avl_insert(&queue->q_exts_by_size, rs, idx);
3595 
3596                 /*
3597                  * We only update the queue byte counter in the cold path,
3598                  * otherwise it will already have been accounted for as
3599                  * part of the zio's execution.
3600                  */
3601                 ASSERT3U(queue->q_zio_bytes, >=, asize);
3602                 atomic_add_64(&queue->q_zio_bytes, -asize);
3603 
3604                 mutex_enter(&scn->scn_status_lock);
3605                 ASSERT3U(scn->scn_bytes_pending, >=, asize);
3606                 scn->scn_bytes_pending -= asize;
3607                 mutex_exit(&scn->scn_status_lock);
3608 
3609                 kmem_free(sio, sizeof (*sio));
3610         } else {
3611                 /*
3612                  * If it's part of an extent that's currently being issued,
3613                  * wait until the extent has been consumed. In this case it's
3614                  * not us who is dequeueing this zio, so no need to
3615                  * decrement its size from scn_bytes_pending or the queue.
3616                  */
3617                 while (queue->q_issuing_rs.rs_start <= offset &&
3618                     queue->q_issuing_rs.rs_end >= offset + asize) {
3619                         DTRACE_PROBE2(dequeue_wait, const blkptr_t *, bp,
3620                             dsl_scan_io_queue_t *, queue);
3621                         cv_wait(&queue->q_cv, &vdev->vdev_scan_io_queue_lock);
3622                 }
3623         }
3624         mutex_exit(q_lock);
3625 }