1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  23  * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
  24  * Copyright (c) 2011, 2015 by Delphix. All rights reserved.
  25  * Copyright (c) 2014, Joyent, Inc. All rights reserved.
  26  * Copyright 2014 HybridCluster. All rights reserved.
  27  * Copyright 2016 RackTop Systems.
  28  * Copyright (c) 2014 Integros [integros.com]
  29  */
  30 
  31 #include <sys/dmu.h>
  32 #include <sys/dmu_impl.h>
  33 #include <sys/dmu_tx.h>
  34 #include <sys/dbuf.h>
  35 #include <sys/dnode.h>
  36 #include <sys/zfs_context.h>
  37 #include <sys/dmu_objset.h>
  38 #include <sys/dmu_traverse.h>
  39 #include <sys/dsl_dataset.h>
  40 #include <sys/dsl_dir.h>
  41 #include <sys/dsl_prop.h>
  42 #include <sys/dsl_pool.h>
  43 #include <sys/dsl_synctask.h>
  44 #include <sys/zfs_ioctl.h>
  45 #include <sys/zap.h>
  46 #include <sys/zio_checksum.h>
  47 #include <sys/zfs_znode.h>
  48 #include <zfs_fletcher.h>
  49 #include <sys/avl.h>
  50 #include <sys/ddt.h>
  51 #include <sys/zfs_onexit.h>
  52 #include <sys/dmu_send.h>
  53 #include <sys/dsl_destroy.h>
  54 #include <sys/blkptr.h>
  55 #include <sys/dsl_bookmark.h>
  56 #include <sys/zfeature.h>
  57 #include <sys/bqueue.h>
  58 
  59 /* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
  60 int zfs_send_corrupt_data = B_FALSE;
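/*
 * Upper bounds (in bytes) on the backlog of prefetched block records allowed
 * to queue up between the producer and consumer threads on the send and
 * receive paths (passed to bqueue_init(); see dmu_send_impl() below).
 */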
  61 int zfs_send_queue_length = 16 * 1024 * 1024;
  62 int zfs_recv_queue_length = 16 * 1024 * 1024;
  63 /* Set this tunable to FALSE to disable setting of DRR_FLAG_FREERECORDS */
  64 int zfs_send_set_freerecords_bit = B_TRUE;
  65 
  66 static char *dmu_recv_tag = "dmu_recv_tag";
  67 const char *recv_clone_name = "%recv";
  68 
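/*
 * BP_SPAN computes how many bytes of the object a block pointer at the given
 * level covers.  Illustrative example: with 128K data blocks (datablkszsec =
 * 256) and 128K indirect blocks (indblkshift = 17, each holding 1024 block
 * pointers), a level-0 pointer spans 256 << 9 = 128K and a level-1 pointer
 * spans 256 << (9 + 10) = 128M.
 */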
  69 #define BP_SPAN(datablkszsec, indblkshift, level) \
  70         (((uint64_t)datablkszsec) << (SPA_MINBLOCKSHIFT + \
  71         (level) * (indblkshift - SPA_BLKPTRSHIFT)))
  72 
  73 static void byteswap_record(dmu_replay_record_t *drr);
  74 
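/*
 * Sending is structured as a simple producer/consumer pipeline:
 * send_traverse_thread() walks the dataset (via send_cb()) and enqueues a
 * send_block_record for each interesting block pointer on the bqueue below,
 * while dmu_send_impl() dequeues those records and turns each one into
 * stream records via do_dump().
 */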
  75 struct send_thread_arg {
  76         bqueue_t        q;
  77         dsl_dataset_t   *ds;            /* Dataset to traverse */
  78         uint64_t        fromtxg;        /* Traverse from this txg */
  79         int             flags;          /* flags to pass to traverse_dataset */
  80         int             error_code;
  81         boolean_t       cancel;
  82         zbookmark_phys_t resume;
  83 };
  84 
  85 struct send_block_record {
  86         boolean_t               eos_marker; /* Marks the end of the stream */
  87         blkptr_t                bp;
  88         zbookmark_phys_t        zb;
  89         uint8_t                 indblkshift;
  90         uint16_t                datablkszsec;
  91         bqueue_node_t           ln;
  92 };
  93 
  94 static int
  95 dump_bytes(dmu_sendarg_t *dsp, void *buf, int len)
  96 {
  97         dsl_dataset_t *ds = dmu_objset_ds(dsp->dsa_os);
  98         ssize_t resid; /* have to get resid to get detailed errno */
  99 
 100         /*
 101          * The code does not rely on this (len being a multiple of 8).  We keep
 102          * this assertion because of the corresponding assertion in
 103          * receive_read().  Keeping this assertion ensures that we do not
 104          * inadvertently break backwards compatibility (causing the assertion
 105          * in receive_read() to trigger on old software).
 106          *
 107          * Removing the assertions could be rolled into a new feature that uses
 108          * data that isn't 8-byte aligned; if the assertions were removed, a
 109          * feature flag would have to be added.
 110          */
 111 
 112         ASSERT0(len % 8);
 113 
 114         dsp->dsa_err = vn_rdwr(UIO_WRITE, dsp->dsa_vp,
 115             (caddr_t)buf, len,
 116             0, UIO_SYSSPACE, FAPPEND, RLIM64_INFINITY, CRED(), &resid);
 117 
 118         mutex_enter(&ds->ds_sendstream_lock);
 119         *dsp->dsa_off += len;
 120         mutex_exit(&ds->ds_sendstream_lock);
 121 
 122         return (dsp->dsa_err);
 123 }
 124 
 125 /*
 126  * For all record types except BEGIN, fill in the checksum (overlaid in
 127  * drr_u.drr_checksum.drr_checksum).  The checksum verifies everything
 128  * up to the start of the checksum itself.
 129  */
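/*
 * The checksum field occupies the final sizeof (zio_cksum_t) bytes of
 * dmu_replay_record_t (asserted below), so the running fletcher-4 is first
 * advanced over everything preceding it, the field is then filled in for all
 * record types other than BEGIN, and finally the field itself is folded into
 * the running checksum so that it chains into the records that follow.
 */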
 130 static int
 131 dump_record(dmu_sendarg_t *dsp, void *payload, int payload_len)
 132 {
 133         ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
 134             ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
 135         (void) fletcher_4_incremental_native(dsp->dsa_drr,
 136             offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
 137             &dsp->dsa_zc);
 138         if (dsp->dsa_drr->drr_type == DRR_BEGIN) {
 139                 dsp->dsa_sent_begin = B_TRUE;
 140         } else {
 141                 ASSERT(ZIO_CHECKSUM_IS_ZERO(&dsp->dsa_drr->drr_u.
 142                     drr_checksum.drr_checksum));
 143                 dsp->dsa_drr->drr_u.drr_checksum.drr_checksum = dsp->dsa_zc;
 144         }
 145         if (dsp->dsa_drr->drr_type == DRR_END) {
 146                 dsp->dsa_sent_end = B_TRUE;
 147         }
 148         (void) fletcher_4_incremental_native(&dsp->dsa_drr->
 149             drr_u.drr_checksum.drr_checksum,
 150             sizeof (zio_cksum_t), &dsp->dsa_zc);
 151         if (dump_bytes(dsp, dsp->dsa_drr, sizeof (dmu_replay_record_t)) != 0)
 152                 return (SET_ERROR(EINTR));
 153         if (payload_len != 0) {
 154                 (void) fletcher_4_incremental_native(payload, payload_len,
 155                     &dsp->dsa_zc);
 156                 if (dump_bytes(dsp, payload, payload_len) != 0)
 157                         return (SET_ERROR(EINTR));
 158         }
 159         return (0);
 160 }
 161 
 162 /*
 163  * Fill in the drr_free struct, or perform aggregation if the previous record is
 164  * also a free record, and the two are adjacent.
 165  *
 166  * Note that we send free records even for a full send, because we want to be
 167  * able to receive a full send as a clone, which requires a list of all the free
 168  * and freeobject records that were generated on the source.
 169  */
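/*
 * Aggregation example (illustrative offsets): a free of (object 5, offset 0,
 * length 128K) followed by a free of (object 5, offset 128K, length 128K) is
 * emitted as a single DRR_FREE record covering 256K; a free of a different
 * object, or of a non-adjacent range, first flushes the pending record.
 */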
 170 static int
 171 dump_free(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
 172     uint64_t length)
 173 {
 174         struct drr_free *drrf = &(dsp->dsa_drr->drr_u.drr_free);
 175 
 176         /*
 177          * When we receive a free record, dbuf_free_range() assumes
 178          * that the receiving system doesn't have any dbufs in the range
 179          * being freed.  This is always true because there is a one-record
 180          * constraint: we only send one WRITE record for any given
 181          * object,offset.  We know that the one-record constraint is
 182          * true because we always send data in increasing order by
 183          * object,offset.
 184          *
 185          * If the increasing-order constraint ever changes, we should find
 186          * another way to assert that the one-record constraint is still
 187          * satisfied.
 188          */
 189         ASSERT(object > dsp->dsa_last_data_object ||
 190             (object == dsp->dsa_last_data_object &&
 191             offset > dsp->dsa_last_data_offset));
 192 
 193         if (length != -1ULL && offset + length < offset)
 194                 length = -1ULL;
 195 
 196         /*
 197          * If there is a pending op, but it's not PENDING_FREE, push it out,
 198          * since free block aggregation can only be done for blocks of the
 199          * same type (i.e., DRR_FREE records can only be aggregated with
 200          * other DRR_FREE records.  DRR_FREEOBJECTS records can only be
 201          * aggregated with other DRR_FREEOBJECTS records).
 202          */
 203         if (dsp->dsa_pending_op != PENDING_NONE &&
 204             dsp->dsa_pending_op != PENDING_FREE) {
 205                 if (dump_record(dsp, NULL, 0) != 0)
 206                         return (SET_ERROR(EINTR));
 207                 dsp->dsa_pending_op = PENDING_NONE;
 208         }
 209 
 210         if (dsp->dsa_pending_op == PENDING_FREE) {
 211                 /*
 212                  * There should never be a PENDING_FREE if length is -1
 213                  * (because dump_dnode is the only place where this
 214                  * function is called with a -1, and only after flushing
 215                  * any pending record).
 216                  */
 217                 ASSERT(length != -1ULL);
 218                 /*
 219                  * Check to see whether this free block can be aggregated
 220                  * with the pending one.
 221                  */
 222                 if (drrf->drr_object == object && drrf->drr_offset +
 223                     drrf->drr_length == offset) {
 224                         drrf->drr_length += length;
 225                         return (0);
 226                 } else {
 227                         /* not a continuation.  Push out pending record */
 228                         if (dump_record(dsp, NULL, 0) != 0)
 229                                 return (SET_ERROR(EINTR));
 230                         dsp->dsa_pending_op = PENDING_NONE;
 231                 }
 232         }
 233         /* create a FREE record and make it pending */
 234         bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
 235         dsp->dsa_drr->drr_type = DRR_FREE;
 236         drrf->drr_object = object;
 237         drrf->drr_offset = offset;
 238         drrf->drr_length = length;
 239         drrf->drr_toguid = dsp->dsa_toguid;
 240         if (length == -1ULL) {
 241                 if (dump_record(dsp, NULL, 0) != 0)
 242                         return (SET_ERROR(EINTR));
 243         } else {
 244                 dsp->dsa_pending_op = PENDING_FREE;
 245         }
 246 
 247         return (0);
 248 }
 249 
 250 static int
 251 dump_write(dmu_sendarg_t *dsp, dmu_object_type_t type,
 252     uint64_t object, uint64_t offset, int lsize, int psize, const blkptr_t *bp,
 253     void *data)
 254 {
 255         uint64_t payload_size;
 256         struct drr_write *drrw = &(dsp->dsa_drr->drr_u.drr_write);
 257 
 258         /*
 259          * We send data in increasing object, offset order.
 260          * See comment in dump_free() for details.
 261          */
 262         ASSERT(object > dsp->dsa_last_data_object ||
 263             (object == dsp->dsa_last_data_object &&
 264             offset > dsp->dsa_last_data_offset));
 265         dsp->dsa_last_data_object = object;
 266         dsp->dsa_last_data_offset = offset + lsize - 1;
 267 
 268         /*
 269          * If there is any kind of pending aggregation (currently either
 270          * a grouping of free objects or free blocks), push it out to
 271          * the stream, since aggregation can't be done across operations
 272          * of different types.
 273          */
 274         if (dsp->dsa_pending_op != PENDING_NONE) {
 275                 if (dump_record(dsp, NULL, 0) != 0)
 276                         return (SET_ERROR(EINTR));
 277                 dsp->dsa_pending_op = PENDING_NONE;
 278         }
 279         /* write a WRITE record */
 280         bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
 281         dsp->dsa_drr->drr_type = DRR_WRITE;
 282         drrw->drr_object = object;
 283         drrw->drr_type = type;
 284         drrw->drr_offset = offset;
 285         drrw->drr_toguid = dsp->dsa_toguid;
 286         drrw->drr_logical_size = lsize;
 287 
 288         /* only set the compression fields if the buf is compressed */
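        /*
         * Illustrative example: a 128K block that lz4-compressed to 16K on
         * disk is sent with drr_logical_size = 128K, drr_compressed_size =
         * 16K, and a 16K payload; an uncompressed block is sent with only
         * drr_logical_size set and a full-size payload.
         */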
 289         if (lsize != psize) {
 290                 ASSERT(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_COMPRESSED);
 291                 ASSERT(!BP_IS_EMBEDDED(bp));
 292                 ASSERT(!BP_SHOULD_BYTESWAP(bp));
 293                 ASSERT(!DMU_OT_IS_METADATA(BP_GET_TYPE(bp)));
 294                 ASSERT3U(BP_GET_COMPRESS(bp), !=, ZIO_COMPRESS_OFF);
 295                 ASSERT3S(psize, >, 0);
 296                 ASSERT3S(lsize, >=, psize);
 297 
 298                 drrw->drr_compressiontype = BP_GET_COMPRESS(bp);
 299                 drrw->drr_compressed_size = psize;
 300                 payload_size = drrw->drr_compressed_size;
 301         } else {
 302                 payload_size = drrw->drr_logical_size;
 303         }
 304 
 305         if (bp == NULL || BP_IS_EMBEDDED(bp)) {
 306                 /*
 307                  * There's no pre-computed checksum for partial-block
 308                  * writes or embedded BP's, so (like
 309                  * fletcher4-checksummed blocks) userland will have to
 310                  * compute a dedup-capable checksum itself.
 311                  */
 312                 drrw->drr_checksumtype = ZIO_CHECKSUM_OFF;
 313         } else {
 314                 drrw->drr_checksumtype = BP_GET_CHECKSUM(bp);
 315                 if (zio_checksum_table[drrw->drr_checksumtype].ci_flags &
 316                     ZCHECKSUM_FLAG_DEDUP)
 317                         drrw->drr_checksumflags |= DRR_CHECKSUM_DEDUP;
 318                 DDK_SET_LSIZE(&drrw->drr_key, BP_GET_LSIZE(bp));
 319                 DDK_SET_PSIZE(&drrw->drr_key, BP_GET_PSIZE(bp));
 320                 DDK_SET_COMPRESS(&drrw->drr_key, BP_GET_COMPRESS(bp));
 321                 drrw->drr_key.ddk_cksum = bp->blk_cksum;
 322         }
 323 
 324         if (dump_record(dsp, data, payload_size) != 0)
 325                 return (SET_ERROR(EINTR));
 326         return (0);
 327 }
 328 
 329 static int
 330 dump_write_embedded(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
 331     int blksz, const blkptr_t *bp)
 332 {
 333         char buf[BPE_PAYLOAD_SIZE];
 334         struct drr_write_embedded *drrw =
 335             &(dsp->dsa_drr->drr_u.drr_write_embedded);
 336 
 337         if (dsp->dsa_pending_op != PENDING_NONE) {
 338                 if (dump_record(dsp, NULL, 0) != 0)
 339                         return (EINTR);
 340                 dsp->dsa_pending_op = PENDING_NONE;
 341         }
 342 
 343         ASSERT(BP_IS_EMBEDDED(bp));
 344 
 345         bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
 346         dsp->dsa_drr->drr_type = DRR_WRITE_EMBEDDED;
 347         drrw->drr_object = object;
 348         drrw->drr_offset = offset;
 349         drrw->drr_length = blksz;
 350         drrw->drr_toguid = dsp->dsa_toguid;
 351         drrw->drr_compression = BP_GET_COMPRESS(bp);
 352         drrw->drr_etype = BPE_GET_ETYPE(bp);
 353         drrw->drr_lsize = BPE_GET_LSIZE(bp);
 354         drrw->drr_psize = BPE_GET_PSIZE(bp);
 355 
 356         decode_embedded_bp_compressed(bp, buf);
 357 
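        /*
         * The payload is padded up to a multiple of 8 bytes to preserve the
         * 8-byte alignment that dump_bytes() asserts and that receive_read()
         * on the receiving side expects.
         */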
 358         if (dump_record(dsp, buf, P2ROUNDUP(drrw->drr_psize, 8)) != 0)
 359                 return (EINTR);
 360         return (0);
 361 }
 362 
 363 static int
 364 dump_spill(dmu_sendarg_t *dsp, uint64_t object, int blksz, void *data)
 365 {
 366         struct drr_spill *drrs = &(dsp->dsa_drr->drr_u.drr_spill);
 367 
 368         if (dsp->dsa_pending_op != PENDING_NONE) {
 369                 if (dump_record(dsp, NULL, 0) != 0)
 370                         return (SET_ERROR(EINTR));
 371                 dsp->dsa_pending_op = PENDING_NONE;
 372         }
 373 
 374         /* write a SPILL record */
 375         bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
 376         dsp->dsa_drr->drr_type = DRR_SPILL;
 377         drrs->drr_object = object;
 378         drrs->drr_length = blksz;
 379         drrs->drr_toguid = dsp->dsa_toguid;
 380 
 381         if (dump_record(dsp, data, blksz) != 0)
 382                 return (SET_ERROR(EINTR));
 383         return (0);
 384 }
 385 
 386 static int
 387 dump_freeobjects(dmu_sendarg_t *dsp, uint64_t firstobj, uint64_t numobjs)
 388 {
 389         struct drr_freeobjects *drrfo = &(dsp->dsa_drr->drr_u.drr_freeobjects);
 390 
 391         /*
 392          * If there is a pending op, but it's not PENDING_FREEOBJECTS,
 393          * push it out, since free block aggregation can only be done for
 394          * blocks of the same type (i.e., DRR_FREE records can only be
 395          * aggregated with other DRR_FREE records.  DRR_FREEOBJECTS records
 396          * can only be aggregated with other DRR_FREEOBJECTS records).
 397          */
 398         if (dsp->dsa_pending_op != PENDING_NONE &&
 399             dsp->dsa_pending_op != PENDING_FREEOBJECTS) {
 400                 if (dump_record(dsp, NULL, 0) != 0)
 401                         return (SET_ERROR(EINTR));
 402                 dsp->dsa_pending_op = PENDING_NONE;
 403         }
 404         if (dsp->dsa_pending_op == PENDING_FREEOBJECTS) {
 405                 /*
 406                  * See whether this free object array can be aggregated
 407                  * with the pending one.
 408                  */
 409                 if (drrfo->drr_firstobj + drrfo->drr_numobjs == firstobj) {
 410                         drrfo->drr_numobjs += numobjs;
 411                         return (0);
 412                 } else {
 413                         /* can't be aggregated.  Push out pending record */
 414                         if (dump_record(dsp, NULL, 0) != 0)
 415                                 return (SET_ERROR(EINTR));
 416                         dsp->dsa_pending_op = PENDING_NONE;
 417                 }
 418         }
 419 
 420         /* write a FREEOBJECTS record */
 421         bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
 422         dsp->dsa_drr->drr_type = DRR_FREEOBJECTS;
 423         drrfo->drr_firstobj = firstobj;
 424         drrfo->drr_numobjs = numobjs;
 425         drrfo->drr_toguid = dsp->dsa_toguid;
 426 
 427         dsp->dsa_pending_op = PENDING_FREEOBJECTS;
 428 
 429         return (0);
 430 }
 431 
 432 static int
 433 dump_dnode(dmu_sendarg_t *dsp, uint64_t object, dnode_phys_t *dnp)
 434 {
 435         struct drr_object *drro = &(dsp->dsa_drr->drr_u.drr_object);
 436 
 437         if (object < dsp->dsa_resume_object) {
 438                 /*
 439                  * Note: when resuming, we will visit all the dnodes in
 440                  * the block of dnodes that we are resuming from.  In
 441                  * this case it's unnecessary to send the dnodes prior to
 442                  * the one we are resuming from.  We should be at most one
 443                  * block's worth of dnodes behind the resume point.
 444                  */
 445                 ASSERT3U(dsp->dsa_resume_object - object, <,
 446                     1 << (DNODE_BLOCK_SHIFT - DNODE_SHIFT));
 447                 return (0);
 448         }
 449 
 450         if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
 451                 return (dump_freeobjects(dsp, object, 1));
 452 
 453         if (dsp->dsa_pending_op != PENDING_NONE) {
 454                 if (dump_record(dsp, NULL, 0) != 0)
 455                         return (SET_ERROR(EINTR));
 456                 dsp->dsa_pending_op = PENDING_NONE;
 457         }
 458 
 459         /* write an OBJECT record */
 460         bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
 461         dsp->dsa_drr->drr_type = DRR_OBJECT;
 462         drro->drr_object = object;
 463         drro->drr_type = dnp->dn_type;
 464         drro->drr_bonustype = dnp->dn_bonustype;
 465         drro->drr_blksz = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
 466         drro->drr_bonuslen = dnp->dn_bonuslen;
 467         drro->drr_checksumtype = dnp->dn_checksum;
 468         drro->drr_compress = dnp->dn_compress;
 469         drro->drr_toguid = dsp->dsa_toguid;
 470 
 471         if (!(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
 472             drro->drr_blksz > SPA_OLD_MAXBLOCKSIZE)
 473                 drro->drr_blksz = SPA_OLD_MAXBLOCKSIZE;
 474 
 475         if (dump_record(dsp, DN_BONUS(dnp),
 476             P2ROUNDUP(dnp->dn_bonuslen, 8)) != 0) {
 477                 return (SET_ERROR(EINTR));
 478         }
 479 
 480         /* Free anything past the end of the file. */
 481         if (dump_free(dsp, object, (dnp->dn_maxblkid + 1) *
 482             (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), -1ULL) != 0)
 483                 return (SET_ERROR(EINTR));
 484         if (dsp->dsa_err != 0)
 485                 return (SET_ERROR(EINTR));
 486         return (0);
 487 }
 488 
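/*
 * Decide whether this block pointer can be sent as a DRR_WRITE_EMBEDDED
 * record: it must actually be an embedded BP, the stream must have been
 * started with embedded data enabled (and with LZ4 enabled if that is how
 * the embedded data is compressed), and the embedded type must be one we
 * know how to send (currently only BP_EMBEDDED_TYPE_DATA).
 */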
 489 static boolean_t
 490 backup_do_embed(dmu_sendarg_t *dsp, const blkptr_t *bp)
 491 {
 492         if (!BP_IS_EMBEDDED(bp))
 493                 return (B_FALSE);
 494 
 495         /*
 496          * Compression function must be legacy, or explicitly enabled.
 497          */
 498         if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
 499             !(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_LZ4)))
 500                 return (B_FALSE);
 501 
 502         /*
 503          * Embed type must be explicitly enabled.
 504          */
 505         switch (BPE_GET_ETYPE(bp)) {
 506         case BP_EMBEDDED_TYPE_DATA:
 507                 if (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
 508                         return (B_TRUE);
 509                 break;
 510         default:
 511                 return (B_FALSE);
 512         }
 513         return (B_FALSE);
 514 }
 515 
 516 /*
 517  * This is the callback function to traverse_dataset that acts as the worker
 518  * thread for dmu_send_impl.
 519  */
 520 /*ARGSUSED*/
 521 static int
 522 send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
 523     const zbookmark_phys_t *zb, const struct dnode_phys *dnp, void *arg)
 524 {
 525         struct send_thread_arg *sta = arg;
 526         struct send_block_record *record;
 527         uint64_t record_size;
 528         int err = 0;
 529 
 530         ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
 531             zb->zb_object >= sta->resume.zb_object);
 532 
 533         if (sta->cancel)
 534                 return (SET_ERROR(EINTR));
 535 
 536         if (bp == NULL) {
 537                 ASSERT3U(zb->zb_level, ==, ZB_DNODE_LEVEL);
 538                 return (0);
 539         } else if (zb->zb_level < 0) {
 540                 return (0);
 541         }
 542 
 543         record = kmem_zalloc(sizeof (struct send_block_record), KM_SLEEP);
 544         record->eos_marker = B_FALSE;
 545         record->bp = *bp;
 546         record->zb = *zb;
 547         record->indblkshift = dnp->dn_indblkshift;
 548         record->datablkszsec = dnp->dn_datablkszsec;
 549         record_size = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
 550         bqueue_enqueue(&sta->q, record, record_size);
 551 
 552         return (err);
 553 }
 554 
 555 /*
 556  * This function kicks off the traverse_dataset.  It also handles setting the
 557  * error code of the thread in case something goes wrong, and pushes the End of
 558  * Stream record when the traverse_dataset call has finished.  If there is no
 559  * dataset to traverse, the thread immediately pushes an End of Stream marker.
 560  */
 561 static void
 562 send_traverse_thread(void *arg)
 563 {
 564         struct send_thread_arg *st_arg = arg;
 565         int err;
 566         struct send_block_record *data;
 567 
 568         if (st_arg->ds != NULL) {
 569                 err = traverse_dataset_resume(st_arg->ds,
 570                     st_arg->fromtxg, &st_arg->resume,
 571                     st_arg->flags, send_cb, st_arg);
 572 
 573                 if (err != EINTR)
 574                         st_arg->error_code = err;
 575         }
 576         data = kmem_zalloc(sizeof (*data), KM_SLEEP);
 577         data->eos_marker = B_TRUE;
 578         bqueue_enqueue(&st_arg->q, data, 1);
 579         thread_exit();
 580 }
 581 
 582 /*
 583  * This function actually handles figuring out what kind of record needs to be
 584  * dumped, reading the data (which has hopefully been prefetched), and calling
 585  * the appropriate helper function.
 586  */
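/*
 * Roughly: holes in the meta-dnode become DRR_FREEOBJECTS, other holes become
 * DRR_FREE, dnode blocks become one DRR_OBJECT per dnode, spill (DMU_OT_SA)
 * blocks become DRR_SPILL, embeddable blocks become DRR_WRITE_EMBEDDED, and
 * everything else becomes one or more DRR_WRITE records.
 */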
 587 static int
 588 do_dump(dmu_sendarg_t *dsa, struct send_block_record *data)
 589 {
 590         dsl_dataset_t *ds = dmu_objset_ds(dsa->dsa_os);
 591         const blkptr_t *bp = &data->bp;
 592         const zbookmark_phys_t *zb = &data->zb;
 593         uint8_t indblkshift = data->indblkshift;
 594         uint16_t dblkszsec = data->datablkszsec;
 595         spa_t *spa = ds->ds_dir->dd_pool->dp_spa;
 596         dmu_object_type_t type = bp ? BP_GET_TYPE(bp) : DMU_OT_NONE;
 597         int err = 0;
 598 
 599         ASSERT3U(zb->zb_level, >=, 0);
 600 
 601         ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
 602             zb->zb_object >= dsa->dsa_resume_object);
 603 
 604         if (zb->zb_object != DMU_META_DNODE_OBJECT &&
 605             DMU_OBJECT_IS_SPECIAL(zb->zb_object)) {
 606                 return (0);
 607         } else if (BP_IS_HOLE(bp) &&
 608             zb->zb_object == DMU_META_DNODE_OBJECT) {
 609                 uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
 610                 uint64_t dnobj = (zb->zb_blkid * span) >> DNODE_SHIFT;
 611                 err = dump_freeobjects(dsa, dnobj, span >> DNODE_SHIFT);
 612         } else if (BP_IS_HOLE(bp)) {
 613                 uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
 614                 uint64_t offset = zb->zb_blkid * span;
 615                 err = dump_free(dsa, zb->zb_object, offset, span);
 616         } else if (zb->zb_level > 0 || type == DMU_OT_OBJSET) {
 617                 return (0);
 618         } else if (type == DMU_OT_DNODE) {
 619                 int blksz = BP_GET_LSIZE(bp);
 620                 arc_flags_t aflags = ARC_FLAG_WAIT;
 621                 arc_buf_t *abuf;
 622 
 623                 ASSERT0(zb->zb_level);
 624 
 625                 if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
 626                     ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
 627                     &aflags, zb) != 0)
 628                         return (SET_ERROR(EIO));
 629 
 630                 dnode_phys_t *blk = abuf->b_data;
 631                 uint64_t dnobj = zb->zb_blkid * (blksz >> DNODE_SHIFT);
 632                 for (int i = 0; i < blksz >> DNODE_SHIFT; i++) {
 633                         err = dump_dnode(dsa, dnobj + i, blk + i);
 634                         if (err != 0)
 635                                 break;
 636                 }
 637                 arc_buf_destroy(abuf, &abuf);
 638         } else if (type == DMU_OT_SA) {
 639                 arc_flags_t aflags = ARC_FLAG_WAIT;
 640                 arc_buf_t *abuf;
 641                 int blksz = BP_GET_LSIZE(bp);
 642 
 643                 if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
 644                     ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
 645                     &aflags, zb) != 0)
 646                         return (SET_ERROR(EIO));
 647 
 648                 err = dump_spill(dsa, zb->zb_object, blksz, abuf->b_data);
 649                 arc_buf_destroy(abuf, &abuf);
 650         } else if (backup_do_embed(dsa, bp)) {
 651                 /* it's an embedded level-0 block of a regular object */
 652                 int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
 653                 ASSERT0(zb->zb_level);
 654                 err = dump_write_embedded(dsa, zb->zb_object,
 655                     zb->zb_blkid * blksz, blksz, bp);
 656         } else {
 657                 /* it's a level-0 block of a regular object */
 658                 arc_flags_t aflags = ARC_FLAG_WAIT;
 659                 arc_buf_t *abuf;
 660                 int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
 661                 uint64_t offset;
 662 
 663                 /*
 664                  * If we have large blocks stored on disk but the send flags
 665                  * don't allow us to send large blocks, we split the data from
 666                  * the arc buf into chunks.
 667                  */
 668                 boolean_t split_large_blocks = blksz > SPA_OLD_MAXBLOCKSIZE &&
 669                     !(dsa->dsa_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS);
 670                 /*
 671                  * We should only request compressed data from the ARC if all
 672                  * the following are true:
 673                  *  - stream compression was requested
 674                  *  - we aren't splitting large blocks into smaller chunks
 675                  *  - the data won't need to be byteswapped before sending
 676                  *  - this isn't an embedded block
 677                  *  - this isn't metadata (if receiving on a different endian
 678                  *    system it can be byteswapped more easily)
 679                  */
 680                 boolean_t request_compressed =
 681                     (dsa->dsa_featureflags & DMU_BACKUP_FEATURE_COMPRESSED) &&
 682                     !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) &&
 683                     !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp));
 684 
 685                 ASSERT0(zb->zb_level);
 686                 ASSERT(zb->zb_object > dsa->dsa_resume_object ||
 687                     (zb->zb_object == dsa->dsa_resume_object &&
 688                     zb->zb_blkid * blksz >= dsa->dsa_resume_offset));
 689 
 695                 ASSERT3U(blksz, ==, BP_GET_LSIZE(bp));
 696 
 697                 enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
 698                 if (request_compressed)
 699                         zioflags |= ZIO_FLAG_RAW;
 700                 if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
 701                     ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, zb) != 0) {
 702                         if (zfs_send_corrupt_data) {
 703                                 /* Send a block filled with 0x"zfs badd bloc" */
 704                                 abuf = arc_alloc_buf(spa, &abuf, ARC_BUFC_DATA,
 705                                     blksz);
 706                                 uint64_t *ptr;
 707                                 for (ptr = abuf->b_data;
 708                                     (char *)ptr < (char *)abuf->b_data + blksz;
 709                                     ptr++)
 710                                         *ptr = 0x2f5baddb10cULL;
 711                         } else {
 712                                 return (SET_ERROR(EIO));
 713                         }
 714                 }
 715 
 716                 offset = zb->zb_blkid * blksz;
 717 
 718                 if (split_large_blocks) {
 719                         ASSERT3U(arc_get_compression(abuf), ==,
 720                             ZIO_COMPRESS_OFF);
 721                         char *buf = abuf->b_data;
 722                         while (blksz > 0 && err == 0) {
 723                                 int n = MIN(blksz, SPA_OLD_MAXBLOCKSIZE);
 724                                 err = dump_write(dsa, type, zb->zb_object,
 725                                     offset, n, n, NULL, buf);
 726                                 offset += n;
 727                                 buf += n;
 728                                 blksz -= n;
 729                         }
 730                 } else {
 731                         err = dump_write(dsa, type, zb->zb_object, offset,
 732                             blksz, arc_buf_size(abuf), bp, abuf->b_data);
 733                 }
 734                 arc_buf_destroy(abuf, &abuf);
 735         }
 736 
 737         ASSERT(err == 0 || err == EINTR);
 738         return (err);
 739 }
 740 
 741 /*
 742  * Pop the new data off the queue, and free the old data.
 743  */
 744 static struct send_block_record *
 745 get_next_record(bqueue_t *bq, struct send_block_record *data)
 746 {
 747         struct send_block_record *tmp = bqueue_dequeue(bq);
 748         kmem_free(data, sizeof (*data));
 749         return (tmp);
 750 }
 751 
 752 /*
 753  * Actually do the bulk of the work in a zfs send.
 754  *
 755  * Note: Releases dp using the specified tag.
 756  */
 757 static int
 758 dmu_send_impl(void *tag, dsl_pool_t *dp, dsl_dataset_t *to_ds,
 759     zfs_bookmark_phys_t *ancestor_zb, boolean_t is_clone,
 760     boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
 761     int outfd, uint64_t resumeobj, uint64_t resumeoff,
 762     vnode_t *vp, offset_t *off)
 763 {
 764         objset_t *os;
 765         dmu_replay_record_t *drr;
 766         dmu_sendarg_t *dsp;
 767         int err;
 768         uint64_t fromtxg = 0;
 769         uint64_t featureflags = 0;
 770         struct send_thread_arg to_arg = { 0 };
 771 
 772         err = dmu_objset_from_ds(to_ds, &os);
 773         if (err != 0) {
 774                 dsl_pool_rele(dp, tag);
 775                 return (err);
 776         }
 777 
 778         drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
 779         drr->drr_type = DRR_BEGIN;
 780         drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
 781         DMU_SET_STREAM_HDRTYPE(drr->drr_u.drr_begin.drr_versioninfo,
 782             DMU_SUBSTREAM);
 783 
 784 #ifdef _KERNEL
 785         if (dmu_objset_type(os) == DMU_OST_ZFS) {
 786                 uint64_t version;
 787                 if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0) {
 788                         kmem_free(drr, sizeof (dmu_replay_record_t));
 789                         dsl_pool_rele(dp, tag);
 790                         return (SET_ERROR(EINVAL));
 791                 }
 792                 if (version >= ZPL_VERSION_SA) {
 793                         featureflags |= DMU_BACKUP_FEATURE_SA_SPILL;
 794                 }
 795         }
 796 #endif
 797 
 798         if (large_block_ok && to_ds->ds_feature_inuse[SPA_FEATURE_LARGE_BLOCKS])
 799                 featureflags |= DMU_BACKUP_FEATURE_LARGE_BLOCKS;
 800         if (embedok &&
 801             spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) {
 802                 featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA;
 803                 if (spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
 804                         featureflags |= DMU_BACKUP_FEATURE_LZ4;
 805         }
 806         if (compressok) {
 807                 featureflags |= DMU_BACKUP_FEATURE_COMPRESSED;
 808         }
 809         if ((featureflags &
 810             (DMU_BACKUP_FEATURE_EMBED_DATA | DMU_BACKUP_FEATURE_COMPRESSED)) !=
 811             0 && spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS)) {
 812                 featureflags |= DMU_BACKUP_FEATURE_LZ4;
 813         }
 814 
 815         if (resumeobj != 0 || resumeoff != 0) {
 816                 featureflags |= DMU_BACKUP_FEATURE_RESUMING;
 817         }
 818 
 819         DMU_SET_FEATUREFLAGS(drr->drr_u.drr_begin.drr_versioninfo,
 820             featureflags);
 821 
 822         drr->drr_u.drr_begin.drr_creation_time =
 823             dsl_dataset_phys(to_ds)->ds_creation_time;
 824         drr->drr_u.drr_begin.drr_type = dmu_objset_type(os);
 825         if (is_clone)
 826                 drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CLONE;
 827         drr->drr_u.drr_begin.drr_toguid = dsl_dataset_phys(to_ds)->ds_guid;
 828         if (dsl_dataset_phys(to_ds)->ds_flags & DS_FLAG_CI_DATASET)
 829                 drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CI_DATA;
 830         if (zfs_send_set_freerecords_bit)
 831                 drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_FREERECORDS;
 832 
 833         if (ancestor_zb != NULL) {
 834                 drr->drr_u.drr_begin.drr_fromguid =
 835                     ancestor_zb->zbm_guid;
 836                 fromtxg = ancestor_zb->zbm_creation_txg;
 837         }
 838         dsl_dataset_name(to_ds, drr->drr_u.drr_begin.drr_toname);
 839         if (!to_ds->ds_is_snapshot) {
 840                 (void) strlcat(drr->drr_u.drr_begin.drr_toname, "@--head--",
 841                     sizeof (drr->drr_u.drr_begin.drr_toname));
 842         }
 843 
 844         dsp = kmem_zalloc(sizeof (dmu_sendarg_t), KM_SLEEP);
 845 
 846         dsp->dsa_drr = drr;
 847         dsp->dsa_vp = vp;
 848         dsp->dsa_outfd = outfd;
 849         dsp->dsa_proc = curproc;
 850         dsp->dsa_os = os;
 851         dsp->dsa_off = off;
 852         dsp->dsa_toguid = dsl_dataset_phys(to_ds)->ds_guid;
 853         dsp->dsa_pending_op = PENDING_NONE;
 854         dsp->dsa_featureflags = featureflags;
 855         dsp->dsa_resume_object = resumeobj;
 856         dsp->dsa_resume_offset = resumeoff;
 857 
 858         mutex_enter(&to_ds->ds_sendstream_lock);
 859         list_insert_head(&to_ds->ds_sendstreams, dsp);
 860         mutex_exit(&to_ds->ds_sendstream_lock);
 861 
 862         dsl_dataset_long_hold(to_ds, FTAG);
 863         dsl_pool_rele(dp, tag);
 864 
 865         void *payload = NULL;
 866         size_t payload_len = 0;
 867         if (resumeobj != 0 || resumeoff != 0) {
 868                 dmu_object_info_t to_doi;
 869                 err = dmu_object_info(os, resumeobj, &to_doi);
 870                 if (err != 0)
 871                         goto out;
 872                 SET_BOOKMARK(&to_arg.resume, to_ds->ds_object, resumeobj, 0,
 873                     resumeoff / to_doi.doi_data_block_size);
 874 
 875                 nvlist_t *nvl = fnvlist_alloc();
 876                 fnvlist_add_uint64(nvl, "resume_object", resumeobj);
 877                 fnvlist_add_uint64(nvl, "resume_offset", resumeoff);
 878                 payload = fnvlist_pack(nvl, &payload_len);
 879                 drr->drr_payloadlen = payload_len;
 880                 fnvlist_free(nvl);
 881         }
 882 
 883         err = dump_record(dsp, payload, payload_len);
 884         fnvlist_pack_free(payload, payload_len);
 885         if (err != 0) {
 886                 err = dsp->dsa_err;
 887                 goto out;
 888         }
 889 
 890         err = bqueue_init(&to_arg.q, zfs_send_queue_length,
 891             offsetof(struct send_block_record, ln));
 892         to_arg.error_code = 0;
 893         to_arg.cancel = B_FALSE;
 894         to_arg.ds = to_ds;
 895         to_arg.fromtxg = fromtxg;
 896         to_arg.flags = TRAVERSE_PRE | TRAVERSE_PREFETCH;
 897         (void) thread_create(NULL, 0, send_traverse_thread, &to_arg, 0, curproc,
 898             TS_RUN, minclsyspri);
 899 
 900         struct send_block_record *to_data;
 901         to_data = bqueue_dequeue(&to_arg.q);
 902 
 903         while (!to_data->eos_marker && err == 0) {
 904                 err = do_dump(dsp, to_data);
 905                 to_data = get_next_record(&to_arg.q, to_data);
 906                 if (issig(JUSTLOOKING) && issig(FORREAL))
 907                         err = EINTR;
 908         }
 909 
 910         if (err != 0) {
 911                 to_arg.cancel = B_TRUE;
 912                 while (!to_data->eos_marker) {
 913                         to_data = get_next_record(&to_arg.q, to_data);
 914                 }
 915         }
 916         kmem_free(to_data, sizeof (*to_data));
 917 
 918         bqueue_destroy(&to_arg.q);
 919 
 920         if (err == 0 && to_arg.error_code != 0)
 921                 err = to_arg.error_code;
 922 
 923         if (err != 0)
 924                 goto out;
 925 
 926         if (dsp->dsa_pending_op != PENDING_NONE)
 927                 if (dump_record(dsp, NULL, 0) != 0)
 928                         err = SET_ERROR(EINTR);
 929 
 930         if (err != 0) {
 931                 if (err == EINTR && dsp->dsa_err != 0)
 932                         err = dsp->dsa_err;
 933                 goto out;
 934         }
 935 
 936         bzero(drr, sizeof (dmu_replay_record_t));
 937         drr->drr_type = DRR_END;
 938         drr->drr_u.drr_end.drr_checksum = dsp->dsa_zc;
 939         drr->drr_u.drr_end.drr_toguid = dsp->dsa_toguid;
 940 
 941         if (dump_record(dsp, NULL, 0) != 0)
 942                 err = dsp->dsa_err;
 943 
 944 out:
 945         mutex_enter(&to_ds->ds_sendstream_lock);
 946         list_remove(&to_ds->ds_sendstreams, dsp);
 947         mutex_exit(&to_ds->ds_sendstream_lock);
 948 
 949         VERIFY(err != 0 || (dsp->dsa_sent_begin && dsp->dsa_sent_end));
 950 
 951         kmem_free(drr, sizeof (dmu_replay_record_t));
 952         kmem_free(dsp, sizeof (dmu_sendarg_t));
 953 
 954         dsl_dataset_long_rele(to_ds, FTAG);
 955 
 956         return (err);
 957 }
 958 
 959 int
 960 dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
 961     boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
 962     int outfd, vnode_t *vp, offset_t *off)
 963 {
 964         dsl_pool_t *dp;
 965         dsl_dataset_t *ds;
 966         dsl_dataset_t *fromds = NULL;
 967         int err;
 968 
 969         err = dsl_pool_hold(pool, FTAG, &dp);
 970         if (err != 0)
 971                 return (err);
 972 
 973         err = dsl_dataset_hold_obj(dp, tosnap, FTAG, &ds);
 974         if (err != 0) {
 975                 dsl_pool_rele(dp, FTAG);
 976                 return (err);
 977         }
 978 
 979         if (fromsnap != 0) {
 980                 zfs_bookmark_phys_t zb;
 981                 boolean_t is_clone;
 982 
 983                 err = dsl_dataset_hold_obj(dp, fromsnap, FTAG, &fromds);
 984                 if (err != 0) {
 985                         dsl_dataset_rele(ds, FTAG);
 986                         dsl_pool_rele(dp, FTAG);
 987                         return (err);
 988                 }
 989                 if (!dsl_dataset_is_before(ds, fromds, 0))
 990                         err = SET_ERROR(EXDEV);
 991                 zb.zbm_creation_time =
 992                     dsl_dataset_phys(fromds)->ds_creation_time;
 993                 zb.zbm_creation_txg = dsl_dataset_phys(fromds)->ds_creation_txg;
 994                 zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
 995                 is_clone = (fromds->ds_dir != ds->ds_dir);
 996                 dsl_dataset_rele(fromds, FTAG);
 997                 err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
 998                     embedok, large_block_ok, compressok, outfd, 0, 0, vp, off);
 999         } else {
1000                 err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
1001                     embedok, large_block_ok, compressok, outfd, 0, 0, vp, off);
1002         }
1003         dsl_dataset_rele(ds, FTAG);
1004         return (err);
1005 }
1006 
1007 int
1008 dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
1009     boolean_t large_block_ok, boolean_t compressok, int outfd,
1010     uint64_t resumeobj, uint64_t resumeoff,
1011     vnode_t *vp, offset_t *off)
1012 {
1013         dsl_pool_t *dp;
1014         dsl_dataset_t *ds;
1015         int err;
1016         boolean_t owned = B_FALSE;
1017 
1018         if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
1019                 return (SET_ERROR(EINVAL));
1020 
1021         err = dsl_pool_hold(tosnap, FTAG, &dp);
1022         if (err != 0)
1023                 return (err);
1024 
1025         if (strchr(tosnap, '@') == NULL && spa_writeable(dp->dp_spa)) {
1026                 /*
1027                  * We are sending a filesystem or volume.  Ensure
1028                  * that it doesn't change by owning the dataset.
1029                  */
1030                 err = dsl_dataset_own(dp, tosnap, FTAG, &ds);
1031                 owned = B_TRUE;
1032         } else {
1033                 err = dsl_dataset_hold(dp, tosnap, FTAG, &ds);
1034         }
1035         if (err != 0) {
1036                 dsl_pool_rele(dp, FTAG);
1037                 return (err);
1038         }
1039 
1040         if (fromsnap != NULL) {
1041                 zfs_bookmark_phys_t zb;
1042                 boolean_t is_clone = B_FALSE;
1043                 int fsnamelen = strchr(tosnap, '@') - tosnap;
1044 
1045                 /*
1046                  * If the fromsnap is in a different filesystem, then
1047                  * mark the send stream as a clone.
1048                  */
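                /*
                 * For example (names illustrative), sending pool/fs@b from
                 * pool/other@a marks the stream as a clone, while sending it
                 * from pool/fs@a or from the bookmark pool/fs#a does not.
                 */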
1049                 if (strncmp(tosnap, fromsnap, fsnamelen) != 0 ||
1050                     (fromsnap[fsnamelen] != '@' &&
1051                     fromsnap[fsnamelen] != '#')) {
1052                         is_clone = B_TRUE;
1053                 }
1054 
1055                 if (strchr(fromsnap, '@')) {
1056                         dsl_dataset_t *fromds;
1057                         err = dsl_dataset_hold(dp, fromsnap, FTAG, &fromds);
1058                         if (err == 0) {
1059                                 if (!dsl_dataset_is_before(ds, fromds, 0))
1060                                         err = SET_ERROR(EXDEV);
1061                                 zb.zbm_creation_time =
1062                                     dsl_dataset_phys(fromds)->ds_creation_time;
1063                                 zb.zbm_creation_txg =
1064                                     dsl_dataset_phys(fromds)->ds_creation_txg;
1065                                 zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
1066                                 is_clone = (ds->ds_dir != fromds->ds_dir);
1067                                 dsl_dataset_rele(fromds, FTAG);
1068                         }
1069                 } else {
1070                         err = dsl_bookmark_lookup(dp, fromsnap, ds, &zb);
1071                 }
1072                 if (err != 0) {
1073                         dsl_dataset_rele(ds, FTAG);
1074                         dsl_pool_rele(dp, FTAG);
1075                         return (err);
1076                 }
1077                 err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
1078                     embedok, large_block_ok, compressok,
1079                     outfd, resumeobj, resumeoff, vp, off);
1080         } else {
1081                 err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
1082                     embedok, large_block_ok, compressok,
1083                     outfd, resumeobj, resumeoff, vp, off);
1084         }
1085         if (owned)
1086                 dsl_dataset_disown(ds, FTAG);
1087         else
1088                 dsl_dataset_rele(ds, FTAG);
1089         return (err);
1090 }
1091 
1092 static int
1093 dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds, uint64_t uncompressed,
1094     uint64_t compressed, boolean_t stream_compressed, uint64_t *sizep)
1095 {
1096         int err;
1097         uint64_t size;
1098         /*
1099          * Assume that space (both on-disk and in-stream) is dominated by
1100          * data.  We will adjust for indirect blocks and the copies property,
1101          * but ignore per-object space used (eg, dnodes and DRR_OBJECT records).
1102          */
1103         uint64_t recordsize;
1104         uint64_t record_count;
1105         objset_t *os;
1106         VERIFY0(dmu_objset_from_ds(ds, &os));
1107 
1108         /* Assume all (uncompressed) blocks are recordsize. */
1109         if (os->os_phys->os_type == DMU_OST_ZVOL) {
1110                 err = dsl_prop_get_int_ds(ds,
1111                     zfs_prop_to_name(ZFS_PROP_VOLBLOCKSIZE), &recordsize);
1112         } else {
1113                 err = dsl_prop_get_int_ds(ds,
1114                     zfs_prop_to_name(ZFS_PROP_RECORDSIZE), &recordsize);
1115         }
1116         if (err != 0)
1117                 return (err);
1118         record_count = uncompressed / recordsize;
1119 
1120         /*
1121          * If we're estimating a send size for a compressed stream, use the
1122          * compressed data size to estimate the stream size. Otherwise, use the
1123          * uncompressed data size.
1124          */
1125         size = stream_compressed ? compressed : uncompressed;
1126 
1127         /*
1128          * Subtract out approximate space used by indirect blocks.
1129          * Assume most space is used by data blocks (non-indirect, non-dnode).
1130          * Assume no ditto blocks or internal fragmentation.
1131          *
1132          * Therefore, space used by indirect blocks is sizeof(blkptr_t) per
1133          * block.
1134          */
1135         size -= record_count * sizeof (blkptr_t);
1136 
1137         /* Add in the space for the record associated with each block. */
1138         size += record_count * sizeof (dmu_replay_record_t);
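        /*
         * Worked example (illustrative numbers): 1 GiB of changed data with a
         * 128K recordsize gives record_count = 8192, so roughly 8192 *
         * sizeof (blkptr_t) = 1 MiB is subtracted for indirect blocks and
         * 8192 * sizeof (dmu_replay_record_t) is added for the per-block
         * DRR_WRITE headers.
         */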
1139 
1140         *sizep = size;
1141 
1142         return (0);
1143 }
1144 
1145 int
1146 dmu_send_estimate(dsl_dataset_t *ds, dsl_dataset_t *fromds,
1147     boolean_t stream_compressed, uint64_t *sizep)
1148 {
1149         dsl_pool_t *dp = ds->ds_dir->dd_pool;
1150         int err;
1151         uint64_t uncomp, comp;
1152 
1153         ASSERT(dsl_pool_config_held(dp));
1154 
1155         /* tosnap must be a snapshot */
1156         if (!ds->ds_is_snapshot)
1157                 return (SET_ERROR(EINVAL));
1158 
1159         /* fromsnap, if provided, must be a snapshot */
1160         if (fromds != NULL && !fromds->ds_is_snapshot)
1161                 return (SET_ERROR(EINVAL));
1162 
1163         /*
1164          * fromsnap must be an earlier snapshot from the same fs as tosnap,
1165          * or the origin's fs.
1166          */
1167         if (fromds != NULL && !dsl_dataset_is_before(ds, fromds, 0))
1168                 return (SET_ERROR(EXDEV));
1169 
1170         /* Get compressed and uncompressed size estimates of changed data. */
1171         if (fromds == NULL) {
1172                 uncomp = dsl_dataset_phys(ds)->ds_uncompressed_bytes;
1173                 comp = dsl_dataset_phys(ds)->ds_compressed_bytes;
1174         } else {
1175                 uint64_t used;
1176                 err = dsl_dataset_space_written(fromds, ds,
1177                     &used, &comp, &uncomp);
1178                 if (err != 0)
1179                         return (err);
1180         }
1181 
1182         err = dmu_adjust_send_estimate_for_indirects(ds, uncomp, comp,
1183             stream_compressed, sizep);
1184         /*
1185          * Add the size of the BEGIN and END records to the estimate.
1186          */
1187         *sizep += 2 * sizeof (dmu_replay_record_t);
1188         return (err);
1189 }
1190 
1191 struct calculate_send_arg {
1192         uint64_t uncompressed;
1193         uint64_t compressed;
1194 };
1195 
1196 /*
1197  * Simple callback used to traverse the blocks of a snapshot and sum their
1198  * uncompressed and compressed sizes.
1199  */
1200 /* ARGSUSED */
1201 static int
1202 dmu_calculate_send_traversal(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
1203     const zbookmark_phys_t *zb, const dnode_phys_t *dnp, void *arg)
1204 {
1205         struct calculate_send_arg *space = arg;
1206         if (bp != NULL && !BP_IS_HOLE(bp)) {
1207                 space->uncompressed += BP_GET_UCSIZE(bp);
1208                 space->compressed += BP_GET_PSIZE(bp);
1209         }
1210         return (0);
1211 }
1212 
1213 /*
1214  * Given a destination snapshot and a TXG, calculate the approximate size of a
1215  * send stream sent from that TXG. from_txg may be zero, indicating that the
1216  * whole snapshot will be sent.
1217  */
1218 int
1219 dmu_send_estimate_from_txg(dsl_dataset_t *ds, uint64_t from_txg,
1220     boolean_t stream_compressed, uint64_t *sizep)
1221 {
1222         dsl_pool_t *dp = ds->ds_dir->dd_pool;
1223         int err;
1224         struct calculate_send_arg size = { 0 };
1225 
1226         ASSERT(dsl_pool_config_held(dp));
1227 
1228         /* tosnap must be a snapshot */
1229         if (!ds->ds_is_snapshot)
1230                 return (SET_ERROR(EINVAL));
1231 
1232         /* verify that from_txg is before the provided snapshot was taken */
1233         if (from_txg >= dsl_dataset_phys(ds)->ds_creation_txg) {
1234                 return (SET_ERROR(EXDEV));
1235         }
1236 
1237         /*
1238          * traverse the blocks of the snapshot with birth times after
1239  * from_txg, summing their uncompressed and compressed sizes
1240          */
1241         err = traverse_dataset(ds, from_txg, TRAVERSE_POST,
1242             dmu_calculate_send_traversal, &size);
1243         if (err)
1244                 return (err);
1245 
1246         err = dmu_adjust_send_estimate_for_indirects(ds, size.uncompressed,
1247             size.compressed, stream_compressed, sizep);
1248         return (err);
1249 }
1250 
1251 typedef struct dmu_recv_begin_arg {
1252         const char *drba_origin;
1253         dmu_recv_cookie_t *drba_cookie;
1254         cred_t *drba_cred;
1255         uint64_t drba_snapobj;
1256 } dmu_recv_begin_arg_t;
1257 
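/*
 * Check that a stream can be received into an existing filesystem: the
 * temporary %recv clone and the target snapshot name must not already exist,
 * the snapshot limit must allow one more snapshot, and for an incremental
 * stream a snapshot whose guid matches drr_fromguid must exist in this
 * dataset (and, unless the receive is forced, the dataset must be unmodified
 * since that snapshot).
 */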
1258 static int
1259 recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds,
1260     uint64_t fromguid)
1261 {
1262         uint64_t val;
1263         int error;
1264         dsl_pool_t *dp = ds->ds_dir->dd_pool;
1265 
1266         /* temporary clone name must not exist */
1267         error = zap_lookup(dp->dp_meta_objset,
1268             dsl_dir_phys(ds->ds_dir)->dd_child_dir_zapobj, recv_clone_name,
1269             8, 1, &val);
1270         if (error != ENOENT)
1271                 return (error == 0 ? EBUSY : error);
1272 
1273         /* new snapshot name must not exist */
1274         error = zap_lookup(dp->dp_meta_objset,
1275             dsl_dataset_phys(ds)->ds_snapnames_zapobj,
1276             drba->drba_cookie->drc_tosnap, 8, 1, &val);
1277         if (error != ENOENT)
1278                 return (error == 0 ? EEXIST : error);
1279 
1280         /*
1281          * Check snapshot limit before receiving. We'll recheck again at the
1282          * end, but might as well abort before receiving if we're already over
1283          * the limit.
1284          *
1285          * Note that we do not check the file system limit with
1286          * dsl_dir_fscount_check because the temporary %clones don't count
1287          * against that limit.
1288          */
1289         error = dsl_fs_ss_limit_check(ds->ds_dir, 1, ZFS_PROP_SNAPSHOT_LIMIT,
1290             NULL, drba->drba_cred);
1291         if (error != 0)
1292                 return (error);
1293 
1294         if (fromguid != 0) {
1295                 dsl_dataset_t *snap;
1296                 uint64_t obj = dsl_dataset_phys(ds)->ds_prev_snap_obj;
1297 
1298                 /* Find snapshot in this dir that matches fromguid. */
1299                 while (obj != 0) {
1300                         error = dsl_dataset_hold_obj(dp, obj, FTAG,
1301                             &snap);
1302                         if (error != 0)
1303                                 return (SET_ERROR(ENODEV));
1304                         if (snap->ds_dir != ds->ds_dir) {
1305                                 dsl_dataset_rele(snap, FTAG);
1306                                 return (SET_ERROR(ENODEV));
1307                         }
1308                         if (dsl_dataset_phys(snap)->ds_guid == fromguid)
1309                                 break;
1310                         obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
1311                         dsl_dataset_rele(snap, FTAG);
1312                 }
1313                 if (obj == 0)
1314                         return (SET_ERROR(ENODEV));
1315 
1316                 if (drba->drba_cookie->drc_force) {
1317                         drba->drba_snapobj = obj;
1318                 } else {
1319                         /*
1320                          * If we are not forcing, there must be no
1321                          * changes since fromsnap.
1322                          */
1323                         if (dsl_dataset_modified_since_snap(ds, snap)) {
1324                                 dsl_dataset_rele(snap, FTAG);
1325                                 return (SET_ERROR(ETXTBSY));
1326                         }
1327                         drba->drba_snapobj = ds->ds_prev->ds_object;
1328                 }
1329 
1330                 dsl_dataset_rele(snap, FTAG);
1331         } else {
1332                 /* if full, then must be forced */
1333                 if (!drba->drba_cookie->drc_force)
1334                         return (SET_ERROR(EEXIST));
1335                 /* start from $ORIGIN@$ORIGIN, if supported */
1336                 drba->drba_snapobj = dp->dp_origin_snap != NULL ?
1337                     dp->dp_origin_snap->ds_object : 0;
1338         }
1339 
        return (0);
}
1343 
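/*
 * Begin-check for a new (non-resuming) receive.  Validate the stream
 * header and verify that the pool supports any features the stream
 * requires (SA spill blocks, embedded data, LZ4, large blocks), then
 * check either the existing target filesystem or, for a full or clone
 * receive, its parent and the optional origin snapshot.
 */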
1344 static int
1345 dmu_recv_begin_check(void *arg, dmu_tx_t *tx)
1346 {
1347         dmu_recv_begin_arg_t *drba = arg;
1348         dsl_pool_t *dp = dmu_tx_pool(tx);
1349         struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
1350         uint64_t fromguid = drrb->drr_fromguid;
1351         int flags = drrb->drr_flags;
1352         int error;
1353         uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
1354         dsl_dataset_t *ds;
1355         const char *tofs = drba->drba_cookie->drc_tofs;
1356 
1357         /* already checked */
1358         ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
1359         ASSERT(!(featureflags & DMU_BACKUP_FEATURE_RESUMING));
1360 
1361         if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
1362             DMU_COMPOUNDSTREAM ||
1363             drrb->drr_type >= DMU_OST_NUMTYPES ||
1364             ((flags & DRR_FLAG_CLONE) && drba->drba_origin == NULL))
1365                 return (SET_ERROR(EINVAL));
1366 
1367         /* Verify pool version supports SA if SA_SPILL feature set */
1368         if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
1369             spa_version(dp->dp_spa) < SPA_VERSION_SA)
1370                 return (SET_ERROR(ENOTSUP));
1371 
1372         if (drba->drba_cookie->drc_resumable &&
1373             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EXTENSIBLE_DATASET))
1374                 return (SET_ERROR(ENOTSUP));
1375 
1376         /*
1377          * The receiving code doesn't know how to translate a WRITE_EMBEDDED
1378          * record to a plain WRITE record, so the pool must have the
1379          * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
1380          * records.  Same with WRITE_EMBEDDED records that use LZ4 compression.
1381          */
1382         if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
1383             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
1384                 return (SET_ERROR(ENOTSUP));
1385         if ((featureflags & DMU_BACKUP_FEATURE_LZ4) &&
1386             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
1387                 return (SET_ERROR(ENOTSUP));
1388 
1389         /*
1390          * The receiving code doesn't know how to translate large blocks
1391          * to smaller ones, so the pool must have the LARGE_BLOCKS
1392          * feature enabled if the stream has LARGE_BLOCKS.
1393          */
1394         if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
1395             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_BLOCKS))
1396                 return (SET_ERROR(ENOTSUP));
1397 
1398         error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
1399         if (error == 0) {
1400                 /* target fs already exists; recv into temp clone */
1401 
1402                 /* Can't recv a clone into an existing fs */
1403                 if (flags & DRR_FLAG_CLONE || drba->drba_origin) {
1404                         dsl_dataset_rele(ds, FTAG);
1405                         return (SET_ERROR(EINVAL));
1406                 }
1407 
1408                 error = recv_begin_check_existing_impl(drba, ds, fromguid);
1409                 dsl_dataset_rele(ds, FTAG);
1410         } else if (error == ENOENT) {
1411                 /* target fs does not exist; must be a full backup or clone */
1412                 char buf[ZFS_MAX_DATASET_NAME_LEN];
1413 
1414                 /*
1415                  * If it's a non-clone incremental, we are missing the
1416                  * target fs, so fail the recv.
1417                  */
1418                 if (fromguid != 0 && !(flags & DRR_FLAG_CLONE ||
1419                     drba->drba_origin))
1420                         return (SET_ERROR(ENOENT));
1421 
1422                 /*
1423                  * If we're receiving a full send as a clone, and it doesn't
1424                  * contain all the necessary free records and freeobject
1425                  * records, reject it.
1426                  */
1427                 if (fromguid == 0 && drba->drba_origin &&
1428                     !(flags & DRR_FLAG_FREERECORDS))
1429                         return (SET_ERROR(EINVAL));
1430 
1431                 /* Open the parent of tofs */
1432                 ASSERT3U(strlen(tofs), <, sizeof (buf));
1433                 (void) strlcpy(buf, tofs, strrchr(tofs, '/') - tofs + 1);
1434                 error = dsl_dataset_hold(dp, buf, FTAG, &ds);
1435                 if (error != 0)
1436                         return (error);
1437 
1438                 /*
1439                  * Check filesystem and snapshot limits before receiving. We'll
1440                  * recheck snapshot limits again at the end (we create the
1441                  * filesystems and increment those counts during begin_sync).
1442                  */
1443                 error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
1444                     ZFS_PROP_FILESYSTEM_LIMIT, NULL, drba->drba_cred);
1445                 if (error != 0) {
1446                         dsl_dataset_rele(ds, FTAG);
1447                         return (error);
1448                 }
1449 
1450                 error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
1451                     ZFS_PROP_SNAPSHOT_LIMIT, NULL, drba->drba_cred);
1452                 if (error != 0) {
1453                         dsl_dataset_rele(ds, FTAG);
1454                         return (error);
1455                 }
1456 
1457                 if (drba->drba_origin != NULL) {
1458                         dsl_dataset_t *origin;
1459                         error = dsl_dataset_hold(dp, drba->drba_origin,
1460                             FTAG, &origin);
1461                         if (error != 0) {
1462                                 dsl_dataset_rele(ds, FTAG);
1463                                 return (error);
1464                         }
1465                         if (!origin->ds_is_snapshot) {
1466                                 dsl_dataset_rele(origin, FTAG);
1467                                 dsl_dataset_rele(ds, FTAG);
1468                                 return (SET_ERROR(EINVAL));
1469                         }
1470                         if (dsl_dataset_phys(origin)->ds_guid != fromguid &&
1471                             fromguid != 0) {
1472                                 dsl_dataset_rele(origin, FTAG);
1473                                 dsl_dataset_rele(ds, FTAG);
1474                                 return (SET_ERROR(ENODEV));
1475                         }
1476                         dsl_dataset_rele(origin, FTAG);
1477                 }
1478                 dsl_dataset_rele(ds, FTAG);
1479                 error = 0;
1480         }
1481         return (error);
1482 }
1483 
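/*
 * Begin-sync for a new receive.  If the target filesystem exists, create
 * the temporary %recv clone to receive into; otherwise create the new
 * filesystem (as a clone of the origin, if one was given).  For resumable
 * receives, record the initial resume state in the new dataset's ZAP.
 * The dataset is owned, marked inconsistent for the duration of the
 * receive, and, if it is not a clone, gets a new objset.
 */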
1484 static void
1485 dmu_recv_begin_sync(void *arg, dmu_tx_t *tx)
1486 {
1487         dmu_recv_begin_arg_t *drba = arg;
1488         dsl_pool_t *dp = dmu_tx_pool(tx);
1489         objset_t *mos = dp->dp_meta_objset;
1490         struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
1491         const char *tofs = drba->drba_cookie->drc_tofs;
1492         dsl_dataset_t *ds, *newds;
1493         uint64_t dsobj;
1494         int error;
1495         uint64_t crflags = 0;
1496 
1497         if (drrb->drr_flags & DRR_FLAG_CI_DATA)
1498                 crflags |= DS_FLAG_CI_DATASET;
1499 
1500         error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
1501         if (error == 0) {
1502                 /* create temporary clone */
1503                 dsl_dataset_t *snap = NULL;
1504                 if (drba->drba_snapobj != 0) {
1505                         VERIFY0(dsl_dataset_hold_obj(dp,
1506                             drba->drba_snapobj, FTAG, &snap));
1507                 }
1508                 dsobj = dsl_dataset_create_sync(ds->ds_dir, recv_clone_name,
1509                     snap, crflags, drba->drba_cred, tx);
1510                 if (drba->drba_snapobj != 0)
1511                         dsl_dataset_rele(snap, FTAG);
1512                 dsl_dataset_rele(ds, FTAG);
1513         } else {
1514                 dsl_dir_t *dd;
1515                 const char *tail;
1516                 dsl_dataset_t *origin = NULL;
1517 
1518                 VERIFY0(dsl_dir_hold(dp, tofs, FTAG, &dd, &tail));
1519 
1520                 if (drba->drba_origin != NULL) {
1521                         VERIFY0(dsl_dataset_hold(dp, drba->drba_origin,
1522                             FTAG, &origin));
1523                 }
1524 
1525                 /* Create new dataset. */
1526                 dsobj = dsl_dataset_create_sync(dd,
1527                     strrchr(tofs, '/') + 1,
1528                     origin, crflags, drba->drba_cred, tx);
1529                 if (origin != NULL)
1530                         dsl_dataset_rele(origin, FTAG);
1531                 dsl_dir_rele(dd, FTAG);
1532                 drba->drba_cookie->drc_newfs = B_TRUE;
1533         }
1534         VERIFY0(dsl_dataset_own_obj(dp, dsobj, dmu_recv_tag, &newds));
1535 
1536         if (drba->drba_cookie->drc_resumable) {
1537                 dsl_dataset_zapify(newds, tx);
1538                 if (drrb->drr_fromguid != 0) {
1539                         VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_FROMGUID,
1540                             8, 1, &drrb->drr_fromguid, tx));
1541                 }
1542                 VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TOGUID,
1543                     8, 1, &drrb->drr_toguid, tx));
1544                 VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TONAME,
1545                     1, strlen(drrb->drr_toname) + 1, drrb->drr_toname, tx));
1546                 uint64_t one = 1;
1547                 uint64_t zero = 0;
1548                 VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OBJECT,
1549                     8, 1, &one, tx));
1550                 VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OFFSET,
1551                     8, 1, &zero, tx));
1552                 VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_BYTES,
1553                     8, 1, &zero, tx));
1554                 if (DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo) &
1555                     DMU_BACKUP_FEATURE_LARGE_BLOCKS) {
1556                         VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_LARGEBLOCK,
1557                             8, 1, &one, tx));
1558                 }
1559                 if (DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo) &
1560                     DMU_BACKUP_FEATURE_EMBED_DATA) {
1561                         VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_EMBEDOK,
1562                             8, 1, &one, tx));
1563                 }
1564                 if (DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo) &
1565                     DMU_BACKUP_FEATURE_COMPRESSED) {
1566                         VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_COMPRESSOK,
1567                             8, 1, &one, tx));
1568                 }
1569         }
1570 
1571         dmu_buf_will_dirty(newds->ds_dbuf, tx);
1572         dsl_dataset_phys(newds)->ds_flags |= DS_FLAG_INCONSISTENT;
1573 
1574         /*
1575          * If we actually created a non-clone, we need to create the
1576          * objset in our new dataset.
1577          */
1578         rrw_enter(&newds->ds_bp_rwlock, RW_READER, FTAG);
1579         if (BP_IS_HOLE(dsl_dataset_get_blkptr(newds))) {
1580                 (void) dmu_objset_create_impl(dp->dp_spa,
1581                     newds, dsl_dataset_get_blkptr(newds), drrb->drr_type, tx);
1582         }
1583         rrw_exit(&newds->ds_bp_rwlock, FTAG);
1584 
1585         drba->drba_cookie->drc_ds = newds;
1586 
1587         spa_history_log_internal_ds(newds, "receive", tx, "");
1588 }
1589 
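/*
 * Begin-check for resuming an interrupted receive.  The partially
 * received dataset (tofs/%recv, or tofs itself) must exist, be marked
 * inconsistent, be zapified with resume state whose toguid and fromguid
 * match the stream, be unowned, and have no snapshots of its own.
 */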
1590 static int
1591 dmu_recv_resume_begin_check(void *arg, dmu_tx_t *tx)
1592 {
1593         dmu_recv_begin_arg_t *drba = arg;
1594         dsl_pool_t *dp = dmu_tx_pool(tx);
1595         struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
1596         int error;
1597         uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
1598         dsl_dataset_t *ds;
1599         const char *tofs = drba->drba_cookie->drc_tofs;
1600 
1601         /* already checked */
1602         ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
1603         ASSERT(featureflags & DMU_BACKUP_FEATURE_RESUMING);
1604 
1605         if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
1606             DMU_COMPOUNDSTREAM ||
1607             drrb->drr_type >= DMU_OST_NUMTYPES)
1608                 return (SET_ERROR(EINVAL));
1609 
1610         /* Verify pool version supports SA if SA_SPILL feature set */
1611         if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
1612             spa_version(dp->dp_spa) < SPA_VERSION_SA)
1613                 return (SET_ERROR(ENOTSUP));
1614 
1615         /*
1616          * The receiving code doesn't know how to translate a WRITE_EMBEDDED
1617          * record to a plain WRITE record, so the pool must have the
1618          * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
1619          * records.  Same with WRITE_EMBEDDED records that use LZ4 compression.
1620          */
1621         if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
1622             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
1623                 return (SET_ERROR(ENOTSUP));
1624         if ((featureflags & DMU_BACKUP_FEATURE_LZ4) &&
1625             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
1626                 return (SET_ERROR(ENOTSUP));
1627 
1628         /* 6 extra bytes for /%recv */
1629         char recvname[ZFS_MAX_DATASET_NAME_LEN + 6];
1630 
1631         (void) snprintf(recvname, sizeof (recvname), "%s/%s",
1632             tofs, recv_clone_name);
1633 
1634         if (dsl_dataset_hold(dp, recvname, FTAG, &ds) != 0) {
1635                 /* %recv does not exist; continue in tofs */
1636                 error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
1637                 if (error != 0)
1638                         return (error);
1639         }
1640 
1641         /* check that ds is marked inconsistent */
1642         if (!DS_IS_INCONSISTENT(ds)) {
1643                 dsl_dataset_rele(ds, FTAG);
1644                 return (SET_ERROR(EINVAL));
1645         }
1646 
1647         /* check that there is resuming data, and that the toguid matches */
1648         if (!dsl_dataset_is_zapified(ds)) {
1649                 dsl_dataset_rele(ds, FTAG);
1650                 return (SET_ERROR(EINVAL));
1651         }
1652         uint64_t val;
1653         error = zap_lookup(dp->dp_meta_objset, ds->ds_object,
1654             DS_FIELD_RESUME_TOGUID, sizeof (val), 1, &val);
1655         if (error != 0 || drrb->drr_toguid != val) {
1656                 dsl_dataset_rele(ds, FTAG);
1657                 return (SET_ERROR(EINVAL));
1658         }
1659 
1660         /*
1661          * Check if the receive is still running.  If so, it will be owned.
1662          * Note that nothing else can own the dataset (e.g. after the receive
1663          * fails) because it will be marked inconsistent.
1664          */
1665         if (dsl_dataset_has_owner(ds)) {
1666                 dsl_dataset_rele(ds, FTAG);
1667                 return (SET_ERROR(EBUSY));
1668         }
1669 
1670         /* There should not be any snapshots of this fs yet. */
1671         if (ds->ds_prev != NULL && ds->ds_prev->ds_dir == ds->ds_dir) {
1672                 dsl_dataset_rele(ds, FTAG);
1673                 return (SET_ERROR(EINVAL));
1674         }
1675 
1676         /*
1677          * Note: resume point will be checked when we process the first WRITE
1678          * record.
1679          */
1680 
1681         /* check that the origin matches */
1682         val = 0;
1683         (void) zap_lookup(dp->dp_meta_objset, ds->ds_object,
1684             DS_FIELD_RESUME_FROMGUID, sizeof (val), 1, &val);
1685         if (drrb->drr_fromguid != val) {
1686                 dsl_dataset_rele(ds, FTAG);
1687                 return (SET_ERROR(EINVAL));
1688         }
1689 
1690         dsl_dataset_rele(ds, FTAG);
1691         return (0);
1692 }
1693 
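/*
 * Begin-sync for resuming a receive.  Clear the inconsistent flag just
 * long enough to own the partially received dataset, then set it again
 * for the duration of the receive.
 */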
1694 static void
1695 dmu_recv_resume_begin_sync(void *arg, dmu_tx_t *tx)
1696 {
1697         dmu_recv_begin_arg_t *drba = arg;
1698         dsl_pool_t *dp = dmu_tx_pool(tx);
1699         const char *tofs = drba->drba_cookie->drc_tofs;
1700         dsl_dataset_t *ds;
1701         uint64_t dsobj;
1702         /* 6 extra bytes for /%recv */
1703         char recvname[ZFS_MAX_DATASET_NAME_LEN + 6];
1704 
1705         (void) snprintf(recvname, sizeof (recvname), "%s/%s",
1706             tofs, recv_clone_name);
1707 
1708         if (dsl_dataset_hold(dp, recvname, FTAG, &ds) != 0) {
1709                 /* %recv does not exist; continue in tofs */
1710                 VERIFY0(dsl_dataset_hold(dp, tofs, FTAG, &ds));
1711                 drba->drba_cookie->drc_newfs = B_TRUE;
1712         }
1713 
1714         /* clear the inconsistent flag so that we can own it */
1715         ASSERT(DS_IS_INCONSISTENT(ds));
1716         dmu_buf_will_dirty(ds->ds_dbuf, tx);
1717         dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
1718         dsobj = ds->ds_object;
1719         dsl_dataset_rele(ds, FTAG);
1720 
1721         VERIFY0(dsl_dataset_own_obj(dp, dsobj, dmu_recv_tag, &ds));
1722 
1723         dmu_buf_will_dirty(ds->ds_dbuf, tx);
1724         dsl_dataset_phys(ds)->ds_flags |= DS_FLAG_INCONSISTENT;
1725 
1726         rrw_enter(&ds->ds_bp_rwlock, RW_READER, FTAG);
1727         ASSERT(!BP_IS_HOLE(dsl_dataset_get_blkptr(ds)));
1728         rrw_exit(&ds->ds_bp_rwlock, FTAG);
1729 
1730         drba->drba_cookie->drc_ds = ds;
1731 
1732         spa_history_log_internal_ds(ds, "resume receive", tx, "");
1733 }
1734 
1735 /*
1736  * NB: callers *MUST* call dmu_recv_stream() if dmu_recv_begin()
1737  * succeeds; otherwise we will leak the holds on the datasets.
1738  */
1739 int
1740 dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
1741     boolean_t force, boolean_t resumable, char *origin, dmu_recv_cookie_t *drc)
1742 {
1743         dmu_recv_begin_arg_t drba = { 0 };
1744 
1745         bzero(drc, sizeof (dmu_recv_cookie_t));
1746         drc->drc_drr_begin = drr_begin;
1747         drc->drc_drrb = &drr_begin->drr_u.drr_begin;
1748         drc->drc_tosnap = tosnap;
1749         drc->drc_tofs = tofs;
1750         drc->drc_force = force;
1751         drc->drc_resumable = resumable;
1752         drc->drc_cred = CRED();
1753 
1754         if (drc->drc_drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC)) {
1755                 drc->drc_byteswap = B_TRUE;
1756                 (void) fletcher_4_incremental_byteswap(drr_begin,
1757                     sizeof (dmu_replay_record_t), &drc->drc_cksum);
1758                 byteswap_record(drr_begin);
1759         } else if (drc->drc_drrb->drr_magic == DMU_BACKUP_MAGIC) {
1760                 (void) fletcher_4_incremental_native(drr_begin,
1761                     sizeof (dmu_replay_record_t), &drc->drc_cksum);
1762         } else {
1763                 return (SET_ERROR(EINVAL));
1764         }
1765 
1766         drba.drba_origin = origin;
1767         drba.drba_cookie = drc;
1768         drba.drba_cred = CRED();
1769 
1770         if (DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo) &
1771             DMU_BACKUP_FEATURE_RESUMING) {
1772                 return (dsl_sync_task(tofs,
1773                     dmu_recv_resume_begin_check, dmu_recv_resume_begin_sync,
1774                     &drba, 5, ZFS_SPACE_CHECK_NORMAL));
        } else {
1776                 return (dsl_sync_task(tofs,
1777                     dmu_recv_begin_check, dmu_recv_begin_sync,
1778                     &drba, 5, ZFS_SPACE_CHECK_NORMAL));
1779         }
1780 }
1781 
1782 struct receive_record_arg {
1783         dmu_replay_record_t header;
1784         void *payload; /* Pointer to a buffer containing the payload */
1785         /*
1786          * If the record is a write, pointer to the arc_buf_t containing the
1787          * payload.
1788          */
1789         arc_buf_t *write_buf;
1790         int payload_size;
1791         uint64_t bytes_read; /* bytes read from stream when record created */
1792         boolean_t eos_marker; /* Marks the end of the stream */
1793         bqueue_node_t node;
1794 };
1795 
1796 struct receive_writer_arg {
1797         objset_t *os;
1798         boolean_t byteswap;
1799         bqueue_t q;
1800 
1801         /*
1802          * These three args are used to signal to the main thread that we're
1803          * done.
1804          */
1805         kmutex_t mutex;
1806         kcondvar_t cv;
1807         boolean_t done;
1808 
1809         int err;
1810         /* A map from guid to dataset to help handle dedup'd streams. */
1811         avl_tree_t *guid_to_ds_map;
1812         boolean_t resumable;
1813         uint64_t last_object, last_offset;
1814         uint64_t bytes_read; /* bytes read when current record created */
1815 };
1816 
1817 struct objlist {
1818         list_t list; /* List of struct receive_objnode. */
1819         /*
1820          * Last object looked up. Used to assert that objects are being looked
1821          * up in ascending order.
1822          */
1823         uint64_t last_lookup;
1824 };
1825 
1826 struct receive_objnode {
1827         list_node_t node;
1828         uint64_t object;
1829 };
1830 
1831 struct receive_arg {
1832         objset_t *os;
1833         vnode_t *vp; /* The vnode to read the stream from */
1834         uint64_t voff; /* The current offset in the stream */
1835         uint64_t bytes_read;
1836         /*
1837          * A record that has had its payload read in, but hasn't yet been handed
1838          * off to the worker thread.
1839          */
1840         struct receive_record_arg *rrd;
1841         /* A record that has had its header read in, but not its payload. */
1842         struct receive_record_arg *next_rrd;
1843         zio_cksum_t cksum;
1844         zio_cksum_t prev_cksum;
1845         int err;
1846         boolean_t byteswap;
1847         /* Sorted list of objects not to issue prefetches for. */
1848         struct objlist ignore_objlist;
1849 };
1850 
1851 typedef struct guid_map_entry {
1852         uint64_t        guid;
1853         dsl_dataset_t   *gme_ds;
1854         avl_node_t      avlnode;
1855 } guid_map_entry_t;
1856 
1857 static int
1858 guid_compare(const void *arg1, const void *arg2)
1859 {
1860         const guid_map_entry_t *gmep1 = arg1;
1861         const guid_map_entry_t *gmep2 = arg2;
1862 
1863         if (gmep1->guid < gmep2->guid)
1864                 return (-1);
1865         else if (gmep1->guid > gmep2->guid)
1866                 return (1);
1867         return (0);
1868 }
1869 
1870 static void
1871 free_guid_map_onexit(void *arg)
1872 {
1873         avl_tree_t *ca = arg;
1874         void *cookie = NULL;
1875         guid_map_entry_t *gmep;
1876 
1877         while ((gmep = avl_destroy_nodes(ca, &cookie)) != NULL) {
1878                 dsl_dataset_long_rele(gmep->gme_ds, gmep);
1879                 dsl_dataset_rele(gmep->gme_ds, gmep);
1880                 kmem_free(gmep, sizeof (guid_map_entry_t));
1881         }
1882         avl_destroy(ca);
1883         kmem_free(ca, sizeof (avl_tree_t));
1884 }
1885 
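/*
 * Read len bytes from the stream's vnode into buf, advancing ra->voff and
 * ra->bytes_read.  A short read is reported as ECKSUM, which indicates
 * that the receive was interrupted and can potentially be resumed.
 */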
1886 static int
1887 receive_read(struct receive_arg *ra, int len, void *buf)
1888 {
1889         int done = 0;
1890 
1891         /*
1892          * The code doesn't rely on this (lengths being multiples of 8).  See
1893          * comment in dump_bytes.
1894          */
1895         ASSERT0(len % 8);
1896 
1897         while (done < len) {
1898                 ssize_t resid;
1899 
1900                 ra->err = vn_rdwr(UIO_READ, ra->vp,
1901                     (char *)buf + done, len - done,
1902                     ra->voff, UIO_SYSSPACE, FAPPEND,
1903                     RLIM64_INFINITY, CRED(), &resid);
1904 
1905                 if (resid == len - done) {
1906                         /*
1907                          * Note: ECKSUM indicates that the receive
1908                          * was interrupted and can potentially be resumed.
1909                          */
1910                         ra->err = SET_ERROR(ECKSUM);
1911                 }
1912                 ra->voff += len - done - resid;
1913                 done = len - resid;
1914                 if (ra->err != 0)
1915                         return (ra->err);
1916         }
1917 
1918         ra->bytes_read += len;
1919 
1920         ASSERT3U(done, ==, len);
1921         return (0);
1922 }
1923 
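/*
 * Byteswap a replay record header in place.  Which fields are swapped
 * depends on the record type; the trailing checksum is swapped for every
 * record type other than DRR_BEGIN.
 */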
1924 static void
1925 byteswap_record(dmu_replay_record_t *drr)
1926 {
1927 #define DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X))
1928 #define DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X))
1929         drr->drr_type = BSWAP_32(drr->drr_type);
1930         drr->drr_payloadlen = BSWAP_32(drr->drr_payloadlen);
1931 
1932         switch (drr->drr_type) {
1933         case DRR_BEGIN:
1934                 DO64(drr_begin.drr_magic);
1935                 DO64(drr_begin.drr_versioninfo);
1936                 DO64(drr_begin.drr_creation_time);
1937                 DO32(drr_begin.drr_type);
1938                 DO32(drr_begin.drr_flags);
1939                 DO64(drr_begin.drr_toguid);
1940                 DO64(drr_begin.drr_fromguid);
1941                 break;
1942         case DRR_OBJECT:
1943                 DO64(drr_object.drr_object);
1944                 DO32(drr_object.drr_type);
1945                 DO32(drr_object.drr_bonustype);
1946                 DO32(drr_object.drr_blksz);
1947                 DO32(drr_object.drr_bonuslen);
1948                 DO64(drr_object.drr_toguid);
1949                 break;
1950         case DRR_FREEOBJECTS:
1951                 DO64(drr_freeobjects.drr_firstobj);
1952                 DO64(drr_freeobjects.drr_numobjs);
1953                 DO64(drr_freeobjects.drr_toguid);
1954                 break;
1955         case DRR_WRITE:
1956                 DO64(drr_write.drr_object);
1957                 DO32(drr_write.drr_type);
1958                 DO64(drr_write.drr_offset);
1959                 DO64(drr_write.drr_logical_size);
1960                 DO64(drr_write.drr_toguid);
1961                 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write.drr_key.ddk_cksum);
1962                 DO64(drr_write.drr_key.ddk_prop);
1963                 DO64(drr_write.drr_compressed_size);
1964                 break;
1965         case DRR_WRITE_BYREF:
1966                 DO64(drr_write_byref.drr_object);
1967                 DO64(drr_write_byref.drr_offset);
1968                 DO64(drr_write_byref.drr_length);
1969                 DO64(drr_write_byref.drr_toguid);
1970                 DO64(drr_write_byref.drr_refguid);
1971                 DO64(drr_write_byref.drr_refobject);
1972                 DO64(drr_write_byref.drr_refoffset);
1973                 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write_byref.
1974                     drr_key.ddk_cksum);
1975                 DO64(drr_write_byref.drr_key.ddk_prop);
1976                 break;
1977         case DRR_WRITE_EMBEDDED:
1978                 DO64(drr_write_embedded.drr_object);
1979                 DO64(drr_write_embedded.drr_offset);
1980                 DO64(drr_write_embedded.drr_length);
1981                 DO64(drr_write_embedded.drr_toguid);
1982                 DO32(drr_write_embedded.drr_lsize);
1983                 DO32(drr_write_embedded.drr_psize);
1984                 break;
1985         case DRR_FREE:
1986                 DO64(drr_free.drr_object);
1987                 DO64(drr_free.drr_offset);
1988                 DO64(drr_free.drr_length);
1989                 DO64(drr_free.drr_toguid);
1990                 break;
1991         case DRR_SPILL:
1992                 DO64(drr_spill.drr_object);
1993                 DO64(drr_spill.drr_length);
1994                 DO64(drr_spill.drr_toguid);
1995                 break;
1996         case DRR_END:
1997                 DO64(drr_end.drr_toguid);
1998                 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_end.drr_checksum);
1999                 break;
2000         }
2001 
2002         if (drr->drr_type != DRR_BEGIN) {
2003                 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_checksum.drr_checksum);
2004         }
2005 
2006 #undef DO64
2007 #undef DO32
2008 }
2009 
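/*
 * Compute the number of block pointers a dnode will have, given its bonus
 * type and bonus length.  A dnode whose bonus is of type DMU_OT_SA always
 * has a single block pointer.
 */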
2010 static inline uint8_t
2011 deduce_nblkptr(dmu_object_type_t bonus_type, uint64_t bonus_size)
2012 {
2013         if (bonus_type == DMU_OT_SA) {
2014                 return (1);
2015         } else {
2016                 return (1 +
2017                     ((DN_MAX_BONUSLEN - bonus_size) >> SPA_BLKPTRSHIFT));
2018         }
2019 }
2020 
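/*
 * Record the resume point (object, offset, and bytes read so far) in the
 * in-core dataset for this txg.  This is a no-op for non-resumable
 * receives.
 */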
2021 static void
2022 save_resume_state(struct receive_writer_arg *rwa,
2023     uint64_t object, uint64_t offset, dmu_tx_t *tx)
2024 {
2025         int txgoff = dmu_tx_get_txg(tx) & TXG_MASK;
2026 
2027         if (!rwa->resumable)
2028                 return;
2029 
2030         /*
2031          * We use ds_resume_bytes[] != 0 to indicate that we need to
2032          * update this on disk, so it must not be 0.
2033          */
2034         ASSERT(rwa->bytes_read != 0);
2035 
2036         /*
2037          * We only resume from write records, which have a valid
2038          * (non-meta-dnode) object number.
2039          */
2040         ASSERT(object != 0);
2041 
2042         /*
2043          * For resuming to work correctly, we must receive records in order,
2044          * sorted by object,offset.  This is checked by the callers, but
2045          * assert it here for good measure.
2046          */
2047         ASSERT3U(object, >=, rwa->os->os_dsl_dataset->ds_resume_object[txgoff]);
2048         ASSERT(object != rwa->os->os_dsl_dataset->ds_resume_object[txgoff] ||
2049             offset >= rwa->os->os_dsl_dataset->ds_resume_offset[txgoff]);
2050         ASSERT3U(rwa->bytes_read, >=,
2051             rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff]);
2052 
2053         rwa->os->os_dsl_dataset->ds_resume_object[txgoff] = object;
2054         rwa->os->os_dsl_dataset->ds_resume_offset[txgoff] = offset;
2055         rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff] = rwa->bytes_read;
2056 }
2057 
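/*
 * Process a DRR_OBJECT record: validate it, free the object's existing
 * contents if the block size or number of block pointers must change,
 * then claim the object if it is new (or reclaim it if its properties
 * changed) and copy in its bonus buffer.
 */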
2058 static int
2059 receive_object(struct receive_writer_arg *rwa, struct drr_object *drro,
2060     void *data)
2061 {
2062         dmu_object_info_t doi;
2063         dmu_tx_t *tx;
2064         uint64_t object;
2065         int err;
2066 
2067         if (drro->drr_type == DMU_OT_NONE ||
2068             !DMU_OT_IS_VALID(drro->drr_type) ||
2069             !DMU_OT_IS_VALID(drro->drr_bonustype) ||
2070             drro->drr_checksumtype >= ZIO_CHECKSUM_FUNCTIONS ||
2071             drro->drr_compress >= ZIO_COMPRESS_FUNCTIONS ||
2072             P2PHASE(drro->drr_blksz, SPA_MINBLOCKSIZE) ||
2073             drro->drr_blksz < SPA_MINBLOCKSIZE ||
2074             drro->drr_blksz > spa_maxblocksize(dmu_objset_spa(rwa->os)) ||
2075             drro->drr_bonuslen > DN_MAX_BONUSLEN) {
2076                 return (SET_ERROR(EINVAL));
2077         }
2078 
2079         err = dmu_object_info(rwa->os, drro->drr_object, &doi);
2080 
2081         if (err != 0 && err != ENOENT)
2082                 return (SET_ERROR(EINVAL));
2083         object = err == 0 ? drro->drr_object : DMU_NEW_OBJECT;
2084 
2085         /*
2086          * If we are losing blkptrs or changing the block size this must
2087          * be a new file instance.  We must clear out the previous file
2088          * contents before we can change this type of metadata in the dnode.
2089          */
2090         if (err == 0) {
2091                 int nblkptr;
2092 
2093                 nblkptr = deduce_nblkptr(drro->drr_bonustype,
2094                     drro->drr_bonuslen);
2095 
2096                 if (drro->drr_blksz != doi.doi_data_block_size ||
2097                     nblkptr < doi.doi_nblkptr) {
2098                         err = dmu_free_long_range(rwa->os, drro->drr_object,
2099                             0, DMU_OBJECT_END);
2100                         if (err != 0)
2101                                 return (SET_ERROR(EINVAL));
2102                 }
2103         }
2104 
2105         tx = dmu_tx_create(rwa->os);
2106         dmu_tx_hold_bonus(tx, object);
2107         err = dmu_tx_assign(tx, TXG_WAIT);
2108         if (err != 0) {
2109                 dmu_tx_abort(tx);
2110                 return (err);
2111         }
2112 
2113         if (object == DMU_NEW_OBJECT) {
2114                 /* currently free, want to be allocated */
2115                 err = dmu_object_claim(rwa->os, drro->drr_object,
2116                     drro->drr_type, drro->drr_blksz,
2117                     drro->drr_bonustype, drro->drr_bonuslen, tx);
2118         } else if (drro->drr_type != doi.doi_type ||
2119             drro->drr_blksz != doi.doi_data_block_size ||
2120             drro->drr_bonustype != doi.doi_bonus_type ||
2121             drro->drr_bonuslen != doi.doi_bonus_size) {
2122                 /* currently allocated, but with different properties */
2123                 err = dmu_object_reclaim(rwa->os, drro->drr_object,
2124                     drro->drr_type, drro->drr_blksz,
2125                     drro->drr_bonustype, drro->drr_bonuslen, tx);
2126         }
2127         if (err != 0) {
2128                 dmu_tx_commit(tx);
2129                 return (SET_ERROR(EINVAL));
2130         }
2131 
2132         dmu_object_set_checksum(rwa->os, drro->drr_object,
2133             drro->drr_checksumtype, tx);
2134         dmu_object_set_compress(rwa->os, drro->drr_object,
2135             drro->drr_compress, tx);
2136 
2137         if (data != NULL) {
2138                 dmu_buf_t *db;
2139 
2140                 VERIFY0(dmu_bonus_hold(rwa->os, drro->drr_object, FTAG, &db));
2141                 dmu_buf_will_dirty(db, tx);
2142 
2143                 ASSERT3U(db->db_size, >=, drro->drr_bonuslen);
2144                 bcopy(data, db->db_data, drro->drr_bonuslen);
2145                 if (rwa->byteswap) {
2146                         dmu_object_byteswap_t byteswap =
2147                             DMU_OT_BYTESWAP(drro->drr_bonustype);
2148                         dmu_ot_byteswap[byteswap].ob_func(db->db_data,
2149                             drro->drr_bonuslen);
2150                 }
2151                 dmu_buf_rele(db, FTAG);
2152         }
2153         dmu_tx_commit(tx);
2154 
2155         return (0);
2156 }
2157 
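/*
 * Process a DRR_FREEOBJECTS record: free every currently allocated object
 * in the range [drr_firstobj, drr_firstobj + drr_numobjs).
 */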
2158 /* ARGSUSED */
2159 static int
2160 receive_freeobjects(struct receive_writer_arg *rwa,
2161     struct drr_freeobjects *drrfo)
2162 {
2163         uint64_t obj;
2164         int next_err = 0;
2165 
2166         if (drrfo->drr_firstobj + drrfo->drr_numobjs < drrfo->drr_firstobj)
2167                 return (SET_ERROR(EINVAL));
2168 
2169         for (obj = drrfo->drr_firstobj;
2170             obj < drrfo->drr_firstobj + drrfo->drr_numobjs && next_err == 0;
2171             next_err = dmu_object_next(rwa->os, &obj, FALSE, 0)) {
2172                 int err;
2173 
2174                 if (dmu_object_info(rwa->os, obj, NULL) != 0)
2175                         continue;
2176 
2177                 err = dmu_free_long_object(rwa->os, obj);
2178                 if (err != 0)
2179                         return (err);
2180         }
2181         if (next_err != ESRCH)
2182                 return (next_err);
2183         return (0);
2184 }
2185 
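/*
 * Process a DRR_WRITE record by assigning the loaned ARC buffer to the
 * target object.  Write records must arrive in increasing (object, offset)
 * order so that an interrupted receive can be resumed.
 */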
2186 static int
2187 receive_write(struct receive_writer_arg *rwa, struct drr_write *drrw,
2188     arc_buf_t *abuf)
2189 {
2190         dmu_tx_t *tx;
2191         int err;
2192 
2193         if (drrw->drr_offset + drrw->drr_logical_size < drrw->drr_offset ||
2194             !DMU_OT_IS_VALID(drrw->drr_type))
2195                 return (SET_ERROR(EINVAL));
2196 
2197         /*
2198          * For resuming to work, records must be in increasing order
2199          * by (object, offset).
2200          */
2201         if (drrw->drr_object < rwa->last_object ||
2202             (drrw->drr_object == rwa->last_object &&
2203             drrw->drr_offset < rwa->last_offset)) {
2204                 return (SET_ERROR(EINVAL));
2205         }
2206         rwa->last_object = drrw->drr_object;
2207         rwa->last_offset = drrw->drr_offset;
2208 
2209         if (dmu_object_info(rwa->os, drrw->drr_object, NULL) != 0)
2210                 return (SET_ERROR(EINVAL));
2211 
2212         tx = dmu_tx_create(rwa->os);
2213 
2214         dmu_tx_hold_write(tx, drrw->drr_object,
2215             drrw->drr_offset, drrw->drr_logical_size);
2216         err = dmu_tx_assign(tx, TXG_WAIT);
2217         if (err != 0) {
2218                 dmu_tx_abort(tx);
2219                 return (err);
2220         }
2221         if (rwa->byteswap) {
2222                 dmu_object_byteswap_t byteswap =
2223                     DMU_OT_BYTESWAP(drrw->drr_type);
2224                 dmu_ot_byteswap[byteswap].ob_func(abuf->b_data,
2225                     DRR_WRITE_PAYLOAD_SIZE(drrw));
2226         }
2227 
2228         /* use the bonus buf to look up the dnode in dmu_assign_arcbuf */
2229         dmu_buf_t *bonus;
        if (dmu_bonus_hold(rwa->os, drrw->drr_object, FTAG, &bonus) != 0) {
                /* The tx has already been assigned, so it must be committed. */
                dmu_tx_commit(tx);
                return (SET_ERROR(EINVAL));
        }
2232         dmu_assign_arcbuf(bonus, drrw->drr_offset, abuf, tx);
2233 
2234         /*
2235          * Note: If the receive fails, we want the resume stream to start
2236          * with the same record that we last successfully received (as opposed
2237          * to the next record), so that we can verify that we are
2238          * resuming from the correct location.
2239          */
2240         save_resume_state(rwa, drrw->drr_object, drrw->drr_offset, tx);
2241         dmu_tx_commit(tx);
2242         dmu_buf_rele(bonus, FTAG);
2243 
2244         return (0);
2245 }
2246 
2247 /*
2248  * Handle a DRR_WRITE_BYREF record.  This record is used in dedup'ed
2249  * streams to refer to a copy of the data that is already on the
2250  * system because it came in earlier in the stream.  This function
2251  * finds the earlier copy of the data, and uses that copy instead of
2252  * data from the stream to fulfill this write.
2253  */
2254 static int
2255 receive_write_byref(struct receive_writer_arg *rwa,
2256     struct drr_write_byref *drrwbr)
2257 {
2258         dmu_tx_t *tx;
2259         int err;
2260         guid_map_entry_t gmesrch;
2261         guid_map_entry_t *gmep;
2262         avl_index_t where;
2263         objset_t *ref_os = NULL;
2264         dmu_buf_t *dbp;
2265 
2266         if (drrwbr->drr_offset + drrwbr->drr_length < drrwbr->drr_offset)
2267                 return (SET_ERROR(EINVAL));
2268 
2269         /*
2270          * If the GUID of the referenced dataset is different from the
2271          * GUID of the target dataset, find the referenced dataset.
2272          */
2273         if (drrwbr->drr_toguid != drrwbr->drr_refguid) {
2274                 gmesrch.guid = drrwbr->drr_refguid;
2275                 if ((gmep = avl_find(rwa->guid_to_ds_map, &gmesrch,
2276                     &where)) == NULL) {
2277                         return (SET_ERROR(EINVAL));
2278                 }
2279                 if (dmu_objset_from_ds(gmep->gme_ds, &ref_os))
2280                         return (SET_ERROR(EINVAL));
2281         } else {
2282                 ref_os = rwa->os;
2283         }
2284 
2285         err = dmu_buf_hold(ref_os, drrwbr->drr_refobject,
2286             drrwbr->drr_refoffset, FTAG, &dbp, DMU_READ_PREFETCH);
2287         if (err != 0)
2288                 return (err);
2289 
2290         tx = dmu_tx_create(rwa->os);
2291 
2292         dmu_tx_hold_write(tx, drrwbr->drr_object,
2293             drrwbr->drr_offset, drrwbr->drr_length);
2294         err = dmu_tx_assign(tx, TXG_WAIT);
2295         if (err != 0) {
2296                 dmu_tx_abort(tx);
2297                 return (err);
2298         }
2299         dmu_write(rwa->os, drrwbr->drr_object,
2300             drrwbr->drr_offset, drrwbr->drr_length, dbp->db_data, tx);
2301         dmu_buf_rele(dbp, FTAG);
2302 
        /* See comment in receive_write. */
2304         save_resume_state(rwa, drrwbr->drr_object, drrwbr->drr_offset, tx);
2305         dmu_tx_commit(tx);
2306         return (0);
2307 }
2308 
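/*
 * Process a DRR_WRITE_EMBEDDED record by writing the (possibly compressed)
 * payload directly into an embedded block pointer.
 */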
2309 static int
2310 receive_write_embedded(struct receive_writer_arg *rwa,
2311     struct drr_write_embedded *drrwe, void *data)
2312 {
2313         dmu_tx_t *tx;
2314         int err;
2315 
2316         if (drrwe->drr_offset + drrwe->drr_length < drrwe->drr_offset)
                return (SET_ERROR(EINVAL));
2318 
2319         if (drrwe->drr_psize > BPE_PAYLOAD_SIZE)
                return (SET_ERROR(EINVAL));
2321 
2322         if (drrwe->drr_etype >= NUM_BP_EMBEDDED_TYPES)
                return (SET_ERROR(EINVAL));
2324         if (drrwe->drr_compression >= ZIO_COMPRESS_FUNCTIONS)
                return (SET_ERROR(EINVAL));
2326 
2327         tx = dmu_tx_create(rwa->os);
2328 
2329         dmu_tx_hold_write(tx, drrwe->drr_object,
2330             drrwe->drr_offset, drrwe->drr_length);
2331         err = dmu_tx_assign(tx, TXG_WAIT);
2332         if (err != 0) {
2333                 dmu_tx_abort(tx);
2334                 return (err);
2335         }
2336 
2337         dmu_write_embedded(rwa->os, drrwe->drr_object,
2338             drrwe->drr_offset, data, drrwe->drr_etype,
2339             drrwe->drr_compression, drrwe->drr_lsize, drrwe->drr_psize,
2340             rwa->byteswap ^ ZFS_HOST_BYTEORDER, tx);
2341 
        /* See comment in receive_write. */
2343         save_resume_state(rwa, drrwe->drr_object, drrwe->drr_offset, tx);
2344         dmu_tx_commit(tx);
2345         return (0);
2346 }
2347 
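/*
 * Process a DRR_SPILL record by copying the payload into the object's
 * spill block, growing the spill block first if necessary.
 */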
2348 static int
2349 receive_spill(struct receive_writer_arg *rwa, struct drr_spill *drrs,
2350     void *data)
2351 {
2352         dmu_tx_t *tx;
2353         dmu_buf_t *db, *db_spill;
2354         int err;
2355 
2356         if (drrs->drr_length < SPA_MINBLOCKSIZE ||
2357             drrs->drr_length > spa_maxblocksize(dmu_objset_spa(rwa->os)))
2358                 return (SET_ERROR(EINVAL));
2359 
2360         if (dmu_object_info(rwa->os, drrs->drr_object, NULL) != 0)
2361                 return (SET_ERROR(EINVAL));
2362 
2363         VERIFY0(dmu_bonus_hold(rwa->os, drrs->drr_object, FTAG, &db));
2364         if ((err = dmu_spill_hold_by_bonus(db, FTAG, &db_spill)) != 0) {
2365                 dmu_buf_rele(db, FTAG);
2366                 return (err);
2367         }
2368 
2369         tx = dmu_tx_create(rwa->os);
2370 
2371         dmu_tx_hold_spill(tx, db->db_object);
2372 
2373         err = dmu_tx_assign(tx, TXG_WAIT);
2374         if (err != 0) {
2375                 dmu_buf_rele(db, FTAG);
2376                 dmu_buf_rele(db_spill, FTAG);
2377                 dmu_tx_abort(tx);
2378                 return (err);
2379         }
2380         dmu_buf_will_dirty(db_spill, tx);
2381 
2382         if (db_spill->db_size < drrs->drr_length)
2383                 VERIFY(0 == dbuf_spill_set_blksz(db_spill,
2384                     drrs->drr_length, tx));
2385         bcopy(data, db_spill->db_data, drrs->drr_length);
2386 
2387         dmu_buf_rele(db, FTAG);
2388         dmu_buf_rele(db_spill, FTAG);
2389 
2390         dmu_tx_commit(tx);
2391         return (0);
2392 }
2393 
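/*
 * Process a DRR_FREE record by freeing the given range of the target
 * object.
 */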
2394 /* ARGSUSED */
2395 static int
2396 receive_free(struct receive_writer_arg *rwa, struct drr_free *drrf)
2397 {
2398         int err;
2399 
2400         if (drrf->drr_length != -1ULL &&
2401             drrf->drr_offset + drrf->drr_length < drrf->drr_offset)
2402                 return (SET_ERROR(EINVAL));
2403 
2404         if (dmu_object_info(rwa->os, drrf->drr_object, NULL) != 0)
2405                 return (SET_ERROR(EINVAL));
2406 
2407         err = dmu_free_long_range(rwa->os, drrf->drr_object,
2408             drrf->drr_offset, drrf->drr_length);
2409 
2410         return (err);
2411 }
2412 
2413 /* used to destroy the drc_ds on error */
2414 static void
2415 dmu_recv_cleanup_ds(dmu_recv_cookie_t *drc)
2416 {
2417         if (drc->drc_resumable) {
2418                 /* wait for our resume state to be written to disk */
2419                 txg_wait_synced(drc->drc_ds->ds_dir->dd_pool, 0);
2420                 dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
2421         } else {
2422                 char name[ZFS_MAX_DATASET_NAME_LEN];
2423                 dsl_dataset_name(drc->drc_ds, name);
2424                 dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
2425                 (void) dsl_destroy_head(name);
2426         }
2427 }
2428 
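/*
 * Update the stream's running fletcher-4 checksum with the contents of
 * buf, byteswapping if the stream is in the opposite byte order.
 */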
2429 static void
2430 receive_cksum(struct receive_arg *ra, int len, void *buf)
2431 {
2432         if (ra->byteswap) {
2433                 (void) fletcher_4_incremental_byteswap(buf, len, &ra->cksum);
2434         } else {
2435                 (void) fletcher_4_incremental_native(buf, len, &ra->cksum);
2436         }
2437 }
2438 
2439 /*
2440  * Read the payload into a buffer of size len, and update the current record's
2441  * payload field.
2442  * Allocate ra->next_rrd and read the next record's header into
2443  * ra->next_rrd->header.
2444  * Verify checksum of payload and next record.
2445  */
2446 static int
2447 receive_read_payload_and_next_header(struct receive_arg *ra, int len, void *buf)
2448 {
2449         int err;
2450 
2451         if (len != 0) {
2452                 ASSERT3U(len, <=, SPA_MAXBLOCKSIZE);
2453                 err = receive_read(ra, len, buf);
2454                 if (err != 0)
2455                         return (err);
2456                 receive_cksum(ra, len, buf);
2457 
2458                 /* note: rrd is NULL when reading the begin record's payload */
2459                 if (ra->rrd != NULL) {
2460                         ra->rrd->payload = buf;
2461                         ra->rrd->payload_size = len;
2462                         ra->rrd->bytes_read = ra->bytes_read;
2463                 }
2464         }
2465 
2466         ra->prev_cksum = ra->cksum;
2467 
2468         ra->next_rrd = kmem_zalloc(sizeof (*ra->next_rrd), KM_SLEEP);
2469         err = receive_read(ra, sizeof (ra->next_rrd->header),
2470             &ra->next_rrd->header);
2471         ra->next_rrd->bytes_read = ra->bytes_read;
2472         if (err != 0) {
2473                 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
2474                 ra->next_rrd = NULL;
2475                 return (err);
2476         }
2477         if (ra->next_rrd->header.drr_type == DRR_BEGIN) {
2478                 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
2479                 ra->next_rrd = NULL;
2480                 return (SET_ERROR(EINVAL));
2481         }
2482 
2483         /*
2484          * Note: checksum is of everything up to but not including the
2485          * checksum itself.
2486          */
2487         ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
2488             ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
2489         receive_cksum(ra,
2490             offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
2491             &ra->next_rrd->header);
2492 
2493         zio_cksum_t cksum_orig =
2494             ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
2495         zio_cksum_t *cksump =
2496             &ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
2497 
2498         if (ra->byteswap)
2499                 byteswap_record(&ra->next_rrd->header);
2500 
2501         if ((!ZIO_CHECKSUM_IS_ZERO(cksump)) &&
2502             !ZIO_CHECKSUM_EQUAL(ra->cksum, *cksump)) {
2503                 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
2504                 ra->next_rrd = NULL;
2505                 return (SET_ERROR(ECKSUM));
2506         }
2507 
2508         receive_cksum(ra, sizeof (cksum_orig), &cksum_orig);
2509 
2510         return (0);
2511 }
2512 
2513 static void
2514 objlist_create(struct objlist *list)
2515 {
2516         list_create(&list->list, sizeof (struct receive_objnode),
2517             offsetof(struct receive_objnode, node));
2518         list->last_lookup = 0;
2519 }
2520 
2521 static void
2522 objlist_destroy(struct objlist *list)
2523 {
2524         for (struct receive_objnode *n = list_remove_head(&list->list);
2525             n != NULL; n = list_remove_head(&list->list)) {
2526                 kmem_free(n, sizeof (*n));
2527         }
2528         list_destroy(&list->list);
2529 }
2530 
2531 /*
2532  * This function looks through the objlist to see if the specified object number
2533  * is contained in the objlist.  In the process, it will remove all object
2534  * numbers in the list that are smaller than the specified object number.  Thus,
2535  * any lookup of an object number smaller than a previously looked up object
2536  * number will always return false; therefore, all lookups should be done in
2537  * ascending order.
2538  */
2539 static boolean_t
2540 objlist_exists(struct objlist *list, uint64_t object)
2541 {
2542         struct receive_objnode *node = list_head(&list->list);
2543         ASSERT3U(object, >=, list->last_lookup);
2544         list->last_lookup = object;
2545         while (node != NULL && node->object < object) {
2546                 VERIFY3P(node, ==, list_remove_head(&list->list));
2547                 kmem_free(node, sizeof (*node));
2548                 node = list_head(&list->list);
2549         }
2550         return (node != NULL && node->object == object);
2551 }
2552 
2553 /*
2554  * The objlist is a list of object numbers stored in ascending order.  However,
2555  * the insertion of new object numbers does not seek out the correct location to
2556  * store a new object number; instead, it appends it to the list for simplicity.
2557  * Thus, any users must take care to only insert new object numbers in ascending
2558  * order.
2559  */
2560 static void
2561 objlist_insert(struct objlist *list, uint64_t object)
2562 {
2563         struct receive_objnode *node = kmem_zalloc(sizeof (*node), KM_SLEEP);
2564         node->object = object;
2565 #ifdef ZFS_DEBUG
2566         struct receive_objnode *last_object = list_tail(&list->list);
2567         uint64_t last_objnum = (last_object != NULL ? last_object->object : 0);
2568         ASSERT3U(node->object, >, last_objnum);
2569 #endif
2570         list_insert_tail(&list->list, node);
2571 }
2572 
2573 /*
2574  * Issue the prefetch reads for any necessary indirect blocks.
2575  *
2576  * We use the object ignore list to tell us whether or not to issue prefetches
2577  * for a given object.  We do this for both correctness (in case the blocksize
2578  * of an object has changed) and performance (if the object doesn't exist, don't
2579  * needlessly try to issue prefetches).  We also trim the list as we go through
2580  * the stream to prevent it from growing to an unbounded size.
2581  *
2582  * The object numbers within will always be in sorted order, and any write
2583  * records we see will also be in sorted order, but they're not sorted with
2584  * respect to each other (i.e. we can get several object records before
2585  * receiving each object's write records).  As a result, once we've reached a
2586  * given object number, we can safely remove any reference to lower object
2587  * numbers in the ignore list. In practice, we receive up to 32 object records
2588  * before receiving write records, so the list can have up to 32 nodes in it.
2589  */
2590 /* ARGSUSED */
2591 static void
2592 receive_read_prefetch(struct receive_arg *ra,
2593     uint64_t object, uint64_t offset, uint64_t length)
2594 {
2595         if (!objlist_exists(&ra->ignore_objlist, object)) {
2596                 dmu_prefetch(ra->os, object, 1, offset, length,
2597                     ZIO_PRIORITY_SYNC_READ);
2598         }
2599 }
2600 
2601 /*
2602  * Read records off the stream, issuing any necessary prefetches.
2603  */
2604 static int
2605 receive_read_record(struct receive_arg *ra)
2606 {
2607         int err;
2608 
2609         switch (ra->rrd->header.drr_type) {
2610         case DRR_OBJECT:
2611         {
2612                 struct drr_object *drro = &ra->rrd->header.drr_u.drr_object;
2613                 uint32_t size = P2ROUNDUP(drro->drr_bonuslen, 8);
2614                 void *buf = kmem_zalloc(size, KM_SLEEP);
2615                 dmu_object_info_t doi;
2616                 err = receive_read_payload_and_next_header(ra, size, buf);
2617                 if (err != 0) {
2618                         kmem_free(buf, size);
2619                         return (err);
2620                 }
2621                 err = dmu_object_info(ra->os, drro->drr_object, &doi);
2622                 /*
2623                  * See receive_read_prefetch for an explanation why we're
2624                  * storing this object in the ignore_obj_list.
2625                  */
2626                 if (err == ENOENT ||
2627                     (err == 0 && doi.doi_data_block_size != drro->drr_blksz)) {
2628                         objlist_insert(&ra->ignore_objlist, drro->drr_object);
2629                         err = 0;
2630                 }
2631                 return (err);
2632         }
2633         case DRR_FREEOBJECTS:
2634         {
2635                 err = receive_read_payload_and_next_header(ra, 0, NULL);
2636                 return (err);
2637         }
2638         case DRR_WRITE:
2639         {
2640                 struct drr_write *drrw = &ra->rrd->header.drr_u.drr_write;
2641                 arc_buf_t *abuf;
2642                 boolean_t is_meta = DMU_OT_IS_METADATA(drrw->drr_type);
2643                 if (DRR_WRITE_COMPRESSED(drrw)) {
2644                         ASSERT3U(drrw->drr_compressed_size, >, 0);
2645                         ASSERT3U(drrw->drr_logical_size, >=,
2646                             drrw->drr_compressed_size);
2647                         ASSERT(!is_meta);
2648                         abuf = arc_loan_compressed_buf(
2649                             dmu_objset_spa(ra->os),
2650                             drrw->drr_compressed_size, drrw->drr_logical_size,
2651                             drrw->drr_compressiontype);
2652                 } else {
2653                         abuf = arc_loan_buf(dmu_objset_spa(ra->os),
2654                             is_meta, drrw->drr_logical_size);
2655                 }
2656 
2657                 err = receive_read_payload_and_next_header(ra,
2658                     DRR_WRITE_PAYLOAD_SIZE(drrw), abuf->b_data);
2659                 if (err != 0) {
2660                         dmu_return_arcbuf(abuf);
2661                         return (err);
2662                 }
2663                 ra->rrd->write_buf = abuf;
2664                 receive_read_prefetch(ra, drrw->drr_object, drrw->drr_offset,
2665                     drrw->drr_logical_size);
2666                 return (err);
2667         }
2668         case DRR_WRITE_BYREF:
2669         {
2670                 struct drr_write_byref *drrwb =
2671                     &ra->rrd->header.drr_u.drr_write_byref;
2672                 err = receive_read_payload_and_next_header(ra, 0, NULL);
2673                 receive_read_prefetch(ra, drrwb->drr_object, drrwb->drr_offset,
2674                     drrwb->drr_length);
2675                 return (err);
2676         }
2677         case DRR_WRITE_EMBEDDED:
2678         {
2679                 struct drr_write_embedded *drrwe =
2680                     &ra->rrd->header.drr_u.drr_write_embedded;
2681                 uint32_t size = P2ROUNDUP(drrwe->drr_psize, 8);
2682                 void *buf = kmem_zalloc(size, KM_SLEEP);
2683 
2684                 err = receive_read_payload_and_next_header(ra, size, buf);
2685                 if (err != 0) {
2686                         kmem_free(buf, size);
2687                         return (err);
2688                 }
2689 
2690                 receive_read_prefetch(ra, drrwe->drr_object, drrwe->drr_offset,
2691                     drrwe->drr_length);
2692                 return (err);
2693         }
2694         case DRR_FREE:
2695         {
2696                 /*
2697                  * It might be beneficial to prefetch indirect blocks here, but
2698                  * we don't really have the data to decide for sure.
2699                  */
2700                 err = receive_read_payload_and_next_header(ra, 0, NULL);
2701                 return (err);
2702         }
2703         case DRR_END:
2704         {
2705                 struct drr_end *drre = &ra->rrd->header.drr_u.drr_end;
2706                 if (!ZIO_CHECKSUM_EQUAL(ra->prev_cksum, drre->drr_checksum))
2707                         return (SET_ERROR(ECKSUM));
2708                 return (0);
2709         }
2710         case DRR_SPILL:
2711         {
2712                 struct drr_spill *drrs = &ra->rrd->header.drr_u.drr_spill;
2713                 void *buf = kmem_zalloc(drrs->drr_length, KM_SLEEP);
2714                 err = receive_read_payload_and_next_header(ra, drrs->drr_length,
2715                     buf);
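                     /*
                      * On success the buffer becomes the record's payload
                      * and is freed by the writer thread; on failure free
                      * it here.
                      */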
2716                 if (err != 0)
2717                         kmem_free(buf, drrs->drr_length);
2718                 return (err);
2719         }
2720         default:
2721                 return (SET_ERROR(EINVAL));
2722         }
2723 }
2724 
2725 /*
2726  * Commit a record to the pool.
2727  */
2728 static int
2729 receive_process_record(struct receive_writer_arg *rwa,
2730     struct receive_record_arg *rrd)
2731 {
2732         int err;
2733 
2734         /* Processing in order, therefore bytes_read should be increasing. */
2735         ASSERT3U(rrd->bytes_read, >=, rwa->bytes_read);
2736         rwa->bytes_read = rrd->bytes_read;
2737 
2738         switch (rrd->header.drr_type) {
2739         case DRR_OBJECT:
2740         {
2741                 struct drr_object *drro = &rrd->header.drr_u.drr_object;
2742                 err = receive_object(rwa, drro, rrd->payload);
2743                 kmem_free(rrd->payload, rrd->payload_size);
2744                 rrd->payload = NULL;
2745                 return (err);
2746         }
2747         case DRR_FREEOBJECTS:
2748         {
2749                 struct drr_freeobjects *drrfo =
2750                     &rrd->header.drr_u.drr_freeobjects;
2751                 return (receive_freeobjects(rwa, drrfo));
2752         }
2753         case DRR_WRITE:
2754         {
2755                 struct drr_write *drrw = &rrd->header.drr_u.drr_write;
2756                 err = receive_write(rwa, drrw, rrd->write_buf);
2757                 /* if receive_write() is successful, it consumes the arc_buf */
2758                 if (err != 0)
2759                         dmu_return_arcbuf(rrd->write_buf);
2760                 rrd->write_buf = NULL;
2761                 rrd->payload = NULL;
2762                 return (err);
2763         }
2764         case DRR_WRITE_BYREF:
2765         {
2766                 struct drr_write_byref *drrwbr =
2767                     &rrd->header.drr_u.drr_write_byref;
2768                 return (receive_write_byref(rwa, drrwbr));
2769         }
2770         case DRR_WRITE_EMBEDDED:
2771         {
2772                 struct drr_write_embedded *drrwe =
2773                     &rrd->header.drr_u.drr_write_embedded;
2774                 err = receive_write_embedded(rwa, drrwe, rrd->payload);
2775                 kmem_free(rrd->payload, rrd->payload_size);
2776                 rrd->payload = NULL;
2777                 return (err);
2778         }
2779         case DRR_FREE:
2780         {
2781                 struct drr_free *drrf = &rrd->header.drr_u.drr_free;
2782                 return (receive_free(rwa, drrf));
2783         }
2784         case DRR_SPILL:
2785         {
2786                 struct drr_spill *drrs = &rrd->header.drr_u.drr_spill;
2787                 err = receive_spill(rwa, drrs, rrd->payload);
2788                 kmem_free(rrd->payload, rrd->payload_size);
2789                 rrd->payload = NULL;
2790                 return (err);
2791         }
2792         default:
2793                 return (SET_ERROR(EINVAL));
2794         }
2795 }
2796 
2797 /*
2798  * dmu_recv_stream's worker thread; pull records off the queue, and then call
2799  * receive_process_record.  When we're done, signal the main thread and exit.
2800  */
2801 static void
2802 receive_writer_thread(void *arg)
2803 {
2804         struct receive_writer_arg *rwa = arg;
2805         struct receive_record_arg *rrd;
2806         for (rrd = bqueue_dequeue(&rwa->q); !rrd->eos_marker;
2807             rrd = bqueue_dequeue(&rwa->q)) {
2808                 /*
2809                  * If there's an error, the main thread will stop putting things
2810                  * on the queue, but we need to clear everything in it before we
2811                  * can exit.
2812                  */
2813                 if (rwa->err == 0) {
2814                         rwa->err = receive_process_record(rwa, rrd);
2815                 } else if (rrd->write_buf != NULL) {
2816                         dmu_return_arcbuf(rrd->write_buf);
2817                         rrd->write_buf = NULL;
2818                         rrd->payload = NULL;
2819                 } else if (rrd->payload != NULL) {
2820                         kmem_free(rrd->payload, rrd->payload_size);
2821                         rrd->payload = NULL;
2822                 }
2823                 kmem_free(rrd, sizeof (*rrd));
2824         }
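             /* Free the end-of-stream marker record that terminated the loop. */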
2825         kmem_free(rrd, sizeof (*rrd));
2826         mutex_enter(&rwa->mutex);
2827         rwa->done = B_TRUE;
2828         cv_signal(&rwa->cv);
2829         mutex_exit(&rwa->mutex);
2830         thread_exit();
2831 }
2832 
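     /*
      * Verify that the resume point named in the stream's BEGIN payload
      * (resume_object/resume_offset) matches the resume state recorded in
      * the dataset's ZAP by the previous, interrupted receive.
      */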
2833 static int
2834 resume_check(struct receive_arg *ra, nvlist_t *begin_nvl)
2835 {
2836         uint64_t val;
2837         objset_t *mos = dmu_objset_pool(ra->os)->dp_meta_objset;
2838         uint64_t dsobj = dmu_objset_id(ra->os);
2839         uint64_t resume_obj, resume_off;
2840 
2841         if (nvlist_lookup_uint64(begin_nvl,
2842             "resume_object", &resume_obj) != 0 ||
2843             nvlist_lookup_uint64(begin_nvl,
2844             "resume_offset", &resume_off) != 0) {
2845                 return (SET_ERROR(EINVAL));
2846         }
2847         VERIFY0(zap_lookup(mos, dsobj,
2848             DS_FIELD_RESUME_OBJECT, sizeof (val), 1, &val));
2849         if (resume_obj != val)
2850                 return (SET_ERROR(EINVAL));
2851         VERIFY0(zap_lookup(mos, dsobj,
2852             DS_FIELD_RESUME_OFFSET, sizeof (val), 1, &val));
2853         if (resume_off != val)
2854                 return (SET_ERROR(EINVAL));
2855 
2856         return (0);
2857 }
2858 
2859 /*
2860  * Read in the stream's records, one by one, and apply them to the pool.  There
2861  * are two threads involved; the thread that calls this function will spin up a
2862  * worker thread, read the records off the stream one by one, and issue
2863  * prefetches for any necessary indirect blocks.  It will then push the records
2864  * onto an internal blocking queue.  The worker thread will pull the records off
2865  * the queue, and actually write the data into the DMU.  This way, the worker
2866  * thread doesn't have to wait for reads to complete, since everything it needs
2867  * (the indirect blocks) will be prefetched.
2868  *
2869  * NB: callers *must* call dmu_recv_end() if this succeeds.
2870  */
2871 int
2872 dmu_recv_stream(dmu_recv_cookie_t *drc, vnode_t *vp, offset_t *voffp,
2873     int cleanup_fd, uint64_t *action_handlep)
2874 {
2875         int err = 0;
2876         struct receive_arg ra = { 0 };
2877         struct receive_writer_arg rwa = { 0 };
2878         int featureflags;
2879         nvlist_t *begin_nvl = NULL;
2880 
2881         ra.byteswap = drc->drc_byteswap;
2882         ra.cksum = drc->drc_cksum;
2883         ra.vp = vp;
2884         ra.voff = *voffp;
2885 
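             /*
              * For a resumed receive, pick up the running byte count where
              * the previous attempt left off (persisted in the dataset's
              * ZAP as DS_FIELD_RESUME_BYTES).
              */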
2886         if (dsl_dataset_is_zapified(drc->drc_ds)) {
2887                 (void) zap_lookup(drc->drc_ds->ds_dir->dd_pool->dp_meta_objset,
2888                     drc->drc_ds->ds_object, DS_FIELD_RESUME_BYTES,
2889                     sizeof (ra.bytes_read), 1, &ra.bytes_read);
2890         }
2891 
2892         objlist_create(&ra.ignore_objlist);
2893 
2894         /* these were verified in dmu_recv_begin */
2895         ASSERT3U(DMU_GET_STREAM_HDRTYPE(drc->drc_drrb->drr_versioninfo), ==,
2896             DMU_SUBSTREAM);
2897         ASSERT3U(drc->drc_drrb->drr_type, <, DMU_OST_NUMTYPES);
2898 
2899         /*
2900          * Open the objset we are modifying.
2901          */
2902         VERIFY0(dmu_objset_from_ds(drc->drc_ds, &ra.os));
2903 
2904         ASSERT(dsl_dataset_phys(drc->drc_ds)->ds_flags & DS_FLAG_INCONSISTENT);
2905 
2906         featureflags = DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo);
2907 
2908         /* if this stream is dedup'ed, set up the avl tree for guid mapping */
2909         if (featureflags & DMU_BACKUP_FEATURE_DEDUP) {
2910                 minor_t minor;
2911 
2912                 if (cleanup_fd == -1) {
2913                         err = SET_ERROR(EBADF);
2914                         goto out;
2915                 }
2916                 err = zfs_onexit_fd_hold(cleanup_fd, &minor);
2917                 if (err != 0) {
2918                         cleanup_fd = -1;
2919                         goto out;
2920                 }
2921 
2922                 if (*action_handlep == 0) {
2923                         rwa.guid_to_ds_map =
2924                             kmem_alloc(sizeof (avl_tree_t), KM_SLEEP);
2925                         avl_create(rwa.guid_to_ds_map, guid_compare,
2926                             sizeof (guid_map_entry_t),
2927                             offsetof(guid_map_entry_t, avlnode));
2928                         err = zfs_onexit_add_cb(minor,
2929                             free_guid_map_onexit, rwa.guid_to_ds_map,
2930                             action_handlep);
2931                         if (err != 0)
2932                                 goto out;
2933                 } else {
2934                         err = zfs_onexit_cb_data(minor, *action_handlep,
2935                             (void **)&rwa.guid_to_ds_map);
2936                         if (err != 0)
2937                                 goto out;
2938                 }
2939 
2940                 drc->drc_guid_to_ds_map = rwa.guid_to_ds_map;
2941         }
2942 
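             /*
              * The BEGIN record may carry an nvlist payload (e.g. the
              * resume state checked below); read and unpack it before
              * entering the main record loop.
              */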
2943         uint32_t payloadlen = drc->drc_drr_begin->drr_payloadlen;
2944         void *payload = NULL;
2945         if (payloadlen != 0)
2946                 payload = kmem_alloc(payloadlen, KM_SLEEP);
2947 
2948         err = receive_read_payload_and_next_header(&ra, payloadlen, payload);
2949         if (err != 0) {
2950                 if (payloadlen != 0)
2951                         kmem_free(payload, payloadlen);
2952                 goto out;
2953         }
2954         if (payloadlen != 0) {
2955                 err = nvlist_unpack(payload, payloadlen, &begin_nvl, KM_SLEEP);
2956                 kmem_free(payload, payloadlen);
2957                 if (err != 0)
2958                         goto out;
2959         }
2960 
2961         if (featureflags & DMU_BACKUP_FEATURE_RESUMING) {
2962                 err = resume_check(&ra, begin_nvl);
2963                 if (err != 0)
2964                         goto out;
2965         }
2966 
2967         (void) bqueue_init(&rwa.q, zfs_recv_queue_length,
2968             offsetof(struct receive_record_arg, node));
2969         cv_init(&rwa.cv, NULL, CV_DEFAULT, NULL);
2970         mutex_init(&rwa.mutex, NULL, MUTEX_DEFAULT, NULL);
2971         rwa.os = ra.os;
2972         rwa.byteswap = drc->drc_byteswap;
2973         rwa.resumable = drc->drc_resumable;
2974 
2975         (void) thread_create(NULL, 0, receive_writer_thread, &rwa, 0, curproc,
2976             TS_RUN, minclsyspri);
2977         /*
2978          * We're reading rwa.err without locks, which is safe since we are the
2979          * only reader, and the worker thread is the only writer.  It's ok if we
2980          * miss a write for an iteration or two of the loop, since the writer
2981          * thread will keep freeing records we send it until we send it an eos
2982          * marker.
2983          *
2984          * We can leave this loop in 3 ways:  First, if rwa.err is
2985          * non-zero.  In that case, the writer thread will free the rrd we just
2986          * pushed.  Second, if we're interrupted; in that case, either it's the
2987          * first loop and ra.rrd was never allocated, or it's later, and ra.rrd
2988          * has been handed off to the writer thread who will free it.  Finally,
2989          * if receive_read_record fails or we're at the end of the stream, then
2990          * we free ra.rrd and exit.
2991          */
2992         while (rwa.err == 0) {
2993                 if (issig(JUSTLOOKING) && issig(FORREAL)) {
2994                         err = SET_ERROR(EINTR);
2995                         break;
2996                 }
2997 
2998                 ASSERT3P(ra.rrd, ==, NULL);
2999                 ra.rrd = ra.next_rrd;
3000                 ra.next_rrd = NULL;
3001                 /* Allocates and loads header into ra.next_rrd */
3002                 err = receive_read_record(&ra);
3003 
3004                 if (ra.rrd->header.drr_type == DRR_END || err != 0) {
3005                         kmem_free(ra.rrd, sizeof (*ra.rrd));
3006                         ra.rrd = NULL;
3007                         break;
3008                 }
3009 
3010                 bqueue_enqueue(&rwa.q, ra.rrd,
3011                     sizeof (struct receive_record_arg) + ra.rrd->payload_size);
3012                 ra.rrd = NULL;
3013         }
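             /*
              * Enqueue an end-of-stream marker so the writer thread drains
              * the queue and exits, then wait for it to signal completion.
              */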
3014         if (ra.next_rrd == NULL)
3015                 ra.next_rrd = kmem_zalloc(sizeof (*ra.next_rrd), KM_SLEEP);
3016         ra.next_rrd->eos_marker = B_TRUE;
3017         bqueue_enqueue(&rwa.q, ra.next_rrd, 1);
3018 
3019         mutex_enter(&rwa.mutex);
3020         while (!rwa.done) {
3021                 cv_wait(&rwa.cv, &rwa.mutex);
3022         }
3023         mutex_exit(&rwa.mutex);
3024 
3025         cv_destroy(&rwa.cv);
3026         mutex_destroy(&rwa.mutex);
3027         bqueue_destroy(&rwa.q);
3028         if (err == 0)
3029                 err = rwa.err;
3030 
3031 out:
3032         nvlist_free(begin_nvl);
3033         if ((featureflags & DMU_BACKUP_FEATURE_DEDUP) && (cleanup_fd != -1))
3034                 zfs_onexit_fd_rele(cleanup_fd);
3035 
3036         if (err != 0) {
3037                 /*
3038                  * Clean up references. If the receive is not resumable,
3039                  * destroy what we created, so we don't leave it in
3040                  * an inconsistent state.
3041                  */
3042                 dmu_recv_cleanup_ds(drc);
3043         }
3044 
3045         *voffp = ra.voff;
3046         objlist_destroy(&ra.ignore_objlist);
3047         return (err);
3048 }
3049 
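     /*
      * Check phase of the dmu_recv_end sync task: verify that the received
      * dataset can either be swapped with the existing head (incremental
      * receive) or snapshotted in place (new filesystem) in this txg.
      */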
3050 static int
3051 dmu_recv_end_check(void *arg, dmu_tx_t *tx)
3052 {
3053         dmu_recv_cookie_t *drc = arg;
3054         dsl_pool_t *dp = dmu_tx_pool(tx);
3055         int error;
3056 
3057         ASSERT3P(drc->drc_ds->ds_owner, ==, dmu_recv_tag);
3058 
3059         if (!drc->drc_newfs) {
3060                 dsl_dataset_t *origin_head;
3061 
3062                 error = dsl_dataset_hold(dp, drc->drc_tofs, FTAG, &origin_head);
3063                 if (error != 0)
3064                         return (error);
3065                 if (drc->drc_force) {
3066                         /*
3067                          * We will destroy any snapshots in tofs (i.e. before
3068                          * origin_head) that are after the origin (which is
3069                          * the snap before drc_ds, because drc_ds cannot
3070                          * have any snaps of its own).
3071                          */
3072                         uint64_t obj;
3073 
3074                         obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
3075                         while (obj !=
3076                             dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
3077                                 dsl_dataset_t *snap;
3078                                 error = dsl_dataset_hold_obj(dp, obj, FTAG,
3079                                     &snap);
3080                                 if (error != 0)
3081                                         break;
3082                                 if (snap->ds_dir != origin_head->ds_dir)
3083                                         error = SET_ERROR(EINVAL);
3084                                 if (error == 0)  {
3085                                         error = dsl_destroy_snapshot_check_impl(
3086                                             snap, B_FALSE);
3087                                 }
3088                                 obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
3089                                 dsl_dataset_rele(snap, FTAG);
3090                                 if (error != 0)
3091                                         break;
3092                         }
3093                         if (error != 0) {
3094                                 dsl_dataset_rele(origin_head, FTAG);
3095                                 return (error);
3096                         }
3097                 }
3098                 error = dsl_dataset_clone_swap_check_impl(drc->drc_ds,
3099                     origin_head, drc->drc_force, drc->drc_owner, tx);
3100                 if (error != 0) {
3101                         dsl_dataset_rele(origin_head, FTAG);
3102                         return (error);
3103                 }
3104                 error = dsl_dataset_snapshot_check_impl(origin_head,
3105                     drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
3106                 dsl_dataset_rele(origin_head, FTAG);
3107                 if (error != 0)
3108                         return (error);
3109 
3110                 error = dsl_destroy_head_check_impl(drc->drc_ds, 1);
3111         } else {
3112                 error = dsl_dataset_snapshot_check_impl(drc->drc_ds,
3113                     drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
3114         }
3115         return (error);
3116 }
3117 
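     /*
      * Sync phase of the dmu_recv_end sync task: swap the received clone
      * with origin_head (or snapshot a newly created filesystem), stamp the
      * new snapshot with the stream's creation time and guid, clear
      * DS_FLAG_INCONSISTENT, and, for a new filesystem, remove any saved
      * resume state.
      */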
3118 static void
3119 dmu_recv_end_sync(void *arg, dmu_tx_t *tx)
3120 {
3121         dmu_recv_cookie_t *drc = arg;
3122         dsl_pool_t *dp = dmu_tx_pool(tx);
3123 
3124         spa_history_log_internal_ds(drc->drc_ds, "finish receiving",
3125             tx, "snap=%s", drc->drc_tosnap);
3126 
3127         if (!drc->drc_newfs) {
3128                 dsl_dataset_t *origin_head;
3129 
3130                 VERIFY0(dsl_dataset_hold(dp, drc->drc_tofs, FTAG,
3131                     &origin_head));
3132 
3133                 if (drc->drc_force) {
3134                         /*
3135                          * Destroy any snapshots of drc_tofs (origin_head)
3136                          * after the origin (the snap before drc_ds).
3137                          */
3138                         uint64_t obj;
3139 
3140                         obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
3141                         while (obj !=
3142                             dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
3143                                 dsl_dataset_t *snap;
3144                                 VERIFY0(dsl_dataset_hold_obj(dp, obj, FTAG,
3145                                     &snap));
3146                                 ASSERT3P(snap->ds_dir, ==, origin_head->ds_dir);
3147                                 obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
3148                                 dsl_destroy_snapshot_sync_impl(snap,
3149                                     B_FALSE, tx);
3150                                 dsl_dataset_rele(snap, FTAG);
3151                         }
3152                 }
3153                 VERIFY3P(drc->drc_ds->ds_prev, ==,
3154                     origin_head->ds_prev);
3155 
3156                 dsl_dataset_clone_swap_sync_impl(drc->drc_ds,
3157                     origin_head, tx);
3158                 dsl_dataset_snapshot_sync_impl(origin_head,
3159                     drc->drc_tosnap, tx);
3160 
3161                 /* set snapshot's creation time and guid */
3162                 dmu_buf_will_dirty(origin_head->ds_prev->ds_dbuf, tx);
3163                 dsl_dataset_phys(origin_head->ds_prev)->ds_creation_time =
3164                     drc->drc_drrb->drr_creation_time;
3165                 dsl_dataset_phys(origin_head->ds_prev)->ds_guid =
3166                     drc->drc_drrb->drr_toguid;
3167                 dsl_dataset_phys(origin_head->ds_prev)->ds_flags &=
3168                     ~DS_FLAG_INCONSISTENT;
3169 
3170                 dmu_buf_will_dirty(origin_head->ds_dbuf, tx);
3171                 dsl_dataset_phys(origin_head)->ds_flags &=
3172                     ~DS_FLAG_INCONSISTENT;
3173 
3174                 drc->drc_newsnapobj =
3175                     dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
3176 
3177                 dsl_dataset_rele(origin_head, FTAG);
3178                 dsl_destroy_head_sync_impl(drc->drc_ds, tx);
3179 
3180                 if (drc->drc_owner != NULL)
3181                         VERIFY3P(origin_head->ds_owner, ==, drc->drc_owner);
3182         } else {
3183                 dsl_dataset_t *ds = drc->drc_ds;
3184 
3185                 dsl_dataset_snapshot_sync_impl(ds, drc->drc_tosnap, tx);
3186 
3187                 /* set snapshot's creation time and guid */
3188                 dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
3189                 dsl_dataset_phys(ds->ds_prev)->ds_creation_time =
3190                     drc->drc_drrb->drr_creation_time;
3191                 dsl_dataset_phys(ds->ds_prev)->ds_guid =
3192                     drc->drc_drrb->drr_toguid;
3193                 dsl_dataset_phys(ds->ds_prev)->ds_flags &=
3194                     ~DS_FLAG_INCONSISTENT;
3195 
3196                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
3197                 dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
3198                 if (dsl_dataset_has_resume_receive_state(ds)) {
3199                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
3200                             DS_FIELD_RESUME_FROMGUID, tx);
3201                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
3202                             DS_FIELD_RESUME_OBJECT, tx);
3203                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
3204                             DS_FIELD_RESUME_OFFSET, tx);
3205                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
3206                             DS_FIELD_RESUME_BYTES, tx);
3207                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
3208                             DS_FIELD_RESUME_TOGUID, tx);
3209                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
3210                             DS_FIELD_RESUME_TONAME, tx);
3211                 }
3212                 drc->drc_newsnapobj =
3213                     dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj;
3214         }
3215         /*
3216          * Release the hold from dmu_recv_begin.  This must be done before
3217          * we return to open context, so that when we free the dataset's dnode,
3218          * we can evict its bonus buffer.
3219          */
3220         dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
3221         drc->drc_ds = NULL;
3222 }
3223 
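     /*
      * Record the snapshot created by this receive in the stream's guid
      * map so that later WRITE_BYREF records (deduplicated streams) can
      * locate the dataset holding the referenced blocks.
      */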
3224 static int
3225 add_ds_to_guidmap(const char *name, avl_tree_t *guid_map, uint64_t snapobj)
3226 {
3227         dsl_pool_t *dp;
3228         dsl_dataset_t *snapds;
3229         guid_map_entry_t *gmep;
3230         int err;
3231 
3232         ASSERT(guid_map != NULL);
3233 
3234         err = dsl_pool_hold(name, FTAG, &dp);
3235         if (err != 0)
3236                 return (err);
3237         gmep = kmem_alloc(sizeof (*gmep), KM_SLEEP);
3238         err = dsl_dataset_hold_obj(dp, snapobj, gmep, &snapds);
3239         if (err == 0) {
3240                 gmep->guid = dsl_dataset_phys(snapds)->ds_guid;
3241                 gmep->gme_ds = snapds;
3242                 avl_add(guid_map, gmep);
3243                 dsl_dataset_long_hold(snapds, gmep);
3244         } else {
3245                 kmem_free(gmep, sizeof (*gmep));
3246         }
3247 
3248         dsl_pool_rele(dp, FTAG);
3249         return (err);
3250 }
3251 
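     /*
      * Approximate number of blocks modified by dmu_recv_end_sync(),
      * passed to dsl_sync_task() for its space check.
      */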
3252 static int dmu_recv_end_modified_blocks = 3;
3253 
3254 static int
3255 dmu_recv_existing_end(dmu_recv_cookie_t *drc)
3256 {
3257 #ifdef _KERNEL
3258         /*
3259          * We will be destroying the ds; make sure its origin is unmounted if
3260          * necessary.
3261          */
3262         char name[ZFS_MAX_DATASET_NAME_LEN];
3263         dsl_dataset_name(drc->drc_ds, name);
3264         zfs_destroy_unmount_origin(name);
3265 #endif
3266 
3267         return (dsl_sync_task(drc->drc_tofs,
3268             dmu_recv_end_check, dmu_recv_end_sync, drc,
3269             dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL));
3270 }
3271 
3272 static int
3273 dmu_recv_new_end(dmu_recv_cookie_t *drc)
3274 {
3275         return (dsl_sync_task(drc->drc_tofs,
3276             dmu_recv_end_check, dmu_recv_end_sync, drc,
3277             dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL));
3278 }
3279 
3280 int
3281 dmu_recv_end(dmu_recv_cookie_t *drc, void *owner)
3282 {
3283         int error;
3284 
3285         drc->drc_owner = owner;
3286 
3287         if (drc->drc_newfs)
3288                 error = dmu_recv_new_end(drc);
3289         else
3290                 error = dmu_recv_existing_end(drc);
3291 
3292         if (error != 0) {
3293                 dmu_recv_cleanup_ds(drc);
3294         } else if (drc->drc_guid_to_ds_map != NULL) {
3295                 (void) add_ds_to_guidmap(drc->drc_tofs,
3296                     drc->drc_guid_to_ds_map,
3297                     drc->drc_newsnapobj);
3298         }
3299         return (error);
3300 }
3301 
3302 /*
3303  * Return TRUE if this objset is currently being received into.
3304  */
3305 boolean_t
3306 dmu_objset_is_receiving(objset_t *os)
3307 {
3308         return (os->os_dsl_dataset != NULL &&
3309             os->os_dsl_dataset->ds_owner == dmu_recv_tag);
3310 }