NEX-20218 Backport Illumos #9464 txg_kick() fails to see that we are quiescing, forcing transactions to their next stages without leaving them accumulate changes
MFV illumos-gate@fa41d87de9ec9000964c605eb01d6dc19e4a1abe
    9464 txg_kick() fails to see that we are quiescing, forcing transactions to their next stages without leaving them accumulate changes
    Reviewed by: Matt Ahrens <matt@delphix.com>
    Reviewed by: Brad Lewis <brad.lewis@delphix.com>
    Reviewed by: Andriy Gapon <avg@FreeBSD.org>
    Approved by: Dan McDonald <danmcd@joyent.com>
NEX-6859 TX-commit callback that is registered in sync-ctx causes system panic
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>


   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  23  * Portions Copyright 2011 Martin Matuska
  24  * Copyright (c) 2012, 2017 by Delphix. All rights reserved.
  25  */
  26 
  27 #include <sys/zfs_context.h>
  28 #include <sys/txg_impl.h>
  29 #include <sys/dmu_impl.h>
  30 #include <sys/dmu_tx.h>
  31 #include <sys/dsl_pool.h>
  32 #include <sys/dsl_scan.h>
  33 #include <sys/zil.h>
  34 #include <sys/callb.h>
  35 
  36 /*
  37  * ZFS Transaction Groups
  38  * ----------------------
  39  *
  40  * ZFS transaction groups are, as the name implies, groups of transactions
  41  * that act on persistent state. ZFS asserts consistency at the granularity of
  42  * these transaction groups. Each successive transaction group (txg) is
  43  * assigned a 64-bit consecutive identifier. There are three active
  44  * transaction group states: open, quiescing, or syncing. At any given time,


  87  *
  88  * To ensure convergence, after a certain number of passes ZFS begins
  89  * overwriting locations on stable storage that had been allocated earlier in
  90  * the syncing state (and subsequently freed). ZFS usually allocates new
  91  * blocks to optimize for large, continuous writes. For the syncing state to
  92  * converge, however, it must complete a pass where no new blocks are allocated,
  93  * since each allocation requires a modification of persistent metadata.
  94  * Further, to hasten convergence, after a prescribed number of passes, ZFS
  95  * also defers frees, and stops compressing.
  96  *
  97  * In addition to writing out user data, we must also execute synctasks during
  98  * the syncing context. A synctask is the mechanism by which some
  99  * administrative activities work, such as creating and destroying snapshots or
 100  * datasets. Note that when a synctask is initiated it enters the open txg,
 101  * and ZFS then pushes that txg as quickly as possible to completion of the
 102  * syncing state in order to reduce the latency of the administrative
 103  * activity. To complete the syncing state, ZFS writes out a new uberblock,
 104  * the root of the tree of blocks that comprise all state stored on the ZFS
 105  * pool. Finally, if there is a quiesced txg waiting, we signal that it can
 106  * now transition to the syncing state.
 107  */
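
/*
 * Illustrative sketch (not part of this file): because txg numbers are
 * consecutive 64-bit values, per-txg state is kept in small arrays of
 * TXG_SIZE entries (sys/txg.h) indexed by "txg & TXG_MASK", one slot per
 * txg that can be in flight at once; txg_register_callbacks() below uses
 * exactly this idiom.
 */
static inline int
txg_ring_slot(uint64_t txg)
{
        return (txg & TXG_MASK);        /* with TXG_SIZE 4, txg 1027 -> slot 3 */
}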
 108 
 109 static void txg_sync_thread(void *arg);
 110 static void txg_quiesce_thread(void *arg);
 111 
 112 int zfs_txg_timeout = 5;        /* max seconds worth of delta per txg */
 113 
 114 /*
 115  * Prepare the txg subsystem.
 116  */
 117 void
 118 txg_init(dsl_pool_t *dp, uint64_t txg)
 119 {
 120         tx_state_t *tx = &dp->dp_tx;
 121         int c;
 122         bzero(tx, sizeof (tx_state_t));
 123 
 124         tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
 125 
 126         for (c = 0; c < max_ncpus; c++) {


 312 void
 313 txg_rele_to_quiesce(txg_handle_t *th)
 314 {
 315         tx_cpu_t *tc = th->th_cpu;
 316 
 317         ASSERT(!MUTEX_HELD(&tc->tc_lock));
 318         mutex_exit(&tc->tc_open_lock);
 319 }
 320 
 321 void
 322 txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
 323 {
 324         tx_cpu_t *tc = th->th_cpu;
 325         int g = th->th_txg & TXG_MASK;
 326 
 327         mutex_enter(&tc->tc_lock);
 328         list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
 329         mutex_exit(&tc->tc_lock);
 330 }
 331 

 332 void
 333 txg_rele_to_sync(txg_handle_t *th)
 334 {
 335         tx_cpu_t *tc = th->th_cpu;
 336         int g = th->th_txg & TXG_MASK;
 337 
 338         mutex_enter(&tc->tc_lock);
 339         ASSERT(tc->tc_count[g] != 0);
 340         if (--tc->tc_count[g] == 0)
 341                 cv_broadcast(&tc->tc_cv[g]);
 342         mutex_exit(&tc->tc_lock);
 343 
 344         th->th_cpu = NULL;   /* defensive */
 345 }
 346 
 347 /*
 348  * Blocks until all transactions in the group are committed.
 349  *
 350  * On return, the transaction group has reached a stable state in which it can
 351  * then be passed off to the syncing context.
 352  */


 427                 if (tx->tx_commit_cb_taskq == NULL) {
 428                         /*
 429                          * Commit callback taskq hasn't been created yet.
 430                          */
 431                         tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
 432                             max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
 433                             TASKQ_PREPOPULATE);
 434                 }
 435 
 436                 cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
 437                 list_create(cb_list, sizeof (dmu_tx_callback_t),
 438                     offsetof(dmu_tx_callback_t, dcb_node));
 439 
 440                 list_move_tail(cb_list, &tc->tc_callbacks[g]);
 441 
 442                 (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
 443                     txg_do_callbacks, cb_list, TQ_SLEEP);
 444         }
 445 }
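
/*
 * For reference (elided from this excerpt): in illumos the dispatched
 * routine is essentially the following; it runs the callbacks with
 * error 0 and then frees the list allocated above.
 */
static void
txg_do_callbacks(list_t *cb_list)
{
        dmu_tx_do_callbacks(cb_list, 0);

        list_destroy(cb_list);
        kmem_free(cb_list, sizeof (list_t));
}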
 446 
 447 static void
 448 txg_sync_thread(void *arg)
 449 {
 450         dsl_pool_t *dp = arg;
 451         spa_t *spa = dp->dp_spa;
 452         tx_state_t *tx = &dp->dp_tx;
 453         callb_cpr_t cpr;
 454         uint64_t start, delta;
 455 
 456         txg_thread_enter(tx, &cpr);
 457 
 458         start = delta = 0;
 459         for (;;) {
 460                 uint64_t timeout = zfs_txg_timeout * hz;
 461                 uint64_t timer;
 462                 uint64_t txg;
 463 
 464                 /*
 465                  * We sync when we're scanning, there's someone waiting
 466                  * on us, or the quiesce thread has handed off a txg to
 467                  * us, or we have reached our timeout.
 468                  */
 469                 timer = (delta >= timeout ? 0 : timeout - delta);
 470                 while (!dsl_scan_active(dp->dp_scan) &&
 471                     !tx->tx_exiting && timer > 0 &&
 472                     tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
 473                     tx->tx_quiesced_txg == 0 &&
 474                     dp->dp_dirty_total < zfs_dirty_data_sync) {
 475                         dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
 476                             tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
 477                         txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
 478                         delta = ddi_get_lbolt() - start;
 479                         timer = (delta > timeout ? 0 : timeout - delta);
 480                 }
 481 
 482                 /*
 483                  * Wait until the quiesce thread hands off a txg to us,
 484                  * prompting it to do so if necessary.
 485                  */
 486                 while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
 487                         if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
 488                                 tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
 489                         cv_broadcast(&tx->tx_quiesce_more_cv);
 490                         txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
 491                 }
 492 
 493                 if (tx->tx_exiting)
 494                         txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);
 495 
 496                 /*
 497                  * Consume the quiesced txg which has been handed off to
 498                  * us.  This may cause the quiescing thread to now be
 499                  * able to quiesce another txg, so we must signal it.
 500                  */
 501                 txg = tx->tx_quiesced_txg;
 502                 tx->tx_quiesced_txg = 0;
 503                 tx->tx_syncing_txg = txg;
 504                 DTRACE_PROBE2(txg__syncing, dsl_pool_t *, dp, uint64_t, txg);
 505                 cv_broadcast(&tx->tx_quiesce_more_cv);
 506 
 507                 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
 508                     txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
 509                 mutex_exit(&tx->tx_sync_lock);
 510 
 511                 start = ddi_get_lbolt();
 512                 spa_sync(spa, txg);
 513                 delta = ddi_get_lbolt() - start;
 514 
 515                 mutex_enter(&tx->tx_sync_lock);
 516                 tx->tx_synced_txg = txg;
 517                 tx->tx_syncing_txg = 0;
 518                 DTRACE_PROBE2(txg__synced, dsl_pool_t *, dp, uint64_t, txg);
 519                 cv_broadcast(&tx->tx_sync_done_cv);
 520 


 529 txg_quiesce_thread(void *arg)
 530 {
 531         dsl_pool_t *dp = arg;
 532         tx_state_t *tx = &dp->dp_tx;
 533         callb_cpr_t cpr;
 534 
 535         txg_thread_enter(tx, &cpr);
 536 
 537         for (;;) {
 538                 uint64_t txg;
 539 
 540                 /*
 541                  * We quiesce when there's someone waiting on us.
 542                  * However, we can only have one txg in "quiescing" or
 543                  * "quiesced, waiting to sync" state.  So we wait until
 544                  * the "quiesced, waiting to sync" txg has been consumed
 545                  * by the sync thread.
 546                  */
 547                 while (!tx->tx_exiting &&
 548                     (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
 549                     tx->tx_quiesced_txg != 0))
 550                         txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);
 551 
 552                 if (tx->tx_exiting)
 553                         txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);
 554 
 555                 txg = tx->tx_open_txg;
 556                 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
 557                     txg, tx->tx_quiesce_txg_waiting,
 558                     tx->tx_sync_txg_waiting);
 559                 mutex_exit(&tx->tx_sync_lock);
 560                 txg_quiesce(dp, txg);
 561                 mutex_enter(&tx->tx_sync_lock);
 562 
 563                 /*
 564                  * Hand this txg off to the sync thread.
 565                  */
 566                 dprintf("quiesce done, handing off txg %llu\n", txg);
 567                 tx->tx_quiesced_txg = txg;
 568                 DTRACE_PROBE2(txg__quiesced, dsl_pool_t *, dp, uint64_t, txg);
 569                 cv_broadcast(&tx->tx_sync_more_cv);
 570                 cv_broadcast(&tx->tx_quiesce_done_cv);
 571         }
 572 }
 573 
 574 /*
 575  * Delay this thread by delay nanoseconds if we are still in the open
 576  * transaction group and there is already a waiting txg quiescing or quiesced.
 577  * Abort the delay if this txg stalls or enters the quiescing state.
 578  */
 579 void
 580 txg_delay(dsl_pool_t *dp, uint64_t txg, hrtime_t delay, hrtime_t resolution)
 581 {
 582         tx_state_t *tx = &dp->dp_tx;
 583         hrtime_t start = gethrtime();
 584 
 585         /* don't delay if this txg could transition to quiescing immediately */
 586         if (tx->tx_open_txg > txg ||


 644             txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
 645         while (tx->tx_open_txg < txg) {
 646                 cv_broadcast(&tx->tx_quiesce_more_cv);
 647                 cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
 648         }
 649         mutex_exit(&tx->tx_sync_lock);
 650 }
 651 
 652 /*
 653  * If there isn't a txg syncing or in the pipeline, push another txg through
 654  * the pipeline by quiescing the open txg.
 655  */
 656 void
 657 txg_kick(dsl_pool_t *dp)
 658 {
 659         tx_state_t *tx = &dp->dp_tx;
 660 
 661         ASSERT(!dsl_pool_config_held(dp));
 662 
 663         mutex_enter(&tx->tx_sync_lock);
 664         if (tx->tx_syncing_txg == 0 &&
 665             tx->tx_quiesce_txg_waiting <= tx->tx_open_txg &&
 666             tx->tx_sync_txg_waiting <= tx->tx_synced_txg &&
 667             tx->tx_quiesced_txg <= tx->tx_synced_txg) {
 668                 tx->tx_quiesce_txg_waiting = tx->tx_open_txg + 1;
 669                 cv_broadcast(&tx->tx_quiesce_more_cv);
 670         }
 671         mutex_exit(&tx->tx_sync_lock);
 672 }
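
/*
 * Note (per the illumos 9464 backport above): this pre-fix condition can
 * see a fully quiesced txg (tx_quiesced_txg) but not one that is still
 * quiescing, so txg_kick() could force the open txg to its next stage
 * without letting it accumulate changes.  The updated version below
 * tracks tx_quiescing_txg and checks txg_is_quiescing() as well.
 */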
 673 
 674 boolean_t
 675 txg_stalled(dsl_pool_t *dp)
 676 {
 677         tx_state_t *tx = &dp->dp_tx;
 678         return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
 679 }
 680 
 681 boolean_t
 682 txg_sync_waiting(dsl_pool_t *dp)
 683 {
 684         tx_state_t *tx = &dp->dp_tx;


 805                 *tp = tn;
 806         }
 807         mutex_exit(&tl->tl_lock);
 808 
 809         return (add);
 810 }
 811 
 812 /*
 813  * Remove the head of the list and return it.
 814  */
 815 void *
 816 txg_list_remove(txg_list_t *tl, uint64_t txg)
 817 {
 818         int t = txg & TXG_MASK;
 819         txg_node_t *tn;
 820         void *p = NULL;
 821 
 822         txg_verify(tl->tl_spa, txg);
 823         mutex_enter(&tl->tl_lock);
 824         if ((tn = tl->tl_head[t]) != NULL) {
 825                 ASSERT(tn->tn_member[t]);
 826                 ASSERT(tn->tn_next[t] == NULL || tn->tn_next[t]->tn_member[t]);
 827                 p = (char *)tn - tl->tl_offset;
 828                 tl->tl_head[t] = tn->tn_next[t];
 829                 tn->tn_next[t] = NULL;
 830                 tn->tn_member[t] = 0;
 831         }
 832         mutex_exit(&tl->tl_lock);
 833 
 834         return (p);
 835 }
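
/*
 * Usage sketch (not part of this file): consumers typically drain a
 * txg_list for the txg being synced.  sync_dirty_datasets() is a
 * hypothetical wrapper mirroring the pattern used by dsl_pool_sync().
 */
static void
sync_dirty_datasets(dsl_pool_t *dp, zio_t *zio, dmu_tx_t *tx, uint64_t txg)
{
        dsl_dataset_t *ds;

        /* each call pops one member registered for this txg */
        while ((ds = txg_list_remove(&dp->dp_dirty_datasets, txg)) != NULL)
                dsl_dataset_sync(ds, zio, tx);
}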
 836 
 837 /*
 838  * Remove a specific item from the list and return it.
 839  */
 840 void *
 841 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
 842 {
 843         int t = txg & TXG_MASK;
 844         txg_node_t *tn, **tp;
 845 
 846         txg_verify(tl->tl_spa, txg);


   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  23  * Portions Copyright 2011 Martin Matuska
  24  * Copyright (c) 2012, 2017 by Delphix. All rights reserved.
  25  * Copyright 2016 Nexenta Systems, Inc.  All rights reserved.
  26  */
  27 
  28 #include <sys/zfs_context.h>
  29 #include <sys/txg_impl.h>
  30 #include <sys/dmu_impl.h>
  31 #include <sys/dmu_tx.h>
  32 #include <sys/dsl_pool.h>
  33 #include <sys/dsl_scan.h>
  34 #include <sys/zil.h>
  35 #include <sys/callb.h>
  36 
  37 /*
  38  * ZFS Transaction Groups
  39  * ----------------------
  40  *
  41  * ZFS transaction groups are, as the name implies, groups of transactions
  42  * that act on persistent state. ZFS asserts consistency at the granularity of
  43  * these transaction groups. Each successive transaction group (txg) is
  44  * assigned a 64-bit consecutive identifier. There are three active
  45  * transaction group states: open, quiescing, or syncing. At any given time,


  88  *
  89  * To ensure convergence, after a certain number of passes ZFS begins
  90  * overwriting locations on stable storage that had been allocated earlier in
  91  * the syncing state (and subsequently freed). ZFS usually allocates new
  92  * blocks to optimize for large, continuous writes. For the syncing state to
  93  * converge, however, it must complete a pass where no new blocks are allocated,
  94  * since each allocation requires a modification of persistent metadata.
  95  * Further, to hasten convergence, after a prescribed number of passes, ZFS
  96  * also defers frees, and stops compressing.
  97  *
  98  * In addition to writing out user data, we must also execute synctasks during
  99  * the syncing context. A synctask is the mechanism by which some
 100  * administrative activities work, such as creating and destroying snapshots or
 101  * datasets. Note that when a synctask is initiated it enters the open txg,
 102  * and ZFS then pushes that txg as quickly as possible to completion of the
 103  * syncing state in order to reduce the latency of the administrative
 104  * activity. To complete the syncing state, ZFS writes out a new uberblock,
 105  * the root of the tree of blocks that comprise all state stored on the ZFS
 106  * pool. Finally, if there is a quiesced txg waiting, we signal that it can
 107  * now transition to the syncing state.
 108  *
 109  * It is possible to register a callback for a TX, so that the callback is
 110  * invoked after the corresponding TX-group has been synced to disk.
 111  * The callback and its optional argument are registered using
 112  * dmu_tx_callback_register().
 113  * All callbacks are executed asynchronously via a taskq (see
 114  * txg_dispatch_callbacks); a registered callback is called in two cases:
 115  *  1) the corresponding TX is committed to disk (the first arg is 0)
 116  *  2) the corresponding TX is aborted (the first arg is ECANCELED)
 117  */
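
/*
 * Usage sketch (not part of this file): a caller registers a commit
 * callback on an assigned transaction; my_done_cb and my_write are
 * hypothetical caller-side functions.
 */
static void
my_done_cb(void *arg, int error)
{
        /* error is 0 on commit, ECANCELED if the TX was aborted */
}

static void
my_write(objset_t *os)
{
        dmu_tx_t *tx = dmu_tx_create(os);

        VERIFY0(dmu_tx_assign(tx, TXG_WAIT));
        dmu_tx_callback_register(tx, my_done_cb, NULL);
        /* ... dirty the objects covered by this tx ... */
        dmu_tx_commit(tx);      /* my_done_cb runs after this txg syncs */
}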
 118 
 119 static void txg_sync_thread(void *arg);
 120 static void txg_quiesce_thread(void *arg);
 121 
 122 int zfs_txg_timeout = 5;        /* max seconds worth of delta per txg */
 123 
 124 /*
 125  * Prepare the txg subsystem.
 126  */
 127 void
 128 txg_init(dsl_pool_t *dp, uint64_t txg)
 129 {
 130         tx_state_t *tx = &dp->dp_tx;
 131         int c;
 132         bzero(tx, sizeof (tx_state_t));
 133 
 134         tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
 135 
 136         for (c = 0; c < max_ncpus; c++) {


 322 void
 323 txg_rele_to_quiesce(txg_handle_t *th)
 324 {
 325         tx_cpu_t *tc = th->th_cpu;
 326 
 327         ASSERT(!MUTEX_HELD(&tc->tc_lock));
 328         mutex_exit(&tc->tc_open_lock);
 329 }
 330 
 331 void
 332 txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
 333 {
 334         tx_cpu_t *tc = th->th_cpu;
 335         int g = th->th_txg & TXG_MASK;
 336 
 337         mutex_enter(&tc->tc_lock);
 338         list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
 339         mutex_exit(&tc->tc_lock);
 340 }
 341 
 342 /* This register function can be called only from sync-context */
 343 void
 344 txg_register_callbacks_sync(dsl_pool_t *dp, uint64_t txg, list_t *tx_callbacks)
 345 {
 346         tx_state_t *tx = &dp->dp_tx;
 347         tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
 348         txg_handle_t th;
 349 
 350         VERIFY3U(tx->tx_syncing_txg, ==, txg);
 351 
 352         th.th_cpu = tc;
 353         th.th_txg = txg;
 354 
 355         txg_register_callbacks(&th, tx_callbacks);
 356 }
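
/*
 * Usage sketch (not part of this file): per NEX-6859 this variant is safe
 * to call from syncing context, where no txg_handle_t is held.
 * my_synctask and its callback list are hypothetical; the txg is taken
 * from the dmu_tx_t being synced.
 */
static void
my_synctask(void *arg, dmu_tx_t *tx)
{
        list_t *cbs = arg;      /* list of dmu_tx_callback_t built by the caller */

        txg_register_callbacks_sync(dmu_tx_pool(tx), dmu_tx_get_txg(tx), cbs);
}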
 357 
 358 void
 359 txg_rele_to_sync(txg_handle_t *th)
 360 {
 361         tx_cpu_t *tc = th->th_cpu;
 362         int g = th->th_txg & TXG_MASK;
 363 
 364         mutex_enter(&tc->tc_lock);
 365         ASSERT(tc->tc_count[g] != 0);
 366         if (--tc->tc_count[g] == 0)
 367                 cv_broadcast(&tc->tc_cv[g]);
 368         mutex_exit(&tc->tc_lock);
 369 
 370         th->th_cpu = NULL;   /* defensive */
 371 }
 372 
 373 /*
 374  * Blocks until all transactions in the group are committed.
 375  *
 376  * On return, the transaction group has reached a stable state in which it can
 377  * then be passed off to the syncing context.
 378  */


 453                 if (tx->tx_commit_cb_taskq == NULL) {
 454                         /*
 455                          * Commit callback taskq hasn't been created yet.
 456                          */
 457                         tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
 458                             max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
 459                             TASKQ_PREPOPULATE);
 460                 }
 461 
 462                 cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
 463                 list_create(cb_list, sizeof (dmu_tx_callback_t),
 464                     offsetof(dmu_tx_callback_t, dcb_node));
 465 
 466                 list_move_tail(cb_list, &tc->tc_callbacks[g]);
 467 
 468                 (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
 469                     txg_do_callbacks, cb_list, TQ_SLEEP);
 470         }
 471 }
 472 
 473 static boolean_t
 474 txg_is_syncing(dsl_pool_t *dp)
 475 {
 476         tx_state_t *tx = &dp->dp_tx;
 477         ASSERT(MUTEX_HELD(&tx->tx_sync_lock));
 478         return (tx->tx_syncing_txg != 0);
 479 }
 480 
 481 static boolean_t
 482 txg_is_quiescing(dsl_pool_t *dp)
 483 {
 484         tx_state_t *tx = &dp->dp_tx;
 485         ASSERT(MUTEX_HELD(&tx->tx_sync_lock));
 486         return (tx->tx_quiescing_txg != 0);
 487 }
 488 
 489 static boolean_t
 490 txg_has_quiesced_to_sync(dsl_pool_t *dp)
 491 {
 492         tx_state_t *tx = &dp->dp_tx;
 493         ASSERT(MUTEX_HELD(&tx->tx_sync_lock));
 494         return (tx->tx_quiesced_txg != 0);
 495 }
 496 
 497 static void
 498 txg_sync_thread(void *arg)
 499 {
 500         dsl_pool_t *dp = arg;
 501         spa_t *spa = dp->dp_spa;
 502         tx_state_t *tx = &dp->dp_tx;
 503         callb_cpr_t cpr;
 504         uint64_t start, delta;
 505 
 506         txg_thread_enter(tx, &cpr);
 507 
 508         start = delta = 0;
 509         for (;;) {
 510                 uint64_t timeout = zfs_txg_timeout * hz;
 511                 uint64_t timer;
 512                 uint64_t txg;
 513 
 514                 /*
 515                  * We sync when we're scanning, there's someone waiting
 516                  * on us, or the quiesce thread has handed off a txg to
 517                  * us, or we have reached our timeout.
 518                  */
 519                 timer = (delta >= timeout ? 0 : timeout - delta);
 520                 while (!dsl_scan_active(dp->dp_scan) &&
 521                     !tx->tx_exiting && timer > 0 &&
 522                     tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
 523                     !txg_has_quiesced_to_sync(dp) &&
 524                     dp->dp_dirty_total < zfs_dirty_data_sync) {
 525                         dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
 526                             tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
 527                         txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
 528                         delta = ddi_get_lbolt() - start;
 529                         timer = (delta > timeout ? 0 : timeout - delta);
 530                 }
 531 
 532                 /*
 533                  * Wait until the quiesce thread hands off a txg to us,
 534                  * prompting it to do so if necessary.
 535                  */
 536                 while (!tx->tx_exiting && !txg_has_quiesced_to_sync(dp)) {
 537                         if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
 538                                 tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
 539                         cv_broadcast(&tx->tx_quiesce_more_cv);
 540                         txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
 541                 }
 542 
 543                 if (tx->tx_exiting)
 544                         txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);
 545 
 546                 /*
 547                  * Consume the quiesced txg which has been handed off to
 548                  * us.  This may cause the quiescing thread to now be
 549                  * able to quiesce another txg, so we must signal it.
 550                  */
 551                 ASSERT(tx->tx_quiesced_txg != 0);
 552                 txg = tx->tx_quiesced_txg;
 553                 tx->tx_quiesced_txg = 0;
 554                 tx->tx_syncing_txg = txg;
 555                 DTRACE_PROBE2(txg__syncing, dsl_pool_t *, dp, uint64_t, txg);
 556                 cv_broadcast(&tx->tx_quiesce_more_cv);
 557 
 558                 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
 559                     txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
 560                 mutex_exit(&tx->tx_sync_lock);
 561 
 562                 start = ddi_get_lbolt();
 563                 spa_sync(spa, txg);
 564                 delta = ddi_get_lbolt() - start;
 565 
 566                 mutex_enter(&tx->tx_sync_lock);
 567                 tx->tx_synced_txg = txg;
 568                 tx->tx_syncing_txg = 0;
 569                 DTRACE_PROBE2(txg__synced, dsl_pool_t *, dp, uint64_t, txg);
 570                 cv_broadcast(&tx->tx_sync_done_cv);
 571 


 580 txg_quiesce_thread(void *arg)
 581 {
 582         dsl_pool_t *dp = arg;
 583         tx_state_t *tx = &dp->dp_tx;
 584         callb_cpr_t cpr;
 585 
 586         txg_thread_enter(tx, &cpr);
 587 
 588         for (;;) {
 589                 uint64_t txg;
 590 
 591                 /*
 592                  * We quiesce when there's someone waiting on us.
 593                  * However, we can only have one txg in "quiescing" or
 594                  * "quiesced, waiting to sync" state.  So we wait until
 595                  * the "quiesced, waiting to sync" txg has been consumed
 596                  * by the sync thread.
 597                  */
 598                 while (!tx->tx_exiting &&
 599                     (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
 600                     txg_has_quiesced_to_sync(dp)))
 601                         txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);
 602 
 603                 if (tx->tx_exiting)
 604                         txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);
 605 
 606                 txg = tx->tx_open_txg;
 607                 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
 608                     txg, tx->tx_quiesce_txg_waiting,
 609                     tx->tx_sync_txg_waiting);
 610                 tx->tx_quiescing_txg = txg;
 611 
 612                 mutex_exit(&tx->tx_sync_lock);
 613                 txg_quiesce(dp, txg);
 614                 mutex_enter(&tx->tx_sync_lock);
 615 
 616                 /*
 617                  * Hand this txg off to the sync thread.
 618                  */
 619                 dprintf("quiesce done, handing off txg %llu\n", txg);
 620                 tx->tx_quiescing_txg = 0;
 621                 tx->tx_quiesced_txg = txg;
 622                 DTRACE_PROBE2(txg__quiesced, dsl_pool_t *, dp, uint64_t, txg);
 623                 cv_broadcast(&tx->tx_sync_more_cv);
 624                 cv_broadcast(&tx->tx_quiesce_done_cv);
 625         }
 626 }
 627 
 628 /*
 629  * Delay this thread by delay nanoseconds if we are still in the open
 630  * transaction group and there is already a waiting txg quiescing or quiesced.
 631  * Abort the delay if this txg stalls or enters the quiescing state.
 632  */
 633 void
 634 txg_delay(dsl_pool_t *dp, uint64_t txg, hrtime_t delay, hrtime_t resolution)
 635 {
 636         tx_state_t *tx = &dp->dp_tx;
 637         hrtime_t start = gethrtime();
 638 
 639         /* don't delay if this txg could transition to quiescing immediately */
 640         if (tx->tx_open_txg > txg ||


 698             txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
 699         while (tx->tx_open_txg < txg) {
 700                 cv_broadcast(&tx->tx_quiesce_more_cv);
 701                 cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
 702         }
 703         mutex_exit(&tx->tx_sync_lock);
 704 }
 705 
 706 /*
 707  * If there isn't a txg syncing or in the pipeline, push another txg through
 708  * the pipeline by quiescing the open txg.
 709  */
 710 void
 711 txg_kick(dsl_pool_t *dp)
 712 {
 713         tx_state_t *tx = &dp->dp_tx;
 714 
 715         ASSERT(!dsl_pool_config_held(dp));
 716 
 717         mutex_enter(&tx->tx_sync_lock);
 718         if (!txg_is_syncing(dp) &&
 719             !txg_is_quiescing(dp) &&
 720             tx->tx_quiesce_txg_waiting <= tx->tx_open_txg &&
 721             tx->tx_sync_txg_waiting <= tx->tx_synced_txg &&
 722             tx->tx_quiesced_txg <= tx->tx_synced_txg) {
 723                 tx->tx_quiesce_txg_waiting = tx->tx_open_txg + 1;
 724                 cv_broadcast(&tx->tx_quiesce_more_cv);
 725         }
 726         mutex_exit(&tx->tx_sync_lock);
 727 }
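
/*
 * Note: relative to the pre-fix version above, the txg_is_quiescing()
 * check is the illumos 9464 change; tx_quiescing_txg is set by
 * txg_quiesce_thread() for the duration of txg_quiesce(), so a txg that
 * is mid-quiesce now keeps txg_kick() from pushing the open txg early.
 */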
 728 
 729 boolean_t
 730 txg_stalled(dsl_pool_t *dp)
 731 {
 732         tx_state_t *tx = &dp->dp_tx;
 733         return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
 734 }
 735 
 736 boolean_t
 737 txg_sync_waiting(dsl_pool_t *dp)
 738 {
 739         tx_state_t *tx = &dp->dp_tx;


 860                 *tp = tn;
 861         }
 862         mutex_exit(&tl->tl_lock);
 863 
 864         return (add);
 865 }
 866 
 867 /*
 868  * Remove the head of the list and return it.
 869  */
 870 void *
 871 txg_list_remove(txg_list_t *tl, uint64_t txg)
 872 {
 873         int t = txg & TXG_MASK;
 874         txg_node_t *tn;
 875         void *p = NULL;
 876 
 877         txg_verify(tl->tl_spa, txg);
 878         mutex_enter(&tl->tl_lock);
 879         if ((tn = tl->tl_head[t]) != NULL) {
 880                 p = (char *)tn - tl->tl_offset;
 881                 tl->tl_head[t] = tn->tn_next[t];
 882                 tn->tn_next[t] = NULL;
 883                 tn->tn_member[t] = 0;
 884         }
 885         mutex_exit(&tl->tl_lock);
 886 
 887         return (p);
 888 }
 889 
 890 /*
 891  * Remove a specific item from the list and return it.
 892  */
 893 void *
 894 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
 895 {
 896         int t = txg & TXG_MASK;
 897         txg_node_t *tn, **tp;
 898 
 899         txg_verify(tl->tl_spa, txg);