NEX-19742 A race between ARC and L2ARC causes system panic
Reviewed by: Joyce McIntosh <joyce.mcintosh@nexenta.com>
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
NEX-16904 Need to port Illumos Bug #9433 to fix ARC hit rate
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
Reviewed by: Yuri Pankov <yuri.pankov@nexenta.com>
NEX-15303 ARC-ABD logic works incorrect when deduplication is enabled
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
Reviewed by: Yuri Pankov <yuri.pankov@nexenta.com>
NEX-15446 set zfs_ddt_limit_type to DDT_LIMIT_TO_ARC
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
NEX-14571 remove isal support remnants
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
NEX-9752 backport illumos 6950 ARC should cache compressed data
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
Reviewed by: Yuri Pankov <yuri.pankov@nexenta.com>
6950 ARC should cache compressed data
Reviewed by: Prakash Surya <prakash.surya@delphix.com>
Reviewed by: Dan Kimmel <dan.kimmel@delphix.com>
Reviewed by: Matt Ahrens <mahrens@delphix.com>
Reviewed by: Paul Dagnelie <pcd@delphix.com>
Reviewed by: Don Brady <don.brady@intel.com>
Reviewed by: Richard Elling <Richard.Elling@RichardElling.com>
Approved by: Richard Lowe <richlowe@richlowe.net>
NEX-8057 renaming of mount points should not be allowed (redo)
Reviewed by: Alek Pinchuk <alek@nexenta.com>
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
NEX-5785 zdb: assertion failed for thread 0xf8a20240, thread-id 130: mp->initialized == B_TRUE, file ../common/kernel.c, line 162
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexent.com>
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
Reviewed by: Yuri Pankov <yuri.pankov@nexenta.com>
NEX-4228 dedup arcstats are redundant
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
NEX-7317 Getting assert !refcount_is_zero(&scl->scl_count) when trying to import pool
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
NEX-5671 assertion: (ab->b_l2hdr.b_asize) >> (9) >= 1 (0x0 >= 0x1), file: ../../common/fs/zfs/arc.c, line: 8275
Reviewed by: Josef 'Jeff' Sipek <josef.sipek@nexenta.com>
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
Revert "Merge pull request #520 in OS/nza-kernel from ~SASO.KISELKOV/nza-kernel:NEX-5671-pl2arc-le_psize to master"
This reverts commit b63e91b939886744224854ea365d70e05ddd6077, reversing
changes made to a6e3a0255c8b22f65343bf641ffefaf9ae948fd4.
NEX-5671 assertion: (ab->b_l2hdr.b_asize) >> (9) >= 1 (0x0 >= 0x1), file: ../../common/fs/zfs/arc.c, line: 8275
Reviewed by: Josef 'Jeff' Sipek <josef.sipek@nexenta.com>
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
NEX-5058 WBC: Race between the purging of window and opening new one
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
Reviewed by: Alex Aizman <alex.aizman@nexenta.com>
NEX-2830 ZFS smart compression
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
6421 Add missing multilist_destroy calls to arc_fini
Reviewed by: Dan Kimmel <dan.kimmel@delphix.com>
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: Jorgen Lundman <lundman@lundman.net>
Approved by: Robert Mustacchi <rm@joyent.com>
6293 ztest failure: error == 28 (0xc == 0x1c) in ztest_tx_assign()
Reviewed by: George Wilson <george.wilson@delphix.com>
Reviewed by: Prakash Surya <prakash.surya@delphix.com>
Reviewed by: Richard Elling <Richard.Elling@RichardElling.com>
Approved by: Richard Lowe <richlowe@richlowe.net>
5219 l2arc_write_buffers() may write beyond target_sz
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: Saso Kiselkov <skiselkov@gmail.com>
Reviewed by: George Wilson <george@delphix.com>
Reviewed by: Steven Hartland <steven.hartland@multiplay.co.uk>
Reviewed by: Justin Gibbs <gibbs@FreeBSD.org>
Approved by: Matthew Ahrens <mahrens@delphix.com>
4185 add new cryptographic checksums to ZFS: SHA-512, Skein, Edon-R (fix studio build)
4185 add new cryptographic checksums to ZFS: SHA-512, Skein, Edon-R
Reviewed by: George Wilson <george.wilson@delphix.com>
Reviewed by: Prakash Surya <prakash.surya@delphix.com>
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
Reviewed by: Richard Lowe <richlowe@richlowe.net>
Approved by: Garrett D'Amore <garrett@damore.org>
6220 memleak in l2arc on debug build
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
Reviewed by: Simon Klinkert <simon.klinkert@gmail.com>
Reviewed by: George Wilson <george@delphix.com>
Approved by: Robert Mustacchi <rm@joyent.com>
5987 zfs prefetch code needs work
Reviewed by: Adam Leventhal <ahl@delphix.com>
Reviewed by: George Wilson <george.wilson@delphix.com>
Reviewed by: Paul Dagnelie <pcd@delphix.com>
Approved by: Gordon Ross <gordon.ross@nexenta.com>
5847 libzfs_diff should check zfs_prop_get() return
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: Albert Lee <trisk@omniti.com>
Approved by: Dan McDonald <danmcd@omniti.com>
5701 zpool list reports incorrect "alloc" value for cache devices
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: George Wilson <george@delphix.com>
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
Approved by: Dan McDonald <danmcd@omniti.com>
5817 change type of arcs_size from uint64_t to refcount_t
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: Paul Dagnelie <paul.dagnelie@delphix.com>
Reviewed by: Adam Leventhal <ahl@delphix.com>
Reviewed by: Alex Reece <alex@delphix.com>
Reviewed by: Richard Elling <richard.elling@richardelling.com>
Approved by: Garrett D'Amore <garrett@damore.org>
NEX-3879 L2ARC evict task allocates a useless struct
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
NEX-4408 backport illumos #6214 to avoid corruption (fix pL2ARC integration)
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
NEX-4408 backport illumos #6214 to avoid corruption
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
Reviewed by: Josef 'Jeff' Sipek <josef.sipek@nexenta.com>
NEX-3979 fix arc_mru/mfu typo
Reviewed by: Dan Fields <dan.fields@nexenta.com>
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
Reviewed by: Josef 'Jeff' Sipek <josef.sipek@nexenta.com>
NEX-3961 arc_meta_max is not counted correctly
Reviewed by: Dan Fields <dan.fields@nexenta.com>
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
Reviewed by: Josef 'Jeff' Sipek <josef.sipek@nexenta.com>
NEX-3946 Port Illumos 5983 to release-5.0
Reviewed by: Yuri Pankov <yuri.pankov@nexenta.com>
Reviewed by: Jean McCormack <jean.maccormack@nexenta.com>
NEX-3945 file-backed cache devices considered harmful
Reviewed by: Alek Pinchuk <alek@nexenta.com>
NEX-3541 Implement persistent L2ARC - fix build breakage in libzpool (v2).
NEX-3541 Implement persistent L2ARC
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
Reviewed by: Josef Sipek <josef.sipek@nexenta.com>
Conflicts:
        usr/src/uts/common/fs/zfs/sys/spa.h
NEX-3630 Backport illumos #5701 from master to 5.0
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
NEX-3558 KRRP Integration
NEX-3387 ARC stats appear to be in wrong/weird order
Reviewed by: Kirill Davydychev <kirill.davydychev@nexenta.com>
Reviewed by: Josef 'Jeff' Sipek <josef.sipek@nexenta.com>
NEX-3296 turn on DDT limit by default
Reviewed by: Josef 'Jeff' Sipek <josef.sipek@nexenta.com>
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
NEX-3300 ddt byte count ceiling tunables should not depend on zfs_ddt_limit_type being set
Reviewed by: Josef 'Jeff' Sipek <josef.sipek@nexenta.com>
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
NEX-3165 need some dedup improvements
Reviewed by: Josef 'Jeff' Sipek <josef.sipek@nexenta.com>
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
 NEX-3165 segregate ddt in arc (other lint fix)
Reviewed by: Jean McCormack <jean.mccormack@nexenta.com>
Reviewed by: Rob Gittins <rob.gittins@nexenta.com>
NEX-3165 segregate ddt in arc
NEX-3079 port illumos ARC improvements
NEX-2301 zpool destroy assertion failed: vd->vdev_stat.vs_alloc == 0 (part 2)
NEX-2704 smbstat man page needs update
NEX-2301 zpool destroy assertion failed: vd->vdev_stat.vs_alloc == 0
3995 Memory leak of compressed buffers in l2arc_write_done
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: Garrett D'Amore <garrett@damore.org>
Approved by: Garrett D'Amore <garrett@damore.org>
4370 avoid transmitting holes during zfs send
4371 DMU code clean up
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: George Wilson <george.wilson@delphix.com>
Reviewed by: Christopher Siden <christopher.siden@delphix.com>
Reviewed by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
Approved by: Garrett D'Amore <garrett@damore.org>
OS-80 support for vdev and CoS properties for the new I/O scheduler
OS-95 lint warning introduced by OS-61
NEX-463: bumped max queue size for L2ARC async evict
The maximum length of the taskq used for async ARC and L2ARC flush is
now a tunable (zfs_flush_ntasks) that is initialized to 64.
This number is just as arbitrary as the original 4, only higher.
The real fix should rework L2ARC evict according to OS-53, but for now
a longer queue should suffice.
Support for secondarycache=data option
Align mutex tables in arc.c and dbuf.c to 64 bytes (cache line), place each kmutex_t on cache line by itself to avoid false sharing
re #14119 BAD-TRAP panic under load
re #13989 port of illumos-3805
3805 arc shouldn't cache freed blocks
Reviewed by: George Wilson <george.wilson@delphix.com>
Reviewed by: Christopher Siden <christopher.siden@delphix.com>
Reviewed by: Richard Elling <richard.elling@dey-sys.com>
Reviewed by: Will Andrews <will@firepipe.net>
Approved by: Dan McDonald <danmcd@nexenta.com>
re #13729 assign each ARC hash bucket its own mutex
In ARC the number of buckets in buffer header hash table is
proportional to the size of physical RAM.
The number of locks protecting headers in the buckets is fixed to 256 though.
Hence, on systems with large memory (>= 128GB) too many unrelated buffer
headers are protected by the same mutex.
When the memory in the system is fragmented this may cause a deadlock:
- An arc_read thread may be trying to allocate a 128k buffer while holding
a header lock.
- The allocation uses the KM_PUSHPAGE option, which blocks the thread if no
contiguous chunk of the requested size is available.
- ARC eviction thread that is supposed to evict some buffers would call
an evict callback on one of the buffers.
- Before freeing the memory, the callback will attempt to take a lock on the
buffer header.
- Incidentally, this buffer header will be protected by the same lock as
the one in arc_read() thread.
The solution in this patch is not perfect: all headers in a hash bucket are
still protected by the same lock.
However, the probability of a collision is very low and does not depend on
memory size.
By the same argument, padding locks to cacheline looks like a waste of memory
here since the probability of contention on a cacheline is quite low, given
the number of buckets, number of locks per cacheline (4) and the fact that
the hash function (crc64 % hash table size) is supposed to be a very good
randomizer.
The effect on memory usage is as follows:
Per hash table size n,
- Original code uses 16K + 16 + n * 8 bytes of memory
- This fix uses 2 * n * 8 + 8 bytes of memory
- The net memory overhead is therefore n * 8 - 16K - 8 bytes
The value of n grows proportionally to physical memory size.
For 128GB of physical memory it is 2M, so the memory overhead is
16M - 16K - 8 bytes.
For smaller memory configurations the overhead is proportionally smaller, and
for larger memory configurations it is proportionally bigger.
The patch has been tested for 30+ hours using a vdbench script that reproduces
the hang with the original code 100% of the time within 20-30 minutes.
re #10054 rb4467 Support for asynchronous ARC/L2ARC eviction
re #13165 rb4265 zfs-monitor should fallback to using DEV_BSIZE
re #10054 rb4249 Long export time causes failover to fail
        
@@ -21,11 +21,11 @@
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  * Copyright (c) 2018, Joyent, Inc.
  * Copyright (c) 2011, 2017 by Delphix. All rights reserved.
  * Copyright (c) 2014 by Saso Kiselkov. All rights reserved.
- * Copyright 2017 Nexenta Systems, Inc.  All rights reserved.
+ * Copyright 2019 Nexenta Systems, Inc.  All rights reserved.
  */
 
 /*
  * DVA-based Adjustable Replacement Cache
  *
@@ -251,10 +251,11 @@
  * like the physical block in the main data pool before comparing the
  * checksum and determining its validity.
  */
 
 #include <sys/spa.h>
+#include <sys/spa_impl.h>
 #include <sys/zio.h>
 #include <sys/spa_impl.h>
 #include <sys/zio_compress.h>
 #include <sys/zio_checksum.h>
 #include <sys/zfs_context.h>
@@ -273,12 +274,12 @@
 #include <sys/dnlc.h>
 #endif
 #include <sys/callb.h>
 #include <sys/kstat.h>
 #include <zfs_fletcher.h>
-#include <sys/aggsum.h>
-#include <sys/cityhash.h>
+#include <sys/byteorder.h>
+#include <sys/spa_impl.h>
 
 #ifndef _KERNEL
 /* set with ZFS_DEBUG=watch, to enable watchpoints on frozen buffers */
 boolean_t arc_watch = B_FALSE;
 int arc_procfd;
@@ -355,15 +356,35 @@
  */
 uint64_t zfs_arc_max;
 uint64_t zfs_arc_min;
 uint64_t zfs_arc_meta_limit = 0;
 uint64_t zfs_arc_meta_min = 0;
+uint64_t zfs_arc_ddt_limit = 0;
+/*
+ * Tunable to control "dedup ceiling"
+ * Possible values:
+ *  DDT_NO_LIMIT        - default behaviour, ie no ceiling
+ *  DDT_LIMIT_TO_ARC    - stop DDT growth if DDT is bigger than its "ARC space"
+ *  DDT_LIMIT_TO_L2ARC  - stop DDT growth when DDT size is bigger than the
+ *                        L2ARC DDT dev(s) for that pool
+ */
+zfs_ddt_limit_t zfs_ddt_limit_type = DDT_LIMIT_TO_ARC;
+/*
+ * Alternative to the above way of controlling "dedup ceiling":
+ * Stop DDT growth when in core DDTs size is above the below tunable.
+ * This tunable overrides the zfs_ddt_limit_type tunable.
+ */
+uint64_t zfs_ddt_byte_ceiling = 0;
+boolean_t zfs_arc_segregate_ddt = B_TRUE;
 int zfs_arc_grow_retry = 0;
 int zfs_arc_shrink_shift = 0;
 int zfs_arc_p_min_shift = 0;
 int zfs_arc_average_blocksize = 8 * 1024; /* 8KB */
 
+/* Tuneable, default is 64, which is essentially arbitrary */
+int zfs_flush_ntasks = 64;
+
 boolean_t zfs_compressed_arc_enabled = B_TRUE;
 
 /*
  * Note that buffers can be in one of 6 states:
  *      ARC_anon        - anonymous (discussed below)
@@ -405,15 +426,32 @@
          * total amount of evictable data in this state
          */
         refcount_t arcs_esize[ARC_BUFC_NUMTYPES];
         /*
          * total amount of data in this state; this includes: evictable,
-         * non-evictable, ARC_BUFC_DATA, and ARC_BUFC_METADATA.
+         * non-evictable, ARC_BUFC_DATA, ARC_BUFC_METADATA and ARC_BUFC_DDT.
+         * ARC_BUFC_DDT list is only populated when zfs_arc_segregate_ddt is
+         * true.
          */
         refcount_t arcs_size;
 } arc_state_t;
 
+/*
+ * We loop through these in l2arc_write_buffers() starting from
+ * PRIORITY_MFU_DDT until we reach PRIORITY_NUMTYPES or the buffer that we
+ * will be writing to L2ARC dev gets full.
+ */
+enum l2arc_priorities {
+        PRIORITY_MFU_DDT,
+        PRIORITY_MRU_DDT,
+        PRIORITY_MFU_META,
+        PRIORITY_MRU_META,
+        PRIORITY_MFU_DATA,
+        PRIORITY_MRU_DATA,
+        PRIORITY_NUMTYPES,
+};
+
 /* The 6 states: */
 static arc_state_t ARC_anon;
 static arc_state_t ARC_mru;
 static arc_state_t ARC_mru_ghost;
 static arc_state_t ARC_mfu;
@@ -420,19 +458,24 @@
 static arc_state_t ARC_mfu_ghost;
 static arc_state_t ARC_l2c_only;
 
 typedef struct arc_stats {
         kstat_named_t arcstat_hits;
+        kstat_named_t arcstat_ddt_hits;
         kstat_named_t arcstat_misses;
         kstat_named_t arcstat_demand_data_hits;
         kstat_named_t arcstat_demand_data_misses;
         kstat_named_t arcstat_demand_metadata_hits;
         kstat_named_t arcstat_demand_metadata_misses;
+        kstat_named_t arcstat_demand_ddt_hits;
+        kstat_named_t arcstat_demand_ddt_misses;
         kstat_named_t arcstat_prefetch_data_hits;
         kstat_named_t arcstat_prefetch_data_misses;
         kstat_named_t arcstat_prefetch_metadata_hits;
         kstat_named_t arcstat_prefetch_metadata_misses;
+        kstat_named_t arcstat_prefetch_ddt_hits;
+        kstat_named_t arcstat_prefetch_ddt_misses;
         kstat_named_t arcstat_mru_hits;
         kstat_named_t arcstat_mru_ghost_hits;
         kstat_named_t arcstat_mfu_hits;
         kstat_named_t arcstat_mfu_ghost_hits;
         kstat_named_t arcstat_deleted;
@@ -442,12 +485,17 @@
          * by something using the same buffer, since hash locks are shared
          * by multiple buffers.
          */
         kstat_named_t arcstat_mutex_miss;
         /*
+         * Number of buffers skipped when updating the access state due to the
+         * header having already been released after acquiring the hash lock.
+         */
+        kstat_named_t arcstat_access_skip;
+        /*
          * Number of buffers skipped because they have I/O in progress, are
-         * indrect prefetch buffers that have not lived long enough, or are
+         * indirect prefetch buffers that have not lived long enough, or are
          * not from the spa we're trying to evict from.
          */
         kstat_named_t arcstat_evict_skip;
         /*
          * Number of times arc_evict_state() was unable to evict enough
@@ -465,11 +513,10 @@
         kstat_named_t arcstat_hash_chain_max;
         kstat_named_t arcstat_p;
         kstat_named_t arcstat_c;
         kstat_named_t arcstat_c_min;
         kstat_named_t arcstat_c_max;
-        /* Not updated directly; only synced in arc_kstat_update. */
         kstat_named_t arcstat_size;
         /*
          * Number of compressed bytes stored in the arc_buf_hdr_t's b_pabd.
          * Note that the compressed bytes may match the uncompressed bytes
          * if the block is either not compressed or compressed arc is disabled.
@@ -494,159 +541,186 @@
          * for tracking purposes; these structures are not actually
          * backed by ARC buffers. This includes arc_buf_hdr_t structures
          * (allocated via arc_buf_hdr_t_full and arc_buf_hdr_t_l2only
          * caches), and arc_buf_t structures (allocated via arc_buf_t
          * cache).
-         * Not updated directly; only synced in arc_kstat_update.
          */
         kstat_named_t arcstat_hdr_size;
         /*
          * Number of bytes consumed by ARC buffers of type equal to
          * ARC_BUFC_DATA. This is generally consumed by buffers backing
          * on disk user data (e.g. plain file contents).
-         * Not updated directly; only synced in arc_kstat_update.
          */
         kstat_named_t arcstat_data_size;
         /*
          * Number of bytes consumed by ARC buffers of type equal to
          * ARC_BUFC_METADATA. This is generally consumed by buffers
          * backing on disk data that is used for internal ZFS
          * structures (e.g. ZAP, dnode, indirect blocks, etc).
-         * Not updated directly; only synced in arc_kstat_update.
          */
         kstat_named_t arcstat_metadata_size;
         /*
+         * Number of bytes consumed by ARC buffers of type equal to
+         * ARC_BUFC_DDT. This is consumed by buffers backing on disk data
+         * that is used to store DDT (ZAP, ddt stats).
+         * Only used if zfs_arc_segregate_ddt is true.
+         */
+        kstat_named_t arcstat_ddt_size;
+        /*
          * Number of bytes consumed by various buffers and structures
          * not actually backed with ARC buffers. This includes bonus
          * buffers (allocated directly via zio_buf_* functions),
          * dmu_buf_impl_t structures (allocated via dmu_buf_impl_t
          * cache), and dnode_t structures (allocated via dnode_t cache).
-         * Not updated directly; only synced in arc_kstat_update.
          */
         kstat_named_t arcstat_other_size;
         /*
          * Total number of bytes consumed by ARC buffers residing in the
          * arc_anon state. This includes *all* buffers in the arc_anon
          * state; e.g. data, metadata, evictable, and unevictable buffers
          * are all included in this value.
-         * Not updated directly; only synced in arc_kstat_update.
          */
         kstat_named_t arcstat_anon_size;
         /*
          * Number of bytes consumed by ARC buffers that meet the
          * following criteria: backing buffers of type ARC_BUFC_DATA,
          * residing in the arc_anon state, and are eligible for eviction
          * (e.g. have no outstanding holds on the buffer).
-         * Not updated directly; only synced in arc_kstat_update.
          */
         kstat_named_t arcstat_anon_evictable_data;
         /*
          * Number of bytes consumed by ARC buffers that meet the
          * following criteria: backing buffers of type ARC_BUFC_METADATA,
          * residing in the arc_anon state, and are eligible for eviction
          * (e.g. have no outstanding holds on the buffer).
-         * Not updated directly; only synced in arc_kstat_update.
          */
         kstat_named_t arcstat_anon_evictable_metadata;
         /*
+         * Number of bytes consumed by ARC buffers that meet the
+         * following criteria: backing buffers of type ARC_BUFC_DDT,
+         * residing in the arc_anon state, and are eligible for eviction
+         * Only used if zfs_arc_segregate_ddt is true.
+         */
+        kstat_named_t arcstat_anon_evictable_ddt;
+        /*
          * Total number of bytes consumed by ARC buffers residing in the
          * arc_mru state. This includes *all* buffers in the arc_mru
          * state; e.g. data, metadata, evictable, and unevictable buffers
          * are all included in this value.
-         * Not updated directly; only synced in arc_kstat_update.
          */
         kstat_named_t arcstat_mru_size;
         /*
          * Number of bytes consumed by ARC buffers that meet the
          * following criteria: backing buffers of type ARC_BUFC_DATA,
          * residing in the arc_mru state, and are eligible for eviction
          * (e.g. have no outstanding holds on the buffer).
-         * Not updated directly; only synced in arc_kstat_update.
          */
         kstat_named_t arcstat_mru_evictable_data;
         /*
          * Number of bytes consumed by ARC buffers that meet the
          * following criteria: backing buffers of type ARC_BUFC_METADATA,
          * residing in the arc_mru state, and are eligible for eviction
          * (e.g. have no outstanding holds on the buffer).
-         * Not updated directly; only synced in arc_kstat_update.
          */
         kstat_named_t arcstat_mru_evictable_metadata;
         /*
+         * Number of bytes consumed by ARC buffers that meet the
+         * following criteria: backing buffers of type ARC_BUFC_DDT,
+         * residing in the arc_mru state, and are eligible for eviction
+         * (e.g. have no outstanding holds on the buffer).
+         * Only used if zfs_arc_segregate_ddt is true.
+         */
+        kstat_named_t arcstat_mru_evictable_ddt;
+        /*
          * Total number of bytes that *would have been* consumed by ARC
          * buffers in the arc_mru_ghost state. The key thing to note
          * here, is the fact that this size doesn't actually indicate
          * RAM consumption. The ghost lists only consist of headers and
          * don't actually have ARC buffers linked off of these headers.
          * Thus, *if* the headers had associated ARC buffers, these
          * buffers *would have* consumed this number of bytes.
-         * Not updated directly; only synced in arc_kstat_update.
          */
         kstat_named_t arcstat_mru_ghost_size;
         /*
          * Number of bytes that *would have been* consumed by ARC
          * buffers that are eligible for eviction, of type
          * ARC_BUFC_DATA, and linked off the arc_mru_ghost state.
-         * Not updated directly; only synced in arc_kstat_update.
          */
         kstat_named_t arcstat_mru_ghost_evictable_data;
         /*
          * Number of bytes that *would have been* consumed by ARC
          * buffers that are eligible for eviction, of type
          * ARC_BUFC_METADATA, and linked off the arc_mru_ghost state.
-         * Not updated directly; only synced in arc_kstat_update.
          */
         kstat_named_t arcstat_mru_ghost_evictable_metadata;
         /*
+         * Number of bytes that *would have been* consumed by ARC
+         * buffers that are eligible for eviction, of type
+         * ARC_BUFC_DDT, and linked off the arc_mru_ghost state.
+         * Only used if zfs_arc_segregate_ddt is true.
+         */
+        kstat_named_t arcstat_mru_ghost_evictable_ddt;
+        /*
          * Total number of bytes consumed by ARC buffers residing in the
          * arc_mfu state. This includes *all* buffers in the arc_mfu
          * state; e.g. data, metadata, evictable, and unevictable buffers
          * are all included in this value.
-         * Not updated directly; only synced in arc_kstat_update.
          */
         kstat_named_t arcstat_mfu_size;
         /*
          * Number of bytes consumed by ARC buffers that are eligible for
          * eviction, of type ARC_BUFC_DATA, and reside in the arc_mfu
          * state.
-         * Not updated directly; only synced in arc_kstat_update.
          */
         kstat_named_t arcstat_mfu_evictable_data;
         /*
          * Number of bytes consumed by ARC buffers that are eligible for
          * eviction, of type ARC_BUFC_METADATA, and reside in the
          * arc_mfu state.
-         * Not updated directly; only synced in arc_kstat_update.
          */
         kstat_named_t arcstat_mfu_evictable_metadata;
         /*
+         * Number of bytes consumed by ARC buffers that are eligible for
+         * eviction, of type ARC_BUFC_DDT, and reside in the
+         * arc_mfu state.
+         * Only used if zfs_arc_segregate_ddt is true.
+         */
+        kstat_named_t arcstat_mfu_evictable_ddt;
+        /*
          * Total number of bytes that *would have been* consumed by ARC
          * buffers in the arc_mfu_ghost state. See the comment above
          * arcstat_mru_ghost_size for more details.
-         * Not updated directly; only synced in arc_kstat_update.
          */
         kstat_named_t arcstat_mfu_ghost_size;
         /*
          * Number of bytes that *would have been* consumed by ARC
          * buffers that are eligible for eviction, of type
          * ARC_BUFC_DATA, and linked off the arc_mfu_ghost state.
-         * Not updated directly; only synced in arc_kstat_update.
          */
         kstat_named_t arcstat_mfu_ghost_evictable_data;
         /*
          * Number of bytes that *would have been* consumed by ARC
          * buffers that are eligible for eviction, of type
          * ARC_BUFC_METADATA, and linked off the arc_mru_ghost state.
-         * Not updated directly; only synced in arc_kstat_update.
          */
         kstat_named_t arcstat_mfu_ghost_evictable_metadata;
+        /*
+         * Number of bytes that *would have been* consumed by ARC
+         * buffers that are eligible for eviction, of type
+         * ARC_BUFC_DDT, and linked off the arc_mfu_ghost state.
+         * Only used if zfs_arc_segregate_ddt is true.
+         */
+        kstat_named_t arcstat_mfu_ghost_evictable_ddt;
         kstat_named_t arcstat_l2_hits;
+        kstat_named_t arcstat_l2_ddt_hits;
         kstat_named_t arcstat_l2_misses;
         kstat_named_t arcstat_l2_feeds;
         kstat_named_t arcstat_l2_rw_clash;
         kstat_named_t arcstat_l2_read_bytes;
+        kstat_named_t arcstat_l2_ddt_read_bytes;
         kstat_named_t arcstat_l2_write_bytes;
+        kstat_named_t arcstat_l2_ddt_write_bytes;
         kstat_named_t arcstat_l2_writes_sent;
         kstat_named_t arcstat_l2_writes_done;
         kstat_named_t arcstat_l2_writes_error;
         kstat_named_t arcstat_l2_writes_lock_retry;
         kstat_named_t arcstat_l2_evict_lock_retry;
@@ -656,39 +730,58 @@
         kstat_named_t arcstat_l2_abort_lowmem;
         kstat_named_t arcstat_l2_cksum_bad;
         kstat_named_t arcstat_l2_io_error;
         kstat_named_t arcstat_l2_lsize;
         kstat_named_t arcstat_l2_psize;
-        /* Not updated directly; only synced in arc_kstat_update. */
         kstat_named_t arcstat_l2_hdr_size;
+        kstat_named_t arcstat_l2_log_blk_writes;
+        kstat_named_t arcstat_l2_log_blk_avg_size;
+        kstat_named_t arcstat_l2_data_to_meta_ratio;
+        kstat_named_t arcstat_l2_rebuild_successes;
+        kstat_named_t arcstat_l2_rebuild_abort_unsupported;
+        kstat_named_t arcstat_l2_rebuild_abort_io_errors;
+        kstat_named_t arcstat_l2_rebuild_abort_cksum_errors;
+        kstat_named_t arcstat_l2_rebuild_abort_loop_errors;
+        kstat_named_t arcstat_l2_rebuild_abort_lowmem;
+        kstat_named_t arcstat_l2_rebuild_size;
+        kstat_named_t arcstat_l2_rebuild_bufs;
+        kstat_named_t arcstat_l2_rebuild_bufs_precached;
+        kstat_named_t arcstat_l2_rebuild_psize;
+        kstat_named_t arcstat_l2_rebuild_log_blks;
         kstat_named_t arcstat_memory_throttle_count;
-        /* Not updated directly; only synced in arc_kstat_update. */
         kstat_named_t arcstat_meta_used;
         kstat_named_t arcstat_meta_limit;
         kstat_named_t arcstat_meta_max;
         kstat_named_t arcstat_meta_min;
+        kstat_named_t arcstat_ddt_limit;
         kstat_named_t arcstat_sync_wait_for_async;
         kstat_named_t arcstat_demand_hit_predictive_prefetch;
 } arc_stats_t;
 
 static arc_stats_t arc_stats = {
         { "hits",                       KSTAT_DATA_UINT64 },
+        { "ddt_hits",                   KSTAT_DATA_UINT64 },
         { "misses",                     KSTAT_DATA_UINT64 },
         { "demand_data_hits",           KSTAT_DATA_UINT64 },
         { "demand_data_misses",         KSTAT_DATA_UINT64 },
         { "demand_metadata_hits",       KSTAT_DATA_UINT64 },
         { "demand_metadata_misses",     KSTAT_DATA_UINT64 },
+        { "demand_ddt_hits",            KSTAT_DATA_UINT64 },
+        { "demand_ddt_misses",          KSTAT_DATA_UINT64 },
         { "prefetch_data_hits",         KSTAT_DATA_UINT64 },
         { "prefetch_data_misses",       KSTAT_DATA_UINT64 },
         { "prefetch_metadata_hits",     KSTAT_DATA_UINT64 },
         { "prefetch_metadata_misses",   KSTAT_DATA_UINT64 },
+        { "prefetch_ddt_hits",          KSTAT_DATA_UINT64 },
+        { "prefetch_ddt_misses",        KSTAT_DATA_UINT64 },
         { "mru_hits",                   KSTAT_DATA_UINT64 },
         { "mru_ghost_hits",             KSTAT_DATA_UINT64 },
         { "mfu_hits",                   KSTAT_DATA_UINT64 },
         { "mfu_ghost_hits",             KSTAT_DATA_UINT64 },
         { "deleted",                    KSTAT_DATA_UINT64 },
         { "mutex_miss",                 KSTAT_DATA_UINT64 },
+        { "access_skip",                KSTAT_DATA_UINT64 },
         { "evict_skip",                 KSTAT_DATA_UINT64 },
         { "evict_not_enough",           KSTAT_DATA_UINT64 },
         { "evict_l2_cached",            KSTAT_DATA_UINT64 },
         { "evict_l2_eligible",          KSTAT_DATA_UINT64 },
         { "evict_l2_ineligible",        KSTAT_DATA_UINT64 },
@@ -707,32 +800,41 @@
         { "uncompressed_size",          KSTAT_DATA_UINT64 },
         { "overhead_size",              KSTAT_DATA_UINT64 },
         { "hdr_size",                   KSTAT_DATA_UINT64 },
         { "data_size",                  KSTAT_DATA_UINT64 },
         { "metadata_size",              KSTAT_DATA_UINT64 },
+        { "ddt_size",                   KSTAT_DATA_UINT64 },
         { "other_size",                 KSTAT_DATA_UINT64 },
         { "anon_size",                  KSTAT_DATA_UINT64 },
         { "anon_evictable_data",        KSTAT_DATA_UINT64 },
         { "anon_evictable_metadata",    KSTAT_DATA_UINT64 },
+        { "anon_evictable_ddt",         KSTAT_DATA_UINT64 },
         { "mru_size",                   KSTAT_DATA_UINT64 },
         { "mru_evictable_data",         KSTAT_DATA_UINT64 },
         { "mru_evictable_metadata",     KSTAT_DATA_UINT64 },
+        { "mru_evictable_ddt",          KSTAT_DATA_UINT64 },
         { "mru_ghost_size",             KSTAT_DATA_UINT64 },
         { "mru_ghost_evictable_data",   KSTAT_DATA_UINT64 },
         { "mru_ghost_evictable_metadata", KSTAT_DATA_UINT64 },
+        { "mru_ghost_evictable_ddt",    KSTAT_DATA_UINT64 },
         { "mfu_size",                   KSTAT_DATA_UINT64 },
         { "mfu_evictable_data",         KSTAT_DATA_UINT64 },
         { "mfu_evictable_metadata",     KSTAT_DATA_UINT64 },
+        { "mfu_evictable_ddt",          KSTAT_DATA_UINT64 },
         { "mfu_ghost_size",             KSTAT_DATA_UINT64 },
         { "mfu_ghost_evictable_data",   KSTAT_DATA_UINT64 },
         { "mfu_ghost_evictable_metadata", KSTAT_DATA_UINT64 },
+        { "mfu_ghost_evictable_ddt",    KSTAT_DATA_UINT64 },
         { "l2_hits",                    KSTAT_DATA_UINT64 },
+        { "l2_ddt_hits",                KSTAT_DATA_UINT64 },
         { "l2_misses",                  KSTAT_DATA_UINT64 },
         { "l2_feeds",                   KSTAT_DATA_UINT64 },
         { "l2_rw_clash",                KSTAT_DATA_UINT64 },
         { "l2_read_bytes",              KSTAT_DATA_UINT64 },
+        { "l2_ddt_read_bytes",          KSTAT_DATA_UINT64 },
         { "l2_write_bytes",             KSTAT_DATA_UINT64 },
+        { "l2_ddt_write_bytes",         KSTAT_DATA_UINT64 },
         { "l2_writes_sent",             KSTAT_DATA_UINT64 },
         { "l2_writes_done",             KSTAT_DATA_UINT64 },
         { "l2_writes_error",            KSTAT_DATA_UINT64 },
         { "l2_writes_lock_retry",       KSTAT_DATA_UINT64 },
         { "l2_evict_lock_retry",        KSTAT_DATA_UINT64 },
@@ -743,15 +845,30 @@
         { "l2_cksum_bad",               KSTAT_DATA_UINT64 },
         { "l2_io_error",                KSTAT_DATA_UINT64 },
         { "l2_size",                    KSTAT_DATA_UINT64 },
         { "l2_asize",                   KSTAT_DATA_UINT64 },
         { "l2_hdr_size",                KSTAT_DATA_UINT64 },
+        { "l2_log_blk_writes",          KSTAT_DATA_UINT64 },
+        { "l2_log_blk_avg_size",        KSTAT_DATA_UINT64 },
+        { "l2_data_to_meta_ratio",      KSTAT_DATA_UINT64 },
+        { "l2_rebuild_successes",       KSTAT_DATA_UINT64 },
+        { "l2_rebuild_unsupported",     KSTAT_DATA_UINT64 },
+        { "l2_rebuild_io_errors",       KSTAT_DATA_UINT64 },
+        { "l2_rebuild_cksum_errors",    KSTAT_DATA_UINT64 },
+        { "l2_rebuild_loop_errors",     KSTAT_DATA_UINT64 },
+        { "l2_rebuild_lowmem",          KSTAT_DATA_UINT64 },
+        { "l2_rebuild_size",            KSTAT_DATA_UINT64 },
+        { "l2_rebuild_bufs",            KSTAT_DATA_UINT64 },
+        { "l2_rebuild_bufs_precached",  KSTAT_DATA_UINT64 },
+        { "l2_rebuild_psize",           KSTAT_DATA_UINT64 },
+        { "l2_rebuild_log_blks",        KSTAT_DATA_UINT64 },
         { "memory_throttle_count",      KSTAT_DATA_UINT64 },
         { "arc_meta_used",              KSTAT_DATA_UINT64 },
         { "arc_meta_limit",             KSTAT_DATA_UINT64 },
         { "arc_meta_max",               KSTAT_DATA_UINT64 },
         { "arc_meta_min",               KSTAT_DATA_UINT64 },
+        { "arc_ddt_limit",              KSTAT_DATA_UINT64 },
         { "sync_wait_for_async",        KSTAT_DATA_UINT64 },
         { "demand_hit_predictive_prefetch", KSTAT_DATA_UINT64 },
 };
 
 #define ARCSTAT(stat)   (arc_stats.stat.value.ui64)
@@ -778,22 +895,40 @@
  * each of hits and misses (so eight statistics total).
  */
 #define ARCSTAT_CONDSTAT(cond1, stat1, notstat1, cond2, stat2, notstat2, stat) \
         if (cond1) {                                                    \
                 if (cond2) {                                            \
-                        ARCSTAT_BUMP(arcstat_##stat1##_##stat2##_##stat); \
+                        ARCSTAT_BUMP(arcstat_##stat1##_##stat##_##stat2); \
                 } else {                                                \
-                        ARCSTAT_BUMP(arcstat_##stat1##_##notstat2##_##stat); \
+                        ARCSTAT_BUMP(arcstat_##stat1##_##stat##_##notstat2); \
                 }                                                       \
         } else {                                                        \
                 if (cond2) {                                            \
-                        ARCSTAT_BUMP(arcstat_##notstat1##_##stat2##_##stat); \
+                        ARCSTAT_BUMP(arcstat_##notstat1##_##stat##_##stat2); \
                 } else {                                                \
-                        ARCSTAT_BUMP(arcstat_##notstat1##_##notstat2##_##stat);\
+                        ARCSTAT_BUMP(arcstat_##notstat1##_##stat##_##notstat2);\
                 }                                                       \
         }
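The reordered token pasting above puts the buffer type in the middle of the counter name (`arcstat_demand_ddt_hits` rather than `arcstat_demand_hits_ddt`). The following is a minimal standalone sketch of that expansion, not code from this change: `demo_stats`, `DEMO_BUMP`, `DEMO_CONDSTAT` and `demo_update_ddt_stat` are hypothetical stand-ins, reduced to the four DDT counters.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for arc_stats_t, reduced to the four DDT counters. */
static struct {
	uint64_t arcstat_demand_ddt_hits;
	uint64_t arcstat_demand_ddt_misses;
	uint64_t arcstat_prefetch_ddt_hits;
	uint64_t arcstat_prefetch_ddt_misses;
} demo_stats;

#define	DEMO_BUMP(stat)	(demo_stats.stat++)

/* Same pasting order as the new ARCSTAT_CONDSTAT: prefix, type, outcome. */
#define	DEMO_CONDSTAT(cond1, stat1, notstat1, cond2, stat2, notstat2, stat) \
	if (cond1) {							\
		if (cond2) {						\
			DEMO_BUMP(arcstat_##stat1##_##stat##_##stat2);	\
		} else {						\
			DEMO_BUMP(arcstat_##stat1##_##stat##_##notstat2); \
		}							\
	} else {							\
		if (cond2) {						\
			DEMO_BUMP(arcstat_##notstat1##_##stat##_##stat2); \
		} else {						\
			DEMO_BUMP(arcstat_##notstat1##_##stat##_##notstat2); \
		}							\
	}

/* Bump exactly one of the four counters for a DDT buffer access. */
static void
demo_update_ddt_stat(int is_demand, int is_hit)
{
	DEMO_CONDSTAT(is_demand, demand, prefetch, is_hit, hits, misses, ddt);
}
```

Calling `demo_update_ddt_stat(1, 1)` increments `arcstat_demand_ddt_hits`; the other three combinations map to the remaining counters in the same way.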
 
+/*
+ * This macro allows us to use kstats as floating averages. Each time we
+ * update this kstat, we drop 1/ARCSTAT_F_AVG_FACTOR of the old value and add
+ * 1/ARCSTAT_F_AVG_FACTOR of the update value, which shrinks the new value's
+ * contribution to the overall average. This macro assumes that integer loads
+ * and stores are atomic, but is not safe for multiple writers updating the
+ * kstat in parallel (only the last writer's update will remain).
+#define ARCSTAT_F_AVG_FACTOR    3
+#define ARCSTAT_F_AVG(stat, value) \
+        do { \
+                uint64_t x = ARCSTAT(stat); \
+                x = x - x / ARCSTAT_F_AVG_FACTOR + \
+                    (value) / ARCSTAT_F_AVG_FACTOR; \
+                ARCSTAT(stat) = x; \
+                _NOTE(CONSTCOND) \
+        } while (0)
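To make the arithmetic of the floating average concrete, here is a minimal sketch of a single update step as a plain function. `demo_f_avg_step` and `DEMO_F_AVG_FACTOR` are hypothetical names that mirror the macro body; the sketch uses the same integer division and so shows the same truncation.

```c
#include <assert.h>
#include <stdint.h>

#define	DEMO_F_AVG_FACTOR	3	/* mirrors ARCSTAT_F_AVG_FACTOR */

/*
 * One update step: keep (1 - 1/FACTOR) of the old average and add
 * 1/FACTOR of the new sample. All divisions truncate.
 */
static uint64_t
demo_f_avg_step(uint64_t avg, uint64_t value)
{
	return (avg - avg / DEMO_F_AVG_FACTOR + value / DEMO_F_AVG_FACTOR);
}
```

Feeding a constant sample repeatedly moves the average toward that sample by roughly one third of the remaining distance per step; integer truncation can leave it slightly short of the sample.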
+
 kstat_t                 *arc_ksp;
 static arc_state_t      *arc_anon;
 static arc_state_t      *arc_mru;
 static arc_state_t      *arc_mru_ghost;
 static arc_state_t      *arc_mfu;
@@ -806,40 +941,34 @@
  * manipulate them.  For these variables, we therefore define them to be in
  * terms of the statistic variable.  This assures that we are not introducing
  * the possibility of inconsistency by having shadow copies of the variables,
  * while still allowing the code to be readable.
  */
+#define arc_size        ARCSTAT(arcstat_size)   /* actual total arc size */
 #define arc_p           ARCSTAT(arcstat_p)      /* target size of MRU */
 #define arc_c           ARCSTAT(arcstat_c)      /* target size of cache */
 #define arc_c_min       ARCSTAT(arcstat_c_min)  /* min target cache size */
 #define arc_c_max       ARCSTAT(arcstat_c_max)  /* max target cache size */
 #define arc_meta_limit  ARCSTAT(arcstat_meta_limit) /* max size for metadata */
 #define arc_meta_min    ARCSTAT(arcstat_meta_min) /* min size for metadata */
+#define arc_meta_used   ARCSTAT(arcstat_meta_used) /* size of metadata */
 #define arc_meta_max    ARCSTAT(arcstat_meta_max) /* max size of metadata */
+#define arc_ddt_size    ARCSTAT(arcstat_ddt_size) /* ddt size in arc */
+#define arc_ddt_limit   ARCSTAT(arcstat_ddt_limit) /* ddt in arc size limit */
 
+/*
+ * Used in zio.c to optionally keep DDT cached in ARC.
+ */
+uint64_t const *arc_ddt_evict_threshold;
+
 /* compressed size of entire arc */
 #define arc_compressed_size     ARCSTAT(arcstat_compressed_size)
 /* uncompressed size of entire arc */
 #define arc_uncompressed_size   ARCSTAT(arcstat_uncompressed_size)
 /* number of bytes in the arc from arc_buf_t's */
 #define arc_overhead_size       ARCSTAT(arcstat_overhead_size)
 
-/*
- * There are also some ARC variables that we want to export, but that are
- * updated so often that having the canonical representation be the statistic
- * variable causes a performance bottleneck. We want to use aggsum_t's for these
- * instead, but still be able to export the kstat in the same way as before.
- * The solution is to always use the aggsum version, except in the kstat update
- * callback.
- */
-aggsum_t arc_size;
-aggsum_t arc_meta_used;
-aggsum_t astat_data_size;
-aggsum_t astat_metadata_size;
-aggsum_t astat_hdr_size;
-aggsum_t astat_other_size;
-aggsum_t astat_l2_hdr_size;
 
 static int              arc_no_grow;    /* Don't try to grow cache size */
 static uint64_t         arc_tempreserve;
 static uint64_t         arc_loaned_bytes;
 
@@ -896,20 +1025,22 @@
  * words in pointers. arc_hdr_realloc() is used to switch a header between
  * these two allocation states.
  */
 typedef struct l1arc_buf_hdr {
         kmutex_t                b_freeze_lock;
-        zio_cksum_t             *b_freeze_cksum;
 #ifdef ZFS_DEBUG
         /*
          * Used for debugging with kmem_flags - by allocating and freeing
          * b_thawed when the buffer is thawed, we get a record of the stack
          * trace that thawed it.
          */
         void                    *b_thawed;
 #endif
 
+        /* number of krrp tasks using this buffer */
+        uint64_t                b_krrp;
+
         arc_buf_t               *b_buf;
         uint32_t                b_bufcnt;
         /* for waiting on writes to complete */
         kcondvar_t              b_cv;
         uint8_t                 b_byteswap;
@@ -941,10 +1072,18 @@
 struct arc_buf_hdr {
         /* protected by hash lock */
         dva_t                   b_dva;
         uint64_t                b_birth;
 
+        /*
+         * Even though this checksum is only set/verified when a buffer is in
+         * the L1 cache, it needs to be in the set of common fields because it
+         * must be preserved from the time before a buffer is written out to
+         * L2ARC until after it is read back in.
+         */
+        zio_cksum_t             *b_freeze_cksum;
+
         arc_buf_contents_t      b_type;
         arc_buf_hdr_t           *b_hash_next;
         arc_flags_t             b_flags;
 
         /*
@@ -998,13 +1137,16 @@
 #define HDR_L2_WRITING(hdr)     ((hdr)->b_flags & ARC_FLAG_L2_WRITING)
 #define HDR_L2_EVICTED(hdr)     ((hdr)->b_flags & ARC_FLAG_L2_EVICTED)
 #define HDR_L2_WRITE_HEAD(hdr)  ((hdr)->b_flags & ARC_FLAG_L2_WRITE_HEAD)
 #define HDR_SHARED_DATA(hdr)    ((hdr)->b_flags & ARC_FLAG_SHARED_DATA)
 
+#define HDR_ISTYPE_DDT(hdr)     \
+            ((hdr)->b_flags & ARC_FLAG_BUFC_DDT)
 #define HDR_ISTYPE_METADATA(hdr)        \
         ((hdr)->b_flags & ARC_FLAG_BUFC_METADATA)
-#define HDR_ISTYPE_DATA(hdr)    (!HDR_ISTYPE_METADATA(hdr))
+#define HDR_ISTYPE_DATA(hdr)    (!HDR_ISTYPE_METADATA(hdr) && \
+        !HDR_ISTYPE_DDT(hdr))
 
 #define HDR_HAS_L1HDR(hdr)      ((hdr)->b_flags & ARC_FLAG_HAS_L1HDR)
 #define HDR_HAS_L2HDR(hdr)      ((hdr)->b_flags & ARC_FLAG_HAS_L2HDR)
 
 /* For storing compression mode in b_flags */
@@ -1028,32 +1170,26 @@
 
 /*
  * Hash table routines
  */
 
-#define HT_LOCK_PAD     64
-
-struct ht_lock {
-        kmutex_t        ht_lock;
-#ifdef _KERNEL
-        unsigned char   pad[(HT_LOCK_PAD - sizeof (kmutex_t))];
-#endif
+struct ht_table {
+        arc_buf_hdr_t   *hdr;
+        kmutex_t        lock;
 };
 
-#define BUF_LOCKS 256
 typedef struct buf_hash_table {
         uint64_t ht_mask;
-        arc_buf_hdr_t **ht_table;
-        struct ht_lock ht_locks[BUF_LOCKS];
+        struct ht_table *ht_table;
 } buf_hash_table_t;
 
+#pragma align 64(buf_hash_table)
 static buf_hash_table_t buf_hash_table;
 
 #define BUF_HASH_INDEX(spa, dva, birth) \
         (buf_hash(spa, dva, birth) & buf_hash_table.ht_mask)
-#define BUF_HASH_LOCK_NTRY(idx) (buf_hash_table.ht_locks[idx & (BUF_LOCKS-1)])
-#define BUF_HASH_LOCK(idx)      (&(BUF_HASH_LOCK_NTRY(idx).ht_lock))
+#define BUF_HASH_LOCK(idx) (&buf_hash_table.ht_table[idx].lock)
 #define HDR_LOCK(hdr) \
         (BUF_HASH_LOCK(BUF_HASH_INDEX(hdr->b_spa, &hdr->b_dva, hdr->b_birth)))
 
 uint64_t zfs_crc64_table[256];
 
@@ -1083,31 +1219,15 @@
 uint64_t l2arc_feed_min_ms = L2ARC_FEED_MIN_MS; /* min interval milliseconds */
 boolean_t l2arc_noprefetch = B_TRUE;            /* don't cache prefetch bufs */
 boolean_t l2arc_feed_again = B_TRUE;            /* turbo warmup */
 boolean_t l2arc_norw = B_TRUE;                  /* no reads during writes */
 
-/*
- * L2ARC Internals
- */
-struct l2arc_dev {
-        vdev_t                  *l2ad_vdev;     /* vdev */
-        spa_t                   *l2ad_spa;      /* spa */
-        uint64_t                l2ad_hand;      /* next write location */
-        uint64_t                l2ad_start;     /* first addr on device */
-        uint64_t                l2ad_end;       /* last addr on device */
-        boolean_t               l2ad_first;     /* first sweep through */
-        boolean_t               l2ad_writing;   /* currently writing */
-        kmutex_t                l2ad_mtx;       /* lock for buffer list */
-        list_t                  l2ad_buflist;   /* buffer list */
-        list_node_t             l2ad_node;      /* device list node */
-        refcount_t              l2ad_alloc;     /* allocated bytes */
-};
-
 static list_t L2ARC_dev_list;                   /* device list */
 static list_t *l2arc_dev_list;                  /* device list pointer */
 static kmutex_t l2arc_dev_mtx;                  /* device list mutex */
 static l2arc_dev_t *l2arc_dev_last;             /* last device used */
+static l2arc_dev_t *l2arc_ddt_dev_last;         /* last DDT device used */
 static list_t L2ARC_free_on_write;              /* free after write buf list */
 static list_t *l2arc_free_on_write;             /* free after write list ptr */
 static kmutex_t l2arc_free_on_write_mtx;        /* mutex for list */
 static uint64_t l2arc_ndev;                     /* number of devices */
 
@@ -1120,10 +1240,11 @@
 } l2arc_read_callback_t;
 
 typedef struct l2arc_write_callback {
         l2arc_dev_t     *l2wcb_dev;             /* device info */
         arc_buf_hdr_t   *l2wcb_head;            /* head of write buflist */
+        list_t          l2wcb_log_blk_buflist;  /* in-flight log blocks */
 } l2arc_write_callback_t;
 
 typedef struct l2arc_data_free {
         /* protected by l2arc_free_on_write_mtx */
         abd_t           *l2df_abd;
@@ -1145,28 +1266,325 @@
 static void arc_hdr_free_pabd(arc_buf_hdr_t *);
 static void arc_hdr_alloc_pabd(arc_buf_hdr_t *);
 static void arc_access(arc_buf_hdr_t *, kmutex_t *);
 static boolean_t arc_is_overflowing();
 static void arc_buf_watch(arc_buf_t *);
+static l2arc_dev_t *l2arc_vdev_get(vdev_t *vd);
 
 static arc_buf_contents_t arc_buf_type(arc_buf_hdr_t *);
 static uint32_t arc_bufc_to_flags(arc_buf_contents_t);
+static arc_buf_contents_t arc_flags_to_bufc(uint32_t);
 static inline void arc_hdr_set_flags(arc_buf_hdr_t *hdr, arc_flags_t flags);
 static inline void arc_hdr_clear_flags(arc_buf_hdr_t *hdr, arc_flags_t flags);
 
 static boolean_t l2arc_write_eligible(uint64_t, arc_buf_hdr_t *);
 static void l2arc_read_done(zio_t *);
 
+static void
+arc_update_hit_stat(arc_buf_hdr_t *hdr, boolean_t hit)
+{
+        boolean_t pf = !HDR_PREFETCH(hdr);
+        switch (arc_buf_type(hdr)) {
+        case ARC_BUFC_DATA:
+                ARCSTAT_CONDSTAT(pf, demand, prefetch, hit, hits, misses, data);
+                break;
+        case ARC_BUFC_METADATA:
+                ARCSTAT_CONDSTAT(pf, demand, prefetch, hit, hits, misses,
+                    metadata);
+                break;
+        case ARC_BUFC_DDT:
+                ARCSTAT_CONDSTAT(pf, demand, prefetch, hit, hits, misses, ddt);
+                break;
+        default:
+                break;
+        }
+}
 
+enum {
+        L2ARC_DEV_HDR_EVICT_FIRST = (1 << 0)    /* mirror of l2ad_first */
+};
+
 /*
- * We use Cityhash for this. It's fast, and has good hash properties without
- * requiring any large static buffers.
+ * Pointer used in persistent L2ARC (for pointing to log blocks & ARC buffers).
  */
-static uint64_t
+typedef struct l2arc_log_blkptr {
+        uint64_t        lbp_daddr;      /* device address of log */
+        /*
+         * lbp_prop is the same format as the blk_prop in blkptr_t:
+         *      * logical size (in sectors)
+         *      * physical size (in sectors)
+         *      * checksum algorithm (used for lbp_cksum)
+         *      * object type & level (unused for now)
+         */
+        uint64_t        lbp_prop;
+        zio_cksum_t     lbp_cksum;      /* fletcher4 of log */
+} l2arc_log_blkptr_t;
+
+/*
+ * The persistent L2ARC device header.
+ * Byte order of magic determines whether 64-bit bswap of fields is necessary.
+ */
+typedef struct l2arc_dev_hdr_phys {
+        uint64_t        dh_magic;       /* L2ARC_DEV_HDR_MAGIC_Vx */
+        zio_cksum_t     dh_self_cksum;  /* fletcher4 of fields below */
+
+        /*
+         * Global L2ARC device state and metadata.
+         */
+        uint64_t        dh_spa_guid;
+        uint64_t        dh_alloc_space;         /* vdev space alloc status */
+        uint64_t        dh_flags;               /* l2arc_dev_hdr_flags_t */
+
+        /*
+         * Start of log block chain. [0] -> newest log, [1] -> one older (used
+         * for initiating prefetch).
+         */
+        l2arc_log_blkptr_t      dh_start_lbps[2];
+
+        const uint64_t  dh_pad[44];             /* pad to 512 bytes */
+} l2arc_dev_hdr_phys_t;
+CTASSERT(sizeof (l2arc_dev_hdr_phys_t) == SPA_MINBLOCKSIZE);
+
+/*
+ * A single ARC buffer header entry in a l2arc_log_blk_phys_t.
+ */
+typedef struct l2arc_log_ent_phys {
+        dva_t                   le_dva; /* dva of buffer */
+        uint64_t                le_birth;       /* birth txg of buffer */
+        zio_cksum_t             le_freeze_cksum;
+        /*
+         * le_prop is the same format as the blk_prop in blkptr_t:
+         *      * logical size (in sectors)
+         *      * physical size (in sectors)
+         *      * checksum algorithm (used for b_freeze_cksum)
+         *      * object type & level (used to restore arc_buf_contents_t)
+         */
+        uint64_t                le_prop;
+        uint64_t                le_daddr;       /* buf location on l2dev */
+        const uint64_t          le_pad[7];      /* resv'd for future use */
+} l2arc_log_ent_phys_t;
+
+/*
+ * These design limits give us the following metadata overhead (before
+ * compression):
+ *      avg_blk_sz      overhead
+ *      1k              12.51 %
+ *      2k               6.26 %
+ *      4k               3.13 %
+ *      8k               1.56 %
+ *      16k              0.78 %
+ *      32k              0.39 %
+ *      64k              0.20 %
+ *      128k             0.10 %
+ * Compression should be able to squeeze these down by about a factor of 2.
+ */
+#define L2ARC_LOG_BLK_SIZE                      (128 * 1024)    /* 128k */
+#define L2ARC_LOG_BLK_HEADER_LEN                (128)
+#define L2ARC_LOG_BLK_ENTRIES                   /* 1023 entries */      \
+        ((L2ARC_LOG_BLK_SIZE - L2ARC_LOG_BLK_HEADER_LEN) /              \
+        sizeof (l2arc_log_ent_phys_t))
+/*
+ * Maximum amount of data in an l2arc log block (used to terminate rebuilding
+ * before we hit the write head and restore potentially corrupted blocks).
+ */
+#define L2ARC_LOG_BLK_MAX_PAYLOAD_SIZE  \
+        (SPA_MAXBLOCKSIZE * L2ARC_LOG_BLK_ENTRIES)
+/*
+ * For the persistency and rebuild algorithms to operate reliably we need
+ * the L2ARC device to at least be able to hold 3 full log blocks (otherwise
+ * excessive log block looping might confuse the log chain end detection).
+ * Under normal circumstances this is not a problem, since this is somewhere
+ * around only 400 MB.
+ */
+#define L2ARC_PERSIST_MIN_SIZE  (3 * L2ARC_LOG_BLK_MAX_PAYLOAD_SIZE)
+
+/*
+ * A log block of up to 1023 ARC buffer log entries, chained into the
+ * persistent L2ARC metadata linked list. Byte order of magic determines
+ * whether 64-bit bswap of fields is necessary.
+ */
+typedef struct l2arc_log_blk_phys {
+        /* Header - see L2ARC_LOG_BLK_HEADER_LEN above */
+        uint64_t                lb_magic;       /* L2ARC_LOG_BLK_MAGIC */
+        l2arc_log_blkptr_t      lb_back2_lbp;   /* back 2 steps in chain */
+        uint64_t                lb_pad[9];      /* resv'd for future use */
+        /* Payload */
+        l2arc_log_ent_phys_t    lb_entries[L2ARC_LOG_BLK_ENTRIES];
+} l2arc_log_blk_phys_t;
+
+CTASSERT(sizeof (l2arc_log_blk_phys_t) == L2ARC_LOG_BLK_SIZE);
+CTASSERT(offsetof(l2arc_log_blk_phys_t, lb_entries) -
+    offsetof(l2arc_log_blk_phys_t, lb_magic) == L2ARC_LOG_BLK_HEADER_LEN);
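The entry count and the overhead table above can be checked with a short standalone sketch. The `demo_` types are hypothetical stand-ins for the on-disk structures; the sketch assumes `dva_t` is two 64-bit words and `zio_cksum_t` is four, which makes each log entry 128 bytes.

```c
#include <assert.h>
#include <stdint.h>

/* Stand-ins for dva_t and zio_cksum_t; 16 and 32 bytes are assumptions. */
typedef struct { uint64_t dva_word[2]; } demo_dva_t;
typedef struct { uint64_t zc_word[4]; } demo_cksum_t;

/* Same field layout as l2arc_log_ent_phys_t. */
typedef struct demo_log_ent {
	demo_dva_t	le_dva;
	uint64_t	le_birth;
	demo_cksum_t	le_freeze_cksum;
	uint64_t	le_prop;
	uint64_t	le_daddr;
	uint64_t	le_pad[7];
} demo_log_ent_t;

#define	DEMO_LOG_BLK_SIZE	(128 * 1024)
#define	DEMO_LOG_BLK_HEADER_LEN	128
#define	DEMO_LOG_BLK_ENTRIES	\
	((DEMO_LOG_BLK_SIZE - DEMO_LOG_BLK_HEADER_LEN) / sizeof (demo_log_ent_t))

_Static_assert(sizeof (demo_log_ent_t) == 128, "entry should be 128 bytes");

/*
 * Metadata overhead in percent: one full log block describes
 * DEMO_LOG_BLK_ENTRIES buffers of avg_blk_sz bytes each.
 */
static double
demo_log_overhead_pct(uint64_t avg_blk_sz)
{
	return (100.0 * DEMO_LOG_BLK_SIZE /
	    ((double)DEMO_LOG_BLK_ENTRIES * (double)avg_blk_sz));
}
```

With these assumptions a log block holds 1023 entries, and `demo_log_overhead_pct(1024)` comes out at about 12.51, in line with the first row of the table.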
+
+/*
+ * These structures hold in-flight l2arc_log_blk_phys_t's as they're being
+ * written to the L2ARC device. They may be compressed, hence the uint8_t[].
+ */
+typedef struct l2arc_log_blk_buf {
+        uint8_t         lbb_log_blk[sizeof (l2arc_log_blk_phys_t)];
+        list_node_t     lbb_node;
+} l2arc_log_blk_buf_t;
+
+/* Macros for manipulating fields in the blk_prop format of blkptr_t */
+#define BLKPROP_GET_LSIZE(_obj, _field)         \
+        BF64_GET_SB((_obj)->_field, 0, 16, SPA_MINBLOCKSHIFT, 1)
+#define BLKPROP_SET_LSIZE(_obj, _field, x)      \
+        BF64_SET_SB((_obj)->_field, 0, 16, SPA_MINBLOCKSHIFT, 1, x)
+#define BLKPROP_GET_PSIZE(_obj, _field)         \
+        BF64_GET_SB((_obj)->_field, 16, 16, SPA_MINBLOCKSHIFT, 0)
+#define BLKPROP_SET_PSIZE(_obj, _field, x)      \
+        BF64_SET_SB((_obj)->_field, 16, 16, SPA_MINBLOCKSHIFT, 0, x)
+#define BLKPROP_GET_COMPRESS(_obj, _field)      \
+        BF64_GET((_obj)->_field, 32, 7)
+#define BLKPROP_SET_COMPRESS(_obj, _field, x)   \
+        BF64_SET((_obj)->_field, 32, 7, x)
+#define BLKPROP_GET_ARC_COMPRESS(_obj, _field)  \
+        BF64_GET((_obj)->_field, 39, 1)
+#define BLKPROP_SET_ARC_COMPRESS(_obj, _field, x)       \
+        BF64_SET((_obj)->_field, 39, 1, x)
+#define BLKPROP_GET_CHECKSUM(_obj, _field)      \
+        BF64_GET((_obj)->_field, 40, 8)
+#define BLKPROP_SET_CHECKSUM(_obj, _field, x)   \
+        BF64_SET((_obj)->_field, 40, 8, x)
+#define BLKPROP_GET_TYPE(_obj, _field)          \
+        BF64_GET((_obj)->_field, 48, 8)
+#define BLKPROP_SET_TYPE(_obj, _field, x)       \
+        BF64_SET((_obj)->_field, 48, 8, x)
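As an illustration of the bit layout these macros encode, the sketch below packs and unpacks a property word with plain functions. It is not the BF64 implementation: the `demo_` helpers are hypothetical. It assumes the shift/bias convention that sizes are stored in 512-byte sectors, with a bias of 1 for the logical size and 0 for the physical size, as the macro arguments above suggest.

```c
#include <assert.h>
#include <stdint.h>

#define	DEMO_MINBLOCKSHIFT	9	/* 512-byte sectors */

/* Extract len bits starting at bit position low. */
static uint64_t
demo_bf_get(uint64_t x, int low, int len)
{
	return ((x >> low) & ((1ULL << len) - 1));
}

/* Return x with len bits at position low replaced by val. */
static uint64_t
demo_bf_set(uint64_t x, int low, int len, uint64_t val)
{
	uint64_t mask = ((1ULL << len) - 1) << low;

	return ((x & ~mask) | ((val << low) & mask));
}

/* Logical size: bits 0-15, stored as (sectors - 1). */
static uint64_t
demo_set_lsize(uint64_t prop, uint64_t bytes)
{
	return (demo_bf_set(prop, 0, 16, (bytes >> DEMO_MINBLOCKSHIFT) - 1));
}

static uint64_t
demo_get_lsize(uint64_t prop)
{
	return ((demo_bf_get(prop, 0, 16) + 1) << DEMO_MINBLOCKSHIFT);
}

/* Physical size: bits 16-31, stored as sectors with no bias. */
static uint64_t
demo_set_psize(uint64_t prop, uint64_t bytes)
{
	return (demo_bf_set(prop, 16, 16, bytes >> DEMO_MINBLOCKSHIFT));
}

static uint64_t
demo_get_psize(uint64_t prop)
{
	return (demo_bf_get(prop, 16, 16) << DEMO_MINBLOCKSHIFT);
}

/* Object type: bits 48-55. */
static uint64_t
demo_set_type(uint64_t prop, uint64_t type)
{
	return (demo_bf_set(prop, 48, 8, type));
}

static uint64_t
demo_get_type(uint64_t prop)
{
	return (demo_bf_get(prop, 48, 8));
}
```

The fields occupy disjoint bit ranges, so setting one leaves the others untouched, and a 128 KiB logical size is stored as the value 255 in the low 16 bits.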
+
+/* Macros for manipulating a l2arc_log_blkptr_t->lbp_prop field */
+#define LBP_GET_LSIZE(_add)             BLKPROP_GET_LSIZE(_add, lbp_prop)
+#define LBP_SET_LSIZE(_add, x)          BLKPROP_SET_LSIZE(_add, lbp_prop, x)
+#define LBP_GET_PSIZE(_add)             BLKPROP_GET_PSIZE(_add, lbp_prop)
+#define LBP_SET_PSIZE(_add, x)          BLKPROP_SET_PSIZE(_add, lbp_prop, x)
+#define LBP_GET_COMPRESS(_add)          BLKPROP_GET_COMPRESS(_add, lbp_prop)
+#define LBP_SET_COMPRESS(_add, x)       BLKPROP_SET_COMPRESS(_add, lbp_prop, x)
+#define LBP_GET_CHECKSUM(_add)          BLKPROP_GET_CHECKSUM(_add, lbp_prop)
+#define LBP_SET_CHECKSUM(_add, x)       BLKPROP_SET_CHECKSUM(_add, lbp_prop, x)
+#define LBP_GET_TYPE(_add)              BLKPROP_GET_TYPE(_add, lbp_prop)
+#define LBP_SET_TYPE(_add, x)           BLKPROP_SET_TYPE(_add, lbp_prop, x)
+
+/* Macros for manipulating a l2arc_log_ent_phys_t->le_prop field */
+#define LE_GET_LSIZE(_le)       BLKPROP_GET_LSIZE(_le, le_prop)
+#define LE_SET_LSIZE(_le, x)    BLKPROP_SET_LSIZE(_le, le_prop, x)
+#define LE_GET_PSIZE(_le)       BLKPROP_GET_PSIZE(_le, le_prop)
+#define LE_SET_PSIZE(_le, x)    BLKPROP_SET_PSIZE(_le, le_prop, x)
+#define LE_GET_COMPRESS(_le)    BLKPROP_GET_COMPRESS(_le, le_prop)
+#define LE_SET_COMPRESS(_le, x) BLKPROP_SET_COMPRESS(_le, le_prop, x)
+#define LE_GET_ARC_COMPRESS(_le)        BLKPROP_GET_ARC_COMPRESS(_le, le_prop)
+#define LE_SET_ARC_COMPRESS(_le, x)     BLKPROP_SET_ARC_COMPRESS(_le, le_prop, x)
+#define LE_GET_CHECKSUM(_le)    BLKPROP_GET_CHECKSUM(_le, le_prop)
+#define LE_SET_CHECKSUM(_le, x) BLKPROP_SET_CHECKSUM(_le, le_prop, x)
+#define LE_GET_TYPE(_le)        BLKPROP_GET_TYPE(_le, le_prop)
+#define LE_SET_TYPE(_le, x)     BLKPROP_SET_TYPE(_le, le_prop, x)
+
+#define PTR_SWAP(x, y)          \
+        do {                    \
+                void *tmp = (x);\
+                x = y;          \
+                y = tmp;        \
+                _NOTE(CONSTCOND)\
+        } while (0)
+
+/*
+ * Sadly, after compressed ARC integration older kernels would panic
+ * when trying to rebuild persistent L2ARC created by the new code.
+ */
+#define L2ARC_DEV_HDR_MAGIC_V1  0x4c32415243763031LLU   /* ASCII: "L2ARCv01" */
+#define L2ARC_LOG_BLK_MAGIC     0x4c4f47424c4b4844LLU   /* ASCII: "LOGBLKHD" */
+
+/*
+ * Performance tuning of L2ARC persistency:
+ *
+ * l2arc_rebuild_enabled : Controls whether L2ARC device adds (either at
+ *              pool import or when adding one manually later) will attempt
+ *              to rebuild L2ARC buffer contents. In special circumstances,
+ *              the administrator may want to set this to B_FALSE, if they
+ *              are having trouble importing a pool or attaching an L2ARC
+ *              device (e.g. the L2ARC device is slow to read in stored log
+ *              metadata, or the metadata has become somehow
+ *              fragmented/unusable).
+ */
+boolean_t l2arc_rebuild_enabled = B_TRUE;
+
+/* L2ARC persistency rebuild control routines. */
+static void l2arc_dev_rebuild_start(l2arc_dev_t *dev);
+static int l2arc_rebuild(l2arc_dev_t *dev);
+
+/* L2ARC persistency read I/O routines. */
+static int l2arc_dev_hdr_read(l2arc_dev_t *dev);
+static int l2arc_log_blk_read(l2arc_dev_t *dev,
+    const l2arc_log_blkptr_t *this_lp, const l2arc_log_blkptr_t *next_lp,
+    l2arc_log_blk_phys_t *this_lb, l2arc_log_blk_phys_t *next_lb,
+    uint8_t *this_lb_buf, uint8_t *next_lb_buf,
+    zio_t *this_io, zio_t **next_io);
+static zio_t *l2arc_log_blk_prefetch(vdev_t *vd,
+    const l2arc_log_blkptr_t *lp, uint8_t *lb_buf);
+static void l2arc_log_blk_prefetch_abort(zio_t *zio);
+
+/* L2ARC persistency block restoration routines. */
+static void l2arc_log_blk_restore(l2arc_dev_t *dev, uint64_t load_guid,
+    const l2arc_log_blk_phys_t *lb, uint64_t lb_psize);
+static void l2arc_hdr_restore(const l2arc_log_ent_phys_t *le,
+    l2arc_dev_t *dev, uint64_t guid);
+
+/* L2ARC persistency write I/O routines. */
+static void l2arc_dev_hdr_update(l2arc_dev_t *dev, zio_t *pio);
+static void l2arc_log_blk_commit(l2arc_dev_t *dev, zio_t *pio,
+    l2arc_write_callback_t *cb);
+
+/* L2ARC persistency auxiliary routines. */
+static boolean_t l2arc_log_blkptr_valid(l2arc_dev_t *dev,
+    const l2arc_log_blkptr_t *lp);
+static void l2arc_dev_hdr_checksum(const l2arc_dev_hdr_phys_t *hdr,
+    zio_cksum_t *cksum);
+static boolean_t l2arc_log_blk_insert(l2arc_dev_t *dev,
+    const arc_buf_hdr_t *ab);
+static inline boolean_t l2arc_range_check_overlap(uint64_t bottom,
+    uint64_t top, uint64_t check);
+
+/*
+ * L2ARC Internals
+ */
+struct l2arc_dev {
+        vdev_t                  *l2ad_vdev;     /* vdev */
+        spa_t                   *l2ad_spa;      /* spa */
+        uint64_t                l2ad_hand;      /* next write location */
+        uint64_t                l2ad_start;     /* first addr on device */
+        uint64_t                l2ad_end;       /* last addr on device */
+        boolean_t               l2ad_first;     /* first sweep through */
+        boolean_t               l2ad_writing;   /* currently writing */
+        kmutex_t                l2ad_mtx;       /* lock for buffer list */
+        list_t                  l2ad_buflist;   /* buffer list */
+        list_node_t             l2ad_node;      /* device list node */
+        refcount_t              l2ad_alloc;     /* allocated bytes */
+        l2arc_dev_hdr_phys_t    *l2ad_dev_hdr;  /* persistent device header */
+        uint64_t                l2ad_dev_hdr_asize; /* aligned hdr size */
+        l2arc_log_blk_phys_t    l2ad_log_blk;   /* currently open log block */
+        int                     l2ad_log_ent_idx; /* index into cur log blk */
+        /* number of bytes in current log block's payload */
+        uint64_t                l2ad_log_blk_payload_asize;
+        /* flag indicating whether a rebuild is scheduled or is going on */
+        boolean_t               l2ad_rebuild;
+        boolean_t               l2ad_rebuild_cancel;
+        kt_did_t                l2ad_rebuild_did;
+};
+
+static inline uint64_t
 buf_hash(uint64_t spa, const dva_t *dva, uint64_t birth)
 {
-        return (cityhash4(spa, dva->dva_word[0], dva->dva_word[1], birth));
+        uint8_t *vdva = (uint8_t *)dva;
+        uint64_t crc = -1ULL;
+        int i;
+
+        ASSERT(zfs_crc64_table[128] == ZFS_CRC64_POLY);
+
+        for (i = 0; i < sizeof (dva_t); i++)
+                crc = (crc >> 8) ^ zfs_crc64_table[(crc ^ vdva[i]) & 0xFF];
+
+        crc ^= (spa>>8) ^ birth;
+
+        return (crc);
 }
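The CRC-based `buf_hash()` above depends on `zfs_crc64_table` having been filled in by `buf_init()`. The sketch below combines the table setup and the hash in one standalone unit so the two can be read together. The `demo_` names are hypothetical, `demo_dva_t` assumes a DVA is two 64-bit words, and the polynomial constant is assumed to be the value of `ZFS_CRC64_POLY`.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed value of ZFS_CRC64_POLY (reflected ECMA-182 polynomial). */
#define	DEMO_CRC64_POLY	0xC96C5795D7870F42ULL

typedef struct demo_dva { uint64_t dva_word[2]; } demo_dva_t;

static uint64_t demo_crc64_table[256];

/* Build the byte-wise CRC table, same loop shape as in buf_init(). */
static void
demo_crc64_init(void)
{
	for (int i = 0; i < 256; i++) {
		uint64_t ct = (uint64_t)i;

		for (int j = 8; j > 0; j--)
			ct = (ct >> 1) ^ (-(ct & 1) & DEMO_CRC64_POLY);
		demo_crc64_table[i] = ct;
	}
}

/* CRC the DVA bytes, then fold in the spa and birth values. */
static uint64_t
demo_buf_hash(uint64_t spa, const demo_dva_t *dva, uint64_t birth)
{
	const uint8_t *vdva = (const uint8_t *)dva;
	uint64_t crc = -1ULL;

	for (size_t i = 0; i < sizeof (demo_dva_t); i++)
		crc = (crc >> 8) ^ demo_crc64_table[(crc ^ vdva[i]) & 0xFF];

	crc ^= (spa >> 8) ^ birth;
	return (crc);
}
```

Because `spa` and `birth` are only XORed in after the CRC, two keys that differ only in `birth` produce hashes that differ by exactly the XOR of the two birth values.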
 
 #define HDR_EMPTY(hdr)                                          \
         ((hdr)->b_dva.dva_word[0] == 0 &&                       \
         (hdr)->b_dva.dva_word[1] == 0)
@@ -1192,11 +1610,11 @@
         uint64_t idx = BUF_HASH_INDEX(spa, dva, birth);
         kmutex_t *hash_lock = BUF_HASH_LOCK(idx);
         arc_buf_hdr_t *hdr;
 
         mutex_enter(hash_lock);
-        for (hdr = buf_hash_table.ht_table[idx]; hdr != NULL;
+        for (hdr = buf_hash_table.ht_table[idx].hdr; hdr != NULL;
             hdr = hdr->b_hash_next) {
                 if (HDR_EQUAL(spa, dva, birth, hdr)) {
                         *lockp = hash_lock;
                         return (hdr);
                 }
@@ -1230,18 +1648,18 @@
                 mutex_enter(hash_lock);
         } else {
                 ASSERT(MUTEX_HELD(hash_lock));
         }
 
-        for (fhdr = buf_hash_table.ht_table[idx], i = 0; fhdr != NULL;
+        for (fhdr = buf_hash_table.ht_table[idx].hdr, i = 0; fhdr != NULL;
             fhdr = fhdr->b_hash_next, i++) {
                 if (HDR_EQUAL(hdr->b_spa, &hdr->b_dva, hdr->b_birth, fhdr))
                         return (fhdr);
         }
 
-        hdr->b_hash_next = buf_hash_table.ht_table[idx];
-        buf_hash_table.ht_table[idx] = hdr;
+        hdr->b_hash_next = buf_hash_table.ht_table[idx].hdr;
+        buf_hash_table.ht_table[idx].hdr = hdr;
         arc_hdr_set_flags(hdr, ARC_FLAG_IN_HASH_TABLE);
 
         /* collect some hash table performance data */
         if (i > 0) {
                 ARCSTAT_BUMP(arcstat_hash_collisions);
@@ -1264,11 +1682,11 @@
         uint64_t idx = BUF_HASH_INDEX(hdr->b_spa, &hdr->b_dva, hdr->b_birth);
 
         ASSERT(MUTEX_HELD(BUF_HASH_LOCK(idx)));
         ASSERT(HDR_IN_HASH_TABLE(hdr));
 
-        hdrp = &buf_hash_table.ht_table[idx];
+        hdrp = &buf_hash_table.ht_table[idx].hdr;
         while ((fhdr = *hdrp) != hdr) {
                 ASSERT3P(fhdr, !=, NULL);
                 hdrp = &fhdr->b_hash_next;
         }
         *hdrp = hdr->b_hash_next;
@@ -1276,12 +1694,12 @@
         arc_hdr_clear_flags(hdr, ARC_FLAG_IN_HASH_TABLE);
 
         /* collect some hash table performance data */
         ARCSTAT_BUMPDOWN(arcstat_hash_elements);
 
-        if (buf_hash_table.ht_table[idx] &&
-            buf_hash_table.ht_table[idx]->b_hash_next == NULL)
+        if (buf_hash_table.ht_table[idx].hdr &&
+            buf_hash_table.ht_table[idx].hdr->b_hash_next == NULL)
                 ARCSTAT_BUMPDOWN(arcstat_hash_chains);
 }
 
 /*
  * Global data structures and functions for the buf kmem cache.
@@ -1293,14 +1711,14 @@
 static void
 buf_fini(void)
 {
         int i;
 
+        for (i = 0; i < buf_hash_table.ht_mask + 1; i++)
+                mutex_destroy(&buf_hash_table.ht_table[i].lock);
         kmem_free(buf_hash_table.ht_table,
-            (buf_hash_table.ht_mask + 1) * sizeof (void *));
-        for (i = 0; i < BUF_LOCKS; i++)
-                mutex_destroy(&buf_hash_table.ht_locks[i].ht_lock);
+            (buf_hash_table.ht_mask + 1) * sizeof (struct ht_table));
         kmem_cache_destroy(hdr_full_cache);
         kmem_cache_destroy(hdr_l2only_cache);
         kmem_cache_destroy(buf_cache);
 }
 
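Reviewer note: the hunks above replace the separate fixed-size lock array (`ht_locks[BUF_LOCKS]`) with one mutex embedded in every hash bucket. The following is a minimal user-space sketch of that layout, not the kernel code. It assumes POSIX threads instead of kernel mutexes, and `struct node`, `struct bucket`, `struct table`, `table_init`, `table_insert` and `table_fini` are hypothetical names chosen for illustration.

```c
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-ins; none of these names come from arc.c. */
struct node {
	uint64_t key;
	struct node *next;
};

struct bucket {
	struct node *head;	/* chain head, analogous to ht_table[idx].hdr */
	pthread_mutex_t lock;	/* per-bucket lock, analogous to .lock */
};

struct table {
	uint64_t mask;		/* bucket count - 1; count is a power of two */
	struct bucket *buckets;
};

/* Allocate zeroed buckets and initialise one mutex per bucket. */
static int
table_init(struct table *t, uint64_t nbuckets)
{
	t->buckets = calloc(nbuckets, sizeof (struct bucket));
	if (t->buckets == NULL)
		return (-1);
	t->mask = nbuckets - 1;
	for (uint64_t i = 0; i <= t->mask; i++)
		pthread_mutex_init(&t->buckets[i].lock, NULL);
	return (0);
}

/*
 * Insert under the bucket's own lock. Returns the node already present
 * if the key is a duplicate, or NULL if the new node was linked in.
 */
static struct node *
table_insert(struct table *t, struct node *n)
{
	struct bucket *b = &t->buckets[n->key & t->mask];
	struct node *f;

	pthread_mutex_lock(&b->lock);
	for (f = b->head; f != NULL; f = f->next) {
		if (f->key == n->key) {
			pthread_mutex_unlock(&b->lock);
			return (f);
		}
	}
	n->next = b->head;
	b->head = n;
	pthread_mutex_unlock(&b->lock);
	return (NULL);
}

/* Destroy every mutex before freeing the array that contains them. */
static void
table_fini(struct table *t)
{
	for (uint64_t i = 0; i <= t->mask; i++)
		pthread_mutex_destroy(&t->buckets[i].lock);
	free(t->buckets);
	t->buckets = NULL;
}
```

The teardown order matters in this layout: because the locks live inside the bucket array, they have to be destroyed before the array is freed, which is the same ordering the `buf_fini()` hunk adopts.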
@@ -1419,11 +1837,11 @@
         while (hsize * zfs_arc_average_blocksize < physmem * PAGESIZE)
                 hsize <<= 1;
 retry:
         buf_hash_table.ht_mask = hsize - 1;
         buf_hash_table.ht_table =
-            kmem_zalloc(hsize * sizeof (void*), KM_NOSLEEP);
+            kmem_zalloc(hsize * sizeof (struct ht_table), KM_NOSLEEP);
         if (buf_hash_table.ht_table == NULL) {
                 ASSERT(hsize > (1ULL << 8));
                 hsize >>= 1;
                 goto retry;
         }
@@ -1438,16 +1856,24 @@
 
         for (i = 0; i < 256; i++)
                 for (ct = zfs_crc64_table + i, *ct = i, j = 8; j > 0; j--)
                         *ct = (*ct >> 1) ^ (-(*ct & 1) & ZFS_CRC64_POLY);
 
-        for (i = 0; i < BUF_LOCKS; i++) {
-                mutex_init(&buf_hash_table.ht_locks[i].ht_lock,
+        for (i = 0; i < hsize; i++) {
+                mutex_init(&buf_hash_table.ht_table[i].lock,
                     NULL, MUTEX_DEFAULT, NULL);
         }
 }
 
+/* wait until krrp releases the buffer */
+static inline void
+arc_wait_for_krrp(arc_buf_hdr_t *hdr)
+{
+        while (HDR_HAS_L1HDR(hdr) && hdr->b_l1hdr.b_krrp != 0)
+                cv_wait(&hdr->b_l1hdr.b_cv, HDR_LOCK(hdr));
+}
+
 /*
  * This is the size that the buf occupies in memory. If the buf is compressed,
  * it will correspond to the compressed size. You should use this method of
  * getting the buf size unless you explicitly need the logical size.
  */
@@ -1499,13 +1925,13 @@
 static inline void
 arc_cksum_free(arc_buf_hdr_t *hdr)
 {
         ASSERT(HDR_HAS_L1HDR(hdr));
         mutex_enter(&hdr->b_l1hdr.b_freeze_lock);
-        if (hdr->b_l1hdr.b_freeze_cksum != NULL) {
-                kmem_free(hdr->b_l1hdr.b_freeze_cksum, sizeof (zio_cksum_t));
-                hdr->b_l1hdr.b_freeze_cksum = NULL;
+        if (hdr->b_freeze_cksum != NULL) {
+                kmem_free(hdr->b_freeze_cksum, sizeof (zio_cksum_t));
+                hdr->b_freeze_cksum = NULL;
         }
         mutex_exit(&hdr->b_l1hdr.b_freeze_lock);
 }
 
 /*
@@ -1535,25 +1961,25 @@
 
         if (!(zfs_flags & ZFS_DEBUG_MODIFY))
                 return;
 
         if (ARC_BUF_COMPRESSED(buf)) {
-                ASSERT(hdr->b_l1hdr.b_freeze_cksum == NULL ||
+                ASSERT(hdr->b_freeze_cksum == NULL ||
                     arc_hdr_has_uncompressed_buf(hdr));
                 return;
         }
 
         ASSERT(HDR_HAS_L1HDR(hdr));
 
         mutex_enter(&hdr->b_l1hdr.b_freeze_lock);
-        if (hdr->b_l1hdr.b_freeze_cksum == NULL || HDR_IO_ERROR(hdr)) {
+        if (hdr->b_freeze_cksum == NULL || HDR_IO_ERROR(hdr)) {
                 mutex_exit(&hdr->b_l1hdr.b_freeze_lock);
                 return;
         }
 
         fletcher_2_native(buf->b_data, arc_buf_size(buf), NULL, &zc);
-        if (!ZIO_CHECKSUM_EQUAL(*hdr->b_l1hdr.b_freeze_cksum, zc))
+        if (!ZIO_CHECKSUM_EQUAL(*hdr->b_freeze_cksum, zc))
                 panic("buffer modified while frozen!");
         mutex_exit(&hdr->b_l1hdr.b_freeze_lock);
 }
 
 static boolean_t
@@ -1580,13 +2006,14 @@
         if (!HDR_COMPRESSION_ENABLED(hdr) && compress != ZIO_COMPRESS_OFF) {
                 ASSERT3U(HDR_GET_COMPRESS(hdr), ==, ZIO_COMPRESS_OFF);
                 uint64_t lsize = HDR_GET_LSIZE(hdr);
                 uint64_t csize;
 
-                abd_t *cdata = abd_alloc_linear(HDR_GET_PSIZE(hdr), B_TRUE);
-                csize = zio_compress_data(compress, zio->io_abd,
-                    abd_to_buf(cdata), lsize);
+                void *cbuf = zio_buf_alloc(HDR_GET_PSIZE(hdr));
+                csize = zio_compress_data(compress, zio->io_abd, cbuf, lsize);
+                abd_t *cdata = abd_get_from_buf(cbuf, HDR_GET_PSIZE(hdr));
+                abd_take_ownership_of_buf(cdata, B_TRUE);
 
                 ASSERT3U(csize, <=, HDR_GET_PSIZE(hdr));
                 if (csize < HDR_GET_PSIZE(hdr)) {
                         /*
                          * Compressed blocks are always a multiple of the
@@ -1641,24 +2068,24 @@
                 return;
 
         ASSERT(HDR_HAS_L1HDR(hdr));
 
         mutex_enter(&buf->b_hdr->b_l1hdr.b_freeze_lock);
-        if (hdr->b_l1hdr.b_freeze_cksum != NULL) {
+        if (hdr->b_freeze_cksum != NULL) {
                 ASSERT(arc_hdr_has_uncompressed_buf(hdr));
                 mutex_exit(&hdr->b_l1hdr.b_freeze_lock);
                 return;
         } else if (ARC_BUF_COMPRESSED(buf)) {
                 mutex_exit(&hdr->b_l1hdr.b_freeze_lock);
                 return;
         }
 
         ASSERT(!ARC_BUF_COMPRESSED(buf));
-        hdr->b_l1hdr.b_freeze_cksum = kmem_alloc(sizeof (zio_cksum_t),
+        hdr->b_freeze_cksum = kmem_alloc(sizeof (zio_cksum_t),
             KM_SLEEP);
         fletcher_2_native(buf->b_data, arc_buf_size(buf), NULL,
-            hdr->b_l1hdr.b_freeze_cksum);
+            hdr->b_freeze_cksum);
         mutex_exit(&hdr->b_l1hdr.b_freeze_lock);
         arc_buf_watch(buf);
 }
 
 #ifndef _KERNEL
@@ -1706,12 +2133,15 @@
 
 static arc_buf_contents_t
 arc_buf_type(arc_buf_hdr_t *hdr)
 {
         arc_buf_contents_t type;
+
         if (HDR_ISTYPE_METADATA(hdr)) {
                 type = ARC_BUFC_METADATA;
+        } else if (HDR_ISTYPE_DDT(hdr)) {
+                type = ARC_BUFC_DDT;
         } else {
                 type = ARC_BUFC_DATA;
         }
         VERIFY3U(hdr->b_type, ==, type);
         return (type);
@@ -1730,17 +2160,29 @@
         case ARC_BUFC_DATA:
                 /* metadata field is 0 if buffer contains normal data */
                 return (0);
         case ARC_BUFC_METADATA:
                 return (ARC_FLAG_BUFC_METADATA);
+        case ARC_BUFC_DDT:
+                return (ARC_FLAG_BUFC_DDT);
         default:
                 break;
         }
         panic("undefined ARC buffer type!");
         return ((uint32_t)-1);
 }
 
+static arc_buf_contents_t
+arc_flags_to_bufc(uint32_t flags)
+{
+        if (flags & ARC_FLAG_BUFC_DDT)
+                return (ARC_BUFC_DDT);
+        if (flags & ARC_FLAG_BUFC_METADATA)
+                return (ARC_BUFC_METADATA);
+        return (ARC_BUFC_DATA);
+}
+
 void
 arc_buf_thaw(arc_buf_t *buf)
 {
         arc_buf_hdr_t *hdr = buf->b_hdr;
 
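Reviewer note: `arc_bufc_to_flags()` and the new `arc_flags_to_bufc()` are intended to be inverses for the three buffer types. A small stand-alone sketch of that round trip is shown here. The enum and flag values are assumptions made up for the example (the real constants are defined in the ARC headers), and `bufc_to_flags` / `flags_to_bufc` are illustrative names.

```c
#include <stdint.h>

/* Illustrative values only; the real constants live in the ARC headers. */
typedef enum { BUFC_DATA, BUFC_METADATA, BUFC_DDT } bufc_t;

#define	FLAG_BUFC_METADATA	(1u << 0)
#define	FLAG_BUFC_DDT		(1u << 1)
#define	FLAG_UNRELATED		(1u << 5)	/* any non-type flag bit */

/* Map a buffer type to its flag bit; plain data has no bit set. */
static uint32_t
bufc_to_flags(bufc_t type)
{
	switch (type) {
	case BUFC_METADATA:
		return (FLAG_BUFC_METADATA);
	case BUFC_DDT:
		return (FLAG_BUFC_DDT);
	default:
		return (0);
	}
}

/* Recover the buffer type from a flag word, ignoring unrelated bits. */
static bufc_t
flags_to_bufc(uint32_t flags)
{
	if (flags & FLAG_BUFC_DDT)
		return (BUFC_DDT);
	if (flags & FLAG_BUFC_METADATA)
		return (BUFC_METADATA);
	return (BUFC_DATA);
}
```

Checking the DDT bit first mirrors the order used in the patch; it only matters if both type bits were ever set at once, which this sketch does not attempt to model.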
@@ -1752,11 +2194,11 @@
         /*
          * Compressed buffers do not manipulate the b_freeze_cksum or
          * allocate b_thawed.
          */
         if (ARC_BUF_COMPRESSED(buf)) {
-                ASSERT(hdr->b_l1hdr.b_freeze_cksum == NULL ||
+                ASSERT(hdr->b_freeze_cksum == NULL ||
                     arc_hdr_has_uncompressed_buf(hdr));
                 return;
         }
 
         ASSERT(HDR_HAS_L1HDR(hdr));
@@ -1784,20 +2226,20 @@
 
         if (!(zfs_flags & ZFS_DEBUG_MODIFY))
                 return;
 
         if (ARC_BUF_COMPRESSED(buf)) {
-                ASSERT(hdr->b_l1hdr.b_freeze_cksum == NULL ||
+                ASSERT(hdr->b_freeze_cksum == NULL ||
                     arc_hdr_has_uncompressed_buf(hdr));
                 return;
         }
 
         hash_lock = HDR_LOCK(hdr);
         mutex_enter(hash_lock);
 
         ASSERT(HDR_HAS_L1HDR(hdr));
-        ASSERT(hdr->b_l1hdr.b_freeze_cksum != NULL ||
+        ASSERT(hdr->b_freeze_cksum != NULL ||
             hdr->b_l1hdr.b_state == arc_anon);
         arc_cksum_compute(buf);
         mutex_exit(hash_lock);
 }
 
@@ -1885,11 +2327,11 @@
 
         /*
          * There were no decompressed bufs, so there should not be a
          * checksum on the hdr either.
          */
-        EQUIV(!copied, hdr->b_l1hdr.b_freeze_cksum == NULL);
+        EQUIV(!copied, hdr->b_freeze_cksum == NULL);
 
         return (copied);
 }
 
 /*
@@ -1964,11 +2406,11 @@
                  * decompressed version. If that's not possible, it's time to
                  * bite the bullet and decompress the data from the hdr.
                  */
                 if (arc_buf_try_copy_decompressed_data(buf)) {
                         /* Skip byteswapping and checksumming (already done) */
-                        ASSERT3P(hdr->b_l1hdr.b_freeze_cksum, !=, NULL);
+                        ASSERT3P(hdr->b_freeze_cksum, !=, NULL);
                         return (0);
                 } else {
                         int error = zio_decompress_data(HDR_GET_COMPRESS(hdr),
                             hdr->b_l1hdr.b_pabd, buf->b_data,
                             HDR_GET_PSIZE(hdr), HDR_GET_LSIZE(hdr));
@@ -2227,12 +2669,14 @@
                         arc_evictable_space_increment(hdr, new_state);
                 }
         }
 
         ASSERT(!HDR_EMPTY(hdr));
-        if (new_state == arc_anon && HDR_IN_HASH_TABLE(hdr))
+        if (new_state == arc_anon && HDR_IN_HASH_TABLE(hdr)) {
+                arc_wait_for_krrp(hdr);
                 buf_hash_remove(hdr);
+        }
 
         /* adjust state sizes (ignore arc_l2c_only) */
 
         if (update_new && new_state != arc_l2c_only) {
                 ASSERT(HDR_HAS_L1HDR(hdr));
@@ -2341,80 +2785,82 @@
 
         /*
          * L2 headers should never be on the L2 state list since they don't
          * have L1 headers allocated.
          */
-        ASSERT(multilist_is_empty(arc_l2c_only->arcs_list[ARC_BUFC_DATA]) &&
-            multilist_is_empty(arc_l2c_only->arcs_list[ARC_BUFC_METADATA]));
+        ASSERT(multilist_is_empty(arc_l2c_only->arcs_list[ARC_BUFC_DATA]));
+        ASSERT(multilist_is_empty(arc_l2c_only->arcs_list[ARC_BUFC_METADATA]));
+        ASSERT(multilist_is_empty(arc_l2c_only->arcs_list[ARC_BUFC_DDT]));
 }
 
 void
 arc_space_consume(uint64_t space, arc_space_type_t type)
 {
         ASSERT(type >= 0 && type < ARC_SPACE_NUMTYPES);
 
         switch (type) {
         case ARC_SPACE_DATA:
-                aggsum_add(&astat_data_size, space);
+                ARCSTAT_INCR(arcstat_data_size, space);
                 break;
         case ARC_SPACE_META:
-                aggsum_add(&astat_metadata_size, space);
+                ARCSTAT_INCR(arcstat_metadata_size, space);
                 break;
+        case ARC_SPACE_DDT:
+                ARCSTAT_INCR(arcstat_ddt_size, space);
+                break;
         case ARC_SPACE_OTHER:
-                aggsum_add(&astat_other_size, space);
+                ARCSTAT_INCR(arcstat_other_size, space);
                 break;
         case ARC_SPACE_HDRS:
-                aggsum_add(&astat_hdr_size, space);
+                ARCSTAT_INCR(arcstat_hdr_size, space);
                 break;
         case ARC_SPACE_L2HDRS:
-                aggsum_add(&astat_l2_hdr_size, space);
+                ARCSTAT_INCR(arcstat_l2_hdr_size, space);
                 break;
         }
 
-        if (type != ARC_SPACE_DATA)
-                aggsum_add(&arc_meta_used, space);
+        if (type != ARC_SPACE_DATA && type != ARC_SPACE_DDT)
+                ARCSTAT_INCR(arcstat_meta_used, space);
 
-        aggsum_add(&arc_size, space);
+        atomic_add_64(&arc_size, space);
 }
 
 void
 arc_space_return(uint64_t space, arc_space_type_t type)
 {
         ASSERT(type >= 0 && type < ARC_SPACE_NUMTYPES);
 
         switch (type) {
         case ARC_SPACE_DATA:
-                aggsum_add(&astat_data_size, -space);
+                ARCSTAT_INCR(arcstat_data_size, -space);
                 break;
         case ARC_SPACE_META:
-                aggsum_add(&astat_metadata_size, -space);
+                ARCSTAT_INCR(arcstat_metadata_size, -space);
                 break;
+        case ARC_SPACE_DDT:
+                ARCSTAT_INCR(arcstat_ddt_size, -space);
+                break;
         case ARC_SPACE_OTHER:
-                aggsum_add(&astat_other_size, -space);
+                ARCSTAT_INCR(arcstat_other_size, -space);
                 break;
         case ARC_SPACE_HDRS:
-                aggsum_add(&astat_hdr_size, -space);
+                ARCSTAT_INCR(arcstat_hdr_size, -space);
                 break;
         case ARC_SPACE_L2HDRS:
-                aggsum_add(&astat_l2_hdr_size, -space);
+                ARCSTAT_INCR(arcstat_l2_hdr_size, -space);
                 break;
         }
 
-        if (type != ARC_SPACE_DATA) {
-                ASSERT(aggsum_compare(&arc_meta_used, space) >= 0);
-                /*
-                 * We use the upper bound here rather than the precise value
-                 * because the arc_meta_max value doesn't need to be
-                 * precise. It's only consumed by humans via arcstats.
-                 */
-                if (arc_meta_max < aggsum_upper_bound(&arc_meta_used))
-                        arc_meta_max = aggsum_upper_bound(&arc_meta_used);
-                aggsum_add(&arc_meta_used, -space);
+        if (type != ARC_SPACE_DATA && type != ARC_SPACE_DDT) {
+                ASSERT(arc_meta_used >= space);
+                if (arc_meta_max < arc_meta_used)
+                        arc_meta_max = arc_meta_used;
+                ARCSTAT_INCR(arcstat_meta_used, -space);
         }
 
-        ASSERT(aggsum_compare(&arc_size, space) >= 0);
-        aggsum_add(&arc_size, -space);
+        ASSERT(arc_size >= space);
+        atomic_add_64(&arc_size, -space);
 }
 
 /*
  * Given a hdr and a buf, returns whether that buf can share its b_data buffer
  * with the hdr's b_pabd.
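Reviewer note: after this hunk, DDT space has its own counter and, like plain data, is excluded from `arc_meta_used`. The accounting rule can be sketched as below. This is a simplified single-threaded model, assuming plain integers rather than the atomic arcstat counters; `struct space_acct`, `acct_consume` and `acct_return` are hypothetical names.

```c
#include <stdint.h>

typedef enum {
	SPACE_DATA,
	SPACE_META,
	SPACE_DDT,
	SPACE_OTHER,
	SPACE_NUMTYPES
} space_type_t;

struct space_acct {
	uint64_t by_type[SPACE_NUMTYPES];	/* per-type byte counts */
	uint64_t meta_used;			/* everything except DATA and DDT */
	uint64_t total;				/* overall cache size */
};

/* DATA and DDT are tracked separately and do not count as metadata. */
static int
counts_as_meta(space_type_t type)
{
	return (type != SPACE_DATA && type != SPACE_DDT);
}

static void
acct_consume(struct space_acct *a, uint64_t space, space_type_t type)
{
	a->by_type[type] += space;
	if (counts_as_meta(type))
		a->meta_used += space;
	a->total += space;
}

static void
acct_return(struct space_acct *a, uint64_t space, space_type_t type)
{
	a->by_type[type] -= space;
	if (counts_as_meta(type))
		a->meta_used -= space;
	a->total -= space;
}
```

With this rule, returning DDT space lowers the DDT counter and the total but leaves the metadata figure untouched, so the metadata limit and the DDT limit can be enforced independently.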
@@ -2464,11 +2910,12 @@
         arc_buf_t *buf;
 
         ASSERT(HDR_HAS_L1HDR(hdr));
         ASSERT3U(HDR_GET_LSIZE(hdr), >, 0);
         VERIFY(hdr->b_type == ARC_BUFC_DATA ||
-            hdr->b_type == ARC_BUFC_METADATA);
+            hdr->b_type == ARC_BUFC_METADATA ||
+            hdr->b_type == ARC_BUFC_DDT);
         ASSERT3P(ret, !=, NULL);
         ASSERT3P(*ret, ==, NULL);
 
         buf = *ret = kmem_cache_alloc(buf_cache, KM_PUSHPAGE);
         buf->b_hdr = hdr;
@@ -2544,10 +2991,60 @@
         /* assert that it did not wrap around */
         ASSERT3S(atomic_add_64_nv(&arc_loaned_bytes, 0), >=, 0);
 }
 
 /*
+ * Allocates an ARC buf header that's in an evicted & L2-cached state.
+ * This is used during l2arc reconstruction to make empty ARC buffers
+ * which circumvent the regular disk->arc->l2arc path and instead come
+ * into being in the reverse order, i.e. l2arc->arc.
+ */
+static arc_buf_hdr_t *
+arc_buf_alloc_l2only(uint64_t load_guid, arc_buf_contents_t type,
+    l2arc_dev_t *dev, dva_t dva, uint64_t daddr, uint64_t lsize,
+    uint64_t psize, uint64_t birth, zio_cksum_t cksum, int checksum_type,
+    enum zio_compress compress, boolean_t arc_compress)
+{
+        arc_buf_hdr_t *hdr;
+
+        if (type == ARC_BUFC_DDT && !zfs_arc_segregate_ddt)
+                type = ARC_BUFC_METADATA;
+
+        ASSERT(lsize != 0);
+        hdr = kmem_cache_alloc(hdr_l2only_cache, KM_PUSHPAGE);
+        ASSERT(HDR_EMPTY(hdr));
+        ASSERT3P(hdr->b_freeze_cksum, ==, NULL);
+
+        hdr->b_spa = load_guid;
+        hdr->b_type = type;
+        hdr->b_flags = 0;
+
+        if (arc_compress)
+                arc_hdr_set_flags(hdr, ARC_FLAG_COMPRESSED_ARC);
+        else
+                arc_hdr_clear_flags(hdr, ARC_FLAG_COMPRESSED_ARC);
+
+        HDR_SET_COMPRESS(hdr, compress);
+
+        arc_hdr_set_flags(hdr, arc_bufc_to_flags(type) | ARC_FLAG_HAS_L2HDR);
+        hdr->b_dva = dva;
+        hdr->b_birth = birth;
+        if (checksum_type != ZIO_CHECKSUM_OFF) {
+                hdr->b_freeze_cksum = kmem_alloc(sizeof (zio_cksum_t), KM_SLEEP);
+                bcopy(&cksum, hdr->b_freeze_cksum, sizeof (cksum));
+        }
+
+        HDR_SET_PSIZE(hdr, psize);
+        HDR_SET_LSIZE(hdr, lsize);
+
+        hdr->b_l2hdr.b_dev = dev;
+        hdr->b_l2hdr.b_daddr = daddr;
+
+        return (hdr);
+}
+
+/*
  * Loan out an anonymous arc buffer. Loaned buffers are not counted as in
  * flight data by arc_tempreserve_space() until they are "returned". Loaned
  * buffers must be returned to the arc before they can be used by the DMU or
  * freed.
  */
@@ -2632,11 +3129,13 @@
 
                 (void) refcount_remove_many(&state->arcs_esize[type],
                     size, hdr);
         }
         (void) refcount_remove_many(&state->arcs_size, size, hdr);
-        if (type == ARC_BUFC_METADATA) {
+        if (type == ARC_BUFC_DDT) {
+                arc_space_return(size, ARC_SPACE_DDT);
+        } else if (type == ARC_BUFC_METADATA) {
                 arc_space_return(size, ARC_SPACE_META);
         } else {
                 ASSERT(type == ARC_BUFC_DATA);
                 arc_space_return(size, ARC_SPACE_DATA);
         }
@@ -2664,11 +3163,11 @@
          * the refcount whenever an arc_buf_t is shared.
          */
         refcount_transfer_ownership(&state->arcs_size, buf, hdr);
         hdr->b_l1hdr.b_pabd = abd_get_from_buf(buf->b_data, arc_buf_size(buf));
         abd_take_ownership_of_buf(hdr->b_l1hdr.b_pabd,
-            HDR_ISTYPE_METADATA(hdr));
+            !HDR_ISTYPE_DATA(hdr));
         arc_hdr_set_flags(hdr, ARC_FLAG_SHARED_DATA);
         buf->b_flags |= ARC_BUF_FLAG_SHARED;
 
         /*
          * Since we've transferred ownership to the hdr we need
@@ -2857,10 +3356,11 @@
         hdr->b_l1hdr.b_byteswap = DMU_BSWAP_NUMFUNCS;
         ASSERT3P(hdr->b_l1hdr.b_pabd, !=, NULL);
 
         ARCSTAT_INCR(arcstat_compressed_size, arc_hdr_size(hdr));
         ARCSTAT_INCR(arcstat_uncompressed_size, HDR_GET_LSIZE(hdr));
+        arc_update_hit_stat(hdr, B_TRUE);
 }
 
 static void
 arc_hdr_free_pabd(arc_buf_hdr_t *hdr)
 {
@@ -2891,15 +3391,20 @@
 arc_hdr_alloc(uint64_t spa, int32_t psize, int32_t lsize,
     enum zio_compress compression_type, arc_buf_contents_t type)
 {
         arc_buf_hdr_t *hdr;
 
-        VERIFY(type == ARC_BUFC_DATA || type == ARC_BUFC_METADATA);
+        ASSERT3U(lsize, >, 0);
 
+        if (type == ARC_BUFC_DDT && !zfs_arc_segregate_ddt)
+                type = ARC_BUFC_METADATA;
+        VERIFY(type == ARC_BUFC_DATA || type == ARC_BUFC_METADATA ||
+            type == ARC_BUFC_DDT);
+
         hdr = kmem_cache_alloc(hdr_full_cache, KM_PUSHPAGE);
         ASSERT(HDR_EMPTY(hdr));
-        ASSERT3P(hdr->b_l1hdr.b_freeze_cksum, ==, NULL);
+        ASSERT3P(hdr->b_freeze_cksum, ==, NULL);
         ASSERT3P(hdr->b_l1hdr.b_thawed, ==, NULL);
         HDR_SET_PSIZE(hdr, psize);
         HDR_SET_LSIZE(hdr, lsize);
         hdr->b_spa = spa;
         hdr->b_type = type;
@@ -2960,11 +3465,11 @@
                 /* Verify previous threads set to NULL before freeing */
                 ASSERT3P(nhdr->b_l1hdr.b_pabd, ==, NULL);
         } else {
                 ASSERT3P(hdr->b_l1hdr.b_buf, ==, NULL);
                 ASSERT0(hdr->b_l1hdr.b_bufcnt);
-                ASSERT3P(hdr->b_l1hdr.b_freeze_cksum, ==, NULL);
+                ASSERT3P(hdr->b_freeze_cksum, ==, NULL);
 
                 /*
                  * If we've reached here, We must have been called from
                  * arc_evict_hdr(), as such we should have already been
                  * removed from any ghost list we were previously on
@@ -3065,11 +3570,11 @@
         ASSERT(!MUTEX_HELD(HDR_LOCK(hdr)));
 
         arc_buf_t *buf = NULL;
         VERIFY0(arc_buf_alloc_impl(hdr, tag, B_TRUE, B_FALSE, &buf));
         arc_buf_thaw(buf);
-        ASSERT3P(hdr->b_l1hdr.b_freeze_cksum, ==, NULL);
+        ASSERT3P(hdr->b_freeze_cksum, ==, NULL);
 
         if (!arc_buf_is_shared(buf)) {
                 /*
                  * To ensure that the hdr has the correct data in it if we call
                  * arc_decompress() on this buf before it's been written to
@@ -3097,10 +3602,14 @@
         list_remove(&dev->l2ad_buflist, hdr);
 
         ARCSTAT_INCR(arcstat_l2_psize, -psize);
         ARCSTAT_INCR(arcstat_l2_lsize, -HDR_GET_LSIZE(hdr));
 
+        /*
+         * l2ad_vdev can be NULL here if we async evicted it
+         */
+        if (dev->l2ad_vdev != NULL)
         vdev_space_update(dev->l2ad_vdev, -psize, 0, 0);
 
         (void) refcount_remove_many(&dev->l2ad_alloc, psize, hdr);
         arc_hdr_clear_flags(hdr, ARC_FLAG_HAS_L2HDR);
 }
@@ -3115,21 +3624,30 @@
                 ASSERT3P(hdr->b_l1hdr.b_state, ==, arc_anon);
         }
         ASSERT(!HDR_IO_IN_PROGRESS(hdr));
         ASSERT(!HDR_IN_HASH_TABLE(hdr));
 
-        if (!HDR_EMPTY(hdr))
-                buf_discard_identity(hdr);
-
         if (HDR_HAS_L2HDR(hdr)) {
                 l2arc_dev_t *dev = hdr->b_l2hdr.b_dev;
                 boolean_t buflist_held = MUTEX_HELD(&dev->l2ad_mtx);
 
+                /* To avoid racing with L2ARC the header needs to be locked */
+                ASSERT(MUTEX_HELD(HDR_LOCK(hdr)));
+
                 if (!buflist_held)
                         mutex_enter(&dev->l2ad_mtx);
 
                 /*
+                 * The L2ARC buflist is now held, so we can safely discard
+                 * the identity. Otherwise L2ARC could lock the wrong mutex
+                 * for the hdr, which would cause a panic. That is possible
+                 * because the mutex is selected according to the identity.
+                 */
+                if (!HDR_EMPTY(hdr))
+                        buf_discard_identity(hdr);
+
+                /*
                  * Even though we checked this conditional above, we
                  * need to check this again now that we have the
                  * l2ad_mtx. This is because we could be racing with
                  * another thread calling l2arc_evict() which might have
                  * destroyed this header's L2 portion as we were waiting
@@ -3141,10 +3659,13 @@
 
                 if (!buflist_held)
                         mutex_exit(&dev->l2ad_mtx);
         }
 
+        if (!HDR_EMPTY(hdr))
+                buf_discard_identity(hdr);
+
         if (HDR_HAS_L1HDR(hdr)) {
                 arc_cksum_free(hdr);
 
                 while (hdr->b_l1hdr.b_buf != NULL)
                         arc_buf_destroy_impl(hdr->b_l1hdr.b_buf);
@@ -3216,10 +3737,12 @@
         int64_t bytes_evicted = 0;
 
         ASSERT(MUTEX_HELD(hash_lock));
         ASSERT(HDR_HAS_L1HDR(hdr));
 
+        arc_wait_for_krrp(hdr);
+
         state = hdr->b_l1hdr.b_state;
         if (GHOST_STATE(state)) {
                 ASSERT(!HDR_IO_IN_PROGRESS(hdr));
                 ASSERT3P(hdr->b_l1hdr.b_buf, ==, NULL);
 
@@ -3604,73 +4127,92 @@
 
         return (0);
 }
 
 /*
- * Evict metadata buffers from the cache, such that arc_meta_used is
- * capped by the arc_meta_limit tunable.
+ * Depending on the value of the adjust_ddt argument, evict either DDT
+ * (B_TRUE) or metadata (B_FALSE) buffers from the cache, such that
+ * arc_ddt_size or arc_meta_used is capped by the arc_ddt_limit or
+ * arc_meta_limit tunable, respectively.
  */
 static uint64_t
-arc_adjust_meta(uint64_t meta_used)
+arc_adjust_meta_or_ddt(boolean_t adjust_ddt)
 {
         uint64_t total_evicted = 0;
-        int64_t target;
+        int64_t target, over_limit;
+        arc_buf_contents_t type;
 
+        if (adjust_ddt) {
+                over_limit = arc_ddt_size - arc_ddt_limit;
+                type = ARC_BUFC_DDT;
+        } else {
+                over_limit = arc_meta_used - arc_meta_limit;
+                type = ARC_BUFC_METADATA;
+        }
+
         /*
-         * If we're over the meta limit, we want to evict enough
-         * metadata to get back under the meta limit. We don't want to
+         * If we're over the limit, we want to evict enough
+         * to get back under the limit. We don't want to
          * evict so much that we drop the MRU below arc_p, though. If
          * we're over the meta limit more than we're over arc_p, we
          * evict some from the MRU here, and some from the MFU below.
          */
-        target = MIN((int64_t)(meta_used - arc_meta_limit),
+        target = MIN(over_limit,
             (int64_t)(refcount_count(&arc_anon->arcs_size) +
             refcount_count(&arc_mru->arcs_size) - arc_p));
 
-        total_evicted += arc_adjust_impl(arc_mru, 0, target, ARC_BUFC_METADATA);
+        total_evicted += arc_adjust_impl(arc_mru, 0, target, type);
 
+        over_limit = adjust_ddt ? arc_ddt_size - arc_ddt_limit :
+            arc_meta_used - arc_meta_limit;
+
         /*
          * Similar to the above, we want to evict enough bytes to get us
          * below the meta limit, but not so much as to drop us below the
          * space allotted to the MFU (which is defined as arc_c - arc_p).
          */
-        target = MIN((int64_t)(meta_used - arc_meta_limit),
-            (int64_t)(refcount_count(&arc_mfu->arcs_size) -
-            (arc_c - arc_p)));
+        target = MIN(over_limit,
+            (int64_t)(refcount_count(&arc_mfu->arcs_size) - (arc_c - arc_p)));
 
-        total_evicted += arc_adjust_impl(arc_mfu, 0, target, ARC_BUFC_METADATA);
+        total_evicted += arc_adjust_impl(arc_mfu, 0, target, type);
 
         return (total_evicted);
 }
 
 /*
  * Return the type of the oldest buffer in the given arc state
  *
- * This function will select a random sublist of type ARC_BUFC_DATA and
- * a random sublist of type ARC_BUFC_METADATA. The tail of each sublist
+ * This function will select a random sublist of each of the types
+ * ARC_BUFC_DATA, ARC_BUFC_METADATA, and ARC_BUFC_DDT. The tail of each sublist
  * is compared, and the type which contains the "older" buffer will be
  * returned.
  */
 static arc_buf_contents_t
 arc_adjust_type(arc_state_t *state)
 {
         multilist_t *data_ml = state->arcs_list[ARC_BUFC_DATA];
         multilist_t *meta_ml = state->arcs_list[ARC_BUFC_METADATA];
+        multilist_t *ddt_ml = state->arcs_list[ARC_BUFC_DDT];
         int data_idx = multilist_get_random_index(data_ml);
         int meta_idx = multilist_get_random_index(meta_ml);
+        int ddt_idx = multilist_get_random_index(ddt_ml);
         multilist_sublist_t *data_mls;
         multilist_sublist_t *meta_mls;
-        arc_buf_contents_t type;
+        multilist_sublist_t *ddt_mls;
+        arc_buf_contents_t type = ARC_BUFC_DATA; /* silence compiler warning */
         arc_buf_hdr_t *data_hdr;
         arc_buf_hdr_t *meta_hdr;
+        arc_buf_hdr_t *ddt_hdr;
+        clock_t oldest;
 
         /*
          * We keep the sublist lock until we're finished, to prevent
          * the headers from being destroyed via arc_evict_state().
          */
         data_mls = multilist_sublist_lock(data_ml, data_idx);
         meta_mls = multilist_sublist_lock(meta_ml, meta_idx);
+        ddt_mls = multilist_sublist_lock(ddt_ml, ddt_idx);
 
         /*
          * These two loops are to ensure we skip any markers that
          * might be at the tail of the lists due to arc_evict_state().
          */
@@ -3685,34 +4227,87 @@
             meta_hdr = multilist_sublist_prev(meta_mls, meta_hdr)) {
                 if (meta_hdr->b_spa != 0)
                         break;
         }
 
-        if (data_hdr == NULL && meta_hdr == NULL) {
+        for (ddt_hdr = multilist_sublist_tail(ddt_mls); ddt_hdr != NULL;
+            ddt_hdr = multilist_sublist_prev(ddt_mls, ddt_hdr)) {
+                if (ddt_hdr->b_spa != 0)
+                        break;
+        }
+
+        if (data_hdr == NULL && meta_hdr == NULL && ddt_hdr == NULL) {
                 type = ARC_BUFC_DATA;
-        } else if (data_hdr == NULL) {
+        } else if (data_hdr != NULL && meta_hdr != NULL && ddt_hdr != NULL) {
+                /* The headers can't be on the sublist without an L1 header */
+                ASSERT(HDR_HAS_L1HDR(data_hdr));
+                ASSERT(HDR_HAS_L1HDR(meta_hdr));
+                ASSERT(HDR_HAS_L1HDR(ddt_hdr));
+
+                oldest = data_hdr->b_l1hdr.b_arc_access;
+                type = ARC_BUFC_DATA;
+                if (oldest > meta_hdr->b_l1hdr.b_arc_access) {
+                        oldest = meta_hdr->b_l1hdr.b_arc_access;
+                        type = ARC_BUFC_METADATA;
+                }
+                if (oldest > ddt_hdr->b_l1hdr.b_arc_access) {
+                        type = ARC_BUFC_DDT;
+                }
+        } else if (data_hdr == NULL && ddt_hdr == NULL) {
                 ASSERT3P(meta_hdr, !=, NULL);
                 type = ARC_BUFC_METADATA;
-        } else if (meta_hdr == NULL) {
+        } else if (meta_hdr == NULL && ddt_hdr == NULL) {
                 ASSERT3P(data_hdr, !=, NULL);
                 type = ARC_BUFC_DATA;
+        } else if (meta_hdr == NULL && data_hdr == NULL) {
+                ASSERT3P(ddt_hdr, !=, NULL);
+                type = ARC_BUFC_DDT;
+        } else if (data_hdr != NULL && ddt_hdr != NULL) {
+                ASSERT3P(meta_hdr, ==, NULL);
+
+                /* The headers can't be on the sublist without an L1 header */
+                ASSERT(HDR_HAS_L1HDR(data_hdr));
+                ASSERT(HDR_HAS_L1HDR(ddt_hdr));
+
+                if (data_hdr->b_l1hdr.b_arc_access <
+                    ddt_hdr->b_l1hdr.b_arc_access) {
+                        type = ARC_BUFC_DATA;
         } else {
-                ASSERT3P(data_hdr, !=, NULL);
-                ASSERT3P(meta_hdr, !=, NULL);
+                        type = ARC_BUFC_DDT;
+                }
+        } else if (meta_hdr != NULL && ddt_hdr != NULL) {
+                ASSERT3P(data_hdr, ==, NULL);
 
                 /* The headers can't be on the sublist without an L1 header */
+                ASSERT(HDR_HAS_L1HDR(meta_hdr));
+                ASSERT(HDR_HAS_L1HDR(ddt_hdr));
+
+                if (meta_hdr->b_l1hdr.b_arc_access <
+                    ddt_hdr->b_l1hdr.b_arc_access) {
+                        type = ARC_BUFC_METADATA;
+                } else {
+                        type = ARC_BUFC_DDT;
+                }
+        } else if (meta_hdr != NULL && data_hdr != NULL) {
+                ASSERT3P(ddt_hdr, ==, NULL);
+
+                /* The headers can't be on the sublist without an L1 header */
                 ASSERT(HDR_HAS_L1HDR(data_hdr));
                 ASSERT(HDR_HAS_L1HDR(meta_hdr));
 
                 if (data_hdr->b_l1hdr.b_arc_access <
                     meta_hdr->b_l1hdr.b_arc_access) {
                         type = ARC_BUFC_DATA;
                 } else {
                         type = ARC_BUFC_METADATA;
                 }
+        } else {
+                /* should never get here */
+                ASSERT(0);
         }
 
+        multilist_sublist_unlock(ddt_mls);
         multilist_sublist_unlock(meta_mls);
         multilist_sublist_unlock(data_mls);
 
         return (type);
 }
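Reviewer note: the `arc_adjust_type()` hunk enumerates every NULL/non-NULL combination of the three tail headers by hand. The same selection can be expressed as a loop over candidates, sketched here for comparison only. `struct cand` and `oldest_type` are hypothetical names, and the tie-breaking (the lowest-numbered type wins on equal access times) is an assumption of this sketch that differs from the patch, whose two-way branches favour the second type on a tie.

```c
#include <stddef.h>

typedef enum { BUFC_DATA, BUFC_METADATA, BUFC_DDT, BUFC_NUMTYPES } bufc_t;

/* Stand-in for a list-tail header; only the access time matters here. */
struct cand {
	long access;
};

/*
 * Return the type whose tail candidate has the smallest access time.
 * NULL entries (empty sublists) are skipped; if every entry is NULL
 * the function falls back to BUFC_DATA, as the patch does.
 */
static bufc_t
oldest_type(const struct cand *tails[BUFC_NUMTYPES])
{
	bufc_t type = BUFC_DATA;
	const struct cand *oldest = NULL;

	for (int t = 0; t < BUFC_NUMTYPES; t++) {
		if (tails[t] == NULL)
			continue;
		if (oldest == NULL || tails[t]->access < oldest->access) {
			oldest = tails[t];
			type = (bufc_t)t;
		}
	}
	return (type);
}
```

A loop of this shape would not need the final "should never get here" branch, since no combination of inputs is left unhandled.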
@@ -3724,32 +4319,36 @@
 arc_adjust(void)
 {
         uint64_t total_evicted = 0;
         uint64_t bytes;
         int64_t target;
-        uint64_t asize = aggsum_value(&arc_size);
-        uint64_t ameta = aggsum_value(&arc_meta_used);
 
         /*
          * If we're over arc_meta_limit, we want to correct that before
          * potentially evicting data buffers below.
          */
-        total_evicted += arc_adjust_meta(ameta);
+        total_evicted += arc_adjust_meta_or_ddt(B_FALSE);
 
         /*
+         * If we're over arc_ddt_limit, we want to correct that before
+         * potentially evicting data buffers below.
+         */
+        total_evicted += arc_adjust_meta_or_ddt(B_TRUE);
+
+        /*
          * Adjust MRU size
          *
          * If we're over the target cache size, we want to evict enough
          * from the list to get back to our target size. We don't want
          * to evict too much from the MRU, such that it drops below
          * arc_p. So, if we're over our target cache size more than
          * the MRU is over arc_p, we'll evict enough to get back to
          * arc_p here, and then evict more from the MFU below.
          */
-        target = MIN((int64_t)(asize - arc_c),
+        target = MIN((int64_t)(arc_size - arc_c),
             (int64_t)(refcount_count(&arc_anon->arcs_size) +
-            refcount_count(&arc_mru->arcs_size) + ameta - arc_p));
+            refcount_count(&arc_mru->arcs_size) + arc_meta_used - arc_p));
 
         /*
          * If we're below arc_meta_min, always prefer to evict data.
          * Otherwise, try to satisfy the requested number of bytes to
          * evict from the type which contains older buffers; in an
@@ -3756,22 +4355,22 @@
          * effort to keep newer buffers in the cache regardless of their
          * type. If we cannot satisfy the number of bytes from this
          * type, spill over into the next type.
          */
         if (arc_adjust_type(arc_mru) == ARC_BUFC_METADATA &&
-            ameta > arc_meta_min) {
+            arc_meta_used > arc_meta_min) {
                 bytes = arc_adjust_impl(arc_mru, 0, target, ARC_BUFC_METADATA);
                 total_evicted += bytes;
 
                 /*
                  * If we couldn't evict our target number of bytes from
                  * metadata, we try to get the rest from data.
                  */
                 target -= bytes;
 
-                total_evicted +=
-                    arc_adjust_impl(arc_mru, 0, target, ARC_BUFC_DATA);
+                bytes += arc_adjust_impl(arc_mru, 0, target, ARC_BUFC_DATA);
+                total_evicted += bytes;
         } else {
                 bytes = arc_adjust_impl(arc_mru, 0, target, ARC_BUFC_DATA);
                 total_evicted += bytes;
 
                 /*
@@ -3778,36 +4377,44 @@
                  * If we couldn't evict our target number of bytes from
                  * data, we try to get the rest from metadata.
                  */
                 target -= bytes;
 
-                total_evicted +=
-                    arc_adjust_impl(arc_mru, 0, target, ARC_BUFC_METADATA);
+                bytes += arc_adjust_impl(arc_mru, 0, target, ARC_BUFC_METADATA);
+                total_evicted += bytes;
         }
 
         /*
+         * If we couldn't evict our target number of bytes from
+         * data and metadata, we try to get the rest from ddt.
+         */
+        target -= bytes;
+        total_evicted +=
+            arc_adjust_impl(arc_mru, 0, target, ARC_BUFC_DDT);
+
+        /*
          * Adjust MFU size
          *
          * Now that we've tried to evict enough from the MRU to get its
          * size back to arc_p, if we're still above the target cache
          * size, we evict the rest from the MFU.
          */
-        target = asize - arc_c;
+        target = arc_size - arc_c;
 
         if (arc_adjust_type(arc_mfu) == ARC_BUFC_METADATA &&
-            ameta > arc_meta_min) {
+            arc_meta_used > arc_meta_min) {
                 bytes = arc_adjust_impl(arc_mfu, 0, target, ARC_BUFC_METADATA);
                 total_evicted += bytes;
 
                 /*
                  * If we couldn't evict our target number of bytes from
                  * metadata, we try to get the rest from data.
                  */
                 target -= bytes;
 
-                total_evicted +=
-                    arc_adjust_impl(arc_mfu, 0, target, ARC_BUFC_DATA);
+                bytes += arc_adjust_impl(arc_mfu, 0, target, ARC_BUFC_DATA);
+                total_evicted += bytes;
         } else {
                 bytes = arc_adjust_impl(arc_mfu, 0, target, ARC_BUFC_DATA);
                 total_evicted += bytes;
 
                 /*
@@ -3814,15 +4421,23 @@
                  * If we couldn't evict our target number of bytes from
                  * data, we try to get the rest from metadata.
                  */
                 target -= bytes;
 
-                total_evicted +=
-                    arc_adjust_impl(arc_mfu, 0, target, ARC_BUFC_METADATA);
+                bytes += arc_adjust_impl(arc_mfu, 0, target, ARC_BUFC_METADATA);
+                total_evicted += bytes;
         }
 
         /*
+         * If we couldn't evict our target number of bytes from
+         * data and metadata, we try to get the rest from ddt.
+         */
+        target -= bytes;
+        total_evicted +=
+            arc_adjust_impl(arc_mfu, 0, target, ARC_BUFC_DDT);
+
+        /*
          * Adjust ghost lists
          *
          * In addition to the above, the ARC also defines target values
          * for the ghost lists. The sum of the mru list and mru ghost
          * list should never exceed the target size of the cache, and
@@ -3837,12 +4452,17 @@
         bytes = arc_adjust_impl(arc_mru_ghost, 0, target, ARC_BUFC_DATA);
         total_evicted += bytes;
 
         target -= bytes;
 
+        bytes += arc_adjust_impl(arc_mru_ghost, 0, target, ARC_BUFC_METADATA);
+        total_evicted += bytes;
+
+        target -= bytes;
+
         total_evicted +=
-            arc_adjust_impl(arc_mru_ghost, 0, target, ARC_BUFC_METADATA);
+            arc_adjust_impl(arc_mru_ghost, 0, target, ARC_BUFC_DDT);
 
         /*
          * We assume the sum of the mru list and mfu list is less than
          * or equal to arc_c (we enforced this above), which means we
          * can use the simpler of the two equations below:
@@ -3856,65 +4476,114 @@
         bytes = arc_adjust_impl(arc_mfu_ghost, 0, target, ARC_BUFC_DATA);
         total_evicted += bytes;
 
         target -= bytes;
 
+        bytes += arc_adjust_impl(arc_mfu_ghost, 0, target, ARC_BUFC_METADATA);
+        total_evicted += bytes;
+
+        target -= bytes;
+
         total_evicted +=
-            arc_adjust_impl(arc_mfu_ghost, 0, target, ARC_BUFC_METADATA);
+            arc_adjust_impl(arc_mfu_ghost, 0, target, ARC_BUFC_DDT);
 
         return (total_evicted);
 }
 
+typedef struct arc_async_flush_data {
+        uint64_t        aaf_guid;
+        boolean_t       aaf_retry;
+} arc_async_flush_data_t;
+
+static taskq_t *arc_flush_taskq;
+
+static void
+arc_flush_impl(uint64_t guid, boolean_t retry)
+{
+        arc_buf_contents_t arcs;
+
+        for (arcs = ARC_BUFC_DATA; arcs < ARC_BUFC_NUMTYPES; ++arcs) {
+                (void) arc_flush_state(arc_mru, guid, arcs, retry);
+                (void) arc_flush_state(arc_mfu, guid, arcs, retry);
+                (void) arc_flush_state(arc_mru_ghost, guid, arcs, retry);
+                (void) arc_flush_state(arc_mfu_ghost, guid, arcs, retry);
+        }
+}
+
+static void
+arc_flush_task(void *arg)
+{
+        arc_async_flush_data_t *aaf = (arc_async_flush_data_t *)arg;
+        arc_flush_impl(aaf->aaf_guid, aaf->aaf_retry);
+        kmem_free(aaf, sizeof (arc_async_flush_data_t));
+}
+
+boolean_t zfs_fastflush = B_TRUE;
+
 void
 arc_flush(spa_t *spa, boolean_t retry)
 {
         uint64_t guid = 0;
+        boolean_t async_flush = (spa != NULL ? zfs_fastflush : B_FALSE);
+        arc_async_flush_data_t *aaf = NULL;
 
         /*
          * If retry is B_TRUE, a spa must not be specified since we have
          * no good way to determine if all of a spa's buffers have been
          * evicted from an arc state.
          */
-        ASSERT(!retry || spa == 0);
+        ASSERT(!retry || spa == NULL);
 
-        if (spa != NULL)
+        if (spa != NULL) {
                 guid = spa_load_guid(spa);
+                if (async_flush) {
+                        aaf = kmem_alloc(sizeof (arc_async_flush_data_t),
+                            KM_SLEEP);
+                        aaf->aaf_guid = guid;
+                        aaf->aaf_retry = retry;
+                }
+        }
 
-        (void) arc_flush_state(arc_mru, guid, ARC_BUFC_DATA, retry);
-        (void) arc_flush_state(arc_mru, guid, ARC_BUFC_METADATA, retry);
-
-        (void) arc_flush_state(arc_mfu, guid, ARC_BUFC_DATA, retry);
-        (void) arc_flush_state(arc_mfu, guid, ARC_BUFC_METADATA, retry);
-
-        (void) arc_flush_state(arc_mru_ghost, guid, ARC_BUFC_DATA, retry);
-        (void) arc_flush_state(arc_mru_ghost, guid, ARC_BUFC_METADATA, retry);
-
-        (void) arc_flush_state(arc_mfu_ghost, guid, ARC_BUFC_DATA, retry);
-        (void) arc_flush_state(arc_mfu_ghost, guid, ARC_BUFC_METADATA, retry);
+        /*
+         * Try to flush the remaining per-spa ARC ghost buffers asynchronously
+         * while a pool is being closed.
+         * An ARC buffer is bound to its spa only by guid, so a buffer can
+         * exist even after the pool is gone. If asynchronous flushing
+         * fails, we fall back to a regular (synchronous) flush.
+         * NOTE: It is not a problem if asynchronous flushing has not yet
+         * finished when the pool is imported again, even when the guids
+         * before and after export/import are the same: we can evict only
+         * unreferenced buffers; all others are skipped.
+         */
+        if (!async_flush || (taskq_dispatch(arc_flush_taskq, arc_flush_task,
+            aaf, TQ_NOSLEEP) == NULL)) {
+                arc_flush_impl(guid, retry);
+                if (async_flush)
+                        kmem_free(aaf, sizeof (arc_async_flush_data_t));
+        }
 }
 
 void
 arc_shrink(int64_t to_free)
 {
-        uint64_t asize = aggsum_value(&arc_size);
         if (arc_c > arc_c_min) {
 
                 if (arc_c > arc_c_min + to_free)
                         atomic_add_64(&arc_c, -to_free);
                 else
                         arc_c = arc_c_min;
 
                 atomic_add_64(&arc_p, -(arc_p >> arc_shrink_shift));
-                if (asize < arc_c)
-                        arc_c = MAX(asize, arc_c_min);
+                if (arc_c > arc_size)
+                        arc_c = MAX(arc_size, arc_c_min);
                 if (arc_p > arc_c)
                         arc_p = (arc_c >> 1);
                 ASSERT(arc_c >= arc_c_min);
                 ASSERT((int64_t)arc_p >= 0);
         }
 
-        if (asize > arc_c)
+        if (arc_size > arc_c)
                 (void) arc_adjust();
 }
 
 typedef enum free_memory_reason_t {
         FMR_UNKNOWN,
@@ -4074,14 +4743,14 @@
         extern kmem_cache_t     *zio_data_buf_cache[];
         extern kmem_cache_t     *range_seg_cache;
         extern kmem_cache_t     *abd_chunk_cache;
 
 #ifdef _KERNEL
-        if (aggsum_compare(&arc_meta_used, arc_meta_limit) >= 0) {
+        if (arc_meta_used >= arc_meta_limit || arc_ddt_size >= arc_ddt_limit) {
                 /*
-                 * We are exceeding our meta-data cache limit.
-                 * Purge some DNLC entries to release holds on meta-data.
+                 * We are exceeding our meta-data or DDT cache limit.
+                 * Purge some DNLC entries to release holds on meta-data/DDT.
                  */
                 dnlc_reduce_cache((void *)(uintptr_t)arc_reduce_dnlc_percent);
         }
 #if defined(__i386)
         /*
@@ -4233,11 +4902,11 @@
                  * arc buffers being unevictable. Therefore, even if
                  * arc_size is above arc_c, another pass is unlikely to
                  * be helpful and could potentially cause us to enter an
                  * infinite loop.
                  */
-                if (aggsum_compare(&arc_size, arc_c) <= 0|| evicted == 0) {
+                if (arc_size <= arc_c || evicted == 0) {
                         /*
                          * We're either no longer overflowing, or we
                          * can't evict anything more, so we should wake
                          * up any threads before we go to sleep.
                          */
@@ -4315,12 +4984,11 @@
 
         /*
          * If we're within (2 * maxblocksize) bytes of the target
          * cache size, increment the target cache size
          */
-        if (aggsum_compare(&arc_size, arc_c - (2ULL << SPA_MAXBLOCKSHIFT)) >
-            0) {
+        if (arc_size > arc_c - (2ULL << SPA_MAXBLOCKSHIFT)) {
                 atomic_add_64(&arc_c, (int64_t)bytes);
                 if (arc_c > arc_c_max)
                         arc_c = arc_c_max;
                 else if (state == arc_anon)
                         atomic_add_64(&arc_p, (int64_t)bytes);
@@ -4339,29 +5007,20 @@
 {
         /* Always allow at least one block of overflow */
         uint64_t overflow = MAX(SPA_MAXBLOCKSIZE,
             arc_c >> zfs_arc_overflow_shift);
 
-        /*
-         * We just compare the lower bound here for performance reasons. Our
-         * primary goals are to make sure that the arc never grows without
-         * bound, and that it can reach its maximum size. This check
-         * accomplishes both goals. The maximum amount we could run over by is
-         * 2 * aggsum_borrow_multiplier * NUM_CPUS * the average size of a block
-         * in the ARC. In practice, that's in the tens of MB, which is low
-         * enough to be safe.
-         */
-        return (aggsum_lower_bound(&arc_size) >= arc_c + overflow);
+        return (arc_size >= arc_c + overflow);
 }
 
 static abd_t *
 arc_get_data_abd(arc_buf_hdr_t *hdr, uint64_t size, void *tag)
 {
         arc_buf_contents_t type = arc_buf_type(hdr);
 
         arc_get_data_impl(hdr, size, tag);
-        if (type == ARC_BUFC_METADATA) {
+        if (type == ARC_BUFC_METADATA || type == ARC_BUFC_DDT) {
                 return (abd_alloc(size, B_TRUE));
         } else {
                 ASSERT(type == ARC_BUFC_DATA);
                 return (abd_alloc(size, B_FALSE));
         }
@@ -4371,11 +5030,11 @@
 arc_get_data_buf(arc_buf_hdr_t *hdr, uint64_t size, void *tag)
 {
         arc_buf_contents_t type = arc_buf_type(hdr);
 
         arc_get_data_impl(hdr, size, tag);
-        if (type == ARC_BUFC_METADATA) {
+        if (type == ARC_BUFC_METADATA || type == ARC_BUFC_DDT) {
                 return (zio_buf_alloc(size));
         } else {
                 ASSERT(type == ARC_BUFC_DATA);
                 return (zio_data_buf_alloc(size));
         }
@@ -4430,11 +5089,13 @@
 
                 mutex_exit(&arc_reclaim_lock);
         }
 
         VERIFY3U(hdr->b_type, ==, type);
-        if (type == ARC_BUFC_METADATA) {
+        if (type == ARC_BUFC_DDT) {
+                arc_space_consume(size, ARC_SPACE_DDT);
+        } else if (type == ARC_BUFC_METADATA) {
                 arc_space_consume(size, ARC_SPACE_META);
         } else {
                 arc_space_consume(size, ARC_SPACE_DATA);
         }
 
@@ -4463,12 +5124,11 @@
 
                 /*
                  * If we are growing the cache, and we are adding anonymous
                  * data, and we have outgrown arc_p, update arc_p
                  */
-                if (aggsum_compare(&arc_size, arc_c) < 0 &&
-                    hdr->b_l1hdr.b_state == arc_anon &&
+                if (arc_size < arc_c && hdr->b_l1hdr.b_state == arc_anon &&
                     (refcount_count(&arc_anon->arcs_size) +
                     refcount_count(&arc_mru->arcs_size) > arc_p))
                         arc_p = MIN(arc_c, arc_p + size);
         }
 }
@@ -4484,11 +5144,11 @@
 arc_free_data_buf(arc_buf_hdr_t *hdr, void *buf, uint64_t size, void *tag)
 {
         arc_buf_contents_t type = arc_buf_type(hdr);
 
         arc_free_data_impl(hdr, size, tag);
-        if (type == ARC_BUFC_METADATA) {
+        if (type == ARC_BUFC_METADATA || type == ARC_BUFC_DDT) {
                 zio_buf_free(buf, size);
         } else {
                 ASSERT(type == ARC_BUFC_DATA);
                 zio_data_buf_free(buf, size);
         }
@@ -4512,11 +5172,13 @@
                     size, tag);
         }
         (void) refcount_remove_many(&state->arcs_size, size, tag);
 
         VERIFY3U(hdr->b_type, ==, type);
-        if (type == ARC_BUFC_METADATA) {
+        if (type == ARC_BUFC_DDT) {
+                arc_space_return(size, ARC_SPACE_DDT);
+        } else if (type == ARC_BUFC_METADATA) {
                 arc_space_return(size, ARC_SPACE_META);
         } else {
                 ASSERT(type == ARC_BUFC_DATA);
                 arc_space_return(size, ARC_SPACE_DATA);
         }
@@ -4658,10 +5320,69 @@
         } else {
                 ASSERT(!"invalid arc state");
         }
 }
 
+/*
+ * This routine is called by dbuf_hold() to update the arc_access() state
+ * which otherwise would be skipped for entries in the dbuf cache.
+ */
+void
+arc_buf_access(arc_buf_t *buf)
+{
+        mutex_enter(&buf->b_evict_lock);
+        arc_buf_hdr_t *hdr = buf->b_hdr;
+
+        /*
+         * Avoid taking the hash_lock when possible as an optimization.
+         * The header must be checked again under the hash_lock in order
+         * to handle the case where it is concurrently being released.
+         */
+        if (hdr->b_l1hdr.b_state == arc_anon || HDR_EMPTY(hdr)) {
+                mutex_exit(&buf->b_evict_lock);
+                return;
+        }
+
+        kmutex_t *hash_lock = HDR_LOCK(hdr);
+        mutex_enter(hash_lock);
+
+        if (hdr->b_l1hdr.b_state == arc_anon || HDR_EMPTY(hdr)) {
+                mutex_exit(hash_lock);
+                mutex_exit(&buf->b_evict_lock);
+                ARCSTAT_BUMP(arcstat_access_skip);
+                return;
+        }
+
+        mutex_exit(&buf->b_evict_lock);
+
+        ASSERT(hdr->b_l1hdr.b_state == arc_mru ||
+            hdr->b_l1hdr.b_state == arc_mfu);
+
+        DTRACE_PROBE1(arc__hit, arc_buf_hdr_t *, hdr);
+        arc_access(hdr, hash_lock);
+        mutex_exit(hash_lock);
+
+        ARCSTAT_BUMP(arcstat_hits);
+        /*
+         * Upstream used the ARCSTAT_CONDSTAT macro here, but they changed
+         * the argument format for that macro, which would require that we
+         * go and modify all other uses of it. So it's easier to just expand
+         * this one invocation of the macro to do the right thing.
+         */
+        if (!HDR_PREFETCH(hdr)) {
+                if (!HDR_ISTYPE_METADATA(hdr))
+                        ARCSTAT_BUMP(arcstat_demand_data_hits);
+                else
+                        ARCSTAT_BUMP(arcstat_demand_metadata_hits);
+        } else {
+                if (!HDR_ISTYPE_METADATA(hdr))
+                        ARCSTAT_BUMP(arcstat_prefetch_data_hits);
+                else
+                        ARCSTAT_BUMP(arcstat_prefetch_metadata_hits);
+        }
+}
+
 /* a generic arc_done_func_t which you can use */
 /* ARGSUSED */
 void
 arc_bcopy_func(zio_t *zio, arc_buf_t *buf, void *arg)
 {
@@ -4800,12 +5521,15 @@
                 arc_hdr_verify(hdr, zio->io_bp);
         } else {
                 arc_hdr_set_flags(hdr, ARC_FLAG_IO_ERROR);
                 if (hdr->b_l1hdr.b_state != arc_anon)
                         arc_change_state(arc_anon, hdr, hash_lock);
-                if (HDR_IN_HASH_TABLE(hdr))
+                if (HDR_IN_HASH_TABLE(hdr)) {
+                        if (hash_lock)
+                                arc_wait_for_krrp(hdr);
                         buf_hash_remove(hdr);
+                }
                 freeable = refcount_is_zero(&hdr->b_l1hdr.b_refcnt);
         }
 
         /*
          * Broadcast before we drop the hash_lock to avoid the possibility
@@ -4844,10 +5568,59 @@
         if (freeable)
                 arc_hdr_destroy(hdr);
 }
 
 /*
+ * Process data held in the ARC by means of a callback.
+ * The main purpose is to copy data directly from the ARC to a target buffer.
+ */
+int
+arc_io_bypass(spa_t *spa, const blkptr_t *bp,
+    arc_bypass_io_func func, void *arg)
+{
+        arc_buf_hdr_t *hdr;
+        kmutex_t *hash_lock = NULL;
+        int error = 0;
+        uint64_t guid = spa_load_guid(spa);
+
+top:
+        hdr = buf_hash_find(guid, bp, &hash_lock);
+        if (hdr && HDR_HAS_L1HDR(hdr) && hdr->b_l1hdr.b_bufcnt > 0 &&
+            hdr->b_l1hdr.b_buf->b_data) {
+                if (HDR_IO_IN_PROGRESS(hdr)) {
+                        cv_wait(&hdr->b_l1hdr.b_cv, hash_lock);
+                        mutex_exit(hash_lock);
+                        DTRACE_PROBE(arc_bypass_wait);
+                        goto top;
+                }
+
+                /*
+                 * Because func is an arbitrary callback that may block, the
+                 * hash lock is released so that other threads are not
+                 * stalled. A counter holds a reference on blocks that are
+                 * in use by krrp.
+                 */
+
+                hdr->b_l1hdr.b_krrp++;
+                mutex_exit(hash_lock);
+
+                error = func(hdr->b_l1hdr.b_buf->b_data, hdr->b_lsize, arg);
+
+                mutex_enter(hash_lock);
+                hdr->b_l1hdr.b_krrp--;
+                cv_broadcast(&hdr->b_l1hdr.b_cv);
+                mutex_exit(hash_lock);
+
+                return (error);
+        } else {
+                if (hash_lock)
+                        mutex_exit(hash_lock);
+                return (ENODATA);
+        }
+}
+
+/*
  * "Read" the block at the specified DVA (in bp) via the
  * cache.  If the block is found in the cache, invoke the provided
  * callback immediately and return.  Note that the `zio' parameter
  * in the callback will be NULL in this case, since no IO was
  * required.  If the block is not in the cache pass the read request
@@ -4983,13 +5756,13 @@
                 arc_access(hdr, hash_lock);
                 if (*arc_flags & ARC_FLAG_L2CACHE)
                         arc_hdr_set_flags(hdr, ARC_FLAG_L2CACHE);
                 mutex_exit(hash_lock);
                 ARCSTAT_BUMP(arcstat_hits);
-                ARCSTAT_CONDSTAT(!HDR_PREFETCH(hdr),
-                    demand, prefetch, !HDR_ISTYPE_METADATA(hdr),
-                    data, metadata, hits);
+                if (HDR_ISTYPE_DDT(hdr))
+                        ARCSTAT_BUMP(arcstat_ddt_hits);
+                arc_update_hit_stat(hdr, B_TRUE);
 
                 if (done)
                         done(NULL, buf, private);
         } else {
                 uint64_t lsize = BP_GET_LSIZE(bp);
@@ -5012,13 +5785,12 @@
                                 hdr->b_birth = BP_PHYSICAL_BIRTH(bp);
                                 exists = buf_hash_insert(hdr, &hash_lock);
                         }
                         if (exists != NULL) {
                                 /* somebody beat us to the hash insert */
-                                mutex_exit(hash_lock);
-                                buf_discard_identity(hdr);
                                 arc_hdr_destroy(hdr);
+                                mutex_exit(hash_lock);
                                 goto top; /* restart the IO request */
                         }
                 } else {
                         /*
                          * This block is in the ghost cache. If it was L2-only
@@ -5032,11 +5804,11 @@
                         ASSERT3P(hdr->b_l1hdr.b_pabd, ==, NULL);
                         ASSERT(GHOST_STATE(hdr->b_l1hdr.b_state));
                         ASSERT(!HDR_IO_IN_PROGRESS(hdr));
                         ASSERT(refcount_is_zero(&hdr->b_l1hdr.b_refcnt));
                         ASSERT3P(hdr->b_l1hdr.b_buf, ==, NULL);
-                        ASSERT3P(hdr->b_l1hdr.b_freeze_cksum, ==, NULL);
+                        ASSERT3P(hdr->b_freeze_cksum, ==, NULL);
 
                         /*
                          * This is a delicate dance that we play here.
                          * This hdr is in the ghost list so we access it
                          * to move it out of the ghost list before we
@@ -5084,11 +5856,11 @@
                 if (HDR_HAS_L2HDR(hdr) &&
                     (vd = hdr->b_l2hdr.b_dev->l2ad_vdev) != NULL) {
                         devw = hdr->b_l2hdr.b_dev->l2ad_writing;
                         addr = hdr->b_l2hdr.b_daddr;
                         /*
-                         * Lock out L2ARC device removal.
+                         * Lock out device removal.
                          */
                         if (vdev_is_dead(vd) ||
                             !spa_config_tryenter(spa, SCL_L2ARC, vd, RW_READER))
                                 vd = NULL;
                 }
@@ -5108,13 +5880,11 @@
                 ASSERT3U(HDR_GET_LSIZE(hdr), ==, lsize);
 
                 DTRACE_PROBE4(arc__miss, arc_buf_hdr_t *, hdr, blkptr_t *, bp,
                     uint64_t, lsize, zbookmark_phys_t *, zb);
                 ARCSTAT_BUMP(arcstat_misses);
-                ARCSTAT_CONDSTAT(!HDR_PREFETCH(hdr),
-                    demand, prefetch, !HDR_ISTYPE_METADATA(hdr),
-                    data, metadata, misses);
+                arc_update_hit_stat(hdr, B_FALSE);
 
                 if (vd != NULL && l2arc_ndev != 0 && !(l2arc_norw && devw)) {
                         /*
                          * Read from the L2ARC if the following are true:
                          * 1. The L2ARC vdev was previously cached.
@@ -5131,10 +5901,12 @@
                                 abd_t *abd;
                                 uint64_t asize;
 
                                 DTRACE_PROBE1(l2arc__hit, arc_buf_hdr_t *, hdr);
                                 ARCSTAT_BUMP(arcstat_l2_hits);
+                                if (vdev_type_is_ddt(vd))
+                                        ARCSTAT_BUMP(arcstat_l2_ddt_hits);
 
                                 cb = kmem_zalloc(sizeof (l2arc_read_callback_t),
                                     KM_SLEEP);
                                 cb->l2rcb_hdr = hdr;
                                 cb->l2rcb_bp = *bp;
@@ -5142,11 +5914,11 @@
                                 cb->l2rcb_flags = zio_flags;
 
                                 asize = vdev_psize_to_asize(vd, size);
                                 if (asize != size) {
                                         abd = abd_alloc_for_io(asize,
-                                            HDR_ISTYPE_METADATA(hdr));
+                                            !HDR_ISTYPE_DATA(hdr));
                                         cb->l2rcb_abd = abd;
                                 } else {
                                         abd = hdr->b_l1hdr.b_pabd;
                                 }
 
@@ -5170,11 +5942,15 @@
                                     ZIO_FLAG_CANFAIL |
                                     ZIO_FLAG_DONT_PROPAGATE |
                                     ZIO_FLAG_DONT_RETRY, B_FALSE);
                                 DTRACE_PROBE2(l2arc__read, vdev_t *, vd,
                                     zio_t *, rzio);
+
                                 ARCSTAT_INCR(arcstat_l2_read_bytes, size);
+                                if (vdev_type_is_ddt(vd))
+                                        ARCSTAT_INCR(arcstat_l2_ddt_read_bytes,
+                                            size);
 
                                 if (*arc_flags & ARC_FLAG_NOWAIT) {
                                         zio_nowait(rzio);
                                         return (0);
                                 }
@@ -5442,10 +6218,12 @@
                 ASSERT(!HDR_SHARED_DATA(nhdr));
 
                 nhdr->b_l1hdr.b_buf = buf;
                 nhdr->b_l1hdr.b_bufcnt = 1;
                 (void) refcount_add(&nhdr->b_l1hdr.b_refcnt, tag);
+                nhdr->b_l1hdr.b_krrp = 0;
+
                 buf->b_hdr = nhdr;
 
                 mutex_exit(&buf->b_evict_lock);
                 (void) refcount_add_many(&arc_anon->arcs_size,
                     arc_buf_size(buf), buf);
@@ -5652,12 +6430,13 @@
                                         panic("bad overwrite, hdr=%p exists=%p",
                                             (void *)hdr, (void *)exists);
                                 ASSERT(refcount_is_zero(
                                     &exists->b_l1hdr.b_refcnt));
                                 arc_change_state(arc_anon, exists, hash_lock);
-                                mutex_exit(hash_lock);
+                                arc_wait_for_krrp(exists);
                                 arc_hdr_destroy(exists);
+                                mutex_exit(hash_lock);
                                 exists = buf_hash_insert(hdr, &hash_lock);
                                 ASSERT3P(exists, ==, NULL);
                         } else if (zio->io_flags & ZIO_FLAG_NOPWRITE) {
                                 /* nopwrite */
                                 ASSERT(zio->io_prop.zp_nopwrite);
@@ -5691,11 +6470,12 @@
 zio_t *
 arc_write(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp, arc_buf_t *buf,
     boolean_t l2arc, const zio_prop_t *zp, arc_done_func_t *ready,
     arc_done_func_t *children_ready, arc_done_func_t *physdone,
     arc_done_func_t *done, void *private, zio_priority_t priority,
-    int zio_flags, const zbookmark_phys_t *zb)
+    int zio_flags, const zbookmark_phys_t *zb,
+    const zio_smartcomp_info_t *smartcomp)
 {
         arc_buf_hdr_t *hdr = buf->b_hdr;
         arc_write_callback_t *callback;
         zio_t *zio;
         zio_prop_t localprop = *zp;
@@ -5752,11 +6532,11 @@
         zio = zio_write(pio, spa, txg, bp,
             abd_get_from_buf(buf->b_data, HDR_GET_LSIZE(hdr)),
             HDR_GET_LSIZE(hdr), arc_buf_size(buf), &localprop, arc_write_ready,
             (children_ready != NULL) ? arc_write_children_ready : NULL,
             arc_write_physdone, arc_write_done, callback,
-            priority, zio_flags, zb);
+            priority, zio_flags, zb, smartcomp);
 
         return (zio);
 }
 
 static int
@@ -5844,13 +6624,16 @@
          * gets too large.  We try to keep the cache less than half full
          * of dirty blocks so that our sync times don't grow too large.
          * Note: if two requests come in concurrently, we might let them
          * both succeed, when one of them should fail.  Not a huge deal.
          */
-
         if (reserve + arc_tempreserve + anon_size > arc_c / 2 &&
             anon_size > arc_c / 4) {
+                DTRACE_PROBE4(arc__tempreserve__space__throttle, uint64_t,
+                    arc_tempreserve, arc_state_t *, arc_anon, uint64_t,
+                    reserve, uint64_t, arc_c);
+
                 uint64_t meta_esize =
                     refcount_count(&arc_anon->arcs_esize[ARC_BUFC_METADATA]);
                 uint64_t data_esize =
                     refcount_count(&arc_anon->arcs_esize[ARC_BUFC_DATA]);
                 dprintf("failing, arc_tempreserve=%lluK anon_meta=%lluK "
@@ -5863,17 +6646,20 @@
         return (0);
 }
 
 static void
 arc_kstat_update_state(arc_state_t *state, kstat_named_t *size,
-    kstat_named_t *evict_data, kstat_named_t *evict_metadata)
+    kstat_named_t *evict_data, kstat_named_t *evict_metadata,
+    kstat_named_t *evict_ddt)
 {
         size->value.ui64 = refcount_count(&state->arcs_size);
         evict_data->value.ui64 =
             refcount_count(&state->arcs_esize[ARC_BUFC_DATA]);
         evict_metadata->value.ui64 =
             refcount_count(&state->arcs_esize[ARC_BUFC_METADATA]);
+        evict_ddt->value.ui64 =
+            refcount_count(&state->arcs_esize[ARC_BUFC_DDT]);
 }
 
 static int
 arc_kstat_update(kstat_t *ksp, int rw)
 {
@@ -5883,36 +6669,32 @@
                 return (EACCES);
         } else {
                 arc_kstat_update_state(arc_anon,
                     &as->arcstat_anon_size,
                     &as->arcstat_anon_evictable_data,
-                    &as->arcstat_anon_evictable_metadata);
+                    &as->arcstat_anon_evictable_metadata,
+                    &as->arcstat_anon_evictable_ddt);
                 arc_kstat_update_state(arc_mru,
                     &as->arcstat_mru_size,
                     &as->arcstat_mru_evictable_data,
-                    &as->arcstat_mru_evictable_metadata);
+                    &as->arcstat_mru_evictable_metadata,
+                    &as->arcstat_mru_evictable_ddt);
                 arc_kstat_update_state(arc_mru_ghost,
                     &as->arcstat_mru_ghost_size,
                     &as->arcstat_mru_ghost_evictable_data,
-                    &as->arcstat_mru_ghost_evictable_metadata);
+                    &as->arcstat_mru_ghost_evictable_metadata,
+                    &as->arcstat_mru_ghost_evictable_ddt);
                 arc_kstat_update_state(arc_mfu,
                     &as->arcstat_mfu_size,
                     &as->arcstat_mfu_evictable_data,
-                    &as->arcstat_mfu_evictable_metadata);
+                    &as->arcstat_mfu_evictable_metadata,
+                    &as->arcstat_mfu_evictable_ddt);
                 arc_kstat_update_state(arc_mfu_ghost,
                     &as->arcstat_mfu_ghost_size,
                     &as->arcstat_mfu_ghost_evictable_data,
-                    &as->arcstat_mfu_ghost_evictable_metadata);
-
-                ARCSTAT(arcstat_size) = aggsum_value(&arc_size);
-                ARCSTAT(arcstat_meta_used) = aggsum_value(&arc_meta_used);
-                ARCSTAT(arcstat_data_size) = aggsum_value(&astat_data_size);
-                ARCSTAT(arcstat_metadata_size) =
-                    aggsum_value(&astat_metadata_size);
-                ARCSTAT(arcstat_hdr_size) = aggsum_value(&astat_hdr_size);
-                ARCSTAT(arcstat_other_size) = aggsum_value(&astat_other_size);
-                ARCSTAT(arcstat_l2_hdr_size) = aggsum_value(&astat_l2_hdr_size);
+                    &as->arcstat_mfu_ghost_evictable_metadata,
+                    &as->arcstat_mfu_ghost_evictable_ddt);
         }
 
         return (0);
 }
 
@@ -5958,112 +6740,79 @@
         arc_mru = &ARC_mru;
         arc_mru_ghost = &ARC_mru_ghost;
         arc_mfu = &ARC_mfu;
         arc_mfu_ghost = &ARC_mfu_ghost;
         arc_l2c_only = &ARC_l2c_only;
+        arc_buf_contents_t arcs;
 
-        arc_mru->arcs_list[ARC_BUFC_METADATA] =
+        for (arcs = ARC_BUFC_DATA; arcs < ARC_BUFC_NUMTYPES; ++arcs) {
+                arc_mru->arcs_list[arcs] =
             multilist_create(sizeof (arc_buf_hdr_t),
             offsetof(arc_buf_hdr_t, b_l1hdr.b_arc_node),
             arc_state_multilist_index_func);
-        arc_mru->arcs_list[ARC_BUFC_DATA] =
+                arc_mru_ghost->arcs_list[arcs] =
             multilist_create(sizeof (arc_buf_hdr_t),
             offsetof(arc_buf_hdr_t, b_l1hdr.b_arc_node),
             arc_state_multilist_index_func);
-        arc_mru_ghost->arcs_list[ARC_BUFC_METADATA] =
+                arc_mfu->arcs_list[arcs] =
             multilist_create(sizeof (arc_buf_hdr_t),
             offsetof(arc_buf_hdr_t, b_l1hdr.b_arc_node),
             arc_state_multilist_index_func);
-        arc_mru_ghost->arcs_list[ARC_BUFC_DATA] =
+                arc_mfu_ghost->arcs_list[arcs] =
             multilist_create(sizeof (arc_buf_hdr_t),
             offsetof(arc_buf_hdr_t, b_l1hdr.b_arc_node),
             arc_state_multilist_index_func);
-        arc_mfu->arcs_list[ARC_BUFC_METADATA] =
+                arc_l2c_only->arcs_list[arcs] =
             multilist_create(sizeof (arc_buf_hdr_t),
             offsetof(arc_buf_hdr_t, b_l1hdr.b_arc_node),
             arc_state_multilist_index_func);
-        arc_mfu->arcs_list[ARC_BUFC_DATA] =
-            multilist_create(sizeof (arc_buf_hdr_t),
-            offsetof(arc_buf_hdr_t, b_l1hdr.b_arc_node),
-            arc_state_multilist_index_func);
-        arc_mfu_ghost->arcs_list[ARC_BUFC_METADATA] =
-            multilist_create(sizeof (arc_buf_hdr_t),
-            offsetof(arc_buf_hdr_t, b_l1hdr.b_arc_node),
-            arc_state_multilist_index_func);
-        arc_mfu_ghost->arcs_list[ARC_BUFC_DATA] =
-            multilist_create(sizeof (arc_buf_hdr_t),
-            offsetof(arc_buf_hdr_t, b_l1hdr.b_arc_node),
-            arc_state_multilist_index_func);
-        arc_l2c_only->arcs_list[ARC_BUFC_METADATA] =
-            multilist_create(sizeof (arc_buf_hdr_t),
-            offsetof(arc_buf_hdr_t, b_l1hdr.b_arc_node),
-            arc_state_multilist_index_func);
-        arc_l2c_only->arcs_list[ARC_BUFC_DATA] =
-            multilist_create(sizeof (arc_buf_hdr_t),
-            offsetof(arc_buf_hdr_t, b_l1hdr.b_arc_node),
-            arc_state_multilist_index_func);
 
-        refcount_create(&arc_anon->arcs_esize[ARC_BUFC_METADATA]);
-        refcount_create(&arc_anon->arcs_esize[ARC_BUFC_DATA]);
-        refcount_create(&arc_mru->arcs_esize[ARC_BUFC_METADATA]);
-        refcount_create(&arc_mru->arcs_esize[ARC_BUFC_DATA]);
-        refcount_create(&arc_mru_ghost->arcs_esize[ARC_BUFC_METADATA]);
-        refcount_create(&arc_mru_ghost->arcs_esize[ARC_BUFC_DATA]);
-        refcount_create(&arc_mfu->arcs_esize[ARC_BUFC_METADATA]);
-        refcount_create(&arc_mfu->arcs_esize[ARC_BUFC_DATA]);
-        refcount_create(&arc_mfu_ghost->arcs_esize[ARC_BUFC_METADATA]);
-        refcount_create(&arc_mfu_ghost->arcs_esize[ARC_BUFC_DATA]);
-        refcount_create(&arc_l2c_only->arcs_esize[ARC_BUFC_METADATA]);
-        refcount_create(&arc_l2c_only->arcs_esize[ARC_BUFC_DATA]);
+                refcount_create(&arc_anon->arcs_esize[arcs]);
+                refcount_create(&arc_mru->arcs_esize[arcs]);
+                refcount_create(&arc_mru_ghost->arcs_esize[arcs]);
+                refcount_create(&arc_mfu->arcs_esize[arcs]);
+                refcount_create(&arc_mfu_ghost->arcs_esize[arcs]);
+                refcount_create(&arc_l2c_only->arcs_esize[arcs]);
+        }
 
+        arc_flush_taskq = taskq_create("arc_flush_tq",
+            max_ncpus, minclsyspri, 1, zfs_flush_ntasks, TASKQ_DYNAMIC);
+
         refcount_create(&arc_anon->arcs_size);
         refcount_create(&arc_mru->arcs_size);
         refcount_create(&arc_mru_ghost->arcs_size);
         refcount_create(&arc_mfu->arcs_size);
         refcount_create(&arc_mfu_ghost->arcs_size);
         refcount_create(&arc_l2c_only->arcs_size);
-
-        aggsum_init(&arc_meta_used, 0);
-        aggsum_init(&arc_size, 0);
-        aggsum_init(&astat_data_size, 0);
-        aggsum_init(&astat_metadata_size, 0);
-        aggsum_init(&astat_hdr_size, 0);
-        aggsum_init(&astat_other_size, 0);
-        aggsum_init(&astat_l2_hdr_size, 0);
 }
 
 static void
 arc_state_fini(void)
 {
-        refcount_destroy(&arc_anon->arcs_esize[ARC_BUFC_METADATA]);
-        refcount_destroy(&arc_anon->arcs_esize[ARC_BUFC_DATA]);
-        refcount_destroy(&arc_mru->arcs_esize[ARC_BUFC_METADATA]);
-        refcount_destroy(&arc_mru->arcs_esize[ARC_BUFC_DATA]);
-        refcount_destroy(&arc_mru_ghost->arcs_esize[ARC_BUFC_METADATA]);
-        refcount_destroy(&arc_mru_ghost->arcs_esize[ARC_BUFC_DATA]);
-        refcount_destroy(&arc_mfu->arcs_esize[ARC_BUFC_METADATA]);
-        refcount_destroy(&arc_mfu->arcs_esize[ARC_BUFC_DATA]);
-        refcount_destroy(&arc_mfu_ghost->arcs_esize[ARC_BUFC_METADATA]);
-        refcount_destroy(&arc_mfu_ghost->arcs_esize[ARC_BUFC_DATA]);
-        refcount_destroy(&arc_l2c_only->arcs_esize[ARC_BUFC_METADATA]);
-        refcount_destroy(&arc_l2c_only->arcs_esize[ARC_BUFC_DATA]);
+        arc_buf_contents_t arcs;
 
         refcount_destroy(&arc_anon->arcs_size);
         refcount_destroy(&arc_mru->arcs_size);
         refcount_destroy(&arc_mru_ghost->arcs_size);
         refcount_destroy(&arc_mfu->arcs_size);
         refcount_destroy(&arc_mfu_ghost->arcs_size);
         refcount_destroy(&arc_l2c_only->arcs_size);
 
-        multilist_destroy(arc_mru->arcs_list[ARC_BUFC_METADATA]);
-        multilist_destroy(arc_mru_ghost->arcs_list[ARC_BUFC_METADATA]);
-        multilist_destroy(arc_mfu->arcs_list[ARC_BUFC_METADATA]);
-        multilist_destroy(arc_mfu_ghost->arcs_list[ARC_BUFC_METADATA]);
-        multilist_destroy(arc_mru->arcs_list[ARC_BUFC_DATA]);
-        multilist_destroy(arc_mru_ghost->arcs_list[ARC_BUFC_DATA]);
-        multilist_destroy(arc_mfu->arcs_list[ARC_BUFC_DATA]);
-        multilist_destroy(arc_mfu_ghost->arcs_list[ARC_BUFC_DATA]);
+        for (arcs = ARC_BUFC_DATA; arcs < ARC_BUFC_NUMTYPES; ++arcs) {
+                multilist_destroy(arc_mru->arcs_list[arcs]);
+                multilist_destroy(arc_mru_ghost->arcs_list[arcs]);
+                multilist_destroy(arc_mfu->arcs_list[arcs]);
+                multilist_destroy(arc_mfu_ghost->arcs_list[arcs]);
+                multilist_destroy(arc_l2c_only->arcs_list[arcs]);
+
+                refcount_destroy(&arc_anon->arcs_esize[arcs]);
+                refcount_destroy(&arc_mru->arcs_esize[arcs]);
+                refcount_destroy(&arc_mru_ghost->arcs_esize[arcs]);
+                refcount_destroy(&arc_mfu->arcs_esize[arcs]);
+                refcount_destroy(&arc_mfu_ghost->arcs_esize[arcs]);
+                refcount_destroy(&arc_l2c_only->arcs_esize[arcs]);
+        }
 }
 
 uint64_t
 arc_max_bytes(void)
 {
@@ -6119,11 +6868,14 @@
         if (zfs_arc_min > 64 << 20 && zfs_arc_min <= arc_c_max)
                 arc_c_min = zfs_arc_min;
 
         arc_c = arc_c_max;
         arc_p = (arc_c >> 1);
+        arc_size = 0;
 
+        /* limit ddt meta-data to 1/4 of the arc capacity */
+        arc_ddt_limit = arc_c_max / 4;
         /* limit meta-data to 1/4 of the arc capacity */
         arc_meta_limit = arc_c_max / 4;
 
 #ifdef _KERNEL
         /*
@@ -6133,10 +6885,16 @@
         arc_meta_limit = MIN(arc_meta_limit,
             vmem_size(heap_arena, VMEM_ALLOC | VMEM_FREE) / 2);
 #endif
 
         /* Allow the tunable to override if it is reasonable */
+        if (zfs_arc_ddt_limit > 0 && zfs_arc_ddt_limit <= arc_c_max)
+                arc_ddt_limit = zfs_arc_ddt_limit;
+        arc_ddt_evict_threshold =
+            zfs_arc_segregate_ddt ? &arc_ddt_limit : &arc_meta_limit;
+
+        /* Allow the tunable to override if it is reasonable */
         if (zfs_arc_meta_limit > 0 && zfs_arc_meta_limit <= arc_c_max)
                 arc_meta_limit = zfs_arc_meta_limit;
 
         if (arc_c_min < arc_meta_limit / 2 && zfs_arc_min == 0)
                 arc_c_min = arc_meta_limit / 2;
@@ -6227,10 +6985,12 @@
         if (arc_ksp != NULL) {
                 kstat_delete(arc_ksp);
                 arc_ksp = NULL;
         }
 
+        taskq_destroy(arc_flush_taskq);
+
         mutex_destroy(&arc_reclaim_lock);
         cv_destroy(&arc_reclaim_thread_cv);
         cv_destroy(&arc_reclaim_waiters_cv);
 
         arc_state_fini();
@@ -6380,10 +7140,99 @@
  *      l2arc_write_size()      calculate how much to write
  *      l2arc_write_interval()  calculate sleep delay between writes
  *
  * These three functions determine what to write, how much, and how quickly
  * to send writes.
+ *
+ * L2ARC persistency:
+ *
+ * When writing buffers to L2ARC, we periodically add some metadata to
+ * make sure we can pick them up after reboot, thus dramatically reducing
+ * the impact that any downtime has on the performance of storage systems
+ * with large caches.
+ *
+ * The implementation works fairly simply by integrating the following two
+ * modifications:
+ *
+ * *) Every now and then we mix in a piece of metadata (called a log block)
+ *    into the L2ARC write. This allows us to understand what's been written,
+ *    so that we can rebuild the arc_buf_hdr_t structures of the main ARC
+ *    buffers. The log block also includes a "2-back-reference" pointer to
+ *    the second-to-previous block, forming a back-linked list of blocks on
+ *    the L2ARC device.
+ *
+ * *) We reserve SPA_MINBLOCKSIZE of space at the start of each L2ARC device
+ *    for our header bookkeeping purposes. This contains a device header,
+ *    which contains our top-level reference structures. We update it each
+ *    time we write a new log block, so that we're able to locate it in the
+ *    L2ARC device. If this write results in an inconsistent device header
+ *    (e.g. due to power failure), we detect this by verifying the header's
+ *    checksum and simply drop the entries from L2ARC.
+ *
+ * Implementation diagram:
+ *
+ * +=== L2ARC device (not to scale) ======================================+
+ * |       ___two newest log block pointers__.__________                  |
+ * |      /                                   \1 back   \latest           |
+ * |.____/_.                                   V         V                |
+ * ||L2 dev|....|lb |bufs |lb |bufs |lb |bufs |lb |bufs |lb |---(empty)---|
+ * ||   hdr|      ^         /^       /^        /         /                |
+ * |+------+  ...--\-------/  \-----/--\------/         /                 |
+ * |                \--------------/    \--------------/                  |
+ * +======================================================================+
+ *
+ * As can be seen on the diagram, rather than using a simple linked list,
+ * we use a pair of linked lists with alternating elements. This is a
+ * performance enhancement due to the fact that we only find out the
+ * address of the next log block once the current block has been
+ * completely read in. With a single list this would hurt performance,
+ * because we'd be keeping the device's I/O queue only 1 operation deep,
+ * thus incurring a large amount of I/O round-trip latency. Having two lists
+ * allows us to "prefetch" two log blocks ahead of where we are currently
+ * rebuilding L2ARC buffers.
+ *
+ * On-device data structures:
+ *
+ * L2ARC device header: l2arc_dev_hdr_phys_t
+ * L2ARC log block:     l2arc_log_blk_phys_t
+ *
+ * L2ARC reconstruction:
+ *
+ * When writing data, we simply write in the standard rotary fashion,
+ * evicting buffers as we go and simply writing new data over them (writing
+ * a new log block every now and then). This obviously means that once we
+ * loop around the end of the device, we will start cutting into an already
+ * committed log block (and its referenced data buffers), like so:
+ *
+ *    current write head__       __old tail
+ *                        \     /
+ *                        V    V
+ * <--|bufs |lb |bufs |lb |    |bufs |lb |bufs |lb |-->
+ *                         ^    ^^^^^^^^^___________________________________
+ *                         |                                                \
+ *                   <<nextwrite>> may overwrite this blk and/or its bufs --'
+ *
+ * When importing the pool, we detect this situation and use it to stop
+ * our scanning process (see l2arc_rebuild).
+ *
+ * There is one significant caveat to consider when rebuilding ARC contents
+ * from an L2ARC device: what about invalidated buffers? Given the above
+ * construction, we cannot update blocks which we've already written to amend
+ * them to remove buffers which were invalidated. Thus, during reconstruction,
+ * we might be populating the cache with buffers for data that's not on the
+ * main pool anymore, or may have been overwritten!
+ *
+ * As it turns out, this isn't a problem. Every arc_read request includes
+ * both the DVA and, crucially, the birth TXG of the BP the caller is
+ * looking for. So even if the cache were populated by completely rotten
+ * blocks for data that had been long deleted and/or overwritten, we'll
+ * never actually return bad data from the cache, since the DVA with the
+ * birth TXG uniquely identify a block in space and time - once created,
+ * a block is immutable on disk. The worst thing we have done is wasted
+ * some time and memory at l2arc rebuild to reconstruct outdated ARC
+ * entries that will get dropped from the l2arc as it is being updated
+ * with new blocks.
  */
 
 static boolean_t
 l2arc_write_eligible(uint64_t spa_guid, arc_buf_hdr_t *hdr)
 {
@@ -6445,18 +7294,24 @@
         next = MAX(now, MIN(now + interval, began + interval));
 
         return (next);
 }
 
+typedef enum l2ad_feed {
+        L2ARC_FEED_ALL = 1,
+        L2ARC_FEED_DDT_DEV,
+        L2ARC_FEED_NON_DDT_DEV,
+} l2ad_feed_t;
+
 /*
  * Cycle through L2ARC devices.  This is how L2ARC load balances.
  * If a device is returned, this also returns holding the spa config lock.
  */
 static l2arc_dev_t *
-l2arc_dev_get_next(void)
+l2arc_dev_get_next(l2ad_feed_t feed_type)
 {
-        l2arc_dev_t *first, *next = NULL;
+        l2arc_dev_t *start = NULL, *next = NULL;
 
         /*
          * Lock out the removal of spas (spa_namespace_lock), then removal
          * of cache devices (l2arc_dev_mtx).  Once a device has been selected,
          * both locks will be dropped and a spa config lock held instead.
@@ -6466,34 +7321,59 @@
 
         /* if there are no vdevs, there is nothing to do */
         if (l2arc_ndev == 0)
                 goto out;
 
-        first = NULL;
+        if (feed_type == L2ARC_FEED_DDT_DEV)
+                next = l2arc_ddt_dev_last;
+        else
         next = l2arc_dev_last;
-        do {
-                /* loop around the list looking for a non-faulted vdev */
-                if (next == NULL) {
+
+        /* figure out what the next device we look at should be */
+        if (next == NULL)
                         next = list_head(l2arc_dev_list);
-                } else {
+        else if (list_next(l2arc_dev_list, next) == NULL)
+                next = list_head(l2arc_dev_list);
+        else
                         next = list_next(l2arc_dev_list, next);
-                        if (next == NULL)
+        ASSERT(next);
+
+        /* loop through L2ARC devs looking for the one we need */
+        /* LINTED(E_CONSTANT_CONDITION) */
+        while (1) {
+                if (next == NULL) /* reached list end, start from beginning */
                                 next = list_head(l2arc_dev_list);
-                }
 
-                /* if we have come back to the start, bail out */
-                if (first == NULL)
-                        first = next;
-                else if (next == first)
+                if (start == NULL) { /* save starting dev */
+                        start = next;
+                } else if (start == next) { /* full loop completed - stop now */
+                        next = NULL;
+                        if (feed_type == L2ARC_FEED_DDT_DEV) {
+                                l2arc_ddt_dev_last = NULL;
+                                goto out;
+                        } else {
                         break;
+                        }
+                }
 
-        } while (vdev_is_dead(next->l2ad_vdev));
-
-        /* if we were unable to find any usable vdevs, return NULL */
-        if (vdev_is_dead(next->l2ad_vdev))
-                next = NULL;
-
+                if (!vdev_is_dead(next->l2ad_vdev) && !next->l2ad_rebuild) {
+                        if (feed_type == L2ARC_FEED_DDT_DEV) {
+                                if (vdev_type_is_ddt(next->l2ad_vdev)) {
+                                        l2arc_ddt_dev_last = next;
+                                        goto out;
+                                }
+                        } else if (feed_type == L2ARC_FEED_NON_DDT_DEV) {
+                                if (!vdev_type_is_ddt(next->l2ad_vdev)) {
+                                        break;
+                                }
+                        } else {
+                                ASSERT(feed_type == L2ARC_FEED_ALL);
+                                break;
+                        }
+                }
+                next = list_next(l2arc_dev_list, next);
+        }
         l2arc_dev_last = next;
 
 out:
         mutex_exit(&l2arc_dev_mtx);
 
@@ -6542,10 +7422,11 @@
         l2arc_dev_t *dev;
         list_t *buflist;
         arc_buf_hdr_t *head, *hdr, *hdr_prev;
         kmutex_t *hash_lock;
         int64_t bytes_dropped = 0;
+        l2arc_log_blk_buf_t *lb_buf;
 
         cb = zio->io_private;
         ASSERT3P(cb, !=, NULL);
         dev = cb->l2wcb_dev;
         ASSERT3P(dev, !=, NULL);
@@ -6638,14 +7519,18 @@
         list_remove(buflist, head);
         ASSERT(!HDR_HAS_L1HDR(head));
         kmem_cache_free(hdr_l2only_cache, head);
         mutex_exit(&dev->l2ad_mtx);
 
+        ASSERT(dev->l2ad_vdev != NULL);
         vdev_space_update(dev->l2ad_vdev, -bytes_dropped, 0, 0);
 
         l2arc_do_free_on_write();
 
+        while ((lb_buf = list_remove_tail(&cb->l2wcb_log_blk_buflist)) != NULL)
+                kmem_free(lb_buf, sizeof (*lb_buf));
+        list_destroy(&cb->l2wcb_log_blk_buflist);
         kmem_free(cb, sizeof (l2arc_write_callback_t));
 }
 
 /*
  * A read to a cache device completed.  Validate buffer contents before
@@ -6748,37 +7633,44 @@
         kmem_free(cb, sizeof (l2arc_read_callback_t));
 }
 
 /*
  * This is the list priority from which the L2ARC will search for pages to
- * cache.  This is used within loops (0..3) to cycle through lists in the
+ * cache.  This is used within loops to cycle through lists in the
  * desired order.  This order can have a significant effect on cache
  * performance.
  *
- * Currently the metadata lists are hit first, MFU then MRU, followed by
- * the data lists.  This function returns a locked list, and also returns
- * the lock pointer.
+ * Currently the ddt lists are hit first (MFU then MRU),
+ * followed by metadata then by the data lists.
+ * This function returns a locked list, and also returns the lock pointer.
  */
 static multilist_sublist_t *
-l2arc_sublist_lock(int list_num)
+l2arc_sublist_lock(enum l2arc_priorities prio)
 {
         multilist_t *ml = NULL;
         unsigned int idx;
 
-        ASSERT(list_num >= 0 && list_num <= 3);
+        ASSERT(prio >= PRIORITY_MFU_DDT);
+        ASSERT(prio < PRIORITY_NUMTYPES);
 
-        switch (list_num) {
-        case 0:
+        switch (prio) {
+        case PRIORITY_MFU_DDT:
+                ml = arc_mfu->arcs_list[ARC_BUFC_DDT];
+                break;
+        case PRIORITY_MRU_DDT:
+                ml = arc_mru->arcs_list[ARC_BUFC_DDT];
+                break;
+        case PRIORITY_MFU_META:
                 ml = arc_mfu->arcs_list[ARC_BUFC_METADATA];
                 break;
-        case 1:
+        case PRIORITY_MRU_META:
                 ml = arc_mru->arcs_list[ARC_BUFC_METADATA];
                 break;
-        case 2:
+        case PRIORITY_MFU_DATA:
                 ml = arc_mfu->arcs_list[ARC_BUFC_DATA];
                 break;
-        case 3:
+        case PRIORITY_MRU_DATA:
                 ml = arc_mru->arcs_list[ARC_BUFC_DATA];
                 break;
         }
 
         /*
@@ -6790,17 +7682,30 @@
         idx = multilist_get_random_index(ml);
         return (multilist_sublist_lock(ml, idx));
 }
 
 /*
+ * Calculates the maximum overhead of L2ARC metadata log blocks for a given
+ * L2ARC write size. l2arc_evict and l2arc_write_buffers need to include this
+ * overhead in processing to make sure there is enough headroom available
+ * when writing buffers.
+ */
+static inline uint64_t
+l2arc_log_blk_overhead(uint64_t write_sz)
+{
+        return ((write_sz / SPA_MINBLOCKSIZE / L2ARC_LOG_BLK_ENTRIES) + 1) *
+            L2ARC_LOG_BLK_SIZE;
+}
+
+/*
  * Evict buffers from the device write hand to the distance specified in
  * bytes.  This distance may span populated buffers, it may span nothing.
  * This is clearing a region on the L2ARC device ready for writing.
  * If the 'all' boolean is set, every buffer is evicted.
  */
 static void
-l2arc_evict(l2arc_dev_t *dev, uint64_t distance, boolean_t all)
+l2arc_evict_impl(l2arc_dev_t *dev, uint64_t distance, boolean_t all)
 {
         list_t *buflist;
         arc_buf_hdr_t *hdr, *hdr_prev;
         kmutex_t *hash_lock;
         uint64_t taddr;
@@ -6813,10 +7718,14 @@
                  * nothing to evict.
                  */
                 return;
         }
 
+        /*
+         * We need to add in the worst case scenario of log block overhead.
+         */
+        distance += l2arc_log_blk_overhead(distance);
         if (dev->l2ad_hand >= (dev->l2ad_end - (2 * distance))) {
                 /*
                  * When nearing the end of the device, evict to the end
                  * before the device write hand jumps to the start.
                  */
@@ -6896,11 +7805,77 @@
                 mutex_exit(hash_lock);
         }
         mutex_exit(&dev->l2ad_mtx);
 }
 
+static void
+l2arc_evict_task(void *arg)
+{
+        l2arc_dev_t *dev = arg;
+        ASSERT(dev);
+
+        /*
+         * Evict l2arc buffers asynchronously; we need to keep the device
+         * around until we are sure there aren't any buffers referencing it.
+         * We do not need to hold any config locks, etc. because at this point,
+         * we are the only ones who know about this device (the in-core
+         * structure), so no new buffers can be created (e.g. if the pool is
+         * re-imported while the asynchronous eviction is in progress) that
+         * reference this same in-core structure. Also remove the vdev link
+         * since further use of it as l2arc device is prohibited.
+         */
+        dev->l2ad_vdev = NULL;
+        l2arc_evict_impl(dev, 0LL, B_TRUE);
+
+        /* Same cleanup as in the synchronous path */
+        list_destroy(&dev->l2ad_buflist);
+        mutex_destroy(&dev->l2ad_mtx);
+        refcount_destroy(&dev->l2ad_alloc);
+        kmem_free(dev->l2ad_dev_hdr, dev->l2ad_dev_hdr_asize);
+        kmem_free(dev, sizeof (l2arc_dev_t));
+}
+
+boolean_t zfs_l2arc_async_evict = B_TRUE;
+
 /*
+ * Perform l2arc eviction for buffers associated with this device.
+ * If evicting all buffers (done at pool export time), try to evict
+ * asynchronously, and fall back to synchronous eviction in case of error.
+ * Tell the caller whether to clean up the device:
+ *  - B_TRUE means "asynchronous eviction, do not clean up"
+ *  - B_FALSE means "synchronous eviction, done, please clean up"
+ */
+static boolean_t
+l2arc_evict(l2arc_dev_t *dev, uint64_t distance, boolean_t all)
+{
+        /*
+         *  If we are evicting all the buffers for this device, which happens
+         *  at pool export time, schedule an asynchronous task.
+         */
+        if (all && zfs_l2arc_async_evict) {
+                if ((taskq_dispatch(arc_flush_taskq, l2arc_evict_task,
+                    dev, TQ_NOSLEEP) == NULL)) {
+                        /*
+                         * Failed to dispatch the asynchronous cleanup
+                         * task; evict synchronously.
+                         */
+                        l2arc_evict_impl(dev, distance, all);
+                } else {
+                        /*
+                         * Successful dispatch, vdev space updated
+                         */
+                        return (B_TRUE);
+                }
+        } else {
+                /* Evict synchronously */
+                l2arc_evict_impl(dev, distance, all);
+        }
+
+        return (B_FALSE);
+}
+
+/*
  * Find and write ARC buffers to the L2ARC device.
  *
  * An ARC_FLAG_L2_WRITING flag is set so that the L2ARC buffers are not valid
  * for reading until they have completed writing.
  * The headroom_boost is an in-out parameter used to maintain headroom boost
@@ -6908,31 +7883,48 @@
  *
  * Returns the number of bytes actually written (which may be smaller than
  * the delta by which the device hand has changed due to alignment).
  */
 static uint64_t
-l2arc_write_buffers(spa_t *spa, l2arc_dev_t *dev, uint64_t target_sz)
+l2arc_write_buffers(spa_t *spa, l2arc_dev_t *dev, uint64_t target_sz,
+    l2ad_feed_t feed_type)
 {
         arc_buf_hdr_t *hdr, *hdr_prev, *head;
+        /*
+         * We must carefully track the space we deal with here:
+         * - write_lsize: sum of the size of all buffers to be written
+         *      without compression or inter-buffer alignment applied.
+         *      This size is added to arcstat_l2_lsize, because subsequent
+         *      eviction of buffers decrements this kstat by only the
+         *      buffer's b_lsize (which doesn't take alignment into account).
+         * - write_asize: sum of the size of all buffers to be written
+         *      with inter-buffer alignment applied.
+         *      This size is used to estimate the maximum number of bytes
+         *      we could take up on the device and is thus used to gauge how
+         *      close we are to hitting target_sz.
+         */
         uint64_t write_asize, write_psize, write_lsize, headroom;
         boolean_t full;
         l2arc_write_callback_t *cb;
         zio_t *pio, *wzio;
+        enum l2arc_priorities try;
         uint64_t guid = spa_load_guid(spa);
+        boolean_t dev_hdr_update = B_FALSE;
 
         ASSERT3P(dev->l2ad_vdev, !=, NULL);
 
         pio = NULL;
+        cb = NULL;
         write_lsize = write_asize = write_psize = 0;
         full = B_FALSE;
         head = kmem_cache_alloc(hdr_l2only_cache, KM_PUSHPAGE);
         arc_hdr_set_flags(head, ARC_FLAG_L2_WRITE_HEAD | ARC_FLAG_HAS_L2HDR);
 
         /*
          * Copy buffers for L2ARC writing.
          */
-        for (int try = 0; try <= 3; try++) {
+        for (try = PRIORITY_MFU_DDT; try < PRIORITY_NUMTYPES; try++) {
                 multilist_sublist_t *mls = l2arc_sublist_lock(try);
                 uint64_t passed_sz = 0;
 
                 /*
                  * L2ARC fast warmup.
@@ -6998,10 +7990,19 @@
                                 full = B_TRUE;
                                 mutex_exit(hash_lock);
                                 break;
                         }
 
+                        /* make sure buf we select corresponds to feed_type */
+                        if ((feed_type == L2ARC_FEED_DDT_DEV &&
+                            arc_buf_type(hdr) != ARC_BUFC_DDT) ||
+                            (feed_type == L2ARC_FEED_NON_DDT_DEV &&
+                            arc_buf_type(hdr) == ARC_BUFC_DDT)) {
+                                        mutex_exit(hash_lock);
+                                        continue;
+                        }
+
                         if (pio == NULL) {
                                 /*
                                  * Insert a dummy header on the buflist so
                                  * l2arc_write_done() can find where the
                                  * write buffers begin without searching.
@@ -7008,14 +8009,17 @@
                                  */
                                 mutex_enter(&dev->l2ad_mtx);
                                 list_insert_head(&dev->l2ad_buflist, head);
                                 mutex_exit(&dev->l2ad_mtx);
 
-                                cb = kmem_alloc(
+                                cb = kmem_zalloc(
                                     sizeof (l2arc_write_callback_t), KM_SLEEP);
                                 cb->l2wcb_dev = dev;
                                 cb->l2wcb_head = head;
+                                list_create(&cb->l2wcb_log_blk_buflist,
+                                    sizeof (l2arc_log_blk_buf_t),
+                                    offsetof(l2arc_log_blk_buf_t, lbb_node));
                                 pio = zio_root(spa, l2arc_write_done, cb,
                                     ZIO_FLAG_CANFAIL);
                         }
 
                         hdr->b_l2hdr.b_dev = dev;
@@ -7046,11 +8050,11 @@
                         abd_t *to_write;
                         if (!HDR_SHARED_DATA(hdr) && psize == asize) {
                                 to_write = hdr->b_l1hdr.b_pabd;
                         } else {
                                 to_write = abd_alloc_for_io(asize,
-                                    HDR_ISTYPE_METADATA(hdr));
+                                    !HDR_ISTYPE_DATA(hdr));
                                 abd_copy(to_write, hdr->b_l1hdr.b_pabd, psize);
                                 if (asize != psize) {
                                         abd_zero_off(to_write, psize,
                                             asize - psize);
                                 }
@@ -7072,11 +8076,20 @@
                         dev->l2ad_hand += asize;
 
                         mutex_exit(hash_lock);
 
                         (void) zio_nowait(wzio);
+
+                        /*
+                         * Append buf info to current log and commit if full.
+                         * arcstat_l2_{size,asize} kstats are updated internally.
+                         */
+                        if (l2arc_log_blk_insert(dev, hdr)) {
+                                l2arc_log_blk_commit(dev, pio, cb);
+                                dev_hdr_update = B_TRUE;
                 }
+                }
 
                 multilist_sublist_unlock(mls);
 
                 if (full == B_TRUE)
                         break;
@@ -7088,22 +8101,32 @@
                 ASSERT(!HDR_HAS_L1HDR(head));
                 kmem_cache_free(hdr_l2only_cache, head);
                 return (0);
         }
 
+        /*
+         * If we wrote any log blocks as part of this write, update the dev
+         * hdr to point to them.
+         */
+        if (dev_hdr_update)
+                l2arc_dev_hdr_update(dev, pio);
+
         ASSERT3U(write_asize, <=, target_sz);
         ARCSTAT_BUMP(arcstat_l2_writes_sent);
         ARCSTAT_INCR(arcstat_l2_write_bytes, write_psize);
+        if (feed_type == L2ARC_FEED_DDT_DEV)
+                ARCSTAT_INCR(arcstat_l2_ddt_write_bytes, write_psize);
         ARCSTAT_INCR(arcstat_l2_lsize, write_lsize);
         ARCSTAT_INCR(arcstat_l2_psize, write_psize);
         vdev_space_update(dev->l2ad_vdev, write_psize, 0, 0);
 
         /*
          * Bump device hand to the device start if it is approaching the end.
          * l2arc_evict() will already have evicted ahead for this case.
          */
-        if (dev->l2ad_hand >= (dev->l2ad_end - target_sz)) {
+        if (dev->l2ad_hand + target_sz + l2arc_log_blk_overhead(target_sz) >=
+            dev->l2ad_end) {
                 dev->l2ad_hand = dev->l2ad_start;
                 dev->l2ad_first = B_FALSE;
         }
 
         dev->l2ad_writing = B_TRUE;
@@ -7111,23 +8134,72 @@
         dev->l2ad_writing = B_FALSE;
 
         return (write_asize);
 }
 
+static boolean_t
+l2arc_feed_dev(l2ad_feed_t feed_type, uint64_t *wrote)
+{
+        spa_t *spa;
+        l2arc_dev_t *dev;
+        uint64_t size;
+
+        /*
+         * This selects the next l2arc device to write to, and in
+         * doing so the next spa to feed from: dev->l2ad_spa.   This
+         * will return NULL if there are now no l2arc devices or if
+         * they are all faulted.
+         *
+         * If a device is returned, its spa's config lock is also
+         * held to prevent device removal.  l2arc_dev_get_next()
+         * will grab and release l2arc_dev_mtx.
+         */
+        if ((dev = l2arc_dev_get_next(feed_type)) == NULL)
+                return (B_FALSE);
+
+        spa = dev->l2ad_spa;
+        ASSERT(spa != NULL);
+
+        /*
+         * If the pool is read-only, skip it.
+         */
+        if (!spa_writeable(spa)) {
+                spa_config_exit(spa, SCL_L2ARC, dev);
+                return (B_FALSE);
+        }
+
+        ARCSTAT_BUMP(arcstat_l2_feeds);
+        size = l2arc_write_size();
+
+        /*
+         * Evict L2ARC buffers that will be overwritten.
+         * B_FALSE guarantees synchronous eviction.
+         */
+        (void) l2arc_evict(dev, size, B_FALSE);
+
+        /*
+         * Write ARC buffers.
+         */
+        *wrote = l2arc_write_buffers(spa, dev, size, feed_type);
+
+        spa_config_exit(spa, SCL_L2ARC, dev);
+
+        return (B_TRUE);
+}
+
 /*
  * This thread feeds the L2ARC at regular intervals.  This is the beating
  * heart of the L2ARC.
  */
 /* ARGSUSED */
 static void
 l2arc_feed_thread(void *unused)
 {
         callb_cpr_t cpr;
-        l2arc_dev_t *dev;
-        spa_t *spa;
-        uint64_t size, wrote;
+        uint64_t size, total_written = 0;
         clock_t begin, next = ddi_get_lbolt();
+        l2ad_feed_t feed_type = L2ARC_FEED_ALL;
 
         CALLB_CPR_INIT(&cpr, &l2arc_feed_thr_lock, callb_generic_cpr, FTAG);
 
         mutex_enter(&l2arc_feed_thr_lock);
 
@@ -7148,63 +8220,37 @@
                 }
                 mutex_exit(&l2arc_dev_mtx);
                 begin = ddi_get_lbolt();
 
                 /*
-                 * This selects the next l2arc device to write to, and in
-                 * doing so the next spa to feed from: dev->l2ad_spa.   This
-                 * will return NULL if there are now no l2arc devices or if
-                 * they are all faulted.
-                 *
-                 * If a device is returned, its spa's config lock is also
-                 * held to prevent device removal.  l2arc_dev_get_next()
-                 * will grab and release l2arc_dev_mtx.
-                 */
-                if ((dev = l2arc_dev_get_next()) == NULL)
-                        continue;
-
-                spa = dev->l2ad_spa;
-                ASSERT3P(spa, !=, NULL);
-
-                /*
-                 * If the pool is read-only then force the feed thread to
-                 * sleep a little longer.
-                 */
-                if (!spa_writeable(spa)) {
-                        next = ddi_get_lbolt() + 5 * l2arc_feed_secs * hz;
-                        spa_config_exit(spa, SCL_L2ARC, dev);
-                        continue;
-                }
-
-                /*
                  * Avoid contributing to memory pressure.
                  */
                 if (arc_reclaim_needed()) {
                         ARCSTAT_BUMP(arcstat_l2_abort_lowmem);
-                        spa_config_exit(spa, SCL_L2ARC, dev);
                         continue;
                 }
 
-                ARCSTAT_BUMP(arcstat_l2_feeds);
+                /* try to write to DDT L2ARC device if any */
+                if (l2arc_feed_dev(L2ARC_FEED_DDT_DEV, &size)) {
+                        total_written += size;
+                        feed_type = L2ARC_FEED_NON_DDT_DEV;
+                }
 
-                size = l2arc_write_size();
+                /* try to write to the regular L2ARC device if any */
+                if (l2arc_feed_dev(feed_type, &size)) {
+                        total_written += size;
+                        if (feed_type == L2ARC_FEED_NON_DDT_DEV)
+                                total_written /= 2; /* avg written per device */
+                }
 
                 /*
-                 * Evict L2ARC buffers that will be overwritten.
-                 */
-                l2arc_evict(dev, size, B_FALSE);
-
-                /*
-                 * Write ARC buffers.
-                 */
-                wrote = l2arc_write_buffers(spa, dev, size);
-
-                /*
                  * Calculate interval between writes.
                  */
-                next = l2arc_write_interval(begin, size, wrote);
-                spa_config_exit(spa, SCL_L2ARC, dev);
+                next = l2arc_write_interval(begin, l2arc_write_size(),
+                    total_written);
+
+                total_written = 0;
         }
 
         l2arc_thread_exit = 0;
         cv_broadcast(&l2arc_feed_thr_cv);
         CALLB_CPR_EXIT(&cpr);           /* drops l2arc_feed_thr_lock */
@@ -7212,29 +8258,43 @@
 }
 
 boolean_t
 l2arc_vdev_present(vdev_t *vd)
 {
+        return (l2arc_vdev_get(vd) != NULL);
+}
+
+/*
+ * Returns the l2arc_dev_t associated with a particular vdev_t or NULL if
+ * the vdev_t isn't an L2ARC device.
+ */
+static l2arc_dev_t *
+l2arc_vdev_get(vdev_t *vd)
+{
         l2arc_dev_t *dev;
+        boolean_t       held = MUTEX_HELD(&l2arc_dev_mtx);
 
+        if (!held)
         mutex_enter(&l2arc_dev_mtx);
         for (dev = list_head(l2arc_dev_list); dev != NULL;
             dev = list_next(l2arc_dev_list, dev)) {
                 if (dev->l2ad_vdev == vd)
                         break;
         }
+        if (!held)
         mutex_exit(&l2arc_dev_mtx);
 
-        return (dev != NULL);
+        return (dev);
 }
 
 /*
  * Add a vdev for use by the L2ARC.  By this point the spa has already
- * validated the vdev and opened it.
+ * validated the vdev and opened it. The `rebuild' flag indicates whether
+ * we should attempt an L2ARC persistency rebuild.
  */
 void
-l2arc_add_vdev(spa_t *spa, vdev_t *vd)
+l2arc_add_vdev(spa_t *spa, vdev_t *vd, boolean_t rebuild)
 {
         l2arc_dev_t *adddev;
 
         ASSERT(!l2arc_vdev_present(vd));
 
@@ -7242,15 +8302,21 @@
          * Create a new l2arc device entry.
          */
         adddev = kmem_zalloc(sizeof (l2arc_dev_t), KM_SLEEP);
         adddev->l2ad_spa = spa;
         adddev->l2ad_vdev = vd;
-        adddev->l2ad_start = VDEV_LABEL_START_SIZE;
+        /* leave extra size for an l2arc device header */
+        adddev->l2ad_dev_hdr_asize = MAX(sizeof (*adddev->l2ad_dev_hdr),
+            1 << vd->vdev_ashift);
+        adddev->l2ad_start = VDEV_LABEL_START_SIZE + adddev->l2ad_dev_hdr_asize;
         adddev->l2ad_end = VDEV_LABEL_START_SIZE + vdev_get_min_asize(vd);
+        ASSERT3U(adddev->l2ad_start, <, adddev->l2ad_end);
         adddev->l2ad_hand = adddev->l2ad_start;
         adddev->l2ad_first = B_TRUE;
         adddev->l2ad_writing = B_FALSE;
+        adddev->l2ad_dev_hdr = kmem_zalloc(adddev->l2ad_dev_hdr_asize,
+            KM_SLEEP);
 
         mutex_init(&adddev->l2ad_mtx, NULL, MUTEX_DEFAULT, NULL);
         /*
          * This is a list of all ARC buffers that are still valid on the
          * device.
@@ -7265,10 +8331,20 @@
          * Add device to global list
          */
         mutex_enter(&l2arc_dev_mtx);
         list_insert_head(l2arc_dev_list, adddev);
         atomic_inc_64(&l2arc_ndev);
+        if (rebuild && l2arc_rebuild_enabled &&
+            adddev->l2ad_end - adddev->l2ad_start > L2ARC_PERSIST_MIN_SIZE) {
+                /*
+                 * Just mark the device as pending for a rebuild. We won't
+                 * be starting a rebuild inline here as it would block pool
+                 * import. Instead spa_load_impl will hand that off to an
+                 * async task which will call l2arc_spa_rebuild_start.
+                 */
+                adddev->l2ad_rebuild = B_TRUE;
+        }
         mutex_exit(&l2arc_dev_mtx);
 }
 
 /*
  * Remove a vdev from the L2ARC.
@@ -7290,25 +8366,50 @@
                 }
         }
         ASSERT3P(remdev, !=, NULL);
 
         /*
+         * Cancel any ongoing or scheduled rebuild (race protection with
+         * l2arc_spa_rebuild_start provided via l2arc_dev_mtx).
+         */
+        remdev->l2ad_rebuild_cancel = B_TRUE;
+        if (remdev->l2ad_rebuild_did != 0) {
+                /*
+                 * N.B. it should be safe to thread_join with the rebuild
+                 * thread while holding l2arc_dev_mtx because it is not
+                 * accessed from anywhere in the l2arc rebuild code below
+                 * (except for l2arc_spa_rebuild_start, which is ok).
+                 */
+                thread_join(remdev->l2ad_rebuild_did);
+        }
+
+        /*
          * Remove device from global list
          */
         list_remove(l2arc_dev_list, remdev);
         l2arc_dev_last = NULL;          /* may have been invalidated */
+        l2arc_ddt_dev_last = NULL;      /* may have been invalidated */
         atomic_dec_64(&l2arc_ndev);
         mutex_exit(&l2arc_dev_mtx);
 
+        if (vdev_type_is_ddt(remdev->l2ad_vdev))
+                atomic_add_64(&remdev->l2ad_spa->spa_l2arc_ddt_devs_size,
+                    -(vdev_get_min_asize(remdev->l2ad_vdev)));
+
         /*
          * Clear all buflists and ARC references.  L2ARC device flush.
          */
-        l2arc_evict(remdev, 0, B_TRUE);
+        if (l2arc_evict(remdev, 0, B_TRUE) == B_FALSE) {
+                /*
+                 * The eviction was done synchronously, so clean up here.
+                 * Otherwise, the asynchronous task will clean up.
+                 */
         list_destroy(&remdev->l2ad_buflist);
         mutex_destroy(&remdev->l2ad_mtx);
-        refcount_destroy(&remdev->l2ad_alloc);
+                kmem_free(remdev->l2ad_dev_hdr, remdev->l2ad_dev_hdr_asize);
         kmem_free(remdev, sizeof (l2arc_dev_t));
+        }
 }
 
 void
 l2arc_init(void)
 {
@@ -7370,6 +8471,784 @@
         cv_signal(&l2arc_feed_thr_cv);  /* kick thread out of startup */
         l2arc_thread_exit = 1;
         while (l2arc_thread_exit != 0)
                 cv_wait(&l2arc_feed_thr_cv, &l2arc_feed_thr_lock);
         mutex_exit(&l2arc_feed_thr_lock);
+}
+
+/*
+ * Punches out rebuild threads for the L2ARC devices in a spa. This should
+ * be called after pool import from the spa async thread, since starting
+ * these threads directly from spa_import() will make them part of the
+ * "zpool import" context and delay process exit (and thus pool import).
+ */
+void
+l2arc_spa_rebuild_start(spa_t *spa)
+{
+        /*
+         * Locate the spa's l2arc devices and kick off rebuild threads.
+         */
+        mutex_enter(&l2arc_dev_mtx);
+        for (int i = 0; i < spa->spa_l2cache.sav_count; i++) {
+                l2arc_dev_t *dev =
+                    l2arc_vdev_get(spa->spa_l2cache.sav_vdevs[i]);
+                if (dev == NULL) {
+                        /* Don't attempt a rebuild if the vdev is UNAVAIL */
+                        continue;
+                }
+                if (dev->l2ad_rebuild && !dev->l2ad_rebuild_cancel) {
+                        VERIFY3U(dev->l2ad_rebuild_did, ==, 0);
+#ifdef  _KERNEL
+                        dev->l2ad_rebuild_did = thread_create(NULL, 0,
+                            l2arc_dev_rebuild_start, dev, 0, &p0, TS_RUN,
+                            minclsyspri)->t_did;
+#endif
+                }
+        }
+        mutex_exit(&l2arc_dev_mtx);
+}
+
+/*
+ * Main entry point for L2ARC rebuilding.
+ */
+static void
+l2arc_dev_rebuild_start(l2arc_dev_t *dev)
+{
+        if (!dev->l2ad_rebuild_cancel) {
+                VERIFY(dev->l2ad_rebuild);
+                (void) l2arc_rebuild(dev);
+                dev->l2ad_rebuild = B_FALSE;
+        }
+}
+
+/*
+ * This function implements the actual L2ARC metadata rebuild. It:
+ *
+ * 1) reads the device's header
+ * 2) if a good device header is found, starts reading the log block chain
+ * 3) restores each block's contents to memory (reconstructing arc_buf_hdr_t's)
+ *
+ * Operation stops under any of the following conditions:
+ *
+ * 1) We reach the end of the log blk chain (the back-reference in the blk is
+ *    invalid or loops over our starting point).
+ * 2) We encounter *any* error condition (cksum errors, io errors, looped
+ *    blocks, etc.).
+ */
+static int
+l2arc_rebuild(l2arc_dev_t *dev)
+{
+        vdev_t                  *vd = dev->l2ad_vdev;
+        spa_t                   *spa = vd->vdev_spa;
+        int                     err;
+        l2arc_log_blk_phys_t    *this_lb, *next_lb;
+        uint8_t                 *this_lb_buf, *next_lb_buf;
+        zio_t                   *this_io = NULL, *next_io = NULL;
+        l2arc_log_blkptr_t      lb_ptrs[2];
+        boolean_t               first_pass, lock_held;
+        uint64_t                load_guid;
+
+        this_lb = kmem_zalloc(sizeof (*this_lb), KM_SLEEP);
+        next_lb = kmem_zalloc(sizeof (*next_lb), KM_SLEEP);
+        this_lb_buf = kmem_zalloc(sizeof (l2arc_log_blk_phys_t), KM_SLEEP);
+        next_lb_buf = kmem_zalloc(sizeof (l2arc_log_blk_phys_t), KM_SLEEP);
+
+        /*
+         * We prevent device removal while issuing reads to the device,
+         * then during the rebuilding phases we drop this lock again so
+         * that a spa_unload or device remove can be initiated - this is
+         * safe, because the spa will signal us to stop before removing
+         * our device and wait for us to stop.
+         */
+        spa_config_enter(spa, SCL_L2ARC, vd, RW_READER);
+        lock_held = B_TRUE;
+
+        load_guid = spa_load_guid(dev->l2ad_vdev->vdev_spa);
+        /*
+         * Device header processing phase.
+         */
+        if ((err = l2arc_dev_hdr_read(dev)) != 0) {
+                /* device header corrupted, start a new one */
+                bzero(dev->l2ad_dev_hdr, dev->l2ad_dev_hdr_asize);
+                goto out;
+        }
+
+        /* Retrieve the persistent L2ARC device state */
+        dev->l2ad_hand = vdev_psize_to_asize(dev->l2ad_vdev,
+            dev->l2ad_dev_hdr->dh_start_lbps[0].lbp_daddr +
+            LBP_GET_PSIZE(&dev->l2ad_dev_hdr->dh_start_lbps[0]));
+        dev->l2ad_first = !!(dev->l2ad_dev_hdr->dh_flags &
+            L2ARC_DEV_HDR_EVICT_FIRST);
+
+        /* Prepare the rebuild processing state */
+        bcopy(dev->l2ad_dev_hdr->dh_start_lbps, lb_ptrs, sizeof (lb_ptrs));
+        first_pass = B_TRUE;
+
+        /* Start the rebuild process */
+        for (;;) {
+                if (!l2arc_log_blkptr_valid(dev, &lb_ptrs[0]))
+                        /* We hit an invalid block address, end the rebuild. */
+                        break;
+
+                if ((err = l2arc_log_blk_read(dev, &lb_ptrs[0], &lb_ptrs[1],
+                    this_lb, next_lb, this_lb_buf, next_lb_buf,
+                    this_io, &next_io)) != 0)
+                        break;
+
+                spa_config_exit(spa, SCL_L2ARC, vd);
+                lock_held = B_FALSE;
+
+                /* Protection against infinite loops of log blocks. */
+                if (l2arc_range_check_overlap(lb_ptrs[1].lbp_daddr,
+                    lb_ptrs[0].lbp_daddr,
+                    dev->l2ad_dev_hdr->dh_start_lbps[0].lbp_daddr) &&
+                    !first_pass) {
+                        ARCSTAT_BUMP(arcstat_l2_rebuild_abort_loop_errors);
+                        err = SET_ERROR(ELOOP);
+                        break;
+                }
+
+                /*
+                 * Our memory pressure valve. If the system is running low
+                 * on memory, rather than swamping memory with new ARC buf
+                 * hdrs, we opt not to rebuild the L2ARC. At this point,
+                 * however, we have already set up our L2ARC dev to chain in
+                 * new metadata log blk, so the user may choose to re-add the
+                 * L2ARC dev at a later time to reconstruct it (when there's
+                 * less memory pressure).
+                 */
+                if (arc_reclaim_needed()) {
+                        ARCSTAT_BUMP(arcstat_l2_rebuild_abort_lowmem);
+                        cmn_err(CE_NOTE, "System running low on memory, "
+                            "aborting L2ARC rebuild.");
+                        err = SET_ERROR(ENOMEM);
+                        break;
+                }
+
+                /*
+                 * Now that we know that the next_lb checks out alright, we
+                 * can start reconstruction from this lb - we can be sure
+                 * that the L2ARC write hand has not yet reached any of our
+                 * buffers.
+                 */
+                l2arc_log_blk_restore(dev, load_guid, this_lb,
+                    LBP_GET_PSIZE(&lb_ptrs[0]));
+
+                /*
+                 * End of list detection. We can look ahead two steps in the
+                 * blk chain and if the 2nd blk from this_lb dips below the
+                 * initial chain starting point, then we know two things:
+                 *      1) it can't be valid, and
+                 *      2) the next_lb's ARC entries might have already been
+                 *      partially overwritten and so we should stop before
+                 *      we restore it
+                 */
+                if (l2arc_range_check_overlap(
+                    this_lb->lb_back2_lbp.lbp_daddr, lb_ptrs[0].lbp_daddr,
+                    dev->l2ad_dev_hdr->dh_start_lbps[0].lbp_daddr) &&
+                    !first_pass)
+                        break;
+
+                /* log blk restored, continue with next one in the list */
+                lb_ptrs[0] = lb_ptrs[1];
+                lb_ptrs[1] = this_lb->lb_back2_lbp;
+                PTR_SWAP(this_lb, next_lb);
+                PTR_SWAP(this_lb_buf, next_lb_buf);
+                this_io = next_io;
+                next_io = NULL;
+                first_pass = B_FALSE;
+
+                for (;;) {
+                        if (dev->l2ad_rebuild_cancel) {
+                                err = SET_ERROR(ECANCELED);
+                                goto out;
+                        }
+                        if (spa_config_tryenter(spa, SCL_L2ARC, vd,
+                            RW_READER)) {
+                                lock_held = B_TRUE;
+                                break;
+                        }
+                        /*
+                         * L2ARC config lock is held by somebody as writer,
+                         * possibly because they are trying to remove us.
+                         * They'll likely want us to shut down, so after a
+                         * little delay, we check l2ad_rebuild_cancel and
+                         * retry the lock.
+                         */
+                        delay(1);
+                }
+        }
+out:
+        if (next_io != NULL)
+                l2arc_log_blk_prefetch_abort(next_io);
+        kmem_free(this_lb, sizeof (*this_lb));
+        kmem_free(next_lb, sizeof (*next_lb));
+        kmem_free(this_lb_buf, sizeof (l2arc_log_blk_phys_t));
+        kmem_free(next_lb_buf, sizeof (l2arc_log_blk_phys_t));
+        if (err == 0)
+                ARCSTAT_BUMP(arcstat_l2_rebuild_successes);
+
+        if (lock_held)
+                spa_config_exit(spa, SCL_L2ARC, vd);
+
+        return (err);
+}
+
+/*
+ * Attempts to read the device header on the provided L2ARC device and stores
+ * it in dev->l2ad_dev_hdr. On success, this function returns 0, otherwise the
+ * appropriate error code is returned.
+ */
+static int
+l2arc_dev_hdr_read(l2arc_dev_t *dev)
+{
+        int                     err;
+        uint64_t                guid;
+        zio_cksum_t             cksum;
+        l2arc_dev_hdr_phys_t    *hdr = dev->l2ad_dev_hdr;
+        const uint64_t          hdr_asize = dev->l2ad_dev_hdr_asize;
+        abd_t *abd;
+
+        guid = spa_guid(dev->l2ad_vdev->vdev_spa);
+
+        abd = abd_get_from_buf(hdr, hdr_asize);
+        err = zio_wait(zio_read_phys(NULL, dev->l2ad_vdev,
+            VDEV_LABEL_START_SIZE, hdr_asize, abd,
+            ZIO_CHECKSUM_OFF, NULL, NULL, ZIO_PRIORITY_ASYNC_READ,
+            ZIO_FLAG_DONT_CACHE | ZIO_FLAG_CANFAIL |
+            ZIO_FLAG_DONT_PROPAGATE | ZIO_FLAG_DONT_RETRY, B_FALSE));
+        abd_put(abd);
+        if (err != 0) {
+                ARCSTAT_BUMP(arcstat_l2_rebuild_abort_io_errors);
+                return (err);
+        }
+
+        if (hdr->dh_magic == BSWAP_64(L2ARC_DEV_HDR_MAGIC_V1))
+                byteswap_uint64_array(hdr, sizeof (*hdr));
+
+        if (hdr->dh_magic != L2ARC_DEV_HDR_MAGIC_V1 ||
+            hdr->dh_spa_guid != guid) {
+                /*
+                 * Attempt to rebuild a device containing no actual dev hdr
+                 * or containing a header from some other pool.
+                 */
+                ARCSTAT_BUMP(arcstat_l2_rebuild_abort_unsupported);
+                return (SET_ERROR(ENOTSUP));
+        }
+
+        l2arc_dev_hdr_checksum(hdr, &cksum);
+        if (!ZIO_CHECKSUM_EQUAL(hdr->dh_self_cksum, cksum)) {
+                ARCSTAT_BUMP(arcstat_l2_rebuild_abort_cksum_errors);
+                return (SET_ERROR(EINVAL));
+        }
+
+        return (0);
+}
+
+/*
+ * Reads L2ARC log blocks from storage and validates their contents.
+ *
+ * This function implements a simple prefetcher to make sure that while
+ * we're processing one buffer the L2ARC is already prefetching the next
+ * one in the chain.
+ *
+ * The arguments this_lbp and next_lbp point to the current and next log blk
+ * address in the block chain. Similarly, this_lb and next_lb hold the
+ * l2arc_log_blk_phys_t's of the current and next L2ARC blk. The this_lb_buf
+ * and next_lb_buf must be buffers of appropriate size to hold a raw
+ * l2arc_log_blk_phys_t (they are used as catch buffers for read ops prior
+ * to buffer decompression).
+ *
+ * The `this_io' and `next_io' arguments are used for block prefetching.
+ * When issuing the first blk IO during rebuild, you should pass NULL for
+ * `this_io'. This function will then issue a sync IO to read the block and
+ * also issue an async IO to fetch the next block in the block chain. The
+ * prefetch IO is returned in `next_io'. On subsequent calls to this
+ * function, pass the value returned in `next_io' from the previous call
+ * as `this_io' and a fresh `next_io' pointer to hold the next prefetch IO.
+ * Prior to the call, you should initialize your `next_io' pointer to be
+ * NULL. If no prefetch IO was issued, the pointer is left set at NULL.
+ *
+ * On success, this function returns 0, otherwise it returns an appropriate
+ * error code. On error the prefetching IO is aborted and cleared before
+ * returning from this function, so the caller need not clean it up. On
+ * success, any prefetch IO returned in `next_io' remains the caller's
+ * responsibility to wait on or abort.
+ */
+static int
+l2arc_log_blk_read(l2arc_dev_t *dev,
+    const l2arc_log_blkptr_t *this_lbp, const l2arc_log_blkptr_t *next_lbp,
+    l2arc_log_blk_phys_t *this_lb, l2arc_log_blk_phys_t *next_lb,
+    uint8_t *this_lb_buf, uint8_t *next_lb_buf,
+    zio_t *this_io, zio_t **next_io)
+{
+        int             err = 0;
+        zio_cksum_t     cksum;
+
+        ASSERT(this_lbp != NULL && next_lbp != NULL);
+        ASSERT(this_lb != NULL && next_lb != NULL);
+        ASSERT(this_lb_buf != NULL && next_lb_buf != NULL);
+        ASSERT(next_io != NULL && *next_io == NULL);
+        ASSERT(l2arc_log_blkptr_valid(dev, this_lbp));
+
+        /*
+         * Check to see if we have issued the IO for this log blk in a
+         * previous run. If not, this is the first call, so issue it now.
+         */
+        if (this_io == NULL) {
+                this_io = l2arc_log_blk_prefetch(dev->l2ad_vdev, this_lbp,
+                    this_lb_buf);
+        }
+
+        /*
+         * Peek to see if we can start issuing the next IO immediately.
+         */
+        if (l2arc_log_blkptr_valid(dev, next_lbp)) {
+                /*
+                 * Start issuing IO for the next log blk early - this
+                 * should help keep the L2ARC device busy while we
+                 * decompress and restore this log blk.
+                 */
+                *next_io = l2arc_log_blk_prefetch(dev->l2ad_vdev, next_lbp,
+                    next_lb_buf);
+        }
+
+        /* Wait for the IO to read this log block to complete */
+        if ((err = zio_wait(this_io)) != 0) {
+                ARCSTAT_BUMP(arcstat_l2_rebuild_abort_io_errors);
+                goto cleanup;
+        }
+
+        /* Make sure the buffer checks out */
+        fletcher_4_native(this_lb_buf, LBP_GET_PSIZE(this_lbp), NULL, &cksum);
+        if (!ZIO_CHECKSUM_EQUAL(cksum, this_lbp->lbp_cksum)) {
+                ARCSTAT_BUMP(arcstat_l2_rebuild_abort_cksum_errors);
+                err = SET_ERROR(EINVAL);
+                goto cleanup;
+        }
+
+        /* Now we can take our time decoding this buffer */
+        switch (LBP_GET_COMPRESS(this_lbp)) {
+        case ZIO_COMPRESS_OFF:
+                bcopy(this_lb_buf, this_lb, sizeof (*this_lb));
+                break;
+        case ZIO_COMPRESS_LZ4:
+                err = zio_decompress_data_buf(LBP_GET_COMPRESS(this_lbp),
+                    this_lb_buf, this_lb, LBP_GET_PSIZE(this_lbp),
+                    sizeof (*this_lb));
+                if (err != 0) {
+                        err = SET_ERROR(EINVAL);
+                        goto cleanup;
+                }
+
+                break;
+        default:
+                err = SET_ERROR(EINVAL);
+                break;
+        }
+
+        if (this_lb->lb_magic == BSWAP_64(L2ARC_LOG_BLK_MAGIC))
+                byteswap_uint64_array(this_lb, sizeof (*this_lb));
+
+        if (this_lb->lb_magic != L2ARC_LOG_BLK_MAGIC) {
+                err = SET_ERROR(EINVAL);
+                goto cleanup;
+        }
+
+cleanup:
+        /* Abort an in-flight prefetch I/O in case of error */
+        if (err != 0 && *next_io != NULL) {
+                l2arc_log_blk_prefetch_abort(*next_io);
+                *next_io = NULL;
+        }
+        return (err);
+}
+
+/*
+ * Restores the payload of a log blk to ARC. This creates empty ARC hdr
+ * entries which only contain an l2arc hdr, essentially restoring the
+ * buffers to their L2ARC evicted state. This function also updates space
+ * usage on the L2ARC vdev to make sure it tracks restored buffers.
+ */
+static void
+l2arc_log_blk_restore(l2arc_dev_t *dev, uint64_t load_guid,
+    const l2arc_log_blk_phys_t *lb, uint64_t lb_psize)
+{
+        uint64_t        size = 0, psize = 0;
+
+        for (int i = L2ARC_LOG_BLK_ENTRIES - 1; i >= 0; i--) {
+                /*
+                 * Restore goes in the reverse temporal direction to preserve
+                 * correct temporal ordering of buffers in the l2ad_buflist.
+                 * l2arc_hdr_restore also does a list_insert_tail instead of
+                 * list_insert_head on the l2ad_buflist:
+                 *
+                 *              LIST    l2ad_buflist            LIST
+                 *              HEAD  <------ (time) ------     TAIL
+                 * direction    +-----+-----+-----+-----+-----+    direction
+                 * of l2arc <== | buf | buf | buf | buf | buf | ===> of rebuild
+                 * fill         +-----+-----+-----+-----+-----+
+                 *              ^                               ^
+                 *              |                               |
+                 *              |                               |
+                 *      l2arc_feed_thread               l2arc_rebuild
+                 *      places new bufs here            restores bufs here
+                 *
+                 * This also works when the restored bufs get evicted at any
+                 * point during the rebuild.
+                 */
+                l2arc_hdr_restore(&lb->lb_entries[i], dev, load_guid);
+                size += LE_GET_LSIZE(&lb->lb_entries[i]);
+                psize += LE_GET_PSIZE(&lb->lb_entries[i]);
+        }
+
+        /*
+         * Record rebuild stats:
+         *      size            In-memory size of restored buffer data in ARC
+         *      psize           Physical size of restored buffers in the L2ARC
+         *      bufs            # of ARC buffer headers restored
+         *      log_blks        # of L2ARC log entries processed during restore
+         */
+        ARCSTAT_INCR(arcstat_l2_rebuild_size, size);
+        ARCSTAT_INCR(arcstat_l2_rebuild_psize, psize);
+        ARCSTAT_INCR(arcstat_l2_rebuild_bufs, L2ARC_LOG_BLK_ENTRIES);
+        ARCSTAT_BUMP(arcstat_l2_rebuild_log_blks);
+        ARCSTAT_F_AVG(arcstat_l2_log_blk_avg_size, lb_psize);
+        ARCSTAT_F_AVG(arcstat_l2_data_to_meta_ratio, psize / lb_psize);
+        vdev_space_update(dev->l2ad_vdev, psize, 0, 0);
+}
+
+/*
+ * Restores a single ARC buf hdr from a log block. The ARC buffer is put
+ * into a state indicating that it has been evicted to L2ARC.
+ */
+static void
+l2arc_hdr_restore(const l2arc_log_ent_phys_t *le, l2arc_dev_t *dev,
+    uint64_t load_guid)
+{
+        arc_buf_hdr_t           *hdr, *exists;
+        kmutex_t                *hash_lock;
+        arc_buf_contents_t      type = LE_GET_TYPE(le);
+
+        /*
+         * Do all the allocation before grabbing any locks, this lets us
+         * sleep if memory is full and we don't have to deal with failed
+         * allocations.
+         */
+        hdr = arc_buf_alloc_l2only(load_guid, type, dev, le->le_dva,
+            le->le_daddr, LE_GET_LSIZE(le), LE_GET_PSIZE(le),
+            le->le_birth, le->le_freeze_cksum, LE_GET_CHECKSUM(le),
+            LE_GET_COMPRESS(le), LE_GET_ARC_COMPRESS(le));
+
+        ARCSTAT_INCR(arcstat_l2_lsize, HDR_GET_LSIZE(hdr));
+        ARCSTAT_INCR(arcstat_l2_psize, arc_hdr_size(hdr));
+
+        mutex_enter(&dev->l2ad_mtx);
+        /*
+         * We connect the l2hdr to the hdr only after the hdr is in the hash
+         * table, otherwise the rest of the arc hdr manipulation machinery
+         * might get confused.
+         */
+        list_insert_tail(&dev->l2ad_buflist, hdr);
+        (void) refcount_add_many(&dev->l2ad_alloc, arc_hdr_size(hdr), hdr);
+        mutex_exit(&dev->l2ad_mtx);
+
+        exists = buf_hash_insert(hdr, &hash_lock);
+        if (exists) {
+                /* Buffer was already cached, no need to restore it. */
+                arc_hdr_destroy(hdr);
+                mutex_exit(hash_lock);
+                ARCSTAT_BUMP(arcstat_l2_rebuild_bufs_precached);
+                return;
+        }
+
+        mutex_exit(hash_lock);
+}
+
+/*
+ * Completion callback shared by the persistent L2ARC (PL2ARC) functions
+ * that issue async reads/writes. Releases the ABD passed in io_private.
+ */
+static void
+pl2arc_io_done(zio_t *zio)
+{
+        abd_put(zio->io_private);
+        zio->io_private = NULL;
+}
+
+/*
+ * Starts an asynchronous read IO to read a log block. This is used in log
+ * block reconstruction to start reading the next block before we are done
+ * decoding and reconstructing the current block, to keep the l2arc device
+ * nice and hot with read IO to process.
+ * The returned zio will contain newly allocated memory buffers for the IO
+ * data which should then be freed by the caller once the zio is no longer
+ * needed (i.e. due to it having completed). If you wish to abort this
+ * zio, you should do so using l2arc_log_blk_prefetch_abort, which takes
+ * care of disposing of the allocated buffers correctly.
+ */
+static zio_t *
+l2arc_log_blk_prefetch(vdev_t *vd, const l2arc_log_blkptr_t *lbp,
+    uint8_t *lb_buf)
+{
+        uint32_t        psize;
+        zio_t           *pio;
+        abd_t           *abd;
+
+        psize = LBP_GET_PSIZE(lbp);
+        ASSERT(psize <= sizeof (l2arc_log_blk_phys_t));
+        pio = zio_root(vd->vdev_spa, NULL, NULL, ZIO_FLAG_DONT_CACHE |
+            ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE |
+            ZIO_FLAG_DONT_RETRY);
+        abd = abd_get_from_buf(lb_buf, psize);
+        (void) zio_nowait(zio_read_phys(pio, vd, lbp->lbp_daddr, psize,
+            abd, ZIO_CHECKSUM_OFF, pl2arc_io_done, abd,
+            ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_DONT_CACHE | ZIO_FLAG_CANFAIL |
+            ZIO_FLAG_DONT_PROPAGATE | ZIO_FLAG_DONT_RETRY, B_FALSE));
+
+        return (pio);
+}
+
+/*
+ * Aborts a zio returned from l2arc_log_blk_prefetch and frees the data
+ * buffers allocated for it.
+ */
+static void
+l2arc_log_blk_prefetch_abort(zio_t *zio)
+{
+        (void) zio_wait(zio);
+}
+
+/*
+ * Creates a zio to update the device header on an l2arc device. The zio is
+ * initiated as a child of `pio'.
+ */
+static void
+l2arc_dev_hdr_update(l2arc_dev_t *dev, zio_t *pio)
+{
+        zio_t                   *wzio;
+        abd_t                   *abd;
+        l2arc_dev_hdr_phys_t    *hdr = dev->l2ad_dev_hdr;
+        const uint64_t          hdr_asize = dev->l2ad_dev_hdr_asize;
+
+        hdr->dh_magic = L2ARC_DEV_HDR_MAGIC_V1;
+        hdr->dh_spa_guid = spa_guid(dev->l2ad_vdev->vdev_spa);
+        hdr->dh_alloc_space = refcount_count(&dev->l2ad_alloc);
+        hdr->dh_flags = 0;
+        if (dev->l2ad_first)
+                hdr->dh_flags |= L2ARC_DEV_HDR_EVICT_FIRST;
+
+        /* checksum operation goes last */
+        l2arc_dev_hdr_checksum(hdr, &hdr->dh_self_cksum);
+
+        abd = abd_get_from_buf(hdr, hdr_asize);
+        wzio = zio_write_phys(pio, dev->l2ad_vdev, VDEV_LABEL_START_SIZE,
+            hdr_asize, abd, ZIO_CHECKSUM_OFF, pl2arc_io_done, abd,
+            ZIO_PRIORITY_ASYNC_WRITE, ZIO_FLAG_CANFAIL, B_FALSE);
+        DTRACE_PROBE2(l2arc__write, vdev_t *, dev->l2ad_vdev, zio_t *, wzio);
+        (void) zio_nowait(wzio);
+}
+
+/*
+ * Commits a log block to the L2ARC device. This routine is invoked from
+ * l2arc_write_buffers when the log block fills up.
+ * This function allocates some memory to temporarily hold the serialized
+ * buffer to be written. This is then released in l2arc_write_done.
+ */
+static void
+l2arc_log_blk_commit(l2arc_dev_t *dev, zio_t *pio,
+    l2arc_write_callback_t *cb)
+{
+        l2arc_log_blk_phys_t    *lb = &dev->l2ad_log_blk;
+        uint64_t                psize, asize;
+        l2arc_log_blk_buf_t     *lb_buf;
+        abd_t                   *abd;
+        zio_t                   *wzio;
+
+        VERIFY(dev->l2ad_log_ent_idx == L2ARC_LOG_BLK_ENTRIES);
+
+        /* link the buffer into the block chain */
+        lb->lb_back2_lbp = dev->l2ad_dev_hdr->dh_start_lbps[1];
+        lb->lb_magic = L2ARC_LOG_BLK_MAGIC;
+
+        /* try to compress the buffer */
+        lb_buf = kmem_zalloc(sizeof (*lb_buf), KM_SLEEP);
+        list_insert_tail(&cb->l2wcb_log_blk_buflist, lb_buf);
+        abd = abd_get_from_buf(lb, sizeof (*lb));
+        psize = zio_compress_data(ZIO_COMPRESS_LZ4, abd, lb_buf->lbb_log_blk,
+            sizeof (*lb));
+        abd_put(abd);
+        /* a log block is never entirely zero */
+        ASSERT(psize != 0);
+        asize = vdev_psize_to_asize(dev->l2ad_vdev, psize);
+        ASSERT(asize <= sizeof (lb_buf->lbb_log_blk));
+
+        /*
+         * Update the start log blk pointer in the device header to point
+         * to the log block we're about to write.
+         */
+        dev->l2ad_dev_hdr->dh_start_lbps[1] =
+            dev->l2ad_dev_hdr->dh_start_lbps[0];
+        dev->l2ad_dev_hdr->dh_start_lbps[0].lbp_daddr = dev->l2ad_hand;
+        _NOTE(CONSTCOND)
+        LBP_SET_LSIZE(&dev->l2ad_dev_hdr->dh_start_lbps[0], sizeof (*lb));
+        LBP_SET_PSIZE(&dev->l2ad_dev_hdr->dh_start_lbps[0], asize);
+        LBP_SET_CHECKSUM(&dev->l2ad_dev_hdr->dh_start_lbps[0],
+            ZIO_CHECKSUM_FLETCHER_4);
+        LBP_SET_TYPE(&dev->l2ad_dev_hdr->dh_start_lbps[0], 0);
+
+        if (asize < sizeof (*lb)) {
+                /* compression succeeded */
+                bzero(lb_buf->lbb_log_blk + psize, asize - psize);
+                LBP_SET_COMPRESS(&dev->l2ad_dev_hdr->dh_start_lbps[0],
+                    ZIO_COMPRESS_LZ4);
+        } else {
+                /* compression failed */
+                bcopy(lb, lb_buf->lbb_log_blk, sizeof (*lb));
+                LBP_SET_COMPRESS(&dev->l2ad_dev_hdr->dh_start_lbps[0],
+                    ZIO_COMPRESS_OFF);
+        }
+
+        /* checksum what we're about to write */
+        fletcher_4_native(lb_buf->lbb_log_blk, asize,
+            NULL, &dev->l2ad_dev_hdr->dh_start_lbps[0].lbp_cksum);
+
+        /* perform the write itself */
+        CTASSERT(L2ARC_LOG_BLK_SIZE >= SPA_MINBLOCKSIZE &&
+            L2ARC_LOG_BLK_SIZE <= SPA_MAXBLOCKSIZE);
+        abd = abd_get_from_buf(lb_buf->lbb_log_blk, asize);
+        wzio = zio_write_phys(pio, dev->l2ad_vdev, dev->l2ad_hand,
+            asize, abd, ZIO_CHECKSUM_OFF, pl2arc_io_done, abd,
+            ZIO_PRIORITY_ASYNC_WRITE, ZIO_FLAG_CANFAIL, B_FALSE);
+        DTRACE_PROBE2(l2arc__write, vdev_t *, dev->l2ad_vdev, zio_t *, wzio);
+        (void) zio_nowait(wzio);
+
+        dev->l2ad_hand += asize;
+        vdev_space_update(dev->l2ad_vdev, asize, 0, 0);
+
+        /* bump the kstats */
+        ARCSTAT_INCR(arcstat_l2_write_bytes, asize);
+        ARCSTAT_BUMP(arcstat_l2_log_blk_writes);
+        ARCSTAT_F_AVG(arcstat_l2_log_blk_avg_size, asize);
+        ARCSTAT_F_AVG(arcstat_l2_data_to_meta_ratio,
+            dev->l2ad_log_blk_payload_asize / asize);
+
+        /* start a new log block */
+        dev->l2ad_log_ent_idx = 0;
+        dev->l2ad_log_blk_payload_asize = 0;
+}
+
+/*
+ * Validates an L2ARC log blk address to make sure that it can be read
+ * from the provided L2ARC device. Returns B_TRUE if the address is
+ * within the device's bounds, or B_FALSE if not.
+ */
+static boolean_t
+l2arc_log_blkptr_valid(l2arc_dev_t *dev, const l2arc_log_blkptr_t *lbp)
+{
+        uint64_t psize = LBP_GET_PSIZE(lbp);
+        uint64_t end = lbp->lbp_daddr + psize;
+
+        /*
+         * A log block is valid if all of the following conditions are true:
+         * - it fits entirely between l2ad_start and l2ad_end
+         * - it has a valid size
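+         *
+         * Worked example (illustrative values only, assuming psize does
+         * not exceed sizeof (l2arc_log_blk_phys_t)): with
+         * l2ad_start = 0x1000 and l2ad_end = 0x100000, a pointer with
+         * lbp_daddr = 0xff000 and psize = 0x1000 ends exactly at
+         * l2ad_end and is accepted, whereas the same psize at
+         * lbp_daddr = 0xff800 ends at 0x100800 and is rejected.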
+         */
+        return (lbp->lbp_daddr >= dev->l2ad_start && end <= dev->l2ad_end &&
+            psize > 0 && psize <= sizeof (l2arc_log_blk_phys_t));
+}
+
+/*
+ * Computes the checksum of `hdr' and stores it in `cksum'.
+ */
+static void
+l2arc_dev_hdr_checksum(const l2arc_dev_hdr_phys_t *hdr, zio_cksum_t *cksum)
+{
+        fletcher_4_native((uint8_t *)hdr +
+            offsetof(l2arc_dev_hdr_phys_t, dh_spa_guid),
+            sizeof (*hdr) - offsetof(l2arc_dev_hdr_phys_t, dh_spa_guid),
+            NULL, cksum);
+}
+
+/*
+ * Inserts ARC buffer `ab' into the current L2ARC log blk on the device.
+ * The buffer being inserted must be present in L2ARC.
+ * Returns B_TRUE if the L2ARC log blk is full and needs to be committed
+ * to L2ARC, or B_FALSE if it still has room for more ARC buffers.
+ */
+static boolean_t
+l2arc_log_blk_insert(l2arc_dev_t *dev, const arc_buf_hdr_t *ab)
+{
+        l2arc_log_blk_phys_t    *lb = &dev->l2ad_log_blk;
+        l2arc_log_ent_phys_t    *le;
+        int                     index = dev->l2ad_log_ent_idx++;
+
+        ASSERT(index < L2ARC_LOG_BLK_ENTRIES);
+
+        le = &lb->lb_entries[index];
+        bzero(le, sizeof (*le));
+        le->le_dva = ab->b_dva;
+        le->le_birth = ab->b_birth;
+        le->le_daddr = ab->b_l2hdr.b_daddr;
+        LE_SET_LSIZE(le, HDR_GET_LSIZE(ab));
+        LE_SET_PSIZE(le, HDR_GET_PSIZE(ab));
+
+        if ((ab->b_flags & ARC_FLAG_COMPRESSED_ARC) != 0) {
+                LE_SET_ARC_COMPRESS(le, 1);
+                LE_SET_COMPRESS(le, HDR_GET_COMPRESS(ab));
+        } else {
+                ASSERT3U(HDR_GET_COMPRESS(ab), ==, ZIO_COMPRESS_OFF);
+                LE_SET_ARC_COMPRESS(le, 0);
+                LE_SET_COMPRESS(le, ZIO_COMPRESS_OFF);
+        }
+
+        if (ab->b_freeze_cksum != NULL) {
+                le->le_freeze_cksum = *ab->b_freeze_cksum;
+                LE_SET_CHECKSUM(le, ZIO_CHECKSUM_FLETCHER_2);
+        } else {
+                LE_SET_CHECKSUM(le, ZIO_CHECKSUM_OFF);
+        }
+
+        LE_SET_TYPE(le, arc_flags_to_bufc(ab->b_flags));
+        dev->l2ad_log_blk_payload_asize += arc_hdr_size((arc_buf_hdr_t *)ab);
+
+        return (dev->l2ad_log_ent_idx == L2ARC_LOG_BLK_ENTRIES);
+}
+
+/*
+ * Checks whether a given L2ARC device address sits in a time-sequential
+ * range. The trick here is that the L2ARC is a rotary buffer, so we can't
+ * just do a range comparison; we need to handle the situation in which the
+ * range wraps around the end of the L2ARC device. Arguments:
+ *      bottom  Lower end of the range to check (written to earlier).
+ *      top     Upper end of the range to check (written to later).
+ *      check   The address for which we want to determine if it sits in
+ *              between the top and bottom.
+ *
+ * The 3-way conditional below represents the following cases:
+ *
+ *      bottom < top : Sequentially ordered case:
+ *        <check>--------+-------------------+
+ *                       |  (overlap here?)  |
+ *       L2ARC dev       V                   V
+ *       |---------------<bottom>============<top>--------------|
+ *
+ *      bottom > top: Looped-around case:
+ *                            <check>--------+------------------+
+ *                                           |  (overlap here?) |
+ *       L2ARC dev                           V                  V
+ *       |===============<top>---------------<bottom>===========|
+ *       ^               ^
+ *       |  (or here?)   |
+ *       +---------------+---------<check>
+ *
+ *      top == bottom : Just a single address comparison.
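+ *
+ * Worked example (illustrative addresses only): in the looped-around
+ * case with bottom = 900 and top = 100, check = 950 and check = 50 both
+ * overlap the range, while check = 500 does not. In the sequential case
+ * with bottom = 100 and top = 900 the results are reversed: check = 500
+ * overlaps, while check = 950 and check = 50 do not.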
+ */
+static inline boolean_t
+l2arc_range_check_overlap(uint64_t bottom, uint64_t top, uint64_t check)
+{
+        if (bottom < top)
+                return (bottom <= check && check <= top);
+        else if (bottom > top)
+                return (check <= top || bottom <= check);
+        else
+                return (check == top);
 }