Print this page
NEX-13140 DVA-throttle support for special-class
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
Reviewed by: Yuri Pankov <yuri.pankov@nexenta.com>
NEX-13135 Running BDD tests exposes a panic in ZFS TRIM due to a trimset overlap
Reviewed by: Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
NEX-10069 ZFS_READONLY is a little too strict (fix test lint)
NEX-9553 Move ss_fill gap logic from scan algorithm into range_tree.c
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
Reviewed by: Yuri Pankov <yuri.pankov@nexenta.com>
NEX-6088 ZFS scrub/resilver take excessively long due to issuing lots of random IO
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
NEX-5553 ZFS auto-trim, manual-trim and scrub can race and deadlock
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
Reviewed by: Rob Gittins <rob.gittins@nexenta.com>
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
NEX-5795 Rename 'wrc' as 'wbc' in the source and in the tech docs
Reviewed by: Alex Aizman <alex.aizman@nexenta.com>
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
NEX-4720 WRC: DVA allocation bypass for special BPs works incorrect
Reviewed by: Alex Aizman <alex.aizman@nexenta.com>
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
NEX-4683 WRC: Special block pointer must know that it is special
Reviewed by: Alex Aizman <alex.aizman@nexenta.com>
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
NEX-4620 ZFS autotrim triggering is unreliable
NEX-4622 On-demand TRIM code illogically enumerates metaslabs via mg_ms_tree
Reviewed by: Josef 'Jeff' Sipek <josef.sipek@nexenta.com>
Reviewed by: Hans Rosenfeld <hans.rosenfeld@nexenta.com>
6295 metaslab_condense's dbgmsg should include vdev id
Reviewed by: George Wilson <george.wilson@delphix.com>
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: Andriy Gapon <avg@freebsd.org>
Reviewed by: Xin Li <delphij@freebsd.org>
Reviewed by: Justin Gibbs <gibbs@scsiguy.com>
Approved by: Richard Lowe <richlowe@richlowe.net>
NEX-4245 WRC: Code cleanup and refactoring to simplify merge with upstream
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
Reviewed by: Alex Aizman <alex.aizman@nexenta.com>
NEX-4059 On-demand TRIM can sometimes race in metaslab_load
Reviewed by: Alek Pinchuk <alek@nexenta.com>
Reviewed by: Josef 'Jeff' Sipek <josef.sipek@nexenta.com>
NEX-3984 On-demand TRIM
Reviewed by: Alek Pinchuk <alek@nexenta.com>
Reviewed by: Josef 'Jeff' Sipek <josef.sipek@nexenta.com>
Conflicts:
        usr/src/common/zfs/zpool_prop.c
        usr/src/uts/common/sys/fs/zfs.h
NEX-3710 WRC improvements and bug-fixes
 * refactored WRC move-logic to use zio kmem_cashes
 * replace size and compression fields by blk_prop field
   (the same in blkptr_t) to little reduce size of wrc_block_t
   and use similar macros as for blkptr_t to get PSIZE, LSIZE
   and COMPRESSION
 * make CPU more happy by reduce atomic calls
 * removed unused code
 * fixed naming of variables
 * fixed possible system panic after restart system
   with enabled WRC
 * fixed a race that causes system panic
Reviewed by: Alek Pinchuk <alek@nexenta.com>
Reviewed by: Josef 'Jeff' Sipek <josef.sipek@nexenta.com>
NEX-3558 KRRP Integration
NEX-3508 CLONE - Port NEX-2946 Add UNMAP/TRIM functionality to ZFS and illumos
Reviewed by: Josef Sipek <josef.sipek@nexenta.com>
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
Conflicts:
    usr/src/uts/common/io/scsi/targets/sd.c
    usr/src/uts/common/sys/scsi/targets/sddef.h
OS-197 Series of zpool exports and imports can hang the system
Reviewed by: Sarah Jelinek <sarah.jelinek@nexetna.com>
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
Reviewed by: Rob Gittins <rob.gittens@nexenta.com>
Reviewed by: Tony Nguyen <tony.nguyen@nexenta.com>
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
re #8346 rb2639 KT disk failures

@@ -21,10 +21,11 @@
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  * Copyright (c) 2011, 2015 by Delphix. All rights reserved.
  * Copyright (c) 2013 by Saso Kiselkov. All rights reserved.
  * Copyright (c) 2014 Integros [integros.com]
+ * Copyright 2017 Nexenta Systems, Inc. All rights reserved.
  */
 
 #include <sys/zfs_context.h>
 #include <sys/dmu.h>
 #include <sys/dmu_tx.h>

@@ -32,11 +33,11 @@
 #include <sys/metaslab_impl.h>
 #include <sys/vdev_impl.h>
 #include <sys/zio.h>
 #include <sys/spa_impl.h>
 #include <sys/zfeature.h>
-#include <sys/vdev_indirect_mapping.h>
+#include <sys/wbc.h>
 
 #define GANG_ALLOCATION(flags) \
         ((flags) & (METASLAB_GANG_CHILD | METASLAB_GANG_HEADER))
 
 uint64_t metaslab_aliquot = 512ULL << 10;

@@ -165,15 +166,10 @@
  * Enable/disable metaslab group biasing.
  */
 boolean_t metaslab_bias_enabled = B_TRUE;
 
 /*
- * Enable/disable remapping of indirect DVAs to their concrete vdevs.
- */
-boolean_t zfs_remap_blkptr_enable = B_TRUE;
-
-/*
  * Enable/disable segment-based metaslab selection.
  */
 boolean_t zfs_metaslab_segment_weight_enabled = B_TRUE;
 
 /*

@@ -199,16 +195,50 @@
  */
 uint64_t metaslab_trace_max_entries = 5000;
 
 static uint64_t metaslab_weight(metaslab_t *);
 static void metaslab_set_fragmentation(metaslab_t *);
-static void metaslab_free_impl(vdev_t *, uint64_t, uint64_t, uint64_t);
-static void metaslab_check_free_impl(vdev_t *, uint64_t, uint64_t);
 
 kmem_cache_t *metaslab_alloc_trace_cache;
 
 /*
+ * Toggle between space-based DVA allocator 0, latency-based 1 or hybrid 2.
+ * A value other than 0, 1 or 2 will be considered 0 (default).
+ */
+int metaslab_alloc_dva_algorithm = 0;
+
+/*
+ * How many TXG's worth of updates should be aggregated per TRIM/UNMAP
+ * issued to the underlying vdev. We keep two range trees of extents
+ * (called "trim sets") to be trimmed per metaslab, the `current' and
+ * the `previous' TS. New free's are added to the current TS. Then,
+ * once `zfs_txgs_per_trim' transactions have elapsed, the `current'
+ * TS becomes the `previous' TS and a new, blank TS is created to be
+ * the new `current', which will then start accumulating any new frees.
+ * Once another zfs_txgs_per_trim TXGs have passed, the previous TS's
+ * extents are trimmed, the TS is destroyed and the current TS again
+ * becomes the previous TS.
+ * This serves to fulfill two functions: aggregate many small frees
+ * into fewer larger trim operations (which should help with devices
+ * which do not take so kindly to them) and to allow for disaster
+ * recovery (extents won't get trimmed immediately, but instead only
+ * after passing this rather long timeout, thus not preserving
+ * 'zfs import -F' functionality).
+ */
+unsigned int zfs_txgs_per_trim = 32;
+
+static void metaslab_trim_remove(void *arg, uint64_t offset, uint64_t size);
+static void metaslab_trim_add(void *arg, uint64_t offset, uint64_t size);
+
+static zio_t *metaslab_exec_trim(metaslab_t *msp);
+
+static metaslab_trimset_t *metaslab_new_trimset(uint64_t txg, kmutex_t *lock);
+static void metaslab_free_trimset(metaslab_trimset_t *ts);
+static boolean_t metaslab_check_trim_conflict(metaslab_t *msp,
+    uint64_t *offset, uint64_t size, uint64_t align, uint64_t limit);
+
+/*
  * ==========================================================================
  * Metaslab classes
  * ==========================================================================
  */
 metaslab_class_t *

@@ -216,10 +246,14 @@
 {
         metaslab_class_t *mc;
 
         mc = kmem_zalloc(sizeof (metaslab_class_t), KM_SLEEP);
 
+        mutex_init(&mc->mc_alloc_lock, NULL, MUTEX_DEFAULT, NULL);
+        avl_create(&mc->mc_alloc_tree, zio_bookmark_compare,
+            sizeof (zio_t), offsetof(zio_t, io_alloc_node));
+
         mc->mc_spa = spa;
         mc->mc_rotor = NULL;
         mc->mc_ops = ops;
         mutex_init(&mc->mc_lock, NULL, MUTEX_DEFAULT, NULL);
         refcount_create_tracked(&mc->mc_alloc_slots);

@@ -234,10 +268,13 @@
         ASSERT(mc->mc_alloc == 0);
         ASSERT(mc->mc_deferred == 0);
         ASSERT(mc->mc_space == 0);
         ASSERT(mc->mc_dspace == 0);
 
+        avl_destroy(&mc->mc_alloc_tree);
+        mutex_destroy(&mc->mc_alloc_lock);
+
         refcount_destroy(&mc->mc_alloc_slots);
         mutex_destroy(&mc->mc_lock);
         kmem_free(mc, sizeof (metaslab_class_t));
 }
 

@@ -320,11 +357,11 @@
 
                 /*
                  * Skip any holes, uninitialized top-levels, or
                  * vdevs that are not in this metalab class.
                  */
-                if (!vdev_is_concrete(tvd) || tvd->vdev_ms_shift == 0 ||
+                if (tvd->vdev_ishole || tvd->vdev_ms_shift == 0 ||
                     mg->mg_class != mc) {
                         continue;
                 }
 
                 for (i = 0; i < RANGE_TREE_HISTOGRAM_SIZE; i++)

@@ -355,14 +392,14 @@
         for (int c = 0; c < rvd->vdev_children; c++) {
                 vdev_t *tvd = rvd->vdev_child[c];
                 metaslab_group_t *mg = tvd->vdev_mg;
 
                 /*
-                 * Skip any holes, uninitialized top-levels,
-                 * or vdevs that are not in this metalab class.
+                 * Skip any holes, uninitialized top-levels, or
+                 * vdevs that are not in this metalab class.
                  */
-                if (!vdev_is_concrete(tvd) || tvd->vdev_ms_shift == 0 ||
+                if (tvd->vdev_ishole || tvd->vdev_ms_shift == 0 ||
                     mg->mg_class != mc) {
                         continue;
                 }
 
                 /*

@@ -404,11 +441,11 @@
         for (int c = 0; c < rvd->vdev_children; c++) {
                 uint64_t tspace;
                 vdev_t *tvd = rvd->vdev_child[c];
                 metaslab_group_t *mg = tvd->vdev_mg;
 
-                if (!vdev_is_concrete(tvd) || tvd->vdev_ms_shift == 0 ||
+                if (tvd->vdev_ishole || tvd->vdev_ms_shift == 0 ||
                     mg->mg_class != mc) {
                         continue;
                 }
 
                 /*

@@ -516,12 +553,10 @@
         vdev_stat_t *vs = &vd->vdev_stat;
         boolean_t was_allocatable;
         boolean_t was_initialized;
 
         ASSERT(vd == vd->vdev_top);
-        ASSERT3U(spa_config_held(mc->mc_spa, SCL_ALLOC, RW_READER), ==,
-            SCL_ALLOC);
 
         mutex_enter(&mg->mg_lock);
         was_allocatable = mg->mg_allocatable;
         was_initialized = mg->mg_initialized;
 

@@ -615,10 +650,11 @@
          * either because we never activated in the first place or
          * because we're done, and possibly removing the vdev.
          */
         ASSERT(mg->mg_activation_count <= 0);
 
+        if (mg->mg_taskq)
         taskq_destroy(mg->mg_taskq);
         avl_destroy(&mg->mg_metaslab_tree);
         mutex_destroy(&mg->mg_lock);
         refcount_destroy(&mg->mg_alloc_queue_depth);
         kmem_free(mg, sizeof (metaslab_group_t));

@@ -628,11 +664,11 @@
 metaslab_group_activate(metaslab_group_t *mg)
 {
         metaslab_class_t *mc = mg->mg_class;
         metaslab_group_t *mgprev, *mgnext;
 
-        ASSERT3U(spa_config_held(mc->mc_spa, SCL_ALLOC, RW_WRITER), !=, 0);
+        ASSERT(spa_config_held(mc->mc_spa, SCL_ALLOC, RW_WRITER));
 
         ASSERT(mc->mc_rotor != mg);
         ASSERT(mg->mg_prev == NULL);
         ASSERT(mg->mg_next == NULL);
         ASSERT(mg->mg_activation_count <= 0);

@@ -654,52 +690,27 @@
                 mgnext->mg_prev = mg;
         }
         mc->mc_rotor = mg;
 }
 
-/*
- * Passivate a metaslab group and remove it from the allocation rotor.
- * Callers must hold both the SCL_ALLOC and SCL_ZIO lock prior to passivating
- * a metaslab group. This function will momentarily drop spa_config_locks
- * that are lower than the SCL_ALLOC lock (see comment below).
- */
 void
 metaslab_group_passivate(metaslab_group_t *mg)
 {
         metaslab_class_t *mc = mg->mg_class;
-        spa_t *spa = mc->mc_spa;
         metaslab_group_t *mgprev, *mgnext;
-        int locks = spa_config_held(spa, SCL_ALL, RW_WRITER);
 
-        ASSERT3U(spa_config_held(spa, SCL_ALLOC | SCL_ZIO, RW_WRITER), ==,
-            (SCL_ALLOC | SCL_ZIO));
+        ASSERT(spa_config_held(mc->mc_spa, SCL_ALLOC, RW_WRITER));
 
         if (--mg->mg_activation_count != 0) {
                 ASSERT(mc->mc_rotor != mg);
                 ASSERT(mg->mg_prev == NULL);
                 ASSERT(mg->mg_next == NULL);
                 ASSERT(mg->mg_activation_count < 0);
                 return;
         }
 
-        /*
-         * The spa_config_lock is an array of rwlocks, ordered as
-         * follows (from highest to lowest):
-         *      SCL_CONFIG > SCL_STATE > SCL_L2ARC > SCL_ALLOC >
-         *      SCL_ZIO > SCL_FREE > SCL_VDEV
-         * (For more information about the spa_config_lock see spa_misc.c)
-         * The higher the lock, the broader its coverage. When we passivate
-         * a metaslab group, we must hold both the SCL_ALLOC and the SCL_ZIO
-         * config locks. However, the metaslab group's taskq might be trying
-         * to preload metaslabs so we must drop the SCL_ZIO lock and any
-         * lower locks to allow the I/O to complete. At a minimum,
-         * we continue to hold the SCL_ALLOC lock, which prevents any future
-         * allocations from taking place and any changes to the vdev tree.
-         */
-        spa_config_exit(spa, locks & ~(SCL_ZIO - 1), spa);
         taskq_wait(mg->mg_taskq);
-        spa_config_enter(spa, locks & ~(SCL_ZIO - 1), spa, RW_WRITER);
         metaslab_group_alloc_update(mg);
 
         mgprev = mg->mg_prev;
         mgnext = mg->mg_next;
 

@@ -1139,23 +1150,24 @@
  * This is a helper function that can be used by the allocator to find
  * a suitable block to allocate. This will search the specified AVL
  * tree looking for a block that matches the specified criteria.
  */
 static uint64_t
-metaslab_block_picker(avl_tree_t *t, uint64_t *cursor, uint64_t size,
-    uint64_t align)
+metaslab_block_picker(metaslab_t *msp, avl_tree_t *t, uint64_t *cursor,
+    uint64_t size, uint64_t align)
 {
         range_seg_t *rs = metaslab_block_find(t, *cursor, size);
 
-        while (rs != NULL) {
+        for (; rs != NULL; rs = AVL_NEXT(t, rs)) {
                 uint64_t offset = P2ROUNDUP(rs->rs_start, align);
 
-                if (offset + size <= rs->rs_end) {
+                if (offset + size <= rs->rs_end &&
+                    !metaslab_check_trim_conflict(msp, &offset, size, align,
+                    rs->rs_end)) {
                         *cursor = offset + size;
                         return (offset);
                 }
-                rs = AVL_NEXT(t, rs);
         }
 
         /*
          * If we know we've searched the whole map (*cursor == 0), give up.
          * Otherwise, reset the cursor to the beginning and try again.

@@ -1162,11 +1174,11 @@
          */
         if (*cursor == 0)
                 return (-1ULL);
 
         *cursor = 0;
-        return (metaslab_block_picker(t, cursor, size, align));
+        return (metaslab_block_picker(msp, t, cursor, size, align));
 }
 
 /*
  * ==========================================================================
  * The first-fit block allocator

@@ -1184,11 +1196,11 @@
          */
         uint64_t align = size & -size;
         uint64_t *cursor = &msp->ms_lbas[highbit64(align) - 1];
         avl_tree_t *t = &msp->ms_tree->rt_root;
 
-        return (metaslab_block_picker(t, cursor, size, align));
+        return (metaslab_block_picker(msp, t, cursor, size, align));
 }
 
 static metaslab_ops_t metaslab_ff_ops = {
         metaslab_ff_alloc
 };

@@ -1232,11 +1244,11 @@
             free_pct < metaslab_df_free_pct) {
                 t = &msp->ms_size_tree;
                 *cursor = 0;
         }
 
-        return (metaslab_block_picker(t, cursor, size, 1ULL));
+        return (metaslab_block_picker(msp, t, cursor, size, 1ULL));
 }
 
 static metaslab_ops_t metaslab_df_ops = {
         metaslab_df_alloc
 };

@@ -1264,18 +1276,24 @@
 
         ASSERT3U(*cursor_end, >=, *cursor);
 
         if ((*cursor + size) > *cursor_end) {
                 range_seg_t *rs;
-
-                rs = avl_last(&msp->ms_size_tree);
-                if (rs == NULL || (rs->rs_end - rs->rs_start) < size)
-                        return (-1ULL);
-
+                for (rs = avl_last(&msp->ms_size_tree);
+                    rs != NULL && rs->rs_end - rs->rs_start >= size;
+                    rs = AVL_PREV(&msp->ms_size_tree, rs)) {
                 *cursor = rs->rs_start;
                 *cursor_end = rs->rs_end;
+                        if (!metaslab_check_trim_conflict(msp, cursor, size,
+                            1, *cursor_end)) {
+                                /* segment appears to be acceptable */
+                                break;
         }
+                }
+                if (rs == NULL || rs->rs_end - rs->rs_start < size)
+                        return (-1ULL);
+        }
 
         offset = *cursor;
         *cursor += size;
 
         return (offset);

@@ -1307,10 +1325,12 @@
         avl_index_t where;
         range_seg_t *rs, rsearch;
         uint64_t hbit = highbit64(size);
         uint64_t *cursor = &msp->ms_lbas[hbit - 1];
         uint64_t max_size = metaslab_block_maxsize(msp);
+        /* mutable copy for adjustment by metaslab_check_trim_conflict */
+        uint64_t adjustable_start;
 
         ASSERT(MUTEX_HELD(&msp->ms_lock));
         ASSERT3U(avl_numnodes(t), ==, avl_numnodes(&msp->ms_size_tree));
 
         if (max_size < size)

@@ -1318,27 +1338,36 @@
 
         rsearch.rs_start = *cursor;
         rsearch.rs_end = *cursor + size;
 
         rs = avl_find(t, &rsearch, &where);
-        if (rs == NULL || (rs->rs_end - rs->rs_start) < size) {
+        if (rs != NULL)
+                adjustable_start = rs->rs_start;
+        if (rs == NULL || rs->rs_end - adjustable_start < size ||
+            metaslab_check_trim_conflict(msp, &adjustable_start, size, 1,
+            rs->rs_end)) {
+                /* segment not usable, try the largest remaining one */
                 t = &msp->ms_size_tree;
 
                 rsearch.rs_start = 0;
                 rsearch.rs_end = MIN(max_size,
                     1ULL << (hbit + metaslab_ndf_clump_shift));
                 rs = avl_find(t, &rsearch, &where);
                 if (rs == NULL)
                         rs = avl_nearest(t, where, AVL_AFTER);
                 ASSERT(rs != NULL);
+                adjustable_start = rs->rs_start;
+                if (rs->rs_end - adjustable_start < size ||
+                    metaslab_check_trim_conflict(msp, &adjustable_start,
+                    size, 1, rs->rs_end)) {
+                        /* even largest remaining segment not usable */
+                        return (-1ULL);
         }
-
-        if ((rs->rs_end - rs->rs_start) >= size) {
-                *cursor = rs->rs_start + size;
-                return (rs->rs_start);
         }
-        return (-1ULL);
+
+        *cursor = adjustable_start + size;
+        return (*cursor);
 }
 
 static metaslab_ops_t metaslab_ndf_ops = {
         metaslab_ndf_alloc
 };

@@ -1374,16 +1403,10 @@
         ASSERT(MUTEX_HELD(&msp->ms_lock));
         ASSERT(!msp->ms_loaded);
         ASSERT(!msp->ms_loading);
 
         msp->ms_loading = B_TRUE;
-        /*
-         * Nobody else can manipulate a loading metaslab, so it's now safe
-         * to drop the lock.  This way we don't have to hold the lock while
-         * reading the spacemap from disk.
-         */
-        mutex_exit(&msp->ms_lock);
 
         /*
          * If the space map has not been allocated yet, then treat
          * all the space in the metaslab as free and add it to the
          * ms_tree.

@@ -1392,21 +1415,21 @@
                 error = space_map_load(msp->ms_sm, msp->ms_tree, SM_FREE);
         else
                 range_tree_add(msp->ms_tree, msp->ms_start, msp->ms_size);
 
         success = (error == 0);
-
-        mutex_enter(&msp->ms_lock);
         msp->ms_loading = B_FALSE;
 
         if (success) {
                 ASSERT3P(msp->ms_group, !=, NULL);
                 msp->ms_loaded = B_TRUE;
 
                 for (int t = 0; t < TXG_DEFER_SIZE; t++) {
                         range_tree_walk(msp->ms_defertree[t],
                             range_tree_remove, msp->ms_tree);
+                        range_tree_walk(msp->ms_defertree[t],
+                            metaslab_trim_remove, msp);
                 }
                 msp->ms_max_size = metaslab_block_maxsize(msp);
         }
         cv_broadcast(&msp->ms_load_cv);
         return (error);

@@ -1431,12 +1454,12 @@
         metaslab_t *ms;
         int error;
 
         ms = kmem_zalloc(sizeof (metaslab_t), KM_SLEEP);
         mutex_init(&ms->ms_lock, NULL, MUTEX_DEFAULT, NULL);
-        mutex_init(&ms->ms_sync_lock, NULL, MUTEX_DEFAULT, NULL);
         cv_init(&ms->ms_load_cv, NULL, CV_DEFAULT, NULL);
+        cv_init(&ms->ms_trim_cv, NULL, CV_DEFAULT, NULL);
         ms->ms_id = id;
         ms->ms_start = id << vd->vdev_ms_shift;
         ms->ms_size = 1ULL << vd->vdev_ms_shift;
 
         /*

@@ -1443,28 +1466,30 @@
          * We only open space map objects that already exist. All others
          * will be opened when we finally allocate an object for it.
          */
         if (object != 0) {
                 error = space_map_open(&ms->ms_sm, mos, object, ms->ms_start,
-                    ms->ms_size, vd->vdev_ashift);
+                    ms->ms_size, vd->vdev_ashift, &ms->ms_lock);
 
                 if (error != 0) {
                         kmem_free(ms, sizeof (metaslab_t));
                         return (error);
                 }
 
                 ASSERT(ms->ms_sm != NULL);
         }
 
+        ms->ms_cur_ts = metaslab_new_trimset(0, &ms->ms_lock);
+
         /*
          * We create the main range tree here, but we don't create the
          * other range trees until metaslab_sync_done().  This serves
          * two purposes: it allows metaslab_sync_done() to detect the
          * addition of new space; and for debugging, it ensures that we'd
          * data fault on any attempt to use this metaslab before it's ready.
          */
-        ms->ms_tree = range_tree_create(&metaslab_rt_ops, ms);
+        ms->ms_tree = range_tree_create(&metaslab_rt_ops, ms, &ms->ms_lock);
         metaslab_group_add(mg, ms);
 
         metaslab_set_fragmentation(ms);
 
         /*

@@ -1524,16 +1549,21 @@
 
         for (int t = 0; t < TXG_DEFER_SIZE; t++) {
                 range_tree_destroy(msp->ms_defertree[t]);
         }
 
+        metaslab_free_trimset(msp->ms_cur_ts);
+        if (msp->ms_prev_ts)
+                metaslab_free_trimset(msp->ms_prev_ts);
+        ASSERT3P(msp->ms_trimming_ts, ==, NULL);
+
         ASSERT0(msp->ms_deferspace);
 
         mutex_exit(&msp->ms_lock);
         cv_destroy(&msp->ms_load_cv);
+        cv_destroy(&msp->ms_trim_cv);
         mutex_destroy(&msp->ms_lock);
-        mutex_destroy(&msp->ms_sync_lock);
 
         kmem_free(msp, sizeof (metaslab_t));
 }
 
 #define FRAGMENTATION_TABLE_SIZE        17

@@ -1895,15 +1925,18 @@
         uint64_t weight;
 
         ASSERT(MUTEX_HELD(&msp->ms_lock));
 
         /*
-         * If this vdev is in the process of being removed, there is nothing
+         * This vdev is in the process of being removed so there is nothing
          * for us to do here.
          */
-        if (vd->vdev_removing)
+        if (vd->vdev_removing) {
+                ASSERT0(space_map_allocated(msp->ms_sm));
+                ASSERT0(vd->vdev_ms_shift);
                 return (0);
+        }
 
         metaslab_set_fragmentation(msp);
 
         /*
          * Update the maximum size if the metaslab is loaded. This will

@@ -2031,17 +2064,14 @@
                 taskq_wait(mg->mg_taskq);
                 return;
         }
 
         mutex_enter(&mg->mg_lock);
-
         /*
          * Load the next potential metaslabs
          */
         for (msp = avl_first(t); msp != NULL; msp = AVL_NEXT(t, msp)) {
-                ASSERT3P(msp->ms_group, ==, mg);
-
                 /*
                  * We preload only the maximum number of metaslabs specified
                  * by metaslab_preload_limit. If a metaslab is being forced
                  * to condense then we preload it too. This will ensure
                  * that force condensing happens in the next txg.

@@ -2064,11 +2094,11 @@
  * 1. The size of the space map object should not dramatically increase as a
  * result of writing out the free space range tree.
  *
  * 2. The minimal on-disk space map representation is zfs_condense_pct/100
  * times the size than the free space range tree representation
- * (i.e. zfs_condense_pct = 110 and in-core = 1MB, minimal = 1.1MB).
+ * (i.e. zfs_condense_pct = 110 and in-core = 1MB, minimal = 1.1.MB).
  *
  * 3. The on-disk size of the space map should actually decrease.
  *
  * Checking the first condition is tricky since we don't want to walk
  * the entire AVL tree calculating the estimated on-disk size. Instead we

@@ -2161,11 +2191,11 @@
          * that have been freed in this txg, any deferred frees that exist,
          * and any allocation in the future. Removing segments should be
          * a relatively inexpensive operation since we expect these trees to
          * have a small number of nodes.
          */
-        condense_tree = range_tree_create(NULL, NULL);
+        condense_tree = range_tree_create(NULL, NULL, &msp->ms_lock);
         range_tree_add(condense_tree, msp->ms_start, msp->ms_size);
 
         /*
          * Remove what's been freed in this txg from the condense_tree.
          * Since we're in sync_pass 1, we know that all the frees from

@@ -2194,10 +2224,11 @@
          */
         msp->ms_condensing = B_TRUE;
 
         mutex_exit(&msp->ms_lock);
         space_map_truncate(sm, tx);
+        mutex_enter(&msp->ms_lock);
 
         /*
          * While we would ideally like to create a space map representation
          * that consists only of allocation records, doing so can be
          * prohibitively expensive because the in-core free tree can be

@@ -2210,11 +2241,10 @@
         space_map_write(sm, condense_tree, SM_ALLOC, tx);
         range_tree_vacate(condense_tree, NULL, NULL);
         range_tree_destroy(condense_tree);
 
         space_map_write(sm, msp->ms_tree, SM_FREE, tx);
-        mutex_enter(&msp->ms_lock);
         msp->ms_condensing = B_FALSE;
 }
 
 /*
  * Write a metaslab to disk in the context of the specified transaction group.

@@ -2230,15 +2260,18 @@
         dmu_tx_t *tx;
         uint64_t object = space_map_object(msp->ms_sm);
 
         ASSERT(!vd->vdev_ishole);
 
+        mutex_enter(&msp->ms_lock);
+
         /*
          * This metaslab has just been added so there's no work to do now.
          */
         if (msp->ms_freeingtree == NULL) {
                 ASSERT3P(alloctree, ==, NULL);
+                mutex_exit(&msp->ms_lock);
                 return;
         }
 
         ASSERT3P(alloctree, !=, NULL);
         ASSERT3P(msp->ms_freeingtree, !=, NULL);

@@ -2250,28 +2283,26 @@
          * is being forced to condense and it's loaded, we need to let it
          * through.
          */
         if (range_tree_space(alloctree) == 0 &&
             range_tree_space(msp->ms_freeingtree) == 0 &&
-            !(msp->ms_loaded && msp->ms_condense_wanted))
+            !(msp->ms_loaded && msp->ms_condense_wanted)) {
+                mutex_exit(&msp->ms_lock);
                 return;
+        }
 
 
         VERIFY(txg <= spa_final_dirty_txg(spa));
 
         /*
          * The only state that can actually be changing concurrently with
          * metaslab_sync() is the metaslab's ms_tree.  No other thread can
          * be modifying this txg's alloctree, freeingtree, freedtree, or
-         * space_map_phys_t.  We drop ms_lock whenever we could call
-         * into the DMU, because the DMU can call down to us
-         * (e.g. via zio_free()) at any time.
-         *
-         * The spa_vdev_remove_thread() can be reading metaslab state
-         * concurrently, and it is locked out by the ms_sync_lock.  Note
-         * that the ms_lock is insufficient for this, because it is dropped
-         * by space_map_write().
+         * space_map_phys_t. Therefore, we only hold ms_lock to satify
+         * space map ASSERTs. We drop it whenever we call into the DMU,
+         * because the DMU can call down to us (e.g. via zio_free()) at
+         * any time.
          */
 
         tx = dmu_tx_create_assigned(spa_get_dsl(spa), txg);
 
         if (msp->ms_sm == NULL) {

@@ -2279,17 +2310,15 @@
 
                 new_object = space_map_alloc(mos, tx);
                 VERIFY3U(new_object, !=, 0);
 
                 VERIFY0(space_map_open(&msp->ms_sm, mos, new_object,
-                    msp->ms_start, msp->ms_size, vd->vdev_ashift));
+                    msp->ms_start, msp->ms_size, vd->vdev_ashift,
+                    &msp->ms_lock));
                 ASSERT(msp->ms_sm != NULL);
         }
 
-        mutex_enter(&msp->ms_sync_lock);
-        mutex_enter(&msp->ms_lock);
-
         /*
          * Note: metaslab_condense() clears the space map's histogram.
          * Therefore we must verify and remove this histogram before
          * condensing.
          */

@@ -2299,19 +2328,17 @@
 
         if (msp->ms_loaded && spa_sync_pass(spa) == 1 &&
             metaslab_should_condense(msp)) {
                 metaslab_condense(msp, txg, tx);
         } else {
-                mutex_exit(&msp->ms_lock);
                 space_map_write(msp->ms_sm, alloctree, SM_ALLOC, tx);
                 space_map_write(msp->ms_sm, msp->ms_freeingtree, SM_FREE, tx);
-                mutex_enter(&msp->ms_lock);
         }
 
         if (msp->ms_loaded) {
                 /*
-                 * When the space map is loaded, we have an accurate
+                 * When the space map is loaded, we have an accruate
                  * histogram in the range tree. This gives us an opportunity
                  * to bring the space map's histogram up-to-date so we clear
                  * it first before updating it.
                  */
                 space_map_histogram_clear(msp->ms_sm);

@@ -2375,11 +2402,10 @@
         if (object != space_map_object(msp->ms_sm)) {
                 object = space_map_object(msp->ms_sm);
                 dmu_write(mos, vd->vdev_ms_array, sizeof (uint64_t) *
                     msp->ms_id, sizeof (uint64_t), &object, tx);
         }
-        mutex_exit(&msp->ms_sync_lock);
         dmu_tx_commit(tx);
 }
 
 /*
  * Called after a transaction group has completely synced to mark

@@ -2405,33 +2431,37 @@
          */
         if (msp->ms_freedtree == NULL) {
                 for (int t = 0; t < TXG_SIZE; t++) {
                         ASSERT(msp->ms_alloctree[t] == NULL);
 
-                        msp->ms_alloctree[t] = range_tree_create(NULL, NULL);
+                        msp->ms_alloctree[t] = range_tree_create(NULL, msp,
+                            &msp->ms_lock);
                 }
 
                 ASSERT3P(msp->ms_freeingtree, ==, NULL);
-                msp->ms_freeingtree = range_tree_create(NULL, NULL);
+                msp->ms_freeingtree = range_tree_create(NULL, msp,
+                    &msp->ms_lock);
 
                 ASSERT3P(msp->ms_freedtree, ==, NULL);
-                msp->ms_freedtree = range_tree_create(NULL, NULL);
+                msp->ms_freedtree = range_tree_create(NULL, msp,
+                    &msp->ms_lock);
 
                 for (int t = 0; t < TXG_DEFER_SIZE; t++) {
                         ASSERT(msp->ms_defertree[t] == NULL);
 
-                        msp->ms_defertree[t] = range_tree_create(NULL, NULL);
+                        msp->ms_defertree[t] = range_tree_create(NULL, msp,
+                            &msp->ms_lock);
                 }
 
                 vdev_space_update(vd, 0, 0, msp->ms_size);
         }
 
         defer_tree = &msp->ms_defertree[txg % TXG_DEFER_SIZE];
 
         uint64_t free_space = metaslab_class_get_space(spa_normal_class(spa)) -
             metaslab_class_get_alloc(spa_normal_class(spa));
-        if (free_space <= spa_get_slop_space(spa) || vd->vdev_removing) {
+        if (free_space <= spa_get_slop_space(spa)) {
                 defer_allowed = B_FALSE;
         }
 
         defer_delta = 0;
         alloc_delta = space_map_alloc_delta(msp->ms_sm);

@@ -2454,10 +2484,18 @@
          * Move the frees from the defer_tree back to the free
          * range tree (if it's loaded). Swap the freed_tree and the
          * defer_tree -- this is safe to do because we've just emptied out
          * the defer_tree.
          */
+        if (spa_get_auto_trim(spa) == SPA_AUTO_TRIM_ON &&
+            !vd->vdev_man_trimming) {
+                range_tree_walk(*defer_tree, metaslab_trim_add, msp);
+                if (!defer_allowed) {
+                        range_tree_walk(msp->ms_freedtree, metaslab_trim_add,
+                            msp);
+                }
+        }
         range_tree_vacate(*defer_tree,
             msp->ms_loaded ? range_tree_add : NULL, msp->ms_tree);
         if (defer_allowed) {
                 range_tree_swap(&msp->ms_freedtree, defer_tree);
         } else {

@@ -2497,37 +2535,23 @@
 
                 if (!metaslab_debug_unload)
                         metaslab_unload(msp);
         }
 
-        ASSERT0(range_tree_space(msp->ms_alloctree[txg & TXG_MASK]));
-        ASSERT0(range_tree_space(msp->ms_freeingtree));
-        ASSERT0(range_tree_space(msp->ms_freedtree));
-
         mutex_exit(&msp->ms_lock);
 }
 
 void
 metaslab_sync_reassess(metaslab_group_t *mg)
 {
-        spa_t *spa = mg->mg_class->mc_spa;
-
-        spa_config_enter(spa, SCL_ALLOC, FTAG, RW_READER);
         metaslab_group_alloc_update(mg);
         mg->mg_fragmentation = metaslab_group_fragmentation(mg);
 
         /*
-         * Preload the next potential metaslabs but only on active
-         * metaslab groups. We can get into a state where the metaslab
-         * is no longer active since we dirty metaslabs as we remove a
-         * a device, thus potentially making the metaslab group eligible
-         * for preloading.
+         * Preload the next potential metaslabs
          */
-        if (mg->mg_activation_count > 0) {
                 metaslab_group_preload(mg);
-        }
-        spa_config_exit(spa, SCL_ALLOC, FTAG);
 }
 
 static uint64_t
 metaslab_distance(metaslab_t *msp, dva_t *dva)
 {

@@ -2717,10 +2741,11 @@
 
                 VERIFY0(P2PHASE(start, 1ULL << vd->vdev_ashift));
                 VERIFY0(P2PHASE(size, 1ULL << vd->vdev_ashift));
                 VERIFY3U(range_tree_space(rt) - size, <=, msp->ms_size);
                 range_tree_remove(rt, start, size);
+                metaslab_trim_remove(msp, start, size);
 
                 if (range_tree_space(msp->ms_alloctree[txg & TXG_MASK]) == 0)
                         vdev_dirty(mg->mg_vd, VDD_METASLAB, msp, txg);
 
                 range_tree_add(msp->ms_alloctree[txg & TXG_MASK], start, size);

@@ -2738,11 +2763,12 @@
         return (start);
 }
 
 static uint64_t
 metaslab_group_alloc_normal(metaslab_group_t *mg, zio_alloc_list_t *zal,
-    uint64_t asize, uint64_t txg, uint64_t min_distance, dva_t *dva, int d)
+    uint64_t asize, uint64_t txg, uint64_t min_distance, dva_t *dva, int d,
+    int flags)
 {
         metaslab_t *msp = NULL;
         uint64_t offset = -1ULL;
         uint64_t activation_weight;
         uint64_t target_distance;

@@ -2759,10 +2785,11 @@
         metaslab_t *search = kmem_alloc(sizeof (*search), KM_SLEEP);
         search->ms_weight = UINT64_MAX;
         search->ms_start = 0;
         for (;;) {
                 boolean_t was_active;
+                boolean_t pass_primary = B_TRUE;
                 avl_tree_t *t = &mg->mg_metaslab_tree;
                 avl_index_t idx;
 
                 mutex_enter(&mg->mg_lock);
 

@@ -2796,25 +2823,36 @@
                          */
                         if (msp->ms_condensing)
                                 continue;
 
                         was_active = msp->ms_weight & METASLAB_ACTIVE_MASK;
-                        if (activation_weight == METASLAB_WEIGHT_PRIMARY)
+                        if (flags & METASLAB_USE_WEIGHT_SECONDARY) {
+                                if (!pass_primary) {
+                                        DTRACE_PROBE(metaslab_use_secondary);
+                                        activation_weight =
+                                            METASLAB_WEIGHT_SECONDARY;
                                 break;
+                                }
 
+                                pass_primary = B_FALSE;
+                        } else {
+                                if (activation_weight ==
+                                    METASLAB_WEIGHT_PRIMARY)
+                                        break;
+
                         target_distance = min_distance +
                             (space_map_allocated(msp->ms_sm) != 0 ? 0 :
                             min_distance >> 1);
 
-                        for (i = 0; i < d; i++) {
+                                for (i = 0; i < d; i++)
                                 if (metaslab_distance(msp, &dva[i]) <
                                     target_distance)
                                         break;
-                        }
                         if (i == d)
                                 break;
                 }
+                }
                 mutex_exit(&mg->mg_lock);
                 if (msp == NULL) {
                         kmem_free(search, sizeof (*search));
                         return (-1ULL);
                 }

@@ -2931,17 +2969,18 @@
         return (offset);
 }
 
 static uint64_t
 metaslab_group_alloc(metaslab_group_t *mg, zio_alloc_list_t *zal,
-    uint64_t asize, uint64_t txg, uint64_t min_distance, dva_t *dva, int d)
+    uint64_t asize, uint64_t txg, uint64_t min_distance, dva_t *dva,
+    int d, int flags)
 {
         uint64_t offset;
         ASSERT(mg->mg_initialized);
 
         offset = metaslab_group_alloc_normal(mg, zal, asize, txg,
-            min_distance, dva, d);
+            min_distance, dva, d, flags);
 
         mutex_enter(&mg->mg_lock);
         if (offset == -1ULL) {
                 mg->mg_failed_allocations++;
                 metaslab_trace_add(zal, mg, NULL, asize, d,

@@ -2975,11 +3014,11 @@
 int ditto_same_vdev_distance_shift = 3;
 
 /*
  * Allocate a block for the specified i/o.
  */
-int
+static int
 metaslab_alloc_dva(spa_t *spa, metaslab_class_t *mc, uint64_t psize,
     dva_t *dva, int d, dva_t *hintdva, uint64_t txg, int flags,
     zio_alloc_list_t *zal)
 {
         metaslab_group_t *mg, *rotor;

@@ -3021,15 +3060,14 @@
         if (hintdva) {
                 vd = vdev_lookup_top(spa, DVA_GET_VDEV(&hintdva[d]));
 
                 /*
                  * It's possible the vdev we're using as the hint no
-                 * longer exists or its mg has been closed (e.g. by
-                 * device removal).  Consult the rotor when
+                 * longer exists (i.e. removed). Consult the rotor when
                  * all else fails.
                  */
-                if (vd != NULL && vd->vdev_mg != NULL) {
+                if (vd != NULL) {
                         mg = vd->vdev_mg;
 
                         if (flags & METASLAB_HINTBP_AVOID &&
                             mg->mg_next != NULL)
                                 mg = mg->mg_next;

@@ -3120,11 +3158,11 @@
 
                 uint64_t asize = vdev_psize_to_asize(vd, psize);
                 ASSERT(P2PHASE(asize, 1ULL << vd->vdev_ashift) == 0);
 
                 uint64_t offset = metaslab_group_alloc(mg, zal, asize, txg,
-                    distance, dva, d);
+                    distance, dva, d, flags);
 
                 if (offset != -1ULL) {
                         /*
                          * If we've just selected this metaslab group,
                          * figure out whether the corresponding vdev is

@@ -3131,14 +3169,19 @@
                          * over- or under-used relative to the pool,
                          * and set an allocation bias to even it out.
                          */
                         if (mc->mc_aliquot == 0 && metaslab_bias_enabled) {
                                 vdev_stat_t *vs = &vd->vdev_stat;
-                                int64_t vu, cu;
+                                vdev_stat_t *pvs = &vd->vdev_parent->vdev_stat;
+                                int64_t vu, cu, vu_io;
 
                                 vu = (vs->vs_alloc * 100) / (vs->vs_space + 1);
                                 cu = (mc->mc_alloc * 100) / (mc->mc_space + 1);
+                                vu_io =
+                                    (((vs->vs_iotime[ZIO_TYPE_WRITE] * 100) /
+                                    (pvs->vs_iotime[ZIO_TYPE_WRITE] + 1)) *
+                                    (vd->vdev_parent->vdev_children)) - 100;
 
                                 /*
                                  * Calculate how much more or less we should
                                  * try to allocate from this device during
                                  * this iteration around the rotor.

@@ -3151,10 +3194,29 @@
                                  * This reduces allocations by 307K for this
                                  * iteration.
                                  */
                                 mg->mg_bias = ((cu - vu) *
                                     (int64_t)mg->mg_aliquot) / 100;
+
+                                /*
+                                 * Experiment: space-based DVA allocator 0,
+                                 * latency-based 1 or hybrid 2.
+                                 */
+                                switch (metaslab_alloc_dva_algorithm) {
+                                case 1:
+                                        mg->mg_bias =
+                                            (vu_io * (int64_t)mg->mg_aliquot) /
+                                            100;
+                                        break;
+                                case 2:
+                                        mg->mg_bias =
+                                            ((((cu - vu) + vu_io) / 2) *
+                                            (int64_t)mg->mg_aliquot) / 100;
+                                        break;
+                                default:
+                                        break;
+                                }
                         } else if (!metaslab_bias_enabled) {
                                 mg->mg_bias = 0;
                         }
 
                         if (atomic_add_64_nv(&mc->mc_aliquot, asize) >=

@@ -3165,10 +3227,12 @@
 
                         DVA_SET_VDEV(&dva[d], vd->vdev_id);
                         DVA_SET_OFFSET(&dva[d], offset);
                         DVA_SET_GANG(&dva[d], !!(flags & METASLAB_GANG_HEADER));
                         DVA_SET_ASIZE(&dva[d], asize);
+                        DTRACE_PROBE3(alloc_dva_probe, uint64_t, vd->vdev_id,
+                            uint64_t, offset, uint64_t, psize);
 
                         return (0);
                 }
 next:
                 mc->mc_rotor = mg->mg_next;

@@ -3187,232 +3251,27 @@
 
         metaslab_trace_add(zal, rotor, NULL, psize, d, TRACE_ENOSPC);
         return (SET_ERROR(ENOSPC));
 }
 
-void
-metaslab_free_concrete(vdev_t *vd, uint64_t offset, uint64_t asize,
-    uint64_t txg)
-{
-        metaslab_t *msp;
-        spa_t *spa = vd->vdev_spa;
-
-        ASSERT3U(txg, ==, spa->spa_syncing_txg);
-        ASSERT(vdev_is_concrete(vd));
-        ASSERT3U(spa_config_held(spa, SCL_ALL, RW_READER), !=, 0);
-        ASSERT3U(offset >> vd->vdev_ms_shift, <, vd->vdev_ms_count);
-
-        msp = vd->vdev_ms[offset >> vd->vdev_ms_shift];
-
-        VERIFY(!msp->ms_condensing);
-        VERIFY3U(offset, >=, msp->ms_start);
-        VERIFY3U(offset + asize, <=, msp->ms_start + msp->ms_size);
-        VERIFY0(P2PHASE(offset, 1ULL << vd->vdev_ashift));
-        VERIFY0(P2PHASE(asize, 1ULL << vd->vdev_ashift));
-
-        metaslab_check_free_impl(vd, offset, asize);
-        mutex_enter(&msp->ms_lock);
-        if (range_tree_space(msp->ms_freeingtree) == 0) {
-                vdev_dirty(vd, VDD_METASLAB, msp, txg);
-        }
-        range_tree_add(msp->ms_freeingtree, offset, asize);
-        mutex_exit(&msp->ms_lock);
-}
-
-/* ARGSUSED */
-void
-metaslab_free_impl_cb(uint64_t inner_offset, vdev_t *vd, uint64_t offset,
-    uint64_t size, void *arg)
-{
-        uint64_t *txgp = arg;
-
-        if (vd->vdev_ops->vdev_op_remap != NULL)
-                vdev_indirect_mark_obsolete(vd, offset, size, *txgp);
-        else
-                metaslab_free_impl(vd, offset, size, *txgp);
-}
-
-static void
-metaslab_free_impl(vdev_t *vd, uint64_t offset, uint64_t size,
-    uint64_t txg)
-{
-        spa_t *spa = vd->vdev_spa;
-
-        ASSERT3U(spa_config_held(spa, SCL_ALL, RW_READER), !=, 0);
-
-        if (txg > spa_freeze_txg(spa))
-                return;
-
-        if (spa->spa_vdev_removal != NULL &&
-            spa->spa_vdev_removal->svr_vdev == vd &&
-            vdev_is_concrete(vd)) {
-                /*
-                 * Note: we check if the vdev is concrete because when
-                 * we complete the removal, we first change the vdev to be
-                 * an indirect vdev (in open context), and then (in syncing
-                 * context) clear spa_vdev_removal.
-                 */
-                free_from_removing_vdev(vd, offset, size, txg);
-        } else if (vd->vdev_ops->vdev_op_remap != NULL) {
-                vdev_indirect_mark_obsolete(vd, offset, size, txg);
-                vd->vdev_ops->vdev_op_remap(vd, offset, size,
-                    metaslab_free_impl_cb, &txg);
-        } else {
-                metaslab_free_concrete(vd, offset, size, txg);
-        }
-}
-
-typedef struct remap_blkptr_cb_arg {
-        blkptr_t *rbca_bp;
-        spa_remap_cb_t rbca_cb;
-        vdev_t *rbca_remap_vd;
-        uint64_t rbca_remap_offset;
-        void *rbca_cb_arg;
-} remap_blkptr_cb_arg_t;
-
-void
-remap_blkptr_cb(uint64_t inner_offset, vdev_t *vd, uint64_t offset,
-    uint64_t size, void *arg)
-{
-        remap_blkptr_cb_arg_t *rbca = arg;
-        blkptr_t *bp = rbca->rbca_bp;
-
-        /* We can not remap split blocks. */
-        if (size != DVA_GET_ASIZE(&bp->blk_dva[0]))
-                return;
-        ASSERT0(inner_offset);
-
-        if (rbca->rbca_cb != NULL) {
-                /*
-                 * At this point we know that we are not handling split
-                 * blocks and we invoke the callback on the previous
-                 * vdev which must be indirect.
-                 */
-                ASSERT3P(rbca->rbca_remap_vd->vdev_ops, ==, &vdev_indirect_ops);
-
-                rbca->rbca_cb(rbca->rbca_remap_vd->vdev_id,
-                    rbca->rbca_remap_offset, size, rbca->rbca_cb_arg);
-
-                /* set up remap_blkptr_cb_arg for the next call */
-                rbca->rbca_remap_vd = vd;
-                rbca->rbca_remap_offset = offset;
-        }
-
-        /*
-         * The phys birth time is that of dva[0].  This ensures that we know
-         * when each dva was written, so that resilver can determine which
-         * blocks need to be scrubbed (i.e. those written during the time
-         * the vdev was offline).  It also ensures that the key used in
-         * the ARC hash table is unique (i.e. dva[0] + phys_birth).  If
-         * we didn't change the phys_birth, a lookup in the ARC for a
-         * remapped BP could find the data that was previously stored at
-         * this vdev + offset.
-         */
-        vdev_t *oldvd = vdev_lookup_top(vd->vdev_spa,
-            DVA_GET_VDEV(&bp->blk_dva[0]));
-        vdev_indirect_births_t *vib = oldvd->vdev_indirect_births;
-        bp->blk_phys_birth = vdev_indirect_births_physbirth(vib,
-            DVA_GET_OFFSET(&bp->blk_dva[0]), DVA_GET_ASIZE(&bp->blk_dva[0]));
-
-        DVA_SET_VDEV(&bp->blk_dva[0], vd->vdev_id);
-        DVA_SET_OFFSET(&bp->blk_dva[0], offset);
-}
-
 /*
- * If the block pointer contains any indirect DVAs, modify them to refer to
- * concrete DVAs.  Note that this will sometimes not be possible, leaving
- * the indirect DVA in place.  This happens if the indirect DVA spans multiple
- * segments in the mapping (i.e. it is a "split block").
- *
- * If the BP was remapped, calls the callback on the original dva (note the
- * callback can be called multiple times if the original indirect DVA refers
- * to another indirect DVA, etc).
- *
- * Returns TRUE if the BP was remapped.
+ * Free the block represented by DVA in the context of the specified
+ * transaction group.
  */
-boolean_t
-spa_remap_blkptr(spa_t *spa, blkptr_t *bp, spa_remap_cb_t callback, void *arg)
-{
-        remap_blkptr_cb_arg_t rbca;
-
-        if (!zfs_remap_blkptr_enable)
-                return (B_FALSE);
-
-        if (!spa_feature_is_enabled(spa, SPA_FEATURE_OBSOLETE_COUNTS))
-                return (B_FALSE);
-
-        /*
-         * Dedup BP's can not be remapped, because ddt_phys_select() depends
-         * on DVA[0] being the same in the BP as in the DDT (dedup table).
-         */
-        if (BP_GET_DEDUP(bp))
-                return (B_FALSE);
-
-        /*
-         * Gang blocks can not be remapped, because
-         * zio_checksum_gang_verifier() depends on the DVA[0] that's in
-         * the BP used to read the gang block header (GBH) being the same
-         * as the DVA[0] that we allocated for the GBH.
-         */
-        if (BP_IS_GANG(bp))
-                return (B_FALSE);
-
-        /*
-         * Embedded BP's have no DVA to remap.
-         */
-        if (BP_GET_NDVAS(bp) < 1)
-                return (B_FALSE);
-
-        /*
-         * Note: we only remap dva[0].  If we remapped other dvas, we
-         * would no longer know what their phys birth txg is.
-         */
-        dva_t *dva = &bp->blk_dva[0];
-
-        uint64_t offset = DVA_GET_OFFSET(dva);
-        uint64_t size = DVA_GET_ASIZE(dva);
-        vdev_t *vd = vdev_lookup_top(spa, DVA_GET_VDEV(dva));
-
-        if (vd->vdev_ops->vdev_op_remap == NULL)
-                return (B_FALSE);
-
-        rbca.rbca_bp = bp;
-        rbca.rbca_cb = callback;
-        rbca.rbca_remap_vd = vd;
-        rbca.rbca_remap_offset = offset;
-        rbca.rbca_cb_arg = arg;
-
-        /*
-         * remap_blkptr_cb() will be called in order for each level of
-         * indirection, until a concrete vdev is reached or a split block is
-         * encountered. old_vd and old_offset are updated within the callback
-         * as we go from the one indirect vdev to the next one (either concrete
-         * or indirect again) in that order.
-         */
-        vd->vdev_ops->vdev_op_remap(vd, offset, size, remap_blkptr_cb, &rbca);
-
-        /* Check if the DVA wasn't remapped because it is a split block */
-        if (DVA_GET_VDEV(&rbca.rbca_bp->blk_dva[0]) == vd->vdev_id)
-                return (B_FALSE);
-
-        return (B_TRUE);
-}
-
-/*
- * Undo the allocation of a DVA which happened in the given transaction group.
- */
 void
-metaslab_unalloc_dva(spa_t *spa, const dva_t *dva, uint64_t txg)
+metaslab_free_dva(spa_t *spa, const dva_t *dva, uint64_t txg, boolean_t now)
 {
-        metaslab_t *msp;
-        vdev_t *vd;
         uint64_t vdev = DVA_GET_VDEV(dva);
         uint64_t offset = DVA_GET_OFFSET(dva);
         uint64_t size = DVA_GET_ASIZE(dva);
+        vdev_t *vd;
+        metaslab_t *msp;
 
+        DTRACE_PROBE3(free_dva_probe, uint64_t, vdev,
+            uint64_t, offset, uint64_t, size);
+
         ASSERT(DVA_IS_VALID(dva));
-        ASSERT3U(spa_config_held(spa, SCL_ALL, RW_READER), !=, 0);
 
         if (txg > spa_freeze_txg(spa))
                 return;
 
         if ((vd = vdev_lookup_top(spa, vdev)) == NULL ||

@@ -3421,21 +3280,18 @@
                     (u_longlong_t)vdev, (u_longlong_t)offset);
                 ASSERT(0);
                 return;
         }
 
-        ASSERT(!vd->vdev_removing);
-        ASSERT(vdev_is_concrete(vd));
-        ASSERT0(vd->vdev_indirect_config.vic_mapping_object);
-        ASSERT3P(vd->vdev_indirect_mapping, ==, NULL);
+        msp = vd->vdev_ms[offset >> vd->vdev_ms_shift];
 
         if (DVA_GET_GANG(dva))
                 size = vdev_psize_to_asize(vd, SPA_GANGBLOCKSIZE);
 
-        msp = vd->vdev_ms[offset >> vd->vdev_ms_shift];
-
         mutex_enter(&msp->ms_lock);
+
+        if (now) {
         range_tree_remove(msp->ms_alloctree[txg & TXG_MASK],
             offset, size);
 
         VERIFY(!msp->ms_condensing);
         VERIFY3U(offset, >=, msp->ms_start);

@@ -3443,33 +3299,80 @@
         VERIFY3U(range_tree_space(msp->ms_tree) + size, <=,
             msp->ms_size);
         VERIFY0(P2PHASE(offset, 1ULL << vd->vdev_ashift));
         VERIFY0(P2PHASE(size, 1ULL << vd->vdev_ashift));
         range_tree_add(msp->ms_tree, offset, size);
+                if (spa_get_auto_trim(spa) == SPA_AUTO_TRIM_ON &&
+                    !vd->vdev_man_trimming)
+                        metaslab_trim_add(msp, offset, size);
+                msp->ms_max_size = metaslab_block_maxsize(msp);
+        } else {
+                VERIFY3U(txg, ==, spa->spa_syncing_txg);
+                if (range_tree_space(msp->ms_freeingtree) == 0)
+                        vdev_dirty(vd, VDD_METASLAB, msp, txg);
+                range_tree_add(msp->ms_freeingtree, offset, size);
+        }
+
         mutex_exit(&msp->ms_lock);
 }
 
 /*
- * Free the block represented by DVA in the context of the specified
- * transaction group.
+ * Intent log support: upon opening the pool after a crash, notify the SPA
+ * of blocks that the intent log has allocated for immediate write, but
+ * which are still considered free by the SPA because the last transaction
+ * group didn't commit yet.
  */
-void
-metaslab_free_dva(spa_t *spa, const dva_t *dva, uint64_t txg)
+static int
+metaslab_claim_dva(spa_t *spa, const dva_t *dva, uint64_t txg)
 {
         uint64_t vdev = DVA_GET_VDEV(dva);
         uint64_t offset = DVA_GET_OFFSET(dva);
         uint64_t size = DVA_GET_ASIZE(dva);
-        vdev_t *vd = vdev_lookup_top(spa, vdev);
+        vdev_t *vd;
+        metaslab_t *msp;
+        int error = 0;
 
         ASSERT(DVA_IS_VALID(dva));
-        ASSERT3U(spa_config_held(spa, SCL_ALL, RW_READER), !=, 0);
 
-        if (DVA_GET_GANG(dva)) {
+        if ((vd = vdev_lookup_top(spa, vdev)) == NULL ||
+            (offset >> vd->vdev_ms_shift) >= vd->vdev_ms_count)
+                return (SET_ERROR(ENXIO));
+
+        msp = vd->vdev_ms[offset >> vd->vdev_ms_shift];
+
+        if (DVA_GET_GANG(dva))
                 size = vdev_psize_to_asize(vd, SPA_GANGBLOCKSIZE);
+
+        mutex_enter(&msp->ms_lock);
+
+        if ((txg != 0 && spa_writeable(spa)) || !msp->ms_loaded)
+                error = metaslab_activate(msp, METASLAB_WEIGHT_SECONDARY);
+
+        if (error == 0 && !range_tree_contains(msp->ms_tree, offset, size))
+                error = SET_ERROR(ENOENT);
+
+        if (error || txg == 0) {        /* txg == 0 indicates dry run */
+                mutex_exit(&msp->ms_lock);
+                return (error);
         }
 
-        metaslab_free_impl(vd, offset, size, txg);
+        VERIFY(!msp->ms_condensing);
+        VERIFY0(P2PHASE(offset, 1ULL << vd->vdev_ashift));
+        VERIFY0(P2PHASE(size, 1ULL << vd->vdev_ashift));
+        VERIFY3U(range_tree_space(msp->ms_tree) - size, <=, msp->ms_size);
+        range_tree_remove(msp->ms_tree, offset, size);
+        metaslab_trim_remove(msp, offset, size);
+
+        if (spa_writeable(spa)) {       /* don't dirty if we're zdb(1M) */
+                if (range_tree_space(msp->ms_alloctree[txg & TXG_MASK]) == 0)
+                        vdev_dirty(vd, VDD_METASLAB, msp, txg);
+                range_tree_add(msp->ms_alloctree[txg & TXG_MASK], offset, size);
+        }
+
+        mutex_exit(&msp->ms_lock);
+
+        return (0);
 }
 
 /*
  * Reserve some allocation slots. The reservation system must be called
  * before we call into the allocator. If there aren't any available slots

@@ -3516,127 +3419,11 @@
                 (void) refcount_remove(&mc->mc_alloc_slots, zio);
         }
         mutex_exit(&mc->mc_lock);
 }
 
-static int
-metaslab_claim_concrete(vdev_t *vd, uint64_t offset, uint64_t size,
-    uint64_t txg)
-{
-        metaslab_t *msp;
-        spa_t *spa = vd->vdev_spa;
-        int error = 0;
-
-        if (offset >> vd->vdev_ms_shift >= vd->vdev_ms_count)
-                return (ENXIO);
-
-        ASSERT3P(vd->vdev_ms, !=, NULL);
-        msp = vd->vdev_ms[offset >> vd->vdev_ms_shift];
-
-        mutex_enter(&msp->ms_lock);
-
-        if ((txg != 0 && spa_writeable(spa)) || !msp->ms_loaded)
-                error = metaslab_activate(msp, METASLAB_WEIGHT_SECONDARY);
-
-        if (error == 0 && !range_tree_contains(msp->ms_tree, offset, size))
-                error = SET_ERROR(ENOENT);
-
-        if (error || txg == 0) {        /* txg == 0 indicates dry run */
-                mutex_exit(&msp->ms_lock);
-                return (error);
-        }
-
-        VERIFY(!msp->ms_condensing);
-        VERIFY0(P2PHASE(offset, 1ULL << vd->vdev_ashift));
-        VERIFY0(P2PHASE(size, 1ULL << vd->vdev_ashift));
-        VERIFY3U(range_tree_space(msp->ms_tree) - size, <=, msp->ms_size);
-        range_tree_remove(msp->ms_tree, offset, size);
-
-        if (spa_writeable(spa)) {       /* don't dirty if we're zdb(1M) */
-                if (range_tree_space(msp->ms_alloctree[txg & TXG_MASK]) == 0)
-                        vdev_dirty(vd, VDD_METASLAB, msp, txg);
-                range_tree_add(msp->ms_alloctree[txg & TXG_MASK], offset, size);
-        }
-
-        mutex_exit(&msp->ms_lock);
-
-        return (0);
-}
-
-typedef struct metaslab_claim_cb_arg_t {
-        uint64_t        mcca_txg;
-        int             mcca_error;
-} metaslab_claim_cb_arg_t;
-
-/* ARGSUSED */
-static void
-metaslab_claim_impl_cb(uint64_t inner_offset, vdev_t *vd, uint64_t offset,
-    uint64_t size, void *arg)
-{
-        metaslab_claim_cb_arg_t *mcca_arg = arg;
-
-        if (mcca_arg->mcca_error == 0) {
-                mcca_arg->mcca_error = metaslab_claim_concrete(vd, offset,
-                    size, mcca_arg->mcca_txg);
-        }
-}
-
 int
-metaslab_claim_impl(vdev_t *vd, uint64_t offset, uint64_t size, uint64_t txg)
-{
-        if (vd->vdev_ops->vdev_op_remap != NULL) {
-                metaslab_claim_cb_arg_t arg;
-
-                /*
-                 * Only zdb(1M) can claim on indirect vdevs.  This is used
-                 * to detect leaks of mapped space (that are not accounted
-                 * for in the obsolete counts, spacemap, or bpobj).
-                 */
-                ASSERT(!spa_writeable(vd->vdev_spa));
-                arg.mcca_error = 0;
-                arg.mcca_txg = txg;
-
-                vd->vdev_ops->vdev_op_remap(vd, offset, size,
-                    metaslab_claim_impl_cb, &arg);
-
-                if (arg.mcca_error == 0) {
-                        arg.mcca_error = metaslab_claim_concrete(vd,
-                            offset, size, txg);
-                }
-                return (arg.mcca_error);
-        } else {
-                return (metaslab_claim_concrete(vd, offset, size, txg));
-        }
-}
-
-/*
- * Intent log support: upon opening the pool after a crash, notify the SPA
- * of blocks that the intent log has allocated for immediate write, but
- * which are still considered free by the SPA because the last transaction
- * group didn't commit yet.
- */
-static int
-metaslab_claim_dva(spa_t *spa, const dva_t *dva, uint64_t txg)
-{
-        uint64_t vdev = DVA_GET_VDEV(dva);
-        uint64_t offset = DVA_GET_OFFSET(dva);
-        uint64_t size = DVA_GET_ASIZE(dva);
-        vdev_t *vd;
-
-        if ((vd = vdev_lookup_top(spa, vdev)) == NULL) {
-                return (SET_ERROR(ENXIO));
-        }
-
-        ASSERT(DVA_IS_VALID(dva));
-
-        if (DVA_GET_GANG(dva))
-                size = vdev_psize_to_asize(vd, SPA_GANGBLOCKSIZE);
-
-        return (metaslab_claim_impl(vd, offset, size, txg));
-}
-
-int
 metaslab_alloc(spa_t *spa, metaslab_class_t *mc, uint64_t psize, blkptr_t *bp,
     int ndvas, uint64_t txg, blkptr_t *hintbp, int flags,
     zio_alloc_list_t *zal, zio_t *zio)
 {
         dva_t *dva = bp->blk_dva;

@@ -3656,16 +3443,60 @@
         ASSERT(ndvas > 0 && ndvas <= spa_max_replication(spa));
         ASSERT(BP_GET_NDVAS(bp) == 0);
         ASSERT(hintbp == NULL || ndvas <= BP_GET_NDVAS(hintbp));
         ASSERT3P(zal, !=, NULL);
 
+        if (mc == spa_special_class(spa) && !BP_IS_METADATA(bp) &&
+            !(flags & (METASLAB_GANG_HEADER)) &&
+            !(spa->spa_meta_policy.spa_small_data_to_special &&
+            psize <= spa->spa_meta_policy.spa_small_data_to_special)) {
+                error = metaslab_alloc_dva(spa, spa_normal_class(spa),
+                    psize, &dva[WBC_NORMAL_DVA], 0, NULL, txg,
+                    flags | METASLAB_USE_WEIGHT_SECONDARY, zal);
+                if (error == 0) {
+                        error = metaslab_alloc_dva(spa, mc, psize,
+                            &dva[WBC_SPECIAL_DVA], 0, NULL, txg, flags, zal);
+                        if (error != 0) {
+                                error = 0;
+                                /*
+                                 * Change the place of NORMAL and cleanup the
+                                 * second DVA. After that this BP is just a
+                                 * regular BP with one DVA
+                                 *
+                                 * This operation is valid only if:
+                                 * WBC_SPECIAL_DVA is dva[0]
+                                 * WBC_NORMAL_DVA is dva[1]
+                                 *
+                                 * see wbc.h
+                                 */
+                                bcopy(&dva[WBC_NORMAL_DVA],
+                                    &dva[WBC_SPECIAL_DVA], sizeof (dva_t));
+                                bzero(&dva[WBC_NORMAL_DVA], sizeof (dva_t));
+
+                                /*
+                                 * Allocation of special DVA has failed,
+                                 * so this BP will be a regular BP and need
+                                 * to update the metaslab group's queue depth
+                                 * based on the newly allocated dva.
+                                 */
+                                metaslab_group_alloc_increment(spa,
+                                    DVA_GET_VDEV(&dva[0]), zio, flags);
+                        } else {
+                                BP_SET_SPECIAL(bp, 1);
+                        }
+                } else {
+                        spa_config_exit(spa, SCL_ALLOC, FTAG);
+                        return (error);
+                }
+        } else {
         for (int d = 0; d < ndvas; d++) {
-                error = metaslab_alloc_dva(spa, mc, psize, dva, d, hintdva,
-                    txg, flags, zal);
+                        error = metaslab_alloc_dva(spa, mc, psize, dva, d,
+                            hintdva, txg, flags, zal);
                 if (error != 0) {
                         for (d--; d >= 0; d--) {
-                                metaslab_unalloc_dva(spa, &dva[d], txg);
+                                        metaslab_free_dva(spa, &dva[d],
+                                            txg, B_TRUE);
                                 metaslab_group_alloc_decrement(spa,
                                     DVA_GET_VDEV(&dva[d]), zio, flags);
                                 bzero(&dva[d], sizeof (dva_t));
                         }
                         spa_config_exit(spa, SCL_ALLOC, FTAG);

@@ -3676,14 +3507,14 @@
                          * based on the newly allocated dva.
                          */
                         metaslab_group_alloc_increment(spa,
                             DVA_GET_VDEV(&dva[d]), zio, flags);
                 }
-
         }
-        ASSERT(error == 0);
         ASSERT(BP_GET_NDVAS(bp) == ndvas);
+        }
+        ASSERT(error == 0);
 
         spa_config_exit(spa, SCL_ALLOC, FTAG);
 
         BP_SET_BIRTH(bp, txg, txg);
 

@@ -3699,17 +3530,32 @@
         ASSERT(!BP_IS_HOLE(bp));
         ASSERT(!now || bp->blk_birth >= spa_syncing_txg(spa));
 
         spa_config_enter(spa, SCL_FREE, FTAG, RW_READER);
 
-        for (int d = 0; d < ndvas; d++) {
-                if (now) {
-                        metaslab_unalloc_dva(spa, &dva[d], txg);
+        if (BP_IS_SPECIAL(bp)) {
+                int start_dva;
+                wbc_data_t *wbc_data = spa_get_wbc_data(spa);
+
+                mutex_enter(&wbc_data->wbc_lock);
+                start_dva = wbc_first_valid_dva(bp, wbc_data, B_TRUE);
+                mutex_exit(&wbc_data->wbc_lock);
+
+                /*
+                 * Actual freeing should not be locked as
+                 * the block is already exempted from WBC
+                 * trees, and thus will not be moved
+                 */
+                metaslab_free_dva(spa, &dva[WBC_NORMAL_DVA], txg, now);
+                if (start_dva == 0) {
+                        metaslab_free_dva(spa, &dva[WBC_SPECIAL_DVA],
+                            txg, now);
+                }
                 } else {
-                        metaslab_free_dva(spa, &dva[d], txg);
+                for (int d = 0; d < ndvas; d++)
+                        metaslab_free_dva(spa, &dva[d], txg, now);
                 }
-        }
 
         spa_config_exit(spa, SCL_FREE, FTAG);
 }
 
 int

@@ -3730,81 +3576,346 @@
                         return (error);
         }
 
         spa_config_enter(spa, SCL_ALLOC, FTAG, RW_READER);
 
+        if (BP_IS_SPECIAL(bp)) {
+                int start_dva;
+                wbc_data_t *wbc_data = spa_get_wbc_data(spa);
+
+                mutex_enter(&wbc_data->wbc_lock);
+                start_dva = wbc_first_valid_dva(bp, wbc_data, B_FALSE);
+
+                /*
+                 * Actual claiming should be under lock for WBC blocks. It must
+                 * be done to ensure zdb will not fail. The only other user of
+                 * the claiming is ZIL whose blocks can not be WBC ones, and
+                 * thus the lock will not be held for them.
+                 */
+                error = metaslab_claim_dva(spa,
+                    &dva[WBC_NORMAL_DVA], txg);
+                if (error == 0 && start_dva == 0) {
+                        error = metaslab_claim_dva(spa,
+                            &dva[WBC_SPECIAL_DVA], txg);
+                }
+
+                mutex_exit(&wbc_data->wbc_lock);
+        } else {
         for (int d = 0; d < ndvas; d++)
-                if ((error = metaslab_claim_dva(spa, &dva[d], txg)) != 0)
+                        if ((error = metaslab_claim_dva(spa,
+                            &dva[d], txg)) != 0)
                         break;
+        }
 
         spa_config_exit(spa, SCL_ALLOC, FTAG);
 
         ASSERT(error == 0 || txg == 0);
 
         return (error);
 }
 
-/* ARGSUSED */
-static void
-metaslab_check_free_impl_cb(uint64_t inner, vdev_t *vd, uint64_t offset,
-    uint64_t size, void *arg)
+void
+metaslab_check_free(spa_t *spa, const blkptr_t *bp)
 {
-        if (vd->vdev_ops == &vdev_indirect_ops)
-                return;
-
-        metaslab_check_free_impl(vd, offset, size);
-}
-
-static void
-metaslab_check_free_impl(vdev_t *vd, uint64_t offset, uint64_t size)
-{
-        metaslab_t *msp;
-        spa_t *spa = vd->vdev_spa;
-
         if ((zfs_flags & ZFS_DEBUG_ZIO_FREE) == 0)
                 return;
 
-        if (vd->vdev_ops->vdev_op_remap != NULL) {
-                vd->vdev_ops->vdev_op_remap(vd, offset, size,
-                    metaslab_check_free_impl_cb, NULL);
+        if (BP_IS_SPECIAL(bp)) {
+                /* Do not check frees for WBC blocks */
                 return;
         }
 
-        ASSERT(vdev_is_concrete(vd));
-        ASSERT3U(offset >> vd->vdev_ms_shift, <, vd->vdev_ms_count);
-        ASSERT3U(spa_config_held(spa, SCL_ALL, RW_READER), !=, 0);
+        spa_config_enter(spa, SCL_VDEV, FTAG, RW_READER);
+        for (int i = 0; i < BP_GET_NDVAS(bp); i++) {
+                uint64_t vdev = DVA_GET_VDEV(&bp->blk_dva[i]);
+                vdev_t *vd = vdev_lookup_top(spa, vdev);
+                uint64_t offset = DVA_GET_OFFSET(&bp->blk_dva[i]);
+                uint64_t size = DVA_GET_ASIZE(&bp->blk_dva[i]);
+                metaslab_t *msp = vd->vdev_ms[offset >> vd->vdev_ms_shift];
 
-        msp = vd->vdev_ms[offset >> vd->vdev_ms_shift];
-
-        mutex_enter(&msp->ms_lock);
-        if (msp->ms_loaded)
+                if (msp->ms_loaded) {
                 range_tree_verify(msp->ms_tree, offset, size);
+                        range_tree_verify(msp->ms_cur_ts->ts_tree,
+                            offset, size);
+                        if (msp->ms_prev_ts != NULL) {
+                                range_tree_verify(msp->ms_prev_ts->ts_tree,
+                                    offset, size);
+                        }
+                }
 
         range_tree_verify(msp->ms_freeingtree, offset, size);
         range_tree_verify(msp->ms_freedtree, offset, size);
         for (int j = 0; j < TXG_DEFER_SIZE; j++)
                 range_tree_verify(msp->ms_defertree[j], offset, size);
+        }
+        spa_config_exit(spa, SCL_VDEV, FTAG);
+}
+
+/*
+ * Trims all free space in the metaslab. Returns the root TRIM zio (that the
+ * caller should zio_wait() for) and the amount of space in the metaslab that
+ * has been scheduled for trimming in the `delta' return argument.
+ */
+zio_t *
+metaslab_trim_all(metaslab_t *msp, uint64_t *delta)
+{
+        boolean_t was_loaded;
+        uint64_t trimmed_space;
+        zio_t *trim_io;
+
+        ASSERT(!MUTEX_HELD(&msp->ms_group->mg_lock));
+
+        mutex_enter(&msp->ms_lock);
+
+        while (msp->ms_loading)
+                metaslab_load_wait(msp);
+        /* If we loaded the metaslab, unload it when we're done. */
+        was_loaded = msp->ms_loaded;
+        if (!was_loaded) {
+                if (metaslab_load(msp) != 0) {
         mutex_exit(&msp->ms_lock);
+                        return (0);
+                }
+        }
+        /* Flush out any scheduled extents and add everything in ms_tree. */
+        range_tree_vacate(msp->ms_cur_ts->ts_tree, NULL, NULL);
+        range_tree_walk(msp->ms_tree, metaslab_trim_add, msp);
+
+        /* Force this trim to take place ASAP. */
+        if (msp->ms_prev_ts != NULL)
+                metaslab_free_trimset(msp->ms_prev_ts);
+        msp->ms_prev_ts = msp->ms_cur_ts;
+        msp->ms_cur_ts = metaslab_new_trimset(0, &msp->ms_lock);
+        trimmed_space = range_tree_space(msp->ms_tree);
+        if (!was_loaded)
+                metaslab_unload(msp);
+
+        trim_io = metaslab_exec_trim(msp);
+        mutex_exit(&msp->ms_lock);
+        *delta = trimmed_space;
+
+        return (trim_io);
 }
 
+/*
+ * Notifies the trimsets in a metaslab that an extent has been allocated.
+ * This removes the segment from the queues of extents awaiting to be trimmed.
+ */
+static void
+metaslab_trim_remove(void *arg, uint64_t offset, uint64_t size)
+{
+        metaslab_t *msp = arg;
+
+        range_tree_remove_overlap(msp->ms_cur_ts->ts_tree, offset, size);
+        if (msp->ms_prev_ts != NULL) {
+                range_tree_remove_overlap(msp->ms_prev_ts->ts_tree, offset,
+                    size);
+        }
+}
+
+/*
+ * Notifies the trimsets in a metaslab that an extent has been freed.
+ * This adds the segment to the currently open queue of extents awaiting
+ * to be trimmed.
+ */
+static void
+metaslab_trim_add(void *arg, uint64_t offset, uint64_t size)
+{
+        metaslab_t *msp = arg;
+        ASSERT(msp->ms_cur_ts != NULL);
+        range_tree_add(msp->ms_cur_ts->ts_tree, offset, size);
+}
+
+/*
+ * Does a metaslab's automatic trim operation processing. This must be
+ * called from metaslab_sync, with the txg number of the txg. This function
+ * issues trims in intervals as dictated by the zfs_txgs_per_trim tunable.
+ */
 void
-metaslab_check_free(spa_t *spa, const blkptr_t *bp)
+metaslab_auto_trim(metaslab_t *msp, uint64_t txg)
 {
-        if ((zfs_flags & ZFS_DEBUG_ZIO_FREE) == 0)
-                return;
+        /* for atomicity */
+        uint64_t txgs_per_trim = zfs_txgs_per_trim;
 
-        spa_config_enter(spa, SCL_VDEV, FTAG, RW_READER);
-        for (int i = 0; i < BP_GET_NDVAS(bp); i++) {
-                uint64_t vdev = DVA_GET_VDEV(&bp->blk_dva[i]);
-                vdev_t *vd = vdev_lookup_top(spa, vdev);
-                uint64_t offset = DVA_GET_OFFSET(&bp->blk_dva[i]);
-                uint64_t size = DVA_GET_ASIZE(&bp->blk_dva[i]);
+        ASSERT(!MUTEX_HELD(&msp->ms_lock));
+        mutex_enter(&msp->ms_lock);
 
-                if (DVA_GET_GANG(&bp->blk_dva[i]))
-                        size = vdev_psize_to_asize(vd, SPA_GANGBLOCKSIZE);
+        /*
+         * Since we typically have hundreds of metaslabs per vdev, but we only
+         * trim them once every zfs_txgs_per_trim txgs, it'd be best if we
+         * could sequence the TRIM commands from all metaslabs so that they
+         * don't all always pound the device in the same txg. We do so by
+         * artificially inflating the birth txg of the first trim set by a
+         * sequence number derived from the metaslab's starting offset
+         * (modulo zfs_txgs_per_trim). Thus, for the default 200 metaslabs and
+         * 32 txgs per trim, we'll only be trimming ~6.25 metaslabs per txg.
+         *
+         * If we detect that the txg has advanced too far ahead of ts_birth,
+         * it means our birth txg is out of lockstep. Recompute it by
+         * rounding down to the nearest zfs_txgs_per_trim multiple and adding
+         * our metaslab id modulo zfs_txgs_per_trim.
+         */
+        if (txg > msp->ms_cur_ts->ts_birth + txgs_per_trim) {
+                msp->ms_cur_ts->ts_birth = (txg / txgs_per_trim) *
+                    txgs_per_trim + (msp->ms_id % txgs_per_trim);
+        }
 
-                ASSERT3P(vd, !=, NULL);
+        /* Time to swap out the current and previous trimsets */
+        if (txg == msp->ms_cur_ts->ts_birth + txgs_per_trim) {
+                if (msp->ms_prev_ts != NULL) {
+                        if (msp->ms_trimming_ts != NULL) {
+                                spa_t *spa = msp->ms_group->mg_class->mc_spa;
+                                /*
+                                 * The previous trim run is still ongoing, so
+                                 * the device is reacting slowly to our trim
+                                 * requests. Drop this trimset, so as not to
+                                 * back the device up with trim requests.
+                                 */
+                                spa_trimstats_auto_slow_incr(spa);
+                                metaslab_free_trimset(msp->ms_prev_ts);
+                        } else if (msp->ms_group->mg_vd->vdev_man_trimming) {
+                                /*
+                                 * If a manual trim is ongoing, we want to
+                                 * inhibit autotrim temporarily so it doesn't
+                                 * slow down the manual trim.
+                                 */
+                                metaslab_free_trimset(msp->ms_prev_ts);
+                        } else {
+                                /*
+                                 * Trim out aged extents on the vdevs - these
+                                 * are safe to be destroyed now. We'll keep
+                                 * the trimset around to deny allocations from
+                                 * these regions while the trims are ongoing.
+                                 */
+                                zio_nowait(metaslab_exec_trim(msp));
+                        }
+                }
+                msp->ms_prev_ts = msp->ms_cur_ts;
+                msp->ms_cur_ts = metaslab_new_trimset(txg, &msp->ms_lock);
+        }
+        mutex_exit(&msp->ms_lock);
+}
 
-                metaslab_check_free_impl(vd, offset, size);
+static void
+metaslab_trim_done(zio_t *zio)
+{
+        metaslab_t *msp = zio->io_private;
+        boolean_t held;
+
+        ASSERT(msp != NULL);
+        ASSERT(msp->ms_trimming_ts != NULL);
+        held = MUTEX_HELD(&msp->ms_lock);
+        if (!held)
+                mutex_enter(&msp->ms_lock);
+        metaslab_free_trimset(msp->ms_trimming_ts);
+        msp->ms_trimming_ts = NULL;
+        cv_signal(&msp->ms_trim_cv);
+        if (!held)
+                mutex_exit(&msp->ms_lock);
+}
+
+/*
+ * Executes a zio_trim on a range tree holding freed extents in the metaslab.
+ */
+static zio_t *
+metaslab_exec_trim(metaslab_t *msp)
+{
+        metaslab_group_t *mg = msp->ms_group;
+        spa_t *spa = mg->mg_class->mc_spa;
+        vdev_t *vd = mg->mg_vd;
+        range_tree_t *trim_tree;
+        zio_t *zio;
+
+        ASSERT(MUTEX_HELD(&msp->ms_lock));
+
+        /* wait for a preceding trim to finish */
+        while (msp->ms_trimming_ts != NULL)
+                cv_wait(&msp->ms_trim_cv, &msp->ms_lock);
+        msp->ms_trimming_ts = msp->ms_prev_ts;
+        msp->ms_prev_ts = NULL;
+        trim_tree = msp->ms_trimming_ts->ts_tree;
+#ifdef  DEBUG
+        if (msp->ms_loaded) {
+                for (range_seg_t *rs = avl_first(&trim_tree->rt_root);
+                    rs != NULL; rs = AVL_NEXT(&trim_tree->rt_root, rs)) {
+                        if (!range_tree_contains(msp->ms_tree,
+                            rs->rs_start, rs->rs_end - rs->rs_start)) {
+                                panic("trimming allocated region; mss=%p",
+                                    (void*)rs);
         }
-        spa_config_exit(spa, SCL_VDEV, FTAG);
+                }
+        }
+#endif
+
+        /* Nothing to trim */
+        if (range_tree_space(trim_tree) == 0) {
+                metaslab_free_trimset(msp->ms_trimming_ts);
+                msp->ms_trimming_ts = 0;
+                return (zio_root(spa, NULL, NULL, 0));
+        }
+        zio = zio_trim(spa, vd, trim_tree, metaslab_trim_done, msp, 0,
+            ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE | ZIO_FLAG_DONT_RETRY |
+            ZIO_FLAG_CONFIG_WRITER, msp);
+
+        return (zio);
+}
+
+/*
+ * Allocates and initializes a new trimset structure. The `txg' argument
+ * indicates when this trimset was born and `lock' indicates the lock to
+ * link to the range tree.
+ */
+static metaslab_trimset_t *
+metaslab_new_trimset(uint64_t txg, kmutex_t *lock)
+{
+        metaslab_trimset_t *ts;
+
+        ts = kmem_zalloc(sizeof (*ts), KM_SLEEP);
+        ts->ts_birth = txg;
+        ts->ts_tree = range_tree_create(NULL, NULL, lock);
+
+        return (ts);
+}
+
+/*
+ * Destroys and frees a trim set previously allocated by metaslab_new_trimset.
+ */
+static void
+metaslab_free_trimset(metaslab_trimset_t *ts)
+{
+        range_tree_vacate(ts->ts_tree, NULL, NULL);
+        range_tree_destroy(ts->ts_tree);
+        kmem_free(ts, sizeof (*ts));
+}
+
+/*
+ * Checks whether an allocation conflicts with an ongoing trim operation in
+ * the given metaslab. This function takes a segment starting at `*offset'
+ * of `size' and checks whether it hits any region in the metaslab currently
+ * being trimmed. If yes, it tries to adjust the allocation to the end of
+ * the region being trimmed (P2ROUNDUP aligned by `align'), but only up to
+ * `limit' (no part of the allocation is allowed to go past this point).
+ *
+ * Returns B_FALSE if either the original allocation wasn't in conflict, or
+ * the conflict could be resolved by adjusting the value stored in `offset'
+ * such that the whole allocation still fits below `limit'. Returns B_TRUE
+ * if the allocation conflict couldn't be resolved.
+ */
+static boolean_t metaslab_check_trim_conflict(metaslab_t *msp,
+    uint64_t *offset, uint64_t size, uint64_t align, uint64_t limit)
+{
+        uint64_t new_offset;
+
+        if (msp->ms_trimming_ts == NULL)
+                /* no trim conflict, original offset is OK */
+                return (B_FALSE);
+
+        new_offset = P2ROUNDUP(range_tree_find_gap(msp->ms_trimming_ts->ts_tree,
+            *offset, size), align);
+        if (new_offset != *offset && new_offset + size > limit)
+                /* trim conflict and adjustment not possible */
+                return (B_TRUE);
+
+        /* trim conflict, but adjusted offset still within limit */
+        *offset = new_offset;
+        return (B_FALSE);
 }