1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  23  * Copyright (c) 2011, 2017 by Delphix. All rights reserved.
  24  */
  25 /* Copyright (c) 2013 by Saso Kiselkov. All rights reserved. */
  26 /* Copyright (c) 2013, Joyent, Inc. All rights reserved. */
  27 /* Copyright 2016 Nexenta Systems, Inc. All rights reserved. */
  28 
  29 #include <sys/dmu.h>
  30 #include <sys/dmu_impl.h>
  31 #include <sys/dmu_tx.h>
  32 #include <sys/dbuf.h>
  33 #include <sys/dnode.h>
  34 #include <sys/zfs_context.h>
  35 #include <sys/dmu_objset.h>
  36 #include <sys/dmu_traverse.h>
  37 #include <sys/dsl_dataset.h>
  38 #include <sys/dsl_dir.h>
  39 #include <sys/dsl_pool.h>
  40 #include <sys/dsl_synctask.h>
  41 #include <sys/dsl_prop.h>
  42 #include <sys/dmu_zfetch.h>
  43 #include <sys/zfs_ioctl.h>
  44 #include <sys/zap.h>
  45 #include <sys/zio_checksum.h>
  46 #include <sys/zio_compress.h>
  47 #include <sys/sa.h>
  48 #include <sys/spa_impl.h>
  49 #include <sys/zfeature.h>
  50 #include <sys/abd.h>
  51 #ifdef _KERNEL
  52 #include <sys/vmsystm.h>
  53 #include <sys/zfs_znode.h>
  54 #include <sys/zfs_vfsops.h>
  55 #endif
  56 #include <sys/special.h>
  57 
  58 /*
  59  * Enable/disable nopwrite feature.
  60  */
  61 int zfs_nopwrite_enabled = 1;
  62 
  63 /*
  64  * Tunable to control percentage of dirtied blocks from frees in one TXG.
  65  * After this threshold is crossed, additional dirty blocks from frees
  66  * wait until the next TXG.
  67  * A value of zero will disable this throttle.
  68  */
  69 uint32_t zfs_per_txg_dirty_frees_percent = 30;
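
/*
 * Illustrative example (not part of the original source): with the default
 * of 30% and an assumed zfs_dirty_data_max of 4 GiB, further frees are
 * deferred to the next TXG once roughly 1.2 GiB of freed space has been
 * dirtied across recent TXGs.  On illumos-derived systems tunables like
 * this are commonly set at boot via /etc/system, e.g.:
 *
 *	set zfs:zfs_per_txg_dirty_frees_percent = 5
 *
 * (the value 5 above is only an example).
 */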
  70 
  71 const dmu_object_type_info_t dmu_ot[DMU_OT_NUMTYPES] = {
  72         { DMU_BSWAP_UINT8,  TRUE,  FALSE,  "unallocated"                },
  73         { DMU_BSWAP_ZAP,    TRUE,  TRUE,   "object directory"           },
  74         { DMU_BSWAP_UINT64, TRUE,  TRUE,   "object array"               },
  75         { DMU_BSWAP_UINT8,  TRUE,  FALSE,  "packed nvlist"              },
  76         { DMU_BSWAP_UINT64, TRUE,  FALSE,  "packed nvlist size"         },
  77         { DMU_BSWAP_UINT64, TRUE,  FALSE,  "bpobj"                      },
  78         { DMU_BSWAP_UINT64, TRUE,  FALSE,  "bpobj header"               },
  79         { DMU_BSWAP_UINT64, TRUE,  FALSE,  "SPA space map header"       },
  80         { DMU_BSWAP_UINT64, TRUE,  FALSE,  "SPA space map"              },
  81         { DMU_BSWAP_UINT64, TRUE,  FALSE,  "ZIL intent log"             },
  82         { DMU_BSWAP_DNODE,  TRUE,  FALSE,  "DMU dnode"                  },
  83         { DMU_BSWAP_OBJSET, TRUE,  TRUE,   "DMU objset"                 },
  84         { DMU_BSWAP_UINT64, TRUE,  TRUE,   "DSL directory"              },
  85         { DMU_BSWAP_ZAP,    TRUE,  TRUE,   "DSL directory child map"    },
  86         { DMU_BSWAP_ZAP,    TRUE,  TRUE,   "DSL dataset snap map"       },
  87         { DMU_BSWAP_ZAP,    TRUE,  TRUE,   "DSL props"                  },
  88         { DMU_BSWAP_UINT64, TRUE,  TRUE,   "DSL dataset"                },
  89         { DMU_BSWAP_ZNODE,  TRUE,  FALSE,  "ZFS znode"                  },
  90         { DMU_BSWAP_OLDACL, TRUE,  FALSE,  "ZFS V0 ACL"                 },
  91         { DMU_BSWAP_UINT8,  FALSE, FALSE,  "ZFS plain file"             },
  92         { DMU_BSWAP_ZAP,    TRUE,  FALSE,  "ZFS directory"              },
  93         { DMU_BSWAP_ZAP,    TRUE,  FALSE,  "ZFS master node"            },
  94         { DMU_BSWAP_ZAP,    TRUE,  FALSE,  "ZFS delete queue"           },
  95         { DMU_BSWAP_UINT8,  FALSE, FALSE,  "zvol object"                },
  96         { DMU_BSWAP_ZAP,    TRUE,  TRUE,   "zvol prop"                  },
  97         { DMU_BSWAP_UINT8,  FALSE, FALSE,  "other uint8[]"              },
  98         { DMU_BSWAP_UINT64, FALSE, FALSE,  "other uint64[]"             },
  99         { DMU_BSWAP_ZAP,    TRUE,  FALSE,  "other ZAP"                  },
 100         { DMU_BSWAP_ZAP,    TRUE,  FALSE,  "persistent error log"       },
 101         { DMU_BSWAP_UINT8,  TRUE,  FALSE,  "SPA history"                },
 102         { DMU_BSWAP_UINT64, TRUE,  FALSE,  "SPA history offsets"        },
 103         { DMU_BSWAP_ZAP,    TRUE,  TRUE,   "Pool properties"            },
 104         { DMU_BSWAP_ZAP,    TRUE,  TRUE,   "DSL permissions"            },
 105         { DMU_BSWAP_ACL,    TRUE,  FALSE,  "ZFS ACL"                    },
 106         { DMU_BSWAP_UINT8,  TRUE,  FALSE,  "ZFS SYSACL"                 },
 107         { DMU_BSWAP_UINT8,  TRUE,  FALSE,  "FUID table"                 },
 108         { DMU_BSWAP_UINT64, TRUE,  FALSE,  "FUID table size"            },
 109         { DMU_BSWAP_ZAP,    TRUE,  TRUE,   "DSL dataset next clones"    },
 110         { DMU_BSWAP_ZAP,    TRUE,  FALSE,  "scan work queue"            },
 111         { DMU_BSWAP_ZAP,    TRUE,  FALSE,  "ZFS user/group used"        },
 112         { DMU_BSWAP_ZAP,    TRUE,  FALSE,  "ZFS user/group quota"       },
 113         { DMU_BSWAP_ZAP,    TRUE,  TRUE,   "snapshot refcount tags"     },
 114         { DMU_BSWAP_ZAP,    TRUE,  FALSE,  "DDT ZAP algorithm"          },
 115         { DMU_BSWAP_ZAP,    TRUE,  FALSE,  "DDT statistics"             },
 116         { DMU_BSWAP_UINT8,  TRUE,  FALSE,  "System attributes"          },
 117         { DMU_BSWAP_ZAP,    TRUE,  FALSE,  "SA master node"             },
 118         { DMU_BSWAP_ZAP,    TRUE,  FALSE,  "SA attr registration"       },
 119         { DMU_BSWAP_ZAP,    TRUE,  FALSE,  "SA attr layouts"            },
 120         { DMU_BSWAP_ZAP,    TRUE,  FALSE,  "scan translations"          },
 121         { DMU_BSWAP_UINT8,  FALSE, FALSE,  "deduplicated block"         },
 122         { DMU_BSWAP_ZAP,    TRUE,  TRUE,   "DSL deadlist map"           },
 123         { DMU_BSWAP_UINT64, TRUE,  TRUE,   "DSL deadlist map hdr"       },
 124         { DMU_BSWAP_ZAP,    TRUE,  TRUE,   "DSL dir clones"             },
 125         { DMU_BSWAP_UINT64, TRUE,  FALSE,  "bpobj subobj"               }
 126 };
 127 
 128 const dmu_object_byteswap_info_t dmu_ot_byteswap[DMU_BSWAP_NUMFUNCS] = {
 129         {       byteswap_uint8_array,   "uint8"         },
 130         {       byteswap_uint16_array,  "uint16"        },
 131         {       byteswap_uint32_array,  "uint32"        },
 132         {       byteswap_uint64_array,  "uint64"        },
 133         {       zap_byteswap,           "zap"           },
 134         {       dnode_buf_byteswap,     "dnode"         },
 135         {       dmu_objset_byteswap,    "objset"        },
 136         {       zfs_znode_byteswap,     "znode"         },
 137         {       zfs_oldacl_byteswap,    "oldacl"        },
 138         {       zfs_acl_byteswap,       "acl"           }
 139 };
 140 
 141 int
 142 dmu_buf_hold_noread_by_dnode(dnode_t *dn, uint64_t offset,
 143     void *tag, dmu_buf_t **dbp)
 144 {
 145         uint64_t blkid;
 146         dmu_buf_impl_t *db;
 147 
 148         blkid = dbuf_whichblock(dn, 0, offset);
 149         rw_enter(&dn->dn_struct_rwlock, RW_READER);
 150         db = dbuf_hold(dn, blkid, tag);
 151         rw_exit(&dn->dn_struct_rwlock);
 152 
 153         if (db == NULL) {
 154                 *dbp = NULL;
 155                 return (SET_ERROR(EIO));
 156         }
 157 
 158         *dbp = &db->db;
 159         return (0);
 160 }
 161 int
 162 dmu_buf_hold_noread(objset_t *os, uint64_t object, uint64_t offset,
 163     void *tag, dmu_buf_t **dbp)
 164 {
 165         dnode_t *dn;
 166         uint64_t blkid;
 167         dmu_buf_impl_t *db;
 168         int err;
 169 
 170         err = dnode_hold(os, object, FTAG, &dn);
 171         if (err)
 172                 return (err);
 173         blkid = dbuf_whichblock(dn, 0, offset);
 174         rw_enter(&dn->dn_struct_rwlock, RW_READER);
 175         db = dbuf_hold(dn, blkid, tag);
 176         rw_exit(&dn->dn_struct_rwlock);
 177         dnode_rele(dn, FTAG);
 178 
 179         if (db == NULL) {
 180                 *dbp = NULL;
 181                 return (SET_ERROR(EIO));
 182         }
 183 
 184         *dbp = &db->db;
 185         return (err);
 186 }
 187 
 188 int
 189 dmu_buf_hold_by_dnode(dnode_t *dn, uint64_t offset,
 190     void *tag, dmu_buf_t **dbp, int flags)
 191 {
 192         int err;
 193         int db_flags = DB_RF_CANFAIL;
 194 
 195         if (flags & DMU_READ_NO_PREFETCH)
 196                 db_flags |= DB_RF_NOPREFETCH;
 197 
 198         err = dmu_buf_hold_noread_by_dnode(dn, offset, tag, dbp);
 199         if (err == 0) {
 200                 dmu_buf_impl_t *db = (dmu_buf_impl_t *)(*dbp);
 201                 err = dbuf_read(db, NULL, db_flags);
 202                 if (err != 0) {
 203                         dbuf_rele(db, tag);
 204                         *dbp = NULL;
 205                 }
 206         }
 207 
 208         return (err);
 209 }
 210 
 211 int
 212 dmu_buf_hold(objset_t *os, uint64_t object, uint64_t offset,
 213     void *tag, dmu_buf_t **dbp, int flags)
 214 {
 215         int err;
 216         int db_flags = DB_RF_CANFAIL;
 217 
 218         if (flags & DMU_READ_NO_PREFETCH)
 219                 db_flags |= DB_RF_NOPREFETCH;
 220 
 221         err = dmu_buf_hold_noread(os, object, offset, tag, dbp);
 222         if (err == 0) {
 223                 dmu_buf_impl_t *db = (dmu_buf_impl_t *)(*dbp);
 224                 err = dbuf_read(db, NULL, db_flags);
 225                 if (err != 0) {
 226                         dbuf_rele(db, tag);
 227                         *dbp = NULL;
 228                 }
 229         }
 230 
 231         return (err);
 232 }
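
/*
 * Minimal usage sketch for the hold/read/release cycle above (illustrative
 * only; assumes a held objset 'os' and an existing object 'object'):
 *
 *	dmu_buf_t *db;
 *	int err = dmu_buf_hold(os, object, offset, FTAG, &db,
 *	    DMU_READ_PREFETCH);
 *	if (err == 0) {
 *		... use db->db_data (db->db_size bytes of cached data) ...
 *		dmu_buf_rele(db, FTAG);
 *	}
 */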
 233 
 234 int
 235 dmu_bonus_max(void)
 236 {
 237         return (DN_MAX_BONUSLEN);
 238 }
 239 
 240 int
 241 dmu_set_bonus(dmu_buf_t *db_fake, int newsize, dmu_tx_t *tx)
 242 {
 243         dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
 244         dnode_t *dn;
 245         int error;
 246 
 247         DB_DNODE_ENTER(db);
 248         dn = DB_DNODE(db);
 249 
 250         if (dn->dn_bonus != db) {
 251                 error = SET_ERROR(EINVAL);
 252         } else if (newsize < 0 || newsize > db_fake->db_size) {
 253                 error = SET_ERROR(EINVAL);
 254         } else {
 255                 dnode_setbonuslen(dn, newsize, tx);
 256                 error = 0;
 257         }
 258 
 259         DB_DNODE_EXIT(db);
 260         return (error);
 261 }
 262 
 263 int
 264 dmu_set_bonustype(dmu_buf_t *db_fake, dmu_object_type_t type, dmu_tx_t *tx)
 265 {
 266         dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
 267         dnode_t *dn;
 268         int error;
 269 
 270         DB_DNODE_ENTER(db);
 271         dn = DB_DNODE(db);
 272 
 273         if (!DMU_OT_IS_VALID(type)) {
 274                 error = SET_ERROR(EINVAL);
 275         } else if (dn->dn_bonus != db) {
 276                 error = SET_ERROR(EINVAL);
 277         } else {
 278                 dnode_setbonus_type(dn, type, tx);
 279                 error = 0;
 280         }
 281 
 282         DB_DNODE_EXIT(db);
 283         return (error);
 284 }
 285 
 286 dmu_object_type_t
 287 dmu_get_bonustype(dmu_buf_t *db_fake)
 288 {
 289         dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
 290         dnode_t *dn;
 291         dmu_object_type_t type;
 292 
 293         DB_DNODE_ENTER(db);
 294         dn = DB_DNODE(db);
 295         type = dn->dn_bonustype;
 296         DB_DNODE_EXIT(db);
 297 
 298         return (type);
 299 }
 300 
 301 int
 302 dmu_rm_spill(objset_t *os, uint64_t object, dmu_tx_t *tx)
 303 {
 304         dnode_t *dn;
 305         int error;
 306 
 307         error = dnode_hold(os, object, FTAG, &dn);
             if (error != 0)
                     return (error);
 308         dbuf_rm_spill(dn, tx);
 309         rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
 310         dnode_rm_spill(dn, tx);
 311         rw_exit(&dn->dn_struct_rwlock);
 312         dnode_rele(dn, FTAG);
 313         return (error);
 314 }
 315 
 316 /*
 317  * returns ENOENT, EIO, or 0.
 318  */
 319 int
 320 dmu_bonus_hold(objset_t *os, uint64_t object, void *tag, dmu_buf_t **dbp)
 321 {
 322         dnode_t *dn;
 323         dmu_buf_impl_t *db;
 324         int error;
 325 
 326         error = dnode_hold(os, object, FTAG, &dn);
 327         if (error)
 328                 return (error);
 329 
 330         rw_enter(&dn->dn_struct_rwlock, RW_READER);
 331         if (dn->dn_bonus == NULL) {
 332                 rw_exit(&dn->dn_struct_rwlock);
 333                 rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
 334                 if (dn->dn_bonus == NULL)
 335                         dbuf_create_bonus(dn);
 336         }
 337         db = dn->dn_bonus;
 338 
 339         /* as long as the bonus buf is held, the dnode will be held */
 340         if (refcount_add(&db->db_holds, tag) == 1) {
 341                 VERIFY(dnode_add_ref(dn, db));
 342                 atomic_inc_32(&dn->dn_dbufs_count);
 343         }
 344 
 345         /*
 346          * Wait to drop dn_struct_rwlock until after adding the bonus dbuf's
 347          * hold and incrementing the dbuf count to ensure that dnode_move() sees
 348          * a dnode hold for every dbuf.
 349          */
 350         rw_exit(&dn->dn_struct_rwlock);
 351 
 352         dnode_rele(dn, FTAG);
 353 
 354         VERIFY(0 == dbuf_read(db, NULL, DB_RF_MUST_SUCCEED | DB_RF_NOPREFETCH));
 355 
 356         *dbp = &db->db;
 357         return (0);
 358 }
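
/*
 * Sketch of a typical bonus-buffer access (illustrative; 'os' and 'object'
 * are assumed to be valid, error handling elided):
 *
 *	dmu_buf_t *bonus;
 *	if (dmu_bonus_hold(os, object, FTAG, &bonus) == 0) {
 *		... bonus->db_data holds up to dmu_bonus_max() bytes ...
 *		dmu_buf_rele(bonus, FTAG);
 *	}
 */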
 359 
 360 /*
 361  * returns ENOENT, EIO, or 0.
 362  *
 363  * This interface will allocate a blank spill dbuf when a spill block
 364  * doesn't already exist on the dnode.
 365  *
 366  * If you only want to find an already existing spill dbuf, then
 367  * dmu_spill_hold_existing() should be used.
 368  */
 369 int
 370 dmu_spill_hold_by_dnode(dnode_t *dn, uint32_t flags, void *tag, dmu_buf_t **dbp)
 371 {
 372         dmu_buf_impl_t *db = NULL;
 373         int err;
 374 
 375         if ((flags & DB_RF_HAVESTRUCT) == 0)
 376                 rw_enter(&dn->dn_struct_rwlock, RW_READER);
 377 
 378         db = dbuf_hold(dn, DMU_SPILL_BLKID, tag);
 379 
 380         if ((flags & DB_RF_HAVESTRUCT) == 0)
 381                 rw_exit(&dn->dn_struct_rwlock);
 382 
 383         ASSERT(db != NULL);
 384         err = dbuf_read(db, NULL, flags);
 385         if (err == 0)
 386                 *dbp = &db->db;
 387         else
 388                 dbuf_rele(db, tag);
 389         return (err);
 390 }
 391 
 392 int
 393 dmu_spill_hold_existing(dmu_buf_t *bonus, void *tag, dmu_buf_t **dbp)
 394 {
 395         dmu_buf_impl_t *db = (dmu_buf_impl_t *)bonus;
 396         dnode_t *dn;
 397         int err;
 398 
 399         DB_DNODE_ENTER(db);
 400         dn = DB_DNODE(db);
 401 
 402         if (spa_version(dn->dn_objset->os_spa) < SPA_VERSION_SA) {
 403                 err = SET_ERROR(EINVAL);
 404         } else {
 405                 rw_enter(&dn->dn_struct_rwlock, RW_READER);
 406 
 407                 if (!dn->dn_have_spill) {
 408                         err = SET_ERROR(ENOENT);
 409                 } else {
 410                         err = dmu_spill_hold_by_dnode(dn,
 411                             DB_RF_HAVESTRUCT | DB_RF_CANFAIL, tag, dbp);
 412                 }
 413 
 414                 rw_exit(&dn->dn_struct_rwlock);
 415         }
 416 
 417         DB_DNODE_EXIT(db);
 418         return (err);
 419 }
 420 
 421 int
 422 dmu_spill_hold_by_bonus(dmu_buf_t *bonus, void *tag, dmu_buf_t **dbp)
 423 {
 424         dmu_buf_impl_t *db = (dmu_buf_impl_t *)bonus;
 425         dnode_t *dn;
 426         int err;
 427 
 428         DB_DNODE_ENTER(db);
 429         dn = DB_DNODE(db);
 430         err = dmu_spill_hold_by_dnode(dn, DB_RF_CANFAIL, tag, dbp);
 431         DB_DNODE_EXIT(db);
 432 
 433         return (err);
 434 }
 435 
 436 /*
 437  * Note: longer-term, we should modify all of the dmu_buf_*() interfaces
 438  * to take a held dnode rather than <os, object> -- the lookup is wasteful,
 439  * and can induce severe lock contention when writing to several files
 440  * whose dnodes are in the same block.
 441  */
 442 static int
 443 dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset, uint64_t length,
 444     boolean_t read, void *tag, int *numbufsp, dmu_buf_t ***dbpp, uint32_t flags)
 445 {
 446         dmu_buf_t **dbp;
 447         uint64_t blkid, nblks, i;
 448         uint32_t dbuf_flags;
 449         int err;
 450         zio_t *zio;
 451 
 452         ASSERT(length <= DMU_MAX_ACCESS);
 453 
 454         /*
 455          * Note: We directly notify the prefetch code of this read, so that
 456          * we can tell it about the multi-block read.  dbuf_read() only knows
 457          * about the one block it is accessing.
 458          */
 459         dbuf_flags = DB_RF_CANFAIL | DB_RF_NEVERWAIT | DB_RF_HAVESTRUCT |
 460             DB_RF_NOPREFETCH;
 461 
 462         rw_enter(&dn->dn_struct_rwlock, RW_READER);
 463         if (dn->dn_datablkshift) {
 464                 int blkshift = dn->dn_datablkshift;
 465                 nblks = (P2ROUNDUP(offset + length, 1ULL << blkshift) -
 466                     P2ALIGN(offset, 1ULL << blkshift)) >> blkshift;
 467         } else {
 468                 if (offset + length > dn->dn_datablksz) {
 469                         zfs_panic_recover("zfs: accessing past end of object "
 470                             "%llx/%llx (size=%u access=%llu+%llu)",
 471                             (longlong_t)dn->dn_objset->
 472                             os_dsl_dataset->ds_object,
 473                             (longlong_t)dn->dn_object, dn->dn_datablksz,
 474                             (longlong_t)offset, (longlong_t)length);
 475                         rw_exit(&dn->dn_struct_rwlock);
 476                         return (SET_ERROR(EIO));
 477                 }
 478                 nblks = 1;
 479         }
 480         dbp = kmem_zalloc(sizeof (dmu_buf_t *) * nblks, KM_SLEEP);
 481 
 482         zio = zio_root(dn->dn_objset->os_spa, NULL, NULL, ZIO_FLAG_CANFAIL);
 483         blkid = dbuf_whichblock(dn, 0, offset);
 484         for (i = 0; i < nblks; i++) {
 485                 dmu_buf_impl_t *db = dbuf_hold(dn, blkid + i, tag);
 486                 if (db == NULL) {
 487                         rw_exit(&dn->dn_struct_rwlock);
 488                         dmu_buf_rele_array(dbp, nblks, tag);
 489                         zio_nowait(zio);
 490                         return (SET_ERROR(EIO));
 491                 }
 492 
 493                 /* initiate async i/o */
 494                 if (read)
 495                         (void) dbuf_read(db, zio, dbuf_flags);
 496                 dbp[i] = &db->db;
 497         }
 498 
 499         if ((flags & DMU_READ_NO_PREFETCH) == 0 &&
 500             DNODE_META_IS_CACHEABLE(dn) && length <= zfetch_array_rd_sz) {
 501                 dmu_zfetch(&dn->dn_zfetch, blkid, nblks,
 502                     read && DNODE_IS_CACHEABLE(dn));
 503         }
 504         rw_exit(&dn->dn_struct_rwlock);
 505 
 506         /* wait for async i/o */
 507         err = zio_wait(zio);
 508         if (err) {
 509                 dmu_buf_rele_array(dbp, nblks, tag);
 510                 return (err);
 511         }
 512 
 513         /* wait for other io to complete */
 514         if (read) {
 515                 for (i = 0; i < nblks; i++) {
 516                         dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbp[i];
 517                         mutex_enter(&db->db_mtx);
 518                         while (db->db_state == DB_READ ||
 519                             db->db_state == DB_FILL)
 520                                 cv_wait(&db->db_changed, &db->db_mtx);
 521                         if (db->db_state == DB_UNCACHED)
 522                                 err = SET_ERROR(EIO);
 523                         mutex_exit(&db->db_mtx);
 524                         if (err) {
 525                                 dmu_buf_rele_array(dbp, nblks, tag);
 526                                 return (err);
 527                         }
 528                 }
 529         }
 530 
 531         *numbufsp = nblks;
 532         *dbpp = dbp;
 533         return (0);
 534 }
 535 
 536 static int
 537 dmu_buf_hold_array(objset_t *os, uint64_t object, uint64_t offset,
 538     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
 539 {
 540         dnode_t *dn;
 541         int err;
 542 
 543         err = dnode_hold(os, object, FTAG, &dn);
 544         if (err)
 545                 return (err);
 546 
 547         err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
 548             numbufsp, dbpp, DMU_READ_PREFETCH);
 549 
 550         dnode_rele(dn, FTAG);
 551 
 552         return (err);
 553 }
 554 
 555 int
 556 dmu_buf_hold_array_by_bonus(dmu_buf_t *db_fake, uint64_t offset,
 557     uint64_t length, boolean_t read, void *tag, int *numbufsp,
 558     dmu_buf_t ***dbpp)
 559 {
 560         dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
 561         dnode_t *dn;
 562         int err;
 563 
 564         DB_DNODE_ENTER(db);
 565         dn = DB_DNODE(db);
 566         err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
 567             numbufsp, dbpp, DMU_READ_PREFETCH);
 568         DB_DNODE_EXIT(db);
 569 
 570         return (err);
 571 }
 572 
 573 void
 574 dmu_buf_rele_array(dmu_buf_t **dbp_fake, int numbufs, void *tag)
 575 {
 576         int i;
 577         dmu_buf_impl_t **dbp = (dmu_buf_impl_t **)dbp_fake;
 578 
 579         if (numbufs == 0)
 580                 return;
 581 
 582         for (i = 0; i < numbufs; i++) {
 583                 if (dbp[i])
 584                         dbuf_rele(dbp[i], tag);
 585         }
 586 
 587         kmem_free(dbp, sizeof (dmu_buf_t *) * numbufs);
 588 }
 589 
 590 /*
 591  * Issue prefetch i/os for the given blocks.  If level is greater than 0, the
 592  * indirect blocks prefetched will be those that point to the blocks containing
 593  * the data starting at offset, and continuing to offset + len.
 594  *
 595  * Note that if the indirect blocks above the blocks being prefetched are not in
 596  * cache, they will be asynchronously read in.
 597  */
 598 void
 599 dmu_prefetch(objset_t *os, uint64_t object, int64_t level, uint64_t offset,
 600     uint64_t len, zio_priority_t pri)
 601 {
 602         dnode_t *dn;
 603         uint64_t blkid;
 604         int nblks, err;
 605 
 606         if (len == 0) {  /* they're interested in the bonus buffer */
 607                 dn = DMU_META_DNODE(os);
 608 
 609                 if (object == 0 || object >= DN_MAX_OBJECT)
 610                         return;
 611 
 612                 rw_enter(&dn->dn_struct_rwlock, RW_READER);
 613                 blkid = dbuf_whichblock(dn, level,
 614                     object * sizeof (dnode_phys_t));
 615                 dbuf_prefetch(dn, level, blkid, pri, 0);
 616                 rw_exit(&dn->dn_struct_rwlock);
 617                 return;
 618         }
 619 
 620         /*
 621          * XXX - Note, if the dnode for the requested object is not
 622          * already cached, we will do a *synchronous* read in the
 623          * dnode_hold() call.  The same is true for any indirects.
 624          */
 625         err = dnode_hold(os, object, FTAG, &dn);
 626         if (err != 0)
 627                 return;
 628 
 629         rw_enter(&dn->dn_struct_rwlock, RW_READER);
 630         /*
 631          * offset + len - 1 is the last byte we want to prefetch for, and offset
 632          * is the first.  Then dbuf_whichblock(dn, level, offset + len - 1) is the
 633          * last block we want to prefetch, and dbuf_whichblock(dn, level,
 634          * offset) is the first.  So the number of blocks to prefetch is
 635          * last - first + 1.
 636          */
 637         if (level > 0 || dn->dn_datablkshift != 0) {
 638                 nblks = dbuf_whichblock(dn, level, offset + len - 1) -
 639                     dbuf_whichblock(dn, level, offset) + 1;
 640         } else {
 641                 nblks = (offset < dn->dn_datablksz);
 642         }
 643 
 644         if (nblks != 0) {
 645                 blkid = dbuf_whichblock(dn, level, offset);
 646                 for (int i = 0; i < nblks; i++)
 647                         dbuf_prefetch(dn, level, blkid + i, pri, 0);
 648         }
 649 
 650         rw_exit(&dn->dn_struct_rwlock);
 651 
 652         dnode_rele(dn, FTAG);
 653 }
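
/*
 * Example call (illustrative): kick off asynchronous prefetch of the first
 * 1 MB of an object's level-0 data blocks ahead of a sequential read;
 * 'os' and 'object' are assumptions of the caller:
 *
 *	dmu_prefetch(os, object, 0, 0, 1 << 20, ZIO_PRIORITY_ASYNC_READ);
 */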
 654 
 655 /*
 656  * Get the next "chunk" of file data to free.  We traverse the file from
 657  * the end so that the file gets shorter over time (if we crash in the
 658  * middle, this will leave us in a better state).  We find allocated file
 659  * data by simply searching the allocated level 1 indirects.
 660  *
 661  * On input, *start should be the first offset that does not need to be
 662  * freed (e.g. "offset + length").  On return, *start will be the first
 663  * offset that should be freed.
 664  */
 665 static int
 666 get_next_chunk(dnode_t *dn, uint64_t *start, uint64_t minimum)
 667 {
 668         uint64_t maxblks = DMU_MAX_ACCESS >> (dn->dn_indblkshift + 1);
 669         /* bytes of data covered by a level-1 indirect block */
 670         uint64_t iblkrange =
 671             dn->dn_datablksz * EPB(dn->dn_indblkshift, SPA_BLKPTRSHIFT);
 672 
 673         ASSERT3U(minimum, <=, *start);
 674 
 675         if (*start - minimum <= iblkrange * maxblks) {
 676                 *start = minimum;
 677                 return (0);
 678         }
 679         ASSERT(ISP2(iblkrange));
 680 
 681         for (uint64_t blks = 0; *start > minimum && blks < maxblks; blks++) {
 682                 int err;
 683 
 684                 /*
 685                  * dnode_next_offset(BACKWARDS) will find an allocated L1
 686                  * indirect block at or before the input offset.  We must
 687                  * decrement *start so that it is at the end of the region
 688                  * to search.
 689                  */
 690                 (*start)--;
 691                 err = dnode_next_offset(dn,
 692                     DNODE_FIND_BACKWARDS, start, 2, 1, 0);
 693 
 694                 /* if there are no indirect blocks before start, we are done */
 695                 if (err == ESRCH) {
 696                         *start = minimum;
 697                         break;
 698                 } else if (err != 0) {
 699                         return (err);
 700                 }
 701 
 702                 /* set start to the beginning of this L1 indirect */
 703                 *start = P2ALIGN(*start, iblkrange);
 704         }
 705         if (*start < minimum)
 706                 *start = minimum;
 707         return (0);
 708 }
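
/*
 * Worked example for the chunking above (numbers are illustrative
 * assumptions): with 128K data blocks and a 128K indirect block size,
 * EPB() yields 1024 block pointers per L1 indirect, so iblkrange is
 * 128 MB.  Each loop iteration therefore moves *start back to the
 * previous 128 MB-aligned region that actually has an allocated L1
 * indirect, and ESRCH from dnode_next_offset() means nothing older
 * remains to be freed.
 */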
 709 
 710 /*
 711  * If this dnode is in the ZFS object set, return B_TRUE if the
 712  * vfs's unmounted flag is set or the zfsvfs is currently
 713  * suspended; otherwise return B_FALSE.
 714  */
 715 /*ARGSUSED*/
 716 static boolean_t
 717 dmu_dnode_fs_unmounting_or_suspended(dnode_t *freeing_dn)
 718 {
 719 #ifdef _KERNEL
 720         boolean_t busy = B_FALSE;
 721         objset_t *os = freeing_dn->dn_objset;
 722         zfsvfs_t *zfsvfs;
 723 
 724         if (dmu_objset_type(os) == DMU_OST_ZFS) {
 725                 mutex_enter(&os->os_user_ptr_lock);
 726                 zfsvfs = dmu_objset_get_user(os);
 727                 if (zfsvfs != NULL && zfsvfs->z_vfs != NULL &&
 728                     ((zfsvfs->z_vfs->vfs_flag & VFS_UNMOUNTED) ||
 729                      zfsvfs->z_busy))
 730                         busy = B_TRUE;
 731                 mutex_exit(&os->os_user_ptr_lock);
 732         }
 733 
 734         return (busy);
 735 #else
 736         return (B_FALSE);
 737 #endif
 738 }
 739 
 740 static int
 741 dmu_free_long_range_impl(objset_t *os, dnode_t *dn, uint64_t offset,
 742     uint64_t length)
 743 {
 744         uint64_t object_size = (dn->dn_maxblkid + 1) * dn->dn_datablksz;
 745         int err;
 746         uint64_t dirty_frees_threshold;
 747         dsl_pool_t *dp = dmu_objset_pool(os);
 748 
 749         if (offset >= object_size)
 750                 return (0);
 751 
 752         if (zfs_per_txg_dirty_frees_percent <= 100)
 753                 dirty_frees_threshold =
 754                     zfs_per_txg_dirty_frees_percent * zfs_dirty_data_max / 100;
 755         else
 756                 dirty_frees_threshold = zfs_dirty_data_max / 4;
 757 
 758         if (length == DMU_OBJECT_END && offset == 0)
 759                 dnode_evict_dbufs(dn, 0);
 760 
 761         if (length == DMU_OBJECT_END || offset + length > object_size)
 762                 length = object_size - offset;
 763 
 764         mutex_enter(&dp->dp_lock);
 765         dp->dp_long_freeing_total += length;
 766         mutex_exit(&dp->dp_lock);
 767 
 768         while (length != 0) {
 769                 uint64_t chunk_end, chunk_begin, chunk_len;
 770                 uint64_t long_free_dirty_all_txgs = 0;
 771                 dmu_tx_t *tx;
 772 
 773                 if (dmu_dnode_fs_unmounting_or_suspended(dn)) {
 774                         mutex_enter(&dp->dp_lock);
 775                         dp->dp_long_freeing_total -= length;
 776                         mutex_exit(&dp->dp_lock);
 777 
 778                         return (SET_ERROR(EINTR));
 779                 }
 780 
 781                 chunk_end = chunk_begin = offset + length;
 782 
 783                 /* move chunk_begin backwards to the beginning of this chunk */
 784                 err = get_next_chunk(dn, &chunk_begin, offset);
 785                 if (err)
 786                         return (err);
 787                 ASSERT3U(chunk_begin, >=, offset);
 788                 ASSERT3U(chunk_begin, <=, chunk_end);
 789 
 790                 chunk_len = chunk_end - chunk_begin;
 791 
 792                 mutex_enter(&dp->dp_lock);
 793                 for (int t = 0; t < TXG_SIZE; t++) {
 794                         long_free_dirty_all_txgs +=
 795                             dp->dp_long_free_dirty_pertxg[t];
 796                 }
 797                 mutex_exit(&dp->dp_lock);
 798 
 799                 /*
 800                  * To avoid filling up a TXG with just frees, wait for
 801                  * the next TXG to open before freeing more chunks if
 802                  * we have reached the threshold of frees.
 803                  */
 804                 if (dirty_frees_threshold != 0 &&
 805                     long_free_dirty_all_txgs >= dirty_frees_threshold) {
 806                         txg_wait_open(dp, 0);
 807                         continue;
 808                 }
 809 
 810                 tx = dmu_tx_create(os);
 811                 dmu_tx_hold_free(tx, dn->dn_object, chunk_begin, chunk_len);
 812 
 813                 /*
 814                  * Mark this transaction as typically resulting in a net
 815                  * reduction in space used.
 816                  */
 817                 dmu_tx_mark_netfree(tx);
 818                 err = dmu_tx_assign(tx, TXG_WAIT);
 819                 if (err) {
 820                         dmu_tx_abort(tx);
 821                         mutex_enter(&dp->dp_lock);
 822                         dp->dp_long_freeing_total -= length - chunk_len;
 823                         mutex_exit(&dp->dp_lock);
 824                         return (err);
 825                 }
 826 
 827                 mutex_enter(&dp->dp_lock);
 828                 dp->dp_long_free_dirty_pertxg[dmu_tx_get_txg(tx) & TXG_MASK] +=
 829                     chunk_len;
 830                 mutex_exit(&dp->dp_lock);
 831                 DTRACE_PROBE3(free__long__range,
 832                     uint64_t, long_free_dirty_all_txgs, uint64_t, chunk_len,
 833                     uint64_t, dmu_tx_get_txg(tx));
 834                 dnode_free_range(dn, chunk_begin, chunk_len, tx);
 835                 dmu_tx_commit(tx);
 836 
 837                 length -= chunk_len;
 838         }
 839         return (0);
 840 }
 841 
 842 int
 843 dmu_free_long_range(objset_t *os, uint64_t object,
 844     uint64_t offset, uint64_t length)
 845 {
 846         dnode_t *dn;
 847         int err;
 848 
 849         err = dnode_hold(os, object, FTAG, &dn);
 850         if (err != 0)
 851                 return (err);
 852         err = dmu_free_long_range_impl(os, dn, offset, length);
 853 
 854         /*
 855          * It is important to zero out the maxblkid when freeing the entire
 856          * file, so that (a) subsequent calls to dmu_free_long_range_impl()
 857          * will take the fast path, and (b) dnode_reallocate() can verify
 858          * that the entire file has been freed.
 859          */
 860         if (err == 0 && offset == 0 && length == DMU_OBJECT_END)
 861                 dn->dn_maxblkid = 0;
 862 
 863         dnode_rele(dn, FTAG);
 864         return (err);
 865 }
 866 
 867 int
 868 dmu_free_long_object(objset_t *os, uint64_t object)
 869 {
 870         dmu_tx_t *tx;
 871         int err;
 872 
 873         err = dmu_free_long_range(os, object, 0, DMU_OBJECT_END);
 874         if (err != 0)
 875                 return (err);
 876 
 877         tx = dmu_tx_create(os);
 878         dmu_tx_hold_bonus(tx, object);
 879         dmu_tx_hold_free(tx, object, 0, DMU_OBJECT_END);
 880         dmu_tx_mark_netfree(tx);
 881         err = dmu_tx_assign(tx, TXG_WAIT);
 882         if (err == 0) {
 883                 err = dmu_object_free(os, object, tx);
 884                 dmu_tx_commit(tx);
 885         } else {
 886                 dmu_tx_abort(tx);
 887         }
 888 
 889         return (err);
 890 }
 891 
 892 int
 893 dmu_free_range(objset_t *os, uint64_t object, uint64_t offset,
 894     uint64_t size, dmu_tx_t *tx)
 895 {
 896         dnode_t *dn;
 897         int err = dnode_hold(os, object, FTAG, &dn);
 898         if (err)
 899                 return (err);
 900         ASSERT(offset < UINT64_MAX);
 901         ASSERT(size == -1ULL || size <= UINT64_MAX - offset);
 902         dnode_free_range(dn, offset, size, tx);
 903         dnode_rele(dn, FTAG);
 904         return (0);
 905 }
 906 
 907 static int
 908 dmu_read_impl(dnode_t *dn, uint64_t offset, uint64_t size,
 909     void *buf, uint32_t flags)
 910 {
 911         dmu_buf_t **dbp;
 912         int numbufs, err = 0;
 913 
 914         /*
 915          * Deal with odd block sizes, where there can't be data past the first
 916          * block.  If we ever do the tail block optimization, we will need to
 917          * handle that here as well.
 918          */
 919         if (dn->dn_maxblkid == 0) {
 920                 int newsz = offset > dn->dn_datablksz ? 0 :
 921                     MIN(size, dn->dn_datablksz - offset);
 922                 bzero((char *)buf + newsz, size - newsz);
 923                 size = newsz;
 924         }
 925 
 926         while (size > 0) {
 927                 uint64_t mylen = MIN(size, DMU_MAX_ACCESS / 2);
 928                 int i;
 929 
 930                 /*
 931                  * NB: we could do this block-at-a-time, but it's nice
 932                  * to be reading in parallel.
 933                  */
 934                 err = dmu_buf_hold_array_by_dnode(dn, offset, mylen,
 935                     TRUE, FTAG, &numbufs, &dbp, flags);
 936                 if (err)
 937                         break;
 938 
 939                 for (i = 0; i < numbufs; i++) {
 940                         int tocpy;
 941                         int bufoff;
 942                         dmu_buf_t *db = dbp[i];
 943 
 944                         ASSERT(size > 0);
 945 
 946                         bufoff = offset - db->db_offset;
 947                         tocpy = (int)MIN(db->db_size - bufoff, size);
 948 
 949                         bcopy((char *)db->db_data + bufoff, buf, tocpy);
 950 
 951                         offset += tocpy;
 952                         size -= tocpy;
 953                         buf = (char *)buf + tocpy;
 954                 }
 955                 dmu_buf_rele_array(dbp, numbufs, FTAG);
 956         }
 957         return (err);
 958 }
 959 
 960 int
 961 dmu_read(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
 962     void *buf, uint32_t flags)
 963 {
 964         dnode_t *dn;
 965         int err;
 966 
 967         err = dnode_hold(os, object, FTAG, &dn);
 968         if (err != 0)
 969                 return (err);
 970 
 971         err = dmu_read_impl(dn, offset, size, buf, flags);
 972         dnode_rele(dn, FTAG);
 973         return (err);
 974 }
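
/*
 * Simple synchronous read sketch (illustrative; assumes a held objset 'os'
 * and that the object exists):
 *
 *	char buf[512];
 *	int err = dmu_read(os, object, 0, sizeof (buf), buf,
 *	    DMU_READ_PREFETCH);
 */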
 975 
 976 int
 977 dmu_read_by_dnode(dnode_t *dn, uint64_t offset, uint64_t size, void *buf,
 978     uint32_t flags)
 979 {
 980         return (dmu_read_impl(dn, offset, size, buf, flags));
 981 }
 982 
 983 static void
 984 dmu_write_impl(dmu_buf_t **dbp, int numbufs, uint64_t offset, uint64_t size,
 985     const void *buf, dmu_tx_t *tx)
 986 {
 987         int i;
 988 
 989         for (i = 0; i < numbufs; i++) {
 990                 int tocpy;
 991                 int bufoff;
 992                 dmu_buf_t *db = dbp[i];
 993 
 994                 ASSERT(size > 0);
 995 
 996                 bufoff = offset - db->db_offset;
 997                 tocpy = (int)MIN(db->db_size - bufoff, size);
 998 
 999                 ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
1000 
1001                 if (tocpy == db->db_size)
1002                         dmu_buf_will_fill(db, tx);
1003                 else
1004                         dmu_buf_will_dirty(db, tx);
1005 
1006                 bcopy(buf, (char *)db->db_data + bufoff, tocpy);
1007 
1008                 if (tocpy == db->db_size)
1009                         dmu_buf_fill_done(db, tx);
1010 
1011                 offset += tocpy;
1012                 size -= tocpy;
1013                 buf = (char *)buf + tocpy;
1014         }
1015 }
1016 
1017 void
1018 dmu_write(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
1019     const void *buf, dmu_tx_t *tx)
1020 {
1021         dmu_buf_t **dbp;
1022         int numbufs;
1023 
1024         if (size == 0)
1025                 return;
1026 
1027         VERIFY0(dmu_buf_hold_array(os, object, offset, size,
1028             FALSE, FTAG, &numbufs, &dbp));
1029         dmu_write_impl(dbp, numbufs, offset, size, buf, tx);
1030         dmu_buf_rele_array(dbp, numbufs, FTAG);
1031 }
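
/*
 * Typical transactional write sequence around dmu_write() (sketch only;
 * 'os', 'object', 'off', 'len' and 'data' are caller assumptions, error
 * handling elided):
 *
 *	dmu_tx_t *tx = dmu_tx_create(os);
 *	dmu_tx_hold_write(tx, object, off, len);
 *	VERIFY0(dmu_tx_assign(tx, TXG_WAIT));
 *	dmu_write(os, object, off, len, data, tx);
 *	dmu_tx_commit(tx);
 */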
1032 
1033 void
1034 dmu_write_by_dnode(dnode_t *dn, uint64_t offset, uint64_t size,
1035     const void *buf, dmu_tx_t *tx)
1036 {
1037         dmu_buf_t **dbp;
1038         int numbufs;
1039 
1040         if (size == 0)
1041                 return;
1042 
1043         VERIFY0(dmu_buf_hold_array_by_dnode(dn, offset, size,
1044             FALSE, FTAG, &numbufs, &dbp, DMU_READ_PREFETCH));
1045         dmu_write_impl(dbp, numbufs, offset, size, buf, tx);
1046         dmu_buf_rele_array(dbp, numbufs, FTAG);
1047 }
1048 
1049 void
1050 dmu_prealloc(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
1051     dmu_tx_t *tx)
1052 {
1053         dmu_buf_t **dbp;
1054         int numbufs, i;
1055 
1056         if (size == 0)
1057                 return;
1058 
1059         VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
1060             FALSE, FTAG, &numbufs, &dbp));
1061 
1062         for (i = 0; i < numbufs; i++) {
1063                 dmu_buf_t *db = dbp[i];
1064 
1065                 dmu_buf_will_not_fill(db, tx);
1066         }
1067         dmu_buf_rele_array(dbp, numbufs, FTAG);
1068 }
1069 
1070 void
1071 dmu_write_embedded(objset_t *os, uint64_t object, uint64_t offset,
1072     void *data, uint8_t etype, uint8_t comp, int uncompressed_size,
1073     int compressed_size, int byteorder, dmu_tx_t *tx)
1074 {
1075         dmu_buf_t *db;
1076 
1077         ASSERT3U(etype, <, NUM_BP_EMBEDDED_TYPES);
1078         ASSERT3U(comp, <, ZIO_COMPRESS_FUNCTIONS);
1079         VERIFY0(dmu_buf_hold_noread(os, object, offset,
1080             FTAG, &db));
1081 
1082         dmu_buf_write_embedded(db,
1083             data, (bp_embedded_type_t)etype, (enum zio_compress)comp,
1084             uncompressed_size, compressed_size, byteorder, tx);
1085 
1086         dmu_buf_rele(db, FTAG);
1087 }
1088 
1089 /*
1090  * DMU support for xuio
1091  */
1092 kstat_t *xuio_ksp = NULL;
1093 
1094 int
1095 dmu_xuio_init(xuio_t *xuio, int nblk)
1096 {
1097         dmu_xuio_t *priv;
1098         uio_t *uio = &xuio->xu_uio;
1099 
1100         uio->uio_iovcnt = nblk;
1101         uio->uio_iov = kmem_zalloc(nblk * sizeof (iovec_t), KM_SLEEP);
1102 
1103         priv = kmem_zalloc(sizeof (dmu_xuio_t), KM_SLEEP);
1104         priv->cnt = nblk;
1105         priv->bufs = kmem_zalloc(nblk * sizeof (arc_buf_t *), KM_SLEEP);
1106         priv->iovp = uio->uio_iov;
1107         XUIO_XUZC_PRIV(xuio) = priv;
1108 
1109         if (XUIO_XUZC_RW(xuio) == UIO_READ)
1110                 XUIOSTAT_INCR(xuiostat_onloan_rbuf, nblk);
1111         else
1112                 XUIOSTAT_INCR(xuiostat_onloan_wbuf, nblk);
1113 
1114         return (0);
1115 }
1116 
1117 void
1118 dmu_xuio_fini(xuio_t *xuio)
1119 {
1120         dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
1121         int nblk = priv->cnt;
1122 
1123         kmem_free(priv->iovp, nblk * sizeof (iovec_t));
1124         kmem_free(priv->bufs, nblk * sizeof (arc_buf_t *));
1125         kmem_free(priv, sizeof (dmu_xuio_t));
1126 
1127         if (XUIO_XUZC_RW(xuio) == UIO_READ)
1128                 XUIOSTAT_INCR(xuiostat_onloan_rbuf, -nblk);
1129         else
1130                 XUIOSTAT_INCR(xuiostat_onloan_wbuf, -nblk);
1131 }
1132 
1133 /*
1134  * Initialize iov[priv->next] and priv->bufs[priv->next] with { off, n, abuf }
1135  * and increase priv->next by 1.
1136  */
1137 int
1138 dmu_xuio_add(xuio_t *xuio, arc_buf_t *abuf, offset_t off, size_t n)
1139 {
1140         struct iovec *iov;
1141         uio_t *uio = &xuio->xu_uio;
1142         dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
1143         int i = priv->next++;
1144 
1145         ASSERT(i < priv->cnt);
1146         ASSERT(off + n <= arc_buf_lsize(abuf));
1147         iov = uio->uio_iov + i;
1148         iov->iov_base = (char *)abuf->b_data + off;
1149         iov->iov_len = n;
1150         priv->bufs[i] = abuf;
1151         return (0);
1152 }
1153 
1154 int
1155 dmu_xuio_cnt(xuio_t *xuio)
1156 {
1157         dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
1158         return (priv->cnt);
1159 }
1160 
1161 arc_buf_t *
1162 dmu_xuio_arcbuf(xuio_t *xuio, int i)
1163 {
1164         dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
1165 
1166         ASSERT(i < priv->cnt);
1167         return (priv->bufs[i]);
1168 }
1169 
1170 void
1171 dmu_xuio_clear(xuio_t *xuio, int i)
1172 {
1173         dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
1174 
1175         ASSERT(i < priv->cnt);
1176         priv->bufs[i] = NULL;
1177 }
1178 
1179 static void
1180 xuio_stat_init(void)
1181 {
1182         xuio_ksp = kstat_create("zfs", 0, "xuio_stats", "misc",
1183             KSTAT_TYPE_NAMED, sizeof (xuio_stats) / sizeof (kstat_named_t),
1184             KSTAT_FLAG_VIRTUAL);
1185         if (xuio_ksp != NULL) {
1186                 xuio_ksp->ks_data = &xuio_stats;
1187                 kstat_install(xuio_ksp);
1188         }
1189 }
1190 
1191 static void
1192 xuio_stat_fini(void)
1193 {
1194         if (xuio_ksp != NULL) {
1195                 kstat_delete(xuio_ksp);
1196                 xuio_ksp = NULL;
1197         }
1198 }
1199 
1200 void
1201 xuio_stat_wbuf_copied(void)
1202 {
1203         XUIOSTAT_BUMP(xuiostat_wbuf_copied);
1204 }
1205 
1206 void
1207 xuio_stat_wbuf_nocopy(void)
1208 {
1209         XUIOSTAT_BUMP(xuiostat_wbuf_nocopy);
1210 }
1211 
1212 #ifdef _KERNEL
1213 static int
1214 dmu_read_uio_dnode(dnode_t *dn, uio_t *uio, uint64_t size)
1215 {
1216         dmu_buf_t **dbp;
1217         int numbufs, i, err;
1218         xuio_t *xuio = NULL;
1219 
1220         /*
1221          * NB: we could do this block-at-a-time, but it's nice
1222          * to be reading in parallel.
1223          */
1224         err = dmu_buf_hold_array_by_dnode(dn, uio->uio_loffset, size,
1225             TRUE, FTAG, &numbufs, &dbp, 0);
1226         if (err)
1227                 return (err);
1228 
1229         if (uio->uio_extflg == UIO_XUIO)
1230                 xuio = (xuio_t *)uio;
1231 
1232         for (i = 0; i < numbufs; i++) {
1233                 int tocpy;
1234                 int bufoff;
1235                 dmu_buf_t *db = dbp[i];
1236 
1237                 ASSERT(size > 0);
1238 
1239                 bufoff = uio->uio_loffset - db->db_offset;
1240                 tocpy = (int)MIN(db->db_size - bufoff, size);
1241 
1242                 if (xuio) {
1243                         dmu_buf_impl_t *dbi = (dmu_buf_impl_t *)db;
1244                         arc_buf_t *dbuf_abuf = dbi->db_buf;
1245                         arc_buf_t *abuf = dbuf_loan_arcbuf(dbi);
1246                         err = dmu_xuio_add(xuio, abuf, bufoff, tocpy);
1247                         if (!err) {
1248                                 uio->uio_resid -= tocpy;
1249                                 uio->uio_loffset += tocpy;
1250                         }
1251 
1252                         if (abuf == dbuf_abuf)
1253                                 XUIOSTAT_BUMP(xuiostat_rbuf_nocopy);
1254                         else
1255                                 XUIOSTAT_BUMP(xuiostat_rbuf_copied);
1256                 } else {
1257                         err = uiomove((char *)db->db_data + bufoff, tocpy,
1258                             UIO_READ, uio);
1259                 }
1260                 if (err)
1261                         break;
1262 
1263                 size -= tocpy;
1264         }
1265         dmu_buf_rele_array(dbp, numbufs, FTAG);
1266 
1267         return (err);
1268 }
1269 
1270 /*
1271  * Read 'size' bytes into the uio buffer.
1272  * From object zdb->db_object.
1273  * Starting at offset uio->uio_loffset.
1274  *
1275  * If the caller already has a dbuf in the target object
1276  * (e.g. its bonus buffer), this routine is faster than dmu_read_uio(),
1277  * because we don't have to find the dnode_t for the object.
1278  */
1279 int
1280 dmu_read_uio_dbuf(dmu_buf_t *zdb, uio_t *uio, uint64_t size)
1281 {
1282         dmu_buf_impl_t *db = (dmu_buf_impl_t *)zdb;
1283         dnode_t *dn;
1284         int err;
1285 
1286         if (size == 0)
1287                 return (0);
1288 
1289         DB_DNODE_ENTER(db);
1290         dn = DB_DNODE(db);
1291         err = dmu_read_uio_dnode(dn, uio, size);
1292         DB_DNODE_EXIT(db);
1293 
1294         return (err);
1295 }
1296 
1297 /*
1298  * Read 'size' bytes into the uio buffer.
1299  * From the specified object
1300  * Starting at offset uio->uio_loffset.
1301  */
1302 int
1303 dmu_read_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size)
1304 {
1305         dnode_t *dn;
1306         int err;
1307 
1308         if (size == 0)
1309                 return (0);
1310 
1311         err = dnode_hold(os, object, FTAG, &dn);
1312         if (err)
1313                 return (err);
1314 
1315         err = dmu_read_uio_dnode(dn, uio, size);
1316 
1317         dnode_rele(dn, FTAG);
1318 
1319         return (err);
1320 }
1321 
1322 static int
1323 dmu_write_uio_dnode(dnode_t *dn, uio_t *uio, uint64_t size, dmu_tx_t *tx)
1324 {
1325         dmu_buf_t **dbp;
1326         int numbufs;
1327         int err = 0;
1328         int i;
1329 
1330         err = dmu_buf_hold_array_by_dnode(dn, uio->uio_loffset, size,
1331             FALSE, FTAG, &numbufs, &dbp, DMU_READ_PREFETCH);
1332         if (err)
1333                 return (err);
1334 
1335         for (i = 0; i < numbufs; i++) {
1336                 int tocpy;
1337                 int bufoff;
1338                 dmu_buf_t *db = dbp[i];
1339 
1340                 ASSERT(size > 0);
1341 
1342                 bufoff = uio->uio_loffset - db->db_offset;
1343                 tocpy = (int)MIN(db->db_size - bufoff, size);
1344 
1345                 ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
1346 
1347                 if (tocpy == db->db_size)
1348                         dmu_buf_will_fill(db, tx);
1349                 else
1350                         dmu_buf_will_dirty(db, tx);
1351 
1352                 /*
1353                  * XXX uiomove could block forever (eg. nfs-backed
1354                  * pages).  There needs to be a uiolockdown() function
1355                  * to lock the pages in memory, so that uiomove won't
1356                  * block.
1357                  */
1358                 err = uiomove((char *)db->db_data + bufoff, tocpy,
1359                     UIO_WRITE, uio);
1360 
1361                 if (tocpy == db->db_size)
1362                         dmu_buf_fill_done(db, tx);
1363 
1364                 if (err)
1365                         break;
1366 
1367                 size -= tocpy;
1368         }
1369 
1370         dmu_buf_rele_array(dbp, numbufs, FTAG);
1371         return (err);
1372 }
1373 
1374 /*
1375  * Write 'size' bytes from the uio buffer.
1376  * To object zdb->db_object.
1377  * Starting at offset uio->uio_loffset.
1378  *
1379  * If the caller already has a dbuf in the target object
1380  * (e.g. its bonus buffer), this routine is faster than dmu_write_uio(),
1381  * because we don't have to find the dnode_t for the object.
1382  */
1383 int
1384 dmu_write_uio_dbuf(dmu_buf_t *zdb, uio_t *uio, uint64_t size,
1385     dmu_tx_t *tx)
1386 {
1387         dmu_buf_impl_t *db = (dmu_buf_impl_t *)zdb;
1388         dnode_t *dn;
1389         int err;
1390 
1391         if (size == 0)
1392                 return (0);
1393 
1394         DB_DNODE_ENTER(db);
1395         dn = DB_DNODE(db);
1396         err = dmu_write_uio_dnode(dn, uio, size, tx);
1397         DB_DNODE_EXIT(db);
1398 
1399         return (err);
1400 }
1401 
1402 /*
1403  * Write 'size' bytes from the uio buffer.
1404  * To the specified object.
1405  * Starting at offset uio->uio_loffset.
1406  */
1407 int
1408 dmu_write_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size,
1409     dmu_tx_t *tx)
1410 {
1411         dnode_t *dn;
1412         int err;
1413 
1414         if (size == 0)
1415                 return (0);
1416 
1417         err = dnode_hold(os, object, FTAG, &dn);
1418         if (err)
1419                 return (err);
1420 
1421         err = dmu_write_uio_dnode(dn, uio, size, tx);
1422 
1423         dnode_rele(dn, FTAG);
1424 
1425         return (err);
1426 }
1427 
1428 int
1429 dmu_write_pages(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
1430     page_t *pp, dmu_tx_t *tx)
1431 {
1432         dmu_buf_t **dbp;
1433         int numbufs, i;
1434         int err;
1435 
1436         if (size == 0)
1437                 return (0);
1438 
1439         err = dmu_buf_hold_array(os, object, offset, size,
1440             FALSE, FTAG, &numbufs, &dbp);
1441         if (err)
1442                 return (err);
1443 
1444         for (i = 0; i < numbufs; i++) {
1445                 int tocpy, copied, thiscpy;
1446                 int bufoff;
1447                 dmu_buf_t *db = dbp[i];
1448                 caddr_t va;
1449 
1450                 ASSERT(size > 0);
1451                 ASSERT3U(db->db_size, >=, PAGESIZE);
1452 
1453                 bufoff = offset - db->db_offset;
1454                 tocpy = (int)MIN(db->db_size - bufoff, size);
1455 
1456                 ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
1457 
1458                 if (tocpy == db->db_size)
1459                         dmu_buf_will_fill(db, tx);
1460                 else
1461                         dmu_buf_will_dirty(db, tx);
1462 
1463                 for (copied = 0; copied < tocpy; copied += PAGESIZE) {
1464                         ASSERT3U(pp->p_offset, ==, db->db_offset + bufoff);
1465                         thiscpy = MIN(PAGESIZE, tocpy - copied);
1466                         va = zfs_map_page(pp, S_READ);
1467                         bcopy(va, (char *)db->db_data + bufoff, thiscpy);
1468                         zfs_unmap_page(pp, va);
1469                         pp = pp->p_next;
1470                         bufoff += PAGESIZE;
1471                 }
1472 
1473                 if (tocpy == db->db_size)
1474                         dmu_buf_fill_done(db, tx);
1475 
1476                 offset += tocpy;
1477                 size -= tocpy;
1478         }
1479         dmu_buf_rele_array(dbp, numbufs, FTAG);
1480         return (err);
1481 }
1482 #endif
1483 
1484 /*
1485  * Allocate a loaned anonymous arc buffer.
1486  */
1487 arc_buf_t *
1488 dmu_request_arcbuf(dmu_buf_t *handle, int size)
1489 {
1490         dmu_buf_impl_t *db = (dmu_buf_impl_t *)handle;
1491 
1492         return (arc_loan_buf(db->db_objset->os_spa, B_FALSE, size));
1493 }
1494 
1495 /*
1496  * Free a loaned arc buffer.
1497  */
1498 void
1499 dmu_return_arcbuf(arc_buf_t *buf)
1500 {
1501         arc_return_buf(buf, FTAG);
1502         arc_buf_destroy(buf, FTAG);
1503 }
1504 
1505 /*
1506  * When possible, directly assign the passed loaned arc buffer to a dbuf.
1507  * If this is not possible, copy the contents of the passed arc buf via
1508  * dmu_write().
1509  */
1510 void
1511 dmu_assign_arcbuf(dmu_buf_t *handle, uint64_t offset, arc_buf_t *buf,
1512     dmu_tx_t *tx)
1513 {
1514         dmu_buf_impl_t *dbuf = (dmu_buf_impl_t *)handle;
1515         dnode_t *dn;
1516         dmu_buf_impl_t *db;
1517         uint32_t blksz = (uint32_t)arc_buf_lsize(buf);
1518         uint64_t blkid;
1519 
1520         DB_DNODE_ENTER(dbuf);
1521         dn = DB_DNODE(dbuf);
1522         rw_enter(&dn->dn_struct_rwlock, RW_READER);
1523         blkid = dbuf_whichblock(dn, 0, offset);
1524         VERIFY((db = dbuf_hold(dn, blkid, FTAG)) != NULL);
1525         rw_exit(&dn->dn_struct_rwlock);
1526         DB_DNODE_EXIT(dbuf);
1527 
1528         /*
1529          * We can only assign if the offset is aligned, the arc buf is the
1530          * same size as the dbuf, and the dbuf is not metadata.
1531          */
1532         if (offset == db->db.db_offset && blksz == db->db.db_size) {
1533                 dbuf_assign_arcbuf(db, buf, tx);
1534                 dbuf_rele(db, FTAG);
1535         } else {
1536                 objset_t *os;
1537                 uint64_t object;
1538 
1539                 /* compressed bufs must always be assignable to their dbuf */
1540                 ASSERT3U(arc_get_compression(buf), ==, ZIO_COMPRESS_OFF);
1541                 ASSERT(!(buf->b_flags & ARC_BUF_FLAG_COMPRESSED));
1542 
1543                 DB_DNODE_ENTER(dbuf);
1544                 dn = DB_DNODE(dbuf);
1545                 os = dn->dn_objset;
1546                 object = dn->dn_object;
1547                 DB_DNODE_EXIT(dbuf);
1548 
1549                 dbuf_rele(db, FTAG);
1550                 dmu_write(os, object, offset, blksz, buf->b_data, tx);
1551                 dmu_return_arcbuf(buf);
1552                 XUIOSTAT_BUMP(xuiostat_wbuf_copied);
1553         }
1554 }
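
/*
 * Typical loaned-buffer write pattern using the helpers above (sketch;
 * 'db_handle', 'src', 'blksz', 'offset' and an assigned 'tx' are
 * assumptions of the caller):
 *
 *	arc_buf_t *abuf = dmu_request_arcbuf(db_handle, blksz);
 *	bcopy(src, abuf->b_data, blksz);
 *	dmu_assign_arcbuf(db_handle, offset, abuf, tx);
 */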
1555 
1556 typedef struct {
1557         dbuf_dirty_record_t     *dsa_dr;
1558         dmu_sync_cb_t           *dsa_done;
1559         zgd_t                   *dsa_zgd;
1560         dmu_tx_t                *dsa_tx;
1561 } dmu_sync_arg_t;
1562 
1563 /* ARGSUSED */
1564 static void
1565 dmu_sync_ready(zio_t *zio, arc_buf_t *buf, void *varg)
1566 {
1567         dmu_sync_arg_t *dsa = varg;
1568         dmu_buf_t *db = dsa->dsa_zgd->zgd_db;
1569         blkptr_t *bp = zio->io_bp;
1570 
1571         if (zio->io_error == 0) {
1572                 if (BP_IS_HOLE(bp)) {
1573                         /*
1574                          * A block of zeros may compress to a hole, but the
1575                          * block size still needs to be known for replay.
1576                          */
1577                         BP_SET_LSIZE(bp, db->db_size);
1578                 } else if (!BP_IS_EMBEDDED(bp)) {
1579                         ASSERT(BP_GET_LEVEL(bp) == 0);
1580                         bp->blk_fill = 1;
1581                 }
1582         }
1583 }
1584 
1585 static void
1586 dmu_sync_late_arrival_ready(zio_t *zio)
1587 {
1588         dmu_sync_ready(zio, NULL, zio->io_private);
1589 }
1590 
1591 /* ARGSUSED */
1592 static void
1593 dmu_sync_done(zio_t *zio, arc_buf_t *buf, void *varg)
1594 {
1595         dmu_sync_arg_t *dsa = varg;
1596         dbuf_dirty_record_t *dr = dsa->dsa_dr;
1597         dmu_buf_impl_t *db = dr->dr_dbuf;
1598         zgd_t *zgd = dsa->dsa_zgd;
1599 
1600         /*
1601          * Record the vdev(s) backing this blkptr so they can be flushed after
1602          * the writes for the lwb have completed.
1603          */
1604         if (zio->io_error == 0) {
1605                 zil_lwb_add_block(zgd->zgd_lwb, zgd->zgd_bp);
1606         }
1607 
1608         mutex_enter(&db->db_mtx);
1609         ASSERT(dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC);
1610         if (zio->io_error == 0) {
1611                 dr->dt.dl.dr_nopwrite = !!(zio->io_flags & ZIO_FLAG_NOPWRITE);
1612                 if (dr->dt.dl.dr_nopwrite) {
1613                         blkptr_t *bp = zio->io_bp;
1614                         blkptr_t *bp_orig = &zio->io_bp_orig;
1615                         uint8_t chksum = BP_GET_CHECKSUM(bp_orig);
1616 
1617                         ASSERT(BP_EQUAL(bp, bp_orig));
1618                         VERIFY(BP_EQUAL(bp, db->db_blkptr));
1619                         ASSERT(zio->io_prop.zp_compress != ZIO_COMPRESS_OFF);
1620                         ASSERT(zio_checksum_table[chksum].ci_flags &
1621                             ZCHECKSUM_FLAG_NOPWRITE);
1622                 }
1623                 dr->dt.dl.dr_overridden_by = *zio->io_bp;
1624                 dr->dt.dl.dr_override_state = DR_OVERRIDDEN;
1625                 dr->dt.dl.dr_copies = zio->io_prop.zp_copies;
1626 
1627                 /*
1628                  * Old style holes are filled with all zeros, whereas
1629                  * new-style holes maintain their lsize, type, level,
1630                  * and birth time (see zio_write_compress). While we
1631                  * need to reset the BP_SET_LSIZE() call that happened
1632                  * in dmu_sync_ready for old style holes, we do *not*
1633                  * want to wipe out the information contained in new
1634                  * style holes. Thus, only zero out the block pointer if
1635                  * it's an old style hole.
1636                  */
1637                 if (BP_IS_HOLE(&dr->dt.dl.dr_overridden_by) &&
1638                     dr->dt.dl.dr_overridden_by.blk_birth == 0)
1639                         BP_ZERO(&dr->dt.dl.dr_overridden_by);
1640         } else {
1641                 dr->dt.dl.dr_override_state = DR_NOT_OVERRIDDEN;
1642         }
1643         cv_broadcast(&db->db_changed);
1644         mutex_exit(&db->db_mtx);
1645 
1646         dsa->dsa_done(dsa->dsa_zgd, zio->io_error);
1647 
1648         kmem_free(dsa, sizeof (*dsa));
1649 }
1650 
1651 static void
1652 dmu_sync_late_arrival_done(zio_t *zio)
1653 {
1654         blkptr_t *bp = zio->io_bp;
1655         dmu_sync_arg_t *dsa = zio->io_private;
1656         blkptr_t *bp_orig = &zio->io_bp_orig;
1657         zgd_t *zgd = dsa->dsa_zgd;
1658 
1659         if (zio->io_error == 0) {
1660                 /*
1661                  * Record the vdev(s) backing this blkptr so they can be
1662                  * flushed after the writes for the lwb have completed.
1663                  */
1664                 zil_lwb_add_block(zgd->zgd_lwb, zgd->zgd_bp);
1665 
1666                 if (!BP_IS_HOLE(bp)) {
1667                         ASSERT(!(zio->io_flags & ZIO_FLAG_NOPWRITE));
1668                         ASSERT(BP_IS_HOLE(bp_orig) || !BP_EQUAL(bp, bp_orig));
1669                         ASSERT(zio->io_bp->blk_birth == zio->io_txg);
1670                         ASSERT(zio->io_txg > spa_syncing_txg(zio->io_spa));
1671                         zio_free(zio->io_spa, zio->io_txg, zio->io_bp);
1672                 }
1673         }
1674 
1675         dmu_tx_commit(dsa->dsa_tx);
1676 
1677         dsa->dsa_done(dsa->dsa_zgd, zio->io_error);
1678 
1679         abd_put(zio->io_abd);
1680         kmem_free(dsa, sizeof (*dsa));
1681 }
1682 
1683 static int
1684 dmu_sync_late_arrival(zio_t *pio, objset_t *os, dmu_sync_cb_t *done, zgd_t *zgd,
1685     zio_prop_t *zp, zbookmark_phys_t *zb, const zio_smartcomp_info_t *sc)
1686 {
1687         dmu_sync_arg_t *dsa;
1688         dmu_tx_t *tx;
1689 
1690         tx = dmu_tx_create(os);
1691         dmu_tx_hold_space(tx, zgd->zgd_db->db_size);
1692         if (dmu_tx_assign(tx, TXG_WAIT) != 0) {
1693                 dmu_tx_abort(tx);
1694         /* Make zl_get_data do txg_wait_synced() */
1695                 return (SET_ERROR(EIO));
1696         }
1697 
1698         /*
1699          * In order to prevent the zgd's lwb from being free'd prior to
1700          * dmu_sync_late_arrival_done() being called, we have to ensure
1701          * the lwb's "max txg" takes this tx's txg into account.
1702          */
1703         zil_lwb_add_txg(zgd->zgd_lwb, dmu_tx_get_txg(tx));
1704 
1705         dsa = kmem_alloc(sizeof (dmu_sync_arg_t), KM_SLEEP);
1706         dsa->dsa_dr = NULL;
1707         dsa->dsa_done = done;
1708         dsa->dsa_zgd = zgd;
1709         dsa->dsa_tx = tx;
1710 
1711         /*
1712          * Since we are currently syncing this txg, it's nontrivial to
1713          * determine what BP to nopwrite against, so we disable nopwrite.
1714          *
1715          * When syncing, the db_blkptr is initially the BP of the previous
1716          * txg.  We can not nopwrite against it because it will be changed
1717          * (this is similar to the non-late-arrival case where the dbuf is
1718          * dirty in a future txg).
1719          *
1720          * Then dbuf_write_ready() sets db_blkptr to the location we will write.
1721          * We can not nopwrite against it because although the BP will not
1722          * (typically) be changed, the data has not yet been persisted to this
1723          * location.
1724          *
1725          * Finally, when dbuf_write_done() is called, it is theoretically
1726          * possible to always nopwrite, because the data that was written in
1727          * this txg is the same data that we are trying to write.  However we
1728          * would need to check that this dbuf is not dirty in any future
1729          * txg's (as we do in the normal dmu_sync() path). For simplicity, we
1730          * don't nopwrite in this case.
1731          */
1732         zp->zp_nopwrite = B_FALSE;
1733 
1734         zio_nowait(zio_write(pio, os->os_spa, dmu_tx_get_txg(tx), zgd->zgd_bp,
1735             abd_get_from_buf(zgd->zgd_db->db_data, zgd->zgd_db->db_size),
1736             zgd->zgd_db->db_size, zgd->zgd_db->db_size, zp,
1737             dmu_sync_late_arrival_ready, NULL, NULL, dmu_sync_late_arrival_done,
1738             dsa, ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_CANFAIL, zb, sc));
1739 
1740         return (0);
1741 }
1742 
1743 /*
1744  * Intent log support: sync the block associated with db to disk.
1745  * N.B. and XXX: the caller is responsible for making sure that the
1746  * data isn't changing while dmu_sync() is writing it.
1747  *
1748  * Return values:
1749  *
1750  *      EEXIST: this txg has already been synced, so there's nothing to do.
1751  *              The caller should not log the write.
1752  *
1753  *      ENOENT: the block was dbuf_free_range()'d, so there's nothing to do.
1754  *              The caller should not log the write.
1755  *
1756  *      EALREADY: this block is already in the process of being synced.
1757  *              The caller should track its progress (somehow).
1758  *
1759  *      EIO: could not do the I/O.
1760  *              The caller should do a txg_wait_synced().
1761  *
1762  *      0: the I/O has been initiated.
1763  *              The caller should log this blkptr in the done callback.
1764  *              It is possible that the I/O will fail, in which case
1765  *              the error will be reported to the done callback and
1766  *              propagated to pio from zio_done().
1767  */
1768 
1769 int
1770 dmu_sync(zio_t *pio, uint64_t txg, dmu_sync_cb_t *done, zgd_t *zgd)
1771 {
1772         dmu_buf_impl_t *db = (dmu_buf_impl_t *)zgd->zgd_db;
1773         objset_t *os = db->db_objset;
1774         dsl_dataset_t *ds = os->os_dsl_dataset;
1775         dbuf_dirty_record_t *dr;
1776         dmu_sync_arg_t *dsa;
1777         zbookmark_phys_t zb;
1778         zio_prop_t zp;
1779         dnode_t *dn;
1780         int flags = 0;
1781         zio_smartcomp_info_t sc;
1782 
1783         ASSERT(pio != NULL);
1784         ASSERT(txg != 0);
1785 
1786         SET_BOOKMARK(&zb, ds->ds_object,
1787             db->db.db_object, db->db_level, db->db_blkid);
1788 
1789         /* write to the special class only if the proper conditions hold */
1790         if (spa_write_data_to_special(os->os_spa, os))
1791                 WP_SET_SPECIALCLASS(flags, B_TRUE);
1792 
1793         DB_DNODE_ENTER(db);
1794         dn = DB_DNODE(db);
1795         dmu_write_policy(os, dn, db->db_level, flags | WP_DMU_SYNC, &zp);
1796         dnode_setup_zio_smartcomp(db, &sc);
1797         DB_DNODE_EXIT(db);
1798 
1799         /*
1800          * If we're frozen (running ziltest), we always need to generate a bp.
1801          */
1802         if (txg > spa_freeze_txg(os->os_spa))
1803                 return (dmu_sync_late_arrival(pio, os, done, zgd, &zp, &zb,
1804                     &sc));
1805 
1806         /*
1807          * Grabbing db_mtx now provides a barrier between dbuf_sync_leaf()
1808          * and us.  If we determine that this txg is not yet syncing,
1809          * but it begins to sync a moment later, that's OK because the
1810          * sync thread will block in dbuf_sync_leaf() until we drop db_mtx.
1811          */
1812         mutex_enter(&db->db_mtx);
1813 
1814         if (txg <= spa_last_synced_txg(os->os_spa)) {
1815                 /*
1816                  * This txg has already synced.  There's nothing to do.
1817                  */
1818                 mutex_exit(&db->db_mtx);
1819                 return (SET_ERROR(EEXIST));
1820         }
1821 
1822         if (txg <= spa_syncing_txg(os->os_spa)) {
1823                 /*
1824                  * This txg is currently syncing, so we can't mess with
1825                  * the dirty record anymore; just write a new log block.
1826                  */
1827                 mutex_exit(&db->db_mtx);
1828                 return (dmu_sync_late_arrival(pio, os, done, zgd, &zp, &zb,
1829                     &sc));
1830         }
1831 
1832         dr = db->db_last_dirty;
1833         while (dr && dr->dr_txg != txg)
1834                 dr = dr->dr_next;
1835 
1836         if (dr == NULL) {
1837                 /*
1838                  * There's no dr for this dbuf, so it must have been freed.
1839                  * There's no need to log writes to freed blocks, so we're done.
1840                  */
1841                 mutex_exit(&db->db_mtx);
1842                 return (SET_ERROR(ENOENT));
1843         }
1844 
1845         ASSERT(dr->dr_next == NULL || dr->dr_next->dr_txg < txg);
1846 
1847         if (db->db_blkptr != NULL) {
1848                 /*
1849                  * We need to fill in zgd_bp with the current blkptr so that
1850                  * the nopwrite code can check if we're writing the same
1851                  * data that's already on disk.  We can only nopwrite if we
1852                  * are sure that after making the copy, db_blkptr will not
1853                  * change until our i/o completes.  We ensure this by
1854                  * holding the db_mtx, and only allowing nopwrite if the
1855                  * block is not already dirty (see below).  This is verified
1856                  * by dmu_sync_done(), which VERIFYs that the db_blkptr has
1857                  * not changed.
1858                  */
1859                 *zgd->zgd_bp = *db->db_blkptr;
1860         }
1861 
1862         /*
1863          * Assume the on-disk data is X, the current syncing data (in
1864          * txg - 1) is Y, and the current in-memory data is Z (currently
1865          * in dmu_sync).
1866          *
1867          * We usually want to perform a nopwrite if X and Z are the
1868          * same.  However, if Y is different (i.e. the BP is going to
1869          * change before this write takes effect), then a nopwrite will
1870          * be incorrect - we would override with X, which could have
1871          * been freed when Y was written.
1872          *
1873          * (Note that this is not a concern when we are nop-writing from
1874          * syncing context, because X and Y must be identical, because
1875          * all previous txgs have been synced.)
1876          *
1877          * Therefore, we disable nopwrite if the current BP could change
1878          * before this TXG.  There are two ways it could change: by
1879          * being dirty (dr_next is non-NULL), or by being freed
1880          * (dnode_block_freed()).  This behavior is verified by
1881          * zio_done(), which VERIFYs that the override BP is identical
1882          * to the on-disk BP.
1883          */
1884         DB_DNODE_ENTER(db);
1885         dn = DB_DNODE(db);
1886         if (dr->dr_next != NULL || dnode_block_freed(dn, db->db_blkid))
1887                 zp.zp_nopwrite = B_FALSE;
1888         DB_DNODE_EXIT(db);
1889 
1890         ASSERT(dr->dr_txg == txg);
1891         if (dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC ||
1892             dr->dt.dl.dr_override_state == DR_OVERRIDDEN) {
1893                 /*
1894                  * We have already issued a sync write for this buffer,
1895                  * or this buffer has already been synced.  It could not
1896                  * have been dirtied since, or we would have cleared the state.
1897                  */
1898                 mutex_exit(&db->db_mtx);
1899                 return (SET_ERROR(EALREADY));
1900         }
1901 
1902         ASSERT(dr->dt.dl.dr_override_state == DR_NOT_OVERRIDDEN);
1903         dr->dt.dl.dr_override_state = DR_IN_DMU_SYNC;
1904         mutex_exit(&db->db_mtx);
1905 
1906         dsa = kmem_alloc(sizeof (dmu_sync_arg_t), KM_SLEEP);
1907         dsa->dsa_dr = dr;
1908         dsa->dsa_done = done;
1909         dsa->dsa_zgd = zgd;
1910         dsa->dsa_tx = NULL;
1911 
1912         zio_nowait(arc_write(pio, os->os_spa, txg,
1913             zgd->zgd_bp, dr->dt.dl.dr_data, DBUF_IS_L2CACHEABLE(db),
1914             &zp, dmu_sync_ready, NULL, NULL, dmu_sync_done, dsa,
1915             ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_CANFAIL, &zb, &sc));
1916 
1917         return (0);
1918 }
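
/*
 * Illustrative sketch, not part of this file: a ZIL get-data callback
 * (e.g. zfs_get_data()) reacts to the dmu_sync() return values documented
 * above roughly as follows; this is simplified, see the real callers for
 * the authoritative handling.
 *
 *	error = dmu_sync(zio, lr->lr_common.lrc_txg, zfs_get_done, zgd);
 *	if (error == EALREADY) {
 *		lr->lr_common.lrc_txtype = TX_WRITE2;	(block already synced)
 *		error = 0;
 *	}
 *	(EEXIST/ENOENT: do not log the write; EIO: wait for the txg to sync)
 */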
1919 
1920 int
1921 dmu_object_set_blocksize(objset_t *os, uint64_t object, uint64_t size, int ibs,
1922     dmu_tx_t *tx)
1923 {
1924         dnode_t *dn;
1925         int err;
1926 
1927         err = dnode_hold(os, object, FTAG, &dn);
1928         if (err)
1929                 return (err);
1930         err = dnode_set_blksz(dn, size, ibs, tx);
1931         dnode_rele(dn, FTAG);
1932         return (err);
1933 }
1934 
1935 void
1936 dmu_object_set_checksum(objset_t *os, uint64_t object, uint8_t checksum,
1937     dmu_tx_t *tx)
1938 {
1939         dnode_t *dn;
1940 
1941         /*
1942          * Send streams include each object's checksum function.  This
1943          * check ensures that the receiving system can understand the
1944          * checksum function transmitted.
1945          */
1946         ASSERT3U(checksum, <, ZIO_CHECKSUM_LEGACY_FUNCTIONS);
1947 
1948         VERIFY0(dnode_hold(os, object, FTAG, &dn));
1949         ASSERT3U(checksum, <, ZIO_CHECKSUM_FUNCTIONS);
1950         dn->dn_checksum = checksum;
1951         dnode_setdirty(dn, tx);
1952         dnode_rele(dn, FTAG);
1953 }
1954 
1955 void
1956 dmu_object_set_compress(objset_t *os, uint64_t object, uint8_t compress,
1957     dmu_tx_t *tx)
1958 {
1959         dnode_t *dn;
1960 
1961         /*
1962          * Send streams include each object's compression function.  This
1963          * check ensures that the receiving system can understand the
1964          * compression function transmitted.
1965          */
1966         ASSERT3U(compress, <, ZIO_COMPRESS_LEGACY_FUNCTIONS);
1967 
1968         VERIFY0(dnode_hold(os, object, FTAG, &dn));
1969         dn->dn_compress = compress;
1970         dnode_setdirty(dn, tx);
1971         dnode_rele(dn, FTAG);
1972 }
1973 
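/*
 * Tunable: when nonzero, disable compression of metadata; dmu_write_policy()
 * then selects ZIO_COMPRESS_EMPTY for metadata blocks.
 */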
1974 int zfs_mdcomp_disable = 0;
1975 
1976 /*
1977  * When the "redundant_metadata" property is set to "most", only indirect
1978  * blocks of this level and higher will have an additional ditto block.
1979  */
1980 int zfs_redundant_metadata_most_ditto_level = 2;
1981 
1982 void
1983 dmu_write_policy(objset_t *os, dnode_t *dn, int level, int wp, zio_prop_t *zp)
1984 {
1985         dmu_object_type_t type = dn ? dn->dn_type : DMU_OT_OBJSET;
1986         boolean_t ismd = (level > 0 || DMU_OT_IS_METADATA(type) ||
1987             (wp & WP_SPILL));
1988         enum zio_checksum checksum = os->os_checksum;
1989         enum zio_compress compress = os->os_compress;
1990         enum zio_checksum dedup_checksum = os->os_dedup_checksum;
1991         boolean_t dedup = B_FALSE;
1992         boolean_t nopwrite = B_FALSE;
1993         boolean_t dedup_verify = os->os_dedup_verify;
1994         int copies = os->os_copies;
1995 
1996         /*
1997          * We maintain different write policies for each of the following
1998          * types of data:
1999          *       1. metadata
2000          *       2. preallocated blocks (i.e. level-0 blocks of a dump device)
2001          *       3. all other level 0 blocks
2002          */
2003         if (ismd) {
2004                 if (zfs_mdcomp_disable) {
2005                         compress = ZIO_COMPRESS_EMPTY;
2006                 } else {
2007                         /*
2008                          * XXX -- we should design a compression algorithm
2009                          * that specializes in arrays of bps.
2010                          */
2011                         compress = zio_compress_select(os->os_spa,
2012                             ZIO_COMPRESS_ON, ZIO_COMPRESS_ON);
2013                 }
2014 
2015                 /*
2016                  * Metadata always gets checksummed.  If the data
2017                  * checksum is multi-bit correctable, and it's not a
2018                  * ZBT-style checksum, then it's suitable for metadata
2019                  * as well.  Otherwise, the metadata checksum defaults
2020                  * to fletcher4.
2021                  */
2022                 if (!(zio_checksum_table[checksum].ci_flags &
2023                     ZCHECKSUM_FLAG_METADATA) ||
2024                     (zio_checksum_table[checksum].ci_flags &
2025                     ZCHECKSUM_FLAG_EMBEDDED))
2026                         checksum = ZIO_CHECKSUM_FLETCHER_4;
2027 
2028                 if (os->os_redundant_metadata == ZFS_REDUNDANT_METADATA_ALL ||
2029                     (os->os_redundant_metadata ==
2030                     ZFS_REDUNDANT_METADATA_MOST &&
2031                     (level >= zfs_redundant_metadata_most_ditto_level ||
2032                     DMU_OT_IS_METADATA(type) || (wp & WP_SPILL))))
2033                         copies++;
2034         } else if (wp & WP_NOFILL) {
2035                 ASSERT(level == 0);
2036 
2037                 /*
2038                  * If we're writing preallocated blocks, we aren't actually
2039                  * writing them so don't set any policy properties.  These
2040                  * blocks are currently only used by an external subsystem
2041                  * outside of zfs (i.e. dump) and not written by the zio
2042                  * pipeline.
2043                  */
2044                 compress = ZIO_COMPRESS_OFF;
2045                 checksum = ZIO_CHECKSUM_NOPARITY;
2046         } else {
2047                 compress = zio_compress_select(os->os_spa, dn->dn_compress,
2048                     compress);
2049 
2050                 checksum = (dedup_checksum == ZIO_CHECKSUM_OFF) ?
2051                     zio_checksum_select(dn->dn_checksum, checksum) :
2052                     dedup_checksum;
2053 
2054                 /*
2055                  * Determine dedup setting.  If we are in dmu_sync(),
2056                  * we won't actually dedup now because that's all
2057                  * done in syncing context; but we do want to use the
2058          * dedup checksum.  If the checksum is not strong
2059                  * enough to ensure unique signatures, force
2060                  * dedup_verify.
2061                  */
2062                 if (dedup_checksum != ZIO_CHECKSUM_OFF) {
2063                         dedup = (wp & WP_DMU_SYNC) ? B_FALSE : B_TRUE;
2064                         if (!(zio_checksum_table[checksum].ci_flags &
2065                             ZCHECKSUM_FLAG_DEDUP))
2066                                 dedup_verify = B_TRUE;
2067                 }
2068 
2069                 /*
2070          * Enable nopwrite if we have a secure enough checksum
2071                  * algorithm (see comment in zio_nop_write) and
2072                  * compression is enabled.  We don't enable nopwrite if
2073                  * dedup is enabled as the two features are mutually
2074                  * exclusive.
2075                  */
2076                 nopwrite = (!dedup && (zio_checksum_table[checksum].ci_flags &
2077                     ZCHECKSUM_FLAG_NOPWRITE) &&
2078                     compress != ZIO_COMPRESS_OFF && zfs_nopwrite_enabled);
2079         }
2080 
2081         zp->zp_usesc = WP_GET_SPECIALCLASS(wp);
2082         zp->zp_checksum = checksum;
2083         zp->zp_compress = compress;
2084         ASSERT3U(zp->zp_compress, !=, ZIO_COMPRESS_INHERIT);
2085 
2086         zp->zp_type = (wp & WP_SPILL) ? dn->dn_bonustype : type;
2087         zp->zp_level = level;
2088         zp->zp_copies = MIN(copies, spa_max_replication(os->os_spa));
2089         zp->zp_dedup = dedup;
2090         zp->zp_dedup_verify = dedup && dedup_verify;
2091         zp->zp_metadata = ismd;
2092         zp->zp_nopwrite = nopwrite;
2093         zp->zp_zpl_meta_to_special = os->os_zpl_meta_to_special;
2094         zp->zp_usewbc = (zp->zp_usesc &&
2095             os->os_wbc_mode == ZFS_WBC_MODE_ON && !ismd);
2096 
2097         /* explicitly control the number of copies for DDT metadata */
2098         if (DMU_OT_IS_DDT_META(type) &&
2099             os->os_spa->spa_ddt_meta_copies > 0) {
2100                 zp->zp_copies =
2101                     MIN(os->os_spa->spa_ddt_meta_copies,
2102                     spa_max_replication(os->os_spa));
2103         }
2104 
2105         DTRACE_PROBE2(dmu_wp, boolean_t, zp->zp_metadata,
2106             boolean_t, zp->zp_usesc);
2107 }
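
/*
 * Illustrative sketch, not part of this file: besides dmu_sync() above,
 * the syncing write path (e.g. dbuf_write()) derives its zio properties
 * from dmu_write_policy() before issuing the write; roughly:
 *
 *	zio_prop_t zp;
 *	int wp_flag = (db->db_blkid == DMU_SPILL_BLKID) ? WP_SPILL : 0;
 *	dmu_write_policy(os, dn, db->db_level, wp_flag, &zp);
 *	(then &zp is passed to arc_write()/zio_write())
 */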
2108 
2109 int
2110 dmu_offset_next(objset_t *os, uint64_t object, boolean_t hole, uint64_t *off)
2111 {
2112         dnode_t *dn;
2113         int err;
2114 
2115         /*
2116          * Sync any current changes before
2117          * we go trundling through the block pointers.
2118          */
2119         err = dmu_object_wait_synced(os, object);
2120         if (err) {
2121                 return (err);
2122         }
2123 
2124         err = dnode_hold(os, object, FTAG, &dn);
2125         if (err) {
2126                 return (err);
2127         }
2128 
2129         err = dnode_next_offset(dn, (hole ? DNODE_FIND_HOLE : 0), off, 1, 1, 0);
2130         dnode_rele(dn, FTAG);
2131 
2132         return (err);
2133 }
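
/*
 * Illustrative sketch, not part of this file: dmu_offset_next() backs
 * SEEK_HOLE/SEEK_DATA style lookups (e.g. zfs_holey()).  Roughly:
 *
 *	uint64_t off = start;
 *	error = dmu_offset_next(os, object, hole, &off);
 *	(0: off is the next hole/data boundary at or after start;
 *	 ESRCH, from dnode_next_offset(): nothing found past start)
 */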
2134 
2135 /*
2136  * Given the ZFS object, if it contains any dirty dnodes,
2137  * this function flushes all dirty blocks to disk. This
2138  * ensures the DMU object info is updated. A more efficient
2139  * future version might just find the TXG with the maximum
2140  * ID and wait for that to be synced.
2141  */
2142 int
2143 dmu_object_wait_synced(objset_t *os, uint64_t object)
2144 {
2145         dnode_t *dn;
2146         int error, i;
2147 
2148         error = dnode_hold(os, object, FTAG, &dn);
2149         if (error) {
2150                 return (error);
2151         }
2152 
2153         for (i = 0; i < TXG_SIZE; i++) {
2154                 if (list_link_active(&dn->dn_dirty_link[i])) {
2155                         break;
2156                 }
2157         }
2158         dnode_rele(dn, FTAG);
2159         if (i != TXG_SIZE) {
2160                 txg_wait_synced(dmu_objset_pool(os), 0);
2161         }
2162 
2163         return (0);
2164 }
2165 
2166 void
2167 dmu_object_info_from_dnode(dnode_t *dn, dmu_object_info_t *doi)
2168 {
2169         dnode_phys_t *dnp;
2170 
2171         rw_enter(&dn->dn_struct_rwlock, RW_READER);
2172         mutex_enter(&dn->dn_mtx);
2173 
2174         dnp = dn->dn_phys;
2175 
2176         doi->doi_data_block_size = dn->dn_datablksz;
2177         doi->doi_metadata_block_size = dn->dn_indblkshift ?
2178             1ULL << dn->dn_indblkshift : 0;
2179         doi->doi_type = dn->dn_type;
2180         doi->doi_bonus_type = dn->dn_bonustype;
2181         doi->doi_bonus_size = dn->dn_bonuslen;
2182         doi->doi_indirection = dn->dn_nlevels;
2183         doi->doi_checksum = dn->dn_checksum;
2184         doi->doi_compress = dn->dn_compress;
2185         doi->doi_nblkptr = dn->dn_nblkptr;
2186         doi->doi_physical_blocks_512 = (DN_USED_BYTES(dnp) + 256) >> 9;
2187         doi->doi_max_offset = (dn->dn_maxblkid + 1) * dn->dn_datablksz;
2188         doi->doi_fill_count = 0;
2189         for (int i = 0; i < dnp->dn_nblkptr; i++)
2190                 doi->doi_fill_count += BP_GET_FILL(&dnp->dn_blkptr[i]);
2191 
2192         mutex_exit(&dn->dn_mtx);
2193         rw_exit(&dn->dn_struct_rwlock);
2194 }
2195 
2196 /*
2197  * Get information on a DMU object.
2198  * If doi is NULL, the call just indicates whether the object exists.
2199  */
2200 int
2201 dmu_object_info(objset_t *os, uint64_t object, dmu_object_info_t *doi)
2202 {
2203         dnode_t *dn;
2204         int err = dnode_hold(os, object, FTAG, &dn);
2205 
2206         if (err)
2207                 return (err);
2208 
2209         if (doi != NULL)
2210                 dmu_object_info_from_dnode(dn, doi);
2211 
2212         dnode_rele(dn, FTAG);
2213         return (0);
2214 }
2215 
2216 /*
2217  * As above, but faster; can be used when you have a held dbuf in hand.
2218  */
2219 void
2220 dmu_object_info_from_db(dmu_buf_t *db_fake, dmu_object_info_t *doi)
2221 {
2222         dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
2223 
2224         DB_DNODE_ENTER(db);
2225         dmu_object_info_from_dnode(DB_DNODE(db), doi);
2226         DB_DNODE_EXIT(db);
2227 }
2228 
2229 /*
2230  * Faster still when you only care about the size.
2231  * This is specifically optimized for zfs_getattr().
2232  */
2233 void
2234 dmu_object_size_from_db(dmu_buf_t *db_fake, uint32_t *blksize,
2235     u_longlong_t *nblk512)
2236 {
2237         dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
2238         dnode_t *dn;
2239 
2240         DB_DNODE_ENTER(db);
2241         dn = DB_DNODE(db);
2242 
2243         *blksize = dn->dn_datablksz;
2244         /* add 1 for dnode space */
2245         *nblk512 = ((DN_USED_BYTES(dn->dn_phys) + SPA_MINBLOCKSIZE/2) >>
2246             SPA_MINBLOCKSHIFT) + 1;
2247         DB_DNODE_EXIT(db);
2248 }
2249 
2250 void
2251 byteswap_uint64_array(void *vbuf, size_t size)
2252 {
2253         uint64_t *buf = vbuf;
2254         size_t count = size >> 3;
2255         int i;
2256 
2257         ASSERT((size & 7) == 0);
2258 
2259         for (i = 0; i < count; i++)
2260                 buf[i] = BSWAP_64(buf[i]);
2261 }
2262 
2263 void
2264 byteswap_uint32_array(void *vbuf, size_t size)
2265 {
2266         uint32_t *buf = vbuf;
2267         size_t count = size >> 2;
2268         int i;
2269 
2270         ASSERT((size & 3) == 0);
2271 
2272         for (i = 0; i < count; i++)
2273                 buf[i] = BSWAP_32(buf[i]);
2274 }
2275 
2276 void
2277 byteswap_uint16_array(void *vbuf, size_t size)
2278 {
2279         uint16_t *buf = vbuf;
2280         size_t count = size >> 1;
2281         int i;
2282 
2283         ASSERT((size & 1) == 0);
2284 
2285         for (i = 0; i < count; i++)
2286                 buf[i] = BSWAP_16(buf[i]);
2287 }
2288 
2289 /* ARGSUSED */
2290 void
2291 byteswap_uint8_array(void *vbuf, size_t size)
2292 {
2293 }
2294 
2295 void
2296 dmu_init(void)
2297 {
2298         abd_init();
2299         zfs_dbgmsg_init();
2300         sa_cache_init();
2301         xuio_stat_init();
2302         dmu_objset_init();
2303         dnode_init();
2304         zfetch_init();
2305         l2arc_init();
2306         arc_init();
2307         dbuf_init();
2308 }
2309 
2310 void
2311 dmu_fini(void)
2312 {
2313         arc_fini(); /* arc depends on l2arc, so arc must go first */
2314         l2arc_fini();
2315         zfetch_fini();
2316         dbuf_fini();
2317         dnode_fini();
2318         dmu_objset_fini();
2319         xuio_stat_fini();
2320         sa_cache_fini();
2321         zfs_dbgmsg_fini();
2322         abd_fini();
2323 }