1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  23  * Copyright (c) 2011 by Delphix. All rights reserved.
  24  */
  25 
  26 #include <sys/dsl_pool.h>
  27 #include <sys/dsl_dataset.h>
  28 #include <sys/dsl_prop.h>
  29 #include <sys/dsl_dir.h>
  30 #include <sys/dsl_synctask.h>
  31 #include <sys/dsl_scan.h>
  32 #include <sys/dnode.h>
  33 #include <sys/dmu_tx.h>
  34 #include <sys/dmu_objset.h>
  35 #include <sys/arc.h>
  36 #include <sys/zap.h>
  37 #include <sys/zio.h>
  38 #include <sys/zfs_context.h>
  39 #include <sys/fs/zfs.h>
  40 #include <sys/zfs_znode.h>
  41 #include <sys/spa_impl.h>
  42 #include <sys/dsl_deadlist.h>
  43 
  44 int zfs_no_write_throttle = 0;
  45 int zfs_write_limit_shift = 3;                  /* 1/8th of physical memory */
  46 int zfs_txg_synctime_ms = 1000;         /* target millisecs to sync a txg */
  47 
  48 uint64_t zfs_write_limit_min = 32 << 20;  /* min write limit is 32MB */
  49 uint64_t zfs_write_limit_max = 0;               /* max data payload per txg */
  50 uint64_t zfs_write_limit_inflated = 0;
  51 uint64_t zfs_write_limit_override = 0;
  52 
  53 kmutex_t zfs_write_limit_lock;
  54 
  55 static pgcnt_t old_physmem = 0;
  56 
  57 int
  58 dsl_pool_open_special_dir(dsl_pool_t *dp, const char *name, dsl_dir_t **ddp)
  59 {
  60         uint64_t obj;
  61         int err;
  62 
  63         err = zap_lookup(dp->dp_meta_objset,
  64             dp->dp_root_dir->dd_phys->dd_child_dir_zapobj,
  65             name, sizeof (obj), 1, &obj);
  66         if (err)
  67                 return (err);
  68 
  69         return (dsl_dir_open_obj(dp, obj, name, dp, ddp));
  70 }
  71 
  72 static dsl_pool_t *
  73 dsl_pool_open_impl(spa_t *spa, uint64_t txg)
  74 {
  75         dsl_pool_t *dp;
  76         blkptr_t *bp = spa_get_rootblkptr(spa);
  77 
  78         dp = kmem_zalloc(sizeof (dsl_pool_t), KM_SLEEP);
  79         dp->dp_spa = spa;
  80         dp->dp_meta_rootbp = *bp;
  81         rw_init(&dp->dp_config_rwlock, NULL, RW_DEFAULT, NULL);
  82         dp->dp_write_limit = zfs_write_limit_min;
  83         txg_init(dp, txg);
  84 
  85         txg_list_create(&dp->dp_dirty_datasets,
  86             offsetof(dsl_dataset_t, ds_dirty_link));
  87         txg_list_create(&dp->dp_dirty_dirs,
  88             offsetof(dsl_dir_t, dd_dirty_link));
  89         txg_list_create(&dp->dp_sync_tasks,
  90             offsetof(dsl_sync_task_group_t, dstg_node));
  91         list_create(&dp->dp_synced_datasets, sizeof (dsl_dataset_t),
  92             offsetof(dsl_dataset_t, ds_synced_link));
  93 
  94         mutex_init(&dp->dp_lock, NULL, MUTEX_DEFAULT, NULL);
  95 
  96         dp->dp_vnrele_taskq = taskq_create("zfs_vn_rele_taskq", 1, minclsyspri,
  97             1, 4, 0);
  98 
  99         return (dp);
 100 }
 101 
 102 int
 103 dsl_pool_open(spa_t *spa, uint64_t txg, dsl_pool_t **dpp)
 104 {
 105         int err;
 106         dsl_pool_t *dp = dsl_pool_open_impl(spa, txg);
 107         dsl_dir_t *dd;
 108         dsl_dataset_t *ds;
 109         uint64_t obj;
 110 
 111         rw_enter(&dp->dp_config_rwlock, RW_WRITER);
 112         err = dmu_objset_open_impl(spa, NULL, &dp->dp_meta_rootbp,
 113             &dp->dp_meta_objset);
 114         if (err)
 115                 goto out;
 116 
 117         err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
 118             DMU_POOL_ROOT_DATASET, sizeof (uint64_t), 1,
 119             &dp->dp_root_dir_obj);
 120         if (err)
 121                 goto out;
 122 
 123         err = dsl_dir_open_obj(dp, dp->dp_root_dir_obj,
 124             NULL, dp, &dp->dp_root_dir);
 125         if (err)
 126                 goto out;
 127 
 128         err = dsl_pool_open_special_dir(dp, MOS_DIR_NAME, &dp->dp_mos_dir);
 129         if (err)
 130                 goto out;
 131 
 132         if (spa_version(spa) >= SPA_VERSION_ORIGIN) {
 133                 err = dsl_pool_open_special_dir(dp, ORIGIN_DIR_NAME, &dd);
 134                 if (err)
 135                         goto out;
 136                 err = dsl_dataset_hold_obj(dp, dd->dd_phys->dd_head_dataset_obj,
 137                     FTAG, &ds);
 138                 if (err == 0) {
 139                         err = dsl_dataset_hold_obj(dp,
 140                             ds->ds_phys->ds_prev_snap_obj, dp,
 141                             &dp->dp_origin_snap);
 142                         dsl_dataset_rele(ds, FTAG);
 143                 }
 144                 dsl_dir_close(dd, dp);
 145                 if (err)
 146                         goto out;
 147         }
 148 
 149         if (spa_version(spa) >= SPA_VERSION_DEADLISTS) {
 150                 err = dsl_pool_open_special_dir(dp, FREE_DIR_NAME,
 151                     &dp->dp_free_dir);
 152                 if (err)
 153                         goto out;
 154 
 155                 err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
 156                     DMU_POOL_FREE_BPOBJ, sizeof (uint64_t), 1, &obj);
 157                 if (err)
 158                         goto out;
 159                 VERIFY3U(0, ==, bpobj_open(&dp->dp_free_bpobj,
 160                     dp->dp_meta_objset, obj));
 161         }
 162 
 163         err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
 164             DMU_POOL_TMP_USERREFS, sizeof (uint64_t), 1,
 165             &dp->dp_tmp_userrefs_obj);
 166         if (err == ENOENT)
 167                 err = 0;
 168         if (err)
 169                 goto out;
 170 
 171         err = dsl_scan_init(dp, txg);
 172 
 173 out:
 174         rw_exit(&dp->dp_config_rwlock);
 175         if (err)
 176                 dsl_pool_close(dp);
 177         else
 178                 *dpp = dp;
 179 
 180         return (err);
 181 }
 182 
 183 void
 184 dsl_pool_close(dsl_pool_t *dp)
 185 {
 186         /* drop our references from dsl_pool_open() */
 187 
 188         /*
 189          * Since we held the origin_snap from "syncing" context (which
 190          * includes pool-opening context), it actually only got a "ref"
 191          * and not a hold, so just drop that here.
 192          */
 193         if (dp->dp_origin_snap)
 194                 dsl_dataset_drop_ref(dp->dp_origin_snap, dp);
 195         if (dp->dp_mos_dir)
 196                 dsl_dir_close(dp->dp_mos_dir, dp);
 197         if (dp->dp_free_dir)
 198                 dsl_dir_close(dp->dp_free_dir, dp);
 199         if (dp->dp_root_dir)
 200                 dsl_dir_close(dp->dp_root_dir, dp);
 201 
 202         bpobj_close(&dp->dp_free_bpobj);
 203 
 204         /* undo the dmu_objset_open_impl(mos) from dsl_pool_open() */
 205         if (dp->dp_meta_objset)
 206                 dmu_objset_evict(dp->dp_meta_objset);
 207 
 208         txg_list_destroy(&dp->dp_dirty_datasets);
 209         txg_list_destroy(&dp->dp_sync_tasks);
 210         txg_list_destroy(&dp->dp_dirty_dirs);
 211         list_destroy(&dp->dp_synced_datasets);
 212 
 213         arc_flush(dp->dp_spa);
 214         txg_fini(dp);
 215         dsl_scan_fini(dp);
 216         rw_destroy(&dp->dp_config_rwlock);
 217         mutex_destroy(&dp->dp_lock);
 218         taskq_destroy(dp->dp_vnrele_taskq);
 219         if (dp->dp_blkstats)
 220                 kmem_free(dp->dp_blkstats, sizeof (zfs_all_blkstats_t));
 221         kmem_free(dp, sizeof (dsl_pool_t));
 222 }
 223 
 224 dsl_pool_t *
 225 dsl_pool_create(spa_t *spa, nvlist_t *zplprops, uint64_t txg)
 226 {
 227         int err;
 228         dsl_pool_t *dp = dsl_pool_open_impl(spa, txg);
 229         dmu_tx_t *tx = dmu_tx_create_assigned(dp, txg);
 230         objset_t *os;
 231         dsl_dataset_t *ds;
 232         uint64_t obj;
 233 
 234         /* create and open the MOS (meta-objset) */
 235         dp->dp_meta_objset = dmu_objset_create_impl(spa,
 236             NULL, &dp->dp_meta_rootbp, DMU_OST_META, tx);
 237 
 238         /* create the pool directory */
 239         err = zap_create_claim(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
 240             DMU_OT_OBJECT_DIRECTORY, DMU_OT_NONE, 0, tx);
 241         ASSERT3U(err, ==, 0);
 242 
 243         /* Initialize scan structures */
 244         VERIFY3U(0, ==, dsl_scan_init(dp, txg));
 245 
 246         /* create and open the root dir */
 247         dp->dp_root_dir_obj = dsl_dir_create_sync(dp, NULL, NULL, tx);
 248         VERIFY(0 == dsl_dir_open_obj(dp, dp->dp_root_dir_obj,
 249             NULL, dp, &dp->dp_root_dir));
 250 
 251         /* create and open the meta-objset dir */
 252         (void) dsl_dir_create_sync(dp, dp->dp_root_dir, MOS_DIR_NAME, tx);
 253         VERIFY(0 == dsl_pool_open_special_dir(dp,
 254             MOS_DIR_NAME, &dp->dp_mos_dir));
 255 
 256         if (spa_version(spa) >= SPA_VERSION_DEADLISTS) {
 257                 /* create and open the free dir */
 258                 (void) dsl_dir_create_sync(dp, dp->dp_root_dir,
 259                     FREE_DIR_NAME, tx);
 260                 VERIFY(0 == dsl_pool_open_special_dir(dp,
 261                     FREE_DIR_NAME, &dp->dp_free_dir));
 262 
 263                 /* create and open the free_bplist */
 264                 obj = bpobj_alloc(dp->dp_meta_objset, SPA_MAXBLOCKSIZE, tx);
 265                 VERIFY(zap_add(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
 266                     DMU_POOL_FREE_BPOBJ, sizeof (uint64_t), 1, &obj, tx) == 0);
 267                 VERIFY3U(0, ==, bpobj_open(&dp->dp_free_bpobj,
 268                     dp->dp_meta_objset, obj));
 269         }
 270 
 271         if (spa_version(spa) >= SPA_VERSION_DSL_SCRUB)
 272                 dsl_pool_create_origin(dp, tx);
 273 
 274         /* create the root dataset */
 275         obj = dsl_dataset_create_sync_dd(dp->dp_root_dir, NULL, 0, tx);
 276 
 277         /* create the root objset */
 278         VERIFY(0 == dsl_dataset_hold_obj(dp, obj, FTAG, &ds));
 279         os = dmu_objset_create_impl(dp->dp_spa, ds,
 280             dsl_dataset_get_blkptr(ds), DMU_OST_ZFS, tx);
 281 #ifdef _KERNEL
 282         zfs_create_fs(os, kcred, zplprops, tx);
 283 #endif
 284         dsl_dataset_rele(ds, FTAG);
 285 
 286         dmu_tx_commit(tx);
 287 
 288         return (dp);
 289 }
 290 
 291 static int
 292 deadlist_enqueue_cb(void *arg, const blkptr_t *bp, dmu_tx_t *tx)
 293 {
 294         dsl_deadlist_t *dl = arg;
 295         dsl_pool_t *dp = dmu_objset_pool(dl->dl_os);
 296         rw_enter(&dp->dp_config_rwlock, RW_READER);
 297         dsl_deadlist_insert(dl, bp, tx);
 298         rw_exit(&dp->dp_config_rwlock);
 299         return (0);
 300 }
 301 
 302 void
 303 dsl_pool_sync(dsl_pool_t *dp, uint64_t txg)
 304 {
 305         zio_t *zio;
 306         dmu_tx_t *tx;
 307         dsl_dir_t *dd;
 308         dsl_dataset_t *ds;
 309         dsl_sync_task_group_t *dstg;
 310         objset_t *mos = dp->dp_meta_objset;
 311         hrtime_t start, write_time;
 312         uint64_t data_written;
 313         int err;
 314 
 315         /*
 316          * We need to copy dp_space_towrite() before doing
 317          * dsl_sync_task_group_sync(), because
 318          * dsl_dataset_snapshot_reserve_space() will increase
 319          * dp_space_towrite but not actually write anything.
 320          */
 321         data_written = dp->dp_space_towrite[txg & TXG_MASK];
 322 
 323         tx = dmu_tx_create_assigned(dp, txg);
 324 
 325         dp->dp_read_overhead = 0;
 326         start = gethrtime();
 327 
 328         zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
 329         while (ds = txg_list_remove(&dp->dp_dirty_datasets, txg)) {
 330                 /*
 331                  * We must not sync any non-MOS datasets twice, because
 332                  * we may have taken a snapshot of them.  However, we
 333                  * may sync newly-created datasets on pass 2.
 334                  */
 335                 ASSERT(!list_link_active(&ds->ds_synced_link));
 336                 list_insert_tail(&dp->dp_synced_datasets, ds);
 337                 dsl_dataset_sync(ds, zio, tx);
 338         }
 339         DTRACE_PROBE(pool_sync__1setup);
 340         err = zio_wait(zio);
 341 
 342         write_time = gethrtime() - start;
 343         ASSERT(err == 0);
 344         DTRACE_PROBE(pool_sync__2rootzio);
 345 
 346         for (ds = list_head(&dp->dp_synced_datasets); ds;
 347             ds = list_next(&dp->dp_synced_datasets, ds))
 348                 dmu_objset_do_userquota_updates(ds->ds_objset, tx);
 349 
 350         /*
 351          * Sync the datasets again to push out the changes due to
 352          * userspace updates.  This must be done before we process the
 353          * sync tasks, because that could cause a snapshot of a dataset
 354          * whose ds_bp will be rewritten when we do this 2nd sync.
 355          */
 356         zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
 357         while (ds = txg_list_remove(&dp->dp_dirty_datasets, txg)) {
 358                 ASSERT(list_link_active(&ds->ds_synced_link));
 359                 dmu_buf_rele(ds->ds_dbuf, ds);
 360                 dsl_dataset_sync(ds, zio, tx);
 361         }
 362         err = zio_wait(zio);
 363 
 364         /*
 365          * Move dead blocks from the pending deadlist to the on-disk
 366          * deadlist.
 367          */
 368         for (ds = list_head(&dp->dp_synced_datasets); ds;
 369             ds = list_next(&dp->dp_synced_datasets, ds)) {
 370                 bplist_iterate(&ds->ds_pending_deadlist,
 371                     deadlist_enqueue_cb, &ds->ds_deadlist, tx);
 372         }
 373 
 374         while (dstg = txg_list_remove(&dp->dp_sync_tasks, txg)) {
 375                 /*
 376                  * No more sync tasks should have been added while we
 377                  * were syncing.
 378                  */
 379                 ASSERT(spa_sync_pass(dp->dp_spa) == 1);
 380                 dsl_sync_task_group_sync(dstg, tx);
 381         }
 382         DTRACE_PROBE(pool_sync__3task);
 383 
 384         start = gethrtime();
 385         while (dd = txg_list_remove(&dp->dp_dirty_dirs, txg))
 386                 dsl_dir_sync(dd, tx);
 387         write_time += gethrtime() - start;
 388 
 389         start = gethrtime();
 390         if (list_head(&mos->os_dirty_dnodes[txg & TXG_MASK]) != NULL ||
 391             list_head(&mos->os_free_dnodes[txg & TXG_MASK]) != NULL) {
 392                 zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
 393                 dmu_objset_sync(mos, zio, tx);
 394                 err = zio_wait(zio);
 395                 ASSERT(err == 0);
 396                 dprintf_bp(&dp->dp_meta_rootbp, "meta objset rootbp is %s", "");
 397                 spa_set_rootblkptr(dp->dp_spa, &dp->dp_meta_rootbp);
 398         }
 399         write_time += gethrtime() - start;
 400         DTRACE_PROBE2(pool_sync__4io, hrtime_t, write_time,
 401             hrtime_t, dp->dp_read_overhead);
 402         write_time -= dp->dp_read_overhead;
 403 
 404         dmu_tx_commit(tx);
 405 
 406         dp->dp_space_towrite[txg & TXG_MASK] = 0;
 407         ASSERT(dp->dp_tempreserved[txg & TXG_MASK] == 0);
 408 
 409         /*
 410          * If the write limit max has not been explicitly set, set it
 411          * to a fraction of available physical memory (default 1/8th).
 412          * Note that we must inflate the limit because the spa
 413          * inflates write sizes to account for data replication.
 414          * Check this each sync phase to catch changing memory size.
 415          */
 416         if (physmem != old_physmem && zfs_write_limit_shift) {
 417                 mutex_enter(&zfs_write_limit_lock);
 418                 old_physmem = physmem;
 419                 zfs_write_limit_max = ptob(physmem) >> zfs_write_limit_shift;
 420                 zfs_write_limit_inflated = MAX(zfs_write_limit_min,
 421                     spa_get_asize(dp->dp_spa, zfs_write_limit_max));
 422                 mutex_exit(&zfs_write_limit_lock);
 423         }
 424 
 425         /*
 426          * Attempt to keep the sync time consistent by adjusting the
 427          * amount of write traffic allowed into each transaction group.
 428          * Weight the throughput calculation towards the current value:
 429          *      thru = 3/4 old_thru + 1/4 new_thru
 430          *
 431          * Note: write_time is in nanosecs, so write_time/MICROSEC
 432          * yields millisecs
 433          */
 434         ASSERT(zfs_write_limit_min > 0);
 435         if (data_written > zfs_write_limit_min / 8 && write_time > MICROSEC) {
 436                 uint64_t throughput = data_written / (write_time / MICROSEC);
 437 
 438                 if (dp->dp_throughput)
 439                         dp->dp_throughput = throughput / 4 +
 440                             3 * dp->dp_throughput / 4;
 441                 else
 442                         dp->dp_throughput = throughput;
 443                 dp->dp_write_limit = MIN(zfs_write_limit_inflated,
 444                     MAX(zfs_write_limit_min,
 445                     dp->dp_throughput * zfs_txg_synctime_ms));
 446         }
 447 }
 448 
 449 void
 450 dsl_pool_sync_done(dsl_pool_t *dp, uint64_t txg)
 451 {
 452         dsl_dataset_t *ds;
 453         objset_t *os;
 454 
 455         while (ds = list_head(&dp->dp_synced_datasets)) {
 456                 list_remove(&dp->dp_synced_datasets, ds);
 457                 os = ds->ds_objset;
 458                 zil_clean(os->os_zil, txg);
 459                 ASSERT(!dmu_objset_is_dirty(os, txg));
 460                 dmu_buf_rele(ds->ds_dbuf, ds);
 461         }
 462         ASSERT(!dmu_objset_is_dirty(dp->dp_meta_objset, txg));
 463 }
 464 
 465 /*
 466  * TRUE if the current thread is the tx_sync_thread or if we
 467  * are being called from SPA context during pool initialization.
 468  */
 469 int
 470 dsl_pool_sync_context(dsl_pool_t *dp)
 471 {
 472         return (curthread == dp->dp_tx.tx_sync_thread ||
 473             spa_get_dsl(dp->dp_spa) == NULL);
 474 }
 475 
 476 uint64_t
 477 dsl_pool_adjustedsize(dsl_pool_t *dp, boolean_t netfree)
 478 {
 479         uint64_t space, resv;
 480 
 481         /*
 482          * Reserve about 1.6% (1/64), or at least 32MB, for allocation
 483          * efficiency.
 484          * XXX The intent log is not accounted for, so it must fit
 485          * within this slop.
 486          *
 487          * If we're trying to assess whether it's OK to do a free,
 488          * cut the reservation in half to allow forward progress
 489          * (e.g. make it possible to rm(1) files from a full pool).
 490          */
 491         space = spa_get_dspace(dp->dp_spa);
 492         resv = MAX(space >> 6, SPA_MINDEVSIZE >> 1);
 493         if (netfree)
 494                 resv >>= 1;
 495 
 496         return (space - resv);
 497 }
 498 
 499 int
 500 dsl_pool_tempreserve_space(dsl_pool_t *dp, uint64_t space, dmu_tx_t *tx)
 501 {
 502         uint64_t reserved = 0;
 503         uint64_t write_limit = (zfs_write_limit_override ?
 504             zfs_write_limit_override : dp->dp_write_limit);
 505 
 506         if (zfs_no_write_throttle) {
 507                 atomic_add_64(&dp->dp_tempreserved[tx->tx_txg & TXG_MASK],
 508                     space);
 509                 return (0);
 510         }
 511 
 512         /*
 513          * Check to see if we have exceeded the maximum allowed IO for
 514          * this transaction group.  We can do this without locks since
 515          * a little slop here is ok.  Note that we do the reserved check
 516          * with only half the requested reserve: this is because the
 517          * reserve requests are worst-case, and we really don't want to
 518          * throttle based off of worst-case estimates.
 519          */
 520         if (write_limit > 0) {
 521                 reserved = dp->dp_space_towrite[tx->tx_txg & TXG_MASK]
 522                     + dp->dp_tempreserved[tx->tx_txg & TXG_MASK] / 2;
 523 
 524                 if (reserved && reserved > write_limit)
 525                         return (ERESTART);
 526         }
 527 
 528         atomic_add_64(&dp->dp_tempreserved[tx->tx_txg & TXG_MASK], space);
 529 
 530         /*
 531          * If this transaction group is over 7/8ths capacity, delay
 532          * the caller 1 clock tick.  This will slow down the "fill"
 533          * rate until the sync process can catch up with us.
 534          */
 535         if (reserved && reserved > (write_limit - (write_limit >> 3)))
 536                 txg_delay(dp, tx->tx_txg, 1);
 537 
 538         return (0);
 539 }
 540 
 541 void
 542 dsl_pool_tempreserve_clear(dsl_pool_t *dp, int64_t space, dmu_tx_t *tx)
 543 {
 544         ASSERT(dp->dp_tempreserved[tx->tx_txg & TXG_MASK] >= space);
 545         atomic_add_64(&dp->dp_tempreserved[tx->tx_txg & TXG_MASK], -space);
 546 }
 547 
 548 void
 549 dsl_pool_memory_pressure(dsl_pool_t *dp)
 550 {
 551         uint64_t space_inuse = 0;
 552         int i;
 553 
 554         if (dp->dp_write_limit == zfs_write_limit_min)
 555                 return;
 556 
 557         for (i = 0; i < TXG_SIZE; i++) {
 558                 space_inuse += dp->dp_space_towrite[i];
 559                 space_inuse += dp->dp_tempreserved[i];
 560         }
 561         dp->dp_write_limit = MAX(zfs_write_limit_min,
 562             MIN(dp->dp_write_limit, space_inuse / 4));
 563 }
 564 
 565 void
 566 dsl_pool_willuse_space(dsl_pool_t *dp, int64_t space, dmu_tx_t *tx)
 567 {
 568         if (space > 0) {
 569                 mutex_enter(&dp->dp_lock);
 570                 dp->dp_space_towrite[tx->tx_txg & TXG_MASK] += space;
 571                 mutex_exit(&dp->dp_lock);
 572         }
 573 }
 574 
 575 /* ARGSUSED */
 576 static int
 577 upgrade_clones_cb(spa_t *spa, uint64_t dsobj, const char *dsname, void *arg)
 578 {
 579         dmu_tx_t *tx = arg;
 580         dsl_dataset_t *ds, *prev = NULL;
 581         int err;
 582         dsl_pool_t *dp = spa_get_dsl(spa);
 583 
 584         err = dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds);
 585         if (err)
 586                 return (err);
 587 
 588         while (ds->ds_phys->ds_prev_snap_obj != 0) {
 589                 err = dsl_dataset_hold_obj(dp, ds->ds_phys->ds_prev_snap_obj,
 590                     FTAG, &prev);
 591                 if (err) {
 592                         dsl_dataset_rele(ds, FTAG);
 593                         return (err);
 594                 }
 595 
 596                 if (prev->ds_phys->ds_next_snap_obj != ds->ds_object)
 597                         break;
 598                 dsl_dataset_rele(ds, FTAG);
 599                 ds = prev;
 600                 prev = NULL;
 601         }
 602 
 603         if (prev == NULL) {
 604                 prev = dp->dp_origin_snap;
 605 
 606                 /*
 607                  * The $ORIGIN can't have any data, or the accounting
 608                  * will be wrong.
 609                  */
 610                 ASSERT(prev->ds_phys->ds_bp.blk_birth == 0);
 611 
 612                 /* The origin doesn't get attached to itself */
 613                 if (ds->ds_object == prev->ds_object) {
 614                         dsl_dataset_rele(ds, FTAG);
 615                         return (0);
 616                 }
 617 
 618                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
 619                 ds->ds_phys->ds_prev_snap_obj = prev->ds_object;
 620                 ds->ds_phys->ds_prev_snap_txg = prev->ds_phys->ds_creation_txg;
 621 
 622                 dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
 623                 ds->ds_dir->dd_phys->dd_origin_obj = prev->ds_object;
 624 
 625                 dmu_buf_will_dirty(prev->ds_dbuf, tx);
 626                 prev->ds_phys->ds_num_children++;
 627 
 628                 if (ds->ds_phys->ds_next_snap_obj == 0) {
 629                         ASSERT(ds->ds_prev == NULL);
 630                         VERIFY(0 == dsl_dataset_hold_obj(dp,
 631                             ds->ds_phys->ds_prev_snap_obj, ds, &ds->ds_prev));
 632                 }
 633         }
 634 
 635         ASSERT(ds->ds_dir->dd_phys->dd_origin_obj == prev->ds_object);
 636         ASSERT(ds->ds_phys->ds_prev_snap_obj == prev->ds_object);
 637 
 638         if (prev->ds_phys->ds_next_clones_obj == 0) {
 639                 dmu_buf_will_dirty(prev->ds_dbuf, tx);
 640                 prev->ds_phys->ds_next_clones_obj =
 641                     zap_create(dp->dp_meta_objset,
 642                     DMU_OT_NEXT_CLONES, DMU_OT_NONE, 0, tx);
 643         }
 644         VERIFY(0 == zap_add_int(dp->dp_meta_objset,
 645             prev->ds_phys->ds_next_clones_obj, ds->ds_object, tx));
 646 
 647         dsl_dataset_rele(ds, FTAG);
 648         if (prev != dp->dp_origin_snap)
 649                 dsl_dataset_rele(prev, FTAG);
 650         return (0);
 651 }
 652 
 653 void
 654 dsl_pool_upgrade_clones(dsl_pool_t *dp, dmu_tx_t *tx)
 655 {
 656         ASSERT(dmu_tx_is_syncing(tx));
 657         ASSERT(dp->dp_origin_snap != NULL);
 658 
 659         VERIFY3U(0, ==, dmu_objset_find_spa(dp->dp_spa, NULL, upgrade_clones_cb,
 660             tx, DS_FIND_CHILDREN));
 661 }
 662 
 663 /* ARGSUSED */
 664 static int
 665 upgrade_dir_clones_cb(spa_t *spa, uint64_t dsobj, const char *dsname, void *arg)
 666 {
 667         dmu_tx_t *tx = arg;
 668         dsl_dataset_t *ds;
 669         dsl_pool_t *dp = spa_get_dsl(spa);
 670         objset_t *mos = dp->dp_meta_objset;
 671 
 672         VERIFY3U(0, ==, dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
 673 
 674         if (ds->ds_dir->dd_phys->dd_origin_obj) {
 675                 dsl_dataset_t *origin;
 676 
 677                 VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
 678                     ds->ds_dir->dd_phys->dd_origin_obj, FTAG, &origin));
 679 
 680                 if (origin->ds_dir->dd_phys->dd_clones == 0) {
 681                         dmu_buf_will_dirty(origin->ds_dir->dd_dbuf, tx);
 682                         origin->ds_dir->dd_phys->dd_clones = zap_create(mos,
 683                             DMU_OT_DSL_CLONES, DMU_OT_NONE, 0, tx);
 684                 }
 685 
 686                 VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
 687                     origin->ds_dir->dd_phys->dd_clones, dsobj, tx));
 688 
 689                 dsl_dataset_rele(origin, FTAG);
 690         }
 691 
 692         dsl_dataset_rele(ds, FTAG);
 693         return (0);
 694 }
 695 
 696 void
 697 dsl_pool_upgrade_dir_clones(dsl_pool_t *dp, dmu_tx_t *tx)
 698 {
 699         ASSERT(dmu_tx_is_syncing(tx));
 700         uint64_t obj;
 701 
 702         (void) dsl_dir_create_sync(dp, dp->dp_root_dir, FREE_DIR_NAME, tx);
 703         VERIFY(0 == dsl_pool_open_special_dir(dp,
 704             FREE_DIR_NAME, &dp->dp_free_dir));
 705 
 706         /*
 707          * We can't use bpobj_alloc(), because spa_version() still
 708          * returns the old version, and we need a new-version bpobj with
 709          * subobj support.  So call dmu_object_alloc() directly.
 710          */
 711         obj = dmu_object_alloc(dp->dp_meta_objset, DMU_OT_BPOBJ,
 712             SPA_MAXBLOCKSIZE, DMU_OT_BPOBJ_HDR, sizeof (bpobj_phys_t), tx);
 713         VERIFY3U(0, ==, zap_add(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
 714             DMU_POOL_FREE_BPOBJ, sizeof (uint64_t), 1, &obj, tx));
 715         VERIFY3U(0, ==, bpobj_open(&dp->dp_free_bpobj,
 716             dp->dp_meta_objset, obj));
 717 
 718         VERIFY3U(0, ==, dmu_objset_find_spa(dp->dp_spa, NULL,
 719             upgrade_dir_clones_cb, tx, DS_FIND_CHILDREN));
 720 }
 721 
 722 void
 723 dsl_pool_create_origin(dsl_pool_t *dp, dmu_tx_t *tx)
 724 {
 725         uint64_t dsobj;
 726         dsl_dataset_t *ds;
 727 
 728         ASSERT(dmu_tx_is_syncing(tx));
 729         ASSERT(dp->dp_origin_snap == NULL);
 730 
 731         /* create the origin dir, ds, & snap-ds */
 732         rw_enter(&dp->dp_config_rwlock, RW_WRITER);
 733         dsobj = dsl_dataset_create_sync(dp->dp_root_dir, ORIGIN_DIR_NAME,
 734             NULL, 0, kcred, tx);
 735         VERIFY(0 == dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
 736         dsl_dataset_snapshot_sync(ds, ORIGIN_DIR_NAME, tx);
 737         VERIFY(0 == dsl_dataset_hold_obj(dp, ds->ds_phys->ds_prev_snap_obj,
 738             dp, &dp->dp_origin_snap));
 739         dsl_dataset_rele(ds, FTAG);
 740         rw_exit(&dp->dp_config_rwlock);
 741 }
 742 
 743 taskq_t *
 744 dsl_pool_vnrele_taskq(dsl_pool_t *dp)
 745 {
 746         return (dp->dp_vnrele_taskq);
 747 }
 748 
 749 /*
 750  * Walk through the pool-wide zap object of temporary snapshot user holds
 751  * and release them.
 752  */
 753 void
 754 dsl_pool_clean_tmp_userrefs(dsl_pool_t *dp)
 755 {
 756         zap_attribute_t za;
 757         zap_cursor_t zc;
 758         objset_t *mos = dp->dp_meta_objset;
 759         uint64_t zapobj = dp->dp_tmp_userrefs_obj;
 760 
 761         if (zapobj == 0)
 762                 return;
 763         ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
 764 
 765         for (zap_cursor_init(&zc, mos, zapobj);
 766             zap_cursor_retrieve(&zc, &za) == 0;
 767             zap_cursor_advance(&zc)) {
 768                 char *htag;
 769                 uint64_t dsobj;
 770 
 771                 htag = strchr(za.za_name, '-');
 772                 *htag = '\0';
 773                 ++htag;
 774                 dsobj = strtonum(za.za_name, NULL);
 775                 (void) dsl_dataset_user_release_tmp(dp, dsobj, htag, B_FALSE);
 776         }
 777         zap_cursor_fini(&zc);
 778 }
 779 
 780 /*
 781  * Create the pool-wide zap object for storing temporary snapshot holds.
 782  */
 783 void
 784 dsl_pool_user_hold_create_obj(dsl_pool_t *dp, dmu_tx_t *tx)
 785 {
 786         objset_t *mos = dp->dp_meta_objset;
 787 
 788         ASSERT(dp->dp_tmp_userrefs_obj == 0);
 789         ASSERT(dmu_tx_is_syncing(tx));
 790 
 791         dp->dp_tmp_userrefs_obj = zap_create(mos, DMU_OT_USERREFS,
 792             DMU_OT_NONE, 0, tx);
 793 
 794         VERIFY(zap_add(mos, DMU_POOL_DIRECTORY_OBJECT, DMU_POOL_TMP_USERREFS,
 795             sizeof (uint64_t), 1, &dp->dp_tmp_userrefs_obj, tx) == 0);
 796 }
 797 
 798 static int
 799 dsl_pool_user_hold_rele_impl(dsl_pool_t *dp, uint64_t dsobj,
 800     const char *tag, uint64_t *now, dmu_tx_t *tx, boolean_t holding)
 801 {
 802         objset_t *mos = dp->dp_meta_objset;
 803         uint64_t zapobj = dp->dp_tmp_userrefs_obj;
 804         char *name;
 805         int error;
 806 
 807         ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
 808         ASSERT(dmu_tx_is_syncing(tx));
 809 
 810         /*
 811          * If the pool was created prior to SPA_VERSION_USERREFS, the
 812          * zap object for temporary holds might not exist yet.
 813          */
 814         if (zapobj == 0) {
 815                 if (holding) {
 816                         dsl_pool_user_hold_create_obj(dp, tx);
 817                         zapobj = dp->dp_tmp_userrefs_obj;
 818                 } else {
 819                         return (ENOENT);
 820                 }
 821         }
 822 
 823         name = kmem_asprintf("%llx-%s", (u_longlong_t)dsobj, tag);
 824         if (holding)
 825                 error = zap_add(mos, zapobj, name, 8, 1, now, tx);
 826         else
 827                 error = zap_remove(mos, zapobj, name, tx);
 828         strfree(name);
 829 
 830         return (error);
 831 }
 832 
 833 /*
 834  * Add a temporary hold for the given dataset object and tag.
 835  */
 836 int
 837 dsl_pool_user_hold(dsl_pool_t *dp, uint64_t dsobj, const char *tag,
 838     uint64_t *now, dmu_tx_t *tx)
 839 {
 840         return (dsl_pool_user_hold_rele_impl(dp, dsobj, tag, now, tx, B_TRUE));
 841 }
 842 
 843 /*
 844  * Release a temporary hold for the given dataset object and tag.
 845  */
 846 int
 847 dsl_pool_user_release(dsl_pool_t *dp, uint64_t dsobj, const char *tag,
 848     dmu_tx_t *tx)
 849 {
 850         return (dsl_pool_user_hold_rele_impl(dp, dsobj, tag, NULL,
 851             tx, B_FALSE));
 852 }