NEX-19083 backport OS-7314 zil_commit should omit cache thrash
9962 zil_commit should omit cache thrash
Reviewed by: Matt Ahrens <matt@delphix.com>
Reviewed by: Brad Lewis <brad.lewis@delphix.com>
Reviewed by: Patrick Mooney <patrick.mooney@joyent.com>
Reviewed by: Jerry Jelinek <jerry.jelinek@joyent.com>
Approved by: Joshua M. Clulow <josh@sysmgr.org>
NEX-9752 backport illumos 6950 ARC should cache compressed data
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
Reviewed by: Yuri Pankov <yuri.pankov@nexenta.com>
6950 ARC should cache compressed data
Reviewed by: Prakash Surya <prakash.surya@delphix.com>
Reviewed by: Dan Kimmel <dan.kimmel@delphix.com>
Reviewed by: Matt Ahrens <mahrens@delphix.com>
Reviewed by: Paul Dagnelie <pcd@delphix.com>
Reviewed by: Don Brady <don.brady@intel.com>
Reviewed by: Richard Elling <Richard.Elling@RichardElling.com>
Approved by: Richard Lowe <richlowe@richlowe.net>
NEX-5367 special vdev: sync-write options (NEW)
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
5269 zpool import slow
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: George Wilson <george@delphix.com>
Reviewed by: Dan McDonald <danmcd@omniti.com>
Approved by: Dan McDonald <danmcd@omniti.com>
4370 avoid transmitting holes during zfs send
4371 DMU code clean up
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: George Wilson <george.wilson@delphix.com>
Reviewed by: Christopher Siden <christopher.siden@delphix.com>
Reviewed by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
Approved by: Garrett D'Amore <garrett@damore.org>
re #12585 rb4049 ZFS++ work port - refactoring to improve separation of open/closed code, bug fixes, performance improvements - open code
Bug 11205: add missing libzfs_closed_stubs.c to fix opensource-only build.
ZFS plus work: special vdevs, cos, cos/vdev properties
    
      
--- old/usr/src/uts/common/fs/zfs/zil.c
+++ new/usr/src/uts/common/fs/zfs/zil.c
   1    1  /*
   2    2   * CDDL HEADER START
   3    3   *
   4    4   * The contents of this file are subject to the terms of the
   5    5   * Common Development and Distribution License (the "License").
   6    6   * You may not use this file except in compliance with the License.
   7    7   *
   8    8   * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9    9   * or http://www.opensolaris.org/os/licensing.
  10   10   * See the License for the specific language governing permissions
  11   11   * and limitations under the License.
  12   12   *
  13   13   * When distributing Covered Code, include this CDDL HEADER in each
  14   14   * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15   15   * If applicable, add the following below this CDDL HEADER, with the
  16   16   * fields enclosed by brackets "[]" replaced with your own identifying
  17   17   * information: Portions Copyright [yyyy] [name of copyright owner]
  18   18   *
  19   19   * CDDL HEADER END
  20   20   */
  21   21  /*
  22   22   * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
       23 + * Copyright 2013 Nexenta Systems, Inc.  All rights reserved.
  23   24   * Copyright (c) 2011, 2017 by Delphix. All rights reserved.
  24   25   * Copyright (c) 2014 Integros [integros.com]
  25   26   */
  26   27  
  27   28  /* Portions Copyright 2010 Robert Milkowski */
  28   29  
  29   30  #include <sys/zfs_context.h>
  30   31  #include <sys/spa.h>
  31   32  #include <sys/dmu.h>
  32   33  #include <sys/zap.h>
  33   34  #include <sys/arc.h>
  34   35  #include <sys/stat.h>
  35   36  #include <sys/resource.h>
  36   37  #include <sys/zil.h>
  37   38  #include <sys/zil_impl.h>
  38   39  #include <sys/dsl_dataset.h>
  39   40  #include <sys/vdev_impl.h>
  40   41  #include <sys/dmu_tx.h>
  41   42  #include <sys/dsl_pool.h>
  42   43  #include <sys/abd.h>
  43   44  
  44   45  /*
  45   46   * The ZFS Intent Log (ZIL) saves "transaction records" (itxs) of system
  46   47   * calls that change the file system. Each itx has enough information to
  47   48   * be able to replay them after a system crash, power loss, or
  48   49   * equivalent failure mode. These are stored in memory until either:
  49   50   *
  50   51   *   1. they are committed to the pool by the DMU transaction group
  51   52   *      (txg), at which point they can be discarded; or
  52   53   *   2. they are committed to the on-disk ZIL for the dataset being
  53   54   *      modified (e.g. due to an fsync, O_DSYNC, or other synchronous
  54   55   *      requirement).
  55   56   *
  56   57   * In the event of a crash or power loss, the itxs contained by each
  57   58   * dataset's on-disk ZIL will be replayed when that dataset is first
  58   59   * instantiated (e.g. if the dataset is a normal filesystem, when it is
  59   60   * first mounted).
  60   61   *
  61   62   * As hinted at above, there is one ZIL per dataset (both the in-memory
  62   63   * representation, and the on-disk representation). The on-disk format
  63   64   * consists of 3 parts:
  64   65   *
  65   66   *      - a single, per-dataset, ZIL header; which points to a chain of
  66   67   *      - zero or more ZIL blocks; each of which contains
  67   68   *      - zero or more ZIL records
  68   69   *
  69   70   * A ZIL record holds the information necessary to replay a single
  70   71   * system call transaction. A ZIL block can hold many ZIL records, and
  71   72   * the blocks are chained together, similarly to a singly linked list.
  72   73   *
  73   74   * Each ZIL block contains a block pointer (blkptr_t) to the next ZIL
  74   75   * block in the chain, and the ZIL header points to the first block in
  75   76   * the chain.
  76   77   *
  77   78   * Note, there is not a fixed place in the pool to hold these ZIL
  78   79   * blocks; they are dynamically allocated and freed as needed from the
  79   80   * blocks available on the pool, though they can be preferentially
  80   81   * allocated from a dedicated "log" vdev.
  81   82   */
  82   83  
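To make the chain layout described above concrete, here is a minimal standalone sketch; log_hdr and log_blk are hypothetical simplified stand-ins for the real zil_header_t and ZIL block structures, not the on-disk formats:

#include <stdio.h>
#include <stddef.h>

struct log_blk {
	struct log_blk *next;	/* stands in for the embedded next-block pointer */
	int nrecords;		/* ZIL records held by this block */
};

struct log_hdr {
	struct log_blk *first;	/* stands in for zh_log */
};

/* Walk the singly linked chain of log blocks, as described above. */
static void
walk_chain(const struct log_hdr *hdr)
{
	for (const struct log_blk *b = hdr->first; b != NULL; b = b->next)
		printf("log block holding %d record(s)\n", b->nrecords);
}

int
main(void)
{
	struct log_blk b2 = { NULL, 1 };
	struct log_blk b1 = { &b2, 3 };
	struct log_hdr hdr = { &b1 };

	walk_chain(&hdr);
	return (0);
}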
  83   84  /*
  84   85   * This controls the amount of time that a ZIL block (lwb) will remain
  85   86   * "open" when it isn't "full", and it has a thread waiting for it to be
  86   87   * committed to stable storage. Please refer to the zil_commit_waiter()
  87   88   * function (and the comments within it) for more details.
  88   89   */
  89   90  int zfs_commit_timeout_pct = 5;
  90   91  
  91   92  /*
  92   93   * Disable intent logging replay.  This global ZIL switch affects all pools.
  93   94   */
  94      -int zil_replay_disable = 0;
       95 +int zil_replay_disable = 0;    /* disable intent logging replay */
  95   96  
  96   97  /*
  97   98   * Tunable parameter for debugging or performance analysis.  Setting
  98   99   * zfs_nocacheflush will cause corruption on power loss if a volatile
  99  100   * out-of-order write cache is enabled.
 100  101   */
 101  102  boolean_t zfs_nocacheflush = B_FALSE;
 102  103  
 103  104  /*
 104  105   * Limit SLOG write size per commit executed with synchronous priority.
 105  106   * Any writes above that will be executed with lower (asynchronous) priority
 106  107   * to limit potential SLOG device abuse by single active ZIL writer.
 107  108   */
 108  109  uint64_t zil_slog_bulk = 768 * 1024;
 109  110  
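As a rough illustration of how a byte budget like zil_slog_bulk can drive the priority choice described above, here is a standalone sketch; pick_priority and the enum are hypothetical helpers, not the actual issue path:

#include <stdio.h>
#include <stdint.h>

#define	SLOG_BULK_LIMIT	(768 * 1024)	/* mirrors the default above */

enum io_priority { PRIO_SYNC_WRITE, PRIO_ASYNC_WRITE };

/* Writes past the per-commit budget fall back to asynchronous priority. */
static enum io_priority
pick_priority(uint64_t bytes_issued)
{
	return (bytes_issued <= SLOG_BULK_LIMIT ?
	    PRIO_SYNC_WRITE : PRIO_ASYNC_WRITE);
}

int
main(void)
{
	uint64_t sizes[] = { 512 * 1024, 512 * 1024, 128 * 1024 };
	uint64_t issued = 0;

	for (int i = 0; i < 3; i++) {
		issued += sizes[i];
		printf("write %d issued at %s priority\n", i,
		    pick_priority(issued) == PRIO_SYNC_WRITE ?
		    "sync" : "async");
	}
	return (0);
}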
 110  111  static kmem_cache_t *zil_lwb_cache;
 111  112  static kmem_cache_t *zil_zcw_cache;
 112  113  
 113  114  static void zil_async_to_sync(zilog_t *zilog, uint64_t foid);
 114  115  
 115  116  #define LWB_EMPTY(lwb) ((BP_GET_LSIZE(&lwb->lwb_blk) - \
 116  117      sizeof (zil_chain_t)) == (lwb->lwb_sz - lwb->lwb_nused))
 117  118  
 118  119  static int
 119  120  zil_bp_compare(const void *x1, const void *x2)
 120  121  {
 121  122          const dva_t *dva1 = &((zil_bp_node_t *)x1)->zn_dva;
 122  123          const dva_t *dva2 = &((zil_bp_node_t *)x2)->zn_dva;
 123  124  
 124  125          if (DVA_GET_VDEV(dva1) < DVA_GET_VDEV(dva2))
 125  126                  return (-1);
 126  127          if (DVA_GET_VDEV(dva1) > DVA_GET_VDEV(dva2))
 127  128                  return (1);
 128  129  
 129  130          if (DVA_GET_OFFSET(dva1) < DVA_GET_OFFSET(dva2))
 130  131                  return (-1);
 131  132          if (DVA_GET_OFFSET(dva1) > DVA_GET_OFFSET(dva2))
 132  133                  return (1);
 133  134  
 134  135          return (0);
 135  136  }
 136  137  
 137  138  static void
 138  139  zil_bp_tree_init(zilog_t *zilog)
 139  140  {
 140  141          avl_create(&zilog->zl_bp_tree, zil_bp_compare,
 141  142              sizeof (zil_bp_node_t), offsetof(zil_bp_node_t, zn_node));
 142  143  }
 143  144  
 144  145  static void
 145  146  zil_bp_tree_fini(zilog_t *zilog)
 146  147  {
 147  148          avl_tree_t *t = &zilog->zl_bp_tree;
 148  149          zil_bp_node_t *zn;
 149  150          void *cookie = NULL;
 150  151  
 151  152          while ((zn = avl_destroy_nodes(t, &cookie)) != NULL)
 152  153                  kmem_free(zn, sizeof (zil_bp_node_t));
 153  154  
 154  155          avl_destroy(t);
 155  156  }
 156  157  
 157  158  int
 158  159  zil_bp_tree_add(zilog_t *zilog, const blkptr_t *bp)
 159  160  {
 160  161          avl_tree_t *t = &zilog->zl_bp_tree;
 161  162          const dva_t *dva;
 162  163          zil_bp_node_t *zn;
 163  164          avl_index_t where;
 164  165  
 165  166          if (BP_IS_EMBEDDED(bp))
 166  167                  return (0);
 167  168  
 168  169          dva = BP_IDENTITY(bp);
 169  170  
 170  171          if (avl_find(t, dva, &where) != NULL)
 171  172                  return (SET_ERROR(EEXIST));
 172  173  
 173  174          zn = kmem_alloc(sizeof (zil_bp_node_t), KM_SLEEP);
 174  175          zn->zn_dva = *dva;
 175  176          avl_insert(t, zn, where);
 176  177  
 177  178          return (0);
 178  179  }
 179  180  
 180  181  static zil_header_t *
 181  182  zil_header_in_syncing_context(zilog_t *zilog)
 182  183  {
 183  184          return ((zil_header_t *)zilog->zl_header);
 184  185  }
 185  186  
 186  187  static void
 187  188  zil_init_log_chain(zilog_t *zilog, blkptr_t *bp)
 188  189  {
 189  190          zio_cksum_t *zc = &bp->blk_cksum;
 190  191  
 191  192          zc->zc_word[ZIL_ZC_GUID_0] = spa_get_random(-1ULL);
 192  193          zc->zc_word[ZIL_ZC_GUID_1] = spa_get_random(-1ULL);
 193  194          zc->zc_word[ZIL_ZC_OBJSET] = dmu_objset_id(zilog->zl_os);
 194  195          zc->zc_word[ZIL_ZC_SEQ] = 1ULL;
 195  196  }
 196  197  
 197  198  /*
 198  199   * Read a log block and make sure it's valid.
 199  200   */
 200  201  static int
 201  202  zil_read_log_block(zilog_t *zilog, const blkptr_t *bp, blkptr_t *nbp, void *dst,
 202  203      char **end)
 203  204  {
 204  205          enum zio_flag zio_flags = ZIO_FLAG_CANFAIL;
 205  206          arc_flags_t aflags = ARC_FLAG_WAIT;
 206  207          arc_buf_t *abuf = NULL;
 207  208          zbookmark_phys_t zb;
 208  209          int error;
 209  210  
 210  211          if (zilog->zl_header->zh_claim_txg == 0)
 211  212                  zio_flags |= ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB;
 212  213  
 213  214          if (!(zilog->zl_header->zh_flags & ZIL_CLAIM_LR_SEQ_VALID))
 214  215                  zio_flags |= ZIO_FLAG_SPECULATIVE;
 215  216  
 216  217          SET_BOOKMARK(&zb, bp->blk_cksum.zc_word[ZIL_ZC_OBJSET],
 217  218              ZB_ZIL_OBJECT, ZB_ZIL_LEVEL, bp->blk_cksum.zc_word[ZIL_ZC_SEQ]);
 218  219  
 219  220          error = arc_read(NULL, zilog->zl_spa, bp, arc_getbuf_func, &abuf,
 220  221              ZIO_PRIORITY_SYNC_READ, zio_flags, &aflags, &zb);
 221  222  
 222  223          if (error == 0) {
 223  224                  zio_cksum_t cksum = bp->blk_cksum;
 224  225  
 225  226                  /*
 226  227                   * Validate the checksummed log block.
 227  228                   *
 228  229                   * Sequence numbers should be... sequential.  The checksum
 229  230                   * verifier for the next block should be bp's checksum plus 1.
 230  231                   *
 231  232                   * Also check the log chain linkage and size used.
 232  233                   */
 233  234                  cksum.zc_word[ZIL_ZC_SEQ]++;
 234  235  
 235  236                  if (BP_GET_CHECKSUM(bp) == ZIO_CHECKSUM_ZILOG2) {
 236  237                          zil_chain_t *zilc = abuf->b_data;
 237  238                          char *lr = (char *)(zilc + 1);
 238  239                          uint64_t len = zilc->zc_nused - sizeof (zil_chain_t);
 239  240  
 240  241                          if (bcmp(&cksum, &zilc->zc_next_blk.blk_cksum,
 241  242                              sizeof (cksum)) || BP_IS_HOLE(&zilc->zc_next_blk)) {
 242  243                                  error = SET_ERROR(ECKSUM);
 243  244                          } else {
 244  245                                  ASSERT3U(len, <=, SPA_OLD_MAXBLOCKSIZE);
 245  246                                  bcopy(lr, dst, len);
 246  247                                  *end = (char *)dst + len;
 247  248                                  *nbp = zilc->zc_next_blk;
 248  249                          }
 249  250                  } else {
 250  251                          char *lr = abuf->b_data;
 251  252                          uint64_t size = BP_GET_LSIZE(bp);
 252  253                          zil_chain_t *zilc = (zil_chain_t *)(lr + size) - 1;
 253  254  
 254  255                          if (bcmp(&cksum, &zilc->zc_next_blk.blk_cksum,
 255  256                              sizeof (cksum)) || BP_IS_HOLE(&zilc->zc_next_blk) ||
 256  257                              (zilc->zc_nused > (size - sizeof (*zilc)))) {
 257  258                                  error = SET_ERROR(ECKSUM);
 258  259                          } else {
 259  260                                  ASSERT3U(zilc->zc_nused, <=,
 260  261                                      SPA_OLD_MAXBLOCKSIZE);
 261  262                                  bcopy(lr, dst, zilc->zc_nused);
 262  263                                  *end = (char *)dst + zilc->zc_nused;
 263  264                                  *nbp = zilc->zc_next_blk;
 264  265                          }
 265  266                  }
 266  267  
 267  268                  arc_buf_destroy(abuf, &abuf);
 268  269          }
 269  270  
 270  271          return (error);
 271  272  }
 272  273  
 273  274  /*
 274  275   * Read a TX_WRITE log data block.
 275  276   */
 276  277  static int
 277  278  zil_read_log_data(zilog_t *zilog, const lr_write_t *lr, void *wbuf)
 278  279  {
 279  280          enum zio_flag zio_flags = ZIO_FLAG_CANFAIL;
 280  281          const blkptr_t *bp = &lr->lr_blkptr;
 281  282          arc_flags_t aflags = ARC_FLAG_WAIT;
 282  283          arc_buf_t *abuf = NULL;
 283  284          zbookmark_phys_t zb;
 284  285          int error;
 285  286  
 286  287          if (BP_IS_HOLE(bp)) {
 287  288                  if (wbuf != NULL)
 288  289                          bzero(wbuf, MAX(BP_GET_LSIZE(bp), lr->lr_length));
 289  290                  return (0);
 290  291          }
 291  292  
 292  293          if (zilog->zl_header->zh_claim_txg == 0)
 293  294                  zio_flags |= ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB;
 294  295  
 295  296          SET_BOOKMARK(&zb, dmu_objset_id(zilog->zl_os), lr->lr_foid,
 296  297              ZB_ZIL_LEVEL, lr->lr_offset / BP_GET_LSIZE(bp));
 297  298  
 298  299          error = arc_read(NULL, zilog->zl_spa, bp, arc_getbuf_func, &abuf,
 299  300              ZIO_PRIORITY_SYNC_READ, zio_flags, &aflags, &zb);
 300  301  
 301  302          if (error == 0) {
 302  303                  if (wbuf != NULL)
 303  304                          bcopy(abuf->b_data, wbuf, arc_buf_size(abuf));
 304  305                  arc_buf_destroy(abuf, &abuf);
 305  306          }
 306  307  
 307  308          return (error);
 308  309  }
 309  310  
 310  311  /*
 311  312   * Parse the intent log, and call parse_func for each valid record within.
 312  313   */
 313  314  int
 314  315  zil_parse(zilog_t *zilog, zil_parse_blk_func_t *parse_blk_func,
 315  316      zil_parse_lr_func_t *parse_lr_func, void *arg, uint64_t txg)
 316  317  {
 317  318          const zil_header_t *zh = zilog->zl_header;
 318  319          boolean_t claimed = !!zh->zh_claim_txg;
 319  320          uint64_t claim_blk_seq = claimed ? zh->zh_claim_blk_seq : UINT64_MAX;
 320  321          uint64_t claim_lr_seq = claimed ? zh->zh_claim_lr_seq : UINT64_MAX;
 321  322          uint64_t max_blk_seq = 0;
 322  323          uint64_t max_lr_seq = 0;
 323  324          uint64_t blk_count = 0;
 324  325          uint64_t lr_count = 0;
 325  326          blkptr_t blk, next_blk;
 326  327          char *lrbuf, *lrp;
 327  328          int error = 0;
 328  329  
 329  330          /*
 330  331           * Old logs didn't record the maximum zh_claim_lr_seq.
 331  332           */
 332  333          if (!(zh->zh_flags & ZIL_CLAIM_LR_SEQ_VALID))
 333  334                  claim_lr_seq = UINT64_MAX;
 334  335  
 335  336          /*
 336  337           * Starting at the block pointed to by zh_log we read the log chain.
 337  338           * For each block in the chain we strongly check that block to
 338  339           * ensure its validity.  We stop when an invalid block is found.
 339  340           * For each block pointer in the chain we call parse_blk_func().
 340  341           * For each record in each valid block we call parse_lr_func().
 341  342           * If the log has been claimed, stop if we encounter a sequence
 342  343           * number greater than the highest claimed sequence number.
 343  344           */
 344  345          lrbuf = zio_buf_alloc(SPA_OLD_MAXBLOCKSIZE);
 345  346          zil_bp_tree_init(zilog);
 346  347  
 347  348          for (blk = zh->zh_log; !BP_IS_HOLE(&blk); blk = next_blk) {
 348  349                  uint64_t blk_seq = blk.blk_cksum.zc_word[ZIL_ZC_SEQ];
 349  350                  int reclen;
 350  351                  char *end;
 351  352  
 352  353                  if (blk_seq > claim_blk_seq)
 353  354                          break;
 354  355                  if ((error = parse_blk_func(zilog, &blk, arg, txg)) != 0)
 355  356                          break;
 356  357                  ASSERT3U(max_blk_seq, <, blk_seq);
 357  358                  max_blk_seq = blk_seq;
 358  359                  blk_count++;
 359  360  
 360  361                  if (max_lr_seq == claim_lr_seq && max_blk_seq == claim_blk_seq)
 361  362                          break;
 362  363  
 363  364                  error = zil_read_log_block(zilog, &blk, &next_blk, lrbuf, &end);
 364  365                  if (error != 0)
 365  366                          break;
 366  367  
 367  368                  for (lrp = lrbuf; lrp < end; lrp += reclen) {
 368  369                          lr_t *lr = (lr_t *)lrp;
 369  370                          reclen = lr->lrc_reclen;
 370  371                          ASSERT3U(reclen, >=, sizeof (lr_t));
 371  372                          if (lr->lrc_seq > claim_lr_seq)
 372  373                                  goto done;
 373  374                          if ((error = parse_lr_func(zilog, lr, arg, txg)) != 0)
 374  375                                  goto done;
 375  376                          ASSERT3U(max_lr_seq, <, lr->lrc_seq);
 376  377                          max_lr_seq = lr->lrc_seq;
 377  378                          lr_count++;
 378  379                  }
 379  380          }
 380  381  done:
 381  382          zilog->zl_parse_error = error;
 382  383          zilog->zl_parse_blk_seq = max_blk_seq;
 383  384          zilog->zl_parse_lr_seq = max_lr_seq;
 384  385          zilog->zl_parse_blk_count = blk_count;
 385  386          zilog->zl_parse_lr_count = lr_count;
 386  387  
 387  388          ASSERT(!claimed || !(zh->zh_flags & ZIL_CLAIM_LR_SEQ_VALID) ||
 388  389              (max_blk_seq == claim_blk_seq && max_lr_seq == claim_lr_seq));
 389  390  
 390  391          zil_bp_tree_fini(zilog);
 391  392          zio_buf_free(lrbuf, SPA_OLD_MAXBLOCKSIZE);
 392  393  
 393  394          return (error);
 394  395  }
 395  396  
 396  397  static int
 397  398  zil_claim_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t first_txg)
 398  399  {
 399  400          /*
 400  401           * Claim log block if not already committed and not already claimed.
 401  402           * If tx == NULL, just verify that the block is claimable.
 402  403           */
 403  404          if (BP_IS_HOLE(bp) || bp->blk_birth < first_txg ||
 404  405              zil_bp_tree_add(zilog, bp) != 0)
 405  406                  return (0);
 406  407  
 407  408          return (zio_wait(zio_claim(NULL, zilog->zl_spa,
 408  409              tx == NULL ? 0 : first_txg, bp, spa_claim_notify, NULL,
 409  410              ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB)));
 410  411  }
 411  412  
 412  413  static int
 413  414  zil_claim_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t first_txg)
 414  415  {
 415  416          lr_write_t *lr = (lr_write_t *)lrc;
 416  417          int error;
 417  418  
 418  419          if (lrc->lrc_txtype != TX_WRITE)
 419  420                  return (0);
 420  421  
 421  422          /*
 422  423           * If the block is not readable, don't claim it.  This can happen
 423  424           * in normal operation when a log block is written to disk before
 424  425           * some of the dmu_sync() blocks it points to.  In this case, the
 425  426           * transaction cannot have been committed to anyone (we would have
 426  427           * waited for all writes to be stable first), so it is semantically
 427  428           * correct to declare this the end of the log.
 428  429           */
 429  430          if (lr->lr_blkptr.blk_birth >= first_txg &&
 430  431              (error = zil_read_log_data(zilog, lr, NULL)) != 0)
 431  432                  return (error);
 432  433          return (zil_claim_log_block(zilog, &lr->lr_blkptr, tx, first_txg));
 433  434  }
 434  435  
 435  436  /* ARGSUSED */
 436  437  static int
 437  438  zil_free_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t claim_txg)
 438  439  {
 439  440          zio_free_zil(zilog->zl_spa, dmu_tx_get_txg(tx), bp);
 440  441  
 441  442          return (0);
 442  443  }
 443  444  
 444  445  static int
 445  446  zil_free_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t claim_txg)
 446  447  {
 447  448          lr_write_t *lr = (lr_write_t *)lrc;
 448  449          blkptr_t *bp = &lr->lr_blkptr;
 449  450  
 450  451          /*
 451  452           * If we previously claimed it, we need to free it.
 452  453           */
 453  454          if (claim_txg != 0 && lrc->lrc_txtype == TX_WRITE &&
 454  455              bp->blk_birth >= claim_txg && zil_bp_tree_add(zilog, bp) == 0 &&
 455  456              !BP_IS_HOLE(bp))
 456  457                  zio_free(zilog->zl_spa, dmu_tx_get_txg(tx), bp);
 457  458  
 458  459          return (0);
 459  460  }
 460  461  
 461  462  static int
 462  463  zil_lwb_vdev_compare(const void *x1, const void *x2)
 463  464  {
 464  465          const uint64_t v1 = ((zil_vdev_node_t *)x1)->zv_vdev;
 465  466          const uint64_t v2 = ((zil_vdev_node_t *)x2)->zv_vdev;
 466  467  
 467  468          if (v1 < v2)
 468  469                  return (-1);
 469  470          if (v1 > v2)
 470  471                  return (1);
 471  472  
 472  473          return (0);
 473  474  }
 474  475  
 475  476  static lwb_t *
 476  477  zil_alloc_lwb(zilog_t *zilog, blkptr_t *bp, boolean_t slog, uint64_t txg)
 477  478  {
 478  479          lwb_t *lwb;
 479  480  
 480  481          lwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);
 481  482          lwb->lwb_zilog = zilog;
 482  483          lwb->lwb_blk = *bp;
 483  484          lwb->lwb_slog = slog;
 484  485          lwb->lwb_state = LWB_STATE_CLOSED;
 485  486          lwb->lwb_buf = zio_buf_alloc(BP_GET_LSIZE(bp));
 486  487          lwb->lwb_max_txg = txg;
 487  488          lwb->lwb_write_zio = NULL;
 488  489          lwb->lwb_root_zio = NULL;
 489  490          lwb->lwb_tx = NULL;
 490  491          lwb->lwb_issued_timestamp = 0;
 491  492          if (BP_GET_CHECKSUM(bp) == ZIO_CHECKSUM_ZILOG2) {
 492  493                  lwb->lwb_nused = sizeof (zil_chain_t);
 493  494                  lwb->lwb_sz = BP_GET_LSIZE(bp);
 494  495          } else {
 495  496                  lwb->lwb_nused = 0;
 496  497                  lwb->lwb_sz = BP_GET_LSIZE(bp) - sizeof (zil_chain_t);
 497  498          }
 498  499  
 499  500          mutex_enter(&zilog->zl_lock);
 500  501          list_insert_tail(&zilog->zl_lwb_list, lwb);
 501  502          mutex_exit(&zilog->zl_lock);
 502  503  
 503  504          ASSERT(!MUTEX_HELD(&lwb->lwb_vdev_lock));
 504  505          ASSERT(avl_is_empty(&lwb->lwb_vdev_tree));
 505  506          VERIFY(list_is_empty(&lwb->lwb_waiters));
 506  507  
 507  508          return (lwb);
 508  509  }
 509  510  
 510  511  static void
 511  512  zil_free_lwb(zilog_t *zilog, lwb_t *lwb)
 512  513  {
 513  514          ASSERT(MUTEX_HELD(&zilog->zl_lock));
 514  515          ASSERT(!MUTEX_HELD(&lwb->lwb_vdev_lock));
 515  516          VERIFY(list_is_empty(&lwb->lwb_waiters));
 516  517          ASSERT(avl_is_empty(&lwb->lwb_vdev_tree));
 517  518          ASSERT3P(lwb->lwb_write_zio, ==, NULL);
 518  519          ASSERT3P(lwb->lwb_root_zio, ==, NULL);
 519  520          ASSERT3U(lwb->lwb_max_txg, <=, spa_syncing_txg(zilog->zl_spa));
 520  521          ASSERT(lwb->lwb_state == LWB_STATE_CLOSED ||
 521      -            lwb->lwb_state == LWB_STATE_DONE);
      522 +            lwb->lwb_state == LWB_STATE_FLUSH_DONE);
 522  523  
 523  524          /*
 524  525           * Clear the zilog's field to indicate this lwb is no longer
 525  526           * valid, and prevent use-after-free errors.
 526  527           */
 527  528          if (zilog->zl_last_lwb_opened == lwb)
 528  529                  zilog->zl_last_lwb_opened = NULL;
 529  530  
 530  531          kmem_cache_free(zil_lwb_cache, lwb);
 531  532  }
 532  533  
 533  534  /*
 534  535   * Called when we create in-memory log transactions so that we know
 535  536   * to cleanup the itxs at the end of spa_sync().
 536  537   */
 537  538  void
 538  539  zilog_dirty(zilog_t *zilog, uint64_t txg)
 539  540  {
 540  541          dsl_pool_t *dp = zilog->zl_dmu_pool;
 541  542          dsl_dataset_t *ds = dmu_objset_ds(zilog->zl_os);
 542  543  
 543  544          ASSERT(spa_writeable(zilog->zl_spa));
 544  545  
 545  546          if (ds->ds_is_snapshot)
 546  547                  panic("dirtying snapshot!");
 547  548  
 548  549          if (txg_list_add(&dp->dp_dirty_zilogs, zilog, txg)) {
 549  550                  /* up the hold count until we can be written out */
 550  551                  dmu_buf_add_ref(ds->ds_dbuf, zilog);
 551  552  
 552  553                  zilog->zl_dirty_max_txg = MAX(txg, zilog->zl_dirty_max_txg);
 553  554          }
 554  555  }
 555  556  
 556  557  /*
 557  558   * Determine if the zil is dirty in the specified txg. Callers wanting to
 558  559   * ensure that the dirty state does not change must hold the itxg_lock for
 559  560   * the specified txg. Holding the lock will ensure that the zil cannot be
 560  561   * dirtied (zil_itx_assign) or cleaned (zil_clean) while we check its current
 561  562   * state.
 562  563   */
 563  564  boolean_t
 564  565  zilog_is_dirty_in_txg(zilog_t *zilog, uint64_t txg)
 565  566  {
 566  567          dsl_pool_t *dp = zilog->zl_dmu_pool;
 567  568  
 568  569          if (txg_list_member(&dp->dp_dirty_zilogs, zilog, txg & TXG_MASK))
 569  570                  return (B_TRUE);
 570  571          return (B_FALSE);
 571  572  }
 572  573  
 573  574  /*
 574  575   * Determine if the zil is dirty. The zil is considered dirty if it has
 575  576   * any pending itx records that have not been cleaned by zil_clean().
 576  577   */
 577  578  boolean_t
 578  579  zilog_is_dirty(zilog_t *zilog)
 579  580  {
 580  581          dsl_pool_t *dp = zilog->zl_dmu_pool;
 581  582  
 582  583          for (int t = 0; t < TXG_SIZE; t++) {
 583  584                  if (txg_list_member(&dp->dp_dirty_zilogs, zilog, t))
 584  585                          return (B_TRUE);
 585  586          }
 586  587          return (B_FALSE);
 587  588  }
 588  589  
 589  590  /*
 590  591   * Create an on-disk intent log.
 591  592   */
 592  593  static lwb_t *
 593  594  zil_create(zilog_t *zilog)
 594  595  {
 595  596          const zil_header_t *zh = zilog->zl_header;
 596  597          lwb_t *lwb = NULL;
 597  598          uint64_t txg = 0;
 598  599          dmu_tx_t *tx = NULL;
 599  600          blkptr_t blk;
 600  601          int error = 0;
 601  602          boolean_t slog = FALSE;
 602  603  
 603  604          /*
 604  605           * Wait for any previous destroy to complete.
 605  606           */
 606  607          txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
 607  608  
 608  609          ASSERT(zh->zh_claim_txg == 0);
 609  610          ASSERT(zh->zh_replay_seq == 0);
 610  611  
 611  612          blk = zh->zh_log;
 612  613  
 613  614          /*
 614  615           * Allocate an initial log block if:
 615  616           *    - there isn't one already
 616  617   *    - the existing block is the wrong endianness
 617  618           */
 618  619          if (BP_IS_HOLE(&blk) || BP_SHOULD_BYTESWAP(&blk)) {
 619  620                  tx = dmu_tx_create(zilog->zl_os);
 620  621                  VERIFY0(dmu_tx_assign(tx, TXG_WAIT));
 621  622                  dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
 622  623                  txg = dmu_tx_get_txg(tx);
 623  624  
 624  625                  if (!BP_IS_HOLE(&blk)) {
 625  626                          zio_free_zil(zilog->zl_spa, txg, &blk);
 626  627                          BP_ZERO(&blk);
 627  628                  }
 628  629  
 629  630                  error = zio_alloc_zil(zilog->zl_spa, txg, &blk, NULL,
 630  631                      ZIL_MIN_BLKSZ, &slog);
 631  632  
 632  633                  if (error == 0)
 633  634                          zil_init_log_chain(zilog, &blk);
 634  635          }
 635  636  
 636  637          /*
 637  638           * Allocate a log write block (lwb) for the first log block.
 638  639           */
 639  640          if (error == 0)
 640  641                  lwb = zil_alloc_lwb(zilog, &blk, slog, txg);
 641  642  
 642  643          /*
 643  644           * If we just allocated the first log block, commit our transaction
 644  645   * and wait for zil_sync() to stuff the block pointer into zh_log.
 645  646           * (zh is part of the MOS, so we cannot modify it in open context.)
 646  647           */
 647  648          if (tx != NULL) {
 648  649                  dmu_tx_commit(tx);
 649  650                  txg_wait_synced(zilog->zl_dmu_pool, txg);
 650  651          }
 651  652  
 652  653          ASSERT(bcmp(&blk, &zh->zh_log, sizeof (blk)) == 0);
 653  654  
 654  655          return (lwb);
 655  656  }
 656  657  
 657  658  /*
 658  659   * In one tx, free all log blocks and clear the log header. If keep_first
 659  660   * is set, then we're replaying a log with no content. We want to keep the
 660  661   * first block, however, so that the first synchronous transaction doesn't
 661  662   * require a txg_wait_synced() in zil_create(). We don't need to
 662  663   * txg_wait_synced() here either when keep_first is set, because both
 663  664   * zil_create() and zil_destroy() will wait for any in-progress destroys
 664  665   * to complete.
 665  666   */
 666  667  void
 667  668  zil_destroy(zilog_t *zilog, boolean_t keep_first)
 668  669  {
 669  670          const zil_header_t *zh = zilog->zl_header;
 670  671          lwb_t *lwb;
 671  672          dmu_tx_t *tx;
 672  673          uint64_t txg;
 673  674  
 674  675          /*
 675  676           * Wait for any previous destroy to complete.
 676  677           */
 677  678          txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
 678  679  
 679  680          zilog->zl_old_header = *zh;             /* debugging aid */
 680  681  
 681  682          if (BP_IS_HOLE(&zh->zh_log))
 682  683                  return;
 683  684  
 684  685          tx = dmu_tx_create(zilog->zl_os);
 685  686          VERIFY0(dmu_tx_assign(tx, TXG_WAIT));
 686  687          dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
 687  688          txg = dmu_tx_get_txg(tx);
 688  689  
 689  690          mutex_enter(&zilog->zl_lock);
 690  691  
 691  692          ASSERT3U(zilog->zl_destroy_txg, <, txg);
 692  693          zilog->zl_destroy_txg = txg;
 693  694          zilog->zl_keep_first = keep_first;
 694  695  
 695  696          if (!list_is_empty(&zilog->zl_lwb_list)) {
 696  697                  ASSERT(zh->zh_claim_txg == 0);
 697  698                  VERIFY(!keep_first);
 698  699                  while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
 699  700                          list_remove(&zilog->zl_lwb_list, lwb);
 700  701                          if (lwb->lwb_buf != NULL)
 701  702                                  zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
 702  703                          zio_free(zilog->zl_spa, txg, &lwb->lwb_blk);
 703  704                          zil_free_lwb(zilog, lwb);
 704  705                  }
 705  706          } else if (!keep_first) {
 706  707                  zil_destroy_sync(zilog, tx);
 707  708          }
 708  709          mutex_exit(&zilog->zl_lock);
 709  710  
 710  711          dmu_tx_commit(tx);
 711  712  }
 712  713  
 713  714  void
 714  715  zil_destroy_sync(zilog_t *zilog, dmu_tx_t *tx)
 715  716  {
 716  717          ASSERT(list_is_empty(&zilog->zl_lwb_list));
 717  718          (void) zil_parse(zilog, zil_free_log_block,
 718  719              zil_free_log_record, tx, zilog->zl_header->zh_claim_txg);
 719  720  }
 720  721  
 721  722  int
 722  723  zil_claim(dsl_pool_t *dp, dsl_dataset_t *ds, void *txarg)
 723  724  {
 724  725          dmu_tx_t *tx = txarg;
 725  726          uint64_t first_txg = dmu_tx_get_txg(tx);
 726  727          zilog_t *zilog;
 727  728          zil_header_t *zh;
 728  729          objset_t *os;
 729  730          int error;
 730  731  
 731  732          error = dmu_objset_own_obj(dp, ds->ds_object,
 732  733              DMU_OST_ANY, B_FALSE, FTAG, &os);
 733  734          if (error != 0) {
 734  735                  /*
 735  736                   * EBUSY indicates that the objset is inconsistent, in which
 736  737                   * case it can not have a ZIL.
 737  738                   */
 738  739                  if (error != EBUSY) {
 739  740                          cmn_err(CE_WARN, "can't open objset for %llu, error %u",
 740  741                              (unsigned long long)ds->ds_object, error);
 741  742                  }
 742  743                  return (0);
 743  744          }
 744  745  
 745  746          zilog = dmu_objset_zil(os);
 746  747          zh = zil_header_in_syncing_context(zilog);
 747  748  
 748  749          if (spa_get_log_state(zilog->zl_spa) == SPA_LOG_CLEAR) {
 749  750                  if (!BP_IS_HOLE(&zh->zh_log))
 750  751                          zio_free_zil(zilog->zl_spa, first_txg, &zh->zh_log);
 751  752                  BP_ZERO(&zh->zh_log);
 752  753                  dsl_dataset_dirty(dmu_objset_ds(os), tx);
 753  754                  dmu_objset_disown(os, FTAG);
 754  755                  return (0);
 755  756          }
 756  757  
 757  758          /*
 758  759           * Claim all log blocks if we haven't already done so, and remember
 759  760           * the highest claimed sequence number.  This ensures that if we can
 760  761           * read only part of the log now (e.g. due to a missing device),
 761  762           * but we can read the entire log later, we will not try to replay
 762  763           * or destroy beyond the last block we successfully claimed.
 763  764           */
 764  765          ASSERT3U(zh->zh_claim_txg, <=, first_txg);
 765  766          if (zh->zh_claim_txg == 0 && !BP_IS_HOLE(&zh->zh_log)) {
 766  767                  (void) zil_parse(zilog, zil_claim_log_block,
 767  768                      zil_claim_log_record, tx, first_txg);
 768  769                  zh->zh_claim_txg = first_txg;
 769  770                  zh->zh_claim_blk_seq = zilog->zl_parse_blk_seq;
 770  771                  zh->zh_claim_lr_seq = zilog->zl_parse_lr_seq;
 771  772                  if (zilog->zl_parse_lr_count || zilog->zl_parse_blk_count > 1)
 772  773                          zh->zh_flags |= ZIL_REPLAY_NEEDED;
 773  774                  zh->zh_flags |= ZIL_CLAIM_LR_SEQ_VALID;
 774  775                  dsl_dataset_dirty(dmu_objset_ds(os), tx);
 775  776          }
 776  777  
 777  778          ASSERT3U(first_txg, ==, (spa_last_synced_txg(zilog->zl_spa) + 1));
 778  779          dmu_objset_disown(os, FTAG);
 779  780          return (0);
 780  781  }
 781  782  
 782  783  /*
 783  784   * Check the log by walking the log chain.
 784  785   * Checksum errors are ok as they indicate the end of the chain.
 785  786   * Any other error (no device or read failure) returns an error.
 786  787   */
 787  788  /* ARGSUSED */
 788  789  int
 789  790  zil_check_log_chain(dsl_pool_t *dp, dsl_dataset_t *ds, void *tx)
 790  791  {
 791  792          zilog_t *zilog;
 792  793          objset_t *os;
 793  794          blkptr_t *bp;
 794  795          int error;
 795  796  
 796  797          ASSERT(tx == NULL);
 797  798  
 798  799          error = dmu_objset_from_ds(ds, &os);
 799  800          if (error != 0) {
 800  801                  cmn_err(CE_WARN, "can't open objset %llu, error %d",
 801  802                      (unsigned long long)ds->ds_object, error);
 802  803                  return (0);
 803  804          }
 804  805  
 805  806          zilog = dmu_objset_zil(os);
 806  807          bp = (blkptr_t *)&zilog->zl_header->zh_log;
 807  808  
 808  809          /*
 809  810           * Check the first block and determine if it's on a log device
 810  811           * which may have been removed or faulted prior to loading this
 811  812           * pool.  If so, there's no point in checking the rest of the log
 812  813           * as its content should have already been synced to the pool.
 813  814           */
 814  815          if (!BP_IS_HOLE(bp)) {
 815  816                  vdev_t *vd;
 816  817                  boolean_t valid = B_TRUE;
 817  818  
 818  819                  spa_config_enter(os->os_spa, SCL_STATE, FTAG, RW_READER);
 819  820                  vd = vdev_lookup_top(os->os_spa, DVA_GET_VDEV(&bp->blk_dva[0]));
 820  821                  if (vd->vdev_islog && vdev_is_dead(vd))
 821  822                          valid = vdev_log_state_valid(vd);
 822  823                  spa_config_exit(os->os_spa, SCL_STATE, FTAG);
 823  824  
 824  825                  if (!valid)
 825  826                          return (0);
 826  827          }
 827  828  
 828  829          /*
 829  830           * Because tx == NULL, zil_claim_log_block() will not actually claim
 830  831           * any blocks, but just determine whether it is possible to do so.
 831  832           * In addition to checking the log chain, zil_claim_log_block()
 832  833           * will invoke zio_claim() with a done func of spa_claim_notify(),
 833  834           * which will update spa_max_claim_txg.  See spa_load() for details.
 834  835           */
 835  836          error = zil_parse(zilog, zil_claim_log_block, zil_claim_log_record, tx,
 836  837              zilog->zl_header->zh_claim_txg ? -1ULL : spa_first_txg(os->os_spa));
 837  838  
 838  839          return ((error == ECKSUM || error == ENOENT) ? 0 : error);
 839  840  }
 840  841  
 841  842  /*
 842  843   * When an itx is "skipped", this function is used to properly mark the
 843  844   * waiter as "done", and signal any thread(s) waiting on it. An itx can
 844  845   * be skipped (and not committed to an lwb) for a variety of reasons,
 845  846   * one of them being that the itx was committed via spa_sync(), prior to
 846  847   * it being committed to an lwb; this can happen if a thread calling
 847  848   * zil_commit() is racing with spa_sync().
 848  849   */
 849  850  static void
 850  851  zil_commit_waiter_skip(zil_commit_waiter_t *zcw)
 851  852  {
 852  853          mutex_enter(&zcw->zcw_lock);
 853  854          ASSERT3B(zcw->zcw_done, ==, B_FALSE);
 854  855          zcw->zcw_done = B_TRUE;
 855  856          cv_broadcast(&zcw->zcw_cv);
 856  857          mutex_exit(&zcw->zcw_lock);
 857  858  }
 858  859  
 859  860  /*
 860  861   * This function is used when the given waiter is to be linked into an
 861  862   * lwb's "lwb_waiter" list; i.e. when the itx is committed to the lwb.
 862  863   * At this point, the waiter will no longer be referenced by the itx,
 863  864   * and instead, will be referenced by the lwb.
 864  865   */
 865  866  static void
 866  867  zil_commit_waiter_link_lwb(zil_commit_waiter_t *zcw, lwb_t *lwb)
 867  868  {
 868  869          /*
 869  870           * The lwb_waiters field of the lwb is protected by the zilog's
 870  871           * zl_lock, thus it must be held when calling this function.
 871  872           */
 872  873          ASSERT(MUTEX_HELD(&lwb->lwb_zilog->zl_lock));
 873  874  
 874  875          mutex_enter(&zcw->zcw_lock);
 875  876          ASSERT(!list_link_active(&zcw->zcw_node));
 876  877          ASSERT3P(zcw->zcw_lwb, ==, NULL);
 877  878          ASSERT3P(lwb, !=, NULL);
 878  879          ASSERT(lwb->lwb_state == LWB_STATE_OPENED ||
 879      -            lwb->lwb_state == LWB_STATE_ISSUED);
      880 +            lwb->lwb_state == LWB_STATE_ISSUED ||
      881 +            lwb->lwb_state == LWB_STATE_WRITE_DONE);
 880  882  
 881  883          list_insert_tail(&lwb->lwb_waiters, zcw);
 882  884          zcw->zcw_lwb = lwb;
 883  885          mutex_exit(&zcw->zcw_lock);
 884  886  }
 885  887  
 886  888  /*
 887  889   * This function is used when zio_alloc_zil() fails to allocate a ZIL
 888  890   * block, and the given waiter must be linked to the "nolwb waiters"
 889  891   * list inside of zil_process_commit_list().
 890  892   */
 891  893  static void
 892  894  zil_commit_waiter_link_nolwb(zil_commit_waiter_t *zcw, list_t *nolwb)
 893  895  {
 894  896          mutex_enter(&zcw->zcw_lock);
 895  897          ASSERT(!list_link_active(&zcw->zcw_node));
 896  898          ASSERT3P(zcw->zcw_lwb, ==, NULL);
 897  899          list_insert_tail(nolwb, zcw);
 898  900          mutex_exit(&zcw->zcw_lock);
 899  901  }
 900  902  
 901  903  void
 902  904  zil_lwb_add_block(lwb_t *lwb, const blkptr_t *bp)
 903  905  {
 904  906          avl_tree_t *t = &lwb->lwb_vdev_tree;
 905  907          avl_index_t where;
 906  908          zil_vdev_node_t *zv, zvsearch;
 907  909          int ndvas = BP_GET_NDVAS(bp);
 908  910          int i;
 909  911  
 910  912          if (zfs_nocacheflush)
 911  913                  return;
 912  914  
 913  915          mutex_enter(&lwb->lwb_vdev_lock);
 914  916          for (i = 0; i < ndvas; i++) {
 915  917                  zvsearch.zv_vdev = DVA_GET_VDEV(&bp->blk_dva[i]);
 916  918                  if (avl_find(t, &zvsearch, &where) == NULL) {
 917  919                          zv = kmem_alloc(sizeof (*zv), KM_SLEEP);
 918  920                          zv->zv_vdev = zvsearch.zv_vdev;
 919  921                          avl_insert(t, zv, where);
 920  922                  }
 921  923          }
 922  924          mutex_exit(&lwb->lwb_vdev_lock);
 923  925  }
 924  926  
      927 +static void
      928 +zil_lwb_flush_defer(lwb_t *lwb, lwb_t *nlwb)
      929 +{
      930 +        avl_tree_t *src = &lwb->lwb_vdev_tree;
      931 +        avl_tree_t *dst = &nlwb->lwb_vdev_tree;
      932 +        void *cookie = NULL;
      933 +        zil_vdev_node_t *zv;
      934 +
      935 +        ASSERT3S(lwb->lwb_state, ==, LWB_STATE_WRITE_DONE);
      936 +        ASSERT3S(nlwb->lwb_state, !=, LWB_STATE_WRITE_DONE);
      937 +        ASSERT3S(nlwb->lwb_state, !=, LWB_STATE_FLUSH_DONE);
      938 +
      939 +        /*
      940 +         * By this point in the lifetime of 'lwb', its lwb_vdev_tree no longer
      941 +         * needs the protection of lwb_vdev_lock (it will only be modified
      942 +         * while holding zilog->zl_lock), since its writes and those of its
      943 +         * children have all completed.  The younger 'nlwb' may be waiting on
      944 +         * future writes to additional vdevs.
      945 +         */
      946 +        mutex_enter(&nlwb->lwb_vdev_lock);
      947 +        /*
      948 +         * Tear down the 'lwb' vdev tree, ensuring that entries which do not
      949 +         * exist in 'nlwb' are moved to it, freeing any would-be duplicates.
      950 +         */
      951 +        while ((zv = avl_destroy_nodes(src, &cookie)) != NULL) {
      952 +                avl_index_t where;
      953 +
      954 +                if (avl_find(dst, zv, &where) == NULL) {
      955 +                        avl_insert(dst, zv, where);
      956 +                } else {
      957 +                        kmem_free(zv, sizeof (*zv));
      958 +                }
      959 +        }
      960 +        mutex_exit(&nlwb->lwb_vdev_lock);
      961 +}
      962 +
 925  963  void
 926  964  zil_lwb_add_txg(lwb_t *lwb, uint64_t txg)
 927  965  {
 928  966          lwb->lwb_max_txg = MAX(lwb->lwb_max_txg, txg);
 929  967  }
 930  968  
 931  969  /*
 932      - * This function is a called after all VDEVs associated with a given lwb
      970 + * This function is called after all vdevs associated with a given lwb
 933  971   * write have completed their DKIOCFLUSHWRITECACHE command; or as soon
 934      - * as the lwb write completes, if "zfs_nocacheflush" is set.
      972 + * as the lwb write completes, if "zfs_nocacheflush" is set. Further,
      973 + * all "previous" lwb's will have completed before this function is
      974 + * called; i.e. this function is called for all previous lwbs before
      975 + * it's called for "this" lwb (enforced via the zio dependencies
      976 + * configured in zil_lwb_set_zio_dependency()).
 935  977   *
 936  978   * The intention is for this function to be called as soon as the
 937  979   * contents of an lwb are considered "stable" on disk, and will survive
 938  980   * any sudden loss of power. At this point, any threads waiting for the
 939  981   * lwb to reach this state are signalled, and the "waiter" structures
 940  982   * are marked "done".
 941  983   */
 942  984  static void
 943  985  zil_lwb_flush_vdevs_done(zio_t *zio)
 944  986  {
 945  987          lwb_t *lwb = zio->io_private;
 946  988          zilog_t *zilog = lwb->lwb_zilog;
 947  989          dmu_tx_t *tx = lwb->lwb_tx;
 948  990          zil_commit_waiter_t *zcw;
 949  991  
 950  992          spa_config_exit(zilog->zl_spa, SCL_STATE, lwb);
 951  993  
 952  994          zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
 953  995  
 954  996          mutex_enter(&zilog->zl_lock);
 955  997  
 956  998          /*
 957  999           * Ensure the lwb buffer pointer is cleared before releasing the
 958 1000           * txg. If we have had an allocation failure and the txg is
 959 1001           * waiting to sync then we want zil_sync() to remove the lwb so
 960 1002           * that it's not picked up as the next new one in
 961 1003           * zil_process_commit_list(). zil_sync() will only remove the
 962 1004           * lwb if lwb_buf is null.
 963 1005           */
 964 1006          lwb->lwb_buf = NULL;
 965 1007          lwb->lwb_tx = NULL;
 966 1008  
 967 1009          ASSERT3U(lwb->lwb_issued_timestamp, >, 0);
 968 1010          zilog->zl_last_lwb_latency = gethrtime() - lwb->lwb_issued_timestamp;
 969 1011  
 970 1012          lwb->lwb_root_zio = NULL;
 971      -        lwb->lwb_state = LWB_STATE_DONE;
 972 1013  
     1014 +        ASSERT3S(lwb->lwb_state, ==, LWB_STATE_WRITE_DONE);
     1015 +        lwb->lwb_state = LWB_STATE_FLUSH_DONE;
     1016 +
 973 1017          if (zilog->zl_last_lwb_opened == lwb) {
 974 1018                  /*
 975 1019                   * Remember the highest committed log sequence number
 976 1020                   * for ztest. We only update this value when all the log
 977 1021                   * writes succeeded, because ztest wants to ASSERT that
 978 1022                   * it got the whole log chain.
 979 1023                   */
 980 1024                  zilog->zl_commit_lr_seq = zilog->zl_lr_seq;
 981 1025          }
 982 1026  
 983 1027          while ((zcw = list_head(&lwb->lwb_waiters)) != NULL) {
 984 1028                  mutex_enter(&zcw->zcw_lock);
 985 1029  
 986 1030                  ASSERT(list_link_active(&zcw->zcw_node));
 987 1031                  list_remove(&lwb->lwb_waiters, zcw);
 988 1032  
 989 1033                  ASSERT3P(zcw->zcw_lwb, ==, lwb);
 990 1034                  zcw->zcw_lwb = NULL;
 991 1035  
 992 1036                  zcw->zcw_zio_error = zio->io_error;
 993 1037  
 994 1038                  ASSERT3B(zcw->zcw_done, ==, B_FALSE);
 995 1039                  zcw->zcw_done = B_TRUE;
 996 1040                  cv_broadcast(&zcw->zcw_cv);
 997 1041  
 998 1042                  mutex_exit(&zcw->zcw_lock);
 999 1043          }
1000 1044  
1001 1045          mutex_exit(&zilog->zl_lock);
1002 1046  
1003 1047          /*
1004 1048           * Now that we've written this log block, we have a stable pointer
1005 1049           * to the next block in the chain, so it's OK to let the txg in
1006 1050           * which we allocated the next block sync.
1007 1051           */
1008 1052          dmu_tx_commit(tx);
1009 1053  }
1010 1054  
1011 1055  /*
1012      - * This is called when an lwb write completes. This means, this specific
1013      - * lwb was written to disk, and all dependent lwb have also been
1014      - * written to disk.
1015      - *
1016      - * At this point, a DKIOCFLUSHWRITECACHE command hasn't been issued to
1017      - * the VDEVs involved in writing out this specific lwb. The lwb will be
1018      - * "done" once zil_lwb_flush_vdevs_done() is called, which occurs in the
1019      - * zio completion callback for the lwb's root zio.
     1056 + * This is called when an lwb's write zio completes. The callback's
     1057 + * purpose is to issue the DKIOCFLUSHWRITECACHE commands for the vdevs
     1058 + * in the lwb's lwb_vdev_tree. The tree will contain the vdevs involved
     1059 + * in writing out this specific lwb's data, and in the case that cache
     1060 + * flushes have been deferred, vdevs involved in writing the data for
     1061 + * previous lwbs. The writes corresponding to all the vdevs in the
     1062 + * lwb_vdev_tree will have completed by the time this is called, due to
     1063 + * the zio dependencies configured in zil_lwb_set_zio_dependency(),
     1064 + * which takes deferred flushes into account. The lwb will be "done"
     1065 + * once zil_lwb_flush_vdevs_done() is called, which occurs in the zio
     1066 + * completion callback for the lwb's root zio.
1020 1067   */
1021 1068  static void
1022 1069  zil_lwb_write_done(zio_t *zio)
1023 1070  {
1024 1071          lwb_t *lwb = zio->io_private;
1025 1072          spa_t *spa = zio->io_spa;
1026 1073          zilog_t *zilog = lwb->lwb_zilog;
1027 1074          avl_tree_t *t = &lwb->lwb_vdev_tree;
1028 1075          void *cookie = NULL;
1029 1076          zil_vdev_node_t *zv;
     1077 +        lwb_t *nlwb;
1030 1078  
1031 1079          ASSERT3S(spa_config_held(spa, SCL_STATE, RW_READER), !=, 0);
1032 1080  
1033 1081          ASSERT(BP_GET_COMPRESS(zio->io_bp) == ZIO_COMPRESS_OFF);
1034 1082          ASSERT(BP_GET_TYPE(zio->io_bp) == DMU_OT_INTENT_LOG);
1035 1083          ASSERT(BP_GET_LEVEL(zio->io_bp) == 0);
1036 1084          ASSERT(BP_GET_BYTEORDER(zio->io_bp) == ZFS_HOST_BYTEORDER);
1037 1085          ASSERT(!BP_IS_GANG(zio->io_bp));
1038 1086          ASSERT(!BP_IS_HOLE(zio->io_bp));
1039 1087          ASSERT(BP_GET_FILL(zio->io_bp) == 0);
1040 1088  
1041 1089          abd_put(zio->io_abd);
1042 1090  
1043      -        ASSERT3S(lwb->lwb_state, ==, LWB_STATE_ISSUED);
1044      -
1045 1091          mutex_enter(&zilog->zl_lock);
     1092 +        ASSERT3S(lwb->lwb_state, ==, LWB_STATE_ISSUED);
     1093 +        lwb->lwb_state = LWB_STATE_WRITE_DONE;
1046 1094          lwb->lwb_write_zio = NULL;
     1095 +        nlwb = list_next(&zilog->zl_lwb_list, lwb);
1047 1096          mutex_exit(&zilog->zl_lock);
1048 1097  
1049 1098          if (avl_numnodes(t) == 0)
1050 1099                  return;
1051 1100  
1052 1101          /*
1053 1102           * If there was an IO error, we're not going to call zio_flush()
1054 1103           * on these vdevs, so we simply empty the tree and free the
1055 1104           * nodes. We avoid calling zio_flush() since there isn't any
1056 1105           * good reason for doing so, after the lwb block failed to be
1057 1106           * written out.
1058 1107           */
1059 1108          if (zio->io_error != 0) {
1060 1109                  while ((zv = avl_destroy_nodes(t, &cookie)) != NULL)
1061 1110                          kmem_free(zv, sizeof (*zv));
1062 1111                  return;
1063 1112          }
1064 1113  
     1114 +        /*
     1115 +         * If this lwb does not have any threads waiting for it to
     1116 +         * complete, we want to defer issuing the DKIOCFLUSHWRITECACHE
     1117 +         * command to the vdevs written to by "this" lwb, and instead
     1118 +         * rely on the "next" lwb to handle the DKIOCFLUSHWRITECACHE
     1119 +         * command for those vdevs. Thus, we merge the vdev tree of
     1120 +         * "this" lwb with the vdev tree of the "next" lwb in the list,
     1121 +         * and assume the "next" lwb will handle flushing the vdevs (or
      1122 +         * deferring the flush(es) again).
     1123 +         *
     1124 +         * This is a useful performance optimization, especially for
     1125 +         * workloads with lots of async write activity and few sync
     1126 +         * write and/or fsync activity, as it has the potential to
     1127 +         * coalesce multiple flush commands to a vdev into one.
     1128 +         */
     1129 +        if (list_head(&lwb->lwb_waiters) == NULL && nlwb != NULL) {
     1130 +                zil_lwb_flush_defer(lwb, nlwb);
     1131 +                ASSERT(avl_is_empty(&lwb->lwb_vdev_tree));
     1132 +                return;
     1133 +        }
     1134 +
1065 1135          while ((zv = avl_destroy_nodes(t, &cookie)) != NULL) {
1066 1136                  vdev_t *vd = vdev_lookup_top(spa, zv->zv_vdev);
1067 1137                  if (vd != NULL)
1068 1138                          zio_flush(lwb->lwb_root_zio, vd);
1069 1139                  kmem_free(zv, sizeof (*zv));
1070 1140          }
1071 1141  }
1072 1142  
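The deferral described in the comment inside zil_lwb_write_done() above can be pictured with this small standalone sketch; vdev_set and flush_defer are hypothetical stand-ins for the real lwb_vdev_tree AVL tree and zil_lwb_flush_defer(), showing only the set-merge idea:

#include <stdio.h>
#include <stdbool.h>

#define	MAX_VDEVS	8

struct vdev_set {
	int ids[MAX_VDEVS];
	int count;
};

static bool
set_contains(const struct vdev_set *s, int id)
{
	for (int i = 0; i < s->count; i++) {
		if (s->ids[i] == id)
			return (true);
	}
	return (false);
}

static void
flush_defer(struct vdev_set *lwb, struct vdev_set *nlwb)
{
	/* Move every vdev id not already tracked by the next lwb. */
	for (int i = 0; i < lwb->count; i++) {
		if (!set_contains(nlwb, lwb->ids[i]) &&
		    nlwb->count < MAX_VDEVS)
			nlwb->ids[nlwb->count++] = lwb->ids[i];
	}
	lwb->count = 0;		/* "this" lwb will not issue any flushes */
}

int
main(void)
{
	struct vdev_set done = { { 0, 2 }, 2 };	/* lwb whose write is done */
	struct vdev_set next = { { 2, 5 }, 2 };	/* next lwb in the list */

	flush_defer(&done, &next);
	for (int i = 0; i < next.count; i++)
		printf("flush vdev %d with the next lwb\n", next.ids[i]);
	return (0);
}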
     1143 +static void
     1144 +zil_lwb_set_zio_dependency(zilog_t *zilog, lwb_t *lwb)
     1145 +{
     1146 +        lwb_t *last_lwb_opened = zilog->zl_last_lwb_opened;
     1147 +
     1148 +        ASSERT(MUTEX_HELD(&zilog->zl_issuer_lock));
     1149 +        ASSERT(MUTEX_HELD(&zilog->zl_lock));
     1150 +
     1151 +        /*
     1152 +         * The zilog's "zl_last_lwb_opened" field is used to build the
     1153 +         * lwb/zio dependency chain, which is used to preserve the
     1154 +         * ordering of lwb completions that is required by the semantics
     1155 +         * of the ZIL. Each new lwb zio becomes a parent of the
     1156 +         * "previous" lwb zio, such that the new lwb's zio cannot
     1157 +         * complete until the "previous" lwb's zio completes.
     1158 +         *
     1159 +         * This is required by the semantics of zil_commit(); the commit
     1160 +         * waiters attached to the lwbs will be woken in the lwb zio's
     1161 +         * completion callback, so this zio dependency graph ensures the
     1162 +         * waiters are woken in the correct order (the same order the
     1163 +         * lwbs were created).
     1164 +         */
     1165 +        if (last_lwb_opened != NULL &&
     1166 +            last_lwb_opened->lwb_state != LWB_STATE_FLUSH_DONE) {
     1167 +                ASSERT(last_lwb_opened->lwb_state == LWB_STATE_OPENED ||
     1168 +                    last_lwb_opened->lwb_state == LWB_STATE_ISSUED ||
     1169 +                    last_lwb_opened->lwb_state == LWB_STATE_WRITE_DONE);
     1170 +
     1171 +                ASSERT3P(last_lwb_opened->lwb_root_zio, !=, NULL);
     1172 +                zio_add_child(lwb->lwb_root_zio,
     1173 +                    last_lwb_opened->lwb_root_zio);
     1174 +
     1175 +                /*
     1176 +                 * If the previous lwb's write hasn't already completed,
     1177 +                 * we also want to order the completion of the lwb write
     1178 +                 * zios (above, we only order the completion of the lwb
     1179 +                 * root zios). This is required because of how we can
     1180 +                 * defer the DKIOCFLUSHWRITECACHE commands for each lwb.
     1181 +                 *
     1182 +                 * When the DKIOCFLUSHWRITECACHE commands are deferred,
     1183 +                 * the previous lwb will rely on this lwb to flush the
     1184 +                 * vdevs written to by that previous lwb. Thus, we need
     1185 +                 * to ensure this lwb doesn't issue the flush until
     1186 +                 * after the previous lwb's write completes. We ensure
     1187 +                 * this ordering by setting the zio parent/child
     1188 +                 * relationship here.
     1189 +                 *
     1190 +                 * Without this relationship on the lwb's write zio,
     1191 +                 * it's possible for this lwb's write to complete prior
     1192 +                 * to the previous lwb's write completing; and thus, the
     1193 +                 * vdevs for the previous lwb would be flushed prior to
     1194 +                 * that lwb's data being written to those vdevs (the
     1195 +                 * vdevs are flushed in the lwb write zio's completion
     1196 +                 * handler, zil_lwb_write_done()).
     1197 +                 */
     1198 +                if (last_lwb_opened->lwb_state != LWB_STATE_WRITE_DONE) {
     1199 +                        ASSERT(last_lwb_opened->lwb_state == LWB_STATE_OPENED ||
     1200 +                            last_lwb_opened->lwb_state == LWB_STATE_ISSUED);
     1201 +
     1202 +                        ASSERT3P(last_lwb_opened->lwb_write_zio, !=, NULL);
     1203 +                        zio_add_child(lwb->lwb_write_zio,
     1204 +                            last_lwb_opened->lwb_write_zio);
     1205 +                }
     1206 +        }
     1207 +}
     1208 +
     1209 +
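/*
 * Editor's note: illustrative user-space sketch, not part of this diff.
 * It demonstrates the ordering property that the parent/child zio chain
 * above provides: because each new node gates on its predecessor,
 * completion callbacks are delivered in creation order even when the
 * underlying writes finish out of order.  All names are hypothetical.
 */
#include <stdio.h>

#define	NLWB	4

static int io_finished[NLWB];	/* physical write finished (any order) */
static int cb_delivered[NLWB];	/* completion callback delivered (in order) */

/* Deliver callbacks for every node whose predecessors have all completed. */
static void
deliver_ready(void)
{
	for (int i = 0; i < NLWB; i++) {
		if (!io_finished[i])
			break;		/* gates every later node */
		if (!cb_delivered[i]) {
			cb_delivered[i] = 1;
			printf("waiters for lwb %d woken\n", i);
		}
	}
}

int
main(void)
{
	int completion_order[NLWB] = { 2, 0, 3, 1 };	/* out of order */

	for (int k = 0; k < NLWB; k++) {
		io_finished[completion_order[k]] = 1;
		deliver_ready();
	}
	return (0);
}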
1073 1210  /*
1074 1211   * This function's purpose is to "open" an lwb such that it is ready to
1075 1212   * accept new itxs being committed to it. To do this, the lwb's zio
1076 1213   * structures are created, and linked to the lwb. This function is
1077 1214   * idempotent; if the passed in lwb has already been opened, this
1078 1215   * function is essentially a no-op.
1079 1216   */
1080 1217  static void
1081 1218  zil_lwb_write_open(zilog_t *zilog, lwb_t *lwb)
1082 1219  {
1083 1220          zbookmark_phys_t zb;
1084 1221          zio_priority_t prio;
1085 1222  
1086 1223          ASSERT(MUTEX_HELD(&zilog->zl_issuer_lock));
1087 1224          ASSERT3P(lwb, !=, NULL);
1088 1225          EQUIV(lwb->lwb_root_zio == NULL, lwb->lwb_state == LWB_STATE_CLOSED);
1089 1226          EQUIV(lwb->lwb_root_zio != NULL, lwb->lwb_state == LWB_STATE_OPENED);
1090 1227  
1091 1228          SET_BOOKMARK(&zb, lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_OBJSET],
1092 1229              ZB_ZIL_OBJECT, ZB_ZIL_LEVEL,
1093 1230              lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_SEQ]);
1094 1231  
1095 1232          if (lwb->lwb_root_zio == NULL) {
1096 1233                  abd_t *lwb_abd = abd_get_from_buf(lwb->lwb_buf,
1097 1234                      BP_GET_LSIZE(&lwb->lwb_blk));
1098 1235  
1099 1236                  if (!lwb->lwb_slog || zilog->zl_cur_used <= zil_slog_bulk)
1100 1237                          prio = ZIO_PRIORITY_SYNC_WRITE;
1101 1238                  else
1102 1239                          prio = ZIO_PRIORITY_ASYNC_WRITE;
1103 1240  
1104 1241                  lwb->lwb_root_zio = zio_root(zilog->zl_spa,
1105 1242                      zil_lwb_flush_vdevs_done, lwb, ZIO_FLAG_CANFAIL);
1106 1243                  ASSERT3P(lwb->lwb_root_zio, !=, NULL);
  
  
1107 1244  
1108 1245                  lwb->lwb_write_zio = zio_rewrite(lwb->lwb_root_zio,
1109 1246                      zilog->zl_spa, 0, &lwb->lwb_blk, lwb_abd,
1110 1247                      BP_GET_LSIZE(&lwb->lwb_blk), zil_lwb_write_done, lwb,
1111 1248                      prio, ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE, &zb);
1112 1249                  ASSERT3P(lwb->lwb_write_zio, !=, NULL);
1113 1250  
1114 1251                  lwb->lwb_state = LWB_STATE_OPENED;
1115 1252  
1116 1253                  mutex_enter(&zilog->zl_lock);
1117      -
1118      -                /*
1119      -                 * The zilog's "zl_last_lwb_opened" field is used to
1120      -                 * build the lwb/zio dependency chain, which is used to
1121      -                 * preserve the ordering of lwb completions that is
1122      -                 * required by the semantics of the ZIL. Each new lwb
1123      -                 * zio becomes a parent of the "previous" lwb zio, such
1124      -                 * that the new lwb's zio cannot complete until the
1125      -                 * "previous" lwb's zio completes.
1126      -                 *
1127      -                 * This is required by the semantics of zil_commit();
1128      -                 * the commit waiters attached to the lwbs will be woken
1129      -                 * in the lwb zio's completion callback, so this zio
1130      -                 * dependency graph ensures the waiters are woken in the
1131      -                 * correct order (the same order the lwbs were created).
1132      -                 */
1133      -                lwb_t *last_lwb_opened = zilog->zl_last_lwb_opened;
1134      -                if (last_lwb_opened != NULL &&
1135      -                    last_lwb_opened->lwb_state != LWB_STATE_DONE) {
1136      -                        ASSERT(last_lwb_opened->lwb_state == LWB_STATE_OPENED ||
1137      -                            last_lwb_opened->lwb_state == LWB_STATE_ISSUED);
1138      -                        ASSERT3P(last_lwb_opened->lwb_root_zio, !=, NULL);
1139      -                        zio_add_child(lwb->lwb_root_zio,
1140      -                            last_lwb_opened->lwb_root_zio);
1141      -                }
     1254 +                zil_lwb_set_zio_dependency(zilog, lwb);
1142 1255                  zilog->zl_last_lwb_opened = lwb;
1143      -
1144 1256                  mutex_exit(&zilog->zl_lock);
1145 1257          }
1146 1258  
1147 1259          ASSERT3P(lwb->lwb_root_zio, !=, NULL);
1148 1260          ASSERT3P(lwb->lwb_write_zio, !=, NULL);
1149 1261          ASSERT3S(lwb->lwb_state, ==, LWB_STATE_OPENED);
1150 1262  }
1151 1263  
1152 1264  /*
1153 1265   * Define a limited set of intent log block sizes.
1154 1266   *
1155 1267   * These must be a multiple of 4KB. Note only the amount used (again
1156 1268   * aligned to 4KB) actually gets written. However, we can't always just
1157 1269   * allocate SPA_OLD_MAXBLOCKSIZE as the slog space could be exhausted.
1158 1270   */
1159 1271  uint64_t zil_block_buckets[] = {
1160 1272      4096,               /* non TX_WRITE */
1161 1273      8192+4096,          /* data base */
1162 1274      32*1024 + 4096,     /* NFS writes */
1163 1275      UINT64_MAX
1164 1276  };
1165 1277  
1166 1278  /*
1167 1279   * Start a log block write and advance to the next log block.
1168 1280   * Calls are serialized.
1169 1281   */
1170 1282  static lwb_t *
1171 1283  zil_lwb_write_issue(zilog_t *zilog, lwb_t *lwb)
1172 1284  {
1173 1285          lwb_t *nlwb = NULL;
1174 1286          zil_chain_t *zilc;
1175 1287          spa_t *spa = zilog->zl_spa;
1176 1288          blkptr_t *bp;
1177 1289          dmu_tx_t *tx;
1178 1290          uint64_t txg;
1179 1291          uint64_t zil_blksz, wsz;
1180 1292          int i, error;
1181 1293          boolean_t slog;
1182 1294  
1183 1295          ASSERT(MUTEX_HELD(&zilog->zl_issuer_lock));
1184 1296          ASSERT3P(lwb->lwb_root_zio, !=, NULL);
1185 1297          ASSERT3P(lwb->lwb_write_zio, !=, NULL);
1186 1298          ASSERT3S(lwb->lwb_state, ==, LWB_STATE_OPENED);
1187 1299  
1188 1300          if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) {
1189 1301                  zilc = (zil_chain_t *)lwb->lwb_buf;
1190 1302                  bp = &zilc->zc_next_blk;
1191 1303          } else {
1192 1304                  zilc = (zil_chain_t *)(lwb->lwb_buf + lwb->lwb_sz);
1193 1305                  bp = &zilc->zc_next_blk;
1194 1306          }
1195 1307  
1196 1308          ASSERT(lwb->lwb_nused <= lwb->lwb_sz);
1197 1309  
1198 1310          /*
1199 1311           * Allocate the next block and save its address in this block
1200 1312           * before writing it in order to establish the log chain.
  
  
1201 1313           * Note that if the allocation of nlwb synced before we wrote
1202 1314           * the block that points at it (lwb), we'd leak it if we crashed.
1203 1315           * Therefore, we don't do dmu_tx_commit() until zil_lwb_write_done().
1204 1316           * We dirty the dataset to ensure that zil_sync() will be called
1205 1317           * to clean up in the event of allocation failure or I/O failure.
1206 1318           */
1207 1319  
1208 1320          tx = dmu_tx_create(zilog->zl_os);
1209 1321  
1210 1322          /*
1211      -         * Since we are not going to create any new dirty data, and we
1212      -         * can even help with clearing the existing dirty data, we
1213      -         * should not be subject to the dirty data based delays. We
1214      -         * use TXG_NOTHROTTLE to bypass the delay mechanism.
     1323 +         * Since we are not going to create any new dirty data and we can even
     1324 +         * help with clearing the existing dirty data, we should not be subject
     1325 +         * to the dirty data based delays.
     1326 +         * We (ab)use TXG_WAITED to bypass the delay mechanism.
     1327 +         * One side effect of using TXG_WAITED is that dmu_tx_assign() can
     1328 +         * fail if the pool is suspended.  Those are dire circumstances,
     1329 +         * so we return NULL to signal that the normal ZIL processing is not
     1330 +         * possible and txg_wait_synced() should be used to ensure that the data
     1331 +         * is on disk.
1215 1332           */
1216      -        VERIFY0(dmu_tx_assign(tx, TXG_WAIT | TXG_NOTHROTTLE));
1217      -
     1333 +        error = dmu_tx_assign(tx, TXG_WAITED);
     1334 +        if (error != 0) {
     1335 +                ASSERT3S(error, ==, EIO);
     1336 +                dmu_tx_abort(tx);
     1337 +                return (NULL);
     1338 +        }
1218 1339          dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
1219 1340          txg = dmu_tx_get_txg(tx);
1220 1341  
1221 1342          lwb->lwb_tx = tx;
1222 1343  
1223 1344          /*
1224 1345           * Log blocks are pre-allocated. Here we select the size of the next
1225 1346           * block, based on size used in the last block.
1226 1347           * - first find the smallest bucket that will fit the block from a
1227 1348           *   limited set of block sizes. This is because it's faster to write
1228 1349           *   blocks allocated from the same metaslab as they are adjacent or
1229 1350           *   close.
1230 1351           * - next find the maximum from the new suggested size and an array of
1231 1352           *   previous sizes. This lessens a picket fence effect of wrongly
1232 1353           *   guesssing the size if we have a stream of say 2k, 64k, 2k, 64k
1233 1354           *   guessing the size if we have a stream of say 2k, 64k, 2k, 64k
1234 1355           *
1235 1356           * Note we only write what is used, but we can't just allocate
1236 1357           * the maximum block size because we can exhaust the available
1237 1358           * pool log space.
1238 1359           */
1239 1360          zil_blksz = zilog->zl_cur_used + sizeof (zil_chain_t);
1240 1361          for (i = 0; zil_blksz > zil_block_buckets[i]; i++)
1241 1362                  continue;
1242 1363          zil_blksz = zil_block_buckets[i];
1243 1364          if (zil_blksz == UINT64_MAX)
1244 1365                  zil_blksz = SPA_OLD_MAXBLOCKSIZE;
1245 1366          zilog->zl_prev_blks[zilog->zl_prev_rotor] = zil_blksz;
1246 1367          for (i = 0; i < ZIL_PREV_BLKS; i++)
1247 1368                  zil_blksz = MAX(zil_blksz, zilog->zl_prev_blks[i]);
1248 1369          zilog->zl_prev_rotor = (zilog->zl_prev_rotor + 1) & (ZIL_PREV_BLKS - 1);
1249 1370  
1250 1371          BP_ZERO(bp);
1251 1372  
1252 1373          /* pass the old blkptr in order to spread log blocks across devs */
1253 1374          error = zio_alloc_zil(spa, txg, bp, &lwb->lwb_blk, zil_blksz, &slog);
1254 1375          if (error == 0) {
1255 1376                  ASSERT3U(bp->blk_birth, ==, txg);
1256 1377                  bp->blk_cksum = lwb->lwb_blk.blk_cksum;
1257 1378                  bp->blk_cksum.zc_word[ZIL_ZC_SEQ]++;
1258 1379  
1259 1380                  /*
1260 1381                   * Allocate a new log write block (lwb).
1261 1382                   */
1262 1383                  nlwb = zil_alloc_lwb(zilog, bp, slog, txg);
1263 1384          }
1264 1385  
1265 1386          if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) {
1266 1387                  /* For Slim ZIL only write what is used. */
1267 1388                  wsz = P2ROUNDUP_TYPED(lwb->lwb_nused, ZIL_MIN_BLKSZ, uint64_t);
1268 1389                  ASSERT3U(wsz, <=, lwb->lwb_sz);
1269 1390                  zio_shrink(lwb->lwb_write_zio, wsz);
1270 1391  
1271 1392          } else {
1272 1393                  wsz = lwb->lwb_sz;
1273 1394          }
1274 1395  
1275 1396          zilc->zc_pad = 0;
1276 1397          zilc->zc_nused = lwb->lwb_nused;
1277 1398          zilc->zc_eck.zec_cksum = lwb->lwb_blk.blk_cksum;
1278 1399  
1279 1400          /*
1280 1401           * clear unused data for security
1281 1402           */
1282 1403          bzero(lwb->lwb_buf + lwb->lwb_nused, wsz - lwb->lwb_nused);
1283 1404  
1284 1405          spa_config_enter(zilog->zl_spa, SCL_STATE, lwb, RW_READER);
1285 1406  
1286 1407          zil_lwb_add_block(lwb, &lwb->lwb_blk);
1287 1408          lwb->lwb_issued_timestamp = gethrtime();
1288 1409          lwb->lwb_state = LWB_STATE_ISSUED;
1289 1410  
1290 1411          zio_nowait(lwb->lwb_root_zio);
1291 1412          zio_nowait(lwb->lwb_write_zio);
1292 1413  
1293 1414          /*
1294 1415           * If there was an allocation failure then nlwb will be null which
1295 1416           * forces a txg_wait_synced().
1296 1417           */
1297 1418          return (nlwb);
1298 1419  }
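
/*
 * Editor's note: stand-alone sketch, not part of this diff.  It re-implements
 * the block-size selection above in user space: round the current demand up
 * to one of a few bucket sizes, then take the maximum over the last few
 * picks (a small rotor) to damp the "picket fence" effect of alternating
 * small and large commit bursts.  The constants and names are illustrative.
 */
#include <stdio.h>
#include <stdint.h>

#define	PREV_BLKS	16			/* must be a power of two */
#define	OLD_MAXBLK	(128 * 1024)

static const uint64_t buckets[] = {
	4096, 8192 + 4096, 32 * 1024 + 4096, UINT64_MAX
};

static uint64_t prev[PREV_BLKS];
static int rotor;

static uint64_t
pick_blksz(uint64_t demand)
{
	uint64_t blksz = demand;
	int i;

	for (i = 0; blksz > buckets[i]; i++)
		continue;
	blksz = (buckets[i] == UINT64_MAX) ? OLD_MAXBLK : buckets[i];

	prev[rotor] = blksz;
	for (i = 0; i < PREV_BLKS; i++)
		if (prev[i] > blksz)
			blksz = prev[i];
	rotor = (rotor + 1) & (PREV_BLKS - 1);

	return (blksz);
}

int
main(void)
{
	uint64_t demand[] = { 2048, 65536, 2048, 65536, 2048 };

	/* Alternating 2K/64K demands settle on the larger size. */
	for (int i = 0; i < 5; i++)
		printf("demand %8llu -> blksz %llu\n",
		    (unsigned long long)demand[i],
		    (unsigned long long)pick_blksz(demand[i]));
	return (0);
}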
1299 1420  
1300 1421  static lwb_t *
1301 1422  zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
1302 1423  {
1303 1424          lr_t *lrcb, *lrc;
1304 1425          lr_write_t *lrwb, *lrw;
1305 1426          char *lr_buf;
1306 1427          uint64_t dlen, dnow, lwb_sp, reclen, txg;
1307 1428  
1308 1429          ASSERT(MUTEX_HELD(&zilog->zl_issuer_lock));
1309 1430          ASSERT3P(lwb, !=, NULL);
1310 1431          ASSERT3P(lwb->lwb_buf, !=, NULL);
1311 1432  
1312 1433          zil_lwb_write_open(zilog, lwb);
1313 1434  
1314 1435          lrc = &itx->itx_lr;
1315 1436          lrw = (lr_write_t *)lrc;
1316 1437  
1317 1438          /*
1318 1439           * A commit itx doesn't represent any on-disk state; instead
1319 1440           * it's simply used as a place holder on the commit list, and
1320 1441           * provides a mechanism for attaching a "commit waiter" onto the
1321 1442           * correct lwb (such that the waiter can be signalled upon
1322 1443           * completion of that lwb). Thus, we don't process this itx's
1323 1444           * log record if it's a commit itx (these itx's don't have log
1324 1445           * records), and instead link the itx's waiter onto the lwb's
1325 1446           * list of waiters.
1326 1447           *
1327 1448           * For more details, see the comment above zil_commit().
1328 1449           */
1329 1450          if (lrc->lrc_txtype == TX_COMMIT) {
1330 1451                  mutex_enter(&zilog->zl_lock);
1331 1452                  zil_commit_waiter_link_lwb(itx->itx_private, lwb);
1332 1453                  itx->itx_private = NULL;
1333 1454                  mutex_exit(&zilog->zl_lock);
1334 1455                  return (lwb);
1335 1456          }
1336 1457  
1337 1458          if (lrc->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY) {
1338 1459                  dlen = P2ROUNDUP_TYPED(
1339 1460                      lrw->lr_length, sizeof (uint64_t), uint64_t);
1340 1461          } else {
1341 1462                  dlen = 0;
1342 1463          }
1343 1464          reclen = lrc->lrc_reclen;
1344 1465          zilog->zl_cur_used += (reclen + dlen);
1345 1466          txg = lrc->lrc_txg;
1346 1467  
1347 1468          ASSERT3U(zilog->zl_cur_used, <, UINT64_MAX - (reclen + dlen));
1348 1469  
1349 1470  cont:
1350 1471          /*
1351 1472           * If this record won't fit in the current log block, start a new one.
1352 1473           * For WR_NEED_COPY optimize layout for minimal number of chunks.
1353 1474           */
1354 1475          lwb_sp = lwb->lwb_sz - lwb->lwb_nused;
1355 1476          if (reclen > lwb_sp || (reclen + dlen > lwb_sp &&
1356 1477              lwb_sp < ZIL_MAX_WASTE_SPACE && (dlen % ZIL_MAX_LOG_DATA == 0 ||
1357 1478              lwb_sp < reclen + dlen % ZIL_MAX_LOG_DATA))) {
1358 1479                  lwb = zil_lwb_write_issue(zilog, lwb);
1359 1480                  if (lwb == NULL)
1360 1481                          return (NULL);
1361 1482                  zil_lwb_write_open(zilog, lwb);
1362 1483                  ASSERT(LWB_EMPTY(lwb));
1363 1484                  lwb_sp = lwb->lwb_sz - lwb->lwb_nused;
1364 1485                  ASSERT3U(reclen + MIN(dlen, sizeof (uint64_t)), <=, lwb_sp);
1365 1486          }
1366 1487  
1367 1488          dnow = MIN(dlen, lwb_sp - reclen);
1368 1489          lr_buf = lwb->lwb_buf + lwb->lwb_nused;
1369 1490          bcopy(lrc, lr_buf, reclen);
1370 1491          lrcb = (lr_t *)lr_buf;          /* Like lrc, but inside lwb. */
1371 1492          lrwb = (lr_write_t *)lrcb;      /* Like lrw, but inside lwb. */
1372 1493  
1373 1494          /*
1374 1495           * If it's a write, fetch the data or get its blkptr as appropriate.
1375 1496           */
1376 1497          if (lrc->lrc_txtype == TX_WRITE) {
1377 1498                  if (txg > spa_freeze_txg(zilog->zl_spa))
1378 1499                          txg_wait_synced(zilog->zl_dmu_pool, txg);
1379 1500                  if (itx->itx_wr_state != WR_COPIED) {
1380 1501                          char *dbuf;
1381 1502                          int error;
1382 1503  
1383 1504                          if (itx->itx_wr_state == WR_NEED_COPY) {
1384 1505                                  dbuf = lr_buf + reclen;
1385 1506                                  lrcb->lrc_reclen += dnow;
1386 1507                                  if (lrwb->lr_length > dnow)
1387 1508                                          lrwb->lr_length = dnow;
1388 1509                                  lrw->lr_offset += dnow;
1389 1510                                  lrw->lr_length -= dnow;
1390 1511                          } else {
1391 1512                                  ASSERT(itx->itx_wr_state == WR_INDIRECT);
1392 1513                                  dbuf = NULL;
1393 1514                          }
1394 1515  
1395 1516                          /*
1396 1517                           * We pass in the "lwb_write_zio" rather than
1397 1518                           * "lwb_root_zio" so that the "lwb_write_zio"
1398 1519                           * becomes the parent of any zio's created by
1399 1520                           * the "zl_get_data" callback. The vdevs are
1400 1521                           * flushed after the "lwb_write_zio" completes,
1401 1522                           * so we want to make sure that completion
1402 1523                           * callback waits for these additional zio's,
1403 1524                           * such that the vdevs used by those zio's will
1404 1525                           * be included in the lwb's vdev tree, and those
1405 1526                           * vdevs will be properly flushed. If we passed
1406 1527                           * in "lwb_root_zio" here, then these additional
1407 1528                           * vdevs may not be flushed; e.g. if these zio's
1408 1529                           * completed after "lwb_write_zio" completed.
1409 1530                           */
1410 1531                          error = zilog->zl_get_data(itx->itx_private,
1411 1532                              lrwb, dbuf, lwb, lwb->lwb_write_zio);
1412 1533  
1413 1534                          if (error == EIO) {
1414 1535                                  txg_wait_synced(zilog->zl_dmu_pool, txg);
1415 1536                                  return (lwb);
1416 1537                          }
1417 1538                          if (error != 0) {
1418 1539                                  ASSERT(error == ENOENT || error == EEXIST ||
1419 1540                                      error == EALREADY);
1420 1541                                  return (lwb);
1421 1542                          }
1422 1543                  }
1423 1544          }
1424 1545  
1425 1546          /*
1426 1547           * We're actually making an entry, so update lrc_seq to be the
1427 1548           * log record sequence number.  Note that this is generally not
1428 1549           * equal to the itx sequence number because not all transactions
1429 1550           * are synchronous, and sometimes spa_sync() gets there first.
1430 1551           */
1431 1552          lrcb->lrc_seq = ++zilog->zl_lr_seq;
1432 1553          lwb->lwb_nused += reclen + dnow;
1433 1554  
1434 1555          zil_lwb_add_txg(lwb, txg);
1435 1556  
1436 1557          ASSERT3U(lwb->lwb_nused, <=, lwb->lwb_sz);
1437 1558          ASSERT0(P2PHASE(lwb->lwb_nused, sizeof (uint64_t)));
1438 1559  
1439 1560          dlen -= dnow;
1440 1561          if (dlen > 0) {
1441 1562                  zilog->zl_cur_used += reclen;
1442 1563                  goto cont;
1443 1564          }
1444 1565  
1445 1566          return (lwb);
1446 1567  }
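
/*
 * Editor's note: simplified user-space sketch, not part of this diff.  It
 * mirrors the "cont:" loop above for WR_NEED_COPY records: the fixed record
 * header is never split, but the data portion is copied in chunks of
 * whatever space remains in the current log block, issuing a new block and
 * continuing whenever the current one fills up.  Sizes are made up.
 */
#include <stdio.h>

#define	BLK_SZ	64		/* usable bytes per toy log block */
#define	HDR_SZ	16		/* fixed header per record chunk */

int
main(void)
{
	int reclen = HDR_SZ;	/* header for the current chunk */
	int dlen = 150;		/* data that may span several blocks */
	int used = 40;		/* bytes already used in the open block */
	int new_blocks = 0;

	while (dlen > 0 || reclen > 0) {
		int space = BLK_SZ - used;

		if (reclen > space) {		/* header must not split */
			new_blocks++;
			used = 0;
			continue;
		}

		int dnow = (dlen < space - reclen) ? dlen : space - reclen;
		used += reclen + dnow;
		dlen -= dnow;
		reclen = (dlen > 0) ? HDR_SZ : 0;	/* continuation header */
		printf("chunk of %d data bytes, block used %d/%d\n",
		    dnow, used, BLK_SZ);

		if (dlen > 0) {			/* block full: start another */
			new_blocks++;
			used = 0;
		}
	}
	printf("record spilled into %d additional block(s)\n", new_blocks);
	return (0);
}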
1447 1568  
1448 1569  itx_t *
1449 1570  zil_itx_create(uint64_t txtype, size_t lrsize)
1450 1571  {
1451 1572          itx_t *itx;
1452 1573  
1453 1574          lrsize = P2ROUNDUP_TYPED(lrsize, sizeof (uint64_t), size_t);
1454 1575  
1455 1576          itx = kmem_alloc(offsetof(itx_t, itx_lr) + lrsize, KM_SLEEP);
1456 1577          itx->itx_lr.lrc_txtype = txtype;
1457 1578          itx->itx_lr.lrc_reclen = lrsize;
1458 1579          itx->itx_lr.lrc_seq = 0;        /* defensive */
1459 1580          itx->itx_sync = B_TRUE;         /* default is synchronous */
1460 1581  
1461 1582          return (itx);
1462 1583  }
1463 1584  
1464 1585  void
1465 1586  zil_itx_destroy(itx_t *itx)
1466 1587  {
1467 1588          kmem_free(itx, offsetof(itx_t, itx_lr) + itx->itx_lr.lrc_reclen);
1468 1589  }
1469 1590  
1470 1591  /*
1471 1592   * Free up the sync and async itxs. The itxs_t has already been detached
1472 1593   * so no locks are needed.
1473 1594   */
1474 1595  static void
1475 1596  zil_itxg_clean(itxs_t *itxs)
1476 1597  {
1477 1598          itx_t *itx;
1478 1599          list_t *list;
1479 1600          avl_tree_t *t;
1480 1601          void *cookie;
1481 1602          itx_async_node_t *ian;
1482 1603  
1483 1604          list = &itxs->i_sync_list;
1484 1605          while ((itx = list_head(list)) != NULL) {
1485 1606                  /*
1486 1607                   * In the general case, commit itxs will not be found
1487 1608                   * here, as they'll be committed to an lwb via
1488 1609                   * zil_lwb_commit(), and free'd in that function. Having
1489 1610                   * said that, it is still possible for commit itxs to be
1490 1611                   * found here, due to the following race:
1491 1612                   *
1492 1613                   *      - a thread calls zil_commit() which assigns the
1493 1614                   *        commit itx to a per-txg i_sync_list
1494 1615                   *      - zil_itxg_clean() is called (e.g. via spa_sync())
1495 1616                   *        while the waiter is still on the i_sync_list
1496 1617                   *
1497 1618                   * There's nothing to prevent syncing the txg while the
1498 1619                   * waiter is on the i_sync_list. This normally doesn't
1499 1620                   * happen because spa_sync() is slower than zil_commit(),
1500 1621                   * but if zil_commit() calls txg_wait_synced() (e.g.
1501 1622                   * because zil_create() or zil_commit_writer_stall() is
1502 1623                   * called) we will hit this case.
1503 1624                   */
1504 1625                  if (itx->itx_lr.lrc_txtype == TX_COMMIT)
1505 1626                          zil_commit_waiter_skip(itx->itx_private);
1506 1627  
1507 1628                  list_remove(list, itx);
1508 1629                  zil_itx_destroy(itx);
1509 1630          }
1510 1631  
1511 1632          cookie = NULL;
1512 1633          t = &itxs->i_async_tree;
1513 1634          while ((ian = avl_destroy_nodes(t, &cookie)) != NULL) {
1514 1635                  list = &ian->ia_list;
1515 1636                  while ((itx = list_head(list)) != NULL) {
1516 1637                          list_remove(list, itx);
1517 1638                          /* commit itxs should never be on the async lists. */
1518 1639                          ASSERT3U(itx->itx_lr.lrc_txtype, !=, TX_COMMIT);
1519 1640                          zil_itx_destroy(itx);
1520 1641                  }
1521 1642                  list_destroy(list);
1522 1643                  kmem_free(ian, sizeof (itx_async_node_t));
1523 1644          }
1524 1645          avl_destroy(t);
1525 1646  
1526 1647          kmem_free(itxs, sizeof (itxs_t));
1527 1648  }
1528 1649  
1529 1650  static int
1530 1651  zil_aitx_compare(const void *x1, const void *x2)
1531 1652  {
1532 1653          const uint64_t o1 = ((itx_async_node_t *)x1)->ia_foid;
1533 1654          const uint64_t o2 = ((itx_async_node_t *)x2)->ia_foid;
1534 1655  
1535 1656          if (o1 < o2)
1536 1657                  return (-1);
1537 1658          if (o1 > o2)
1538 1659                  return (1);
1539 1660  
1540 1661          return (0);
1541 1662  }
1542 1663  
1543 1664  /*
1544 1665   * Remove all async itx with the given oid.
1545 1666   */
1546 1667  static void
1547 1668  zil_remove_async(zilog_t *zilog, uint64_t oid)
1548 1669  {
1549 1670          uint64_t otxg, txg;
1550 1671          itx_async_node_t *ian;
1551 1672          avl_tree_t *t;
1552 1673          avl_index_t where;
1553 1674          list_t clean_list;
1554 1675          itx_t *itx;
1555 1676  
1556 1677          ASSERT(oid != 0);
1557 1678          list_create(&clean_list, sizeof (itx_t), offsetof(itx_t, itx_node));
1558 1679  
1559 1680          if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX) /* ziltest support */
1560 1681                  otxg = ZILTEST_TXG;
1561 1682          else
1562 1683                  otxg = spa_last_synced_txg(zilog->zl_spa) + 1;
1563 1684  
1564 1685          for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
1565 1686                  itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];
1566 1687  
1567 1688                  mutex_enter(&itxg->itxg_lock);
1568 1689                  if (itxg->itxg_txg != txg) {
1569 1690                          mutex_exit(&itxg->itxg_lock);
1570 1691                          continue;
1571 1692                  }
1572 1693  
1573 1694                  /*
1574 1695                   * Locate the object node and append its list.
1575 1696                   */
1576 1697                  t = &itxg->itxg_itxs->i_async_tree;
1577 1698                  ian = avl_find(t, &oid, &where);
1578 1699                  if (ian != NULL)
1579 1700                          list_move_tail(&clean_list, &ian->ia_list);
1580 1701                  mutex_exit(&itxg->itxg_lock);
1581 1702          }
1582 1703          while ((itx = list_head(&clean_list)) != NULL) {
1583 1704                  list_remove(&clean_list, itx);
1584 1705                  /* commit itxs should never be on the async lists. */
1585 1706                  ASSERT3U(itx->itx_lr.lrc_txtype, !=, TX_COMMIT);
1586 1707                  zil_itx_destroy(itx);
1587 1708          }
1588 1709          list_destroy(&clean_list);
1589 1710  }
1590 1711  
1591 1712  void
1592 1713  zil_itx_assign(zilog_t *zilog, itx_t *itx, dmu_tx_t *tx)
1593 1714  {
1594 1715          uint64_t txg;
1595 1716          itxg_t *itxg;
1596 1717          itxs_t *itxs, *clean = NULL;
1597 1718  
1598 1719          /*
1599 1720           * Object ids can be re-instantiated in the next txg so
1600 1721           * remove any async transactions to avoid future leaks.
1601 1722           * This can happen if a fsync occurs on the re-instantiated
1602 1723           * object for a WR_INDIRECT or WR_NEED_COPY write, which gets
1603 1724           * the new file data and flushes a write record for the old object.
1604 1725           */
1605 1726          if ((itx->itx_lr.lrc_txtype & ~TX_CI) == TX_REMOVE)
1606 1727                  zil_remove_async(zilog, itx->itx_oid);
1607 1728  
1608 1729          /*
1609 1730           * Ensure the data of a renamed file is committed before the rename.
1610 1731           */
1611 1732          if ((itx->itx_lr.lrc_txtype & ~TX_CI) == TX_RENAME)
1612 1733                  zil_async_to_sync(zilog, itx->itx_oid);
1613 1734  
1614 1735          if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX)
1615 1736                  txg = ZILTEST_TXG;
1616 1737          else
1617 1738                  txg = dmu_tx_get_txg(tx);
1618 1739  
1619 1740          itxg = &zilog->zl_itxg[txg & TXG_MASK];
1620 1741          mutex_enter(&itxg->itxg_lock);
1621 1742          itxs = itxg->itxg_itxs;
1622 1743          if (itxg->itxg_txg != txg) {
1623 1744                  if (itxs != NULL) {
1624 1745                          /*
1625 1746                           * The zil_clean callback hasn't got around to cleaning
1626 1747                           * this itxg. Save the itxs for release below.
1627 1748                           * This should be rare.
1628 1749                           */
1629 1750                          zfs_dbgmsg("zil_itx_assign: missed itx cleanup for "
1630 1751                              "txg %llu", itxg->itxg_txg);
1631 1752                          clean = itxg->itxg_itxs;
1632 1753                  }
1633 1754                  itxg->itxg_txg = txg;
1634 1755                  itxs = itxg->itxg_itxs = kmem_zalloc(sizeof (itxs_t), KM_SLEEP);
1635 1756  
1636 1757                  list_create(&itxs->i_sync_list, sizeof (itx_t),
1637 1758                      offsetof(itx_t, itx_node));
1638 1759                  avl_create(&itxs->i_async_tree, zil_aitx_compare,
1639 1760                      sizeof (itx_async_node_t),
1640 1761                      offsetof(itx_async_node_t, ia_node));
1641 1762          }
1642 1763          if (itx->itx_sync) {
1643 1764                  list_insert_tail(&itxs->i_sync_list, itx);
1644 1765          } else {
1645 1766                  avl_tree_t *t = &itxs->i_async_tree;
1646 1767                  uint64_t foid = ((lr_ooo_t *)&itx->itx_lr)->lr_foid;
1647 1768                  itx_async_node_t *ian;
1648 1769                  avl_index_t where;
1649 1770  
1650 1771                  ian = avl_find(t, &foid, &where);
1651 1772                  if (ian == NULL) {
1652 1773                          ian = kmem_alloc(sizeof (itx_async_node_t), KM_SLEEP);
1653 1774                          list_create(&ian->ia_list, sizeof (itx_t),
1654 1775                              offsetof(itx_t, itx_node));
1655 1776                          ian->ia_foid = foid;
1656 1777                          avl_insert(t, ian, where);
1657 1778                  }
1658 1779                  list_insert_tail(&ian->ia_list, itx);
1659 1780          }
1660 1781  
1661 1782          itx->itx_lr.lrc_txg = dmu_tx_get_txg(tx);
1662 1783  
1663 1784          /*
1664 1785           * We don't want to dirty the ZIL using ZILTEST_TXG, because
1665 1786           * zil_clean() will never be called using ZILTEST_TXG. Thus, we
1666 1787           * need to be careful to always dirty the ZIL using the "real"
1667 1788           * TXG (not itxg_txg) even when the SPA is frozen.
1668 1789           */
1669 1790          zilog_dirty(zilog, dmu_tx_get_txg(tx));
1670 1791          mutex_exit(&itxg->itxg_lock);
1671 1792  
1672 1793          /* Release the old itxs now we've dropped the lock */
1673 1794          if (clean != NULL)
1674 1795                  zil_itxg_clean(clean);
1675 1796  }
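
/*
 * Editor's note: toy sketch, not part of this diff.  It shows the per-txg
 * slot reuse handled above: slots are indexed by (txg & mask), and if a
 * slot still holds an older txg's itxs when a new txg arrives, that older
 * list is detached for cleanup before the slot is reused.  The structure
 * and names are illustrative only.
 */
#include <stdio.h>

#define	TXG_SIZE	4
#define	TXG_MASK	(TXG_SIZE - 1)

typedef struct slot {
	unsigned long s_txg;	/* txg this slot currently belongs to */
	int s_nitxs;		/* itxs accumulated for that txg */
} slot_t;

static slot_t slots[TXG_SIZE];

static void
assign_itx(unsigned long txg)
{
	slot_t *s = &slots[txg & TXG_MASK];

	if (s->s_txg != txg && s->s_nitxs != 0) {
		/* stale contents: these would be handed to zil_itxg_clean() */
		printf("txg %lu reuses slot %lu: cleaning %d itxs of txg %lu\n",
		    txg, txg & TXG_MASK, s->s_nitxs, s->s_txg);
		s->s_nitxs = 0;
	}
	s->s_txg = txg;
	s->s_nitxs++;
}

int
main(void)
{
	assign_itx(10);
	assign_itx(10);
	assign_itx(14);		/* 14 & 3 == 10 & 3: same slot, newer txg */
	return (0);
}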
1676 1797  
1677 1798  /*
1678 1799   * If there are any in-memory intent log transactions which have now been
1679 1800   * synced then start up a taskq to free them. We should only do this after we
1680 1801   * have written out the uberblocks (i.e. txg has been committed) so that we
1681 1802   * don't inadvertently clean out in-memory log records that would be required
1682 1803   * by zil_commit().
1683 1804   */
1684 1805  void
1685 1806  zil_clean(zilog_t *zilog, uint64_t synced_txg)
1686 1807  {
1687 1808          itxg_t *itxg = &zilog->zl_itxg[synced_txg & TXG_MASK];
1688 1809          itxs_t *clean_me;
1689 1810  
1690 1811          ASSERT3U(synced_txg, <, ZILTEST_TXG);
1691 1812  
1692 1813          mutex_enter(&itxg->itxg_lock);
1693 1814          if (itxg->itxg_itxs == NULL || itxg->itxg_txg == ZILTEST_TXG) {
1694 1815                  mutex_exit(&itxg->itxg_lock);
1695 1816                  return;
1696 1817          }
1697 1818          ASSERT3U(itxg->itxg_txg, <=, synced_txg);
1698 1819          ASSERT3U(itxg->itxg_txg, !=, 0);
1699 1820          clean_me = itxg->itxg_itxs;
1700 1821          itxg->itxg_itxs = NULL;
1701 1822          itxg->itxg_txg = 0;
1702 1823          mutex_exit(&itxg->itxg_lock);
1703 1824          /*
1704 1825           * Preferably start a task queue to free up the old itxs but
1705 1826           * if taskq_dispatch can't allocate resources to do that then
1706 1827           * free it in-line. This should be rare. Note, using TQ_SLEEP
1707 1828           * created a bad performance problem.
1708 1829           */
1709 1830          ASSERT3P(zilog->zl_dmu_pool, !=, NULL);
1710 1831          ASSERT3P(zilog->zl_dmu_pool->dp_zil_clean_taskq, !=, NULL);
1711 1832          if (taskq_dispatch(zilog->zl_dmu_pool->dp_zil_clean_taskq,
1712 1833              (void (*)(void *))zil_itxg_clean, clean_me, TQ_NOSLEEP) == NULL)
1713 1834                  zil_itxg_clean(clean_me);
1714 1835  }
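
/*
 * Editor's note: user-space sketch, not part of this diff, of the dispatch
 * pattern above: prefer handing the cleanup to a worker, but if the
 * non-blocking dispatch fails, fall back to doing the work inline rather
 * than sleeping.  pthreads stand in for the kernel taskq; the names are
 * illustrative.
 */
#include <pthread.h>
#include <stdio.h>

static void *
cleanup_work(void *arg)
{
	printf("cleaning %s\n", (const char *)arg);
	return (NULL);
}

int
main(void)
{
	pthread_t tid;
	char what[] = "old itxs";

	/* Analogous to taskq_dispatch(..., TQ_NOSLEEP) returning NULL. */
	if (pthread_create(&tid, NULL, cleanup_work, what) != 0)
		(void) cleanup_work(what);	/* dispatch failed: inline */
	else
		(void) pthread_join(tid, NULL);
	return (0);
}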
1715 1836  
1716 1837  /*
1717 1838   * This function will traverse the queue of itxs that need to be
1718 1839   * committed, and move them onto the ZIL's zl_itx_commit_list.
1719 1840   */
1720 1841  static void
1721 1842  zil_get_commit_list(zilog_t *zilog)
1722 1843  {
1723 1844          uint64_t otxg, txg;
1724 1845          list_t *commit_list = &zilog->zl_itx_commit_list;
1725 1846  
1726 1847          ASSERT(MUTEX_HELD(&zilog->zl_issuer_lock));
1727 1848  
1728 1849          if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX) /* ziltest support */
1729 1850                  otxg = ZILTEST_TXG;
1730 1851          else
1731 1852                  otxg = spa_last_synced_txg(zilog->zl_spa) + 1;
1732 1853  
1733 1854          /*
1734 1855           * This is inherently racy, since there is nothing to prevent
1735 1856           * the last synced txg from changing. That's okay since we'll
1736 1857           * only commit things in the future.
1737 1858           */
1738 1859          for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
1739 1860                  itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];
1740 1861  
1741 1862                  mutex_enter(&itxg->itxg_lock);
1742 1863                  if (itxg->itxg_txg != txg) {
1743 1864                          mutex_exit(&itxg->itxg_lock);
1744 1865                          continue;
1745 1866                  }
1746 1867  
1747 1868                  /*
1748 1869                   * If we're adding itx records to the zl_itx_commit_list,
1749 1870                   * then the zil better be dirty in this "txg". We can assert
1750 1871                   * that here since we're holding the itxg_lock which will
1751 1872                   * prevent spa_sync from cleaning it. Once we add the itxs
1752 1873                   * to the zl_itx_commit_list we must commit it to disk even
1753 1874                   * if it's unnecessary (i.e. the txg was synced).
1754 1875                   */
1755 1876                  ASSERT(zilog_is_dirty_in_txg(zilog, txg) ||
1756 1877                      spa_freeze_txg(zilog->zl_spa) != UINT64_MAX);
1757 1878                  list_move_tail(commit_list, &itxg->itxg_itxs->i_sync_list);
1758 1879  
1759 1880                  mutex_exit(&itxg->itxg_lock);
1760 1881          }
1761 1882  }
1762 1883  
1763 1884  /*
1764 1885   * Move the async itxs for a specified object to commit into sync lists.
1765 1886   */
1766 1887  static void
1767 1888  zil_async_to_sync(zilog_t *zilog, uint64_t foid)
1768 1889  {
1769 1890          uint64_t otxg, txg;
1770 1891          itx_async_node_t *ian;
1771 1892          avl_tree_t *t;
1772 1893          avl_index_t where;
1773 1894  
1774 1895          if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX) /* ziltest support */
1775 1896                  otxg = ZILTEST_TXG;
1776 1897          else
1777 1898                  otxg = spa_last_synced_txg(zilog->zl_spa) + 1;
1778 1899  
1779 1900          /*
1780 1901           * This is inherently racy, since there is nothing to prevent
1781 1902           * the last synced txg from changing.
1782 1903           */
1783 1904          for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
1784 1905                  itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];
1785 1906  
1786 1907                  mutex_enter(&itxg->itxg_lock);
1787 1908                  if (itxg->itxg_txg != txg) {
1788 1909                          mutex_exit(&itxg->itxg_lock);
1789 1910                          continue;
1790 1911                  }
1791 1912  
1792 1913                  /*
1793 1914                   * If a foid is specified then find that node and append its
1794 1915                   * list. Otherwise walk the tree appending all the lists
1795 1916                   * to the sync list. We add to the end rather than the
1796 1917                   * beginning to ensure the create has happened.
1797 1918                   */
1798 1919                  t = &itxg->itxg_itxs->i_async_tree;
1799 1920                  if (foid != 0) {
1800 1921                          ian = avl_find(t, &foid, &where);
1801 1922                          if (ian != NULL) {
1802 1923                                  list_move_tail(&itxg->itxg_itxs->i_sync_list,
1803 1924                                      &ian->ia_list);
1804 1925                          }
1805 1926                  } else {
1806 1927                          void *cookie = NULL;
1807 1928  
1808 1929                          while ((ian = avl_destroy_nodes(t, &cookie)) != NULL) {
1809 1930                                  list_move_tail(&itxg->itxg_itxs->i_sync_list,
1810 1931                                      &ian->ia_list);
1811 1932                                  list_destroy(&ian->ia_list);
1812 1933                                  kmem_free(ian, sizeof (itx_async_node_t));
1813 1934                          }
1814 1935                  }
1815 1936                  mutex_exit(&itxg->itxg_lock);
1816 1937          }
1817 1938  }
1818 1939  
1819 1940  /*
1820 1941   * This function will prune commit itxs that are at the head of the
1821 1942   * commit list (it won't prune past the first non-commit itx), and
1822 1943   * either: a) attach them to the last lwb that's still pending
1823 1944   * completion, or b) skip them altogether.
1824 1945   *
1825 1946   * This is used as a performance optimization to prevent commit itxs
1826 1947   * from generating new lwbs when it's unnecessary to do so.
1827 1948   */
1828 1949  static void
1829 1950  zil_prune_commit_list(zilog_t *zilog)
1830 1951  {
1831 1952          itx_t *itx;
1832 1953  
  
  
1833 1954          ASSERT(MUTEX_HELD(&zilog->zl_issuer_lock));
1834 1955  
1835 1956          while (itx = list_head(&zilog->zl_itx_commit_list)) {
1836 1957                  lr_t *lrc = &itx->itx_lr;
1837 1958                  if (lrc->lrc_txtype != TX_COMMIT)
1838 1959                          break;
1839 1960  
1840 1961                  mutex_enter(&zilog->zl_lock);
1841 1962  
1842 1963                  lwb_t *last_lwb = zilog->zl_last_lwb_opened;
1843      -                if (last_lwb == NULL || last_lwb->lwb_state == LWB_STATE_DONE) {
     1964 +                if (last_lwb == NULL ||
     1965 +                    last_lwb->lwb_state == LWB_STATE_FLUSH_DONE) {
1844 1966                          /*
1845 1967                           * All of the itxs this waiter was waiting on
1846 1968                           * must have already completed (or there were
1847 1969                           * never any itx's for it to wait on), so it's
1848 1970                           * safe to skip this waiter and mark it done.
1849 1971                           */
1850 1972                          zil_commit_waiter_skip(itx->itx_private);
1851 1973                  } else {
1852 1974                          zil_commit_waiter_link_lwb(itx->itx_private, last_lwb);
1853 1975                          itx->itx_private = NULL;
1854 1976                  }
1855 1977  
1856 1978                  mutex_exit(&zilog->zl_lock);
1857 1979  
1858 1980                  list_remove(&zilog->zl_itx_commit_list, itx);
1859 1981                  zil_itx_destroy(itx);
1860 1982          }
1861 1983  
1862 1984          IMPLY(itx != NULL, itx->itx_lr.lrc_txtype != TX_COMMIT);
1863 1985  }
1864 1986  
1865 1987  static void
1866 1988  zil_commit_writer_stall(zilog_t *zilog)
1867 1989  {
1868 1990          /*
1869 1991           * When zio_alloc_zil() fails to allocate the next lwb block on
1870 1992           * disk, we must call txg_wait_synced() to ensure all of the
1871 1993           * lwbs in the zilog's zl_lwb_list are synced and then freed (in
1872 1994           * zil_sync()), such that any subsequent ZIL writer (i.e. a call
1873 1995           * to zil_process_commit_list()) will have to call zil_create(),
1874 1996           * and start a new ZIL chain.
1875 1997           *
1876 1998           * Since zio_alloc_zil() failed, the lwb that was previously
1877 1999           * issued does not have a pointer to the "next" lwb on disk.
1878 2000           * Thus, if another ZIL writer thread was to allocate the "next"
1879 2001           * on-disk lwb, that block could be leaked in the event of a
1880 2002           * crash (because the previous lwb on-disk would not point to
1881 2003           * it).
1882 2004           *
1883 2005           * We must hold the zilog's zl_issuer_lock while we do this, to
1884 2006           * ensure no new threads enter zil_process_commit_list() until
1885 2007           * all lwb's in the zl_lwb_list have been synced and freed
1886 2008           * (which is achieved via the txg_wait_synced() call).
1887 2009           */
1888 2010          ASSERT(MUTEX_HELD(&zilog->zl_issuer_lock));
1889 2011          txg_wait_synced(zilog->zl_dmu_pool, 0);
1890 2012          ASSERT3P(list_tail(&zilog->zl_lwb_list), ==, NULL);
1891 2013  }
1892 2014  
1893 2015  /*
1894 2016   * This function will traverse the commit list, creating new lwbs as
1895 2017   * needed, and committing the itxs from the commit list to these newly
1896 2018   * created lwbs. Additionally, as a new lwb is created, the previous
1897 2019   * lwb will be issued to the zio layer to be written to disk.
1898 2020   */
1899 2021  static void
1900 2022  zil_process_commit_list(zilog_t *zilog)
1901 2023  {
1902 2024          spa_t *spa = zilog->zl_spa;
1903 2025          list_t nolwb_waiters;
1904 2026          lwb_t *lwb;
1905 2027          itx_t *itx;
1906 2028  
1907 2029          ASSERT(MUTEX_HELD(&zilog->zl_issuer_lock));
1908 2030  
1909 2031          /*
1910 2032           * Return if there's nothing to commit before we dirty the fs by
1911 2033           * calling zil_create().
1912 2034           */
1913 2035          if (list_head(&zilog->zl_itx_commit_list) == NULL)
  
  
1914 2036                  return;
1915 2037  
1916 2038          list_create(&nolwb_waiters, sizeof (zil_commit_waiter_t),
1917 2039              offsetof(zil_commit_waiter_t, zcw_node));
1918 2040  
1919 2041          lwb = list_tail(&zilog->zl_lwb_list);
1920 2042          if (lwb == NULL) {
1921 2043                  lwb = zil_create(zilog);
1922 2044          } else {
1923 2045                  ASSERT3S(lwb->lwb_state, !=, LWB_STATE_ISSUED);
1924      -                ASSERT3S(lwb->lwb_state, !=, LWB_STATE_DONE);
     2046 +                ASSERT3S(lwb->lwb_state, !=, LWB_STATE_WRITE_DONE);
     2047 +                ASSERT3S(lwb->lwb_state, !=, LWB_STATE_FLUSH_DONE);
1925 2048          }
1926 2049  
1927 2050          while (itx = list_head(&zilog->zl_itx_commit_list)) {
1928 2051                  lr_t *lrc = &itx->itx_lr;
1929 2052                  uint64_t txg = lrc->lrc_txg;
1930 2053  
1931 2054                  ASSERT3U(txg, !=, 0);
1932 2055  
1933 2056                  if (lrc->lrc_txtype == TX_COMMIT) {
1934 2057                          DTRACE_PROBE2(zil__process__commit__itx,
1935 2058                              zilog_t *, zilog, itx_t *, itx);
1936 2059                  } else {
1937 2060                          DTRACE_PROBE2(zil__process__normal__itx,
1938 2061                              zilog_t *, zilog, itx_t *, itx);
1939 2062                  }
1940 2063  
1941 2064                  boolean_t synced = txg <= spa_last_synced_txg(spa);
1942 2065                  boolean_t frozen = txg > spa_freeze_txg(spa);
1943 2066  
1944 2067                  /*
1945 2068                   * If the txg of this itx has already been synced out, then
1946 2069                   * we don't need to commit this itx to an lwb. This is
1947 2070                   * because the data of this itx will have already been
1948 2071                   * written to the main pool. This is inherently racy, and
1949 2072                   * it's still ok to commit an itx whose txg has already
1950 2073                   * been synced; this will result in a write that's
1951 2074                   * unnecessary, but will do no harm.
1952 2075                   *
1953 2076                   * With that said, we always want to commit TX_COMMIT itxs
1954 2077                   * to an lwb, regardless of whether or not that itx's txg
1955 2078                   * has been synced out. We do this to ensure any OPENED lwb
1956 2079                   * will always have at least one zil_commit_waiter_t linked
1957 2080                   * to the lwb.
1958 2081                   *
1959 2082                   * As a counter-example, if we skipped TX_COMMIT itx's
1960 2083                   * whose txg had already been synced, the following
1961 2084                   * situation could occur if we happened to be racing with
1962 2085                   * spa_sync:
1963 2086                   *
1964 2087                   * 1. we commit a non-TX_COMMIT itx to an lwb, where the
1965 2088                   *    itx's txg is 10 and the last synced txg is 9.
1966 2089                   * 2. spa_sync finishes syncing out txg 10.
1967 2090                   * 3. we move to the next itx in the list, it's a TX_COMMIT
1968 2091                   *    whose txg is 10, so we skip it rather than committing
1969 2092                   *    it to the lwb used in (1).
1970 2093                   *
1971 2094                   * If the itx that is skipped in (3) is the last TX_COMMIT
1972 2095                   * itx in the commit list, then it's possible for the lwb
1973 2096                   * used in (1) to remain in the OPENED state indefinitely.
1974 2097                   *
1975 2098                   * To prevent the above scenario from occurring, ensuring
1976 2099                   * that once an lwb is OPENED it will transition to ISSUED
1977 2100                   * and eventually DONE, we always commit TX_COMMIT itx's to
1978 2101                   * an lwb here, even if that itx's txg has already been
1979 2102                   * synced.
1980 2103                   *
1981 2104                   * Finally, if the pool is frozen, we _always_ commit the
1982 2105                   * itx.  The point of freezing the pool is to prevent data
1983 2106                   * from being written to the main pool via spa_sync, and
1984 2107                   * instead rely solely on the ZIL to persistently store the
1985 2108                   * data; i.e.  when the pool is frozen, the last synced txg
1986 2109                   * value can't be trusted.
1987 2110                   */
1988 2111                  if (frozen || !synced || lrc->lrc_txtype == TX_COMMIT) {
1989 2112                          if (lwb != NULL) {
1990 2113                                  lwb = zil_lwb_commit(zilog, itx, lwb);
1991 2114                          } else if (lrc->lrc_txtype == TX_COMMIT) {
1992 2115                                  ASSERT3P(lwb, ==, NULL);
1993 2116                                  zil_commit_waiter_link_nolwb(
1994 2117                                      itx->itx_private, &nolwb_waiters);
1995 2118                          }
1996 2119                  }
1997 2120  
1998 2121                  list_remove(&zilog->zl_itx_commit_list, itx);
1999 2122                  zil_itx_destroy(itx);
2000 2123          }
2001 2124  
2002 2125          if (lwb == NULL) {
2003 2126                  /*
2004 2127                   * This indicates zio_alloc_zil() failed to allocate the
2005 2128                   * "next" lwb on-disk. When this happens, we must stall
2006 2129                   * the ZIL write pipeline; see the comment within
2007 2130                   * zil_commit_writer_stall() for more details.
2008 2131                   */
2009 2132                  zil_commit_writer_stall(zilog);
2010 2133  
2011 2134                  /*
2012 2135                   * Additionally, we have to signal and mark the "nolwb"
2013 2136                   * waiters as "done" here, since without an lwb, we
2014 2137                   * can't do this via zil_lwb_flush_vdevs_done() like
2015 2138                   * normal.
  
  
2016 2139                   */
2017 2140                  zil_commit_waiter_t *zcw;
2018 2141                  while (zcw = list_head(&nolwb_waiters)) {
2019 2142                          zil_commit_waiter_skip(zcw);
2020 2143                          list_remove(&nolwb_waiters, zcw);
2021 2144                  }
2022 2145          } else {
2023 2146                  ASSERT(list_is_empty(&nolwb_waiters));
2024 2147                  ASSERT3P(lwb, !=, NULL);
2025 2148                  ASSERT3S(lwb->lwb_state, !=, LWB_STATE_ISSUED);
2026      -                ASSERT3S(lwb->lwb_state, !=, LWB_STATE_DONE);
     2149 +                ASSERT3S(lwb->lwb_state, !=, LWB_STATE_WRITE_DONE);
     2150 +                ASSERT3S(lwb->lwb_state, !=, LWB_STATE_FLUSH_DONE);
2027 2151  
2028 2152                  /*
2029 2153                   * At this point, the ZIL block pointed at by the "lwb"
2030 2154                   * variable is in one of the following states: "closed"
2031 2155                   * or "open".
2032 2156                   *
2033 2157                   * If it's "closed", then no itxs have been committed to
2034 2158                   * it, so there's no point in issuing its zio (i.e.
2035 2159                   * it's "empty").
2036 2160                   *
2037 2161                   * If it's in the "open" state, then it contains one or more
2038 2162                   * itxs that eventually need to be committed to stable
2039 2163                   * storage. In this case we intentionally do not issue
2040 2164                   * the lwb's zio to disk yet, and instead rely on one of
2041 2165                   * the following two mechanisms for issuing the zio:
2042 2166                   *
2043 2167                   * 1. Ideally, there will be more ZIL activity occurring
2044 2168                   * on the system, such that this function will be
2045 2169                   * immediately called again (not necessarily by the same
2046 2170                   * thread) and this lwb's zio will be issued via
2047 2171                   * zil_lwb_commit(). This way, the lwb is guaranteed to
2048 2172                   * be "full" when it is issued to disk, and we'll make
2049 2173                   * use of the lwb's size the best we can.
2050 2174                   *
2051 2175                   * 2. If there isn't sufficient ZIL activity occurring on
2052 2176                   * the system, such that this lwb's zio isn't issued via
2053 2177                   * zil_lwb_commit(), zil_commit_waiter() will issue the
2054 2178                   * lwb's zio. If this occurs, the lwb is not guaranteed
2055 2179                   * to be "full" by the time its zio is issued, and means
2056 2180                   * the size of the lwb was "too large" given the amount
2057 2181                   * of ZIL activity occurring on the system at that time.
2058 2182                   *
2059 2183                   * We do this for a couple of reasons:
2060 2184                   *
2061 2185                   * 1. To try and reduce the number of IOPs needed to
2062 2186                   * write the same number of itxs. If an lwb has space
2063 2187                   * available in its buffer for more itxs, and more itxs
2064 2188                   * will be committed relatively soon (relative to the
2065 2189                   * latency of performing a write), then it's beneficial
2066 2190                   * to wait for these "next" itxs. This way, more itxs
2067 2191                   * can be committed to stable storage with fewer writes.
2068 2192                   *
2069 2193                   * 2. To try and use the largest lwb block size that the
2070 2194                   * incoming rate of itxs can support. Again, this is to
2071 2195                   * try and pack as many itxs into as few lwbs as
2072 2196                   * possible, without significantly impacting the latency
2073 2197                   * of each individual itx.
2074 2198                   */
2075 2199          }
2076 2200  }
2077 2201  
2078 2202  /*
2079 2203   * This function is responsible for ensuring the passed in commit waiter
2080 2204   * (and associated commit itx) is committed to an lwb. If the waiter is
2081 2205   * not already committed to an lwb, all itxs in the zilog's queue of
2082 2206   * itxs will be processed. The assumption is that the passed in
2083 2207   * waiter's commit itx will be found in the queue just like the other
2084 2208   * non-commit itxs, such that when the entire queue is processed, the
2085 2209   * waiter will have been committed to an lwb.
2086 2210   *
2087 2211   * The lwb associated with the passed in waiter is not guaranteed to
2088 2212   * have been issued by the time this function completes. If the lwb is
2089 2213   * not issued, we rely on future calls to zil_commit_writer() to issue
2090 2214   * the lwb, or the timeout mechanism found in zil_commit_waiter().
2091 2215   */
2092 2216  static void
2093 2217  zil_commit_writer(zilog_t *zilog, zil_commit_waiter_t *zcw)
2094 2218  {
2095 2219          ASSERT(!MUTEX_HELD(&zilog->zl_lock));
2096 2220          ASSERT(spa_writeable(zilog->zl_spa));
2097 2221  
2098 2222          mutex_enter(&zilog->zl_issuer_lock);
2099 2223  
2100 2224          if (zcw->zcw_lwb != NULL || zcw->zcw_done) {
2101 2225                  /*
2102 2226                   * It's possible that, while we were waiting to acquire
2103 2227                   * the "zl_issuer_lock", another thread committed this
2104 2228                   * waiter to an lwb. If that occurs, we bail out early,
2105 2229                   * without processing any of the zilog's queue of itxs.
2106 2230                   *
2107 2231                   * On certain workloads and system configurations, the
2108 2232                   * "zl_issuer_lock" can become highly contended. In an
2109 2233                   * attempt to reduce this contention, we immediately drop
2110 2234                   * the lock if the waiter has already been processed.
2111 2235                   *
2112 2236                   * We've measured this optimization to reduce CPU spent
2113 2237                   * contending on this lock by up to 5%, using a system
2114 2238                   * with 32 CPUs, low latency storage (~50 usec writes),
2115 2239                   * and 1024 threads performing sync writes.
2116 2240                   */
2117 2241                  goto out;
2118 2242          }
2119 2243  
2120 2244          zil_get_commit_list(zilog);
2121 2245          zil_prune_commit_list(zilog);
2122 2246          zil_process_commit_list(zilog);
2123 2247  
2124 2248  out:
2125 2249          mutex_exit(&zilog->zl_issuer_lock);
2126 2250  }
2127 2251  
2128 2252  static void
2129 2253  zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw)
2130 2254  {
2131 2255          ASSERT(!MUTEX_HELD(&zilog->zl_issuer_lock));
2132 2256          ASSERT(MUTEX_HELD(&zcw->zcw_lock));
2133 2257          ASSERT3B(zcw->zcw_done, ==, B_FALSE);
2134 2258  
2135 2259          lwb_t *lwb = zcw->zcw_lwb;
2136 2260          ASSERT3P(lwb, !=, NULL);
2137 2261          ASSERT3S(lwb->lwb_state, !=, LWB_STATE_CLOSED);
2138 2262  
2139 2263          /*
2140 2264           * If the lwb has already been issued by another thread, we can
2141 2265           * immediately return since there's no work to be done (the
2142 2266           * point of this function is to issue the lwb). Additionally, we
2143 2267           * do this prior to acquiring the zl_issuer_lock, to avoid
2144 2268           * acquiring it when it's not necessary to do so.
2145 2269           */
2146 2270          if (lwb->lwb_state == LWB_STATE_ISSUED ||
2147      -            lwb->lwb_state == LWB_STATE_DONE)
     2271 +            lwb->lwb_state == LWB_STATE_WRITE_DONE ||
     2272 +            lwb->lwb_state == LWB_STATE_FLUSH_DONE)
2148 2273                  return;
2149 2274  
2150 2275          /*
2151 2276           * In order to call zil_lwb_write_issue() we must hold the
2152 2277           * zilog's "zl_issuer_lock". We can't simply acquire that lock,
2153 2278           * since we're already holding the commit waiter's "zcw_lock",
2154 2279           * and those two locks are acquired in the opposite order
2155 2280           * elsewhere.
2156 2281           */
2157 2282          mutex_exit(&zcw->zcw_lock);
2158 2283          mutex_enter(&zilog->zl_issuer_lock);
2159 2284          mutex_enter(&zcw->zcw_lock);
2160 2285  
2161 2286          /*
2162 2287           * Since we just dropped and re-acquired the commit waiter's
2163 2288           * lock, we have to re-check to see if the waiter was marked
2164 2289           * "done" during that process. If the waiter was marked "done",
2165 2290           * the "lwb" pointer is no longer valid (it can be free'd after
2166 2291           * the waiter is marked "done"), so without this check we could
2167 2292           * wind up with a use-after-free error below.
2168 2293           */
2169 2294          if (zcw->zcw_done)
2170 2295                  goto out;
2171 2296  
2172 2297          ASSERT3P(lwb, ==, zcw->zcw_lwb);
2173 2298  
2174 2299          /*
2175 2300           * We've already checked this above, but since we hadn't acquired
2176 2301           * the zilog's zl_issuer_lock, we have to perform this check a
2177 2302           * second time while holding the lock.
2178 2303           *
2179 2304           * We don't need to hold the zl_lock since the lwb cannot transition
2180 2305           * from OPENED to ISSUED while we hold the zl_issuer_lock. The lwb
2181 2306           * _can_ transition from ISSUED to DONE, but it's OK to race with
2182 2307           * that transition since we treat the lwb the same, whether it's in
2183 2308           * the ISSUED or DONE states.
2184 2309           *
2185 2310           * The important thing is we treat the lwb differently depending on
2186 2311           * whether it's ISSUED or OPENED, and block any other threads that might
2187 2312           * attempt to issue this lwb. For that reason we hold the
2188 2313           * zl_issuer_lock when checking the lwb_state; we must not call
2189 2314           * zil_lwb_write_issue() if the lwb had already been issued.
2190 2315           *
2191 2316           * See the comment above the lwb_state_t structure definition for
2192 2317           * more details on the lwb states, and locking requirements.
2193 2318           */
2194 2319          if (lwb->lwb_state == LWB_STATE_ISSUED ||
2195      -            lwb->lwb_state == LWB_STATE_DONE)
     2320 +            lwb->lwb_state == LWB_STATE_WRITE_DONE ||
     2321 +            lwb->lwb_state == LWB_STATE_FLUSH_DONE)
2196 2322                  goto out;
2197 2323  
2198 2324          ASSERT3S(lwb->lwb_state, ==, LWB_STATE_OPENED);
2199 2325  
2200 2326          /*
2201 2327           * As described in the comments above zil_commit_waiter() and
2202 2328           * zil_process_commit_list(), we need to issue this lwb's zio
2203 2329           * since we've reached the commit waiter's timeout and it still
2204 2330           * hasn't been issued.
2205 2331           */
2206 2332          lwb_t *nlwb = zil_lwb_write_issue(zilog, lwb);
2207 2333  
2208 2334          ASSERT3S(lwb->lwb_state, !=, LWB_STATE_OPENED);
2209 2335  
2210 2336          /*
2211 2337           * Since the lwb's zio hadn't been issued by the time this thread
2212 2338           * reached its timeout, we reset the zilog's "zl_cur_used" field
2213 2339           * to influence the zil block size selection algorithm.
2214 2340           *
2215 2341           * Having to issue the lwb's zio here means the size of the
2216 2342           * lwb was too large, given the incoming throughput of itxs.  By
2217 2343           * setting "zl_cur_used" to zero, we communicate this fact to the
2218 2344           * block size selection algorithm, so it can take this information
2219 2345           * into account, and potentially select a smaller size for the
2220 2346           * next lwb block that is allocated.
2221 2347           */
2222 2348          zilog->zl_cur_used = 0;
2223 2349  
2224 2350          if (nlwb == NULL) {
2225 2351                  /*
2226 2352                   * When zil_lwb_write_issue() returns NULL, this
2227 2353                   * indicates zio_alloc_zil() failed to allocate the
2228 2354                   * "next" lwb on-disk. When this occurs, the ZIL write
2229 2355                   * pipeline must be stalled; see the comment within the
2230 2356                   * zil_commit_writer_stall() function for more details.
2231 2357                   *
2232 2358                   * We must drop the commit waiter's lock prior to
2233 2359                   * calling zil_commit_writer_stall() or else we can wind
2234 2360                   * up with the following deadlock:
2235 2361                   *
2236 2362                   * - This thread is waiting for the txg to sync while
2237 2363                   *   holding the waiter's lock; txg_wait_synced() is
2238 2364                   *   used within zil_commit_writer_stall().
2239 2365                   *
2240 2366                   * - The txg can't sync because it is waiting for this
2241 2367                   *   lwb's zio callback to call dmu_tx_commit().
2242 2368                   *
2243 2369                   * - The lwb's zio callback can't call dmu_tx_commit()
2244 2370                   *   because it's blocked trying to acquire the waiter's
2245 2371                   *   lock, which occurs prior to calling dmu_tx_commit()
2246 2372                   */
2247 2373                  mutex_exit(&zcw->zcw_lock);
2248 2374                  zil_commit_writer_stall(zilog);
2249 2375                  mutex_enter(&zcw->zcw_lock);
2250 2376          }
2251 2377  
2252 2378  out:
2253 2379          mutex_exit(&zilog->zl_issuer_lock);
2254 2380          ASSERT(MUTEX_HELD(&zcw->zcw_lock));
2255 2381  }
2256 2382  
2257 2383  /*
2258 2384   * This function is responsible for performing the following two tasks:
2259 2385   *
2260 2386   * 1. its primary responsibility is to block until the given "commit
2261 2387   *    waiter" is considered "done".
2262 2388   *
2263 2389   * 2. its secondary responsibility is to issue the zio for the lwb that
2264 2390   *    the given "commit waiter" is waiting on, if this function has
2265 2391   *    waited "long enough" and the lwb is still in the "open" state.
2266 2392   *
2267 2393   * Given a sufficient amount of itxs being generated and written using
2268 2394   * the ZIL, the lwb's zio will be issued via the zil_lwb_commit()
2269 2395   * function. If this does not occur, this secondary responsibility will
2270 2396   * ensure the lwb is issued even if there is no other synchronous
2271 2397   * activity on the system.
2272 2398   *
2273 2399   * For more details, see zil_process_commit_list(); more specifically,
2274 2400   * the comment at the bottom of that function.
2275 2401   */
2276 2402  static void
2277 2403  zil_commit_waiter(zilog_t *zilog, zil_commit_waiter_t *zcw)
2278 2404  {
2279 2405          ASSERT(!MUTEX_HELD(&zilog->zl_lock));
2280 2406          ASSERT(!MUTEX_HELD(&zilog->zl_issuer_lock));
2281 2407          ASSERT(spa_writeable(zilog->zl_spa));
2282 2408  
2283 2409          mutex_enter(&zcw->zcw_lock);
2284 2410  
2285 2411          /*
2286 2412           * The timeout is scaled based on the lwb latency to avoid
2287 2413           * significantly impacting the latency of each individual itx.
2288 2414           * For more details, see the comment at the bottom of the
2289 2415           * zil_process_commit_list() function.
2290 2416           */
2291 2417          int pct = MAX(zfs_commit_timeout_pct, 1);
2292 2418          hrtime_t sleep = (zilog->zl_last_lwb_latency * pct) / 100;
2293 2419          hrtime_t wakeup = gethrtime() + sleep;
2294 2420          boolean_t timedout = B_FALSE;
2295 2421  
2296 2422          while (!zcw->zcw_done) {
2297 2423                  ASSERT(MUTEX_HELD(&zcw->zcw_lock));
2298 2424  
2299 2425                  lwb_t *lwb = zcw->zcw_lwb;
2300 2426  
2301 2427                  /*
2302 2428                   * Usually, the waiter will have a non-NULL lwb field here,
2303 2429                   * but it's possible for it to be NULL as a result of
2304 2430                   * zil_commit() racing with spa_sync().
2305 2431                   *
2306 2432                   * When zil_clean() is called, it's possible for the itxg
2307 2433                   * list (which may be cleaned via a taskq) to contain
2308 2434                   * commit itxs. When this occurs, the commit waiters linked
2309 2435                   * off of these commit itxs will not be committed to an
2310 2436                   * lwb.  Additionally, these commit waiters will not be
2311 2437                   * marked done until zil_commit_waiter_skip() is called via
2312 2438                   * zil_itxg_clean().
2313 2439                   *
2314 2440                   * Thus, it's possible for this commit waiter (i.e. the
2315 2441                   * "zcw" variable) to be found in this "in between" state,
2316 2442                   * where its "zcw_lwb" field is NULL, and it hasn't yet
2317 2443                   * been skipped, so its "zcw_done" field is still B_FALSE.
2318 2444                   */
2319 2445                  IMPLY(lwb != NULL, lwb->lwb_state != LWB_STATE_CLOSED);
2320 2446  
2321 2447                  if (lwb != NULL && lwb->lwb_state == LWB_STATE_OPENED) {
2322 2448                          ASSERT3B(timedout, ==, B_FALSE);
2323 2449  
2324 2450                          /*
2325 2451                           * If the lwb hasn't been issued yet, then we
2326 2452                           * need to wait with a timeout, in case this
2327 2453                           * function needs to issue the lwb after the
2328 2454                           * timeout is reached; responsibility (2) from
2329 2455                           * the comment above this function.
2330 2456                           */
2331 2457                          clock_t timeleft = cv_timedwait_hires(&zcw->zcw_cv,
2332 2458                              &zcw->zcw_lock, wakeup, USEC2NSEC(1),
2333 2459                              CALLOUT_FLAG_ABSOLUTE);
2334 2460  
2335 2461                          if (timeleft >= 0 || zcw->zcw_done)
2336 2462                                  continue;
2337 2463  
2338 2464                          timedout = B_TRUE;
2339 2465                          zil_commit_waiter_timeout(zilog, zcw);
2340 2466  
2341 2467                          if (!zcw->zcw_done) {
2342 2468                                  /*
2343 2469                                   * If the commit waiter has already been
2344 2470                                   * marked "done", it's possible for the
2345 2471                                   * waiter's lwb structure to have already
2346 2472                                   * been freed.  Thus, we can only reliably
2347 2473                                   * make these assertions if the waiter
2348 2474                                   * isn't done.
2349 2475                                   */
2350 2476                                  ASSERT3P(lwb, ==, zcw->zcw_lwb);
2351 2477                                  ASSERT3S(lwb->lwb_state, !=, LWB_STATE_OPENED);
2352 2478                          }
2353 2479                  } else {
2354 2480                          /*
2355 2481                           * If the lwb isn't open, then it must have already
2356 2482                           * been issued. In that case, there's no need to
2357 2483                           * use a timeout when waiting for the lwb to
2358 2484                           * complete.
2359 2485                           *
2360 2486                           * Additionally, if the lwb is NULL, the waiter
2361 2487                           * will soon be signalled and marked done via
2362 2488                           * zil_clean() and zil_itxg_clean(), so no timeout
2363 2489                           * is required.
2364 2490                           */
2365 2491  
2366 2492                          IMPLY(lwb != NULL,
2367 2493                              lwb->lwb_state == LWB_STATE_ISSUED ||
2368      -                            lwb->lwb_state == LWB_STATE_DONE);
     2494 +                            lwb->lwb_state == LWB_STATE_WRITE_DONE ||
     2495 +                            lwb->lwb_state == LWB_STATE_FLUSH_DONE);
2369 2496                          cv_wait(&zcw->zcw_cv, &zcw->zcw_lock);
2370 2497                  }
2371 2498          }
2372 2499  
2373 2500          mutex_exit(&zcw->zcw_lock);
2374 2501  }
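
For reference, the deadline used by the cv_timedwait_hires() call above can be made concrete with a minimal sketch, written as if it sat alongside zil_commit_waiter() and assuming hypothetical numbers (a zfs_commit_timeout_pct of 5 and a previous lwb latency of 50 usec); the helper name is invented for illustration:

        /* Illustrative sketch only -- not part of zil.c. */
        static hrtime_t
        example_commit_waiter_deadline(zilog_t *zilog)
        {
                int pct = MAX(zfs_commit_timeout_pct, 1);       /* e.g. 5 */

                /*
                 * With zl_last_lwb_latency == 50000 ns (50 usec) and
                 * pct == 5, sleep == 2500 ns; the waiter only blocks for
                 * a small fraction of the previously observed lwb latency
                 * before it takes over and issues the lwb itself.
                 */
                hrtime_t sleep = (zilog->zl_last_lwb_latency * pct) / 100;

                return (gethrtime() + sleep);   /* absolute wakeup time */
        }
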
2375 2502  
2376 2503  static zil_commit_waiter_t *
2377 2504  zil_alloc_commit_waiter()
2378 2505  {
2379 2506          zil_commit_waiter_t *zcw = kmem_cache_alloc(zil_zcw_cache, KM_SLEEP);
2380 2507  
2381 2508          cv_init(&zcw->zcw_cv, NULL, CV_DEFAULT, NULL);
2382 2509          mutex_init(&zcw->zcw_lock, NULL, MUTEX_DEFAULT, NULL);
2383 2510          list_link_init(&zcw->zcw_node);
2384 2511          zcw->zcw_lwb = NULL;
2385 2512          zcw->zcw_done = B_FALSE;
2386 2513          zcw->zcw_zio_error = 0;
2387 2514  
2388 2515          return (zcw);
2389 2516  }
2390 2517  
2391 2518  static void
2392 2519  zil_free_commit_waiter(zil_commit_waiter_t *zcw)
2393 2520  {
2394 2521          ASSERT(!list_link_active(&zcw->zcw_node));
2395 2522          ASSERT3P(zcw->zcw_lwb, ==, NULL);
2396 2523          ASSERT3B(zcw->zcw_done, ==, B_TRUE);
2397 2524          mutex_destroy(&zcw->zcw_lock);
2398 2525          cv_destroy(&zcw->zcw_cv);
2399 2526          kmem_cache_free(zil_zcw_cache, zcw);
2400 2527  }
2401 2528  
2402 2529  /*
2403 2530   * This function is used to create a TX_COMMIT itx and assign it. This
2404 2531   * way, it will be linked into the ZIL's list of synchronous itxs, and
2405 2532   * then later committed to an lwb (or skipped) when
2406 2533   * zil_process_commit_list() is called.
2407 2534   */
2408 2535  static void
2409 2536  zil_commit_itx_assign(zilog_t *zilog, zil_commit_waiter_t *zcw)
2410 2537  {
2411 2538          dmu_tx_t *tx = dmu_tx_create(zilog->zl_os);
2412 2539          VERIFY0(dmu_tx_assign(tx, TXG_WAIT));
2413 2540  
2414 2541          itx_t *itx = zil_itx_create(TX_COMMIT, sizeof (lr_t));
2415 2542          itx->itx_sync = B_TRUE;
2416 2543          itx->itx_private = zcw;
2417 2544  
2418 2545          zil_itx_assign(zilog, itx, tx);
2419 2546  
2420 2547          dmu_tx_commit(tx);
2421 2548  }
2422 2549  
2423 2550  /*
2424 2551   * Commit ZFS Intent Log transactions (itxs) to stable storage.
2425 2552   *
2426 2553   * When writing ZIL transactions to the on-disk representation of the
2427 2554   * ZIL, the itxs are committed to a Log Write Block (lwb). Multiple
2428 2555   * itxs can be committed to a single lwb. Once a lwb is written and
2429 2556   * committed to stable storage (i.e. the lwb is written, and vdevs have
2430 2557   * been flushed), each itx that was committed to that lwb is also
2431 2558   * considered to be committed to stable storage.
2432 2559   *
2433 2560   * When an itx is committed to an lwb, the log record (lr_t) contained
2434 2561   * by the itx is copied into the lwb's zio buffer, and once this buffer
2435 2562   * is written to disk, it becomes an on-disk ZIL block.
2436 2563   *
2437 2564   * As itxs are generated, they're inserted into the ZIL's queue of
2438 2565   * uncommitted itxs. The semantics of zil_commit() are such that it will
2439 2566   * block until all itxs that were in the queue when it was called, are
2440 2567   * committed to stable storage.
2441 2568   *
2442 2569   * If "foid" is zero, this means all "synchronous" and "asynchronous"
2443 2570   * itxs, for all objects in the dataset, will be committed to stable
2444 2571   * storage prior to zil_commit() returning. If "foid" is non-zero, all
2445 2572   * "synchronous" itxs for all objects, but only "asynchronous" itxs
2446 2573   * that correspond to the foid passed in, will be committed to stable
2447 2574   * storage prior to zil_commit() returning.
2448 2575   *
2449 2576   * Generally speaking, when zil_commit() is called, the consumer doesn't
2450 2577   * actually care about _all_ of the uncommitted itxs. Instead, they're
2451 2578   * simply trying to wait for a specific itx to be committed to disk,
2452 2579   * but the interface(s) for interacting with the ZIL don't allow such
2453 2580   * fine-grained communication. A better interface would allow a consumer
2454 2581   * to create and assign an itx, and then pass a reference to this itx to
2455 2582   * zil_commit(); such that zil_commit() would return as soon as that
2456 2583   * specific itx was committed to disk (instead of waiting for _all_
2457 2584   * itxs to be committed).
2458 2585   *
2459 2586   * When a thread calls zil_commit() a special "commit itx" will be
2460 2587   * generated, along with a corresponding "waiter" for this commit itx.
2461 2588   * zil_commit() will wait on this waiter's CV, such that when the waiter
2462 2589   * is marked done, and signalled, zil_commit() will return.
2463 2590   *
2464 2591   * This commit itx is inserted into the queue of uncommitted itxs. This
2465 2592   * provides an easy mechanism for determining which itxs were in the
2466 2593   * queue prior to zil_commit() having been called, and which itxs were
2467 2594   * added after zil_commit() was called.
2468 2595   *
2469 2596   * The commit itx is special; it doesn't have any on-disk representation.
2470 2597   * When a commit itx is "committed" to an lwb, the waiter associated
2471 2598   * with it is linked onto the lwb's list of waiters. Then, when that lwb
2472 2599   * completes, each waiter on the lwb's list is marked done and signalled
2473 2600   * -- allowing the thread waiting on the waiter to return from zil_commit().
2474 2601   *
2475 2602   * It's important to point out a few critical factors that allow us
2476 2603   * to make use of the commit itxs, commit waiters, per-lwb lists of
2477 2604   * commit waiters, and zio completion callbacks like we're doing:
2478 2605   *
2479 2606   *   1. The list of waiters for each lwb is traversed, and each commit
2480 2607   *      waiter is marked "done" and signalled, in the zio completion
2481 2608   *      callback of the lwb's zio[*].
2482 2609   *
2483 2610   *      * Actually, the waiters are signalled in the zio completion
2484 2611   *        callback of the root zio for the DKIOCFLUSHWRITECACHE commands
2485 2612   *        that are sent to the vdevs upon completion of the lwb zio.
2486 2613   *
2487 2614   *   2. When the itxs are inserted into the ZIL's queue of uncommitted
2488 2615   *      itxs, the order in which they are inserted is preserved[*]; as
2489 2616   *      itxs are added to the queue, they are added to the tail of
2490 2617   *      in-memory linked lists.
2491 2618   *
2492 2619   *      When committing the itxs to lwbs (to be written to disk), they
2493 2620   *      are committed in the same order in which the itxs were added to
2494 2621   *      the uncommitted queue's linked list(s); i.e. the linked list of
2495 2622   *      itxs to commit is traversed from head to tail, and each itx is
2496 2623   *      committed to an lwb in that order.
2497 2624   *
2498 2625   *      * To clarify:
2499 2626   *
2500 2627   *        - the order of "sync" itxs is preserved w.r.t. other
2501 2628   *          "sync" itxs, regardless of the corresponding objects.
2502 2629   *        - the order of "async" itxs is preserved w.r.t. other
2503 2630   *          "async" itxs corresponding to the same object.
2504 2631   *        - the order of "async" itxs is *not* preserved w.r.t. other
2505 2632   *          "async" itxs corresponding to different objects.
2506 2633   *        - the order of "sync" itxs w.r.t. "async" itxs (or vice
2507 2634   *          versa) is *not* preserved, even for itxs that correspond
2508 2635   *          to the same object.
2509 2636   *
2510 2637   *      For more details, see: zil_itx_assign(), zil_async_to_sync(),
2511 2638   *      zil_get_commit_list(), and zil_process_commit_list().
2512 2639   *
2513 2640   *   3. The lwbs represent a linked list of blocks on disk. Thus, any
2514 2641   *      lwb cannot be considered committed to stable storage, until its
2515 2642   *      "previous" lwb is also committed to stable storage. This fact,
2516 2643   *      coupled with the fact described above, means that itxs are
2517 2644   *      committed in (roughly) the order in which they were generated.
2518 2645   *      This is essential because itxs are dependent on prior itxs.
2519 2646   *      Thus, we *must not* deem an itx as being committed to stable
2520 2647   *      storage, until *all* prior itxs have also been committed to
2521 2648   *      stable storage.
2522 2649   *
2523 2650   *      To enforce this ordering of lwb zio's, while still leveraging as
2524 2651   *      much of the underlying storage performance as possible, we rely
2525 2652   *      on two fundamental concepts:
2526 2653   *
2527 2654   *          1. The creation and issuance of lwb zio's is protected by
2528 2655   *             the zilog's "zl_issuer_lock", which ensures only a single
2529 2656   *             thread is creating and/or issuing lwb's at a time
2530 2657   *          2. The "previous" lwb is a child of the "current" lwb
2531 2658   *             (leveraging the zio parent-child dependency graph)
2532 2659   *
2533 2660   *      By relying on this parent-child zio relationship, we can have
2534 2661   *      many lwb zio's concurrently issued to the underlying storage,
2535 2662   *      but the order in which they complete will be the same order in
2536 2663   *      which they were created.
2537 2664   */
2538 2665  void
2539 2666  zil_commit(zilog_t *zilog, uint64_t foid)
2540 2667  {
2541 2668          /*
2542 2669           * We should never attempt to call zil_commit on a snapshot for
2543 2670           * a couple of reasons:
2544 2671           *
2545 2672           * 1. A snapshot may never be modified, thus it cannot have any
2546 2673           *    in-flight itxs that would have modified the dataset.
2547 2674           *
2548 2675           * 2. By design, when zil_commit() is called, a commit itx will
2549 2676           *    be assigned to this zilog; as a result, the zilog will be
2550 2677           *    dirtied. We must not dirty the zilog of a snapshot; there's
2551 2678           *    checks in the code that enforce this invariant, and will
2552 2679           *    cause a panic if it's not upheld.
2553 2680           */
2554 2681          ASSERT3B(dmu_objset_is_snapshot(zilog->zl_os), ==, B_FALSE);
2555 2682  
2556 2683          if (zilog->zl_sync == ZFS_SYNC_DISABLED)
2557 2684                  return;
2558 2685  
2559 2686          if (!spa_writeable(zilog->zl_spa)) {
2560 2687                  /*
2561 2688                   * If the SPA is not writable, there should never be any
2562 2689                   * pending itxs waiting to be committed to disk. If that
2563 2690                   * weren't true, we'd skip writing those itxs out, and
2564 2691                   * would break the semantics of zil_commit(); thus, we're
2565 2692                   * verifying that truth before we return to the caller.
2566 2693                   */
2567 2694                  ASSERT(list_is_empty(&zilog->zl_lwb_list));
2568 2695                  ASSERT3P(zilog->zl_last_lwb_opened, ==, NULL);
2569 2696                  for (int i = 0; i < TXG_SIZE; i++)
2570 2697                          ASSERT3P(zilog->zl_itxg[i].itxg_itxs, ==, NULL);
2571 2698                  return;
2572 2699          }
2573 2700  
2574 2701          /*
2575 2702           * If the ZIL is suspended, we don't want to dirty it by calling
2576 2703           * zil_commit_itx_assign() below, nor can we write out
2577 2704           * lwbs as would be done in zil_commit_writer(). Thus, we
2578 2705           * simply rely on txg_wait_synced() to maintain the necessary
2579 2706           * semantics, and avoid calling those functions altogether.
2580 2707           */
2581 2708          if (zilog->zl_suspend > 0) {
2582 2709                  txg_wait_synced(zilog->zl_dmu_pool, 0);
2583 2710                  return;
2584 2711          }
2585 2712  
2586 2713          zil_commit_impl(zilog, foid);
2587 2714  }
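
As a usage illustration of the interface documented above, a minimal, hypothetical caller that wants fsync-like semantics for a single object might look like the following sketch (the function and parameter names are invented; only zil_commit() itself is real):

        /* Illustrative sketch only -- not part of zil.c. */
        static void
        example_fsync_like_caller(zilog_t *zilog, uint64_t object_id)
        {
                /*
                 * Block until every "sync" itx queued so far, plus any
                 * "async" itxs previously logged against object_id, has
                 * reached stable storage.
                 */
                zil_commit(zilog, object_id);

                /*
                 * Passing a foid of zero would additionally wait for the
                 * "async" itxs of every object in the dataset.
                 */
        }
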
2588 2715  
2589 2716  void
2590 2717  zil_commit_impl(zilog_t *zilog, uint64_t foid)
2591 2718  {
2592 2719          /*
2593 2720           * Move the "async" itxs for the specified foid to the "sync"
2594 2721           * queues, such that they will be later committed (or skipped)
2595 2722           * to an lwb when zil_process_commit_list() is called.
2596 2723           *
2597 2724           * Since these "async" itxs must be committed prior to this
2598 2725           * call to zil_commit returning, we must perform this operation
2599 2726           * before we call zil_commit_itx_assign().
2600 2727           */
2601 2728          zil_async_to_sync(zilog, foid);
2602 2729  
2603 2730          /*
2604 2731           * We allocate a new "waiter" structure which will initially be
2605 2732           * linked to the commit itx using the itx's "itx_private" field.
2606 2733           * Since the commit itx doesn't represent any on-disk state,
2607 2734           * when it's committed to an lwb, rather than copying its
2608 2735           * lr_t into the lwb's buffer, the commit itx's "waiter" will be
2609 2736           * added to the lwb's list of waiters. Then, when the lwb is
2610 2737           * committed to stable storage, each waiter in the lwb's list of
2611 2738           * waiters will be marked "done", and signalled.
2612 2739           *
2613 2740           * We must create the waiter and assign the commit itx prior to
2614 2741           * calling zil_commit_writer(), or else our specific commit itx
2615 2742           * is not guaranteed to be committed to an lwb prior to calling
2616 2743           * zil_commit_waiter().
2617 2744           */
2618 2745          zil_commit_waiter_t *zcw = zil_alloc_commit_waiter();
2619 2746          zil_commit_itx_assign(zilog, zcw);
2620 2747  
2621 2748          zil_commit_writer(zilog, zcw);
2622 2749          zil_commit_waiter(zilog, zcw);
2623 2750  
2624 2751          if (zcw->zcw_zio_error != 0) {
2625 2752                  /*
2626 2753                   * If there was an error writing out the ZIL blocks that
2627 2754                   * this thread is waiting on, then we fall back to
2628 2755                   * relying on spa_sync() to write out the data this
2629 2756                   * thread is waiting on. Obviously this has performance
2630 2757                   * implications, but the expectation is for this to be
2631 2758                   * an exceptional case, and shouldn't occur often.
2632 2759                   */
2633 2760                  DTRACE_PROBE2(zil__commit__io__error,
2634 2761                      zilog_t *, zilog, zil_commit_waiter_t *, zcw);
2635 2762                  txg_wait_synced(zilog->zl_dmu_pool, 0);
2636 2763          }
2637 2764  
2638 2765          zil_free_commit_waiter(zcw);
2639 2766  }
2640 2767  
2641 2768  /*
2642 2769   * Called in syncing context to free committed log blocks and update log header.
2643 2770   */
2644 2771  void
2645 2772  zil_sync(zilog_t *zilog, dmu_tx_t *tx)
2646 2773  {
2647 2774          zil_header_t *zh = zil_header_in_syncing_context(zilog);
2648 2775          uint64_t txg = dmu_tx_get_txg(tx);
2649 2776          spa_t *spa = zilog->zl_spa;
2650 2777          uint64_t *replayed_seq = &zilog->zl_replayed_seq[txg & TXG_MASK];
2651 2778          lwb_t *lwb;
2652 2779  
2653 2780          /*
2654 2781           * We don't zero out zl_destroy_txg, so make sure we don't try
2655 2782           * to destroy it twice.
2656 2783           */
2657 2784          if (spa_sync_pass(spa) != 1)
2658 2785                  return;
2659 2786  
2660 2787          mutex_enter(&zilog->zl_lock);
2661 2788  
2662 2789          ASSERT(zilog->zl_stop_sync == 0);
2663 2790  
2664 2791          if (*replayed_seq != 0) {
2665 2792                  ASSERT(zh->zh_replay_seq < *replayed_seq);
2666 2793                  zh->zh_replay_seq = *replayed_seq;
2667 2794                  *replayed_seq = 0;
2668 2795          }
2669 2796  
2670 2797          if (zilog->zl_destroy_txg == txg) {
2671 2798                  blkptr_t blk = zh->zh_log;
2672 2799  
2673 2800                  ASSERT(list_head(&zilog->zl_lwb_list) == NULL);
2674 2801  
2675 2802                  bzero(zh, sizeof (zil_header_t));
2676 2803                  bzero(zilog->zl_replayed_seq, sizeof (zilog->zl_replayed_seq));
2677 2804  
2678 2805                  if (zilog->zl_keep_first) {
2679 2806                          /*
2680 2807                           * If this block was part of a log chain that couldn't
2681 2808                           * be claimed because a device was missing during
2682 2809                           * zil_claim(), but that device later returns,
2683 2810                           * then this block could erroneously appear valid.
2684 2811                           * To guard against this, assign a new GUID to the new
2685 2812                           * log chain so it doesn't matter what blk points to.
2686 2813                           */
2687 2814                          zil_init_log_chain(zilog, &blk);
2688 2815                          zh->zh_log = blk;
2689 2816                  }
2690 2817          }
2691 2818  
2692 2819          while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
2693 2820                  zh->zh_log = lwb->lwb_blk;
2694 2821                  if (lwb->lwb_buf != NULL || lwb->lwb_max_txg > txg)
2695 2822                          break;
2696 2823                  list_remove(&zilog->zl_lwb_list, lwb);
2697 2824                  zio_free(spa, txg, &lwb->lwb_blk);
2698 2825                  zil_free_lwb(zilog, lwb);
2699 2826  
2700 2827                  /*
2701 2828                   * If we don't have anything left in the lwb list then
2702 2829                   * we've had an allocation failure and we need to zero
2703 2830                   * out the zil_header blkptr so that we don't end
2704 2831                   * up freeing the same block twice.
2705 2832                   */
2706 2833                  if (list_head(&zilog->zl_lwb_list) == NULL)
2707 2834                          BP_ZERO(&zh->zh_log);
2708 2835          }
2709 2836          mutex_exit(&zilog->zl_lock);
2710 2837  }
2711 2838  
2712 2839  /* ARGSUSED */
2713 2840  static int
2714 2841  zil_lwb_cons(void *vbuf, void *unused, int kmflag)
2715 2842  {
2716 2843          lwb_t *lwb = vbuf;
2717 2844          list_create(&lwb->lwb_waiters, sizeof (zil_commit_waiter_t),
2718 2845              offsetof(zil_commit_waiter_t, zcw_node));
2719 2846          avl_create(&lwb->lwb_vdev_tree, zil_lwb_vdev_compare,
2720 2847              sizeof (zil_vdev_node_t), offsetof(zil_vdev_node_t, zv_node));
2721 2848          mutex_init(&lwb->lwb_vdev_lock, NULL, MUTEX_DEFAULT, NULL);
2722 2849          return (0);
2723 2850  }
2724 2851  
2725 2852  /* ARGSUSED */
2726 2853  static void
2727 2854  zil_lwb_dest(void *vbuf, void *unused)
2728 2855  {
2729 2856          lwb_t *lwb = vbuf;
2730 2857          mutex_destroy(&lwb->lwb_vdev_lock);
2731 2858          avl_destroy(&lwb->lwb_vdev_tree);
2732 2859          list_destroy(&lwb->lwb_waiters);
2733 2860  }
2734 2861  
2735 2862  void
2736 2863  zil_init(void)
2737 2864  {
2738 2865          zil_lwb_cache = kmem_cache_create("zil_lwb_cache",
2739 2866              sizeof (lwb_t), 0, zil_lwb_cons, zil_lwb_dest, NULL, NULL, NULL, 0);
2740 2867  
2741 2868          zil_zcw_cache = kmem_cache_create("zil_zcw_cache",
2742 2869              sizeof (zil_commit_waiter_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
2743 2870  }
2744 2871  
2745 2872  void
2746 2873  zil_fini(void)
2747 2874  {
2748 2875          kmem_cache_destroy(zil_zcw_cache);
2749 2876          kmem_cache_destroy(zil_lwb_cache);
2750 2877  }
2751 2878  
2752 2879  void
2753 2880  zil_set_sync(zilog_t *zilog, uint64_t sync)
2754 2881  {
2755 2882          zilog->zl_sync = sync;
2756 2883  }
2757 2884  
2758 2885  void
2759 2886  zil_set_logbias(zilog_t *zilog, uint64_t logbias)
2760 2887  {
2761 2888          zilog->zl_logbias = logbias;
2762 2889  }
2763 2890  
2764 2891  zilog_t *
2765 2892  zil_alloc(objset_t *os, zil_header_t *zh_phys)
2766 2893  {
2767 2894          zilog_t *zilog;
2768 2895  
2769 2896          zilog = kmem_zalloc(sizeof (zilog_t), KM_SLEEP);
2770 2897  
2771 2898          zilog->zl_header = zh_phys;
2772 2899          zilog->zl_os = os;
2773 2900          zilog->zl_spa = dmu_objset_spa(os);
2774 2901          zilog->zl_dmu_pool = dmu_objset_pool(os);
2775 2902          zilog->zl_destroy_txg = TXG_INITIAL - 1;
2776 2903          zilog->zl_logbias = dmu_objset_logbias(os);
2777 2904          zilog->zl_sync = dmu_objset_syncprop(os);
2778 2905          zilog->zl_dirty_max_txg = 0;
2779 2906          zilog->zl_last_lwb_opened = NULL;
2780 2907          zilog->zl_last_lwb_latency = 0;
2781 2908  
2782 2909          mutex_init(&zilog->zl_lock, NULL, MUTEX_DEFAULT, NULL);
2783 2910          mutex_init(&zilog->zl_issuer_lock, NULL, MUTEX_DEFAULT, NULL);
2784 2911  
2785 2912          for (int i = 0; i < TXG_SIZE; i++) {
2786 2913                  mutex_init(&zilog->zl_itxg[i].itxg_lock, NULL,
2787 2914                      MUTEX_DEFAULT, NULL);
2788 2915          }
2789 2916  
2790 2917          list_create(&zilog->zl_lwb_list, sizeof (lwb_t),
2791 2918              offsetof(lwb_t, lwb_node));
2792 2919  
2793 2920          list_create(&zilog->zl_itx_commit_list, sizeof (itx_t),
2794 2921              offsetof(itx_t, itx_node));
2795 2922  
2796 2923          cv_init(&zilog->zl_cv_suspend, NULL, CV_DEFAULT, NULL);
2797 2924  
2798 2925          return (zilog);
2799 2926  }
2800 2927  
2801 2928  void
2802 2929  zil_free(zilog_t *zilog)
2803 2930  {
2804 2931          zilog->zl_stop_sync = 1;
2805 2932  
2806 2933          ASSERT0(zilog->zl_suspend);
2807 2934          ASSERT0(zilog->zl_suspending);
2808 2935  
2809 2936          ASSERT(list_is_empty(&zilog->zl_lwb_list));
2810 2937          list_destroy(&zilog->zl_lwb_list);
2811 2938  
2812 2939          ASSERT(list_is_empty(&zilog->zl_itx_commit_list));
2813 2940          list_destroy(&zilog->zl_itx_commit_list);
2814 2941  
2815 2942          for (int i = 0; i < TXG_SIZE; i++) {
2816 2943                  /*
2817 2944                   * It's possible for an itx to be generated that doesn't dirty
2818 2945                   * a txg (e.g. ztest TX_TRUNCATE). So there's no zil_clean()
2819 2946                   * callback to remove the entry. We remove those here.
2820 2947                   *
2821 2948                   * Also free up the ziltest itxs.
2822 2949                   */
2823 2950                  if (zilog->zl_itxg[i].itxg_itxs)
2824 2951                          zil_itxg_clean(zilog->zl_itxg[i].itxg_itxs);
2825 2952                  mutex_destroy(&zilog->zl_itxg[i].itxg_lock);
2826 2953          }
2827 2954  
2828 2955          mutex_destroy(&zilog->zl_issuer_lock);
2829 2956          mutex_destroy(&zilog->zl_lock);
2830 2957  
2831 2958          cv_destroy(&zilog->zl_cv_suspend);
2832 2959  
2833 2960          kmem_free(zilog, sizeof (zilog_t));
2834 2961  }
2835 2962  
2836 2963  /*
2837 2964   * Open an intent log.
2838 2965   */
2839 2966  zilog_t *
2840 2967  zil_open(objset_t *os, zil_get_data_t *get_data)
2841 2968  {
2842 2969          zilog_t *zilog = dmu_objset_zil(os);
2843 2970  
2844 2971          ASSERT3P(zilog->zl_get_data, ==, NULL);
2845 2972          ASSERT3P(zilog->zl_last_lwb_opened, ==, NULL);
2846 2973          ASSERT(list_is_empty(&zilog->zl_lwb_list));
2847 2974  
2848 2975          zilog->zl_get_data = get_data;
2849 2976  
2850 2977          return (zilog);
2851 2978  }
2852 2979  
2853 2980  /*
2854 2981   * Close an intent log.
2855 2982   */
2856 2983  void
2857 2984  zil_close(zilog_t *zilog)
2858 2985  {
2859 2986          lwb_t *lwb;
2860 2987          uint64_t txg;
2861 2988  
2862 2989          if (!dmu_objset_is_snapshot(zilog->zl_os)) {
2863 2990                  zil_commit(zilog, 0);
2864 2991          } else {
2865 2992                  ASSERT3P(list_tail(&zilog->zl_lwb_list), ==, NULL);
2866 2993                  ASSERT0(zilog->zl_dirty_max_txg);
2867 2994                  ASSERT3B(zilog_is_dirty(zilog), ==, B_FALSE);
2868 2995          }
2869 2996  
2870 2997          mutex_enter(&zilog->zl_lock);
2871 2998          lwb = list_tail(&zilog->zl_lwb_list);
2872 2999          if (lwb == NULL)
2873 3000                  txg = zilog->zl_dirty_max_txg;
2874 3001          else
2875 3002                  txg = MAX(zilog->zl_dirty_max_txg, lwb->lwb_max_txg);
2876 3003          mutex_exit(&zilog->zl_lock);
2877 3004  
2878 3005          /*
2879 3006           * We need to use txg_wait_synced() to wait long enough for the
2880 3007           * ZIL to be clean, and to wait for all pending lwbs to be
2881 3008           * written out.
2882 3009           */
2883 3010          if (txg != 0)
2884 3011                  txg_wait_synced(zilog->zl_dmu_pool, txg);
2885 3012  
2886 3013          if (zilog_is_dirty(zilog))
2887 3014                  zfs_dbgmsg("zil (%p) is dirty, txg %llu", zilog, txg);
2888 3015          VERIFY(!zilog_is_dirty(zilog));
2889 3016  
2890 3017          zilog->zl_get_data = NULL;
2891 3018  
2892 3019          /*
2893 3020           * We should have only one lwb left on the list; remove it now.
2894 3021           */
2895 3022          mutex_enter(&zilog->zl_lock);
2896 3023          lwb = list_head(&zilog->zl_lwb_list);
2897 3024          if (lwb != NULL) {
2898 3025                  ASSERT3P(lwb, ==, list_tail(&zilog->zl_lwb_list));
2899 3026                  ASSERT3S(lwb->lwb_state, !=, LWB_STATE_ISSUED);
2900 3027                  list_remove(&zilog->zl_lwb_list, lwb);
2901 3028                  zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
2902 3029                  zil_free_lwb(zilog, lwb);
2903 3030          }
2904 3031          mutex_exit(&zilog->zl_lock);
2905 3032  }
2906 3033  
2907 3034  static char *suspend_tag = "zil suspending";
2908 3035  
2909 3036  /*
2910 3037   * Suspend an intent log.  While in suspended mode, we still honor
2911 3038   * synchronous semantics, but we rely on txg_wait_synced() to do it.
2912 3039   * On old version pools, we suspend the log briefly when taking a
2913 3040   * snapshot so that it will have an empty intent log.
2914 3041   *
2915 3042   * Long holds are not really intended to be used the way we do here --
2916 3043   * held for such a short time.  A concurrent caller of dsl_dataset_long_held()
2917 3044   * could fail.  Therefore we take pains to only put a long hold if it is
2918 3045   * actually necessary.  Fortunately, it will only be necessary if the
2919 3046   * objset is currently mounted (or the ZVOL equivalent).  In that case it
2920 3047   * will already have a long hold, so we are not really making things any worse.
2921 3048   *
2922 3049   * Ideally, we would locate the existing long-holder (i.e. the zfsvfs_t or
2923 3050   * zvol_state_t), and use their mechanism to prevent their hold from being
2924 3051   * dropped (e.g. VFS_HOLD()).  However, that would be even more pain for
2925 3052   * very little gain.
2926 3053   *
2927 3054   * If cookiep == NULL, this does both the suspend & resume.
2928 3055   * Otherwise, it returns with the dataset "long held", and the cookie
2929 3056   * should be passed into zil_resume().
2930 3057   */
2931 3058  int
2932 3059  zil_suspend(const char *osname, void **cookiep)
2933 3060  {
2934 3061          objset_t *os;
2935 3062          zilog_t *zilog;
2936 3063          const zil_header_t *zh;
2937 3064          int error;
2938 3065  
2939 3066          error = dmu_objset_hold(osname, suspend_tag, &os);
2940 3067          if (error != 0)
2941 3068                  return (error);
2942 3069          zilog = dmu_objset_zil(os);
2943 3070  
2944 3071          mutex_enter(&zilog->zl_lock);
2945 3072          zh = zilog->zl_header;
2946 3073  
2947 3074          if (zh->zh_flags & ZIL_REPLAY_NEEDED) {         /* unplayed log */
2948 3075                  mutex_exit(&zilog->zl_lock);
2949 3076                  dmu_objset_rele(os, suspend_tag);
2950 3077                  return (SET_ERROR(EBUSY));
2951 3078          }
2952 3079  
2953 3080          /*
2954 3081           * Don't put a long hold in the cases where we can avoid it.  This
2955 3082           * is when there is no cookie so we are doing a suspend & resume
2956 3083           * (i.e. called from zil_vdev_offline()), and there's nothing to do
2957 3084           * for the suspend because it's already suspended, or there's no ZIL.
2958 3085           */
2959 3086          if (cookiep == NULL && !zilog->zl_suspending &&
2960 3087              (zilog->zl_suspend > 0 || BP_IS_HOLE(&zh->zh_log))) {
2961 3088                  mutex_exit(&zilog->zl_lock);
2962 3089                  dmu_objset_rele(os, suspend_tag);
2963 3090                  return (0);
2964 3091          }
2965 3092  
2966 3093          dsl_dataset_long_hold(dmu_objset_ds(os), suspend_tag);
2967 3094          dsl_pool_rele(dmu_objset_pool(os), suspend_tag);
2968 3095  
2969 3096          zilog->zl_suspend++;
2970 3097  
2971 3098          if (zilog->zl_suspend > 1) {
2972 3099                  /*
2973 3100                   * Someone else is already suspending it.
2974 3101                   * Just wait for them to finish.
2975 3102                   */
2976 3103  
2977 3104                  while (zilog->zl_suspending)
2978 3105                          cv_wait(&zilog->zl_cv_suspend, &zilog->zl_lock);
2979 3106                  mutex_exit(&zilog->zl_lock);
2980 3107  
2981 3108                  if (cookiep == NULL)
2982 3109                          zil_resume(os);
2983 3110                  else
2984 3111                          *cookiep = os;
2985 3112                  return (0);
2986 3113          }
2987 3114  
2988 3115          /*
2989 3116           * If there is no pointer to an on-disk block, this ZIL must not
2990 3117           * be active (e.g. filesystem not mounted), so there's nothing
2991 3118           * to clean up.
2992 3119           */
2993 3120          if (BP_IS_HOLE(&zh->zh_log)) {
2994 3121                  ASSERT(cookiep != NULL); /* fast path already handled */
2995 3122  
2996 3123                  *cookiep = os;
2997 3124                  mutex_exit(&zilog->zl_lock);
2998 3125                  return (0);
2999 3126          }
3000 3127  
3001 3128          zilog->zl_suspending = B_TRUE;
3002 3129          mutex_exit(&zilog->zl_lock);
3003 3130  
3004 3131          /*
3005 3132           * We need to use zil_commit_impl to ensure we wait for all
3006 3133           * LWB_STATE_OPENED and LWB_STATE_ISSUED lwb's to be committed
3007 3134           * to disk before proceeding. If we used zil_commit instead, it
3008 3135           * would just call txg_wait_synced(), because zl_suspend is set.
3009 3136           * txg_wait_synced() doesn't wait for these lwb's to be
3010      -         * LWB_STATE_DONE before returning.
     3137 +         * LWB_STATE_FLUSH_DONE before returning.
3011 3138           */
3012 3139          zil_commit_impl(zilog, 0);
3013 3140  
3014 3141          /*
3015      -         * Now that we've ensured all lwb's are LWB_STATE_DONE, we use
3016      -         * txg_wait_synced() to ensure the data from the zilog has
     3142 +         * Now that we've ensured all lwb's are LWB_STATE_FLUSH_DONE, we
     3143 +         * use txg_wait_synced() to ensure the data from the zilog has
3017 3144           * migrated to the main pool before calling zil_destroy().
3018 3145           */
3019 3146          txg_wait_synced(zilog->zl_dmu_pool, 0);
3020 3147  
3021 3148          zil_destroy(zilog, B_FALSE);
3022 3149  
3023 3150          mutex_enter(&zilog->zl_lock);
3024 3151          zilog->zl_suspending = B_FALSE;
3025 3152          cv_broadcast(&zilog->zl_cv_suspend);
3026 3153          mutex_exit(&zilog->zl_lock);
3027 3154  
3028 3155          if (cookiep == NULL)
3029 3156                  zil_resume(os);
3030 3157          else
3031 3158                  *cookiep = os;
3032 3159          return (0);
3033 3160  }
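
The two calling conventions described in the block comment above zil_suspend() can be sketched as follows; this is illustrative only, with simplified error handling and an invented function name:

        /* Illustrative sketch only -- not part of zil.c. */
        static int
        example_suspend_resume(const char *osname)
        {
                void *cookie;
                int error;

                /* Suspend now; the dataset stays long held until zil_resume(). */
                error = zil_suspend(osname, &cookie);
                if (error != 0)
                        return (error);

                /* ... ZIL is suspended here; txg_wait_synced() preserves sync semantics ... */

                zil_resume(cookie);

                /* With cookiep == NULL, one call performs both the suspend and the resume. */
                return (zil_suspend(osname, NULL));
        }
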
3034 3161  
3035 3162  void
3036 3163  zil_resume(void *cookie)
3037 3164  {
3038 3165          objset_t *os = cookie;
3039 3166          zilog_t *zilog = dmu_objset_zil(os);
3040 3167  
3041 3168          mutex_enter(&zilog->zl_lock);
3042 3169          ASSERT(zilog->zl_suspend != 0);
3043 3170          zilog->zl_suspend--;
3044 3171          mutex_exit(&zilog->zl_lock);
3045 3172          dsl_dataset_long_rele(dmu_objset_ds(os), suspend_tag);
3046 3173          dsl_dataset_rele(dmu_objset_ds(os), suspend_tag);
3047 3174  }
3048 3175  
3049 3176  typedef struct zil_replay_arg {
3050 3177          zil_replay_func_t **zr_replay;
3051 3178          void            *zr_arg;
3052 3179          boolean_t       zr_byteswap;
3053 3180          char            *zr_lr;
3054 3181  } zil_replay_arg_t;
3055 3182  
3056 3183  static int
3057 3184  zil_replay_error(zilog_t *zilog, lr_t *lr, int error)
3058 3185  {
3059 3186          char name[ZFS_MAX_DATASET_NAME_LEN];
3060 3187  
3061 3188          zilog->zl_replaying_seq--;      /* didn't actually replay this one */
3062 3189  
3063 3190          dmu_objset_name(zilog->zl_os, name);
3064 3191  
3065 3192          cmn_err(CE_WARN, "ZFS replay transaction error %d, "
3066 3193              "dataset %s, seq 0x%llx, txtype %llu %s\n", error, name,
3067 3194              (u_longlong_t)lr->lrc_seq,
3068 3195              (u_longlong_t)(lr->lrc_txtype & ~TX_CI),
3069 3196              (lr->lrc_txtype & TX_CI) ? "CI" : "");
3070 3197  
3071 3198          return (error);
3072 3199  }
3073 3200  
3074 3201  static int
3075 3202  zil_replay_log_record(zilog_t *zilog, lr_t *lr, void *zra, uint64_t claim_txg)
3076 3203  {
3077 3204          zil_replay_arg_t *zr = zra;
3078 3205          const zil_header_t *zh = zilog->zl_header;
3079 3206          uint64_t reclen = lr->lrc_reclen;
3080 3207          uint64_t txtype = lr->lrc_txtype;
3081 3208          int error = 0;
3082 3209  
3083 3210          zilog->zl_replaying_seq = lr->lrc_seq;
3084 3211  
3085 3212          if (lr->lrc_seq <= zh->zh_replay_seq)   /* already replayed */
3086 3213                  return (0);
3087 3214  
3088 3215          if (lr->lrc_txg < claim_txg)            /* already committed */
3089 3216                  return (0);
3090 3217  
3091 3218          /* Strip case-insensitive bit, still present in log record */
3092 3219          txtype &= ~TX_CI;
3093 3220  
3094 3221          if (txtype == 0 || txtype >= TX_MAX_TYPE)
3095 3222                  return (zil_replay_error(zilog, lr, EINVAL));
3096 3223  
3097 3224          /*
3098 3225           * If this record type can be logged out of order, the object
3099 3226           * (lr_foid) may no longer exist.  That's legitimate, not an error.
3100 3227           */
3101 3228          if (TX_OOO(txtype)) {
3102 3229                  error = dmu_object_info(zilog->zl_os,
3103 3230                      ((lr_ooo_t *)lr)->lr_foid, NULL);
3104 3231                  if (error == ENOENT || error == EEXIST)
3105 3232                          return (0);
3106 3233          }
3107 3234  
3108 3235          /*
3109 3236           * Make a copy of the data so we can revise and extend it.
3110 3237           */
3111 3238          bcopy(lr, zr->zr_lr, reclen);
3112 3239  
3113 3240          /*
3114 3241           * If this is a TX_WRITE with a blkptr, suck in the data.
3115 3242           */
3116 3243          if (txtype == TX_WRITE && reclen == sizeof (lr_write_t)) {
3117 3244                  error = zil_read_log_data(zilog, (lr_write_t *)lr,
3118 3245                      zr->zr_lr + reclen);
3119 3246                  if (error != 0)
3120 3247                          return (zil_replay_error(zilog, lr, error));
3121 3248          }
3122 3249  
3123 3250          /*
3124 3251           * The log block containing this lr may have been byteswapped
3125 3252           * so that we can easily examine common fields like lrc_txtype.
3126 3253           * However, the log is a mix of different record types, and only the
3127 3254           * replay vectors know how to byteswap their records.  Therefore, if
3128 3255           * the lr was byteswapped, undo it before invoking the replay vector.
3129 3256           */
3130 3257          if (zr->zr_byteswap)
3131 3258                  byteswap_uint64_array(zr->zr_lr, reclen);
3132 3259  
3133 3260          /*
3134 3261           * We must now do two things atomically: replay this log record,
3135 3262           * and update the log header sequence number to reflect the fact that
3136 3263           * we did so. At the end of each replay function the sequence number
3137 3264           * is updated if we are in replay mode.
3138 3265           */
3139 3266          error = zr->zr_replay[txtype](zr->zr_arg, zr->zr_lr, zr->zr_byteswap);
3140 3267          if (error != 0) {
3141 3268                  /*
3142 3269                   * The DMU's dnode layer doesn't see removes until the txg
3143 3270                   * commits, so a subsequent claim can spuriously fail with
3144 3271                   * EEXIST. So if we receive any error we try syncing out
3145 3272                   * any removes then retry the transaction.  Note that we
3146 3273                   * specify B_FALSE for byteswap now, so we don't do it twice.
3147 3274                   */
3148 3275                  txg_wait_synced(spa_get_dsl(zilog->zl_spa), 0);
3149 3276                  error = zr->zr_replay[txtype](zr->zr_arg, zr->zr_lr, B_FALSE);
3150 3277                  if (error != 0)
3151 3278                          return (zil_replay_error(zilog, lr, error));
3152 3279          }
3153 3280          return (0);
3154 3281  }
3155 3282  
3156 3283  /* ARGSUSED */
3157 3284  static int
3158 3285  zil_incr_blks(zilog_t *zilog, blkptr_t *bp, void *arg, uint64_t claim_txg)
3159 3286  {
3160 3287          zilog->zl_replay_blks++;
3161 3288  
3162 3289          return (0);
3163 3290  }
3164 3291  
3165 3292  /*
3166 3293   * If this dataset has a non-empty intent log, replay it and destroy it.
3167 3294   */
3168 3295  void
3169 3296  zil_replay(objset_t *os, void *arg, zil_replay_func_t *replay_func[TX_MAX_TYPE])
3170 3297  {
3171 3298          zilog_t *zilog = dmu_objset_zil(os);
3172 3299          const zil_header_t *zh = zilog->zl_header;
3173 3300          zil_replay_arg_t zr;
3174 3301  
3175 3302          if ((zh->zh_flags & ZIL_REPLAY_NEEDED) == 0) {
3176 3303                  zil_destroy(zilog, B_TRUE);
3177 3304                  return;
3178 3305          }
3179 3306  
3180 3307          zr.zr_replay = replay_func;
3181 3308          zr.zr_arg = arg;
3182 3309          zr.zr_byteswap = BP_SHOULD_BYTESWAP(&zh->zh_log);
3183 3310          zr.zr_lr = kmem_alloc(2 * SPA_MAXBLOCKSIZE, KM_SLEEP);
3184 3311  
3185 3312          /*
3186 3313           * Wait for in-progress removes to sync before starting replay.
3187 3314           */
3188 3315          txg_wait_synced(zilog->zl_dmu_pool, 0);
3189 3316  
3190 3317          zilog->zl_replay = B_TRUE;
3191 3318          zilog->zl_replay_time = ddi_get_lbolt();
3192 3319          ASSERT(zilog->zl_replay_blks == 0);
3193 3320          (void) zil_parse(zilog, zil_incr_blks, zil_replay_log_record, &zr,
3194 3321              zh->zh_claim_txg);
3195 3322          kmem_free(zr.zr_lr, 2 * SPA_MAXBLOCKSIZE);
3196 3323  
3197 3324          zil_destroy(zilog, B_FALSE);
3198 3325          txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
3199 3326          zilog->zl_replay = B_FALSE;
3200 3327  }
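
To sketch how a consumer might supply the replay vector that zil_replay() expects, the hypothetical example below maps every transaction type to a single no-op handler, assuming the handler signature implied by the call site in zil_replay_log_record() above; a real consumer installs one handler per txtype, and all names here are invented:

        /* Illustrative sketch only -- not part of zil.c. */
        static int
        example_replay_noop(void *arg, char *lr, boolean_t byteswap)
        {
                return (0);
        }

        static void
        example_replay_at_mount(objset_t *os, void *arg)
        {
                zil_replay_func_t *example_vector[TX_MAX_TYPE];

                for (int i = 0; i < TX_MAX_TYPE; i++)
                        example_vector[i] = example_replay_noop;

                /* Replays the intent log, if any, and then destroys it. */
                zil_replay(os, arg, example_vector);
        }
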
3201 3328  
3202 3329  boolean_t
3203 3330  zil_replaying(zilog_t *zilog, dmu_tx_t *tx)
3204 3331  {
3205 3332          if (zilog->zl_sync == ZFS_SYNC_DISABLED)
3206 3333                  return (B_TRUE);
3207 3334  
3208 3335          if (zilog->zl_replay) {
3209 3336                  dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
3210 3337                  zilog->zl_replayed_seq[dmu_tx_get_txg(tx) & TXG_MASK] =
3211 3338                      zilog->zl_replaying_seq;
3212 3339                  return (B_TRUE);
3213 3340          }
3214 3341  
3215 3342          return (B_FALSE);
3216 3343  }
3217 3344  
3218 3345  /* ARGSUSED */
3219 3346  int
3220      -zil_reset(const char *osname, void *arg)
     3347 +zil_vdev_offline(const char *osname, void *arg)
3221 3348  {
3222 3349          int error;
3223 3350  
3224 3351          error = zil_suspend(osname, NULL);
3225 3352          if (error != 0)
3226 3353                  return (SET_ERROR(EEXIST));
3227 3354          return (0);
3228 3355  }