NEX-20218 Backport Illumos #9464 txg_kick() fails to see that we are quiescing, forcing transactions to their next stages without leaving them accumulate changes
MFV illumos-gate@fa41d87de9ec9000964c605eb01d6dc19e4a1abe
    9464 txg_kick() fails to see that we are quiescing, forcing transactions to their next stages without leaving them accumulate changes
    Reviewed by: Matt Ahrens <matt@delphix.com>
    Reviewed by: Brad Lewis <brad.lewis@delphix.com>
    Reviewed by: Andriy Gapon <avg@FreeBSD.org>
    Approved by: Dan McDonald <danmcd@joyent.com>
NEX-6859 TX-commit callback that is registered in sync-ctx causes system panic
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
--- old/usr/src/uts/common/fs/zfs/txg.c
+++ new/usr/src/uts/common/fs/zfs/txg.c
   1    1  /*
   2    2   * CDDL HEADER START
   3    3   *
   4    4   * The contents of this file are subject to the terms of the
   5    5   * Common Development and Distribution License (the "License").
   6    6   * You may not use this file except in compliance with the License.
   7    7   *
   8    8   * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9    9   * or http://www.opensolaris.org/os/licensing.
  10   10   * See the License for the specific language governing permissions
  11   11   * and limitations under the License.
  12   12   *
  13   13   * When distributing Covered Code, include this CDDL HEADER in each
  14   14   * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15   15   * If applicable, add the following below this CDDL HEADER, with the
  16   16   * fields enclosed by brackets "[]" replaced with your own identifying
  17   17   * information: Portions Copyright [yyyy] [name of copyright owner]
  18   18   *
  19   19   * CDDL HEADER END
  20   20   */
  21   21  /*
  22   22   * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  23   23   * Portions Copyright 2011 Martin Matuska
  24   24   * Copyright (c) 2012, 2017 by Delphix. All rights reserved.
       25 + * Copyright 2016 Nexenta Systems, Inc.  All rights reserved.
  25   26   */
  26   27  
  27   28  #include <sys/zfs_context.h>
  28   29  #include <sys/txg_impl.h>
  29   30  #include <sys/dmu_impl.h>
  30   31  #include <sys/dmu_tx.h>
  31   32  #include <sys/dsl_pool.h>
  32   33  #include <sys/dsl_scan.h>
  33   34  #include <sys/zil.h>
  34   35  #include <sys/callb.h>
  35   36  
  36   37  /*
  37   38   * ZFS Transaction Groups
  38   39   * ----------------------
  39   40   *
  40   41   * ZFS transaction groups are, as the name implies, groups of transactions
  41   42   * that act on persistent state. ZFS asserts consistency at the granularity of
  42   43   * these transaction groups. Each successive transaction group (txg) is
  43   44   * assigned a 64-bit consecutive identifier. There are three active
  44   45   * transaction group states: open, quiescing, or syncing. At any given time,
  45   46   * there may be an active txg associated with each state; each active txg may
  46   47   * either be processing, or blocked waiting to enter the next state. There may
  47   48   * be up to three active txgs, and there is always a txg in the open state
  48   49   * (though it may be blocked waiting to enter the quiescing state). In broad
  49   50   * strokes, transactions -- operations that change in-memory structures -- are
  50   51   * accepted into the txg in the open state, and are completed while the txg is
  51   52   * in the open or quiescing states. The accumulated changes are written to
  52   53   * disk in the syncing state.
  53   54   *
  54   55   * Open
  55   56   *
  56   57   * When a new txg becomes active, it first enters the open state. New
  57   58   * transactions -- updates to in-memory structures -- are assigned to the
  58   59   * currently open txg. There is always a txg in the open state so that ZFS can
  59   60   * accept new changes (though the txg may refuse new changes if it has hit
  60   61   * some limit). ZFS advances the open txg to the next state for a variety of
  61   62   * reasons such as it hitting a time or size threshold, or the execution of an
  62   63   * administrative action that must be completed in the syncing state.
  63   64   *
  64   65   * Quiescing
  65   66   *
  66   67   * After a txg exits the open state, it enters the quiescing state. The
  67   68   * quiescing state is intended to provide a buffer between accepting new
  68   69   * transactions in the open state and writing them out to stable storage in
  69   70   * the syncing state. While quiescing, transactions can continue their
  70   71   * operation without delaying either of the other states. Typically, a txg is
  71   72   * in the quiescing state very briefly since the operations are bounded by
  72   73   * software latencies rather than, say, slower I/O latencies. After all
  73   74   * transactions complete, the txg is ready to enter the next state.
  74   75   *
  75   76   * Syncing
  76   77   *
  77   78   * In the syncing state, the in-memory state built up during the open and (to
  78   79   * a lesser degree) the quiescing states is written to stable storage. The
  79   80   * process of writing out modified data can, in turn modify more data. For
  80   81   * example when we write new blocks, we need to allocate space for them; those
  81   82   * allocations modify metadata (space maps)... which themselves must be
  82   83   * written to stable storage. During the sync state, ZFS iterates, writing out
  83   84   * data until it converges and all in-memory changes have been written out.
  84   85   * The first such pass is the largest as it encompasses all the modified user
  85   86   * data (as opposed to filesystem metadata). Subsequent passes typically have
  86   87   * far less data to write as they consist exclusively of filesystem metadata.
  87   88   *
  88   89   * To ensure convergence, after a certain number of passes ZFS begins
  89   90   * overwriting locations on stable storage that had been allocated earlier in
  90   91   * the syncing state (and subsequently freed). ZFS usually allocates new
  91   92   * blocks to optimize for large, continuous, writes. For the syncing state to
  92   93   * converge however it must complete a pass where no new blocks are allocated
  93   94   * since each allocation requires a modification of persistent metadata.
  94   95   * Further, to hasten convergence, after a prescribed number of passes, ZFS
  95   96   * also defers frees, and stops compressing.
  96   97   *
  97   98   * In addition to writing out user data, we must also execute synctasks during
  98   99   * the syncing context. A synctask is the mechanism by which some
  99  100   * administrative activities work such as creating and destroying snapshots or
 100  101   * datasets. Note that when a synctask is initiated it enters the open txg,
 101  102   * and ZFS then pushes that txg as quickly as possible to completion of the
 102  103   * syncing state in order to reduce the latency of the administrative
 103  104   * activity. To complete the syncing state, ZFS writes out a new uberblock,
 104  105   * the root of the tree of blocks that comprise all state stored on the ZFS
 105  106   * pool. Finally, if there is a quiesced txg waiting, we signal that it can
 106  107   * now transition to the syncing state.
      108 + *
       109 + * It is possible to register a callback for a TX; the callback is
       110 + * invoked after the corresponding TX group has been synced to disk.
       111 + * The callback and its optional argument are registered using
       112 + * dmu_tx_callback_register().
       113 + * All callbacks are executed asynchronously via a taskq (see txg_dispatch_callbacks).
       114 + * There are two possible cases in which a registered callback is invoked:
       115 + *  1) the corresponding TX is committed to disk (the first arg is 0)
      116 + *  2) the corresponding TX is aborted (the first arg is ECANCELED)
 107  117   */
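
[Editor's illustration] A minimal consumer of the callback interface described above might look like the following sketch. The names my_commit_done and my_state are hypothetical; dmu_tx_callback_register() and the (arg, error) callback shape are the interface named in the comment, and the callback later runs on the tx_commit_cb taskq created in txg_dispatch_callbacks(), not in the committer's context.

    /* hypothetical commit callback */
    static void
    my_commit_done(void *arg, int error)
    {
            if (error == ECANCELED) {
                    /* the TX was aborted and never reached disk */
                    return;
            }
            /* error == 0: the TX group is on stable storage */
    }

    /* registration, once the tx has been assigned to a txg */
    dmu_tx_t *tx = dmu_tx_create(os);
    VERIFY0(dmu_tx_assign(tx, TXG_WAIT));
    dmu_tx_callback_register(tx, my_commit_done, my_state);
    /* ... dirty buffers under this tx ... */
    dmu_tx_commit(tx);
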
 108  118  
 109  119  static void txg_sync_thread(void *arg);
 110  120  static void txg_quiesce_thread(void *arg);
 111  121  
 112  122  int zfs_txg_timeout = 5;        /* max seconds worth of delta per txg */
 113  123  
 114  124  /*
 115  125   * Prepare the txg subsystem.
 116  126   */
 117  127  void
 118  128  txg_init(dsl_pool_t *dp, uint64_t txg)
 119  129  {
 120  130          tx_state_t *tx = &dp->dp_tx;
 121  131          int c;
 122  132          bzero(tx, sizeof (tx_state_t));
 123  133  
 124  134          tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
 125  135  
 126  136          for (c = 0; c < max_ncpus; c++) {
 127  137                  int i;
 128  138  
 129  139                  mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
 130  140                  mutex_init(&tx->tx_cpu[c].tc_open_lock, NULL, MUTEX_DEFAULT,
 131  141                      NULL);
 132  142                  for (i = 0; i < TXG_SIZE; i++) {
 133  143                          cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
 134  144                              NULL);
 135  145                          list_create(&tx->tx_cpu[c].tc_callbacks[i],
 136  146                              sizeof (dmu_tx_callback_t),
 137  147                              offsetof(dmu_tx_callback_t, dcb_node));
 138  148                  }
 139  149          }
 140  150  
 141  151          mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
 142  152  
 143  153          cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
 144  154          cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
 145  155          cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
 146  156          cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
 147  157          cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);
 148  158  
 149  159          tx->tx_open_txg = txg;
 150  160  }
 151  161  
 152  162  /*
 153  163   * Close down the txg subsystem.
 154  164   */
 155  165  void
 156  166  txg_fini(dsl_pool_t *dp)
 157  167  {
 158  168          tx_state_t *tx = &dp->dp_tx;
 159  169          int c;
 160  170  
 161  171          ASSERT0(tx->tx_threads);
 162  172  
 163  173          mutex_destroy(&tx->tx_sync_lock);
 164  174  
 165  175          cv_destroy(&tx->tx_sync_more_cv);
 166  176          cv_destroy(&tx->tx_sync_done_cv);
 167  177          cv_destroy(&tx->tx_quiesce_more_cv);
 168  178          cv_destroy(&tx->tx_quiesce_done_cv);
 169  179          cv_destroy(&tx->tx_exit_cv);
 170  180  
 171  181          for (c = 0; c < max_ncpus; c++) {
 172  182                  int i;
 173  183  
 174  184                  mutex_destroy(&tx->tx_cpu[c].tc_open_lock);
 175  185                  mutex_destroy(&tx->tx_cpu[c].tc_lock);
 176  186                  for (i = 0; i < TXG_SIZE; i++) {
 177  187                          cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
 178  188                          list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
 179  189                  }
 180  190          }
 181  191  
 182  192          if (tx->tx_commit_cb_taskq != NULL)
 183  193                  taskq_destroy(tx->tx_commit_cb_taskq);
 184  194  
 185  195          kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
 186  196  
 187  197          bzero(tx, sizeof (tx_state_t));
 188  198  }
 189  199  
 190  200  /*
 191  201   * Start syncing transaction groups.
 192  202   */
 193  203  void
 194  204  txg_sync_start(dsl_pool_t *dp)
 195  205  {
 196  206          tx_state_t *tx = &dp->dp_tx;
 197  207  
 198  208          mutex_enter(&tx->tx_sync_lock);
 199  209  
 200  210          dprintf("pool %p\n", dp);
 201  211  
 202  212          ASSERT0(tx->tx_threads);
 203  213  
 204  214          tx->tx_threads = 2;
 205  215  
 206  216          tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
 207  217              dp, 0, &p0, TS_RUN, minclsyspri);
 208  218  
 209  219          /*
 210  220           * The sync thread can need a larger-than-default stack size on
 211  221           * 32-bit x86.  This is due in part to nested pools and
 212  222           * scrub_visitbp() recursion.
 213  223           */
 214  224          tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread,
 215  225              dp, 0, &p0, TS_RUN, minclsyspri);
 216  226  
 217  227          mutex_exit(&tx->tx_sync_lock);
 218  228  }
 219  229  
 220  230  static void
 221  231  txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
 222  232  {
 223  233          CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
 224  234          mutex_enter(&tx->tx_sync_lock);
 225  235  }
 226  236  
 227  237  static void
 228  238  txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
 229  239  {
 230  240          ASSERT(*tpp != NULL);
 231  241          *tpp = NULL;
 232  242          tx->tx_threads--;
 233  243          cv_broadcast(&tx->tx_exit_cv);
 234  244          CALLB_CPR_EXIT(cpr);            /* drops &tx->tx_sync_lock */
 235  245          thread_exit();
 236  246  }
 237  247  
 238  248  static void
 239  249  txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, clock_t time)
 240  250  {
 241  251          CALLB_CPR_SAFE_BEGIN(cpr);
 242  252  
 243  253          if (time)
 244  254                  (void) cv_timedwait(cv, &tx->tx_sync_lock,
 245  255                      ddi_get_lbolt() + time);
 246  256          else
 247  257                  cv_wait(cv, &tx->tx_sync_lock);
 248  258  
 249  259          CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
 250  260  }
 251  261  
 252  262  /*
 253  263   * Stop syncing transaction groups.
 254  264   */
 255  265  void
 256  266  txg_sync_stop(dsl_pool_t *dp)
 257  267  {
 258  268          tx_state_t *tx = &dp->dp_tx;
 259  269  
 260  270          dprintf("pool %p\n", dp);
 261  271          /*
 262  272           * Finish off any work in progress.
 263  273           */
 264  274          ASSERT3U(tx->tx_threads, ==, 2);
 265  275  
 266  276          /*
 267  277           * We need to ensure that we've vacated the deferred space_maps.
 268  278           */
 269  279          txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);
 270  280  
 271  281          /*
 272  282           * Wake all sync threads and wait for them to die.
 273  283           */
 274  284          mutex_enter(&tx->tx_sync_lock);
 275  285  
 276  286          ASSERT3U(tx->tx_threads, ==, 2);
 277  287  
 278  288          tx->tx_exiting = 1;
 279  289  
 280  290          cv_broadcast(&tx->tx_quiesce_more_cv);
 281  291          cv_broadcast(&tx->tx_quiesce_done_cv);
 282  292          cv_broadcast(&tx->tx_sync_more_cv);
 283  293  
 284  294          while (tx->tx_threads != 0)
 285  295                  cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);
 286  296  
 287  297          tx->tx_exiting = 0;
 288  298  
 289  299          mutex_exit(&tx->tx_sync_lock);
 290  300  }
 291  301  
 292  302  uint64_t
 293  303  txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
 294  304  {
 295  305          tx_state_t *tx = &dp->dp_tx;
 296  306          tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
 297  307          uint64_t txg;
 298  308  
 299  309          mutex_enter(&tc->tc_open_lock);
 300  310          txg = tx->tx_open_txg;
 301  311  
 302  312          mutex_enter(&tc->tc_lock);
 303  313          tc->tc_count[txg & TXG_MASK]++;
 304  314          mutex_exit(&tc->tc_lock);
 305  315  
 306  316          th->th_cpu = tc;
 307  317          th->th_txg = txg;
 308  318  
 309  319          return (txg);
 310  320  }
 311  321  
 312  322  void
 313  323  txg_rele_to_quiesce(txg_handle_t *th)
 314  324  {
 315  325          tx_cpu_t *tc = th->th_cpu;
 316  326  
 317  327          ASSERT(!MUTEX_HELD(&tc->tc_lock));
 318  328          mutex_exit(&tc->tc_open_lock);
 319  329  }
 320  330  
 321  331  void
 322  332  txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
 323  333  {
 324  334          tx_cpu_t *tc = th->th_cpu;
 325  335          int g = th->th_txg & TXG_MASK;
 326  336  
 327  337          mutex_enter(&tc->tc_lock);
 328  338          list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
 329  339          mutex_exit(&tc->tc_lock);
 330  340  }
 331  341  
       342 +/* This registration function may only be called from syncing context. */
 332  343  void
      344 +txg_register_callbacks_sync(dsl_pool_t *dp, uint64_t txg, list_t *tx_callbacks)
      345 +{
      346 +        tx_state_t *tx = &dp->dp_tx;
      347 +        tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
      348 +        txg_handle_t th;
      349 +
      350 +        VERIFY3U(tx->tx_syncing_txg, ==, txg);
      351 +
      352 +        th.th_cpu = tc;
      353 +        th.th_txg = txg;
      354 +
      355 +        txg_register_callbacks(&th, tx_callbacks);
      356 +}
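
[Editor's illustration] A sketch of how a syncing-context caller might use the new function. This is illustrative only: the list setup mirrors txg_dispatch_callbacks() below, and dp and txg are assumed to be the pool and the txg currently in tx_syncing_txg, which the VERIFY3U() enforces.

    list_t cbs;

    list_create(&cbs, sizeof (dmu_tx_callback_t),
        offsetof(dmu_tx_callback_t, dcb_node));
    /* ... append dmu_tx_callback_t entries to cbs ... */

    txg_register_callbacks_sync(dp, txg, &cbs);
    /* all entries have been moved to the syncing txg's per-CPU list */
    list_destroy(&cbs);

Attaching the callbacks to the syncing txg, rather than to whatever txg happens to be open, is what avoids the sync-context panic referenced by NEX-6859 in the header above.
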
      357 +
      358 +void
 333  359  txg_rele_to_sync(txg_handle_t *th)
 334  360  {
 335  361          tx_cpu_t *tc = th->th_cpu;
 336  362          int g = th->th_txg & TXG_MASK;
 337  363  
 338  364          mutex_enter(&tc->tc_lock);
 339  365          ASSERT(tc->tc_count[g] != 0);
 340  366          if (--tc->tc_count[g] == 0)
 341  367                  cv_broadcast(&tc->tc_cv[g]);
 342  368          mutex_exit(&tc->tc_lock);
 343  369  
 344  370          th->th_cpu = NULL;      /* defensive */
 345  371  }
 346  372  
 347  373  /*
 348  374   * Blocks until all transactions in the group are committed.
 349  375   *
 350  376   * On return, the transaction group has reached a stable state in which it can
 351  377   * then be passed off to the syncing context.
 352  378   */
 353  379  static void
 354  380  txg_quiesce(dsl_pool_t *dp, uint64_t txg)
 355  381  {
 356  382          tx_state_t *tx = &dp->dp_tx;
 357  383          int g = txg & TXG_MASK;
 358  384          int c;
 359  385  
 360  386          /*
 361  387           * Grab all tc_open_locks so nobody else can get into this txg.
 362  388           */
 363  389          for (c = 0; c < max_ncpus; c++)
 364  390                  mutex_enter(&tx->tx_cpu[c].tc_open_lock);
 365  391  
 366  392          ASSERT(txg == tx->tx_open_txg);
 367  393          tx->tx_open_txg++;
 368  394          tx->tx_open_time = gethrtime();
 369  395  
 370  396          DTRACE_PROBE2(txg__quiescing, dsl_pool_t *, dp, uint64_t, txg);
 371  397          DTRACE_PROBE2(txg__opened, dsl_pool_t *, dp, uint64_t, tx->tx_open_txg);
 372  398  
 373  399          /*
 374  400           * Now that we've incremented tx_open_txg, we can let threads
 375  401           * enter the next transaction group.
 376  402           */
 377  403          for (c = 0; c < max_ncpus; c++)
 378  404                  mutex_exit(&tx->tx_cpu[c].tc_open_lock);
 379  405  
 380  406          /*
 381  407           * Quiesce the transaction group by waiting for everyone to txg_exit().
 382  408           */
 383  409          for (c = 0; c < max_ncpus; c++) {
 384  410                  tx_cpu_t *tc = &tx->tx_cpu[c];
 385  411                  mutex_enter(&tc->tc_lock);
 386  412                  while (tc->tc_count[g] != 0)
 387  413                          cv_wait(&tc->tc_cv[g], &tc->tc_lock);
 388  414                  mutex_exit(&tc->tc_lock);
 389  415          }
 390  416  }
 391  417  
 392  418  static void
 393  419  txg_do_callbacks(list_t *cb_list)
 394  420  {
 395  421          dmu_tx_do_callbacks(cb_list, 0);
 396  422  
 397  423          list_destroy(cb_list);
 398  424  
 399  425          kmem_free(cb_list, sizeof (list_t));
 400  426  }
 401  427  
 402  428  /*
 403  429   * Dispatch the commit callbacks registered on this txg to worker threads.
 404  430   *
 405  431   * If no callbacks are registered for a given TXG, nothing happens.
 406  432   * This function creates a taskq for the associated pool, if needed.
 407  433   */
 408  434  static void
 409  435  txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
 410  436  {
 411  437          int c;
 412  438          tx_state_t *tx = &dp->dp_tx;
 413  439          list_t *cb_list;
 414  440  
 415  441          for (c = 0; c < max_ncpus; c++) {
 416  442                  tx_cpu_t *tc = &tx->tx_cpu[c];
 417  443                  /*
 418  444                   * No need to lock tx_cpu_t at this point, since this can
 419  445                   * only be called once a txg has been synced.
 420  446                   */
 421  447  
 422  448                  int g = txg & TXG_MASK;
 423  449  
 424  450                  if (list_is_empty(&tc->tc_callbacks[g]))
 425  451                          continue;
 426  452  
 427  453                  if (tx->tx_commit_cb_taskq == NULL) {
 428  454                          /*
 429  455                           * Commit callback taskq hasn't been created yet.
 430  456                           */
 431  457                          tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
 432  458                              max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
 433  459                              TASKQ_PREPOPULATE);
 434  460                  }
 435  461  
 436  462                  cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
 437  463                  list_create(cb_list, sizeof (dmu_tx_callback_t),
 438  464                      offsetof(dmu_tx_callback_t, dcb_node));
 439  465  
 440  466                  list_move_tail(cb_list, &tc->tc_callbacks[g]);
 441  467  
 442  468                  (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
 443  469                      txg_do_callbacks, cb_list, TQ_SLEEP);
 444  470          }
 445  471  }
 446  472  
      473 +static boolean_t
      474 +txg_is_syncing(dsl_pool_t *dp)
      475 +{
      476 +        tx_state_t *tx = &dp->dp_tx;
      477 +        ASSERT(MUTEX_HELD(&tx->tx_sync_lock));
      478 +        return (tx->tx_syncing_txg != 0);
      479 +}
      480 +
      481 +static boolean_t
      482 +txg_is_quiescing(dsl_pool_t *dp)
      483 +{
      484 +        tx_state_t *tx = &dp->dp_tx;
      485 +        ASSERT(MUTEX_HELD(&tx->tx_sync_lock));
      486 +        return (tx->tx_quiescing_txg != 0);
      487 +}
      488 +
      489 +static boolean_t
      490 +txg_has_quiesced_to_sync(dsl_pool_t *dp)
      491 +{
      492 +        tx_state_t *tx = &dp->dp_tx;
      493 +        ASSERT(MUTEX_HELD(&tx->tx_sync_lock));
      494 +        return (tx->tx_quiesced_txg != 0);
      495 +}
      496 +
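[Editor's illustration] These predicates make the pipeline's intermediate states explicit, and each asserts that tx_sync_lock is held so the reads are well ordered. An illustrative timeline of the fields they examine (ordering as implemented by the two threads below):

    /*
     * quiesce thread                     sync thread
     * --------------                     -----------
     * tx_quiescing_txg = T;              txg_is_quiescing() == B_TRUE
     *   ... txg_quiesce(dp, T) ...
     * tx_quiescing_txg = 0;
     * tx_quiesced_txg = T;               txg_has_quiesced_to_sync() == B_TRUE
     *                                    tx_quiesced_txg = 0;
     *                                    tx_syncing_txg = T;
     *                                    txg_is_syncing() == B_TRUE
     */
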
 447  497  static void
 448  498  txg_sync_thread(void *arg)
 449  499  {
 450  500          dsl_pool_t *dp = arg;
 451  501          spa_t *spa = dp->dp_spa;
 452  502          tx_state_t *tx = &dp->dp_tx;
 453  503          callb_cpr_t cpr;
 454  504          uint64_t start, delta;
 455  505  
 456  506          txg_thread_enter(tx, &cpr);
 457  507  
 458  508          start = delta = 0;
 459  509          for (;;) {
 460  510                  uint64_t timeout = zfs_txg_timeout * hz;
 461  511                  uint64_t timer;
 462  512                  uint64_t txg;
 463  513  
 464  514                  /*
 465  515                   * We sync when we're scanning, there's someone waiting
 466  516                   * on us, or the quiesce thread has handed off a txg to
 467  517                   * us, or we have reached our timeout.
 468  518                   */
 469  519                  timer = (delta >= timeout ? 0 : timeout - delta);
 470  520                  while (!dsl_scan_active(dp->dp_scan) &&
 471  521                      !tx->tx_exiting && timer > 0 &&
 472  522                      tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
 473      -                    tx->tx_quiesced_txg == 0 &&
      523 +                    !txg_has_quiesced_to_sync(dp) &&
 474  524                      dp->dp_dirty_total < zfs_dirty_data_sync) {
 475  525                          dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
 476  526                              tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
 477  527                          txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
 478  528                          delta = ddi_get_lbolt() - start;
 479  529                          timer = (delta > timeout ? 0 : timeout - delta);
 480  530                  }
 481  531  
 482  532                  /*
 483  533                   * Wait until the quiesce thread hands off a txg to us,
 484  534                   * prompting it to do so if necessary.
 485  535                   */
 486      -                while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
      536 +                while (!tx->tx_exiting && !txg_has_quiesced_to_sync(dp)) {
 487  537                          if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
 488  538                                  tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
 489  539                          cv_broadcast(&tx->tx_quiesce_more_cv);
 490  540                          txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
 491  541                  }
 492  542  
 493  543                  if (tx->tx_exiting)
 494  544                          txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);
 495  545  
 496  546                  /*
 497  547                   * Consume the quiesced txg which has been handed off to
 498  548                   * us.  This may cause the quiescing thread to now be
 499  549                   * able to quiesce another txg, so we must signal it.
 500  550                   */
      551 +                ASSERT(tx->tx_quiesced_txg != 0);
 501  552                  txg = tx->tx_quiesced_txg;
 502  553                  tx->tx_quiesced_txg = 0;
 503  554                  tx->tx_syncing_txg = txg;
 504  555                  DTRACE_PROBE2(txg__syncing, dsl_pool_t *, dp, uint64_t, txg);
 505  556                  cv_broadcast(&tx->tx_quiesce_more_cv);
 506  557  
 507  558                  dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
 508  559                      txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
 509  560                  mutex_exit(&tx->tx_sync_lock);
 510  561  
 511  562                  start = ddi_get_lbolt();
 512  563                  spa_sync(spa, txg);
 513  564                  delta = ddi_get_lbolt() - start;
 514  565  
 515  566                  mutex_enter(&tx->tx_sync_lock);
 516  567                  tx->tx_synced_txg = txg;
 517  568                  tx->tx_syncing_txg = 0;
 518  569                  DTRACE_PROBE2(txg__synced, dsl_pool_t *, dp, uint64_t, txg);
 519  570                  cv_broadcast(&tx->tx_sync_done_cv);
 520  571  
 521  572                  /*
 522  573                   * Dispatch commit callbacks to worker threads.
 523  574                   */
 524  575                  txg_dispatch_callbacks(dp, txg);
 525  576          }
 526  577  }
 527  578  
 528  579  static void
 529  580  txg_quiesce_thread(void *arg)
 530  581  {
 531  582          dsl_pool_t *dp = arg;
 532  583          tx_state_t *tx = &dp->dp_tx;
 533  584          callb_cpr_t cpr;
 534  585  
 535  586          txg_thread_enter(tx, &cpr);
 536  587  
 537  588          for (;;) {
 538  589                  uint64_t txg;
 539  590  
 540  591                  /*
 541  592                   * We quiesce when there's someone waiting on us.
 542  593                   * However, we can only have one txg in "quiescing" or
 543  594                   * "quiesced, waiting to sync" state.  So we wait until
 544  595                   * the "quiesced, waiting to sync" txg has been consumed
 545  596                   * by the sync thread.
 546  597                   */
 547  598                  while (!tx->tx_exiting &&
 548  599                      (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
 549      -                    tx->tx_quiesced_txg != 0))
      600 +                    txg_has_quiesced_to_sync(dp)))
 550  601                          txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);
 551  602  
 552  603                  if (tx->tx_exiting)
 553  604                          txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);
 554  605  
 555  606                  txg = tx->tx_open_txg;
 556  607                  dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
 557  608                      txg, tx->tx_quiesce_txg_waiting,
 558  609                      tx->tx_sync_txg_waiting);
      610 +                tx->tx_quiescing_txg = txg;
      611 +
 559  612                  mutex_exit(&tx->tx_sync_lock);
 560  613                  txg_quiesce(dp, txg);
 561  614                  mutex_enter(&tx->tx_sync_lock);
 562  615  
 563  616                  /*
 564  617                   * Hand this txg off to the sync thread.
 565  618                   */
 566  619                  dprintf("quiesce done, handing off txg %llu\n", txg);
      620 +                tx->tx_quiescing_txg = 0;
 567  621                  tx->tx_quiesced_txg = txg;
 568  622                  DTRACE_PROBE2(txg__quiesced, dsl_pool_t *, dp, uint64_t, txg);
 569  623                  cv_broadcast(&tx->tx_sync_more_cv);
 570  624                  cv_broadcast(&tx->tx_quiesce_done_cv);
 571  625          }
 572  626  }
 573  627  
 574  628  /*
 575  629   * Delay this thread by delay nanoseconds if we are still in the open
 576  630   * transaction group and there is already a waiting txg quiescing or quiesced.
 577  631   * Abort the delay if this txg stalls or enters the quiescing state.
 578  632   */
 579  633  void
 580  634  txg_delay(dsl_pool_t *dp, uint64_t txg, hrtime_t delay, hrtime_t resolution)
 581  635  {
 582  636          tx_state_t *tx = &dp->dp_tx;
 583  637          hrtime_t start = gethrtime();
 584  638  
 585  639          /* don't delay if this txg could transition to quiescing immediately */
 586  640          if (tx->tx_open_txg > txg ||
 587  641              tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
 588  642                  return;
 589  643  
 590  644          mutex_enter(&tx->tx_sync_lock);
 591  645          if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
 592  646                  mutex_exit(&tx->tx_sync_lock);
 593  647                  return;
 594  648          }
 595  649  
 596  650          while (gethrtime() - start < delay &&
 597  651              tx->tx_syncing_txg < txg-1 && !txg_stalled(dp)) {
 598  652                  (void) cv_timedwait_hires(&tx->tx_quiesce_more_cv,
 599  653                      &tx->tx_sync_lock, delay, resolution, 0);
 600  654          }
 601  655  
 602  656          mutex_exit(&tx->tx_sync_lock);
 603  657  }
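
[Editor's illustration] The write throttle is the kind of caller this is built for; a hedged sketch follows (the 10ms delay and 1ms resolution are illustrative values, not the throttle's actual tunables):

    /* back off this writer for up to ~10ms; txg_delay() returns
     * early if the txg stalls or enters the quiescing state */
    txg_delay(dp, tx->tx_txg, MSEC2NSEC(10), MSEC2NSEC(1));
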
 604  658  
 605  659  void
 606  660  txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
 607  661  {
 608  662          tx_state_t *tx = &dp->dp_tx;
 609  663  
 610  664          ASSERT(!dsl_pool_config_held(dp));
 611  665  
 612  666          mutex_enter(&tx->tx_sync_lock);
 613  667          ASSERT3U(tx->tx_threads, ==, 2);
 614  668          if (txg == 0)
 615  669                  txg = tx->tx_open_txg + TXG_DEFER_SIZE;
 616  670          if (tx->tx_sync_txg_waiting < txg)
 617  671                  tx->tx_sync_txg_waiting = txg;
 618  672          dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
 619  673              txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
 620  674          while (tx->tx_synced_txg < txg) {
 621  675                  dprintf("broadcasting sync more "
 622  676                      "tx_synced=%llu waiting=%llu dp=%p\n",
 623  677                      tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
 624  678                  cv_broadcast(&tx->tx_sync_more_cv);
 625  679                  cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
 626  680          }
 627  681          mutex_exit(&tx->tx_sync_lock);
 628  682  }
 629  683  
 630  684  void
 631  685  txg_wait_open(dsl_pool_t *dp, uint64_t txg)
 632  686  {
 633  687          tx_state_t *tx = &dp->dp_tx;
 634  688  
 635  689          ASSERT(!dsl_pool_config_held(dp));
 636  690  
 637  691          mutex_enter(&tx->tx_sync_lock);
 638  692          ASSERT3U(tx->tx_threads, ==, 2);
 639  693          if (txg == 0)
 640  694                  txg = tx->tx_open_txg + 1;
 641  695          if (tx->tx_quiesce_txg_waiting < txg)
 642  696                  tx->tx_quiesce_txg_waiting = txg;
 643  697          dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
 644  698              txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
 645  699          while (tx->tx_open_txg < txg) {
 646  700                  cv_broadcast(&tx->tx_quiesce_more_cv);
 647  701                  cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
 648  702          }
 649  703          mutex_exit(&tx->tx_sync_lock);
 650  704  }
 651  705  
 652  706  /*
 653  707   * If there isn't a txg syncing or in the pipeline, push another txg through
  654  708   * the pipeline by quiescing the open txg.
 655  709   */
 656  710  void
 657  711  txg_kick(dsl_pool_t *dp)
 658  712  {
 659  713          tx_state_t *tx = &dp->dp_tx;
 660  714  
 661  715          ASSERT(!dsl_pool_config_held(dp));
 662  716  
 663  717          mutex_enter(&tx->tx_sync_lock);
 664      -        if (tx->tx_syncing_txg == 0 &&
      718 +        if (!txg_is_syncing(dp) &&
      719 +            !txg_is_quiescing(dp) &&
 665  720              tx->tx_quiesce_txg_waiting <= tx->tx_open_txg &&
 666  721              tx->tx_sync_txg_waiting <= tx->tx_synced_txg &&
 667  722              tx->tx_quiesced_txg <= tx->tx_synced_txg) {
 668  723                  tx->tx_quiesce_txg_waiting = tx->tx_open_txg + 1;
 669  724                  cv_broadcast(&tx->tx_quiesce_more_cv);
 670  725          }
 671  726          mutex_exit(&tx->tx_sync_lock);
 672  727  }
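
[Editor's note] This hunk is the heart of the illumos 9464 fix. While the quiesce thread is inside txg_quiesce() it has set tx_quiescing_txg but not yet tx_quiesced_txg, so the old condition (tx_syncing_txg == 0, tx_quiesced_txg <= tx_synced_txg) could still hold and txg_kick() would bump tx_quiesce_txg_waiting again, pushing the freshly opened txg toward quiescing before it could accumulate changes. Sketch of the window:

    /*
     * quiesce thread:  tx_quiescing_txg = T;      (T+1 is now open)
     * txg_kick():      old check saw no syncing and no quiesced txg,
     *                  so it set tx_quiesce_txg_waiting = T + 2 and
     *                  T+1 was quiesced as soon as T was handed off.
     * new check:      !txg_is_quiescing(dp) keeps txg_kick() quiet
     *                  until the handoff completes.
     */
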
 673  728  
 674  729  boolean_t
 675  730  txg_stalled(dsl_pool_t *dp)
 676  731  {
 677  732          tx_state_t *tx = &dp->dp_tx;
 678  733          return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
 679  734  }
 680  735  
 681  736  boolean_t
 682  737  txg_sync_waiting(dsl_pool_t *dp)
 683  738  {
 684  739          tx_state_t *tx = &dp->dp_tx;
 685  740  
 686  741          return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
 687  742              tx->tx_quiesced_txg != 0);
 688  743  }
 689  744  
 690  745  /*
 691  746   * Verify that this txg is active (open, quiescing, syncing).  Non-active
 692  747   * txg's should not be manipulated.
 693  748   */
 694  749  void
 695  750  txg_verify(spa_t *spa, uint64_t txg)
 696  751  {
 697  752          dsl_pool_t *dp = spa_get_dsl(spa);
 698  753          if (txg <= TXG_INITIAL || txg == ZILTEST_TXG)
 699  754                  return;
 700  755          ASSERT3U(txg, <=, dp->dp_tx.tx_open_txg);
 701  756          ASSERT3U(txg, >=, dp->dp_tx.tx_synced_txg);
 702  757          ASSERT3U(txg, >=, dp->dp_tx.tx_open_txg - TXG_CONCURRENT_STATES);
 703  758  }
 704  759  
 705  760  /*
 706  761   * Per-txg object lists.
 707  762   */
 708  763  void
 709  764  txg_list_create(txg_list_t *tl, spa_t *spa, size_t offset)
 710  765  {
 711  766          int t;
 712  767  
 713  768          mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
 714  769  
 715  770          tl->tl_offset = offset;
 716  771          tl->tl_spa = spa;
 717  772  
 718  773          for (t = 0; t < TXG_SIZE; t++)
 719  774                  tl->tl_head[t] = NULL;
 720  775  }
 721  776  
 722  777  void
 723  778  txg_list_destroy(txg_list_t *tl)
 724  779  {
 725  780          int t;
 726  781  
 727  782          for (t = 0; t < TXG_SIZE; t++)
 728  783                  ASSERT(txg_list_empty(tl, t));
 729  784  
 730  785          mutex_destroy(&tl->tl_lock);
 731  786  }
 732  787  
 733  788  boolean_t
 734  789  txg_list_empty(txg_list_t *tl, uint64_t txg)
 735  790  {
 736  791          txg_verify(tl->tl_spa, txg);
 737  792          return (tl->tl_head[txg & TXG_MASK] == NULL);
 738  793  }
 739  794  
 740  795  /*
 741  796   * Returns true if all txg lists are empty.
 742  797   *
 743  798   * Warning: this is inherently racy (an item could be added immediately
 744  799   * after this function returns). We don't bother with the lock because
 745  800   * it wouldn't change the semantics.
 746  801   */
 747  802  boolean_t
 748  803  txg_all_lists_empty(txg_list_t *tl)
 749  804  {
 750  805          for (int i = 0; i < TXG_SIZE; i++) {
 751  806                  if (!txg_list_empty(tl, i)) {
 752  807                          return (B_FALSE);
 753  808                  }
 754  809          }
 755  810          return (B_TRUE);
 756  811  }
 757  812  
 758  813  /*
 759  814   * Add an entry to the list (unless it's already on the list).
 760  815   * Returns B_TRUE if it was actually added.
 761  816   */
 762  817  boolean_t
 763  818  txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
 764  819  {
 765  820          int t = txg & TXG_MASK;
 766  821          txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
 767  822          boolean_t add;
 768  823  
 769  824          txg_verify(tl->tl_spa, txg);
 770  825          mutex_enter(&tl->tl_lock);
 771  826          add = (tn->tn_member[t] == 0);
 772  827          if (add) {
 773  828                  tn->tn_member[t] = 1;
 774  829                  tn->tn_next[t] = tl->tl_head[t];
 775  830                  tl->tl_head[t] = tn;
 776  831          }
 777  832          mutex_exit(&tl->tl_lock);
 778  833  
 779  834          return (add);
 780  835  }
 781  836  
 782  837  /*
 783  838   * Add an entry to the end of the list, unless it's already on the list.
 784  839   * (walks list to find end)
 785  840   * Returns B_TRUE if it was actually added.
 786  841   */
 787  842  boolean_t
 788  843  txg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg)
 789  844  {
 790  845          int t = txg & TXG_MASK;
 791  846          txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
 792  847          boolean_t add;
 793  848  
 794  849          txg_verify(tl->tl_spa, txg);
 795  850          mutex_enter(&tl->tl_lock);
 796  851          add = (tn->tn_member[t] == 0);
 797  852          if (add) {
 798  853                  txg_node_t **tp;
 799  854  
 800  855                  for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t])
 801  856                          continue;
 802  857  
 803  858                  tn->tn_member[t] = 1;
 804  859                  tn->tn_next[t] = NULL;
 805  860                  *tp = tn;
 806  861          }
 807  862          mutex_exit(&tl->tl_lock);
 808  863  
 809  864          return (add);
 810  865  }
 811  866  
 812  867  /*
 813  868   * Remove the head of the list and return it.
 814  869   */
 815  870  void *
 816  871  txg_list_remove(txg_list_t *tl, uint64_t txg)
 817  872  {
 818  873          int t = txg & TXG_MASK;
 819  874          txg_node_t *tn;
 820  875          void *p = NULL;
 821  876  
 822  877          txg_verify(tl->tl_spa, txg);
 823  878          mutex_enter(&tl->tl_lock);
 824  879          if ((tn = tl->tl_head[t]) != NULL) {
 825      -                ASSERT(tn->tn_member[t]);
 826      -                ASSERT(tn->tn_next[t] == NULL || tn->tn_next[t]->tn_member[t]);
 827  880                  p = (char *)tn - tl->tl_offset;
 828  881                  tl->tl_head[t] = tn->tn_next[t];
 829  882                  tn->tn_next[t] = NULL;
 830  883                  tn->tn_member[t] = 0;
 831  884          }
 832  885          mutex_exit(&tl->tl_lock);
 833  886  
 834  887          return (p);
 835  888  }
 836  889  
 837  890  /*
 838  891   * Remove a specific item from the list and return it.
 839  892   */
 840  893  void *
 841  894  txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
 842  895  {
 843  896          int t = txg & TXG_MASK;
 844  897          txg_node_t *tn, **tp;
 845  898  
 846  899          txg_verify(tl->tl_spa, txg);
 847  900          mutex_enter(&tl->tl_lock);
 848  901  
 849  902          for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
 850  903                  if ((char *)tn - tl->tl_offset == p) {
 851  904                          *tp = tn->tn_next[t];
 852  905                          tn->tn_next[t] = NULL;
 853  906                          tn->tn_member[t] = 0;
 854  907                          mutex_exit(&tl->tl_lock);
 855  908                          return (p);
 856  909                  }
 857  910          }
 858  911  
 859  912          mutex_exit(&tl->tl_lock);
 860  913  
 861  914          return (NULL);
 862  915  }
 863  916  
 864  917  boolean_t
 865  918  txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
 866  919  {
 867  920          int t = txg & TXG_MASK;
 868  921          txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
 869  922  
 870  923          txg_verify(tl->tl_spa, txg);
 871  924          return (tn->tn_member[t] != 0);
 872  925  }
 873  926  
 874  927  /*
 875  928   * Walk a txg list -- only safe if you know it's not changing.
 876  929   */
 877  930  void *
 878  931  txg_list_head(txg_list_t *tl, uint64_t txg)
 879  932  {
 880  933          int t = txg & TXG_MASK;
 881  934          txg_node_t *tn = tl->tl_head[t];
 882  935  
 883  936          txg_verify(tl->tl_spa, txg);
 884  937          return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
 885  938  }
 886  939  
 887  940  void *
 888  941  txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
 889  942  {
 890  943          int t = txg & TXG_MASK;
 891  944          txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
 892  945  
 893  946          txg_verify(tl->tl_spa, txg);
 894  947          tn = tn->tn_next[t];
 895  948  
 896  949          return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
 897  950  }