3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  23  * Copyright (c) 2011, 2017 by Delphix. All rights reserved.
  24  * Copyright (c) 2014 Integros [integros.com]
  25  */
  26 
  27 /* Portions Copyright 2010 Robert Milkowski */
  28 
  29 #include <sys/zfs_context.h>
  30 #include <sys/spa.h>
  31 #include <sys/dmu.h>
  32 #include <sys/zap.h>
  33 #include <sys/arc.h>
  34 #include <sys/stat.h>
  35 #include <sys/resource.h>
  36 #include <sys/zil.h>
  37 #include <sys/zil_impl.h>
  38 #include <sys/dsl_dataset.h>
  39 #include <sys/vdev_impl.h>
  40 #include <sys/dmu_tx.h>
  41 #include <sys/dsl_pool.h>
  42 #include <sys/abd.h>
 
 
  74  * block in the chain, and the ZIL header points to the first block in
  75  * the chain.
  76  *
  77  * Note, there is not a fixed place in the pool to hold these ZIL
  78  * blocks; they are dynamically allocated and freed as needed from the
  79  * blocks available on the pool, though they can be preferentially
  80  * allocated from a dedicated "log" vdev.
  81  */
  82 
  83 /*
  84  * This controls the amount of time that a ZIL block (lwb) will remain
  85  * "open" when it isn't "full", and it has a thread waiting for it to be
  86  * committed to stable storage. Please refer to the zil_commit_waiter()
  87  * function (and the comments within it) for more details.
  88  */
  89 int zfs_commit_timeout_pct = 5;
  90 
  91 /*
  92  * Disable intent logging replay.  This global ZIL switch affects all pools.
  93  */
  94 int zil_replay_disable = 0;
  95 
  96 /*
  97  * Tunable parameter for debugging or performance analysis.  Setting
  98  * zfs_nocacheflush will cause corruption on power loss if a volatile
  99  * out-of-order write cache is enabled.
 100  */
 101 boolean_t zfs_nocacheflush = B_FALSE;
 102 
 103 /*
 104  * Limit SLOG write size per commit executed with synchronous priority.
 105  * Any writes above that will be executed with lower (asynchronous) priority
 106  * to limit potential SLOG device abuse by a single active ZIL writer.
 107  */
 108 uint64_t zil_slog_bulk = 768 * 1024;
 109 
 110 static kmem_cache_t *zil_lwb_cache;
 111 static kmem_cache_t *zil_zcw_cache;
 112 
 113 static void zil_async_to_sync(zilog_t *zilog, uint64_t foid);
 114 
 
 
 501         mutex_exit(&zilog->zl_lock);
 502 
 503         ASSERT(!MUTEX_HELD(&lwb->lwb_vdev_lock));
 504         ASSERT(avl_is_empty(&lwb->lwb_vdev_tree));
 505         VERIFY(list_is_empty(&lwb->lwb_waiters));
 506 
 507         return (lwb);
 508 }
 509 
 510 static void
 511 zil_free_lwb(zilog_t *zilog, lwb_t *lwb)
 512 {
 513         ASSERT(MUTEX_HELD(&zilog->zl_lock));
 514         ASSERT(!MUTEX_HELD(&lwb->lwb_vdev_lock));
 515         VERIFY(list_is_empty(&lwb->lwb_waiters));
 516         ASSERT(avl_is_empty(&lwb->lwb_vdev_tree));
 517         ASSERT3P(lwb->lwb_write_zio, ==, NULL);
 518         ASSERT3P(lwb->lwb_root_zio, ==, NULL);
 519         ASSERT3U(lwb->lwb_max_txg, <=, spa_syncing_txg(zilog->zl_spa));
 520         ASSERT(lwb->lwb_state == LWB_STATE_CLOSED ||
 521             lwb->lwb_state == LWB_STATE_DONE);
 522 
 523         /*
 524          * Clear the zilog's field to indicate this lwb is no longer
 525          * valid, and prevent use-after-free errors.
 526          */
 527         if (zilog->zl_last_lwb_opened == lwb)
 528                 zilog->zl_last_lwb_opened = NULL;
 529 
 530         kmem_cache_free(zil_lwb_cache, lwb);
 531 }
 532 
 533 /*
 534  * Called when we create in-memory log transactions so that we know
 535  * to cleanup the itxs at the end of spa_sync().
 536  */
 537 void
 538 zilog_dirty(zilog_t *zilog, uint64_t txg)
 539 {
 540         dsl_pool_t *dp = zilog->zl_dmu_pool;
 541         dsl_dataset_t *ds = dmu_objset_ds(zilog->zl_os);
 
 859 /*
 860  * This function is used when the given waiter is to be linked into an
 861  * lwb's "lwb_waiters" list; i.e. when the itx is committed to the lwb.
 862  * At this point, the waiter will no longer be referenced by the itx,
 863  * and instead, will be referenced by the lwb.
 864  */
 865 static void
 866 zil_commit_waiter_link_lwb(zil_commit_waiter_t *zcw, lwb_t *lwb)
 867 {
 868         /*
 869          * The lwb_waiters field of the lwb is protected by the zilog's
 870          * zl_lock, thus it must be held when calling this function.
 871          */
 872         ASSERT(MUTEX_HELD(&lwb->lwb_zilog->zl_lock));
 873 
 874         mutex_enter(&zcw->zcw_lock);
 875         ASSERT(!list_link_active(&zcw->zcw_node));
 876         ASSERT3P(zcw->zcw_lwb, ==, NULL);
 877         ASSERT3P(lwb, !=, NULL);
 878         ASSERT(lwb->lwb_state == LWB_STATE_OPENED ||
 879             lwb->lwb_state == LWB_STATE_ISSUED);
 880 
 881         list_insert_tail(&lwb->lwb_waiters, zcw);
 882         zcw->zcw_lwb = lwb;
 883         mutex_exit(&zcw->zcw_lock);
 884 }
 885 
 886 /*
 887  * This function is used when zio_alloc_zil() fails to allocate a ZIL
 888  * block, and the given waiter must be linked to the "nolwb waiters"
 889  * list inside of zil_process_commit_list().
 890  */
 891 static void
 892 zil_commit_waiter_link_nolwb(zil_commit_waiter_t *zcw, list_t *nolwb)
 893 {
 894         mutex_enter(&zcw->zcw_lock);
 895         ASSERT(!list_link_active(&zcw->zcw_node));
 896         ASSERT3P(zcw->zcw_lwb, ==, NULL);
 897         list_insert_tail(nolwb, zcw);
 898         mutex_exit(&zcw->zcw_lock);
 899 }
 
 905         avl_index_t where;
 906         zil_vdev_node_t *zv, zvsearch;
 907         int ndvas = BP_GET_NDVAS(bp);
 908         int i;
 909 
 910         if (zfs_nocacheflush)
 911                 return;
 912 
 913         mutex_enter(&lwb->lwb_vdev_lock);
 914         for (i = 0; i < ndvas; i++) {
 915                 zvsearch.zv_vdev = DVA_GET_VDEV(&bp->blk_dva[i]);
 916                 if (avl_find(t, &zvsearch, &where) == NULL) {
 917                         zv = kmem_alloc(sizeof (*zv), KM_SLEEP);
 918                         zv->zv_vdev = zvsearch.zv_vdev;
 919                         avl_insert(t, zv, where);
 920                 }
 921         }
 922         mutex_exit(&lwb->lwb_vdev_lock);
 923 }
 924 
 925 void
 926 zil_lwb_add_txg(lwb_t *lwb, uint64_t txg)
 927 {
 928         lwb->lwb_max_txg = MAX(lwb->lwb_max_txg, txg);
 929 }
 930 
 931 /*
 932  * This function is called after all VDEVs associated with a given lwb
 933  * write have completed their DKIOCFLUSHWRITECACHE command; or as soon
 934  * as the lwb write completes, if "zfs_nocacheflush" is set.
 935  *
 936  * The intention is for this function to be called as soon as the
 937  * contents of an lwb are considered "stable" on disk, and will survive
 938  * any sudden loss of power. At this point, any threads waiting for the
 939  * lwb to reach this state are signalled, and the "waiter" structures
 940  * are marked "done".
 941  */
 942 static void
 943 zil_lwb_flush_vdevs_done(zio_t *zio)
 944 {
 945         lwb_t *lwb = zio->io_private;
 946         zilog_t *zilog = lwb->lwb_zilog;
 947         dmu_tx_t *tx = lwb->lwb_tx;
 948         zil_commit_waiter_t *zcw;
 949 
 950         spa_config_exit(zilog->zl_spa, SCL_STATE, lwb);
 951 
 952         zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
 953 
 954         mutex_enter(&zilog->zl_lock);
 955 
 956         /*
 957          * Ensure the lwb buffer pointer is cleared before releasing the
 958          * txg. If we have had an allocation failure and the txg is
 959          * waiting to sync then we want zil_sync() to remove the lwb so
 960          * that it's not picked up as the next new one in
 961          * zil_process_commit_list(). zil_sync() will only remove the
 962          * lwb if lwb_buf is null.
 963          */
 964         lwb->lwb_buf = NULL;
 965         lwb->lwb_tx = NULL;
 966 
 967         ASSERT3U(lwb->lwb_issued_timestamp, >, 0);
 968         zilog->zl_last_lwb_latency = gethrtime() - lwb->lwb_issued_timestamp;
 969 
 970         lwb->lwb_root_zio = NULL;
 971         lwb->lwb_state = LWB_STATE_DONE;
 972 
 973         if (zilog->zl_last_lwb_opened == lwb) {
 974                 /*
 975                  * Remember the highest committed log sequence number
 976                  * for ztest. We only update this value when all the log
 977                  * writes succeeded, because ztest wants to ASSERT that
 978                  * it got the whole log chain.
 979                  */
 980                 zilog->zl_commit_lr_seq = zilog->zl_lr_seq;
 981         }
 982 
 983         while ((zcw = list_head(&lwb->lwb_waiters)) != NULL) {
 984                 mutex_enter(&zcw->zcw_lock);
 985 
 986                 ASSERT(list_link_active(&zcw->zcw_node));
 987                 list_remove(&lwb->lwb_waiters, zcw);
 988 
 989                 ASSERT3P(zcw->zcw_lwb, ==, lwb);
 990                 zcw->zcw_lwb = NULL;
 991 
 992                 zcw->zcw_zio_error = zio->io_error;
 993 
 994                 ASSERT3B(zcw->zcw_done, ==, B_FALSE);
 995                 zcw->zcw_done = B_TRUE;
 996                 cv_broadcast(&zcw->zcw_cv);
 997 
 998                 mutex_exit(&zcw->zcw_lock);
 999         }
1000 
1001         mutex_exit(&zilog->zl_lock);
1002 
1003         /*
1004          * Now that we've written this log block, we have a stable pointer
1005          * to the next block in the chain, so it's OK to let the txg in
1006          * which we allocated the next block sync.
1007          */
1008         dmu_tx_commit(tx);
1009 }
1010 
1011 /*
 1012  * This is called when an lwb write completes. This means this specific
 1013  * lwb was written to disk, and all dependent lwbs have also been
1014  * written to disk.
1015  *
1016  * At this point, a DKIOCFLUSHWRITECACHE command hasn't been issued to
1017  * the VDEVs involved in writing out this specific lwb. The lwb will be
1018  * "done" once zil_lwb_flush_vdevs_done() is called, which occurs in the
1019  * zio completion callback for the lwb's root zio.
1020  */
1021 static void
1022 zil_lwb_write_done(zio_t *zio)
1023 {
1024         lwb_t *lwb = zio->io_private;
1025         spa_t *spa = zio->io_spa;
1026         zilog_t *zilog = lwb->lwb_zilog;
1027         avl_tree_t *t = &lwb->lwb_vdev_tree;
1028         void *cookie = NULL;
1029         zil_vdev_node_t *zv;
1030 
1031         ASSERT3S(spa_config_held(spa, SCL_STATE, RW_READER), !=, 0);
1032 
1033         ASSERT(BP_GET_COMPRESS(zio->io_bp) == ZIO_COMPRESS_OFF);
1034         ASSERT(BP_GET_TYPE(zio->io_bp) == DMU_OT_INTENT_LOG);
1035         ASSERT(BP_GET_LEVEL(zio->io_bp) == 0);
1036         ASSERT(BP_GET_BYTEORDER(zio->io_bp) == ZFS_HOST_BYTEORDER);
1037         ASSERT(!BP_IS_GANG(zio->io_bp));
1038         ASSERT(!BP_IS_HOLE(zio->io_bp));
1039         ASSERT(BP_GET_FILL(zio->io_bp) == 0);
1040 
1041         abd_put(zio->io_abd);
1042 
1043         ASSERT3S(lwb->lwb_state, ==, LWB_STATE_ISSUED);
1044 
1045         mutex_enter(&zilog->zl_lock);
1046         lwb->lwb_write_zio = NULL;
1047         mutex_exit(&zilog->zl_lock);
1048 
1049         if (avl_numnodes(t) == 0)
1050                 return;
1051 
1052         /*
1053          * If there was an IO error, we're not going to call zio_flush()
1054          * on these vdevs, so we simply empty the tree and free the
1055          * nodes. We avoid calling zio_flush() since there isn't any
1056          * good reason for doing so, after the lwb block failed to be
1057          * written out.
1058          */
1059         if (zio->io_error != 0) {
1060                 while ((zv = avl_destroy_nodes(t, &cookie)) != NULL)
1061                         kmem_free(zv, sizeof (*zv));
1062                 return;
1063         }
1064 
1065         while ((zv = avl_destroy_nodes(t, &cookie)) != NULL) {
1066                 vdev_t *vd = vdev_lookup_top(spa, zv->zv_vdev);
1067                 if (vd != NULL)
1068                         zio_flush(lwb->lwb_root_zio, vd);
1069                 kmem_free(zv, sizeof (*zv));
1070         }
1071 }
1072 
1073 /*
1074  * This function's purpose is to "open" an lwb such that it is ready to
1075  * accept new itxs being committed to it. To do this, the lwb's zio
1076  * structures are created, and linked to the lwb. This function is
1077  * idempotent; if the passed in lwb has already been opened, this
1078  * function is essentially a no-op.
1079  */
1080 static void
1081 zil_lwb_write_open(zilog_t *zilog, lwb_t *lwb)
1082 {
1083         zbookmark_phys_t zb;
1084         zio_priority_t prio;
1085 
1086         ASSERT(MUTEX_HELD(&zilog->zl_issuer_lock));
1087         ASSERT3P(lwb, !=, NULL);
1088         EQUIV(lwb->lwb_root_zio == NULL, lwb->lwb_state == LWB_STATE_CLOSED);
1089         EQUIV(lwb->lwb_root_zio != NULL, lwb->lwb_state == LWB_STATE_OPENED);
1090 
1091         SET_BOOKMARK(&zb, lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_OBJSET],
1092             ZB_ZIL_OBJECT, ZB_ZIL_LEVEL,
 
1097                     BP_GET_LSIZE(&lwb->lwb_blk));
1098 
1099                 if (!lwb->lwb_slog || zilog->zl_cur_used <= zil_slog_bulk)
1100                         prio = ZIO_PRIORITY_SYNC_WRITE;
1101                 else
1102                         prio = ZIO_PRIORITY_ASYNC_WRITE;
1103 
1104                 lwb->lwb_root_zio = zio_root(zilog->zl_spa,
1105                     zil_lwb_flush_vdevs_done, lwb, ZIO_FLAG_CANFAIL);
1106                 ASSERT3P(lwb->lwb_root_zio, !=, NULL);
1107 
1108                 lwb->lwb_write_zio = zio_rewrite(lwb->lwb_root_zio,
1109                     zilog->zl_spa, 0, &lwb->lwb_blk, lwb_abd,
1110                     BP_GET_LSIZE(&lwb->lwb_blk), zil_lwb_write_done, lwb,
1111                     prio, ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE, &zb);
1112                 ASSERT3P(lwb->lwb_write_zio, !=, NULL);
1113 
1114                 lwb->lwb_state = LWB_STATE_OPENED;
1115 
1116                 mutex_enter(&zilog->zl_lock);
1117 
1118                 /*
1119                  * The zilog's "zl_last_lwb_opened" field is used to
1120                  * build the lwb/zio dependency chain, which is used to
1121                  * preserve the ordering of lwb completions that is
1122                  * required by the semantics of the ZIL. Each new lwb
1123                  * zio becomes a parent of the "previous" lwb zio, such
1124                  * that the new lwb's zio cannot complete until the
1125                  * "previous" lwb's zio completes.
1126                  *
1127                  * This is required by the semantics of zil_commit();
1128                  * the commit waiters attached to the lwbs will be woken
1129                  * in the lwb zio's completion callback, so this zio
1130                  * dependency graph ensures the waiters are woken in the
1131                  * correct order (the same order the lwbs were created).
1132                  */
1133                 lwb_t *last_lwb_opened = zilog->zl_last_lwb_opened;
1134                 if (last_lwb_opened != NULL &&
1135                     last_lwb_opened->lwb_state != LWB_STATE_DONE) {
1136                         ASSERT(last_lwb_opened->lwb_state == LWB_STATE_OPENED ||
1137                             last_lwb_opened->lwb_state == LWB_STATE_ISSUED);
1138                         ASSERT3P(last_lwb_opened->lwb_root_zio, !=, NULL);
1139                         zio_add_child(lwb->lwb_root_zio,
1140                             last_lwb_opened->lwb_root_zio);
1141                 }
1142                 zilog->zl_last_lwb_opened = lwb;
1143 
1144                 mutex_exit(&zilog->zl_lock);
1145         }
1146 
1147         ASSERT3P(lwb->lwb_root_zio, !=, NULL);
1148         ASSERT3P(lwb->lwb_write_zio, !=, NULL);
1149         ASSERT3S(lwb->lwb_state, ==, LWB_STATE_OPENED);
1150 }
1151 
1152 /*
1153  * Define a limited set of intent log block sizes.
1154  *
1155  * These must be a multiple of 4KB. Note only the amount used (again
1156  * aligned to 4KB) actually gets written. However, we can't always just
1157  * allocate SPA_OLD_MAXBLOCKSIZE as the slog space could be exhausted.
1158  */
1159 uint64_t zil_block_buckets[] = {
1160     4096,               /* non TX_WRITE */
1161     8192+4096,          /* data base */
1162     32*1024 + 4096,     /* NFS writes */
1163     UINT64_MAX
 
1191         } else {
1192                 zilc = (zil_chain_t *)(lwb->lwb_buf + lwb->lwb_sz);
1193                 bp = &zilc->zc_next_blk;
1194         }
1195 
1196         ASSERT(lwb->lwb_nused <= lwb->lwb_sz);
1197 
1198         /*
1199          * Allocate the next block and save its address in this block
1200          * before writing it in order to establish the log chain.
1201          * Note that if the allocation of nlwb synced before we wrote
1202          * the block that points at it (lwb), we'd leak it if we crashed.
 1203          * Therefore, we don't do dmu_tx_commit() until zil_lwb_flush_vdevs_done().
1204          * We dirty the dataset to ensure that zil_sync() will be called
1205          * to clean up in the event of allocation failure or I/O failure.
1206          */
1207 
1208         tx = dmu_tx_create(zilog->zl_os);
1209 
1210         /*
1211          * Since we are not going to create any new dirty data, and we
1212          * can even help with clearing the existing dirty data, we
1213          * should not be subject to the dirty data based delays. We
1214          * use TXG_NOTHROTTLE to bypass the delay mechanism.
1215          */
1216         VERIFY0(dmu_tx_assign(tx, TXG_WAIT | TXG_NOTHROTTLE));
1217 
1218         dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
1219         txg = dmu_tx_get_txg(tx);
1220 
1221         lwb->lwb_tx = tx;
1222 
1223         /*
1224          * Log blocks are pre-allocated. Here we select the size of the next
1225          * block, based on size used in the last block.
1226          * - first find the smallest bucket that will fit the block from a
1227          *   limited set of block sizes. This is because it's faster to write
1228          *   blocks allocated from the same metaslab as they are adjacent or
1229          *   close.
1230          * - next find the maximum from the new suggested size and an array of
1231          *   previous sizes. This lessens a picket fence effect of wrongly
 1232          * guessing the size if we have a stream of say 2k, 64k, 2k, 64k
1233          *   requests.
1234          *
1235          * Note we only write what is used, but we can't just allocate
1236          * the maximum block size because we can exhaust the available
1237          * pool log space.
 
1823  * completion, or b) skip them altogether.
1824  *
1825  * This is used as a performance optimization to prevent commit itxs
1826  * from generating new lwbs when it's unnecessary to do so.
1827  */
1828 static void
1829 zil_prune_commit_list(zilog_t *zilog)
1830 {
1831         itx_t *itx;
1832 
1833         ASSERT(MUTEX_HELD(&zilog->zl_issuer_lock));
1834 
 1835         while ((itx = list_head(&zilog->zl_itx_commit_list)) != NULL) {
1836                 lr_t *lrc = &itx->itx_lr;
1837                 if (lrc->lrc_txtype != TX_COMMIT)
1838                         break;
1839 
1840                 mutex_enter(&zilog->zl_lock);
1841 
1842                 lwb_t *last_lwb = zilog->zl_last_lwb_opened;
1843                 if (last_lwb == NULL || last_lwb->lwb_state == LWB_STATE_DONE) {
1844                         /*
1845                          * All of the itxs this waiter was waiting on
1846                          * must have already completed (or there were
1847                          * never any itx's for it to wait on), so it's
1848                          * safe to skip this waiter and mark it done.
1849                          */
1850                         zil_commit_waiter_skip(itx->itx_private);
1851                 } else {
1852                         zil_commit_waiter_link_lwb(itx->itx_private, last_lwb);
1853                         itx->itx_private = NULL;
1854                 }
1855 
1856                 mutex_exit(&zilog->zl_lock);
1857 
1858                 list_remove(&zilog->zl_itx_commit_list, itx);
1859                 zil_itx_destroy(itx);
1860         }
1861 
1862         IMPLY(itx != NULL, itx->itx_lr.lrc_txtype != TX_COMMIT);
1863 }
 
1904         lwb_t *lwb;
1905         itx_t *itx;
1906 
1907         ASSERT(MUTEX_HELD(&zilog->zl_issuer_lock));
1908 
1909         /*
1910          * Return if there's nothing to commit before we dirty the fs by
1911          * calling zil_create().
1912          */
1913         if (list_head(&zilog->zl_itx_commit_list) == NULL)
1914                 return;
1915 
1916         list_create(&nolwb_waiters, sizeof (zil_commit_waiter_t),
1917             offsetof(zil_commit_waiter_t, zcw_node));
1918 
1919         lwb = list_tail(&zilog->zl_lwb_list);
1920         if (lwb == NULL) {
1921                 lwb = zil_create(zilog);
1922         } else {
1923                 ASSERT3S(lwb->lwb_state, !=, LWB_STATE_ISSUED);
1924                 ASSERT3S(lwb->lwb_state, !=, LWB_STATE_DONE);
1925         }
1926 
 1927         while ((itx = list_head(&zilog->zl_itx_commit_list)) != NULL) {
1928                 lr_t *lrc = &itx->itx_lr;
1929                 uint64_t txg = lrc->lrc_txg;
1930 
1931                 ASSERT3U(txg, !=, 0);
1932 
1933                 if (lrc->lrc_txtype == TX_COMMIT) {
1934                         DTRACE_PROBE2(zil__process__commit__itx,
1935                             zilog_t *, zilog, itx_t *, itx);
1936                 } else {
1937                         DTRACE_PROBE2(zil__process__normal__itx,
1938                             zilog_t *, zilog, itx_t *, itx);
1939                 }
1940 
1941                 boolean_t synced = txg <= spa_last_synced_txg(spa);
1942                 boolean_t frozen = txg > spa_freeze_txg(spa);
1943 
1944                 /*
 
2006                  * the ZIL write pipeline; see the comment within
2007                  * zil_commit_writer_stall() for more details.
2008                  */
2009                 zil_commit_writer_stall(zilog);
2010 
2011                 /*
2012                  * Additionally, we have to signal and mark the "nolwb"
2013                  * waiters as "done" here, since without an lwb, we
2014                  * can't do this via zil_lwb_flush_vdevs_done() like
2015                  * normal.
2016                  */
2017                 zil_commit_waiter_t *zcw;
 2018                 while ((zcw = list_head(&nolwb_waiters)) != NULL) {
2019                         zil_commit_waiter_skip(zcw);
2020                         list_remove(&nolwb_waiters, zcw);
2021                 }
2022         } else {
2023                 ASSERT(list_is_empty(&nolwb_waiters));
2024                 ASSERT3P(lwb, !=, NULL);
2025                 ASSERT3S(lwb->lwb_state, !=, LWB_STATE_ISSUED);
2026                 ASSERT3S(lwb->lwb_state, !=, LWB_STATE_DONE);
2027 
2028                 /*
2029                  * At this point, the ZIL block pointed at by the "lwb"
2030                  * variable is in one of the following states: "closed"
2031                  * or "open".
2032                  *
 2033          * If it's "closed", then no itxs have been committed to
2034                  * it, so there's no point in issuing its zio (i.e.
2035                  * it's "empty").
2036                  *
 2037          * If it's in the "open" state, then it contains one or more
2038                  * itxs that eventually need to be committed to stable
2039                  * storage. In this case we intentionally do not issue
2040                  * the lwb's zio to disk yet, and instead rely on one of
2041                  * the following two mechanisms for issuing the zio:
2042                  *
 2043          * 1. Ideally, there will be more ZIL activity occurring
2044                  * on the system, such that this function will be
2045                  * immediately called again (not necessarily by the same
2046                  * thread) and this lwb's zio will be issued via
 
2127 
2128 static void
2129 zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw)
2130 {
2131         ASSERT(!MUTEX_HELD(&zilog->zl_issuer_lock));
2132         ASSERT(MUTEX_HELD(&zcw->zcw_lock));
2133         ASSERT3B(zcw->zcw_done, ==, B_FALSE);
2134 
2135         lwb_t *lwb = zcw->zcw_lwb;
2136         ASSERT3P(lwb, !=, NULL);
2137         ASSERT3S(lwb->lwb_state, !=, LWB_STATE_CLOSED);
2138 
2139         /*
2140          * If the lwb has already been issued by another thread, we can
2141          * immediately return since there's no work to be done (the
2142          * point of this function is to issue the lwb). Additionally, we
2143          * do this prior to acquiring the zl_issuer_lock, to avoid
2144          * acquiring it when it's not necessary to do so.
2145          */
2146         if (lwb->lwb_state == LWB_STATE_ISSUED ||
2147             lwb->lwb_state == LWB_STATE_DONE)
2148                 return;
2149 
2150         /*
2151          * In order to call zil_lwb_write_issue() we must hold the
2152          * zilog's "zl_issuer_lock". We can't simply acquire that lock,
2153          * since we're already holding the commit waiter's "zcw_lock",
 2154          * and those two locks are acquired in the opposite order
2155          * elsewhere.
2156          */
2157         mutex_exit(&zcw->zcw_lock);
2158         mutex_enter(&zilog->zl_issuer_lock);
2159         mutex_enter(&zcw->zcw_lock);
2160 
2161         /*
2162          * Since we just dropped and re-acquired the commit waiter's
2163          * lock, we have to re-check to see if the waiter was marked
2164          * "done" during that process. If the waiter was marked "done",
2165          * the "lwb" pointer is no longer valid (it can be free'd after
2166          * the waiter is marked "done"), so without this check we could
2167          * wind up with a use-after-free error below.
 
2175          * We've already checked this above, but since we hadn't acquired
2176          * the zilog's zl_issuer_lock, we have to perform this check a
2177          * second time while holding the lock.
2178          *
2179          * We don't need to hold the zl_lock since the lwb cannot transition
2180          * from OPENED to ISSUED while we hold the zl_issuer_lock. The lwb
2181          * _can_ transition from ISSUED to DONE, but it's OK to race with
2182          * that transition since we treat the lwb the same, whether it's in
2183          * the ISSUED or DONE states.
2184          *
 2185          * The important thing is that we treat the lwb differently depending on
 2186          * whether it's ISSUED or OPENED, and block any other threads that might
2187          * attempt to issue this lwb. For that reason we hold the
2188          * zl_issuer_lock when checking the lwb_state; we must not call
2189          * zil_lwb_write_issue() if the lwb had already been issued.
2190          *
2191          * See the comment above the lwb_state_t structure definition for
2192          * more details on the lwb states, and locking requirements.
2193          */
2194         if (lwb->lwb_state == LWB_STATE_ISSUED ||
2195             lwb->lwb_state == LWB_STATE_DONE)
2196                 goto out;
2197 
2198         ASSERT3S(lwb->lwb_state, ==, LWB_STATE_OPENED);
2199 
2200         /*
2201          * As described in the comments above zil_commit_waiter() and
2202          * zil_process_commit_list(), we need to issue this lwb's zio
2203          * since we've reached the commit waiter's timeout and it still
2204          * hasn't been issued.
2205          */
2206         lwb_t *nlwb = zil_lwb_write_issue(zilog, lwb);
2207 
2208         ASSERT3S(lwb->lwb_state, !=, LWB_STATE_OPENED);
2209 
2210         /*
2211          * Since the lwb's zio hadn't been issued by the time this thread
2212          * reached its timeout, we reset the zilog's "zl_cur_used" field
2213          * to influence the zil block size selection algorithm.
2214          *
2215          * By having to issue the lwb's zio here, it means the size of the
 
 
2348                                  * isn't done.
2349                                  */
2350                                 ASSERT3P(lwb, ==, zcw->zcw_lwb);
2351                                 ASSERT3S(lwb->lwb_state, !=, LWB_STATE_OPENED);
2352                         }
2353                 } else {
2354                         /*
2355                          * If the lwb isn't open, then it must have already
2356                          * been issued. In that case, there's no need to
2357                          * use a timeout when waiting for the lwb to
2358                          * complete.
2359                          *
2360                          * Additionally, if the lwb is NULL, the waiter
2361                          * will soon be signalled and marked done via
2362                          * zil_clean() and zil_itxg_clean(), so no timeout
2363                          * is required.
2364                          */
2365 
2366                         IMPLY(lwb != NULL,
2367                             lwb->lwb_state == LWB_STATE_ISSUED ||
2368                             lwb->lwb_state == LWB_STATE_DONE);
2369                         cv_wait(&zcw->zcw_cv, &zcw->zcw_lock);
2370                 }
2371         }
2372 
2373         mutex_exit(&zcw->zcw_lock);
2374 }
2375 
2376 static zil_commit_waiter_t *
 2377 zil_alloc_commit_waiter(void)
2378 {
2379         zil_commit_waiter_t *zcw = kmem_cache_alloc(zil_zcw_cache, KM_SLEEP);
2380 
2381         cv_init(&zcw->zcw_cv, NULL, CV_DEFAULT, NULL);
2382         mutex_init(&zcw->zcw_lock, NULL, MUTEX_DEFAULT, NULL);
2383         list_link_init(&zcw->zcw_node);
2384         zcw->zcw_lwb = NULL;
2385         zcw->zcw_done = B_FALSE;
2386         zcw->zcw_zio_error = 0;
2387 
2388         return (zcw);
 
2990          * be active (e.g. filesystem not mounted), so there's nothing
2991          * to clean up.
2992          */
2993         if (BP_IS_HOLE(&zh->zh_log)) {
2994                 ASSERT(cookiep != NULL); /* fast path already handled */
2995 
2996                 *cookiep = os;
2997                 mutex_exit(&zilog->zl_lock);
2998                 return (0);
2999         }
3000 
3001         zilog->zl_suspending = B_TRUE;
3002         mutex_exit(&zilog->zl_lock);
3003 
3004         /*
3005          * We need to use zil_commit_impl to ensure we wait for all
3006          * LWB_STATE_OPENED and LWB_STATE_ISSUED lwb's to be committed
3007          * to disk before proceeding. If we used zil_commit instead, it
3008          * would just call txg_wait_synced(), because zl_suspend is set.
3009          * txg_wait_synced() doesn't wait for these lwb's to be
3010          * LWB_STATE_DONE before returning.
3011          */
3012         zil_commit_impl(zilog, 0);
3013 
3014         /*
3015          * Now that we've ensured all lwb's are LWB_STATE_DONE, we use
3016          * txg_wait_synced() to ensure the data from the zilog has
3017          * migrated to the main pool before calling zil_destroy().
3018          */
3019         txg_wait_synced(zilog->zl_dmu_pool, 0);
3020 
3021         zil_destroy(zilog, B_FALSE);
3022 
3023         mutex_enter(&zilog->zl_lock);
3024         zilog->zl_suspending = B_FALSE;
3025         cv_broadcast(&zilog->zl_cv_suspend);
3026         mutex_exit(&zilog->zl_lock);
3027 
3028         if (cookiep == NULL)
3029                 zil_resume(os);
3030         else
3031                 *cookiep = os;
3032         return (0);
3033 }
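
/*
 * Hedged usage sketch (not part of this file): a caller that needs the
 * ZIL quiesced around an operation would pair zil_suspend() with
 * zil_resume() via the cookie, along these lines. The helper name and
 * its osname argument are hypothetical.
 */
static int
zil_sketch_with_suspended_zil(const char *osname)
{
	void *cookie;
	int error;

	error = zil_suspend(osname, &cookie);
	if (error != 0)
		return (error);

	/* ... operate while the ZIL is suspended ... */

	zil_resume(cookie);
	return (0);
}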
3034 
3035 void
3036 zil_resume(void *cookie)
 
3200 }
3201 
3202 boolean_t
3203 zil_replaying(zilog_t *zilog, dmu_tx_t *tx)
3204 {
3205         if (zilog->zl_sync == ZFS_SYNC_DISABLED)
3206                 return (B_TRUE);
3207 
3208         if (zilog->zl_replay) {
3209                 dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
3210                 zilog->zl_replayed_seq[dmu_tx_get_txg(tx) & TXG_MASK] =
3211                     zilog->zl_replaying_seq;
3212                 return (B_TRUE);
3213         }
3214 
3215         return (B_FALSE);
3216 }
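
/*
 * Hedged usage sketch (hypothetical helper, not part of this file):
 * callers that generate log records are expected to consult
 * zil_replaying() and skip record creation when the transaction is
 * part of replay, along these lines:
 */
static void
zil_sketch_log_record(zilog_t *zilog, dmu_tx_t *tx)
{
	if (zil_replaying(zilog, tx))
		return;	/* replay already accounts for this record */

	/* ... construct the itx and assign it to the zilog here ... */
}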
3217 
3218 /* ARGSUSED */
3219 int
3220 zil_reset(const char *osname, void *arg)
3221 {
3222         int error;
3223 
3224         error = zil_suspend(osname, NULL);
3225         if (error != 0)
3226                 return (SET_ERROR(EEXIST));
3227         return (0);
3228 }
 
 
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  23  * Copyright 2013 Nexenta Systems, Inc.  All rights reserved.
  24  * Copyright (c) 2011, 2017 by Delphix. All rights reserved.
  25  * Copyright (c) 2014 Integros [integros.com]
  26  */
  27 
  28 /* Portions Copyright 2010 Robert Milkowski */
  29 
  30 #include <sys/zfs_context.h>
  31 #include <sys/spa.h>
  32 #include <sys/dmu.h>
  33 #include <sys/zap.h>
  34 #include <sys/arc.h>
  35 #include <sys/stat.h>
  36 #include <sys/resource.h>
  37 #include <sys/zil.h>
  38 #include <sys/zil_impl.h>
  39 #include <sys/dsl_dataset.h>
  40 #include <sys/vdev_impl.h>
  41 #include <sys/dmu_tx.h>
  42 #include <sys/dsl_pool.h>
  43 #include <sys/abd.h>
 
 
  75  * block in the chain, and the ZIL header points to the first block in
  76  * the chain.
  77  *
  78  * Note, there is not a fixed place in the pool to hold these ZIL
  79  * blocks; they are dynamically allocated and freed as needed from the
  80  * blocks available on the pool, though they can be preferentially
  81  * allocated from a dedicated "log" vdev.
  82  */
  83 
  84 /*
  85  * This controls the amount of time that a ZIL block (lwb) will remain
  86  * "open" when it isn't "full", and it has a thread waiting for it to be
  87  * committed to stable storage. Please refer to the zil_commit_waiter()
  88  * function (and the comments within it) for more details.
  89  */
  90 int zfs_commit_timeout_pct = 5;
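
/*
 * Hedged sketch (not part of this diff): zil_commit_waiter() is
 * expected to derive its wait deadline from this tunable as a
 * percentage of the most recently observed lwb latency; the helper
 * name below is hypothetical.
 */
static hrtime_t
zil_sketch_commit_waiter_timeout(zilog_t *zilog)
{
	return ((zilog->zl_last_lwb_latency * zfs_commit_timeout_pct) / 100);
}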
  91 
  92 /*
  93  * Disable intent logging replay.  This global ZIL switch affects all pools.
  94  */
  95 int zil_replay_disable = 0;
  96 
  97 /*
  98  * Tunable parameter for debugging or performance analysis.  Setting
  99  * zfs_nocacheflush will cause corruption on power loss if a volatile
 100  * out-of-order write cache is enabled.
 101  */
 102 boolean_t zfs_nocacheflush = B_FALSE;
 103 
 104 /*
 105  * Limit SLOG write size per commit executed with synchronous priority.
 106  * Any writes above that will be executed with lower (asynchronous) priority
 107  * to limit potential SLOG device abuse by a single active ZIL writer.
 108  */
 109 uint64_t zil_slog_bulk = 768 * 1024;
 110 
 111 static kmem_cache_t *zil_lwb_cache;
 112 static kmem_cache_t *zil_zcw_cache;
 113 
 114 static void zil_async_to_sync(zilog_t *zilog, uint64_t foid);
 115 
 
 
 502         mutex_exit(&zilog->zl_lock);
 503 
 504         ASSERT(!MUTEX_HELD(&lwb->lwb_vdev_lock));
 505         ASSERT(avl_is_empty(&lwb->lwb_vdev_tree));
 506         VERIFY(list_is_empty(&lwb->lwb_waiters));
 507 
 508         return (lwb);
 509 }
 510 
 511 static void
 512 zil_free_lwb(zilog_t *zilog, lwb_t *lwb)
 513 {
 514         ASSERT(MUTEX_HELD(&zilog->zl_lock));
 515         ASSERT(!MUTEX_HELD(&lwb->lwb_vdev_lock));
 516         VERIFY(list_is_empty(&lwb->lwb_waiters));
 517         ASSERT(avl_is_empty(&lwb->lwb_vdev_tree));
 518         ASSERT3P(lwb->lwb_write_zio, ==, NULL);
 519         ASSERT3P(lwb->lwb_root_zio, ==, NULL);
 520         ASSERT3U(lwb->lwb_max_txg, <=, spa_syncing_txg(zilog->zl_spa));
 521         ASSERT(lwb->lwb_state == LWB_STATE_CLOSED ||
 522             lwb->lwb_state == LWB_STATE_FLUSH_DONE);
 523 
 524         /*
 525          * Clear the zilog's field to indicate this lwb is no longer
 526          * valid, and prevent use-after-free errors.
 527          */
 528         if (zilog->zl_last_lwb_opened == lwb)
 529                 zilog->zl_last_lwb_opened = NULL;
 530 
 531         kmem_cache_free(zil_lwb_cache, lwb);
 532 }
 533 
 534 /*
 535  * Called when we create in-memory log transactions so that we know
 536  * to cleanup the itxs at the end of spa_sync().
 537  */
 538 void
 539 zilog_dirty(zilog_t *zilog, uint64_t txg)
 540 {
 541         dsl_pool_t *dp = zilog->zl_dmu_pool;
 542         dsl_dataset_t *ds = dmu_objset_ds(zilog->zl_os);
 
 860 /*
 861  * This function is used when the given waiter is to be linked into an
 862  * lwb's "lwb_waiters" list; i.e. when the itx is committed to the lwb.
 863  * At this point, the waiter will no longer be referenced by the itx,
 864  * and instead, will be referenced by the lwb.
 865  */
 866 static void
 867 zil_commit_waiter_link_lwb(zil_commit_waiter_t *zcw, lwb_t *lwb)
 868 {
 869         /*
 870          * The lwb_waiters field of the lwb is protected by the zilog's
 871          * zl_lock, thus it must be held when calling this function.
 872          */
 873         ASSERT(MUTEX_HELD(&lwb->lwb_zilog->zl_lock));
 874 
 875         mutex_enter(&zcw->zcw_lock);
 876         ASSERT(!list_link_active(&zcw->zcw_node));
 877         ASSERT3P(zcw->zcw_lwb, ==, NULL);
 878         ASSERT3P(lwb, !=, NULL);
 879         ASSERT(lwb->lwb_state == LWB_STATE_OPENED ||
 880             lwb->lwb_state == LWB_STATE_ISSUED ||
 881             lwb->lwb_state == LWB_STATE_WRITE_DONE);
 882 
 883         list_insert_tail(&lwb->lwb_waiters, zcw);
 884         zcw->zcw_lwb = lwb;
 885         mutex_exit(&zcw->zcw_lock);
 886 }
 887 
 888 /*
 889  * This function is used when zio_alloc_zil() fails to allocate a ZIL
 890  * block, and the given waiter must be linked to the "nolwb waiters"
 891  * list inside of zil_process_commit_list().
 892  */
 893 static void
 894 zil_commit_waiter_link_nolwb(zil_commit_waiter_t *zcw, list_t *nolwb)
 895 {
 896         mutex_enter(&zcw->zcw_lock);
 897         ASSERT(!list_link_active(&zcw->zcw_node));
 898         ASSERT3P(zcw->zcw_lwb, ==, NULL);
 899         list_insert_tail(nolwb, zcw);
 900         mutex_exit(&zcw->zcw_lock);
 901 }
 
 907         avl_index_t where;
 908         zil_vdev_node_t *zv, zvsearch;
 909         int ndvas = BP_GET_NDVAS(bp);
 910         int i;
 911 
 912         if (zfs_nocacheflush)
 913                 return;
 914 
 915         mutex_enter(&lwb->lwb_vdev_lock);
 916         for (i = 0; i < ndvas; i++) {
 917                 zvsearch.zv_vdev = DVA_GET_VDEV(&bp->blk_dva[i]);
 918                 if (avl_find(t, &zvsearch, &where) == NULL) {
 919                         zv = kmem_alloc(sizeof (*zv), KM_SLEEP);
 920                         zv->zv_vdev = zvsearch.zv_vdev;
 921                         avl_insert(t, zv, where);
 922                 }
 923         }
 924         mutex_exit(&lwb->lwb_vdev_lock);
 925 }
 926 
 927 static void
 928 zil_lwb_flush_defer(lwb_t *lwb, lwb_t *nlwb)
 929 {
 930         avl_tree_t *src = &lwb->lwb_vdev_tree;
 931         avl_tree_t *dst = &nlwb->lwb_vdev_tree;
 932         void *cookie = NULL;
 933         zil_vdev_node_t *zv;
 934 
 935         ASSERT3S(lwb->lwb_state, ==, LWB_STATE_WRITE_DONE);
 936         ASSERT3S(nlwb->lwb_state, !=, LWB_STATE_WRITE_DONE);
 937         ASSERT3S(nlwb->lwb_state, !=, LWB_STATE_FLUSH_DONE);
 938 
 939         /*
 940          * At this point in its lifetime, 'lwb' no longer needs the
 941          * protection of lwb_vdev_lock for its lwb_vdev_tree (the tree will
 942          * only be modified while holding zilog->zl_lock), as its writes and
 943          * those of its children have all completed.  The younger 'nlwb' may
 944          * be waiting on future writes to additional vdevs.
 945          */
 946         mutex_enter(&nlwb->lwb_vdev_lock);
 947         /*
 948          * Tear down the 'lwb' vdev tree, ensuring that entries which do not
 949          * exist in 'nlwb' are moved to it, freeing any would-be duplicates.
 950          */
 951         while ((zv = avl_destroy_nodes(src, &cookie)) != NULL) {
 952                 avl_index_t where;
 953 
 954                 if (avl_find(dst, zv, &where) == NULL) {
 955                         avl_insert(dst, zv, where);
 956                 } else {
 957                         kmem_free(zv, sizeof (*zv));
 958                 }
 959         }
 960         mutex_exit(&nlwb->lwb_vdev_lock);
 961 }
 962 
 963 void
 964 zil_lwb_add_txg(lwb_t *lwb, uint64_t txg)
 965 {
 966         lwb->lwb_max_txg = MAX(lwb->lwb_max_txg, txg);
 967 }
 968 
 969 /*
 970  * This function is called after all vdevs associated with a given lwb
 971  * write have completed their DKIOCFLUSHWRITECACHE command; or as soon
 972  * as the lwb write completes, if "zfs_nocacheflush" is set. Further,
 973  * all "previous" lwb's will have completed before this function is
 974  * called; i.e. this function is called for all previous lwbs before
 975  * it's called for "this" lwb (enforced via the zio dependencies
 976  * configured in zil_lwb_set_zio_dependency()).
 977  *
 978  * The intention is for this function to be called as soon as the
 979  * contents of an lwb are considered "stable" on disk, and will survive
 980  * any sudden loss of power. At this point, any threads waiting for the
 981  * lwb to reach this state are signalled, and the "waiter" structures
 982  * are marked "done".
 983  */
 984 static void
 985 zil_lwb_flush_vdevs_done(zio_t *zio)
 986 {
 987         lwb_t *lwb = zio->io_private;
 988         zilog_t *zilog = lwb->lwb_zilog;
 989         dmu_tx_t *tx = lwb->lwb_tx;
 990         zil_commit_waiter_t *zcw;
 991 
 992         spa_config_exit(zilog->zl_spa, SCL_STATE, lwb);
 993 
 994         zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
 995 
 996         mutex_enter(&zilog->zl_lock);
 997 
 998         /*
 999          * Ensure the lwb buffer pointer is cleared before releasing the
1000          * txg. If we have had an allocation failure and the txg is
1001          * waiting to sync then we want zil_sync() to remove the lwb so
1002          * that it's not picked up as the next new one in
1003          * zil_process_commit_list(). zil_sync() will only remove the
1004          * lwb if lwb_buf is null.
1005          */
1006         lwb->lwb_buf = NULL;
1007         lwb->lwb_tx = NULL;
1008 
1009         ASSERT3U(lwb->lwb_issued_timestamp, >, 0);
1010         zilog->zl_last_lwb_latency = gethrtime() - lwb->lwb_issued_timestamp;
1011 
1012         lwb->lwb_root_zio = NULL;
1013 
1014         ASSERT3S(lwb->lwb_state, ==, LWB_STATE_WRITE_DONE);
1015         lwb->lwb_state = LWB_STATE_FLUSH_DONE;
1016 
1017         if (zilog->zl_last_lwb_opened == lwb) {
1018                 /*
1019                  * Remember the highest committed log sequence number
1020                  * for ztest. We only update this value when all the log
1021                  * writes succeeded, because ztest wants to ASSERT that
1022                  * it got the whole log chain.
1023                  */
1024                 zilog->zl_commit_lr_seq = zilog->zl_lr_seq;
1025         }
1026 
1027         while ((zcw = list_head(&lwb->lwb_waiters)) != NULL) {
1028                 mutex_enter(&zcw->zcw_lock);
1029 
1030                 ASSERT(list_link_active(&zcw->zcw_node));
1031                 list_remove(&lwb->lwb_waiters, zcw);
1032 
1033                 ASSERT3P(zcw->zcw_lwb, ==, lwb);
1034                 zcw->zcw_lwb = NULL;
1035 
1036                 zcw->zcw_zio_error = zio->io_error;
1037 
1038                 ASSERT3B(zcw->zcw_done, ==, B_FALSE);
1039                 zcw->zcw_done = B_TRUE;
1040                 cv_broadcast(&zcw->zcw_cv);
1041 
1042                 mutex_exit(&zcw->zcw_lock);
1043         }
1044 
1045         mutex_exit(&zilog->zl_lock);
1046 
1047         /*
1048          * Now that we've written this log block, we have a stable pointer
1049          * to the next block in the chain, so it's OK to let the txg in
1050          * which we allocated the next block sync.
1051          */
1052         dmu_tx_commit(tx);
1053 }
1054 
1055 /*
1056  * This is called when an lwb's write zio completes. The callback's
1057  * purpose is to issue the DKIOCFLUSHWRITECACHE commands for the vdevs
1058  * in the lwb's lwb_vdev_tree. The tree will contain the vdevs involved
1059  * in writing out this specific lwb's data, and in the case that cache
1060  * flushes have been deferred, vdevs involved in writing the data for
1061  * previous lwbs. The writes corresponding to all the vdevs in the
1062  * lwb_vdev_tree will have completed by the time this is called, due to
1063  * the zio dependencies configured in zil_lwb_set_zio_dependency(),
1064  * which takes deferred flushes into account. The lwb will be "done"
1065  * once zil_lwb_flush_vdevs_done() is called, which occurs in the zio
1066  * completion callback for the lwb's root zio.
1067  */
1068 static void
1069 zil_lwb_write_done(zio_t *zio)
1070 {
1071         lwb_t *lwb = zio->io_private;
1072         spa_t *spa = zio->io_spa;
1073         zilog_t *zilog = lwb->lwb_zilog;
1074         avl_tree_t *t = &lwb->lwb_vdev_tree;
1075         void *cookie = NULL;
1076         zil_vdev_node_t *zv;
1077         lwb_t *nlwb;
1078 
1079         ASSERT3S(spa_config_held(spa, SCL_STATE, RW_READER), !=, 0);
1080 
1081         ASSERT(BP_GET_COMPRESS(zio->io_bp) == ZIO_COMPRESS_OFF);
1082         ASSERT(BP_GET_TYPE(zio->io_bp) == DMU_OT_INTENT_LOG);
1083         ASSERT(BP_GET_LEVEL(zio->io_bp) == 0);
1084         ASSERT(BP_GET_BYTEORDER(zio->io_bp) == ZFS_HOST_BYTEORDER);
1085         ASSERT(!BP_IS_GANG(zio->io_bp));
1086         ASSERT(!BP_IS_HOLE(zio->io_bp));
1087         ASSERT(BP_GET_FILL(zio->io_bp) == 0);
1088 
1089         abd_put(zio->io_abd);
1090 
1091         mutex_enter(&zilog->zl_lock);
1092         ASSERT3S(lwb->lwb_state, ==, LWB_STATE_ISSUED);
1093         lwb->lwb_state = LWB_STATE_WRITE_DONE;
1094         lwb->lwb_write_zio = NULL;
1095         nlwb = list_next(&zilog->zl_lwb_list, lwb);
1096         mutex_exit(&zilog->zl_lock);
1097 
1098         if (avl_numnodes(t) == 0)
1099                 return;
1100 
1101         /*
1102          * If there was an IO error, we're not going to call zio_flush()
1103          * on these vdevs, so we simply empty the tree and free the
1104          * nodes. We avoid calling zio_flush() since there isn't any
1105          * good reason for doing so, after the lwb block failed to be
1106          * written out.
1107          */
1108         if (zio->io_error != 0) {
1109                 while ((zv = avl_destroy_nodes(t, &cookie)) != NULL)
1110                         kmem_free(zv, sizeof (*zv));
1111                 return;
1112         }
1113 
1114         /*
1115          * If this lwb does not have any threads waiting for it to
1116          * complete, we want to defer issuing the DKIOCFLUSHWRITECACHE
1117          * command to the vdevs written to by "this" lwb, and instead
1118          * rely on the "next" lwb to handle the DKIOCFLUSHWRITECACHE
1119          * command for those vdevs. Thus, we merge the vdev tree of
1120          * "this" lwb with the vdev tree of the "next" lwb in the list,
1121          * and assume the "next" lwb will handle flushing the vdevs (or
 1122          * deferring the flush(es) again).
1123          *
1124          * This is a useful performance optimization, especially for
1125          * workloads with lots of async write activity and few sync
1126          * write and/or fsync activity, as it has the potential to
1127          * coalesce multiple flush commands to a vdev into one.
1128          */
1129         if (list_head(&lwb->lwb_waiters) == NULL && nlwb != NULL) {
1130                 zil_lwb_flush_defer(lwb, nlwb);
1131                 ASSERT(avl_is_empty(&lwb->lwb_vdev_tree));
1132                 return;
1133         }
1134 
1135         while ((zv = avl_destroy_nodes(t, &cookie)) != NULL) {
1136                 vdev_t *vd = vdev_lookup_top(spa, zv->zv_vdev);
1137                 if (vd != NULL)
1138                         zio_flush(lwb->lwb_root_zio, vd);
1139                 kmem_free(zv, sizeof (*zv));
1140         }
1141 }
1142 
1143 static void
1144 zil_lwb_set_zio_dependency(zilog_t *zilog, lwb_t *lwb)
1145 {
1146         lwb_t *last_lwb_opened = zilog->zl_last_lwb_opened;
1147 
1148         ASSERT(MUTEX_HELD(&zilog->zl_issuer_lock));
1149         ASSERT(MUTEX_HELD(&zilog->zl_lock));
1150 
1151         /*
1152          * The zilog's "zl_last_lwb_opened" field is used to build the
1153          * lwb/zio dependency chain, which is used to preserve the
1154          * ordering of lwb completions that is required by the semantics
1155          * of the ZIL. Each new lwb zio becomes a parent of the
1156          * "previous" lwb zio, such that the new lwb's zio cannot
1157          * complete until the "previous" lwb's zio completes.
1158          *
1159          * This is required by the semantics of zil_commit(); the commit
1160          * waiters attached to the lwbs will be woken in the lwb zio's
1161          * completion callback, so this zio dependency graph ensures the
1162          * waiters are woken in the correct order (the same order the
1163          * lwbs were created).
1164          */
1165         if (last_lwb_opened != NULL &&
1166             last_lwb_opened->lwb_state != LWB_STATE_FLUSH_DONE) {
1167                 ASSERT(last_lwb_opened->lwb_state == LWB_STATE_OPENED ||
1168                     last_lwb_opened->lwb_state == LWB_STATE_ISSUED ||
1169                     last_lwb_opened->lwb_state == LWB_STATE_WRITE_DONE);
1170 
1171                 ASSERT3P(last_lwb_opened->lwb_root_zio, !=, NULL);
1172                 zio_add_child(lwb->lwb_root_zio,
1173                     last_lwb_opened->lwb_root_zio);
1174 
1175                 /*
1176                  * If the previous lwb's write hasn't already completed,
1177                  * we also want to order the completion of the lwb write
1178                  * zios (above, we only order the completion of the lwb
1179                  * root zios). This is required because of how we can
1180                  * defer the DKIOCFLUSHWRITECACHE commands for each lwb.
1181                  *
 1182          * When the DKIOCFLUSHWRITECACHE commands are deferred,
1183                  * the previous lwb will rely on this lwb to flush the
1184                  * vdevs written to by that previous lwb. Thus, we need
1185                  * to ensure this lwb doesn't issue the flush until
1186                  * after the previous lwb's write completes. We ensure
1187                  * this ordering by setting the zio parent/child
1188                  * relationship here.
1189                  *
1190                  * Without this relationship on the lwb's write zio,
1191                  * it's possible for this lwb's write to complete prior
1192                  * to the previous lwb's write completing; and thus, the
1193                  * vdevs for the previous lwb would be flushed prior to
1194                  * that lwb's data being written to those vdevs (the
1195                  * vdevs are flushed in the lwb write zio's completion
1196                  * handler, zil_lwb_write_done()).
1197                  */
1198                 if (last_lwb_opened->lwb_state != LWB_STATE_WRITE_DONE) {
1199                         ASSERT(last_lwb_opened->lwb_state == LWB_STATE_OPENED ||
1200                             last_lwb_opened->lwb_state == LWB_STATE_ISSUED);
1201 
1202                         ASSERT3P(last_lwb_opened->lwb_write_zio, !=, NULL);
1203                         zio_add_child(lwb->lwb_write_zio,
1204                             last_lwb_opened->lwb_write_zio);
1205                 }
1206         }
1207 }
1208 
1209 
1210 /*
1211  * This function's purpose is to "open" an lwb such that it is ready to
1212  * accept new itxs being committed to it. To do this, the lwb's zio
1213  * structures are created, and linked to the lwb. This function is
1214  * idempotent; if the passed in lwb has already been opened, this
1215  * function is essentially a no-op.
1216  */
1217 static void
1218 zil_lwb_write_open(zilog_t *zilog, lwb_t *lwb)
1219 {
1220         zbookmark_phys_t zb;
1221         zio_priority_t prio;
1222 
1223         ASSERT(MUTEX_HELD(&zilog->zl_issuer_lock));
1224         ASSERT3P(lwb, !=, NULL);
1225         EQUIV(lwb->lwb_root_zio == NULL, lwb->lwb_state == LWB_STATE_CLOSED);
1226         EQUIV(lwb->lwb_root_zio != NULL, lwb->lwb_state == LWB_STATE_OPENED);
1227 
1228         SET_BOOKMARK(&zb, lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_OBJSET],
1229             ZB_ZIL_OBJECT, ZB_ZIL_LEVEL,
 
1234                     BP_GET_LSIZE(&lwb->lwb_blk));
1235 
1236                 if (!lwb->lwb_slog || zilog->zl_cur_used <= zil_slog_bulk)
1237                         prio = ZIO_PRIORITY_SYNC_WRITE;
1238                 else
1239                         prio = ZIO_PRIORITY_ASYNC_WRITE;
1240 
1241                 lwb->lwb_root_zio = zio_root(zilog->zl_spa,
1242                     zil_lwb_flush_vdevs_done, lwb, ZIO_FLAG_CANFAIL);
1243                 ASSERT3P(lwb->lwb_root_zio, !=, NULL);
1244 
1245                 lwb->lwb_write_zio = zio_rewrite(lwb->lwb_root_zio,
1246                     zilog->zl_spa, 0, &lwb->lwb_blk, lwb_abd,
1247                     BP_GET_LSIZE(&lwb->lwb_blk), zil_lwb_write_done, lwb,
1248                     prio, ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE, &zb);
1249                 ASSERT3P(lwb->lwb_write_zio, !=, NULL);
1250 
1251                 lwb->lwb_state = LWB_STATE_OPENED;
1252 
1253                 mutex_enter(&zilog->zl_lock);
1254                 zil_lwb_set_zio_dependency(zilog, lwb);
1255                 zilog->zl_last_lwb_opened = lwb;
1256                 mutex_exit(&zilog->zl_lock);
1257         }
1258 
1259         ASSERT3P(lwb->lwb_root_zio, !=, NULL);
1260         ASSERT3P(lwb->lwb_write_zio, !=, NULL);
1261         ASSERT3S(lwb->lwb_state, ==, LWB_STATE_OPENED);
1262 }
1263 
1264 /*
1265  * Define a limited set of intent log block sizes.
1266  *
1267  * These must be a multiple of 4KB. Note only the amount used (again
1268  * aligned to 4KB) actually gets written. However, we can't always just
1269  * allocate SPA_OLD_MAXBLOCKSIZE as the slog space could be exhausted.
1270  */
1271 uint64_t zil_block_buckets[] = {
1272     4096,               /* non TX_WRITE */
1273     8192+4096,          /* data base */
1274     32*1024 + 4096,     /* NFS writes */
1275     UINT64_MAX
 
1303         } else {
1304                 zilc = (zil_chain_t *)(lwb->lwb_buf + lwb->lwb_sz);
1305                 bp = &zilc->zc_next_blk;
1306         }
1307 
1308         ASSERT(lwb->lwb_nused <= lwb->lwb_sz);
1309 
1310         /*
1311          * Allocate the next block and save its address in this block
1312          * before writing it in order to establish the log chain.
1313          * Note that if the allocation of nlwb synced before we wrote
1314          * the block that points at it (lwb), we'd leak it if we crashed.
 1315          * Therefore, we don't do dmu_tx_commit() until zil_lwb_flush_vdevs_done().
1316          * We dirty the dataset to ensure that zil_sync() will be called
1317          * to clean up in the event of allocation failure or I/O failure.
1318          */
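        /*
         * Pictorially (sketch of the chain layout described above):
         *
         *	lwb[N]:   ... records ... zc_next_blk = blkptr of lwb[N+1]
         *	lwb[N+1]: ... records ... zc_next_blk = blkptr of lwb[N+2]
         *
         * If the txg that allocated lwb[N+1] synced before lwb[N] (the
         * block naming it) was written, a crash at that point would leak
         * lwb[N+1]; deferring dmu_tx_commit() as described above closes
         * that window.
         */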
1319 
1320         tx = dmu_tx_create(zilog->zl_os);
1321 
1322         /*
1323          * Since we are not going to create any new dirty data and we can even
1324          * help with clearing the existing dirty data, we should not be subject
1325          * to the dirty-data-based delays.
1326          * We (ab)use TXG_WAITED to bypass the delay mechanism.
1327          * One side effect from using TXG_WAITED is that dmu_tx_assign() can
1328          * fail if the pool is suspended.  Those are dramatic circumstances,
1329          * so we return NULL to signal that the normal ZIL processing is not
1330          * possible and txg_wait_synced() should be used to ensure that the data
1331          * is on disk.
1332          */
1333         error = dmu_tx_assign(tx, TXG_WAITED);
1334         if (error != 0) {
1335                 ASSERT3S(error, ==, EIO);
1336                 dmu_tx_abort(tx);
1337                 return (NULL);
1338         }
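        /*
         * Hypothetical caller-side sketch (the real caller is elsewhere in
         * this file): a NULL return tells the commit path that normal ZIL
         * processing is impossible, so it stalls and waits for the txg to
         * sync instead:
         *
         *	lwb = zil_lwb_write_issue(zilog, lwb);
         *	if (lwb == NULL)
         *		zil_commit_writer_stall(zilog);
         */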
1339         dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
1340         txg = dmu_tx_get_txg(tx);
1341 
1342         lwb->lwb_tx = tx;
1343 
1344         /*
1345          * Log blocks are pre-allocated. Here we select the size of the next
1346          * block, based on the size used in the last block.
1347          * - first find the smallest bucket that will fit the block, from a
1348          *   limited set of block sizes. This is because it's faster to write
1349          *   blocks allocated from the same metaslab, as they are adjacent or
1350          *   close.
1351          * - next find the maximum of the new suggested size and an array of
1352          *   previous sizes. This lessens the picket-fence effect of wrongly
1353          *   guessing the size if we have a stream of, say, 2k, 64k, 2k, 64k
1354          *   requests.
1355          *
1356          * Note we only write what is used, but we can't just allocate
1357          * the maximum block size because we can exhaust the available
1358          * pool log space.
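         *
         * Worked example (illustrative numbers): with a request stream of
         * 2k, 64k, 2k, 64k, ... a naive "size of the last block" guess
         * would bounce between a small and a large bucket on every other
         * block. Taking the maximum over a short history of recent sizes
         * instead keeps the guess pinned at the larger bucket, so the 64k
         * writes aren't repeatedly split across undersized blocks.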
 
1944  * completion, or b) skip them altogether.
1945  *
1946  * This is used as a performance optimization to prevent commit itxs
1947  * from generating new lwbs when it's unnecessary to do so.
1948  */
1949 static void
1950 zil_prune_commit_list(zilog_t *zilog)
1951 {
1952         itx_t *itx;
1953 
1954         ASSERT(MUTEX_HELD(&zilog->zl_issuer_lock));
1955 
1956         while ((itx = list_head(&zilog->zl_itx_commit_list)) != NULL) {
1957                 lr_t *lrc = &itx->itx_lr;
1958                 if (lrc->lrc_txtype != TX_COMMIT)
1959                         break;
1960 
1961                 mutex_enter(&zilog->zl_lock);
1962 
1963                 lwb_t *last_lwb = zilog->zl_last_lwb_opened;
1964                 if (last_lwb == NULL ||
1965                     last_lwb->lwb_state == LWB_STATE_FLUSH_DONE) {
1966                         /*
1967                          * All of the itxs this waiter was waiting on
1968                          * must have already completed (or there were
1969                          * never any itxs for it to wait on), so it's
1970                          * safe to skip this waiter and mark it done.
1971                          */
1972                         zil_commit_waiter_skip(itx->itx_private);
1973                 } else {
1974                         zil_commit_waiter_link_lwb(itx->itx_private, last_lwb);
1975                         itx->itx_private = NULL;
1976                 }
1977 
1978                 mutex_exit(&zilog->zl_lock);
1979 
1980                 list_remove(&zilog->zl_itx_commit_list, itx);
1981                 zil_itx_destroy(itx);
1982         }
1983 
1984         IMPLY(itx != NULL, itx->itx_lr.lrc_txtype != TX_COMMIT);
1985 }
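/*
 * Illustrative scenario (sketch): if the head of the commit list is a run
 * of TX_COMMIT itxs and the most recently opened lwb has already reached
 * LWB_STATE_FLUSH_DONE, each of those waiters is skipped (marked done)
 * without opening a new lwb; if that lwb is instead still in flight, each
 * waiter is linked to it and will be signalled from
 * zil_lwb_flush_vdevs_done() when its flush completes.
 */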
 
2026         lwb_t *lwb;
2027         itx_t *itx;
2028 
2029         ASSERT(MUTEX_HELD(&zilog->zl_issuer_lock));
2030 
2031         /*
2032          * Return early if there's nothing to commit, so that we don't
2033          * dirty the fs by calling zil_create().
2034          */
2035         if (list_head(&zilog->zl_itx_commit_list) == NULL)
2036                 return;
2037 
2038         list_create(&nolwb_waiters, sizeof (zil_commit_waiter_t),
2039             offsetof(zil_commit_waiter_t, zcw_node));
2040 
2041         lwb = list_tail(&zilog->zl_lwb_list);
2042         if (lwb == NULL) {
2043                 lwb = zil_create(zilog);
2044         } else {
2045                 ASSERT3S(lwb->lwb_state, !=, LWB_STATE_ISSUED);
2046                 ASSERT3S(lwb->lwb_state, !=, LWB_STATE_WRITE_DONE);
2047                 ASSERT3S(lwb->lwb_state, !=, LWB_STATE_FLUSH_DONE);
2048         }
2049 
2050         while ((itx = list_head(&zilog->zl_itx_commit_list)) != NULL) {
2051                 lr_t *lrc = &itx->itx_lr;
2052                 uint64_t txg = lrc->lrc_txg;
2053 
2054                 ASSERT3U(txg, !=, 0);
2055 
2056                 if (lrc->lrc_txtype == TX_COMMIT) {
2057                         DTRACE_PROBE2(zil__process__commit__itx,
2058                             zilog_t *, zilog, itx_t *, itx);
2059                 } else {
2060                         DTRACE_PROBE2(zil__process__normal__itx,
2061                             zilog_t *, zilog, itx_t *, itx);
2062                 }
2063 
2064                 boolean_t synced = txg <= spa_last_synced_txg(spa);
2065                 boolean_t frozen = txg > spa_freeze_txg(spa);
2066 
2067                 /*
 
2129                  * the ZIL write pipeline; see the comment within
2130                  * zil_commit_writer_stall() for more details.
2131                  */
2132                 zil_commit_writer_stall(zilog);
2133 
2134                 /*
2135                  * Additionally, we have to signal and mark the "nolwb"
2136                  * waiters as "done" here, since without an lwb, we
2137                  * can't do this via zil_lwb_flush_vdevs_done() as we
2138                  * normally would.
2139                  */
2140                 zil_commit_waiter_t *zcw;
2141                 while ((zcw = list_head(&nolwb_waiters)) != NULL) {
2142                         zil_commit_waiter_skip(zcw);
2143                         list_remove(&nolwb_waiters, zcw);
2144                 }
2145         } else {
2146                 ASSERT(list_is_empty(&nolwb_waiters));
2147                 ASSERT3P(lwb, !=, NULL);
2148                 ASSERT3S(lwb->lwb_state, !=, LWB_STATE_ISSUED);
2149                 ASSERT3S(lwb->lwb_state, !=, LWB_STATE_WRITE_DONE);
2150                 ASSERT3S(lwb->lwb_state, !=, LWB_STATE_FLUSH_DONE);
2151 
2152                 /*
2153                  * At this point, the ZIL block pointed at by the "lwb"
2154                  * variable is in one of the following states: "closed"
2155                  * or "open".
2156                  *
2157                  * If it's "closed", then no itxs have been committed to
2158                  * it, so there's no point in issuing its zio (i.e.,
2159                  * it's "empty").
2160                  *
2161                  * If it's in the "open" state, then it contains one or
2162                  * more itxs that eventually need to be committed to stable
2163                  * storage. In this case we intentionally do not issue
2164                  * the lwb's zio to disk yet, and instead rely on one of
2165                  * the following two mechanisms for issuing the zio:
2166                  *
2167                  * 1. Ideally, there will be more ZIL activity occurring
2168                  * on the system, such that this function will be
2169                  * immediately called again (not necessarily by the same
2170                  * thread) and this lwb's zio will be issued via
 
2251 
2252 static void
2253 zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw)
2254 {
2255         ASSERT(!MUTEX_HELD(&zilog->zl_issuer_lock));
2256         ASSERT(MUTEX_HELD(&zcw->zcw_lock));
2257         ASSERT3B(zcw->zcw_done, ==, B_FALSE);
2258 
2259         lwb_t *lwb = zcw->zcw_lwb;
2260         ASSERT3P(lwb, !=, NULL);
2261         ASSERT3S(lwb->lwb_state, !=, LWB_STATE_CLOSED);
2262 
2263         /*
2264          * If the lwb has already been issued by another thread, we can
2265          * immediately return since there's no work to be done (the
2266          * point of this function is to issue the lwb). Additionally, we
2267          * do this prior to acquiring the zl_issuer_lock, to avoid
2268          * acquiring it when it's not necessary to do so.
2269          */
2270         if (lwb->lwb_state == LWB_STATE_ISSUED ||
2271             lwb->lwb_state == LWB_STATE_WRITE_DONE ||
2272             lwb->lwb_state == LWB_STATE_FLUSH_DONE)
2273                 return;
2274 
2275         /*
2276          * In order to call zil_lwb_write_issue() we must hold the
2277          * zilog's "zl_issuer_lock". We can't simply acquire that lock,
2278          * since we're already holding the commit waiter's "zcw_lock",
2279          * and those two locks are acquired in the opposite order
2280          * elsewhere.
2281          */
2282         mutex_exit(&zcw->zcw_lock);
2283         mutex_enter(&zilog->zl_issuer_lock);
2284         mutex_enter(&zcw->zcw_lock);
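        /*
         * Sketch of the ordering issue (for illustration): the canonical
         * order elsewhere in this file is
         *
         *	mutex_enter(&zilog->zl_issuer_lock);
         *	mutex_enter(&zcw->zcw_lock);
         *
         * so acquiring the zl_issuer_lock while still holding the zcw_lock
         * could deadlock against a thread that holds the zl_issuer_lock
         * and is waiting for this zcw_lock; hence the drop-and-reacquire
         * dance above.
         */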
2285 
2286         /*
2287          * Since we just dropped and re-acquired the commit waiter's
2288          * lock, we have to re-check to see if the waiter was marked
2289          * "done" during that process. If the waiter was marked "done",
2290          * the "lwb" pointer is no longer valid (it can be freed after
2291          * the waiter is marked "done"), so without this check we could
2292          * wind up with a use-after-free error below.
 
2300          * We've already checked this above, but since we hadn't acquired
2301          * the zilog's zl_issuer_lock, we have to perform this check a
2302          * second time while holding the lock.
2303          *
2304          * We don't need to hold the zl_lock since the lwb cannot transition
2305          * from OPENED to ISSUED while we hold the zl_issuer_lock. The lwb
2306          * _can_ transition from ISSUED to DONE, but it's OK to race with
2307          * that transition since we treat the lwb the same, whether it's in
2308          * the ISSUED or DONE states.
2309          *
2310          * The important thing is that we treat the lwb differently depending
2311          * on whether it's ISSUED or OPENED, and block any other threads
2312          * that might attempt to issue this lwb. For that reason we hold the
2313          * zl_issuer_lock when checking the lwb_state; we must not call
2314          * zil_lwb_write_issue() if the lwb had already been issued.
2315          *
2316          * See the comment above the lwb_state_t structure definition for
2317          * more details on the lwb states, and locking requirements.
2318          */
2319         if (lwb->lwb_state == LWB_STATE_ISSUED ||
2320             lwb->lwb_state == LWB_STATE_WRITE_DONE ||
2321             lwb->lwb_state == LWB_STATE_FLUSH_DONE)
2322                 goto out;
2323 
2324         ASSERT3S(lwb->lwb_state, ==, LWB_STATE_OPENED);
2325 
2326         /*
2327          * As described in the comments above zil_commit_waiter() and
2328          * zil_process_commit_list(), we need to issue this lwb's zio
2329          * since we've reached the commit waiter's timeout and it still
2330          * hasn't been issued.
2331          */
2332         lwb_t *nlwb = zil_lwb_write_issue(zilog, lwb);
2333 
2334         ASSERT3S(lwb->lwb_state, !=, LWB_STATE_OPENED);
2335 
2336         /*
2337          * Since the lwb's zio hadn't been issued by the time this thread
2338          * reached its timeout, we reset the zilog's "zl_cur_used" field
2339          * to influence the zil block size selection algorithm.
2340          *
2341          * By having to issue the lwb's zio here, it means the size of the
 
 
2474                                  * isn't done.
2475                                  */
2476                                 ASSERT3P(lwb, ==, zcw->zcw_lwb);
2477                                 ASSERT3S(lwb->lwb_state, !=, LWB_STATE_OPENED);
2478                         }
2479                 } else {
2480                         /*
2481                          * If the lwb isn't open, then it must have already
2482                          * been issued. In that case, there's no need to
2483                          * use a timeout when waiting for the lwb to
2484                          * complete.
2485                          *
2486                          * Additionally, if the lwb is NULL, the waiter
2487                          * will soon be signalled and marked done via
2488                          * zil_clean() and zil_itxg_clean(), so no timeout
2489                          * is required.
2490                          */
2491 
2492                         IMPLY(lwb != NULL,
2493                             lwb->lwb_state == LWB_STATE_ISSUED ||
2494                             lwb->lwb_state == LWB_STATE_WRITE_DONE ||
2495                             lwb->lwb_state == LWB_STATE_FLUSH_DONE);
2496                         cv_wait(&zcw->zcw_cv, &zcw->zcw_lock);
2497                 }
2498         }
2499 
2500         mutex_exit(&zcw->zcw_lock);
2501 }
2502 
2503 static zil_commit_waiter_t *
2504 zil_alloc_commit_waiter(void)
2505 {
2506         zil_commit_waiter_t *zcw = kmem_cache_alloc(zil_zcw_cache, KM_SLEEP);
2507 
2508         cv_init(&zcw->zcw_cv, NULL, CV_DEFAULT, NULL);
2509         mutex_init(&zcw->zcw_lock, NULL, MUTEX_DEFAULT, NULL);
2510         list_link_init(&zcw->zcw_node);
2511         zcw->zcw_lwb = NULL;
2512         zcw->zcw_done = B_FALSE;
2513         zcw->zcw_zio_error = 0;
2514 
2515         return (zcw);
 
3117          * be active (e.g. filesystem not mounted), so there's nothing
3118          * to clean up.
3119          */
3120         if (BP_IS_HOLE(&zh->zh_log)) {
3121                 ASSERT(cookiep != NULL); /* fast path already handled */
3122 
3123                 *cookiep = os;
3124                 mutex_exit(&zilog->zl_lock);
3125                 return (0);
3126         }
3127 
3128         zilog->zl_suspending = B_TRUE;
3129         mutex_exit(&zilog->zl_lock);
3130 
3131         /*
3132          * We need to use zil_commit_impl() to ensure we wait for all
3133          * LWB_STATE_OPENED and LWB_STATE_ISSUED lwbs to be committed
3134          * to disk before proceeding. If we used zil_commit() instead, it
3135          * would just call txg_wait_synced(), because zl_suspend is set.
3136          * txg_wait_synced() doesn't wait for these lwbs to be
3137          * LWB_STATE_FLUSH_DONE before returning.
3138          */
3139         zil_commit_impl(zilog, 0);
3140 
3141         /*
3142          * Now that we've ensured all lwbs are LWB_STATE_FLUSH_DONE, we
3143          * use txg_wait_synced() to ensure the data from the zilog has
3144          * migrated to the main pool before calling zil_destroy().
3145          */
3146         txg_wait_synced(zilog->zl_dmu_pool, 0);
3147 
3148         zil_destroy(zilog, B_FALSE);
3149 
3150         mutex_enter(&zilog->zl_lock);
3151         zilog->zl_suspending = B_FALSE;
3152         cv_broadcast(&zilog->zl_cv_suspend);
3153         mutex_exit(&zilog->zl_lock);
3154 
3155         if (cookiep == NULL)
3156                 zil_resume(os);
3157         else
3158                 *cookiep = os;
3159         return (0);
3160 }
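/*
 * Typical usage (sketch derived from the cookie handling above): a caller
 * that needs the ZIL quiesced across an operation pairs the two calls:
 *
 *	void *cookie;
 *	if (zil_suspend(osname, &cookie) == 0) {
 *		... operate with the ZIL suspended ...
 *		zil_resume(cookie);
 *	}
 *
 * Passing a NULL cookiep instead makes zil_suspend() resume the ZIL itself
 * before returning, which is what zil_vdev_offline() below relies on.
 */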
3161 
3162 void
3163 zil_resume(void *cookie)
 
3327 }
3328 
3329 boolean_t
3330 zil_replaying(zilog_t *zilog, dmu_tx_t *tx)
3331 {
3332         if (zilog->zl_sync == ZFS_SYNC_DISABLED)
3333                 return (B_TRUE);
3334 
3335         if (zilog->zl_replay) {
3336                 dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
3337                 zilog->zl_replayed_seq[dmu_tx_get_txg(tx) & TXG_MASK] =
3338                     zilog->zl_replaying_seq;
3339                 return (B_TRUE);
3340         }
3341 
3342         return (B_FALSE);
3343 }
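/*
 * Sketch of the intended call pattern (the real callers are the
 * zfs_log_*() routines, which live outside this file): log-record
 * generators bail out early when a replay is in progress or logging is
 * disabled, e.g.
 *
 *	if (zil_replaying(zilog, tx))
 *		return;
 *
 * so that records generated while replaying are not logged a second time.
 */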
3344 
3345 /* ARGSUSED */
3346 int
3347 zil_vdev_offline(const char *osname, void *arg)
3348 {
3349         int error;
3350 
3351         error = zil_suspend(osname, NULL);
3352         if (error != 0)
3353                 return (SET_ERROR(EEXIST));
3354         return (0);
3355 }
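/*
 * Context (an assumption about the callers, which are not in this file):
 * the (const char *osname, void *arg) signature and the ARGSUSED tag match
 * the dmu_objset_find() callback convention, which is presumably how the
 * vdev-offline path suspends and resumes the ZIL of each dataset in one
 * traversal.
 */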