/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Portions Copyright 2011 Martin Matuska
 * Copyright (c) 2012, 2017 by Delphix. All rights reserved.
 */

#include <sys/zfs_context.h>
#include <sys/txg_impl.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_scan.h>
#include <sys/zil.h>
#include <sys/callb.h>

/*
 * ZFS Transaction Groups
 * ----------------------
 *
 * ZFS transaction groups are, as the name implies, groups of transactions
 * that act on persistent state. ZFS asserts consistency at the granularity of
 * these transaction groups. Each successive transaction group (txg) is
 * assigned a 64-bit consecutive identifier. There are three active
 * transaction group states: open, quiescing, and syncing. At any given time,
 * there may be an active txg associated with each state; each active txg may
 * either be processing or blocked waiting to enter the next state. There may
 * be up to three active txgs, and there is always a txg in the open state
 * (though it may be blocked waiting to enter the quiescing state). In broad
 * strokes, transactions -- operations that change in-memory structures -- are
 * accepted into the txg in the open state, and are completed while the txg is
 * in the open or quiescing states. The accumulated changes are written to
 * disk in the syncing state.
 *
 * Open
 *
 * When a new txg becomes active, it first enters the open state. New
 * transactions -- updates to in-memory structures -- are assigned to the
 * currently open txg. There is always a txg in the open state so that ZFS can
 * accept new changes (though the txg may refuse new changes if it has hit
 * some limit). ZFS advances the open txg to the next state for a variety of
 * reasons, such as the txg hitting a time or size threshold or the execution
 * of an administrative action that must be completed in the syncing state.
 *
 * Quiescing
 *
 * After a txg exits the open state, it enters the quiescing state. The
 * quiescing state is intended to provide a buffer between accepting new
 * transactions in the open state and writing them out to stable storage in
 * the syncing state. While quiescing, transactions can continue their
 * operation without delaying either of the other states. Typically, a txg is
 * in the quiescing state very briefly since the operations are bounded by
 * software latencies rather than, say, slower I/O latencies. After all
 * transactions complete, the txg is ready to enter the next state.
 *
 * Syncing
 *
 * In the syncing state, the in-memory state built up during the open and (to
 * a lesser degree) the quiescing states is written to stable storage. The
 * process of writing out modified data can, in turn, modify more data. For
 * example, when we write new blocks, we need to allocate space for them;
 * those allocations modify metadata (space maps)... which themselves must be
 * written to stable storage. During the sync state, ZFS iterates, writing out
 * data until it converges and all in-memory changes have been written out.
 * The first such pass is the largest as it encompasses all the modified user
 * data (as opposed to filesystem metadata). Subsequent passes typically have
 * far less data to write as they consist exclusively of filesystem metadata.
 *
 * To ensure convergence, after a certain number of passes ZFS begins
 * overwriting locations on stable storage that had been allocated earlier in
 * the syncing state (and subsequently freed). ZFS usually allocates new
 * blocks to optimize for large, continuous writes. For the syncing state to
 * converge, however, it must complete a pass where no new blocks are
 * allocated, since each allocation requires a modification of persistent
 * metadata. Further, to hasten convergence, after a prescribed number of
 * passes, ZFS also defers frees and stops compressing.
 *
 * In addition to writing out user data, we must also execute synctasks during
 * the syncing context. A synctask is the mechanism by which some
 * administrative activities, such as creating and destroying snapshots or
 * datasets, are carried out. Note that when a synctask is initiated, it
 * enters the open txg, and ZFS then pushes that txg as quickly as possible to
 * completion of the syncing state in order to reduce the latency of the
 * administrative activity. To complete the syncing state, ZFS writes out a
 * new uberblock, the root of the tree of blocks that comprise all state
 * stored on the ZFS pool. Finally, if there is a quiesced txg waiting, we
 * signal that it can now transition to the syncing state.
 */
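
/*
 * Illustration only: consumers normally reach this machinery through the
 * DMU transaction API rather than by calling txg_hold_open() directly.
 * A typical write path looks roughly like the sketch below (error handling
 * omitted; os, object, offset, and size are placeholders):
 *
 *	dmu_tx_t *tx = dmu_tx_create(os);
 *	dmu_tx_hold_write(tx, object, offset, size);
 *	error = dmu_tx_assign(tx, TXG_WAIT);	-- joins the open txg
 *	... modify in-memory structures ...
 *	dmu_tx_commit(tx);			-- drops the txg hold
 *
 * dmu_tx_assign() takes a hold on the open txg via txg_hold_open(), and
 * dmu_tx_commit() releases it via txg_rele_to_sync(), allowing that txg to
 * quiesce once all such holds have been dropped.
 */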

static void txg_sync_thread(void *arg);
static void txg_quiesce_thread(void *arg);

int zfs_txg_timeout = 5;        /* max seconds worth of delta per txg */

/*
 * Prepare the txg subsystem.
 */
void
txg_init(dsl_pool_t *dp, uint64_t txg)
{
        tx_state_t *tx = &dp->dp_tx;
        int c;
        bzero(tx, sizeof (tx_state_t));

        tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);

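        /*
         * Per-CPU state keeps callers of txg_hold_open() on different CPUs
         * from contending on a single lock: each tx_cpu_t carries its own
         * open lock, hold counts, and commit-callback lists.
         */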
        for (c = 0; c < max_ncpus; c++) {
                int i;

                mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
                mutex_init(&tx->tx_cpu[c].tc_open_lock, NULL, MUTEX_DEFAULT,
                    NULL);
                for (i = 0; i < TXG_SIZE; i++) {
                        cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
                            NULL);
                        list_create(&tx->tx_cpu[c].tc_callbacks[i],
                            sizeof (dmu_tx_callback_t),
                            offsetof(dmu_tx_callback_t, dcb_node));
                }
        }

        mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);

        cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
        cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
        cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
        cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
        cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);

        tx->tx_open_txg = txg;
}

/*
 * Close down the txg subsystem.
 */
void
txg_fini(dsl_pool_t *dp)
{
        tx_state_t *tx = &dp->dp_tx;
        int c;

        ASSERT0(tx->tx_threads);

        mutex_destroy(&tx->tx_sync_lock);

        cv_destroy(&tx->tx_sync_more_cv);
        cv_destroy(&tx->tx_sync_done_cv);
        cv_destroy(&tx->tx_quiesce_more_cv);
        cv_destroy(&tx->tx_quiesce_done_cv);
        cv_destroy(&tx->tx_exit_cv);

        for (c = 0; c < max_ncpus; c++) {
                int i;

                mutex_destroy(&tx->tx_cpu[c].tc_open_lock);
                mutex_destroy(&tx->tx_cpu[c].tc_lock);
                for (i = 0; i < TXG_SIZE; i++) {
                        cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
                        list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
                }
        }

        if (tx->tx_commit_cb_taskq != NULL)
                taskq_destroy(tx->tx_commit_cb_taskq);

        kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));

        bzero(tx, sizeof (tx_state_t));
}

/*
 * Start syncing transaction groups.
 */
void
txg_sync_start(dsl_pool_t *dp)
{
        tx_state_t *tx = &dp->dp_tx;

        mutex_enter(&tx->tx_sync_lock);

        dprintf("pool %p\n", dp);

        ASSERT0(tx->tx_threads);

        tx->tx_threads = 2;

        tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
            dp, 0, &p0, TS_RUN, minclsyspri);

        /*
         * The sync thread may need a larger-than-default stack size on
         * 32-bit x86.  This is due in part to nested pools and
         * scrub_visitbp() recursion.
         */
        tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread,
            dp, 0, &p0, TS_RUN, minclsyspri);

        mutex_exit(&tx->tx_sync_lock);
}

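/*
 * Register the calling worker thread with the CPR (suspend/resume)
 * framework and enter tx_sync_lock; the lock is held on return.
 */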
static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
        CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
        mutex_enter(&tx->tx_sync_lock);
}

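/*
 * Tear down a worker thread: clear its thread pointer, drop the thread
 * count, and wake anyone blocked in txg_sync_stop().  CALLB_CPR_EXIT()
 * drops tx_sync_lock before the thread exits.
 */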
static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
        ASSERT(*tpp != NULL);
        *tpp = NULL;
        tx->tx_threads--;
        cv_broadcast(&tx->tx_exit_cv);
        CALLB_CPR_EXIT(cpr);            /* drops &tx->tx_sync_lock */
        thread_exit();
}

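/*
 * Wait on the given condition variable, with an optional timeout in ticks,
 * while marked CPR-safe.  tx_sync_lock is held on entry and on return.
 */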
static void
txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, clock_t time)
{
        CALLB_CPR_SAFE_BEGIN(cpr);

        if (time)
                (void) cv_timedwait(cv, &tx->tx_sync_lock,
                    ddi_get_lbolt() + time);
        else
                cv_wait(cv, &tx->tx_sync_lock);

        CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
}

/*
 * Stop syncing transaction groups.
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
        tx_state_t *tx = &dp->dp_tx;

        dprintf("pool %p\n", dp);
        /*
         * Finish off any work in progress.
         */
        ASSERT3U(tx->tx_threads, ==, 2);

        /*
         * We need to ensure that we've vacated the deferred space_maps.
         */
        txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);

        /*
         * Wake all sync threads and wait for them to die.
         */
        mutex_enter(&tx->tx_sync_lock);

        ASSERT3U(tx->tx_threads, ==, 2);

        tx->tx_exiting = 1;

        cv_broadcast(&tx->tx_quiesce_more_cv);
        cv_broadcast(&tx->tx_quiesce_done_cv);
        cv_broadcast(&tx->tx_sync_more_cv);

        while (tx->tx_threads != 0)
                cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

        tx->tx_exiting = 0;

        mutex_exit(&tx->tx_sync_lock);
}

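/*
 * Take a hold on the currently open txg and return its number.  The caller
 * returns with this CPU's tc_open_lock held, which prevents the open txg
 * from advancing; txg_rele_to_quiesce() drops that lock, and
 * txg_rele_to_sync() releases the hold once the transaction's changes are
 * in place.
 */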
uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
        tx_state_t *tx = &dp->dp_tx;
        tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
        uint64_t txg;

        mutex_enter(&tc->tc_open_lock);
        txg = tx->tx_open_txg;

        mutex_enter(&tc->tc_lock);
        tc->tc_count[txg & TXG_MASK]++;
        mutex_exit(&tc->tc_lock);

        th->th_cpu = tc;
        th->th_txg = txg;

        return (txg);
}

void
txg_rele_to_quiesce(txg_handle_t *th)
{
        tx_cpu_t *tc = th->th_cpu;

        ASSERT(!MUTEX_HELD(&tc->tc_lock));
        mutex_exit(&tc->tc_open_lock);
}

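/*
 * Add the caller's commit callbacks to the per-CPU list for this txg; they
 * are dispatched to worker threads after the txg syncs.
 */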
void
txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
{
        tx_cpu_t *tc = th->th_cpu;
        int g = th->th_txg & TXG_MASK;

        mutex_enter(&tc->tc_lock);
        list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
        mutex_exit(&tc->tc_lock);
}

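/*
 * Release the hold taken in txg_hold_open().  When the last hold on this
 * txg is released, wake the quiesce thread waiting in txg_quiesce().
 */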
void
txg_rele_to_sync(txg_handle_t *th)
{
        tx_cpu_t *tc = th->th_cpu;
        int g = th->th_txg & TXG_MASK;

        mutex_enter(&tc->tc_lock);
        ASSERT(tc->tc_count[g] != 0);
        if (--tc->tc_count[g] == 0)
                cv_broadcast(&tc->tc_cv[g]);
        mutex_exit(&tc->tc_lock);

        th->th_cpu = NULL;   /* defensive */
}

/*
 * Blocks until all transactions in the group are committed.
 *
 * On return, the transaction group has reached a stable state in which it can
 * then be passed off to the syncing context.
 */
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
        tx_state_t *tx = &dp->dp_tx;
        int g = txg & TXG_MASK;
        int c;

        /*
         * Grab all tc_open_locks so nobody else can get into this txg.
         */
        for (c = 0; c < max_ncpus; c++)
                mutex_enter(&tx->tx_cpu[c].tc_open_lock);

        ASSERT(txg == tx->tx_open_txg);
        tx->tx_open_txg++;
        tx->tx_open_time = gethrtime();

        DTRACE_PROBE2(txg__quiescing, dsl_pool_t *, dp, uint64_t, txg);
        DTRACE_PROBE2(txg__opened, dsl_pool_t *, dp, uint64_t, tx->tx_open_txg);

        /*
         * Now that we've incremented tx_open_txg, we can let threads
         * enter the next transaction group.
         */
        for (c = 0; c < max_ncpus; c++)
                mutex_exit(&tx->tx_cpu[c].tc_open_lock);

        /*
         * Quiesce the transaction group by waiting for everyone to call
         * txg_rele_to_sync().
         */
        for (c = 0; c < max_ncpus; c++) {
                tx_cpu_t *tc = &tx->tx_cpu[c];
                mutex_enter(&tc->tc_lock);
                while (tc->tc_count[g] != 0)
                        cv_wait(&tc->tc_cv[g], &tc->tc_lock);
                mutex_exit(&tc->tc_lock);
        }
}

static void
txg_do_callbacks(list_t *cb_list)
{
        dmu_tx_do_callbacks(cb_list, 0);

        list_destroy(cb_list);

        kmem_free(cb_list, sizeof (list_t));
}

/*
 * Dispatch the commit callbacks registered on this txg to worker threads.
 *
 * If no callbacks are registered for a given TXG, nothing happens.
 * This function creates a taskq for the associated pool, if needed.
 */
static void
txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
{
        int c;
        tx_state_t *tx = &dp->dp_tx;
        list_t *cb_list;

        for (c = 0; c < max_ncpus; c++) {
                tx_cpu_t *tc = &tx->tx_cpu[c];
                /*
                 * No need to lock tx_cpu_t at this point, since this can
                 * only be called once a txg has been synced.
                 */

                int g = txg & TXG_MASK;

                if (list_is_empty(&tc->tc_callbacks[g]))
                        continue;

                if (tx->tx_commit_cb_taskq == NULL) {
                        /*
                         * Commit callback taskq hasn't been created yet.
                         */
                        tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
                            max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
                            TASKQ_PREPOPULATE);
                }

                cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
                list_create(cb_list, sizeof (dmu_tx_callback_t),
                    offsetof(dmu_tx_callback_t, dcb_node));

                list_move_tail(cb_list, &tc->tc_callbacks[g]);

                (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
                    txg_do_callbacks, cb_list, TQ_SLEEP);
        }
}

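/*
 * Worker thread that writes out dirty data.  It waits until there is reason
 * to sync (a scan is active, someone is waiting on a txg, enough dirty data
 * has accumulated, a quiesced txg has been handed off, or the timeout
 * expires), then consumes the quiesced txg and calls spa_sync() on it.
 */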
static void
txg_sync_thread(void *arg)
{
        dsl_pool_t *dp = arg;
        spa_t *spa = dp->dp_spa;
        tx_state_t *tx = &dp->dp_tx;
        callb_cpr_t cpr;
        uint64_t start, delta;

        txg_thread_enter(tx, &cpr);

        start = delta = 0;
        for (;;) {
                uint64_t timeout = zfs_txg_timeout * hz;
                uint64_t timer;
                uint64_t txg;

                /*
                 * We sync when we're scanning, there's someone waiting
                 * on us, the quiesce thread has handed off a txg to us,
                 * we have accumulated enough dirty data, or we have
                 * reached our timeout.
                 */
                timer = (delta >= timeout ? 0 : timeout - delta);
                while (!dsl_scan_active(dp->dp_scan) &&
                    !tx->tx_exiting && timer > 0 &&
                    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
                    tx->tx_quiesced_txg == 0 &&
                    dp->dp_dirty_total < zfs_dirty_data_sync) {
                        dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
                            tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
                        txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
                        delta = ddi_get_lbolt() - start;
                        timer = (delta > timeout ? 0 : timeout - delta);
                }

                /*
                 * Wait until the quiesce thread hands off a txg to us,
                 * prompting it to do so if necessary.
                 */
                while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
                        if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
                                tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
                        cv_broadcast(&tx->tx_quiesce_more_cv);
                        txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
                }

                if (tx->tx_exiting)
                        txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

                /*
                 * Consume the quiesced txg which has been handed off to
                 * us.  This may cause the quiescing thread to now be
                 * able to quiesce another txg, so we must signal it.
                 */
                txg = tx->tx_quiesced_txg;
                tx->tx_quiesced_txg = 0;
                tx->tx_syncing_txg = txg;
                DTRACE_PROBE2(txg__syncing, dsl_pool_t *, dp, uint64_t, txg);
                cv_broadcast(&tx->tx_quiesce_more_cv);

                dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
                    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
                mutex_exit(&tx->tx_sync_lock);

                start = ddi_get_lbolt();
                spa_sync(spa, txg);
                delta = ddi_get_lbolt() - start;

                mutex_enter(&tx->tx_sync_lock);
                tx->tx_synced_txg = txg;
                tx->tx_syncing_txg = 0;
                DTRACE_PROBE2(txg__synced, dsl_pool_t *, dp, uint64_t, txg);
                cv_broadcast(&tx->tx_sync_done_cv);

                /*
                 * Dispatch commit callbacks to worker threads.
                 */
                txg_dispatch_callbacks(dp, txg);
        }
}

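/*
 * Worker thread that quiesces the open txg when someone is waiting on it.
 * Only one txg may be quiescing or quiesced-and-waiting-to-sync at a time,
 * so it also waits for the sync thread to consume the previously quiesced
 * txg before handing off another.
 */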
static void
txg_quiesce_thread(void *arg)
{
        dsl_pool_t *dp = arg;
        tx_state_t *tx = &dp->dp_tx;
        callb_cpr_t cpr;

        txg_thread_enter(tx, &cpr);

        for (;;) {
                uint64_t txg;

                /*
                 * We quiesce when there's someone waiting on us.
                 * However, we can only have one txg in "quiescing" or
                 * "quiesced, waiting to sync" state.  So we wait until
                 * the "quiesced, waiting to sync" txg has been consumed
                 * by the sync thread.
                 */
                while (!tx->tx_exiting &&
                    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
                    tx->tx_quiesced_txg != 0))
                        txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

                if (tx->tx_exiting)
                        txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

                txg = tx->tx_open_txg;
                dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
                    txg, tx->tx_quiesce_txg_waiting,
                    tx->tx_sync_txg_waiting);
                mutex_exit(&tx->tx_sync_lock);
                txg_quiesce(dp, txg);
                mutex_enter(&tx->tx_sync_lock);

                /*
                 * Hand this txg off to the sync thread.
                 */
                dprintf("quiesce done, handing off txg %llu\n", txg);
                tx->tx_quiesced_txg = txg;
                DTRACE_PROBE2(txg__quiesced, dsl_pool_t *, dp, uint64_t, txg);
                cv_broadcast(&tx->tx_sync_more_cv);
                cv_broadcast(&tx->tx_quiesce_done_cv);
        }
}

/*
 * Delay this thread by delay nanoseconds if we are still in the open
 * transaction group and there is already a waiting txg quiescing or quiesced.
 * Abort the delay if this txg stalls or enters the quiescing state.
 */
void
txg_delay(dsl_pool_t *dp, uint64_t txg, hrtime_t delay, hrtime_t resolution)
{
        tx_state_t *tx = &dp->dp_tx;
        hrtime_t start = gethrtime();

        /* don't delay if this txg could transition to quiescing immediately */
        if (tx->tx_open_txg > txg ||
            tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
                return;

        mutex_enter(&tx->tx_sync_lock);
        if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
                mutex_exit(&tx->tx_sync_lock);
                return;
        }

        while (gethrtime() - start < delay &&
            tx->tx_syncing_txg < txg-1 && !txg_stalled(dp)) {
                (void) cv_timedwait_hires(&tx->tx_quiesce_more_cv,
                    &tx->tx_sync_lock, delay, resolution, 0);
        }

        mutex_exit(&tx->tx_sync_lock);
}

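/*
 * Block until the given txg (or, if txg is zero, the currently open txg
 * plus TXG_DEFER_SIZE) has been synced to disk, prodding the sync thread
 * as needed.
 */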
void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
        tx_state_t *tx = &dp->dp_tx;

        ASSERT(!dsl_pool_config_held(dp));

        mutex_enter(&tx->tx_sync_lock);
        ASSERT3U(tx->tx_threads, ==, 2);
        if (txg == 0)
                txg = tx->tx_open_txg + TXG_DEFER_SIZE;
        if (tx->tx_sync_txg_waiting < txg)
                tx->tx_sync_txg_waiting = txg;
        dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
            txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
        while (tx->tx_synced_txg < txg) {
                dprintf("broadcasting sync more "
                    "tx_synced=%llu waiting=%llu dp=%p\n",
                    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
                cv_broadcast(&tx->tx_sync_more_cv);
                cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
        }
        mutex_exit(&tx->tx_sync_lock);
}

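/*
 * Block until the given txg (or, if txg is zero, the txg after the one
 * currently open) has become the open txg, prodding the quiesce thread as
 * needed.
 */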
void
txg_wait_open(dsl_pool_t *dp, uint64_t txg)
{
        tx_state_t *tx = &dp->dp_tx;

        ASSERT(!dsl_pool_config_held(dp));

        mutex_enter(&tx->tx_sync_lock);
        ASSERT3U(tx->tx_threads, ==, 2);
        if (txg == 0)
                txg = tx->tx_open_txg + 1;
        if (tx->tx_quiesce_txg_waiting < txg)
                tx->tx_quiesce_txg_waiting = txg;
        dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
            txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
        while (tx->tx_open_txg < txg) {
                cv_broadcast(&tx->tx_quiesce_more_cv);
                cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
        }
        mutex_exit(&tx->tx_sync_lock);
}

/*
 * If there isn't a txg syncing or in the pipeline, push another txg through
 * the pipeline by quiescing the open txg.
 */
void
txg_kick(dsl_pool_t *dp)
{
        tx_state_t *tx = &dp->dp_tx;

        ASSERT(!dsl_pool_config_held(dp));

        mutex_enter(&tx->tx_sync_lock);
        if (tx->tx_syncing_txg == 0 &&
            tx->tx_quiesce_txg_waiting <= tx->tx_open_txg &&
            tx->tx_sync_txg_waiting <= tx->tx_synced_txg &&
            tx->tx_quiesced_txg <= tx->tx_synced_txg) {
                tx->tx_quiesce_txg_waiting = tx->tx_open_txg + 1;
                cv_broadcast(&tx->tx_quiesce_more_cv);
        }
        mutex_exit(&tx->tx_sync_lock);
}

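/*
 * Returns true if someone is waiting for the open txg to be quiesced but it
 * has not happened yet, i.e. the pipeline is stalled on the open
 * transaction group.
 */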
boolean_t
txg_stalled(dsl_pool_t *dp)
{
        tx_state_t *tx = &dp->dp_tx;
        return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
}

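/*
 * Returns true if the sync thread has work pending: either the txg
 * currently syncing has not yet passed the highest txg someone is waiting
 * on, or a quiesced txg is waiting to be synced.
 */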
boolean_t
txg_sync_waiting(dsl_pool_t *dp)
{
        tx_state_t *tx = &dp->dp_tx;

        return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
            tx->tx_quiesced_txg != 0);
}

/*
 * Verify that this txg is active (open, quiescing, syncing).  Non-active
 * txgs should not be manipulated.
 */
void
txg_verify(spa_t *spa, uint64_t txg)
{
        dsl_pool_t *dp = spa_get_dsl(spa);
        if (txg <= TXG_INITIAL || txg == ZILTEST_TXG)
                return;
        ASSERT3U(txg, <=, dp->dp_tx.tx_open_txg);
        ASSERT3U(txg, >=, dp->dp_tx.tx_synced_txg);
        ASSERT3U(txg, >=, dp->dp_tx.tx_open_txg - TXG_CONCURRENT_STATES);
}

/*
 * Per-txg object lists.
 */
void
txg_list_create(txg_list_t *tl, spa_t *spa, size_t offset)
{
        int t;

        mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);

        tl->tl_offset = offset;
        tl->tl_spa = spa;

        for (t = 0; t < TXG_SIZE; t++)
                tl->tl_head[t] = NULL;
}

void
txg_list_destroy(txg_list_t *tl)
{
        int t;

        for (t = 0; t < TXG_SIZE; t++)
                ASSERT(txg_list_empty(tl, t));

        mutex_destroy(&tl->tl_lock);
}

boolean_t
txg_list_empty(txg_list_t *tl, uint64_t txg)
{
        txg_verify(tl->tl_spa, txg);
        return (tl->tl_head[txg & TXG_MASK] == NULL);
}

/*
 * Returns true if all txg lists are empty.
 *
 * Warning: this is inherently racy (an item could be added immediately
 * after this function returns). We don't bother with the lock because
 * it wouldn't change the semantics.
 */
boolean_t
txg_all_lists_empty(txg_list_t *tl)
{
        for (int i = 0; i < TXG_SIZE; i++) {
                if (!txg_list_empty(tl, i)) {
                        return (B_FALSE);
                }
        }
        return (B_TRUE);
}

/*
 * Add an entry to the list (unless it's already on the list).
 * Returns B_TRUE if it was actually added.
 */
boolean_t
txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
{
        int t = txg & TXG_MASK;
        txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
        boolean_t add;

        txg_verify(tl->tl_spa, txg);
        mutex_enter(&tl->tl_lock);
        add = (tn->tn_member[t] == 0);
        if (add) {
                tn->tn_member[t] = 1;
                tn->tn_next[t] = tl->tl_head[t];
                tl->tl_head[t] = tn;
        }
        mutex_exit(&tl->tl_lock);

        return (add);
}

/*
 * Add an entry to the end of the list, unless it's already on the list.
 * (walks list to find end)
 * Returns B_TRUE if it was actually added.
 */
boolean_t
txg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg)
{
        int t = txg & TXG_MASK;
        txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
        boolean_t add;

        txg_verify(tl->tl_spa, txg);
        mutex_enter(&tl->tl_lock);
        add = (tn->tn_member[t] == 0);
        if (add) {
                txg_node_t **tp;

                for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t])
                        continue;

                tn->tn_member[t] = 1;
                tn->tn_next[t] = NULL;
                *tp = tn;
        }
        mutex_exit(&tl->tl_lock);

        return (add);
}

/*
 * Remove the head of the list and return it.
 */
void *
txg_list_remove(txg_list_t *tl, uint64_t txg)
{
        int t = txg & TXG_MASK;
        txg_node_t *tn;
        void *p = NULL;

        txg_verify(tl->tl_spa, txg);
        mutex_enter(&tl->tl_lock);
        if ((tn = tl->tl_head[t]) != NULL) {
                ASSERT(tn->tn_member[t]);
                ASSERT(tn->tn_next[t] == NULL || tn->tn_next[t]->tn_member[t]);
                p = (char *)tn - tl->tl_offset;
                tl->tl_head[t] = tn->tn_next[t];
                tn->tn_next[t] = NULL;
                tn->tn_member[t] = 0;
        }
        mutex_exit(&tl->tl_lock);

        return (p);
}

/*
 * Remove a specific item from the list and return it.
 */
void *
txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
{
        int t = txg & TXG_MASK;
        txg_node_t *tn, **tp;

        txg_verify(tl->tl_spa, txg);
        mutex_enter(&tl->tl_lock);

        for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
                if ((char *)tn - tl->tl_offset == p) {
                        *tp = tn->tn_next[t];
                        tn->tn_next[t] = NULL;
                        tn->tn_member[t] = 0;
                        mutex_exit(&tl->tl_lock);
                        return (p);
                }
        }

        mutex_exit(&tl->tl_lock);

        return (NULL);
}

boolean_t
txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
{
        int t = txg & TXG_MASK;
        txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

        txg_verify(tl->tl_spa, txg);
        return (tn->tn_member[t] != 0);
}

/*
 * Walk a txg list -- only safe if you know it's not changing.
 */
void *
txg_list_head(txg_list_t *tl, uint64_t txg)
{
        int t = txg & TXG_MASK;
        txg_node_t *tn = tl->tl_head[t];

        txg_verify(tl->tl_spa, txg);
        return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}

void *
txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
{
        int t = txg & TXG_MASK;
        txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

        txg_verify(tl->tl_spa, txg);
        tn = tn->tn_next[t];

        return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}