/*
 * This file and its contents are supplied under the terms of the
 * Common Development and Distribution License ("CDDL"), version 1.0.
 * You may only use this file in accordance with the terms of version
 * 1.0 of the CDDL.
 *
 * A full copy of the text of the CDDL should have accompanied this
 * source.  A copy of the CDDL is also available via the Internet at
 * http://www.illumos.org/license/CDDL.
 */

/*
 * Copyright 2017 Nexenta Systems, Inc.  All rights reserved.
 */

/*
 * WriteBackCache (WBC) basics.
 * ZFS allows storing up to three DVAs per block pointer. Normally, all of
 * the DVAs are valid at all times (or at least are supposed to be, and if
 * the data under one DVA is broken, it is repaired with the data under
 * another DVA). WBC alters this behaviour. Each block cached with WBC has
 * two DVAs, and their validity changes over time. Initially, when ZFS
 * decides to cache a block with WBC, two DVAs are allocated: one on a
 * special device and one on a normal device. Data is written to the
 * special DVA only; at that point the special DVA is valid and the normal
 * one contains garbage. Later, after the move operation is performed for
 * the block, i.e. the data stored under the special DVA is copied to the
 * place pointed to by the normal DVA, the special DVA is freed (and can
 * be reused) and the normal DVA becomes valid and contains the actual
 * data.
 * To let ZFS know which DVA is valid and which is not, all data is moved
 * in chunks bounded by birth txg. When a new chunk of data should be
 * moved, a snapshot (recursive, starting at the very root dataset) is
 * created. The snapshot makes it possible to perform a simple traversal
 * without missing any block. The txg boundaries run from
 * old_move_snap_txg + 1 to new_move_snap_txg. By checking a block's birth
 * txg against those boundaries, ZFS knows which DVA is valid at the
 * moment.
 */
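
/*
 * Illustrative sketch (not part of the implementation): a block born
 * before the left boundary of the current move window has already been
 * moved, so its normal DVA holds the data; otherwise the block still
 * lives on the special device. WBC_NORMAL_DVA is hypothetical here; only
 * WBC_SPECIAL_DVA is named by the code below.
 *
 *	dva_t *
 *	wbc_valid_dva(wbc_block_t *block, uint64_t wbc_start_txg)
 *	{
 *		if (block->btxg < wbc_start_txg)
 *			return (&block->dva[WBC_NORMAL_DVA]);
 *		return (&block->dva[WBC_SPECIAL_DVA]);
 *	}
 */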

#include <sys/fm/fs/zfs.h>
#include <sys/special.h>
#include <sys/spa_impl.h>
#include <sys/zio.h>
#include <sys/zio_checksum.h>
#include <sys/dmu.h>
#include <sys/dmu_tx.h>
#include <sys/zap.h>
#include <sys/zil.h>
#include <sys/ddt.h>
#include <sys/dmu_traverse.h>
#include <sys/dmu_objset.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_dataset.h>
#include <sys/dsl_dir.h>
#include <sys/dsl_scan.h>
#include <sys/dsl_prop.h>
#include <sys/arc.h>
#include <sys/vdev_impl.h>
#include <sys/mutex.h>
#include <sys/time.h>
#include <sys/zio_compress.h>
#include <sys/zfs_ioctl.h>
#ifdef _KERNEL
#include <sys/ddi.h>
#endif

extern int zfs_txg_timeout;
extern int zfs_scan_min_time_ms;
extern uint64_t zfs_dirty_data_sync;
extern uint64_t krrp_debug;

typedef enum {
        WBC_READ_FROM_SPECIAL = 1,
        WBC_WRITE_TO_NORMAL,
} wbc_io_type_t;

/*
 * Timeout (in seconds) that is used to schedule a job that moves
 * blocks from a special device to the other devices in a pool
 */
int zfs_wbc_schedtmo = 0;

uint64_t zfs_wbc_data_max = 48 << 20; /* Max data to migrate in a pass */

uint64_t wbc_mv_cancel_threshold_initial = 20;
/* We are not sure whether we need the threshold-increment logic */
uint64_t wbc_mv_cancel_threshold_step = 0;
uint64_t wbc_mv_cancel_threshold_cap = 50;

static boolean_t wbc_check_space(spa_t *spa);

static void wbc_free_block(wbc_block_t *block);
static void wbc_clean_tree(wbc_data_t *wbc_data, avl_tree_t *tree);
static void wbc_clean_plan_tree(wbc_data_t *wbc_data);
static void wbc_clean_moved_tree(wbc_data_t *wbc_data);

static void wbc_activate_impl(spa_t *spa, boolean_t pool_creation);
static wbc_block_t *wbc_create_block(wbc_data_t *wbc_data,
    const blkptr_t *bp);
static void wbc_move_block(void *arg);
static int wbc_move_block_impl(wbc_block_t *block);
static int wbc_collect_special_blocks(dsl_pool_t *dp);
static void wbc_close_window(wbc_data_t *wbc_data);
static void wbc_write_update_window(void *void_avl_tree, dmu_tx_t *tx);

static int wbc_io(wbc_io_type_t type, wbc_block_t *block, abd_t *data);
static int wbc_blocks_compare(const void *arg1, const void *arg2);
static int wbc_instances_compare(const void *arg1, const void *arg2);

static void wbc_unregister_instance_impl(wbc_instance_t *wbc_instance,
    boolean_t rele_autosnap);
static void wbc_unregister_instances(wbc_data_t *wbc_data);
static wbc_instance_t *wbc_register_instance(wbc_data_t *wbc_data,
    objset_t *os);
static void wbc_unregister_instance(wbc_data_t *wbc_data, objset_t *os,
    boolean_t rele_autosnap);
static wbc_instance_t *wbc_lookup_instance(wbc_data_t *wbc_data,
    uint64_t ds_object, avl_index_t *where);
static void wbc_rele_autosnaps(wbc_data_t *wbc_data, uint64_t txg_to_rele,
    boolean_t purge);
void
wbc_init(wbc_data_t *wbc_data, spa_t *spa)
{
        (void) memset(wbc_data, 0, sizeof (wbc_data_t));

        wbc_data->wbc_spa = spa;

        mutex_init(&wbc_data->wbc_lock, NULL, MUTEX_DEFAULT, NULL);
        cv_init(&wbc_data->wbc_cv, NULL, CV_DEFAULT, NULL);

        avl_create(&wbc_data->wbc_blocks, wbc_blocks_compare,
            sizeof (wbc_block_t), offsetof(wbc_block_t, node));
        avl_create(&wbc_data->wbc_moved_blocks, wbc_blocks_compare,
            sizeof (wbc_block_t), offsetof(wbc_block_t, node));
        avl_create(&wbc_data->wbc_instances, wbc_instances_compare,
            sizeof (wbc_instance_t), offsetof(wbc_instance_t, node));

        wbc_data->wbc_instance_fini = taskq_create("wbc_instance_finalization",
            1, maxclsyspri, 50, INT_MAX, TASKQ_PREPOPULATE);
}

void
wbc_fini(wbc_data_t *wbc_data)
{
        taskq_wait(wbc_data->wbc_instance_fini);
        taskq_destroy(wbc_data->wbc_instance_fini);

        mutex_enter(&wbc_data->wbc_lock);

        wbc_clean_plan_tree(wbc_data);
        wbc_clean_moved_tree(wbc_data);

        avl_destroy(&wbc_data->wbc_blocks);
        avl_destroy(&wbc_data->wbc_moved_blocks);
        avl_destroy(&wbc_data->wbc_instances);

        mutex_exit(&wbc_data->wbc_lock);

        cv_destroy(&wbc_data->wbc_cv);
        mutex_destroy(&wbc_data->wbc_lock);

        wbc_data->wbc_spa = NULL;
}

#ifndef _KERNEL
/*ARGSUSED*/
static clock_t
drv_usectohz(uint64_t time)
{
        return (1000);
}
#endif

static wbc_block_t *
wbc_create_block(wbc_data_t *wbc_data, const blkptr_t *bp)
{
        wbc_block_t *block;

        block = kmem_alloc(sizeof (*block), KM_NOSLEEP);
        if (block == NULL)
                return (NULL);

        /*
         * Fill in the information describing the data we need to move
         */
#ifdef _KERNEL
        DTRACE_PROBE6(wbc_plan_block_data,
            uint64_t, BP_PHYSICAL_BIRTH(bp),
            uint64_t, DVA_GET_VDEV(&bp->blk_dva[0]),
            uint64_t, DVA_GET_OFFSET(&bp->blk_dva[0]),
            uint64_t, DVA_GET_VDEV(&bp->blk_dva[1]),
            uint64_t, DVA_GET_OFFSET(&bp->blk_dva[1]),
            uint64_t, BP_GET_PSIZE(bp));
#endif

        mutex_init(&block->lock, NULL, MUTEX_DEFAULT, NULL);
        block->data = wbc_data;
        block->blk_prop = 0;

        block->dva[0] = bp->blk_dva[0];
        block->dva[1] = bp->blk_dva[1];
        block->btxg = BP_PHYSICAL_BIRTH(bp);

        WBCBP_SET_COMPRESS(block, BP_GET_COMPRESS(bp));
        WBCBP_SET_PSIZE(block, BP_GET_PSIZE(bp));
        WBCBP_SET_LSIZE(block, BP_GET_LSIZE(bp));

        return (block);
}

static void
wbc_free_block(wbc_block_t *block)
{
        mutex_destroy(&block->lock);
        kmem_free(block, sizeof (*block));
}

static void
wbc_clean_tree(wbc_data_t *wbc_data, avl_tree_t *tree)
{
        void *cookie = NULL;
        wbc_block_t *block = NULL;

        ASSERT(MUTEX_HELD(&wbc_data->wbc_lock));

        while ((block = avl_destroy_nodes(tree, &cookie)) != NULL)
                wbc_free_block(block);
}

static void
wbc_clean_plan_tree(wbc_data_t *wbc_data)
{
        wbc_clean_tree(wbc_data, &wbc_data->wbc_blocks);
        wbc_data->wbc_blocks_count = 0;
}

static void
wbc_clean_moved_tree(wbc_data_t *wbc_data)
{
        wbc_clean_tree(wbc_data, &wbc_data->wbc_moved_blocks);
        wbc_data->wbc_blocks_mv = 0;
}

/* WBC-MOVE routines */

/*
 * Writeback Cache Migration Tunables
 *
 * 1. wbc_idle_delay_ms - time to sleep when there are no blocks to move,
 *    OR when we need to update the current spa utilization by the user/app
 *
 * 2. wbc_throttle_move_delay_ms - sleep to abide by the maximum
 *    permitted rate of migration
 *
 * 3. wbc_update_statistics_interval_ms - pool utilization recompute interval
 *    (all tunables above are in milliseconds)
 *
 * 4. wbc_min_move_tasks_count & wbc_max_move_tasks_count - the min/max number
 *    of concurrent active taskq workers processing the blocks to be migrated
 *
 * 5. wbc_spa_util_low_wm & wbc_spa_util_high_wm - min/max spa utilization
 *    levels to control the rate of migration: low_wm corresponds to the
 *    highest rate, and vice versa.
 */
uint64_t wbc_idle_delay_ms = 1000;
uint64_t wbc_throttle_move_delay_ms = 10;
uint64_t wbc_update_statistics_interval_ms = 60000;

uint64_t wbc_min_move_tasks_count = 1;
uint64_t wbc_max_move_tasks_count = 256;

uint64_t wbc_spa_util_low_wm = 10;
uint64_t wbc_spa_util_high_wm = 90;

/*
 * Per-queue limits on the number of I/Os active to
 * each device, from vdev_queue.c. Default value: 10.
 */
extern uint32_t zfs_vdev_async_write_max_active;

/*
 * Throttle the special=>normal migration of collected blocks.
 * Returns B_TRUE to indicate that the mover must slow down, B_FALSE otherwise.
 */
static boolean_t
wbc_throttle_move(wbc_data_t *wbc_data)
{
        wbc_stat_t *wbc_stat = &wbc_data->wbc_stat;
        uint64_t spa_util = wbc_stat->wbc_spa_util;
        uint64_t blocks_in_progress = 0;
        uint64_t max_tasks = 0;
        uint64_t delta_tasks = 0;

        if (wbc_data->wbc_locked)
                return (B_TRUE);

        /* get throttled by the taskq itself */
        if (spa_util < wbc_spa_util_low_wm)
                return (B_FALSE);

        blocks_in_progress =
            wbc_data->wbc_blocks_out - wbc_data->wbc_blocks_mv;

        if (wbc_data->wbc_move_threads <= wbc_min_move_tasks_count)
                return (blocks_in_progress > wbc_min_move_tasks_count);

        max_tasks = wbc_data->wbc_move_threads - wbc_min_move_tasks_count;

        spa_util = MIN(spa_util, wbc_spa_util_high_wm);
        spa_util = MAX(spa_util, wbc_spa_util_low_wm);

        /*
         * The number of concurrent taskq workers is:
         * min + throttle-defined delta
         */
        delta_tasks =
            max_tasks - max_tasks * (wbc_spa_util_high_wm - spa_util) /
            (wbc_spa_util_high_wm - wbc_spa_util_low_wm);
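
        /*
         * Worked example (with the watermark defaults above and a
         * hypothetical wbc_move_threads of 256): for spa_util == 50,
         * max_tasks == 255 and
         * delta_tasks == 255 - 255 * (90 - 50) / (90 - 10) == 128,
         * so up to 1 + 128 == 129 blocks may be in flight before the
         * mover is told to slow down.
         */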

        DTRACE_PROBE4(wbc_throttle_move,
            spa_t *, wbc_data->wbc_spa,
            uint64_t, blocks_in_progress,
            uint64_t, max_tasks,
            uint64_t, delta_tasks);

        return (blocks_in_progress > (wbc_min_move_tasks_count + delta_tasks));
}

/*
 * Walk the WBC-collected-blocks AVL tree and for each WBC block
 * (wbc_block_t):
 * 1. yank it from the collected-blocks AVL tree
 * 2. add it to the moved-blocks AVL tree
 * 3. dispatch a taskq worker to execute the special=>normal migration
 * Break when either reaching an upper limit, in total bytes, or when
 * wbc_throttle_move() (the "throttler") wants us to slow down.
 */
static void
wbc_move_blocks_tree(wbc_data_t *wbc_data)
{
        wbc_stat_t *wbc_stat = &wbc_data->wbc_stat;
        uint64_t written_bytes = 0;
        uint64_t active_txg = 0;

        mutex_enter(&wbc_data->wbc_lock);
        active_txg = wbc_data->wbc_finish_txg;

        for (;;) {
                wbc_block_t *block = NULL;

                if (wbc_data->wbc_thr_exit)
                        break;

                /*
                 * Move the block to the tree of moved blocks
                 * and place it into the queue of blocks to be
                 * physically moved
                 */
                block = avl_first(&wbc_data->wbc_blocks);
                if (block == NULL)
                        break;

                wbc_data->wbc_blocks_count--;
                ASSERT(wbc_data->wbc_blocks_count >= 0);
                avl_remove(&wbc_data->wbc_blocks, block);
                avl_add(&wbc_data->wbc_moved_blocks, block);
                wbc_data->wbc_blocks_out++;

                mutex_exit(&wbc_data->wbc_lock);

                /* TQ_SLEEP guarantees successful dispatching */
                VERIFY(taskq_dispatch(wbc_data->wbc_move_taskq,
                    wbc_move_block, block, TQ_SLEEP) != 0);

                written_bytes += WBCBP_GET_PSIZE(block);

                mutex_enter(&wbc_data->wbc_lock);

                if (active_txg != wbc_data->wbc_finish_txg)
                        break;

                /*
                 * Update the existing WBC statistics during
                 * the next wbc_move_begin() iteration
                 */
                if (ddi_get_lbolt() - wbc_stat->wbc_stat_lbolt >
                    drv_usectohz(wbc_update_statistics_interval_ms * MILLISEC))
                        wbc_stat->wbc_stat_update = B_TRUE;

                if (written_bytes > zfs_wbc_data_max ||
                    wbc_throttle_move(wbc_data))
                        break;
        }

        mutex_exit(&wbc_data->wbc_lock);

        DTRACE_PROBE2(wbc_move_blocks_tree,
            spa_t *, wbc_data->wbc_spa,
            uint64_t, written_bytes);
}

/*
 * Begin a new writecache migration iteration.
 * Returns B_TRUE if the migration can proceed, B_FALSE otherwise.
 * It is called from wbc_thread prior to moving the next batch
 * of blocks.
 *
 * Quick theory of operation:
 * 1. If the pool is idle we can allow ourselves to speed up
 *    special => normal migration.
 * 2. And vice versa, higher utilization of this spa under user
 *    workload means the user workload must get /more/ system
 *    resources for itself,
 * 3. which in turn means fewer system resources for the writecache.
 * 4. Finally, since the pool's utilization is used to speed up or
 *    slow down (throttle) migration, measuring this utilization
 *    must be done in isolation - that is, when writecache migration
 *    is either not running at all or contributes relatively
 *    little to the total utilization.
 *
 * In wbc_move_begin() we periodically update wbc_spa_util
 * and use it to throttle the writecache via wbc_throttle_move().
 *
 * Note that we actually sleep here based on the following tunables:
 *
 * 1. wbc_idle_delay_ms when there are no blocks to move,
 *    OR when we need to update the spa utilization by the user
 *
 * 2. wbc_throttle_move_delay_ms when the throttling mechanism
 *    tells us to slow down
 */
static boolean_t
wbc_move_begin(wbc_data_t *wbc_data)
{
        spa_t *spa = wbc_data->wbc_spa;
        wbc_stat_t *wbc_stat = &wbc_data->wbc_stat;
        spa_avg_stat_t *spa_stat = &spa->spa_avg_stat;

        for (;;) {
                boolean_t throttle_move = B_FALSE;
                boolean_t stat_update = B_FALSE;
                uint64_t blocks_count = 0;
                uint64_t delay = 0;

                mutex_enter(&wbc_data->wbc_lock);

                if (spa->spa_state == POOL_STATE_UNINITIALIZED ||
                    wbc_data->wbc_thr_exit) {
                        mutex_exit(&wbc_data->wbc_lock);
                        return (B_FALSE);
                }

                blocks_count = wbc_data->wbc_blocks_count;
                throttle_move = wbc_throttle_move(wbc_data);
                stat_update = wbc_stat->wbc_stat_update;

                mutex_exit(&wbc_data->wbc_lock);

                DTRACE_PROBE3(wbc_move_begin,
                    spa_t *, spa,
                    uint64_t, blocks_count,
                    boolean_t, throttle_move);

                if (stat_update) {
                        /*
                         * Wait for all previously scheduled
                         * move tasks to complete
                         */
                        taskq_wait(wbc_data->wbc_move_taskq);
                        delay = wbc_idle_delay_ms;
                } else if (blocks_count == 0) {
                        delay = wbc_idle_delay_ms;
                } else if (throttle_move) {
                        delay = wbc_throttle_move_delay_ms;
                } else {
                        return (B_TRUE);
                }

                mutex_enter(&wbc_data->wbc_lock);

                /*
                 * Sleep wbc_idle_delay_ms when there are no blocks to move
                 * or when we need to update the spa utilization by the user.
                 * Sleep wbc_throttle_move_delay_ms when the throttling
                 * mechanism tells us to slow down.
                 */
                (void) cv_timedwait(&wbc_data->wbc_cv,
                    &wbc_data->wbc_lock,
                    ddi_get_lbolt() + drv_usectohz(delay * MILLISEC));

                /* Update the WBC statistics after the idle period */
                if (wbc_stat->wbc_stat_update) {
                        DTRACE_PROBE2(wbc_move_begin_update_stat,
                            spa_t *, spa, uint64_t, spa_stat->spa_utilization);
                        wbc_stat->wbc_stat_update = B_FALSE;
                        wbc_stat->wbc_stat_lbolt = ddi_get_lbolt();
                        wbc_stat->wbc_spa_util = spa_stat->spa_utilization;
                }

                mutex_exit(&wbc_data->wbc_lock);

                /* Return B_TRUE if the migration can proceed */
                if (blocks_count > 0 && !throttle_move)
                        return (B_TRUE);
        }
}

/*
 * Thread to manage the data movement from
 * special devices to normal devices.
 * This thread runs as long as the spa is active.
 */
static void
wbc_thread(wbc_data_t *wbc_data)
{
        spa_t *spa = wbc_data->wbc_spa;
        char tq_name[MAXPATHLEN];

        DTRACE_PROBE1(wbc_thread_start, spa_t *, spa);

        /* Prepare the move queue and make the WBC active */
        (void) snprintf(tq_name, sizeof (tq_name),
            "%s_wbc_move", spa->spa_name);

        wbc_data->wbc_move_taskq = taskq_create(tq_name,
            wbc_data->wbc_move_threads, maxclsyspri,
            50, INT_MAX, TASKQ_PREPOPULATE);

        /* Main dispatch loop */
        for (;;) {
                if (!wbc_move_begin(wbc_data))
                        break;

                wbc_move_blocks_tree(wbc_data);
        }

        taskq_wait(wbc_data->wbc_move_taskq);
        taskq_destroy(wbc_data->wbc_move_taskq);

        wbc_data->wbc_thread = NULL;
        DTRACE_PROBE1(wbc_thread_done, spa_t *, spa);
        thread_exit();
}

static uint64_t wbc_fault_limit = 10;

typedef struct {
        void *buf;
        int len;
} wbc_arc_bypass_t;

static int
wbc_arc_bypass_cb(void *buf, int len, void *arg)
{
        wbc_arc_bypass_t *bypass = arg;

        bypass->len = len;

        (void) memcpy(bypass->buf, buf, len);

        return (0);
}
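
/*
 * Usage sketch for the bypass callback (mirrors the disabled ARC path in
 * wbc_move_block_impl() below; arc_io_bypass() is assumed, as in that
 * disabled block, to look the block up in the ARC and invoke the callback
 * on its data):
 *
 *	wbc_arc_bypass_t bypass;
 *	bypass.buf = dst_buf;
 *	err = arc_io_bypass(spa, &pseudo_bp, wbc_arc_bypass_cb, &bypass);
 *	if (err == 0)
 *		dst_len = bypass.len;
 */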

/*
 * FIXME: temporarily disabled because this logic
 * needs to be adjusted according to the ARC-compression changes
 */
uint64_t wbc_arc_enabled = 0;

/*
 * Moves blocks from a special device to other devices in a pool.
 */
void
wbc_move_block(void *arg)
{
        wbc_block_t *block = arg;
        wbc_data_t *wbc_data = block->data;
        spa_t *spa = wbc_data->wbc_spa;
        int err = 0;

        if (wbc_data->wbc_purge || !wbc_data->wbc_isvalid) {
                atomic_inc_64(&wbc_data->wbc_blocks_mv);
                return;
        }

        err = wbc_move_block_impl(block);
        if (err == 0) {
                atomic_inc_64(&wbc_data->wbc_blocks_mv);
        } else {
                /* an I/O error occurred */
                if (++wbc_data->wbc_fault_moves >= wbc_fault_limit) {
                        /* error limit exceeded - purge the current window */
                        cmn_err(CE_WARN,
                            "WBC: cannot move data on %s with error [%d]. "
                            "Current window will be purged\n",
                            spa->spa_name, err);

                        mutex_enter(&wbc_data->wbc_lock);
                        wbc_purge_window(spa, NULL);
                        mutex_exit(&wbc_data->wbc_lock);
                } else {
                        cmn_err(CE_WARN,
                            "WBC: cannot move data on %s with error [%d]\n"
                            "WBC: retry block (fault limit: %llu/%llu)",
                            spa->spa_name, err,
                            (unsigned long long) wbc_data->wbc_fault_moves,
                            (unsigned long long) wbc_fault_limit);

                        /*
                         * Re-plan the block with the highest priority and
                         * try to move it again.
                         *
                         * TQ_SLEEP guarantees successful dispatching.
                         */
                        VERIFY(taskq_dispatch(wbc_data->wbc_move_taskq,
                            wbc_move_block, block, TQ_SLEEP | TQ_FRONT) != 0);
                }
        }
}

static int
wbc_move_block_impl(wbc_block_t *block)
{
        abd_t *buf;
        int err = 0;
        wbc_data_t *wbc_data = block->data;
        spa_t *spa = wbc_data->wbc_spa;

        if (WBCBP_IS_DELETED(block))
                return (0);

        spa_config_enter(spa, SCL_VDEV | SCL_STATE_ALL, FTAG, RW_READER);

        buf = abd_alloc_for_io(WBCBP_GET_PSIZE(block), B_FALSE);

        /* FIXME: This needs to be fixed as part of NEX-14168 */
#if 0
        if (wbc_arc_enabled) {
                blkptr_t pseudo_bp = { 0 };
                wbc_arc_bypass_t bypass = { 0 };
                void *dbuf = NULL;

                if (WBCBP_GET_COMPRESS(block) != ZIO_COMPRESS_OFF) {
                        dbuf = zio_data_buf_alloc(WBCBP_GET_LSIZE(block));
                        bypass.buf = dbuf;
                } else {
                        bypass.buf = buf;
                }

                pseudo_bp.blk_dva[0] = block->dva[0];
                pseudo_bp.blk_dva[1] = block->dva[1];
                BP_SET_BIRTH(&pseudo_bp, block->btxg, block->btxg);

                mutex_enter(&block->lock);
                if (WBCBP_IS_DELETED(block)) {
                        if (WBCBP_GET_COMPRESS(block) != ZIO_COMPRESS_OFF)
                                zio_data_buf_free(dbuf, WBCBP_GET_LSIZE(block));

                        goto out;
                }

                err = arc_io_bypass(spa, &pseudo_bp,
                    wbc_arc_bypass_cb, &bypass);

                if (!err && WBCBP_GET_COMPRESS(block) != ZIO_COMPRESS_OFF) {
                        size_t size = zio_compress_data(
                            (enum zio_compress)WBCBP_GET_COMPRESS(block),
                            dbuf, buf, bypass.len);
                        size_t rounded =
                            P2ROUNDUP(size, (size_t)SPA_MINBLOCKSIZE);
                        if (rounded != WBCBP_GET_PSIZE(block)) {
                                /* random error to get to the slow path */
                                err = ERANGE;
                                cmn_err(CE_WARN, "WBC WARN: ARC COMPRESSION "
                                    "FAILED: %u %u %u",
                                    (unsigned)size,
                                    (unsigned)WBCBP_GET_PSIZE(block),
                                    (unsigned)WBCBP_GET_COMPRESS(block));
                        } else if (rounded > size) {
                                bzero((char *)buf + size, rounded - size);
                        }
                }

                if (WBCBP_GET_COMPRESS(block) != ZIO_COMPRESS_OFF)
                        zio_data_buf_free(dbuf, WBCBP_GET_LSIZE(block));

        } else {
                err = ENOTSUP;
                mutex_enter(&block->lock);
                if (WBCBP_IS_DELETED(block))
                        goto out;
        }
#endif
        /*
         * This code should be removed once the "#if 0 ... #endif"
         * block above is re-enabled
         */
        err = ENOTSUP;
        mutex_enter(&block->lock);
        if (WBCBP_IS_DELETED(block))
                goto out;

        /*
         * Any error means that the ARC read failed and the block is moved
         * via the slow path
         */
        if (err != 0) {
                err = wbc_io(WBC_READ_FROM_SPECIAL, block, buf);
                if (err != 0) {
                        cmn_err(CE_WARN, "WBC: move task has failed to read:"
                            " error [%d]", err);
                        goto out;
                }

                DTRACE_PROBE(wbc_move_from_disk);
        } else {
                DTRACE_PROBE(wbc_move_from_arc);
        }

        err = wbc_io(WBC_WRITE_TO_NORMAL, block, buf);
        if (err) {
                cmn_err(CE_WARN, "WBC: move task has failed to write: "
                    "error [%d]", err);
                goto out;
        }

#ifdef _KERNEL
        DTRACE_PROBE5(wbc_move_block_data,
            uint64_t, DVA_GET_VDEV(&block->dva[0]),
            uint64_t, DVA_GET_OFFSET(&block->dva[0]),
            uint64_t, DVA_GET_VDEV(&block->dva[1]),
            uint64_t, DVA_GET_OFFSET(&block->dva[1]),
            uint64_t, WBCBP_GET_PSIZE(block));
#endif

out:
        mutex_exit(&block->lock);
        abd_free(buf);

        spa_config_exit(spa, SCL_VDEV | SCL_STATE_ALL, FTAG);

        return (err);
}

/* WBC-WALK routines */

int
wbc_walk_lock(spa_t *spa)
{
        wbc_data_t *wbc_data = spa_get_wbc_data(spa);

        mutex_enter(&wbc_data->wbc_lock);
        while (wbc_data->wbc_locked)
                (void) cv_wait(&wbc_data->wbc_cv, &wbc_data->wbc_lock);
        if (wbc_data->wbc_thr_exit) {
                mutex_exit(&wbc_data->wbc_lock);
                return (ENOLCK);
        }

        wbc_data->wbc_locked = B_TRUE;
        while (wbc_data->wbc_walking)
                (void) cv_wait(&wbc_data->wbc_cv, &wbc_data->wbc_lock);
        if (wbc_data->wbc_thr_exit) {
                mutex_exit(&wbc_data->wbc_lock);
                return (ENOLCK);
        }

        cv_broadcast(&wbc_data->wbc_cv);
        mutex_exit(&wbc_data->wbc_lock);

        return (0);
}

void
wbc_walk_unlock(spa_t *spa)
{
        wbc_data_t *wbc_data = spa_get_wbc_data(spa);
        mutex_enter(&wbc_data->wbc_lock);
        wbc_data->wbc_locked = B_FALSE;
        cv_broadcast(&wbc_data->wbc_cv);
        mutex_exit(&wbc_data->wbc_lock);
}

/* Thread to collect the blocks that must be moved */
static void
wbc_walk_thread(wbc_data_t *wbc_data)
{
        spa_t *spa = wbc_data->wbc_spa;
        int err = 0;

        DTRACE_PROBE1(wbc_walk_thread_start, char *, spa->spa_name);

        for (;;) {
                err = 0;
                mutex_enter(&wbc_data->wbc_lock);

                wbc_data->wbc_walking = B_FALSE;

                cv_broadcast(&wbc_data->wbc_cv);

                /* Use a small wait time to delay the walker restart */
                do {
                        (void) cv_timedwait(&wbc_data->wbc_cv,
                            &wbc_data->wbc_lock,
                            ddi_get_lbolt() + hz / 4);
                } while (spa->spa_state == POOL_STATE_UNINITIALIZED &&
                    !wbc_data->wbc_thr_exit);

                if (wbc_data->wbc_thr_exit || !spa->spa_dsl_pool) {
                        mutex_exit(&wbc_data->wbc_lock);
                        break;
                }

                wbc_data->wbc_walking = B_TRUE;

                cv_broadcast(&wbc_data->wbc_cv);

                mutex_exit(&wbc_data->wbc_lock);

                err = wbc_collect_special_blocks(spa->spa_dsl_pool);
                if (err != 0) {
                        cmn_err(CE_WARN, "WBC: cannot "
                            "traverse pool: error [%d]. "
                            "Current window will be purged\n", err);

                        wbc_purge_window(spa, NULL);
                }
        }

        wbc_data->wbc_walk_thread = NULL;

        DTRACE_PROBE1(wbc_walk_thread_done, char *, spa->spa_name);

        thread_exit();
}
int wbc_force_trigger = 1;
/*
 * This function triggers the write cache thread if the past
 * two sync contexts did not sync more than 1/8th of
 * zfs_dirty_data_sync.
 * This function is called only if the current sync context
 * did not sync more than 1/16th of zfs_dirty_data_sync.
 */
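/*
 * Worked example (hypothetical numbers): with zfs_dirty_data_sync set to
 * 64MB, a sync context that wrote less than 4MB (1/16th) ends up here,
 * and the worker is signalled if the previous two sync contexts averaged
 * under 8MB (1/8th), or unconditionally while wbc_force_trigger is set.
 */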
void
wbc_trigger_wbcthread(spa_t *spa, uint64_t prev_sync_avg)
{
        wbc_data_t *wbc_data = spa_get_wbc_data(spa);

        /*
         * Using mutex_tryenter() because if the worker is
         * holding the mutex, it is already up and there is
         * no need to cv_signal()
         */
        if ((wbc_force_trigger || prev_sync_avg < zfs_dirty_data_sync / 8) &&
            mutex_tryenter(&wbc_data->wbc_lock)) {
                if (wbc_data->wbc_blocks_count != 0) {
                        DTRACE_PROBE1(wbc_trigger_worker, char *,
                            spa->spa_name);
                        cv_signal(&wbc_data->wbc_cv);
                }
                mutex_exit(&wbc_data->wbc_lock);
        }
}

static boolean_t
wbc_should_pause_scanblocks(dsl_pool_t *dp,
    wbc_parseblock_cb_t *cbd, const zbookmark_phys_t *zb)
{
        hrtime_t elapsed_ns;

        /*
         * We know how to resume iteration on level 0
         * blocks only
         */
        if (zb->zb_level != 0)
                return (B_FALSE);

        /* We're resuming */
        if (!ZB_IS_ZERO(&cbd->zb))
                return (B_FALSE);

        /*
         * We should stop if either the traversal took
         * more than zfs_txg_timeout, or it took more
         * than zfs_scan_min_time_ms while somebody is waiting
         * for our transaction group.
         */
        elapsed_ns = gethrtime() - cbd->start_time;
        if (elapsed_ns / NANOSEC > zfs_txg_timeout ||
            (elapsed_ns / MICROSEC > zfs_scan_min_time_ms &&
            txg_sync_waiting(dp)) || spa_shutting_down(dp->dp_spa))
                return (B_TRUE);

        return (B_FALSE);
}

/*
 * Callback passed to the traversal function. Checks whether a block is
 * special and hence should be planned for a move.
 */
/* ARGSUSED */
int
wbc_traverse_ds_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
    const zbookmark_phys_t *zb, const dnode_phys_t *dnp, void *arg)
{
        wbc_data_t *wbc_data = spa_get_wbc_data(spa);
        wbc_parseblock_cb_t *cbd = arg;
        wbc_block_t *block, *found_block;
        avl_index_t where = 0;
        boolean_t increment_counters = B_FALSE;

        /* skip ZIL blocks */
        if (bp == NULL || zb->zb_level == ZB_ZIL_LEVEL)
                return (0);

        if (!BP_IS_SPECIAL(bp))
                return (0);

        mutex_enter(&wbc_data->wbc_lock);

        if (wbc_data->wbc_thr_exit) {
                mutex_exit(&wbc_data->wbc_lock);
                return (ERESTART);
        }

        if (cbd->actv_txg != wbc_data->wbc_finish_txg) {
                mutex_exit(&wbc_data->wbc_lock);
                return (ERESTART);
        }

        if (wbc_should_pause_scanblocks(spa->spa_dsl_pool, cbd, zb)) {
                mutex_exit(&wbc_data->wbc_lock);
                return (ERESTART);
        }

        /*
         * If dedup is enabled then traversal gives us the original block,
         * which was already moved as part of a previous WBC window.
         * So just skip it.
         */
        if (BP_PHYSICAL_BIRTH(bp) < wbc_data->wbc_start_txg) {
                mutex_exit(&wbc_data->wbc_lock);
                return (0);
        }

        block = wbc_create_block(wbc_data, bp);
        if (block == NULL) {
                mutex_exit(&wbc_data->wbc_lock);
                return (ERESTART);
        }

        /*
         * Before adding the block to the tree of planned blocks we need
         * to check that a block with the same DVA is not already contained
         * in one of our trees (planned or moved). If such a block is
         * found, there are two cases:
         *  - its DVA has already been freed: free the corresponding
         *  wbc_block and add the new wbc_block to the tree of planned
         *  blocks. This is possible if the DVA was freed and later
         *  allocated for other data.
         *
         *  - its DVA is still allocated: the new block does not need to
         *  be added to the tree of planned blocks, so just free it. This
         *  is possible if deduplication is enabled.
         */
        found_block = avl_find(&wbc_data->wbc_moved_blocks, block, NULL);
        if (found_block != NULL) {
                if (WBCBP_IS_DELETED(found_block)) {
                        avl_remove(&wbc_data->wbc_moved_blocks, found_block);
                        wbc_free_block(found_block);
                        goto insert;
                } else {
                        wbc_free_block(block);
                        goto out;
                }
        }

        found_block = avl_find(&wbc_data->wbc_blocks, block, &where);
        if (found_block != NULL) {
                if (WBCBP_IS_DELETED(found_block)) {
                        avl_remove(&wbc_data->wbc_blocks, found_block);
                        wbc_free_block(found_block);
                        goto insert;
                } else {
                        wbc_free_block(block);
                        goto out;
                }
        }

        increment_counters = B_TRUE;

insert:
        avl_insert(&wbc_data->wbc_blocks, block, where);
        cbd->bt_size += WBCBP_GET_PSIZE(block);
        if (increment_counters) {
                wbc_data->wbc_blocks_count++;
                wbc_data->wbc_blocks_in++;
        }

out:
        mutex_exit(&wbc_data->wbc_lock);

        return (0);
}

/*
 * Iterate through the data blocks on a "special" device and collect those
 * that can be moved to other devices in a pool.
 *
 * XXX: For now we collect as many blocks as possible in order to dispatch
 * them to the taskq later. It may be reasonable to invent a mechanism
 * that avoids storing the whole `moving` tree in-core
 * (a persistent move bookmark, for example)
 */
int
wbc_collect_special_blocks(dsl_pool_t *dp)
{
        spa_t *spa = dp->dp_spa;
        wbc_data_t *wbc_data = spa_get_wbc_data(spa);
        wbc_parseblock_cb_t cb_data;
        int err = 0;
        hrtime_t scan_start;
        uint64_t diff;

        if (!zfs_wbc_schedtmo)
                zfs_wbc_schedtmo = zfs_txg_timeout * 2;

        scan_start = gethrtime();
        diff = scan_start - dp->dp_spec_rtime;
        if (diff / NANOSEC < zfs_wbc_schedtmo)
                return (0);

        cb_data.wbc_data = wbc_data;
        cb_data.zb = spa->spa_lszb;
        cb_data.start_time = scan_start;
        cb_data.actv_txg = wbc_data->wbc_finish_txg;
        cb_data.bt_size = 0ULL;

        /*
         * Traverse the range of txgs to collect blocks
         */
        if (wbc_data->wbc_walk && wbc_data->wbc_finish_txg) {
                if (krrp_debug) {
                        cmn_err(CE_NOTE, "WBC: new window (%llu; %llu)",
                            (unsigned long long)wbc_data->wbc_start_txg,
                            (unsigned long long)wbc_data->wbc_finish_txg);
                }
                err = traverse_pool(spa, wbc_data->wbc_start_txg - 1,
                    wbc_data->wbc_finish_txg + 1,
                    TRAVERSE_PREFETCH_METADATA | TRAVERSE_POST,
                    wbc_traverse_ds_cb, &cb_data, &cb_data.zb);
        }

        spa->spa_lszb = cb_data.zb;
        if ((err != ERESTART && err != EAGAIN && cb_data.bt_size == 0ULL) ||
            ZB_IS_ZERO(&cb_data.zb)) {
                /*
                 * No more blocks to move, or an error state
                 */
                mutex_enter(&wbc_data->wbc_lock);
                wbc_data->wbc_walk = B_FALSE;
                if (err) {
                        /*
                         * Something went wrong during the traversal
                         */
                        if (wbc_data->wbc_thr_exit) {
                                mutex_exit(&wbc_data->wbc_lock);
                                return (0);
                        }

                        cmn_err(CE_WARN,
                            "WBC: cannot collect data "
                            "because of error [%d]", err);

                        wbc_purge_window(spa, NULL);
                        wbc_data->wbc_wait_for_window = B_TRUE;
                        mutex_exit(&wbc_data->wbc_lock);

                        err = 0;
                } else if (wbc_data->wbc_blocks_in == wbc_data->wbc_blocks_mv &&
                    !wbc_data->wbc_purge) {
                        /* Everything is moved, close the window */
                        if (wbc_data->wbc_finish_txg != 0)
                                wbc_close_window(wbc_data);

                        /*
                         * The process of closing the window might be
                         * interrupted by wbc_purge_window()
                         * (e.g., when the pool gets destroyed, etc.)
                         * If this is the case we simply return. A new
                         * WBC window will be opened later upon completion
                         * of the purge.
                         */
                        if (wbc_data->wbc_purge) {
                                mutex_exit(&wbc_data->wbc_lock);
                                return (0);
                        }

                        /* Tell others that walking has stopped */
                        wbc_data->wbc_walking = B_FALSE;
                        wbc_data->wbc_wait_for_window = B_TRUE;
                        cv_broadcast(&wbc_data->wbc_cv);

                        /* and wait until a new window appears */
                        while (!wbc_data->wbc_walk && !wbc_data->wbc_thr_exit) {
                                cv_wait(&wbc_data->wbc_cv,
                                    &wbc_data->wbc_lock);
                        }

                        if (wbc_data->wbc_thr_exit) {
                                mutex_exit(&wbc_data->wbc_lock);
                                return (0);
                        }

                        mutex_exit(&wbc_data->wbc_lock);

                        dsl_sync_task(spa->spa_name, NULL,
                            wbc_write_update_window, NULL,
                            0, ZFS_SPACE_CHECK_NONE);
                } else {
                        mutex_exit(&wbc_data->wbc_lock);
                }
        } else if (err == ERESTART) {
                /*
                 * We were interrupted; the iteration will be
                 * resumed later.
                 */
                DTRACE_PROBE2(traverse__intr, spa_t *, spa,
                    wbc_parseblock_cb_t *, &cb_data);
                err = 0;
        }

        dp->dp_spec_rtime = gethrtime();

        return (err);
}

/* WBC-THREAD_CONTROL */

/* Starts the WBC threads and sets up the associated structures */
void
wbc_start_thread(spa_t *spa)
{
        wbc_data_t *wbc_data = spa_get_wbc_data(spa);
        boolean_t lock_held;

        ASSERT(strcmp(spa->spa_name, TRYIMPORT_NAME) != 0);
        ASSERT(wbc_data->wbc_isvalid);

        lock_held = MUTEX_HELD(&wbc_data->wbc_lock);
        if (!lock_held)
                mutex_enter(&wbc_data->wbc_lock);

        if (wbc_data->wbc_thread == NULL && wbc_data->wbc_walk_thread == NULL) {
                wbc_data->wbc_thr_exit = B_FALSE;
#ifdef _KERNEL
                wbc_data->wbc_thread = thread_create(NULL, 0,
                    wbc_thread, wbc_data, 0, &p0, TS_RUN, maxclsyspri);
                wbc_data->wbc_walk_thread = thread_create(NULL, 0,
                    wbc_walk_thread, wbc_data, 0, &p0, TS_RUN, maxclsyspri);
                spa_start_perfmon_thread(spa);
#endif
        }

        wbc_data->wbc_wait_for_window = B_TRUE;
        if (!lock_held)
                mutex_exit(&wbc_data->wbc_lock);
}

/* Disables the WBC threads and resets the associated data structures */
boolean_t
wbc_stop_thread(spa_t *spa)
{
        wbc_data_t *wbc_data = spa_get_wbc_data(spa);
        boolean_t stop = B_FALSE;

        stop |= spa_stop_perfmon_thread(spa);
        mutex_enter(&wbc_data->wbc_lock);

        /*
         * We do not want to wait for the migration to finish,
         * because that can take a long time
         */
        wbc_purge_window(spa, NULL);
        wbc_data->wbc_wait_for_window = B_FALSE;

        if (wbc_data->wbc_thread != NULL || wbc_data->wbc_walk_thread != NULL) {
                wbc_data->wbc_thr_exit = B_TRUE;
                cv_broadcast(&wbc_data->wbc_cv);
                mutex_exit(&wbc_data->wbc_lock);
#ifdef _KERNEL
                if (wbc_data->wbc_thread)
                        thread_join(wbc_data->wbc_thread->t_did);
                if (wbc_data->wbc_walk_thread)
                        thread_join(wbc_data->wbc_walk_thread->t_did);
#endif
                mutex_enter(&wbc_data->wbc_lock);
                wbc_data->wbc_thread = NULL;
                wbc_data->wbc_walk_thread = NULL;
                stop |= B_TRUE;
        }

        wbc_clean_plan_tree(wbc_data);
        wbc_clean_moved_tree(wbc_data);

        mutex_exit(&wbc_data->wbc_lock);

        return (stop);
}

/* WBC-WND routines */

#define DMU_POOL_WBC_START_TXG "wbc_start_txg"
#define DMU_POOL_WBC_FINISH_TXG "wbc_finish_txg"
#define DMU_POOL_WBC_TO_RELE_TXG "wbc_to_rele_txg"
#define DMU_POOL_WBC_STATE_DELETE "wbc_state_delete"
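
/*
 * These ZAP entries persist the in-core window state in the MOS. As a
 * minimal read-back sketch (hypothetical caller, error handling elided),
 * the boundaries could be fetched with the standard zap_lookup()
 * interface:
 *
 *	uint64_t start_txg, finish_txg;
 *	(void) zap_lookup(spa->spa_dsl_pool->dp_meta_objset,
 *	    DMU_POOL_DIRECTORY_OBJECT, DMU_POOL_WBC_START_TXG,
 *	    sizeof (uint64_t), 1, &start_txg);
 *	(void) zap_lookup(spa->spa_dsl_pool->dp_meta_objset,
 *	    DMU_POOL_DIRECTORY_OBJECT, DMU_POOL_WBC_FINISH_TXG,
 *	    sizeof (uint64_t), 1, &finish_txg);
 */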

/* On-disk WBC parameters alteration */

static void
wbc_set_state_delete(void *void_spa, dmu_tx_t *tx)
{
        uint64_t upd = 1;
        spa_t *spa = void_spa;

        (void) zap_update(spa->spa_dsl_pool->dp_meta_objset,
            DMU_POOL_DIRECTORY_OBJECT,
            DMU_POOL_WBC_STATE_DELETE, sizeof (uint64_t), 1, &upd, tx);
}

static void
wbc_clean_state_delete(void *void_spa, dmu_tx_t *tx)
{
        uint64_t upd = 0;
        spa_t *spa = void_spa;

        (void) zap_update(spa->spa_dsl_pool->dp_meta_objset,
            DMU_POOL_DIRECTORY_OBJECT,
            DMU_POOL_WBC_STATE_DELETE, sizeof (uint64_t), 1, &upd, tx);
}

static void
wbc_free_special_dvas(spa_t *spa, avl_tree_t *tree_to_clean, uint64_t txg)
{
        wbc_block_t *node;
        void *cookie = NULL;

        /*
         * Clean the tree of moved blocks, freeing the special DVA and
         * the wbc_block structure of every block in the tree
         */
        spa_config_enter(spa, SCL_VDEV, FTAG, RW_READER);

        while ((node = avl_destroy_nodes(tree_to_clean, &cookie)) != NULL) {
                if (!WBCBP_IS_DELETED(node)) {
                        metaslab_free_dva(spa, &node->dva[WBC_SPECIAL_DVA],
                            txg, B_FALSE);
                }

                wbc_free_block(node);
        }

        spa_config_exit(spa, SCL_VDEV, FTAG);
}

static void
wbc_write_update_window(void *void_avl_tree, dmu_tx_t *tx)
{
        spa_t *spa = dmu_tx_pool(tx)->dp_spa;
        wbc_data_t *wbc_data = spa_get_wbc_data(spa);
        avl_tree_t *tree_to_clean = void_avl_tree;

        if (tree_to_clean != NULL)
                wbc_free_special_dvas(spa, tree_to_clean, tx->tx_txg);

        if (wbc_data->wbc_finish_txg == 0) {
                /*
                 * The "delete" state is not valid,
                 * because the window has been closed or purged
                 */
                wbc_clean_state_delete(spa, tx);
        }

        (void) zap_update(spa->spa_dsl_pool->dp_meta_objset,
            DMU_POOL_DIRECTORY_OBJECT,
            DMU_POOL_WBC_START_TXG, sizeof (uint64_t), 1,
            &wbc_data->wbc_start_txg, tx);
        (void) zap_update(spa->spa_dsl_pool->dp_meta_objset,
            DMU_POOL_DIRECTORY_OBJECT,
            DMU_POOL_WBC_FINISH_TXG, sizeof (uint64_t), 1,
            &wbc_data->wbc_finish_txg, tx);
        (void) zap_update(spa->spa_dsl_pool->dp_meta_objset,
            DMU_POOL_DIRECTORY_OBJECT,
            DMU_POOL_WBC_TO_RELE_TXG, sizeof (uint64_t), 1,
            &wbc_data->wbc_txg_to_rele, tx);
}

static void
wbc_close_window_impl(spa_t *spa, avl_tree_t *tree)
{
        wbc_data_t *wbc_data = spa_get_wbc_data(spa);
        dmu_tx_t *tx;
        int err;
        uint64_t txg;

        ASSERT(MUTEX_HELD(&wbc_data->wbc_lock));

        wbc_data->wbc_delete = B_TRUE;

        mutex_exit(&wbc_data->wbc_lock);
        /*
         * Set the flag saying that WBC has finished moving the window
         * and is now freeing the special DVAs
         */
        dsl_sync_task(spa->spa_name, NULL,
            wbc_set_state_delete, spa, 0, ZFS_SPACE_CHECK_NONE);

        tx = dmu_tx_create_dd(spa->spa_dsl_pool->dp_mos_dir);
        err = dmu_tx_assign(tx, TXG_WAIT);

        VERIFY(err == 0);

        txg = tx->tx_txg;

        mutex_enter(&wbc_data->wbc_lock);

        /*
         * There was a purge while the delete state was being written.
         * Everything was reset, so no frees are required or allowed.
         */
        if (wbc_data->wbc_delete == B_FALSE) {
                dmu_tx_commit(tx);
                return;
        }

        /* Move the left boundary of the window and reset the right one */
        wbc_data->wbc_start_txg = wbc_data->wbc_finish_txg + 1;
        wbc_data->wbc_finish_txg = 0;
        wbc_data->wbc_txg_to_rele = 0;
        wbc_data->wbc_roll_threshold = wbc_mv_cancel_threshold_initial;
        wbc_data->wbc_delete = B_FALSE;

        wbc_data->wbc_blocks_mv_last = wbc_data->wbc_blocks_mv;

        wbc_data->wbc_blocks_in = 0;
        wbc_data->wbc_blocks_out = 0;
        wbc_data->wbc_blocks_mv = 0;

        /* Write down the new boundaries */
        dsl_sync_task_nowait(spa->spa_dsl_pool,
            wbc_write_update_window, tree, 0, ZFS_SPACE_CHECK_NONE, tx);
        dmu_tx_commit(tx);

        mutex_exit(&wbc_data->wbc_lock);

        /* Wait for the frees and WBC parameters to be synced to disk */
        txg_wait_synced(spa->spa_dsl_pool, txg);

        mutex_enter(&wbc_data->wbc_lock);
}

/* Close the WBC window and release the snapshot of its right boundary */
static void
wbc_close_window(wbc_data_t *wbc_data)
{
        spa_t *spa = wbc_data->wbc_spa;
        uint64_t txg_to_rele = wbc_data->wbc_txg_to_rele;

        ASSERT(MUTEX_HELD(&wbc_data->wbc_lock));

        ASSERT0(wbc_data->wbc_blocks_count);
        ASSERT(avl_is_empty(&wbc_data->wbc_blocks));

        VERIFY(wbc_data->wbc_finish_txg != 0);

        if (krrp_debug) {
                cmn_err(CE_NOTE, "WBC: window (%llu; %llu) has been completed\n"
                    "WBC: %llu blocks moved",
                    (unsigned long long)wbc_data->wbc_start_txg,
                    (unsigned long long)wbc_data->wbc_finish_txg,
                    (unsigned long long)wbc_data->wbc_blocks_mv);
                VERIFY(wbc_data->wbc_blocks_mv == wbc_data->wbc_blocks_in);
                VERIFY(wbc_data->wbc_blocks_mv == wbc_data->wbc_blocks_out);
        }

        wbc_close_window_impl(spa, &wbc_data->wbc_moved_blocks);

        wbc_rele_autosnaps(wbc_data, txg_to_rele, B_FALSE);
}

/*
 * To finalize a wbc_instance we need to inherit wbc_mode.
 * During this operation wbc_process_objset() will be called,
 * which unregisters this instance and destroys it.
 */
static void
wbc_instance_finalization(void *arg)
{
        wbc_instance_t *wbc_instance = arg;

        ASSERT(wbc_instance->fini_done);

#ifdef _KERNEL
        /*
         * The NVL needs to be populated here, because after
         * calling dsl_prop_inherit() wbc_instance cannot
         * be used
         */
        nvlist_t *event;
        event = fnvlist_alloc();
        fnvlist_add_string(event, "fsname", wbc_instance->ds_name);
#endif

        VERIFY3U(dsl_prop_inherit(wbc_instance->ds_name,
            zfs_prop_to_name(ZFS_PROP_WBC_MODE),
            ZPROP_SRC_INHERITED), ==, 0);

#ifdef _KERNEL
        zfs_event_post(ZFS_EC_STATUS, "wbc_done", event);
#endif
}

static void
wbc_rele_autosnaps(wbc_data_t *wbc_data, uint64_t txg_to_rele,
    boolean_t purge)
{
        wbc_instance_t *wbc_instance;

        wbc_instance = avl_first(&wbc_data->wbc_instances);
        while (wbc_instance != NULL) {
                if (wbc_instance->txg_to_rele != 0) {
                        VERIFY3U(wbc_instance->txg_to_rele,
                            ==, txg_to_rele);
                        if (wbc_instance->fini_migration &&
                            txg_to_rele > wbc_instance->txg_off && !purge) {
                                /*
                                 * This WBC instance will be terminated in
                                 * the preallocated taskq.
                                 *
                                 * WBC instance termination involves writing
                                 * and therefore requires sync context.
                                 * But since we are already in sync context
                                 * here, the operation is task-dispatched.
                                 */
                                wbc_data->wbc_instance_fini_cnt--;
                                wbc_instance->fini_done = B_TRUE;
                                VERIFY(taskq_dispatch(
                                    wbc_data->wbc_instance_fini,
                                    wbc_instance_finalization, wbc_instance,
                                    TQ_SLEEP) != NULL);
                        } else if (wbc_instance->fini_migration) {
                                autosnap_force_snap_fast(
                                    wbc_instance->wbc_autosnap_hdl);
                        }

                        autosnap_release_snapshots_by_txg(
                            wbc_instance->wbc_autosnap_hdl,
                            txg_to_rele, AUTOSNAP_NO_SNAP);
                        wbc_instance->txg_to_rele = 0;
                } else if (wbc_instance->fini_migration) {
                        autosnap_force_snap_fast(
                            wbc_instance->wbc_autosnap_hdl);
                }

                wbc_instance = AVL_NEXT(&wbc_data->wbc_instances,
                    wbc_instance);
        }
}
1474 
1475 /*
1476  * Purge pending blocks and reset the right boundary.
1477  * It is used when a dataset is deleted or an error
1478  * occurred during traversal. If called in the
1479  * context of the sync thread, then the syncing tx must
1480  * be passed. Outside the syncing thread NULL is
1481  * expected instead.
1482  */
1483 void
1484 wbc_purge_window(spa_t *spa, dmu_tx_t *tx)
1485 {
1486         wbc_data_t *wbc_data = spa_get_wbc_data(spa);
1487         uint64_t snap_txg;
1488 
1489         ASSERT(MUTEX_HELD(&wbc_data->wbc_lock));
1490 
1491         if (wbc_data->wbc_finish_txg == 0)
1492                 return;
1493 
1494         /*
1495          * Clean the tree of blocks that are not yet
1496          * queued to be moved
1497          */
1498         wbc_clean_plan_tree(wbc_data);
1499 
1500         /*
1501          * Set the purge flag to tell the move workers to skip
1502          * any blocks left in the queue, so that no time is wasted
1503          * moving data that would have to be moved again.
1504          * Wait until all queued blocks are processed.
1505          */
1506         wbc_data->wbc_purge = B_TRUE;
1507 
1508         /*
1509          * Reset the deletion flag to make sure
1510          * that the purge is honored by the
1511          * dva[0] deleter
1512          */
1513         wbc_data->wbc_delete = B_FALSE;
1514 
1515         while (wbc_data->wbc_blocks_out !=
1516             wbc_data->wbc_blocks_mv &&
1517             !wbc_data->wbc_thr_exit) {
1518                 (void) cv_timedwait(&wbc_data->wbc_cv,
1519                     &wbc_data->wbc_lock,
1520                     ddi_get_lbolt() + 1000);
1521         }
1522 
1523         /*
1524          * Clean the tree of moved blocks
1525          */
1526         wbc_clean_moved_tree(wbc_data);
1527 
1528         wbc_data->wbc_blocks_in = 0;
1529         wbc_data->wbc_blocks_out = 0;
1530         wbc_data->wbc_blocks_mv = 0;
1531 
1532         /* Reset bookmark */
1533         bzero(&spa->spa_lszb, sizeof (spa->spa_lszb));
1534 
1535         snap_txg = wbc_data->wbc_txg_to_rele;
1536 
1537         /*
1538          * Reset the right boundary and the start time of the
1539          * latest window to catch the next snapshot that will
1540          * be created
1541          */
1542         wbc_data->wbc_finish_txg = 0;
1543         wbc_data->wbc_txg_to_rele = 0;
1544         wbc_data->wbc_latest_window_time = 0;
1545         wbc_data->wbc_roll_threshold =
1546             MIN(wbc_data->wbc_roll_threshold + wbc_mv_cancel_threshold_step,
1547             wbc_mv_cancel_threshold_cap);
1548 
1549         if (krrp_debug)
1550                 cmn_err(CE_NOTE, "WBC: Right boundary will be moved forward");
1551 
1552         if (tx) {
1553                 dsl_sync_task_nowait(spa->spa_dsl_pool,
1554                     wbc_write_update_window, NULL, 0, ZFS_SPACE_CHECK_NONE, tx);
1555         } else {
1556                 /*
1557                  * It is safe to drop the lock as the function has already
1558                  * set up everything it needed up to this moment and only
1559                  * needs to update the on-disk state
1560                  */
1561                 mutex_exit(&wbc_data->wbc_lock);
1562 
1563                 dsl_sync_task(spa->spa_name, NULL,
1564                     wbc_write_update_window, NULL, 0, ZFS_SPACE_CHECK_NONE);
1565                 mutex_enter(&wbc_data->wbc_lock);
1566         }
1567 
1568         wbc_rele_autosnaps(wbc_data, snap_txg, B_TRUE);
1569 
1570         /* Purge done */
1571         wbc_data->wbc_purge = B_FALSE;
1572 }
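
/*
 * Editor's sketch (not compiled): wbc_purge_window() above parks the
 * caller until the movers drain their queue, re-checking the counters
 * on every timed wakeup.  A minimal userland analogue of that
 * drain-wait pattern, with POSIX threads standing in for the kernel's
 * cv_timedwait()/wbc_lock pair (all names below are hypothetical):
 */
#if 0
#include <pthread.h>
#include <stdint.h>
#include <time.h>

struct drain_state {
	pthread_mutex_t	lock;
	pthread_cond_t	cv;
	uint64_t	blocks_out;	/* handed to the workers */
	uint64_t	blocks_mv;	/* completed by the workers */
	int		exiting;
};

static void
drain_wait(struct drain_state *ds)
{
	struct timespec ts;

	pthread_mutex_lock(&ds->lock);
	while (ds->blocks_out != ds->blocks_mv && !ds->exiting) {
		(void) clock_gettime(CLOCK_REALTIME, &ts);
		ts.tv_sec += 1;	/* re-check at least once per second */
		(void) pthread_cond_timedwait(&ds->cv, &ds->lock, &ts);
	}
	pthread_mutex_unlock(&ds->lock);
}
#endif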
1573 
1574 /* Finalize a window that was interrupted by a power cycle */
1575 static void
1576 wbc_free_restore(spa_t *spa)
1577 {
1578         uint64_t ret;
1579         wbc_data_t *wbc_data = spa_get_wbc_data(spa);
1580         int err = zap_lookup(spa->spa_dsl_pool->dp_meta_objset,
1581             DMU_POOL_DIRECTORY_OBJECT,
1582             DMU_POOL_WBC_STATE_DELETE, sizeof (uint64_t), 1, &ret);
1583         boolean_t need_restore = err ? B_FALSE : (!!ret);
1584         wbc_parseblock_cb_t cb_data = { 0 };
1585 
1586         if (!need_restore) {
1587                 wbc_data->wbc_finish_txg = 0;
1588                 wbc_data->wbc_txg_to_rele = 0;
1589                 return;
1590         }
1591 
1592         /*
1593          * The mutex must be dropped to prevent recursive entry.
1594          * It is safe as we are the only user of the WBC structures
1595          * at this point
1596          */
1597         mutex_exit(&wbc_data->wbc_lock);
1598         cb_data.wbc_data = wbc_data;
1599         err = traverse_pool(spa, wbc_data->wbc_start_txg - 1,
1600             wbc_data->wbc_finish_txg + 1,
1601             TRAVERSE_PREFETCH_METADATA | TRAVERSE_POST,
1602             wbc_traverse_ds_cb, &cb_data, &cb_data.zb);
1603 
1604         mutex_enter(&wbc_data->wbc_lock);
1605 
1606         wbc_close_window_impl(spa, &wbc_data->wbc_blocks);
1607         wbc_data->wbc_blocks_count = 0;
1608 }
1609 
1610 /*
1611  * The boolean returned from this function tells the autosnapper
1612  * whether to take a new autosnapshot or not.
1613  * The new autosnapshot is used as the right boundary for a new
1614  * writecache migration window.
1615  */
1616 /*ARGSUSED*/
1617 static boolean_t
1618 wbc_confirm_cb(const char *name, boolean_t recursive, uint64_t txg, void *arg)
1619 {
1620         wbc_instance_t *wbc_instance = arg;
1621         wbc_data_t *wbc_data = wbc_instance->wbc_data;
1622 
1623         /*
1624          * The conditions are:
1625          * - no active writecache window currently
1626          * - writecache is not locked
1627          * - used space on special vdev is at or above min-watermark
1628          * or an instance waits for finalization
1629          */
1630         return (wbc_data->wbc_wait_for_window && !wbc_data->wbc_locked &&
1631             (!wbc_check_space(wbc_data->wbc_spa) ||
1632             wbc_data->wbc_instance_fini_cnt != 0));
1633 }
1634 
1635 uint64_t wbc_window_roll_delay_ms = 0;
1636 
1637 static boolean_t
1638 wbc_check_time(wbc_data_t *wbc_data)
1639 {
1640 #ifdef _KERNEL
1641         if (wbc_window_roll_delay_ms == 0)
1642                 return (B_FALSE);
1643 
1644         uint64_t time_spent =
1645             ddi_get_lbolt() - wbc_data->wbc_latest_window_time;
1646         return (time_spent < drv_usectohz(wbc_window_roll_delay_ms * MILLISEC));
1647 #else
1648         return (B_FALSE);
1649 #endif
1650 }
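
/*
 * Editor's sketch (not compiled): wbc_check_time() is a rate limiter
 * answering "has wbc_window_roll_delay_ms elapsed since the last
 * window opened?".  A self-contained userland equivalent based on
 * CLOCK_MONOTONIC (the function names here are hypothetical):
 */
#if 0
#include <stdint.h>
#include <time.h>

static uint64_t
now_ms(void)
{
	struct timespec ts;

	(void) clock_gettime(CLOCK_MONOTONIC, &ts);
	return ((uint64_t)ts.tv_sec * 1000 + (uint64_t)ts.tv_nsec / 1000000);
}

/* Returns nonzero while the caller must keep delaying a new window. */
static int
window_roll_delayed(uint64_t last_window_ms, uint64_t delay_ms)
{
	if (delay_ms == 0)
		return (0);	/* the delay feature is disabled */
	return (now_ms() - last_window_ms < delay_ms);
}
#endif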
1651 
1652 /*
1653  * Returns B_TRUE if the percentage of used space on special vdev
1654  * is below ZPOOL_PROP_MINWATERMARK ("min-watermark", MIN_WN),
1655  * otherwise returns B_FALSE.
1656  *
1657  * Based on this return value, the caller of wbc_confirm_cb()
1658  * either opens a new writecache window or does not. In the
1659  * latter case, while the used space remains below min-watermark,
1660  * writecache migration does not run.
1661  *
1662  * Like low-watermark and high-watermark, which control the
1663  * special vdev's used space and the rate of its utilization,
1664  * min-watermark is a pool property that can be set via:
1665  *
1666  * 'zpool set min-watermark=<value> <pool name>'
1667  *
1668  */
1669 static boolean_t
1670 wbc_check_space(spa_t *spa)
1671 {
1672         uint64_t percentage =
1673             spa_class_alloc_percentage(spa_special_class(spa));
1674 
1675         return (percentage < spa->spa_minwat);
1676 }
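
/*
 * Editor's sketch (not compiled): the watermark test above boils down
 * to an integer percentage comparison over the special class's
 * allocated/total byte counters (the counter names below are
 * hypothetical stand-ins for what spa_class_alloc_percentage()
 * computes):
 */
#if 0
#include <stdint.h>

/* Nonzero means: below min-watermark, keep caching, do not migrate */
static int
below_min_watermark(uint64_t alloc, uint64_t space, uint64_t minwat_pct)
{
	if (space == 0)
		return (0);
	return (alloc * 100 / space < minwat_pct);
}
#endif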
1677 
1678 /* Autosnap notification callback */
1679 /*ARGSUSED*/
1680 static boolean_t
1681 wbc_nc_cb(const char *name, boolean_t recursive, boolean_t autosnap,
1682     uint64_t txg, uint64_t etxg, void *arg)
1683 {
1684         boolean_t result = B_FALSE;
1685         wbc_instance_t *wbc_instance = arg;
1686         wbc_data_t *wbc_data = wbc_instance->wbc_data;
1687 
1688         mutex_enter(&wbc_data->wbc_lock);
1689         if (!wbc_data->wbc_isvalid) {
1690                 mutex_exit(&wbc_data->wbc_lock);
1691                 return (B_FALSE);
1692         }
1693 
1694         if (wbc_data->wbc_finish_txg != 0) {
1695                 if (wbc_data->wbc_finish_txg == etxg &&
1696                     !wbc_instance->fini_done) {
1697                         /* Same window-snapshot for another WBC-Instance */
1698                         wbc_instance->txg_to_rele = txg;
1699                         result = B_TRUE;
1700                 }
1701 
1702                 mutex_exit(&wbc_data->wbc_lock);
1703                 return (result);
1704         }
1705 
1706         if (wbc_data->wbc_walking) {
1707                 /* Current window already done, but is not closed yet */
1708                 result = B_FALSE;
1709         } else if (wbc_data->wbc_locked) {
1710                 /* WBC is locked by an external caller */
1711                 result = B_FALSE;
1712         } else if (wbc_instance->fini_done) {
1713                 /* Instance already done, so snapshot is not required */
1714                 result = B_FALSE;
1715         } else {
1716                 /* Accept new windows */
1717                 VERIFY0(wbc_data->wbc_blocks_count);
1718                 VERIFY(avl_is_empty(&wbc_data->wbc_blocks));
1719                 wbc_data->wbc_latest_window_time = ddi_get_lbolt();
1720                 wbc_data->wbc_first_move = B_FALSE;
1721                 wbc_data->wbc_walk = B_TRUE;
1722                 wbc_data->wbc_finish_txg = etxg;
1723                 wbc_data->wbc_txg_to_rele = txg;
1724                 wbc_data->wbc_altered_limit = 0;
1725                 wbc_data->wbc_altered_bytes = 0;
1726                 wbc_data->wbc_window_bytes = 0;
1727                 wbc_data->wbc_fault_moves = 0;
1728                 cv_broadcast(&wbc_data->wbc_cv);
1729                 result = B_TRUE;
1730                 wbc_instance->txg_to_rele = txg;
1731                 wbc_data->wbc_wait_for_window = B_FALSE;
1732         }
1733 
1734         mutex_exit(&wbc_data->wbc_lock);
1735         return (result);
1736 }
1737 
1738 /*ARGSUSED*/
1739 static void
1740 wbc_err_cb(const char *name, int err, uint64_t txg, void *arg)
1741 {
1742         if (krrp_debug) {
1743                 cmn_err(CE_WARN, "Autosnap can not create a snapshot "
1744                     "for writecached dataset [%s] at txg %llu [%d]",
1745                     name, (unsigned long long)txg, err);
1746         }
1747 }
1748 
1749 void
1750 wbc_add_bytes(spa_t *spa, uint64_t txg, uint64_t bytes)
1751 {
1752         wbc_data_t *wbc_data = &spa->spa_wbc;
1753 
1754         mutex_enter(&wbc_data->wbc_lock);
1755 
1756         if (wbc_data->wbc_finish_txg == txg) {
1757                 wbc_data->wbc_window_bytes += bytes;
1758                 wbc_data->wbc_altered_limit =
1759                     wbc_data->wbc_window_bytes *
1760                     wbc_data->wbc_roll_threshold / 100;
1761 
1762                 DTRACE_PROBE3(wbc_window_size, uint64_t, txg,
1763                     uint64_t, wbc_data->wbc_window_bytes,
1764                     uint64_t, wbc_data->wbc_altered_limit);
1765         }
1766 
1767         mutex_exit(&wbc_data->wbc_lock);
1768 }
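
/*
 * Editor's sketch (not compiled): the limit maintained by
 * wbc_add_bytes() is a straight percentage of the window size.  With
 * wbc_roll_threshold = 20, a 1 GiB window tolerates roughly 200 MiB
 * of altered data before the move of the current window is considered
 * for cancellation (cf. the wbc_mv_cancel_threshold_* tunables):
 */
#if 0
#include <stdint.h>

static uint64_t
altered_limit(uint64_t window_bytes, uint64_t roll_threshold_pct)
{
	return (window_bytes * roll_threshold_pct / 100);
}
#endif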
1769 
1770 /* WBC-INIT routines */
1771 
1772 void
1773 wbc_activate(spa_t *spa, boolean_t pool_creation)
1774 {
1775         if (spa_feature_is_enabled(spa, SPA_FEATURE_WBC))
1776                 wbc_activate_impl(spa, pool_creation);
1777 }
1778 
1779 /*
1780  * This function is a callback for dmu_objset_find_dp()
1781  * that is called during the initialization of WBC.
1782  *
1783  * Here we register a wbc_instance for the given dataset
1784  * if WBC is activated for this dataset
1785  */
1786 /* ARGSUSED */
1787 static int
1788 wbc_activate_instances(dsl_pool_t *dp, dsl_dataset_t *ds, void *arg)
1789 {
1790         wbc_data_t *wbc_data = arg;
1791         objset_t *os = NULL;
1792         wbc_instance_t *wbc_instance = NULL;
1793         int rc = 0;
1794 
1795         (void) dmu_objset_from_ds(ds, &os);
1796         VERIFY(os != NULL);
1797 
1798         if (os->os_wbc_mode == ZFS_WBC_MODE_OFF)
1799                 return (0);
1800 
1801         if (os->os_dsl_dataset->ds_object != os->os_wbc_root_ds_obj)
1802                 return (0);
1803 
1804         mutex_enter(&wbc_data->wbc_lock);
1805 
1806         if (wbc_data->wbc_isvalid)
1807                 wbc_instance = wbc_register_instance(wbc_data, os);
1808         else
1809                 rc = EINTR;
1810 
1811         if (wbc_instance != NULL) {
1812                 if (os->os_wbc_mode == ZFS_WBC_MODE_OFF_DELAYED) {
1813                         wbc_instance->fini_migration = B_TRUE;
1814                         wbc_instance->txg_off = os->os_wbc_off_txg;
1815                         wbc_data->wbc_instance_fini_cnt++;
1816                 }
1817 
1818                 autosnap_force_snap_fast(wbc_instance->wbc_autosnap_hdl);
1819         }
1820 
1821         mutex_exit(&wbc_data->wbc_lock);
1822 
1823         return (rc);
1824 }
1825 
1826 /*
1827  * Second stage of the WBC initialization.
1828  *
1829  * We walk over all datasets of the given pool to activate
1830  * wbc_instances for the datasets that have WBC enabled
1831  */
1832 static void
1833 wbc_init_thread(void *arg)
1834 {
1835         wbc_data_t *wbc_data = arg;
1836         spa_t *spa = wbc_data->wbc_spa;
1837         dsl_dataset_t *ds_root = NULL;
1838         uint64_t dd_root_object;
1839         int err;
1840 
1841         /*
1842          * If the feature flag is active, we need to look
1843          * up the datasets that have WBC enabled
1844          */
1845         if (spa_feature_is_active(spa, SPA_FEATURE_WBC)) {
1846                 dsl_pool_config_enter(spa_get_dsl(spa), FTAG);
1847 
1848                 err = dsl_dataset_hold(spa_get_dsl(spa), spa->spa_name,
1849                     FTAG, &ds_root);
1850                 if (err != 0) {
1851                         dsl_pool_config_exit(spa_get_dsl(spa), FTAG);
1852                         mutex_enter(&wbc_data->wbc_lock);
1853                         goto out;
1854                 }
1855 
1856                 dd_root_object = ds_root->ds_dir->dd_object;
1857                 dsl_dataset_rele(ds_root, FTAG);
1858 
1859                 VERIFY0(dmu_objset_find_dp(spa_get_dsl(spa), dd_root_object,
1860                     wbc_activate_instances, wbc_data, DS_FIND_CHILDREN));
1861 
1862                 dsl_pool_config_exit(spa_get_dsl(spa), FTAG);
1863         }
1864 
1865         mutex_enter(&wbc_data->wbc_lock);
1866 
1867         wbc_data->wbc_ready_to_use = B_TRUE;
1868         if (avl_numnodes(&wbc_data->wbc_instances) != 0 &&
1869             !wbc_data->wbc_thr_exit)
1870                 wbc_start_thread(wbc_data->wbc_spa);
1871 
1872 out:
1873         wbc_data->wbc_init_thread = NULL;
1874         cv_broadcast(&wbc_data->wbc_cv);
1875         mutex_exit(&wbc_data->wbc_lock);
1876 }
1877 
1878 /*
1879  * Initialize WBC properties for the given pool.
1880  */
1881 static void
1882 wbc_activate_impl(spa_t *spa, boolean_t pool_creation)
1883 {
1884         wbc_data_t *wbc_data = spa_get_wbc_data(spa);
1885         wbc_stat_t *wbc_stat = &wbc_data->wbc_stat;
1886         uint64_t spa_children = spa->spa_root_vdev->vdev_children;
1887         int err = 0;
1888         boolean_t hold = B_FALSE;
1889 
1890         mutex_enter(&wbc_data->wbc_lock);
1891         if (wbc_data->wbc_isvalid) {
1892                 mutex_exit(&wbc_data->wbc_lock);
1893                 return;
1894         }
1895 
1896         /* Reset bookmark */
1897         bzero(&spa->spa_lszb, sizeof (spa->spa_lszb));
1898 
1899         wbc_data->wbc_roll_threshold = wbc_mv_cancel_threshold_initial;
1900         wbc_data->wbc_altered_limit = 0;
1901         wbc_data->wbc_altered_bytes = 0;
1902         wbc_data->wbc_window_bytes = 0;
1903 
1904         /* Reset statistics */
1905         wbc_stat->wbc_spa_util = 0;
1906         wbc_stat->wbc_stat_lbolt = 0;
1907         wbc_stat->wbc_stat_update = B_FALSE;
1908 
1909         /* Number of WBC block-moving threads - taskq nthreads */
1910         wbc_data->wbc_move_threads = MIN(wbc_max_move_tasks_count,
1911             spa_children * zfs_vdev_async_write_max_active);
1912 
1913         /*
1914          * Read WBC parameters to restore
1915          * latest WBC window's boundaries
1916          */
1917         if (!rrw_held(&spa->spa_dsl_pool->dp_config_rwlock,
1918             RW_WRITER)) {
1919                 rrw_enter(&spa->spa_dsl_pool->dp_config_rwlock,
1920                     RW_READER, FTAG);
1921                 hold = B_TRUE;
1922         }
1923 
1924         err = zap_lookup(spa->spa_dsl_pool->dp_meta_objset,
1925             DMU_POOL_DIRECTORY_OBJECT,
1926             DMU_POOL_WBC_START_TXG, sizeof (uint64_t), 1,
1927             &wbc_data->wbc_start_txg);
1928         if (err)
1929                 wbc_data->wbc_start_txg = 4;
1930 
1931         err = zap_lookup(spa->spa_dsl_pool->dp_meta_objset,
1932             DMU_POOL_DIRECTORY_OBJECT,
1933             DMU_POOL_WBC_FINISH_TXG, sizeof (uint64_t), 1,
1934             &wbc_data->wbc_finish_txg);
1935         if (!err) {
1936                 err = zap_lookup(spa->spa_dsl_pool->dp_meta_objset,
1937                     DMU_POOL_DIRECTORY_OBJECT,
1938                     DMU_POOL_WBC_TO_RELE_TXG, sizeof (uint64_t), 1,
1939                     &wbc_data->wbc_txg_to_rele);
1940         }
1941 
1942         if (hold)
1943                 rrw_exit(&spa->spa_dsl_pool->dp_config_rwlock, FTAG);
1944 
1945         if (err) {
1946                 wbc_data->wbc_finish_txg = 0;
1947                 wbc_data->wbc_txg_to_rele = 0;
1948         }
1949 
1950         wbc_data->wbc_latest_window_time = ddi_get_lbolt();
1951 
1952         wbc_data->wbc_ready_to_use = B_FALSE;
1953         wbc_data->wbc_thr_exit = B_FALSE;
1954         wbc_data->wbc_purge = B_FALSE;
1955         wbc_data->wbc_walk = B_TRUE;
1956         wbc_data->wbc_spa = spa;
1957         wbc_data->wbc_isvalid = B_TRUE;
1958         wbc_data->wbc_instance_fini_cnt = 0;
1959 
1960         /* Finalize window interrupted by power cycle or reimport */
1961         wbc_free_restore(spa);
1962 
1963         if (pool_creation) {
1964                 /* On create there is no reason to start init_thread */
1965                 wbc_data->wbc_ready_to_use = B_TRUE;
1966         } else {
1967                 /*
1968                  * On import we need to restore wbc_instances.
1969                  * Do this asynchronously.
1970                  */
1971                 wbc_data->wbc_init_thread = thread_create(NULL, 0,
1972                     wbc_init_thread, wbc_data, 0, &p0, TS_RUN, maxclsyspri);
1973         }
1974 
1975         mutex_exit(&wbc_data->wbc_lock);
1976 
1977         DTRACE_PROBE2(wbc_spa_add, char *, spa->spa_name,
1978             spa_t *, spa);
1979 }
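
/*
 * Editor's sketch (not compiled): the restore path above repeats a
 * common idiom -- read a persisted uint64 from the pool directory ZAP
 * and substitute a default when the key was never written.  The
 * helper below is hypothetical; it only reuses the zap_lookup()
 * signature as called in this file:
 */
#if 0
static uint64_t
zap_lookup_u64_default(objset_t *mos, uint64_t zapobj, const char *key,
    uint64_t def)
{
	uint64_t val;

	if (zap_lookup(mos, zapobj, key, sizeof (uint64_t), 1, &val) != 0)
		val = def;	/* never persisted: fall back */
	return (val);
}
#endif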
1980 
1981 void
1982 wbc_deactivate(spa_t *spa)
1983 {
1984         wbc_data_t *wbc_data = spa_get_wbc_data(spa);
1985 
1986         mutex_enter(&wbc_data->wbc_lock);
1987 
1988         if (!spa_has_special(spa) || !wbc_data->wbc_isvalid) {
1989                 mutex_exit(&wbc_data->wbc_lock);
1990                 return;
1991         }
1992 
1993         DTRACE_PROBE1(wbc_deactiv_start, char *, spa->spa_name);
1994 
1995         wbc_data->wbc_isvalid = B_FALSE;
1996 
1997         while (wbc_data->wbc_init_thread != NULL)
1998                 cv_wait(&wbc_data->wbc_cv, &wbc_data->wbc_lock);
1999 
2000         wbc_unregister_instances(wbc_data);
2001 
2002         VERIFY(avl_is_empty(&wbc_data->wbc_blocks));
2003         VERIFY(avl_is_empty(&wbc_data->wbc_moved_blocks));
2004 
2005         DTRACE_PROBE1(wbc_deactiv_done, char *, spa->spa_name);
2006 
2007         mutex_exit(&wbc_data->wbc_lock);
2008 }
2009 
2010 /*
2011  * AVL comparison function (callback) for writeback-cached blocks.
2012  * This function defines the tree's sorting order which is:
2013  * (vdev, offset) ascending, where vdev and offset are the respective
2014  * vdev id and offset of the block.
2015  *
2016  * Returns -1 if (block1 < block2), 0 if (block1 == block2),
2017  * and 1 when (block1 > block2).
2018  */
2019 static int
2020 wbc_blocks_compare(const void *arg1, const void *arg2)
2021 {
2022         wbc_block_t *block1 = (wbc_block_t *)arg1;
2023         wbc_block_t *block2 = (wbc_block_t *)arg2;
2024 
2025         /* calculate vdev and offset for block1 and block2 */
2026         uint64_t vdev1 = DVA_GET_VDEV(&block1->dva[WBC_SPECIAL_DVA]);
2027         uint64_t offset1 = DVA_GET_OFFSET(&block1->dva[WBC_SPECIAL_DVA]);
2028         uint64_t vdev2 = DVA_GET_VDEV(&block2->dva[WBC_SPECIAL_DVA]);
2029         uint64_t offset2 = DVA_GET_OFFSET(&block2->dva[WBC_SPECIAL_DVA]);
2030 
2031         /* compare vdevs and offsets */
2032         int cmp1 = (vdev1 < vdev2) ? (-1) : (vdev1 == vdev2 ? 0 : 1);
2033         int cmp2 = (offset1 < offset2) ? (-1) : (offset1 == offset2 ? 0 : 1);
2034         int cmp = (cmp1 == 0) ? cmp2 : cmp1;
2035 
2036         return (cmp);
2037 }
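
/*
 * Editor's sketch (not compiled): the same (vdev, offset) ascending
 * order, expressed over a plain struct and exercised with qsort(3C)
 * so the ordering is easy to verify outside the AVL machinery:
 */
#if 0
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

struct blk {
	uint64_t vdev;
	uint64_t offset;
};

static int
blk_compare(const void *a1, const void *a2)
{
	const struct blk *b1 = a1;
	const struct blk *b2 = a2;

	if (b1->vdev != b2->vdev)
		return (b1->vdev < b2->vdev ? -1 : 1);
	if (b1->offset != b2->offset)
		return (b1->offset < b2->offset ? -1 : 1);
	return (0);
}

int
main(void)
{
	struct blk b[] = { { 1, 8192 }, { 0, 4096 }, { 1, 0 } };
	int i;

	qsort(b, 3, sizeof (struct blk), blk_compare);
	for (i = 0; i < 3; i++) {
		(void) printf("vdev %llu offset %llu\n",
		    (unsigned long long)b[i].vdev,
		    (unsigned long long)b[i].offset);
	}
	return (0);
}
#endif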
2038 
2039 static int
2040 wbc_instances_compare(const void *arg1, const void *arg2)
2041 {
2042         const wbc_instance_t *instance1 = arg1;
2043         const wbc_instance_t *instance2 = arg2;
2044 
2045         if (instance1->ds_object > instance2->ds_object)
2046                 return (1);
2047 
2048         if (instance1->ds_object < instance2->ds_object)
2049                 return (-1);
2050 
2051         return (0);
2052 }
2053 
2054 static int
2055 wbc_io(wbc_io_type_t type, wbc_block_t *block, abd_t *data)
2056 {
2057         zio_t *zio;
2058         zio_type_t zio_type;
2059         vdev_t *vd;
2060         uint64_t bias;
2061         size_t dva_num;
2062 
2063         if (type == WBC_READ_FROM_SPECIAL) {
2064                 zio_type = ZIO_TYPE_READ;
2065                 dva_num = WBC_SPECIAL_DVA;
2066         } else {
2067                 ASSERT(type == WBC_WRITE_TO_NORMAL);
2068                 zio_type = ZIO_TYPE_WRITE;
2069                 dva_num = WBC_NORMAL_DVA;
2070         }
2071 
2072         vd = vdev_lookup_top(block->data->wbc_spa,
2073             DVA_GET_VDEV(&block->dva[dva_num]));
2074         bias = vd->vdev_children == 0 ? VDEV_LABEL_START_SIZE : 0;
2075         zio = zio_wbc(zio_type, vd, data, WBCBP_GET_PSIZE(block),
2076             DVA_GET_OFFSET(&block->dva[dva_num]) + bias);
2077 
2078         return (zio_wait(zio));
2079 }
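
/*
 * Editor's sketch (not compiled): the bias applied in wbc_io()
 * reflects that on a leaf vdev the allocatable region starts after
 * the front labels, while a top-level interior vdev (mirror, raidz)
 * translates child offsets itself.  Reduced to a pure function:
 */
#if 0
#include <stdint.h>

static uint64_t
physical_offset(uint64_t dva_offset, int is_leaf, uint64_t label_reserve)
{
	/* is_leaf corresponds to vdev_children == 0 above */
	return (dva_offset + (is_leaf ? label_reserve : 0));
}
#endif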
2080 
2081 /*
2082  * If birth_txg is below the window, then the block is on the
2083  * normal device only; otherwise it can be found on
2084  * special, because deletion goes under lock and until
2085  * deletion is done, the block is accessible on special
2086  */
2087 int
2088 wbc_select_dva(wbc_data_t *wbc_data, zio_t *zio)
2089 {
2090         uint64_t stxg;
2091         uint64_t ftxg;
2092         uint64_t btxg;
2093         int c;
2094 
2095         mutex_enter(&wbc_data->wbc_lock);
2096 
2097         stxg = wbc_data->wbc_start_txg;
2098         ftxg = wbc_data->wbc_finish_txg;
2099         btxg = BP_PHYSICAL_BIRTH(zio->io_bp);
2100 
2101         if (ftxg && btxg > ftxg) {
2102                 DTRACE_PROBE(wbc_read_special_after);
2103                 c = WBC_SPECIAL_DVA;
2104         } else if (btxg >= stxg) {
2105                 if (!ftxg && wbc_data->wbc_delete) {
2106                         DTRACE_PROBE(wbc_read_normal);
2107                         c = WBC_NORMAL_DVA;
2108                 } else {
2109                         DTRACE_PROBE(wbc_read_special_inside);
2110                         c = WBC_SPECIAL_DVA;
2111                 }
2112         } else {
2113                 DTRACE_PROBE(wbc_read_normal);
2114                 c = WBC_NORMAL_DVA;
2115         }
2116 
2117         mutex_exit(&wbc_data->wbc_lock);
2118 
2119         return (c);
2120 }
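
/*
 * Editor's sketch (not compiled): wbc_select_dva() restated as a pure
 * function of (birth txg, window boundaries, delete-in-progress), so
 * the three outcomes can be unit-tested in isolation.  0 and 1 stand
 * for WBC_SPECIAL_DVA and WBC_NORMAL_DVA respectively:
 */
#if 0
#include <stdint.h>

static int
select_dva(uint64_t birth, uint64_t start, uint64_t finish, int deleting)
{
	if (finish != 0 && birth > finish)
		return (0);	/* newer than the window: only on special */
	if (birth >= start)	/* inside the window */
		return ((finish == 0 && deleting) ? 1 : 0);
	return (1);		/* below the window: moved to normal */
}
#endif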
2121 
2122 /*
2123  * Checks if a special block has left the special device and has been fully
2124  * migrated by WBC to the normal pool.
2125  */
2126 boolean_t
2127 wbc_bp_is_migrated(wbc_data_t *wbc_data, const blkptr_t *bp)
2128 {
2129         boolean_t result;
2130 
2131         ASSERT(BP_IS_SPECIAL(bp));
2132         mutex_enter(&wbc_data->wbc_lock);
2133         result = BP_PHYSICAL_BIRTH(bp) < wbc_data->wbc_start_txg;
2134         mutex_exit(&wbc_data->wbc_lock);
2135 
2136         return (result);
2137 }
2138 
2139 /*
2140  * Three cases are possible here:
2141  * 1st - birth_txg is below the window - only the normal device is freed
2142  * 2nd - inside the window both trees are checked; if neither tree
2143  *      has this block and deletion is in progress, then the block is
2144  *      already freed, otherwise both dvas are freed
2145  * 3rd - birth_txg is above the window - both dvas must be freed
2146  */
2147 int
2148 wbc_first_valid_dva(const blkptr_t *bp,
2149     wbc_data_t *wbc_data, boolean_t removal)
2150 {
2151         int start_dva = 0;
2152 
2153         ASSERT(MUTEX_HELD(&wbc_data->wbc_lock));
2154 
2155         if (BP_PHYSICAL_BIRTH(bp) < wbc_data->wbc_start_txg) {
2156                 start_dva = 1;
2157         } else if (BP_PHYSICAL_BIRTH(bp) <= wbc_data->wbc_finish_txg) {
2158                 wbc_block_t search, *planned, *moved;
2159 
2160                 /* Only DVA[0] is required for search */
2161                 search.dva[WBC_SPECIAL_DVA] = bp->blk_dva[WBC_SPECIAL_DVA];
2162 
2163                 moved = avl_find(&wbc_data->wbc_moved_blocks,
2164                     &search, NULL);
2165                 if (moved != NULL && removal) {
2166                         /*
2167                          * WBC will free this block later
2168                          */
2169                         mutex_enter(&moved->lock);
2170                         WBCBP_MARK_DELETED(moved);
2171                         mutex_exit(&moved->lock);
2172                 }
2173 
2174                 planned = avl_find(&wbc_data->wbc_blocks,
2175                     &search, NULL);
2176                 if (planned != NULL && removal) {
2177                         avl_remove(&wbc_data->wbc_blocks, planned);
2178                         wbc_free_block(planned);
2179                 }
2180 
2181                 if (planned == NULL && moved == NULL && wbc_data->wbc_delete)
2182                         start_dva = 1;
2183         }
2184 
2185         return (start_dva);
2186 }
2187 
2188 /*
2189  * This function is called:
2190  * 1) for each dataset of the given pool at dataset load time
2191  * 2) on each change of the wbc_mode property, for the dataset in
2192  * question and all its children
2193  *
2194  * see dsl_prop_register()/dsl_prop_unregister() and
2195  * dmu_objset_open_impl()/dmu_objset_evict()
2196  *
2197  * wbc_mode has 3 states:
2198  * ON, OFF - visible to the user; OFF_DELAYED - for internal use
2199  *
2200  * ON - special BPs are generated and migration is active
2201  * OFF_DELAYED - special BPs are no longer created, but migration
2202  * stays active to move the blocks that are still on SPECIAL
2203  * OFF - all blocks that were on special have been migrated, so
2204  * this instance can be destroyed.
2205  */
2206 void
2207 wbc_mode_changed(void *arg, uint64_t newval)
2208 {
2209         objset_t *os = arg;
2210         wbc_data_t *wbc_data = spa_get_wbc_data(os->os_spa);
2211         wbc_mode_prop_val_t *val =
2212             (wbc_mode_prop_val_t *)((uintptr_t)newval);
2213 
2214         if (val->root_ds_object != 0) {
2215                 os->os_wbc_root_ds_obj = val->root_ds_object;
2216                 os->os_wbc_off_txg = val->txg_off;
2217                 if (val->txg_off == 0)
2218                         os->os_wbc_mode = ZFS_WBC_MODE_ON;
2219                 else
2220                         os->os_wbc_mode = ZFS_WBC_MODE_OFF_DELAYED;
2221         } else {
2222                 if (os->os_wbc_mode == ZFS_WBC_MODE_OFF)
2223                         return;
2224 
2225                 os->os_wbc_mode = ZFS_WBC_MODE_OFF;
2226         }
2227 
2228         DTRACE_PROBE4(wbc_mc,
2229             boolean_t, wbc_data->wbc_ready_to_use,
2230             uint64_t, os->os_dsl_dataset->ds_object,
2231             uint64_t, os->os_wbc_mode,
2232             uint64_t, os->os_wbc_root_ds_obj);
2233 
2234         wbc_process_objset(wbc_data, os, B_FALSE);
2235 
2236         if (os->os_wbc_mode == ZFS_WBC_MODE_OFF) {
2237                 os->os_wbc_root_ds_obj = 0;
2238                 os->os_wbc_off_txg = 0;
2239         }
2240 }
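
/*
 * Editor's sketch (not compiled): a simplified model of the wbc_mode
 * lifecycle described above.  "drained" stands for "the finalizing
 * window covering the remaining special blocks has been closed"; the
 * enum and function names are hypothetical:
 */
#if 0
typedef enum {
	WM_OFF,		/* ZFS_WBC_MODE_OFF */
	WM_ON,		/* ZFS_WBC_MODE_ON */
	WM_OFF_DELAYED	/* ZFS_WBC_MODE_OFF_DELAYED */
} wm_sketch_t;

static wm_sketch_t
next_mode(wm_sketch_t cur, int user_enabled, int drained)
{
	if (user_enabled)
		return (WM_ON);			/* special BPs + migration */
	if (cur == WM_ON)
		return (WM_OFF_DELAYED);	/* stop new special BPs */
	if (cur == WM_OFF_DELAYED && drained)
		return (WM_OFF);		/* instance may be destroyed */
	return (cur);
}
#endif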
2241 
2242 /*
2243  * This function is called:
2244  * 1) on change of wbc_mode property
2245  * 2) on destroying of a DS
2246  *
2247  * It processes only top-level DS of a WBC-DS-tree
2248  */
2249 void
2250 wbc_process_objset(wbc_data_t *wbc_data,
2251     objset_t *os, boolean_t destroy)
2252 {
2253         wbc_instance_t *wbc_instance;
2254         size_t num_nodes_before, num_nodes_after;
2255 
2256         if (os->os_wbc_root_ds_obj == 0)
2257                 return;
2258 
2259         mutex_enter(&wbc_data->wbc_lock);
2260         /* Do not register instances too early */
2261         if (!wbc_data->wbc_isvalid || !wbc_data->wbc_ready_to_use) {
2262                 mutex_exit(&wbc_data->wbc_lock);
2263                 return;
2264         }
2265 
2266         if (os->os_dsl_dataset->ds_object != os->os_wbc_root_ds_obj) {
2267                 wbc_instance = wbc_lookup_instance(wbc_data,
2268                     os->os_wbc_root_ds_obj, NULL);
2269 
2270                 /*
2271                  * If no instance exists for us, then WBC
2272                  * must not be enabled for this DS
2273                  */
2274                 if (wbc_instance == NULL)
2275                         os->os_wbc_mode = ZFS_WBC_MODE_OFF;
2276 
2277                 mutex_exit(&wbc_data->wbc_lock);
2278                 return;
2279         }
2280 
2281         num_nodes_before = avl_numnodes(&wbc_data->wbc_instances);
2282 
2283         if (os->os_wbc_mode == ZFS_WBC_MODE_OFF || destroy) {
2284                 wbc_unregister_instance(wbc_data, os, !destroy);
2285         } else {
2286                 wbc_instance = wbc_register_instance(wbc_data, os);
2287                 if (wbc_instance != NULL &&
2288                     os->os_wbc_mode == ZFS_WBC_MODE_OFF_DELAYED &&
2289                     !wbc_instance->fini_migration) {
2290                         wbc_instance->fini_migration = B_TRUE;
2291                         wbc_data->wbc_instance_fini_cnt++;
2292                         wbc_instance->txg_off = os->os_wbc_off_txg;
2293                         autosnap_force_snap_fast(
2294                             wbc_instance->wbc_autosnap_hdl);
2295                 }
2296 
2297                 if (wbc_instance == NULL) {
2298                         /*
2299                          * Registration failed, so we do not
2300                          * want to write data to special: such
2301                          * data would never be migrated
2302                          */
2303                         os->os_wbc_mode = ZFS_WBC_MODE_OFF;
2304                 }
2305         }
2306 
2307         num_nodes_after = avl_numnodes(&wbc_data->wbc_instances);
2308 
2309         mutex_exit(&wbc_data->wbc_lock);
2310 
2311         /*
2312          * The first instance was added, so we need
2313          * to start the collector and the mover
2314          */
2315         if ((num_nodes_after > num_nodes_before) &&
2316             (num_nodes_before == 0)) {
2317                 wbc_start_thread(wbc_data->wbc_spa);
2318         }
2319 
2320         /*
2321          * The last instance was removed, so we need
2322          * to stop the collector and the mover
2323          */
2324         if ((num_nodes_after < num_nodes_before) &&
2325             (num_nodes_after == 0)) {
2326                 (void) wbc_stop_thread(wbc_data->wbc_spa);
2327         }
2328 }
2329 
2330 static wbc_instance_t *
2331 wbc_register_instance(wbc_data_t *wbc_data, objset_t *os)
2332 {
2333         dsl_dataset_t *ds = os->os_dsl_dataset;
2334         wbc_instance_t *wbc_instance;
2335         avl_index_t where = NULL;
2336 
2337         ASSERT(MUTEX_HELD(&wbc_data->wbc_lock));
2338 
2339         /* Is it already registered? */
2340         wbc_instance = wbc_lookup_instance(wbc_data,
2341             ds->ds_object, &where);
2342         if (wbc_instance != NULL)
2343                 return (wbc_instance);
2344 
2345         wbc_instance = kmem_zalloc(sizeof (wbc_instance_t), KM_SLEEP);
2346         wbc_instance->ds_object = ds->ds_object;
2347         wbc_instance->wbc_data = wbc_data;
2348         dsl_dataset_name(ds, wbc_instance->ds_name);
2349         wbc_instance->wbc_autosnap_hdl =
2350             autosnap_register_handler_impl(wbc_data->wbc_spa, wbc_instance->ds_name,
2351             AUTOSNAP_CREATOR | AUTOSNAP_DESTROYER |
2352             AUTOSNAP_RECURSIVE | AUTOSNAP_WBC,
2353             wbc_confirm_cb, wbc_nc_cb, wbc_err_cb, wbc_instance);
2354         if (wbc_instance->wbc_autosnap_hdl == NULL) {
2355                 cmn_err(CE_WARN, "Cannot register autosnap handler "
2356                     "for WBC-Instance (%s)", wbc_instance->ds_name);
2357                 kmem_free(wbc_instance, sizeof (wbc_instance_t));
2358                 return (NULL);
2359         }
2360 
2361         DTRACE_PROBE2(register_done,
2362             uint64_t, wbc_instance->ds_object,
2363             char *, wbc_instance->ds_name);
2364 
2365         avl_insert(&wbc_data->wbc_instances, wbc_instance, where);
2366 
2367         return (wbc_instance);
2368 }
2369 
2370 static void
2371 wbc_unregister_instance(wbc_data_t *wbc_data, objset_t *os,
2372     boolean_t rele_autosnap)
2373 {
2374         dsl_dataset_t *ds = os->os_dsl_dataset;
2375         wbc_instance_t *wbc_instance;
2376 
2377         ASSERT(MUTEX_HELD(&wbc_data->wbc_lock));
2378 
2379         wbc_instance = wbc_lookup_instance(wbc_data, ds->ds_object, NULL);
2380         if (wbc_instance != NULL) {
2381                 DTRACE_PROBE1(unregister_done,
2382                     uint64_t, wbc_instance->ds_object);
2383 
2384                 avl_remove(&wbc_data->wbc_instances, wbc_instance);
2385                 wbc_unregister_instance_impl(wbc_instance,
2386                     rele_autosnap && (wbc_instance->txg_to_rele != 0));
2387         }
2388 }
2389 
2390 static void
2391 wbc_unregister_instances(wbc_data_t *wbc_data)
2392 {
2393         void *cookie = NULL;
2394         wbc_instance_t *wbc_instance;
2395 
2396         ASSERT(MUTEX_HELD(&wbc_data->wbc_lock));
2397 
2398         while ((wbc_instance = avl_destroy_nodes(
2399             &wbc_data->wbc_instances, &cookie)) != NULL)
2400                 wbc_unregister_instance_impl(wbc_instance, B_FALSE);
2401 }
2402 
2403 static void
2404 wbc_unregister_instance_impl(wbc_instance_t *wbc_instance,
2405     boolean_t rele_autosnap)
2406 {
2407         if (rele_autosnap) {
2408                 autosnap_release_snapshots_by_txg(
2409                     wbc_instance->wbc_autosnap_hdl,
2410                     wbc_instance->txg_to_rele,
2411                     AUTOSNAP_NO_SNAP);
2412         }
2413 
2414         autosnap_unregister_handler(wbc_instance->wbc_autosnap_hdl);
2415         kmem_free(wbc_instance, sizeof (wbc_instance_t));
2416 }
2417 
2418 static wbc_instance_t *
2419 wbc_lookup_instance(wbc_data_t *wbc_data,
2420     uint64_t ds_object, avl_index_t *where)
2421 {
2422         wbc_instance_t wbc_instance;
2423 
2424         ASSERT(MUTEX_HELD(&wbc_data->wbc_lock));
2425 
2426         wbc_instance.ds_object = ds_object;
2427         return (avl_find(&wbc_data->wbc_instances,
2428             &wbc_instance, where));
2429 }
2430 
2431 /*
2432  * Returns:
2433  * 0  - the dataset is a top-level (root) writecached dataset
2434  * EOPNOTSUPP - the dataset is a writecached child
2435  * ENOTACTIVE - the dataset is not writecached
2436  * other zfs err - cannot open the pool, is busy, etc.
2437  */
2438 int
2439 wbc_check_dataset(const char *ds_name)
2440 {
2441         int error;
2442         spa_t *spa = NULL;
2443         dsl_dataset_t *ds = NULL;
2444         objset_t *os = NULL;
2445         zfs_wbc_mode_t wbc_mode;
2446         uint64_t wbc_root_object, ds_object;
2447 
2448         error = spa_open(ds_name, &spa, FTAG);
2449         if (error != 0)
2450                 return (error);
2451 
2452         dsl_pool_config_enter(spa_get_dsl(spa), FTAG);
2453         error = dsl_dataset_hold(spa_get_dsl(spa), ds_name, FTAG, &ds);
2454         if (error) {
2455                 dsl_pool_config_exit(spa_get_dsl(spa), FTAG);
2456                 spa_close(spa, FTAG);
2457                 return (error);
2458         }
2459 
2460         error = dmu_objset_from_ds(ds, &os);
2461         dsl_pool_config_exit(spa_get_dsl(spa), FTAG);
2462         if (error) {
2463                 dsl_dataset_rele(ds, FTAG);
2464                 spa_close(spa, FTAG);
2465                 return (error);
2466         }
2467 
2468         wbc_mode = os->os_wbc_mode;
2469         wbc_root_object = os->os_wbc_root_ds_obj;
2470         ds_object = ds->ds_object;
2471         dsl_dataset_rele(ds, FTAG);
2472         spa_close(spa, FTAG);
2473 
2474         if (wbc_mode != ZFS_WBC_MODE_OFF) {
2475                 if (wbc_root_object != ds_object) {
2476                         /* The child of writecached ds-tree */
2477                         return (EOPNOTSUPP);
2478                 }
2479 
2480                 /* The root of writecached ds-tree */
2481                 return (0);
2482         }
2483 
2484         /* not writecached */
2485         return (ENOTACTIVE);
2486 }
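
/*
 * Editor's sketch (not compiled): a hypothetical caller mapping the
 * documented wbc_check_dataset() outcomes to human-readable roles:
 */
#if 0
#include <errno.h>

static const char *
wbc_role(const char *ds_name)
{
	switch (wbc_check_dataset(ds_name)) {
	case 0:
		return ("writecached root");
	case EOPNOTSUPP:
		return ("writecached child");
	case ENOTACTIVE:
		return ("not writecached");
	default:
		return ("inaccessible (busy pool, bad name, ...)");
	}
}
#endif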
2487 
2488 /*
2489  * The function requires that all the writecache
2490  * instances are already disabled
2491  */
2492 boolean_t
2493 wbc_try_disable(wbc_data_t *wbc_data)
2494 {
2495         boolean_t result = B_FALSE;
2496 
2497         mutex_enter(&wbc_data->wbc_lock);
2498 
2499         if (avl_numnodes(&wbc_data->wbc_instances) == 0) {
2500                 wbc_data->wbc_isvalid = B_FALSE;
2501                 result = B_TRUE;
2502         }
2503 
2504         mutex_exit(&wbc_data->wbc_lock);
2505 
2506         return (result);
2507 }